kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject [kafka] branch 1.0 updated: KAFKA-6252; Close the metric group to clean up any existing metrics
Date Sat, 06 Jan 2018 05:23:55 GMT
This is an automated email from the ASF dual-hosted git repository.

ewencp pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.0 by this push:
     new 81f5e39  KAFKA-6252; Close the metric group to clean up any existing metrics
81f5e39 is described below

commit 81f5e3979c9266abb4887e08ae98a0f5c0e0eba3
Author: Arjun Satish <arjun@confluent.io>
AuthorDate: Fri Jan 5 21:23:39 2018 -0800

    KAFKA-6252; Close the metric group to clean up any existing metrics
    
    We are closing the metricGroups created in a Worker, Source task and Sink task before
populating them with new metrics. This helps in cases where an Exception is thrown when previously
created groups were not cleaned up correctly.
    
    Signed-off-by: Arjun Satish <arjunconfluent.io>
    
    Author: Arjun Satish <arjun@confluent.io>
    
    Reviewers: Randall Hauch <rhauch@gmail.com>, Konstantine Karantasis <konstantine@confluent.io>,
Ewen Cheslack-Postava <ewen@confluent.io>
    
    Closes #4397 from wicknicks/KAFKA-6252
---
 .../kafka/connect/runtime/WorkerConnector.java     |  2 +
 .../kafka/connect/runtime/WorkerSinkTask.java      |  2 +
 .../kafka/connect/runtime/WorkerSourceTask.java    |  2 +
 .../apache/kafka/connect/runtime/WorkerTask.java   |  2 +
 .../kafka/connect/runtime/ConnectMetricsTest.java  | 44 +++++++++++++++++++++-
 5 files changed, 51 insertions(+), 1 deletion(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
index 9e65cd2..9b934f3 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
@@ -247,6 +247,8 @@ public class WorkerConnector {
             ConnectMetricsRegistry registry = connectMetrics.registry();
             this.metricGroup = connectMetrics.group(registry.connectorGroupName(),
                     registry.connectorTagName(), connName);
+            // prevent collisions by removing any previously created metrics in this group.
+            metricGroup.close();
 
             metricGroup.addImmutableValueMetric(registry.connectorType, connectorType());
             metricGroup.addImmutableValueMetric(registry.connectorClass, connector.getClass().getName());
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 234ce8a..587e4c6 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -652,6 +652,8 @@ class WorkerSinkTask extends WorkerTask {
             metricGroup = connectMetrics
                                   .group(registry.sinkTaskGroupName(), registry.connectorTagName(),
id.connector(), registry.taskTagName(),
                                          Integer.toString(id.task()));
+            // prevent collisions by removing any previously created metrics in this group.
+            metricGroup.close();
 
             sinkRecordRead = metricGroup.metrics().sensor("sink-record-read");
             sinkRecordRead.add(metricGroup.metricName(registry.sinkRecordReadRate), new Rate());
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 9072cd4..a172cdb 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -494,6 +494,8 @@ class WorkerSourceTask extends WorkerTask {
             metricGroup = connectMetrics.group(registry.sourceTaskGroupName(),
                     registry.connectorTagName(), id.connector(),
                     registry.taskTagName(), Integer.toString(id.task()));
+            // remove any previously created metrics in this group to prevent collisions.
+            metricGroup.close();
 
             sourceRecordPoll = metricGroup.sensor("source-record-poll");
             sourceRecordPoll.add(metricGroup.metricName(registry.sourceRecordPollRate), new
Rate());
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
index ec06924..d563f9b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
@@ -313,6 +313,8 @@ abstract class WorkerTask implements Runnable {
             metricGroup = connectMetrics.group(registry.taskGroupName(),
                     registry.connectorTagName(), id.connector(),
                     registry.taskTagName(), Integer.toString(id.task()));
+            // prevent collisions by removing any previously created metrics in this group.
+            metricGroup.close();
 
             metricGroup.addValueMetric(registry.taskStatus, new LiteralSupplier<String>()
{
                 @Override
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
index 2de7cb6..d247df8 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
@@ -16,13 +16,19 @@
  */
 package org.apache.kafka.connect.runtime;
 
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroupId;
 import org.apache.kafka.connect.util.MockTime;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -136,4 +142,40 @@ public class ConnectMetricsTest {
         assertNotNull(id1.tags());
         assertNotNull(id2.tags());
     }
-}
\ No newline at end of file
+
+    @Test
+    public void testRecreateWithClose() {
+        int numMetrics = addToGroup(metrics, false);
+        int numMetricsInRecreatedGroup = addToGroup(metrics, true);
+        Assert.assertEquals(numMetrics, numMetricsInRecreatedGroup);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testRecreateWithoutClose() {
+        int numMetrics = addToGroup(metrics, false);
+        int numMetricsInRecreatedGroup = addToGroup(metrics, false);
+        // we should never get here
+        throw new RuntimeException("Created " + numMetricsInRecreatedGroup
+                + " metrics in recreated group. Original=" + numMetrics);
+    }
+
+    private int addToGroup(ConnectMetrics connectMetrics, boolean shouldClose) {
+        ConnectMetricsRegistry registry = connectMetrics.registry();
+        ConnectMetrics.MetricGroup metricGroup = connectMetrics.group(registry.taskGroupName(),
+                registry.connectorTagName(), "conn_name");
+
+        if (shouldClose) {
+            metricGroup.close();
+        }
+
+        Sensor sensor = metricGroup.sensor("my_sensor");
+        sensor.add(metricName("x1"), new Max());
+        sensor.add(metricName("y2"), new Avg());
+
+        return metricGroup.metrics().metrics().size();
+    }
+
+    static MetricName metricName(String name) {
+        return new MetricName(name, "test_group", "metrics for testing", Collections.<String,
String>emptyMap());
+    }
+}

-- 
To stop receiving notification emails like this one, please contact
['"commits@kafka.apache.org" <commits@kafka.apache.org>'].

Mime
View raw message