kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch 2.1 updated: KAFKA-7734: Metrics tags should use LinkedHashMap to guarantee ordering (#6032)
Date Fri, 04 Jan 2019 01:15:30 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 0af021f  KAFKA-7734: Metrics tags should use LinkedHashMap to guarantee ordering (#6032)
0af021f is described below

commit 0af021f10248e34b84a43b88c6549bee9d9cbe87
Author: lambdaliu <38787005+lambdaliu@users.noreply.github.com>
AuthorDate: Fri Jan 4 09:14:26 2019 +0800

    KAFKA-7734: Metrics tags should use LinkedHashMap to guarantee ordering (#6032)
    
    This pull request replaces HashMap with LinkedHashMap to guarantee ordering of metrics tags.
    
    Reviewers: Ismael Juma <ismael@juma.me.uk>, Guozhang Wang <guozhang@confluent.io>, John Roesler <vvcephei@users.noreply.github.com>
---
 .../org/apache/kafka/common/metrics/JmxReporter.java     |  2 +-
 .../java/org/apache/kafka/common/metrics/Metrics.java    |  2 +-
 .../processor/internals/metrics/StreamsMetricsImpl.java  |  5 +++--
 .../apache/kafka/streams/state/internals/NamedCache.java |  8 ++++----
 .../streams/processor/internals/ProcessorNodeTest.java   |  7 ++++++-
 .../streams/processor/internals/StreamTaskTest.java      |  6 ++++++
 .../streams/processor/internals/StreamThreadTest.java    |  6 ++++++
 .../state/internals/MeteredKeyValueStoreTest.java        | 12 ++++++++++++
 .../streams/state/internals/MeteredSessionStoreTest.java | 12 ++++++++++++
 .../streams/state/internals/MeteredWindowStoreTest.java  | 16 +++++++++++++++-
 .../kafka/streams/state/internals/NamedCacheTest.java    | 13 ++++++++++++-
 11 files changed, 78 insertions(+), 11 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java b/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
index 063fb3b..8d7bdbe 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
@@ -75,7 +75,7 @@ public class JmxReporter implements MetricsReporter {
         }
     }
 
-    boolean containsMbean(String mbeanName) {
+    public boolean containsMbean(String mbeanName) {
         return mbeans.containsKey(mbeanName);
     }
     @Override
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
index 9e2b6f1..91c245d 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
@@ -227,7 +227,7 @@ public class Metrics implements Closeable {
     private static Map<String, String> getTags(String... keyValue) {
         if ((keyValue.length % 2) != 0)
             throw new IllegalArgumentException("keyValue needs to be specified in pairs");
-        Map<String, String> tags = new HashMap<String, String>();
+        Map<String, String> tags = new LinkedHashMap<String, String>();
 
         for (int i = 0; i < keyValue.length; i += 2)
             tags.put(keyValue[i], keyValue[i + 1]);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
index dd6cc4a..60d378a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
@@ -31,6 +31,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Deque;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.Map;
 import java.util.Objects;
@@ -261,7 +262,8 @@ public class StreamsMetricsImpl implements StreamsMetrics {
     }
 
     public final Map<String, String> tagMap(final String... tags) {
-        final Map<String, String> tagMap = new HashMap<>();
+        final Map<String, String> tagMap = new LinkedHashMap<>();
+        tagMap.put("client-id", threadName);
         if (tags != null) {
             if ((tags.length % 2) != 0) {
                 throw new IllegalArgumentException("Tags needs to be specified in key-value
pairs");
@@ -270,7 +272,6 @@ public class StreamsMetricsImpl implements StreamsMetrics {
             for (int i = 0; i < tags.length; i += 2)
                 tagMap.put(tags[i], tags[i + 1]);
         }
-        tagMap.put("client-id", threadName);
         return tagMap;
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
index 12b4cf3..3ce7cbe 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
@@ -367,8 +367,8 @@ class NamedCache {
 
             // add parent
             final Map<String, String> allMetricTags = metrics.tagMap(
-                "record-cache-id", "all",
-                "task-id", taskName
+                 "task-id", taskName,
+                "record-cache-id", "all"
             );
             final Sensor taskLevelHitRatioSensor = metrics.taskLevelSensor(taskName, "hitRatio",
Sensor.RecordingLevel.DEBUG);
             taskLevelHitRatioSensor.add(
@@ -386,8 +386,8 @@ class NamedCache {
 
             // add child
             final Map<String, String> metricTags = metrics.tagMap(
-                "record-cache-id", ThreadCache.underlyingStoreNamefromCacheName(cacheName),
-                "task-id", taskName
+                 "task-id", taskName,
+                "record-cache-id", ThreadCache.underlyingStoreNamefromCacheName(cacheName)
             );
 
             hitRatioSensor = metrics.cacheLevelSensor(
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
index 7c44258..c3d8583 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.LogContext;
@@ -33,6 +34,7 @@ import java.util.LinkedHashMap;
 import java.util.Map;
 
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 public class ProcessorNodeTest {
 
@@ -133,7 +135,10 @@ public class ProcessorNodeTest {
                                                                "The average number of occurrence
of " + throughputOperation + " operation per second.",
                                                                metricTags)));
 
-
+        final JmxReporter reporter = new JmxReporter("kafka.streams");
+        metrics.addReporter(reporter);
+        assertTrue(reporter.containsMbean(String.format("kafka.streams:type=%s,client-id=mock,task-id=%s,processor-node-id=%s",
+                groupName, context.taskId().toString(), node.name())));
     }
 
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 7ff7c70..74addbd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
@@ -245,6 +246,11 @@ public class StreamTaskTest {
         assertNotNull(getMetric("%s-latency-avg", "The average latency of %s operation.",
"all"));
         assertNotNull(getMetric("%s-latency-max", "The max latency of %s operation.", "all"));
         assertNotNull(getMetric("%s-rate", "The average number of occurrence of %s operation
per second.", "all"));
+
+        final JmxReporter reporter = new JmxReporter("kafka.streams");
+        metrics.addReporter(reporter);
+        assertTrue(reporter.containsMbean(String.format("kafka.streams:type=stream-task-metrics,client-id=test,task-id=%s",
task.id.toString())));
+        assertTrue(reporter.containsMbean("kafka.streams:type=stream-task-metrics,client-id=test,task-id=all"));
     }
 
     private KafkaMetric getMetric(final String nameFormat, final String descriptionFormat,
final String taskId) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index b4de5ec..e9d413f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -31,6 +31,7 @@ import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.Measurable;
 import org.apache.kafka.common.metrics.MetricConfig;
@@ -281,6 +282,11 @@ public class StreamThreadTest {
         assertNotNull(metrics.metrics().get(metrics.metricName("task-closed-total", defaultGroupName,
"The total number of closed tasks", defaultTags)));
         assertNotNull(metrics.metrics().get(metrics.metricName("skipped-records-rate", defaultGroupName,
"The average per-second number of skipped records.", defaultTags)));
         assertNotNull(metrics.metrics().get(metrics.metricName("skipped-records-total", defaultGroupName,
"The total number of skipped records.", defaultTags)));
+
+        final JmxReporter reporter = new JmxReporter("kafka.streams");
+        metrics.addReporter(reporter);
+        assertTrue(reporter.containsMbean(String.format("kafka.streams:type=%s,client-id=%s",
+                defaultGroupName, thread.getName())));
     }
 
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
index cd94c8c..0dca634 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
@@ -92,6 +93,17 @@ public class MeteredKeyValueStoreTest {
     }
 
     @Test
+    public void testMetrics() {
+        init();
+        final JmxReporter reporter = new JmxReporter("kafka.streams");
+        metrics.addReporter(reporter);
+        assertTrue(reporter.containsMbean(String.format("kafka.streams:type=stream-%s-metrics,client-id=%s,task-id=%s,%s-id=%s",
+                "scope", "test", taskId.toString(), "scope", "metered")));
+        assertTrue(reporter.containsMbean(String.format("kafka.streams:type=stream-%s-metrics,client-id=%s,task-id=%s,%s-id=%s",
+                "scope", "test", taskId.toString(), "scope", "all")));
+    }
+
+    @Test
     public void shouldWriteBytesToInnerStoreAndRecordPutMetric() {
         inner.put(EasyMock.eq(keyBytes), EasyMock.aryEq(valueBytes));
         EasyMock.expectLastCall();
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
index d9e6964..92056fa 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
@@ -91,6 +92,17 @@ public class MeteredSessionStoreTest {
     }
 
     @Test
+    public void testMetrics() {
+        init();
+        final JmxReporter reporter = new JmxReporter("kafka.streams");
+        metrics.addReporter(reporter);
+        assertTrue(reporter.containsMbean(String.format("kafka.streams:type=stream-%s-metrics,client-id=%s,task-id=%s,%s-id=%s",
+                "scope", "test", taskId.toString(), "scope", "metered")));
+        assertTrue(reporter.containsMbean(String.format("kafka.streams:type=stream-%s-metrics,client-id=%s,task-id=%s,%s-id=%s",
+                "scope", "test", taskId.toString(), "scope", "all")));
+    }
+
+    @Test
     public void shouldWriteBytesToInnerStoreAndRecordPutMetric() {
         inner.put(EasyMock.eq(windowedKeyBytes), EasyMock.aryEq(keyBytes));
         EasyMock.expectLastCall();
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
index 3a6a3b4..97a835e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
@@ -45,6 +46,7 @@ import static java.util.Collections.singletonMap;
 import static org.apache.kafka.test.StreamsTestUtils.getMetricByNameFilterByTags;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 public class MeteredWindowStoreTest {
     private InternalMockProcessorContext context;
@@ -57,6 +59,7 @@ public class MeteredWindowStoreTest {
         Serdes.String(),
         new SerdeThatDoesntHandleNull()
     );
+    private final Metrics metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.DEBUG));
 
     {
         EasyMock.expect(innerStoreMock.name()).andReturn("mocked-store").anyTimes();
@@ -64,7 +67,6 @@ public class MeteredWindowStoreTest {
 
     @Before
     public void setUp() {
-        final Metrics metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.DEBUG));
         final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test");
 
         context = new InternalMockProcessorContext(
@@ -84,6 +86,18 @@ public class MeteredWindowStoreTest {
     }
 
     @Test
+    public void testMetrics() {
+        EasyMock.replay(innerStoreMock);
+        store.init(context, store);
+        final JmxReporter reporter = new JmxReporter("kafka.streams");
+        metrics.addReporter(reporter);
+        assertTrue(reporter.containsMbean(String.format("kafka.streams:type=stream-%s-metrics,client-id=%s,task-id=%s,%s-id=%s",
+                "scope", "test", context.taskId().toString(), "scope", "mocked-store")));
+        assertTrue(reporter.containsMbean(String.format("kafka.streams:type=stream-%s-metrics,client-id=%s,task-id=%s,%s-id=%s",
+                "scope", "test", context.taskId().toString(), "scope", "all")));
+    }
+
+    @Test
     public void shouldRecordRestoreLatencyOnInit() {
         innerStoreMock.init(context, store);
         EasyMock.expectLastCall();
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
index 71a6ac2..394feed 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
@@ -43,18 +44,21 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
 
 public class NamedCacheTest {
 
     private final Headers headers = new RecordHeaders(new Header[]{new RecordHeader("key",
"value".getBytes())});
     private NamedCache cache;
+    private Metrics innerMetrics;
     private StreamsMetricsImpl metrics;
     private final String taskIDString = "0.0";
     private final String underlyingStoreName = "storeName";
 
     @Before
     public void setUp() {
-        metrics = new MockStreamsMetrics(new Metrics());
+        innerMetrics = new Metrics();
+        metrics = new MockStreamsMetrics(innerMetrics);
         cache = new NamedCache(taskIDString + "-" + underlyingStoreName, metrics);
     }
 
@@ -97,6 +101,13 @@ public class NamedCacheTest {
         getMetricByNameFilterByTags(metrics.metrics(), "hitRatio-avg", "stream-record-cache-metrics",
metricTags);
         getMetricByNameFilterByTags(metrics.metrics(), "hitRatio-min", "stream-record-cache-metrics",
metricTags);
         getMetricByNameFilterByTags(metrics.metrics(), "hitRatio-max", "stream-record-cache-metrics",
metricTags);
+
+        final JmxReporter reporter = new JmxReporter("kafka.streams");
+        innerMetrics.addReporter(reporter);
+        assertTrue(reporter.containsMbean(String.format("kafka.streams:type=stream-record-cache-metrics,client-id=test,task-id=%s,record-cache-id=%s",
+                taskIDString, underlyingStoreName)));
+        assertTrue(reporter.containsMbean(String.format("kafka.streams:type=stream-record-cache-metrics,client-id=test,task-id=%s,record-cache-id=%s",
+                taskIDString, "all")));
     }
 
     @Test


Mime
View raw message