kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch 3.0 updated: HOTIFX: Disable spurious left/outer stream-stream join fix (#11233)
Date Mon, 23 Aug 2021 16:47:42 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 8a81b17  HOTIFX: Disable spurious left/outer stream-stream join fix (#11233)
8a81b17 is described below

commit 8a81b175d9dd694e82804fc77dacff86531af1e7
Author: Matthias J. Sax <matthias@confluent.io>
AuthorDate: Mon Aug 23 09:45:49 2021 -0700

    HOTIFX: Disable spurious left/outer stream-stream join fix (#11233)
    
    KAFKA-10847 improves stream-stream left/outer joins to avoid spurious
    left/outer join results. However, it introduces regression bug
    KAFKA-13216.
    
    This PR disables KAFKA-10847 by partially rolling back KIP-633 changes.
---
 .../apache/kafka/streams/kstream/JoinWindows.java  |  46 -----
 .../apache/kafka/streams/StreamsBuilderTest.java   |  20 +-
 .../org/apache/kafka/streams/TopologyTest.java     |  36 ++--
 .../StreamStreamJoinIntegrationTest.java           |  36 ++--
 .../kafka/streams/kstream/JoinWindowsTest.java     |  22 --
 .../KStreamImplValueJoinerWithKeyTest.java         |   9 +-
 .../kstream/internals/KStreamKStreamJoinTest.java  |  15 +-
 .../internals/KStreamKStreamLeftJoinTest.java      | 151 ++++++--------
 .../internals/KStreamKStreamOuterJoinTest.java     | 223 ++++++++-------------
 .../apache/kafka/streams/scala/TopologyTest.scala  |   8 +-
 .../kafka/streams/scala/kstream/KStreamTest.scala  |   2 +-
 11 files changed, 214 insertions(+), 354 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
index 1b26bcc..f353739 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
@@ -104,55 +104,11 @@ public class JoinWindows extends Windows<Window> {
      * Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference},
      * i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} before or after
      * the timestamp of the record from the primary stream.
-     * <p>
-     * Using this method explicitly sets the grace period to the duration specified by {@code afterWindowEnd}, which
-     * means that only out-of-order records arriving more than the grace period after the window end will be dropped.
-     * The window close, after which any incoming records are considered late and will be rejected, is defined as
-     * {@code windowEnd + afterWindowEnd}
-     *
-     * @param timeDifference join window interval
-     * @param afterWindowEnd The grace period to admit out-of-order events to a window.
-     * @return A new JoinWindows object with the specified window definition and grace period
-     * @throws IllegalArgumentException if {@code timeDifference} is negative or can't be represented as {@code long milliseconds}
-     *                                  if the {@code afterWindowEnd} is negative or can't be represented as {@code long milliseconds}
-     */
-    public static JoinWindows ofTimeDifferenceAndGrace(final Duration timeDifference, final Duration afterWindowEnd) {
-        final String timeDifferenceMsgPrefix = prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
-        final long timeDifferenceMs = validateMillisecondDuration(timeDifference, timeDifferenceMsgPrefix);
-
-        final String afterWindowEndMsgPrefix = prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
-        final long afterWindowEndMs = validateMillisecondDuration(afterWindowEnd, afterWindowEndMsgPrefix);
-
-        return new JoinWindows(timeDifferenceMs, timeDifferenceMs, afterWindowEndMs, true);
-    }
-
-    /**
-     * Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference},
-     * i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} before or after
-     * the timestamp of the record from the primary stream.
-     * <p>
-     * CAUTION: Using this method implicitly sets the grace period to zero, which means that any out-of-order
-     * records arriving after the window ends are considered late and will be dropped.
-     *
-     * @param timeDifference join window interval
-     * @return a new JoinWindows object with the window definition and no grace period. Note that this means out-of-order records arriving after the window end will be dropped
-     * @throws IllegalArgumentException if {@code timeDifference} is negative or can't be represented as {@code long milliseconds}
-     */
-    public static JoinWindows ofTimeDifferenceWithNoGrace(final Duration timeDifference) {
-        return ofTimeDifferenceAndGrace(timeDifference, Duration.ofMillis(NO_GRACE_PERIOD));
-    }
-
-    /**
-     * Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference},
-     * i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} before or after
-     * the timestamp of the record from the primary stream.
      *
      * @param timeDifference join window interval
      * @return a new JoinWindows object with the window definition with and grace period (default to 24 hours minus {@code timeDifference})
      * @throws IllegalArgumentException if {@code timeDifference} is negative or can't be represented as {@code long milliseconds}
-     * @deprecated since 3.0. Use {@link #ofTimeDifferenceWithNoGrace(Duration)}} instead
      */
-    @Deprecated
     public static JoinWindows of(final Duration timeDifference) throws IllegalArgumentException {
         final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
         final long timeDifferenceMs = validateMillisecondDuration(timeDifference, msgPrefix);
@@ -216,9 +172,7 @@ public class JoinWindows extends Windows<Window> {
      * @param afterWindowEnd The grace period to admit out-of-order events to a window.
      * @return this updated builder
      * @throws IllegalArgumentException if the {@code afterWindowEnd} is negative or can't be represented as {@code long milliseconds}
-     * @deprecated since 3.0. Use {@link #ofTimeDifferenceAndGrace(Duration, Duration)} instead
      */
-    @Deprecated
     public JoinWindows grace(final Duration afterWindowEnd) throws IllegalArgumentException {
         //TODO KAFKA-13021: disallow calling grace() if it was already set via ofTimeDifferenceAndGrace/WithNoGrace()
         final String msgPrefix = prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index e18b972..2f0dedd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -745,7 +745,7 @@ public class StreamsBuilderTest {
         streamOne.leftJoin(
             streamTwo,
             (value1, value2) -> value1,
-            JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(1)),
+            JoinWindows.of(Duration.ofHours(1)).grace(Duration.ZERO),
             StreamJoined.<String, String, String>as(STREAM_OPERATION_NAME)
                 .withName(STREAM_OPERATION_NAME)
         );
@@ -754,8 +754,7 @@ public class StreamsBuilderTest {
         final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).buildTopology();
         assertNamesForStateStore(topology.stateStores(),
             STREAM_OPERATION_NAME + "-this-join-store",
-            STREAM_OPERATION_NAME + "-outer-other-join-store",
-            STREAM_OPERATION_NAME + "-left-shared-join-store"
+            STREAM_OPERATION_NAME + "-outer-other-join-store"
         );
         assertNamesForOperation(topology,
                                 "KSTREAM-SOURCE-0000000000",
@@ -775,7 +774,7 @@ public class StreamsBuilderTest {
         streamOne.leftJoin(
             streamTwo,
             (value1, value2) -> value1,
-            JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(1)),
+            JoinWindows.of(Duration.ofHours(1)).grace(Duration.ZERO),
             StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String())
                 .withName(STREAM_OPERATION_NAME)
         );
@@ -784,8 +783,7 @@ public class StreamsBuilderTest {
         final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).buildTopology();
         assertNamesForStateStore(topology.stateStores(),
                                  "KSTREAM-JOINTHIS-0000000004-store",
-                                 "KSTREAM-OUTEROTHER-0000000005-store",
-                                 "KSTREAM-OUTERSHARED-0000000004-store"
+                                 "KSTREAM-OUTEROTHER-0000000005-store"
         );
         assertNamesForOperation(topology,
                                 "KSTREAM-SOURCE-0000000000",
@@ -851,7 +849,7 @@ public class StreamsBuilderTest {
         streamOne.outerJoin(
             streamTwo,
             (value1, value2) -> value1,
-            JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(1)),
+            JoinWindows.of(Duration.ofHours(1)).grace(Duration.ZERO),
             StreamJoined.<String, String, String>as(STREAM_OPERATION_NAME)
                 .withName(STREAM_OPERATION_NAME)
         );
@@ -859,8 +857,7 @@ public class StreamsBuilderTest {
         final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).buildTopology();
         assertNamesForStateStore(topology.stateStores(),
                                  STREAM_OPERATION_NAME + "-outer-this-join-store",
-                                 STREAM_OPERATION_NAME + "-outer-other-join-store",
-                                 STREAM_OPERATION_NAME + "-outer-shared-join-store");
+                                 STREAM_OPERATION_NAME + "-outer-other-join-store");
         assertNamesForOperation(topology,
                                 "KSTREAM-SOURCE-0000000000",
                                 "KSTREAM-SOURCE-0000000001",
@@ -880,7 +877,7 @@ public class StreamsBuilderTest {
         streamOne.outerJoin(
             streamTwo,
             (value1, value2) -> value1,
-            JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(1)),
+            JoinWindows.of(Duration.ofHours(1)).grace(Duration.ZERO),
             StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String())
                 .withName(STREAM_OPERATION_NAME)
         );
@@ -889,8 +886,7 @@ public class StreamsBuilderTest {
         final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).buildTopology();
         assertNamesForStateStore(topology.stateStores(),
                                  "KSTREAM-OUTERTHIS-0000000004-store",
-                                 "KSTREAM-OUTEROTHER-0000000005-store",
-                                 "KSTREAM-OUTERSHARED-0000000004-store"
+                                 "KSTREAM-OUTEROTHER-0000000005-store"
         );
         assertNamesForOperation(topology,
                                 "KSTREAM-SOURCE-0000000000",
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index b332f6c..45f7c62 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -853,7 +853,7 @@ public class TopologyTest {
         stream1.leftJoin(
             stream2,
             MockValueJoiner.TOSTRING_JOINER,
-            JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)),
+            JoinWindows.of(ofMillis(100)).grace(Duration.ZERO),
             StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
 
         final TopologyDescription describe = builder.build().describe();
@@ -871,10 +871,10 @@ public class TopologyTest {
                 "    Processor: KSTREAM-WINDOWED-0000000003 (stores: [KSTREAM-OUTEROTHER-0000000005-store])\n" +
                 "      --> KSTREAM-OUTEROTHER-0000000005\n" +
                 "      <-- KSTREAM-SOURCE-0000000001\n" +
-                "    Processor: KSTREAM-JOINTHIS-0000000004 (stores: [KSTREAM-OUTEROTHER-0000000005-store, KSTREAM-OUTERSHARED-0000000004-store])\n" +
+                "    Processor: KSTREAM-JOINTHIS-0000000004 (stores: [KSTREAM-OUTEROTHER-0000000005-store])\n" +
                 "      --> KSTREAM-MERGE-0000000006\n" +
                 "      <-- KSTREAM-WINDOWED-0000000002\n" +
-                "    Processor: KSTREAM-OUTEROTHER-0000000005 (stores: [KSTREAM-JOINTHIS-0000000004-store, KSTREAM-OUTERSHARED-0000000004-store])\n" +
+                "    Processor: KSTREAM-OUTEROTHER-0000000005 (stores: [KSTREAM-JOINTHIS-0000000004-store])\n" +
                 "      --> KSTREAM-MERGE-0000000006\n" +
                 "      <-- KSTREAM-WINDOWED-0000000003\n" +
                 "    Processor: KSTREAM-MERGE-0000000006 (stores: [])\n" +
@@ -895,7 +895,7 @@ public class TopologyTest {
         stream1.leftJoin(
             stream2,
             MockValueJoiner.TOSTRING_JOINER,
-            JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)),
+            JoinWindows.of(ofMillis(100)).grace(Duration.ZERO),
             StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
                 .withStoreName("custom-name"));
 
@@ -914,10 +914,10 @@ public class TopologyTest {
                 "    Processor: KSTREAM-WINDOWED-0000000003 (stores: [custom-name-outer-other-join-store])\n" +
                 "      --> KSTREAM-OUTEROTHER-0000000005\n" +
                 "      <-- KSTREAM-SOURCE-0000000001\n" +
-                "    Processor: KSTREAM-JOINTHIS-0000000004 (stores: [custom-name-outer-other-join-store, custom-name-left-shared-join-store])\n" +
+                "    Processor: KSTREAM-JOINTHIS-0000000004 (stores: [custom-name-outer-other-join-store])\n" +
                 "      --> KSTREAM-MERGE-0000000006\n" +
                 "      <-- KSTREAM-WINDOWED-0000000002\n" +
-                "    Processor: KSTREAM-OUTEROTHER-0000000005 (stores: [custom-name-this-join-store, custom-name-left-shared-join-store])\n" +
+                "    Processor: KSTREAM-OUTEROTHER-0000000005 (stores: [custom-name-this-join-store])\n" +
                 "      --> KSTREAM-MERGE-0000000006\n" +
                 "      <-- KSTREAM-WINDOWED-0000000003\n" +
                 "    Processor: KSTREAM-MERGE-0000000006 (stores: [])\n" +
@@ -935,7 +935,7 @@ public class TopologyTest {
         stream1 = builder.stream("input-topic1");
         stream2 = builder.stream("input-topic2");
 
-        final JoinWindows joinWindows = JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100));
+        final JoinWindows joinWindows = JoinWindows.of(ofMillis(100)).grace(Duration.ZERO);
 
         final WindowBytesStoreSupplier thisStoreSupplier = Stores.inMemoryWindowStore("in-memory-join-store",
             Duration.ofMillis(joinWindows.size() + joinWindows.gracePeriodMs()),
@@ -968,10 +968,10 @@ public class TopologyTest {
                 "    Processor: KSTREAM-WINDOWED-0000000003 (stores: [in-memory-join-store-other])\n" +
                 "      --> KSTREAM-OUTEROTHER-0000000005\n" +
                 "      <-- KSTREAM-SOURCE-0000000001\n" +
-                "    Processor: KSTREAM-JOINTHIS-0000000004 (stores: [in-memory-join-store-other, in-memory-join-store-left-shared-join-store])\n" +
+                "    Processor: KSTREAM-JOINTHIS-0000000004 (stores: [in-memory-join-store-other])\n" +
                 "      --> KSTREAM-MERGE-0000000006\n" +
                 "      <-- KSTREAM-WINDOWED-0000000002\n" +
-                "    Processor: KSTREAM-OUTEROTHER-0000000005 (stores: [in-memory-join-store, in-memory-join-store-left-shared-join-store])\n" +
+                "    Processor: KSTREAM-OUTEROTHER-0000000005 (stores: [in-memory-join-store])\n" +
                 "      --> KSTREAM-MERGE-0000000006\n" +
                 "      <-- KSTREAM-WINDOWED-0000000003\n" +
                 "    Processor: KSTREAM-MERGE-0000000006 (stores: [])\n" +
@@ -992,7 +992,7 @@ public class TopologyTest {
         stream1.outerJoin(
             stream2,
             MockValueJoiner.TOSTRING_JOINER,
-            JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)),
+            JoinWindows.of(ofMillis(100)).grace(Duration.ZERO),
             StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
 
         final TopologyDescription describe = builder.build().describe();
@@ -1010,10 +1010,10 @@ public class TopologyTest {
                 "    Processor: KSTREAM-WINDOWED-0000000003 (stores: [KSTREAM-OUTEROTHER-0000000005-store])\n" +
                 "      --> KSTREAM-OUTEROTHER-0000000005\n" +
                 "      <-- KSTREAM-SOURCE-0000000001\n" +
-                "    Processor: KSTREAM-OUTEROTHER-0000000005 (stores: [KSTREAM-OUTERTHIS-0000000004-store, KSTREAM-OUTERSHARED-0000000004-store])\n" +
+                "    Processor: KSTREAM-OUTEROTHER-0000000005 (stores: [KSTREAM-OUTERTHIS-0000000004-store])\n" +
                 "      --> KSTREAM-MERGE-0000000006\n" +
                 "      <-- KSTREAM-WINDOWED-0000000003\n" +
-                "    Processor: KSTREAM-OUTERTHIS-0000000004 (stores: [KSTREAM-OUTEROTHER-0000000005-store, KSTREAM-OUTERSHARED-0000000004-store])\n" +
+                "    Processor: KSTREAM-OUTERTHIS-0000000004 (stores: [KSTREAM-OUTEROTHER-0000000005-store])\n" +
                 "      --> KSTREAM-MERGE-0000000006\n" +
                 "      <-- KSTREAM-WINDOWED-0000000002\n" +
                 "    Processor: KSTREAM-MERGE-0000000006 (stores: [])\n" +
@@ -1034,7 +1034,7 @@ public class TopologyTest {
         stream1.outerJoin(
             stream2,
             MockValueJoiner.TOSTRING_JOINER,
-            JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)),
+            JoinWindows.of(ofMillis(100)).grace(Duration.ZERO),
             StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
                 .withStoreName("custom-name"));
 
@@ -1053,10 +1053,10 @@ public class TopologyTest {
                 "    Processor: KSTREAM-WINDOWED-0000000003 (stores: [custom-name-outer-other-join-store])\n" +
                 "      --> KSTREAM-OUTEROTHER-0000000005\n" +
                 "      <-- KSTREAM-SOURCE-0000000001\n" +
-                "    Processor: KSTREAM-OUTEROTHER-0000000005 (stores: [custom-name-outer-this-join-store, custom-name-outer-shared-join-store])\n" +
+                "    Processor: KSTREAM-OUTEROTHER-0000000005 (stores: [custom-name-outer-this-join-store])\n" +
                 "      --> KSTREAM-MERGE-0000000006\n" +
                 "      <-- KSTREAM-WINDOWED-0000000003\n" +
-                "    Processor: KSTREAM-OUTERTHIS-0000000004 (stores: [custom-name-outer-other-join-store, custom-name-outer-shared-join-store])\n" +
+                "    Processor: KSTREAM-OUTERTHIS-0000000004 (stores: [custom-name-outer-other-join-store])\n" +
                 "      --> KSTREAM-MERGE-0000000006\n" +
                 "      <-- KSTREAM-WINDOWED-0000000002\n" +
                 "    Processor: KSTREAM-MERGE-0000000006 (stores: [])\n" +
@@ -1074,7 +1074,7 @@ public class TopologyTest {
         stream1 = builder.stream("input-topic1");
         stream2 = builder.stream("input-topic2");
 
-        final JoinWindows joinWindows = JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100));
+        final JoinWindows joinWindows = JoinWindows.of(ofMillis(100)).grace(Duration.ZERO);
 
         final WindowBytesStoreSupplier thisStoreSupplier = Stores.inMemoryWindowStore("in-memory-join-store",
             Duration.ofMillis(joinWindows.size() + joinWindows.gracePeriodMs()),
@@ -1107,10 +1107,10 @@ public class TopologyTest {
                 "    Processor: KSTREAM-WINDOWED-0000000003 (stores: [in-memory-join-store-other])\n" +
                 "      --> KSTREAM-OUTEROTHER-0000000005\n" +
                 "      <-- KSTREAM-SOURCE-0000000001\n" +
-                "    Processor: KSTREAM-OUTEROTHER-0000000005 (stores: [in-memory-join-store-outer-shared-join-store, in-memory-join-store])\n" +
+                "    Processor: KSTREAM-OUTEROTHER-0000000005 (stores: [in-memory-join-store])\n" +
                 "      --> KSTREAM-MERGE-0000000006\n" +
                 "      <-- KSTREAM-WINDOWED-0000000003\n" +
-                "    Processor: KSTREAM-OUTERTHIS-0000000004 (stores: [in-memory-join-store-other, in-memory-join-store-outer-shared-join-store])\n" +
+                "    Processor: KSTREAM-OUTERTHIS-0000000004 (stores: [in-memory-join-store-other])\n" +
                 "      --> KSTREAM-MERGE-0000000006\n" +
                 "      <-- KSTREAM-WINDOWED-0000000002\n" +
                 "    Processor: KSTREAM-MERGE-0000000006 (stores: [])\n" +
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
index 9d2bd1e..ef00743 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
@@ -100,7 +100,7 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
         leftStream.join(
             rightStream,
             valueJoiner,
-            JoinWindows.ofTimeDifferenceAndGrace(ofSeconds(10), ofHours(24))
+            JoinWindows.of(ofSeconds(10)).grace(ofHours(24))
         ).to(OUTPUT_TOPIC);
 
         runTestWithDriver(expectedResult);
@@ -147,7 +147,7 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
                 rightStream.flatMap(MockMapper.noOpFlatKeyValueMapper())
                     .selectKey(MockMapper.selectKeyKeyValueMapper()),
                 valueJoiner,
-                JoinWindows.ofTimeDifferenceAndGrace(ofSeconds(10), ofHours(24))
+                JoinWindows.of(ofSeconds(10)).grace(ofHours(24))
             ).to(OUTPUT_TOPIC);
 
         runTestWithDriver(expectedResult);
@@ -161,7 +161,10 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
             null,
             null,
             null,
-            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-a", null, 4L)),
+            Arrays.asList(
+                new TestRecord<>(ANY_UNIQUE_KEY, "A-null", null, 3L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "A-a", null, 4L)
+            ),
             Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a", null, 5L)),
             Arrays.asList(
                 new TestRecord<>(ANY_UNIQUE_KEY, "A-b", null, 6L),
@@ -192,7 +195,7 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
         leftStream.leftJoin(
             rightStream,
             valueJoiner,
-            JoinWindows.ofTimeDifferenceAndGrace(ofSeconds(10), ofHours(24))
+            JoinWindows.of(ofSeconds(10)).grace(ofHours(24))
         ).to(OUTPUT_TOPIC);
 
         runTestWithDriver(expectedResult);
@@ -206,7 +209,10 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
             null,
             null,
             null,
-            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-a", null, 4L)),
+            Arrays.asList(
+                new TestRecord<>(ANY_UNIQUE_KEY, "A-null", null, 3L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "A-a", null, 4L)
+            ),
             Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a", null, 5L)),
             Arrays.asList(
                 new TestRecord<>(ANY_UNIQUE_KEY, "A-b", null, 6L),
@@ -239,7 +245,7 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
                 rightStream.flatMap(MockMapper.noOpFlatKeyValueMapper())
                      .selectKey(MockMapper.selectKeyKeyValueMapper()),
                 valueJoiner,
-                JoinWindows.ofTimeDifferenceAndGrace(ofSeconds(10), ofHours(24))
+                JoinWindows.of(ofSeconds(10)).grace(ofHours(24))
             ).to(OUTPUT_TOPIC);
 
         runTestWithDriver(expectedResult);
@@ -253,7 +259,10 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
             null,
             null,
             null,
-            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-a", null, 4L)),
+            Arrays.asList(
+                new TestRecord<>(ANY_UNIQUE_KEY, "A-null", null, 3L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "A-a", null, 4L)
+            ),
             Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a", null, 5L)),
             Arrays.asList(
                 new TestRecord<>(ANY_UNIQUE_KEY, "A-b", null, 6L),
@@ -284,7 +293,7 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
         leftStream.outerJoin(
             rightStream,
             valueJoiner,
-            JoinWindows.ofTimeDifferenceAndGrace(ofSeconds(10), ofHours(24))
+            JoinWindows.of(ofSeconds(10)).grace(ofHours(24))
         ).to(OUTPUT_TOPIC);
 
         runTestWithDriver(expectedResult);
@@ -298,7 +307,10 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
             null,
             null,
             null,
-            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-a", null, 4L)),
+            Arrays.asList(
+                new TestRecord<>(ANY_UNIQUE_KEY, "A-null", null, 3L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "A-a", null, 4L)
+            ),
             Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a", null, 5L)),
             Arrays.asList(
                 new TestRecord<>(ANY_UNIQUE_KEY, "A-b", null, 6L),
@@ -331,7 +343,7 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
                 rightStream.flatMap(MockMapper.noOpFlatKeyValueMapper())
                     .selectKey(MockMapper.selectKeyKeyValueMapper()),
                 valueJoiner,
-                JoinWindows.ofTimeDifferenceAndGrace(ofSeconds(10), ofHours(24))
+                JoinWindows.of(ofSeconds(10)).grace(ofHours(24))
             ).to(OUTPUT_TOPIC);
 
         runTestWithDriver(expectedResult);
@@ -424,11 +436,11 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
         leftStream.join(
             rightStream,
             valueJoiner,
-            JoinWindows.ofTimeDifferenceAndGrace(ofSeconds(10), ofHours(24))
+            JoinWindows.of(ofSeconds(10)).grace(ofHours(24))
         ).join(
             rightStream,
             valueJoiner,
-            JoinWindows.ofTimeDifferenceAndGrace(ofSeconds(10), ofHours(24))
+            JoinWindows.of(ofSeconds(10)).grace(ofHours(24))
         ).to(OUTPUT_TOPIC);
 
         runTestWithDriver(expectedResult);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
index 3d38ada..7fb692e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
@@ -75,8 +75,6 @@ public class JoinWindowsTest {
     @Test
     public void timeDifferenceMustNotBeNegative() {
         assertThrows(IllegalArgumentException.class, () -> JoinWindows.of(ofMillis(-1)));
-        assertThrows(IllegalArgumentException.class, () -> JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(-1)));
-        assertThrows(IllegalArgumentException.class, () -> JoinWindows.ofTimeDifferenceAndGrace(ofMillis(-1), ofMillis(ANY_GRACE)));
     }
 
     @Test
@@ -149,16 +147,6 @@ public class JoinWindowsTest {
             JoinWindows.of(ofMillis(9)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(3)).grace(ofMillis(60)),
             JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(3)).grace(ofMillis(60))
         );
-
-        verifyEquality(
-                JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(3)),
-                JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(3))
-        );
-
-        verifyEquality(
-                JoinWindows.ofTimeDifferenceAndGrace(ofMillis(3), ofMillis(4)),
-                JoinWindows.ofTimeDifferenceAndGrace(ofMillis(3), ofMillis(4))
-        );
     }
 
     @Test
@@ -188,15 +176,5 @@ public class JoinWindowsTest {
             JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(9)),
             JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(3))
         );
-
-        verifyInEquality(
-                JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(9)),
-                JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(3))
-        );
-
-        verifyInEquality(
-                JoinWindows.ofTimeDifferenceAndGrace(ofMillis(9), ofMillis(9)),
-                JoinWindows.ofTimeDifferenceAndGrace(ofMillis(3), ofMillis(9))
-        );
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplValueJoinerWithKeyTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplValueJoinerWithKeyTest.java
index b35a7b2..dcd519d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplValueJoinerWithKeyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplValueJoinerWithKeyTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import java.util.Arrays;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringDeserializer;
@@ -64,7 +65,7 @@ public class KStreamImplValueJoinerWithKeyTest {
 
     private final ValueJoinerWithKey<String, Integer, Integer, String> valueJoinerWithKey =
         (key, lv, rv) -> key + ":" + (lv + (rv == null ? 0 : rv));
-    private final JoinWindows joinWindows = JoinWindows.ofTimeDifferenceAndGrace(ofMillis(100), ofHours(24L));
+    private final JoinWindows joinWindows = JoinWindows.of(ofMillis(100)).grace(ofHours(24L));
     private final StreamJoined<String, Integer, Integer> streamJoined =
             StreamJoined.with(Serdes.String(), Serdes.Integer(), Serdes.Integer());
     private final Joined<String, Integer, Integer> joined =
@@ -108,7 +109,8 @@ public class KStreamImplValueJoinerWithKeyTest {
         ).to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
         // Left KV A, 3, Right KV A, 5
         // TTD pipes records to left stream first, then right
-        final List<KeyValue<String, String>> expectedResults = Collections.singletonList(KeyValue.pair("A", "A:5"));
+        final List<KeyValue<String, String>> expectedResults =
+            Arrays.asList(KeyValue.pair("A", "A:3"), KeyValue.pair("A", "A:5"));
         runJoinTopology(
             builder,
             expectedResults,
@@ -128,7 +130,8 @@ public class KStreamImplValueJoinerWithKeyTest {
 
         // Left KV A, 3, Right KV A, 5
         // TTD pipes records to left stream first, then right
-        final List<KeyValue<String, String>> expectedResults = Collections.singletonList(KeyValue.pair("A", "A:5"));
+        final List<KeyValue<String, String>> expectedResults =
+            Arrays.asList(KeyValue.pair("A", "A:3"), KeyValue.pair("A", "A:5"));
         runJoinTopology(
             builder,
             expectedResults,
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
index 1d50a37..517e0eb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
@@ -515,7 +515,7 @@ public class KStreamKStreamJoinTest {
         joined = stream1.outerJoin(
             stream2,
             MockValueJoiner.TOSTRING_JOINER,
-            JoinWindows.ofTimeDifferenceAndGrace(ofMillis(100L), ofHours(24L)),
+            JoinWindows.of(ofMillis(100L)).grace(ofHours(24L)),
             StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
         );
         joined.process(supplier);
@@ -532,7 +532,7 @@ public class KStreamKStreamJoinTest {
                     driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
             final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
 
-            // push two items to the primary stream; the other window is empty; this should not produce items yet
+            // push two items to the primary stream; the other window is empty; this should produce 2 spurious items
             // w1 = {}
             // w2 = {}
             // --> w1 = { 0:A0, 1:A1 }
@@ -540,7 +540,10 @@ public class KStreamKStreamJoinTest {
             for (int i = 0; i < 2; i++) {
                 inputTopic1.pipeInput(expectedKeys[i], "A" + expectedKeys[i]);
             }
-            processor.checkAndClearProcessResult();
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(0, "A0+null", 0L),
+                new KeyValueTimestamp<>(1, "A1+null", 0L)
+            );
 
             // push two items to the other stream; this should produce two items
             // w1 = { 0:A0, 1:A1 }
@@ -555,7 +558,7 @@ public class KStreamKStreamJoinTest {
                 new KeyValueTimestamp<>(1, "A1+a1", 0L)
             );
 
-            // push all four items to the primary stream; this should produce two items
+            // push all four items to the primary stream; this should produce four items
             // w1 = { 0:A0, 1:A1 }
             // w2 = { 0:a0, 1:a1 }
             // --> w1 = { 0:A0, 1:A1, 0:B0, 1:B1, 2:B2, 3:B3 }
@@ -565,7 +568,9 @@ public class KStreamKStreamJoinTest {
             }
             processor.checkAndClearProcessResult(
                 new KeyValueTimestamp<>(0, "B0+a0", 0L),
-                new KeyValueTimestamp<>(1, "B1+a1", 0L)
+                new KeyValueTimestamp<>(1, "B1+a1", 0L),
+                new KeyValueTimestamp<>(2, "B2+null", 0L),
+                new KeyValueTimestamp<>(3, "B3+null", 0L)
             );
 
             // push all items to the other stream; this should produce six items
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
index 4e2b6d8..5c6b3f0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
@@ -172,7 +172,7 @@ public class KStreamKStreamLeftJoinTest {
         joined = stream1.leftJoin(
             stream2,
             MockValueJoiner.TOSTRING_JOINER,
-            JoinWindows.ofTimeDifferenceAndGrace(ofMillis(100L), ofMillis(10L)),
+            JoinWindows.of(ofMillis(100L)).grace(ofMillis(10L)),
             StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
         );
         joined.process(supplier);
@@ -202,6 +202,8 @@ public class KStreamKStreamLeftJoinTest {
             inputTopic2.pipeInput(2, "a2", 1001L);
 
             processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(2, "A2+null", 1000L),
+                new KeyValueTimestamp<>(2, "A2-0+null", 1000L),
                 new KeyValueTimestamp<>(2, "A2+a2", 1001L),
                 new KeyValueTimestamp<>(2, "A2-0+a2", 1001L)
             );
@@ -228,7 +230,7 @@ public class KStreamKStreamLeftJoinTest {
         joined = stream1.leftJoin(
             stream2,
             MockValueJoiner.TOSTRING_JOINER,
-            JoinWindows.ofTimeDifferenceAndGrace(ofMillis(100L), ofMillis(0L)),
+            JoinWindows.of(ofMillis(100L)).grace(ofMillis(0L)),
             StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
         );
         joined.process(supplier);
@@ -246,25 +248,16 @@ public class KStreamKStreamLeftJoinTest {
             inputTopic1.pipeInput(0, "A0", windowStart + 1L);
             inputTopic1.pipeInput(1, "A1", windowStart + 2L);
             inputTopic1.pipeInput(0, "A0-0", windowStart + 3L);
-            processor.checkAndClearProcessResult();
-
-            // Join detected; No null-joins emitted
-            inputTopic2.pipeInput(1, "a1", windowStart + 3L);
-            processor.checkAndClearProcessResult(
-                new KeyValueTimestamp<>(1, "A1+a1", windowStart + 3L)
-            );
-
-            // Dummy record in left topic will emit expired non-joined records from the left topic
-            inputTopic1.pipeInput(2, "dummy", windowStart + 401L);
             processor.checkAndClearProcessResult(
                 new KeyValueTimestamp<>(0, "A0+null", windowStart + 1L),
+                new KeyValueTimestamp<>(1, "A1+null", windowStart + 2L),
                 new KeyValueTimestamp<>(0, "A0-0+null", windowStart + 3L)
             );
 
-            // Flush internal non-joined state store by joining the dummy record
-            inputTopic2.pipeInput(2, "dummy", windowStart + 401L);
+            // Join detected; No null-joins emitted
+            inputTopic2.pipeInput(1, "a1", windowStart + 3L);
             processor.checkAndClearProcessResult(
-                new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 401L)
+                new KeyValueTimestamp<>(1, "A1+a1", windowStart + 3L)
             );
         }
     }
@@ -283,7 +276,7 @@ public class KStreamKStreamLeftJoinTest {
         joined = stream1.leftJoin(
             stream2,
             MockValueJoiner.TOSTRING_JOINER,
-            JoinWindows.ofTimeDifferenceAndGrace(ofMillis(100L), ofMillis(0L)),
+            JoinWindows.of(ofMillis(100L)).grace(ofMillis(0L)),
             StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
         );
         joined.process(supplier);
@@ -297,29 +290,20 @@ public class KStreamKStreamLeftJoinTest {
 
             final long windowStart = 0L;
 
-            // No joins detected; No null-joins emitted
+            // No joins detected; spurious null-joins emitted
             inputTopic1.pipeInput(0, "A0", windowStart + 1L);
             inputTopic1.pipeInput(1, "A1", windowStart + 2L);
             inputTopic1.pipeInput(0, "A0-0", windowStart + 3L);
-            processor.checkAndClearProcessResult();
-
-            // Join detected; No null-joins emitted
-            inputTopic2.pipeInput(1, "a1", windowStart + 3L);
-            processor.checkAndClearProcessResult(
-                new KeyValueTimestamp<>(1, "A1+a1", windowStart + 3L)
-            );
-
-            // Dummy record in right topic will emit expired non-joined records from the left topic
-            inputTopic2.pipeInput(2, "dummy", windowStart + 401L);
             processor.checkAndClearProcessResult(
                 new KeyValueTimestamp<>(0, "A0+null", windowStart + 1L),
+                new KeyValueTimestamp<>(1, "A1+null", windowStart + 2L),
                 new KeyValueTimestamp<>(0, "A0-0+null", windowStart + 3L)
             );
 
-            // Flush internal non-joined state store by joining the dummy record
-            inputTopic1.pipeInput(2, "dummy", windowStart + 402L);
+            // Join detected; No null-joins emitted
+            inputTopic2.pipeInput(1, "a1", windowStart + 3L);
             processor.checkAndClearProcessResult(
-                new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 402L)
+                new KeyValueTimestamp<>(1, "A1+a1", windowStart + 3L)
             );
         }
     }
@@ -338,7 +322,7 @@ public class KStreamKStreamLeftJoinTest {
         joined = stream1.leftJoin(
             stream2,
             MockValueJoiner.TOSTRING_JOINER,
-            JoinWindows.ofTimeDifferenceAndGrace(ofMillis(100L), ofMillis(0L)),
+            JoinWindows.of(ofMillis(100L)).grace(ofMillis(0L)),
             StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
         );
         joined.process(supplier);
@@ -363,16 +347,6 @@ public class KStreamKStreamLeftJoinTest {
             processor.checkAndClearProcessResult(
                 new KeyValueTimestamp<>(1, "a1+A1", windowStart + 3L)
             );
-
-            // Dummy record in left topic will not emit records
-            inputTopic1.pipeInput(2, "dummy", windowStart + 401L);
-            processor.checkAndClearProcessResult();
-
-            // Process the dummy joined record
-            inputTopic2.pipeInput(2, "dummy", windowStart + 402L);
-            processor.checkAndClearProcessResult(
-                new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 402L)
-            );
         }
     }
 
@@ -390,7 +364,7 @@ public class KStreamKStreamLeftJoinTest {
         joined = stream1.leftJoin(
             stream2,
             MockValueJoiner.TOSTRING_JOINER,
-            JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100L)),
+            JoinWindows.of(ofMillis(100L)).grace(Duration.ZERO),
             StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
         );
         joined.process(supplier);
@@ -430,7 +404,7 @@ public class KStreamKStreamLeftJoinTest {
 
     @Test
     public void testLeftJoinWithInMemoryCustomSuppliers() {
-        final JoinWindows joinWindows = JoinWindows.ofTimeDifferenceAndGrace(ofMillis(100L), ofMillis(0L));
+        final JoinWindows joinWindows = JoinWindows.of(ofMillis(100L)).grace(ofMillis(0L));
 
         final WindowBytesStoreSupplier thisStoreSupplier = Stores.inMemoryWindowStore("in-memory-join-store",
             Duration.ofMillis(joinWindows.size() + joinWindows.gracePeriodMs()),
@@ -447,7 +421,7 @@ public class KStreamKStreamLeftJoinTest {
 
     @Test
     public void testLeftJoinWithDefaultSuppliers() {
-        final JoinWindows joinWindows = JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100L));
+        final JoinWindows joinWindows = JoinWindows.of(ofMillis(100L)).grace(Duration.ZERO);
         final StreamJoined<Integer, String, String> streamJoined = StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String());
 
         runLeftJoin(streamJoined, joinWindows);
@@ -487,8 +461,8 @@ public class KStreamKStreamLeftJoinTest {
                     driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
             final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
 
-            // 2 window stores + 1 shared window store should be available
-            assertEquals(3, driver.getAllStateStores().size());
+            // 2 window stores should be available
+            assertEquals(2, driver.getAllStateStores().size());
 
             // push two items to the primary stream; the other window is empty
             // w1 {}
@@ -498,7 +472,10 @@ public class KStreamKStreamLeftJoinTest {
             for (int i = 0; i < 2; i++) {
                 inputTopic1.pipeInput(expectedKeys[i], "A" + expectedKeys[i]);
             }
-            processor.checkAndClearProcessResult();
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(0, "A0+null", 0L),
+                new KeyValueTimestamp<>(1, "A1+null", 0L)
+            );
 
             // push two items to the other stream; this should produce two items
             // w1 = { 0:A0, 1:A1 }
@@ -513,7 +490,7 @@ public class KStreamKStreamLeftJoinTest {
                 new KeyValueTimestamp<>(1, "A1+a1", 0L)
             );
 
-            // push three items to the primary stream; this should produce two joined items
+            // push three items to the primary stream; this should produce three joined items
             // w1 = { 0:A0, 1:A1 }
             // w2 = { 0:a0, 1:a1 }
             // --> w1 = { 0:A0, 1:A1, 0:B0, 1:B1, 2:B2 }
@@ -523,7 +500,9 @@ public class KStreamKStreamLeftJoinTest {
             }
             processor.checkAndClearProcessResult(
                 new KeyValueTimestamp<>(0, "B0+a0", 0L),
-                new KeyValueTimestamp<>(1, "B1+a1", 0L)
+                new KeyValueTimestamp<>(1, "B1+a1", 0L),
+                new KeyValueTimestamp<>(2, "B2+null", 0L)
+
             );
 
             // push all items to the other stream; this should produce five items
@@ -558,11 +537,6 @@ public class KStreamKStreamLeftJoinTest {
                 new KeyValueTimestamp<>(2, "C2+b2", 0L),
                 new KeyValueTimestamp<>(3, "C3+b3", 0L)
             );
-
-            // push a dummy record that should expire non-joined items; it should not produce any items because
-            // all of them are joined
-            inputTopic1.pipeInput(0, "dummy", 1000L);
-            processor.checkAndClearProcessResult();
         }
     }
 
@@ -580,7 +554,7 @@ public class KStreamKStreamLeftJoinTest {
         joined = stream1.leftJoin(
             stream2,
             MockValueJoiner.TOSTRING_JOINER,
-            JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100L)),
+            JoinWindows.of(ofMillis(100L)).grace(Duration.ZERO),
             StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
         );
         joined.process(supplier);
@@ -592,17 +566,19 @@ public class KStreamKStreamLeftJoinTest {
                 driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
             final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
 
-            // push two items to the primary stream; the other window is empty; this should not produce any item yet
+            // push two items to the primary stream; the other window is empty; this should produce two spurious items
             // w1 = {}
             // w2 = {}
             // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) }
             // --> w2 = {}
             inputTopic1.pipeInput(0, "A0", 0L);
             inputTopic1.pipeInput(1, "A1", 100L);
-            processor.checkAndClearProcessResult();
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(0, "A0+null", 0L),
+                new KeyValueTimestamp<>(1, "A1+null", 100L)
+            );
 
-            // push one item to the other window that has a join; this should produce non-joined records with a closed window first, then
-            // the joined records
+            // push one item to the other window that has a join; this should produce the joined records
             // by the time they were produced before
             // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) }
             // w2 = { }
@@ -610,7 +586,6 @@ public class KStreamKStreamLeftJoinTest {
             // --> w2 = { 1:a1 (ts: 110) }
             inputTopic2.pipeInput(1, "a1", 110L);
             processor.checkAndClearProcessResult(
-                new KeyValueTimestamp<>(0, "A0+null", 0L),
                 new KeyValueTimestamp<>(1, "A1+a1", 110L)
             );
         }
@@ -631,7 +606,7 @@ public class KStreamKStreamLeftJoinTest {
         joined = stream1.leftJoin(
             stream2,
             MockValueJoiner.TOSTRING_JOINER,
-            JoinWindows.ofTimeDifferenceAndGrace(ofMillis(100L), ofMillis(10L)),
+            JoinWindows.of(ofMillis(100L)).grace(ofMillis(10L)),
             StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
         );
         joined.process(supplier);
@@ -658,7 +633,10 @@ public class KStreamKStreamLeftJoinTest {
             for (int i = 0; i < 2; i++) {
                 inputTopic1.pipeInput(expectedKeys[i], "A" + expectedKeys[i], time);
             }
-            processor.checkAndClearProcessResult();
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(0, "A0+null", 0L),
+                new KeyValueTimestamp<>(1, "A1+null", 0L)
+            );
 
             // push two items to the other stream with a window time after the previous window ended (not closed); this should not produce
             // joined records because the window has ended, but not closed.
@@ -671,20 +649,6 @@ public class KStreamKStreamLeftJoinTest {
                 inputTopic2.pipeInput(expectedKey, "a" + expectedKey, time);
             }
             processor.checkAndClearProcessResult();
-
-            // push a dummy item to the other stream after the window is closed; this should only produced the expired non-joined records, but
-            // not the joined records because the window has closed
-            // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) }
-            // w2 = { 0:a0 (ts: 101), 1:a1 (ts: 101) }
-            // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) }
-            // --> w2 = { 0:a0 (ts: 101), 1:a1 (ts: 101),
-            //            0:dummy (ts: 211)}
-            time += 1100L;
-            inputTopic2.pipeInput(0, "dummy", time);
-            processor.checkAndClearProcessResult(
-                new KeyValueTimestamp<>(0, "A0+null", 0L),
-                new KeyValueTimestamp<>(1, "A1+null", 0L)
-            );
         }
     }
 
@@ -703,7 +667,7 @@ public class KStreamKStreamLeftJoinTest {
         joined = stream1.leftJoin(
             stream2,
             MockValueJoiner.TOSTRING_JOINER,
-            JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)),
+            JoinWindows.of(ofMillis(100)).grace(Duration.ZERO),
             StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
         );
         joined.process(supplier);
@@ -722,7 +686,7 @@ public class KStreamKStreamLeftJoinTest {
             final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
             final long time = 0L;
 
-            // push two items to the primary stream; the other window is empty; this should not produce any items
+            // push two items to the primary stream; the other window is empty; this should produce two spurious items
             // w1 = {}
             // w2 = {}
             // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) }
@@ -730,7 +694,10 @@ public class KStreamKStreamLeftJoinTest {
             for (int i = 0; i < 2; i++) {
                 inputTopic1.pipeInput(expectedKeys[i], "A" + expectedKeys[i], time);
             }
-            processor.checkAndClearProcessResult();
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(0, "A0+null", 0L),
+                new KeyValueTimestamp<>(1, "A1+null", 0L)
+            );
 
             // push four items to the other stream; this should produce two full-join items
             // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) }
@@ -790,7 +757,7 @@ public class KStreamKStreamLeftJoinTest {
             new KeyValueTimestamp<>(3, "B3+b3", 1100L)
         );
 
-        // push four items with increased timestamp to the primary stream; this should produce one left-join and three full-join items (non-joined item is not produced yet)
+        // push four items with increased timestamp to the primary stream; this should produce one left-join and three full-join items plus one spurious item
         // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
         //        0:B0 (ts: 1100), 1:B1 (ts: 1100), 2:B2 (ts: 1100), 3:B3 (ts: 1100) }
         // w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0), 2:a2 (ts: 0), 3:a3 (ts: 0),
@@ -805,12 +772,13 @@ public class KStreamKStreamLeftJoinTest {
             inputTopic1.pipeInput(expectedKey, "C" + expectedKey, time);
         }
         processor.checkAndClearProcessResult(
+            new KeyValueTimestamp<>(0, "C0+null", 1101L),
             new KeyValueTimestamp<>(1, "C1+b1", 1101L),
             new KeyValueTimestamp<>(2, "C2+b2", 1101L),
             new KeyValueTimestamp<>(3, "C3+b3", 1101L)
         );
 
-        // push four items with increased timestamp to the primary stream; this should produce two left-join and two full-join items (non-joined items are not produced yet)
+        // push four items with increased timestamp to the primary stream; this should produce two spurious and two inner items
         // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
         //        0:B0 (ts: 1100), 1:B1 (ts: 1100), 2:B2 (ts: 1100), 3:B3 (ts: 1100),
         //        0:C0 (ts: 1101), 1:C1 (ts: 1101), 2:C2 (ts: 1101), 3:C3 (ts: 1101) }
@@ -827,11 +795,13 @@ public class KStreamKStreamLeftJoinTest {
             inputTopic1.pipeInput(expectedKey, "D" + expectedKey, time);
         }
         processor.checkAndClearProcessResult(
+            new KeyValueTimestamp<>(0, "D0+null", 1102L),
+            new KeyValueTimestamp<>(1, "D1+null", 1102L),
             new KeyValueTimestamp<>(2, "D2+b2", 1102L),
             new KeyValueTimestamp<>(3, "D3+b3", 1102L)
         );
 
-        // push four items with increased timestamp to the primary stream; this should produce one full-join items (three non-joined left-join are not produced yet)
+        // push four items with increased timestamp to the primary stream; this should produce one full-join items plus three spurious left-join items
         // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
         //        0:B0 (ts: 1100), 1:B1 (ts: 1100), 2:B2 (ts: 1100), 3:B3 (ts: 1100),
         //        0:C0 (ts: 1101), 1:C1 (ts: 1101), 2:C2 (ts: 1101), 3:C3 (ts: 1101),
@@ -850,10 +820,13 @@ public class KStreamKStreamLeftJoinTest {
             inputTopic1.pipeInput(expectedKey, "E" + expectedKey, time);
         }
         processor.checkAndClearProcessResult(
+            new KeyValueTimestamp<>(0, "E0+null", 1103L),
+            new KeyValueTimestamp<>(1, "E1+null", 1103L),
+            new KeyValueTimestamp<>(2, "E2+null", 1103L),
             new KeyValueTimestamp<>(3, "E3+b3", 1103L)
         );
 
-        // push four items with increased timestamp to the primary stream; this should produce no full-join items (four non-joined left-join are not produced yet)
+        // push four items with increased timestamp to the primary stream; this should produce four spurious left-join itmes
         // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
         //        0:B0 (ts: 1100), 1:B1 (ts: 1100), 2:B2 (ts: 1100), 3:B3 (ts: 1100),
         //        0:C0 (ts: 1101), 1:C1 (ts: 1101), 2:C2 (ts: 1101), 3:C3 (ts: 1101),
@@ -873,19 +846,7 @@ public class KStreamKStreamLeftJoinTest {
         for (final int expectedKey : expectedKeys) {
             inputTopic1.pipeInput(expectedKey, "F" + expectedKey, time);
         }
-        processor.checkAndClearProcessResult();
-
-        // push a dummy record to produce all left-join non-joined items
-        time += 301L;
-        driver.advanceWallClockTime(Duration.ofMillis(1000L));
-        inputTopic1.pipeInput(0, "dummy", time);
         processor.checkAndClearProcessResult(
-            new KeyValueTimestamp<>(0, "C0+null", 1101L),
-            new KeyValueTimestamp<>(0, "D0+null", 1102L),
-            new KeyValueTimestamp<>(1, "D1+null", 1102L),
-            new KeyValueTimestamp<>(0, "E0+null", 1103L),
-            new KeyValueTimestamp<>(1, "E1+null", 1103L),
-            new KeyValueTimestamp<>(2, "E2+null", 1103L),
             new KeyValueTimestamp<>(0, "F0+null", 1104L),
             new KeyValueTimestamp<>(1, "F1+null", 1104L),
             new KeyValueTimestamp<>(2, "F2+null", 1104L),
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
index 2d9e320..42b81f7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
@@ -113,7 +113,7 @@ public class KStreamKStreamOuterJoinTest {
         joined = stream1.outerJoin(
             stream2,
             MockValueJoiner.TOSTRING_JOINER,
-            JoinWindows.ofTimeDifferenceAndGrace(ofMillis(100L), ofMillis(10L)),
+            JoinWindows.of(ofMillis(100L)).grace(ofMillis(10L)),
             StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
         );
         joined.process(supplier);
@@ -125,20 +125,19 @@ public class KStreamKStreamOuterJoinTest {
                 driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
             final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
 
-            // verifies non-joined duplicates are emitted when window has closed
+            // verifies non-joined duplicates are emitted
             inputTopic1.pipeInput(0, "A0", 0L);
             inputTopic1.pipeInput(0, "A0-0", 0L);
             inputTopic2.pipeInput(1, "a1", 0L);
             inputTopic2.pipeInput(1, "a1-0", 0L);
             inputTopic2.pipeInput(1, "a0", 111L);
-            // bump stream-time to trigger outer-join results
-            inputTopic2.pipeInput(3, "dummy", 211);
 
             processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(0, "A0+null", 0L),
+                new KeyValueTimestamp<>(0, "A0-0+null", 0L),
                 new KeyValueTimestamp<>(1, "null+a1", 0L),
                 new KeyValueTimestamp<>(1, "null+a1-0", 0L),
-                new KeyValueTimestamp<>(0, "A0+null", 0L),
-                new KeyValueTimestamp<>(0, "A0-0+null", 0L)
+                new KeyValueTimestamp<>(1, "null+a0", 111L)
             );
 
             // verifies joined duplicates are emitted
@@ -148,21 +147,13 @@ public class KStreamKStreamOuterJoinTest {
             inputTopic2.pipeInput(2, "a2-0", 201L);
 
             processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(2, "A2+null", 200L),
+                new KeyValueTimestamp<>(2, "A2-0+null", 200L),
                 new KeyValueTimestamp<>(2, "A2+a2", 201L),
                 new KeyValueTimestamp<>(2, "A2-0+a2", 201L),
                 new KeyValueTimestamp<>(2, "A2+a2-0", 201L),
                 new KeyValueTimestamp<>(2, "A2-0+a2-0", 201L)
             );
-
-            // this record should expired non-joined records; only null+a0 will be emitted because
-            // it did not have a join
-            driver.advanceWallClockTime(Duration.ofMillis(1000L));
-            inputTopic2.pipeInput(3, "dummy", 1500L);
-
-            processor.checkAndClearProcessResult(
-                new KeyValueTimestamp<>(1, "null+a0", 111L),
-                new KeyValueTimestamp<>(3, "null+dummy", 211)
-            );
         }
     }
 
@@ -180,7 +171,7 @@ public class KStreamKStreamOuterJoinTest {
         joined = stream1.outerJoin(
             stream2,
             MockValueJoiner.TOSTRING_JOINER,
-            JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100L)),
+            JoinWindows.of(ofMillis(100L)).grace(Duration.ZERO),
             StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
         );
         joined.process(supplier);
@@ -194,29 +185,20 @@ public class KStreamKStreamOuterJoinTest {
 
             final long windowStart = 0L;
 
-            // No joins detected; No null-joins emitted
+            // No joins detected; three spurious item emitted
             inputTopic1.pipeInput(0, "A0", windowStart + 1L);
             inputTopic1.pipeInput(1, "A1", windowStart + 2L);
             inputTopic1.pipeInput(0, "A0-0", windowStart + 3L);
-            processor.checkAndClearProcessResult();
-
-            // Join detected; No null-joins emitted
-            inputTopic2.pipeInput(1, "a1", windowStart + 3L);
-            processor.checkAndClearProcessResult(
-                new KeyValueTimestamp<>(1, "A1+a1", windowStart + 3L)
-            );
-
-            // Dummy record in left topic will emit expired non-joined records from the left topic
-            inputTopic1.pipeInput(2, "dummy", windowStart + 401L);
             processor.checkAndClearProcessResult(
                 new KeyValueTimestamp<>(0, "A0+null", windowStart + 1L),
+                new KeyValueTimestamp<>(1, "A1+null", windowStart + 2L),
                 new KeyValueTimestamp<>(0, "A0-0+null", windowStart + 3L)
             );
 
-            // Flush internal non-joined state store by joining the dummy record
-            inputTopic2.pipeInput(2, "dummy", windowStart + 401L);
+            // Join detected; No null-joins emitted
+            inputTopic2.pipeInput(1, "a1", windowStart + 3L);
             processor.checkAndClearProcessResult(
-                new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 401L)
+                new KeyValueTimestamp<>(1, "A1+a1", windowStart + 3L)
             );
         }
     }
@@ -235,7 +217,7 @@ public class KStreamKStreamOuterJoinTest {
         joined = stream1.outerJoin(
             stream2,
             MockValueJoiner.TOSTRING_JOINER,
-            JoinWindows.ofTimeDifferenceAndGrace(ofMillis(100L), ofMillis(0L)),
+            JoinWindows.of(ofMillis(100L)).grace(ofMillis(0L)),
             StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
         );
         joined.process(supplier);
@@ -253,25 +235,16 @@ public class KStreamKStreamOuterJoinTest {
             inputTopic1.pipeInput(0, "A0", windowStart + 1L);
             inputTopic1.pipeInput(1, "A1", windowStart + 2L);
             inputTopic1.pipeInput(0, "A0-0", windowStart + 3L);
-            processor.checkAndClearProcessResult();
-
-            // Join detected; No null-joins emitted
-            inputTopic2.pipeInput(1, "a1", windowStart + 3L);
-            processor.checkAndClearProcessResult(
-                new KeyValueTimestamp<>(1, "A1+a1", windowStart + 3L)
-            );
-
-            // Dummy record in right topic will emit expired non-joined records from the left topic
-            inputTopic2.pipeInput(2, "dummy", windowStart + 401L);
             processor.checkAndClearProcessResult(
                 new KeyValueTimestamp<>(0, "A0+null", windowStart + 1L),
+                new KeyValueTimestamp<>(1, "A1+null", windowStart + 2L),
                 new KeyValueTimestamp<>(0, "A0-0+null", windowStart + 3L)
             );
 
-            // Flush internal non-joined state store by joining the dummy record
-            inputTopic1.pipeInput(2, "dummy", windowStart + 402L);
+            // Join detected; No null-joins emitted
+            inputTopic2.pipeInput(1, "a1", windowStart + 3L);
             processor.checkAndClearProcessResult(
-                new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 402L)
+                new KeyValueTimestamp<>(1, "A1+a1", windowStart + 3L)
             );
         }
     }
@@ -290,7 +263,7 @@ public class KStreamKStreamOuterJoinTest {
         joined = stream1.outerJoin(
             stream2,
             MockValueJoiner.TOSTRING_JOINER,
-            JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100L)),
+            JoinWindows.of(ofMillis(100L)).grace(Duration.ZERO),
             StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
         );
         joined.process(supplier);
@@ -304,29 +277,20 @@ public class KStreamKStreamOuterJoinTest {
 
             final long windowStart = 0L;
 
-            // No joins detected; No null-joins emitted
+            // No joins detectedl; only spurious resulst
             inputTopic2.pipeInput(0, "A0", windowStart + 1L);
             inputTopic2.pipeInput(1, "A1", windowStart + 2L);
             inputTopic2.pipeInput(0, "A0-0", windowStart + 3L);
-            processor.checkAndClearProcessResult();
-
-            // Join detected; No null-joins emitted
-            inputTopic1.pipeInput(1, "a1", windowStart + 3L);
-            processor.checkAndClearProcessResult(
-                new KeyValueTimestamp<>(1, "a1+A1", windowStart + 3L)
-            );
-
-            // Dummy record in left topic will emit expired non-joined records from the right topic
-            inputTopic1.pipeInput(2, "dummy", windowStart + 401L);
             processor.checkAndClearProcessResult(
                 new KeyValueTimestamp<>(0, "null+A0", windowStart + 1L),
+                new KeyValueTimestamp<>(1, "null+A1", windowStart + 2L),
                 new KeyValueTimestamp<>(0, "null+A0-0", windowStart + 3L)
             );
 
-            // Process the dummy joined record
-            inputTopic2.pipeInput(2, "dummy", windowStart + 402L);
+            // Join detected; No null-joins emitted
+            inputTopic1.pipeInput(1, "a1", windowStart + 3L);
             processor.checkAndClearProcessResult(
-                new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 402L)
+                new KeyValueTimestamp<>(1, "a1+A1", windowStart + 3L)
             );
         }
     }
