kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mj...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6455: KStream-KStream join should set max timestamp for result record (#6565)
Date Wed, 17 Apr 2019 16:07:45 GMT
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0e08358  KAFKA-6455: KStream-KStream join should set max timestamp for result record (#6565)
0e08358 is described below

commit 0e08358da4b114834140c720b98125a2a3a84caa
Author: Matthias J. Sax <mjsax@apache.org>
AuthorDate: Wed Apr 17 09:07:25 2019 -0700

    KAFKA-6455: KStream-KStream join should set max timestamp for result record (#6565)
    
    Reviewers: John Roesler <john@confluent.io>, Guozhang Wang <guozhang@confluent.io>
---
 .../kstream/internals/KStreamKStreamJoin.java      |  13 +-
 .../kstream/internals/KStreamKStreamJoinTest.java  | 978 ++++++++++++++++-----
 .../internals/KStreamKStreamLeftJoinTest.java      | 415 ++++++---
 3 files changed, 1084 insertions(+), 322 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
index 4c6998a..17d7837 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
@@ -16,11 +16,13 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.To;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
@@ -85,13 +87,18 @@ class KStreamKStreamJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> {
 
             boolean needOuterJoin = outer;
 
-            final long timeFrom = Math.max(0L, context().timestamp() - joinBeforeMs);
-            final long timeTo = Math.max(0L, context().timestamp() + joinAfterMs);
+            final long inputRecordTimestamp = context().timestamp();
+            final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs);
+            final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs);
 
             try (final WindowStoreIterator<V2> iter = otherWindow.fetch(key, timeFrom, timeTo)) {
                 while (iter.hasNext()) {
                     needOuterJoin = false;
-                    context().forward(key, joiner.apply(value, iter.next().value));
+                    final KeyValue<Long, V2> otherRecord = iter.next();
+                    context().forward(
+                        key,
+                        joiner.apply(value, otherRecord.value),
+                        To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecord.key)));
                 }
 
                 if (needOuterJoin) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
index baac74e..fbd2750 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
@@ -110,65 +110,65 @@ public class KStreamKStreamJoinTest {
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
             final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
 
-            // push two items to the primary stream. the other window is empty
+            // push two items to the primary stream; the other window is empty
             // w1 = {}
             // w2 = {}
-            // --> w1 = { 0:X0, 1:X1 }
+            // --> w1 = { 0:A0, 1:A1 }
             //     w2 = {}
             for (int i = 0; i < 2; i++) {
-                driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i]));
+                driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "A" + expectedKeys[i]));
             }
             processor.checkAndClearProcessResult();
 
-            // push two items to the other stream. this should produce two items.
-            // w1 = { 0:X0, 1:X1 }
+            // push two items to the other stream; this should produce two items
+            // w1 = { 0:A0, 1:A1 }
             // w2 = {}
-            // --> w1 = { 0:X0, 1:X1 }
-            //     w2 = { 0:Y0, 1:Y1 }
+            // --> w1 = { 0:A0, 1:A1 }
+            //     w2 = { 0:a0, 1:a1 }
             for (int i = 0; i < 2; i++) {
-                driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i]));
+                driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "a" + expectedKeys[i]));
             }
-            processor.checkAndClearProcessResult("0:X0+Y0 (ts: 0)", "1:X1+Y1 (ts: 0)");
+            processor.checkAndClearProcessResult("0:A0+a0 (ts: 0)", "1:A1+a1 (ts: 0)");
 
-            // push all four items to the primary stream. this should produce two items.
-            // w1 = { 0:X0, 1:X1 }
-            // w2 = { 0:Y0, 1:Y1 }
-            // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
-            //     w2 = { 0:Y0, 1:Y1 }
+            // push all four items to the primary stream; this should produce two items
+            // w1 = { 0:A0, 1:A1 }
+            // w2 = { 0:a0, 1:a1 }
+            // --> w1 = { 0:A0, 1:A1, 0:B0, 1:B1, 2:B2, 3:B3 }
+            //     w2 = { 0:a0, 1:a1 }
             for (final int expectedKey : expectedKeys) {
-                driver.pipeInput(recordFactory.create(topic1, expectedKey, "X" + expectedKey));
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "B" + expectedKey));
             }
-            processor.checkAndClearProcessResult("0:X0+Y0 (ts: 0)", "1:X1+Y1 (ts: 0)");
+            processor.checkAndClearProcessResult("0:B0+a0 (ts: 0)", "1:B1+a1 (ts: 0)");
 
-            // push all items to the other stream. this should produce six items.
-            // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
-            // w2 = { 0:Y0, 1:Y1 }
-            // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
-            //     w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
+            // push all items to the other stream; this should produce six items
+            // w1 = { 0:A0, 1:A1, 0:B0, 1:B1, 2:B2, 3:B3 }
+            // w2 = { 0:a0, 1:a1 }
+            // --> w1 = { 0:A0, 1:A1, 0:B0, 1:B1, 2:B2, 3:B3 }
+            //     w2 = { 0:a0, 1:a1, 0:b0, 1:b1, 2:b2, 3:b3 }
             for (final int expectedKey : expectedKeys) {
-                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey));
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "b" + expectedKey));
             }
-            processor.checkAndClearProcessResult("0:X0+YY0 (ts: 0)", "0:X0+YY0 (ts: 0)", "1:X1+YY1 (ts: 0)", "1:X1+YY1 (ts: 0)", "2:X2+YY2 (ts: 0)", "3:X3+YY3 (ts: 0)");
+            processor.checkAndClearProcessResult("0:A0+b0 (ts: 0)", "0:B0+b0 (ts: 0)", "1:A1+b1 (ts: 0)", "1:B1+b1 (ts: 0)", "2:B2+b2 (ts: 0)", "3:B3+b3 (ts: 0)");
 
-            // push all four items to the primary stream. this should produce six items.
-            // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
-            // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
-            // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3,  0:XX0, 1:XX1, 2:XX2, 3:XX3 }
-            //     w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
+            // push all four items to the primary stream; this should produce six items
+            // w1 = { 0:A0, 1:A1, 0:B0, 1:B1, 2:B2, 3:B3 }
+            // w2 = { 0:a0, 1:a1, 0:b0, 1:b1, 2:b2, 3:b3 }
+            // --> w1 = { 0:A0, 1:A1, 0:B0, 1:B1, 2:B2, 3:B3, 0:C0, 1:C1, 2:C2, 3:C3 }
+            //     w2 = { 0:a0, 1:a1, 0:b0, 1:b1, 2:b2, 3:b3 }
             for (final int expectedKey : expectedKeys) {
-                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey));
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "C" + expectedKey));
             }
-            processor.checkAndClearProcessResult("0:XX0+Y0 (ts: 0)", "0:XX0+YY0 (ts: 0)", "1:XX1+Y1 (ts: 0)", "1:XX1+YY1 (ts: 0)", "2:XX2+YY2 (ts: 0)", "3:XX3+YY3 (ts: 0)");
+            processor.checkAndClearProcessResult("0:C0+a0 (ts: 0)", "0:C0+b0 (ts: 0)", "1:C1+a1 (ts: 0)", "1:C1+b1 (ts: 0)", "2:C2+b2 (ts: 0)", "3:C3+b3 (ts: 0)");
 
-            // push two items to the other stream. this should produce six item.
-            // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3,  0:XX0, 1:XX1, 2:XX2, 3:XX3 }
-            // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
-            // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3,  0:XX0, 1:XX1, 2:XX2, 3:XX3 }
-            //     w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3, 0:YYY0, 1:YYY1 }
+            // push two items to the other stream; this should produce six items
+            // w1 = { 0:A0, 1:A1, 0:B0, 1:B1, 2:B2, 3:B3, 0:C0, 1:C1, 2:C2, 3:C3 }
+            // w2 = { 0:a0, 1:a1, 0:b0, 1:b1, 2:b2, 3:b3 }
+            // --> w1 = { 0:A0, 1:A1, 0:B0, 1:B1, 2:B2, 3:B3, 0:C0, 1:C1, 2:C2, 3:C3 }
+            //     w2 = { 0:a0, 1:a1, 0:b0, 1:b1, 2:b2, 3:b3, 0:c0, 1:c1 }
             for (int i = 0; i < 2; i++) {
-                driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "YYY" + expectedKeys[i]));
+                driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "c" + expectedKeys[i]));
             }
-            processor.checkAndClearProcessResult("0:X0+YYY0 (ts: 0)", "0:X0+YYY0 (ts: 0)", "0:XX0+YYY0 (ts: 0)", "1:X1+YYY1 (ts: 0)", "1:X1+YYY1 (ts: 0)", "1:XX1+YYY1 (ts: 0)");
+            processor.checkAndClearProcessResult("0:A0+c0 (ts: 0)", "0:B0+c0 (ts: 0)", "0:C0+c0 (ts: 0)", "1:A1+c1 (ts: 0)", "1:B1+c1 (ts: 0)", "1:C1+c1 (ts: 0)");
         }
     }
 
@@ -200,65 +200,65 @@ public class KStreamKStreamJoinTest {
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
             final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
 
-            // push two items to the primary stream. the other window is empty.this should produce two items
+            // push two items to the primary stream; the other window is empty; this should produce two items
             // w1 = {}
             // w2 = {}
-            // --> w1 = { 0:X0, 1:X1 }
+            // --> w1 = { 0:A0, 1:A1 }
             //     w2 = {}
             for (int i = 0; i < 2; i++) {
-                driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i]));
+                driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "A" + expectedKeys[i]));
             }
-            processor.checkAndClearProcessResult("0:X0+null (ts: 0)", "1:X1+null (ts: 0)");
+            processor.checkAndClearProcessResult("0:A0+null (ts: 0)", "1:A1+null (ts: 0)");
 
-            // push two items to the other stream. this should produce two items.
-            // w1 = { 0:X0, 1:X1 }
+            // push two items to the other stream; this should produce two items
+            // w1 = { 0:A0, 1:A1 }
             // w2 = {}
-            // --> w1 = { 0:X0, 1:X1 }
-            //     w2 = { 0:Y0, 1:Y1 }
+            // --> w1 = { 0:A0, 1:A1 }
+            //     w2 = { 0:a0, 1:a1 }
             for (int i = 0; i < 2; i++) {
-                driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i]));
+                driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "a" + expectedKeys[i]));
             }
-            processor.checkAndClearProcessResult("0:X0+Y0 (ts: 0)", "1:X1+Y1 (ts: 0)");
+            processor.checkAndClearProcessResult("0:A0+a0 (ts: 0)", "1:A1+a1 (ts: 0)");
 
-            // push all four items to the primary stream. this should produce four items.
-            // w1 = { 0:X0, 1:X1 }
-            // w2 = { 0:Y0, 1:Y1 }
-            // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
-            //     w2 = { 0:Y0, 1:Y1 }
+            // push all four items to the primary stream; this should produce four items
+            // w1 = { 0:A0, 1:A1 }
+            // w2 = { 0:a0, 1:a1 }
+            // --> w1 = { 0:A0, 1:A1, 0:B0, 1:B1, 2:B2, 3:B3 }
+            //     w2 = { 0:a0, 1:a1 }
             for (final int expectedKey : expectedKeys) {
-                driver.pipeInput(recordFactory.create(topic1, expectedKey, "X" + expectedKey));
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "B" + expectedKey));
             }
-            processor.checkAndClearProcessResult("0:X0+Y0 (ts: 0)", "1:X1+Y1 (ts: 0)", "2:X2+null (ts: 0)", "3:X3+null (ts: 0)");
+            processor.checkAndClearProcessResult("0:B0+a0 (ts: 0)", "1:B1+a1 (ts: 0)", "2:B2+null (ts: 0)", "3:B3+null (ts: 0)");
 
-            // push all items to the other stream. this should produce six items.
-            // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
-            // w2 = { 0:Y0, 1:Y1 }
-            // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
-            //     w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
+            // push all items to the other stream; this should produce six items
+            // w1 = { 0:A0, 1:A1, 0:B0, 1:B1, 2:B2, 3:B3 }
+            // w2 = { 0:a0, 1:a1 }
+            // --> w1 = { 0:A0, 1:A1, 0:B0, 1:B1, 2:B2, 3:B3 }
+            //     w2 = { 0:a0, 1:a1, 0:b0, 0:b0, 1:b1, 2:b2, 3:b3 }
             for (final int expectedKey : expectedKeys) {
-                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey));
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "b" + expectedKey));
             }
-            processor.checkAndClearProcessResult("0:X0+YY0 (ts: 0)", "0:X0+YY0 (ts: 0)", "1:X1+YY1 (ts: 0)", "1:X1+YY1 (ts: 0)", "2:X2+YY2 (ts: 0)", "3:X3+YY3 (ts: 0)");
+            processor.checkAndClearProcessResult("0:A0+b0 (ts: 0)", "0:B0+b0 (ts: 0)", "1:A1+b1 (ts: 0)", "1:B1+b1 (ts: 0)", "2:B2+b2 (ts: 0)", "3:B3+b3 (ts: 0)");
 
-            // push all four items to the primary stream. this should produce six items.
-            // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
-            // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
-            // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3,  0:XX0, 1:XX1, 2:XX2, 3:XX3 }
-            //     w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
+            // push all four items to the primary stream; this should produce six items
+            // w1 = { 0:A0, 1:A1, 0:B0, 1:B1, 2:B2, 3:B3 }
+            // w2 = { 0:a0, 1:a1, 0:b0, 0:b0, 1:b1, 2:b2, 3:b3 }
+            // --> w1 = { 0:A0, 1:A1, 0:B0, 1:B1, 2:B2, 3:B3, 0:C0, 1:C1, 2:C2, 3:C3 }
+            //     w2 = { 0:a0, 1:a1, 0:b0, 0:b0, 1:b1, 2:b2, 3:b3 }
             for (final int expectedKey : expectedKeys) {
-                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey));
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "C" + expectedKey));
             }
-            processor.checkAndClearProcessResult("0:XX0+Y0 (ts: 0)", "0:XX0+YY0 (ts: 0)", "1:XX1+Y1 (ts: 0)", "1:XX1+YY1 (ts: 0)", "2:XX2+YY2 (ts: 0)", "3:XX3+YY3 (ts: 0)");
+            processor.checkAndClearProcessResult("0:C0+a0 (ts: 0)", "0:C0+b0 (ts: 0)", "1:C1+a1 (ts: 0)", "1:C1+b1 (ts: 0)", "2:C2+b2 (ts: 0)", "3:C3+b3 (ts: 0)");
 
-            // push two items to the other stream. this should produce six item.
-            // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3,  0:XX0, 1:XX1, 2:XX2, 3:XX3 }
-            // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
-            // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3,  0:XX0, 1:XX1, 2:XX2, 3:XX3 }
-            //     w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3, 0:YYY0, 1:YYY1 }
+            // push two items to the other stream; this should produce six items
+            // w1 = { 0:A0, 1:A1, 0:B0, 1:B1, 2:B2, 3:B3, 0:C0, 1:C1, 2:C2, 3:C3 }
+            // w2 = { 0:a0, 1:a1, 0:b0, 0:b0, 1:b1, 2:b2, 3:b3 }
+            // --> w1 = { 0:A0, 1:A1, 0:B0, 1:B1, 2:B2, 3:B3, 0:C0, 1:C1, 2:C2, 3:C3 }
+            //     w2 = { 0:a0, 1:a1, 0:b0, 0:b0, 1:b1, 2:b2, 3:b3, 0:c0, 1:c1 }
             for (int i = 0; i < 2; i++) {
-                driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "YYY" + expectedKeys[i]));
+                driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "c" + expectedKeys[i]));
             }
-            processor.checkAndClearProcessResult("0:X0+YYY0 (ts: 0)", "0:X0+YYY0 (ts: 0)", "0:XX0+YYY0 (ts: 0)", "1:X1+YYY1 (ts: 0)", "1:X1+YYY1 (ts: 0)", "1:XX1+YYY1 (ts: 0)");
+            processor.checkAndClearProcessResult("0:A0+c0 (ts: 0)", "0:B0+c0 (ts: 0)", "0:C0+c0 (ts: 0)", "1:A1+c1 (ts: 0)", "1:B1+c1 (ts: 0)", "1:C1+c1 (ts: 0)");
         }
     }
 
@@ -292,166 +292,472 @@ public class KStreamKStreamJoinTest {
             final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
             long time = 0L;
 
-            // push two items to the primary stream. the other window is empty. this should produce no items.
+            // push two items to the primary stream; the other window is empty; this should produce no items
             // w1 = {}
             // w2 = {}
-            // --> w1 = { 0:X0, 1:X1 }
+            // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) }
             //     w2 = {}
             for (int i = 0; i < 2; i++) {
-                driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i], time));
+                driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "A" + expectedKeys[i], time));
             }
             processor.checkAndClearProcessResult();
 
-            // push two items to the other stream. this should produce two items.
-            // w1 = { 0:X0, 1:X1 }
+            // push two items to the other stream; this should produce two items
+            // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) }
             // w2 = {}
