kafka-commits mailing list archives

From guozh...@apache.org
Subject [kafka] branch 1.1 updated: KAFKA-7136: Avoid deadlocks in synchronized metrics reporters (#5341)
Date Fri, 06 Jul 2018 18:09:50 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new 10f27ce  KAFKA-7136: Avoid deadlocks in synchronized metrics reporters (#5341)
10f27ce is described below

commit 10f27ce12edfcd31eb598abf88acebc37d4b90d0
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
AuthorDate: Fri Jul 6 18:54:28 2018 +0100

    KAFKA-7136: Avoid deadlocks in synchronized metrics reporters (#5341)
    
    We need to use the same lock for metric update and read to avoid NPE and concurrent modification
    exceptions. Sensor add/remove/update are synchronized on Sensor since they access lists and
    maps that are not thread-safe. Reporters are notified of metrics add/remove while holding
    (Sensor, Metrics) locks and reporters may synchronize on the reporter lock. Metric read may
    be invoked by metrics reporters while holding a reporter lock. So read/update cannot be synchronized
    using Sensor sinc [...]
    Locking order:
    
    - Sensor#add: Sensor -> Metrics -> MetricsReporter
    - Metrics#removeSensor: Sensor -> Metrics -> MetricsReporter
    - KafkaMetric#metricValue: MetricsReporter -> Sensor#metricLock
    - Sensor#record: Sensor -> Sensor#metricLock
    
    Reviewers: Jun Rao <junrao@gmail.com>, Guozhang Wang <wangguoz@gmail.com>
---
 checkstyle/suppressions.xml                        |   3 +
 .../org/apache/kafka/common/metrics/Sensor.java    |  38 ++++--
 .../apache/kafka/common/metrics/MetricsTest.java   | 144 +++++++++++++++++++--
 3 files changed, 167 insertions(+), 18 deletions(-)
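
The deadlock described in the commit message boils down to two threads acquiring the Sensor monitor and a synchronized reporter's monitor in opposite orders. The following is a minimal illustrative sketch in plain Java, not Kafka code; the names ReporterDeadlockSketch, sensorLock and reporterLock are hypothetical stand-ins for the Sensor monitor and a synchronized MetricsReporter:

    public class ReporterDeadlockSketch {
        private final Object sensorLock = new Object();   // stands in for the Sensor monitor
        private final Object reporterLock = new Object(); // stands in for a synchronized reporter

        // Thread A: the Sensor#add path holds the Sensor lock, then notifies the reporter.
        void addMetric() {
            synchronized (sensorLock) {
                synchronized (reporterLock) {
                    // reporter.metricChange(metric)
                }
            }
        }

        // Thread B: the reporter holds its own monitor, then reads a metric value.
        // Before this fix the read synchronized on the Sensor, inverting the order above.
        void reportMetrics() {
            synchronized (reporterLock) {
                synchronized (sensorLock) {
                    // metric.metricValue()
                }
            }
        }

        public static void main(String[] args) {
            ReporterDeadlockSketch s = new ReporterDeadlockSketch();
            new Thread(s::addMetric).start();
            new Thread(s::reportMetrics).start(); // opposite acquisition order => may hang in a deadlock
        }
    }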

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 1ec535f..a5e94a3 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -72,6 +72,9 @@
     <suppress checks="JavaNCSS"
               files="RequestResponseTest.java"/>
 
+    <suppress checks="NPathComplexity"
+              files="MemoryRecordsTest|MetricsTest"/>
+
     <!-- Connect -->
     <suppress checks="ClassFanOutComplexity"
               files="DistributedHerder(|Test).java"/>
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
index e95dbf7..7ee23d3 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
@@ -45,6 +45,7 @@ public final class Sensor {
     private final Time time;
     private volatile long lastRecordTime;
     private final long inactiveSensorExpirationTimeMs;
+    private final Object metricLock;
 
     public enum RecordingLevel {
         INFO(0, "INFO"), DEBUG(1, "DEBUG");
@@ -114,6 +115,7 @@ public final class Sensor {
         this.inactiveSensorExpirationTimeMs = TimeUnit.MILLISECONDS.convert(inactiveSensorExpirationTimeSeconds, TimeUnit.SECONDS);
         this.lastRecordTime = time.milliseconds();
         this.recordingLevel = recordingLevel;
+        this.metricLock = new Object();
         checkForest(new HashSet<Sensor>());
     }
 
@@ -175,9 +177,11 @@ public final class Sensor {
         if (shouldRecord()) {
             this.lastRecordTime = timeMs;
             synchronized (this) {
-                // increment all the stats
-                for (Stat stat : this.stats)
-                    stat.record(config, value, timeMs);
+                synchronized (metricLock()) {
+                    // increment all the stats
+                    for (Stat stat : this.stats)
+                        stat.record(config, value, timeMs);
+                }
                 if (checkQuotas)
                     checkQuotas(timeMs);
             }
@@ -224,7 +228,7 @@ public final class Sensor {
      */
     public synchronized void add(CompoundStat stat, MetricConfig config) {
         this.stats.add(Utils.notNull(stat));
-        Object lock = metricLock(stat);
+        Object lock = metricLock();
         for (NamedMeasurable m : stat.stats()) {
             KafkaMetric metric = new KafkaMetric(lock, m.name(), m.stat(), config == null ? this.config : config, time);
             this.registry.registerMetric(metric);
@@ -248,7 +252,7 @@ public final class Sensor {
      * @param config A special configuration for this metric. If null use the sensor default configuration.
      */
     public synchronized void add(MetricName metricName, MeasurableStat stat, MetricConfig config) {
-        KafkaMetric metric = new KafkaMetric(metricLock(stat),
+        KafkaMetric metric = new KafkaMetric(metricLock(),
                                              Utils.notNull(metricName),
                                              Utils.notNull(stat),
                                              config == null ? this.config : config,
@@ -271,10 +275,26 @@ public final class Sensor {
     }
 
     /**
-     * KafkaMetrics of sensors which use SampledStat should be synchronized on the Sensor object
-     * to allow concurrent reads and updates. For simplicity, all sensors are synchronized on Sensor.
+     * KafkaMetrics of sensors which use SampledStat should be synchronized on the same lock
+     * for sensor record and metric value read to allow concurrent reads and updates. For simplicity,
+     * all sensors are synchronized on this object.
+     * <p>
+     * Sensor object is not used as a lock for reading metric value since metrics reporter is
+     * invoked while holding Sensor and Metrics locks to report addition and removal of metrics
+     * and synchronized reporters may deadlock if Sensor lock is used for reading metrics values.
+     * Note that Sensor object itself is used as a lock to protect the access to stats and metrics
+     * while recording metric values, adding and deleting sensors.
+     * </p><p>
+     * Locking order (assume all MetricsReporter methods may be synchronized):
+     * <ul>
+     *   <li>Sensor#add: Sensor -> Metrics -> MetricsReporter</li>
+     *   <li>Metrics#removeSensor: Sensor -> Metrics -> MetricsReporter</li>
+     *   <li>KafkaMetric#metricValue: MetricsReporter -> Sensor#metricLock</li>
+     *   <li>Sensor#record: Sensor -> Sensor#metricLock</li>
+     * </ul>
+     * </p>
      */
-    private Object metricLock(Stat stat) {
-        return this;
+    private Object metricLock() {
+        return metricLock;
     }
 }
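
The javadoc above reduces to a small locking model: record takes the Sensor lock and then the dedicated metric lock, while a metric read takes only the metric lock, so a synchronized reporter never needs the Sensor monitor and the cycle is broken. A simplified illustrative sketch (the class name MetricLockSketch is hypothetical and this is not the actual Sensor implementation):

    public class MetricLockSketch {
        private final Object sensorLock = new Object();  // stands in for "synchronized (this)" in Sensor
        private final Object metricLock = new Object();  // the dedicated lock introduced by this patch
        private double value;

        // Sensor#record: Sensor -> Sensor#metricLock
        void record(double v) {
            synchronized (sensorLock) {
                synchronized (metricLock) {
                    value += v; // update the stats under the metric lock
                }
            }
        }

        // KafkaMetric#metricValue: MetricsReporter -> Sensor#metricLock
        // A reporter holding its own monitor acquires only metricLock here, never the Sensor lock,
        // so it cannot form a cycle with Sensor#add (Sensor -> Metrics -> MetricsReporter).
        double metricValue() {
            synchronized (metricLock) {
                return value;
            }
        }
    }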
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
index 6acc39d..a902de0 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
@@ -26,13 +26,16 @@ import static org.junit.Assert.fail;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Deque;
+import java.util.List;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.kafka.common.Metric;
@@ -54,9 +57,12 @@ import org.apache.kafka.common.utils.MockTime;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @SuppressWarnings("deprecation")
 public class MetricsTest {
+    private static final Logger log = LoggerFactory.getLogger(MetricsTest.class);
 
     private static final double EPS = 0.000001;
     private MockTime time = new MockTime();
@@ -604,8 +610,12 @@ public class MetricsTest {
         }
     }
 
+    /**
+     * Verifies that concurrent sensor add, remove, updates and read don't result
+     * in errors or deadlock.
+     */
     @Test
-    public void testConcurrentAccess() throws Exception {
+    public void testConcurrentReadUpdate() throws Exception {
         final Random random = new Random();
         final Deque<Sensor> sensors = new ConcurrentLinkedDeque<>();
         metrics = new Metrics(new MockTime(10));
@@ -613,7 +623,83 @@ public class MetricsTest {
 
         final AtomicBoolean alive = new AtomicBoolean(true);
         executorService = Executors.newSingleThreadExecutor();
-        executorService.submit(new Runnable() {
+        executorService.submit(new ConcurrentMetricOperation(alive, "record", new Runnable() {
+                    @Override
+                    public void run() {
+                        while (alive.get()) {
+                            for (Sensor sensor : sensors) {
+                                sensor.record(random.nextInt(10000));
+                            }
+                        }
+                    }
+                })
+        );
+
+        for (int i = 0; i < 10000; i++) {
+            if (sensors.size() > 5) {
+                Sensor sensor = random.nextBoolean() ? sensors.removeFirst() : sensors.removeLast();
+                metrics.removeSensor(sensor.name());
+            }
+            StatType statType = StatType.forId(random.nextInt(StatType.values().length));
+            sensors.add(sensorCreator.createSensor(statType, i));
+            for (Sensor sensor : sensors) {
+                for (KafkaMetric metric : sensor.metrics()) {
+                    assertNotNull("Invalid metric value", metric.metricValue());
+                }
+            }
+        }
+        alive.set(false);
+    }
+
+    /**
+     * Verifies that concurrent sensor add, remove, updates and read with a metrics reporter
+     * that synchronizes on every reporter method doesn't result in errors or deadlock.
+     */
+    @Test
+    public void testConcurrentReadUpdateReport() throws Exception {
+
+        class LockingReporter implements MetricsReporter {
+            Map<MetricName, KafkaMetric> activeMetrics = new HashMap<>();
+            @Override
+            public synchronized void init(List<KafkaMetric> metrics) {
+            }
+
+            @Override
+            public synchronized void metricChange(KafkaMetric metric) {
+                activeMetrics.put(metric.metricName(), metric);
+            }
+
+            @Override
+            public synchronized void metricRemoval(KafkaMetric metric) {
+                activeMetrics.remove(metric.metricName());
+            }
+
+            @Override
+            public synchronized void close() {
+            }
+
+            @Override
+            public void configure(Map<String, ?> configs) {
+            }
+
+            synchronized void processMetrics() {
+                for (KafkaMetric metric : activeMetrics.values()) {
+                    assertNotNull("Invalid metric value", metric.metricValue());
+                }
+            }
+        }
+
+        final LockingReporter reporter = new LockingReporter();
+        this.metrics.close();
+        this.metrics = new Metrics(config, Arrays.asList((MetricsReporter) reporter), new MockTime(10), true);
+        final Deque<Sensor> sensors = new ConcurrentLinkedDeque<>();
+        SensorCreator sensorCreator = new SensorCreator(metrics);
+
+        final Random random = new Random();
+        final AtomicBoolean alive = new AtomicBoolean(true);
+        executorService = Executors.newFixedThreadPool(3);
+
+        Future<?> writeFuture = executorService.submit(new ConcurrentMetricOperation(alive, "record", new Runnable() {
             @Override
             public void run() {
                 while (alive.get()) {
@@ -622,22 +708,62 @@ public class MetricsTest {
                     }
                 }
             }
-        });
+        })
+        );
+        Future<?> readFuture = executorService.submit(new ConcurrentMetricOperation(alive, "read", new Runnable() {
+                    @Override
+                    public void run() {
+                        while (alive.get()) {
+                            for (Sensor sensor : sensors) {
+                                for (Metric metric : sensor.metrics()) {
+                                    assertNotNull("Invalid metric value", metric.metricValue());
+                                }
+                            }
+                        }
+                    }
+                })
+        );
+        Future<?> reportFuture = executorService.submit(new ConcurrentMetricOperation(alive, "report", new Runnable() {
+                    @Override
+                    public void run() {
+                        reporter.processMetrics();
+                    }
+                })
+        );
 
         for (int i = 0; i < 10000; i++) {
-            if (sensors.size() > 5) {
+            if (sensors.size() > 10) {
                 Sensor sensor = random.nextBoolean() ? sensors.removeFirst() : sensors.removeLast();
                 metrics.removeSensor(sensor.name());
             }
             StatType statType = StatType.forId(random.nextInt(StatType.values().length));
             sensors.add(sensorCreator.createSensor(statType, i));
-            for (Sensor sensor : sensors) {
-                for (KafkaMetric metric : sensor.metrics()) {
-                    assertNotNull("Invalid metric value", metric.metricValue());
+        }
+        assertFalse("Read failed", readFuture.isDone());
+        assertFalse("Write failed", writeFuture.isDone());
+        assertFalse("Report failed", reportFuture.isDone());
+
+        alive.set(false);
+    }
+
+    private class ConcurrentMetricOperation implements Runnable {
+        private final AtomicBoolean alive;
+        private final String opName;
+        private final Runnable op;
+        ConcurrentMetricOperation(AtomicBoolean alive, String opName, Runnable op) {
+            this.alive = alive;
+            this.opName = opName;
+            this.op = op;
+        }
+        public void run() {
+            try {
+                while (alive.get()) {
+                    op.run();
                 }
+            } catch (Throwable t) {
+                log.error("Metric {} failed with exception", opName, t);
             }
         }
-        alive.set(false);
     }
 
     enum StatType {
@@ -676,7 +802,7 @@ public class MetricsTest {
         }
 
         private Sensor createSensor(StatType statType, int index) {
-            Sensor sensor = metrics.sensor("kafka.requests");
+            Sensor sensor = metrics.sensor("kafka.requests." + index);
             Map<String, String> tags = Collections.singletonMap("tag", "tag" + index);
             switch (statType) {
                 case AVG:
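
For context, the scenario the new tests exercise can also be reproduced with the public client metrics API. A hedged sketch follows (the class names SynchronizedReporterExample and SyncReporter are hypothetical and not part of the patch): a fully synchronized reporter reads metric values while another thread records on the sensor, which is safe once reads no longer take the Sensor monitor.

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import org.apache.kafka.common.MetricName;
    import org.apache.kafka.common.metrics.KafkaMetric;
    import org.apache.kafka.common.metrics.MetricConfig;
    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.metrics.MetricsReporter;
    import org.apache.kafka.common.metrics.Sensor;
    import org.apache.kafka.common.metrics.stats.Avg;
    import org.apache.kafka.common.utils.SystemTime;

    public class SynchronizedReporterExample {
        // Like the test's LockingReporter, every reporter method synchronizes on the reporter.
        static class SyncReporter implements MetricsReporter {
            private KafkaMetric last;
            @Override public synchronized void init(List<KafkaMetric> metrics) { }
            @Override public synchronized void metricChange(KafkaMetric metric) { last = metric; }
            @Override public synchronized void metricRemoval(KafkaMetric metric) { last = null; }
            @Override public synchronized void close() { }
            @Override public void configure(Map<String, ?> configs) { }
            synchronized Object read() { return last == null ? null : last.metricValue(); }
        }

        public static void main(String[] args) throws Exception {
            SyncReporter reporter = new SyncReporter();
            Metrics metrics = new Metrics(new MetricConfig(),
                    Collections.<MetricsReporter>singletonList(reporter), new SystemTime());
            final Sensor sensor = metrics.sensor("example");
            MetricName name = metrics.metricName("avg", "example-group");
            sensor.add(name, new Avg());

            // One thread records while the main thread reads through the reporter; with the
            // dedicated metric lock these paths no longer contend for the Sensor monitor.
            Thread recorder = new Thread(new Runnable() {
                public void run() {
                    for (int i = 0; i < 100000; i++)
                        sensor.record(i);
                }
            });
            recorder.start();
            for (int i = 0; i < 1000; i++)
                reporter.read();
            recorder.join();
            metrics.close();
        }
    }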