@@ -345,7 +309,7 @@ public class KStreamKStreamOuterJoinTest {
         joined = stream1.outerJoin(
             stream2,
             MockValueJoiner.TOSTRING_JOINER,
-            JoinWindows.ofTimeDifferenceAndGrace(ofMillis(100L), ofMillis(0L)),
+            JoinWindows.of(ofMillis(100L)).grace(ofMillis(0L)),
             StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
         );
         joined.process(supplier);
@@ -363,25 +327,16 @@ public class KStreamKStreamOuterJoinTest {
             inputTopic2.pipeInput(0, "A0", windowStart + 1L);
             inputTopic2.pipeInput(1, "A1", windowStart + 2L);
             inputTopic2.pipeInput(0, "A0-0", windowStart + 3L);
-            processor.checkAndClearProcessResult();
-
-            // Join detected; No null-joins emitted
-            inputTopic1.pipeInput(1, "a1", windowStart + 3L);
-            processor.checkAndClearProcessResult(
-                new KeyValueTimestamp<>(1, "a1+A1", windowStart + 3L)
-            );
-
-            // Dummy record in right topic will emit expired non-joined records from the right topic
-            inputTopic2.pipeInput(2, "dummy", windowStart + 401L);
             processor.checkAndClearProcessResult(
                 new KeyValueTimestamp<>(0, "null+A0", windowStart + 1L),
+                new KeyValueTimestamp<>(1, "null+A1", windowStart + 2L),
                 new KeyValueTimestamp<>(0, "null+A0-0", windowStart + 3L)
             );
 
-            // Process the dummy joined record
-            inputTopic1.pipeInput(2, "dummy", windowStart + 402L);
+            // Join detected; No null-joins emitted
+            inputTopic1.pipeInput(1, "a1", windowStart + 3L);
             processor.checkAndClearProcessResult(
-                new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 402L)
+                new KeyValueTimestamp<>(1, "a1+A1", windowStart + 3L)
             );
         }
     }