-            // --> w1 = { 0:X0, 1:X1 }
-            //     w2 = { 0:Y0, 1:Y1 }
-
+            // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) }
+            //     w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0) }
             for (int i = 0; i < 2; i++) {
-                driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i], time));
+                driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "a" + expectedKeys[i], time));
             }
-            processor.checkAndClearProcessResult("0:X0+Y0 (ts: 0)", "1:X1+Y1 (ts: 0)");
-
-            // clear logically
+            processor.checkAndClearProcessResult("0:A0+a0 (ts: 0)", "1:A1+a1 (ts: 0)");
+
+            // push four items to the primary stream with larger and increasing timestamp; this should produce no items
+            // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) }
+            // w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0) }
+            // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
+            //            0:B0 (ts: 1000), 1:B1 (ts: 1001), 2:B2 (ts: 1002), 3:B3 (ts: 1003) }
+            //     w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0) }
             time = 1000L;
             for (int i = 0; i < expectedKeys.length; i++) {
-                driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i], time + i));
+                driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "B" + expectedKeys[i], time + i));
             }
             processor.checkAndClearProcessResult();
 
-            // gradually expires items in w1
-            // w1 = { 0:X0, 1:X1, 2:X2, 3:X3 }
+            // push four items to the other stream with fixed larger timestamp; this should produce four items
+            // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
+            //        0:B0 (ts: 1000), 1:B1 (ts: 1001), 2:B2 (ts: 1002), 3:B3 (ts: 1003)  }
+            // w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0) }
+            // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
+            //            0:B0 (ts: 1000), 1:B1 (ts: 1001), 2:B2 (ts: 1002), 3:B3 (ts: 1003) }
+            //     w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0),
+            //            0:b0 (ts: 1100), 1:b1 (ts: 1100), 2:b2 (ts: 1100), 3:b3 (ts: 1100) }
             time += 100L;
             for (final int expectedKey : expectedKeys) {
-                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "b" + expectedKey, time));
             }
-            processor.checkAndClearProcessResult("0:X0+YY0 (ts: 1100)", "1:X1+YY1 (ts: 1100)", "2:X2+YY2 (ts: 1100)", "3:X3+YY3 (ts: 1100)");
-
+            processor.checkAndClearProcessResult("0:B0+b0 (ts: 1100)", "1:B1+b1 (ts: 1100)", "2:B2+b2 (ts: 1100)", "3:B3+b3 (ts: 1100)");
+
+            // push four items to the other stream with incremented timestamp; this should produce three items
+            // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
+            //        0:B0 (ts: 1000), 1:B1 (ts: 1001), 2:B2 (ts: 1002), 3:B3 (ts: 1003)  }
+            // w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0),
+            //        0:b0 (ts: 1100), 1:b1 (ts: 1100), 2:b2 (ts: 1100), 3:b3 (ts: 1100) }
+            // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
+            //            0:B0 (ts: 1000), 1:B1 (ts: 1001), 2:B2 (ts: 1002), 3:B3 (ts: 1003) }
+            //     w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0),
+            //            0:b0 (ts: 1100), 1:b1 (ts: 1100), 2:b2 (ts: 1100), 3:b3 (ts: 1100),
+            //            0:c0 (ts: 1101), 1:c1 (ts: 1101), 2:c2 (ts: 1101), 3:c3 (ts: 1101) }
             time += 1L;
             for (final int expectedKey : expectedKeys) {
-                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "c" + expectedKey, time));
             }
-            processor.checkAndClearProcessResult("1:X1+YY1 (ts: 1101)", "2:X2+YY2 (ts: 1101)", "3:X3+YY3 (ts: 1101)");
-
+            processor.checkAndClearProcessResult("1:B1+c1 (ts: 1101)", "2:B2+c2 (ts: 1101)", "3:B3+c3 (ts: 1101)");
+
+            // push four items to the other stream with incremented timestamp; this should produce two items
+            // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
+            //        0:B0 (ts: 1000), 1:B1 (ts: 1001), 2:B2 (ts: 1002), 3:B3 (ts: 1003)  }
+            // w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0),
+            //        0:b0 (ts: 1100), 1:b1 (ts: 1100), 2:b2 (ts: 1100), 3:b3 (ts: 1100),
+            //        0:c0 (ts: 1101), 1:c1 (ts: 1101), 2:c2 (ts: 1101), 3:c3 (ts: 1101) }
+            // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
+            //            0:B0 (ts: 1000), 1:B1 (ts: 1001), 2:B2 (ts: 1002), 3:B3 (ts: 1003) }
+            //     w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0),
+            //            0:b0 (ts: 1100), 1:b1 (ts: 1100), 2:b2 (ts: 1100), 3:b3 (ts: 1100),
+            //            0:c0 (ts: 1101), 1:c1 (ts: 1101), 2:c2 (ts: 1101), 3:c3 (ts: 1101),
+            //            0:d0 (ts: 1102), 1:d1 (ts: 1102), 2:d2 (ts: 1102), 3:d3 (ts: 1102) }
             time += 1L;
             for (final int expectedKey : expectedKeys) {
-                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "d" + expectedKey, time));
             }
-            processor.checkAndClearProcessResult("2:X2+YY2 (ts: 1102)", "3:X3+YY3 (ts: 1102)");
-
+            processor.checkAndClearProcessResult("2:B2+d2 (ts: 1102)", "3:B3+d3 (ts: 1102)");
+
+            // push four items to the other stream with incremented timestamp; this should produce one item
+            // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
+            //        0:B0 (ts: 1000), 1:B1 (ts: 1001), 2:B2 (ts: 1002), 3:B3 (ts: 1003)  }
+            // w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0),
+            //        0:b0 (ts: 1100), 1:b1 (ts: 1100), 2:b2 (ts: 1100), 3:b3 (ts: 1100),
+            //        0:c0 (ts: 1101), 1:c1 (ts: 1101), 2:c2 (ts: 1101), 3:c3 (ts: 1101),
+            //        0:d0 (ts: 1102), 1:d1 (ts: 1102), 2:d2 (ts: 1102), 3:d3 (ts: 1102) }
+            // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
+            //            0:B0 (ts: 1000), 1:B1 (ts: 1001), 2:B2 (ts: 1002), 3:B3 (ts: 1003) }
+            //     w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0),
+            //            0:b0 (ts: 1100), 1:b1 (ts: 1100), 2:b2 (ts: 1100), 3:b3 (ts: 1100),
+            //            0:c0 (ts: 1101), 1:c1 (ts: 1101), 2:c2 (ts: 1101), 3:c3 (ts: 1101),
+            //            0:d0 (ts: 1102), 1:d1 (ts: 1102), 2:d2 (ts: 1102), 3:d3 (ts: 1102),
+            //            0:e0 (ts: 1103), 1:e1 (ts: 1103), 2:e2 (ts: 1103), 3:e3 (ts: 1103) }
             time += 1L;
             for (final int expectedKey : expectedKeys) {
-                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "e" + expectedKey, time));
             }
-            processor.checkAndClearProcessResult("3:X3+YY3 (ts: 1103)");
-
+            processor.checkAndClearProcessResult("3:B3+e3 (ts: 1103)");
+
+            // push four items to the other stream with incremented timestamp; this should produce no items
+            // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
+            //        0:B0 (ts: 1000), 1:B1 (ts: 1001), 2:B2 (ts: 1002), 3:B3 (ts: 1003)  }
+            // w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0),
+            //        0:b0 (ts: 1100), 1:b1 (ts: 1100), 2:b2 (ts: 1100), 3:b3 (ts: 1100),
+            //        0:c0 (ts: 1101), 1:c1 (ts: 1101), 2:c2 (ts: 1101), 3:c3 (ts: 1101),
+            //        0:d0 (ts: 1102), 1:d1 (ts: 1102), 2:d2 (ts: 1102), 3:d3 (ts: 1102),
+            //        0:e0 (ts: 1103), 1:e1 (ts: 1103), 2:e2 (ts: 1103), 3:e3 (ts: 1103) }
+            // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
+            //            0:B0 (ts: 1000), 1:B1 (ts: 1001), 2:B2 (ts: 1002), 3:B3 (ts: 1003) }
+            //     w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0),
+            //            0:b0 (ts: 1100), 1:b1 (ts: 1100), 2:b2 (ts: 1100), 3:b3 (ts: 1100),
+            //            0:c0 (ts: 1101), 1:c1 (ts: 1101), 2:c2 (ts: 1101), 3:c3 (ts: 1101),
+            //            0:d0 (ts: 1102), 1:d1 (ts: 1102), 2:d2 (ts: 1102), 3:d3 (ts: 1102),
+            //            0:e0 (ts: 1103), 1:e1 (ts: 1103), 2:e2 (ts: 1103), 3:e3 (ts: 1103),
+            //            0:f0 (ts: 1104), 1:f1 (ts: 1104), 2:f2 (ts: 1104), 3:f3 (ts: 1104) }
             time += 1L;
             for (final int expectedKey : expectedKeys) {
-                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "f" + expectedKey, time));
             }
             processor.checkAndClearProcessResult();
 
-            // go back to the time before expiration
+            // push four items to the other stream with timestamp before the window bound; this should produce no items
+            // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
+            //        0:B0 (ts: 1000), 1:B1 (ts: 1001), 2:B2 (ts: 1002), 3:B3 (ts: 1003)  }
+            // w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0),
+            //        0:b0 (ts: 1100), 1:b1 (ts: 1100), 2:b2 (ts: 1100), 3:b3 (ts: 1100),
+            //        0:c0 (ts: 1101), 1:c1 (ts: 1101), 2:c2 (ts: 1101), 3:c3 (ts: 1101),
+            //        0:d0 (ts: 1102), 1:d1 (ts: 1102), 2:d2 (ts: 1102), 3:d3 (ts: 1102),
+            //        0:e0 (ts: 1103), 1:e1 (ts: 1103), 2:e2 (ts: 1103), 3:e3 (ts: 1103),
+            //        0:f0 (ts: 1104), 1:f1 (ts: 1104), 2:f2 (ts: 1104), 3:f3 (ts: 1104) }
+            // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
+            //            0:B0 (ts: 1000), 1:B1 (ts: 1001), 2:B2 (ts: 1002), 3:B3 (ts: 1003) }
+            //     w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0),
+            //            0:b0 (ts: 1100), 1:b1 (ts: 1100), 2:b2 (ts: 1100), 3:b3 (ts: 1100),
+            //            0:c0 (ts: 1101), 1:c1 (ts: 1101), 2:c2 (ts: 1101), 3:c3 (ts: 1101),
+            //            0:d0 (ts: 1102), 1:d1 (ts: 1102), 2:d2 (ts: 1102), 3:d3 (ts: 1102),
+            //            0:e0 (ts: 1103), 1:e1 (ts: 1103), 2:e2 (ts: 1103), 3:e3 (ts: 1103),
+            //            0:f0 (ts: 1104), 1:f1 (ts: 1104), 2:f2 (ts: 1104), 3:f3 (ts: 1104),
+            //            0:g0 (ts: 899), 1:g1 (ts: 899), 2:g2 (ts: 899), 3:g3 (ts: 899) }
             time = 1000L - 100L - 1L;
             for (final int expectedKey : expectedKeys) {
-                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "g" + expectedKey, time));
             }
             processor.checkAndClearProcessResult();
 
+            // push four items to the other stream with with incremented timestamp; this should produce one item
+            // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
+            //        0:B0 (ts: 1000), 1:B1 (ts: 1001), 2:B2 (ts: 1002), 3:B3 (ts: 1003)  }
+            // w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0),
+            //        0:b0 (ts: 1100), 1:b1 (ts: 1100), 2:b2 (ts: 1100), 3:b3 (ts: 1100),
+            //        0:c0 (ts: 1101), 1:c1 (ts: 1101), 2:c2 (ts: 1101), 3:c3 (ts: 1101),
+            //        0:d0 (ts: 1102), 1:d1 (ts: 1102), 2:d2 (ts: 1102), 3:d3 (ts: 1102),
+            //        0:e0 (ts: 1103), 1:e1 (ts: 1103), 2:e2 (ts: 1103), 3:e3 (ts: 1103),
+            //        0:f0 (ts: 1104), 1:f1 (ts: 1104), 2:f2 (ts: 1104), 3:f3 (ts: 1104),
+            //        0:g0 (ts: 899), 1:g1 (ts: 899), 2:g2 (ts: 899), 3:g3 (ts: 899) }
+            // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
+            //            0:B0 (ts: 1000), 1:B1 (ts: 1001), 2:B2 (ts: 1002), 3:B3 (ts: 1003) }
+            //     w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0),
+            //            0:b0 (ts: 1100), 1:b1 (ts: 1100), 2:b2 (ts: 1100), 3:b3 (ts: 1100),
+            //            0:c0 (ts: 1101), 1:c1 (ts: 1101), 2:c2 (ts: 1101), 3:c3 (ts: 1101),
+            //            0:d0 (ts: 1102), 1:d1 (ts: 1102), 2:d2 (ts: 1102), 3:d3 (ts: 1102),
+            //            0:e0 (ts: 1103), 1:e1 (ts: 1103), 2:e2 (ts: 1103), 3:e3 (ts: 1103),
+            //            0:f0 (ts: 1104), 1:f1 (ts: 1104), 2:f2 (ts: 1104), 3:f3 (ts: 1104),
+            //            0:g0 (ts: 899), 1:g1 (ts: 899), 2:g2 (ts: 899), 3:g3 (ts: 899),
+            //            0:h0 (ts: 900), 1:h1 (ts: 900), 2:h2 (ts: 900), 3:h3 (ts: 900) }
             time += 1L;
             for (final int expectedKey : expectedKeys) {
-                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "h" + expectedKey, time));
             }
-            processor.checkAndClearProcessResult("0:X0+YY0 (ts: 900)");
-
+            processor.checkAndClearProcessResult("0:B0+h0 (ts: 1000)");
+
+            // push four items to the other stream with with incremented timestamp; this should produce two items
+            // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
+            //        0:B0 (ts: 1000), 1:B1 (ts: 1001), 2:B2 (ts: 1002), 3:B3 (ts: 1003)  }
+            // w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0),
+            //        0:b0 (ts: 1100), 1:b1 (ts: 1100), 2:b2 (ts: 1100), 3:b3 (ts: 1100),
+            //        0:c0 (ts: 1101), 1:c1 (ts: 1101), 2:c2 (ts: 1101), 3:c3 (ts: 1101),
+            //        0:d0 (ts: 1102), 1:d1 (ts: 1102), 2:d2 (ts: 1102), 3:d3 (ts: 1102),
+            //        0:e0 (ts: 1103), 1:e1 (ts: 1103), 2:e2 (ts: 1103), 3:e3 (ts: 1103),
+            //        0:f0 (ts: 1104), 1:f1 (ts: 1104), 2:f2 (ts: 1104), 3:f3 (ts: 1104),
+            //        0:g0 (ts: 899), 1:g1 (ts: 899), 2:g2 (ts: 899), 3:g3 (ts: 899),
+            //        0:h0 (ts: 900), 1:h1 (ts: 900), 2:h2 (ts: 900), 3:h3 (ts: 900) }
+            // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
+            //            0:B0 (ts: 1000), 1:B1 (ts: 1001), 2:B2 (ts: 1002), 3:B3 (ts: 1003) }
+            //     w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0),
+            //            0:b0 (ts: 1100), 1:b1 (ts: 1100), 2:b2 (ts: 1100), 3:b3 (ts: 1100),
+            //            0:c0 (ts: 1101), 1:c1 (ts: 1101), 2:c2 (ts: 1101), 3:c3 (ts: 1101),
+            //            0:d0 (ts: 1102), 1:d1 (ts: 1102), 2:d2 (ts: 1102), 3:d3 (ts: 1102),
+            //            0:e0 (ts: 1103), 1:e1 (ts: 1103), 2:e2 (ts: 1103), 3:e3 (ts: 1103),
+            //            0:f0 (ts: 1104), 1:f1 (ts: 1104), 2:f2 (ts: 1104), 3:f3 (ts: 1104),
+            //            0:g0 (ts: 899), 1:g1 (ts: 899), 2:g2 (ts: 899), 3:g3 (ts: 899),
+            //            0:h0 (ts: 900), 1:h1 (ts: 900), 2:h2 (ts: 900), 3:h3 (ts: 900),
+            //            0:i0 (ts: 901), 1:i1 (ts: 901), 2:i2 (ts: 901), 3:i3 (ts: 901) }
             time += 1L;
             for (final int expectedKey : expectedKeys) {
-                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "i" + expectedKey, time));
             }
