This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 0.10.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/0.10.2 by this push:
new fdc742b KAFKA-7660: fix streams and Metrics memory leaks (#5984)
fdc742b is described below
commit fdc742b1ade420682911b3e336ae04827639cc04
Author: John Roesler <vvcephei@users.noreply.github.com>
AuthorDate: Wed Dec 5 19:18:08 2018 -0600
KAFKA-7660: fix streams and Metrics memory leaks (#5984)
---
.../java/org/apache/kafka/common/metrics/Metrics.java | 6 ++++++
.../java/org/apache/kafka/common/metrics/Sensor.java | 10 ++++++++--
.../org/apache/kafka/common/metrics/MetricsTest.java | 16 ++++++++++++++++
.../streams/processor/internals/StreamsMetricsImpl.java | 4 ++--
.../processor/internals/StreamsMetricsImplTest.java | 14 +++++++++++++-
5 files changed, 45 insertions(+), 5 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
index 512c18e..874c172 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
@@ -367,6 +367,12 @@ public class Metrics implements Closeable {
removeMetric(metric.metricName());
log.debug("Removed sensor with name {}", name);
childSensors = childrenSensors.remove(sensor);
+ for (final Sensor parent : sensor.parents()) {
+ final List<Sensor> peers = childrenSensors.get(parent);
+ if (peers != null) {
+ peers.remove(sensor);
+ }
+ }
}
}
}
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
index 4a9b488..33829f9 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
@@ -18,13 +18,15 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import static java.util.Arrays.asList;
+import static java.util.Collections.unmodifiableList;
+
/**
* A sensor applies a continuous sequence of numerical values to a set of associated metrics.
For example a sensor on
* message size would record a sequence of message sizes using the {@link #record(double)}
api and would maintain a set
@@ -128,6 +130,10 @@ public final class Sensor {
return this.name;
}
+ List<Sensor> parents() {
+ return unmodifiableList(asList(parents));
+ }
+
/**
* Record an occurrence, this is just short-hand for {@link #record(double) record(1.0)}
*/
@@ -260,6 +266,6 @@ public final class Sensor {
}
synchronized List<KafkaMetric> metrics() {
- return Collections.unmodifiableList(this.metrics);
+ return unmodifiableList(this.metrics);
}
}
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
index 5797b36..5ee79de 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
@@ -12,6 +12,8 @@
*/
package org.apache.kafka.common.metrics;
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@@ -170,6 +172,20 @@ public class MetricsTest {
}
@Test
+ public void testRemoveChildSensor() {
+ final Metrics metrics = new Metrics();
+
+ final Sensor parent = metrics.sensor("parent");
+ final Sensor child = metrics.sensor("child", parent);
+
+ assertEquals(singletonList(child), metrics.childrenSensors().get(parent));
+
+ metrics.removeSensor("child");
+
+ assertEquals(emptyList(), metrics.childrenSensors().get(parent));
+ }
+
+ @Test
public void testRemoveSensor() {
int size = metrics.metrics().size();
Sensor parent1 = metrics.sensor("test.parent1");
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java
index bccf736..9374882 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java
@@ -196,11 +196,11 @@ public class StreamsMetricsImpl implements StreamsMetrics {
*/
@Override
public void removeSensor(Sensor sensor) {
- Sensor parent = null;
Objects.requireNonNull(sensor, "Sensor is null");
metrics.removeSensor(sensor.name());
- parent = parentSensors.get(sensor);
+
+ final Sensor parent = parentSensors.remove(sensor);
if (parent != null) {
metrics.removeSensor(parent.name());
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
index c6bc250..42b48ec 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
@@ -19,13 +19,17 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.junit.Test;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.Map;
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.unmodifiableMap;
import static org.junit.Assert.assertEquals;
public class StreamsMetricsImplTest {
@@ -53,19 +57,27 @@ public class StreamsMetricsImplTest {
String entity = "entity";
String operation = "put";
Map<String, String> tags = new HashMap<>();
- StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), groupName,
tags);
+ final Metrics metrics = new Metrics();
+ final Map<MetricName, KafkaMetric> initialMetrics = unmodifiableMap(new LinkedHashMap<>(metrics.metrics()));
+ StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, groupName, tags);
Sensor sensor1 = streamsMetrics.addSensor(sensorName, Sensor.RecordingLevel.DEBUG);
streamsMetrics.removeSensor(sensor1);
+ assertEquals(initialMetrics, metrics.metrics());
Sensor sensor1a = streamsMetrics.addSensor(sensorName, Sensor.RecordingLevel.DEBUG,
sensor1);
streamsMetrics.removeSensor(sensor1a);
+ assertEquals(initialMetrics, metrics.metrics());
Sensor sensor2 = streamsMetrics.addLatencyAndThroughputSensor(scope, entity, operation,
Sensor.RecordingLevel.DEBUG);
streamsMetrics.removeSensor(sensor2);
+ assertEquals(initialMetrics, metrics.metrics());
Sensor sensor3 = streamsMetrics.addThroughputSensor(scope, entity, operation, Sensor.RecordingLevel.DEBUG);
streamsMetrics.removeSensor(sensor3);
+ assertEquals(initialMetrics, metrics.metrics());
+
+ assertEquals(emptyMap(), streamsMetrics.parentSensors);
}
@Test
|