@@ -400,7 +355,7 @@ public class KStreamKStreamOuterJoinTest {
         joined = stream1.outerJoin(
             stream2,
             MockValueJoiner.TOSTRING_JOINER,
-            JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)),
+            JoinWindows.of(ofMillis(100)).grace(Duration.ZERO),
             StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
         );
         joined.process(supplier);
@@ -412,17 +367,19 @@ public class KStreamKStreamOuterJoinTest {
                 driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
             final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
 
-            // push two items to the primary stream; the other window is empty; this should not produce any item yet
+            // push two items to the primary stream; the other window is empty; this should produce two spurious resulst
             // w1 = {}
             // w2 = {}
             // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) }
             // --> w2 = {}
             inputTopic1.pipeInput(0, "A0", 0L);
             inputTopic1.pipeInput(1, "A1", 100L);
-            processor.checkAndClearProcessResult();
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(0, "A0+null", 0L),
+                new KeyValueTimestamp<>(1, "A1+null", 100L)
+            );
 
-            // push one item to the other window that has a join; this should produce non-joined records with a closed window first, then
-            // the joined records
+            // push one item to the other window that has a join; this should produce the joined records
             // by the time they were produced before
             // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) }
             // w2 = { }
@@ -430,7 +387,6 @@ public class KStreamKStreamOuterJoinTest {
             // --> w2 = { 0:a0 (ts: 110) }
             inputTopic2.pipeInput(1, "a1", 110L);
             processor.checkAndClearProcessResult(
-                new KeyValueTimestamp<>(0, "A0+null", 0L),
                 new KeyValueTimestamp<>(1, "A1+a1", 110L)
             );
         }