-            processor.checkAndClearProcessResult("0:X0+YY0 (ts: 901)", "1:X1+YY1 (ts: 901)");
-
+            processor.checkAndClearProcessResult("0:B0+i0 (ts: 1000)", "1:B1+i1 (ts: 1001)");
+
+            // push four items to the other stream with with incremented timestamp; this should produce three items
+            // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
+            //        0:B0 (ts: 1000), 1:B1 (ts: 1001), 2:B2 (ts: 1002), 3:B3 (ts: 1003)  }
+            // w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0),
+            //        0:b0 (ts: 1100), 1:b1 (ts: 1100), 2:b2 (ts: 1100), 3:b3 (ts: 1100),
+            //        0:c0 (ts: 1101), 1:c1 (ts: 1101), 2:c2 (ts: 1101), 3:c3 (ts: 1101),
+            //        0:d0 (ts: 1102), 1:d1 (ts: 1102), 2:d2 (ts: 1102), 3:d3 (ts: 1102),
+            //        0:e0 (ts: 1103), 1:e1 (ts: 1103), 2:e2 (ts: 1103), 3:e3 (ts: 1103),
+            //        0:f0 (ts: 1104), 1:f1 (ts: 1104), 2:f2 (ts: 1104), 3:f3 (ts: 1104),
+            //        0:g0 (ts: 899), 1:g1 (ts: 899), 2:g2 (ts: 899), 3:g3 (ts: 899),
+            //        0:h0 (ts: 900), 1:h1 (ts: 900), 2:h2 (ts: 900), 3:h3 (ts: 900),
+            //        0:i0 (ts: 901), 1:i1 (ts: 901), 2:i2 (ts: 901), 3:i3 (ts: 901) }
+            // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
+            //            0:B0 (ts: 1000), 1:B1 (ts: 1001), 2:B2 (ts: 1002), 3:B3 (ts: 1003) }
+            //     w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0),
+            //            0:b0 (ts: 1100), 1:b1 (ts: 1100), 2:b2 (ts: 1100), 3:b3 (ts: 1100),
+            //            0:c0 (ts: 1101), 1:c1 (ts: 1101), 2:c2 (ts: 1101), 3:c3 (ts: 1101),
+            //            0:d0 (ts: 1102), 1:d1 (ts: 1102), 2:d2 (ts: 1102), 3:d3 (ts: 1102),
+            //            0:e0 (ts: 1103), 1:e1 (ts: 1103), 2:e2 (ts: 1103), 3:e3 (ts: 1103),
+            //            0:f0 (ts: 1104), 1:f1 (ts: 1104), 2:f2 (ts: 1104), 3:f3 (ts: 1104),
+            //            0:g0 (ts: 899), 1:g1 (ts: 899), 2:g2 (ts: 899), 3:g3 (ts: 899),
+            //            0:h0 (ts: 900), 1:h1 (ts: 900), 2:h2 (ts: 900), 3:h3 (ts: 900),
+            //            0:i0 (ts: 901), 1:i1 (ts: 901), 2:i2 (ts: 901), 3:i3 (ts: 901),
+            //            0:j0 (ts: 902), 1:j1 (ts: 902), 2:j2 (ts: 902), 3:j3 (ts: 902) }
             time += 1;
             for (final int expectedKey : expectedKeys) {
-                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "j" + expectedKey, time));
             }
-            processor.checkAndClearProcessResult("0:X0+YY0 (ts: 902)", "1:X1+YY1 (ts: 902)", "2:X2+YY2 (ts: 902)");
-
+            processor.checkAndClearProcessResult("0:B0+j0 (ts: 1000)", "1:B1+j1 (ts: 1001)", "2:B2+j2 (ts: 1002)");
+
+            // push four items to the other stream with with incremented timestamp; this should produce four items
+            // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
+            //        0:B0 (ts: 1000), 1:B1 (ts: 1001), 2:B2 (ts: 1002), 3:B3 (ts: 1003)  }
+            // w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0),
+            //        0:b0 (ts: 1100), 1:b1 (ts: 1100), 2:b2 (ts: 1100), 3:b3 (ts: 1100),
+            //        0:c0 (ts: 1101), 1:c1 (ts: 1101), 2:c2 (ts: 1101), 3:c3 (ts: 1101),
+            //        0:d0 (ts: 1102), 1:d1 (ts: 1102), 2:d2 (ts: 1102), 3:d3 (ts: 1102),
+            //        0:e0 (ts: 1103), 1:e1 (ts: 1103), 2:e2 (ts: 1103), 3:e3 (ts: 1103),
+            //        0:f0 (ts: 1104), 1:f1 (ts: 1104), 2:f2 (ts: 1104), 3:f3 (ts: 1104),
+            //        0:g0 (ts: 899), 1:g1 (ts: 899), 2:g2 (ts: 899), 3:g3 (ts: 899),
+            //        0:h0 (ts: 900), 1:h1 (ts: 900), 2:h2 (ts: 900), 3:h3 (ts: 900),
+            //        0:i0 (ts: 901), 1:i1 (ts: 901), 2:i2 (ts: 901), 3:i3 (ts: 901),
+            //        0:j0 (ts: 902), 1:j1 (ts: 902), 2:j2 (ts: 902), 3:j3 (ts: 902) }
+            // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
+            //            0:B0 (ts: 1000), 1:B1 (ts: 1001), 2:B2 (ts: 1002), 3:B3 (ts: 1003) }
+            //     w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0),
+            //            0:b0 (ts: 1100), 1:b1 (ts: 1100), 2:b2 (ts: 1100), 3:b3 (ts: 1100),
+            //            0:c0 (ts: 1101), 1:c1 (ts: 1101), 2:c2 (ts: 1101), 3:c3 (ts: 1101),
+            //            0:d0 (ts: 1102), 1:d1 (ts: 1102), 2:d2 (ts: 1102), 3:d3 (ts: 1102),
+            //            0:e0 (ts: 1103), 1:e1 (ts: 1103), 2:e2 (ts: 1103), 3:e3 (ts: 1103),
+            //            0:f0 (ts: 1104), 1:f1 (ts: 1104), 2:f2 (ts: 1104), 3:f3 (ts: 1104),
+            //            0:g0 (ts: 899), 1:g1 (ts: 899), 2:g2 (ts: 899), 3:g3 (ts: 899),
+            //            0:h0 (ts: 900), 1:h1 (ts: 900), 2:h2 (ts: 900), 3:h3 (ts: 900),
+            //            0:i0 (ts: 901), 1:i1 (ts: 901), 2:i2 (ts: 901), 3:i3 (ts: 901),
+            //            0:j0 (ts: 902), 1:j1 (ts: 902), 2:j2 (ts: 902), 3:j3 (ts: 902) }
+            //            0:k0 (ts: 903), 1:k1 (ts: 903), 2:k2 (ts: 903), 3:k3 (ts: 903) }
             time += 1;
             for (final int expectedKey : expectedKeys) {
-                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "k" + expectedKey, time));
             }
-            processor.checkAndClearProcessResult("0:X0+YY0 (ts: 903)", "1:X1+YY1 (ts: 903)", "2:X2+YY2 (ts: 903)", "3:X3+YY3 (ts: 903)");
+            processor.checkAndClearProcessResult("0:B0+k0 (ts: 1000)", "1:B1+k1 (ts: 1001)", "2:B2+k2 (ts: 1002)", "3:B3+k3 (ts: 1003)");
 
-            // clear (logically)
+            // advance time to not join with existing data
+            // we omit above exiting data, even if it's still in the window
+            //
+            // push four items with increasing timestamps to the other stream. the primary window is empty; this should produce no items
+            // w1 = {}
+            // w2 = {}
+            // --> w1 = {}
+            //     w2 = { 0:l0 (ts: 2000), 1:l1 (ts: 2001), 2:l2 (ts: 2002), 3:l3 (ts: 2003) }
             time = 2000L;
             for (int i = 0; i < expectedKeys.length; i++) {
-                driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i], time + i));
+                driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "l" + expectedKeys[i], time + i));
             }
             processor.checkAndClearProcessResult();
 
-            // gradually expires items in w2
-            // w2 = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 }
+            // push four items with larger timestamps to the primary stream; this should produce four items
+            // w1 = {}
+            // w2 = { 0:l0 (ts: 2000), 1:l1 (ts: 2001), 2:l2 (ts: 2002), 3:l3 (ts: 2003) }
+            // --> w1 = { 0:C0 (ts: 2100), 1:C1 (ts: 2100), 2:C2 (ts: 2100), 3:C3 (ts: 2100) }
+            //     w2 = { 0:l0 (ts: 2000), 1:l1 (ts: 2001), 2:l2 (ts: 2002), 3:l3 (ts: 2003) }
             time = 2000L + 100L;
             for (final int expectedKey : expectedKeys) {
-                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "C" + expectedKey, time));
             }
-            processor.checkAndClearProcessResult("0:XX0+Y0 (ts: 2100)", "1:XX1+Y1 (ts: 2100)", "2:XX2+Y2 (ts: 2100)", "3:XX3+Y3 (ts: 2100)");
-
+            processor.checkAndClearProcessResult("0:C0+l0 (ts: 2100)", "1:C1+l1 (ts: 2100)", "2:C2+l2 (ts: 2100)", "3:C3+l3 (ts: 2100)");
+
+            // push four items with increase timestamps to the primary stream; this should produce three items
+            // w1 = { 0:C0 (ts: 2100), 1:C1 (ts: 2100), 2:C2 (ts: 2100), 3:C3 (ts: 2100) }
+            // w2 = { 0:l0 (ts: 2000), 1:l1 (ts: 2001), 2:l2 (ts: 2002), 3:l3 (ts: 2003) }
+            // --> w1 = { 0:C0 (ts: 2100), 1:C1 (ts: 2100), 2:C2 (ts: 2100), 3:C3 (ts: 2100),
+            //            0:D0 (ts: 2101), 1:D1 (ts: 2101), 2:D2 (ts: 2101), 3:D3 (ts: 2101) }
+            //     w2 = { 0:l0 (ts: 2000), 1:l1 (ts: 2001), 2:l2 (ts: 2002), 3:l3 (ts: 2003) }
             time += 1L;
             for (final int expectedKey : expectedKeys) {
-                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "D" + expectedKey, time));
             }
-            processor.checkAndClearProcessResult("1:XX1+Y1 (ts: 2101)", "2:XX2+Y2 (ts: 2101)", "3:XX3+Y3 (ts: 2101)");
-
+            processor.checkAndClearProcessResult("1:D1+l1 (ts: 2101)", "2:D2+l2 (ts: 2101)", "3:D3+l3 (ts: 2101)");
+
+            // push four items with increase timestamps to the primary stream; this should produce two items
+            // w1 = { 0:C0 (ts: 2100), 1:C1 (ts: 2100), 2:C2 (ts: 2100), 3:C3 (ts: 2100),
+            //        0:D0 (ts: 2101), 1:D1 (ts: 2101), 2:D2 (ts: 2101), 3:D3 (ts: 2101) }
+            // w2 = { 0:l0 (ts: 2000), 1:l1 (ts: 2001), 2:l2 (ts: 2002), 3:l3 (ts: 2003) }
+            // --> w1 = { 0:C0 (ts: 2100), 1:C1 (ts: 2100), 2:C2 (ts: 2100), 3:C3 (ts: 2100),
+            //            0:D0 (ts: 2101), 1:D1 (ts: 2101), 2:D2 (ts: 2101), 3:D3 (ts: 2101),
+            //            0:E0 (ts: 2102), 1:E1 (ts: 2102), 2:E2 (ts: 2102), 3:E3 (ts: 2102) }
+            //     w2 = { 0:l0 (ts: 2000), 1:l1 (ts: 2001), 2:l2 (ts: 2002), 3:l3 (ts: 2003) }
             time += 1L;
             for (final int expectedKey : expectedKeys) {
-                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "E" + expectedKey, time));
             }
-            processor.checkAndClearProcessResult("2:XX2+Y2 (ts: 2102)", "3:XX3+Y3 (ts: 2102)");
-
+            processor.checkAndClearProcessResult("2:E2+l2 (ts: 2102)", "3:E3+l3 (ts: 2102)");
+
+            // push four items with increase timestamps to the primary stream; this should produce one item
+            // w1 = { 0:C0 (ts: 2100), 1:C1 (ts: 2100), 2:C2 (ts: 2100), 3:C3 (ts: 2100),
+            //        0:D0 (ts: 2101), 1:D1 (ts: 2101), 2:D2 (ts: 2101), 3:D3 (ts: 2101),
+            //        0:E0 (ts: 2102), 1:E1 (ts: 2102), 2:E2 (ts: 2102), 3:E3 (ts: 2102) }
+            // w2 = { 0:l0 (ts: 2000), 1:l1 (ts: 2001), 2:l2 (ts: 2002), 3:l3 (ts: 2003) }
+            // --> w1 = { 0:C0 (ts: 2100), 1:C1 (ts: 2100), 2:C2 (ts: 2100), 3:C3 (ts: 2100),
+            //            0:D0 (ts: 2101), 1:D1 (ts: 2101), 2:D2 (ts: 2101), 3:D3 (ts: 2101),
+            //            0:E0 (ts: 2102), 1:E1 (ts: 2102), 2:E2 (ts: 2102), 3:E3 (ts: 2102),
+            //            0:F0 (ts: 2103), 1:F1 (ts: 2103), 2:F2 (ts: 2103), 3:F3 (ts: 2103) }
+            //     w2 = { 0:l0 (ts: 2000), 1:l1 (ts: 2001), 2:l2 (ts: 2002), 3:l3 (ts: 2003) }
             time += 1L;
             for (final int expectedKey : expectedKeys) {
-                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "F" + expectedKey, time));
             }
-            processor.checkAndClearProcessResult("3:XX3+Y3 (ts: 2103)");
-
+            processor.checkAndClearProcessResult("3:F3+l3 (ts: 2103)");
+
+            // push four items with increase timestamps (now out of window) to the primary stream; this should produce no items
+            // w1 = { 0:C0 (ts: 2100), 1:C1 (ts: 2100), 2:C2 (ts: 2100), 3:C3 (ts: 2100),
+            //        0:D0 (ts: 2101), 1:D1 (ts: 2101), 2:D2 (ts: 2101), 3:D3 (ts: 2101),
+            //        0:E0 (ts: 2102), 1:E1 (ts: 2102), 2:E2 (ts: 2102), 3:E3 (ts: 2102),
+            //        0:F0 (ts: 2103), 1:F1 (ts: 2103), 2:F2 (ts: 2103), 3:F3 (ts: 2103) }
+            // w2 = { 0:l0 (ts: 2000), 1:l1 (ts: 2001), 2:l2 (ts: 2002), 3:l3 (ts: 2003) }
+            // --> w1 = { 0:C0 (ts: 2100), 1:C1 (ts: 2100), 2:C2 (ts: 2100), 3:C3 (ts: 2100),
+            //            0:D0 (ts: 2101), 1:D1 (ts: 2101), 2:D2 (ts: 2101), 3:D3 (ts: 2101),
+            //            0:E0 (ts: 2102), 1:E1 (ts: 2102), 2:E2 (ts: 2102), 3:E3 (ts: 2102),
+            //            0:F0 (ts: 2103), 1:F1 (ts: 2103), 2:F2 (ts: 2103), 3:F3 (ts: 2103),
+            //            0:G0 (ts: 2104), 1:G1 (ts: 2104), 2:G2 (ts: 2104), 3:G3 (ts: 2104) }
+            //     w2 = { 0:l0 (ts: 2000), 1:l1 (ts: 2001), 2:l2 (ts: 2002), 3:l3 (ts: 2003) }
             time += 1L;
             for (final int expectedKey : expectedKeys) {
-                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "G" + expectedKey, time));
             }
             processor.checkAndClearProcessResult();
 
-            // go back to the time before expiration
+            // push four items with smaller timestamps (before window) to the primary stream; this should produce no items
+            // w1 = { 0:C0 (ts: 2100), 1:C1 (ts: 2100), 2:C2 (ts: 2100), 3:C3 (ts: 2100),
+            //        0:D0 (ts: 2101), 1:D1 (ts: 2101), 2:D2 (ts: 2101), 3:D3 (ts: 2101),
+            //        0:E0 (ts: 2102), 1:E1 (ts: 2102), 2:E2 (ts: 2102), 3:E3 (ts: 2102),
+            //        0:F0 (ts: 2103), 1:F1 (ts: 2103), 2:F2 (ts: 2103), 3:F3 (ts: 2103),
+            //        0:G0 (ts: 2104), 1:G1 (ts: 2104), 2:G2 (ts: 2104), 3:G3 (ts: 2104) }
+            // w2 = { 0:l0 (ts: 2000), 1:l1 (ts: 2001), 2:l2 (ts: 2002), 3:l3 (ts: 2003) }
+            // --> w1 = { 0:C0 (ts: 2100), 1:C1 (ts: 2100), 2:C2 (ts: 2100), 3:C3 (ts: 2100),
+            //            0:D0 (ts: 2101), 1:D1 (ts: 2101), 2:D2 (ts: 2101), 3:D3 (ts: 2101),
+            //            0:E0 (ts: 2102), 1:E1 (ts: 2102), 2:E2 (ts: 2102), 3:E3 (ts: 2102),
+            //            0:F0 (ts: 2103), 1:F1 (ts: 2103), 2:F2 (ts: 2103), 3:F3 (ts: 2103),
+            //            0:G0 (ts: 2104), 1:G1 (ts: 2104), 2:G2 (ts: 2104), 3:G3 (ts: 2104),
+            //            0:H0 (ts: 1899), 1:H1 (ts: 1899), 2:H2 (ts: 1899), 3:H3 (ts: 1899) }
+            //     w2 = { 0:l0 (ts: 2000), 1:l1 (ts: 2001), 2:l2 (ts: 2002), 3:l3 (ts: 2003) }
             time = 2000L - 100L - 1L;
             for (final int expectedKey : expectedKeys) {
-                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "H" + expectedKey, time));
             }
             processor.checkAndClearProcessResult();
 
