kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rsiva...@apache.org
Subject [kafka] branch trunk updated: MINOR: Remove deprecated Metric.value() method usage (#5626)
Date Wed, 12 Sep 2018 18:05:28 GMT
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c121f4e  MINOR: Remove deprecated Metric.value() method usage (#5626)
c121f4e is described below

commit c121f4eb82da654acbdd133a556cfe1f9197a46a
Author: Manikumar Reddy O <manikumar.reddy@gmail.com>
AuthorDate: Wed Sep 12 23:35:19 2018 +0530

    MINOR: Remove deprecated Metric.value() method usage (#5626)
    
    Reviewers: Viktor Somogyi <viktorsomogyi@gmail.com>, John Roesler <john@confluent.io>,
Rajini Sivaram <rajinisivaram@googlemail.com>
---
 .../clients/consumer/internals/FetcherTest.java    | 50 +++++++-------
 .../clients/producer/internals/SenderTest.java     |  8 +--
 .../apache/kafka/common/metrics/MetricsTest.java   | 78 +++++++++++++---------
 .../common/metrics/stats/FrequenciesTest.java      | 24 +++----
 .../apache/kafka/common/network/NioEchoServer.java |  2 +-
 core/src/main/scala/kafka/utils/ToolsUtils.scala   |  2 +-
 .../scala/integration/kafka/api/MetricsTest.scala  |  2 +-
 .../kafka/api/PlaintextConsumerTest.scala          |  6 +-
 .../other/kafka/ReplicationQuotasTestRig.scala     |  2 +-
 .../unit/kafka/server/ClientQuotaManagerTest.scala | 12 ++--
 .../kafka/server/ReplicationQuotaManagerTest.scala |  2 +-
 .../unit/kafka/server/ReplicationQuotasTest.scala  |  2 +-
 .../scala/unit/kafka/server/RequestQuotaTest.scala |  2 +-
 .../state/internals/MeteredSessionStoreTest.java   | 12 ++--
 .../kafka/tools/PushHttpMetricsReporter.java       |  2 +-
 15 files changed, 110 insertions(+), 96 deletions(-)

diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 3bf3deb..48d61b9 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -1576,8 +1576,8 @@ public class FetcherTest {
         KafkaMetric avgMetric = allMetrics.get(metrics.metricInstance(metricsRegistry.fetchThrottleTimeAvg));
         KafkaMetric maxMetric = allMetrics.get(metrics.metricInstance(metricsRegistry.fetchThrottleTimeMax));
         // Throttle times are ApiVersions=400, Fetch=(100, 200, 300)
-        assertEquals(250, avgMetric.value(), EPSILON);
-        assertEquals(400, maxMetric.value(), EPSILON);
+        assertEquals(250, (Double) avgMetric.metricValue(), EPSILON);
+        assertEquals(400, (Double) maxMetric.metricValue(), EPSILON);
         client.close();
     }
 
@@ -1599,14 +1599,14 @@ public class FetcherTest {
         KafkaMetric recordsFetchLagMax = allMetrics.get(maxLagMetric);
 
         // recordsFetchLagMax should be initialized to negative infinity
-        assertEquals(Double.NEGATIVE_INFINITY, recordsFetchLagMax.value(), EPSILON);
+        assertEquals(Double.NEGATIVE_INFINITY, (Double) recordsFetchLagMax.metricValue(),
EPSILON);
 
         // recordsFetchLagMax should be hw - fetchOffset after receiving an empty FetchResponse
         fetchRecords(tp0, MemoryRecords.EMPTY, Errors.NONE, 100L, 0);
-        assertEquals(100, recordsFetchLagMax.value(), EPSILON);
+        assertEquals(100, (Double) recordsFetchLagMax.metricValue(), EPSILON);
 
         KafkaMetric partitionLag = allMetrics.get(partitionLagMetric);
-        assertEquals(100, partitionLag.value(), EPSILON);
+        assertEquals(100, (Double) partitionLag.metricValue(), EPSILON);
 
         // recordsFetchLagMax should be hw - offset of the last message after receiving a
non-empty FetchResponse
         MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE,
@@ -1614,8 +1614,8 @@ public class FetcherTest {
         for (int v = 0; v < 3; v++)
             builder.appendWithOffset(v, RecordBatch.NO_TIMESTAMP, "key".getBytes(), ("value-"
+ v).getBytes());
         fetchRecords(tp0, builder.build(), Errors.NONE, 200L, 0);
-        assertEquals(197, recordsFetchLagMax.value(), EPSILON);
-        assertEquals(197, partitionLag.value(), EPSILON);
+        assertEquals(197, (Double) recordsFetchLagMax.metricValue(), EPSILON);
+        assertEquals(197, (Double) partitionLag.metricValue(), EPSILON);
 
         // verify de-registration of partition lag
         subscriptions.unsubscribe();
@@ -1637,14 +1637,14 @@ public class FetcherTest {
         KafkaMetric recordsFetchLeadMin = allMetrics.get(minLeadMetric);
 
         // recordsFetchLeadMin should be initialized to MAX_VALUE
-        assertEquals(Double.MAX_VALUE, recordsFetchLeadMin.value(), EPSILON);
+        assertEquals(Double.MAX_VALUE, (Double) recordsFetchLeadMin.metricValue(), EPSILON);
 
         // recordsFetchLeadMin should be position - logStartOffset after receiving an empty
FetchResponse
         fetchRecords(tp0, MemoryRecords.EMPTY, Errors.NONE, 100L, -1L, 0L, 0);
-        assertEquals(0L, recordsFetchLeadMin.value(), EPSILON);
+        assertEquals(0L, (Double) recordsFetchLeadMin.metricValue(), EPSILON);
 
         KafkaMetric partitionLead = allMetrics.get(partitionLeadMetric);
-        assertEquals(0L, partitionLead.value(), EPSILON);
+        assertEquals(0L, (Double) partitionLead.metricValue(), EPSILON);
 
         // recordsFetchLeadMin should be position - logStartOffset after receiving a non-empty
FetchResponse
         MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE,
@@ -1653,8 +1653,8 @@ public class FetcherTest {
             builder.appendWithOffset(v, RecordBatch.NO_TIMESTAMP, "key".getBytes(), ("value-"
+ v).getBytes());
         }
         fetchRecords(tp0, builder.build(), Errors.NONE, 200L, -1L, 0L, 0);
-        assertEquals(0L, recordsFetchLeadMin.value(), EPSILON);
-        assertEquals(3L, partitionLead.value(), EPSILON);
+        assertEquals(0L, (Double) recordsFetchLeadMin.metricValue(), EPSILON);
+        assertEquals(3L, (Double) partitionLead.metricValue(), EPSILON);
 
         // verify de-registration of partition lag
         subscriptions.unsubscribe();
@@ -1681,14 +1681,14 @@ public class FetcherTest {
         KafkaMetric recordsFetchLagMax = allMetrics.get(maxLagMetric);
 
         // recordsFetchLagMax should be initialized to negative infinity
-        assertEquals(Double.NEGATIVE_INFINITY, recordsFetchLagMax.value(), EPSILON);
+        assertEquals(Double.NEGATIVE_INFINITY, (Double) recordsFetchLagMax.metricValue(),
EPSILON);
 
         // recordsFetchLagMax should be lso - fetchOffset after receiving an empty FetchResponse
         fetchRecords(tp0, MemoryRecords.EMPTY, Errors.NONE, 100L, 50L, 0);
-        assertEquals(50, recordsFetchLagMax.value(), EPSILON);
+        assertEquals(50, (Double) recordsFetchLagMax.metricValue(), EPSILON);
 
         KafkaMetric partitionLag = allMetrics.get(partitionLagMetric);
-        assertEquals(50, partitionLag.value(), EPSILON);
+        assertEquals(50, (Double) partitionLag.metricValue(), EPSILON);
 
         // recordsFetchLagMax should be lso - offset of the last message after receiving
a non-empty FetchResponse
         MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE,
@@ -1696,8 +1696,8 @@ public class FetcherTest {
         for (int v = 0; v < 3; v++)
             builder.appendWithOffset(v, RecordBatch.NO_TIMESTAMP, "key".getBytes(), ("value-"
+ v).getBytes());
         fetchRecords(tp0, builder.build(), Errors.NONE, 200L, 150L, 0);
-        assertEquals(147, recordsFetchLagMax.value(), EPSILON);
-        assertEquals(147, partitionLag.value(), EPSILON);
+        assertEquals(147, (Double) recordsFetchLagMax.metricValue(), EPSILON);
+        assertEquals(147, (Double) partitionLag.metricValue(), EPSILON);
 
         // verify de-registration of partition lag
         subscriptions.unsubscribe();
@@ -1748,8 +1748,8 @@ public class FetcherTest {
         Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
         KafkaMetric fetchSizeAverage = allMetrics.get(metrics.metricInstance(metricsRegistry.fetchSizeAvg));
         KafkaMetric recordsCountAverage = allMetrics.get(metrics.metricInstance(metricsRegistry.recordsPerRequestAvg));
-        assertEquals(expectedBytes, fetchSizeAverage.value(), EPSILON);
-        assertEquals(6, recordsCountAverage.value(), EPSILON);
+        assertEquals(expectedBytes, (Double) fetchSizeAverage.metricValue(), EPSILON);
+        assertEquals(6, (Double) recordsCountAverage.metricValue(), EPSILON);
     }
 
     @Test
@@ -1774,8 +1774,8 @@ public class FetcherTest {
         }
 
         fetchRecords(tp0, records, Errors.NONE, 100L, 0);
-        assertEquals(expectedBytes, fetchSizeAverage.value(), EPSILON);
-        assertEquals(2, recordsCountAverage.value(), EPSILON);
+        assertEquals(expectedBytes, (Double) fetchSizeAverage.metricValue(), EPSILON);
+        assertEquals(2, (Double) recordsCountAverage.metricValue(), EPSILON);
     }
 
     @Test
@@ -1810,8 +1810,8 @@ public class FetcherTest {
         for (Record record : records.records())
             expectedBytes += record.sizeInBytes();
 
-        assertEquals(expectedBytes, fetchSizeAverage.value(), EPSILON);
-        assertEquals(3, recordsCountAverage.value(), EPSILON);
+        assertEquals(expectedBytes, (Double) fetchSizeAverage.metricValue(), EPSILON);
+        assertEquals(3, (Double) recordsCountAverage.metricValue(), EPSILON);
     }
 
     @Test
@@ -1851,8 +1851,8 @@ public class FetcherTest {
         for (Record record : records.records())
             expectedBytes += record.sizeInBytes();
 
-        assertEquals(expectedBytes, fetchSizeAverage.value(), EPSILON);
-        assertEquals(3, recordsCountAverage.value(), EPSILON);
+        assertEquals(expectedBytes, (Double) fetchSizeAverage.metricValue(), EPSILON);
+        assertEquals(3, (Double) recordsCountAverage.metricValue(), EPSILON);
     }
 
     @Test
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 1a6e778..f93f343 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -294,8 +294,8 @@ public class SenderTest {
         KafkaMetric avgMetric = allMetrics.get(this.senderMetricsRegistry.produceThrottleTimeAvg);
         KafkaMetric maxMetric = allMetrics.get(this.senderMetricsRegistry.produceThrottleTimeMax);
         // Throttle times are ApiVersions=400, Produce=(100, 200, 300)
-        assertEquals(250, avgMetric.value(), EPS);
-        assertEquals(400, maxMetric.value(), EPS);
+        assertEquals(250, (Double) avgMetric.metricValue(), EPS);
+        assertEquals(400, (Double) maxMetric.metricValue(), EPS);
         client.close();
     }
 
@@ -1771,7 +1771,7 @@ public class SenderTest {
         assertEquals("Expected requests to be aborted after pid change", 0, client.inFlightRequestCount());
 
         KafkaMetric recordErrors = m.metrics().get(senderMetrics.recordErrorRate);
-        assertTrue("Expected non-zero value for record send errors", recordErrors.value()
> 0);
+        assertTrue("Expected non-zero value for record send errors", (Double) recordErrors.metricValue()
> 0);
 
         assertTrue(responseFuture.isDone());
         assertEquals(0, (long) transactionManager.sequenceNumber(tp0));
@@ -1914,7 +1914,7 @@ public class SenderTest {
             assertEquals("The last ack'd sequence number should be 1", 1, txnManager.lastAckedSequence(tp));
             assertEquals("Offset of the first message should be 1", 1L, f2.get().offset());
             assertTrue("There should be no batch in the accumulator", accumulator.batches().get(tp).isEmpty());
-            assertTrue("There should be a split", m.metrics().get(senderMetrics.batchSplitRate).value()
> 0);
+            assertTrue("There should be a split", (Double) (m.metrics().get(senderMetrics.batchSplitRate).metricValue())
> 0);
         }
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
index 5c75d03..eb3f775 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
@@ -37,6 +37,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
 
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
@@ -103,6 +104,10 @@ public class MetricsTest {
 
     @Test
     public void testSimpleStats() throws Exception {
+        verifyStats(m -> (double) m.metricValue());
+    }
+
+    private void verifyStats(Function<KafkaMetric, Double> metricValueFunc) {
         ConstantMeasurable measurable = new ConstantMeasurable();
 
         metrics.addMetric(metrics.metricName("direct.measurable", "grp1", "The fraction of
time an appender waits for space allocation."), measurable);
@@ -132,24 +137,24 @@ public class MetricsTest {
         // prior to any time passing
         double elapsedSecs = (config.timeWindowMs() * (config.samples() - 1)) / 1000.0;
         assertEquals(String.format("Occurrences(0...%d) = %f", count, count / elapsedSecs),
count / elapsedSecs,
-                     metrics.metrics().get(metrics.metricName("test.occurences", "grp1")).value(),
EPS);
+                     metricValueFunc.apply(metrics.metrics().get(metrics.metricName("test.occurences",
"grp1"))), EPS);
 
         // pretend 2 seconds passed...
         long sleepTimeMs = 2;
         time.sleep(sleepTimeMs * 1000);
         elapsedSecs += sleepTimeMs;
 
-        assertEquals("s2 reflects the constant value", 5.0, metrics.metrics().get(metrics.metricName("s2.total",
"grp1")).value(), EPS);
-        assertEquals("Avg(0...9) = 4.5", 4.5, metrics.metrics().get(metrics.metricName("test.avg",
"grp1")).value(), EPS);
-        assertEquals("Max(0...9) = 9", count - 1, metrics.metrics().get(metrics.metricName("test.max",
"grp1")).value(), EPS);
-        assertEquals("Min(0...9) = 0", 0.0, metrics.metrics().get(metrics.metricName("test.min",
"grp1")).value(), EPS);
+        assertEquals("s2 reflects the constant value", 5.0, metricValueFunc.apply(metrics.metrics().get(metrics.metricName("s2.total",
"grp1"))), EPS);
+        assertEquals("Avg(0...9) = 4.5", 4.5, metricValueFunc.apply(metrics.metrics().get(metrics.metricName("test.avg",
"grp1"))), EPS);
+        assertEquals("Max(0...9) = 9", count - 1,  metricValueFunc.apply(metrics.metrics().get(metrics.metricName("test.max",
"grp1"))), EPS);
+        assertEquals("Min(0...9) = 0", 0.0, metricValueFunc.apply(metrics.metrics().get(metrics.metricName("test.min",
"grp1"))), EPS);
         assertEquals("Rate(0...9) = 1.40625",
-                     sum / elapsedSecs, metrics.metrics().get(metrics.metricName("test.rate",
"grp1")).value(), EPS);
+                     sum / elapsedSecs, metricValueFunc.apply(metrics.metrics().get(metrics.metricName("test.rate",
"grp1"))), EPS);
         assertEquals(String.format("Occurrences(0...%d) = %f", count, count / elapsedSecs),
                      count / elapsedSecs,
-                     metrics.metrics().get(metrics.metricName("test.occurences", "grp1")).value(),
EPS);
+                     metricValueFunc.apply(metrics.metrics().get(metrics.metricName("test.occurences",
"grp1"))), EPS);
         assertEquals("Count(0...9) = 10",
-                     (double) count, metrics.metrics().get(metrics.metricName("test.count",
"grp1")).value(), EPS);
+                     (double) count, metricValueFunc.apply(metrics.metrics().get(metrics.metricName("test.count",
"grp1"))), EPS);
     }
 
     @Test
@@ -172,11 +177,11 @@ public class MetricsTest {
         child2.record();
         grandchild.record();
 
-        double p1 = parent1.metrics().get(0).value();
-        double p2 = parent2.metrics().get(0).value();
-        double c1 = child1.metrics().get(0).value();
-        double c2 = child2.metrics().get(0).value();
-        double gc = grandchild.metrics().get(0).value();
+        double p1 = (double) parent1.metrics().get(0).metricValue();
+        double p2 = (double) parent2.metrics().get(0).metricValue();
+        double c1 = (double) child1.metrics().get(0).metricValue();
+        double c2 = (double) child2.metrics().get(0).metricValue();
+        double gc = (double) grandchild.metrics().get(0).metricValue();
 
         /* each metric should have a count equal to one + its children's count */
         assertEquals(1.0, gc, EPS);
@@ -395,7 +400,7 @@ public class MetricsTest {
         } catch (QuotaViolationException e) {
             // this is good
         }
-        assertEquals(6.0, metrics.metrics().get(metrics.metricName("test1.total", "grp1")).value(),
EPS);
+        assertEquals(6.0, (Double) metrics.metrics().get(metrics.metricName("test1.total",
"grp1")).metricValue(), EPS);
         sensor.record(-6.0);
         try {
             sensor.record(-1.0);
@@ -438,24 +443,24 @@ public class MetricsTest {
         for (int i = 0; i < buckets; i++)
             sensor.record(i);
 
-        assertEquals(25, p25.value(), 1.0);
-        assertEquals(50, p50.value(), 1.0);
-        assertEquals(75, p75.value(), 1.0);
+        assertEquals(25, (Double) p25.metricValue(), 1.0);
+        assertEquals(50, (Double) p50.metricValue(), 1.0);
+        assertEquals(75, (Double) p75.metricValue(), 1.0);
 
         for (int i = 0; i < buckets; i++)
             sensor.record(0.0);
 
-        assertEquals(0.0, p25.value(), 1.0);
-        assertEquals(0.0, p50.value(), 1.0);
-        assertEquals(0.0, p75.value(), 1.0);
+        assertEquals(0.0, (Double) p25.metricValue(), 1.0);
+        assertEquals(0.0, (Double) p50.metricValue(), 1.0);
+        assertEquals(0.0, (Double) p75.metricValue(), 1.0);
 
         // record two more windows worth of sequential values
         for (int i = 0; i < buckets; i++)
             sensor.record(i);
 
-        assertEquals(25, p25.value(), 1.0);
-        assertEquals(50, p50.value(), 1.0);
-        assertEquals(75, p75.value(), 1.0);
+        assertEquals(25, (Double) p25.metricValue(), 1.0);
+        assertEquals(50, (Double) p50.metricValue(), 1.0);
+        assertEquals(75, (Double) p75.metricValue(), 1.0);
     }
 
     @Test
@@ -479,7 +484,7 @@ public class MetricsTest {
             s.record(100);
             sum += 100;
             time.sleep(cfg.timeWindowMs());
-            assertEquals(sum, totalMetric.value(), EPS);
+            assertEquals(sum, (Double) totalMetric.metricValue(), EPS);
         }
 
         // Sleep for half the window.
@@ -490,19 +495,19 @@ public class MetricsTest {
 
         KafkaMetric rateMetric = metrics.metrics().get(rateMetricName);
         KafkaMetric countRateMetric = metrics.metrics().get(countRateMetricName);
-        assertEquals("Rate(0...2) = 2.666", sum / elapsedSecs, rateMetric.value(), EPS);
-        assertEquals("Count rate(0...2) = 0.02666", count / elapsedSecs, countRateMetric.value(),
EPS);
+        assertEquals("Rate(0...2) = 2.666", sum / elapsedSecs, (Double) rateMetric.metricValue(),
EPS);
+        assertEquals("Count rate(0...2) = 0.02666", count / elapsedSecs, (Double) countRateMetric.metricValue(),
EPS);
         assertEquals("Elapsed Time = 75 seconds", elapsedSecs,
                 ((Rate) rateMetric.measurable()).windowSize(cfg, time.milliseconds()) / 1000,
EPS);
-        assertEquals(sum, totalMetric.value(), EPS);
-        assertEquals(count, countTotalMetric.value(), EPS);
+        assertEquals(sum, (Double) totalMetric.metricValue(), EPS);
+        assertEquals(count, (Double) countTotalMetric.metricValue(), EPS);
 
         // Verify that rates are expired, but total is cumulative
         time.sleep(cfg.timeWindowMs() * cfg.samples());
-        assertEquals(0, rateMetric.value(), EPS);
-        assertEquals(0, countRateMetric.value(), EPS);
-        assertEquals(sum, totalMetric.value(), EPS);
-        assertEquals(count, countTotalMetric.value(), EPS);
+        assertEquals(0, (Double) rateMetric.metricValue(), EPS);
+        assertEquals(0, (Double) countRateMetric.metricValue(), EPS);
+        assertEquals(sum, (Double) totalMetric.metricValue(), EPS);
+        assertEquals(count, (Double) countTotalMetric.metricValue(), EPS);
     }
 
     public static class ConstantMeasurable implements Measurable {
@@ -829,4 +834,13 @@ public class MetricsTest {
             return sensor;
         }
     }
+
+    /**
+     * This test is to verify the deprecated {@link Metric#value()} method.
+     * @deprecated This will be removed in a future major release.
+     */
+    @Test
+    public void testDeprecatedMetricValueMethod() {
+        verifyStats(KafkaMetric::value);
+    }
 }
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/stats/FrequenciesTest.java
b/clients/src/test/java/org/apache/kafka/common/metrics/stats/FrequenciesTest.java
index 9b6f686..e3e623f 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/stats/FrequenciesTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/stats/FrequenciesTest.java
@@ -124,29 +124,29 @@ public class FrequenciesTest {
         for (int i = 0; i != 100; ++i) {
             frequencies.record(config, i % 4 + 1, time.milliseconds());
         }
-        assertEquals(0.25, metric1.value(), DELTA);
-        assertEquals(0.25, metric2.value(), DELTA);
-        assertEquals(0.25, metric3.value(), DELTA);
-        assertEquals(0.25, metric4.value(), DELTA);
+        assertEquals(0.25, (Double) metric1.metricValue(), DELTA);
+        assertEquals(0.25, (Double) metric2.metricValue(), DELTA);
+        assertEquals(0.25, (Double) metric3.metricValue(), DELTA);
+        assertEquals(0.25, (Double) metric4.metricValue(), DELTA);
 
         // Record 2 windows worth of values
         for (int i = 0; i != 100; ++i) {
             frequencies.record(config, i % 2 + 1, time.milliseconds());
         }
-        assertEquals(0.50, metric1.value(), DELTA);
-        assertEquals(0.50, metric2.value(), DELTA);
-        assertEquals(0.00, metric3.value(), DELTA);
-        assertEquals(0.00, metric4.value(), DELTA);
+        assertEquals(0.50, (Double) metric1.metricValue(), DELTA);
+        assertEquals(0.50, (Double) metric2.metricValue(), DELTA);
+        assertEquals(0.00, (Double) metric3.metricValue(), DELTA);
+        assertEquals(0.00, (Double) metric4.metricValue(), DELTA);
 
         // Record 1 window worth of values to overlap with the last window
         // that is half 1.0 and half 2.0
         for (int i = 0; i != 50; ++i) {
             frequencies.record(config, 4.0, time.milliseconds());
         }
-        assertEquals(0.25, metric1.value(), DELTA);
-        assertEquals(0.25, metric2.value(), DELTA);
-        assertEquals(0.00, metric3.value(), DELTA);
-        assertEquals(0.50, metric4.value(), DELTA);
+        assertEquals(0.25, (Double) metric1.metricValue(), DELTA);
+        assertEquals(0.25, (Double) metric2.metricValue(), DELTA);
+        assertEquals(0.00, (Double) metric3.metricValue(), DELTA);
+        assertEquals(0.50, (Double) metric4.metricValue(), DELTA);
     }
 
     protected MetricName name(String metricName) {
diff --git a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
index 7996e59..76d37c2 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
@@ -115,7 +115,7 @@ public class NioEchoServer extends Thread {
     public double metricValue(String name) {
         for (Map.Entry<MetricName, KafkaMetric> entry : metrics.metrics().entrySet())
{
             if (entry.getKey().name().equals(name))
-                return entry.getValue().value();
+                return (double) entry.getValue().metricValue();
         }
         throw new IllegalStateException("Metric not found, " + name + ", found=" + metrics.metrics().keySet());
     }
diff --git a/core/src/main/scala/kafka/utils/ToolsUtils.scala b/core/src/main/scala/kafka/utils/ToolsUtils.scala
index 50e04f5..76a12b6 100644
--- a/core/src/main/scala/kafka/utils/ToolsUtils.scala
+++ b/core/src/main/scala/kafka/utils/ToolsUtils.scala
@@ -52,7 +52,7 @@ object ToolsUtils {
         if (maxLengthOfDisplayName < mergedKeyName.length) {
           maxLengthOfDisplayName = mergedKeyName.length
         }
-        (mergedKeyName, value.value())
+        (mergedKeyName, value.metricValue)
     }
     println(s"\n%-${maxLengthOfDisplayName}s   %s".format("Metric Name", "Value"))
     sortedMap.foreach {
diff --git a/core/src/test/scala/integration/kafka/api/MetricsTest.scala b/core/src/test/scala/integration/kafka/api/MetricsTest.scala
index 814754d..b2f5a7e 100644
--- a/core/src/test/scala/integration/kafka/api/MetricsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/MetricsTest.scala
@@ -259,7 +259,7 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup {
       group: Option[String]): Double = {
     // Use max value of all matching metrics since Selector metrics are recorded for each
Processor
     verifyKafkaMetric(name, metrics, entity, group) { matchingMetrics =>
-      matchingMetrics.foldLeft(0.0)((max, metric) => Math.max(max, metric.value))
+      matchingMetrics.foldLeft(0.0)((max, metric) => Math.max(max, metric.metricValue.asInstanceOf[Double]))
     }
   }
 
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 983b63c..1031483 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -1493,7 +1493,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     val fetchLag0 = consumer.metrics.get(new MetricName("records-lag", "consumer-fetch-manager-metrics",
"", tags1))
     assertNotNull(fetchLag0)
     val expectedLag = numMessages - records.count
-    assertEquals(s"The lag should be $expectedLag", expectedLag, fetchLag0.value, epsilon)
+    assertEquals(s"The lag should be $expectedLag", expectedLag, fetchLag0.metricValue.asInstanceOf[Double],
epsilon)
 
     // Remove topic from subscription
     consumer.subscribe(List(topic2).asJava, listener0)
@@ -1566,7 +1566,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertNotNull(fetchLag)
 
     val expectedLag = numMessages - records.count
-    assertEquals(s"The lag should be $expectedLag", expectedLag, fetchLag.value, epsilon)
+    assertEquals(s"The lag should be $expectedLag", expectedLag, fetchLag.metricValue.asInstanceOf[Double],
epsilon)
 
     consumer.assign(List(tp2).asJava)
     TestUtils.waitUntilTrue(() => !consumer.poll(100).isEmpty, "Consumer did not consume
any message before timeout.")
@@ -1624,7 +1624,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     tags.put("partition", String.valueOf(tp.partition()))
     val lag = consumer.metrics.get(new MetricName("records-lag", "consumer-fetch-manager-metrics",
"", tags))
 
-    assertEquals(s"The lag should be ${numMessages - records.count}", numMessages - records.count,
lag.value, epsilon)
+    assertEquals(s"The lag should be ${numMessages - records.count}", numMessages - records.count,
lag.metricValue.asInstanceOf[Double], epsilon)
   }
 
   @Test
diff --git a/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala b/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
index b2568c1..470ee4d 100644
--- a/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
+++ b/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
@@ -265,7 +265,7 @@ object ReplicationQuotasTestRig {
     private def measuredRate(broker: KafkaServer, repType: QuotaType): Double = {
       val metricName = broker.metrics.metricName("byte-rate", repType.toString)
       if (broker.metrics.metrics.asScala.contains(metricName))
-        broker.metrics.metrics.asScala(metricName).value
+        broker.metrics.metrics.asScala(metricName).metricValue.asInstanceOf[Double]
       else -1
     }
 
diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
index c5275c2..6f75174 100644
--- a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
@@ -276,7 +276,7 @@ class ClientQuotaManagerTest {
         assertEquals(0, maybeRecord(clientMetrics, "ANONYMOUS", "unknown", 400))
         time.sleep(1000)
       }
-      assertEquals(0, queueSizeMetric.value().toInt)
+      assertEquals(0, queueSizeMetric.metricValue.asInstanceOf[Double].toInt)
 
       // Create a spike.
       // 400*10 + 2000 + 300 = 6300/10.5 = 600 bytes per second.
@@ -287,7 +287,7 @@ class ClientQuotaManagerTest {
 
       assertEquals("Should be throttled", 2100, sleepTime)
       throttle(clientMetrics, "ANONYMOYUS", "unknown", sleepTime, callback)
-      assertEquals(1, queueSizeMetric.value().toInt)
+      assertEquals(1, queueSizeMetric.metricValue.asInstanceOf[Double].toInt)
       // After a request is delayed, the callback cannot be triggered immediately
       clientMetrics.throttledChannelReaper.doWork()
       assertEquals(0, numCallbacks)
@@ -295,7 +295,7 @@ class ClientQuotaManagerTest {
 
       // Callback can only be triggered after the delay time passes
       clientMetrics.throttledChannelReaper.doWork()
-      assertEquals(0, queueSizeMetric.value().toInt)
+      assertEquals(0, queueSizeMetric.metricValue.asInstanceOf[Double].toInt)
       assertEquals(1, numCallbacks)
 
       // Could continue to see delays until the bursty sample disappears
@@ -326,7 +326,7 @@ class ClientQuotaManagerTest {
         assertEquals(0, maybeRecord(quotaManager, "ANONYMOUS", "test-client", millisToPercent(4)))
         time.sleep(1000)
       }
-      assertEquals(0, queueSizeMetric.value().toInt)
+      assertEquals(0, queueSizeMetric.metricValue.asInstanceOf[Double].toInt)
 
       // Create a spike.
       // quota = 1% (10ms per second)
@@ -339,7 +339,7 @@ class ClientQuotaManagerTest {
       assertEquals("Should be throttled", 210, throttleTime)
 
       throttle(quotaManager, "ANONYMOYUS", "test-client", throttleTime, callback)
-      assertEquals(1, queueSizeMetric.value().toInt)
+      assertEquals(1, queueSizeMetric.metricValue.asInstanceOf[Double].toInt)
       // After a request is delayed, the callback cannot be triggered immediately
       quotaManager.throttledChannelReaper.doWork()
       assertEquals(0, numCallbacks)
@@ -347,7 +347,7 @@ class ClientQuotaManagerTest {
 
       // Callback can only be triggered after the delay time passes
       quotaManager.throttledChannelReaper.doWork()
-      assertEquals(0, queueSizeMetric.value().toInt)
+      assertEquals(0, queueSizeMetric.metricValue.asInstanceOf[Double].toInt)
       assertEquals(1, numCallbacks)
 
       // Could continue to see delays until the bursty sample disappears
diff --git a/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala
index b1edd01..dbb52e7 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala
@@ -99,7 +99,7 @@ class ReplicationQuotaManagerTest {
 
   def rate(metrics: Metrics): Double = {
    val metricName = metrics.metricName("byte-rate", LeaderReplication.toString, "Tracking byte-rate for " + LeaderReplication)
-    val leaderThrottledRate = metrics.metrics.asScala(metricName).value()
+    val leaderThrottledRate = metrics.metrics.asScala(metricName).metricValue.asInstanceOf[Double]
     leaderThrottledRate
   }
 
diff --git a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
index 5125486..78300c9 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
@@ -236,6 +236,6 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness {
 
   private def measuredRate(broker: KafkaServer, repType: QuotaType): Double = {
     val metricName = broker.metrics.metricName("byte-rate", repType.toString)
-    broker.metrics.metrics.asScala(metricName).value
+    broker.metrics.metrics.asScala(metricName).metricValue.asInstanceOf[Double]
   }
 }
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 7df1bd6..7032724 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -195,7 +195,7 @@ class RequestQuotaTest extends BaseRequestTest {
 
   private def metricValue(metric: KafkaMetric, sensor: Sensor): Double = {
     sensor.synchronized {
-      if (metric == null) -1.0 else metric.value
+      if (metric == null) -1.0 else metric.metricValue.asInstanceOf[Double]
     }
   }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
index 3bd190a..d9e6964 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
@@ -116,7 +116,7 @@ public class MeteredSessionStoreTest {
         iterator.close();
 
         final KafkaMetric metric = metric("fetch-rate");
-        assertTrue(metric.value() > 0);
+        assertTrue((Double) metric.metricValue() > 0);
         EasyMock.verify(inner);
     }
 
@@ -133,7 +133,7 @@ public class MeteredSessionStoreTest {
         iterator.close();
 
         final KafkaMetric metric = metric("fetch-rate");
-        assertTrue(metric.value() > 0);
+        assertTrue((Double) metric.metricValue() > 0);
         EasyMock.verify(inner);
     }
 
@@ -147,7 +147,7 @@ public class MeteredSessionStoreTest {
         metered.remove(new Windowed<>(key, new SessionWindow(0, 0)));
 
         final KafkaMetric metric = metric("remove-rate");
-        assertTrue(metric.value() > 0);
+        assertTrue((Double) metric.metricValue() > 0);
         EasyMock.verify(inner);
     }
 
@@ -164,7 +164,7 @@ public class MeteredSessionStoreTest {
         iterator.close();
 
         final KafkaMetric metric = metric("fetch-rate");
-        assertTrue(metric.value() > 0);
+        assertTrue((Double) metric.metricValue() > 0);
         EasyMock.verify(inner);
     }
 
@@ -181,7 +181,7 @@ public class MeteredSessionStoreTest {
         iterator.close();
 
         final KafkaMetric metric = metric("fetch-rate");
-        assertTrue(metric.value() > 0);
+        assertTrue((Double) metric.metricValue() > 0);
         EasyMock.verify(inner);
     }
 
@@ -189,7 +189,7 @@ public class MeteredSessionStoreTest {
     public void shouldRecordRestoreTimeOnInit() {
         init();
         final KafkaMetric metric = metric("restore-rate");
-        assertTrue(metric.value() > 0);
+        assertTrue((Double) metric.metricValue() > 0);
     }
 
     @Test(expected = NullPointerException.class)
diff --git a/tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java b/tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java
index c5764b4..6adebf5 100644
--- a/tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java
+++ b/tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java
@@ -174,7 +174,7 @@ public class PushHttpMetricsReporter implements MetricsReporter {
                 samples = new ArrayList<>(metrics.size());
                 for (KafkaMetric metric : metrics.values()) {
                     MetricName name = metric.metricName();
-                    double value = metric.value();
+                    double value = (Double) metric.metricValue();
                     samples.add(new MetricValue(name.name(), name.group(), name.tags(), value));
                 }
             }


Mime
View raw message