@@ -450,7 +406,7 @@ public class KStreamKStreamOuterJoinTest {
         joined = stream1.outerJoin(
             stream2,
             MockValueJoiner.TOSTRING_JOINER,
-            JoinWindows.ofTimeDifferenceAndGrace(ofMillis(100), ofMillis(10)),
+            JoinWindows.of(ofMillis(100)).grace(ofMillis(10)),
             StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
         );
         joined.process(supplier);
@@ -468,43 +424,36 @@ public class KStreamKStreamOuterJoinTest {
                 driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
             final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
 
-            // push one item to the primary stream; and one item in other stream; this should not produce items because there are no joins
-            // and window has not ended
+            // push one item to the primary stream; and one item in other stream; this should produce two spurious items
             // w1 = {}
             // w2 = {}
             // --> w1 = { 0:A0 (ts: 0) }
             // --> w2 = { 1:a1 (ts: 0) }
             inputTopic1.pipeInput(0, "A0", 0L);
             inputTopic2.pipeInput(1, "a1", 0L);
-            processor.checkAndClearProcessResult();
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(0, "A0+null", 0L),
+                new KeyValueTimestamp<>(1, "null+a1", 0L)
+            );
 
-            // push one item on each stream with a window time after the previous window ended (not closed); this should not produce
-            // joined records because the window has ended, but will not produce non-joined records because the window has not closed.
+            // push one item on each stream with a window time after the previous window ended;
+            // this should produce two spurious items
             // w1 = { 0:A0 (ts: 0) }
             // w2 = { 1:a1 (ts: 0) }
             // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) }
             // --> w2 = { 0:a0 (ts: 101), 1:a1 (ts: 101) }
             inputTopic2.pipeInput(0, "a0", 101L);
             inputTopic1.pipeInput(1, "A1", 101L);
-            processor.checkAndClearProcessResult();
-
-            // push a dummy item to the any stream after the window is closed; this should produced all expired non-joined records because
-            // the window has closed
-            // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) }
-            // w2 = { 0:a0 (ts: 101), 1:a1 (ts: 101) }
-            // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) }
-            // --> w2 = { 0:a0 (ts: 101), 1:a1 (ts: 101), 0:dummy (ts: 112) }
-            inputTopic2.pipeInput(0, "dummy", 211);
             processor.checkAndClearProcessResult(
-                new KeyValueTimestamp<>(1, "null+a1", 0L),
-                new KeyValueTimestamp<>(0, "A0+null", 0L)
+                new KeyValueTimestamp<>(0, "null+a0", 101L),
+                new KeyValueTimestamp<>(1, "A1+null", 101L)
             );
         }
     }
 
     @Test
     public void testOuterJoinWithInMemoryCustomSuppliers() {
-        final JoinWindows joinWindows = JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100L));
+        final JoinWindows joinWindows = JoinWindows.of(ofMillis(100L)).grace(Duration.ZERO);
 
         final WindowBytesStoreSupplier thisStoreSupplier = Stores.inMemoryWindowStore(
             "in-memory-join-store",
@@ -527,7 +476,7 @@ public class KStreamKStreamOuterJoinTest {
 
     @Test
     public void testOuterJoinWithDefaultSuppliers() {
-        final JoinWindows joinWindows = JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100L));
+        final JoinWindows joinWindows = JoinWindows.of(ofMillis(100L)).grace(Duration.ZERO);
         final StreamJoined<Integer, String, String> streamJoined = StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String());
 
         runOuterJoin(streamJoined, joinWindows);