+            // push four items with increased timestamps to the primary stream; this should produce one item
+            // w1 = { 0:C0 (ts: 2100), 1:C1 (ts: 2100), 2:C2 (ts: 2100), 3:C3 (ts: 2100),
+            //        0:D0 (ts: 2101), 1:D1 (ts: 2101), 2:D2 (ts: 2101), 3:D3 (ts: 2101),
+            //        0:E0 (ts: 2102), 1:E1 (ts: 2102), 2:E2 (ts: 2102), 3:E3 (ts: 2102),
+            //        0:F0 (ts: 2103), 1:F1 (ts: 2103), 2:F2 (ts: 2103), 3:F3 (ts: 2103),
+            //        0:G0 (ts: 2104), 1:G1 (ts: 2104), 2:G2 (ts: 2104), 3:G3 (ts: 2104),
+            //        0:H0 (ts: 1899), 1:H1 (ts: 1899), 2:H2 (ts: 1899), 3:H3 (ts: 1899) }
+            // w2 = { 0:l0 (ts: 2000), 1:l1 (ts: 2001), 2:l2 (ts: 2002), 3:l3 (ts: 2003) }
+            // --> w1 = { 0:C0 (ts: 2100), 1:C1 (ts: 2100), 2:C2 (ts: 2100), 3:C3 (ts: 2100),
+            //            0:D0 (ts: 2101), 1:D1 (ts: 2101), 2:D2 (ts: 2101), 3:D3 (ts: 2101),
+            //            0:E0 (ts: 2102), 1:E1 (ts: 2102), 2:E2 (ts: 2102), 3:E3 (ts: 2102),
+            //            0:F0 (ts: 2103), 1:F1 (ts: 2103), 2:F2 (ts: 2103), 3:F3 (ts: 2103),
+            //            0:G0 (ts: 2104), 1:G1 (ts: 2104), 2:G2 (ts: 2104), 3:G3 (ts: 2104),
+            //            0:H0 (ts: 1899), 1:H1 (ts: 1899), 2:H2 (ts: 1899), 3:H3 (ts: 1899),
+            //            0:I0 (ts: 1900), 1:I1 (ts: 1900), 2:I2 (ts: 1900), 3:I3 (ts: 1900) }
+            //     w2 = { 0:l0 (ts: 2000), 1:l1 (ts: 2001), 2:l2 (ts: 2002), 3:l3 (ts: 2003) }
             time += 1L;
             for (final int expectedKey : expectedKeys) {
-                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "I" + expectedKey, time));
             }
-            processor.checkAndClearProcessResult("0:XX0+Y0 (ts: 1900)");
-
+            processor.checkAndClearProcessResult("0:I0+l0 (ts: 2000)");
+
+            // push four items with increased timestamps to the primary stream; this should produce two items
+            // w1 = { 0:C0 (ts: 2100), 1:C1 (ts: 2100), 2:C2 (ts: 2100), 3:C3 (ts: 2100),
+            //        0:D0 (ts: 2101), 1:D1 (ts: 2101), 2:D2 (ts: 2101), 3:D3 (ts: 2101),
+            //        0:E0 (ts: 2102), 1:E1 (ts: 2102), 2:E2 (ts: 2102), 3:E3 (ts: 2102),
+            //        0:F0 (ts: 2103), 1:F1 (ts: 2103), 2:F2 (ts: 2103), 3:F3 (ts: 2103),
+            //        0:G0 (ts: 2104), 1:G1 (ts: 2104), 2:G2 (ts: 2104), 3:G3 (ts: 2104),
+            //        0:H0 (ts: 1899), 1:H1 (ts: 1899), 2:H2 (ts: 1899), 3:H3 (ts: 1899),
+            //        0:I0 (ts: 1900), 1:I1 (ts: 1900), 2:I2 (ts: 1900), 3:I3 (ts: 1900) }
+            // w2 = { 0:l0 (ts: 2000), 1:l1 (ts: 2001), 2:l2 (ts: 2002), 3:l3 (ts: 2003) }
+            // --> w1 = { 0:C0 (ts: 2100), 1:C1 (ts: 2100), 2:C2 (ts: 2100), 3:C3 (ts: 2100),
+            //            0:D0 (ts: 2101), 1:D1 (ts: 2101), 2:D2 (ts: 2101), 3:D3 (ts: 2101),
+            //            0:E0 (ts: 2102), 1:E1 (ts: 2102), 2:E2 (ts: 2102), 3:E3 (ts: 2102),
+            //            0:F0 (ts: 2103), 1:F1 (ts: 2103), 2:F2 (ts: 2103), 3:F3 (ts: 2103),
+            //            0:G0 (ts: 2104), 1:G1 (ts: 2104), 2:G2 (ts: 2104), 3:G3 (ts: 2104),
+            //            0:H0 (ts: 1899), 1:H1 (ts: 1899), 2:H2 (ts: 1899), 3:H3 (ts: 1899),
+            //            0:I0 (ts: 1900), 1:I1 (ts: 1900), 2:I2 (ts: 1900), 3:I3 (ts: 1900),
+            //            0:J0 (ts: 1901), 1:J1 (ts: 1901), 2:J2 (ts: 1901), 3:J3 (ts: 1901) }
+            //     w2 = { 0:l0 (ts: 2000), 1:l1 (ts: 2001), 2:l2 (ts: 2002), 3:l3 (ts: 2003) }
             time += 1L;
             for (final int expectedKey : expectedKeys) {
-                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "J" + expectedKey, time));
             }
-            processor.checkAndClearProcessResult("0:XX0+Y0 (ts: 1901)", "1:XX1+Y1 (ts: 1901)");
-
+            processor.checkAndClearProcessResult("0:J0+l0 (ts: 2000)", "1:J1+l1 (ts: 2001)");
+
+            // push four items with increased timestamps to the primary stream; this should produce three items
+            // w1 = { 0:C0 (ts: 2100), 1:C1 (ts: 2100), 2:C2 (ts: 2100), 3:C3 (ts: 2100),
+            //        0:D0 (ts: 2101), 1:D1 (ts: 2101), 2:D2 (ts: 2101), 3:D3 (ts: 2101),
+            //        0:E0 (ts: 2102), 1:E1 (ts: 2102), 2:E2 (ts: 2102), 3:E3 (ts: 2102),
+            //        0:F0 (ts: 2103), 1:F1 (ts: 2103), 2:F2 (ts: 2103), 3:F3 (ts: 2103),
+            //        0:G0 (ts: 2104), 1:G1 (ts: 2104), 2:G2 (ts: 2104), 3:G3 (ts: 2104),
+            //        0:H0 (ts: 1899), 1:H1 (ts: 1899), 2:H2 (ts: 1899), 3:H3 (ts: 1899),
+            //        0:I0 (ts: 1900), 1:I1 (ts: 1900), 2:I2 (ts: 1900), 3:I3 (ts: 1900),
+            //        0:J0 (ts: 1901), 1:J1 (ts: 1901), 2:J2 (ts: 1901), 3:J3 (ts: 1901) }
+            // w2 = { 0:l0 (ts: 2000), 1:l1 (ts: 2001), 2:l2 (ts: 2002), 3:l3 (ts: 2003) }
+            // --> w1 = { 0:C0 (ts: 2100), 1:C1 (ts: 2100), 2:C2 (ts: 2100), 3:C3 (ts: 2100),
+            //            0:D0 (ts: 2101), 1:D1 (ts: 2101), 2:D2 (ts: 2101), 3:D3 (ts: 2101),
+            //            0:E0 (ts: 2102), 1:E1 (ts: 2102), 2:E2 (ts: 2102), 3:E3 (ts: 2102),
+            //            0:F0 (ts: 2103), 1:F1 (ts: 2103), 2:F2 (ts: 2103), 3:F3 (ts: 2103),
+            //            0:G0 (ts: 2104), 1:G1 (ts: 2104), 2:G2 (ts: 2104), 3:G3 (ts: 2104),
+            //            0:H0 (ts: 1899), 1:H1 (ts: 1899), 2:H2 (ts: 1899), 3:H3 (ts: 1899),
+            //            0:I0 (ts: 1900), 1:I1 (ts: 1900), 2:I2 (ts: 1900), 3:I3 (ts: 1900),
+            //            0:J0 (ts: 1901), 1:J1 (ts: 1901), 2:J2 (ts: 1901), 3:J3 (ts: 1901),
+            //            0:K0 (ts: 1902), 1:K1 (ts: 1902), 2:K2 (ts: 1902), 3:K3 (ts: 1902) }
+            //     w2 = { 0:l0 (ts: 2000), 1:l1 (ts: 2001), 2:l2 (ts: 2002), 3:l3 (ts: 2003) }
             time += 1L;
             for (final int expectedKey : expectedKeys) {
-                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "K" + expectedKey, time));
             }
-            processor.checkAndClearProcessResult("0:XX0+Y0 (ts: 1902)", "1:XX1+Y1 (ts: 1902)", "2:XX2+Y2 (ts: 1902)");
-
+            processor.checkAndClearProcessResult("0:K0+l0 (ts: 2000)", "1:K1+l1 (ts: 2001)", "2:K2+l2 (ts: 2002)");
+
+            // push four items with increased timestamps to the primary stream; this should produce four items
+            // w1 = { 0:C0 (ts: 2100), 1:C1 (ts: 2100), 2:C2 (ts: 2100), 3:C3 (ts: 2100),
+            //        0:D0 (ts: 2101), 1:D1 (ts: 2101), 2:D2 (ts: 2101), 3:D3 (ts: 2101),
+            //        0:E0 (ts: 2102), 1:E1 (ts: 2102), 2:E2 (ts: 2102), 3:E3 (ts: 2102),
+            //        0:F0 (ts: 2103), 1:F1 (ts: 2103), 2:F2 (ts: 2103), 3:F3 (ts: 2103),
+            //        0:G0 (ts: 2104), 1:G1 (ts: 2104), 2:G2 (ts: 2104), 3:G3 (ts: 2104),
+            //        0:H0 (ts: 1899), 1:H1 (ts: 1899), 2:H2 (ts: 1899), 3:H3 (ts: 1899),
+            //        0:I0 (ts: 1900), 1:I1 (ts: 1900), 2:I2 (ts: 1900), 3:I3 (ts: 1900),
+            //        0:J0 (ts: 1901), 1:J1 (ts: 1901), 2:J2 (ts: 1901), 3:J3 (ts: 1901) }
+            //        0:K0 (ts: 1902), 1:K1 (ts: 1902), 2:K2 (ts: 1902), 3:K3 (ts: 1902) }
+            // w2 = { 0:l0 (ts: 2000), 1:l1 (ts: 2001), 2:l2 (ts: 2002), 3:l3 (ts: 2003) }
+            // --> w1 = { 0:C0 (ts: 2100), 1:C1 (ts: 2100), 2:C2 (ts: 2100), 3:C3 (ts: 2100),
+            //            0:D0 (ts: 2101), 1:D1 (ts: 2101), 2:D2 (ts: 2101), 3:D3 (ts: 2101),
+            //            0:E0 (ts: 2102), 1:E1 (ts: 2102), 2:E2 (ts: 2102), 3:E3 (ts: 2102),
+            //            0:F0 (ts: 2103), 1:F1 (ts: 2103), 2:F2 (ts: 2103), 3:F3 (ts: 2103),
+            //            0:G0 (ts: 2104), 1:G1 (ts: 2104), 2:G2 (ts: 2104), 3:G3 (ts: 2104),
+            //            0:H0 (ts: 1899), 1:H1 (ts: 1899), 2:H2 (ts: 1899), 3:H3 (ts: 1899),
+            //            0:I0 (ts: 1900), 1:I1 (ts: 1900), 2:I2 (ts: 1900), 3:I3 (ts: 1900),
+            //            0:J0 (ts: 1901), 1:J1 (ts: 1901), 2:J2 (ts: 1901), 3:J3 (ts: 1901),
+            //            0:K0 (ts: 1902), 1:K1 (ts: 1902), 2:K2 (ts: 1902), 3:K3 (ts: 1902),
+            //            0:L0 (ts: 1903), 1:L1 (ts: 1903), 2:L2 (ts: 1903), 3:L3 (ts: 1903) }
+            //     w2 = { 0:l0 (ts: 2000), 1:l1 (ts: 2001), 2:l2 (ts: 2002), 3:l3 (ts: 2003) }
             time += 1L;
             for (final int expectedKey : expectedKeys) {
-                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "L" + expectedKey, time));
             }
-            processor.checkAndClearProcessResult("0:XX0+Y0 (ts: 1903)", "1:XX1+Y1 (ts: 1903)", "2:XX2+Y2 (ts: 1903)", "3:XX3+Y3 (ts: 1903)");
+            processor.checkAndClearProcessResult("0:L0+l0 (ts: 2000)", "1:L1+l1 (ts: 2001)", "2:L2+l2 (ts: 2002)", "3:L3+l3 (ts: 2003)");
         }
     }
 
@@ -487,68 +793,204 @@ public class KStreamKStreamJoinTest {
             final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
             long time = 1000L;
 
+            // push four items with increasing timestamps to the primary stream; the other window is empty; this should produce no items
+            // w1 = {}
+            // w2 = {}
+            // --> w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
+            //     w2 = {}
             for (int i = 0; i < expectedKeys.length; i++) {
-                driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i], time + i));
+                driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "A" + expectedKeys[i], time + i));
             }
             processor.checkAndClearProcessResult();
 
+            // push four items smaller timestamps (out of window) to the secondary stream; this should produce no items
+            // w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
+            // w2 = {}
+            // --> w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
+            //     w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999) }
             time = 1000L - 1L;
             for (final int expectedKey : expectedKeys) {
-                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "a" + expectedKey, time));
             }
             processor.checkAndClearProcessResult();
 
+            // push four items with increased timestamps to the secondary stream; this should produce one item
+            // w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
+            // w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999) }
+            // --> w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
+            //     w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999),
+            //            0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 3:b3 (ts: 1000) }
             time += 1L;
             for (final int expectedKey : expectedKeys) {
-                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "b" + expectedKey, time));
             }
-            processor.checkAndClearProcessResult("0:X0+YY0 (ts: 1000)");
-
+            processor.checkAndClearProcessResult("0:A0+b0 (ts: 1000)");
+
+            // push four items with increased timestamps to the secondary stream; this should produce two items
+            // w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
+            // w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999),
+            //        0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 3:b3 (ts: 1000) }
+            // --> w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
+            //     w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999),
+            //            0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 3:b3 (ts: 1000),
+            //            0:c0 (ts: 1001), 1:c1 (ts: 1001), 2:c2 (ts: 1001), 3:c3 (ts: 1001) }
             time += 1L;
             for (final int expectedKey : expectedKeys) {
-                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "c" + expectedKey, time));
             }
-            processor.checkAndClearProcessResult("0:X0+YY0 (ts: 1001)", "1:X1+YY1 (ts: 1001)");
-
+            processor.checkAndClearProcessResult("0:A0+c0 (ts: 1001)", "1:A1+c1 (ts: 1001)");
+
+            // push four items with increased timestamps to the secondary stream; this should produce three items
+            // w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
+            // w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999),
+            //        0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 3:b3 (ts: 1000),
+            //        0:c0 (ts: 1001), 1:c1 (ts: 1001), 2:c2 (ts: 1001), 3:c3 (ts: 1001) }
+            // --> w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
+            //     w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999),
+            //            0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 3:b3 (ts: 1000),
+            //            0:c0 (ts: 1001), 1:c1 (ts: 1001), 2:c2 (ts: 1001), 3:c3 (ts: 1001),
+            //            0:d0 (ts: 1002), 1:d1 (ts: 1002), 2:d2 (ts: 1002), 3:d3 (ts: 1002) }
             time += 1L;
             for (final int expectedKey : expectedKeys) {
-                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "d" + expectedKey, time));
             }
