kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From maniku...@apache.org
Subject [kafka] branch 2.4 updated: KAFKA-8874: Add consumer metrics to observe user poll behavior (KIP-517)
Date Thu, 17 Oct 2019 09:17:44 GMT
This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new 5193ecb  KAFKA-8874: Add consumer metrics to observe user poll behavior (KIP-517)
5193ecb is described below

commit 5193ecb8d3f41868c2933e1d9b87e63c43c08a5a
Author: Kevin Lu <kelu@paypal.com>
AuthorDate: Thu Oct 17 14:46:42 2019 +0530

    KAFKA-8874: Add consumer metrics to observe user poll behavior (KIP-517)
    
    https://cwiki.apache.org/confluence/display/KAFKA/KIP-517%3A+Add+consumer+metrics+to+observe+user+poll+behavior
    
    Author: Kevin Lu <kelu@paypal.com>
    
    Reviewers: Sriharsha Chintalapani <sriharsha@apache.org>, Jason Gustafson <jason@confluent.io>
    
    Closes #7395 from KevinLiLu/KIP517-KAFKA8874
    
    (cherry picked from commit ed078bd702e30b300050aefd7a65561fbd75049f)
    Signed-off-by: Manikumar Reddy <manikumar@confluent.io>
---
 .../kafka/clients/consumer/KafkaConsumer.java      |  8 ++
 .../consumer/internals/KafkaConsumerMetrics.java   | 81 ++++++++++++++++++
 .../kafka/clients/consumer/KafkaConsumerTest.java  | 97 +++++++++++++++++++++-
 docs/ops.html                                      | 30 +++++++
 4 files changed, 215 insertions(+), 1 deletion(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index cacf8d5..f12beaf 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -31,6 +31,7 @@ import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
 import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
 import org.apache.kafka.clients.consumer.internals.Fetcher;
 import org.apache.kafka.clients.consumer.internals.FetcherMetricsRegistry;
+import org.apache.kafka.clients.consumer.internals.KafkaConsumerMetrics;
 import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.internals.SubscriptionState;
 import org.apache.kafka.common.Cluster;
@@ -565,6 +566,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
 
     // Visible for testing
     final Metrics metrics;
+    final KafkaConsumerMetrics kafkaConsumerMetrics;
 
     private final Logger log;
     private final String clientId;
@@ -806,6 +808,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     isolationLevel,
                     apiVersions);
 
+            this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, metricGrpPrefix);
+
             config.logUnused();
             AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds());
             log.debug("Kafka consumer initialized");
@@ -852,6 +856,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         this.defaultApiTimeoutMs = defaultApiTimeoutMs;
         this.assignors = assignors;
         this.groupId = groupId;
