kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7223: Suppression Buffer Metrics (#5795)
Date Tue, 27 Nov 2018 20:57:13 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 55c77eb  KAFKA-7223: Suppression Buffer Metrics (#5795)
55c77eb is described below

commit 55c77ebf01ea8662b98f73f6f6c17d05163a85b8
Author: John Roesler <vvcephei@users.noreply.github.com>
AuthorDate: Tue Nov 27 14:57:04 2018 -0600

    KAFKA-7223: Suppression Buffer Metrics (#5795)
    
    Add the final batch of metrics from KIP-328
    
    Reviewers: Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>,
Guozhang Wang <wangguoz@gmail.com>
---
 .../streams/kstream/internals/metrics/Sensors.java |  47 ++++-
 .../suppress/KTableSuppressProcessor.java          |   8 +-
 .../streams/processor/internals/ProcessorNode.java |  21 +--
 .../internals/metrics/StreamsMetricsImpl.java      |   3 +
 .../InMemoryTimeOrderedKeyValueBuffer.java         |  26 ++-
 .../streams/state/internals/metrics/Sensors.java   |  69 ++++++-
 .../KTableSuppressProcessorMetricsTest.java        | 203 +++++++++++++++++++++
 .../suppress/KTableSuppressProcessorTest.java      |  13 +-
 .../streams/processor/MockProcessorContext.java    |   9 +-
 9 files changed, 375 insertions(+), 24 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java
index 5b0d8b5..12c4813 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java
@@ -20,10 +20,17 @@ import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.Sum;
+import org.apache.kafka.common.metrics.stats.Total;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_ID_TAG;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_METRICS_GROUP;
 
 public class Sensors {
     private Sensors() {}
@@ -38,8 +45,8 @@ public class Sensors {
         );
         StreamsMetricsImpl.addInvocationRateAndCount(
             sensor,
-            "stream-processor-node-metrics",
-            metrics.tagMap("task-id", context.taskId().toString(), "processor-node-id", context.currentNode().name()),
+            PROCESSOR_NODE_METRICS_GROUP,
+            metrics.tagMap("task-id", context.taskId().toString(), PROCESSOR_NODE_ID_TAG,
context.currentNode().name()),
             "late-record-drop"
         );
         return sensor;
@@ -75,4 +82,40 @@ public class Sensors {
         );
         return sensor;
     }
+
+    public static Sensor suppressionEmitSensor(final InternalProcessorContext context) {
+        final StreamsMetricsImpl metrics = context.metrics();
+
+        final Sensor sensor = metrics.nodeLevelSensor(
+            context.taskId().toString(),
+            context.currentNode().name(),
+            "suppression-emit",
+            Sensor.RecordingLevel.DEBUG
+        );
+
+        final Map<String, String> tags = metrics.tagMap(
+            "task-id", context.taskId().toString(),
+            PROCESSOR_NODE_ID_TAG, context.currentNode().name()
+        );
+
+        sensor.add(
+            new MetricName(
+                "suppression-emit-rate",
+                PROCESSOR_NODE_METRICS_GROUP,
+                "The average number of occurrence of suppression-emit operation per second.",
+                tags
+            ),
+            new Rate(TimeUnit.SECONDS, new Sum())
+        );
+        sensor.add(
+            new MetricName(
+                "suppression-emit-total",
+                PROCESSOR_NODE_METRICS_GROUP,
+                "The total number of occurrence of suppression-emit operations.",
+                tags
+            ),
+            new Total()
+        );
+        return sensor;
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java
index 50e74a3..06d5004 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java
@@ -16,12 +16,14 @@
  */
 package org.apache.kafka.streams.kstream.internals.suppress;
 
+import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.internals.Change;
 import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
+import org.apache.kafka.streams.kstream.internals.metrics.Sensors;
 import org.apache.kafka.streams.kstream.internals.suppress.TimeDefinitions.TimeDefinition;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -42,9 +44,10 @@ public class KTableSuppressProcessor<K, V> implements Processor<K,
Change<V>> {
     private final BufferFullStrategy bufferFullStrategy;
     private final boolean shouldSuppressTombstones;
     private final String storeName;
+
     private TimeOrderedKeyValueBuffer buffer;
     private InternalProcessorContext internalProcessorContext;
-
+    private Sensor suppressionEmitSensor;
     private Serde<K> keySerde;
     private FullChangeSerde<V> valueSerde;
 
@@ -68,6 +71,8 @@ public class KTableSuppressProcessor<K, V> implements Processor<K,
Change<V>> {
     @Override
     public void init(final ProcessorContext context) {
         internalProcessorContext = (InternalProcessorContext) context;
+        suppressionEmitSensor = Sensors.suppressionEmitSensor(internalProcessorContext);
+
         keySerde = keySerde == null ? (Serde<K>) context.keySerde() : keySerde;
         valueSerde = valueSerde == null ? FullChangeSerde.castOrWrap(context.valueSerde())
: valueSerde;
         buffer = Objects.requireNonNull((TimeOrderedKeyValueBuffer) context.getStateStore(storeName));
@@ -123,6 +128,7 @@ public class KTableSuppressProcessor<K, V> implements Processor<K,
Change<V>> {
             try {
                 final K key = keySerde.deserializer().deserialize(null, toEmit.key.get());
                 internalProcessorContext.forward(key, value);
+                suppressionEmitSensor.record();
             } finally {
                 internalProcessorContext.setRecordContext(prevRecordContext);
             }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index 8dc6417..8483791 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -31,6 +31,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_ID_TAG;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_METRICS_GROUP;
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgMaxLatency;
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCount;
 
@@ -165,15 +167,13 @@ public class ProcessorNode<K, V> {
         private NodeMetrics(final StreamsMetricsImpl metrics, final String processorNodeName,
final ProcessorContext context) {
             this.metrics = metrics;
 
-            final String group = "stream-processor-node-metrics";
             final String taskName = context.taskId().toString();
-            final Map<String, String> tagMap = metrics.tagMap("task-id", context.taskId().toString(),
"processor-node-id", processorNodeName);
-            final Map<String, String> allTagMap = metrics.tagMap("task-id", context.taskId().toString(),
"processor-node-id", "all");
+            final Map<String, String> tagMap = metrics.tagMap("task-id", context.taskId().toString(),
PROCESSOR_NODE_ID_TAG, processorNodeName);
+            final Map<String, String> allTagMap = metrics.tagMap("task-id", context.taskId().toString(),
PROCESSOR_NODE_ID_TAG, "all");
 
             nodeProcessTimeSensor = createTaskAndNodeLatencyAndThroughputSensors(
                 "process",
                 metrics,
-                group,
                 taskName,
                 processorNodeName,
                 allTagMap,
@@ -183,7 +183,6 @@ public class ProcessorNode<K, V> {
             nodePunctuateTimeSensor = createTaskAndNodeLatencyAndThroughputSensors(
                 "punctuate",
                 metrics,
-                group,
                 taskName,
                 processorNodeName,
                 allTagMap,
@@ -193,7 +192,6 @@ public class ProcessorNode<K, V> {
             nodeCreationSensor = createTaskAndNodeLatencyAndThroughputSensors(
                 "create",
                 metrics,
-                group,
                 taskName,
                 processorNodeName,
                 allTagMap,
@@ -204,7 +202,6 @@ public class ProcessorNode<K, V> {
             nodeDestructionSensor = createTaskAndNodeLatencyAndThroughputSensors(
                 "destroy",
                 metrics,
-                group,
                 taskName,
                 processorNodeName,
                 allTagMap,
@@ -214,7 +211,6 @@ public class ProcessorNode<K, V> {
             sourceNodeForwardSensor = createTaskAndNodeLatencyAndThroughputSensors(
                 "forward",
                 metrics,
-                group,
                 taskName,
                 processorNodeName,
                 allTagMap,
@@ -231,17 +227,16 @@ public class ProcessorNode<K, V> {
 
         private static Sensor createTaskAndNodeLatencyAndThroughputSensors(final String operation,
                                                                            final StreamsMetricsImpl
metrics,
-                                                                           final String group,
                                                                            final String taskName,
                                                                            final String processorNodeName,
                                                                            final Map<String,
String> taskTags,
                                                                            final Map<String,
String> nodeTags) {
             final Sensor parent = metrics.taskLevelSensor(taskName, operation, Sensor.RecordingLevel.DEBUG);
-            addAvgMaxLatency(parent, group, taskTags, operation);
-            addInvocationRateAndCount(parent, group, taskTags, operation);
+            addAvgMaxLatency(parent, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation);
+            addInvocationRateAndCount(parent, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation);
             final Sensor sensor = metrics.nodeLevelSensor(taskName, processorNodeName, operation,
Sensor.RecordingLevel.DEBUG, parent);
-            addAvgMaxLatency(sensor, group, nodeTags, operation);
-            addInvocationRateAndCount(sensor, group, nodeTags, operation);
+            addAvgMaxLatency(sensor, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation);
+            addInvocationRateAndCount(sensor, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation);
             return sensor;
         }
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
index 1703112..8ec2711 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
@@ -51,6 +51,9 @@ public class StreamsMetricsImpl implements StreamsMetrics {
     private static final String SENSOR_PREFIX_DELIMITER = ".";
     private static final String SENSOR_NAME_DELIMITER = ".s.";
 
+    public static final String PROCESSOR_NODE_METRICS_GROUP = "stream-processor-node-metrics";
+    public static final String PROCESSOR_NODE_ID_TAG = "processor-node-id";
+
     public StreamsMetricsImpl(final Metrics metrics, final String threadName) {
         Objects.requireNonNull(metrics, "Metrics cannot be null");
         this.threadName = threadName;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
index d94f671..234ea05 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
@@ -17,17 +17,20 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.BytesSerializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.internals.metrics.Sensors;
 
 import java.nio.ByteBuffer;
 import java.util.Collection;
@@ -57,6 +60,8 @@ public class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyValueBuf
     private long minTimestamp = Long.MAX_VALUE;
     private RecordCollector collector;
     private String changelogTopic;
+    private Sensor bufferSizeSensor;
+    private Sensor bufferCountSensor;
 
     private volatile boolean open;
 
@@ -174,11 +179,16 @@ public class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyValueBuf
 
     @Override
     public void init(final ProcessorContext context, final StateStore root) {
+        final InternalProcessorContext internalProcessorContext = (InternalProcessorContext)
context;
+        bufferSizeSensor = Sensors.createBufferSizeSensor(this, internalProcessorContext);
+        bufferCountSensor = Sensors.createBufferCountSensor(this, internalProcessorContext);
+
         context.register(root, (RecordBatchingStateRestoreCallback) this::restoreBatch);
         if (loggingEnabled) {
             collector = ((RecordCollector.Supplier) context).recordCollector();
             changelogTopic = ProcessorStateManager.storeChangelogTopic(context.applicationId(),
storeName);
         }
+        updateBufferMetrics();
         open = true;
     }
 
@@ -189,12 +199,13 @@ public class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyValueBuf
 
     @Override
     public void close() {
+        open = false;
         index.clear();
         sortedMap.clear();
         dirtyKeys.clear();
         memBufferSize = 0;
         minTimestamp = Long.MAX_VALUE;
-        open = false;
+        updateBufferMetrics();
     }
 
     @Override
@@ -265,6 +276,7 @@ public class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyValueBuf
                 );
             }
         }
+        updateBufferMetrics();
     }
 
 
@@ -272,6 +284,7 @@ public class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyValueBuf
     public void evictWhile(final Supplier<Boolean> predicate,
                            final Consumer<KeyValue<Bytes, ContextualRecord>>
callback) {
         final Iterator<Map.Entry<BufferKey, ContextualRecord>> delegate = sortedMap.entrySet().iterator();
+        int evictions = 0;
 
         if (predicate.get()) {
             Map.Entry<BufferKey, ContextualRecord> next = null;
@@ -298,8 +311,13 @@ public class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyValueBuf
                     next = null;
                     minTimestamp = Long.MAX_VALUE;
                 }
+
+                evictions++;
             }
         }
+        if (evictions > 0) {
+            updateBufferMetrics();
+        }
     }
 
     @Override
@@ -308,6 +326,7 @@ public class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyValueBuf
                     final ContextualRecord value) {
         cleanPut(time, key, value);
         dirtyKeys.add(key);
+        updateBufferMetrics();
     }
 
     private void cleanPut(final long time, final Bytes key, final ContextualRecord value)
{
@@ -355,4 +374,9 @@ public class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyValueBuf
         }
         return size;
     }
+
+    private void updateBufferMetrics() {
+        bufferSizeSensor.record(memBufferSize);
+        bufferCountSensor.record(index.size());
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/Sensors.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/Sensors.java
index fdbc7c8..13a39c6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/Sensors.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/Sensors.java
@@ -16,7 +16,13 @@
  */
 package org.apache.kafka.streams.state.internals.metrics;
 
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Value;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 
 import java.util.Map;
@@ -25,7 +31,6 @@ import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetric
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCount;
 
 public final class Sensors {
-    
     private Sensors() {}
 
     public static Sensor createTaskAndStoreLatencyAndThroughputSensors(final Sensor.RecordingLevel
level,
@@ -44,5 +49,67 @@ public final class Sensors {
         addInvocationRateAndCount(sensor, metricsGroup, storeTags, operation);
         return sensor;
     }
+
+    public static Sensor createBufferSizeSensor(final StateStore store,
+                                                final InternalProcessorContext context) {
+        return getBufferSizeOrCountSensor(store, context, "size");
+    }
+
+    public static Sensor createBufferCountSensor(final StateStore store,
+                                                 final InternalProcessorContext context)
{
+        return getBufferSizeOrCountSensor(store, context, "count");
+    }
+
+    private static Sensor getBufferSizeOrCountSensor(final StateStore store,
+                                                     final InternalProcessorContext context,
+                                                     final String property) {
+        final StreamsMetricsImpl metrics = context.metrics();
+
+        final String sensorName = "suppression-buffer-" + property;
+
+        final Sensor sensor = metrics.storeLevelSensor(
+            context.taskId().toString(),
+            store.name(),
+            sensorName,
+            Sensor.RecordingLevel.DEBUG
+        );
+
+        final String metricsGroup = "stream-buffer-metrics";
+
+        final Map<String, String> tags = metrics.tagMap(
+            "task-id", context.taskId().toString(),
+            "buffer-id", store.name()
+        );
+
+        sensor.add(
+            new MetricName(
+                sensorName + "-current",
+                metricsGroup,
+                "The current " + property + " of buffered records.",
+                tags),
+            new Value()
+        );
+
+
+        sensor.add(
+            new MetricName(
+                sensorName + "-avg",
+                metricsGroup,
+                "The average " + property + " of buffered records.",
+                tags),
+            new Avg()
+        );
+
+        sensor.add(
+            new MetricName(
+                sensorName + "-max",
+                metricsGroup,
+                "The max " + property + " of buffered records.",
+                tags),
+            new Max()
+        );
+
+        return sensor;
+    }
 }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java
new file mode 100644
index 0000000..986dc6f
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals.suppress;
+
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.streams.kstream.Suppressed;
+import org.apache.kafka.streams.kstream.internals.Change;
+import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.ProcessorNode;
+import org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer;
+import org.apache.kafka.test.MockInternalProcessorContext;
+import org.hamcrest.Matcher;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.Map;
+
+import static org.apache.kafka.common.serialization.Serdes.Long;
+import static org.apache.kafka.common.serialization.Serdes.String;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxRecords;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.core.Is.is;
+
+@SuppressWarnings("PointlessArithmeticExpression")
+public class KTableSuppressProcessorMetricsTest {
+    private static final long ARBITRARY_LONG = 5L;
+
+    private static final MetricName EVICTION_TOTAL_METRIC = new MetricName(
+        "suppression-emit-total",
+        "stream-processor-node-metrics",
+        "The total number of occurrence of suppression-emit operations.",
+        mkMap(
+            mkEntry("client-id", "mock-processor-context-virtual-thread"),
+            mkEntry("task-id", "0_0"),
+            mkEntry("processor-node-id", "testNode")
+        )
+    );
+
+    private static final MetricName EVICTION_RATE_METRIC = new MetricName(
+        "suppression-emit-rate",
+        "stream-processor-node-metrics",
+        "The average number of occurrence of suppression-emit operation per second.",
+        mkMap(
+            mkEntry("client-id", "mock-processor-context-virtual-thread"),
+            mkEntry("task-id", "0_0"),
+            mkEntry("processor-node-id", "testNode")
+        )
+    );
+
+    private static final MetricName BUFFER_SIZE_AVG_METRIC = new MetricName(
+        "suppression-buffer-size-avg",
+        "stream-buffer-metrics",
+        "The average size of buffered records.",
+        mkMap(
+            mkEntry("client-id", "mock-processor-context-virtual-thread"),
+            mkEntry("task-id", "0_0"),
+            mkEntry("buffer-id", "test-store")
+        )
+    );
+
+    private static final MetricName BUFFER_SIZE_CURRENT_METRIC = new MetricName(
+        "suppression-buffer-size-current",
+        "stream-buffer-metrics",
+        "The current size of buffered records.",
+        mkMap(
+            mkEntry("client-id", "mock-processor-context-virtual-thread"),
+            mkEntry("task-id", "0_0"),
+            mkEntry("buffer-id", "test-store")
+        )
+    );
+
+    private static final MetricName BUFFER_SIZE_MAX_METRIC = new MetricName(
+        "suppression-buffer-size-max",
+        "stream-buffer-metrics",
+        "The max size of buffered records.",
+        mkMap(
+            mkEntry("client-id", "mock-processor-context-virtual-thread"),
+            mkEntry("task-id", "0_0"),
+            mkEntry("buffer-id", "test-store")
+        )
+    );
+
+    private static final MetricName BUFFER_COUNT_AVG_METRIC = new MetricName(
+        "suppression-buffer-count-avg",
+        "stream-buffer-metrics",
+        "The average count of buffered records.",
+        mkMap(
+            mkEntry("client-id", "mock-processor-context-virtual-thread"),
+            mkEntry("task-id", "0_0"),
+            mkEntry("buffer-id", "test-store")
+        )
+    );
+
+    private static final MetricName BUFFER_COUNT_CURRENT_METRIC = new MetricName(
+        "suppression-buffer-count-current",
+        "stream-buffer-metrics",
+        "The current count of buffered records.",
+        mkMap(
+            mkEntry("client-id", "mock-processor-context-virtual-thread"),
+            mkEntry("task-id", "0_0"),
+            mkEntry("buffer-id", "test-store")
+        )
+    );
+
+    private static final MetricName BUFFER_COUNT_MAX_METRIC = new MetricName(
+        "suppression-buffer-count-max",
+        "stream-buffer-metrics",
+        "The max count of buffered records.",
+        mkMap(
+            mkEntry("client-id", "mock-processor-context-virtual-thread"),
+            mkEntry("task-id", "0_0"),
+            mkEntry("buffer-id", "test-store")
+        )
+    );
+
+    @Test
+    public void shouldRecordMetrics() {
+        final String storeName = "test-store";
+
+        final StateStore buffer = new InMemoryTimeOrderedKeyValueBuffer.Builder(storeName)
+            .withLoggingDisabled()
+            .build();
+
+        final KTableSuppressProcessor<String, Long> processor =
+            new KTableSuppressProcessor<>(
+                (SuppressedInternal<String>) Suppressed.<String>untilTimeLimit(Duration.ofDays(100),
maxRecords(1)),
+                storeName,
+                String(),
+                new FullChangeSerde<>(Long())
+            );
+
+        final MockInternalProcessorContext context = new MockInternalProcessorContext();
+        context.setCurrentNode(new ProcessorNode("testNode"));
+
+        buffer.init(context, buffer);
+        processor.init(context);
+
+        final long timestamp = 100L;
+        context.setStreamTime(timestamp);
+        context.setRecordMetadata("", 0, 0L, null, timestamp);
+        final String key = "longKey";
+        final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
+        processor.process(key, value);
+
+        {
+            final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics();
+
+            verifyMetric(metrics, EVICTION_RATE_METRIC, is(0.0));
+            verifyMetric(metrics, EVICTION_TOTAL_METRIC, is(0.0));
+            verifyMetric(metrics, BUFFER_SIZE_AVG_METRIC, is(25.5));
+            verifyMetric(metrics, BUFFER_SIZE_CURRENT_METRIC, is(51.0));
+            verifyMetric(metrics, BUFFER_SIZE_MAX_METRIC, is(51.0));
+            verifyMetric(metrics, BUFFER_COUNT_AVG_METRIC, is(0.5));
+            verifyMetric(metrics, BUFFER_COUNT_CURRENT_METRIC, is(1.0));
+            verifyMetric(metrics, BUFFER_COUNT_MAX_METRIC, is(1.0));
+        }
+
+        context.setStreamTime(timestamp + 1);
+        context.setRecordMetadata("", 0, 1L, null, timestamp + 1);
+        processor.process("key", value);
+
+        {
+            final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics();
+
+            verifyMetric(metrics, EVICTION_RATE_METRIC, greaterThan(0.0));
+            verifyMetric(metrics, EVICTION_TOTAL_METRIC, is(1.0));
+            verifyMetric(metrics, BUFFER_SIZE_AVG_METRIC, is(49.0));
+            verifyMetric(metrics, BUFFER_SIZE_CURRENT_METRIC, is(47.0));
+            verifyMetric(metrics, BUFFER_SIZE_MAX_METRIC, is(98.0));
+            verifyMetric(metrics, BUFFER_COUNT_AVG_METRIC, is(1.0));
+            verifyMetric(metrics, BUFFER_COUNT_CURRENT_METRIC, is(1.0));
+            verifyMetric(metrics, BUFFER_COUNT_MAX_METRIC, is(2.0));
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private <T> void verifyMetric(final Map<MetricName, ? extends Metric> metrics,
+                                  final MetricName metricName,
+                                  final Matcher<T> matcher) {
+        assertThat(metrics.get(metricName).metricName().description(), is(metricName.description()));
+        assertThat((T) metrics.get(metricName).metricValue(), matcher);
+
+    }
+}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
index 002ace2..335fae1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
@@ -77,9 +77,16 @@ public class KTableSuppressProcessorTest {
                 .withLoggingDisabled()
                 .build();
             final KTableSuppressProcessor<K, V> processor =
-                new KTableSuppressProcessor<>(getImpl(suppressed), storeName, keySerde,
new FullChangeSerde<>(valueSerde));
+                new KTableSuppressProcessor<>(
+                    (SuppressedInternal<K>) suppressed,
+                    storeName,
+                    keySerde,
+                    new FullChangeSerde<>(valueSerde)
+                );
 
             final MockInternalProcessorContext context = new MockInternalProcessorContext();
+            context.setCurrentNode(new ProcessorNode("testNode"));
+
             buffer.init(context, buffer);
             processor.init(context);
 
@@ -461,10 +468,6 @@ public class KTableSuppressProcessorTest {
         };
     }
 
-    private static <K> SuppressedInternal<K> getImpl(final Suppressed<K>
suppressed) {
-        return (SuppressedInternal<K>) suppressed;
-    }
-
     private <K> Serde<Windowed<K>> timeWindowedSerdeFrom(final Class<K>
rawType, final long windowSize) {
         final Serde<K> kSerde = Serdes.serdeFrom(rawType);
         return new Serdes.WrapperSerde<>(
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
index 88a7fe7..4c3a6b2 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
@@ -19,7 +19,9 @@ package org.apache.kafka.streams.processor;
 import java.time.Duration;
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.internals.ApiUtils;
 import org.apache.kafka.streams.KeyValue;
@@ -208,7 +210,12 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
         this.taskId = taskId;
         this.config = streamsConfig;
         this.stateDir = stateDir;
-        this.metrics = new StreamsMetricsImpl(new Metrics(), "mock-processor-context-virtual-thread");
+        final MetricConfig metricConfig = new MetricConfig();
+        metricConfig.recordLevel(Sensor.RecordingLevel.DEBUG);
+        this.metrics = new StreamsMetricsImpl(
+            new Metrics(metricConfig),
+            "mock-processor-context-virtual-thread"
+        );
     }
 
     @Override


Mime
View raw message