kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bbej...@apache.org
Subject [kafka] branch 2.1 updated: KAFKA-8289: Fix Session Expiration and Suppression (#6654) (#6671)
Date Sat, 04 May 2019 11:26:35 GMT
This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new c6acaaa  KAFKA-8289: Fix Session Expiration and Suppression (#6654) (#6671)
c6acaaa is described below

commit c6acaaa469b20c385a7b5dbdcbb7a810a24881ab
Author: John Roesler <vvcephei@users.noreply.github.com>
AuthorDate: Sat May 4 06:26:10 2019 -0500

    KAFKA-8289: Fix Session Expiration and Suppression (#6654) (#6671)
    
    Session windows expired prematurely (off-by-one error), since the window end is inclusive, unlike other windows.
    The suppression duration for sessions incorrectly waited only for the grace period, but session windows aren't closed until gracePeriod + sessionGap.
    
    cherry-pick of 6654 from trunk
    
    Reviewers: Bill Bejeck <bill@confluent.io>
---
 .../internals/KStreamSessionWindowAggregate.java   | 28 +++----
 .../internals/graph/GraphGraceSearchUtil.java      |  2 +-
 ...KStreamSessionWindowAggregateProcessorTest.java | 96 ++++++++++++++++++++--
 .../kstream/internals/SuppressScenarioTest.java    | 33 ++++----
 .../internals/graph/GraphGraceSearchUtilTest.java  | 10 +--
 5 files changed, 125 insertions(+), 44 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
index 707ad91..368e2c1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
@@ -134,19 +134,7 @@ public class KStreamSessionWindowAggregate<K, V, Agg> implements
KStreamAggProce
                 }
             }
 
-            if (mergedWindow.end() > closeTime) {
-                if (!mergedWindow.equals(newSessionWindow)) {
-                    for (final KeyValue<Windowed<K>, Agg> session : merged) {
-                        store.remove(session.key);
-                        tupleForwarder.maybeForward(session.key, null, session.value);
-                    }
-                }
-
-                agg = aggregator.apply(key, value, agg);
-                final Windowed<K> sessionKey = new Windowed<>(key, mergedWindow);
-                store.put(sessionKey, agg);
-                tupleForwarder.maybeForward(sessionKey, agg, null);
-            } else {
+            if (mergedWindow.end() < closeTime) {
                 LOG.debug(
                     "Skipping record for expired window. " +
                         "key=[{}] " +
@@ -154,7 +142,7 @@ public class KStreamSessionWindowAggregate<K, V, Agg> implements
KStreamAggProce
                         "partition=[{}] " +
                         "offset=[{}] " +
                         "timestamp=[{}] " +
-                        "window=[{},{}) " +
+                        "window=[{},{}] " +
                         "expiration=[{}] " +
                         "streamTime=[{}]",
                     key,
@@ -168,6 +156,18 @@ public class KStreamSessionWindowAggregate<K, V, Agg> implements
KStreamAggProce
                     observedStreamTime
                 );
                 lateRecordDropSensor.record();
+            } else {
+                if (!mergedWindow.equals(newSessionWindow)) {
+                    for (final KeyValue<Windowed<K>, Agg> session : merged) {
+                        store.remove(session.key);
+                        tupleForwarder.maybeForward(session.key, null, session.value);
+                    }
+                }
+
+                agg = aggregator.apply(key, value, agg);
+                final Windowed<K> sessionKey = new Windowed<>(key, mergedWindow);
+                store.put(sessionKey, agg);
+                tupleForwarder.maybeForward(sessionKey, agg, null);
             }
         }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtil.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtil.java
index 306ddf5..2fb28dd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtil.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtil.java
@@ -78,7 +78,7 @@ public final class GraphGraceSearchUtil {
             } else if (processorSupplier instanceof KStreamSessionWindowAggregate) {
                 final KStreamSessionWindowAggregate kStreamSessionWindowAggregate = (KStreamSessionWindowAggregate)
processorSupplier;
                 final SessionWindows windows = kStreamSessionWindowAggregate.windows();
-                return windows.gracePeriodMs();
+                return windows.gracePeriodMs() + windows.inactivityGap();
             } else {
                 return null;
             }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index c93dc60..bf61fd2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -320,11 +320,11 @@ public class KStreamSessionWindowAggregateProcessorTest {
     }
 
     @Test
-    public void shouldLogAndMeterWhenSkippingLateRecord() {
+    public void shouldLogAndMeterWhenSkippingLateRecordWithZeroGrace() {
         LogCaptureAppender.setClassLoggerToDebug(KStreamSessionWindowAggregate.class);
         final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
         final Processor<String, String> processor = new KStreamSessionWindowAggregate<>(
-            SessionWindows.with(ofMillis(10L)).grace(ofMillis(10L)),
+            SessionWindows.with(ofMillis(10L)).grace(ofMillis(0L)),
             STORE_NAME,
             initializer,
             aggregator,
@@ -334,14 +334,21 @@ public class KStreamSessionWindowAggregateProcessorTest {
         initStore(false);
         processor.init(context);
 
-        // dummy record to advance stream time
-        context.setRecordContext(new ProcessorRecordContext(20, -2, -3, "topic", null));
+        // dummy record to establish stream time = 0
+        context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null));
         processor.process("dummy", "dummy");
 
+        // record arrives on time, should not be skipped
         context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null));
-        processor.process("A", "1");
+        processor.process("OnTime1", "1");
+
+        // dummy record to advance stream time = 1
         context.setRecordContext(new ProcessorRecordContext(1, -2, -3, "topic", null));
-        processor.process("A", "1");
+        processor.process("dummy", "dummy");
+
+        // record is late
+        context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null));
+        processor.process("Late1", "1");
         LogCaptureAppender.unregister(appender);
 
         final MetricName dropMetric = new MetricName(
@@ -355,7 +362,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
             )
         );
 
