From commits-return-7836-apmail-kafka-commits-archive=kafka.apache.org@kafka.apache.org Mon Sep 18 09:13:44 2017 Return-Path: X-Original-To: apmail-kafka-commits-archive@www.apache.org Delivered-To: apmail-kafka-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 1B7C5CA82 for ; Mon, 18 Sep 2017 09:13:44 +0000 (UTC) Received: (qmail 76265 invoked by uid 500); 18 Sep 2017 09:13:44 -0000 Delivered-To: apmail-kafka-commits-archive@kafka.apache.org Received: (qmail 76236 invoked by uid 500); 18 Sep 2017 09:13:43 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 76227 invoked by uid 99); 18 Sep 2017 09:13:43 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 18 Sep 2017 09:13:43 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C8F4AE0A98; Mon, 18 Sep 2017 09:13:43 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ijuma@apache.org To: commits@kafka.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: kafka git commit: MINOR: Add metric templates for sender/fetcher rate totals Date: Mon, 18 Sep 2017 09:13:43 +0000 (UTC) Repository: kafka Updated Branches: refs/heads/trunk f305dd68f -> a339a387e MINOR: Add metric templates for sender/fetcher rate totals Author: Rajini Sivaram Reviewers: James Cheng , Ismael Juma Closes #3882 from rajinisivaram/MINOR-KAFKA-5738-metricstemplates Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a339a387 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a339a387 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a339a387 Branch: refs/heads/trunk Commit: a339a387e0669226bc1537783b3d4c6ea5b9b4c0 Parents: f305dd6 Author: Rajini Sivaram Authored: Mon Sep 18 10:13:28 2017 +0100 Committer: Ismael Juma Committed: Mon Sep 18 10:13:28 2017 +0100 ---------------------------------------------------------------------- .../internals/SenderMetricsRegistry.java | 10 +++++- .../apache/kafka/common/MetricNameTemplate.java | 23 ++++++++++++- .../clients/consumer/internals/FetcherTest.java | 34 ++++++++++++++++++++ .../clients/producer/internals/SenderTest.java | 30 +++++++++++++++++ .../java/org/apache/kafka/test/TestUtils.java | 12 ++++++- 5 files changed, 106 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/a339a387/clients/src/main/java/org/apache/kafka/clients/producer/internals/SenderMetricsRegistry.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/SenderMetricsRegistry.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/SenderMetricsRegistry.java index 9e014a7..21466e1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/SenderMetricsRegistry.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/SenderMetricsRegistry.java @@ -117,24 +117,32 @@ public class SenderMetricsRegistry { this.requestLatencyAvg, this.requestLatencyMax, this.recordSendRate, + this.recordSendTotal, this.recordsPerRequestAvg, this.recordRetryRate, + this.recordRetryTotal, this.recordErrorRate, + this.recordErrorTotal, this.recordSizeMax, this.recordSizeAvg, this.requestsInFlight, this.metadataAge, this.batchSplitRate, + this.batchSplitTotal, this.produceThrottleTimeAvg, this.produceThrottleTimeMax, // per-topic metrics this.topicRecordSendRate, + this.topicRecordSendTotal, this.topicByteRate, + this.topicByteTotal, this.topicCompressionRate, this.topicRecordRetryRate, - this.topicRecordErrorRate + this.topicRecordRetryTotal, + this.topicRecordErrorRate, + this.topicRecordErrorTotal ); } http://git-wip-us.apache.org/repos/asf/kafka/blob/a339a387/clients/src/main/java/org/apache/kafka/common/MetricNameTemplate.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/MetricNameTemplate.java b/clients/src/main/java/org/apache/kafka/common/MetricNameTemplate.java index d768f22..e3ea995 100644 --- a/clients/src/main/java/org/apache/kafka/common/MetricNameTemplate.java +++ b/clients/src/main/java/org/apache/kafka/common/MetricNameTemplate.java @@ -17,6 +17,7 @@ package org.apache.kafka.common; import java.util.HashSet; +import java.util.Objects; import java.util.Set; import org.apache.kafka.common.utils.Utils; @@ -65,8 +66,28 @@ public class MetricNameTemplate { return this.description; } - public Set tags() { return tags; } + + @Override + public int hashCode() { + return Objects.hash(name, group, tags); + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + MetricNameTemplate other = (MetricNameTemplate) o; + return Objects.equals(name, other.name) && Objects.equals(group, other.group) && + Objects.equals(tags, other.tags); + } + + @Override + public String toString() { + return String.format("name=%s, group=%s, tags=%s", name, group, tags); + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/a339a387/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index b2ede15..5263d3b 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -32,6 +32,7 @@ import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.MetricNameTemplate; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; @@ -43,6 +44,7 @@ import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.network.NetworkReceive; @@ -1361,6 +1363,38 @@ public class FetcherTest { assertEquals(3, recordsCountAverage.value(), EPSILON); } + @Test + public void testFetcherMetricsTemplates() throws Exception { + metrics.close(); + Map clientTags = Collections.singletonMap("client-id", "clientA"); + metrics = new Metrics(new MetricConfig().tags(clientTags)); + metricsRegistry = new FetcherMetricsRegistry(clientTags.keySet(), "consumer" + groupId); + fetcher.close(); + fetcher = createFetcher(subscriptions, metrics); + + // Fetch from topic to generate topic metrics + subscriptions.assignFromUser(singleton(tp0)); + subscriptions.seek(tp0, 0); + assertEquals(1, fetcher.sendFetches()); + client.prepareResponse(fetchResponse(tp0, this.records, Errors.NONE, 100L, 0)); + consumerClient.poll(0); + assertTrue(fetcher.hasCompletedFetches()); + Map>> partitionRecords = fetcher.fetchedRecords(); + assertTrue(partitionRecords.containsKey(tp0)); + + // Create throttle metrics + Fetcher.throttleTimeSensor(metrics, metricsRegistry); + + // Verify that all metrics except metrics-count have registered templates + Set allMetrics = new HashSet<>(); + for (MetricName n : metrics.metrics().keySet()) { + String name = n.name().replaceAll(tp0.toString(), "{topic}-{partition}"); + if (!n.group().equals("kafka-metrics-count")) + allMetrics.add(new MetricNameTemplate(name, n.group(), "", n.tags().keySet())); + } + TestUtils.checkEquals(allMetrics, new HashSet<>(metricsRegistry.getAllTemplates()), "metrics", "templates"); + } + private Map>> fetchRecords( TopicPartition tp, MemoryRecords records, Errors error, long hw, int throttleTime) { return fetchRecords(tp, records, error, hw, FetchResponse.INVALID_LAST_STABLE_OFFSET, throttleTime); http://git-wip-us.apache.org/repos/asf/kafka/blob/a339a387/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index 26e3e6c..a64bc56 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -26,6 +26,7 @@ import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.MetricNameTemplate; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.ClusterAuthorizationException; @@ -70,10 +71,12 @@ import java.nio.ByteBuffer; import java.util.Collections; import java.util.Deque; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; @@ -275,6 +278,33 @@ public class SenderTest { } @Test + public void testSenderMetricsTemplates() throws Exception { + metrics.close(); + Map clientTags = Collections.singletonMap("client-id", "clientA"); + metrics = new Metrics(new MetricConfig().tags(clientTags)); + SenderMetricsRegistry metricsRegistry = new SenderMetricsRegistry(clientTags.keySet()); + Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL, + 1, metrics, metricsRegistry, time, REQUEST_TIMEOUT, 50, null, apiVersions); + + // Append a message so that topic metrics are created + accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT); + sender.run(time.milliseconds()); // connect + sender.run(time.milliseconds()); // send produce request + client.respond(produceResponse(tp0, 0, Errors.NONE, 0)); + sender.run(time.milliseconds()); + // Create throttle time metrics + Sender.throttleTimeSensor(metrics, metricsRegistry); + + // Verify that all metrics except metrics-count have registered templates + Set allMetrics = new HashSet<>(); + for (MetricName n : metrics.metrics().keySet()) { + if (!n.group().equals("kafka-metrics-count")) + allMetrics.add(new MetricNameTemplate(n.name(), n.group(), "", n.tags().keySet())); + } + TestUtils.checkEquals(allMetrics, new HashSet<>(metricsRegistry.getAllTemplates()), "metrics", "templates"); + } + + @Test public void testRetries() throws Exception { // create a sender with retries = 1 int maxRetries = 1; http://git-wip-us.apache.org/repos/asf/kafka/blob/a339a387/clients/src/test/java/org/apache/kafka/test/TestUtils.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java index 47ca823..958ab2c 100644 --- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java +++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java @@ -35,6 +35,7 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -317,6 +318,16 @@ public class TestUtils { assertEquals(Utils.toList(it1), Utils.toList(it2)); } + public static void checkEquals(Set c1, Set c2, String firstDesc, String secondDesc) { + if (!c1.equals(c2)) { + Set missing1 = new HashSet<>(c2); + missing1.removeAll(c1); + Set missing2 = new HashSet<>(c1); + missing2.removeAll(c2); + fail(String.format("Sets not equal, missing %s=%s, missing %s=%s", firstDesc, missing1, secondDesc, missing2)); + } + } + public static List toList(Iterable iterable) { List list = new ArrayList<>(); for (T item : iterable) @@ -330,5 +341,4 @@ public class TestUtils { buffer.rewind(); return buffer; } - }