kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch 2.4 updated: HOTFIX: Hide built-in metrics version (#7459)
Date Mon, 07 Oct 2019 17:48:11 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new 6859820  HOTFIX: Hide built-in metrics version (#7459)
6859820 is described below

commit 6859820755332eb90f386a5b76a0c43e035637dd
Author: Bruno Cadonna <bruno@confluent.io>
AuthorDate: Mon Oct 7 19:47:50 2019 +0200

    HOTFIX: Hide built-in metrics version (#7459)
    
    Since currently not all refactorings on streams metrics proposed in KIP-444 has yet been
implemented, this commit hides the built-in metrics version config from the user. Thus, the
user cannot switch to the refactored streams metrics.
    
    Reviewers: Guozhang Wang <wangguoz@gmail.com>
---
 .../org/apache/kafka/streams/KafkaStreams.java     |  3 +-
 .../org/apache/kafka/streams/StreamsConfig.java    | 23 ---------------
 .../internals/metrics/StreamsMetricsImpl.java      | 16 ++++++++--
 .../apache/kafka/streams/StreamsConfigTest.java    | 34 ----------------------
 .../integration/MetricsIntegrationTest.java        | 21 +++++--------
 .../internals/GlobalStreamThreadTest.java          |  4 +--
 .../processor/internals/MockStreamsMetrics.java    |  3 +-
 .../processor/internals/StandbyTaskTest.java       |  3 +-
 .../processor/internals/StreamThreadTest.java      | 18 +-----------
 .../internals/metrics/StreamsMetricsImplTest.java  | 16 +++++-----
 .../internals/GlobalStateStoreProviderTest.java    |  3 +-
 .../MeteredTimestampedWindowStoreTest.java         |  2 +-
 .../state/internals/MeteredWindowStoreTest.java    |  2 +-
 .../kafka/test/InternalMockProcessorContext.java   | 10 +++----
 .../apache/kafka/streams/TopologyTestDriver.java   |  4 +--
 .../streams/processor/MockProcessorContext.java    |  3 +-
 16 files changed, 47 insertions(+), 118 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 5a00fe3..146eefd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -675,8 +675,7 @@ public class KafkaStreams implements AutoCloseable {
                 Collections.singletonMap(StreamsConfig.CLIENT_ID_CONFIG, clientId));
         reporters.add(new JmxReporter(JMX_PREFIX));
         metrics = new Metrics(metricConfig, reporters, time);
-        streamsMetrics =
-            new StreamsMetricsImpl(metrics, clientId, config.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG));
+        streamsMetrics = new StreamsMetricsImpl(metrics, clientId, StreamsMetricsImpl.METRICS_0100_TO_23);
         streamsMetrics.setRocksDBMetricsRecordingTrigger(rocksDBMetricsRecordingTrigger);
         ClientMetrics.addVersionMetric(streamsMetrics);
         ClientMetrics.addCommitIdMetric(streamsMetrics);
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index eb6faff..0a4bfd1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -287,16 +287,6 @@ public class StreamsConfig extends AbstractConfig {
     @SuppressWarnings("WeakerAccess")
     public static final String EXACTLY_ONCE = "exactly_once";
 
-    /**
-     * Config value for parameter {@link #BUILT_IN_METRICS_VERSION_CONFIG "built.in.metrics.version"}
for built-in metrics from version 0.10.0. to 2.3
-     */
-    public static final String METRICS_0100_TO_23 = "0.10.0-2.3";
-
-    /**
-     * Config value for parameter {@link #BUILT_IN_METRICS_VERSION_CONFIG "built.in.metrics.version"}
for the latest built-in metrics version.
-     */
-    public static final String METRICS_LATEST = "latest";
-
     /** {@code application.id} */
     @SuppressWarnings("WeakerAccess")
     public static final String APPLICATION_ID_CONFIG = "application.id";
@@ -316,10 +306,6 @@ public class StreamsConfig extends AbstractConfig {
     public static final String BUFFERED_RECORDS_PER_PARTITION_CONFIG = "buffered.records.per.partition";
     private static final String BUFFERED_RECORDS_PER_PARTITION_DOC = "Maximum number of records
to buffer per partition.";
 
-    /** {@code built.in.metrics.version} */
-    public static final String BUILT_IN_METRICS_VERSION_CONFIG = "built.in.metrics.version";
-    private static final String BUILT_IN_METRICS_VERSION_DOC = "Version of the built-in metrics
to use.";
-
     /** {@code cache.max.bytes.buffering} */
     @SuppressWarnings("WeakerAccess")
     public static final String CACHE_MAX_BYTES_BUFFERING_CONFIG = "cache.max.bytes.buffering";
@@ -625,15 +611,6 @@ public class StreamsConfig extends AbstractConfig {
                     1000,
                     Importance.LOW,
                     BUFFERED_RECORDS_PER_PARTITION_DOC)
-            .define(BUILT_IN_METRICS_VERSION_CONFIG,
-                    Type.STRING,
-                    METRICS_LATEST,
-                    in(
-                        METRICS_0100_TO_23,
-                        METRICS_LATEST
-                    ),
-                    Importance.LOW,
-                    BUILT_IN_METRICS_VERSION_DOC)
             .define(COMMIT_INTERVAL_MS_CONFIG,
                     Type.LONG,
                     DEFAULT_COMMIT_INTERVAL_MS,
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
index aea1f03..8042223 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
@@ -32,7 +32,6 @@ import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.metrics.stats.Value;
 import org.apache.kafka.common.metrics.stats.WindowedCount;
 import org.apache.kafka.common.metrics.stats.WindowedSum;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecordingTrigger;
 
@@ -53,7 +52,20 @@ public class StreamsMetricsImpl implements StreamsMetrics {
         FROM_100_TO_23
     }
 
+    // Temporarily moved from StreamsConfig to here to hide the built-in metrics version
config
+    /**
+     * Config value for parameter {@link #BUILT_IN_METRICS_VERSION_CONFIG "built.in.metrics.version"}
for built-in metrics from version 0.10.0. to 2.3
+     */
+    public static final String METRICS_0100_TO_23 = "0.10.0-2.3";
+
+    /**
+     * Config value for parameter {@link #BUILT_IN_METRICS_VERSION_CONFIG "built.in.metrics.version"}
for the latest built-in metrics version.
+     */
+    public static final String METRICS_LATEST = "latest";
+
+
     static class ImmutableMetricValue<T> implements Gauge<T> {
+
         private final T value;
 
         public ImmutableMetricValue(final T value) {
@@ -156,7 +168,7 @@ public class StreamsMetricsImpl implements StreamsMetrics {
     }
 
     private static Version parseBuiltInMetricsVersion(final String builtInMetricsVersion)
{
-        if (builtInMetricsVersion.equals(StreamsConfig.METRICS_LATEST)) {
+        if (builtInMetricsVersion.equals(StreamsMetricsImpl.METRICS_LATEST)) {
             return Version.LATEST;
         } else {
             return Version.FROM_100_TO_23;
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index ad60708..6ac866f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -52,14 +52,11 @@ import static org.apache.kafka.streams.StreamsConfig.adminClientPrefix;
 import static org.apache.kafka.streams.StreamsConfig.consumerPrefix;
 import static org.apache.kafka.streams.StreamsConfig.producerPrefix;
 import static org.apache.kafka.test.StreamsTestUtils.getStreamsConfig;
-import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.hasItem;
-import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.IsEqual.equalTo;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -487,37 +484,6 @@ public class StreamsConfigTest {
     }
 
     @Test
-    public void shouldAcceptBuiltInMetricsVersion0100To23() {
-        // don't use `StreamsConfig.METRICS_0100_TO_23` to actually do a useful test
-        props.put(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG, "0.10.0-2.3");
-        new StreamsConfig(props);
-    }
-
-    @Test
-    public void shouldAcceptBuiltInMetricsLatestVersion() {
-        // don't use `StreamsConfig.METRICS_LATEST` to actually do a useful test
-        props.put(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG, "latest");
-        new StreamsConfig(props);
-    }
-
-    @Test
-    public void shouldSetDefaultBuiltInMetricsVersionIfNoneIsSpecified() {
-        final StreamsConfig config = new StreamsConfig(props);
-        assertThat(config.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG), is(StreamsConfig.METRICS_LATEST));
-    }
-
-    @Test
-    public void shouldThrowIfBuiltInMetricsVersionInvalid() {
-        final String invalidVersion = "0.0.1";
-        props.put(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG, invalidVersion);
-        final Exception exception = assertThrows(ConfigException.class, () -> new StreamsConfig(props));
-        assertThat(
-            exception.getMessage(),
-            containsString("Invalid value " + invalidVersion + " for configuration built.in.metrics.version")
-        );
-    }
-
-    @Test
     public void shouldResetToDefaultIfConsumerIsolationLevelIsOverriddenIfEosEnabled() {
         props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
         props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "anyValue");
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
index 6030ac6..010f277 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
@@ -37,6 +37,7 @@ import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.SessionWindows;
 import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.SessionStore;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.WindowStore;
@@ -292,17 +293,11 @@ public class MetricsIntegrationTest {
     }
 
     @Test
-    public void shouldAddMetricsOnAllLevelsWithBuiltInMetricsLatestVersion() throws Exception
{
-        shouldAddMetricsOnAllLevels(StreamsConfig.METRICS_LATEST);
-    }
-
-    @Test
     public void shouldAddMetricsOnAllLevelsWithBuiltInMetricsVersion0100To23() throws Exception
{
-        shouldAddMetricsOnAllLevels(StreamsConfig.METRICS_0100_TO_23);
+        shouldAddMetricsOnAllLevels(StreamsMetricsImpl.METRICS_0100_TO_23);
     }
 
     private void shouldAddMetricsOnAllLevels(final String builtInMetricsVersion) throws Exception
{
-        streamsConfiguration.put(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG, builtInMetricsVersion);
 
         builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(), Serdes.String()))
             .to(STREAM_OUTPUT_1, Produced.with(Serdes.Integer(), Serdes.String()));
@@ -593,18 +588,18 @@ public class MetricsIntegrationTest {
             .collect(Collectors.toList());
         checkMetricByName(
             listMetricCache,
-            builtInMetricsVersion.equals(StreamsConfig.METRICS_LATEST) ? HIT_RATIO_AVG :
HIT_RATIO_AVG_BEFORE_24,
-            builtInMetricsVersion.equals(StreamsConfig.METRICS_LATEST) ? 3 : 6 /* includes
parent sensors */
+            builtInMetricsVersion.equals(StreamsMetricsImpl.METRICS_LATEST) ? HIT_RATIO_AVG
: HIT_RATIO_AVG_BEFORE_24,
+            builtInMetricsVersion.equals(StreamsMetricsImpl.METRICS_LATEST) ? 3 : 6 /* includes
parent sensors */
         );
         checkMetricByName(
             listMetricCache,
-            builtInMetricsVersion.equals(StreamsConfig.METRICS_LATEST) ? HIT_RATIO_MIN :
HIT_RATIO_MIN_BEFORE_24,
-            builtInMetricsVersion.equals(StreamsConfig.METRICS_LATEST) ? 3 : 6 /* includes
parent sensors */
+            builtInMetricsVersion.equals(StreamsMetricsImpl.METRICS_LATEST) ? HIT_RATIO_MIN
: HIT_RATIO_MIN_BEFORE_24,
+            builtInMetricsVersion.equals(StreamsMetricsImpl.METRICS_LATEST) ? 3 : 6 /* includes
parent sensors */
         );
         checkMetricByName(
             listMetricCache,
-            builtInMetricsVersion.equals(StreamsConfig.METRICS_LATEST) ? HIT_RATIO_MAX :
HIT_RATIO_MAX_BEFORE_24,
-            builtInMetricsVersion.equals(StreamsConfig.METRICS_LATEST) ? 3 : 6 /* includes
parent sensors */
+            builtInMetricsVersion.equals(StreamsMetricsImpl.METRICS_LATEST) ? HIT_RATIO_MAX
: HIT_RATIO_MAX_BEFORE_24,
+            builtInMetricsVersion.equals(StreamsMetricsImpl.METRICS_LATEST) ? 3 : 6 /* includes
parent sensors */
         );
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
index ddb20c9..aa99505 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
@@ -108,7 +108,7 @@ public class GlobalStreamThreadTest {
             mockConsumer,
             new StateDirectory(config, time, true),
             0,
-            new StreamsMetricsImpl(new Metrics(), "test-client", StreamsConfig.METRICS_LATEST),
+            new StreamsMetricsImpl(new Metrics(), "test-client", StreamsMetricsImpl.METRICS_LATEST),
             new MockTime(),
             "clientId",
             stateRestoreListener
@@ -143,7 +143,7 @@ public class GlobalStreamThreadTest {
             mockConsumer,
             new StateDirectory(config, time, true),
             0,
-            new StreamsMetricsImpl(new Metrics(), "test-client", StreamsConfig.METRICS_LATEST),
+            new StreamsMetricsImpl(new Metrics(), "test-client", StreamsMetricsImpl.METRICS_LATEST),
             new MockTime(),
             "clientId",
             stateRestoreListener
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockStreamsMetrics.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockStreamsMetrics.java
index 273f4b9..3398ba1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockStreamsMetrics.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockStreamsMetrics.java
@@ -17,12 +17,11 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 
 public class MockStreamsMetrics extends StreamsMetricsImpl {
 
     public MockStreamsMetrics(final Metrics metrics) {
-        super(metrics, "test", StreamsConfig.METRICS_LATEST);
+        super(metrics, "test", StreamsMetricsImpl.METRICS_LATEST);
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index c2b9cdb..6f4483a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -160,9 +160,8 @@ public class StandbyTaskTest {
     private final byte[] recordValue = intSerializer.serialize(null, 10);
     private final byte[] recordKey = intSerializer.serialize(null, 1);
 
-    private final String threadName = "threadName";
     private final StreamsMetricsImpl streamsMetrics =
-        new StreamsMetricsImpl(new Metrics(), threadName, StreamsConfig.METRICS_LATEST);
+        new StreamsMetricsImpl(new Metrics(), applicationId, StreamsMetricsImpl.METRICS_LATEST);
 
     @Before
     public void setup() throws Exception {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 8d10e4c..73cf768 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -121,7 +121,7 @@ public class StreamThreadTest {
     private final MockTime mockTime = new MockTime();
     private final Metrics metrics = new Metrics();
     private final StreamsMetricsImpl streamsMetrics =
-        new StreamsMetricsImpl(metrics, APPLICATION_ID, StreamsConfig.METRICS_LATEST);
+        new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsMetricsImpl.METRICS_LATEST);
     private final MockClientSupplier clientSupplier = new MockClientSupplier();
     private final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(new
InternalTopologyBuilder());
     private final StreamsConfig config = new StreamsConfig(configProps(false));
@@ -345,7 +345,6 @@ public class StreamThreadTest {
         final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
         final TaskManager taskManager = mockTaskManagerCommit(consumer, 1, 1);
 
-        final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID,
StreamsConfig.METRICS_LATEST);
         final StreamThread thread = new StreamThread(
             mockTime,
             config,
@@ -473,7 +472,6 @@ public class StreamThreadTest {
         final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
         final TaskManager taskManager = mockTaskManagerCommit(consumer, 1, 0);
 
-        final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID,
StreamsConfig.METRICS_LATEST);
         final StreamThread thread = new StreamThread(
             mockTime,
             config,
@@ -508,7 +506,6 @@ public class StreamThreadTest {
         final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
         final TaskManager taskManager = mockTaskManagerCommit(consumer, 2, 1);
 
-        final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID,
StreamsConfig.METRICS_LATEST);
         final StreamThread thread = new StreamThread(
             mockTime,
             config,
@@ -662,8 +659,6 @@ public class StreamThreadTest {
         EasyMock.expectLastCall();
         EasyMock.replay(taskManager, consumer);
 
-        final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST);
         final StreamThread thread = new StreamThread(
             mockTime,
             config,
@@ -696,8 +691,6 @@ public class StreamThreadTest {
         EasyMock.expectLastCall();
         EasyMock.replay(taskManager, consumer);
 
-        final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST);
         final StreamThread thread = new StreamThread(
             mockTime,
             config,
@@ -774,8 +767,6 @@ public class StreamThreadTest {
         taskManager.setAssignmentMetadata(Collections.emptyMap(), Collections.emptyMap());
         taskManager.setPartitionsToTaskId(Collections.emptyMap());
 
-        final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST);
         final StreamThread thread = new StreamThread(
             mockTime,
             config,
@@ -809,8 +800,6 @@ public class StreamThreadTest {
         EasyMock.expectLastCall();
         EasyMock.replay(taskManager, consumer);
 
-        final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST);
         final StreamThread thread = new StreamThread(
             mockTime,
             config,
@@ -1231,8 +1220,6 @@ public class StreamThreadTest {
     private StandbyTask createStandbyTask() {
         final LogContext logContext = new LogContext("test");
         final Logger log = logContext.logger(StreamThreadTest.class);
-        final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST);
         final StreamThread.StandbyTaskCreator standbyTaskCreator = new StreamThread.StandbyTaskCreator(
             internalTopologyBuilder,
             config,
@@ -1677,8 +1664,6 @@ public class StreamThreadTest {
         final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
         final TaskManager taskManager = mockTaskManagerCommit(consumer, 1, 0);
 
-        final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST);
         final StreamThread thread = new StreamThread(
             mockTime,
             config,
@@ -1717,7 +1702,6 @@ public class StreamThreadTest {
         final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
         final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
 
-        final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID,
StreamsConfig.METRICS_LATEST);
         final StreamThread thread = new StreamThread(
             mockTime,
             config,
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
index 69f7546..d8cdfe4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
@@ -24,7 +24,6 @@ import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
 import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ImmutableMetricValue;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.Version;
 import org.easymock.EasyMock;
@@ -63,7 +62,7 @@ public class StreamsMetricsImplTest extends EasyMockSupport {
     private final static String SENSOR_PREFIX_DELIMITER = ".";
     private final static String SENSOR_NAME_DELIMITER = ".s.";
     private final static String INTERNAL_PREFIX = "internal";
-    private final static String VERSION = StreamsConfig.METRICS_LATEST;
+    private final static String VERSION = StreamsMetricsImpl.METRICS_LATEST;
     private final static String CLIENT_ID = "test-client";
     private final static String THREAD_ID = "test-thread";
     private final static String TASK_ID = "test-task";
@@ -439,17 +438,16 @@ public class StreamsMetricsImplTest extends EasyMockSupport {
 
     @Test
     public void shouldGetCacheLevelTagMapForBuiltInMetricsLatestVersion() {
-        shouldGetCacheLevelTagMap(StreamsConfig.METRICS_LATEST);
+        shouldGetCacheLevelTagMap(StreamsMetricsImpl.METRICS_LATEST);
     }
 
     @Test
     public void shouldGetCacheLevelTagMapForBuiltInMetricsVersion0100To23() {
-        shouldGetCacheLevelTagMap(StreamsConfig.METRICS_0100_TO_23);
+        shouldGetCacheLevelTagMap(StreamsMetricsImpl.METRICS_0100_TO_23);
     }
 
     private void shouldGetCacheLevelTagMap(final String builtInMetricsVersion) {
-        final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, THREAD_ID, builtInMetricsVersion);
+        final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID,
builtInMetricsVersion);
         final String taskName = "taskName";
         final String storeName = "storeName";
 
@@ -458,7 +456,7 @@ public class StreamsMetricsImplTest extends EasyMockSupport {
         assertThat(tagMap.size(), equalTo(3));
         assertThat(
             tagMap.get(
-                builtInMetricsVersion.equals(StreamsConfig.METRICS_LATEST) ? StreamsMetricsImpl.THREAD_ID_TAG
+                builtInMetricsVersion.equals(StreamsMetricsImpl.METRICS_LATEST) ? StreamsMetricsImpl.THREAD_ID_TAG
                     : StreamsMetricsImpl.THREAD_ID_TAG_0100_TO_23),
             equalTo(THREAD_ID)
         );
@@ -551,7 +549,7 @@ public class StreamsMetricsImplTest extends EasyMockSupport {
     @Test
     public void shouldReturnMetricsVersionCurrent() {
         assertThat(
-            new StreamsMetricsImpl(metrics, THREAD_ID, StreamsConfig.METRICS_LATEST).version(),
+            new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsMetricsImpl.METRICS_LATEST).version(),
             equalTo(Version.LATEST)
         );
     }
@@ -559,7 +557,7 @@ public class StreamsMetricsImplTest extends EasyMockSupport {
     @Test
     public void shouldReturnMetricsVersionFrom100To23() {
         assertThat(
-            new StreamsMetricsImpl(metrics, THREAD_ID, StreamsConfig.METRICS_0100_TO_23).version(),
+            new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsMetricsImpl.METRICS_0100_TO_23).version(),
             equalTo(Version.FROM_100_TO_23)
         );
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
index 1ab8684..db9205e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
@@ -92,7 +91,7 @@ public class GlobalStateStoreProviderTest {
         final ProcessorContextImpl mockContext = mock(ProcessorContextImpl.class);
         expect(mockContext.applicationId()).andReturn("appId").anyTimes();
         expect(mockContext.metrics())
-            .andReturn(new StreamsMetricsImpl(new Metrics(), "threadName", StreamsConfig.METRICS_LATEST))
+            .andReturn(new StreamsMetricsImpl(new Metrics(), "test-client", StreamsMetricsImpl.METRICS_LATEST))
             .anyTimes();
         expect(mockContext.taskId()).andReturn(new TaskId(0, 0)).anyTimes();
         expect(mockContext.recordCollector()).andReturn(null).anyTimes();
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
index 53096d2..67f1358 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
@@ -60,7 +60,7 @@ public class MeteredTimestampedWindowStoreTest {
     @Before
     public void setUp() {
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, "test", StreamsConfig.METRICS_LATEST);
+            new StreamsMetricsImpl(metrics, "test", StreamsMetricsImpl.METRICS_LATEST);
 
         context = new InternalMockProcessorContext(
             TestUtils.tempDirectory(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
index 588307f..6e65f0e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
@@ -76,7 +76,7 @@ public class MeteredWindowStoreTest {
     @Before
     public void setUp() {
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, "test", StreamsConfig.METRICS_LATEST);
+            new StreamsMetricsImpl(metrics, "test", StreamsMetricsImpl.METRICS_LATEST);
 
         context = new InternalMockProcessorContext(
             TestUtils.tempDirectory(),
diff --git a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
index f5cb014..526b855 100644
--- a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
@@ -70,7 +70,7 @@ public class InternalMockProcessorContext extends AbstractProcessorContext
imple
         this(null,
             null,
             null,
-            new StreamsMetricsImpl(new Metrics(), "mock", StreamsConfig.METRICS_LATEST),
+            new StreamsMetricsImpl(new Metrics(), "mock", StreamsMetricsImpl.METRICS_LATEST),
             new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
             null,
             null
@@ -83,7 +83,7 @@ public class InternalMockProcessorContext extends AbstractProcessorContext
imple
             stateDir,
             null,
             null,
-            new StreamsMetricsImpl(new Metrics(), "mock", StreamsConfig.METRICS_LATEST),
+            new StreamsMetricsImpl(new Metrics(), "mock", StreamsMetricsImpl.METRICS_LATEST),
             config,
             null,
             null
@@ -98,7 +98,7 @@ public class InternalMockProcessorContext extends AbstractProcessorContext
imple
             stateDir,
             keySerde,
             valSerde,
-            new StreamsMetricsImpl(new Metrics(), "mock", StreamsConfig.METRICS_LATEST),
+            new StreamsMetricsImpl(new Metrics(), "mock", StreamsMetricsImpl.METRICS_LATEST),
             config,
             null,
             null
@@ -117,7 +117,7 @@ public class InternalMockProcessorContext extends AbstractProcessorContext
imple
             null,
             serdes.keySerde(),
             serdes.valueSerde(),
-            new StreamsMetricsImpl(metrics, "mock", StreamsConfig.METRICS_LATEST),
+            new StreamsMetricsImpl(metrics, "mock", StreamsMetricsImpl.METRICS_LATEST),
             new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
             () -> collector,
             null
@@ -132,7 +132,7 @@ public class InternalMockProcessorContext extends AbstractProcessorContext
imple
         this(stateDir,
             keySerde,
             valSerde,
-            new StreamsMetricsImpl(new Metrics(), "mock", StreamsConfig.METRICS_LATEST),
+            new StreamsMetricsImpl(new Metrics(), "mock", StreamsMetricsImpl.METRICS_LATEST),
             new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
             () -> collector,
             cache
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 5abe89c..9a6de24 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -306,7 +306,7 @@ public class TopologyTestDriver implements Closeable {
         final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(
             metrics,
             "test-client",
-            StreamsConfig.METRICS_LATEST
+            StreamsMetricsImpl.METRICS_LATEST
         );
         streamsMetrics.setRocksDBMetricsRecordingTrigger(new RocksDBMetricsRecordingTrigger());
         final Sensor skippedRecordsSensor =
@@ -1086,4 +1086,4 @@ public class TopologyTestDriver implements Closeable {
         }
         return consumer;
     }
-}
\ No newline at end of file
+}
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
index 09850e4..947e65e 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
@@ -214,7 +214,8 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
         final MetricConfig metricConfig = new MetricConfig();
         metricConfig.recordLevel(Sensor.RecordingLevel.DEBUG);
         final String threadId = Thread.currentThread().getName();
-        this.metrics = new StreamsMetricsImpl(new Metrics(metricConfig), threadId, StreamsConfig.METRICS_LATEST);
+        this.metrics =
+            new StreamsMetricsImpl(new Metrics(metricConfig), "test-client", StreamsMetricsImpl.METRICS_LATEST);
         ThreadMetrics.skipRecordSensor(threadId, metrics);
     }
 


Mime
View raw message