-        assertThat(metrics.metrics().get(dropMetric).metricValue(), is(2.0));
+        assertThat(metrics.metrics().get(dropMetric).metricValue(), is(1.0));
 
         final MetricName dropRate = new MetricName(
             "late-record-drop-rate",
@@ -373,9 +380,80 @@ public class KStreamSessionWindowAggregateProcessorTest {
             greaterThan(0.0));
         assertThat(
             appender.getMessages(),
-            hasItem("Skipping record for expired window. key=[A] topic=[topic] partition=[-3]
offset=[-2] timestamp=[0] window=[0,0) expiration=[10] streamTime=[20]"));
+            hasItem("Skipping record for expired window. key=[Late1] topic=[topic] partition=[-3]
offset=[-2] timestamp=[0] window=[0,0] expiration=[1] streamTime=[1]"));
+    }
+
+    @Test
+    public void shouldLogAndMeterWhenSkippingLateRecordWithNonzeroGrace() {
+        LogCaptureAppender.setClassLoggerToDebug(KStreamSessionWindowAggregate.class);
+        final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
+        final Processor<String, String> processor = new KStreamSessionWindowAggregate<>(
+            SessionWindows.with(ofMillis(10L)).grace(ofMillis(1L)),
+            STORE_NAME,
+            initializer,
+            aggregator,
+            sessionMerger
+        ).get();
+
+        initStore(false);
+        processor.init(context);
+
+        // dummy record to establish stream time = 0
+        context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null));
+        processor.process("dummy", "dummy");
+
+        // record arrives on time, should not be skipped
+        context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null));
+        processor.process("OnTime1", "1");
+
+        // dummy record to advance stream time = 1
+        context.setRecordContext(new ProcessorRecordContext(1, -2, -3, "topic", null));
+        processor.process("dummy", "dummy");
+
+        // delayed record arrives on time, should not be skipped
+        context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null));
+        processor.process("OnTime2", "1");
+
+        // dummy record to advance stream time = 2
+        context.setRecordContext(new ProcessorRecordContext(2, -2, -3, "topic", null));
+        processor.process("dummy", "dummy");
+
+        // delayed record arrives late
+        context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null));
+        processor.process("Late1", "1");
+
+
+        LogCaptureAppender.unregister(appender);
+
+        final MetricName dropMetric = new MetricName(
+            "late-record-drop-total",
+            "stream-processor-node-metrics",
+            "The total number of occurrence of late-record-drop operations.",
+            mkMap(
+                mkEntry("client-id", "test"),
+                mkEntry("task-id", "0_0"),
+                mkEntry("processor-node-id", "TESTING_NODE")
+            )
+        );
+
+        assertThat(metrics.metrics().get(dropMetric).metricValue(), is(1.0));
+
+        final MetricName dropRate = new MetricName(
+            "late-record-drop-rate",
+            "stream-processor-node-metrics",
+            "The average number of occurrence of late-record-drop operations.",
+            mkMap(
+                mkEntry("client-id", "test"),
+                mkEntry("task-id", "0_0"),
+                mkEntry("processor-node-id", "TESTING_NODE")
+            )
+        );
+
+        assertThat(
+            (Double) metrics.metrics().get(dropRate).metricValue(),
+            greaterThan(0.0));
         assertThat(
             appender.getMessages(),
-            hasItem("Skipping record for expired window. key=[A] topic=[topic] partition=[-3]
offset=[-2] timestamp=[1] window=[1,1) expiration=[10] streamTime=[20]"));
+            hasItem("Skipping record for expired window. key=[Late1] topic=[topic] partition=[-3]
offset=[-2] timestamp=[0] window=[0,0] expiration=[1] streamTime=[2]"));
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
index 65c51fc..361677d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
@@ -46,7 +46,6 @@ import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.streams.test.OutputVerifier;
 import org.junit.Test;
 
-import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -310,7 +309,7 @@ public class SuppressScenarioTest {
             .count();
         valueCounts
             // this is a bit brittle, but I happen to know that the entries are a little
over 100 bytes in size.
-            .suppress(untilTimeLimit(Duration.ofMillis(Long.MAX_VALUE), maxBytes(200L).emitEarlyWhenFull()))
+            .suppress(untilTimeLimit(ofMillis(Long.MAX_VALUE), maxBytes(200L).emitEarlyWhenFull()))
             .toStream()
             .to("output-suppressed", Produced.with(STRING_SERDE, Serdes.Long()));
         valueCounts
@@ -481,7 +480,7 @@ public class SuppressScenarioTest {
         final KTable<Windowed<String>, Long> valueCounts = builder
             .stream("input", Consumed.with(STRING_SERDE, STRING_SERDE))
             .groupBy((String k, String v) -> k, Grouped.with(STRING_SERDE, STRING_SERDE))
-            .windowedBy(SessionWindows.with(5L).grace(ofMillis(5L)))
+            .windowedBy(SessionWindows.with(ofMillis(5L)).grace(ofMillis(0L)))
             .count(Materialized.<String, Long, SessionStore<Bytes, byte[]>>as("counts").withCachingDisabled());
         valueCounts
             .suppress(untilWindowCloses(unbounded()))
@@ -502,34 +501,38 @@ public class SuppressScenarioTest {
         try (final TopologyTestDriver driver = new TopologyTestDriver(topology, config))
{
             // first window
             driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L));
+            driver.pipeInput(recordFactory.create("input", "k1", "v1", 5L));
+            // arbitrarily disordered records are admitted, because the *window* is not closed
until stream-time > window-end + grace
             driver.pipeInput(recordFactory.create("input", "k1", "v1", 1L));
-            // new window
-            driver.pipeInput(recordFactory.create("input", "k1", "v1", 7L));
+            // any record in the same partition advances stream time (note the key is different)
+            driver.pipeInput(recordFactory.create("input", "k2", "v1", 6L));
             // late event for first window - this should get dropped from all streams, since
the first window is now closed.
-            driver.pipeInput(recordFactory.create("input", "k1", "v1", 1L));
+            driver.pipeInput(recordFactory.create("input", "k1", "v1", 5L));
             // just pushing stream time forward to flush the other events through.
             driver.pipeInput(recordFactory.create("input", "k1", "v1", 30L));
             verify(
                 drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER),
                 asList(
                     new KeyValueTimestamp<>("[k1@0/0]", 1L, 0L),
-                    new KeyValueTimestamp<>("[k1@0/0]", null, 1L),
-                    new KeyValueTimestamp<>("[k1@0/1]", 2L, 1L),
-                    new KeyValueTimestamp<>("[k1@7/7]", 1L, 7L),
+                    new KeyValueTimestamp<>("[k1@0/0]", null, 5L),
+                    new KeyValueTimestamp<>("[k1@0/5]", 2L, 5L),
+                    new KeyValueTimestamp<>("[k1@0/5]", null, 1L),
+                    new KeyValueTimestamp<>("[k1@0/5]", 3L, 1L),
+                    new KeyValueTimestamp<>("[k2@6/6]", 1L, 6L),
                     new KeyValueTimestamp<>("[k1@30/30]", 1L, 30L)
                 )
             );
             verify(
                 drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER),
                 asList(
-                    new KeyValueTimestamp<>("[k1@0/1]", 2L, 1L),
-                    new KeyValueTimestamp<>("[k1@7/7]", 1L, 7L)
+                    new KeyValueTimestamp<>("[k1@0/5]", 3L, 1L),
+                    new KeyValueTimestamp<>("[k2@6/6]", 1L, 6L)
                 )
             );
         }
     }
 
-    private <K, V> void verify(final List<ProducerRecord<K, V>> results,
final List<KeyValueTimestamp<K, V>> expectedResults) {
+    private static <K, V> void verify(final List<ProducerRecord<K, V>>
results, final List<KeyValueTimestamp<K, V>> expectedResults) {
         if (results.size() != expectedResults.size()) {
             throw new AssertionError(printRecords(results) + " != " + expectedResults);
         }
@@ -544,7 +547,7 @@ public class SuppressScenarioTest {
         }
     }
 
-    private <K, V> List<ProducerRecord<K, V>> drainProducerRecords(final
TopologyTestDriver driver, final String topic, final Deserializer<K> keyDeserializer,
final Deserializer<V> valueDeserializer) {
+    private static <K, V> List<ProducerRecord<K, V>> drainProducerRecords(final
TopologyTestDriver driver, final String topic, final Deserializer<K> keyDeserializer,
final Deserializer<V> valueDeserializer) {
         final List<ProducerRecord<K, V>> result = new LinkedList<>();
         for (ProducerRecord<K, V> next = driver.readOutput(topic, keyDeserializer,
valueDeserializer);
              next != null;
@@ -554,11 +557,11 @@ public class SuppressScenarioTest {
         return new ArrayList<>(result);
     }
 
-    private <K, V> String printRecords(final List<ProducerRecord<K, V>>
result) {
+    private static <K, V> String printRecords(final List<ProducerRecord<K, V>>
result) {
         final StringBuilder resultStr = new StringBuilder();
         resultStr.append("[\n");
         for (final ProducerRecord<?, ?> record : result) {
-            resultStr.append("  ").append(record.toString()).append("\n");
+            resultStr.append("  ").append(record).append("\n");
         }
         resultStr.append("]");
         return resultStr.toString();
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
index 5e426d9..45fe845 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
@@ -121,11 +121,11 @@ public class GraphGraceSearchUtilTest {
         );
 
         final long extracted = GraphGraceSearchUtil.findAndVerifyWindowGrace(node);
-        assertThat(extracted, is(windows.gracePeriodMs()));
+        assertThat(extracted, is(windows.gracePeriodMs() + windows.inactivityGap()));
     }
 
     @Test
-    public void shouldExtractGraceFromAncestorThroughStatefulParent() {
+    public void shouldExtractGraceFromSessionAncestorThroughStatefulParent() {
         final SessionWindows windows = SessionWindows.with(ofMillis(10L)).grace(ofMillis(1234L));
         final StatefulProcessorNode<String, Long> graceGrandparent = new StatefulProcessorNode<>(
             "asdf",
@@ -160,11 +160,11 @@ public class GraphGraceSearchUtilTest {
         statefulParent.addChild(node);
 
         final long extracted = GraphGraceSearchUtil.findAndVerifyWindowGrace(node);
-        assertThat(extracted, is(windows.gracePeriodMs()));
+        assertThat(extracted, is(windows.gracePeriodMs() + windows.inactivityGap()));
     }
 
     @Test
-    public void shouldExtractGraceFromAncestorThroughStatelessParent() {
+    public void shouldExtractGraceFromSessionAncestorThroughStatelessParent() {
         final SessionWindows windows = SessionWindows.with(ofMillis(10L)).grace(ofMillis(1234L));
         final StatefulProcessorNode<String, Long> graceGrandparent = new StatefulProcessorNode<>(
             "asdf",
@@ -189,7 +189,7 @@ public class GraphGraceSearchUtilTest {
         statelessParent.addChild(node);
 
         final long extracted = GraphGraceSearchUtil.findAndVerifyWindowGrace(node);
-        assertThat(extracted, is(windows.gracePeriodMs()));
+        assertThat(extracted, is(windows.gracePeriodMs() + windows.inactivityGap()));
     }
 
     @Test


Mime
View raw message