+        this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, "consumer");
     }
 
     private static String buildClientId(String configuredClientId, GroupRebalanceConfig rebalanceConfig) {
@@ -1212,6 +1217,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
         acquireAndEnsureOpen();
         try {
+            this.kafkaConsumerMetrics.recordPollStart(timer.currentTimeMs());
+
             if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {
                 throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
             }
@@ -1249,6 +1256,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             return ConsumerRecords.empty();
         } finally {
             release();
+            this.kafkaConsumerMetrics.recordPollEnd(timer.currentTimeMs());
         }
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/KafkaConsumerMetrics.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/KafkaConsumerMetrics.java
new file mode 100644
index 0000000..ae61ff1
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/KafkaConsumerMetrics.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Max;
+
+import java.util.concurrent.TimeUnit;
+
+public class KafkaConsumerMetrics {
+    private final Metrics metrics;
+
+    private Sensor timeBetweenPollSensor;
+    private Sensor pollIdleSensor;
+    private long lastPollMs;
+    private long pollStartMs;
+    private long timeSinceLastPollMs;
+
+    public KafkaConsumerMetrics(Metrics metrics, String metricGrpPrefix) {
+        this.metrics = metrics;
+
+        String metricGroupName = metricGrpPrefix + "-metrics";
+        Measurable lastPoll = (mConfig, now) -> {
+            if (lastPollMs == 0L)
+                // if no poll is ever triggered, just return -1.
+                return -1d;
+            else
+                return TimeUnit.SECONDS.convert(now - lastPollMs, TimeUnit.MILLISECONDS);
+        };
+        metrics.addMetric(metrics.metricName("last-poll-seconds-ago",
+                metricGroupName,
+                "The number of seconds since the last poll() invocation."),
+                lastPoll);
+
+        this.timeBetweenPollSensor = metrics.sensor("time-between-poll");
+        this.timeBetweenPollSensor.add(metrics.metricName("time-between-poll-avg",
+                metricGroupName,
+                "The average delay between invocations of poll()."),
+                new Avg());
+        this.timeBetweenPollSensor.add(metrics.metricName("time-between-poll-max",
+                metricGroupName,
+                "The max delay between invocations of poll()."),
+                new Max());
+
+        this.pollIdleSensor = metrics.sensor("poll-idle-ratio-avg");
+        this.pollIdleSensor.add(metrics.metricName("poll-idle-ratio-avg",
+                metricGroupName,
+                "The average fraction of time the consumer's poll() is idle as opposed to waiting for the user code to process records."),
+                new Avg());
+    }
+
+    public void recordPollStart(long pollStartMs) {
+        this.pollStartMs = pollStartMs;
+        this.timeSinceLastPollMs = lastPollMs != 0L ? pollStartMs - lastPollMs : 0;
+        this.timeBetweenPollSensor.record(timeSinceLastPollMs);
+        this.lastPollMs = pollStartMs;
+    }
+
+    public void recordPollEnd(long pollEndMs) {
+        long pollTimeMs = pollEndMs - pollStartMs;
+        double pollIdleRatio = pollTimeMs * 1.0 / (pollTimeMs + timeSinceLastPollMs);
+        this.pollIdleSensor.record(pollIdleRatio);
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index a9c4d25..68fac2b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -31,6 +31,7 @@ import org.apache.kafka.clients.consumer.internals.Fetcher;
 import org.apache.kafka.clients.consumer.internals.SubscriptionState;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.AuthenticationException;
@@ -1947,7 +1948,7 @@ public class KafkaConsumerTest {
         List<ConsumerPartitionAssignor> assignors = singletonList(assignor);
         ConsumerInterceptors<String, String> interceptors = new ConsumerInterceptors<>(Collections.emptyList());
 
-        Metrics metrics = new Metrics();
+        Metrics metrics = new Metrics(time);
         ConsumerMetrics metricsRegistry = new ConsumerMetrics(metricGroupPrefix);
 
         LogContext loggerFactory = new LogContext();
@@ -2062,4 +2063,98 @@ public class KafkaConsumerTest {
 
         consumer.poll(Duration.ZERO);
     }
+
+    @Test
+    public void testPollTimeMetrics() {
+        Time time = new MockTime();
+        SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
+        ConsumerMetadata metadata = createMetadata(subscription);
+        MockClient client = new MockClient(time, metadata);
+        initMetadata(client, Collections.singletonMap(topic, 1));
+
+        ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
+
+        KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
+        consumer.subscribe(singletonList(topic));
+        // MetricName objects to check
+        Metrics metrics = consumer.metrics;
+        MetricName lastPollSecondsAgoName = metrics.metricName("last-poll-seconds-ago", "consumer-metrics");
+        MetricName timeBetweenPollAvgName = metrics.metricName("time-between-poll-avg", "consumer-metrics");
+        MetricName timeBetweenPollMaxName = metrics.metricName("time-between-poll-max", "consumer-metrics");
+        // Test default values
+        assertEquals(-1.0d, consumer.metrics().get(lastPollSecondsAgoName).metricValue());
+        assertEquals(Double.NaN, consumer.metrics().get(timeBetweenPollAvgName).metricValue());
+        assertEquals(Double.NaN, consumer.metrics().get(timeBetweenPollMaxName).metricValue());
+        // Call first poll
+        consumer.poll(Duration.ZERO);
+        assertEquals(0.0d, consumer.metrics().get(lastPollSecondsAgoName).metricValue());
+        assertEquals(0.0d, consumer.metrics().get(timeBetweenPollAvgName).metricValue());
+        assertEquals(0.0d, consumer.metrics().get(timeBetweenPollMaxName).metricValue());
+        // Advance time by 5,000 (total time = 5,000)
+        time.sleep(5 * 1000L);
+        assertEquals(5.0d, consumer.metrics().get(lastPollSecondsAgoName).metricValue());
+        // Call second poll
+        consumer.poll(Duration.ZERO);
+        assertEquals(2.5 * 1000d, consumer.metrics().get(timeBetweenPollAvgName).metricValue());
+        assertEquals(5 * 1000d, consumer.metrics().get(timeBetweenPollMaxName).metricValue());
+        // Advance time by 10,000 (total time = 15,000)
+        time.sleep(10 * 1000L);
+        assertEquals(10.0d, consumer.metrics().get(lastPollSecondsAgoName).metricValue());
+        // Call third poll
+        consumer.poll(Duration.ZERO);
+        assertEquals(5 * 1000d, consumer.metrics().get(timeBetweenPollAvgName).metricValue());
+        assertEquals(10 * 1000d, consumer.metrics().get(timeBetweenPollMaxName).metricValue());
+        // Advance time by 5,000 (total time = 20,000)
+        time.sleep(5 * 1000L);
+        assertEquals(5.0d, consumer.metrics().get(lastPollSecondsAgoName).metricValue());
+        // Call fourth poll
+        consumer.poll(Duration.ZERO);
+        assertEquals(5 * 1000d, consumer.metrics().get(timeBetweenPollAvgName).metricValue());
+        assertEquals(10 * 1000d, consumer.metrics().get(timeBetweenPollMaxName).metricValue());
+    }
+
+    @Test
+    public void testPollIdleRatio() {
+        Time time = new MockTime();
+        SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
+        ConsumerMetadata metadata = createMetadata(subscription);
+        MockClient client = new MockClient(time, metadata);
+        initMetadata(client, Collections.singletonMap(topic, 1));
+
+        ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
+
+        KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
+        // MetricName object to check
+        Metrics metrics = consumer.metrics;
+        MetricName pollIdleRatio = metrics.metricName("poll-idle-ratio-avg", "consumer-metrics");
+        // Test default value
+        assertEquals(Double.NaN, consumer.metrics().get(pollIdleRatio).metricValue());
+
+        // 1st poll
+        // Spend 50ms in poll so value = 1.0
+        consumer.kafkaConsumerMetrics.recordPollStart(time.milliseconds());
+        time.sleep(50);
+        consumer.kafkaConsumerMetrics.recordPollEnd(time.milliseconds());
+
+        assertEquals(1.0d, consumer.metrics().get(pollIdleRatio).metricValue());
+
+        // 2nd poll
+        // Spend 50ms outside poll and 0ms in poll so value = 0.0
+        time.sleep(50);
+        consumer.kafkaConsumerMetrics.recordPollStart(time.milliseconds());
+        consumer.kafkaConsumerMetrics.recordPollEnd(time.milliseconds());
+
+        // Avg of first two data points
+        assertEquals((1.0d + 0.0d) / 2, consumer.metrics().get(pollIdleRatio).metricValue());
+
+        // 3rd poll
+        // Spend 25ms outside poll and 25ms in poll so value = 0.5
+        time.sleep(25);
+        consumer.kafkaConsumerMetrics.recordPollStart(time.milliseconds());
+        time.sleep(25);
+        consumer.kafkaConsumerMetrics.recordPollEnd(time.milliseconds());
+
+        // Avg of three data points
+        assertEquals((1.0d + 0.0d + 0.5d) / 3, consumer.metrics().get(pollIdleRatio).metricValue());
+    }
 }
diff --git a/docs/ops.html b/docs/ops.html
index 18d9bb5..0896758 100644
--- a/docs/ops.html
+++ b/docs/ops.html
@@ -1371,6 +1371,36 @@
 
   The following metrics are available on consumer instances.
 
+  <table class="data-table">
+    <tbody>
+      <tr>
+        <th>Metric/Attribute name</th>
+        <th>Description</th>
+        <th>Mbean name</th>
+      </tr>
+      <tr>
+        <td>time-between-poll-avg</td>
+        <td>The average delay between invocations of poll().</td>
+        <td>kafka.consumer:type=consumer-metrics,client-id=([-.\w]+)</td>
+      </tr>
+      <tr>
+        <td>time-between-poll-max</td>
+        <td>The max delay between invocations of poll().</td>
+        <td>kafka.consumer:type=consumer-metrics,client-id=([-.\w]+)</td>
+      </tr>
+      <tr>
+        <td>last-poll-seconds-ago</td>
+        <td>The number of seconds since the last poll() invocation.</td>
+        <td>kafka.consumer:type=consumer-metrics,client-id=([-.\w]+)</td>
+      </tr>
+      <tr>
+        <td>poll-idle-ratio-avg</td>
+        <td>The average fraction of time the consumer's poll() is idle as opposed to waiting for the user code to process records.</td>
+        <td>kafka.consumer:type=consumer-metrics,client-id=([-.\w]+)</td>
+      </tr>
+    </tbody>
+  </table>
+
   <h5><a id="consumer_group_monitoring" href="#consumer_group_monitoring">Consumer Group Metrics</a></h5>
   <table class="data-table">
     <tbody>


Mime
View raw message