-            processor.checkAndClearProcessResult("0:X0+YY0 (ts: 1002)", "1:X1+YY1 (ts: 1002)", "2:X2+YY2 (ts: 1002)");
-
+            processor.checkAndClearProcessResult("0:A0+d0 (ts: 1002)", "1:A1+d1 (ts: 1002)", "2:A2+d2 (ts: 1002)");
+
+            // push four items with increased timestamps to the secondary stream; this should produce four items
+            // w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
+            // w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999),
+            //        0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 3:b3 (ts: 1000),
+            //        0:c0 (ts: 1001), 1:c1 (ts: 1001), 2:c2 (ts: 1001), 3:c3 (ts: 1001),
+            //        0:d0 (ts: 1002), 1:d1 (ts: 1002), 2:d2 (ts: 1002), 3:d3 (ts: 1002) }
+            // --> w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
+            //     w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999),
+            //            0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 3:b3 (ts: 1000),
+            //            0:c0 (ts: 1001), 1:c1 (ts: 1001), 2:c2 (ts: 1001), 3:c3 (ts: 1001),
+            //            0:d0 (ts: 1002), 1:d1 (ts: 1002), 2:d2 (ts: 1002), 3:d3 (ts: 1002),
+            //            0:e0 (ts: 1003), 1:e1 (ts: 1003), 2:e2 (ts: 1003), 3:e3 (ts: 1003) }
             time += 1L;
             for (final int expectedKey : expectedKeys) {
-                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "e" + expectedKey, time));
             }
-            processor.checkAndClearProcessResult("0:X0+YY0 (ts: 1003)", "1:X1+YY1 (ts: 1003)", "2:X2+YY2 (ts: 1003)", "3:X3+YY3 (ts: 1003)");
-
+            processor.checkAndClearProcessResult("0:A0+e0 (ts: 1003)", "1:A1+e1 (ts: 1003)", "2:A2+e2 (ts: 1003)", "3:A3+e3 (ts: 1003)");
+
+            // push four items with larger timestamps to the secondary stream; this should produce four items
+            // w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
+            // w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999),
+            //        0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 3:b3 (ts: 1000),
+            //        0:c0 (ts: 1001), 1:c1 (ts: 1001), 2:c2 (ts: 1001), 3:c3 (ts: 1001),
+            //        0:d0 (ts: 1002), 1:d1 (ts: 1002), 2:d2 (ts: 1002), 3:d3 (ts: 1002),
+            //        0:e0 (ts: 1003), 1:e1 (ts: 1003), 2:e2 (ts: 1003), 3:e3 (ts: 1003) }
+            // --> w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
+            //     w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999),
+            //            0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 3:b3 (ts: 1000),
+            //            0:c0 (ts: 1001), 1:c1 (ts: 1001), 2:c2 (ts: 1001), 3:c3 (ts: 1001),
+            //            0:d0 (ts: 1002), 1:d1 (ts: 1002), 2:d2 (ts: 1002), 3:d3 (ts: 1002),
+            //            0:e0 (ts: 1003), 1:e1 (ts: 1003), 2:e2 (ts: 1003), 3:e3 (ts: 1003),
+            //            0:f0 (ts: 1100), 1:f1 (ts: 1100), 2:f2 (ts: 1100), 3:f3 (ts: 1100) }
             time = 1000 + 100L;
             for (final int expectedKey : expectedKeys) {
-                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "f" + expectedKey, time));
             }
-            processor.checkAndClearProcessResult("0:X0+YY0 (ts: 1100)", "1:X1+YY1 (ts: 1100)", "2:X2+YY2 (ts: 1100)", "3:X3+YY3 (ts: 1100)");
-
+            processor.checkAndClearProcessResult("0:A0+f0 (ts: 1100)", "1:A1+f1 (ts: 1100)", "2:A2+f2 (ts: 1100)", "3:A3+f3 (ts: 1100)");
+
+            // push four items with increased timestamps to the secondary stream; this should produce three items
+            // w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
+            // w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999),
+            //        0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 3:b3 (ts: 1000),
+            //        0:c0 (ts: 1001), 1:c1 (ts: 1001), 2:c2 (ts: 1001), 3:c3 (ts: 1001),
+            //        0:d0 (ts: 1002), 1:d1 (ts: 1002), 2:d2 (ts: 1002), 3:d3 (ts: 1002),
+            //        0:e0 (ts: 1003), 1:e1 (ts: 1003), 2:e2 (ts: 1003), 3:e3 (ts: 1003),
+            //        0:f0 (ts: 1100), 1:f1 (ts: 1100), 2:f2 (ts: 1100), 3:f3 (ts: 1100) }
+            // --> w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
+            //     w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999),
+            //            0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 3:b3 (ts: 1000),
+            //            0:c0 (ts: 1001), 1:c1 (ts: 1001), 2:c2 (ts: 1001), 3:c3 (ts: 1001),
+            //            0:d0 (ts: 1002), 1:d1 (ts: 1002), 2:d2 (ts: 1002), 3:d3 (ts: 1002),
+            //            0:e0 (ts: 1003), 1:e1 (ts: 1003), 2:e2 (ts: 1003), 3:e3 (ts: 1003),
+            //            0:f0 (ts: 1100), 1:f1 (ts: 1100), 2:f2 (ts: 1100), 3:f3 (ts: 1100),
+            //            0:g0 (ts: 1101), 1:g1 (ts: 1101), 2:g2 (ts: 1101), 3:g3 (ts: 1101) }
             time += 1L;
             for (final int expectedKey : expectedKeys) {
-                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "g" + expectedKey, time));
             }
-            processor.checkAndClearProcessResult("1:X1+YY1 (ts: 1101)", "2:X2+YY2 (ts: 1101)", "3:X3+YY3 (ts: 1101)");
-
+            processor.checkAndClearProcessResult("1:A1+g1 (ts: 1101)", "2:A2+g2 (ts: 1101)", "3:A3+g3 (ts: 1101)");
+
+            // push four items with increased timestamps to the secondary stream; this should produce two items
+            // w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
+            // w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999),
+            //        0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 3:b3 (ts: 1000),
+            //        0:c0 (ts: 1001), 1:c1 (ts: 1001), 2:c2 (ts: 1001), 3:c3 (ts: 1001),
+            //        0:d0 (ts: 1002), 1:d1 (ts: 1002), 2:d2 (ts: 1002), 3:d3 (ts: 1002),
+            //        0:e0 (ts: 1003), 1:e1 (ts: 1003), 2:e2 (ts: 1003), 3:e3 (ts: 1003),
+            //        0:f0 (ts: 1100), 1:f1 (ts: 1100), 2:f2 (ts: 1100), 3:f3 (ts: 1100),
+            //        0:g0 (ts: 1101), 1:g1 (ts: 1101), 2:g2 (ts: 1101), 3:g3 (ts: 1101) }
+            // --> w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
+            //     w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999),
+            //            0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 3:b3 (ts: 1000),
+            //            0:c0 (ts: 1001), 1:c1 (ts: 1001), 2:c2 (ts: 1001), 3:c3 (ts: 1001),
+            //            0:d0 (ts: 1002), 1:d1 (ts: 1002), 2:d2 (ts: 1002), 3:d3 (ts: 1002),
+            //            0:e0 (ts: 1003), 1:e1 (ts: 1003), 2:e2 (ts: 1003), 3:e3 (ts: 1003),
+            //            0:f0 (ts: 1100), 1:f1 (ts: 1100), 2:f2 (ts: 1100), 3:f3 (ts: 1100),
+            //            0:g0 (ts: 1101), 1:g1 (ts: 1101), 2:g2 (ts: 1101), 3:g3 (ts: 1101),
+            //            0:h0 (ts: 1102), 1:h1 (ts: 1102), 2:h2 (ts: 1102), 3:h3 (ts: 1102) }
             time += 1L;
             for (final int expectedKey : expectedKeys) {
-                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "h" + expectedKey, time));
             }
-            processor.checkAndClearProcessResult("2:X2+YY2 (ts: 1102)", "3:X3+YY3 (ts: 1102)");
-
+            processor.checkAndClearProcessResult("2:A2+h2 (ts: 1102)", "3:A3+h3 (ts: 1102)");
+
+            // push four items with increased timestamps to the secondary stream; this should produce one item
+            // w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
+            // w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999),
+            //        0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 3:b3 (ts: 1000),
+            //        0:c0 (ts: 1001), 1:c1 (ts: 1001), 2:c2 (ts: 1001), 3:c3 (ts: 1001),
+            //        0:d0 (ts: 1002), 1:d1 (ts: 1002), 2:d2 (ts: 1002), 3:d3 (ts: 1002),
+            //        0:e0 (ts: 1003), 1:e1 (ts: 1003), 2:e2 (ts: 1003), 3:e3 (ts: 1003),
+            //        0:f0 (ts: 1100), 1:f1 (ts: 1100), 2:f2 (ts: 1100), 3:f3 (ts: 1100),
+            //        0:g0 (ts: 1101), 1:g1 (ts: 1101), 2:g2 (ts: 1101), 3:g3 (ts: 1101),
+            //        0:h0 (ts: 1102), 1:h1 (ts: 1102), 2:h2 (ts: 1102), 3:h3 (ts: 1102) }
+            // --> w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
+            //     w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999),
+            //            0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 3:b3 (ts: 1000),
+            //            0:c0 (ts: 1001), 1:c1 (ts: 1001), 2:c2 (ts: 1001), 3:c3 (ts: 1001),
+            //            0:d0 (ts: 1002), 1:d1 (ts: 1002), 2:d2 (ts: 1002), 3:d3 (ts: 1002),
+            //            0:e0 (ts: 1003), 1:e1 (ts: 1003), 2:e2 (ts: 1003), 3:e3 (ts: 1003),
+            //            0:f0 (ts: 1100), 1:f1 (ts: 1100), 2:f2 (ts: 1100), 3:f3 (ts: 1100),
+            //            0:g0 (ts: 1101), 1:g1 (ts: 1101), 2:g2 (ts: 1101), 3:g3 (ts: 1101),
+            //            0:h0 (ts: 1102), 1:h1 (ts: 1102), 2:h2 (ts: 1102), 3:h3 (ts: 1102),
+            //            0:i0 (ts: 1103), 1:i1 (ts: 1103), 2:i2 (ts: 1103), 3:i3 (ts: 1103) }
             time += 1L;
             for (final int expectedKey : expectedKeys) {
-                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "i" + expectedKey, time));
             }
-            processor.checkAndClearProcessResult("3:X3+YY3 (ts: 1103)");
-
+            processor.checkAndClearProcessResult("3:A3+i3 (ts: 1103)");
+
+            // push four items with increased timestamps (no out of window) to the secondary stream; this should produce no items
+            // w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
+            // w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999),
+            //        0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 3:b3 (ts: 1000),
+            //        0:c0 (ts: 1001), 1:c1 (ts: 1001), 2:c2 (ts: 1001), 3:c3 (ts: 1001),
+            //        0:d0 (ts: 1002), 1:d1 (ts: 1002), 2:d2 (ts: 1002), 3:d3 (ts: 1002),
+            //        0:e0 (ts: 1003), 1:e1 (ts: 1003), 2:e2 (ts: 1003), 3:e3 (ts: 1003),
+            //        0:f0 (ts: 1100), 1:f1 (ts: 1100), 2:f2 (ts: 1100), 3:f3 (ts: 1100),
+            //        0:g0 (ts: 1101), 1:g1 (ts: 1101), 2:g2 (ts: 1101), 3:g3 (ts: 1101),
+            //        0:h0 (ts: 1102), 1:h1 (ts: 1102), 2:h2 (ts: 1102), 3:h3 (ts: 1102),
+            //        0:i0 (ts: 1103), 1:i1 (ts: 1103), 2:i2 (ts: 1103), 3:i3 (ts: 1103) }
+            // --> w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
+            //     w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999),
+            //            0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 3:b3 (ts: 1000),
+            //            0:c0 (ts: 1001), 1:c1 (ts: 1001), 2:c2 (ts: 1001), 3:c3 (ts: 1001),
+            //            0:d0 (ts: 1002), 1:d1 (ts: 1002), 2:d2 (ts: 1002), 3:d3 (ts: 1002),
+            //            0:e0 (ts: 1003), 1:e1 (ts: 1003), 2:e2 (ts: 1003), 3:e3 (ts: 1003),
+            //            0:f0 (ts: 1100), 1:f1 (ts: 1100), 2:f2 (ts: 1100), 3:f3 (ts: 1100),
+            //            0:g0 (ts: 1101), 1:g1 (ts: 1101), 2:g2 (ts: 1101), 3:g3 (ts: 1101),
+            //            0:h0 (ts: 1102), 1:h1 (ts: 1102), 2:h2 (ts: 1102), 3:h3 (ts: 1102),
+            //            0:i0 (ts: 1103), 1:i1 (ts: 1103), 2:i2 (ts: 1103), 3:i3 (ts: 1103),
+            //            0:j0 (ts: 1104), 1:j1 (ts: 1104), 2:j2 (ts: 1104), 3:j3 (ts: 1104) }
             time += 1L;
             for (final int expectedKey : expectedKeys) {
-                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "j" + expectedKey, time));
             }
             processor.checkAndClearProcessResult();
         }
@@ -585,68 +1027,204 @@ public class KStreamKStreamJoinTest {
             final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
             long time = 1000L;
 
+            // push four items with increasing timestamps to the primary stream; the other window is empty; this should produce no items
+            // w1 = {}
+            // w2 = {}
+            // --> w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
+            //     w2 = {}
             for (int i = 0; i < expectedKeys.length; i++) {
-                driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i], time + i));
+                driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "A" + expectedKeys[i], time + i));
             }
             processor.checkAndClearProcessResult();
 
+            // push four items with smaller timestamps (before the window) to the other stream; this should produce no items
+            // w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
+            // w2 = {}
+            // --> w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
+            //     w2 = { 0:a0 (ts: 899), 1:a1 (ts: 899), 2:a2 (ts: 899), 3:a3 (ts: 899) }
             time = 1000L - 100L - 1L;
             for (final int expectedKey : expectedKeys) {
-                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "a" + expectedKey, time));
             }
             processor.checkAndClearProcessResult();
 
+            // push four items with increased timestamp to the other stream; this should produce one item
+            // w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
+            // w2 = { 0:a0 (ts: 899), 1:a1 (ts: 899), 2:a2 (ts: 899), 3:a3 (ts: 899) }
+            // --> w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
+            //     w2 = { 0:a0 (ts: 899), 1:a1 (ts: 899), 2:a2 (ts: 899), 3:a3 (ts: 899),
+            //            0:b0 (ts: 900), 1:b1 (ts: 900), 2:b2 (ts: 900), 3:b3 (ts: 900) }
             time += 1L;
             for (final int expectedKey : expectedKeys) {
-                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "b" + expectedKey, time));
             }
-            processor.checkAndClearProcessResult("0:X0+YY0 (ts: 900)");
-
+            processor.checkAndClearProcessResult("0:A0+b0 (ts: 1000)");
+
+            // push four items with increased timestamp to the other stream; this should produce two items
+            // w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
+            // w2 = { 0:a0 (ts: 899), 1:a1 (ts: 899), 2:a2 (ts: 899), 3:a3 (ts: 899),
+            //        0:b0 (ts: 900), 1:b1 (ts: 900), 2:b2 (ts: 900), 3:b3 (ts: 900) }
+            // --> w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
+            //     w2 = { 0:a0 (ts: 899), 1:a1 (ts: 899), 2:a2 (ts: 899), 3:a3 (ts: 899),
+            //            0:b0 (ts: 900), 1:b1 (ts: 900), 2:b2 (ts: 900), 3:b3 (ts: 900),
+            //            0:c0 (ts: 901), 1:c1 (ts: 901), 2:c2 (ts: 901), 3:c3 (ts: 901) }
             time += 1L;
             for (final int expectedKey : expectedKeys) {
-                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "c" + expectedKey, time));
             }
-            processor.checkAndClearProcessResult("0:X0+YY0 (ts: 901)", "1:X1+YY1 (ts: 901)");
-
+            processor.checkAndClearProcessResult("0:A0+c0 (ts: 1000)", "1:A1+c1 (ts: 1001)");
+
+            // push four items with increased timestamp to the other stream; this should produce three items
+            // w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
+            // w2 = { 0:a0 (ts: 899), 1:a1 (ts: 899), 2:a2 (ts: 899), 3:a3 (ts: 899),
+            //        0:b0 (ts: 900), 1:b1 (ts: 900), 2:b2 (ts: 900), 3:b3 (ts: 900),
+            //        0:c0 (ts: 901), 1:c1 (ts: 901), 2:c2 (ts: 901), 3:c3 (ts: 901) }
+            // --> w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
+            //     w2 = { 0:a0 (ts: 899), 1:a1 (ts: 899), 2:a2 (ts: 899), 3:a3 (ts: 899),
+            //            0:b0 (ts: 900), 1:b1 (ts: 900), 2:b2 (ts: 900), 3:b3 (ts: 900),
+            //            0:c0 (ts: 901), 1:c1 (ts: 901), 2:c2 (ts: 901), 3:c3 (ts: 901),
+            //            0:d0 (ts: 902), 1:d1 (ts: 902), 2:d2 (ts: 902), 3:d3 (ts: 902) }
             time += 1L;
             for (final int expectedKey : expectedKeys) {
-                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "d" + expectedKey, time));
             }
