kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rha...@apache.org
Subject [kafka] branch trunk updated: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter (#8691)
Date Thu, 28 May 2020 01:19:15 GMT
This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9c833f6  KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter (#8691)
9c833f6 is described below

commit 9c833f665f349e5c292228f75188f5521282835d
Author: xiaodongdu <xiaodongdu@hotmail.com>
AuthorDate: Wed May 27 18:18:36 2020 -0700

    KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter (#8691)
    
    Implemented KIP-606 to add metadata context to MetricsReporter.
    
    Author: Xiaodong Du <xdu@confluent.io>
    Reviewers: David Arthur <mumrah@gmail.com>, Randall Hauch <rhauch@gmail.com>, Xavier Léauté <xavier@confluent.io>, Ryan Pridgeon <ryan.n.pridgeon@gmail.com>
---
 checkstyle/suppressions.xml                        |   4 +-
 .../apache/kafka/clients/CommonClientConfigs.java  |   2 +
 .../kafka/clients/admin/KafkaAdminClient.java      |   9 +-
 .../kafka/clients/consumer/KafkaConsumer.java      |   8 +-
 .../kafka/clients/producer/KafkaProducer.java      |   9 +-
 .../apache/kafka/common/metrics/JmxReporter.java   |  19 ++-
 .../kafka/common/metrics/KafkaMetricsContext.java  |  56 +++++++++
 .../org/apache/kafka/common/metrics/Metrics.java   |  30 ++++-
 .../kafka/common/metrics/MetricsContext.java       |  49 ++++++++
 .../kafka/common/metrics/MetricsReporter.java      |   9 ++
 .../kafka/clients/consumer/KafkaConsumerTest.java  |  22 ++++
 .../kafka/clients/producer/KafkaProducerTest.java  |  23 ++++
 .../kafka/common/metrics/JmxReporterTest.java      |  20 +++
 .../connect/mirror/MirrorConnectorConfig.java      |  10 +-
 .../kafka/connect/runtime/ConnectMetrics.java      |  21 +++-
 .../org/apache/kafka/connect/runtime/Worker.java   |  32 +++--
 .../apache/kafka/connect/runtime/WorkerConfig.java |   3 +
 .../runtime/distributed/WorkerGroupMember.java     |  21 +++-
 .../connect/storage/KafkaConfigBackingStore.java   |   7 +-
 .../connect/storage/KafkaOffsetBackingStore.java   |   6 +-
 .../connect/storage/KafkaStatusBackingStore.java   |   5 +
 .../apache/kafka/connect/util/ConnectUtils.java    |  14 +++
 .../kafka/connect/runtime/ConnectMetricsTest.java  |   2 +-
 .../kafka/connect/runtime/MockConnectMetrics.java  |  14 ++-
 .../apache/kafka/connect/runtime/WorkerTest.java   | 135 +++++++++++++++++++--
 .../runtime/WorkerWithTopicCreationTest.java       |  47 +++++--
 .../runtime/distributed/WorkerGroupMemberTest.java | 103 ++++++++++++++++
 .../errors/RetryWithToleranceOperatorTest.java     |   2 +-
 .../storage/KafkaConfigBackingStoreTest.java       |   9 +-
 .../storage/KafkaOffsetBackingStoreTest.java       |  14 ++-
 .../kafka/connect/util/ConnectUtilsTest.java       |  39 ++++++
 .../scala/kafka/server/DynamicBrokerConfig.scala   |   1 +
 core/src/main/scala/kafka/server/KafkaServer.scala |  37 ++++--
 .../group/GroupMetadataManagerTest.scala           |   6 +-
 .../transaction/TransactionStateManagerTest.scala  |   6 +-
 .../kafka/server/KafkaMetricsReporterTest.scala    |  95 +++++++++++++++
 .../org/apache/kafka/streams/KafkaStreams.java     |   9 +-
 .../org/apache/kafka/streams/KafkaStreamsTest.java |   4 +-
 .../processor/internals/StreamTaskTest.java        |   7 +-
 .../processor/internals/StreamThreadTest.java      |   7 +-
 .../state/internals/MeteredKeyValueStoreTest.java  |   7 +-
 .../state/internals/MeteredSessionStoreTest.java   |   7 +-
 .../MeteredTimestampedKeyValueStoreTest.java       |   7 +-
 .../state/internals/MeteredWindowStoreTest.java    |   7 +-
 44 files changed, 870 insertions(+), 74 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index d55743a..d13cd88 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -100,10 +100,10 @@
     <!-- Connect -->
     <suppress checks="ClassFanOutComplexity"
               files="(DistributedHerder|Worker).java"/>
-
+    <suppress checks="ClassFanOutComplexity"
+              files="Worker(|Test).java"/>
     <suppress checks="MethodLength"
               files="(KafkaConfigBackingStore|Values).java"/>
-
     <suppress checks="ParameterNumber"
               files="Worker(SinkTask|SourceTask|Coordinator).java"/>
     <suppress checks="ParameterNumber"
diff --git a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
index d18c0ed..987389a6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
@@ -87,6 +87,8 @@ public class CommonClientConfigs {
     public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters";
     public static final String METRIC_REPORTER_CLASSES_DOC = "A list of classes to use as metrics reporters. Implementing the <code>org.apache.kafka.common.metrics.MetricsReporter</code> interface allows plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics.";
 
+    public static final String METRICS_CONTEXT_PREFIX = "metrics.context.";
+
     public static final String SECURITY_PROTOCOL_CONFIG = "security.protocol";
     public static final String SECURITY_PROTOCOL_DOC = "Protocol used to communicate with brokers. Valid values are: " +
         Utils.join(SecurityProtocol.names(), ", ") + ".";
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index e0e6cb6..1fe0841 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.ClientDnsLookup;
 import org.apache.kafka.clients.ClientRequest;
 import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.ClientUtils;
+import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.KafkaClient;
 import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.clients.StaleMetadataException;
@@ -124,8 +125,10 @@ import org.apache.kafka.common.message.OffsetDeleteRequestData.OffsetDeleteReque
 import org.apache.kafka.common.message.OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection;
 import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
 import org.apache.kafka.common.metrics.JmxReporter;
+import org.apache.kafka.common.metrics.KafkaMetricsContext;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.MetricsContext;
 import org.apache.kafka.common.metrics.MetricsReporter;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.network.ChannelBuilder;
@@ -462,10 +465,12 @@ public class KafkaAdminClient extends AdminClient {
                 .timeWindow(config.getLong(AdminClientConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
                 .recordLevel(Sensor.RecordingLevel.forName(config.getString(AdminClientConfig.METRICS_RECORDING_LEVEL_CONFIG)))
                 .tags(metricTags);
-            JmxReporter jmxReporter = new JmxReporter(JMX_PREFIX);
+            JmxReporter jmxReporter = new JmxReporter();
             jmxReporter.configure(config.originals());
             reporters.add(jmxReporter);
-            metrics = new Metrics(metricConfig, reporters, time);
+            MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX,
+                    config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
+            metrics = new Metrics(metricConfig, reporters, time, metricsContext);
             String metricGrpPrefix = "admin-client";
             channelBuilder = ClientUtils.createChannelBuilder(config, time, logContext);
             selector = new Selector(config.getLong(AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 81d286f..7535a9b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -44,8 +44,10 @@ import org.apache.kafka.common.errors.InvalidGroupIdException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.metrics.JmxReporter;
+import org.apache.kafka.common.metrics.KafkaMetricsContext;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.MetricsContext;
 import org.apache.kafka.common.metrics.MetricsReporter;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.network.ChannelBuilder;
@@ -868,10 +870,12 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                 .tags(metricsTags);
         List<MetricsReporter> reporters = config.getConfiguredInstances(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG,
                 MetricsReporter.class, Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
-        JmxReporter jmxReporter = new JmxReporter(JMX_PREFIX);
+        JmxReporter jmxReporter = new JmxReporter();
         jmxReporter.configure(config.originals());
         reporters.add(jmxReporter);
-        return new Metrics(metricConfig, reporters, time);
+        MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX,
+                config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
+        return new Metrics(metricConfig, reporters, time, metricsContext);
     }
 
     /**
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index deecfd8..375ada5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -19,6 +19,7 @@ package org.apache.kafka.clients.producer;
 import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.ClientDnsLookup;
 import org.apache.kafka.clients.ClientUtils;
+import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.KafkaClient;
 import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
@@ -54,8 +55,10 @@ import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.metrics.JmxReporter;
+import org.apache.kafka.common.metrics.KafkaMetricsContext;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.MetricsContext;
 import org.apache.kafka.common.metrics.MetricsReporter;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.network.ChannelBuilder;
@@ -351,10 +354,12 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
                     MetricsReporter.class,
                     Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
-            JmxReporter jmxReporter = new JmxReporter(JMX_PREFIX);
+            JmxReporter jmxReporter = new JmxReporter();
             jmxReporter.configure(userProvidedConfigs);
             reporters.add(jmxReporter);
-            this.metrics = new Metrics(metricConfig, reporters, time);
+            MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX,
+                    config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
+            this.metrics = new Metrics(metricConfig, reporters, time, metricsContext);
             this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
             long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
             if (keySerializer == null) {
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java b/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
index c7be865..090f93b 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
@@ -38,6 +38,7 @@ import java.lang.management.ManagementFactory;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.function.Predicate;
 import java.util.regex.Pattern;
@@ -71,9 +72,13 @@ public class JmxReporter implements MetricsReporter {
 
     /**
      * Create a JMX reporter that prefixes all metrics with the given string.
+     *  @deprecated Since 2.6.0. Use {@link JmxReporter#JmxReporter()}
+     *  Initialize JmxReporter with {@link JmxReporter#contextChange(MetricsContext)}
+     *  Populate prefix by adding _namespace/prefix key value pair to {@link MetricsContext}
      */
+    @Deprecated
     public JmxReporter(String prefix) {
-        this.prefix = prefix;
+        this.prefix = prefix != null ? prefix : "";
     }
 
     @Override
@@ -318,4 +323,16 @@ public class JmxReporter implements MetricsReporter {
                                       + ".(whitelist/blacklist) is not a valid regular expression");
         }
     }
+
+    @Override
+    public void contextChange(MetricsContext metricsContext) {
+        String namespace = metricsContext.contextLabels().get(MetricsContext.NAMESPACE);
+        Objects.requireNonNull(namespace);
+        synchronized (LOCK) {
+            if (!mbeans.isEmpty()) {
+                throw new IllegalStateException("JMX MetricsContext can only be updated before JMX metrics are created");
+            }
+            prefix = namespace;
+        }
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetricsContext.java b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetricsContext.java
new file mode 100644
index 0000000..2fe73de
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetricsContext.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * An implementation of MetricsContext that encapsulates the required metrics context properties for Kafka services and clients
+ */
+public class KafkaMetricsContext implements MetricsContext {
+    /**
+     * Client or Service's contextLabels map.
+     */
+    private final Map<String, String> contextLabels = new HashMap<>();
+
+    /**
+     * Create a MetricsContext with namespace, no service or client properties
+     * @param namespace value for _namespace key
+     */
+    public KafkaMetricsContext(String namespace) {
+        this(namespace, new HashMap<>());
+    }
+
+    /**
+     * Create a MetricsContext with namespace, service or client properties
+     * @param namespace value for _namespace key
+     * @param contextLabels additional entries to add to the context;
+     *                  values will be converted to string using Object.toString()
+     */
+    public KafkaMetricsContext(String namespace, Map<String, ?> contextLabels) {
+        this.contextLabels.put(MetricsContext.NAMESPACE, namespace);
+        contextLabels.forEach((key, value) -> this.contextLabels.put(key, value.toString()));
+    }
+
+    @Override
+    public Map<String, String> contextLabels() {
+        return Collections.unmodifiableMap(contextLabels);
+    }
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
index dee970c..fcaa3b6 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
@@ -122,6 +122,18 @@ public class Metrics implements Closeable {
     }
 
     /**
+     * Create a metrics repository with a default config, metric reporters and metric context
+     * Expiration of Sensors is disabled.
+     * @param defaultConfig The default config
+     * @param reporters The metrics reporters
+     * @param time The time instance to use with the metrics
+     * @param metricsContext The metricsContext to initialize metrics reporter with
+     */
+    public Metrics(MetricConfig defaultConfig, List<MetricsReporter> reporters, Time time, MetricsContext metricsContext) {
+        this(defaultConfig, reporters, time, false, metricsContext);
+    }
+
+    /**
      * Create a metrics repository with a default config, given metric reporters and the ability to expire eligible sensors
      * @param defaultConfig The default config
      * @param reporters The metrics reporters
@@ -129,14 +141,30 @@ public class Metrics implements Closeable {
      * @param enableExpiration true if the metrics instance can garbage collect inactive sensors, false otherwise
      */
     public Metrics(MetricConfig defaultConfig, List<MetricsReporter> reporters, Time time, boolean enableExpiration) {
+        this(defaultConfig, reporters, time, enableExpiration, new KafkaMetricsContext(""));
+    }
+
+    /**
+     * Create a metrics repository with a default config, given metric reporters, the ability to expire eligible sensors
+     * and MetricsContext
+     * @param defaultConfig The default config
+     * @param reporters The metrics reporters
+     * @param time The time instance to use with the metrics
+     * @param enableExpiration true if the metrics instance can garbage collect inactive sensors, false otherwise
+     * @param metricsContext The metricsContext to initialize metrics reporter with
+     */
+    public Metrics(MetricConfig defaultConfig, List<MetricsReporter> reporters, Time time, boolean enableExpiration,
+                   MetricsContext metricsContext) {
         this.config = defaultConfig;
         this.sensors = new ConcurrentHashMap<>();
         this.metrics = new ConcurrentHashMap<>();
         this.childrenSensors = new ConcurrentHashMap<>();
         this.reporters = Objects.requireNonNull(reporters);
         this.time = time;
-        for (MetricsReporter reporter : reporters)
+        for (MetricsReporter reporter : reporters) {
+            reporter.contextChange(metricsContext);
             reporter.init(new ArrayList<>());
+        }
 
         // Create the ThreadPoolExecutor only if expiration of Sensors is enabled.
         if (enableExpiration) {
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/MetricsContext.java b/clients/src/main/java/org/apache/kafka/common/metrics/MetricsContext.java
new file mode 100644
index 0000000..dcac5c2
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/MetricsContext.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Map;
+
+/**
+ * MetricsContext encapsulates additional contextLabels about metrics exposed via a
+ * {@link org.apache.kafka.common.metrics.MetricsReporter}
+ *
+ * The contextLabels map provides following information:
+ * - a <code>_namespace</code> field indicating the component exposing metrics
+ *   e.g. kafka.server, kafka.consumer
+ *   {@link JmxReporter} uses this as prefix for mbean names
+ *
+ * - for clients and streams libraries: any freeform fields passed in via
+ *   client properties in the form of `metrics.context.<key>=<value>`
+ *
+ * - for kafka brokers: kafka.broker.id, kafka.cluster.id
+ * - for connect workers: connect.kafka.cluster.id, connect.group.id
+ */
+@InterfaceStability.Evolving
+public interface MetricsContext {
+    /* predefined fields */
+    String NAMESPACE = "_namespace"; // metrics namespace, formerly jmx prefix
+
+    /**
+     * Returns the labels for this metrics context.
+     *
+     * @return the map of label keys and values; never null but possibly empty
+     */
+    Map<String, String> contextLabels();
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java b/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java
index cc112d1..75771fb 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java
@@ -22,6 +22,7 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.kafka.common.Reconfigurable;
+import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.config.ConfigException;
 
 /**
@@ -65,4 +66,12 @@ public interface MetricsReporter extends Reconfigurable, AutoCloseable {
     default void reconfigure(Map<String, ?> configs) {
     }
 
+    /**
+     * Sets the context labels for the service or library exposing metrics. This will be called before {@link #init(List)} and may be called anytime after that.
+     *
+     * @param metricsContext the metric context
+     */
+    @InterfaceStability.Evolving
+    default void contextChange(MetricsContext metricsContext) {
+    }
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 56f6975..022c688 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -85,9 +85,14 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.test.MockConsumerInterceptor;
 import org.apache.kafka.test.MockMetricsReporter;
 import org.apache.kafka.test.TestUtils;
+import org.apache.kafka.common.metrics.stats.Avg;
+
 import org.junit.Assert;
 import org.junit.Test;
 
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import java.lang.management.ManagementFactory;
 import java.nio.ByteBuffer;
 import java.time.Duration;
 import java.util.ArrayList;
@@ -383,6 +388,23 @@ public class KafkaConsumerTest {
         consumer.close();
     }
 
+    @Test
+    public void testConsumerJmxPrefix() throws  Exception {
+        Map<String, Object> config = new HashMap<>();
+        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        config.put(ConsumerConfig.SEND_BUFFER_CONFIG, Selectable.USE_DEFAULT_BUFFER_SIZE);
+        config.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, Selectable.USE_DEFAULT_BUFFER_SIZE);
+        config.put("client.id", "client-1");
+        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(
+                config, new ByteArrayDeserializer(), new ByteArrayDeserializer());
+        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
+        MetricName testMetricName = consumer.metrics.metricName("test-metric",
+                "grp1", "test metric");
+        consumer.metrics.addMetric(testMetricName, new Avg());
+        Assert.assertNotNull(server.getObjectInstance(new ObjectName("kafka.consumer:type=grp1,client-id=client-1")));
+        consumer.close();
+    }
+
     private KafkaConsumer<byte[], byte[]> newConsumer(String groupId) {
         return newConsumer(groupId, Optional.empty());
     }
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 280faa1..3f98e4b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -20,12 +20,14 @@ import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.KafkaClient;
 import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
 import org.apache.kafka.clients.producer.internals.ProducerInterceptors;
 import org.apache.kafka.clients.producer.internals.ProducerMetadata;
 import org.apache.kafka.clients.producer.internals.Sender;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
@@ -40,6 +42,7 @@ import org.apache.kafka.common.message.EndTxnResponseData;
 import org.apache.kafka.common.message.InitProducerIdResponseData;
 import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
 import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.network.Selectable;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
@@ -66,6 +69,9 @@ import org.apache.kafka.test.TestUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import java.lang.management.ManagementFactory;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -1189,6 +1195,23 @@ public class KafkaProducerTest {
         assertionDoneLatch.await(5000, TimeUnit.MILLISECONDS);
     }
 
+    @Test
+    public void testProducerJmxPrefix() throws  Exception {
+        Map<String, Object> props = new HashMap<>();
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        props.put("client.id", "client-1");
+
+        KafkaProducer<String, String> producer = new KafkaProducer<>(
+                props, new StringSerializer(), new StringSerializer());
+
+        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
+        MetricName testMetricName = producer.metrics.metricName("test-metric",
+                "grp1", "test metric");
+        producer.metrics.addMetric(testMetricName, new Avg());
+        Assert.assertNotNull(server.getObjectInstance(new ObjectName("kafka.producer:type=grp1,client-id=client-1")));
+        producer.close();
+    }
+
     private ProducerMetadata newMetadata(long refreshBackoffMs, long expirationMs) {
         return new ProducerMetadata(refreshBackoffMs, expirationMs, defaultMetadataIdleMs,
                 new LogContext(), new ClusterResourceListeners(), Time.SYSTEM);
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java
index ea9597b..5bc831c 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java
@@ -19,11 +19,14 @@ package org.apache.kafka.common.metrics;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.CumulativeSum;
+import org.apache.kafka.common.utils.Time;
 import org.junit.Test;
 
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -155,4 +158,21 @@ public class JmxReporterTest {
             metrics.close();
         }
     }
+
+    @Test
+    public void testJmxPrefix() throws Exception {
+        JmxReporter reporter = new JmxReporter();
+        MetricsContext metricsContext = new KafkaMetricsContext("kafka.server");
+        MetricConfig metricConfig = new MetricConfig();
+        Metrics metrics = new Metrics(metricConfig, new ArrayList<>(Arrays.asList(reporter)), Time.SYSTEM, metricsContext);
+
+        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
+        try {
+            Sensor sensor = metrics.sensor("kafka.requests");
+            sensor.add(metrics.metricName("pack.bean1.avg", "grp1"), new Avg());
+            assertEquals("kafka.server", server.getObjectInstance(new ObjectName("kafka.server:type=grp1")).getObjectName().getDomain());
+        } finally {
+            metrics.close();
+        }
+    }
 }
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
index 72dd435..635eedf 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
@@ -19,8 +19,10 @@ package org.apache.kafka.connect.mirror;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.KafkaMetricsContext;
 import org.apache.kafka.common.metrics.MetricsReporter;
 import org.apache.kafka.common.metrics.JmxReporter;
+import org.apache.kafka.common.metrics.MetricsContext;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 
@@ -270,9 +272,15 @@ public class MirrorConnectorConfig extends AbstractConfig {
     List<MetricsReporter> metricsReporters() {
         List<MetricsReporter> reporters = getConfiguredInstances(
                 CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class);
-        JmxReporter jmxReporter = new JmxReporter("kafka.connect.mirror");
+        JmxReporter jmxReporter = new JmxReporter();
         jmxReporter.configure(this.originals());
         reporters.add(jmxReporter);
+        MetricsContext metricsContext = new KafkaMetricsContext("kafka.connect.mirror");
+
+        for (MetricsReporter reporter : reporters) {
+            reporter.contextChange(metricsContext);
+        }
+
         return reporters;
     }
 
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
index 57b5595..a3f44f9 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
@@ -21,16 +21,20 @@ import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.MetricNameTemplate;
 import org.apache.kafka.common.metrics.Gauge;
 import org.apache.kafka.common.metrics.JmxReporter;
+import org.apache.kafka.common.metrics.KafkaMetricsContext;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.MetricsContext;
 import org.apache.kafka.common.metrics.MetricsReporter;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -62,8 +66,9 @@ public class ConnectMetrics {
      * @param workerId the worker identifier; may not be null
      * @param config   the worker configuration; may not be null
      * @param time     the time; may not be null
+     * @param clusterId the Kafka cluster ID; added to the metrics context labels under WorkerConfig.CONNECT_KAFKA_CLUSTER_ID
      */
-    public ConnectMetrics(String workerId, WorkerConfig config, Time time) {
+    public ConnectMetrics(String workerId, WorkerConfig config, Time time, String clusterId) {
         this.workerId = workerId;
         this.time = time;
 
@@ -75,10 +80,20 @@ public class ConnectMetrics {
         MetricConfig metricConfig = new MetricConfig().samples(numSamples)
                 .timeWindow(sampleWindowMs, TimeUnit.MILLISECONDS).recordLevel(
                         Sensor.RecordingLevel.forName(metricsRecordingLevel));
-        JmxReporter jmxReporter = new JmxReporter(JMX_PREFIX);
+        JmxReporter jmxReporter = new JmxReporter();
         jmxReporter.configure(config.originals());
         reporters.add(jmxReporter);
-        this.metrics = new Metrics(metricConfig, reporters, time);
+
+        Map<String, Object> contextLabels = new HashMap<>();
+        contextLabels.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
+        contextLabels.put(WorkerConfig.CONNECT_KAFKA_CLUSTER_ID, clusterId);
+        Object groupId = config.originals().get(DistributedConfig.GROUP_ID_CONFIG);
+        if (groupId != null) {
+            contextLabels.put(WorkerConfig.CONNECT_GROUP_ID, groupId);
+        }
+        MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX, contextLabels);
+        this.metrics = new Metrics(metricConfig, reporters, time, metricsContext);
+
         LOG.debug("Registering Connect metrics with JMX for worker '{}'", workerId);
         AppInfoParser.registerAppInfo(JMX_PREFIX, workerId, metrics, time.milliseconds());
     }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index d0a04e8..28b1149 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -57,6 +57,7 @@ import org.apache.kafka.connect.storage.OffsetBackingStore;
 import org.apache.kafka.connect.storage.OffsetStorageReader;
 import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;
 import org.apache.kafka.connect.storage.OffsetStorageWriter;
+import org.apache.kafka.connect.util.ConnectUtils;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.apache.kafka.connect.util.LoggingContext;
 import org.apache.kafka.connect.util.SinkUtils;
@@ -94,6 +95,8 @@ public class Worker {
     private final ExecutorService executor;
     private final Time time;
     private final String workerId;
+    // Kafka cluster ID, resolved once in the constructor via ConnectUtils.lookupKafkaClusterId(config)
+    private final String kafkaClusterId;
     private final Plugins plugins;
     private final ConnectMetrics metrics;
     private final WorkerMetricsGroup workerMetricsGroup;
@@ -129,7 +132,8 @@ public class Worker {
             ExecutorService executorService,
             ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy
     ) {
-        this.metrics = new ConnectMetrics(workerId, config, time);
+        this.kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config);
+        this.metrics = new ConnectMetrics(workerId, config, time, kafkaClusterId);
         this.executor = executorService;
         this.workerId = workerId;
         this.time = time;
@@ -530,13 +534,13 @@ public class Worker {
             OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetBackingStore, id.connector(),
                     internalKeyConverter, internalValueConverter);
             Map<String, Object> producerProps = producerConfigs(id, "connector-producer-" + id, config, sourceConfig, connectorClass,
-                                                                connectorClientConfigOverridePolicy);
+                                                                connectorClientConfigOverridePolicy, kafkaClusterId);
             KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);
             TopicAdmin admin;
             Map<String, TopicCreationGroup> topicCreationGroups;
             if (config.topicCreationEnable() && sourceConfig.usesTopicCreation()) {
                 Map<String, Object> adminProps = adminConfigs(id, "connector-adminclient-" + id, config,
-                        sourceConfig, connectorClass, connectorClientConfigOverridePolicy);
+                        sourceConfig, connectorClass, connectorClientConfigOverridePolicy, kafkaClusterId);
                 admin = new TopicAdmin(adminProps);
                 topicCreationGroups = TopicCreationGroup.configuredGroups(sourceConfig);
             } else {
@@ -554,7 +558,7 @@ public class Worker {
             SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins, connConfig.originalsStrings());
             retryWithToleranceOperator.reporters(sinkTaskReporters(id, sinkConfig, errorHandlingMetrics, connectorClass));
 
-            Map<String, Object> consumerProps = consumerConfigs(id, config, connConfig, connectorClass, connectorClientConfigOverridePolicy);
+            Map<String, Object> consumerProps = consumerConfigs(id, config, connConfig, connectorClass, connectorClientConfigOverridePolicy, kafkaClusterId);
             KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);
 
             return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, configState, metrics, keyConverter,
@@ -571,7 +575,8 @@ public class Worker {
                                                WorkerConfig config,
                                                ConnectorConfig connConfig,
                                                Class<? extends Connector>  connectorClass,
-                                               ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
+                                               ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
+                                               String clusterId) {
         Map<String, Object> producerProps = new HashMap<>();
         producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Utils.join(config.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
         producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
@@ -586,6 +591,8 @@ public class Worker {
         producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, defaultClientId);
         // User-specified overrides
         producerProps.putAll(config.originalsWithPrefix("producer."));
+        // Add the metrics.context.* properties (worker-defined entries plus the Connect cluster ID and group ID)
+        ConnectUtils.addMetricsContextProperties(producerProps, config, clusterId);
 
         // Connector-specified overrides
         Map<String, Object> producerOverrides =
@@ -601,7 +608,8 @@ public class Worker {
                                                WorkerConfig config,
                                                ConnectorConfig connConfig,
                                                Class<? extends Connector> connectorClass,
-                                               ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
+                                               ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
+                                               String clusterId) {
         // Include any unknown worker configs so consumer configs can be set globally on the worker
         // and through to the task
         Map<String, Object> consumerProps = new HashMap<>();
@@ -616,6 +624,8 @@ public class Worker {
         consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
 
         consumerProps.putAll(config.originalsWithPrefix("consumer."));
+        // Add the metrics.context.* properties (worker-defined entries plus the Connect cluster ID and group ID)
+        ConnectUtils.addMetricsContextProperties(consumerProps, config, clusterId);
         // Connector-specified overrides
         Map<String, Object> consumerOverrides =
             connectorClientConfigOverrides(id, connConfig, connectorClass, ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX,
@@ -631,7 +641,8 @@ public class Worker {
                                             WorkerConfig config,
                                             ConnectorConfig connConfig,
                                             Class<? extends Connector> connectorClass,
-                                            ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
+                                            ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
+                                            String clusterId) {
         Map<String, Object> adminProps = new HashMap<>();
         // Use the top-level worker configs to retain backwards compatibility with older releases which
         // did not require a prefix for connector admin client configs in the worker configuration file
@@ -658,6 +669,9 @@ public class Worker {
                                            connectorClientConfigOverridePolicy);
         adminProps.putAll(adminOverrides);
 
+        // Add the metrics.context.* properties; done after the connector overrides above, so these entries win on key collisions
+        ConnectUtils.addMetricsContextProperties(adminProps, config, clusterId);
+
         return adminProps;
     }
 
@@ -701,8 +715,8 @@ public class Worker {
         String topic = connConfig.dlqTopicName();
         if (topic != null && !topic.isEmpty()) {
             Map<String, Object> producerProps = producerConfigs(id, "connector-dlq-producer-" + id, config, connConfig, connectorClass,
-                                                                connectorClientConfigOverridePolicy);
-            Map<String, Object> adminProps = adminConfigs(id, "connector-dlq-adminclient-", config, connConfig, connectorClass, connectorClientConfigOverridePolicy);
+                                                                connectorClientConfigOverridePolicy, kafkaClusterId);
+            Map<String, Object> adminProps = adminConfigs(id, "connector-dlq-adminclient-", config, connConfig, connectorClass, connectorClientConfigOverridePolicy, kafkaClusterId);
             DeadLetterQueueReporter reporter = DeadLetterQueueReporter.createAndSetup(adminProps, id, connConfig, producerProps, errorHandlingMetrics);
             reporters.add(reporter);
         }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
index 0234926..9e40e56 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
@@ -251,6 +251,9 @@ public class WorkerConfig extends AbstractConfig {
             + "user requests to reset the set of active topics per connector.";
     protected static final boolean TOPIC_TRACKING_ALLOW_RESET_DEFAULT = true;
 
+    public static final String CONNECT_KAFKA_CLUSTER_ID = "connect.kafka.cluster.id";
+    public static final String CONNECT_GROUP_ID = "connect.group.id";
+
     public static final String TOPIC_CREATION_ENABLE_CONFIG = "topic.creation.enable";
     protected static final String TOPIC_CREATION_ENABLE_DOC = "Whether to allow "
             + "automatic creation of topics used by source connectors, when source connectors "
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index a0a00e1..40f2b07 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -27,6 +27,8 @@ import org.apache.kafka.clients.GroupRebalanceConfig;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.metrics.JmxReporter;
+import org.apache.kafka.common.metrics.MetricsContext;
+import org.apache.kafka.common.metrics.KafkaMetricsContext;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.MetricsReporter;
@@ -36,12 +38,15 @@ import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.storage.ConfigBackingStore;
+import org.apache.kafka.connect.util.ConnectUtils;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.slf4j.Logger;
 
 import java.net.InetSocketAddress;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -88,10 +93,17 @@ public class WorkerGroupMember {
             List<MetricsReporter> reporters = config.getConfiguredInstances(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
                     MetricsReporter.class,
                     Collections.singletonMap(CommonClientConfigs.CLIENT_ID_CONFIG, clientId));
-            JmxReporter jmxReporter = new JmxReporter(JMX_PREFIX);
+            JmxReporter jmxReporter = new JmxReporter();
             jmxReporter.configure(config.originals());
             reporters.add(jmxReporter);
-            this.metrics = new Metrics(metricConfig, reporters, time);
+
+            Map<String, Object> contextLabels = new HashMap<>();
+            contextLabels.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
+            contextLabels.put(WorkerConfig.CONNECT_KAFKA_CLUSTER_ID, ConnectUtils.lookupKafkaClusterId(config));
+            contextLabels.put(WorkerConfig.CONNECT_GROUP_ID, config.getString(DistributedConfig.GROUP_ID_CONFIG));
+            MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX, contextLabels);
+
+            this.metrics = new Metrics(metricConfig, reporters, time, metricsContext);
             this.retryBackoffMs = config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG);
             this.metadata = new Metadata(retryBackoffMs, config.getLong(CommonClientConfigs.METADATA_MAX_AGE_CONFIG),
                     logContext, new ClusterResourceListeners());
@@ -223,4 +235,9 @@ public class WorkerGroupMember {
         else
             log.debug("The Connect group member has stopped.");
     }
+
+    // Visible for testing
+    Metrics metrics() {
+        return this.metrics;
+    }
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index b9e9831..60c2665 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -40,6 +40,7 @@ import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
 import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
 import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
 import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.ConnectUtils;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.apache.kafka.connect.util.KafkaBasedLog;
 import org.apache.kafka.connect.util.TopicAdmin;
@@ -444,17 +445,21 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
 
     // package private for testing
     KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final WorkerConfig config) {
+        String clusterId = ConnectUtils.lookupKafkaClusterId(config);
         Map<String, Object> originals = config.originals();
         Map<String, Object> producerProps = new HashMap<>(originals);
         producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
         producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE);
+        ConnectUtils.addMetricsContextProperties(producerProps, config, clusterId);
+
         Map<String, Object> consumerProps = new HashMap<>(originals);
         consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
         consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        ConnectUtils.addMetricsContextProperties(consumerProps, config, clusterId);
 
         Map<String, Object> adminProps = new HashMap<>(originals);
-
+        ConnectUtils.addMetricsContextProperties(adminProps, config, clusterId);
         Map<String, Object> topicSettings = config instanceof DistributedConfig
                                             ? ((DistributedConfig) config).configStorageTopicSettings()
                                             : Collections.emptyMap();
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
index 6100d07..1d26d65 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
 import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.ConnectUtils;
 import org.apache.kafka.connect.util.ConvertingFutureCallback;
 import org.apache.kafka.connect.util.KafkaBasedLog;
 import org.apache.kafka.connect.util.TopicAdmin;
@@ -66,6 +67,7 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
         if (topic == null || topic.trim().length() == 0)
             throw new ConfigException("Offset storage topic must be specified");
 
+        String clusterId = ConnectUtils.lookupKafkaClusterId(config);
         data = new HashMap<>();
 
         Map<String, Object> originals = config.originals();
@@ -73,13 +75,15 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
         producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
         producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE);
+        ConnectUtils.addMetricsContextProperties(producerProps, config, clusterId);
 
         Map<String, Object> consumerProps = new HashMap<>(originals);
         consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
         consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        ConnectUtils.addMetricsContextProperties(consumerProps, config, clusterId);
 
         Map<String, Object> adminProps = new HashMap<>(originals);
-
+        ConnectUtils.addMetricsContextProperties(adminProps, config, clusterId);
         Map<String, Object> topicSettings = config instanceof DistributedConfig
                                             ? ((DistributedConfig) config).offsetStorageTopicSettings()
                                             : Collections.emptyMap();
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
index 9827300..0b0f4a5 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
@@ -40,6 +40,7 @@ import org.apache.kafka.connect.runtime.TopicStatus;
 import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
 import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.ConnectUtils;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.apache.kafka.connect.util.KafkaBasedLog;
 import org.apache.kafka.connect.util.Table;
@@ -152,17 +153,21 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
         if (this.statusTopic == null || this.statusTopic.trim().length() == 0)
             throw new ConfigException("Must specify topic for connector status.");
 
+        String clusterId = ConnectUtils.lookupKafkaClusterId(config);
         Map<String, Object> originals = config.originals();
         Map<String, Object> producerProps = new HashMap<>(originals);
         producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
         producerProps.put(ProducerConfig.RETRIES_CONFIG, 0); // we handle retries in this class
+        ConnectUtils.addMetricsContextProperties(producerProps, config, clusterId);
 
         Map<String, Object> consumerProps = new HashMap<>(originals);
         consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
         consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        ConnectUtils.addMetricsContextProperties(consumerProps, config, clusterId);
 
         Map<String, Object> adminProps = new HashMap<>(originals);
+        ConnectUtils.addMetricsContextProperties(adminProps, config, clusterId);
 
         Map<String, Object> topicSettings = config instanceof DistributedConfig
                                             ? ((DistributedConfig) config).statusStorageTopicSettings()
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
index e1e4a87..7187f63 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
@@ -16,15 +16,18 @@
  */
 package org.apache.kafka.connect.util;
 
+import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.InvalidRecordException;
 import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Map;
 import java.util.concurrent.ExecutionException;
 
 public final class ConnectUtils {
@@ -65,4 +68,15 @@ public final class ConnectUtils {
                                        + "Check worker's broker connection and security properties.", e);
         }
     }
+
+    public static void addMetricsContextProperties(Map<String, Object> prop, WorkerConfig config, String clusterId) {
+        // Copy the worker's original properties that are predefined with the "metrics.context." prefix into the client properties
+        prop.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX, false));
+        // Add the Connect entries under the same prefix: always the Kafka cluster ID, and the group ID only when the worker config has one
+        prop.put(CommonClientConfigs.METRICS_CONTEXT_PREFIX + WorkerConfig.CONNECT_KAFKA_CLUSTER_ID, clusterId);
+        Object groupId = config.originals().get(DistributedConfig.GROUP_ID_CONFIG);
+        if (groupId != null) {
+            prop.put(CommonClientConfigs.METRICS_CONTEXT_PREFIX + WorkerConfig.CONNECT_GROUP_ID, groupId);
+        }
+    }
 }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
index fe15c01..3f75c0b 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
@@ -53,7 +53,7 @@ public class ConnectMetricsTest {
 
     @Before
     public void setUp() {
-        metrics = new ConnectMetrics("worker1", new WorkerConfig(WorkerConfig.baseConfigDef(), DEFAULT_WORKER_CONFIG), new MockTime());
+        metrics = new ConnectMetrics("worker1", new WorkerConfig(WorkerConfig.baseConfigDef(), DEFAULT_WORKER_CONFIG), new MockTime(), "cluster-1");
     }
 
     @After
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectMetrics.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectMetrics.java
index 64ea526..38bd401 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectMetrics.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectMetrics.java
@@ -19,6 +19,7 @@ package org.apache.kafka.connect.runtime;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricsContext;
 import org.apache.kafka.common.metrics.MetricsReporter;
 import org.apache.kafka.common.utils.MockTime;
 
@@ -58,7 +59,7 @@ public class MockConnectMetrics extends ConnectMetrics {
     }
 
     public MockConnectMetrics(MockTime time) {
-        super("mock", new WorkerConfig(WorkerConfig.baseConfigDef(), DEFAULT_WORKER_CONFIG), time);
+        super("mock", new WorkerConfig(WorkerConfig.baseConfigDef(), DEFAULT_WORKER_CONFIG), time, "cluster-1");
     }
 
     @Override
@@ -154,6 +155,8 @@ public class MockConnectMetrics extends ConnectMetrics {
     public static class MockMetricsReporter implements MetricsReporter {
         private Map<MetricName, KafkaMetric> metricsByName = new HashMap<>();
 
+        private MetricsContext metricsContext;
+
         public MockMetricsReporter() {
         }
 
@@ -194,5 +197,14 @@ public class MockConnectMetrics extends ConnectMetrics {
             KafkaMetric metric = metricsByName.get(metricName);
             return metric != null ? metric.metricValue() : null;
         }
+
+        @Override
+        public void contextChange(MetricsContext metricsContext) {
+            this.metricsContext = metricsContext;
+        }
+
+        public MetricsContext getMetricsContext() {
+            return this.metricsContext;
+        }
     }
 }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index e32fcb0..48ab58f 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -21,9 +21,12 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.config.provider.MockFileConfigProvider;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
@@ -58,6 +61,7 @@ import org.apache.kafka.connect.storage.OffsetBackingStore;
 import org.apache.kafka.connect.storage.OffsetStorageReader;
 import org.apache.kafka.connect.storage.OffsetStorageWriter;
 import org.apache.kafka.connect.storage.StatusBackingStore;
+import org.apache.kafka.connect.util.ConnectUtils;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.apache.kafka.connect.util.ThreadedTest;
 import org.apache.kafka.connect.util.TopicAdmin;
@@ -74,12 +78,17 @@ import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
+import javax.management.MBeanServer;
+import javax.management.ObjectInstance;
+import javax.management.ObjectName;
+import java.lang.management.ManagementFactory;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -96,13 +105,14 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.fail;
 
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({Worker.class, Plugins.class})
+@PrepareForTest({Worker.class, Plugins.class, ConnectUtils.class})
 @PowerMockIgnore("javax.management.*")
 public class WorkerTest extends ThreadedTest {
 
     private static final String CONNECTOR_ID = "test-connector";
     private static final ConnectorTaskId TASK_ID = new ConnectorTaskId("job", 0);
     private static final String WORKER_ID = "localhost:8083";
+    private static final String CLUSTER_ID = "test-cluster";
     private final ConnectorClientConfigOverridePolicy noneConnectorClientConfigOverridePolicy = new NoneConnectorClientConfigOverridePolicy();
     private final ConnectorClientConfigOverridePolicy allConnectorClientConfigOverridePolicy = new AllConnectorClientConfigOverridePolicy();
 
@@ -227,6 +237,7 @@ public class WorkerTest extends ThreadedTest {
         EasyMock.expectLastCall();
 
         expectStopStorage();
+        expectClusterId();
 
         PowerMock.replayAll();
 
@@ -288,6 +299,8 @@ public class WorkerTest extends ThreadedTest {
         );
         EasyMock.expectLastCall();
 
+        expectClusterId();
+
         PowerMock.replayAll();
 
         worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, noneConnectorClientConfigOverridePolicy);
@@ -345,6 +358,7 @@ public class WorkerTest extends ThreadedTest {
         EasyMock.expectLastCall();
 
         expectStopStorage();
+        expectClusterId();
 
         PowerMock.replayAll();
 
@@ -407,6 +421,7 @@ public class WorkerTest extends ThreadedTest {
         EasyMock.expectLastCall();
 
         expectStopStorage();
+        expectClusterId();
 
         PowerMock.replayAll();
 
@@ -435,6 +450,7 @@ public class WorkerTest extends ThreadedTest {
         expectConverters();
         expectStartStorage();
         expectFileConfigProvider();
+        expectClusterId();
 
         PowerMock.replayAll();
 
@@ -491,6 +507,7 @@ public class WorkerTest extends ThreadedTest {
         EasyMock.expectLastCall();
 
         expectStopStorage();
+        expectClusterId();
 
         PowerMock.replayAll();
 
@@ -586,6 +603,7 @@ public class WorkerTest extends ThreadedTest {
         EasyMock.expectLastCall();
 
         expectStopStorage();
+        expectClusterId();
 
         PowerMock.replayAll();
 
@@ -685,6 +703,8 @@ public class WorkerTest extends ThreadedTest {
         workerTask.stop();
         EasyMock.expectLastCall();
 
+        expectClusterId();
+
         PowerMock.replayAll();
 
         worker = new Worker(WORKER_ID,
@@ -744,6 +764,8 @@ public class WorkerTest extends ThreadedTest {
         taskStatusListener.onFailure(EasyMock.eq(TASK_ID), EasyMock.<ConfigException>anyObject());
         EasyMock.expectLastCall();
 
+        expectClusterId();
+
         PowerMock.replayAll();
 
         worker = new Worker(WORKER_ID,
@@ -791,6 +813,8 @@ public class WorkerTest extends ThreadedTest {
         taskStatusListener.onFailure(EasyMock.eq(TASK_ID), EasyMock.<ConfigException>anyObject());
         EasyMock.expectLastCall();
 
+        expectClusterId();
+
         PowerMock.replayAll();
 
         worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, noneConnectorClientConfigOverridePolicy);
@@ -867,6 +891,7 @@ public class WorkerTest extends ThreadedTest {
         EasyMock.expectLastCall();
 
         expectStopStorage();
+        expectClusterId();
 
         PowerMock.replayAll();
 
@@ -940,6 +965,7 @@ public class WorkerTest extends ThreadedTest {
         EasyMock.expectLastCall();
 
         expectStopStorage();
+        expectClusterId();
 
         PowerMock.replayAll();
 
@@ -976,8 +1002,9 @@ public class WorkerTest extends ThreadedTest {
         PowerMock.replayAll();
         Map<String, String> expectedConfigs = new HashMap<>(defaultProducerConfigs);
         expectedConfigs.put("client.id", "connector-producer-job-0");
+        expectedConfigs.put("metrics.context.connect.kafka.cluster.id", CLUSTER_ID);
         assertEquals(expectedConfigs,
-                     Worker.producerConfigs(TASK_ID, "connector-producer-" + TASK_ID, config, connectorConfig, null, noneConnectorClientConfigOverridePolicy));
+                     Worker.producerConfigs(TASK_ID, "connector-producer-" + TASK_ID, config, connectorConfig, null, noneConnectorClientConfigOverridePolicy, CLUSTER_ID));
     }
 
     @Test
@@ -992,11 +1019,13 @@ public class WorkerTest extends ThreadedTest {
         expectedConfigs.put("acks", "-1");
         expectedConfigs.put("linger.ms", "1000");
         expectedConfigs.put("client.id", "producer-test-id");
+        expectedConfigs.put("metrics.context.connect.kafka.cluster.id", CLUSTER_ID);
+
         EasyMock.expect(connectorConfig.originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX)).andReturn(
             new HashMap<String, Object>());
         PowerMock.replayAll();
         assertEquals(expectedConfigs,
-                     Worker.producerConfigs(TASK_ID, "connector-producer-" + TASK_ID, configWithOverrides, connectorConfig, null, allConnectorClientConfigOverridePolicy));
+            Worker.producerConfigs(TASK_ID, "connector-producer-" + TASK_ID, configWithOverrides, connectorConfig, null, allConnectorClientConfigOverridePolicy, CLUSTER_ID));
     }
 
     @Test
@@ -1012,6 +1041,8 @@ public class WorkerTest extends ThreadedTest {
         expectedConfigs.put("linger.ms", "5000");
         expectedConfigs.put("batch.size", "1000");
         expectedConfigs.put("client.id", "producer-test-id");
+        expectedConfigs.put("metrics.context.connect.kafka.cluster.id", CLUSTER_ID);
+
         Map<String, Object> connConfig = new HashMap<String, Object>();
         connConfig.put("linger.ms", "5000");
         connConfig.put("batch.size", "1000");
@@ -1019,7 +1050,7 @@ public class WorkerTest extends ThreadedTest {
             .andReturn(connConfig);
         PowerMock.replayAll();
         assertEquals(expectedConfigs,
-                     Worker.producerConfigs(TASK_ID, "connector-producer-" + TASK_ID, configWithOverrides, connectorConfig, null, allConnectorClientConfigOverridePolicy));
+            Worker.producerConfigs(TASK_ID, "connector-producer-" + TASK_ID, configWithOverrides, connectorConfig, null, allConnectorClientConfigOverridePolicy, CLUSTER_ID));
     }
 
     @Test
@@ -1027,10 +1058,12 @@ public class WorkerTest extends ThreadedTest {
         Map<String, String> expectedConfigs = new HashMap<>(defaultConsumerConfigs);
         expectedConfigs.put("group.id", "connect-test");
         expectedConfigs.put("client.id", "connector-consumer-test-1");
+        expectedConfigs.put("metrics.context.connect.kafka.cluster.id", CLUSTER_ID);
+
         EasyMock.expect(connectorConfig.originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX)).andReturn(new HashMap<>());
         PowerMock.replayAll();
         assertEquals(expectedConfigs, Worker.consumerConfigs(new ConnectorTaskId("test", 1), config, connectorConfig,
-                                                             null, noneConnectorClientConfigOverridePolicy));
+            null, noneConnectorClientConfigOverridePolicy, CLUSTER_ID));
     }
 
     @Test
@@ -1046,10 +1079,12 @@ public class WorkerTest extends ThreadedTest {
         expectedConfigs.put("auto.offset.reset", "latest");
         expectedConfigs.put("max.poll.records", "1000");
         expectedConfigs.put("client.id", "consumer-test-id");
+        expectedConfigs.put("metrics.context.connect.kafka.cluster.id", CLUSTER_ID);
+
         EasyMock.expect(connectorConfig.originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX)).andReturn(new HashMap<>());
         PowerMock.replayAll();
         assertEquals(expectedConfigs, Worker.consumerConfigs(new ConnectorTaskId("test", 1), configWithOverrides, connectorConfig,
-                                                             null, noneConnectorClientConfigOverridePolicy));
+            null, noneConnectorClientConfigOverridePolicy, CLUSTER_ID));
 
     }
 
@@ -1066,6 +1101,8 @@ public class WorkerTest extends ThreadedTest {
         expectedConfigs.put("max.poll.records", "5000");
         expectedConfigs.put("max.poll.interval.ms", "1000");
         expectedConfigs.put("client.id", "connector-consumer-test-1");
+        expectedConfigs.put("metrics.context.connect.kafka.cluster.id", CLUSTER_ID);
+
         Map<String, Object> connConfig = new HashMap<String, Object>();
         connConfig.put("max.poll.records", "5000");
         connConfig.put("max.poll.interval.ms", "1000");
@@ -1073,7 +1110,7 @@ public class WorkerTest extends ThreadedTest {
             .andReturn(connConfig);
         PowerMock.replayAll();
         assertEquals(expectedConfigs, Worker.consumerConfigs(new ConnectorTaskId("test", 1), configWithOverrides, connectorConfig,
-                                                             null, allConnectorClientConfigOverridePolicy));
+            null, allConnectorClientConfigOverridePolicy, CLUSTER_ID));
     }
 
     @Test(expected = ConnectException.class)
@@ -1090,7 +1127,7 @@ public class WorkerTest extends ThreadedTest {
             .andReturn(connConfig);
         PowerMock.replayAll();
         Worker.consumerConfigs(new ConnectorTaskId("test", 1), configWithOverrides, connectorConfig,
-                               null, noneConnectorClientConfigOverridePolicy);
+            null, noneConnectorClientConfigOverridePolicy, CLUSTER_ID);
     }
 
     @Test
@@ -1110,12 +1147,14 @@ public class WorkerTest extends ThreadedTest {
         expectedConfigs.put("bootstrap.servers", "localhost:9092");
         expectedConfigs.put("client.id", "testid");
         expectedConfigs.put("metadata.max.age.ms", "10000");
+        //we added a config on the fly
+        expectedConfigs.put("metrics.context.connect.kafka.cluster.id", CLUSTER_ID);
 
         EasyMock.expect(connectorConfig.originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX))
             .andReturn(connConfig);
         PowerMock.replayAll();
         assertEquals(expectedConfigs, Worker.adminConfigs(new ConnectorTaskId("test", 1), "", configWithOverrides, connectorConfig,
-                                                             null, allConnectorClientConfigOverridePolicy));
+                                                             null, allConnectorClientConfigOverridePolicy, CLUSTER_ID));
     }
 
     @Test(expected = ConnectException.class)
@@ -1132,7 +1171,78 @@ public class WorkerTest extends ThreadedTest {
             .andReturn(connConfig);
         PowerMock.replayAll();
         Worker.adminConfigs(new ConnectorTaskId("test", 1), "", configWithOverrides, connectorConfig,
-                                                          null, noneConnectorClientConfigOverridePolicy);
+                null, noneConnectorClientConfigOverridePolicy, CLUSTER_ID);
+
+    }
+
+    @Test
+    public void testWorkerMetrics() throws Exception { // checks the worker's cluster-id context label and JMX domain
+        expectConverters();
+        expectStartStorage();
+        expectFileConfigProvider();
+
+        // Expectations for creating and starting a connector
+        EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(2);
+        EasyMock.expect(plugins.newConnector(WorkerTestConnector.class.getName()))
+                .andReturn(sourceConnector);
+        EasyMock.expect(sourceConnector.version()).andReturn("1.0");
+
+        Map<String, String> props = new HashMap<>();
+        props.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar");
+        props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
+        props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
+        props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, WorkerTestConnector.class.getName());
+
+        EasyMock.expect(sourceConnector.version()).andReturn("1.0");
+
+        EasyMock.expect(plugins.compareAndSwapLoaders(sourceConnector))
+                .andReturn(delegatingLoader)
+                .times(2);
+        sourceConnector.initialize(anyObject(ConnectorContext.class));
+        EasyMock.expectLastCall();
+        sourceConnector.start(props);
+        EasyMock.expectLastCall();
+
+        EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader))
+                .andReturn(pluginLoader).times(2);
+
+        connectorStatusListener.onStartup(CONNECTOR_ID);
+        EasyMock.expectLastCall();
+
+        // Expectations for stopping the connector
+        sourceConnector.stop();
+        EasyMock.expectLastCall();
+
+        connectorStatusListener.onShutdown(CONNECTOR_ID);
+        EasyMock.expectLastCall();
+
+        expectStopStorage();
+        expectClusterId();
+
+        PowerMock.replayAll();
+        // NOTE(review): this method only constructs the worker; it never starts it or a connector, and has no verifyAll() call.
+        Worker worker = new Worker("worker-1",
+                Time.SYSTEM,
+                plugins,
+                config,
+                offsetBackingStore,
+                noneConnectorClientConfigOverridePolicy
+                );
+        MetricName name = worker.metrics().metrics().metricName("test.avg", "grp1");
+        worker.metrics().metrics().addMetric(name, new Avg());
+        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
+        Set<ObjectInstance> ret = server.queryMBeans(null, null); // NOTE(review): ret is never read
+        // NOTE(review): if no MockMetricsReporter is among the reporters, the loop below asserts nothing.
+        List<MetricsReporter> list = worker.metrics().metrics().reporters();
+        for (MetricsReporter reporter : list) {
+            if (reporter instanceof MockMetricsReporter) {
+                MockMetricsReporter mockMetricsReporter = (MockMetricsReporter) reporter;
+                // verify the stubbed Kafka cluster id is exposed as a MetricsContext label
+                assertEquals(CLUSTER_ID, mockMetricsReporter.getMetricsContext().contextLabels().get(WorkerConfig.CONNECT_KAFKA_CLUSTER_ID));
+            }
+        }
+        // verify the metric's MBean is registered under the "kafka.connect" JMX domain
+        assertNotNull(server.getObjectInstance(new ObjectName("kafka.connect:type=grp1")));
     }
 
     private void assertStatusMetrics(long expected, String metricName) {
@@ -1272,6 +1382,11 @@ public class WorkerTest extends ThreadedTest {
         return props;
     }
 
+    private void expectClusterId() { // stubs the static cluster-id lookup; callers replay the mocks afterwards
+        PowerMock.mockStaticPartial(ConnectUtils.class, "lookupKafkaClusterId"); // only this static method is mocked
+        EasyMock.expect(ConnectUtils.lookupKafkaClusterId(EasyMock.anyObject())).andReturn(CLUSTER_ID).anyTimes();
+    }
+
     private void expectNewWorkerTask() throws Exception {
         PowerMock.expectNew(
                 WorkerSourceTask.class, EasyMock.eq(TASK_ID),
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerWithTopicCreationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerWithTopicCreationTest.java
index e3c982e..8b54033 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerWithTopicCreationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerWithTopicCreationTest.java
@@ -57,6 +57,7 @@ import org.apache.kafka.connect.storage.OffsetBackingStore;
 import org.apache.kafka.connect.storage.OffsetStorageReader;
 import org.apache.kafka.connect.storage.OffsetStorageWriter;
 import org.apache.kafka.connect.storage.StatusBackingStore;
+import org.apache.kafka.connect.util.ConnectUtils;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.apache.kafka.connect.util.ThreadedTest;
 import org.apache.kafka.connect.util.TopicAdmin;
@@ -98,13 +99,14 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.fail;
 
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({Worker.class, Plugins.class})
+@PrepareForTest({Worker.class, Plugins.class, ConnectUtils.class})
 @PowerMockIgnore("javax.management.*")
 public class WorkerWithTopicCreationTest extends ThreadedTest {
 
     private static final String CONNECTOR_ID = "test-connector";
     private static final ConnectorTaskId TASK_ID = new ConnectorTaskId("job", 0);
     private static final String WORKER_ID = "localhost:8083";
+    private static final String CLUSTER_ID = "test-cluster";
     private final ConnectorClientConfigOverridePolicy noneConnectorClientConfigOverridePolicy = new NoneConnectorClientConfigOverridePolicy();
     private final ConnectorClientConfigOverridePolicy allConnectorClientConfigOverridePolicy = new AllConnectorClientConfigOverridePolicy();
 
@@ -228,6 +230,7 @@ public class WorkerWithTopicCreationTest extends ThreadedTest {
         EasyMock.expectLastCall();
 
         expectStopStorage();
+        expectClusterId();
 
         PowerMock.replayAll();
 
@@ -288,6 +291,7 @@ public class WorkerWithTopicCreationTest extends ThreadedTest {
                 EasyMock.<ConnectException>anyObject()
         );
         EasyMock.expectLastCall();
+        expectClusterId();
 
         PowerMock.replayAll();
 
@@ -346,6 +350,7 @@ public class WorkerWithTopicCreationTest extends ThreadedTest {
         EasyMock.expectLastCall();
 
         expectStopStorage();
+        expectClusterId();
 
         PowerMock.replayAll();
 
@@ -408,6 +413,7 @@ public class WorkerWithTopicCreationTest extends ThreadedTest {
         EasyMock.expectLastCall();
 
         expectStopStorage();
+        expectClusterId();
 
         PowerMock.replayAll();
 
@@ -436,6 +442,7 @@ public class WorkerWithTopicCreationTest extends ThreadedTest {
         expectConverters();
         expectStartStorage();
         expectFileConfigProvider();
+        expectClusterId();
 
         PowerMock.replayAll();
 
@@ -492,6 +499,7 @@ public class WorkerWithTopicCreationTest extends ThreadedTest {
         EasyMock.expectLastCall();
 
         expectStopStorage();
+        expectClusterId();
 
         PowerMock.replayAll();
 
@@ -587,6 +595,7 @@ public class WorkerWithTopicCreationTest extends ThreadedTest {
         EasyMock.expectLastCall();
 
         expectStopStorage();
+        expectClusterId();
 
         PowerMock.replayAll();
 
@@ -685,6 +694,7 @@ public class WorkerWithTopicCreationTest extends ThreadedTest {
         EasyMock.expect(workerTask.loader()).andReturn(pluginLoader);
         workerTask.stop();
         EasyMock.expectLastCall();
+        expectClusterId();
 
         PowerMock.replayAll();
 
@@ -744,6 +754,7 @@ public class WorkerWithTopicCreationTest extends ThreadedTest {
 
         taskStatusListener.onFailure(EasyMock.eq(TASK_ID), EasyMock.<ConfigException>anyObject());
         EasyMock.expectLastCall();
+        expectClusterId();
 
         PowerMock.replayAll();
 
@@ -791,6 +802,7 @@ public class WorkerWithTopicCreationTest extends ThreadedTest {
 
         taskStatusListener.onFailure(EasyMock.eq(TASK_ID), EasyMock.<ConfigException>anyObject());
         EasyMock.expectLastCall();
+        expectClusterId();
 
         PowerMock.replayAll();
 
@@ -868,6 +880,7 @@ public class WorkerWithTopicCreationTest extends ThreadedTest {
         EasyMock.expectLastCall();
 
         expectStopStorage();
+        expectClusterId();
 
         PowerMock.replayAll();
 
@@ -941,6 +954,7 @@ public class WorkerWithTopicCreationTest extends ThreadedTest {
         EasyMock.expectLastCall();
 
         expectStopStorage();
+        expectClusterId();
 
         PowerMock.replayAll();
 
@@ -977,8 +991,9 @@ public class WorkerWithTopicCreationTest extends ThreadedTest {
         PowerMock.replayAll();
         Map<String, String> expectedConfigs = new HashMap<>(defaultProducerConfigs);
         expectedConfigs.put("client.id", "connector-producer-job-0");
+        expectedConfigs.put("metrics.context.connect.kafka.cluster.id", "test-cluster");
         assertEquals(expectedConfigs,
-                     Worker.producerConfigs(TASK_ID, "connector-producer-" + TASK_ID, config, connectorConfig, null, noneConnectorClientConfigOverridePolicy));
+                     Worker.producerConfigs(TASK_ID, "connector-producer-" + TASK_ID, config, connectorConfig, null, noneConnectorClientConfigOverridePolicy, CLUSTER_ID));
     }
 
     @Test
@@ -993,11 +1008,12 @@ public class WorkerWithTopicCreationTest extends ThreadedTest {
         expectedConfigs.put("acks", "-1");
         expectedConfigs.put("linger.ms", "1000");
         expectedConfigs.put("client.id", "producer-test-id");
+        expectedConfigs.put("metrics.context.connect.kafka.cluster.id", "test-cluster");
         EasyMock.expect(connectorConfig.originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX)).andReturn(
             new HashMap<String, Object>());
         PowerMock.replayAll();
         assertEquals(expectedConfigs,
-                     Worker.producerConfigs(TASK_ID, "connector-producer-" + TASK_ID, configWithOverrides, connectorConfig, null, allConnectorClientConfigOverridePolicy));
+                     Worker.producerConfigs(TASK_ID, "connector-producer-" + TASK_ID, configWithOverrides, connectorConfig, null, allConnectorClientConfigOverridePolicy, CLUSTER_ID));
     }
 
     @Test
@@ -1013,6 +1029,7 @@ public class WorkerWithTopicCreationTest extends ThreadedTest {
         expectedConfigs.put("linger.ms", "5000");
         expectedConfigs.put("batch.size", "1000");
         expectedConfigs.put("client.id", "producer-test-id");
+        expectedConfigs.put("metrics.context.connect.kafka.cluster.id", "test-cluster");
         Map<String, Object> connConfig = new HashMap<String, Object>();
         connConfig.put("linger.ms", "5000");
         connConfig.put("batch.size", "1000");
@@ -1020,7 +1037,7 @@ public class WorkerWithTopicCreationTest extends ThreadedTest {
             .andReturn(connConfig);
         PowerMock.replayAll();
         assertEquals(expectedConfigs,
-                     Worker.producerConfigs(TASK_ID, "connector-producer-" + TASK_ID, configWithOverrides, connectorConfig, null, allConnectorClientConfigOverridePolicy));
+                     Worker.producerConfigs(TASK_ID, "connector-producer-" + TASK_ID, configWithOverrides, connectorConfig, null, allConnectorClientConfigOverridePolicy, CLUSTER_ID));
     }
 
     @Test
@@ -1028,10 +1045,11 @@ public class WorkerWithTopicCreationTest extends ThreadedTest {
         Map<String, String> expectedConfigs = new HashMap<>(defaultConsumerConfigs);
         expectedConfigs.put("group.id", "connect-test");
         expectedConfigs.put("client.id", "connector-consumer-test-1");
+        expectedConfigs.put("metrics.context.connect.kafka.cluster.id", "test-cluster");
         EasyMock.expect(connectorConfig.originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX)).andReturn(new HashMap<>());
         PowerMock.replayAll();
         assertEquals(expectedConfigs, Worker.consumerConfigs(new ConnectorTaskId("test", 1), config, connectorConfig,
-                                                             null, noneConnectorClientConfigOverridePolicy));
+                                                             null, noneConnectorClientConfigOverridePolicy, CLUSTER_ID));
     }
 
     @Test
@@ -1047,10 +1065,11 @@ public class WorkerWithTopicCreationTest extends ThreadedTest {
         expectedConfigs.put("auto.offset.reset", "latest");
         expectedConfigs.put("max.poll.records", "1000");
         expectedConfigs.put("client.id", "consumer-test-id");
+        expectedConfigs.put("metrics.context.connect.kafka.cluster.id", "test-cluster");
         EasyMock.expect(connectorConfig.originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX)).andReturn(new HashMap<>());
         PowerMock.replayAll();
         assertEquals(expectedConfigs, Worker.consumerConfigs(new ConnectorTaskId("test", 1), configWithOverrides, connectorConfig,
-                                                             null, noneConnectorClientConfigOverridePolicy));
+                                                             null, noneConnectorClientConfigOverridePolicy, CLUSTER_ID));
 
     }
 
@@ -1067,6 +1086,7 @@ public class WorkerWithTopicCreationTest extends ThreadedTest {
         expectedConfigs.put("max.poll.records", "5000");
         expectedConfigs.put("max.poll.interval.ms", "1000");
         expectedConfigs.put("client.id", "connector-consumer-test-1");
+        expectedConfigs.put("metrics.context.connect.kafka.cluster.id", "test-cluster");
         Map<String, Object> connConfig = new HashMap<String, Object>();
         connConfig.put("max.poll.records", "5000");
         connConfig.put("max.poll.interval.ms", "1000");
@@ -1074,7 +1094,7 @@ public class WorkerWithTopicCreationTest extends ThreadedTest {
             .andReturn(connConfig);
         PowerMock.replayAll();
         assertEquals(expectedConfigs, Worker.consumerConfigs(new ConnectorTaskId("test", 1), configWithOverrides, connectorConfig,
-                                                             null, allConnectorClientConfigOverridePolicy));
+                                                             null, allConnectorClientConfigOverridePolicy, CLUSTER_ID));
     }
 
     @Test(expected = ConnectException.class)
@@ -1091,7 +1111,7 @@ public class WorkerWithTopicCreationTest extends ThreadedTest {
             .andReturn(connConfig);
         PowerMock.replayAll();
         Worker.consumerConfigs(new ConnectorTaskId("test", 1), configWithOverrides, connectorConfig,
-                               null, noneConnectorClientConfigOverridePolicy);
+                               null, noneConnectorClientConfigOverridePolicy, CLUSTER_ID);
     }
 
     @Test
@@ -1111,12 +1131,13 @@ public class WorkerWithTopicCreationTest extends ThreadedTest {
         expectedConfigs.put("bootstrap.servers", "localhost:9092");
         expectedConfigs.put("client.id", "testid");
         expectedConfigs.put("metadata.max.age.ms", "10000");
+        expectedConfigs.put("metrics.context.connect.kafka.cluster.id", "test-cluster");
 
         EasyMock.expect(connectorConfig.originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX))
             .andReturn(connConfig);
         PowerMock.replayAll();
         assertEquals(expectedConfigs, Worker.adminConfigs(new ConnectorTaskId("test", 1), "", configWithOverrides, connectorConfig,
-                                                             null, allConnectorClientConfigOverridePolicy));
+                                                             null, allConnectorClientConfigOverridePolicy, CLUSTER_ID));
     }
 
     @Test(expected = ConnectException.class)
@@ -1133,7 +1154,7 @@ public class WorkerWithTopicCreationTest extends ThreadedTest {
             .andReturn(connConfig);
         PowerMock.replayAll();
         Worker.adminConfigs(new ConnectorTaskId("test", 1), "", configWithOverrides, connectorConfig,
-                                                          null, noneConnectorClientConfigOverridePolicy);
+                                                          null, noneConnectorClientConfigOverridePolicy, CLUSTER_ID);
     }
 
     private void assertStatusMetrics(long expected, String metricName) {
@@ -1299,6 +1320,12 @@ public class WorkerWithTopicCreationTest extends ThreadedTest {
                 anyObject(StatusBackingStore.class))
                 .andReturn(workerTask);
     }
+
+    private void expectClusterId() { // stubs the static cluster-id lookup; callers replay the mocks afterwards
+        PowerMock.mockStaticPartial(ConnectUtils.class, "lookupKafkaClusterId"); // only this static method is mocked
+        EasyMock.expect(ConnectUtils.lookupKafkaClusterId(EasyMock.anyObject())).andReturn(CLUSTER_ID).anyTimes();
+    }
+
     /* Name here needs to be unique as we are testing the aliasing mechanism */
     public static class WorkerTestConnector extends SourceConnector {
 
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMemberTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMemberTest.java
new file mode 100644
index 0000000..05cd017
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMemberTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.distributed;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.runtime.MockConnectMetrics;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.storage.ConfigBackingStore;
+import org.apache.kafka.connect.storage.StatusBackingStore;
+import org.apache.kafka.connect.util.ConnectUtils;
+import org.easymock.EasyMock;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import java.lang.management.ManagementFactory;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ConnectUtils.class})
+@PowerMockIgnore({"javax.management.*", "javax.crypto.*"})
+public class WorkerGroupMemberTest { // verifies the metrics context labels and JMX naming of WorkerGroupMember metrics
+    @Mock
+    private ConfigBackingStore configBackingStore;
+    @Mock
+    private StatusBackingStore statusBackingStore; // NOTE(review): not referenced anywhere else in this class
+    // Builds a WorkerGroupMember with a mock reporter, then checks its context labels and the registered MBean.
+    @Test
+    public void testMetrics() throws Exception {
+        WorkerGroupMember member;
+        Map<String, String> workerProps = new HashMap<>();
+        workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets");
+        workerProps.put("group.id", "group-1");
+        workerProps.put("offset.storage.topic", "topic-1");
+        workerProps.put("config.storage.topic", "topic-1");
+        workerProps.put("status.storage.topic", "topic-1");
+        workerProps.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MockConnectMetrics.MockMetricsReporter.class.getName());
+        DistributedConfig config = new DistributedConfig(workerProps);
+
+        // NOTE(review): the log prefix below is one string literal; the " + " inside it is text, not concatenation.
+        LogContext logContext = new LogContext("[Worker clientId=client-1 + groupId= group-1]");
+        // Stub the cluster id lookup before the member is constructed.
+        expectClusterId();
+
+        member = new WorkerGroupMember(config, "", configBackingStore,
+        null, Time.SYSTEM, "client-1", logContext);
+        // The mock reporter must be among the member's reporters and carry both context labels.
+        boolean entered = false;
+        for (MetricsReporter reporter : member.metrics().reporters()) {
+            if (reporter instanceof MockConnectMetrics.MockMetricsReporter) {
+                entered = true;
+                MockConnectMetrics.MockMetricsReporter mockMetricsReporter = (MockConnectMetrics.MockMetricsReporter) reporter;
+                assertEquals("cluster-1", mockMetricsReporter.getMetricsContext().contextLabels().get(WorkerConfig.CONNECT_KAFKA_CLUSTER_ID));
+                assertEquals("group-1", mockMetricsReporter.getMetricsContext().contextLabels().get(WorkerConfig.CONNECT_GROUP_ID));
+            }
+        }
+        assertTrue("Failed to verify MetricsReporter", entered);
+        // Register a metric and look up its MBean on the platform MBean server.
+        MetricName name = member.metrics().metricName("test.avg", "grp1");
+        member.metrics().addMetric(name, new Avg());
+        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
+        // verify the metric's MBean exists in the "kafka.connect" JMX domain with the client-id key
+        assertNotNull(server.getObjectInstance(new ObjectName("kafka.connect:type=grp1,client-id=client-1")));
+    }
+    private void expectClusterId() { // partially mocks ConnectUtils so lookupKafkaClusterId returns "cluster-1"
+        PowerMock.mockStaticPartial(ConnectUtils.class, "lookupKafkaClusterId");
+        EasyMock.expect(ConnectUtils.lookupKafkaClusterId(EasyMock.anyObject())).andReturn("cluster-1").anyTimes();
+        PowerMock.replay(ConnectUtils.class); // switch the static mock to replay mode right away
+    }
+
+}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java
index be72314..43bac54 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java
@@ -79,7 +79,7 @@ public class RetryWithToleranceOperatorTest {
 
         NOOP_OPERATOR.metrics(new ErrorHandlingMetrics(
             new ConnectorTaskId("noop-connector", -1),
-            new ConnectMetrics("noop-worker", new TestableWorkerConfig(properties), new SystemTime()))
+            new ConnectMetrics("noop-worker", new TestableWorkerConfig(properties), new SystemTime(), "test-cluster"))
         );
     }
 
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
index 61fe51f..8d78219 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
@@ -30,6 +30,7 @@ import org.apache.kafka.connect.runtime.TargetState;
 import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
 import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
 import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.ConnectUtils;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.apache.kafka.connect.util.KafkaBasedLog;
 import org.apache.kafka.connect.util.TestFuture;
@@ -62,7 +63,7 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 @RunWith(PowerMockRunner.class)
-@PrepareForTest(KafkaConfigBackingStore.class)
+@PrepareForTest({KafkaConfigBackingStore.class, ConnectUtils.class})
 @PowerMockIgnore({"javax.management.*", "javax.crypto.*"})
 @SuppressWarnings({"unchecked", "deprecation"})
 public class KafkaConfigBackingStoreTest {
@@ -147,6 +148,10 @@ public class KafkaConfigBackingStoreTest {
 
     @Before
     public void setUp() {
+        PowerMock.mockStaticPartial(ConnectUtils.class, "lookupKafkaClusterId");
+        EasyMock.expect(ConnectUtils.lookupKafkaClusterId(EasyMock.anyObject())).andReturn("test-cluster").anyTimes();
+        PowerMock.replay(ConnectUtils.class);
+
         configStorage = PowerMock.createPartialMock(KafkaConfigBackingStore.class, new String[]{"createKafkaBasedLog"}, converter, DEFAULT_DISTRIBUTED_CONFIG, null);
         Whitebox.setInternalState(configStorage, "configLog", storeLog);
         configStorage.setUpdateListener(configUpdateListener);
@@ -157,7 +162,6 @@ public class KafkaConfigBackingStoreTest {
         expectConfigure();
         expectStart(Collections.emptyList(), Collections.emptyMap());
         expectStop();
-
         PowerMock.replayAll();
 
         Map<String, String> settings = new HashMap<>(DEFAULT_CONFIG_STORAGE_PROPS);
@@ -982,5 +986,4 @@ public class KafkaConfigBackingStoreTest {
             result.put(field.name(), struct.get(field));
         return result;
     }
-
 }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
index b72f269..0d0194f 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
 import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.ConnectUtils;
 import org.apache.kafka.connect.util.KafkaBasedLog;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
@@ -58,7 +59,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 @RunWith(PowerMockRunner.class)
-@PrepareForTest(KafkaOffsetBackingStore.class)
+@PrepareForTest({KafkaOffsetBackingStore.class, ConnectUtils.class})
 @PowerMockIgnore({"javax.management.*", "javax.crypto.*"})
 @SuppressWarnings({"unchecked", "deprecation"})
 public class KafkaOffsetBackingStoreTest {
@@ -119,6 +120,7 @@ public class KafkaOffsetBackingStoreTest {
         expectConfigure();
         expectStart(Collections.emptyList());
         expectStop();
+        expectClusterId();
 
         PowerMock.replayAll();
 
@@ -152,6 +154,7 @@ public class KafkaOffsetBackingStoreTest {
                 new ConsumerRecord<>(TOPIC, 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP1_KEY.array(), TP1_VALUE_NEW.array())
         ));
         expectStop();
+        expectClusterId();
 
         PowerMock.replayAll();
 
@@ -214,6 +217,7 @@ public class KafkaOffsetBackingStoreTest {
             }
         });
 
+        expectClusterId();
         PowerMock.replayAll();
 
         store.configure(DEFAULT_DISTRIBUTED_CONFIG);
@@ -284,6 +288,7 @@ public class KafkaOffsetBackingStoreTest {
         });
 
         expectStop();
+        expectClusterId();
 
         PowerMock.replayAll();
 
@@ -337,6 +342,8 @@ public class KafkaOffsetBackingStoreTest {
         storeLog.send(EasyMock.aryEq(TP2_KEY.array()), EasyMock.aryEq(TP2_VALUE.array()), EasyMock.capture(callback2));
         PowerMock.expectLastCall();
 
+        expectClusterId();
+
         PowerMock.replayAll();
 
         store.configure(DEFAULT_DISTRIBUTED_CONFIG);
@@ -404,6 +411,11 @@ public class KafkaOffsetBackingStoreTest {
         PowerMock.expectLastCall();
     }
 
+    private void expectClusterId() { // Records a stub so any ConnectUtils.lookupKafkaClusterId call returns "test-cluster"; replaying is left to the caller.
+        PowerMock.mockStaticPartial(ConnectUtils.class, "lookupKafkaClusterId");
+        EasyMock.expect(ConnectUtils.lookupKafkaClusterId(EasyMock.anyObject())).andReturn("test-cluster").anyTimes();
+    }
+
     private static ByteBuffer buffer(String v) {
         return ByteBuffer.wrap(v.getBytes());
     }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/ConnectUtilsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/ConnectUtilsTest.java
index f7e092c6..f92143a 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/ConnectUtilsTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/ConnectUtilsTest.java
@@ -16,13 +16,19 @@
  */
 package org.apache.kafka.connect.util;
 
+import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.admin.MockAdminClient;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
+import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.junit.Test;
 
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
@@ -61,4 +67,37 @@ public class ConnectUtilsTest {
         ConnectUtils.lookupKafkaClusterId(adminClient);
     }
 
+    @Test
+    public void testAddMetricsContextPropertiesDistributed() {
+        Map<String, String> workerProps = new HashMap<>();
+        workerProps.put(DistributedConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        workerProps.put(DistributedConfig.GROUP_ID_CONFIG, "connect-cluster");
+        workerProps.put(DistributedConfig.CONFIG_TOPIC_CONFIG, "connect-configs");
+        workerProps.put(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "connect-offsets");
+        workerProps.put(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, "connect-status");
+        workerProps.put(DistributedConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put(DistributedConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+        DistributedConfig distributedConfig = new DistributedConfig(workerProps);
+        // Expect the configured group id and the supplied cluster id under the metrics-context prefix.
+        Map<String, Object> contextProps = new HashMap<>();
+        ConnectUtils.addMetricsContextProperties(contextProps, distributedConfig, "cluster-1");
+        assertEquals("connect-cluster", contextProps.get(CommonClientConfigs.METRICS_CONTEXT_PREFIX + WorkerConfig.CONNECT_GROUP_ID));
+        assertEquals("cluster-1", contextProps.get(CommonClientConfigs.METRICS_CONTEXT_PREFIX + WorkerConfig.CONNECT_KAFKA_CLUSTER_ID));
+    }
+
+    @Test
+    public void testAddMetricsContextPropertiesStandalone() {
+        Map<String, String> props = new HashMap<>();
+        props.put(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, "offsetStorageFile");
+        props.put(StandaloneConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        props.put(StandaloneConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+        props.put(StandaloneConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+        StandaloneConfig config = new StandaloneConfig(props);
+
+        Map<String, Object> prop = new HashMap<>();
+        ConnectUtils.addMetricsContextProperties(prop, config, "cluster-1");
+        // The props above set no group id, so no group-id label is expected; the cluster id must still be present.
+        assertNull(prop.get(CommonClientConfigs.METRICS_CONTEXT_PREFIX + WorkerConfig.CONNECT_GROUP_ID));
+        assertEquals("cluster-1", prop.get(CommonClientConfigs.METRICS_CONTEXT_PREFIX + WorkerConfig.CONNECT_KAFKA_CLUSTER_ID));
+    }
 }
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index f41dbf2..ec8e00b 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -793,6 +793,7 @@ class DynamicMetricsReporters(brokerId: Int, server: KafkaServer) extends Reconf
       currentReporters += reporter.getClass.getName -> reporter
     }
     server.notifyClusterListeners(reporters.asScala)
+    server.notifyMetricsReporters(reporters.asScala)
   }
 
   private def removeReporter(className: String): Unit = {
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 05a6257..bad56aa 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -35,10 +35,10 @@ import kafka.network.SocketServer
 import kafka.security.CredentialProvider
 import kafka.utils._
 import kafka.zk.{BrokerInfo, KafkaZkClient}
-import org.apache.kafka.clients.{ApiVersions, ClientDnsLookup, ManualMetadataUpdater, NetworkClient, NetworkClientUtils}
+import org.apache.kafka.clients.{ApiVersions, ClientDnsLookup, ManualMetadataUpdater, NetworkClient, NetworkClientUtils, CommonClientConfigs}
 import org.apache.kafka.common.internals.ClusterResourceListeners
 import org.apache.kafka.common.message.ControlledShutdownRequestData
-import org.apache.kafka.common.metrics.{JmxReporter, Metrics, _}
+import org.apache.kafka.common.metrics.{JmxReporter, Metrics, MetricsReporter, _}
 import org.apache.kafka.common.network._
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{ControlledShutdownRequest, ControlledShutdownResponse}
@@ -129,7 +129,11 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
 
   private var shutdownLatch = new CountDownLatch(1)
 
-  private val jmxPrefix: String = "kafka.server"
+  //properties for MetricsContext
+  private val metricsPrefix: String = "kafka.server"
+  private val KAFKA_CLUSTER_ID: String = "kafka.cluster.id"
+  private val KAFKA_BROKER_ID: String = "kafka.broker.id"
+
 
   private var logContext: LogContext = null
 
@@ -247,13 +251,15 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
         kafkaYammerMetrics = KafkaYammerMetrics.INSTANCE
         kafkaYammerMetrics.configure(config.originals)
 
-        val jmxReporter = new JmxReporter(jmxPrefix)
+        val jmxReporter = new JmxReporter()
         jmxReporter.configure(config.originals)
 
         val reporters = new util.ArrayList[MetricsReporter]
         reporters.add(jmxReporter)
+
         val metricConfig = KafkaServer.metricConfig(config)
-        metrics = new Metrics(metricConfig, reporters, time, true)
+        val metricsContext = createKafkaMetricsContext()
+        metrics = new Metrics(metricConfig, reporters, time, true, metricsContext)
 
         /* register broker metrics */
         _brokerTopicStats = new BrokerTopicStats
@@ -365,7 +371,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
         shutdownLatch = new CountDownLatch(1)
         startupComplete.set(true)
         isStartingUp.set(false)
-        AppInfoParser.registerAppInfo(jmxPrefix, config.brokerId.toString, metrics, time.milliseconds())
+        AppInfoParser.registerAppInfo(metricsPrefix, config.brokerId.toString, metrics, time.milliseconds())
         info("started")
       }
     }
@@ -384,6 +390,23 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
     clusterResourceListeners.onUpdate(new ClusterResource(clusterId))
   }
 
+  private[server] def notifyMetricsReporters(metricsReporters: Seq[AnyRef]): Unit = {
+    // Build the context once and pass it to each MetricsReporter; entries of any other type are skipped.
+    val context = createKafkaMetricsContext()
+    metricsReporters
+      .collect { case reporter: MetricsReporter => reporter }
+      .foreach(_.contextChange(context))
+  }
+
+  private[server] def createKafkaMetricsContext() : KafkaMetricsContext = {
+    // Cluster id and broker id go in first; entries configured under the metrics-context prefix are merged in afterwards.
+    val labels = new util.HashMap[String, Object]
+    labels.put(KAFKA_CLUSTER_ID, clusterId)
+    labels.put(KAFKA_BROKER_ID, config.brokerId.toString)
+    labels.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX))
+    new KafkaMetricsContext(metricsPrefix, labels)
+  }
+
   protected def createReplicaManager(isShuttingDown: AtomicBoolean): ReplicaManager =
     new ReplicaManager(config, metrics, time, zkClient, kafkaScheduler, logManager, isShuttingDown, quotaManagers,
       brokerTopicStats, metadataCache, logDirFailureChannel)
@@ -687,7 +710,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
 
         startupComplete.set(false)
         isShuttingDown.set(false)
-        CoreUtils.swallow(AppInfoParser.unregisterAppInfo(jmxPrefix, config.brokerId.toString, metrics), this)
+        CoreUtils.swallow(AppInfoParser.unregisterAppInfo(metricsPrefix, config.brokerId.toString, metrics), this)
         shutdownLatch.countDown()
         info("shut down completed")
       }
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index 405f12c..6266d7f 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -37,7 +37,7 @@ import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.internals.Topic
-import org.apache.kafka.common.metrics.{JmxReporter, Metrics => kMetrics}
+import org.apache.kafka.common.metrics.{JmxReporter, KafkaMetricsContext, Metrics => kMetrics}
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.OffsetFetchResponse
@@ -2356,7 +2356,9 @@ class GroupMetadataManagerTest {
   def testPartitionLoadMetric(): Unit = {
     val server = ManagementFactory.getPlatformMBeanServer
     val mBeanName = "kafka.server:type=group-coordinator-metrics"
-    val reporter = new JmxReporter("kafka.server")
+    val reporter = new JmxReporter
+    val metricsContext = new KafkaMetricsContext("kafka.server")
+    reporter.contextChange(metricsContext)
     metrics.addReporter(reporter)
 
     def partitionLoadTime(attribute: String): Double = {
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index e02f46d..3055742 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -28,7 +28,7 @@ import kafka.utils.{MockScheduler, Pool, TestUtils}
 import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.internals.Topic.TRANSACTION_STATE_TOPIC_NAME
-import org.apache.kafka.common.metrics.{JmxReporter, Metrics}
+import org.apache.kafka.common.metrics.{JmxReporter, KafkaMetricsContext, Metrics}
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
@@ -773,7 +773,9 @@ class TransactionStateManagerTest {
   def testPartitionLoadMetric(): Unit = {
     val server = ManagementFactory.getPlatformMBeanServer
     val mBeanName = "kafka.server:type=transaction-coordinator-metrics"
-    val reporter = new JmxReporter("kafka.server")
+    val reporter = new JmxReporter
+    val metricsContext = new KafkaMetricsContext("kafka.server")
+    reporter.contextChange(metricsContext)
     metrics.addReporter(reporter)
 
     def partitionLoadTime(attribute: String): Double = {
diff --git a/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala b/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala
new file mode 100644
index 0000000..4e0c9f8
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import java.util
+
+import java.util.concurrent.atomic.AtomicReference
+
+import kafka.utils.{CoreUtils, TestUtils}
+import kafka.zk.ZooKeeperTestHarness
+import org.apache.kafka.common.metrics.{KafkaMetric, MetricsContext, MetricsReporter}
+import org.junit.Assert.{assertEquals}
+import org.junit.{After, Before, Test}
+import org.junit.Assert._
+
+
+object KafkaMetricsReporterTest {
+  val setupError = new AtomicReference[String]("")
+  // Reporter whose only behaviour is to record labels from the MetricsContext passed to contextChange.
+  class MockMetricsReporter extends MetricsReporter {
+    def init(metrics: util.List[KafkaMetric]): Unit = {}
+
+    def metricChange(metric: KafkaMetric): Unit = {}
+
+    def metricRemoval(metric: KafkaMetric): Unit = {}
+
+    override def close(): Unit = {}
+
+    override def contextChange(metricsContext: MetricsContext): Unit = {
+      // Copy the "_namespace", "kafka.cluster.id" and "kafka.broker.id" labels into the companion object's holders
+      // so the test can assert on them afterwards.
+      MockMetricsReporter.JMXPREFIX.set(metricsContext.contextLabels().get("_namespace").toString)
+      MockMetricsReporter.CLUSTERID.set(metricsContext.contextLabels().get("kafka.cluster.id").toString)
+      MockMetricsReporter.BROKERID.set(metricsContext.contextLabels().get("kafka.broker.id").toString)
+    }
+
+    override def configure(configs: util.Map[String, _]): Unit = {}
+
+  }
+  // Holders written by contextChange above; they start out empty (no initial value is supplied).
+  object MockMetricsReporter {
+    val JMXPREFIX: AtomicReference[String] = new AtomicReference[String]
+    val BROKERID : AtomicReference[String] = new AtomicReference[String]
+    val CLUSTERID : AtomicReference[String] = new AtomicReference[String]
+  }
+}
+
+class KafkaMetricsReporterTest extends ZooKeeperTestHarness {
+  var server: KafkaServerStartable = null
+  var config: KafkaConfig = null
+  // Starts a broker with MockMetricsReporter registered as a metric reporter and broker id generation enabled.
+  @Before
+  override def setUp(): Unit = {
+    super.setUp()
+    val props = TestUtils.createBrokerConfig(1, zkConnect)
+    props.setProperty(KafkaConfig.MetricReporterClassesProp, "kafka.server.KafkaMetricsReporterTest$MockMetricsReporter")
+    props.setProperty(KafkaConfig.BrokerIdGenerationEnableProp, "true")
+    props.setProperty(KafkaConfig.BrokerIdProp, "-1")
+    config = KafkaConfig.fromProps(props)
+    server = KafkaServerStartable.fromProps(props, threadNamePrefix = Option(this.getClass.getName))
+    server.startup()
+  }
+  // Checks the values captured by the reporter; the AtomicReference holders themselves are never null, so assert on get().
+  @Test
+  def testMetricsContextNamespacePresent(): Unit = {
+    assertNotNull(KafkaMetricsReporterTest.MockMetricsReporter.CLUSTERID.get())
+    assertNotNull(KafkaMetricsReporterTest.MockMetricsReporter.BROKERID.get())
+    assertNotNull(KafkaMetricsReporterTest.MockMetricsReporter.JMXPREFIX.get())
+    assertEquals("kafka.server", KafkaMetricsReporterTest.MockMetricsReporter.JMXPREFIX.get())
+    // Shut down here (tearDown calls shutdown again) before checking for leftover non-daemon threads.
+    server.shutdown()
+    TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
+  }
+
+  @After
+  override def tearDown(): Unit = {
+    server.shutdown()
+    CoreUtils.delete(config.logDirs)
+    super.tearDown()
+  }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 15b38eb..7a7e4df 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams;
 
+import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -25,6 +26,8 @@ import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.JmxReporter;
+import org.apache.kafka.common.metrics.MetricsContext;
+import org.apache.kafka.common.metrics.KafkaMetricsContext;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.MetricsReporter;
@@ -678,10 +681,12 @@ public class KafkaStreams implements AutoCloseable {
         final List<MetricsReporter> reporters = config.getConfiguredInstances(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG,
                 MetricsReporter.class,
                 Collections.singletonMap(StreamsConfig.CLIENT_ID_CONFIG, clientId));
-        final JmxReporter jmxReporter = new JmxReporter(JMX_PREFIX);
+        final JmxReporter jmxReporter = new JmxReporter();
         jmxReporter.configure(config.originals());
         reporters.add(jmxReporter);
-        metrics = new Metrics(metricConfig, reporters, time);
+        final MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX,
+                config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
+        metrics = new Metrics(metricConfig, reporters, time, metricsContext);
         streamsMetrics =
             new StreamsMetricsImpl(metrics, clientId, config.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG));
         rocksDBMetricsRecordingTrigger = new RocksDBMetricsRecordingTrigger(time);
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 9c7c01f..95942be 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.MetricsContext;
 import org.apache.kafka.common.metrics.MetricsReporter;
 import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
 import org.apache.kafka.common.serialization.Serdes;
@@ -172,7 +173,8 @@ public class KafkaStreamsTest {
         PowerMock.expectNew(Metrics.class,
             anyObject(MetricConfig.class),
             capture(metricsReportersCapture),
-            anyObject(Time.class)
+            anyObject(Time.class),
+            anyObject(MetricsContext.class)
         ).andAnswer(() -> {
             for (final MetricsReporter reporter : metricsReportersCapture.getValue()) {
                 reporter.init(Collections.emptyList());
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 49cba19..5df51e8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -28,8 +28,10 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.KafkaMetricsContext;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.MetricsContext;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.CumulativeSum;
 import org.apache.kafka.common.record.TimestampType;
@@ -575,7 +577,10 @@ public class StreamTaskTest {
             testMetricsForBuiltInMetricsVersionLatest();
         }
 
-        final JmxReporter reporter = new JmxReporter("kafka.streams");
+        final JmxReporter reporter = new JmxReporter();
+        final MetricsContext metricsContext = new KafkaMetricsContext("kafka.streams");
+        reporter.contextChange(metricsContext);
+
         metrics.addReporter(reporter);
         final String threadIdTag =
             StreamsConfig.METRICS_LATEST.equals(builtInMetricsVersion) ? THREAD_ID_TAG : THREAD_ID_TAG_0100_TO_24;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 1643e03..91c3797 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -36,8 +36,10 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.KafkaMetricsContext;
 import org.apache.kafka.common.metrics.Measurable;
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.MetricsContext;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
@@ -408,7 +410,10 @@ public class StreamThreadTest {
                 "commit-rate", taskGroupName, descriptionIsNotVerified, taskTags)));
         }
 
-        final JmxReporter reporter = new JmxReporter("kafka.streams");
+        final JmxReporter reporter = new JmxReporter();
+        final MetricsContext metricsContext = new KafkaMetricsContext("kafka.streams");
+        reporter.contextChange(metricsContext);
+
         metrics.addReporter(reporter);
         assertEquals(CLIENT_ID + "-StreamThread-1", thread.getName());
         assertTrue(reporter.containsMbean(String.format("kafka.streams:type=%s,%s=%s",
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
index 9a2b9fd..4d666b2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
@@ -19,7 +19,9 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.KafkaMetricsContext;
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.MetricsContext;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
@@ -145,7 +147,10 @@ public class MeteredKeyValueStoreTest {
     @Test
     public void testMetrics() {
         init();
-        final JmxReporter reporter = new JmxReporter("kafka.streams");
+        final JmxReporter reporter = new JmxReporter();
+        final MetricsContext metricsContext = new KafkaMetricsContext("kafka.streams");
+        reporter.contextChange(metricsContext);
+
         metrics.addReporter(reporter);
         assertTrue(reporter.containsMbean(String.format(
             "kafka.streams:type=%s,%s=%s,task-id=%s,%s-state-id=%s",
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
index d1f805c..7d558dc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
@@ -20,7 +20,9 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.KafkaMetricsContext;
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.MetricsContext;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
@@ -144,7 +146,10 @@ public class MeteredSessionStoreTest {
     @Test
     public void testMetrics() {
         init();
-        final JmxReporter reporter = new JmxReporter("kafka.streams");
+        final JmxReporter reporter = new JmxReporter();
+        final MetricsContext metricsContext = new KafkaMetricsContext("kafka.streams");
+        reporter.contextChange(metricsContext);
+
         metrics.addReporter(reporter);
         assertTrue(reporter.containsMbean(String.format(
             "kafka.streams:type=%s,%s=%s,task-id=%s,%s-state-id=%s",
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
index b32c331..5522855 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
@@ -19,7 +19,9 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.KafkaMetricsContext;
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.MetricsContext;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
@@ -147,7 +149,10 @@ public class MeteredTimestampedKeyValueStoreTest {
     @Test
     public void testMetrics() {
         init();
-        final JmxReporter reporter = new JmxReporter("kafka.streams");
+        final JmxReporter reporter = new JmxReporter();
+        final MetricsContext metricsContext = new KafkaMetricsContext("kafka.streams");
+        reporter.contextChange(metricsContext);
+
         metrics.addReporter(reporter);
         assertTrue(reporter.containsMbean(String.format(
             "kafka.streams:type=%s,%s=%s,task-id=%s,%s-state-id=%s",
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
index 569203e..2e1fb12 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
@@ -19,8 +19,10 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.JmxReporter;
+import org.apache.kafka.common.metrics.KafkaMetricsContext;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.MetricsContext;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
@@ -141,7 +143,10 @@ public class MeteredWindowStoreTest {
     public void testMetrics() {
         replay(innerStoreMock);
         store.init(context, store);
-        final JmxReporter reporter = new JmxReporter("kafka.streams");
+        final JmxReporter reporter = new JmxReporter();
+        final MetricsContext metricsContext = new KafkaMetricsContext("kafka.streams");
+        reporter.contextChange(metricsContext);
+
         metrics.addReporter(reporter);
         assertTrue(reporter.containsMbean(String.format(
             "kafka.streams:type=%s,%s=%s,task-id=%s,%s-state-id=%s",


Mime
View raw message