@@ -567,11 +516,11 @@ public class KStreamKStreamOuterJoinTest {
                 driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
             final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
 
-            // 2 window stores + 1 shared window store should be available
-            assertEquals(3, driver.getAllStateStores().size());
+            // 2 window stores should be available
+            assertEquals(2, driver.getAllStateStores().size());
 
-            // push two items to the primary stream; the other window is empty; this should not
-            // produce any items because window has not expired
+            // push two items to the primary stream; the other window is empty; this should
+            // produce two spurious items
             // w1 {}
             // w2 {}
             // --> w1 = { 0:A0, 1:A1 }
@@ -579,7 +528,10 @@ public class KStreamKStreamOuterJoinTest {
             for (int i = 0; i < 2; i++) {
                 inputTopic1.pipeInput(expectedKeys[i], "A" + expectedKeys[i]);
             }
-            processor.checkAndClearProcessResult();
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(0, "A0+null", 0L),
+                new KeyValueTimestamp<>(1, "A1+null", 0L)
+            );
 
             // push two items to the other stream; this should produce two full-joined items
             // w1 = { 0:A0, 1:A1 }
@@ -594,7 +546,7 @@ public class KStreamKStreamOuterJoinTest {
                 new KeyValueTimestamp<>(1, "A1+a1", 0L)
             );
 
-            // push three items to the primary stream; this should produce two full-joined items
+            // push three items to the primary stream; this should produce threeitems
             // w1 = { 0:A0, 1:A1 }
             // w2 = { 0:a0, 1:a1 }
             // --> w1 = { 0:A0, 1:A1, 0:B0, 1:B1, 2:B2 }
@@ -604,10 +556,11 @@ public class KStreamKStreamOuterJoinTest {
             }
             processor.checkAndClearProcessResult(
                 new KeyValueTimestamp<>(0, "B0+a0", 0L),
-                new KeyValueTimestamp<>(1, "B1+a1", 0L)
+                new KeyValueTimestamp<>(1, "B1+a1", 0L),
+                new KeyValueTimestamp<>(2, "B2+null", 0L)
             );
 
-            // push all items to the other stream; this should produce five full-joined items
+            // push all items to the other stream; this should produce six items
             // w1 = { 0:A0, 1:A1, 0:B0, 1:B1, 2:B2 }
             // w2 = { 0:a0, 1:a1 }
             // --> w1 = { 0:A0, 1:A1, 0:B0, 1:B1, 2:B2 }
@@ -620,7 +573,8 @@ public class KStreamKStreamOuterJoinTest {
                 new KeyValueTimestamp<>(0, "B0+b0", 0L),
                 new KeyValueTimestamp<>(1, "A1+b1", 0L),
                 new KeyValueTimestamp<>(1, "B1+b1", 0L),
-                new KeyValueTimestamp<>(2, "B2+b2", 0L)
+                new KeyValueTimestamp<>(2, "B2+b2", 0L),
+                new KeyValueTimestamp<>(3, "null+b3", 0L)
             );
 
             // push all four items to the primary stream; this should produce six full-joined items
@@ -639,11 +593,6 @@ public class KStreamKStreamOuterJoinTest {
                 new KeyValueTimestamp<>(2, "C2+b2", 0L),
                 new KeyValueTimestamp<>(3, "C3+b3", 0L)
             );
-
-            // push a dummy record that should expire non-joined items; it should not produce any items because
-            // all of them are joined
-            inputTopic1.pipeInput(0, "dummy", 400L);
-            processor.checkAndClearProcessResult();
         }
     }
 
