From: guozhang@apache.org
To: "commits@kafka.apache.org"
Reply-To: dev@kafka.apache.org
Date: Fri, 12 Oct 2018 16:13:03 +0000
Subject: [kafka] branch trunk updated: KAFKA-7223: Add late-record metrics (#5742)
Message-ID: <153936078306.2167.563396557529863588@gitbox.apache.org>
MIME-Version: 1.0
Content-Type: text/plain; charset=utf-8
Content-Transfer-Encoding: 8bit
X-Git-Host: gitbox.apache.org
X-Git-Repo: kafka
X-Git-Refname: refs/heads/trunk
X-Git-Reftype: branch
X-Git-Oldrev: f393b2f7dd477c3a43e70631f7036a211bf5d740
X-Git-Newrev: 21f88a595b5785d8b9b9a17a3fb667b1712f3b61
X-Git-Rev: 21f88a595b5785d8b9b9a17a3fb667b1712f3b61
X-Git-NotificationType: ref_changed_plus_diff
X-Git-Multimail-Version: 1.5.dev
Auto-Submitted: auto-generated

This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 21f88a5 KAFKA-7223: Add late-record metrics (#5742) 21f88a5 is described below commit 21f88a595b5785d8b9b9a17a3fb667b1712f3b61 Author: John Roesler AuthorDate: Fri Oct 12 11:12:51 2018 -0500 KAFKA-7223: Add late-record metrics (#5742) Add late record metrics, as specified in KIP-328 Reviewers: Bill Bejeck , Guozhang Wang --- build.gradle | 1 + gradle/dependencies.gradle | 2 + .../streams/kstream/internals/metrics/Sensors.java | 36 +++++ .../processor/internals/PartitionGroup.java | 12 +- .../streams/processor/internals/StreamTask.java | 3 +- ...KStreamSessionWindowAggregateProcessorTest.java | 29 +++- .../internals/KStreamWindowAggregateTest.java | 170 +++++++++++++-------- .../processor/internals/PartitionGroupTest.java | 103 +++++++------ .../org/apache/kafka/test/StreamsTestUtils.java | 2 + .../apache/kafka/streams/TopologyTestDriver.java | 10 +- 10 files changed, 248 insertions(+), 120 deletions(-) diff --git a/build.gradle b/build.gradle index 95f3eb3..e78d2ce 100644 --- a/build.gradle +++ b/build.gradle @@ -974,6 +974,7 @@ project(':streams') { testCompile libs.junit testCompile libs.easymock testCompile libs.bcpkix + testCompile libs.hamcrest testRuntimeOnly project(':streams:test-utils') testRuntime libs.slf4jlog4j diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index e22885e..e11ded1 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -57,6 +57,7 @@ versions += [ jetty: "9.4.12.v20180830", jersey: "2.27", jmh: "1.21", + hamcrest: "1.3", log4j: "1.2.17", scalaLogging: "3.9.0", jaxb: "2.3.0", @@ -117,6 +118,7 @@ libs += [ jmhGeneratorAnnProcess: "org.openjdk.jmh:jmh-generator-annprocess:$versions.jmh", joptSimple: "net.sf.jopt-simple:jopt-simple:$versions.jopt", junit: "junit:junit:$versions.junit", + hamcrest: "org.hamcrest:hamcrest-all:1.3", kafkaStreams_0100: "org.apache.kafka:kafka-streams:$versions.kafka_0100", kafkaStreams_0101: "org.apache.kafka:kafka-streams:$versions.kafka_0101", kafkaStreams_0102: "org.apache.kafka:kafka-streams:$versions.kafka_0102", diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java index 04c7150..a85bbb8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java @@ -16,10 +16,15 @@ */ package org.apache.kafka.streams.kstream.internals.metrics; +import org.apache.kafka.common.MetricName; import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.Avg; +import org.apache.kafka.common.metrics.stats.Max; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import java.util.Map; + public class Sensors { private Sensors() {} @@ -39,4 +44,35 @@ public class Sensors { ); return sensor; } + + public static Sensor recordLatenessSensor(final InternalProcessorContext context) { + final StreamsMetricsImpl metrics = context.metrics(); + + final Sensor sensor = metrics.taskLevelSensor( + context.taskId().toString(), + "record-lateness", + Sensor.RecordingLevel.DEBUG + ); + + final Map tags = metrics.tagMap( + "task-id", context.taskId().toString() + ); + 
sensor.add( + new MetricName( + "record-lateness-avg", + "stream-processor-node-metrics", + "The average observed lateness of records.", + tags), + new Avg() + ); + sensor.add( + new MetricName( + "record-lateness-max", + "stream-processor-node-metrics", + "The max observed lateness of records.", + tags), + new Max() + ); + return sensor; + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java index 7020253..1fdd454 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.metrics.Sensor; import java.util.Collections; import java.util.Comparator; @@ -38,6 +39,7 @@ import java.util.Set; public class PartitionGroup { private final Map partitionQueues; + private final Sensor recordLatenessSensor; private final PriorityQueue nonEmptyQueuesByTime; private long streamTime; @@ -61,9 +63,10 @@ public class PartitionGroup { } } - PartitionGroup(final Map partitionQueues) { + PartitionGroup(final Map partitionQueues, final Sensor recordLatenessSensor) { nonEmptyQueuesByTime = new PriorityQueue<>(partitionQueues.size(), Comparator.comparingLong(RecordQueue::timestamp)); this.partitionQueues = partitionQueues; + this.recordLatenessSensor = recordLatenessSensor; totalBuffered = 0; allBuffered = false; streamTime = RecordQueue.UNKNOWN; @@ -95,7 +98,12 @@ public class PartitionGroup { } // always update the stream time to the record's timestamp yet to be processed if it is larger - streamTime = Math.max(streamTime, record.timestamp); + if (record.timestamp > streamTime) { + streamTime = record.timestamp; + recordLatenessSensor.record(0); + } else { + recordLatenessSensor.record(streamTime - record.timestamp); + } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 2ad0acc..247a156 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -52,6 +52,7 @@ import java.util.concurrent.TimeUnit; import static java.lang.String.format; import static java.util.Collections.singleton; +import static org.apache.kafka.streams.kstream.internals.metrics.Sensors.recordLatenessSensor; /** * A StreamTask is associated with a {@link PartitionGroup}, and is assigned to a StreamThread for processing. 
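A note on the Sensors and PartitionGroup changes above, before the StreamTask hunk below wires the new sensor in: PartitionGroup now compares each polled record's timestamp against the group's stream time and records the difference on the task-level "record-lateness" sensor (zero whenever the record itself advances stream time), and Sensors.recordLatenessSensor registers the DEBUG-level record-lateness-avg and record-lateness-max metrics for that sensor in the stream-processor-node-metrics group. The sketch below is illustrative rather than committed code; it assumes only KafkaStreams#metrics() and the metric names shown above, and the class and method names are hypothetical.

    import org.apache.kafka.common.Metric;
    import org.apache.kafka.common.MetricName;
    import org.apache.kafka.streams.KafkaStreams;

    import java.util.Map;

    public final class RecordLatenessMetricsSketch {

        // Prints the per-task lateness metrics registered by Sensors.recordLatenessSensor.
        public static void printRecordLateness(final KafkaStreams streams) {
            for (final Map.Entry<MetricName, ? extends Metric> entry : streams.metrics().entrySet()) {
                final MetricName name = entry.getKey();
                if ("stream-processor-node-metrics".equals(name.group())
                    && ("record-lateness-avg".equals(name.name()) || "record-lateness-max".equals(name.name()))) {
                    // One metric per task; tags include client-id and task-id.
                    System.out.println(name.name() + " " + name.tags() + " = " + entry.getValue().metricValue());
                }
            }
        }
    }

Since the sensor is registered at Sensor.RecordingLevel.DEBUG, the application must run with metrics.recording.level=DEBUG (as the StreamsTestUtils change further down does for tests) for these metrics to receive values.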
@@ -234,7 +235,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator } recordInfo = new PartitionGroup.RecordInfo(); - partitionGroup = new PartitionGroup(partitionQueues); + partitionGroup = new PartitionGroup(partitionQueues, recordLatenessSensor(processorContextImpl)); processorContextImpl.setStreamTimeSupplier(partitionGroup::timestamp); stateMgr.registerGlobalStateStores(topology.globalStateStores()); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java index 419c861..1074f02f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.streams.kstream.internals; -import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.serialization.Serdes; @@ -55,7 +54,9 @@ import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.test.StreamsTestUtils.getMetricByName; import static org.hamcrest.CoreMatchers.hasItem; +import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.greaterThan; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -110,7 +111,7 @@ public class KStreamSessionWindowAggregateProcessorTest { final StoreBuilder> storeBuilder = Stores.sessionStoreBuilder(Stores.persistentSessionStore(STORE_NAME, ofMillis(GAP_MS * 3)), Serdes.String(), Serdes.Long()) - .withLoggingDisabled(); + .withLoggingDisabled(); if (enableCaching) { storeBuilder.withCachingEnabled(); @@ -335,9 +336,11 @@ public class KStreamSessionWindowAggregateProcessorTest { context.setStreamTime(20); context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null)); processor.process("A", "1"); + context.setRecordContext(new ProcessorRecordContext(1, -2, -3, "topic", null)); + processor.process("A", "1"); LogCaptureAppender.unregister(appender); - final Metric dropMetric = metrics.metrics().get(new MetricName( + final MetricName dropMetric = new MetricName( "late-record-drop-total", "stream-processor-node-metrics", "The total number of occurrence of late-record-drop operations.", @@ -346,8 +349,24 @@ public class KStreamSessionWindowAggregateProcessorTest { mkEntry("task-id", "0_0"), mkEntry("processor-node-id", "TESTING_NODE") ) - )); - assertEquals(1.0, dropMetric.metricValue()); + ); + + assertThat(metrics.metrics().get(dropMetric).metricValue(), is(2.0)); + + final MetricName dropRate = new MetricName( + "late-record-drop-rate", + "stream-processor-node-metrics", + "The average number of occurrence of late-record-drop operations.", + mkMap( + mkEntry("client-id", "test"), + mkEntry("task-id", "0_0"), + mkEntry("processor-node-id", "TESTING_NODE") + ) + ); + + assertThat((Double) metrics.metrics().get(dropRate).metricValue(), greaterThan(0.0)); + assertThat(appender.getMessages(), hasItem("Skipping record for expired window. 
key=[A] topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0) expiration=[10]")); + assertThat(appender.getMessages(), hasItem("Skipping record for expired window. key=[A] topic=[topic] partition=[-3] offset=[-2] timestamp=[1] window=[1,1) expiration=[10]")); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java index 8ae6284..236cd8c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java @@ -27,6 +27,7 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.TopologyTestDriver; import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Grouped; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; @@ -42,6 +43,7 @@ import org.apache.kafka.test.MockInitializer; import org.apache.kafka.test.MockProcessor; import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.StreamsTestUtils; +import org.hamcrest.Matcher; import org.junit.Test; import java.util.List; @@ -51,9 +53,10 @@ import static java.time.Duration.ofMillis; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.test.StreamsTestUtils.getMetricByName; -import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.hasItem; import static org.hamcrest.CoreMatchers.hasItems; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; @@ -70,7 +73,7 @@ public class KStreamWindowAggregateTest { final KTable, String> table2 = builder .stream(topic1, Consumed.with(Serdes.String(), Serdes.String())) - .groupByKey(Serialized.with(Serdes.String(), Serdes.String())) + .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) .windowedBy(TimeWindows.of(ofMillis(10)).advanceBy(ofMillis(5))) .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Materialized.>as("topic1-Canonized").withValueSerde(Serdes.String())); @@ -128,7 +131,7 @@ public class KStreamWindowAggregateTest { final KTable, String> table1 = builder .stream(topic1, Consumed.with(Serdes.String(), Serdes.String())) - .groupByKey(Serialized.with(Serdes.String(), Serdes.String())) + .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) .windowedBy(TimeWindows.of(ofMillis(10)).advanceBy(ofMillis(5))) .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Materialized.>as("topic1-Canonized").withValueSerde(Serdes.String())); @@ -137,7 +140,7 @@ public class KStreamWindowAggregateTest { final KTable, String> table2 = builder .stream(topic2, Consumed.with(Serdes.String(), Serdes.String())) - .groupByKey(Serialized.with(Serdes.String(), Serdes.String())) + .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) .windowedBy(TimeWindows.of(ofMillis(10)).advanceBy(ofMillis(5))) .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Materialized.>as("topic2-Canonized").withValueSerde(Serdes.String())); @@ 
-231,8 +234,9 @@ public class KStreamWindowAggregateTest { final StreamsBuilder builder = new StreamsBuilder(); final String topic = "topic"; - final KStream stream1 = builder.stream(topic, Consumed.with(Serdes.String(), Serdes.String())); - stream1.groupByKey(Serialized.with(Serdes.String(), Serdes.String())) + builder + .stream(topic, Consumed.with(Serdes.String(), Serdes.String())) + .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) .windowedBy(TimeWindows.of(ofMillis(10)).advanceBy(ofMillis(5))) .aggregate( MockInitializer.STRING_INIT, @@ -258,15 +262,15 @@ public class KStreamWindowAggregateTest { final KStream stream1 = builder.stream(topic, Consumed.with(Serdes.String(), Serdes.String())); stream1.groupByKey(Serialized.with(Serdes.String(), Serdes.String())) - .windowedBy(TimeWindows.of(ofMillis(10)).advanceBy(ofMillis(5)).until(100)) - .aggregate( - () -> "", - MockAggregator.toStringInstance("+"), - Materialized.>as("topic1-Canonicalized").withValueSerde(Serdes.String()).withCachingDisabled().withLoggingDisabled() - ) - .toStream() - .map((key, value) -> new KeyValue<>(key.toString(), value)) - .to("output"); + .windowedBy(TimeWindows.of(ofMillis(10)).advanceBy(ofMillis(5)).until(100)) + .aggregate( + () -> "", + MockAggregator.toStringInstance("+"), + Materialized.>as("topic1-Canonicalized").withValueSerde(Serdes.String()).withCachingDisabled().withLoggingDisabled() + ) + .toStream() + .map((key, value) -> new KeyValue<>(key.toString(), value)) + .to("output"); LogCaptureAppender.setClassLoggerToDebug(KStreamWindowAggregate.class); final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(); @@ -281,17 +285,13 @@ public class KStreamWindowAggregateTest { driver.pipeInput(recordFactory.create(topic, "k", "6", 6L)); LogCaptureAppender.unregister(appender); - final MetricName metricName = new MetricName( - "late-record-drop-total", - "stream-processor-node-metrics", - "The total number of occurrence of late-record-drop operations.", - mkMap( - mkEntry("client-id", "topology-test-driver-virtual-thread"), - mkEntry("task-id", "0_0"), - mkEntry("processor-node-id", "KSTREAM-AGGREGATE-0000000001") - ) + assertLatenessMetrics( + driver, + is(7.0), // how many events get dropped + is(100.0), // k:0 is 100ms late, since its time is 0, but it arrives at stream time 100. + is(84.875) // (0 + 100 + 99 + 98 + 97 + 96 + 95 + 94) / 8 ); - assertThat(driver.metrics().get(metricName).metricValue(), equalTo(7.0)); + assertThat(appender.getMessages(), hasItems( "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[1] timestamp=[0] window=[0,10) expiration=[10]", "Skipping record for expired window. 
key=[k] topic=[topic] partition=[0] offset=[2] timestamp=[1] window=[0,10) expiration=[10]", @@ -316,59 +316,101 @@ public class KStreamWindowAggregateTest { final String topic = "topic"; final KStream stream1 = builder.stream(topic, Consumed.with(Serdes.String(), Serdes.String())); - stream1.groupByKey(Serialized.with(Serdes.String(), Serdes.String())) - .windowedBy(TimeWindows.of(ofMillis(10)).advanceBy(ofMillis(5)).grace(ofMillis(90L))) - .aggregate( - () -> "", - MockAggregator.toStringInstance("+"), - Materialized.>as("topic1-Canonicalized").withValueSerde(Serdes.String()).withCachingDisabled().withLoggingDisabled() - ) - .toStream() - .map((key, value) -> new KeyValue<>(key.toString(), value)) - .to("output"); + stream1.groupByKey(Grouped.with(Serdes.String(), Serdes.String())) + .windowedBy(TimeWindows.of(ofMillis(10)).advanceBy(ofMillis(10)).grace(ofMillis(90L))) + .aggregate( + () -> "", + MockAggregator.toStringInstance("+"), + Materialized.>as("topic1-Canonicalized").withValueSerde(Serdes.String()).withCachingDisabled().withLoggingDisabled() + ) + .toStream() + .map((key, value) -> new KeyValue<>(key.toString(), value)) + .to("output"); LogCaptureAppender.setClassLoggerToDebug(KStreamWindowAggregate.class); final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(); try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) { - driver.pipeInput(recordFactory.create(topic, "k", "100", 100L)); - driver.pipeInput(recordFactory.create(topic, "k", "0", 0L)); - driver.pipeInput(recordFactory.create(topic, "k", "1", 1L)); - driver.pipeInput(recordFactory.create(topic, "k", "2", 2L)); - driver.pipeInput(recordFactory.create(topic, "k", "3", 3L)); - driver.pipeInput(recordFactory.create(topic, "k", "4", 4L)); - driver.pipeInput(recordFactory.create(topic, "k", "5", 5L)); + driver.pipeInput(recordFactory.create(topic, "k", "100", 200L)); + driver.pipeInput(recordFactory.create(topic, "k", "0", 100L)); + driver.pipeInput(recordFactory.create(topic, "k", "1", 101L)); + driver.pipeInput(recordFactory.create(topic, "k", "2", 102L)); + driver.pipeInput(recordFactory.create(topic, "k", "3", 103L)); + driver.pipeInput(recordFactory.create(topic, "k", "4", 104L)); + driver.pipeInput(recordFactory.create(topic, "k", "5", 105L)); driver.pipeInput(recordFactory.create(topic, "k", "6", 6L)); LogCaptureAppender.unregister(appender); - final MetricName metricName = new MetricName( - "late-record-drop-total", - "stream-processor-node-metrics", - "The total number of occurrence of late-record-drop operations.", - mkMap( - mkEntry("client-id", "topology-test-driver-virtual-thread"), - mkEntry("task-id", "0_0"), - mkEntry("processor-node-id", "KSTREAM-AGGREGATE-0000000001") - ) - ); - assertThat(driver.metrics().get(metricName).metricValue(), equalTo(7.0)); + assertLatenessMetrics(driver, is(7.0), is(194.0), is(97.375)); + assertThat(appender.getMessages(), hasItems( - "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[1] timestamp=[0] window=[0,10) expiration=[10]", - "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[2] timestamp=[1] window=[0,10) expiration=[10]", - "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[3] timestamp=[2] window=[0,10) expiration=[10]", - "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[4] timestamp=[3] window=[0,10) expiration=[10]", - "Skipping record for expired window. 
key=[k] topic=[topic] partition=[0] offset=[5] timestamp=[4] window=[0,10) expiration=[10]", - "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[6] timestamp=[5] window=[0,10) expiration=[10]", - "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[7] timestamp=[6] window=[0,10) expiration=[10]" + "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[1] timestamp=[100] window=[100,110) expiration=[110]", + "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[2] timestamp=[101] window=[100,110) expiration=[110]", + "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[3] timestamp=[102] window=[100,110) expiration=[110]", + "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[4] timestamp=[103] window=[100,110) expiration=[110]", + "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[5] timestamp=[104] window=[100,110) expiration=[110]", + "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[6] timestamp=[105] window=[100,110) expiration=[110]", + "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[7] timestamp=[6] window=[0,10) expiration=[110]" )); - OutputVerifier.compareKeyValueTimestamp(getOutput(driver), "[k@95/105]", "+100", 100); - OutputVerifier.compareKeyValueTimestamp(getOutput(driver), "[k@100/110]", "+100", 100); - OutputVerifier.compareKeyValueTimestamp(getOutput(driver), "[k@5/15]", "+5", 5); - OutputVerifier.compareKeyValueTimestamp(getOutput(driver), "[k@5/15]", "+5+6", 6); + OutputVerifier.compareKeyValueTimestamp(getOutput(driver), "[k@200/210]", "+100", 200); assertThat(driver.readOutput("output"), nullValue()); } } + private void assertLatenessMetrics(final TopologyTestDriver driver, + final Matcher dropTotal, + final Matcher maxLateness, + final Matcher avgLateness) { + final MetricName dropMetric = new MetricName( + "late-record-drop-total", + "stream-processor-node-metrics", + "The total number of occurrence of late-record-drop operations.", + mkMap( + mkEntry("client-id", "topology-test-driver-virtual-thread"), + mkEntry("task-id", "0_0"), + mkEntry("processor-node-id", "KSTREAM-AGGREGATE-0000000001") + ) + ); + + assertThat(driver.metrics().get(dropMetric).metricValue(), dropTotal); + + + final MetricName dropRate = new MetricName( + "late-record-drop-rate", + "stream-processor-node-metrics", + "The average number of occurrence of late-record-drop operations.", + mkMap( + mkEntry("client-id", "topology-test-driver-virtual-thread"), + mkEntry("task-id", "0_0"), + mkEntry("processor-node-id", "KSTREAM-AGGREGATE-0000000001") + ) + ); + + assertThat(driver.metrics().get(dropRate).metricValue(), not(0.0)); + + final MetricName latenessMaxMetric = new MetricName( + "record-lateness-max", + "stream-processor-node-metrics", + "The max observed lateness of records.", + mkMap( + mkEntry("client-id", "topology-test-driver-virtual-thread"), + mkEntry("task-id", "0_0") + ) + ); + assertThat(driver.metrics().get(latenessMaxMetric).metricValue(), maxLateness); + + final MetricName latenessAvgMetric = new MetricName( + "record-lateness-avg", + "stream-processor-node-metrics", + "The average observed lateness of records.", + mkMap( + mkEntry("client-id", "topology-test-driver-virtual-thread"), + mkEntry("task-id", "0_0") + ) + ); + assertThat(driver.metrics().get(latenessAvgMetric).metricValue(), avgLateness); + } + private 
ProducerRecord getOutput(final TopologyTestDriver driver) { return driver.readOutput("output", new StringDeserializer(), new StringDeserializer()); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java index 2df4f66..c84bbc2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java @@ -17,7 +17,11 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.MetricName; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.Value; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.IntegerSerializer; @@ -65,7 +69,19 @@ public class PartitionGroupTest { private final byte[] recordValue = intSerializer.serialize(null, 10); private final byte[] recordKey = intSerializer.serialize(null, 1); - private final PartitionGroup group = new PartitionGroup(mkMap(mkEntry(partition1, queue1), mkEntry(partition2, queue2))); + private final Metrics metrics = new Metrics(); + private final MetricName lastLatenessValue = new MetricName("record-lateness-last-value", "", "", mkMap()); + + private final PartitionGroup group = new PartitionGroup( + mkMap(mkEntry(partition1, queue1), mkEntry(partition2, queue2)), + getValueSensor(metrics, lastLatenessValue) + ); + + private static Sensor getValueSensor(final Metrics metrics, final MetricName metricName) { + final Sensor lastRecordedValue = metrics.sensor(metricName.name()); + lastRecordedValue.add(metricName, new Value()); + return lastRecordedValue; + } @Test public void testTimeTracking() { @@ -90,10 +106,9 @@ public class PartitionGroupTest { // 2:[2, 4, 6] // st: -1 since no records was being processed yet - assertEquals(6, group.numBuffered()); - assertEquals(3, group.numBuffered(partition1)); - assertEquals(3, group.numBuffered(partition2)); + verifyBuffered(6, 3, 3); assertEquals(-1L, group.timestamp()); + assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue()); StampedRecord record; final PartitionGroup.RecordInfo info = new PartitionGroup.RecordInfo(); @@ -104,11 +119,9 @@ public class PartitionGroupTest { // 2:[2, 4, 6] // st: 2 assertEquals(partition1, info.partition()); - assertEquals(1L, record.timestamp); - assertEquals(5, group.numBuffered()); - assertEquals(2, group.numBuffered(partition1)); - assertEquals(3, group.numBuffered(partition2)); - assertEquals(1L, group.timestamp()); + verifyTimes(record, 1L, 1L); + verifyBuffered(5, 2, 3); + assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue()); // get one record, now the time should be advanced record = group.nextRecord(info); @@ -116,11 +129,9 @@ public class PartitionGroupTest { // 2:[4, 6] // st: 3 assertEquals(partition2, info.partition()); - assertEquals(2L, record.timestamp); - assertEquals(4, group.numBuffered()); - assertEquals(2, group.numBuffered(partition1)); - assertEquals(2, group.numBuffered(partition2)); - assertEquals(2L, group.timestamp()); + verifyTimes(record, 2L, 2L); + verifyBuffered(4, 2, 2); + assertEquals(0.0, 
metrics.metric(lastLatenessValue).metricValue()); // add 2 more records with timestamp 2, 4 to partition-1 final List> list3 = Arrays.asList( @@ -131,10 +142,9 @@ public class PartitionGroupTest { // 1:[3, 5, 2, 4] // 2:[4, 6] // st: 3 (non-decreasing, so adding 2 doesn't change it) - assertEquals(6, group.numBuffered()); - assertEquals(4, group.numBuffered(partition1)); - assertEquals(2, group.numBuffered(partition2)); + verifyBuffered(6, 4, 2); assertEquals(2L, group.timestamp()); + assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue()); // get one record, time should not be advanced record = group.nextRecord(info); @@ -142,11 +152,9 @@ public class PartitionGroupTest { // 2:[4, 6] // st: 4 as partition st is now {5, 4} assertEquals(partition1, info.partition()); - assertEquals(3L, record.timestamp); - assertEquals(5, group.numBuffered()); - assertEquals(3, group.numBuffered(partition1)); - assertEquals(2, group.numBuffered(partition2)); - assertEquals(3L, group.timestamp()); + verifyTimes(record, 3L, 3L); + verifyBuffered(5, 3, 2); + assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue()); // get one record, time should not be advanced record = group.nextRecord(info); @@ -154,11 +162,9 @@ public class PartitionGroupTest { // 2:[6] // st: 5 as partition st is now {5, 6} assertEquals(partition2, info.partition()); - assertEquals(4L, record.timestamp); - assertEquals(4, group.numBuffered()); - assertEquals(3, group.numBuffered(partition1)); - assertEquals(1, group.numBuffered(partition2)); - assertEquals(4L, group.timestamp()); + verifyTimes(record, 4L, 4L); + verifyBuffered(4, 3, 1); + assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue()); // get one more record, now time should be advanced record = group.nextRecord(info); @@ -166,11 +172,9 @@ public class PartitionGroupTest { // 2:[6] // st: 5 assertEquals(partition1, info.partition()); - assertEquals(5L, record.timestamp); - assertEquals(3, group.numBuffered()); - assertEquals(2, group.numBuffered(partition1)); - assertEquals(1, group.numBuffered(partition2)); - assertEquals(5L, group.timestamp()); + verifyTimes(record, 5L, 5L); + verifyBuffered(3, 2, 1); + assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue()); // get one more record, time should not be advanced record = group.nextRecord(info); @@ -178,11 +182,9 @@ public class PartitionGroupTest { // 2:[6] // st: 5 assertEquals(partition1, info.partition()); - assertEquals(2L, record.timestamp); - assertEquals(2, group.numBuffered()); - assertEquals(1, group.numBuffered(partition1)); - assertEquals(1, group.numBuffered(partition2)); - assertEquals(5L, group.timestamp()); + verifyTimes(record, 2L, 5L); + verifyBuffered(2, 1, 1); + assertEquals(3.0, metrics.metric(lastLatenessValue).metricValue()); // get one more record, time should not be advanced record = group.nextRecord(info); @@ -190,11 +192,9 @@ public class PartitionGroupTest { // 2:[6] // st: 4 (doesn't advance because 1 is empty, so it's still reporting the last-known time of 4) assertEquals(partition1, info.partition()); - assertEquals(4L, record.timestamp); - assertEquals(1, group.numBuffered()); - assertEquals(0, group.numBuffered(partition1)); - assertEquals(1, group.numBuffered(partition2)); - assertEquals(5L, group.timestamp()); + verifyTimes(record, 4L, 5L); + verifyBuffered(1, 0, 1); + assertEquals(1.0, metrics.metric(lastLatenessValue).metricValue()); // get one more record, time should not be advanced record = group.nextRecord(info); @@ -202,11 +202,20 @@ public class 
PartitionGroupTest { // 2:[] // st: 4 (1 and 2 are empty, so they are still reporting the last-known times of 4 and 6.) assertEquals(partition2, info.partition()); - assertEquals(6L, record.timestamp); - assertEquals(0, group.numBuffered()); - assertEquals(0, group.numBuffered(partition1)); - assertEquals(0, group.numBuffered(partition2)); - assertEquals(6L, group.timestamp()); + verifyTimes(record, 6L, 6L); + verifyBuffered(0, 0, 0); + assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue()); + + } + + private void verifyTimes(final StampedRecord record, final long recordTime, final long streamTime) { + assertEquals(recordTime, record.timestamp); + assertEquals(streamTime, group.timestamp()); + } + private void verifyBuffered(final int totalBuffered, final int partitionOneBuffered, final int partitionTwoBuffered) { + assertEquals(totalBuffered, group.numBuffered()); + assertEquals(partitionOneBuffered, group.numBuffered(partition1)); + assertEquals(partitionTwoBuffered, group.numBuffered(partition2)); } } diff --git a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java index 1d64316..1dcebb5 100644 --- a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java @@ -32,6 +32,7 @@ import java.util.Map; import java.util.Properties; import java.util.UUID; +import static org.apache.kafka.common.metrics.Sensor.RecordingLevel.DEBUG; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; @@ -50,6 +51,7 @@ public final class StreamsTestUtils { props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, keySerdeClassName); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, valueSerdeClassName); props.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); + props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, DEBUG.name); props.putAll(additional); return props; } diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java index d10a45c..2abfd63 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java @@ -29,7 +29,9 @@ import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.annotation.InterfaceStability; import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.Deserializer; @@ -255,7 +257,13 @@ public class TopologyTestDriver implements Closeable { final MockConsumer consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); stateDirectory = new StateDirectory(streamsConfig, mockWallClockTime); - metrics = new Metrics(); + + final MetricConfig metricConfig = new MetricConfig() + .samples(streamsConfig.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG)) + .recordLevel(Sensor.RecordingLevel.forName(streamsConfig.getString(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG))) + 
.timeWindow(streamsConfig.getLong(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS); + + metrics = new Metrics(metricConfig, mockWallClockTime); final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl( metrics, "topology-test-driver-virtual-thread"
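Closing note on the TopologyTestDriver change above: the driver used to build its Metrics registry with a default MetricConfig, which records only INFO-level sensors, so DEBUG-level sensors such as the new record-lateness one stayed silent in tests. It now derives the number of samples, the recording level, and the sample window from the supplied StreamsConfig properties and uses the mocked wall-clock time, which is why StreamsTestUtils.getStreamsConfig above adds metrics.recording.level=DEBUG. Below is a minimal, illustrative sketch of exercising this through the test-utils API; the topology, class name, and property values are assumptions, not part of the commit.

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.TopologyTestDriver;
    import org.apache.kafka.streams.test.ConsumerRecordFactory;

    import java.util.Properties;

    public final class DriverLatenessMetricsSketch {

        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "lateness-metrics-sketch");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // never contacted by the driver
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
            // Without this, the DEBUG-level record-lateness sensor records nothing.
            props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");

            final StreamsBuilder builder = new StreamsBuilder();
            builder.stream("input").to("output"); // trivial topology, enough to create a task

            final ConsumerRecordFactory<String, String> factory =
                new ConsumerRecordFactory<>("input", new StringSerializer(), new StringSerializer());

            try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) {
                driver.pipeInput(factory.create("input", "a", "1", 10L));
                driver.pipeInput(factory.create("input", "a", "2", 5L)); // 5 behind the stream time of 10

                // Task-level metrics carry the tags client-id=topology-test-driver-virtual-thread and task-id=0_0.
                driver.metrics().forEach((name, metric) -> {
                    if (name.name().startsWith("record-lateness")) {
                        System.out.println(name.name() + " = " + metric.metricValue());
                    }
                });
            }
        }
    }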