-            processor.checkAndClearProcessResult("0:X0+YY0 (ts: 902)", "1:X1+YY1 (ts: 902)", "2:X2+YY2 (ts: 902)");
-
+            processor.checkAndClearProcessResult("0:A0+d0 (ts: 1000)", "1:A1+d1 (ts: 1001)", "2:A2+d2 (ts: 1002)");
+
+            // push four items with increased timestamp to the other stream; this should produce four items
+            // w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
+            // w2 = { 0:a0 (ts: 899), 1:a1 (ts: 899), 2:a2 (ts: 899), 3:a3 (ts: 899),
+            //        0:b0 (ts: 900), 1:b1 (ts: 900), 2:b2 (ts: 900), 3:b3 (ts: 900),
+            //        0:c0 (ts: 901), 1:c1 (ts: 901), 2:c2 (ts: 901), 3:c3 (ts: 901),
+            //        0:d0 (ts: 902), 1:d1 (ts: 902), 2:d2 (ts: 902), 3:d3 (ts: 902) }
+            // --> w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
+            //     w2 = { 0:a0 (ts: 899), 1:a1 (ts: 899), 2:a2 (ts: 899), 3:a3 (ts: 899),
+            //            0:b0 (ts: 900), 1:b1 (ts: 900), 2:b2 (ts: 900), 3:b3 (ts: 900),
+            //            0:c0 (ts: 901), 1:c1 (ts: 901), 2:c2 (ts: 901), 3:c3 (ts: 901),
+            //            0:d0 (ts: 902), 1:d1 (ts: 902), 2:d2 (ts: 902), 3:d3 (ts: 902),
+            //            0:e0 (ts: 903), 1:e1 (ts: 903), 2:e2 (ts: 903), 3:e3 (ts: 903) }
             time += 1L;
             for (final int expectedKey : expectedKeys) {
-                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "e" + expectedKey, time));
             }
-            processor.checkAndClearProcessResult("0:X0+YY0 (ts: 903)", "1:X1+YY1 (ts: 903)", "2:X2+YY2 (ts: 903)", "3:X3+YY3 (ts: 903)");
-
+            processor.checkAndClearProcessResult("0:A0+e0 (ts: 1000)", "1:A1+e1 (ts: 1001)", "2:A2+e2 (ts: 1002)", "3:A3+e3 (ts: 1003)");
+
+            // push four items with larger timestamp to the other stream; this should produce four items
+            // w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
+            // w2 = { 0:a0 (ts: 899), 1:a1 (ts: 899), 2:a2 (ts: 899), 3:a3 (ts: 899),
+            //        0:b0 (ts: 900), 1:b1 (ts: 900), 2:b2 (ts: 900), 3:b3 (ts: 900),
+            //        0:c0 (ts: 901), 1:c1 (ts: 901), 2:c2 (ts: 901), 3:c3 (ts: 901),
+            //        0:d0 (ts: 902), 1:d1 (ts: 902), 2:d2 (ts: 902), 3:d3 (ts: 902),
+            //        0:e0 (ts: 903), 1:e1 (ts: 903), 2:e2 (ts: 903), 3:e3 (ts: 903) }
+            // --> w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
+            //     w2 = { 0:a0 (ts: 899), 1:a1 (ts: 899), 2:a2 (ts: 899), 3:a3 (ts: 899),
+            //            0:b0 (ts: 900), 1:b1 (ts: 900), 2:b2 (ts: 900), 3:b3 (ts: 900),
+            //            0:c0 (ts: 901), 1:c1 (ts: 901), 2:c2 (ts: 901), 3:c3 (ts: 901),
+            //            0:d0 (ts: 902), 1:d1 (ts: 902), 2:d2 (ts: 902), 3:d3 (ts: 902),
+            //            0:e0 (ts: 903), 1:e1 (ts: 903), 2:e2 (ts: 903), 3:e3 (ts: 903),
+            //            0:f0 (ts: 1000), 1:f1 (ts: 1000), 2:f2 (ts: 1000), 3:f3 (ts: 1000) }
             time = 1000L;
             for (final int expectedKey : expectedKeys) {
-                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "f" + expectedKey, time));
             }
-            processor.checkAndClearProcessResult("0:X0+YY0 (ts: 1000)", "1:X1+YY1 (ts: 1000)", "2:X2+YY2 (ts: 1000)", "3:X3+YY3 (ts: 1000)");
-
+            processor.checkAndClearProcessResult("0:A0+f0 (ts: 1000)", "1:A1+f1 (ts: 1001)", "2:A2+f2 (ts: 1002)", "3:A3+f3 (ts: 1003)");
+
+            // push four items with increase timestamp to the other stream; this should produce three items
+            // w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
+            // w2 = { 0:a0 (ts: 899), 1:a1 (ts: 899), 2:a2 (ts: 899), 3:a3 (ts: 899),
+            //        0:b0 (ts: 900), 1:b1 (ts: 900), 2:b2 (ts: 900), 3:b3 (ts: 900),
+            //        0:c0 (ts: 901), 1:c1 (ts: 901), 2:c2 (ts: 901), 3:c3 (ts: 901),
+            //        0:d0 (ts: 902), 1:d1 (ts: 902), 2:d2 (ts: 902), 3:d3 (ts: 902),
+            //        0:e0 (ts: 903), 1:e1 (ts: 903), 2:e2 (ts: 903), 3:e3 (ts: 903),
+            //        0:f0 (ts: 1000), 1:f1 (ts: 1000), 2:f2 (ts: 1000), 3:f3 (ts: 1000) }
+            // --> w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
+            //     w2 = { 0:a0 (ts: 899), 1:a1 (ts: 899), 2:a2 (ts: 899), 3:a3 (ts: 899),
+            //            0:b0 (ts: 900), 1:b1 (ts: 900), 2:b2 (ts: 900), 3:b3 (ts: 900),
+            //            0:c0 (ts: 901), 1:c1 (ts: 901), 2:c2 (ts: 901), 3:c3 (ts: 901),
+            //            0:d0 (ts: 902), 1:d1 (ts: 902), 2:d2 (ts: 902), 3:d3 (ts: 902),
+            //            0:e0 (ts: 903), 1:e1 (ts: 903), 2:e2 (ts: 903), 3:e3 (ts: 903),
+            //            0:f0 (ts: 1000), 1:f1 (ts: 1000), 2:f2 (ts: 1000), 3:f3 (ts: 1000),
+            //            0:g0 (ts: 1001), 1:g1 (ts: 1001), 2:g2 (ts: 1001), 3:g3 (ts: 1001) }
             time += 1L;
             for (final int expectedKey : expectedKeys) {
-                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "g" + expectedKey, time));
             }
-            processor.checkAndClearProcessResult("1:X1+YY1 (ts: 1001)", "2:X2+YY2 (ts: 1001)", "3:X3+YY3 (ts: 1001)");
-
+            processor.checkAndClearProcessResult("1:A1+g1 (ts: 1001)", "2:A2+g2 (ts: 1002)", "3:A3+g3 (ts: 1003)");
+
+            // push four items with increase timestamp to the other stream; this should produce two items
+            // w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
+            // w2 = { 0:a0 (ts: 899), 1:a1 (ts: 899), 2:a2 (ts: 899), 3:a3 (ts: 899),
+            //        0:b0 (ts: 900), 1:b1 (ts: 900), 2:b2 (ts: 900), 3:b3 (ts: 900),
+            //        0:c0 (ts: 901), 1:c1 (ts: 901), 2:c2 (ts: 901), 3:c3 (ts: 901),
+            //        0:d0 (ts: 902), 1:d1 (ts: 902), 2:d2 (ts: 902), 3:d3 (ts: 902),
+            //        0:e0 (ts: 903), 1:e1 (ts: 903), 2:e2 (ts: 903), 3:e3 (ts: 903),
+            //        0:f0 (ts: 1000), 1:f1 (ts: 1000), 2:f2 (ts: 1000), 3:f3 (ts: 1000),
+            //        0:g0 (ts: 1001), 1:g1 (ts: 1001), 2:g2 (ts: 1001), 3:g3 (ts: 1001) }
+            // --> w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
+            //     w2 = { 0:a0 (ts: 899), 1:a1 (ts: 899), 2:a2 (ts: 899), 3:a3 (ts: 899),
+            //            0:b0 (ts: 900), 1:b1 (ts: 900), 2:b2 (ts: 900), 3:b3 (ts: 900),
+            //            0:c0 (ts: 901), 1:c1 (ts: 901), 2:c2 (ts: 901), 3:c3 (ts: 901),
+            //            0:d0 (ts: 902), 1:d1 (ts: 902), 2:d2 (ts: 902), 3:d3 (ts: 902),
+            //            0:e0 (ts: 903), 1:e1 (ts: 903), 2:e2 (ts: 903), 3:e3 (ts: 903),
+            //            0:f0 (ts: 1000), 1:f1 (ts: 1000), 2:f2 (ts: 1000), 3:f3 (ts: 1000),
+            //            0:g0 (ts: 1001), 1:g1 (ts: 1001), 2:g2 (ts: 1001), 3:g3 (ts: 1001),
+            //            0:h0 (ts: 1002), 1:h1 (ts: 1002), 2:h2 (ts: 1002), 3:h3 (ts: 1002) }
             time += 1L;
             for (final int expectedKey : expectedKeys) {
-                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "h" + expectedKey, time));
             }
-            processor.checkAndClearProcessResult("2:X2+YY2 (ts: 1002)", "3:X3+YY3 (ts: 1002)");
-
+            processor.checkAndClearProcessResult("2:A2+h2 (ts: 1002)", "3:A3+h3 (ts: 1003)");
+
+            // push four items with increase timestamp to the other stream; this should produce one item
+            // w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
+            // w2 = { 0:a0 (ts: 899), 1:a1 (ts: 899), 2:a2 (ts: 899), 3:a3 (ts: 899),
+            //        0:b0 (ts: 900), 1:b1 (ts: 900), 2:b2 (ts: 900), 3:b3 (ts: 900),
+            //        0:c0 (ts: 901), 1:c1 (ts: 901), 2:c2 (ts: 901), 3:c3 (ts: 901),
+            //        0:d0 (ts: 902), 1:d1 (ts: 902), 2:d2 (ts: 902), 3:d3 (ts: 902),
+            //        0:e0 (ts: 903), 1:e1 (ts: 903), 2:e2 (ts: 903), 3:e3 (ts: 903),
+            //        0:f0 (ts: 1000), 1:f1 (ts: 1000), 2:f2 (ts: 1000), 3:f3 (ts: 1000),
+            //        0:g0 (ts: 1001), 1:g1 (ts: 1001), 2:g2 (ts: 1001), 3:g3 (ts: 1001),
+            //        0:h0 (ts: 1002), 1:h1 (ts: 1002), 2:h2 (ts: 1002), 3:h3 (ts: 1002) }
+            // --> w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
+            //     w2 = { 0:a0 (ts: 899), 1:a1 (ts: 899), 2:a2 (ts: 899), 3:a3 (ts: 899),
+            //            0:b0 (ts: 900), 1:b1 (ts: 900), 2:b2 (ts: 900), 3:b3 (ts: 900),
+            //            0:c0 (ts: 901), 1:c1 (ts: 901), 2:c2 (ts: 901), 3:c3 (ts: 901),
+            //            0:d0 (ts: 902), 1:d1 (ts: 902), 2:d2 (ts: 902), 3:d3 (ts: 902),
+            //            0:e0 (ts: 903), 1:e1 (ts: 903), 2:e2 (ts: 903), 3:e3 (ts: 903),
+            //            0:f0 (ts: 1000), 1:f1 (ts: 1000), 2:f2 (ts: 1000), 3:f3 (ts: 1000),
+            //            0:g0 (ts: 1001), 1:g1 (ts: 1001), 2:g2 (ts: 1001), 3:g3 (ts: 1001),
+            //            0:h0 (ts: 1002), 1:h1 (ts: 1002), 2:h2 (ts: 1002), 3:h3 (ts: 1002),
+            //            0:i0 (ts: 1003), 1:i1 (ts: 1003), 2:i2 (ts: 1003), 3:i3 (ts: 1003) }
             time += 1L;
             for (final int expectedKey : expectedKeys) {
-                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "i" + expectedKey, time));
             }
-            processor.checkAndClearProcessResult("3:X3+YY3 (ts: 1003)");
-
+            processor.checkAndClearProcessResult("3:A3+i3 (ts: 1003)");
+
+            // push four items with increase timestamp (no out of window) to the other stream; this should produce no items
+            // w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
+            // w2 = { 0:a0 (ts: 899), 1:a1 (ts: 899), 2:a2 (ts: 899), 3:a3 (ts: 899),
+            //        0:b0 (ts: 900), 1:b1 (ts: 900), 2:b2 (ts: 900), 3:b3 (ts: 900),
+            //        0:c0 (ts: 901), 1:c1 (ts: 901), 2:c2 (ts: 901), 3:c3 (ts: 901),
+            //        0:d0 (ts: 902), 1:d1 (ts: 902), 2:d2 (ts: 902), 3:d3 (ts: 902),
+            //        0:e0 (ts: 903), 1:e1 (ts: 903), 2:e2 (ts: 903), 3:e3 (ts: 903),
+            //        0:f0 (ts: 1000), 1:f1 (ts: 1000), 2:f2 (ts: 1000), 3:f3 (ts: 1000),
+            //        0:g0 (ts: 1001), 1:g1 (ts: 1001), 2:g2 (ts: 1001), 3:g3 (ts: 1001),
+            //        0:h0 (ts: 1002), 1:h1 (ts: 1002), 2:h2 (ts: 1002), 3:h3 (ts: 1002),
+            //        0:i0 (ts: 1003), 1:i1 (ts: 1003), 2:i2 (ts: 1003), 3:i3 (ts: 1003) }
+            // --> w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
+            //     w2 = { 0:a0 (ts: 899), 1:a1 (ts: 899), 2:a2 (ts: 899), 3:a3 (ts: 899),
+            //            0:b0 (ts: 900), 1:b1 (ts: 900), 2:b2 (ts: 900), 3:b3 (ts: 900),
+            //            0:c0 (ts: 901), 1:c1 (ts: 901), 2:c2 (ts: 901), 3:c3 (ts: 901),
+            //            0:d0 (ts: 902), 1:d1 (ts: 902), 2:d2 (ts: 902), 3:d3 (ts: 902),
+            //            0:e0 (ts: 903), 1:e1 (ts: 903), 2:e2 (ts: 903), 3:e3 (ts: 903),
+            //            0:f0 (ts: 1000), 1:f1 (ts: 1000), 2:f2 (ts: 1000), 3:f3 (ts: 1000),
+            //            0:g0 (ts: 1001), 1:g1 (ts: 1001), 2:g2 (ts: 1001), 3:g3 (ts: 1001),
+            //            0:h0 (ts: 1002), 1:h1 (ts: 1002), 2:h2 (ts: 1002), 3:h3 (ts: 1002),
+            //            0:i0 (ts: 1003), 1:i1 (ts: 1003), 2:i2 (ts: 1003), 3:i3 (ts: 1003),
+            //            0:j0 (ts: 1004), 1:j1 (ts: 1004), 2:j2 (ts: 1004), 3:j3 (ts: 1004) }
             time += 1L;
             for (final int expectedKey : expectedKeys) {
-                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "j" + expectedKey, time));
             }
             processor.checkAndClearProcessResult();
         }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
index 863b790..c3d62e9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
@@ -80,55 +80,55 @@ public class KStreamKStreamLeftJoinTest {
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
             final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
 
-            // push two items to the primary stream. the other window is empty
+            // push two items to the primary stream; the other window is empty
             // w1 {}
             // w2 {}
-            // --> w1 = { 0:X0, 1:X1 }
+            // --> w1 = { 0:A0, 1:A1 }
             // --> w2 = {}
             for (int i = 0; i < 2; i++) {
-                driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i]));
+                driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "A" + expectedKeys[i]));
             }
-            processor.checkAndClearProcessResult("0:X0+null (ts: 0)", "1:X1+null (ts: 0)");
+            processor.checkAndClearProcessResult("0:A0+null (ts: 0)", "1:A1+null (ts: 0)");
 
-            // push two items to the other stream. this should produce two items.
-            // w1 = { 0:X0, 1:X1 }
+            // push two items to the other stream; this should produce two items
+            // w1 = { 0:A0, 1:A1 }
             // w2 {}
-            // --> w1 = { 0:X0, 1:X1 }
-            // --> w2 = { 0:Y0, 1:Y1 }
+            // --> w1 = { 0:A0, 1:A1 }
+            // --> w2 = { 0:a0, 1:a1 }
             for (int i = 0; i < 2; i++) {
-                driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i]));
+                driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "a" + expectedKeys[i]));
             }
-            processor.checkAndClearProcessResult("0:X0+Y0 (ts: 0)", "1:X1+Y1 (ts: 0)");
+            processor.checkAndClearProcessResult("0:A0+a0 (ts: 0)", "1:A1+a1 (ts: 0)");
 
