kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cmcc...@apache.org
Subject [kafka] branch 2.4 updated: KAFKA-9306: The consumer must close KafkaConsumerMetrics (#7839)
Date Thu, 19 Dec 2019 18:37:52 GMT
This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new eb20a0d  KAFKA-9306: The consumer must close KafkaConsumerMetrics (#7839)
eb20a0d is described below

commit eb20a0d1b762f44a6922f90fec83a42c00942675
Author: Colin Patrick McCabe <cmccabe@apache.org>
AuthorDate: Thu Dec 19 09:38:26 2019 -0800

    KAFKA-9306: The consumer must close KafkaConsumerMetrics (#7839)
    
    Reviewers: Vikas Singh <vikas@confluent.io>, Jason Gustafson <jason@confluent.io>, Shailesh Panwar <spanwar@confluent.io>
    (cherry picked from commit 7e36865541307554ebad410d61c8186f6d641c55)
---
 .../kafka/clients/consumer/KafkaConsumer.java      |  1 +
 .../consumer/internals/KafkaConsumerMetrics.java   | 23 +++++++++++++--------
 .../kafka/clients/consumer/KafkaConsumerTest.java  | 24 ++++++++++++++++++++++
 3 files changed, 40 insertions(+), 8 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 77499ff..9d3cf6b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -2298,6 +2298,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         }
         Utils.closeQuietly(fetcher, "fetcher", firstException);
         Utils.closeQuietly(interceptors, "consumer interceptors", firstException);
+        Utils.closeQuietly(kafkaConsumerMetrics, "kafka consumer metrics", firstException);
         Utils.closeQuietly(metrics, "consumer metrics", firstException);
         Utils.closeQuietly(client, "consumer network client", firstException);
         Utils.closeQuietly(keyDeserializer, "consumer key deserializer", firstException);
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/KafkaConsumerMetrics.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/KafkaConsumerMetrics.java
index ae61ff1..aab3133 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/KafkaConsumerMetrics.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/KafkaConsumerMetrics.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.Measurable;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
@@ -24,11 +25,11 @@ import org.apache.kafka.common.metrics.stats.Max;
 
 import java.util.concurrent.TimeUnit;
 
-public class KafkaConsumerMetrics {
+public class KafkaConsumerMetrics implements AutoCloseable {
     private final Metrics metrics;
-
-    private Sensor timeBetweenPollSensor;
-    private Sensor pollIdleSensor;
+    private final MetricName lastPollMetricName;
+    private final Sensor timeBetweenPollSensor;
+    private final Sensor pollIdleSensor;
     private long lastPollMs;
     private long pollStartMs;
     private long timeSinceLastPollMs;
@@ -44,10 +45,9 @@ public class KafkaConsumerMetrics {
             else
                 return TimeUnit.SECONDS.convert(now - lastPollMs, TimeUnit.MILLISECONDS);
         };
-        metrics.addMetric(metrics.metricName("last-poll-seconds-ago",
-                metricGroupName,
-                "The number of seconds since the last poll() invocation."),
-                lastPoll);
+        this.lastPollMetricName = metrics.metricName("last-poll-seconds-ago",
+                metricGroupName, "The number of seconds since the last poll() invocation.");
+        metrics.addMetric(lastPollMetricName, lastPoll);
 
         this.timeBetweenPollSensor = metrics.sensor("time-between-poll");
         this.timeBetweenPollSensor.add(metrics.metricName("time-between-poll-avg",
@@ -78,4 +78,11 @@ public class KafkaConsumerMetrics {
         double pollIdleRatio = pollTimeMs * 1.0 / (pollTimeMs + timeSinceLastPollMs);
         this.pollIdleSensor.record(pollIdleRatio);
     }
+
+    @Override
+    public void close() {
+        metrics.removeMetric(lastPollMetricName);
+        metrics.removeSensor(timeBetweenPollSensor.name());
+        metrics.removeSensor(pollIdleSensor.name());
+    }
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 1fd9fb3..a31e77a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -2258,4 +2258,28 @@ public class KafkaConsumerTest {
         // Avg of three data points
         assertEquals((1.0d + 0.0d + 0.5d) / 3, consumer.metrics().get(pollIdleRatio).metricValue());
     }
+
+    private static boolean consumerMetricPresent(KafkaConsumer consumer, String name) {
+        MetricName metricName = new MetricName(name, "consumer-metrics", "", Collections.emptyMap());
+        return consumer.metrics.metrics().containsKey(metricName);
+    }
+
+    @Test
+    public void testClosingConsumerUnregistersConsumerMetrics() {
+        Time time = new MockTime();
+        SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
+        ConsumerMetadata metadata = createMetadata(subscription);
+        MockClient client = new MockClient(time, metadata);
+        initMetadata(client, Collections.singletonMap(topic, 1));
+        KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata,
+            new RoundRobinAssignor(), true, groupInstanceId);
+        consumer.subscribe(singletonList(topic));
+        assertTrue(consumerMetricPresent(consumer, "last-poll-seconds-ago"));
+        assertTrue(consumerMetricPresent(consumer, "time-between-poll-avg"));
+        assertTrue(consumerMetricPresent(consumer, "time-between-poll-max"));
+        consumer.close();
+        assertFalse(consumerMetricPresent(consumer, "last-poll-seconds-ago"));
+        assertFalse(consumerMetricPresent(consumer, "time-between-poll-avg"));
+        assertFalse(consumerMetricPresent(consumer, "time-between-poll-max"));
+    }
 }


Mime
View raw message