kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch 0.11.0 updated: KAFKA-6925: fix parentSensors memory leak (#5108) (#5120)
Date Sun, 03 Jun 2018 17:59:15 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 0.11.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/0.11.0 by this push:
     new b5b795b  KAFKA-6925: fix parentSensors memory leak (#5108) (#5120)
b5b795b is described below

commit b5b795b3021f9bd7a95b957c15935070a6a8f4be
Author: John Roesler <vvcephei@users.noreply.github.com>
AuthorDate: Sun Jun 3 12:58:34 2018 -0500

    KAFKA-6925: fix parentSensors memory leak (#5108) (#5120)
    
    Previously, we failed to remove sensors from the parentSensors map, effectively a memory
leak.
    
    Add a test to verify that removed sensors get removed from the underlying registry as
well as the parentSensors map.
    
    Reviewers: Guozhang Wang <wangguoz@gmail.com>
---
 .../streams/processor/internals/StreamsMetricsImpl.java    |  1 +
 .../processor/internals/StreamsMetricsImplTest.java        | 14 +++++++++++++-
 2 files changed, 14 insertions(+), 1 deletion(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java
index ea85b74..7f269e0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java
@@ -216,6 +216,7 @@ public class StreamsMetricsImpl implements StreamsMetrics {
         parent = parentSensors.get(sensor);
         if (parent != null) {
             metrics.removeSensor(parent.name());
+            parentSensors.remove(sensor);
         }
 
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
index 0e87a6d..0f91ae1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
@@ -19,11 +19,15 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
+import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
@@ -53,19 +57,27 @@ public class StreamsMetricsImplTest {
         String entity = "entity";
         String operation = "put";
         Map<String, String> tags = new HashMap<>();
-        StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), groupName,
tags);
+        final Metrics metrics = new Metrics();
+        final Map<MetricName, KafkaMetric> initialMetrics = Collections.unmodifiableMap(new
LinkedHashMap<>(metrics.metrics()));
+        StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, groupName, tags);
 
         Sensor sensor1 = streamsMetrics.addSensor(sensorName, Sensor.RecordingLevel.DEBUG);
         streamsMetrics.removeSensor(sensor1);
+        Assert.assertEquals(initialMetrics, metrics.metrics());
 
         Sensor sensor1a = streamsMetrics.addSensor(sensorName, Sensor.RecordingLevel.DEBUG,
sensor1);
         streamsMetrics.removeSensor(sensor1a);
+        Assert.assertEquals(initialMetrics, metrics.metrics());
 
         Sensor sensor2 = streamsMetrics.addLatencyAndThroughputSensor(scope, entity, operation,
Sensor.RecordingLevel.DEBUG);
         streamsMetrics.removeSensor(sensor2);
+        Assert.assertEquals(initialMetrics, metrics.metrics());
 
         Sensor sensor3 = streamsMetrics.addThroughputSensor(scope, entity, operation, Sensor.RecordingLevel.DEBUG);
         streamsMetrics.removeSensor(sensor3);
+        Assert.assertEquals(initialMetrics, metrics.metrics());
+
+        Assert.assertEquals(Collections.emptyMap(), streamsMetrics.parentSensors);
     }
 
     @Test

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.

Mime
View raw message