-            // push three items to the primary stream. this should produce four items.
-            // w1 = { 0:X0, 1:X1 }
-            // w2 = { 0:Y0, 1:Y1 }
-            // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2 }
-            // --> w2 = { 0:Y0, 1:Y1 }
+            // push three items to the primary stream; this should produce four items
+            // w1 = { 0:A0, 1:A1 }
+            // w2 = { 0:a0, 1:a1 }
+            // --> w1 = { 0:A0, 1:A1, 0:B0, 1:B1, 2:B2 }
+            // --> w2 = { 0:a0, 1:a1 }
             for (int i = 0; i < 3; i++) {
-                driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i]));
+                driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "B" + expectedKeys[i]));
             }
-            processor.checkAndClearProcessResult("0:X0+Y0 (ts: 0)", "1:X1+Y1 (ts: 0)", "2:X2+null (ts: 0)");
+            processor.checkAndClearProcessResult("0:B0+a0 (ts: 0)", "1:B1+a1 (ts: 0)", "2:B2+null (ts: 0)");
 
-            // push all items to the other stream. this should produce 5 items
-            // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2 }
-            // w2 = { 0:Y0, 1:Y1 }
-            // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2 }
-            // --> w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3}
+            // push all items to the other stream; this should produce five items
+            // w1 = { 0:A0, 1:A1, 0:B0, 1:B1, 2:B2 }
+            // w2 = { 0:a0, 1:a1 }
+            // --> w1 = { 0:A0, 1:A1, 0:B0, 1:B1, 2:B2 }
+            // --> w2 = { 0:a0, 1:a1, 0:b0, 1:b1, 2:b2, 3:b3 }
             for (final int expectedKey : expectedKeys) {
-                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey));
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "b" + expectedKey));
             }
-            processor.checkAndClearProcessResult("0:X0+YY0 (ts: 0)", "0:X0+YY0 (ts: 0)", "1:X1+YY1 (ts: 0)", "1:X1+YY1 (ts: 0)", "2:X2+YY2 (ts: 0)");
+            processor.checkAndClearProcessResult("0:A0+b0 (ts: 0)", "0:B0+b0 (ts: 0)", "1:A1+b1 (ts: 0)", "1:B1+b1 (ts: 0)", "2:B2+b2 (ts: 0)");
 
-            // push all four items to the primary stream. this should produce six items.
-            // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2 }
-            // w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3}
-            // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 0:XX0, 1:XX1, 2:XX2, 3:XX3 }
-            // --> w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3}
+            // push all four items to the primary stream; this should produce six items
+            // w1 = { 0:A0, 1:A1, 0:B0, 1:B1, 2:B2 }
+            // w2 = { 0:a0, 1:a1, 0:b0, 1:b1, 2:b2, 3:b3 }
+            // --> w1 = { 0:A0, 1:A1, 0:B0, 1:B1, 2:B2, 0:C0, 1:C1, 2:C2, 3:C3 }
+            // --> w2 = { 0:a0, 1:a1, 0:b0, 1:b1, 2:b2, 3:b3 }
             for (final int expectedKey : expectedKeys) {
-                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey));
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "C" + expectedKey));
             }
-            processor.checkAndClearProcessResult("0:XX0+Y0 (ts: 0)", "0:XX0+YY0 (ts: 0)", "1:XX1+Y1 (ts: 0)", "1:XX1+YY1 (ts: 0)", "2:XX2+YY2 (ts: 0)", "3:XX3+YY3 (ts: 0)");
+            processor.checkAndClearProcessResult("0:C0+a0 (ts: 0)", "0:C0+b0 (ts: 0)", "1:C1+a1 (ts: 0)", "1:C1+b1 (ts: 0)", "2:C2+b2 (ts: 0)", "3:C3+b3 (ts: 0)");
         }
     }
 
@@ -159,107 +159,284 @@ public class KStreamKStreamLeftJoinTest {
 
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
             final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
-            long time = 0L;
+            final long time = 0L;
 
-            // push two items to the primary stream. the other window is empty. this should produce two items
+            // push two items to the primary stream; the other window is empty; this should produce two left-join items
             // w1 = {}
             // w2 = {}
-            // --> w1 = { 0:X0, 1:X1 }
+            // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) }
             // --> w2 = {}
             for (int i = 0; i < 2; i++) {
-                driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i], time));
+                driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "A" + expectedKeys[i], time));
             }
-            processor.checkAndClearProcessResult("0:X0+null (ts: 0)", "1:X1+null (ts: 0)");
+            processor.checkAndClearProcessResult("0:A0+null (ts: 0)", "1:A1+null (ts: 0)");
 
-            // push two items to the other stream. this should produce no items.
-            // w1 = { 0:X0, 1:X1 }
+            // push four items to the other stream; this should produce two full-join items
+            // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) }
             // w2 = {}
-            // --> w1 = { 0:X0, 1:X1 }
-            // --> w2 = { 0:Y0, 1:Y1 }
-            for (int i = 0; i < 2; i++) {
-                driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i], time));
-            }
-            processor.checkAndClearProcessResult("0:X0+Y0 (ts: 0)", "1:X1+Y1 (ts: 0)");
-
-            // clear logically
-            time = 1000L;
-
-            // push all items to the other stream. this should produce no items.
-            // w1 = {}
-            // w2 = {}
-            // --> w1 = {}
-            // --> w2 = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 }
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i], time + i));
-            }
-            processor.checkAndClearProcessResult();
-
-            // gradually expire items in window 2.
-            // w1 = {}
-            // w2 = {}
-            // --> w1 = {}
-            // --> w2 = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 }
-            time = 1000L + 100L;
-            for (final int expectedKey : expectedKeys) {
-                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
-            }
-            processor.checkAndClearProcessResult("0:XX0+Y0 (ts: 1100)", "1:XX1+Y1 (ts: 1100)", "2:XX2+Y2 (ts: 1100)", "3:XX3+Y3 (ts: 1100)");
-
-            time += 1L;
-            for (final int expectedKey : expectedKeys) {
-                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
-            }
-            processor.checkAndClearProcessResult("0:XX0+null (ts: 1101)", "1:XX1+Y1 (ts: 1101)", "2:XX2+Y2 (ts: 1101)", "3:XX3+Y3 (ts: 1101)");
-
-            time += 1L;
-            for (final int expectedKey : expectedKeys) {
-                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
-            }
-            processor.checkAndClearProcessResult("0:XX0+null (ts: 1102)", "1:XX1+null (ts: 1102)", "2:XX2+Y2 (ts: 1102)", "3:XX3+Y3 (ts: 1102)");
-
-            time += 1L;
-            for (final int expectedKey : expectedKeys) {
-                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
-            }
-            processor.checkAndClearProcessResult("0:XX0+null (ts: 1103)", "1:XX1+null (ts: 1103)", "2:XX2+null (ts: 1103)", "3:XX3+Y3 (ts: 1103)");
-
-            time += 1L;
-            for (final int expectedKey : expectedKeys) {
-                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
-            }
-            processor.checkAndClearProcessResult("0:XX0+null (ts: 1104)", "1:XX1+null (ts: 1104)", "2:XX2+null (ts: 1104)", "3:XX3+null (ts: 1104)");
-
-            // go back to the time before expiration
-
-            time = 1000L - 100L - 1L;
-            for (final int expectedKey : expectedKeys) {
-                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
-            }
-            processor.checkAndClearProcessResult("0:XX0+null (ts: 899)", "1:XX1+null (ts: 899)", "2:XX2+null (ts: 899)", "3:XX3+null (ts: 899)");
-
-            time += 1L;
+            // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) }
+            // --> w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0), 2:a2 (ts: 0), 3:a3 (ts: 0) }
             for (final int expectedKey : expectedKeys) {
-                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "a" + expectedKey, time));
             }
-            processor.checkAndClearProcessResult("0:XX0+Y0 (ts: 900)", "1:XX1+null (ts: 900)", "2:XX2+null (ts: 900)", "3:XX3+null (ts: 900)");
+            processor.checkAndClearProcessResult("0:A0+a0 (ts: 0)", "1:A1+a1 (ts: 0)");
 
-            time += 1L;
-            for (final int expectedKey : expectedKeys) {
-                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
-            }
-            processor.checkAndClearProcessResult("0:XX0+Y0 (ts: 901)", "1:XX1+Y1 (ts: 901)", "2:XX2+null (ts: 901)", "3:XX3+null (ts: 901)");
+            testUpperWindowBound(expectedKeys, driver, processor);
+            testLowerWindowBound(expectedKeys, driver, processor);
+        }
+    }
 
-            time += 1L;
-            for (final int expectedKey : expectedKeys) {
-                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
-            }
-            processor.checkAndClearProcessResult("0:XX0+Y0 (ts: 902)", "1:XX1+Y1 (ts: 902)", "2:XX2+Y2 (ts: 902)", "3:XX3+null (ts: 902)");
+    private void testUpperWindowBound(final int[] expectedKeys,
+                                      final TopologyTestDriver driver,
+                                      final MockProcessor<Integer, String> processor) {
+        long time;
+
+        // push four items with larger and increasing timestamp (out of window) to the other stream; this should produce no items
+        // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) }
+        // w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0), 2:a2 (ts: 0), 3:a3 (ts: 0) }
+        // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) }
+        // --> w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0), 2:a2 (ts: 0), 3:a3 (ts: 0),
+        //            0:b0 (ts: 1000), 1:b1 (ts: 1001), 2:b2 (ts: 1002), 3:b3 (ts: 1003) }
+        time = 1000L;
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "b" + expectedKeys[i], time + i));
+        }
+        processor.checkAndClearProcessResult();
+
+        // push four items with larger timestamp to the primary stream; this should produce four full-join items
+        // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) }
+        // w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0), 2:a2 (ts: 0), 3:a3 (ts: 0),
+        //        0:b0 (ts: 1000), 1:b1 (ts: 1001), 2:b2 (ts: 1002), 3:b3 (ts: 1003) }
+        // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
+        //            0:B0 (ts: 1100), 1:B1 (ts: 1100), 2:B2 (ts: 1100), 3:B3 (ts: 1100) }
+        // --> w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0), 2:a2 (ts: 0), 3:a3 (ts: 0),
+        //            0:b0 (ts: 1000), 1:b1 (ts: 1001), 2:b2 (ts: 1002), 3:b3 (ts: 1003) }
+        time = 1000L + 100L;
+        for (final int expectedKey : expectedKeys) {
+            driver.pipeInput(recordFactory.create(topic1, expectedKey, "B" + expectedKey, time));
+        }
+        processor.checkAndClearProcessResult("0:B0+b0 (ts: 1100)", "1:B1+b1 (ts: 1100)", "2:B2+b2 (ts: 1100)", "3:B3+b3 (ts: 1100)");
+
+        // push four items with increased timestamp to the primary stream; this should produce one left-join and three full-join items
+        // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
+        //        0:B0 (ts: 1100), 1:B1 (ts: 1100), 2:B2 (ts: 1100), 3:B3 (ts: 1100) }
+        // w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0), 2:a2 (ts: 0), 3:a3 (ts: 0),
+        //        0:b0 (ts: 1000), 1:b1 (ts: 1001), 2:b2 (ts: 1002), 3:b3 (ts: 1003) }
+        // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
+        //            0:B0 (ts: 1100), 1:B1 (ts: 1100), 2:B2 (ts: 1100), 3:B3 (ts: 1100),
+        //            0:C0 (ts: 1101), 1:C1 (ts: 1101), 2:C2 (ts: 1101), 3:C3 (ts: 1101) }
+        // --> w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0), 2:a2 (ts: 0), 3:a3 (ts: 0),
+        //            0:b0 (ts: 1000), 1:b1 (ts: 1001), 2:b2 (ts: 1002), 3:b3 (ts: 1003) }
+        time += 1L;
+        for (final int expectedKey : expectedKeys) {
+            driver.pipeInput(recordFactory.create(topic1, expectedKey, "C" + expectedKey, time));
+        }
+        processor.checkAndClearProcessResult("0:C0+null (ts: 1101)", "1:C1+b1 (ts: 1101)", "2:C2+b2 (ts: 1101)", "3:C3+b3 (ts: 1101)");
+
+        // push four items with increased timestamp to the primary stream; this should produce two left-join and two full-join items
+        // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
+        //        0:B0 (ts: 1100), 1:B1 (ts: 1100), 2:B2 (ts: 1100), 3:B3 (ts: 1100),
+        //        0:C0 (ts: 1101), 1:C1 (ts: 1101), 2:C2 (ts: 1101), 3:C3 (ts: 1101) }
+        // w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0), 2:a2 (ts: 0), 3:a3 (ts: 0),
+        //        0:b0 (ts: 1000), 1:b1 (ts: 1001), 2:b2 (ts: 1002), 3:b3 (ts: 1003) }
+        // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
+        //            0:B0 (ts: 1100), 1:B1 (ts: 1100), 2:B2 (ts: 1100), 3:B3 (ts: 1100),
+        //            0:C0 (ts: 1101), 1:C1 (ts: 1101), 2:C2 (ts: 1101), 3:C3 (ts: 1101),
+        //            0:D0 (ts: 1102), 1:D1 (ts: 1102), 2:D2 (ts: 1102), 3:D3 (ts: 1102) }
+        // --> w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0), 2:a2 (ts: 0), 3:a3 (ts: 0),
+        //            0:b0 (ts: 1000), 1:b1 (ts: 1001), 2:b2 (ts: 1002), 3:b3 (ts: 1003) }
+        time += 1L;
+        for (final int expectedKey : expectedKeys) {
+            driver.pipeInput(recordFactory.create(topic1, expectedKey, "D" + expectedKey, time));
+        }
+        processor.checkAndClearProcessResult("0:D0+null (ts: 1102)", "1:D1+null (ts: 1102)", "2:D2+b2 (ts: 1102)", "3:D3+b3 (ts: 1102)");
+
+        // push four items with increased timestamp to the primary stream; this should produce three left-join and one full-join items
+        // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
+        //        0:B0 (ts: 1100), 1:B1 (ts: 1100), 2:B2 (ts: 1100), 3:B3 (ts: 1100),
+        //        0:C0 (ts: 1101), 1:C1 (ts: 1101), 2:C2 (ts: 1101), 3:C3 (ts: 1101),
+        //        0:D0 (ts: 1102), 1:D1 (ts: 1102), 2:D2 (ts: 1102), 3:D3 (ts: 1102) }
+        // w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0), 2:a2 (ts: 0), 3:a3 (ts: 0),
+        //        0:b0 (ts: 1000), 1:b1 (ts: 1001), 2:b2 (ts: 1002), 3:b3 (ts: 1003) }
+        // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
+        //            0:B0 (ts: 1100), 1:B1 (ts: 1100), 2:B2 (ts: 1100), 3:B3 (ts: 1100),
+        //            0:C0 (ts: 1101), 1:C1 (ts: 1101), 2:C2 (ts: 1101), 3:C3 (ts: 1101),
+        //            0:D0 (ts: 1102), 1:D1 (ts: 1102), 2:D2 (ts: 1102), 3:D3 (ts: 1102),
+        //            0:E0 (ts: 1103), 1:E1 (ts: 1103), 2:E2 (ts: 1103), 3:E3 (ts: 1103) }
+        // --> w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0), 2:a2 (ts: 0), 3:a3 (ts: 0),
+        //            0:b0 (ts: 1000), 1:b1 (ts: 1001), 2:b2 (ts: 1002), 3:b3 (ts: 1003) }
+        time += 1L;
+        for (final int expectedKey : expectedKeys) {
+            driver.pipeInput(recordFactory.create(topic1, expectedKey, "E" + expectedKey, time));
+        }
+        processor.checkAndClearProcessResult("0:E0+null (ts: 1103)", "1:E1+null (ts: 1103)", "2:E2+null (ts: 1103)", "3:E3+b3 (ts: 1103)");
+
+        // push four items with increased timestamp to the primary stream; this should produce four left-join and no full-join items
+        // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
+        //        0:B0 (ts: 1100), 1:B1 (ts: 1100), 2:B2 (ts: 1100), 3:B3 (ts: 1100),
+        //        0:C0 (ts: 1101), 1:C1 (ts: 1101), 2:C2 (ts: 1101), 3:C3 (ts: 1101),
+        //        0:D0 (ts: 1102), 1:D1 (ts: 1102), 2:D2 (ts: 1102), 3:D3 (ts: 1102),
+        //        0:E0 (ts: 1103), 1:E1 (ts: 1103), 2:E2 (ts: 1103), 3:E3 (ts: 1103) }
+        // w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0), 2:a2 (ts: 0), 3:a3 (ts: 0),
+        //        0:b0 (ts: 1000), 1:b1 (ts: 1001), 2:b2 (ts: 1002), 3:b3 (ts: 1003) }
+        // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
+        //            0:B0 (ts: 1100), 1:B1 (ts: 1100), 2:B2 (ts: 1100), 3:B3 (ts: 1100),
+        //            0:C0 (ts: 1101), 1:C1 (ts: 1101), 2:C2 (ts: 1101), 3:C3 (ts: 1101),
+        //            0:D0 (ts: 1102), 1:D1 (ts: 1102), 2:D2 (ts: 1102), 3:D3 (ts: 1102),
+        //            0:E0 (ts: 1103), 1:E1 (ts: 1103), 2:E2 (ts: 1103), 3:E3 (ts: 1103),
+        //            0:F0 (ts: 1104), 1:F1 (ts: 1104), 2:F2 (ts: 1104), 3:F3 (ts: 1104) }
+        // --> w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0), 2:a2 (ts: 0), 3:a3 (ts: 0),
+        //            0:b0 (ts: 1000), 1:b1 (ts: 1001), 2:b2 (ts: 1002), 3:b3 (ts: 1003) }
+        time += 1L;
+        for (final int expectedKey : expectedKeys) {
+            driver.pipeInput(recordFactory.create(topic1, expectedKey, "F" + expectedKey, time));
+        }
+        processor.checkAndClearProcessResult("0:F0+null (ts: 1104)", "1:F1+null (ts: 1104)", "2:F2+null (ts: 1104)", "3:F3+null (ts: 1104)");
+    }
 