@@ -662,7 +611,7 @@ public class KStreamKStreamOuterJoinTest {
         joined = stream1.outerJoin(
             stream2,
             MockValueJoiner.TOSTRING_JOINER,
-            JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)),
+            JoinWindows.of(ofMillis(100)).grace(Duration.ZERO),
             StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
         );
         joined.process(supplier);
@@ -681,7 +630,7 @@ public class KStreamKStreamOuterJoinTest {
             final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
             final long time = 0L;
 
-            // push two items to the primary stream; the other window is empty; this should not produce items because window has not closed
+            // push two items to the primary stream; the other window is empty; this should produce two spurious items
             // w1 = {}
             // w2 = {}
             // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) }
@@ -689,9 +638,12 @@ public class KStreamKStreamOuterJoinTest {
             for (int i = 0; i < 2; i++) {
                 inputTopic1.pipeInput(expectedKeys[i], "A" + expectedKeys[i], time);
             }
-            processor.checkAndClearProcessResult();
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(0, "A0+null", 0L),
+                new KeyValueTimestamp<>(1, "A1+null", 0L)
+            );
 
-            // push four items to the other stream; this should produce two full-join items
+            // push four items to the other stream; this should produce four full-join items
             // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) }
             // w2 = {}
             // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) }
@@ -701,7 +653,9 @@ public class KStreamKStreamOuterJoinTest {
             }
             processor.checkAndClearProcessResult(
                 new KeyValueTimestamp<>(0, "A0+a0", 0L),
-                new KeyValueTimestamp<>(1, "A1+a1", 0L)
+                new KeyValueTimestamp<>(1, "A1+a1", 0L),
+                new KeyValueTimestamp<>(2, "null+a2", 0L),
+                new KeyValueTimestamp<>(3, "null+a3", 0L)
             );
 
             testUpperWindowBound(expectedKeys, driver, processor);
@@ -730,8 +684,10 @@ public class KStreamKStreamOuterJoinTest {
             inputTopic2.pipeInput(expectedKeys[i], "b" + expectedKeys[i], time + i);
         }
         processor.checkAndClearProcessResult(
-            new KeyValueTimestamp<>(2, "null+a2", 0L),
-            new KeyValueTimestamp<>(3, "null+a3", 0L)
+            new KeyValueTimestamp<>(0, "null+b0", 1000L),
+            new KeyValueTimestamp<>(1, "null+b1", 1001L),
+            new KeyValueTimestamp<>(2, "null+b2", 1002L),
+            new KeyValueTimestamp<>(3, "null+b3", 1003L)
         );
 
         // push four items with larger timestamp to the primary stream; this should produce four full-join items
@@ -753,7 +709,7 @@ public class KStreamKStreamOuterJoinTest {
             new KeyValueTimestamp<>(3, "B3+b3", 1100L)
         );
 
-        // push four items with increased timestamp to the primary stream; this should produce three full-join items (non-joined item is not produced yet)
+        // push four items with increased timestamp to the primary stream; this should produce three full-join items plus one spurious item
         // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
         //        0:B0 (ts: 1100), 1:B1 (ts: 1100), 2:B2 (ts: 1100), 3:B3 (ts: 1100) }
         // w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0), 2:a2 (ts: 0), 3:a3 (ts: 0),
@@ -768,12 +724,13 @@ public class KStreamKStreamOuterJoinTest {
             inputTopic1.pipeInput(expectedKey, "C" + expectedKey, time);
         }
         processor.checkAndClearProcessResult(
+            new KeyValueTimestamp<>(0, "C0+null", 1101L),
             new KeyValueTimestamp<>(1, "C1+b1", 1101L),
             new KeyValueTimestamp<>(2, "C2+b2", 1101L),
             new KeyValueTimestamp<>(3, "C3+b3", 1101L)
         );
 
-        // push four items with increased timestamp to the primary stream; this should produce two full-join items (non-joined items are not produced yet)
+        // push four items with increased timestamp to the primary stream; this should produce two full-join items plus two spurious items
         // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
         //        0:B0 (ts: 1100), 1:B1 (ts: 1100), 2:B2 (ts: 1100), 3:B3 (ts: 1100),
         //        0:C0 (ts: 1101), 1:C1 (ts: 1101), 2:C2 (ts: 1101), 3:C3 (ts: 1101) }
@@ -790,11 +747,13 @@ public class KStreamKStreamOuterJoinTest {
             inputTopic1.pipeInput(expectedKey, "D" + expectedKey, time);
         }
         processor.checkAndClearProcessResult(
+            new KeyValueTimestamp<>(0, "D0+null", 1102L),
+            new KeyValueTimestamp<>(1, "D1+null", 1102L),
             new KeyValueTimestamp<>(2, "D2+b2", 1102L),
             new KeyValueTimestamp<>(3, "D3+b3", 1102L)
         );
 
-        // push four items with increased timestamp to the primary stream; this should produce one full-join items (three non-joined left-join are not produced yet)
+        // push four items with increased timestamp to the primary stream; this should produce one full-join items plus three spurious itmes
         // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
         //        0:B0 (ts: 1100), 1:B1 (ts: 1100), 2:B2 (ts: 1100), 3:B3 (ts: 1100),
         //        0:C0 (ts: 1101), 1:C1 (ts: 1101), 2:C2 (ts: 1101), 3:C3 (ts: 1101),
@@ -813,10 +772,13 @@ public class KStreamKStreamOuterJoinTest {
             inputTopic1.pipeInput(expectedKey, "E" + expectedKey, time);
         }
         processor.checkAndClearProcessResult(
+            new KeyValueTimestamp<>(0, "E0+null", 1103L),
+            new KeyValueTimestamp<>(1, "E1+null", 1103L),
+            new KeyValueTimestamp<>(2, "E2+null", 1103L),
             new KeyValueTimestamp<>(3, "E3+b3", 1103L)
         );
 
-        // push four items with increased timestamp to the primary stream; this should produce no full-join items (four non-joined left-join are not produced yet)
+        // push four items with increased timestamp to the primary stream; this should produce four spurious items
         // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
         //        0:B0 (ts: 1100), 1:B1 (ts: 1100), 2:B2 (ts: 1100), 3:B3 (ts: 1100),
         //        0:C0 (ts: 1101), 1:C1 (ts: 1101), 2:C2 (ts: 1101), 3:C3 (ts: 1101),
@@ -836,23 +798,12 @@ public class KStreamKStreamOuterJoinTest {
         for (final int expectedKey : expectedKeys) {
             inputTopic1.pipeInput(expectedKey, "F" + expectedKey, time);
         }
-        processor.checkAndClearProcessResult();
-
-        // push a dummy record to produce all left-join non-joined items
-        time += 301L;
-        driver.advanceWallClockTime(Duration.ofMillis(1000L));
-        inputTopic1.pipeInput(0, "dummy", time);
         processor.checkAndClearProcessResult(
-            new KeyValueTimestamp<>(0, "C0+null", 1101L),
-            new KeyValueTimestamp<>(0, "D0+null", 1102L),
-            new KeyValueTimestamp<>(1, "D1+null", 1102L),
-            new KeyValueTimestamp<>(0, "E0+null", 1103L),
-            new KeyValueTimestamp<>(1, "E1+null", 1103L),
-            new KeyValueTimestamp<>(2, "E2+null", 1103L),
             new KeyValueTimestamp<>(0, "F0+null", 1104L),
             new KeyValueTimestamp<>(1, "F1+null", 1104L),
             new KeyValueTimestamp<>(2, "F2+null", 1104L),
             new KeyValueTimestamp<>(3, "F3+null", 1104L)
+
         );
     }
 
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
index 92ca5bd..ad02d04 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
@@ -379,7 +379,7 @@ class TopologyTest {
       mappedStream
         .filter((k: String, _: String) => k == "A")
         .join(stream2)((v1: String, v2: Int) => v1 + ":" + v2.toString,
-                       JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(5000), Duration.ofHours(24)))(
+                       JoinWindows.of(Duration.ofMillis(5000)).grace(Duration.ofHours(24)))(
           StreamJoined.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde, NewSerdes.intSerde)
         )
         .to(JOINED_TOPIC)
