kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rsiva...@apache.org
Subject [kafka] 01/02: KAFKA-6765: Handle exception while reading throttle metric value in test (#4869)
Date Thu, 10 May 2018 12:13:20 GMT
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit f82cba6f196897f9540bb81a86823042d90c0a1e
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
AuthorDate: Tue Apr 17 14:46:28 2018 +0100

    KAFKA-6765: Handle exception while reading throttle metric value in test (#4869)
    
    Quota tests wait for throttle metric to be updated without waiting for requests to complete
to avoid waiting for potentially large throttle times. This requires the test to read metric
values while a broker may be updating the value, resulting in exception in the test. Since
this issue can also occur with JMX metrics reporter, change synchronization on metrics with
sensors to use the sensor as lock.
    
    Reviewers: Jun Rao <junrao@gmail.com>
---
 .../org/apache/kafka/common/metrics/Metrics.java   |   6 +-
 .../org/apache/kafka/common/metrics/Sensor.java    |  12 +-
 .../apache/kafka/common/metrics/MetricsTest.java   | 135 ++++++++++++++++++++-
 3 files changed, 146 insertions(+), 7 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
index 8868ee7..e83085e 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
@@ -487,7 +487,8 @@ public class Metrics implements Closeable {
 
     /**
      * Add a metric to monitor an object that implements MetricValueProvider. This metric
won't be associated with any
-     * sensor. This is a way to expose existing values as metrics.
+     * sensor. This is a way to expose existing values as metrics. User is expected to add
any additional
+     * synchronization to update and access metric values, if required.
      *
      * @param metricName The name of the metric
      * @param metricValueProvider The metric value provider associated with this metric
@@ -503,7 +504,8 @@ public class Metrics implements Closeable {
 
     /**
      * Add a metric to monitor an object that implements MetricValueProvider. This metric
won't be associated with any
-     * sensor. This is a way to expose existing values as metrics.
+     * sensor. This is a way to expose existing values as metrics. User is expected to add
any additional
+     * synchronization to update and access metric values, if required.
      *
      * @param metricName The name of the metric
      * @param metricValueProvider The metric value provider associated with this metric
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
index 321fab6..e95dbf7 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
@@ -224,7 +224,7 @@ public final class Sensor {
      */
     public synchronized void add(CompoundStat stat, MetricConfig config) {
         this.stats.add(Utils.notNull(stat));
-        Object lock = new Object();
+        Object lock = metricLock(stat);
         for (NamedMeasurable m : stat.stats()) {
             KafkaMetric metric = new KafkaMetric(lock, m.name(), m.stat(), config == null
? this.config : config, time);
             this.registry.registerMetric(metric);
@@ -248,7 +248,7 @@ public final class Sensor {
      * @param config A special configuration for this metric. If null use the sensor default
configuration.
      */
     public synchronized void add(MetricName metricName, MeasurableStat stat, MetricConfig
config) {
-        KafkaMetric metric = new KafkaMetric(new Object(),
+        KafkaMetric metric = new KafkaMetric(metricLock(stat),
                                              Utils.notNull(metricName),
                                              Utils.notNull(stat),
                                              config == null ? this.config : config,
@@ -269,4 +269,12 @@ public final class Sensor {
     synchronized List<KafkaMetric> metrics() {
         return Collections.unmodifiableList(this.metrics);
     }
+
+    /**
+     * KafkaMetrics of sensors which use SampledStat should be synchronized on the Sensor
object
+     * to allow concurrent reads and updates. For simplicity, all sensors are synchronized
on Sensor.
+     */
+    private Object metricLock(Stat stat) {
+        return this;
+    }
 }
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
index 55f8e23..6acc39d 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
@@ -24,9 +24,16 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.Deque;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
@@ -39,8 +46,10 @@ import org.apache.kafka.common.metrics.stats.Percentile;
 import org.apache.kafka.common.metrics.stats.Percentiles;
 import org.apache.kafka.common.metrics.stats.Percentiles.BucketSizing;
 import org.apache.kafka.common.metrics.stats.Rate;
-import org.apache.kafka.common.metrics.stats.Total;
 import org.apache.kafka.common.metrics.stats.SimpleRate;
+import org.apache.kafka.common.metrics.stats.Sum;
+import org.apache.kafka.common.metrics.stats.Total;
+import org.apache.kafka.common.metrics.stats.Value;
 import org.apache.kafka.common.utils.MockTime;
 import org.junit.After;
 import org.junit.Before;
@@ -53,6 +62,7 @@ public class MetricsTest {
     private MockTime time = new MockTime();
     private MetricConfig config = new MetricConfig();
     private Metrics metrics;
+    private ExecutorService executorService;
 
     @Before
     public void setup() {
@@ -60,7 +70,11 @@ public class MetricsTest {
     }
 
     @After
-    public void tearDown() {
+    public void tearDown() throws Exception {
+        if (executorService != null) {
+            executorService.shutdownNow();
+            executorService.awaitTermination(5, TimeUnit.SECONDS);
+        }
         this.metrics.close();
     }
 
@@ -588,9 +602,124 @@ public class MetricsTest {
                 // this is expected
             }
         }
+    }
+
+    @Test
+    public void testConcurrentAccess() throws Exception {
+        final Random random = new Random();
+        final Deque<Sensor> sensors = new ConcurrentLinkedDeque<>();
+        metrics = new Metrics(new MockTime(10));
+        SensorCreator sensorCreator = new SensorCreator(metrics);
+
+        final AtomicBoolean alive = new AtomicBoolean(true);
+        executorService = Executors.newSingleThreadExecutor();
+        executorService.submit(new Runnable() {
+            @Override
+            public void run() {
+                while (alive.get()) {
+                    for (Sensor sensor : sensors) {
+                        sensor.record(random.nextInt(10000));
+                    }
+                }
+            }
+        });
 
+        for (int i = 0; i < 10000; i++) {
+            if (sensors.size() > 5) {
+                Sensor sensor = random.nextBoolean() ? sensors.removeFirst() : sensors.removeLast();
+                metrics.removeSensor(sensor.name());
+            }
+            StatType statType = StatType.forId(random.nextInt(StatType.values().length));
+            sensors.add(sensorCreator.createSensor(statType, i));
+            for (Sensor sensor : sensors) {
+                for (KafkaMetric metric : sensor.metrics()) {
+                    assertNotNull("Invalid metric value", metric.metricValue());
+                }
+            }
+        }
+        alive.set(false);
     }
 
-    
+    enum StatType {
+        AVG(0),
+        TOTAL(1),
+        COUNT(2),
+        MAX(3),
+        MIN(4),
+        RATE(5),
+        SIMPLE_RATE(6),
+        SUM(7),
+        VALUE(8),
+        PERCENTILES(9),
+        METER(10);
+
+        int id;
+        StatType(int id) {
+            this.id = id;
+        }
 
+        static StatType forId(int id) {
+            for (StatType statType : StatType.values()) {
+                if (statType.id == id)
+                    return statType;
+            }
+            return null;
+        }
+    }
+
+    private static class SensorCreator {
+
+        private final Metrics metrics;
+
+        SensorCreator(Metrics metrics) {
+            this.metrics = metrics;
+        }
+
+        private Sensor createSensor(StatType statType, int index) {
+            Sensor sensor = metrics.sensor("kafka.requests");
+            Map<String, String> tags = Collections.singletonMap("tag", "tag" + index);
+            switch (statType) {
+                case AVG:
+                    sensor.add(metrics.metricName("test.metric.avg", "avg", tags), new Avg());
+                    break;
+                case TOTAL:
+                    sensor.add(metrics.metricName("test.metric.total", "total", tags), new
Total());
+                    break;
+                case COUNT:
+                    sensor.add(metrics.metricName("test.metric.count", "count", tags), new
Count());
+                    break;
+                case MAX:
+                    sensor.add(metrics.metricName("test.metric.max", "max", tags), new Max());
+                    break;
+                case MIN:
+                    sensor.add(metrics.metricName("test.metric.min", "min", tags), new Min());
+                    break;
+                case RATE:
+                    sensor.add(metrics.metricName("test.metric.rate", "rate", tags), new
Rate());
+                    break;
+                case SIMPLE_RATE:
+                    sensor.add(metrics.metricName("test.metric.simpleRate", "simpleRate",
tags), new SimpleRate());
+                    break;
+                case SUM:
+                    sensor.add(metrics.metricName("test.metric.sum", "sum", tags), new Sum());
+                    break;
+                case VALUE:
+                    sensor.add(metrics.metricName("test.metric.value", "value", tags), new
Value());
+                    break;
+                case PERCENTILES:
+                    sensor.add(metrics.metricName("test.metric.percentiles", "percentiles",
tags),
+                               new Percentiles(100, -100, 100, Percentiles.BucketSizing.CONSTANT,
+                                               new Percentile(metrics.metricName("test.median",
"percentiles"), 50.0),
+                                               new Percentile(metrics.metricName("test.perc99_9",
"percentiles"), 99.9)));
+                    break;
+                case METER:
+                    sensor.add(new Meter(metrics.metricName("test.metric.meter.rate", "meter",
tags),
+                               metrics.metricName("test.metric.meter.total", "meter", tags)));
+                    break;
+                default:
+                    throw new IllegalStateException("Invalid stat type " + statType);
+            }
+            return sensor;
+        }
+    }
 }

-- 
To stop receiving notification emails like this one, please contact
rsivaram@apache.org.

Mime
View raw message