-            time += 1L;
-            for (final int expectedKey : expectedKeys) {
-                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
-            }
-            processor.checkAndClearProcessResult("0:XX0+Y0 (ts: 903)", "1:XX1+Y1 (ts: 903)", "2:XX2+Y2 (ts: 903)", "3:XX3+Y3 (ts: 903)");
+    private void testLowerWindowBound(final int[] expectedKeys,
+                                      final TopologyTestDriver driver,
+                                      final MockProcessor<Integer, String> processor) {
+        long time;
+
+        // push four items with smaller timestamp (before the window) to the primary stream; this should produce four left-join and no full-join items
+        // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
+        //        0:B0 (ts: 1100), 1:B1 (ts: 1100), 2:B2 (ts: 1100), 3:B3 (ts: 1100),
+        //        0:C0 (ts: 1101), 1:C1 (ts: 1101), 2:C2 (ts: 1101), 3:C3 (ts: 1101),
+        //        0:D0 (ts: 1102), 1:D1 (ts: 1102), 2:D2 (ts: 1102), 3:D3 (ts: 1102),
+        //        0:E0 (ts: 1103), 1:E1 (ts: 1103), 2:E2 (ts: 1103), 3:E3 (ts: 1103),
+        //        0:F0 (ts: 1104), 1:F1 (ts: 1104), 2:F2 (ts: 1104), 3:F3 (ts: 1104) }
+        // w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0), 2:a2 (ts: 0), 3:a3 (ts: 0),
+        //        0:b0 (ts: 1000), 1:b1 (ts: 1001), 2:b2 (ts: 1002), 3:b3 (ts: 1003) }
+        // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
+        //            0:B0 (ts: 1100), 1:B1 (ts: 1100), 2:B2 (ts: 1100), 3:B3 (ts: 1100),
+        //            0:C0 (ts: 1101), 1:C1 (ts: 1101), 2:C2 (ts: 1101), 3:C3 (ts: 1101),
+        //            0:D0 (ts: 1102), 1:D1 (ts: 1102), 2:D2 (ts: 1102), 3:D3 (ts: 1102),
+        //            0:E0 (ts: 1103), 1:E1 (ts: 1103), 2:E2 (ts: 1103), 3:E3 (ts: 1103),
+        //            0:F0 (ts: 1104), 1:F1 (ts: 1104), 2:F2 (ts: 1104), 3:F3 (ts: 1104),
+        //            0:G0 (ts: 899), 1:G1 (ts: 899), 2:G2 (ts: 899), 3:G3 (ts: 899) }
+        // --> w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0), 2:a2 (ts: 0), 3:a3 (ts: 0),
+        //            0:b0 (ts: 1000), 1:b1 (ts: 1001), 2:b2 (ts: 1002), 3:b3 (ts: 1003) }
+        time = 1000L - 100L - 1L;
+        for (final int expectedKey : expectedKeys) {
+            driver.pipeInput(recordFactory.create(topic1, expectedKey, "G" + expectedKey, time));
+        }
+        processor.checkAndClearProcessResult("0:G0+null (ts: 899)", "1:G1+null (ts: 899)", "2:G2+null (ts: 899)", "3:G3+null (ts: 899)");
+
+        // push four items with increase timestamp to the primary stream; this should produce three left-join and one full-join items
+        // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
+        //        0:B0 (ts: 1100), 1:B1 (ts: 1100), 2:B2 (ts: 1100), 3:B3 (ts: 1100),
+        //        0:C0 (ts: 1101), 1:C1 (ts: 1101), 2:C2 (ts: 1101), 3:C3 (ts: 1101),
+        //        0:D0 (ts: 1102), 1:D1 (ts: 1102), 2:D2 (ts: 1102), 3:D3 (ts: 1102),
+        //        0:E0 (ts: 1103), 1:E1 (ts: 1103), 2:E2 (ts: 1103), 3:E3 (ts: 1103),
+        //        0:F0 (ts: 1104), 1:F1 (ts: 1104), 2:F2 (ts: 1104), 3:F3 (ts: 1104),
+        //        0:G0 (ts: 899), 1:G1 (ts: 899), 2:G2 (ts: 899), 3:G3 (ts: 899) }
+        // w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0), 2:a2 (ts: 0), 3:a3 (ts: 0),
+        //        0:b0 (ts: 1000), 1:b1 (ts: 1001), 2:b2 (ts: 1002), 3:b3 (ts: 1003) }
+        // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
+        //            0:B0 (ts: 1100), 1:B1 (ts: 1100), 2:B2 (ts: 1100), 3:B3 (ts: 1100),
+        //            0:C0 (ts: 1101), 1:C1 (ts: 1101), 2:C2 (ts: 1101), 3:C3 (ts: 1101),
+        //            0:D0 (ts: 1102), 1:D1 (ts: 1102), 2:D2 (ts: 1102), 3:D3 (ts: 1102),
+        //            0:E0 (ts: 1103), 1:E1 (ts: 1103), 2:E2 (ts: 1103), 3:E3 (ts: 1103),
+        //            0:F0 (ts: 1104), 1:F1 (ts: 1104), 2:F2 (ts: 1104), 3:F3 (ts: 1104),
+        //            0:G0 (ts: 899), 1:G1 (ts: 899), 2:G2 (ts: 899), 3:G3 (ts: 899),
+        //            0:H0 (ts: 900), 1:H1 (ts: 900), 2:H2 (ts: 900), 3:H3 (ts: 900) }
+        // --> w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0), 2:a2 (ts: 0), 3:a3 (ts: 0),
+        //            0:b0 (ts: 1000), 1:b1 (ts: 1001), 2:b2 (ts: 1002), 3:b3 (ts: 1003) }
+        time += 1L;
+        for (final int expectedKey : expectedKeys) {
+            driver.pipeInput(recordFactory.create(topic1, expectedKey, "H" + expectedKey, time));
+        }
+        processor.checkAndClearProcessResult("0:H0+b0 (ts: 1000)", "1:H1+null (ts: 900)", "2:H2+null (ts: 900)", "3:H3+null (ts: 900)");
+
+        // push four items with increase timestamp to the primary stream; this should produce two left-join and two full-join items
+        // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
+        //        0:B0 (ts: 1100), 1:B1 (ts: 1100), 2:B2 (ts: 1100), 3:B3 (ts: 1100),
+        //        0:C0 (ts: 1101), 1:C1 (ts: 1101), 2:C2 (ts: 1101), 3:C3 (ts: 1101),
+        //        0:D0 (ts: 1102), 1:D1 (ts: 1102), 2:D2 (ts: 1102), 3:D3 (ts: 1102),
+        //        0:E0 (ts: 1103), 1:E1 (ts: 1103), 2:E2 (ts: 1103), 3:E3 (ts: 1103),
+        //        0:F0 (ts: 1104), 1:F1 (ts: 1104), 2:F2 (ts: 1104), 3:F3 (ts: 1104),
+        //        0:G0 (ts: 899), 1:G1 (ts: 899), 2:G2 (ts: 899), 3:G3 (ts: 899),
+        //        0:H0 (ts: 900), 1:H1 (ts: 900), 2:H2 (ts: 900), 3:H3 (ts: 900) }
+        // w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0), 2:a2 (ts: 0), 3:a3 (ts: 0),
+        //        0:b0 (ts: 1000), 1:b1 (ts: 1001), 2:b2 (ts: 1002), 3:b3 (ts: 1003) }
+        // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
+        //            0:B0 (ts: 1100), 1:B1 (ts: 1100), 2:B2 (ts: 1100), 3:B3 (ts: 1100),
+        //            0:C0 (ts: 1101), 1:C1 (ts: 1101), 2:C2 (ts: 1101), 3:C3 (ts: 1101),
+        //            0:D0 (ts: 1102), 1:D1 (ts: 1102), 2:D2 (ts: 1102), 3:D3 (ts: 1102),
+        //            0:E0 (ts: 1103), 1:E1 (ts: 1103), 2:E2 (ts: 1103), 3:E3 (ts: 1103),
+        //            0:F0 (ts: 1104), 1:F1 (ts: 1104), 2:F2 (ts: 1104), 3:F3 (ts: 1104),
+        //            0:G0 (ts: 899), 1:G1 (ts: 899), 2:G2 (ts: 899), 3:G3 (ts: 899),
+        //            0:H0 (ts: 900), 1:H1 (ts: 900), 2:H2 (ts: 900), 3:H3 (ts: 900),
+        //            0:I0 (ts: 901), 1:I1 (ts: 901), 2:I2 (ts: 901), 3:I3 (ts: 901) }
+        // --> w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0), 2:a2 (ts: 0), 3:a3 (ts: 0),
+        //            0:b0 (ts: 1000), 1:b1 (ts: 1001), 2:b2 (ts: 1002), 3:b3 (ts: 1003) }
+        time += 1L;
+        for (final int expectedKey : expectedKeys) {
+            driver.pipeInput(recordFactory.create(topic1, expectedKey, "I" + expectedKey, time));
+        }
+        processor.checkAndClearProcessResult("0:I0+b0 (ts: 1000)", "1:I1+b1 (ts: 1001)", "2:I2+null (ts: 901)", "3:I3+null (ts: 901)");
+
+        // push four items with increase timestamp to the primary stream; this should produce one left-join and three full-join items
+        // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
+        //        0:B0 (ts: 1100), 1:B1 (ts: 1100), 2:B2 (ts: 1100), 3:B3 (ts: 1100),
+        //        0:C0 (ts: 1101), 1:C1 (ts: 1101), 2:C2 (ts: 1101), 3:C3 (ts: 1101),
+        //        0:D0 (ts: 1102), 1:D1 (ts: 1102), 2:D2 (ts: 1102), 3:D3 (ts: 1102),
+        //        0:E0 (ts: 1103), 1:E1 (ts: 1103), 2:E2 (ts: 1103), 3:E3 (ts: 1103),
+        //        0:F0 (ts: 1104), 1:F1 (ts: 1104), 2:F2 (ts: 1104), 3:F3 (ts: 1104),
+        //        0:G0 (ts: 899), 1:G1 (ts: 899), 2:G2 (ts: 899), 3:G3 (ts: 899),
+        //        0:H0 (ts: 900), 1:H1 (ts: 900), 2:H2 (ts: 900), 3:H3 (ts: 900),
+        //        0:I0 (ts: 901), 1:I1 (ts: 901), 2:I2 (ts: 901), 3:I3 (ts: 901) }
+        // w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0), 2:a2 (ts: 0), 3:a3 (ts: 0),
+        //        0:b0 (ts: 1000), 1:b1 (ts: 1001), 2:b2 (ts: 1002), 3:b3 (ts: 1003) }
+        // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
+        //            0:B0 (ts: 1100), 1:B1 (ts: 1100), 2:B2 (ts: 1100), 3:B3 (ts: 1100),
+        //            0:C0 (ts: 1101), 1:C1 (ts: 1101), 2:C2 (ts: 1101), 3:C3 (ts: 1101),
+        //            0:D0 (ts: 1102), 1:D1 (ts: 1102), 2:D2 (ts: 1102), 3:D3 (ts: 1102),
+        //            0:E0 (ts: 1103), 1:E1 (ts: 1103), 2:E2 (ts: 1103), 3:E3 (ts: 1103),
+        //            0:F0 (ts: 1104), 1:F1 (ts: 1104), 2:F2 (ts: 1104), 3:F3 (ts: 1104),
+        //            0:G0 (ts: 899), 1:G1 (ts: 899), 2:G2 (ts: 899), 3:G3 (ts: 899),
+        //            0:H0 (ts: 900), 1:H1 (ts: 900), 2:H2 (ts: 900), 3:H3 (ts: 900),
+        //            0:I0 (ts: 901), 1:I1 (ts: 901), 2:I2 (ts: 901), 3:I3 (ts: 901),
+        //            0:J0 (ts: 902), 1:J1 (ts: 902), 2:J2 (ts: 902), 3:J3 (ts: 902) }
+        // --> w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0), 2:a2 (ts: 0), 3:a3 (ts: 0),
+        //            0:b0 (ts: 1000), 1:b1 (ts: 1001), 2:b2 (ts: 1002), 3:b3 (ts: 1003) }
+        time += 1L;
+        for (final int expectedKey : expectedKeys) {
+            driver.pipeInput(recordFactory.create(topic1, expectedKey, "J" + expectedKey, time));
+        }
+        processor.checkAndClearProcessResult("0:J0+b0 (ts: 1000)", "1:J1+b1 (ts: 1001)", "2:J2+b2 (ts: 1002)", "3:J3+null (ts: 902)");
+
+        // push four items with increase timestamp to the primary stream; this should produce one left-join and three full-join items
+        // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
+        //        0:B0 (ts: 1100), 1:B1 (ts: 1100), 2:B2 (ts: 1100), 3:B3 (ts: 1100),
+        //        0:C0 (ts: 1101), 1:C1 (ts: 1101), 2:C2 (ts: 1101), 3:C3 (ts: 1101),
+        //        0:D0 (ts: 1102), 1:D1 (ts: 1102), 2:D2 (ts: 1102), 3:D3 (ts: 1102),
+        //        0:E0 (ts: 1103), 1:E1 (ts: 1103), 2:E2 (ts: 1103), 3:E3 (ts: 1103),
+        //        0:F0 (ts: 1104), 1:F1 (ts: 1104), 2:F2 (ts: 1104), 3:F3 (ts: 1104),
+        //        0:G0 (ts: 899), 1:G1 (ts: 899), 2:G2 (ts: 899), 3:G3 (ts: 899),
+        //        0:H0 (ts: 900), 1:H1 (ts: 900), 2:H2 (ts: 900), 3:H3 (ts: 900),
+        //        0:I0 (ts: 901), 1:I1 (ts: 901), 2:I2 (ts: 901), 3:I3 (ts: 901),
+        //        0:J0 (ts: 902), 1:J1 (ts: 902), 2:J2 (ts: 902), 3:J3 (ts: 902) }
+        // w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0), 2:a2 (ts: 0), 3:a3 (ts: 0),
+        //        0:b0 (ts: 1000), 1:b1 (ts: 1001), 2:b2 (ts: 1002), 3:b3 (ts: 1003) }
+        // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
+        //            0:B0 (ts: 1100), 1:B1 (ts: 1100), 2:B2 (ts: 1100), 3:B3 (ts: 1100),
+        //            0:C0 (ts: 1101), 1:C1 (ts: 1101), 2:C2 (ts: 1101), 3:C3 (ts: 1101),
+        //            0:D0 (ts: 1102), 1:D1 (ts: 1102), 2:D2 (ts: 1102), 3:D3 (ts: 1102),
+        //            0:E0 (ts: 1103), 1:E1 (ts: 1103), 2:E2 (ts: 1103), 3:E3 (ts: 1103),
+        //            0:F0 (ts: 1104), 1:F1 (ts: 1104), 2:F2 (ts: 1104), 3:F3 (ts: 1104),
+        //            0:G0 (ts: 899), 1:G1 (ts: 899), 2:G2 (ts: 899), 3:G3 (ts: 899),
+        //            0:H0 (ts: 900), 1:H1 (ts: 900), 2:H2 (ts: 900), 3:H3 (ts: 900),
+        //            0:I0 (ts: 901), 1:I1 (ts: 901), 2:I2 (ts: 901), 3:I3 (ts: 901),
+        //            0:J0 (ts: 902), 1:J1 (ts: 902), 2:J2 (ts: 902), 3:J3 (ts: 902),
+        //            0:K0 (ts: 903), 1:K1 (ts: 903), 2:K2 (ts: 903), 3:K3 (ts: 903) }
+        // --> w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0), 2:a2 (ts: 0), 3:a3 (ts: 0),
+        //            0:b0 (ts: 1000), 1:b1 (ts: 1001), 2:b2 (ts: 1002), 3:b3 (ts: 1003) }
+        time += 1L;
+        for (final int expectedKey : expectedKeys) {
+            driver.pipeInput(recordFactory.create(topic1, expectedKey, "K" + expectedKey, time));
         }
+        processor.checkAndClearProcessResult("0:K0+b0 (ts: 1000)", "1:K1+b1 (ts: 1001)", "2:K2+b2 (ts: 1002)", "3:K3+b3 (ts: 1003)");
     }
 }


Mime
View raw message