@@ -387,7 +387,7 @@ class TopologyTest {
       mappedStream
         .filter((k: String, _: String) => k == "A")
         .join(stream3)((v1: String, v2: String) => v1 + ":" + v2.toString,
-                       JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(5000), Duration.ofHours(24)))(
+                       JoinWindows.of(Duration.ofMillis(5000)).grace(Duration.ofHours(24)))(
           StreamJoined.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde, NewSerdes.stringSerde)
         )
         .to(JOINED_TOPIC)
@@ -439,7 +439,7 @@ class TopologyTest {
         .join[Integer, String](
           stream2,
           valueJoiner2,
-          JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(5000), Duration.ofHours(24)),
+          JoinWindows.of(Duration.ofMillis(5000)).grace(Duration.ofHours(24)),
           StreamJoinedJ.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde, SerdesJ.Integer)
         )
         .to(JOINED_TOPIC)
@@ -449,7 +449,7 @@ class TopologyTest {
         .join(
           stream3,
           valueJoiner3,
-          JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(5000), Duration.ofHours(24)),
+          JoinWindows.of(Duration.ofMillis(5000)).grace(Duration.ofHours(24)),
           StreamJoinedJ.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde, SerdesJ.String)
         )
         .to(JOINED_TOPIC)
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala
index 0ec7b0e..1ba1238 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala
@@ -201,7 +201,7 @@ class KStreamTest extends TestDriver {
     val stream1 = builder.stream[String, String](sourceTopic1)
     val stream2 = builder.stream[String, String](sourceTopic2)
     stream1
-      .join(stream2)((a, b) => s"$a-$b", JoinWindows.ofTimeDifferenceAndGrace(ofSeconds(1), Duration.ofHours(24)))
+      .join(stream2)((a, b) => s"$a-$b", JoinWindows.of(ofSeconds(1)).grace(Duration.ofHours(24)))
       .to(sinkTopic)
 
     val now = Instant.now()

Mime
View raw message