kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bbej...@apache.org
Subject [kafka] branch trunk updated: KAFKA-8450: Using KeyValueTimeStamp in MockProcessor (#6933)
Date Tue, 16 Jul 2019 14:16:28 GMT
This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 00757cd  KAFKA-8450: Using KeyValueTimeStamp in MockProcessor (#6933)
00757cd is described below

commit 00757cd99f6d8bf2760c3708d307347f4cde65f6
Author: SuryaTeja Duggi <suryateja008@gmail.com>
AuthorDate: Tue Jul 16 19:46:15 2019 +0530

    KAFKA-8450: Using KeyValueTimeStamp in MockProcessor (#6933)
    
    This PR is to use KeyValueTimeStamp Object in MockProcessor Test file instead of String and change all the dependency files with broken test cases.
    
    Reviewers: Kamal Chandraprakash, Matthias J. Sax <mjsax@apache.org>,  Boyang Chen <boyang@confluent.io>, Bill Bejeck <bbejeck@gmail.com>
---
 .../apache/kafka/streams/StreamsBuilderTest.java   |  12 +-
 .../kstream/internals/KGroupedStreamImplTest.java  |  26 +--
 .../kstream/internals/KStreamFlatMapTest.java      |   8 +-
 .../internals/KStreamFlatMapValuesTest.java        |  15 +-
 .../internals/KStreamGlobalKTableJoinTest.java     |  22 ++-
 .../internals/KStreamGlobalKTableLeftJoinTest.java |  39 ++++-
 .../streams/kstream/internals/KStreamImplTest.java |  36 +++-
 .../kstream/internals/KStreamKStreamJoinTest.java  | 187 +++++++++++++++------
 .../internals/KStreamKStreamLeftJoinTest.java      |  96 +++++++----
 .../kstream/internals/KStreamKTableJoinTest.java   |  22 ++-
 .../internals/KStreamKTableLeftJoinTest.java       |  34 +++-
 .../streams/kstream/internals/KStreamMapTest.java  |   8 +-
 .../kstream/internals/KStreamMapValuesTest.java    |  11 +-
 .../kstream/internals/KStreamSelectKeyTest.java    |   5 +-
 .../kstream/internals/KStreamTransformTest.java    |  26 +--
 .../internals/KStreamTransformValuesTest.java      |  19 ++-
 .../internals/KStreamWindowAggregateTest.java      | 137 ++++++++-------
 .../kstream/internals/KTableAggregateTest.java     |  62 +++----
 .../kstream/internals/KTableFilterTest.java        |  67 +++++---
 .../streams/kstream/internals/KTableImplTest.java  |  29 +++-
 .../internals/KTableKTableInnerJoinTest.java       |  75 +++++----
 .../internals/KTableKTableLeftJoinTest.java        |  97 ++++++-----
 .../internals/KTableKTableOuterJoinTest.java       | 121 ++++++-------
 .../kstream/internals/KTableMapKeysTest.java       |   9 +-
 .../kstream/internals/KTableMapValuesTest.java     |  28 ++-
 .../kstream/internals/KTableSourceTest.java        |  32 +++-
 .../internals/KTableTransformValuesTest.java       |  27 ++-
 .../java/org/apache/kafka/test/MockProcessor.java  |  15 +-
 28 files changed, 821 insertions(+), 444 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index 68dd3ac..92b471c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -260,7 +260,8 @@ public class StreamsBuilderTest {
         }
 
         // no exception was thrown
-        assertEquals(Collections.singletonList("A:aa (ts: 0)"), processorSupplier.theCapturedProcessor().processed);
+        assertEquals(Collections.singletonList(new KeyValueTimestamp<>("A", "aa", 0)),
+                 processorSupplier.theCapturedProcessor().processed);
     }
 
     @Test
@@ -281,8 +282,8 @@ public class StreamsBuilderTest {
             driver.pipeInput(recordFactory.create("topic-source", "A", "aa"));
         }
 
-        assertEquals(Collections.singletonList("A:aa (ts: 0)"), sourceProcessorSupplier.theCapturedProcessor().processed);
-        assertEquals(Collections.singletonList("A:aa (ts: 0)"), throughProcessorSupplier.theCapturedProcessor().processed);
+        assertEquals(Collections.singletonList(new KeyValueTimestamp<>("A", "aa", 0)), sourceProcessorSupplier.theCapturedProcessor().processed);
+        assertEquals(Collections.singletonList(new KeyValueTimestamp<>("A", "aa", 0)), throughProcessorSupplier.theCapturedProcessor().processed);
     }
     
     @Test
@@ -307,7 +308,10 @@ public class StreamsBuilderTest {
             driver.pipeInput(recordFactory.create(topic1, "D", "dd"));
         }
 
-        assertEquals(asList("A:aa (ts: 0)", "B:bb (ts: 0)", "C:cc (ts: 0)", "D:dd (ts: 0)"), processorSupplier.theCapturedProcessor().processed);
+        assertEquals(asList(new KeyValueTimestamp<>("A", "aa", 0),
+                new KeyValueTimestamp<>("B", "bb", 0),
+                new KeyValueTimestamp<>("C", "cc", 0),
+                new KeyValueTimestamp<>("D", "dd", 0)), processorSupplier.theCapturedProcessor().processed);
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
index b19fd55..d8c3389 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.errors.TopologyException;
@@ -42,7 +43,6 @@ import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockAggregator;
 import org.apache.kafka.test.MockInitializer;
-import org.apache.kafka.test.MockProcessor;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockReducer;
 import org.apache.kafka.test.StreamsTestUtils;
@@ -576,18 +576,18 @@ public class KGroupedStreamImplTest {
             driver.pipeInput(recordFactory.create(TOPIC, "3", "B", 100L));
         }
         assertThat(supplier.theCapturedProcessor().processed, equalTo(Arrays.asList(
-            MockProcessor.makeRecord(new Windowed<>("1", new TimeWindow(0L, 500L)), 1L, 0L),
-            MockProcessor.makeRecord(new Windowed<>("1", new TimeWindow(0L, 500L)), 2L, 499L),
-            MockProcessor.makeRecord(new Windowed<>("1", new TimeWindow(0L, 500L)), 3L, 499L),
-            MockProcessor.makeRecord(new Windowed<>("2", new TimeWindow(0L, 500L)), 1L, 0L),
-            MockProcessor.makeRecord(new Windowed<>("2", new TimeWindow(0L, 500L)), 2L, 100L),
-            MockProcessor.makeRecord(new Windowed<>("2", new TimeWindow(0L, 500L)), 3L, 200L),
-            MockProcessor.makeRecord(new Windowed<>("3", new TimeWindow(0L, 500L)), 1L, 1L),
-            MockProcessor.makeRecord(new Windowed<>("1", new TimeWindow(500L, 1000L)), 1L, 500L),
-            MockProcessor.makeRecord(new Windowed<>("1", new TimeWindow(500L, 1000L)), 2L, 500L),
-            MockProcessor.makeRecord(new Windowed<>("2", new TimeWindow(500L, 1000L)), 1L, 500L),
-            MockProcessor.makeRecord(new Windowed<>("2", new TimeWindow(500L, 1000L)), 2L, 500L),
-            MockProcessor.makeRecord(new Windowed<>("3", new TimeWindow(0L, 500L)), 2L, 100L)
+            new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(0L, 500L)), 1L, 0L),
+            new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(0L, 500L)), 2L, 499L),
+            new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(0L, 500L)), 3L, 499L),
+            new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(0L, 500L)), 1L, 0L),
+            new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(0L, 500L)), 2L, 100L),
+            new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(0L, 500L)), 3L, 200L),
+            new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(0L, 500L)), 1L, 1L),
+            new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 1L, 500L),
+            new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 2L, 500L),
+            new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(500L, 1000L)), 1L, 500L),
+            new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(500L, 1000L)), 2L, 500L),
+            new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(0L, 500L)), 2L, 100L)
         )));
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
index af8ec08..990eb48 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KeyValueTimestamp;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
@@ -71,7 +72,12 @@ public class KStreamFlatMapTest {
 
         assertEquals(6, supplier.theCapturedProcessor().processed.size());
 
-        final String[] expected = {"10:V1 (ts: 0)", "20:V2 (ts: 0)", "21:V2 (ts: 0)", "30:V3 (ts: 0)", "31:V3 (ts: 0)", "32:V3 (ts: 0)"};
+        final KeyValueTimestamp[] expected = {new KeyValueTimestamp<>("10", "V1", 0),
+            new KeyValueTimestamp<>("20", "V2", 0),
+            new KeyValueTimestamp<>("21", "V2", 0),
+            new KeyValueTimestamp<>("30", "V3", 0),
+            new KeyValueTimestamp<>("31", "V3", 0),
+            new KeyValueTimestamp<>("32", "V3", 0)};
 
         for (int i = 0; i < expected.length; i++) {
             assertEquals(expected[i], supplier.theCapturedProcessor().processed.get(i));
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
index 07437aa..f92d80d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KeyValueTimestamp;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.TopologyTestDriver;
@@ -65,7 +66,10 @@ public class KStreamFlatMapValuesTest {
             }
         }
 
-        final String[] expected = {"0:v0 (ts: 0)", "0:V0 (ts: 0)", "1:v1 (ts: 0)", "1:V1 (ts: 0)", "2:v2 (ts: 0)", "2:V2 (ts: 0)", "3:v3 (ts: 0)", "3:V3 (ts: 0)"};
+        final KeyValueTimestamp[] expected = {new KeyValueTimestamp<>(0, "v0", 0), new KeyValueTimestamp<>(0, "V0", 0),
+            new KeyValueTimestamp<>(1, "v1", 0), new KeyValueTimestamp<>(1, "V1", 0),
+            new KeyValueTimestamp<>(2, "v2", 0), new KeyValueTimestamp<>(2, "V2", 0),
+            new KeyValueTimestamp<>(3, "v3", 0), new KeyValueTimestamp<>(3, "V3", 0)};
 
         assertArrayEquals(expected, supplier.theCapturedProcessor().processed.toArray());
     }
@@ -97,7 +101,14 @@ public class KStreamFlatMapValuesTest {
             }
         }
 
-        final String[] expected = {"0:v0 (ts: 0)", "0:k0 (ts: 0)", "1:v1 (ts: 0)", "1:k1 (ts: 0)", "2:v2 (ts: 0)", "2:k2 (ts: 0)", "3:v3 (ts: 0)", "3:k3 (ts: 0)"};
+        final KeyValueTimestamp[] expected = {new KeyValueTimestamp<>(0, "v0", 0),
+            new KeyValueTimestamp<>(0, "k0", 0),
+            new KeyValueTimestamp<>(1, "v1", 0),
+            new KeyValueTimestamp<>(1, "k1", 0),
+            new KeyValueTimestamp<>(2, "v2", 0),
+            new KeyValueTimestamp<>(2, "k2", 0),
+            new KeyValueTimestamp<>(3, "v3", 0),
+            new KeyValueTimestamp<>(3, "k3", 0)};
 
         assertArrayEquals(expected, supplier.theCapturedProcessor().processed.toArray());
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
index 802aa33..d0348df 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KeyValueTimestamp;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.TopologyTestDriver;
@@ -42,7 +43,7 @@ import java.util.Set;
 import static org.junit.Assert.assertEquals;
 
 public class KStreamGlobalKTableJoinTest {
-    private final static String[] EMPTY = new String[0];
+    private final static KeyValueTimestamp[] EMPTY = new KeyValueTimestamp[0];
 
     private final String streamTopic = "streamTopic";
     private final String globalTableTopic = "globalTableTopic";
@@ -145,7 +146,8 @@ public class KStreamGlobalKTableJoinTest {
         // push all four items to the primary stream. this should produce two items.
 
         pushToStream(4, "X", true);
-        processor.checkAndClearProcessResult("0:X0,FKey0+Y0 (ts: 0)", "1:X1,FKey1+Y1 (ts: 1)");
+        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "X0,FKey0+Y0", 0),
+                new KeyValueTimestamp<>(1, "X1,FKey1+Y1", 1));
 
         // push all items to the globalTable. this should not produce any item
 
@@ -155,7 +157,10 @@ public class KStreamGlobalKTableJoinTest {
         // push all four items to the primary stream. this should produce four items.
 
         pushToStream(4, "X", true);
-        processor.checkAndClearProcessResult("0:X0,FKey0+YY0 (ts: 0)", "1:X1,FKey1+YY1 (ts: 1)", "2:X2,FKey2+YY2 (ts: 2)", "3:X3,FKey3+YY3 (ts: 3)");
+        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "X0,FKey0+YY0", 0),
+                new KeyValueTimestamp<>(1, "X1,FKey1+YY1", 1),
+                new KeyValueTimestamp<>(2, "X2,FKey2+YY2", 2),
+                new KeyValueTimestamp<>(3, "X3,FKey3+YY3", 3));
 
         // push all items to the globalTable. this should not produce any item
 
@@ -174,7 +179,8 @@ public class KStreamGlobalKTableJoinTest {
         // push all four items to the primary stream. this should produce two items.
 
         pushToStream(4, "X", true);
-        processor.checkAndClearProcessResult("0:X0,FKey0+Y0 (ts: 0)", "1:X1,FKey1+Y1 (ts: 1)");
+        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "X0,FKey0+Y0", 0),
+                new KeyValueTimestamp<>(1, "X1,FKey1+Y1", 1));
 
     }
 
@@ -189,7 +195,10 @@ public class KStreamGlobalKTableJoinTest {
         // push all four items to the primary stream. this should produce four items.
 
         pushToStream(4, "X", true);
-        processor.checkAndClearProcessResult("0:X0,FKey0+Y0 (ts: 0)", "1:X1,FKey1+Y1 (ts: 1)", "2:X2,FKey2+Y2 (ts: 2)", "3:X3,FKey3+Y3 (ts: 3)");
+        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "X0,FKey0+Y0", 0),
+                new KeyValueTimestamp<>(1, "X1,FKey1+Y1", 1),
+                new KeyValueTimestamp<>(2, "X2,FKey2+Y2", 2),
+                new KeyValueTimestamp<>(3, "X3,FKey3+Y3", 3));
 
         // push two items with null to the globalTable as deletes. this should not produce any item.
 
@@ -199,7 +208,8 @@ public class KStreamGlobalKTableJoinTest {
         // push all four items to the primary stream. this should produce two items.
 
         pushToStream(4, "XX", true);
-        processor.checkAndClearProcessResult("2:XX2,FKey2+Y2 (ts: 2)", "3:XX3,FKey3+Y3 (ts: 3)");
+        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(2, "XX2,FKey2+Y2", 2),
+                new KeyValueTimestamp<>(3, "XX3,FKey3+Y3", 3));
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
index a6361da..ce4454e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KeyValueTimestamp;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.TopologyTestDriver;
@@ -42,7 +43,7 @@ import java.util.Set;
 import static org.junit.Assert.assertEquals;
 
 public class KStreamGlobalKTableLeftJoinTest {
-    private final static String[] EMPTY = new String[0];
+    private final static KeyValueTimestamp[] EMPTY = new KeyValueTimestamp[0];
 
     private final String streamTopic = "streamTopic";
     private final String globalTableTopic = "globalTableTopic";
@@ -126,7 +127,8 @@ public class KStreamGlobalKTableLeftJoinTest {
         // push two items to the primary stream. the globalTable is empty
 
         pushToStream(2, "X", true);
-        processor.checkAndClearProcessResult("0:X0,FKey0+null (ts: 0)", "1:X1,FKey1+null (ts: 1)");
+        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "X0,FKey0+null", 0),
+                new KeyValueTimestamp<>(1, "X1,FKey1+null", 1));
     }
 
     @Test
@@ -135,7 +137,8 @@ public class KStreamGlobalKTableLeftJoinTest {
         // push two items to the primary stream. the globalTable is empty
 
         pushToStream(2, "X", true);
-        processor.checkAndClearProcessResult("0:X0,FKey0+null (ts: 0)", "1:X1,FKey1+null (ts: 1)");
+        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "X0,FKey0+null", 0),
+                new KeyValueTimestamp<>(1, "X1,FKey1+null", 1));
 
         // push two items to the globalTable. this should not produce any item.
 
@@ -145,7 +148,10 @@ public class KStreamGlobalKTableLeftJoinTest {
         // push all four items to the primary stream. this should produce four items.
 
         pushToStream(4, "X", true);
-        processor.checkAndClearProcessResult("0:X0,FKey0+Y0 (ts: 0)", "1:X1,FKey1+Y1 (ts: 1)", "2:X2,FKey2+null (ts: 2)", "3:X3,FKey3+null (ts: 3)");
+        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "X0,FKey0+Y0", 0),
+                new KeyValueTimestamp<>(1, "X1,FKey1+Y1", 1),
+                new KeyValueTimestamp<>(2, "X2,FKey2+null", 2),
+                new KeyValueTimestamp<>(3, "X3,FKey3+null", 3));
 
         // push all items to the globalTable. this should not produce any item
 
@@ -155,7 +161,10 @@ public class KStreamGlobalKTableLeftJoinTest {
         // push all four items to the primary stream. this should produce four items.
 
         pushToStream(4, "X", true);
-        processor.checkAndClearProcessResult("0:X0,FKey0+YY0 (ts: 0)", "1:X1,FKey1+YY1 (ts: 1)", "2:X2,FKey2+YY2 (ts: 2)", "3:X3,FKey3+YY3 (ts: 3)");
+        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "X0,FKey0+YY0", 0),
+                new KeyValueTimestamp<>(1, "X1,FKey1+YY1", 1),
+                new KeyValueTimestamp<>(2, "X2,FKey2+YY2", 2),
+                new KeyValueTimestamp<>(3, "X3,FKey3+YY3", 3));
 
         // push all items to the globalTable. this should not produce any item
 
@@ -174,7 +183,10 @@ public class KStreamGlobalKTableLeftJoinTest {
         // push all four items to the primary stream. this should produce four items.
 
         pushToStream(4, "X", true);
-        processor.checkAndClearProcessResult("0:X0,FKey0+Y0 (ts: 0)", "1:X1,FKey1+Y1 (ts: 1)", "2:X2,FKey2+null (ts: 2)", "3:X3,FKey3+null (ts: 3)");
+        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "X0,FKey0+Y0", 0),
+                new KeyValueTimestamp<>(1, "X1,FKey1+Y1", 1),
+                new KeyValueTimestamp<>(2, "X2,FKey2+null", 2),
+                new KeyValueTimestamp<>(3, "X3,FKey3+null", 3));
 
     }
 
@@ -189,7 +201,10 @@ public class KStreamGlobalKTableLeftJoinTest {
         // push all four items to the primary stream. this should produce four items.
 
         pushToStream(4, "X", true);
-        processor.checkAndClearProcessResult("0:X0,FKey0+Y0 (ts: 0)", "1:X1,FKey1+Y1 (ts: 1)", "2:X2,FKey2+Y2 (ts: 2)", "3:X3,FKey3+Y3 (ts: 3)");
+        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "X0,FKey0+Y0", 0),
+                new KeyValueTimestamp<>(1, "X1,FKey1+Y1", 1),
+                new KeyValueTimestamp<>(2, "X2,FKey2+Y2", 2),
+                new KeyValueTimestamp<>(3, "X3,FKey3+Y3", 3));
 
         // push two items with null to the globalTable as deletes. this should not produce any item.
 
@@ -199,7 +214,10 @@ public class KStreamGlobalKTableLeftJoinTest {
         // push all four items to the primary stream. this should produce four items.
 
         pushToStream(4, "XX", true);
-        processor.checkAndClearProcessResult("0:XX0,FKey0+null (ts: 0)", "1:XX1,FKey1+null (ts: 1)", "2:XX2,FKey2+Y2 (ts: 2)", "3:XX3,FKey3+Y3 (ts: 3)");
+        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "XX0,FKey0+null", 0),
+                new KeyValueTimestamp<>(1, "XX1,FKey1+null", 1),
+                new KeyValueTimestamp<>(2, "XX2,FKey2+Y2", 2),
+                new KeyValueTimestamp<>(3, "XX3,FKey3+Y3", 3));
     }
 
     @Test
@@ -214,7 +232,10 @@ public class KStreamGlobalKTableLeftJoinTest {
         // this should produce four items.
 
         pushToStream(4, "XXX", false);
-        processor.checkAndClearProcessResult("0:XXX0+null (ts: 0)", "1:XXX1+null (ts: 1)", "2:XXX2+null (ts: 2)", "3:XXX3+null (ts: 3)");
+        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "XXX0+null", 0),
+                new KeyValueTimestamp<>(1, "XXX1+null", 1),
+                new KeyValueTimestamp<>(2, "XXX2+null", 2),
+                new KeyValueTimestamp<>(3, "XXX3+null", 3));
     }
 
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 0508d7c..358be9b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.TopologyWrapper;
@@ -290,7 +291,7 @@ public class KStreamImplTest {
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
             driver.pipeInput(recordFactory.create(input, "a", "b"));
         }
-        assertThat(processorSupplier.theCapturedProcessor().processed, equalTo(Collections.singletonList("a:b (ts: 0)")));
+        assertThat(processorSupplier.theCapturedProcessor().processed, equalTo(Collections.singletonList(new KeyValueTimestamp<>("a", "b", 0))));
     }
 
     @Test
@@ -304,7 +305,7 @@ public class KStreamImplTest {
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
             driver.pipeInput(recordFactory.create(input, "e", "f"));
         }
-        assertThat(processorSupplier.theCapturedProcessor().processed, equalTo(Collections.singletonList("e:f (ts: 0)")));
+        assertThat(processorSupplier.theCapturedProcessor().processed, equalTo(Collections.singletonList(new KeyValueTimestamp<>("e", "f", 0))));
     }
 
     @Test
@@ -323,8 +324,9 @@ public class KStreamImplTest {
             driver.pipeInput(recordFactory.create(input, "b", "v1"));
         }
         final List<MockProcessor<String, String>> mockProcessors = processorSupplier.capturedProcessors(2);
-        assertThat(mockProcessors.get(0).processed, equalTo(asList("a:v1 (ts: 0)", "a:v2 (ts: 0)")));
-        assertThat(mockProcessors.get(1).processed, equalTo(Collections.singletonList("b:v1 (ts: 0)")));
+        assertThat(mockProcessors.get(0).processed, equalTo(asList(new KeyValueTimestamp<>("a", "v1", 0),
+                new KeyValueTimestamp<>("a", "v2", 0))));
+        assertThat(mockProcessors.get(1).processed, equalTo(Collections.singletonList(new KeyValueTimestamp<>("b", "v1", 0))));
     }
 
     @SuppressWarnings("deprecation") // specifically testing the deprecated variant
@@ -676,7 +678,10 @@ public class KStreamImplTest {
             driver.pipeInput(recordFactory.create(topic1, "D", "dd"));
         }
 
-        assertEquals(asList("A:aa (ts: 0)", "B:bb (ts: 0)", "C:cc (ts: 0)", "D:dd (ts: 0)"), processorSupplier.theCapturedProcessor().processed);
+        assertEquals(asList(new KeyValueTimestamp<>("A", "aa", 0),
+                new KeyValueTimestamp<>("B", "bb", 0),
+                new KeyValueTimestamp<>("C", "cc", 0),
+                new KeyValueTimestamp<>("D", "dd", 0)), processorSupplier.theCapturedProcessor().processed);
     }
 
     @Test
@@ -705,7 +710,14 @@ public class KStreamImplTest {
             driver.pipeInput(recordFactory.create(topic1, "H", "hh", 6L));
         }
 
-        assertEquals(asList("A:aa (ts: 1)", "B:bb (ts: 9)", "C:cc (ts: 2)", "D:dd (ts: 8)", "E:ee (ts: 3)", "F:ff (ts: 7)", "G:gg (ts: 4)", "H:hh (ts: 6)"),
+        assertEquals(asList(new KeyValueTimestamp<>("A", "aa", 1),
+                new KeyValueTimestamp<>("B", "bb", 9),
+                new KeyValueTimestamp<>("C", "cc", 2),
+                new KeyValueTimestamp<>("D", "dd", 8),
+                new KeyValueTimestamp<>("E", "ee", 3),
+                new KeyValueTimestamp<>("F", "ff", 7),
+                new KeyValueTimestamp<>("G", "gg", 4),
+                new KeyValueTimestamp<>("H", "hh", 6)),
                      processorSupplier.theCapturedProcessor().processed);
     }
 
@@ -723,7 +735,11 @@ public class KStreamImplTest {
             driver.pipeInput(recordFactory.create("topic-7", "E", "ee", 3L));
         }
 
-        assertEquals(asList("A:aa (ts: 1)", "B:bb (ts: 5)", "C:cc (ts: 10)", "D:dd (ts: 8)", "E:ee (ts: 3)"),
+        assertEquals(asList(new KeyValueTimestamp<>("A", "aa", 1),
+                new KeyValueTimestamp<>("B", "bb", 5),
+                new KeyValueTimestamp<>("C", "cc", 10),
+                new KeyValueTimestamp<>("D", "dd", 8),
+                new KeyValueTimestamp<>("E", "ee", 3)),
                 processorSupplier.theCapturedProcessor().processed);
     }
 
@@ -746,7 +762,11 @@ public class KStreamImplTest {
             driver.pipeInput(recordFactory.create(topic3, "E", "ee", 3L));
         }
 
-        assertEquals(asList("A:aa (ts: 1)", "B:bb (ts: 5)", "C:cc (ts: 10)", "D:dd (ts: 8)", "E:ee (ts: 3)"),
+        assertEquals(asList(new KeyValueTimestamp<>("A", "aa", 1),
+                new KeyValueTimestamp<>("B", "bb", 5),
+                new KeyValueTimestamp<>("C", "cc", 10),
+                new KeyValueTimestamp<>("D", "dd", 8),
+                new KeyValueTimestamp<>("E", "ee", 3)),
                 processorSupplier.theCapturedProcessor().processed);
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
index 18d601a..6925eba 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KeyValueTimestamp;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.TopologyWrapper;
@@ -47,7 +48,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 
 public class KStreamKStreamJoinTest {
-    private final static String[] EMPTY = new String[0];
+    private final static KeyValueTimestamp[] EMPTY = new KeyValueTimestamp[0];
 
     private final String topic1 = "topic1";
     private final String topic2 = "topic2";
@@ -87,7 +88,7 @@ public class KStreamKStreamJoinTest {
     public void testJoin() {
         final StreamsBuilder builder = new StreamsBuilder();
 
-        final int[] expectedKeys = new int[]{0, 1, 2, 3};
+        final int[] expectedKeys = new int[] {0, 1, 2, 3};
 
         final KStream<Integer, String> stream1;
         final KStream<Integer, String> stream2;
@@ -129,7 +130,8 @@ public class KStreamKStreamJoinTest {
             for (int i = 0; i < 2; i++) {
                 driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "a" + expectedKeys[i]));
             }
-            processor.checkAndClearProcessResult("0:A0+a0 (ts: 0)", "1:A1+a1 (ts: 0)");
+            processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "A0+a0", 0),
+                new KeyValueTimestamp<>(1, "A1+a1", 0));
 
             // push all four items to the primary stream; this should produce two items
             // w1 = { 0:A0, 1:A1 }
@@ -139,7 +141,8 @@ public class KStreamKStreamJoinTest {
             for (final int expectedKey : expectedKeys) {
                 driver.pipeInput(recordFactory.create(topic1, expectedKey, "B" + expectedKey));
             }
-            processor.checkAndClearProcessResult("0:B0+a0 (ts: 0)", "1:B1+a1 (ts: 0)");
+            processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "B0+a0", 0),
+                new KeyValueTimestamp<>(1, "B1+a1", 0));
 
             // push all items to the other stream; this should produce six items
             // w1 = { 0:A0, 1:A1, 0:B0, 1:B1, 2:B2, 3:B3 }
@@ -149,7 +152,12 @@ public class KStreamKStreamJoinTest {
             for (final int expectedKey : expectedKeys) {
                 driver.pipeInput(recordFactory.create(topic2, expectedKey, "b" + expectedKey));
             }
-            processor.checkAndClearProcessResult("0:A0+b0 (ts: 0)", "0:B0+b0 (ts: 0)", "1:A1+b1 (ts: 0)", "1:B1+b1 (ts: 0)", "2:B2+b2 (ts: 0)", "3:B3+b3 (ts: 0)");
+            processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "A0+b0", 0),
+                new KeyValueTimestamp<>(0, "B0+b0", 0),
+                new KeyValueTimestamp<>(1, "A1+b1", 0),
+                new KeyValueTimestamp<>(1, "B1+b1", 0),
+                new KeyValueTimestamp<>(2, "B2+b2", 0),
+                new KeyValueTimestamp<>(3, "B3+b3", 0));
 
             // push all four items to the primary stream; this should produce six items
             // w1 = { 0:A0, 1:A1, 0:B0, 1:B1, 2:B2, 3:B3 }
@@ -159,7 +167,12 @@ public class KStreamKStreamJoinTest {
             for (final int expectedKey : expectedKeys) {
                 driver.pipeInput(recordFactory.create(topic1, expectedKey, "C" + expectedKey));
             }
-            processor.checkAndClearProcessResult("0:C0+a0 (ts: 0)", "0:C0+b0 (ts: 0)", "1:C1+a1 (ts: 0)", "1:C1+b1 (ts: 0)", "2:C2+b2 (ts: 0)", "3:C3+b3 (ts: 0)");
+            processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "C0+a0", 0),
+                new KeyValueTimestamp<>(0, "C0+b0", 0),
+                new KeyValueTimestamp<>(1, "C1+a1", 0),
+                new KeyValueTimestamp<>(1, "C1+b1", 0),
+                new KeyValueTimestamp<>(2, "C2+b2", 0),
+                new KeyValueTimestamp<>(3, "C3+b3", 0));
 
             // push two items to the other stream; this should produce six items
             // w1 = { 0:A0, 1:A1, 0:B0, 1:B1, 2:B2, 3:B3, 0:C0, 1:C1, 2:C2, 3:C3 }
@@ -169,7 +182,12 @@ public class KStreamKStreamJoinTest {
             for (int i = 0; i < 2; i++) {
                 driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "c" + expectedKeys[i]));
             }
-            processor.checkAndClearProcessResult("0:A0+c0 (ts: 0)", "0:B0+c0 (ts: 0)", "0:C0+c0 (ts: 0)", "1:A1+c1 (ts: 0)", "1:B1+c1 (ts: 0)", "1:C1+c1 (ts: 0)");
+            processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "A0+c0", 0),
+                new KeyValueTimestamp<>(0, "B0+c0", 0),
+                new KeyValueTimestamp<>(0, "C0+c0", 0),
+                new KeyValueTimestamp<>(1, "A1+c1", 0),
+                new KeyValueTimestamp<>(1, "B1+c1", 0),
+                new KeyValueTimestamp<>(1, "C1+c1", 0));
         }
     }
 
@@ -177,7 +195,7 @@ public class KStreamKStreamJoinTest {
     public void testOuterJoin() {
         final StreamsBuilder builder = new StreamsBuilder();
 
-        final int[] expectedKeys = new int[]{0, 1, 2, 3};
+        final int[] expectedKeys = new int[] {0, 1, 2, 3};
 
         final KStream<Integer, String> stream1;
         final KStream<Integer, String> stream2;
@@ -209,7 +227,8 @@ public class KStreamKStreamJoinTest {
             for (int i = 0; i < 2; i++) {
                 driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "A" + expectedKeys[i]));
             }
-            processor.checkAndClearProcessResult("0:A0+null (ts: 0)", "1:A1+null (ts: 0)");
+            processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "A0+null", 0),
+                new KeyValueTimestamp<>(1, "A1+null", 0));
 
             // push two items to the other stream; this should produce two items
             // w1 = { 0:A0, 1:A1 }
@@ -219,7 +238,8 @@ public class KStreamKStreamJoinTest {
             for (int i = 0; i < 2; i++) {
                 driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "a" + expectedKeys[i]));
             }
-            processor.checkAndClearProcessResult("0:A0+a0 (ts: 0)", "1:A1+a1 (ts: 0)");
+            processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "A0+a0", 0),
+                new KeyValueTimestamp<>(1, "A1+a1", 0));
 
             // push all four items to the primary stream; this should produce four items
             // w1 = { 0:A0, 1:A1 }
@@ -229,7 +249,10 @@ public class KStreamKStreamJoinTest {
             for (final int expectedKey : expectedKeys) {
                 driver.pipeInput(recordFactory.create(topic1, expectedKey, "B" + expectedKey));
             }
-            processor.checkAndClearProcessResult("0:B0+a0 (ts: 0)", "1:B1+a1 (ts: 0)", "2:B2+null (ts: 0)", "3:B3+null (ts: 0)");
+            processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "B0+a0", 0),
+                new KeyValueTimestamp<>(1, "B1+a1", 0),
+                new KeyValueTimestamp<>(2, "B2+null", 0),
+                new KeyValueTimestamp<>(3, "B3+null", 0));
 
             // push all items to the other stream; this should produce six items
             // w1 = { 0:A0, 1:A1, 0:B0, 1:B1, 2:B2, 3:B3 }
@@ -239,7 +262,12 @@ public class KStreamKStreamJoinTest {
             for (final int expectedKey : expectedKeys) {
                 driver.pipeInput(recordFactory.create(topic2, expectedKey, "b" + expectedKey));
             }
-            processor.checkAndClearProcessResult("0:A0+b0 (ts: 0)", "0:B0+b0 (ts: 0)", "1:A1+b1 (ts: 0)", "1:B1+b1 (ts: 0)", "2:B2+b2 (ts: 0)", "3:B3+b3 (ts: 0)");
+            processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "A0+b0", 0),
+                new KeyValueTimestamp<>(0, "B0+b0", 0),
+                new KeyValueTimestamp<>(1, "A1+b1", 0),
+                new KeyValueTimestamp<>(1, "B1+b1", 0),
+                new KeyValueTimestamp<>(2, "B2+b2", 0),
+                new KeyValueTimestamp<>(3, "B3+b3", 0));
 
             // push all four items to the primary stream; this should produce six items
             // w1 = { 0:A0, 1:A1, 0:B0, 1:B1, 2:B2, 3:B3 }
@@ -249,7 +277,12 @@ public class KStreamKStreamJoinTest {
             for (final int expectedKey : expectedKeys) {
                 driver.pipeInput(recordFactory.create(topic1, expectedKey, "C" + expectedKey));
             }
-            processor.checkAndClearProcessResult("0:C0+a0 (ts: 0)", "0:C0+b0 (ts: 0)", "1:C1+a1 (ts: 0)", "1:C1+b1 (ts: 0)", "2:C2+b2 (ts: 0)", "3:C3+b3 (ts: 0)");
+            processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "C0+a0", 0),
+                new KeyValueTimestamp<>(0, "C0+b0", 0),
+                new KeyValueTimestamp<>(1, "C1+a1", 0),
+                new KeyValueTimestamp<>(1, "C1+b1", 0),
+                new KeyValueTimestamp<>(2, "C2+b2", 0),
+                new KeyValueTimestamp<>(3, "C3+b3", 0));
 
             // push two items to the other stream; this should produce six items
             // w1 = { 0:A0, 1:A1, 0:B0, 1:B1, 2:B2, 3:B3, 0:C0, 1:C1, 2:C2, 3:C3 }
@@ -259,7 +292,12 @@ public class KStreamKStreamJoinTest {
             for (int i = 0; i < 2; i++) {
                 driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "c" + expectedKeys[i]));
             }
-            processor.checkAndClearProcessResult("0:A0+c0 (ts: 0)", "0:B0+c0 (ts: 0)", "0:C0+c0 (ts: 0)", "1:A1+c1 (ts: 0)", "1:B1+c1 (ts: 0)", "1:C1+c1 (ts: 0)");
+            processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "A0+c0", 0),
+                new KeyValueTimestamp<>(0, "B0+c0", 0),
+                new KeyValueTimestamp<>(0, "C0+c0", 0),
+                new KeyValueTimestamp<>(1, "A1+c1", 0),
+                new KeyValueTimestamp<>(1, "B1+c1", 0),
+                new KeyValueTimestamp<>(1, "C1+c1", 0));
         }
     }
 
@@ -267,7 +305,7 @@ public class KStreamKStreamJoinTest {
     public void testWindowing() {
         final StreamsBuilder builder = new StreamsBuilder();
 
-        final int[] expectedKeys = new int[]{0, 1, 2, 3};
+        final int[] expectedKeys = new int[] {0, 1, 2, 3};
 
         final KStream<Integer, String> stream1;
         final KStream<Integer, String> stream2;
@@ -311,7 +349,8 @@ public class KStreamKStreamJoinTest {
             for (int i = 0; i < 2; i++) {
                 driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "a" + expectedKeys[i], time));
             }
-            processor.checkAndClearProcessResult("0:A0+a0 (ts: 0)", "1:A1+a1 (ts: 0)");
+            processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "A0+a0", 0),
+                new KeyValueTimestamp<>(1, "A1+a1", 0));
 
             // push four items to the primary stream with larger and increasing timestamp; this should produce no items
             // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) }
@@ -337,7 +376,10 @@ public class KStreamKStreamJoinTest {
             for (final int expectedKey : expectedKeys) {
                 driver.pipeInput(recordFactory.create(topic2, expectedKey, "b" + expectedKey, time));
             }
-            processor.checkAndClearProcessResult("0:B0+b0 (ts: 1100)", "1:B1+b1 (ts: 1100)", "2:B2+b2 (ts: 1100)", "3:B3+b3 (ts: 1100)");
+            processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "B0+b0", 1100),
+                new KeyValueTimestamp<>(1, "B1+b1", 1100),
+                new KeyValueTimestamp<>(2, "B2+b2", 1100),
+                new KeyValueTimestamp<>(3, "B3+b3", 1100));
 
             // push four items to the other stream with incremented timestamp; this should produce three items
             // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
@@ -353,7 +395,9 @@ public class KStreamKStreamJoinTest {
             for (final int expectedKey : expectedKeys) {
                 driver.pipeInput(recordFactory.create(topic2, expectedKey, "c" + expectedKey, time));
             }
-            processor.checkAndClearProcessResult("1:B1+c1 (ts: 1101)", "2:B2+c2 (ts: 1101)", "3:B3+c3 (ts: 1101)");
+            processor.checkAndClearProcessResult(new KeyValueTimestamp<>(1, "B1+c1", 1101),
+                new KeyValueTimestamp<>(2, "B2+c2", 1101),
+                new KeyValueTimestamp<>(3, "B3+c3", 1101));
 
             // push four items to the other stream with incremented timestamp; this should produce two items
             // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
@@ -371,7 +415,8 @@ public class KStreamKStreamJoinTest {
             for (final int expectedKey : expectedKeys) {
                 driver.pipeInput(recordFactory.create(topic2, expectedKey, "d" + expectedKey, time));
             }
-            processor.checkAndClearProcessResult("2:B2+d2 (ts: 1102)", "3:B3+d3 (ts: 1102)");
+            processor.checkAndClearProcessResult(new KeyValueTimestamp<>(2, "B2+d2", 1102),
+                new KeyValueTimestamp<>(3, "B3+d3", 1102));
 
             // push four items to the other stream with incremented timestamp; this should produce one item
             // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
@@ -391,7 +436,7 @@ public class KStreamKStreamJoinTest {
             for (final int expectedKey : expectedKeys) {
                 driver.pipeInput(recordFactory.create(topic2, expectedKey, "e" + expectedKey, time));
             }
-            processor.checkAndClearProcessResult("3:B3+e3 (ts: 1103)");
+            processor.checkAndClearProcessResult(new KeyValueTimestamp<>(3, "B3+e3", 1103));
 
             // push four items to the other stream with incremented timestamp; this should produce no items
             // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
@@ -463,7 +508,7 @@ public class KStreamKStreamJoinTest {
             for (final int expectedKey : expectedKeys) {
                 driver.pipeInput(recordFactory.create(topic2, expectedKey, "h" + expectedKey, time));
             }
-            processor.checkAndClearProcessResult("0:B0+h0 (ts: 1000)");
+            processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "B0+h0", 1000));
 
             // push four items to the other stream with with incremented timestamp; this should produce two items
             // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
@@ -491,7 +536,8 @@ public class KStreamKStreamJoinTest {
             for (final int expectedKey : expectedKeys) {
                 driver.pipeInput(recordFactory.create(topic2, expectedKey, "i" + expectedKey, time));
             }
-            processor.checkAndClearProcessResult("0:B0+i0 (ts: 1000)", "1:B1+i1 (ts: 1001)");
+            processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "B0+i0", 1000),
+                new KeyValueTimestamp<>(1, "B1+i1", 1001));
 
             // push four items to the other stream with with incremented timestamp; this should produce three items
             // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
@@ -521,7 +567,9 @@ public class KStreamKStreamJoinTest {
             for (final int expectedKey : expectedKeys) {
                 driver.pipeInput(recordFactory.create(topic2, expectedKey, "j" + expectedKey, time));
             }
-            processor.checkAndClearProcessResult("0:B0+j0 (ts: 1000)", "1:B1+j1 (ts: 1001)", "2:B2+j2 (ts: 1002)");
+            processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "B0+j0", 1000),
+                new KeyValueTimestamp<>(1, "B1+j1", 1001),
+                new KeyValueTimestamp<>(2, "B2+j2", 1002));
 
             // push four items to the other stream with with incremented timestamp; this should produce four items
             // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
@@ -553,7 +601,10 @@ public class KStreamKStreamJoinTest {
             for (final int expectedKey : expectedKeys) {
                 driver.pipeInput(recordFactory.create(topic2, expectedKey, "k" + expectedKey, time));
             }
-            processor.checkAndClearProcessResult("0:B0+k0 (ts: 1000)", "1:B1+k1 (ts: 1001)", "2:B2+k2 (ts: 1002)", "3:B3+k3 (ts: 1003)");
+            processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "B0+k0", 1000),
+                new KeyValueTimestamp<>(1, "B1+k1", 1001),
+                new KeyValueTimestamp<>(2, "B2+k2", 1002),
+                new KeyValueTimestamp<>(3, "B3+k3", 1003));
 
             // advance time to not join with existing data
             // we omit above exiting data, even if it's still in the window
@@ -578,7 +629,10 @@ public class KStreamKStreamJoinTest {
             for (final int expectedKey : expectedKeys) {
                 driver.pipeInput(recordFactory.create(topic1, expectedKey, "C" + expectedKey, time));
             }
-            processor.checkAndClearProcessResult("0:C0+l0 (ts: 2100)", "1:C1+l1 (ts: 2100)", "2:C2+l2 (ts: 2100)", "3:C3+l3 (ts: 2100)");
+            processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "C0+l0", 2100),
+                new KeyValueTimestamp<>(1, "C1+l1", 2100),
+                new KeyValueTimestamp<>(2, "C2+l2", 2100),
+                new KeyValueTimestamp<>(3, "C3+l3", 2100));
 
             // push four items with increase timestamps to the primary stream; this should produce three items
             // w1 = { 0:C0 (ts: 2100), 1:C1 (ts: 2100), 2:C2 (ts: 2100), 3:C3 (ts: 2100) }
@@ -590,7 +644,9 @@ public class KStreamKStreamJoinTest {
             for (final int expectedKey : expectedKeys) {
                 driver.pipeInput(recordFactory.create(topic1, expectedKey, "D" + expectedKey, time));
             }
-            processor.checkAndClearProcessResult("1:D1+l1 (ts: 2101)", "2:D2+l2 (ts: 2101)", "3:D3+l3 (ts: 2101)");
+            processor.checkAndClearProcessResult(new KeyValueTimestamp<>(1, "D1+l1", 2101),
+                new KeyValueTimestamp<>(2, "D2+l2", 2101),
+                new KeyValueTimestamp<>(3, "D3+l3", 2101));
 
             // push four items with increase timestamps to the primary stream; this should produce two items
             // w1 = { 0:C0 (ts: 2100), 1:C1 (ts: 2100), 2:C2 (ts: 2100), 3:C3 (ts: 2100),
@@ -604,7 +660,8 @@ public class KStreamKStreamJoinTest {
             for (final int expectedKey : expectedKeys) {
                 driver.pipeInput(recordFactory.create(topic1, expectedKey, "E" + expectedKey, time));
             }
-            processor.checkAndClearProcessResult("2:E2+l2 (ts: 2102)", "3:E3+l3 (ts: 2102)");
+            processor.checkAndClearProcessResult(new KeyValueTimestamp<>(2, "E2+l2", 2102),
+                new KeyValueTimestamp<>(3, "E3+l3", 2102));
 
             // push four items with increase timestamps to the primary stream; this should produce one item
             // w1 = { 0:C0 (ts: 2100), 1:C1 (ts: 2100), 2:C2 (ts: 2100), 3:C3 (ts: 2100),
@@ -620,7 +677,7 @@ public class KStreamKStreamJoinTest {
             for (final int expectedKey : expectedKeys) {
                 driver.pipeInput(recordFactory.create(topic1, expectedKey, "F" + expectedKey, time));
             }
-            processor.checkAndClearProcessResult("3:F3+l3 (ts: 2103)");
+            processor.checkAndClearProcessResult(new KeyValueTimestamp<>(3, "F3+l3", 2103));
 
             // push four items with increase timestamps (now out of window) to the primary stream; this should produce no items
             // w1 = { 0:C0 (ts: 2100), 1:C1 (ts: 2100), 2:C2 (ts: 2100), 3:C3 (ts: 2100),
@@ -680,7 +737,7 @@ public class KStreamKStreamJoinTest {
             for (final int expectedKey : expectedKeys) {
                 driver.pipeInput(recordFactory.create(topic1, expectedKey, "I" + expectedKey, time));
             }
-            processor.checkAndClearProcessResult("0:I0+l0 (ts: 2000)");
+            processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "I0+l0", 2000));
 
             // push four items with increased timestamps to the primary stream; this should produce two items
             // w1 = { 0:C0 (ts: 2100), 1:C1 (ts: 2100), 2:C2 (ts: 2100), 3:C3 (ts: 2100),
@@ -704,7 +761,8 @@ public class KStreamKStreamJoinTest {
             for (final int expectedKey : expectedKeys) {
                 driver.pipeInput(recordFactory.create(topic1, expectedKey, "J" + expectedKey, time));
             }
-            processor.checkAndClearProcessResult("0:J0+l0 (ts: 2000)", "1:J1+l1 (ts: 2001)");
+            processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "J0+l0", 2000),
+                new KeyValueTimestamp<>(1, "J1+l1", 2001));
 
             // push four items with increased timestamps to the primary stream; this should produce three items
             // w1 = { 0:C0 (ts: 2100), 1:C1 (ts: 2100), 2:C2 (ts: 2100), 3:C3 (ts: 2100),
@@ -730,7 +788,9 @@ public class KStreamKStreamJoinTest {
             for (final int expectedKey : expectedKeys) {
                 driver.pipeInput(recordFactory.create(topic1, expectedKey, "K" + expectedKey, time));
             }
-            processor.checkAndClearProcessResult("0:K0+l0 (ts: 2000)", "1:K1+l1 (ts: 2001)", "2:K2+l2 (ts: 2002)");
+            processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "K0+l0", 2000),
+                new KeyValueTimestamp<>(1, "K1+l1", 2001),
+                new KeyValueTimestamp<>(2, "K2+l2", 2002));
 
             // push four items with increased timestamps to the primary stream; this should produce four items
             // w1 = { 0:C0 (ts: 2100), 1:C1 (ts: 2100), 2:C2 (ts: 2100), 3:C3 (ts: 2100),
@@ -758,7 +818,10 @@ public class KStreamKStreamJoinTest {
             for (final int expectedKey : expectedKeys) {
                 driver.pipeInput(recordFactory.create(topic1, expectedKey, "L" + expectedKey, time));
             }
-            processor.checkAndClearProcessResult("0:L0+l0 (ts: 2000)", "1:L1+l1 (ts: 2001)", "2:L2+l2 (ts: 2002)", "3:L3+l3 (ts: 2003)");
+            processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "L0+l0", 2000),
+                new KeyValueTimestamp<>(1, "L1+l1", 2001),
+                new KeyValueTimestamp<>(2, "L2+l2", 2002),
+                new KeyValueTimestamp<>(3, "L3+l3", 2003));
         }
     }
 
@@ -766,7 +829,7 @@ public class KStreamKStreamJoinTest {
     public void testAsymmetricWindowingAfter() {
         final StreamsBuilder builder = new StreamsBuilder();
 
-        final int[] expectedKeys = new int[]{0, 1, 2, 3};
+        final int[] expectedKeys = new int[] {0, 1, 2, 3};
 
         final KStream<Integer, String> stream1;
         final KStream<Integer, String> stream2;
@@ -825,7 +888,7 @@ public class KStreamKStreamJoinTest {
             for (final int expectedKey : expectedKeys) {
                 driver.pipeInput(recordFactory.create(topic2, expectedKey, "b" + expectedKey, time));
             }
-            processor.checkAndClearProcessResult("0:A0+b0 (ts: 1000)");
+            processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "A0+b0", 1000));
 
             // push four items with increased timestamps to the secondary stream; this should produce two items
             // w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
@@ -839,7 +902,8 @@ public class KStreamKStreamJoinTest {
             for (final int expectedKey : expectedKeys) {
                 driver.pipeInput(recordFactory.create(topic2, expectedKey, "c" + expectedKey, time));
             }
-            processor.checkAndClearProcessResult("0:A0+c0 (ts: 1001)", "1:A1+c1 (ts: 1001)");
+            processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "A0+c0", 1001),
+                new KeyValueTimestamp<>(1, "A1+c1", 1001));
 
             // push four items with increased timestamps to the secondary stream; this should produce three items
             // w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
@@ -855,7 +919,9 @@ public class KStreamKStreamJoinTest {
             for (final int expectedKey : expectedKeys) {
                 driver.pipeInput(recordFactory.create(topic2, expectedKey, "d" + expectedKey, time));
             }
-            processor.checkAndClearProcessResult("0:A0+d0 (ts: 1002)", "1:A1+d1 (ts: 1002)", "2:A2+d2 (ts: 1002)");
+            processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "A0+d0", 1002),
+                new KeyValueTimestamp<>(1, "A1+d1", 1002),
+                new KeyValueTimestamp<>(2, "A2+d2", 1002));
 
             // push four items with increased timestamps to the secondary stream; this should produce four items
             // w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
@@ -873,7 +939,10 @@ public class KStreamKStreamJoinTest {
             for (final int expectedKey : expectedKeys) {
                 driver.pipeInput(recordFactory.create(topic2, expectedKey, "e" + expectedKey, time));
             }
-            processor.checkAndClearProcessResult("0:A0+e0 (ts: 1003)", "1:A1+e1 (ts: 1003)", "2:A2+e2 (ts: 1003)", "3:A3+e3 (ts: 1003)");
+            processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "A0+e0", 1003),
+                new KeyValueTimestamp<>(1, "A1+e1", 1003),
+                new KeyValueTimestamp<>(2, "A2+e2", 1003),
+                new KeyValueTimestamp<>(3, "A3+e3", 1003));
 
             // push four items with larger timestamps to the secondary stream; this should produce four items
             // w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
@@ -893,7 +962,10 @@ public class KStreamKStreamJoinTest {
             for (final int expectedKey : expectedKeys) {
                 driver.pipeInput(recordFactory.create(topic2, expectedKey, "f" + expectedKey, time));
             }
-            processor.checkAndClearProcessResult("0:A0+f0 (ts: 1100)", "1:A1+f1 (ts: 1100)", "2:A2+f2 (ts: 1100)", "3:A3+f3 (ts: 1100)");
+            processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "A0+f0", 1100),
+                new KeyValueTimestamp<>(1, "A1+f1", 1100),
+                new KeyValueTimestamp<>(2, "A2+f2", 1100),
+                new KeyValueTimestamp<>(3, "A3+f3", 1100));
 
             // push four items with increased timestamps to the secondary stream; this should produce three items
             // w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
@@ -915,7 +987,9 @@ public class KStreamKStreamJoinTest {
             for (final int expectedKey : expectedKeys) {
                 driver.pipeInput(recordFactory.create(topic2, expectedKey, "g" + expectedKey, time));
             }
-            processor.checkAndClearProcessResult("1:A1+g1 (ts: 1101)", "2:A2+g2 (ts: 1101)", "3:A3+g3 (ts: 1101)");
+            processor.checkAndClearProcessResult(new KeyValueTimestamp<>(1, "A1+g1", 1101),
+                new KeyValueTimestamp<>(2, "A2+g2", 1101),
+                new KeyValueTimestamp<>(3, "A3+g3", 1101));
 
             // push four items with increased timestamps to the secondary stream; this should produce two items
             // w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
@@ -939,7 +1013,8 @@ public class KStreamKStreamJoinTest {
             for (final int expectedKey : expectedKeys) {
                 driver.pipeInput(recordFactory.create(topic2, expectedKey, "h" + expectedKey, time));
             }
-            processor.checkAndClearProcessResult("2:A2+h2 (ts: 1102)", "3:A3+h3 (ts: 1102)");
+            processor.checkAndClearProcessResult(new KeyValueTimestamp<>(2, "A2+h2", 1102),
+                new KeyValueTimestamp<>(3, "A3+h3", 1102));
 
             // push four items with increased timestamps to the secondary stream; this should produce one item
             // w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
@@ -965,7 +1040,7 @@ public class KStreamKStreamJoinTest {
             for (final int expectedKey : expectedKeys) {
                 driver.pipeInput(recordFactory.create(topic2, expectedKey, "i" + expectedKey, time));
             }
-            processor.checkAndClearProcessResult("3:A3+i3 (ts: 1103)");
+            processor.checkAndClearProcessResult(new KeyValueTimestamp<>(3, "A3+i3", 1103));
 
             // push four items with increased timestamps (no out of window) to the secondary stream; this should produce no items
             // w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
@@ -1001,7 +1076,7 @@ public class KStreamKStreamJoinTest {
     public void testAsymmetricWindowingBefore() {
         final StreamsBuilder builder = new StreamsBuilder();
 
-        final int[] expectedKeys = new int[]{0, 1, 2, 3};
+        final int[] expectedKeys = new int[] {0, 1, 2, 3};
 
         final KStream<Integer, String> stream1;
         final KStream<Integer, String> stream2;
@@ -1059,7 +1134,7 @@ public class KStreamKStreamJoinTest {
             for (final int expectedKey : expectedKeys) {
                 driver.pipeInput(recordFactory.create(topic2, expectedKey, "b" + expectedKey, time));
             }
-            processor.checkAndClearProcessResult("0:A0+b0 (ts: 1000)");
+            processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "A0+b0", 1000));
 
             // push four items with increased timestamp to the other stream; this should produce two items
             // w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
@@ -1073,7 +1148,8 @@ public class KStreamKStreamJoinTest {
             for (final int expectedKey : expectedKeys) {
                 driver.pipeInput(recordFactory.create(topic2, expectedKey, "c" + expectedKey, time));
             }
-            processor.checkAndClearProcessResult("0:A0+c0 (ts: 1000)", "1:A1+c1 (ts: 1001)");
+            processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "A0+c0", 1000),
+                new KeyValueTimestamp<>(1, "A1+c1", 1001));
 
             // push four items with increased timestamp to the other stream; this should produce three items
             // w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
@@ -1089,7 +1165,9 @@ public class KStreamKStreamJoinTest {
             for (final int expectedKey : expectedKeys) {
                 driver.pipeInput(recordFactory.create(topic2, expectedKey, "d" + expectedKey, time));
             }
-            processor.checkAndClearProcessResult("0:A0+d0 (ts: 1000)", "1:A1+d1 (ts: 1001)", "2:A2+d2 (ts: 1002)");
+            processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "A0+d0", 1000),
+                new KeyValueTimestamp<>(1, "A1+d1", 1001),
+                new KeyValueTimestamp<>(2, "A2+d2", 1002));
 
             // push four items with increased timestamp to the other stream; this should produce four items
             // w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
@@ -1107,7 +1185,10 @@ public class KStreamKStreamJoinTest {
             for (final int expectedKey : expectedKeys) {
                 driver.pipeInput(recordFactory.create(topic2, expectedKey, "e" + expectedKey, time));
             }
-            processor.checkAndClearProcessResult("0:A0+e0 (ts: 1000)", "1:A1+e1 (ts: 1001)", "2:A2+e2 (ts: 1002)", "3:A3+e3 (ts: 1003)");
+            processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "A0+e0", 1000),
+                new KeyValueTimestamp<>(1, "A1+e1", 1001),
+                new KeyValueTimestamp<>(2, "A2+e2", 1002),
+                new KeyValueTimestamp<>(3, "A3+e3", 1003));
 
             // push four items with larger timestamp to the other stream; this should produce four items
             // w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
@@ -1127,7 +1208,10 @@ public class KStreamKStreamJoinTest {
             for (final int expectedKey : expectedKeys) {
                 driver.pipeInput(recordFactory.create(topic2, expectedKey, "f" + expectedKey, time));
             }
-            processor.checkAndClearProcessResult("0:A0+f0 (ts: 1000)", "1:A1+f1 (ts: 1001)", "2:A2+f2 (ts: 1002)", "3:A3+f3 (ts: 1003)");
+            processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "A0+f0", 1000),
+                new KeyValueTimestamp<>(1, "A1+f1", 1001),
+                new KeyValueTimestamp<>(2, "A2+f2", 1002),
+                new KeyValueTimestamp<>(3, "A3+f3", 1003));
 
             // push four items with increase timestamp to the other stream; this should produce three items
             // w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
@@ -1149,7 +1233,9 @@ public class KStreamKStreamJoinTest {
             for (final int expectedKey : expectedKeys) {
                 driver.pipeInput(recordFactory.create(topic2, expectedKey, "g" + expectedKey, time));
             }
-            processor.checkAndClearProcessResult("1:A1+g1 (ts: 1001)", "2:A2+g2 (ts: 1002)", "3:A3+g3 (ts: 1003)");
+            processor.checkAndClearProcessResult(new KeyValueTimestamp<>(1, "A1+g1", 1001),
+                new KeyValueTimestamp<>(2, "A2+g2", 1002),
+                new KeyValueTimestamp<>(3, "A3+g3", 1003));
 
             // push four items with increase timestamp to the other stream; this should produce two items
             // w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
@@ -1173,7 +1259,8 @@ public class KStreamKStreamJoinTest {
             for (final int expectedKey : expectedKeys) {
                 driver.pipeInput(recordFactory.create(topic2, expectedKey, "h" + expectedKey, time));
             }
-            processor.checkAndClearProcessResult("2:A2+h2 (ts: 1002)", "3:A3+h3 (ts: 1003)");
+            processor.checkAndClearProcessResult(new KeyValueTimestamp<>(2, "A2+h2", 1002),
+                new KeyValueTimestamp<>(3, "A3+h3", 1003));
 
             // push four items with increase timestamp to the other stream; this should produce one item
             // w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
@@ -1199,7 +1286,7 @@ public class KStreamKStreamJoinTest {
             for (final int expectedKey : expectedKeys) {
                 driver.pipeInput(recordFactory.create(topic2, expectedKey, "i" + expectedKey, time));
             }
-            processor.checkAndClearProcessResult("3:A3+i3 (ts: 1003)");
+            processor.checkAndClearProcessResult(new KeyValueTimestamp<>(3, "A3+i3", 1003));
 
             // push four items with increase timestamp (no out of window) to the other stream; this should produce no items
             // w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
index af8ec8f..8c29a14 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KeyValueTimestamp;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.TopologyTestDriver;
@@ -43,7 +44,7 @@ import static java.time.Duration.ofMillis;
 import static org.junit.Assert.assertEquals;
 
 public class KStreamKStreamLeftJoinTest {
-    private final static String[] EMPTY = new String[0];
+    private final static KeyValueTimestamp[] EMPTY = new KeyValueTimestamp[0];
 
     private final String topic1 = "topic1";
     private final String topic2 = "topic2";
@@ -56,7 +57,7 @@ public class KStreamKStreamLeftJoinTest {
     public void testLeftJoin() {
         final StreamsBuilder builder = new StreamsBuilder();
 
-        final int[] expectedKeys = new int[]{0, 1, 2, 3};
+        final int[] expectedKeys = new int[] {0, 1, 2, 3};
 
         final KStream<Integer, String> stream1;
         final KStream<Integer, String> stream2;
@@ -89,8 +90,8 @@ public class KStreamKStreamLeftJoinTest {
             for (int i = 0; i < 2; i++) {
                 driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "A" + expectedKeys[i]));
             }
-            processor.checkAndClearProcessResult("0:A0+null (ts: 0)", "1:A1+null (ts: 0)");
-
+            processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "A0+null", 0),
+                new KeyValueTimestamp<>(1, "A1+null", 0));
             // push two items to the other stream; this should produce two items
             // w1 = { 0:A0, 1:A1 }
             // w2 {}
@@ -99,8 +100,8 @@ public class KStreamKStreamLeftJoinTest {
             for (int i = 0; i < 2; i++) {
                 driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "a" + expectedKeys[i]));
             }
-            processor.checkAndClearProcessResult("0:A0+a0 (ts: 0)", "1:A1+a1 (ts: 0)");
-
+            processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "A0+a0", 0),
+                new KeyValueTimestamp<>(1, "A1+a1", 0));
             // push three items to the primary stream; this should produce four items
             // w1 = { 0:A0, 1:A1 }
             // w2 = { 0:a0, 1:a1 }
@@ -109,8 +110,9 @@ public class KStreamKStreamLeftJoinTest {
             for (int i = 0; i < 3; i++) {
                 driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "B" + expectedKeys[i]));
             }
-            processor.checkAndClearProcessResult("0:B0+a0 (ts: 0)", "1:B1+a1 (ts: 0)", "2:B2+null (ts: 0)");
-
+            processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "B0+a0", 0),
+                new KeyValueTimestamp<>(1, "B1+a1", 0),
+                new KeyValueTimestamp<>(2, "B2+null", 0));
             // push all items to the other stream; this should produce five items
             // w1 = { 0:A0, 1:A1, 0:B0, 1:B1, 2:B2 }
             // w2 = { 0:a0, 1:a1 }
@@ -119,8 +121,11 @@ public class KStreamKStreamLeftJoinTest {
             for (final int expectedKey : expectedKeys) {
                 driver.pipeInput(recordFactory.create(topic2, expectedKey, "b" + expectedKey));
             }
-            processor.checkAndClearProcessResult("0:A0+b0 (ts: 0)", "0:B0+b0 (ts: 0)", "1:A1+b1 (ts: 0)", "1:B1+b1 (ts: 0)", "2:B2+b2 (ts: 0)");
-
+            processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "A0+b0", 0),
+                new KeyValueTimestamp<>(0, "B0+b0", 0),
+                new KeyValueTimestamp<>(1, "A1+b1", 0),
+                new KeyValueTimestamp<>(1, "B1+b1", 0),
+                new KeyValueTimestamp<>(2, "B2+b2", 0));
             // push all four items to the primary stream; this should produce six items
             // w1 = { 0:A0, 1:A1, 0:B0, 1:B1, 2:B2 }
             // w2 = { 0:a0, 1:a1, 0:b0, 1:b1, 2:b2, 3:b3 }
@@ -129,14 +134,19 @@ public class KStreamKStreamLeftJoinTest {
             for (final int expectedKey : expectedKeys) {
                 driver.pipeInput(recordFactory.create(topic1, expectedKey, "C" + expectedKey));
             }
-            processor.checkAndClearProcessResult("0:C0+a0 (ts: 0)", "0:C0+b0 (ts: 0)", "1:C1+a1 (ts: 0)", "1:C1+b1 (ts: 0)", "2:C2+b2 (ts: 0)", "3:C3+b3 (ts: 0)");
+            processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "C0+a0", 0),
+                new KeyValueTimestamp<>(0, "C0+b0", 0),
+                new KeyValueTimestamp<>(1, "C1+a1", 0),
+                new KeyValueTimestamp<>(1, "C1+b1", 0),
+                new KeyValueTimestamp<>(2, "C2+b2", 0),
+                new KeyValueTimestamp<>(3, "C3+b3", 0));
         }
     }
 
     @Test
     public void testWindowing() {
         final StreamsBuilder builder = new StreamsBuilder();
-        final int[] expectedKeys = new int[]{0, 1, 2, 3};
+        final int[] expectedKeys = new int[] {0, 1, 2, 3};
 
         final KStream<Integer, String> stream1;
         final KStream<Integer, String> stream2;
@@ -170,8 +180,8 @@ public class KStreamKStreamLeftJoinTest {
             for (int i = 0; i < 2; i++) {
                 driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "A" + expectedKeys[i], time));
             }
-            processor.checkAndClearProcessResult("0:A0+null (ts: 0)", "1:A1+null (ts: 0)");
-
+            processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "A0+null", 0),
+                new KeyValueTimestamp<>(1, "A1+null", 0));
             // push four items to the other stream; this should produce two full-join items
             // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) }
             // w2 = {}
@@ -180,8 +190,8 @@ public class KStreamKStreamLeftJoinTest {
             for (final int expectedKey : expectedKeys) {
                 driver.pipeInput(recordFactory.create(topic2, expectedKey, "a" + expectedKey, time));
             }
-            processor.checkAndClearProcessResult("0:A0+a0 (ts: 0)", "1:A1+a1 (ts: 0)");
-
+            processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "A0+a0", 0),
+                new KeyValueTimestamp<>(1, "A1+a1", 0));
             testUpperWindowBound(expectedKeys, driver, processor);
             testLowerWindowBound(expectedKeys, driver, processor);
         }
@@ -216,8 +226,10 @@ public class KStreamKStreamLeftJoinTest {
         for (final int expectedKey : expectedKeys) {
             driver.pipeInput(recordFactory.create(topic1, expectedKey, "B" + expectedKey, time));
         }
-        processor.checkAndClearProcessResult("0:B0+b0 (ts: 1100)", "1:B1+b1 (ts: 1100)", "2:B2+b2 (ts: 1100)", "3:B3+b3 (ts: 1100)");
-
+        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "B0+b0", 1100),
+            new KeyValueTimestamp<>(1, "B1+b1", 1100),
+            new KeyValueTimestamp<>(2, "B2+b2", 1100),
+            new KeyValueTimestamp<>(3, "B3+b3", 1100));
         // push four items with increased timestamp to the primary stream; this should produce one left-join and three full-join items
         // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
         //        0:B0 (ts: 1100), 1:B1 (ts: 1100), 2:B2 (ts: 1100), 3:B3 (ts: 1100) }
@@ -232,8 +244,10 @@ public class KStreamKStreamLeftJoinTest {
         for (final int expectedKey : expectedKeys) {
             driver.pipeInput(recordFactory.create(topic1, expectedKey, "C" + expectedKey, time));
         }
-        processor.checkAndClearProcessResult("0:C0+null (ts: 1101)", "1:C1+b1 (ts: 1101)", "2:C2+b2 (ts: 1101)", "3:C3+b3 (ts: 1101)");
-
+        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "C0+null", 1101),
+            new KeyValueTimestamp<>(1, "C1+b1", 1101),
+            new KeyValueTimestamp<>(2, "C2+b2", 1101),
+            new KeyValueTimestamp<>(3, "C3+b3", 1101));
         // push four items with increased timestamp to the primary stream; this should produce two left-join and two full-join items
         // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
         //        0:B0 (ts: 1100), 1:B1 (ts: 1100), 2:B2 (ts: 1100), 3:B3 (ts: 1100),
@@ -250,8 +264,10 @@ public class KStreamKStreamLeftJoinTest {
         for (final int expectedKey : expectedKeys) {
             driver.pipeInput(recordFactory.create(topic1, expectedKey, "D" + expectedKey, time));
         }
-        processor.checkAndClearProcessResult("0:D0+null (ts: 1102)", "1:D1+null (ts: 1102)", "2:D2+b2 (ts: 1102)", "3:D3+b3 (ts: 1102)");
-
+        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "D0+null", 1102),
+            new KeyValueTimestamp<>(1, "D1+null", 1102),
+            new KeyValueTimestamp<>(2, "D2+b2", 1102),
+            new KeyValueTimestamp<>(3, "D3+b3", 1102));
         // push four items with increased timestamp to the primary stream; this should produce three left-join and one full-join items
         // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
         //        0:B0 (ts: 1100), 1:B1 (ts: 1100), 2:B2 (ts: 1100), 3:B3 (ts: 1100),
@@ -270,8 +286,10 @@ public class KStreamKStreamLeftJoinTest {
         for (final int expectedKey : expectedKeys) {
             driver.pipeInput(recordFactory.create(topic1, expectedKey, "E" + expectedKey, time));
         }
-        processor.checkAndClearProcessResult("0:E0+null (ts: 1103)", "1:E1+null (ts: 1103)", "2:E2+null (ts: 1103)", "3:E3+b3 (ts: 1103)");
-
+        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "E0+null", 1103),
+            new KeyValueTimestamp<>(1, "E1+null", 1103),
+            new KeyValueTimestamp<>(2, "E2+null", 1103),
+            new KeyValueTimestamp<>(3, "E3+b3", 1103));
         // push four items with increased timestamp to the primary stream; this should produce four left-join and no full-join items
         // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
         //        0:B0 (ts: 1100), 1:B1 (ts: 1100), 2:B2 (ts: 1100), 3:B3 (ts: 1100),
@@ -292,14 +310,16 @@ public class KStreamKStreamLeftJoinTest {
         for (final int expectedKey : expectedKeys) {
             driver.pipeInput(recordFactory.create(topic1, expectedKey, "F" + expectedKey, time));
         }
-        processor.checkAndClearProcessResult("0:F0+null (ts: 1104)", "1:F1+null (ts: 1104)", "2:F2+null (ts: 1104)", "3:F3+null (ts: 1104)");
+        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "F0+null", 1104),
+            new KeyValueTimestamp<>(1, "F1+null", 1104),
+            new KeyValueTimestamp<>(2, "F2+null", 1104),
+            new KeyValueTimestamp<>(3, "F3+null", 1104));
     }
 
     private void testLowerWindowBound(final int[] expectedKeys,
                                       final TopologyTestDriver driver,
                                       final MockProcessor<Integer, String> processor) {
         long time;
-
         // push four items with smaller timestamp (before the window) to the primary stream; this should produce four left-join and no full-join items
         // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
         //        0:B0 (ts: 1100), 1:B1 (ts: 1100), 2:B2 (ts: 1100), 3:B3 (ts: 1100),
@@ -322,8 +342,9 @@ public class KStreamKStreamLeftJoinTest {
         for (final int expectedKey : expectedKeys) {
             driver.pipeInput(recordFactory.create(topic1, expectedKey, "G" + expectedKey, time));
         }
-        processor.checkAndClearProcessResult("0:G0+null (ts: 899)", "1:G1+null (ts: 899)", "2:G2+null (ts: 899)", "3:G3+null (ts: 899)");
-
+        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "G0+null", 899),
+            new KeyValueTimestamp<>(1, "G1+null", 899), new KeyValueTimestamp<>(2, "G2+null", 899),
+            new KeyValueTimestamp<>(3, "G3+null", 899));
         // push four items with increase timestamp to the primary stream; this should produce three left-join and one full-join items
         // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
         //        0:B0 (ts: 1100), 1:B1 (ts: 1100), 2:B2 (ts: 1100), 3:B3 (ts: 1100),
@@ -348,8 +369,9 @@ public class KStreamKStreamLeftJoinTest {
         for (final int expectedKey : expectedKeys) {
             driver.pipeInput(recordFactory.create(topic1, expectedKey, "H" + expectedKey, time));
         }
-        processor.checkAndClearProcessResult("0:H0+b0 (ts: 1000)", "1:H1+null (ts: 900)", "2:H2+null (ts: 900)", "3:H3+null (ts: 900)");
-
+        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "H0+b0", 1000),
+            new KeyValueTimestamp<>(1, "H1+null", 900), new KeyValueTimestamp<>(2, "H2+null", 900),
+            new KeyValueTimestamp<>(3, "H3+null", 900));
         // push four items with increase timestamp to the primary stream; this should produce two left-join and two full-join items
         // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
         //        0:B0 (ts: 1100), 1:B1 (ts: 1100), 2:B2 (ts: 1100), 3:B3 (ts: 1100),
@@ -376,8 +398,9 @@ public class KStreamKStreamLeftJoinTest {
         for (final int expectedKey : expectedKeys) {
             driver.pipeInput(recordFactory.create(topic1, expectedKey, "I" + expectedKey, time));
         }
-        processor.checkAndClearProcessResult("0:I0+b0 (ts: 1000)", "1:I1+b1 (ts: 1001)", "2:I2+null (ts: 901)", "3:I3+null (ts: 901)");
-
+        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "I0+b0", 1000),
+            new KeyValueTimestamp<>(1, "I1+b1", 1001), new KeyValueTimestamp<>(2, "I2+null", 901),
+            new KeyValueTimestamp<>(3, "I3+null", 901));
         // push four items with increase timestamp to the primary stream; this should produce one left-join and three full-join items
         // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
         //        0:B0 (ts: 1100), 1:B1 (ts: 1100), 2:B2 (ts: 1100), 3:B3 (ts: 1100),
@@ -406,8 +429,9 @@ public class KStreamKStreamLeftJoinTest {
         for (final int expectedKey : expectedKeys) {
             driver.pipeInput(recordFactory.create(topic1, expectedKey, "J" + expectedKey, time));
         }
-        processor.checkAndClearProcessResult("0:J0+b0 (ts: 1000)", "1:J1+b1 (ts: 1001)", "2:J2+b2 (ts: 1002)", "3:J3+null (ts: 902)");
-
+        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "J0+b0", 1000),
+            new KeyValueTimestamp<>(1, "J1+b1", 1001), new KeyValueTimestamp<>(2, "J2+b2", 1002),
+            new KeyValueTimestamp<>(3, "J3+null", 902));
         // push four items with increase timestamp to the primary stream; this should produce one left-join and three full-join items
         // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
         //        0:B0 (ts: 1100), 1:B1 (ts: 1100), 2:B2 (ts: 1100), 3:B3 (ts: 1100),
@@ -438,6 +462,8 @@ public class KStreamKStreamLeftJoinTest {
         for (final int expectedKey : expectedKeys) {
             driver.pipeInput(recordFactory.create(topic1, expectedKey, "K" + expectedKey, time));
         }
-        processor.checkAndClearProcessResult("0:K0+b0 (ts: 1000)", "1:K1+b1 (ts: 1001)", "2:K2+b2 (ts: 1002)", "3:K3+b3 (ts: 1003)");
+        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "K0+b0", 1000),
+            new KeyValueTimestamp<>(1, "K1+b1", 1001), new KeyValueTimestamp<>(2, "K2+b2", 1002),
+            new KeyValueTimestamp<>(3, "K3+b3", 1003));
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
index e8d962a..78f326c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KeyValueTimestamp;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.TopologyWrapper;
@@ -48,7 +49,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 
 public class KStreamKTableJoinTest {
-    private final static String[] EMPTY = new String[0];
+    private final static KeyValueTimestamp[] EMPTY = new KeyValueTimestamp[0];
 
     private final String streamTopic = "streamTopic";
     private final String tableTopic = "tableTopic";
@@ -135,7 +136,8 @@ public class KStreamKTableJoinTest {
 
         // push all four items to the primary stream. this should produce two items.
         pushToStream(4, "X");
-        processor.checkAndClearProcessResult("0:X0+Y0 (ts: 0)", "1:X1+Y1 (ts: 1)");
+        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "X0+Y0", 0),
+                new KeyValueTimestamp<>(1, "X1+Y1", 1));
 
         // push all items to the table. this should not produce any item
         pushToTable(4, "YY");
@@ -143,7 +145,10 @@ public class KStreamKTableJoinTest {
 
         // push all four items to the primary stream. this should produce four items.
         pushToStream(4, "X");
-        processor.checkAndClearProcessResult("0:X0+YY0 (ts: 0)", "1:X1+YY1 (ts: 1)", "2:X2+YY2 (ts: 2)", "3:X3+YY3 (ts: 3)");
+        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "X0+YY0", 0),
+                new KeyValueTimestamp<>(1, "X1+YY1", 1),
+                new KeyValueTimestamp<>(2, "X2+YY2", 2),
+                new KeyValueTimestamp<>(3, "X3+YY3", 3));
 
         // push all items to the table. this should not produce any item
         pushToTable(4, "YYY");
@@ -158,7 +163,8 @@ public class KStreamKTableJoinTest {
 
         // push all four items to the primary stream. this should produce two items.
         pushToStream(4, "X");
-        processor.checkAndClearProcessResult("0:X0+Y0 (ts: 0)", "1:X1+Y1 (ts: 1)");
+        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "X0+Y0", 0),
+                new KeyValueTimestamp<>(1, "X1+Y1", 1));
 
     }
 
@@ -170,7 +176,10 @@ public class KStreamKTableJoinTest {
 
         // push all four items to the primary stream. this should produce four items.
         pushToStream(4, "X");
-        processor.checkAndClearProcessResult("0:X0+Y0 (ts: 0)", "1:X1+Y1 (ts: 1)", "2:X2+Y2 (ts: 2)", "3:X3+Y3 (ts: 3)");
+        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "X0+Y0", 0),
+                new KeyValueTimestamp<>(1, "X1+Y1", 1),
+                new KeyValueTimestamp<>(2, "X2+Y2", 2),
+                new KeyValueTimestamp<>(3, "X3+Y3", 3));
 
         // push two items with null to the table as deletes. this should not produce any item.
         pushNullValueToTable();
@@ -178,7 +187,8 @@ public class KStreamKTableJoinTest {
 
         // push all four items to the primary stream. this should produce two items.
         pushToStream(4, "XX");
-        processor.checkAndClearProcessResult("2:XX2+Y2 (ts: 2)", "3:XX3+Y3 (ts: 3)");
+        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(2, "XX2+Y2", 2),
+                new KeyValueTimestamp<>(3, "XX3+Y3", 3));
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
index b83e53c..d9ad07d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KeyValueTimestamp;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.TopologyTestDriver;
@@ -44,7 +45,7 @@ import java.util.Set;
 import static org.junit.Assert.assertEquals;
 
 public class KStreamKTableLeftJoinTest {
-    private final static String[] EMPTY = new String[0];
+    private final static KeyValueTimestamp[] EMPTY = new KeyValueTimestamp[0];
 
     private final String streamTopic = "streamTopic";
     private final String tableTopic = "tableTopic";
@@ -116,14 +117,16 @@ public class KStreamKTableLeftJoinTest {
     public void shouldJoinWithEmptyTableOnStreamUpdates() {
         // push two items to the primary stream. the table is empty
         pushToStream(2, "X");
-        processor.checkAndClearProcessResult("0:X0+null (ts: 0)", "1:X1+null (ts: 1)");
+        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "X0+null", 0),
+                new KeyValueTimestamp<>(1, "X1+null", 1));
     }
 
     @Test
     public void shouldNotJoinOnTableUpdates() {
         // push two items to the primary stream. the table is empty
         pushToStream(2, "X");
-        processor.checkAndClearProcessResult("0:X0+null (ts: 0)", "1:X1+null (ts: 1)");
+        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "X0+null", 0),
+                new KeyValueTimestamp<>(1, "X1+null", 1));
 
         // push two items to the table. this should not produce any item.
         pushToTable(2, "Y");
@@ -131,7 +134,10 @@ public class KStreamKTableLeftJoinTest {
 
         // push all four items to the primary stream. this should produce four items.
         pushToStream(4, "X");
-        processor.checkAndClearProcessResult("0:X0+Y0 (ts: 0)", "1:X1+Y1 (ts: 1)", "2:X2+null (ts: 2)", "3:X3+null (ts: 3)");
+        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "X0+Y0", 0),
+                new KeyValueTimestamp<>(1, "X1+Y1", 1),
+                new KeyValueTimestamp<>(2, "X2+null", 2),
+                new KeyValueTimestamp<>(3, "X3+null", 3));
 
         // push all items to the table. this should not produce any item
         pushToTable(4, "YY");
@@ -139,7 +145,10 @@ public class KStreamKTableLeftJoinTest {
 
         // push all four items to the primary stream. this should produce four items.
         pushToStream(4, "X");
-        processor.checkAndClearProcessResult("0:X0+YY0 (ts: 0)", "1:X1+YY1 (ts: 1)", "2:X2+YY2 (ts: 2)", "3:X3+YY3 (ts: 3)");
+        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "X0+YY0", 0),
+                new KeyValueTimestamp<>(1, "X1+YY1", 1),
+                new KeyValueTimestamp<>(2, "X2+YY2", 2),
+                new KeyValueTimestamp<>(3, "X3+YY3", 3));
 
         // push all items to the table. this should not produce any item
         pushToTable(4, "YYY");
@@ -154,7 +163,10 @@ public class KStreamKTableLeftJoinTest {
 
         // push all four items to the primary stream. this should produce four items.
         pushToStream(4, "X");
-        processor.checkAndClearProcessResult("0:X0+Y0 (ts: 0)", "1:X1+Y1 (ts: 1)", "2:X2+null (ts: 2)", "3:X3+null (ts: 3)");
+        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "X0+Y0", 0),
+                new KeyValueTimestamp<>(1, "X1+Y1", 1),
+                new KeyValueTimestamp<>(2, "X2+null", 2),
+                new KeyValueTimestamp<>(3, "X3+null", 3));
 
     }
 
@@ -166,7 +178,10 @@ public class KStreamKTableLeftJoinTest {
 
         // push all four items to the primary stream. this should produce four items.
         pushToStream(4, "X");
-        processor.checkAndClearProcessResult("0:X0+Y0 (ts: 0)", "1:X1+Y1 (ts: 1)", "2:X2+Y2 (ts: 2)", "3:X3+Y3 (ts: 3)");
+        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "X0+Y0", 0),
+                new KeyValueTimestamp<>(1, "X1+Y1", 1),
+                new KeyValueTimestamp<>(2, "X2+Y2", 2),
+                new KeyValueTimestamp<>(3, "X3+Y3", 3));
 
         // push two items with null to the table as deletes. this should not produce any item.
         pushNullValueToTable(2);
@@ -174,7 +189,10 @@ public class KStreamKTableLeftJoinTest {
 
         // push all four items to the primary stream. this should produce four items.
         pushToStream(4, "XX");
-        processor.checkAndClearProcessResult("0:XX0+null (ts: 0)", "1:XX1+null (ts: 1)", "2:XX2+Y2 (ts: 2)", "3:XX3+Y3 (ts: 3)");
+        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "XX0+null", 0),
+                new KeyValueTimestamp<>(1, "XX1+null", 1),
+                new KeyValueTimestamp<>(2, "XX2+Y2", 2),
+                new KeyValueTimestamp<>(3, "XX3+Y3", 3));
     }
 
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
index 3a346d3..8020e6d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KeyValueTimestamp;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
@@ -42,7 +43,7 @@ public class KStreamMapTest {
     public void testMap() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String topicName = "topic";
-        final int[] expectedKeys = new int[]{0, 1, 2, 3};
+        final int[] expectedKeys = new int[] {0, 1, 2, 3};
 
         final MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
         final KStream<Integer, String> stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String()));
@@ -54,7 +55,10 @@ public class KStreamMapTest {
             }
         }
 
-        final String[] expected = new String[]{"V0:0 (ts: 10)", "V1:1 (ts: 9)", "V2:2 (ts: 8)", "V3:3 (ts: 7)"};
+        final KeyValueTimestamp[] expected = new KeyValueTimestamp[] {new KeyValueTimestamp<>("V0", 0, 10),
+            new KeyValueTimestamp<>("V1", 1, 9),
+            new KeyValueTimestamp<>("V2", 2, 8),
+            new KeyValueTimestamp<>("V3", 3, 7)};
         assertEquals(4, supplier.theCapturedProcessor().processed.size());
         for (int i = 0; i < expected.length; i++) {
             assertEquals(expected[i], supplier.theCapturedProcessor().processed.get(i));
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
index 3813495..d01b63b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KeyValueTimestamp;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.Consumed;
@@ -54,7 +55,10 @@ public class KStreamMapValuesTest {
                 driver.pipeInput(recordFactory.create(topicName, expectedKey, Integer.toString(expectedKey), expectedKey / 2L));
             }
         }
-        final String[] expected = {"1:1 (ts: 0)", "10:2 (ts: 5)", "100:3 (ts: 50)", "1000:4 (ts: 500)"};
+        final KeyValueTimestamp[] expected = {new KeyValueTimestamp<>(1, 1, 0),
+            new KeyValueTimestamp<>(10, 2, 5),
+            new KeyValueTimestamp<>(100, 3, 50),
+            new KeyValueTimestamp<>(1000, 4, 500)};
 
         assertArrayEquals(expected, supplier.theCapturedProcessor().processed.toArray());
     }
@@ -75,7 +79,10 @@ public class KStreamMapValuesTest {
                 driver.pipeInput(recordFactory.create(topicName, expectedKey, Integer.toString(expectedKey), expectedKey / 2L));
             }
         }
-        final String[] expected = {"1:2 (ts: 0)", "10:12 (ts: 5)", "100:103 (ts: 50)", "1000:1004 (ts: 500)"};
+        final KeyValueTimestamp[] expected = {new KeyValueTimestamp<>(1, 2, 0),
+            new KeyValueTimestamp<>(10, 12, 5),
+            new KeyValueTimestamp<>(100, 103, 50),
+            new KeyValueTimestamp<>(1000, 1004, 500)};
 
         assertArrayEquals(expected, supplier.theCapturedProcessor().processed.toArray());
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
index ea20022..63f79e1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KeyValueTimestamp;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.Consumed;
@@ -49,7 +50,9 @@ public class KStreamSelectKeyTest {
         keyMap.put(2, "TWO");
         keyMap.put(3, "THREE");
 
-        final String[] expected = new String[]{"ONE:1 (ts: 0)", "TWO:2 (ts: 0)", "THREE:3 (ts: 0)"};
+        final KeyValueTimestamp[] expected = new KeyValueTimestamp[]{new KeyValueTimestamp<>("ONE", 1, 0),
+            new KeyValueTimestamp<>("TWO", 2, 0),
+            new KeyValueTimestamp<>("THREE", 3, 0)};
         final int[] expectedValues = new int[]{1, 2, 3};
 
         final KStream<String, Integer>  stream =
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
index fcf6aea..f7ba7bf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TopologyTestDriver;
@@ -71,7 +72,7 @@ public class KStreamTransformTest {
                 }
 
                 @Override
-                public void close() {}
+                public void close() { }
             };
 
         final int[] expectedKeys = {1, 10, 100, 1000};
@@ -97,13 +98,13 @@ public class KStreamTransformTest {
             driver.advanceWallClockTime(2);
             driver.advanceWallClockTime(1);
 
-            final String[] expected = {
-                "2:10 (ts: 0)",
-                "20:110 (ts: 5)",
-                "200:1110 (ts: 50)",
-                "2000:11110 (ts: 500)",
-                "-1:2 (ts: 2)",
-                "-1:3 (ts: 3)"
+            final KeyValueTimestamp[] expected = {
+                new KeyValueTimestamp<>(2, 10, 0),
+                new KeyValueTimestamp<>(20, 110, 5),
+                new KeyValueTimestamp<>(200, 1110, 50),
+                new KeyValueTimestamp<>(2000, 11110, 500),
+                new KeyValueTimestamp<>(-1, 2, 2),
+                new KeyValueTimestamp<>(-1, 3, 3)
             };
 
             assertEquals(expected.length, processor.theCapturedProcessor().processed.size());
@@ -136,7 +137,7 @@ public class KStreamTransformTest {
                 }
 
                 @Override
-                public void close() {}
+                public void close() { }
             };
 
         final int[] expectedKeys = {1, 10, 100, 1000};
@@ -158,7 +159,12 @@ public class KStreamTransformTest {
 
         assertEquals(6, processor.theCapturedProcessor().processed.size());
 
-        final String[] expected = {"2:10 (ts: 0)", "20:110 (ts: 0)", "200:1110 (ts: 0)", "2000:11110 (ts: 0)", "-1:2 (ts: 2)", "-1:3 (ts: 3)"};
+        final KeyValueTimestamp[] expected = {new KeyValueTimestamp<>(2, 10, 0),
+            new KeyValueTimestamp<>(20, 110, 0),
+            new KeyValueTimestamp<>(200, 1110, 0),
+            new KeyValueTimestamp<>(2000, 11110, 0),
+            new KeyValueTimestamp<>(-1, 2, 2),
+            new KeyValueTimestamp<>(-1, 3, 3)};
 
         for (int i = 0; i < expected.length; i++) {
             assertEquals(expected[i], processor.theCapturedProcessor().processed.get(i));
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
index a786166..adb6d35 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KeyValueTimestamp;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.TopologyTestDriver;
@@ -64,7 +65,7 @@ public class KStreamTransformValuesTest {
                 private int total = 0;
 
                 @Override
-                public void init(final ProcessorContext context) {}
+                public void init(final ProcessorContext context) { }
 
                 @Override
                 public Integer transform(final Number value) {
@@ -73,7 +74,7 @@ public class KStreamTransformValuesTest {
                 }
 
                 @Override
-                public void close() {}
+                public void close() { }
             };
 
         final int[] expectedKeys = {1, 10, 100, 1000};
@@ -87,7 +88,10 @@ public class KStreamTransformValuesTest {
                 driver.pipeInput(recordFactory.create(topicName, expectedKey, expectedKey * 10, expectedKey / 2L));
             }
         }
-        final String[] expected = {"1:10 (ts: 0)", "10:110 (ts: 5)", "100:1110 (ts: 50)", "1000:11110 (ts: 500)"};
+        final KeyValueTimestamp[] expected = {new KeyValueTimestamp<>(1, 10, 0),
+            new KeyValueTimestamp<>(10, 110, 5),
+            new KeyValueTimestamp<>(100, 1110, 50),
+            new KeyValueTimestamp<>(1000, 11110, 500)};
 
         assertArrayEquals(expected, supplier.theCapturedProcessor().processed.toArray());
     }
@@ -101,7 +105,7 @@ public class KStreamTransformValuesTest {
                 private int total = 0;
 
                 @Override
-                public void init(final ProcessorContext context) {}
+                public void init(final ProcessorContext context) { }
 
                 @Override
                 public Integer transform(final Integer readOnlyKey, final Number value) {
@@ -110,7 +114,7 @@ public class KStreamTransformValuesTest {
                 }
 
                 @Override
-                public void close() {}
+                public void close() { }
             };
 
         final int[] expectedKeys = {1, 10, 100, 1000};
@@ -124,7 +128,10 @@ public class KStreamTransformValuesTest {
                 driver.pipeInput(recordFactory.create(topicName, expectedKey, expectedKey * 10, expectedKey / 2L));
             }
         }
-        final String[] expected = {"1:11 (ts: 0)", "10:121 (ts: 5)", "100:1221 (ts: 50)", "1000:12221 (ts: 500)"};
+        final KeyValueTimestamp[] expected = {new KeyValueTimestamp<>(1, 11, 0),
+            new KeyValueTimestamp<>(10, 121, 5),
+            new KeyValueTimestamp<>(100, 1221, 50),
+            new KeyValueTimestamp<>(1000, 12221, 500)};
 
         assertArrayEquals(expected, supplier.theCapturedProcessor().processed.toArray());
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
index 1c7abb6..6139820 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.Consumed;
@@ -105,28 +106,35 @@ public class KStreamWindowAggregateTest {
 
         assertEquals(
             asList(
-                "[A@0/10]:0+1 (ts: 0)",
-                "[B@0/10]:0+2 (ts: 1)",
-                "[C@0/10]:0+3 (ts: 2)",
-                "[D@0/10]:0+4 (ts: 3)",
-                "[A@0/10]:0+1+1 (ts: 4)",
-
-                "[A@0/10]:0+1+1+1 (ts: 5)", "[A@5/15]:0+1 (ts: 5)",
-                "[B@0/10]:0+2+2 (ts: 6)", "[B@5/15]:0+2 (ts: 6)",
-                "[D@0/10]:0+4+4 (ts: 7)", "[D@5/15]:0+4 (ts: 7)",
-                "[B@0/10]:0+2+2+2 (ts: 8)", "[B@5/15]:0+2+2 (ts: 8)",
-                "[C@0/10]:0+3+3 (ts: 9)", "[C@5/15]:0+3 (ts: 9)",
-
-                "[A@5/15]:0+1+1 (ts: 10)", "[A@10/20]:0+1 (ts: 10)",
-                "[B@5/15]:0+2+2+2 (ts: 11)", "[B@10/20]:0+2 (ts: 11)",
-                "[D@5/15]:0+4+4 (ts: 12)", "[D@10/20]:0+4 (ts: 12)",
-                "[B@5/15]:0+2+2+2+2 (ts: 13)", "[B@10/20]:0+2+2 (ts: 13)",
-                "[C@5/15]:0+3+3 (ts: 14)", "[C@10/20]:0+3 (ts: 14)",
-
-                "[B@0/10]:0+2+2+2+1 (ts: 8)",
-                "[B@0/10]:0+2+2+2+1+2 (ts: 8)",
-                "[B@0/10]:0+2+2+2+1+2+3 (ts: 9)",
-                "[B@5/15]:0+2+2+2+2+3 (ts: 13)"
+                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0, 10)), "0+1", 0),
+                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(0, 10)), "0+2", 1),
+                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(0, 10)), "0+3", 2),
+                new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(0, 10)), "0+4", 3),
+                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0, 10)), "0+1+1", 4),
+                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0, 10)),  "0+1+1+1",  5),
+                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5, 15)),  "0+1",  5),
+                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(0, 10)),  "0+2+2",  6),
+                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5, 15)),  "0+2",  6),
+                new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(0, 10)),  "0+4+4",  7),
+                new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(5, 15)),  "0+4",  7),
+                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(0, 10)),  "0+2+2+2",  8),
+                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5, 15)),  "0+2+2",  8),
+                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(0, 10)),  "0+3+3",  9),
+                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(5, 15)),  "0+3",  9),
+                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5, 15)),  "0+1+1",  10),
+                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(10, 20)),  "0+1",  10),
+                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5, 15)),  "0+2+2+2",  11),
+                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(10, 20)),  "0+2",  11),
+                new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(5, 15)),  "0+4+4",  12),
+                new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(10, 20)),  "0+4",  12),
+                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5, 15)),  "0+2+2+2+2",  13),
+                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(10, 20)),  "0+2+2",  13),
+                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(5, 15)),  "0+3+3",  14),
+                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(10, 20)),  "0+3",  14),
+                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(0, 10)),  "0+2+2+2+1",  8),
+                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(0, 10)),  "0+2+2+2+1+2",  8),
+                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(0, 10)),  "0+2+2+2+1+2+3",  9),
+                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5, 15)),  "0+2+2+2+2+3",  13)
 
                 ),
             supplier.theCapturedProcessor().processed
@@ -167,15 +175,15 @@ public class KStreamWindowAggregateTest {
             final List<MockProcessor<Windowed<String>, String>> processors = supplier.capturedProcessors(3);
 
             processors.get(0).checkAndClearProcessResult(
-                "[A@0/10]:0+1 (ts: 0)",
-                "[B@0/10]:0+2 (ts: 1)",
-                "[C@0/10]:0+3 (ts: 2)",
-                "[D@0/10]:0+4 (ts: 3)",
-                "[A@0/10]:0+1+1 (ts: 9)",
-                "[A@5/15]:0+1 (ts: 9)"
+                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0, 10)),  "0+1",  0),
+                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(0, 10)),  "0+2",  1),
+                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(0, 10)),  "0+3",  2),
+                new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(0, 10)),  "0+4",  3),
+                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0, 10)),  "0+1+1",  9),
+                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5, 15)),  "0+1",  9)
             );
-            processors.get(1).checkAndClearProcessResult(new String[0]);
-            processors.get(2).checkAndClearProcessResult(new String[0]);
+            processors.get(1).checkAndClearProcessResult(new KeyValueTimestamp[0]);
+            processors.get(2).checkAndClearProcessResult(new KeyValueTimestamp[0]);
 
             driver.pipeInput(recordFactory.create(topic1, "A", "1", 5L));
             driver.pipeInput(recordFactory.create(topic1, "B", "2", 6L));
@@ -184,14 +192,19 @@ public class KStreamWindowAggregateTest {
             driver.pipeInput(recordFactory.create(topic1, "C", "3", 9L));
 
             processors.get(0).checkAndClearProcessResult(
-                "[A@0/10]:0+1+1+1 (ts: 9)", "[A@5/15]:0+1+1 (ts: 9)",
-                "[B@0/10]:0+2+2 (ts: 6)", "[B@5/15]:0+2 (ts: 6)",
-                "[D@0/10]:0+4+4 (ts: 7)", "[D@5/15]:0+4 (ts: 7)",
-                "[B@0/10]:0+2+2+2 (ts: 8)", "[B@5/15]:0+2+2 (ts: 8)",
-                "[C@0/10]:0+3+3 (ts: 9)", "[C@5/15]:0+3 (ts: 9)"
+                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0, 10)),  "0+1+1+1",  9),
+                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5, 15)),  "0+1+1",  9),
+                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(0, 10)),  "0+2+2",  6),
+                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5, 15)),  "0+2",  6),
+                new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(0, 10)),  "0+4+4",  7),
+                new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(5, 15)),  "0+4",  7),
+                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(0, 10)),  "0+2+2+2",  8),
+                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5, 15)),  "0+2+2",  8),
+                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(0, 10)),  "0+3+3",  9),
+                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(5, 15)),  "0+3",  9)
             );
-            processors.get(1).checkAndClearProcessResult(new String[0]);
-            processors.get(2).checkAndClearProcessResult(new String[0]);
+            processors.get(1).checkAndClearProcessResult(new KeyValueTimestamp[0]);
+            processors.get(2).checkAndClearProcessResult(new KeyValueTimestamp[0]);
 
             driver.pipeInput(recordFactory.create(topic2, "A", "a", 0L));
             driver.pipeInput(recordFactory.create(topic2, "B", "b", 1L));
@@ -199,20 +212,20 @@ public class KStreamWindowAggregateTest {
             driver.pipeInput(recordFactory.create(topic2, "D", "d", 20L));
             driver.pipeInput(recordFactory.create(topic2, "A", "a", 20L));
 
-            processors.get(0).checkAndClearProcessResult(new String[0]);
+            processors.get(0).checkAndClearProcessResult(new KeyValueTimestamp[0]);
             processors.get(1).checkAndClearProcessResult(
-                "[A@0/10]:0+a (ts: 0)",
-                "[B@0/10]:0+b (ts: 1)",
-                "[C@0/10]:0+c (ts: 2)",
-                "[D@15/25]:0+d (ts: 20)",
-                "[D@20/30]:0+d (ts: 20)",
-                "[A@15/25]:0+a (ts: 20)",
-                "[A@20/30]:0+a (ts: 20)"
+                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0, 10)),  "0+a",  0),
+                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(0, 10)),  "0+b",  1),
+                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(0, 10)),  "0+c",  2),
+                new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(15, 25)),  "0+d",  20),
+                new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(20, 30)),  "0+d",  20),
+                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(15, 25)),  "0+a",  20),
+                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(20, 30)),  "0+a",  20)
             );
             processors.get(2).checkAndClearProcessResult(
-                "[A@0/10]:0+1+1+1%0+a (ts: 9)",
-                "[B@0/10]:0+2+2+2%0+b (ts: 8)",
-                "[C@0/10]:0+3+3%0+c (ts: 9)");
+                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0, 10)),  "0+1+1+1%0+a",  9),
+                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(0, 10)),  "0+2+2+2%0+b",  8),
+                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(0, 10)),  "0+3+3%0+c",  9));
 
             driver.pipeInput(recordFactory.create(topic2, "A", "a", 5L));
             driver.pipeInput(recordFactory.create(topic2, "B", "b", 6L));
@@ -220,18 +233,26 @@ public class KStreamWindowAggregateTest {
             driver.pipeInput(recordFactory.create(topic2, "D", "d", 18L));
             driver.pipeInput(recordFactory.create(topic2, "A", "a", 21L));
 
-            processors.get(0).checkAndClearProcessResult(new String[0]);
+            processors.get(0).checkAndClearProcessResult(new KeyValueTimestamp[0]);
             processors.get(1).checkAndClearProcessResult(
-                "[A@0/10]:0+a+a (ts: 5)", "[A@5/15]:0+a (ts: 5)",
-                "[B@0/10]:0+b+b (ts: 6)", "[B@5/15]:0+b (ts: 6)",
-                "[D@0/10]:0+d (ts: 7)", "[D@5/15]:0+d (ts: 7)",
-                "[D@10/20]:0+d (ts: 18)", "[D@15/25]:0+d+d (ts: 20)",
-                "[A@15/25]:0+a+a (ts: 21)", "[A@20/30]:0+a+a (ts: 21)"
+                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0, 10)),  "0+a+a",  5),
+                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5, 15)),  "0+a",  5),
+                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(0, 10)),  "0+b+b",  6),
+                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5, 15)),  "0+b",  6),
+                new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(0, 10)),  "0+d",  7),
+                new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(5, 15)),  "0+d",  7),
+                new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(10, 20)),  "0+d",  18),
+                new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(15, 25)),  "0+d+d",  20),
+                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(15, 25)),  "0+a+a",  21),
+                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(20, 30)),  "0+a+a",  21)
             );
             processors.get(2).checkAndClearProcessResult(
-                "[A@0/10]:0+1+1+1%0+a+a (ts: 9)", "[A@5/15]:0+1+1%0+a (ts: 9)",
-                "[B@0/10]:0+2+2+2%0+b+b (ts: 8)", "[B@5/15]:0+2+2%0+b (ts: 8)",
-                "[D@0/10]:0+4+4%0+d (ts: 7)", "[D@5/15]:0+4%0+d (ts: 7)"
+                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0, 10)),  "0+1+1+1%0+a+a",  9),
+                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5, 15)),  "0+1+1%0+a",  9),
+                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(0, 10)),  "0+2+2+2%0+b+b",  8),
+                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5, 15)),  "0+2+2%0+b",  8),
+                new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(0, 10)),  "0+4+4%0+d",  7),
+                new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(5, 15)),  "0+4%0+d",  7)
             );
         }
     }
@@ -418,4 +439,4 @@ public class KStreamWindowAggregateTest {
     private ProducerRecord<String, String> getOutput(final TopologyTestDriver driver) {
         return driver.readOutput("output", new StringDeserializer(), new StringDeserializer());
     }
-}
+}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
index b704e13..b2b58a5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TopologyTestDriver;
@@ -92,18 +93,18 @@ public class KTableAggregateTest {
 
             assertEquals(
                 asList(
-                    "A:0+1 (ts: 10)",
-                    "B:0+2 (ts: 15)",
-                    "A:0+1-1 (ts: 20)",
-                    "A:0+1-1+3 (ts: 20)",
-                    "B:0+2-2 (ts: 18)",
-                    "B:0+2-2+4 (ts: 18)",
-                    "C:0+5 (ts: 5)",
-                    "D:0+6 (ts: 25)",
-                    "B:0+2-2+4-4 (ts: 18)",
-                    "B:0+2-2+4-4+7 (ts: 18)",
-                    "C:0+5-5 (ts: 10)",
-                    "C:0+5-5+8 (ts: 10)"),
+                    new KeyValueTimestamp<>("A", "0+1", 10L),
+                    new KeyValueTimestamp<>("B", "0+2", 15L),
+                    new KeyValueTimestamp<>("A", "0+1-1", 20L),
+                    new KeyValueTimestamp<>("A", "0+1-1+3", 20L),
+                    new KeyValueTimestamp<>("B", "0+2-2", 18L),
+                    new KeyValueTimestamp<>("B", "0+2-2+4", 18L),
+                    new KeyValueTimestamp<>("C", "0+5", 5L),
+                    new KeyValueTimestamp<>("D", "0+6", 25L),
+                    new KeyValueTimestamp<>("B", "0+2-2+4-4", 18L),
+                    new KeyValueTimestamp<>("B", "0+2-2+4-4+7", 18L),
+                    new KeyValueTimestamp<>("C", "0+5-5", 10L),
+                    new KeyValueTimestamp<>("C", "0+5-5+8", 10L)),
                 supplier.theCapturedProcessor().processed);
         }
     }
@@ -159,14 +160,14 @@ public class KTableAggregateTest {
 
             assertEquals(
                 asList(
-                    "1:0+1 (ts: 10)",
-                    "1:0+1-1 (ts: 15)",
-                    "1:0+1-1+1 (ts: 15)",
-                    "2:0+2 (ts: 20)",
-                    //noop
-                    "2:0+2-2 (ts: 23)", "4:0+4 (ts: 23)",
-                    //noop
-                    "4:0+4-4 (ts: 23)", "7:0+7 (ts: 22)"),
+                    new KeyValueTimestamp<>("1", "0+1", 10),
+                    new KeyValueTimestamp<>("1", "0+1-1", 15),
+                    new KeyValueTimestamp<>("1", "0+1-1+1", 15),
+                    new KeyValueTimestamp<>("2", "0+2", 20),
+                        new KeyValueTimestamp<>("2", "0+2-2", 23),
+                        new KeyValueTimestamp<>("4", "0+4", 23),
+                        new KeyValueTimestamp<>("4", "0+4-4", 23),
+                        new KeyValueTimestamp<>("7", "0+7", 22)),
                 supplier.theCapturedProcessor().processed);
         }
     }
@@ -194,11 +195,12 @@ public class KTableAggregateTest {
 
             assertEquals(
                 asList(
-                    "green:1 (ts: 10)",
-                    "green:2 (ts: 10)",
-                    "green:1 (ts: 12)", "blue:1 (ts: 12)",
-                    "yellow:1 (ts: 15)",
-                    "green:2 (ts: 12)"),
+                    new KeyValueTimestamp<>("green", 1L, 10),
+                    new KeyValueTimestamp<>("green", 2L, 10),
+                    new KeyValueTimestamp<>("green", 1L, 12),
+                    new KeyValueTimestamp<>("blue", 1L, 12),
+                    new KeyValueTimestamp<>("yellow", 1L, 15),
+                    new KeyValueTimestamp<>("green", 2L, 12)),
                 supplier.theCapturedProcessor().processed);
         }
     }
@@ -277,11 +279,11 @@ public class KTableAggregateTest {
 
             assertEquals(
                 asList(
-                    "1:1 (ts: 10)",
-                    "1:12 (ts: 10)",
-                    "1:2 (ts: 12)",
-                    "1: (ts: 12)",
-                    "1:2 (ts: 12)"
+                    new KeyValueTimestamp<>("1", "1", 10),
+                    new KeyValueTimestamp<>("1", "12", 10),
+                    new KeyValueTimestamp<>("1", "2", 12),
+                    new KeyValueTimestamp<>("1", "", 12),
+                    new KeyValueTimestamp<>("1", "2", 12L)
                 ),
                 proc.processed
             );
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
index 6996843..2d32878 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KeyValueTimestamp;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.Topology;
@@ -80,8 +81,18 @@ public class KTableFilterTest {
 
         final List<MockProcessor<String, Integer>> processors = supplier.capturedProcessors(2);
 
-        processors.get(0).checkAndClearProcessResult("A:null (ts: 10)", "B:2 (ts: 5)", "C:null (ts: 8)", "D:4 (ts: 14)", "A:null (ts: 18)", "B:null (ts: 15)");
-        processors.get(1).checkAndClearProcessResult("A:1 (ts: 10)", "B:null (ts: 5)", "C:3 (ts: 8)", "D:null (ts: 14)", "A:null (ts: 18)", "B:null (ts: 15)");
+        processors.get(0).checkAndClearProcessResult(new KeyValueTimestamp<>("A", null, 10),
+            new KeyValueTimestamp<>("B", 2, 5),
+            new KeyValueTimestamp<>("C", null, 8),
+            new KeyValueTimestamp<>("D", 4, 14),
+            new KeyValueTimestamp<>("A", null, 18),
+            new KeyValueTimestamp<>("B", null, 15));
+        processors.get(1).checkAndClearProcessResult(new KeyValueTimestamp<>("A", 1, 10),
+            new KeyValueTimestamp<>("B", null, 5),
+            new KeyValueTimestamp<>("C", 3, 8),
+            new KeyValueTimestamp<>("D", null, 14),
+            new KeyValueTimestamp<>("A", null, 18),
+            new KeyValueTimestamp<>("B", null, 15));
     }
 
     @Test
@@ -222,25 +233,32 @@ public class KTableFilterTest {
 
             final List<MockProcessor<String, Integer>> processors = supplier.capturedProcessors(2);
 
-            processors.get(0).checkAndClearProcessResult("A:(1<-null) (ts: 5)", "B:(1<-null) (ts: 10)", "C:(1<-null) (ts: 15)");
-            processors.get(1).checkAndClearProcessResult("A:(null<-null) (ts: 5)", "B:(null<-null) (ts: 10)", "C:(null<-null) (ts: 15)");
+            processors.get(0).checkAndClearProcessResult(new KeyValueTimestamp<>("A", new Change<>(1, null), 5),
+                new KeyValueTimestamp<>("B", new Change<>(1, null), 10),
+                new KeyValueTimestamp<>("C", new Change<>(1, null), 15));
+            processors.get(1).checkAndClearProcessResult(new KeyValueTimestamp<>("A", new Change<>(null, null), 5),
+                new KeyValueTimestamp<>("B", new Change<>(null, null), 10),
+                new KeyValueTimestamp<>("C", new Change<>(null, null), 15));
 
             driver.pipeInput(recordFactory.create(topic1, "A", 2, 15L));
             driver.pipeInput(recordFactory.create(topic1, "B", 2, 8L));
 
-            processors.get(0).checkAndClearProcessResult("A:(2<-null) (ts: 15)", "B:(2<-null) (ts: 8)");
-            processors.get(1).checkAndClearProcessResult("A:(2<-null) (ts: 15)", "B:(2<-null) (ts: 8)");
+            processors.get(0).checkAndClearProcessResult(new KeyValueTimestamp<>("A", new Change<>(2, null), 15),
+                new KeyValueTimestamp<>("B", new Change<>(2, null), 8));
+            processors.get(1).checkAndClearProcessResult(new KeyValueTimestamp<>("A", new Change<>(2, null), 15),
+                new KeyValueTimestamp<>("B", new Change<>(2, null), 8));
 
             driver.pipeInput(recordFactory.create(topic1, "A", 3, 20L));
 
-            processors.get(0).checkAndClearProcessResult("A:(3<-null) (ts: 20)");
-            processors.get(1).checkAndClearProcessResult("A:(null<-null) (ts: 20)");
-
+            processors.get(0).checkAndClearProcessResult(new KeyValueTimestamp<>("A", new Change<>(3, null), 20));
+            processors.get(1).checkAndClearProcessResult(new KeyValueTimestamp<>("A", new Change<>(null, null), 20));
             driver.pipeInput(recordFactory.create(topic1, "A", null, 10L));
             driver.pipeInput(recordFactory.create(topic1, "B", null, 20L));
 
-            processors.get(0).checkAndClearProcessResult("A:(null<-null) (ts: 10)", "B:(null<-null) (ts: 20)");
-            processors.get(1).checkAndClearProcessResult("A:(null<-null) (ts: 10)", "B:(null<-null) (ts: 20)");
+            processors.get(0).checkAndClearProcessResult(new KeyValueTimestamp<>("A", new Change<>(null, null), 10),
+                new KeyValueTimestamp<>("B", new Change<>(null, null), 20));
+            processors.get(1).checkAndClearProcessResult(new KeyValueTimestamp<>("A", new Change<>(null, null), 10),
+                new KeyValueTimestamp<>("B", new Change<>(null, null), 20));
         }
     }
 
@@ -251,7 +269,7 @@ public class KTableFilterTest {
         final String topic1 = "topic1";
 
         final KTableImpl<String, Integer, Integer> table1 =
-                (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed);
+            (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed);
         final KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(predicate);
 
         doTestNotSendingOldValue(builder, table1, table2, topic1);
@@ -289,25 +307,30 @@ public class KTableFilterTest {
 
             final List<MockProcessor<String, Integer>> processors = supplier.capturedProcessors(2);
 
-            processors.get(0).checkAndClearProcessResult("A:(1<-null) (ts: 5)", "B:(1<-null) (ts: 10)", "C:(1<-null) (ts: 15)");
+            processors.get(0).checkAndClearProcessResult(new KeyValueTimestamp<>("A", new Change<>(1, null), 5),
+                new KeyValueTimestamp<>("B", new Change<>(1, null), 10),
+                new KeyValueTimestamp<>("C", new Change<>(1, null), 15));
             processors.get(1).checkEmptyAndClearProcessResult();
 
             driver.pipeInput(recordFactory.create(topic1, "A", 2, 15L));
             driver.pipeInput(recordFactory.create(topic1, "B", 2, 8L));
 
-            processors.get(0).checkAndClearProcessResult("A:(2<-1) (ts: 15)", "B:(2<-1) (ts: 8)");
-            processors.get(1).checkAndClearProcessResult("A:(2<-null) (ts: 15)", "B:(2<-null) (ts: 8)");
+            processors.get(0).checkAndClearProcessResult(new KeyValueTimestamp<>("A", new Change<>(2, 1), 15),
+                new KeyValueTimestamp<>("B", new Change<>(2, 1), 8));
+            processors.get(1).checkAndClearProcessResult(new KeyValueTimestamp<>("A", new Change<>(2, null), 15),
+                new KeyValueTimestamp<>("B", new Change<>(2, null), 8));
 
             driver.pipeInput(recordFactory.create(topic1, "A", 3, 20L));
 
-            processors.get(0).checkAndClearProcessResult("A:(3<-2) (ts: 20)");
-            processors.get(1).checkAndClearProcessResult("A:(null<-2) (ts: 20)");
+            processors.get(0).checkAndClearProcessResult(new KeyValueTimestamp<>("A", new Change<>(3, 2), 20));
+            processors.get(1).checkAndClearProcessResult(new KeyValueTimestamp<>("A", new Change<>(null, 2), 20));
 
             driver.pipeInput(recordFactory.create(topic1, "A", null, 10L));
             driver.pipeInput(recordFactory.create(topic1, "B", null, 20L));
 
-            processors.get(0).checkAndClearProcessResult("A:(null<-3) (ts: 10)", "B:(null<-2) (ts: 20)");
-            processors.get(1).checkAndClearProcessResult("B:(null<-2) (ts: 20)");
+            processors.get(0).checkAndClearProcessResult(new KeyValueTimestamp<>("A", new Change<>(null, 3), 10),
+                new KeyValueTimestamp<>("B", new Change<>(null, 2), 20));
+            processors.get(1).checkAndClearProcessResult(new KeyValueTimestamp<>("B", new Change<>(null, 2), 20));
         }
     }
 
@@ -317,7 +340,7 @@ public class KTableFilterTest {
         final String topic1 = "topic1";
 
         final KTableImpl<String, Integer, Integer> table1 =
-                (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed);
+            (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed);
         final KTableImpl<String, Integer, Integer> table2 =
             (KTableImpl<String, Integer, Integer>) table1.filter(predicate);
 
@@ -357,7 +380,9 @@ public class KTableFilterTest {
         }
 
         final List<MockProcessor<String, String>> processors = supplier.capturedProcessors(2);
-        processors.get(0).checkAndClearProcessResult("A:(reject<-null) (ts: 5)", "B:(reject<-null) (ts: 10)", "C:(reject<-null) (ts: 20)");
+        processors.get(0).checkAndClearProcessResult(new KeyValueTimestamp<>("A", new Change<>("reject", null), 5),
+            new KeyValueTimestamp<>("B", new Change<>("reject", null), 10),
+            new KeyValueTimestamp<>("C", new Change<>("reject", null), 20));
         processors.get(1).checkEmptyAndClearProcessResult();
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
index f245fec..9e8a3b5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.TopologyDescription;
@@ -112,10 +113,30 @@ public class KTableImplTest {
         }
 
         final List<MockProcessor<String, Object>> processors = supplier.capturedProcessors(4);
-        assertEquals(asList("A:01 (ts: 5)", "B:02 (ts: 100)", "C:03 (ts: 0)", "D:04 (ts: 0)", "A:05 (ts: 10)", "A:06 (ts: 8)"), processors.get(0).processed);
-        assertEquals(asList("A:1 (ts: 5)", "B:2 (ts: 100)", "C:3 (ts: 0)", "D:4 (ts: 0)", "A:5 (ts: 10)", "A:6 (ts: 8)"), processors.get(1).processed);
-        assertEquals(asList("A:null (ts: 5)", "B:2 (ts: 100)", "C:null (ts: 0)", "D:4 (ts: 0)", "A:null (ts: 10)", "A:6 (ts: 8)"), processors.get(2).processed);
-        assertEquals(asList("A:01 (ts: 5)", "B:02 (ts: 100)", "C:03 (ts: 0)", "D:04 (ts: 0)", "A:05 (ts: 10)", "A:06 (ts: 8)"), processors.get(3).processed);
+        assertEquals(asList(new KeyValueTimestamp<>("A", "01", 5),
+                new KeyValueTimestamp<>("B", "02", 100),
+                new KeyValueTimestamp<>("C", "03", 0),
+                new KeyValueTimestamp<>("D", "04", 0),
+                new KeyValueTimestamp<>("A", "05", 10),
+                new KeyValueTimestamp<>("A", "06", 8)), processors.get(0).processed);
+        assertEquals(asList(new KeyValueTimestamp<>("A", 1, 5),
+                new KeyValueTimestamp<>("B", 2, 100),
+                new KeyValueTimestamp<>("C", 3, 0),
+                new KeyValueTimestamp<>("D", 4, 0),
+                new KeyValueTimestamp<>("A", 5, 10),
+                new KeyValueTimestamp<>("A", 6, 8)), processors.get(1).processed);
+        assertEquals(asList(new KeyValueTimestamp<>("A", null, 5),
+                new KeyValueTimestamp<>("B", 2, 100),
+                new KeyValueTimestamp<>("C", null, 0),
+                new KeyValueTimestamp<>("D", 4, 0),
+                new KeyValueTimestamp<>("A", null, 10),
+                new KeyValueTimestamp<>("A", 6, 8)), processors.get(2).processed);
+        assertEquals(asList(new KeyValueTimestamp<>("A", "01", 5),
+                new KeyValueTimestamp<>("B", "02", 100),
+                new KeyValueTimestamp<>("C", "03", 0),
+                new KeyValueTimestamp<>("D", "04", 0),
+                new KeyValueTimestamp<>("A", "05", 10),
+                new KeyValueTimestamp<>("A", "06", 8)), processors.get(3).processed);
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
index 195e38d..e18b5a1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValueTimestamp;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.TopologyWrapper;
@@ -51,7 +52,7 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 public class KTableKTableInnerJoinTest {
-    private final static String[] EMPTY = new String[0];
+    private final static KeyValueTimestamp[] EMPTY = new KeyValueTimestamp[0];
 
     private final String topic1 = "topic1";
     private final String topic2 = "topic2";
@@ -67,7 +68,7 @@ public class KTableKTableInnerJoinTest {
     public void testJoin() {
         final StreamsBuilder builder = new StreamsBuilder();
 
-        final int[] expectedKeys = new int[]{0, 1, 2, 3};
+        final int[] expectedKeys = new int[] {0, 1, 2, 3};
 
         final KTable<Integer, String> table1;
         final KTable<Integer, String> table2;
@@ -84,7 +85,7 @@ public class KTableKTableInnerJoinTest {
     public void testQueryableJoin() {
         final StreamsBuilder builder = new StreamsBuilder();
 
-        final int[] expectedKeys = new int[]{0, 1, 2, 3};
+        final int[] expectedKeys = new int[] {0, 1, 2, 3};
 
         final KTable<Integer, String> table1;
         final KTable<Integer, String> table2;
@@ -101,7 +102,7 @@ public class KTableKTableInnerJoinTest {
     public void testQueryableNotSendingOldValues() {
         final StreamsBuilder builder = new StreamsBuilder();
 
-        final int[] expectedKeys = new int[]{0, 1, 2, 3};
+        final int[] expectedKeys = new int[] {0, 1, 2, 3};
 
         final KTable<Integer, String> table1;
         final KTable<Integer, String> table2;
@@ -120,7 +121,7 @@ public class KTableKTableInnerJoinTest {
     public void testNotSendingOldValues() {
         final StreamsBuilder builder = new StreamsBuilder();
 
-        final int[] expectedKeys = new int[]{0, 1, 2, 3};
+        final int[] expectedKeys = new int[] {0, 1, 2, 3};
 
         final KTable<Integer, String> table1;
         final KTable<Integer, String> table2;
@@ -139,7 +140,7 @@ public class KTableKTableInnerJoinTest {
     public void testSendingOldValues() {
         final StreamsBuilder builder = new StreamsBuilder();
 
-        final int[] expectedKeys = new int[]{0, 1, 2, 3};
+        final int[] expectedKeys = new int[] {0, 1, 2, 3};
 
         final KTable<Integer, String> table1;
         final KTable<Integer, String> table2;
@@ -179,16 +180,16 @@ public class KTableKTableInnerJoinTest {
             driver.pipeInput(recordFactory.create(topic2, null, "AnotherVal", 73L));
             // left: X0:0 (ts: 5), X1:1 (ts: 6)
             // right: Y0:0 (ts: 0), Y1:1 (ts: 10)
-            proc.checkAndClearProcessResult("0:(X0+Y0<-null) (ts: 5)", "1:(X1+Y1<-null) (ts: 10)");
-
+            proc.checkAndClearProcessResult(new KeyValueTimestamp<>(0, new Change<>("X0+Y0", null), 5),
+                new KeyValueTimestamp<>(1, new Change<>("X1+Y1", null), 10));
             // push all four items to the primary stream. this should produce two items.
             for (final int expectedKey : expectedKeys) {
                 driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, 7L));
             }
             // left: XX0:0 (ts: 7), XX1:1 (ts: 7), XX2:2 (ts: 7), XX3:3 (ts: 7)
             // right: Y0:0 (ts: 0), Y1:1 (ts: 10)
-            proc.checkAndClearProcessResult("0:(XX0+Y0<-X0+Y0) (ts: 7)", "1:(XX1+Y1<-X1+Y1) (ts: 10)");
-
+            proc.checkAndClearProcessResult(new KeyValueTimestamp<>(0, new Change<>("XX0+Y0", "X0+Y0"), 7),
+                new KeyValueTimestamp<>(1, new Change<>("XX1+Y1", "X1+Y1"), 10));
             // push all items to the other stream. this should produce four items.
             for (final int expectedKey : expectedKeys) {
                 driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, expectedKey * 5L));
@@ -196,8 +197,10 @@ public class KTableKTableInnerJoinTest {
             // left: XX0:0 (ts: 7), XX1:1 (ts: 7), XX2:2 (ts: 7), XX3:3 (ts: 7)
             // right: YY0:0 (ts: 0), YY1:1 (ts: 5), YY2:2 (ts: 10), YY3:3 (ts: 15)
             proc.checkAndClearProcessResult(
-                "0:(XX0+YY0<-XX0+Y0) (ts: 7)", "1:(XX1+YY1<-XX1+Y1) (ts: 7)",
-                "2:(XX2+YY2<-null) (ts: 10)", "3:(XX3+YY3<-null) (ts: 15)");
+                new KeyValueTimestamp<>(0, new Change<>("XX0+YY0", "XX0+Y0"), 7),
+                new KeyValueTimestamp<>(1, new Change<>("XX1+YY1", "XX1+Y1"), 7),
+                new KeyValueTimestamp<>(2, new Change<>("XX2+YY2", null), 10),
+                new KeyValueTimestamp<>(3, new Change<>("XX3+YY3", null), 15));
 
             // push all four items to the primary stream. this should produce four items.
             for (final int expectedKey : expectedKeys) {
@@ -206,24 +209,26 @@ public class KTableKTableInnerJoinTest {
             // left: XXX0:0 (ts: 6), XXX1:1 (ts: 6), XXX2:2 (ts: 6), XXX3:3 (ts: 6)
             // right: YY0:0 (ts: 0), YY1:1 (ts: 5), YY2:2 (ts: 10), YY3:3 (ts: 15)
             proc.checkAndClearProcessResult(
-                "0:(XXX0+YY0<-XX0+YY0) (ts: 6)", "1:(XXX1+YY1<-XX1+YY1) (ts: 6)",
-                "2:(XXX2+YY2<-XX2+YY2) (ts: 10)", "3:(XXX3+YY3<-XX3+YY3) (ts: 15)");
+                new KeyValueTimestamp<>(0, new Change<>("XXX0+YY0", "XX0+YY0"), 6),
+                new KeyValueTimestamp<>(1, new Change<>("XXX1+YY1", "XX1+YY1"), 6),
+                new KeyValueTimestamp<>(2, new Change<>("XXX2+YY2", "XX2+YY2"), 10),
+                new KeyValueTimestamp<>(3, new Change<>("XXX3+YY3", "XX3+YY3"), 15));
 
             // push two items with null to the other stream as deletes. this should produce two item.
             driver.pipeInput(recordFactory.create(topic2, expectedKeys[0], null, 5L));
             driver.pipeInput(recordFactory.create(topic2, expectedKeys[1], null, 7L));
             // left: XXX0:0 (ts: 6), XXX1:1 (ts: 6), XXX2:2 (ts: 6), XXX3:3 (ts: 6)
             // right: YY2:2 (ts: 10), YY3:3 (ts: 15)
-            proc.checkAndClearProcessResult("0:(null<-XXX0+YY0) (ts: 6)", "1:(null<-XXX1+YY1) (ts: 7)");
-
+            proc.checkAndClearProcessResult(new KeyValueTimestamp<>(0, new Change<>(null, "XXX0+YY0"), 6),
+                new KeyValueTimestamp<>(1, new Change<>(null, "XXX1+YY1"), 7));
             // push all four items to the primary stream. this should produce two items.
             for (final int expectedKey : expectedKeys) {
                 driver.pipeInput(recordFactory.create(topic1, expectedKey, "XXXX" + expectedKey, 13L));
             }
             // left: XXXX0:0 (ts: 13), XXXX1:1 (ts: 13), XXXX2:2 (ts: 13), XXXX3:3 (ts: 13)
             // right: YY2:2 (ts: 10), YY3:3 (ts: 15)
-            proc.checkAndClearProcessResult("2:(XXXX2+YY2<-XXX2+YY2) (ts: 13)", "3:(XXXX3+YY3<-XXX3+YY3) (ts: 15)");
-
+            proc.checkAndClearProcessResult(new KeyValueTimestamp<>(2, new Change<>("XXXX2+YY2", "XXX2+YY2"), 13),
+                new KeyValueTimestamp<>(3, new Change<>("XXXX3+YY3", "XXX3+YY3"), 15));
             // push four items to the primary stream with null. this should produce two items.
             driver.pipeInput(recordFactory.create(topic1, expectedKeys[0], null, 0L));
             driver.pipeInput(recordFactory.create(topic1, expectedKeys[1], null, 42L));
@@ -231,7 +236,8 @@ public class KTableKTableInnerJoinTest {
             driver.pipeInput(recordFactory.create(topic1, expectedKeys[3], null, 20L));
             // left:
             // right: YY2:2 (ts: 10), YY3:3 (ts: 15)
-            proc.checkAndClearProcessResult("2:(null<-XXXX2+YY2) (ts: 10)", "3:(null<-XXXX3+YY3) (ts: 20)");
+            proc.checkAndClearProcessResult(new KeyValueTimestamp<>(2, new Change<>(null, "XXXX2+YY2"), 10),
+                new KeyValueTimestamp<>(3, new Change<>(null, "XXXX3+YY3"), 20));
         }
     }
 
@@ -289,16 +295,16 @@ public class KTableKTableInnerJoinTest {
             driver.pipeInput(recordFactory.create(topic2, null, "AnotherVal", 73L));
             // left: X0:0 (ts: 5), X1:1 (ts: 6)
             // right: Y0:0 (ts: 0), Y1:1 (ts: 10)
-            proc.checkAndClearProcessResult("0:(X0+Y0<-null) (ts: 5)", "1:(X1+Y1<-null) (ts: 10)");
-
+            proc.checkAndClearProcessResult(new KeyValueTimestamp<>(0, new Change<>("X0+Y0", null), 5),
+                new KeyValueTimestamp<>(1, new Change<>("X1+Y1", null), 10));
             // push all four items to the primary stream. this should produce two items.
             for (final int expectedKey : expectedKeys) {
                 driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, 7L));
             }
             // left: XX0:0 (ts: 7), XX1:1 (ts: 7), XX2:2 (ts: 7), XX3:3 (ts: 7)
             // right: Y0:0 (ts: 0), Y1:1 (ts: 10)
-            proc.checkAndClearProcessResult("0:(XX0+Y0<-null) (ts: 7)", "1:(XX1+Y1<-null) (ts: 10)");
-
+            proc.checkAndClearProcessResult(new KeyValueTimestamp<>(0, new Change<>("XX0+Y0", null), 7),
+                new KeyValueTimestamp<>(1, new Change<>("XX1+Y1", null), 10));
             // push all items to the other stream. this should produce four items.
             for (final int expectedKey : expectedKeys) {
                 driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, expectedKey * 5L));
@@ -306,8 +312,10 @@ public class KTableKTableInnerJoinTest {
             // left: XX0:0 (ts: 7), XX1:1 (ts: 7), XX2:2 (ts: 7), XX3:3 (ts: 7)
             // right: YY0:0 (ts: 0), YY1:1 (ts: 5), YY2:2 (ts: 10), YY3:3 (ts: 15)
             proc.checkAndClearProcessResult(
-                "0:(XX0+YY0<-null) (ts: 7)", "1:(XX1+YY1<-null) (ts: 7)",
-                "2:(XX2+YY2<-null) (ts: 10)", "3:(XX3+YY3<-null) (ts: 15)");
+                new KeyValueTimestamp<>(0, new Change<>("XX0+YY0", null), 7),
+                new KeyValueTimestamp<>(1, new Change<>("XX1+YY1", null), 7),
+                new KeyValueTimestamp<>(2, new Change<>("XX2+YY2", null), 10),
+                new KeyValueTimestamp<>(3, new Change<>("XX3+YY3", null), 15));
 
             // push all four items to the primary stream. this should produce four items.
             for (final int expectedKey : expectedKeys) {
@@ -316,24 +324,26 @@ public class KTableKTableInnerJoinTest {
             // left: XXX0:0 (ts: 6), XXX1:1 (ts: 6), XXX2:2 (ts: 6), XXX3:3 (ts: 6)
             // right: YY0:0 (ts: 0), YY1:1 (ts: 5), YY2:2 (ts: 10), YY3:3 (ts: 15)
             proc.checkAndClearProcessResult(
-                "0:(XXX0+YY0<-null) (ts: 6)", "1:(XXX1+YY1<-null) (ts: 6)",
-                "2:(XXX2+YY2<-null) (ts: 10)", "3:(XXX3+YY3<-null) (ts: 15)");
+                new KeyValueTimestamp<>(0, new Change<>("XXX0+YY0", null), 6),
+                new KeyValueTimestamp<>(1, new Change<>("XXX1+YY1", null), 6),
+                new KeyValueTimestamp<>(2, new Change<>("XXX2+YY2", null), 10),
+                new KeyValueTimestamp<>(3, new Change<>("XXX3+YY3", null), 15));
 
             // push two items with null to the other stream as deletes. this should produce two item.
             driver.pipeInput(recordFactory.create(topic2, expectedKeys[0], null, 5L));
             driver.pipeInput(recordFactory.create(topic2, expectedKeys[1], null, 7L));
             // left: XXX0:0 (ts: 6), XXX1:1 (ts: 6), XXX2:2 (ts: 6), XXX3:3 (ts: 6)
             // right: YY2:2 (ts: 10), YY3:3 (ts: 15)
-            proc.checkAndClearProcessResult("0:(null<-null) (ts: 6)", "1:(null<-null) (ts: 7)");
-
+            proc.checkAndClearProcessResult(new KeyValueTimestamp<>(0, new Change<>(null, null), 6),
+                new KeyValueTimestamp<>(1, new Change<>(null, null), 7));
             // push all four items to the primary stream. this should produce two items.
             for (final int expectedKey : expectedKeys) {
                 driver.pipeInput(recordFactory.create(topic1, expectedKey, "XXXX" + expectedKey, 13L));
             }
             // left: XXXX0:0 (ts: 13), XXXX1:1 (ts: 13), XXXX2:2 (ts: 13), XXXX3:3 (ts: 13)
             // right: YY2:2 (ts: 10), YY3:3 (ts: 15)
-            proc.checkAndClearProcessResult("2:(XXXX2+YY2<-null) (ts: 13)", "3:(XXXX3+YY3<-null) (ts: 15)");
-
+            proc.checkAndClearProcessResult(new KeyValueTimestamp<>(2, new Change<>("XXXX2+YY2", null), 13),
+                new KeyValueTimestamp<>(3, new Change<>("XXXX3+YY3", null), 15));
             // push four items to the primary stream with null. this should produce two items.
             driver.pipeInput(recordFactory.create(topic1, expectedKeys[0], null, 0L));
             driver.pipeInput(recordFactory.create(topic1, expectedKeys[1], null, 42L));
@@ -341,7 +351,8 @@ public class KTableKTableInnerJoinTest {
             driver.pipeInput(recordFactory.create(topic1, expectedKeys[3], null, 20L));
             // left:
             // right: YY2:2 (ts: 10), YY3:3 (ts: 15)
-            proc.checkAndClearProcessResult("2:(null<-null) (ts: 10)", "3:(null<-null) (ts: 20)");
+            proc.checkAndClearProcessResult(new KeyValueTimestamp<>(2, new Change<>(null, null), 10),
+                new KeyValueTimestamp<>(3, new Change<>(null, null), 20));
         }
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
index 92ff514..36417e9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.TopologyTestDriver;
@@ -70,7 +71,7 @@ public class KTableKTableLeftJoinTest {
     public void testJoin() {
         final StreamsBuilder builder = new StreamsBuilder();
 
-        final int[] expectedKeys = new int[]{0, 1, 2, 3};
+        final int[] expectedKeys = new int[] {0, 1, 2, 3};
 
         final KTable<Integer, String> table1 = builder.table(topic1, consumed);
         final KTable<Integer, String> table2 = builder.table(topic2, consumed);
@@ -184,7 +185,7 @@ public class KTableKTableLeftJoinTest {
     public void testNotSendingOldValue() {
         final StreamsBuilder builder = new StreamsBuilder();
 
-        final int[] expectedKeys = new int[]{0, 1, 2, 3};
+        final int[] expectedKeys = new int[] {0, 1, 2, 3};
 
         final KTable<Integer, String> table1;
         final KTable<Integer, String> table2;
@@ -213,7 +214,8 @@ public class KTableKTableLeftJoinTest {
             driver.pipeInput(recordFactory.create(topic1, null, "SomeVal", 42L));
             // left: X0:0 (ts: 5), X1:1 (ts: 6)
             // right:
-            proc.checkAndClearProcessResult("0:(X0+null<-null) (ts: 5)", "1:(X1+null<-null) (ts: 6)");
+            proc.checkAndClearProcessResult(new KeyValueTimestamp<>(0, new Change<>("X0+null", null), 5),
+                new KeyValueTimestamp<>(1, new Change<>("X1+null", null), 6));
 
             // push two items to the other stream. this should produce two items.
             for (int i = 0; i < 2; i++) {
@@ -223,7 +225,8 @@ public class KTableKTableLeftJoinTest {
             driver.pipeInput(recordFactory.create(topic2, null, "AnotherVal", 73L));
             // left: X0:0 (ts: 5), X1:1 (ts: 6)
             // right: Y0:0 (ts: 0), Y1:1 (ts: 10)
-            proc.checkAndClearProcessResult("0:(X0+Y0<-null) (ts: 5)", "1:(X1+Y1<-null) (ts: 10)");
+            proc.checkAndClearProcessResult(new KeyValueTimestamp<>(0, new Change<>("X0+Y0", null), 5),
+                new KeyValueTimestamp<>(1, new Change<>("X1+Y1", null), 10));
 
             // push all four items to the primary stream. this should produce four items.
             for (final int expectedKey : expectedKeys) {
@@ -231,9 +234,10 @@ public class KTableKTableLeftJoinTest {
             }
             // left: XX0:0 (ts: 7), XX1:1 (ts: 7), XX2:2 (ts: 7), XX3:3 (ts: 7)
             // right: Y0:0 (ts: 0), Y1:1 (ts: 10)
-            proc.checkAndClearProcessResult(
-                "0:(XX0+Y0<-null) (ts: 7)", "1:(XX1+Y1<-null) (ts: 10)",
-                "2:(XX2+null<-null) (ts: 7)", "3:(XX3+null<-null) (ts: 7)");
+            proc.checkAndClearProcessResult(new KeyValueTimestamp<>(0, new Change<>("XX0+Y0", null), 7),
+                new KeyValueTimestamp<>(1, new Change<>("XX1+Y1", null), 10),
+                new KeyValueTimestamp<>(2, new Change<>("XX2+null", null), 7),
+                new KeyValueTimestamp<>(3, new Change<>("XX3+null", null), 7));
 
             // push all items to the other stream. this should produce four items.
             for (final int expectedKey : expectedKeys) {
@@ -241,9 +245,10 @@ public class KTableKTableLeftJoinTest {
             }
             // left: XX0:0 (ts: 7), XX1:1 (ts: 7), XX2:2 (ts: 7), XX3:3 (ts: 7)
             // right: YY0:0 (ts: 0), YY1:1 (ts: 5), YY2:2 (ts: 10), YY3:3 (ts: 15)
-            proc.checkAndClearProcessResult(
-                "0:(XX0+YY0<-null) (ts: 7)", "1:(XX1+YY1<-null) (ts: 7)",
-                "2:(XX2+YY2<-null) (ts: 10)", "3:(XX3+YY3<-null) (ts: 15)");
+            proc.checkAndClearProcessResult(new KeyValueTimestamp<>(0, new Change<>("XX0+YY0", null), 7),
+                new KeyValueTimestamp<>(1, new Change<>("XX1+YY1", null), 7),
+                new KeyValueTimestamp<>(2, new Change<>("XX2+YY2", null), 10),
+                new KeyValueTimestamp<>(3, new Change<>("XX3+YY3", null), 15));
 
             // push all four items to the primary stream. this should produce four items.
             for (final int expectedKey : expectedKeys) {
@@ -251,16 +256,18 @@ public class KTableKTableLeftJoinTest {
             }
             // left: XXX0:0 (ts: 6), XXX1:1 (ts: 6), XXX2:2 (ts: 6), XXX3:3 (ts: 6)
             // right: YY0:0 (ts: 0), YY1:1 (ts: 5), YY2:2 (ts: 10), YY3:3 (ts: 15)
-            proc.checkAndClearProcessResult(
-                "0:(XXX0+YY0<-null) (ts: 6)", "1:(XXX1+YY1<-null) (ts: 6)",
-                "2:(XXX2+YY2<-null) (ts: 10)", "3:(XXX3+YY3<-null) (ts: 15)");
+            proc.checkAndClearProcessResult(new KeyValueTimestamp<>(0, new Change<>("XXX0+YY0", null), 6),
+                new KeyValueTimestamp<>(1, new Change<>("XXX1+YY1", null), 6),
+                new KeyValueTimestamp<>(2, new Change<>("XXX2+YY2", null), 10),
+                new KeyValueTimestamp<>(3, new Change<>("XXX3+YY3", null), 15));
 
             // push two items with null to the other stream as deletes. this should produce two item.
             driver.pipeInput(recordFactory.create(topic2, expectedKeys[0], null, 5L));
             driver.pipeInput(recordFactory.create(topic2, expectedKeys[1], null, 7L));
             // left: XXX0:0 (ts: 6), XXX1:1 (ts: 6), XXX2:2 (ts: 6), XXX3:3 (ts: 6)
             // right: YY2:2 (ts: 10), YY3:3 (ts: 15)
-            proc.checkAndClearProcessResult("0:(XXX0+null<-null) (ts: 6)", "1:(XXX1+null<-null) (ts: 7)");
+            proc.checkAndClearProcessResult(new KeyValueTimestamp<>(0, new Change<>("XXX0+null", null), 6),
+                new KeyValueTimestamp<>(1, new Change<>("XXX1+null", null), 7));
 
             // push all four items to the primary stream. this should produce four items.
             for (final int expectedKey : expectedKeys) {
@@ -268,9 +275,10 @@ public class KTableKTableLeftJoinTest {
             }
             // left: XXXX0:0 (ts: 13), XXXX1:1 (ts: 13), XXXX2:2 (ts: 13), XXXX3:3 (ts: 13)
             // right: YY2:2 (ts: 10), YY3:3 (ts: 15)
-            proc.checkAndClearProcessResult(
-                "0:(XXXX0+null<-null) (ts: 13)", "1:(XXXX1+null<-null) (ts: 13)",
-                "2:(XXXX2+YY2<-null) (ts: 13)", "3:(XXXX3+YY3<-null) (ts: 15)");
+            proc.checkAndClearProcessResult(new KeyValueTimestamp<>(0, new Change<>("XXXX0+null", null), 13),
+                new KeyValueTimestamp<>(1, new Change<>("XXXX1+null", null), 13),
+                new KeyValueTimestamp<>(2, new Change<>("XXXX2+YY2", null), 13),
+                new KeyValueTimestamp<>(3, new Change<>("XXXX3+YY3", null), 15));
 
             // push four items to the primary stream with null. this should produce four items.
             driver.pipeInput(recordFactory.create(topic1, expectedKeys[0], null, 0L));
@@ -279,9 +287,10 @@ public class KTableKTableLeftJoinTest {
             driver.pipeInput(recordFactory.create(topic1, expectedKeys[3], null, 20L));
             // left:
             // right: YY2:2 (ts: 10), YY3:3 (ts: 15)
-            proc.checkAndClearProcessResult(
-                "0:(null<-null) (ts: 0)", "1:(null<-null) (ts: 42)",
-                "2:(null<-null) (ts: 10)", "3:(null<-null) (ts: 20)");
+            proc.checkAndClearProcessResult(new KeyValueTimestamp<>(0, new Change<>(null, null), 0),
+                new KeyValueTimestamp<>(1, new Change<>(null, null), 42),
+                new KeyValueTimestamp<>(2, new Change<>(null, null), 10),
+                new KeyValueTimestamp<>(3, new Change<>(null, null), 20));
         }
     }
 
@@ -289,7 +298,7 @@ public class KTableKTableLeftJoinTest {
     public void testSendingOldValue() {
         final StreamsBuilder builder = new StreamsBuilder();
 
-        final int[] expectedKeys = new int[]{0, 1, 2, 3};
+        final int[] expectedKeys = new int[] {0, 1, 2, 3};
 
         final KTable<Integer, String> table1;
         final KTable<Integer, String> table2;
@@ -320,7 +329,8 @@ public class KTableKTableLeftJoinTest {
             driver.pipeInput(recordFactory.create(topic1, null, "SomeVal", 42L));
             // left: X0:0 (ts: 5), X1:1 (ts: 6)
             // right:
-            proc.checkAndClearProcessResult("0:(X0+null<-null) (ts: 5)", "1:(X1+null<-null) (ts: 6)");
+            proc.checkAndClearProcessResult(new KeyValueTimestamp<>(0, new Change<>("X0+null", null), 5),
+                new KeyValueTimestamp<>(1, new Change<>("X1+null", null), 6));
 
             // push two items to the other stream. this should produce two items.
             for (int i = 0; i < 2; i++) {
@@ -330,7 +340,8 @@ public class KTableKTableLeftJoinTest {
             driver.pipeInput(recordFactory.create(topic2, null, "AnotherVal", 73L));
             // left: X0:0 (ts: 5), X1:1 (ts: 6)
             // right: Y0:0 (ts: 0), Y1:1 (ts: 10)
-            proc.checkAndClearProcessResult("0:(X0+Y0<-X0+null) (ts: 5)", "1:(X1+Y1<-X1+null) (ts: 10)");
+            proc.checkAndClearProcessResult(new KeyValueTimestamp<>(0, new Change<>("X0+Y0", "X0+null"), 5),
+                new KeyValueTimestamp<>(1, new Change<>("X1+Y1", "X1+null"), 10));
 
             // push all four items to the primary stream. this should produce four items.
             for (final int expectedKey : expectedKeys) {
@@ -338,9 +349,10 @@ public class KTableKTableLeftJoinTest {
             }
             // left: XX0:0 (ts: 7), XX1:1 (ts: 7), XX2:2 (ts: 7), XX3:3 (ts: 7)
             // right: Y0:0 (ts: 0), Y1:1 (ts: 10)
-            proc.checkAndClearProcessResult(
-                "0:(XX0+Y0<-X0+Y0) (ts: 7)", "1:(XX1+Y1<-X1+Y1) (ts: 10)",
-                "2:(XX2+null<-null) (ts: 7)", "3:(XX3+null<-null) (ts: 7)");
+            proc.checkAndClearProcessResult(new KeyValueTimestamp<>(0, new Change<>("XX0+Y0", "X0+Y0"), 7),
+                new KeyValueTimestamp<>(1, new Change<>("XX1+Y1", "X1+Y1"), 10),
+                new KeyValueTimestamp<>(2, new Change<>("XX2+null", null), 7),
+                new KeyValueTimestamp<>(3, new Change<>("XX3+null", null), 7));
 
             // push all items to the other stream. this should produce four items.
             for (final int expectedKey : expectedKeys) {
@@ -348,26 +360,28 @@ public class KTableKTableLeftJoinTest {
             }
             // left: XX0:0 (ts: 7), XX1:1 (ts: 7), XX2:2 (ts: 7), XX3:3 (ts: 7)
             // right: YY0:0 (ts: 0), YY1:1 (ts: 5), YY2:2 (ts: 10), YY3:3 (ts: 15)
-            proc.checkAndClearProcessResult(
-                "0:(XX0+YY0<-XX0+Y0) (ts: 7)", "1:(XX1+YY1<-XX1+Y1) (ts: 7)",
-                "2:(XX2+YY2<-XX2+null) (ts: 10)", "3:(XX3+YY3<-XX3+null) (ts: 15)");
-
+            proc.checkAndClearProcessResult(new KeyValueTimestamp<>(0, new Change<>("XX0+YY0", "XX0+Y0"), 7),
+                new KeyValueTimestamp<>(1, new Change<>("XX1+YY1", "XX1+Y1"), 7),
+                new KeyValueTimestamp<>(2, new Change<>("XX2+YY2", "XX2+null"), 10),
+                new KeyValueTimestamp<>(3, new Change<>("XX3+YY3", "XX3+null"), 15));
             // push all four items to the primary stream. this should produce four items.
             for (final int expectedKey : expectedKeys) {
                 driver.pipeInput(recordFactory.create(topic1, expectedKey, "XXX" + expectedKey, 6L));
             }
             // left: XXX0:0 (ts: 6), XXX1:1 (ts: 6), XXX2:2 (ts: 6), XXX3:3 (ts: 6)
             // right: YY0:0 (ts: 0), YY1:1 (ts: 5), YY2:2 (ts: 10), YY3:3 (ts: 15)
-            proc.checkAndClearProcessResult(
-                "0:(XXX0+YY0<-XX0+YY0) (ts: 6)", "1:(XXX1+YY1<-XX1+YY1) (ts: 6)",
-                "2:(XXX2+YY2<-XX2+YY2) (ts: 10)", "3:(XXX3+YY3<-XX3+YY3) (ts: 15)");
+            proc.checkAndClearProcessResult(new KeyValueTimestamp<>(0, new Change<>("XXX0+YY0", "XX0+YY0"), 6),
+                new KeyValueTimestamp<>(1, new Change<>("XXX1+YY1", "XX1+YY1"), 6),
+                new KeyValueTimestamp<>(2, new Change<>("XXX2+YY2", "XX2+YY2"), 10),
+                new KeyValueTimestamp<>(3, new Change<>("XXX3+YY3", "XX3+YY3"), 15));
 
             // push two items with null to the other stream as deletes. this should produce two item.
             driver.pipeInput(recordFactory.create(topic2, expectedKeys[0], null, 5L));
             driver.pipeInput(recordFactory.create(topic2, expectedKeys[1], null, 7L));
             // left: XXX0:0 (ts: 6), XXX1:1 (ts: 6), XXX2:2 (ts: 6), XXX3:3 (ts: 6)
             // right: YY2:2 (ts: 10), YY3:3 (ts: 15)
-            proc.checkAndClearProcessResult("0:(XXX0+null<-XXX0+YY0) (ts: 6)", "1:(XXX1+null<-XXX1+YY1) (ts: 7)");
+            proc.checkAndClearProcessResult(new KeyValueTimestamp<>(0, new Change<>("XXX0+null", "XXX0+YY0"), 6),
+                new KeyValueTimestamp<>(1, new Change<>("XXX1+null", "XXX1+YY1"), 7));
 
             // push all four items to the primary stream. this should produce four items.
             for (final int expectedKey : expectedKeys) {
@@ -375,10 +389,10 @@ public class KTableKTableLeftJoinTest {
             }
             // left: XXXX0:0 (ts: 13), XXXX1:1 (ts: 13), XXXX2:2 (ts: 13), XXXX3:3 (ts: 13)
             // right: YY2:2 (ts: 10), YY3:3 (ts: 15)
-            proc.checkAndClearProcessResult(
-                "0:(XXXX0+null<-XXX0+null) (ts: 13)", "1:(XXXX1+null<-XXX1+null) (ts: 13)",
-                "2:(XXXX2+YY2<-XXX2+YY2) (ts: 13)", "3:(XXXX3+YY3<-XXX3+YY3) (ts: 15)");
-
+            proc.checkAndClearProcessResult(new KeyValueTimestamp<>(0, new Change<>("XXXX0+null", "XXX0+null"), 13),
+                new KeyValueTimestamp<>(1, new Change<>("XXXX1+null", "XXX1+null"), 13),
+                new KeyValueTimestamp<>(2, new Change<>("XXXX2+YY2", "XXX2+YY2"), 13),
+                new KeyValueTimestamp<>(3, new Change<>("XXXX3+YY3", "XXX3+YY3"), 15));
             // push four items to the primary stream with null. this should produce four items.
             driver.pipeInput(recordFactory.create(topic1, expectedKeys[0], null, 0L));
             driver.pipeInput(recordFactory.create(topic1, expectedKeys[1], null, 42L));
@@ -386,9 +400,10 @@ public class KTableKTableLeftJoinTest {
             driver.pipeInput(recordFactory.create(topic1, expectedKeys[3], null, 20L));
             // left:
             // right: YY2:2 (ts: 10), YY3:3 (ts: 15)
-            proc.checkAndClearProcessResult(
-                "0:(null<-XXXX0+null) (ts: 0)", "1:(null<-XXXX1+null) (ts: 42)",
-                "2:(null<-XXXX2+YY2) (ts: 10)", "3:(null<-XXXX3+YY3) (ts: 20)");
+            proc.checkAndClearProcessResult(new KeyValueTimestamp<>(0, new Change<>(null, "XXXX0+null"), 0),
+                new KeyValueTimestamp<>(1, new Change<>(null, "XXXX1+null"), 42),
+                new KeyValueTimestamp<>(2, new Change<>(null, "XXXX2+YY2"), 10),
+                new KeyValueTimestamp<>(3, new Change<>(null, "XXXX3+YY3"), 20));
         }
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
index 38c90ce..9568c73 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KeyValueTimestamp;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.TopologyTestDriver;
@@ -54,7 +55,7 @@ public class KTableKTableOuterJoinTest {
     private final String output = "output";
     private final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String());
     private final ConsumerRecordFactory<Integer, String> recordFactory =
-        new ConsumerRecordFactory<>(Serdes.Integer().serializer(), Serdes.String().serializer(), 0L);
+            new ConsumerRecordFactory<>(Serdes.Integer().serializer(), Serdes.String().serializer(), 0L);
     private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
 
     @Test
@@ -73,7 +74,7 @@ public class KTableKTableOuterJoinTest {
         joined.toStream().to(output);
 
         final Collection<Set<String>> copartitionGroups =
-            TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
+                TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
@@ -208,8 +209,8 @@ public class KTableKTableOuterJoinTest {
             driver.pipeInput(recordFactory.create(topic1, null, "SomeVal", 42L));
             // left: X0:0 (ts: 5), X1:1 (ts: 6)
             // right:
-            proc.checkAndClearProcessResult("0:(X0+null<-null) (ts: 5)", "1:(X1+null<-null) (ts: 6)");
-
+            proc.checkAndClearProcessResult(new KeyValueTimestamp<>(0, new Change<>("X0+null", null), 5),
+                    new KeyValueTimestamp<>(1, new Change<>("X1+null", null), 6));
             // push two items to the other stream. this should produce two items.
             for (int i = 0; i < 2; i++) {
                 driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i], 10L * i));
@@ -218,55 +219,55 @@ public class KTableKTableOuterJoinTest {
             driver.pipeInput(recordFactory.create(topic2, null, "AnotherVal", 73L));
             // left: X0:0 (ts: 5), X1:1 (ts: 6)
             // right: Y0:0 (ts: 0), Y1:1 (ts: 10)
-            proc.checkAndClearProcessResult("0:(X0+Y0<-null) (ts: 5)", "1:(X1+Y1<-null) (ts: 10)");
-
+            proc.checkAndClearProcessResult(new KeyValueTimestamp<>(0, new Change<>("X0+Y0", null), 5),
+                    new KeyValueTimestamp<>(1, new Change<>("X1+Y1", null), 10));
             // push all four items to the primary stream. this should produce four items.
             for (final int expectedKey : expectedKeys) {
                 driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, 7L));
             }
             // left: XX0:0 (ts: 7), XX1:1 (ts: 7), XX2:2 (ts: 7), XX3:3 (ts: 7)
             // right: Y0:0 (ts: 0), Y1:1 (ts: 10)
-            proc.checkAndClearProcessResult(
-                "0:(XX0+Y0<-null) (ts: 7)", "1:(XX1+Y1<-null) (ts: 10)",
-                "2:(XX2+null<-null) (ts: 7)", "3:(XX3+null<-null) (ts: 7)");
-
+            proc.checkAndClearProcessResult(new KeyValueTimestamp<>(0, new Change<>("XX0+Y0", null), 7),
+                    new KeyValueTimestamp<>(1, new Change<>("XX1+Y1", null), 10),
+                    new KeyValueTimestamp<>(2, new Change<>("XX2+null", null), 7),
+                    new KeyValueTimestamp<>(3, new Change<>("XX3+null", null), 7));
             // push all items to the other stream. this should produce four items.
             for (final int expectedKey : expectedKeys) {
                 driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, expectedKey * 5L));
             }
             // left: XX0:0 (ts: 7), XX1:1 (ts: 7), XX2:2 (ts: 7), XX3:3 (ts: 7)
             // right: YY0:0 (ts: 0), YY1:1 (ts: 5), YY2:2 (ts: 10), YY3:3 (ts: 15)
-            proc.checkAndClearProcessResult(
-                "0:(XX0+YY0<-null) (ts: 7)", "1:(XX1+YY1<-null) (ts: 7)",
-                "2:(XX2+YY2<-null) (ts: 10)", "3:(XX3+YY3<-null) (ts: 15)");
-
+            proc.checkAndClearProcessResult(new KeyValueTimestamp<>(0, new Change<>("XX0+YY0", null), 7),
+                    new KeyValueTimestamp<>(1, new Change<>("XX1+YY1", null), 7),
+                    new KeyValueTimestamp<>(2, new Change<>("XX2+YY2", null), 10),
+                    new KeyValueTimestamp<>(3, new Change<>("XX3+YY3", null), 15));
             // push all four items to the primary stream. this should produce four items.
             for (final int expectedKey : expectedKeys) {
                 driver.pipeInput(recordFactory.create(topic1, expectedKey, "XXX" + expectedKey, 6L));
             }
             // left: XXX0:0 (ts: 6), XXX1:1 (ts: 6), XXX2:2 (ts: 6), XXX3:3 (ts: 6)
             // right: YY0:0 (ts: 0), YY1:1 (ts: 5), YY2:2 (ts: 10), YY3:3 (ts: 15)
-            proc.checkAndClearProcessResult(
-                "0:(XXX0+YY0<-null) (ts: 6)", "1:(XXX1+YY1<-null) (ts: 6)",
-                "2:(XXX2+YY2<-null) (ts: 10)", "3:(XXX3+YY3<-null) (ts: 15)");
-
+            proc.checkAndClearProcessResult(new KeyValueTimestamp<>(0, new Change<>("XXX0+YY0", null), 6),
+                    new KeyValueTimestamp<>(1, new Change<>("XXX1+YY1", null), 6),
+                    new KeyValueTimestamp<>(2, new Change<>("XXX2+YY2", null), 10),
+                    new KeyValueTimestamp<>(3, new Change<>("XXX3+YY3", null), 15));
             // push two items with null to the other stream as deletes. this should produce two item.
             driver.pipeInput(recordFactory.create(topic2, expectedKeys[0], null, 5L));
             driver.pipeInput(recordFactory.create(topic2, expectedKeys[1], null, 7L));
             // left: XXX0:0 (ts: 6), XXX1:1 (ts: 6), XXX2:2 (ts: 6), XXX3:3 (ts: 6)
             // right: YY2:2 (ts: 10), YY3:3 (ts: 15)
-            proc.checkAndClearProcessResult("0:(XXX0+null<-null) (ts: 6)", "1:(XXX1+null<-null) (ts: 7)");
-
+            proc.checkAndClearProcessResult(new KeyValueTimestamp<>(0, new Change<>("XXX0+null", null), 6),
+                    new KeyValueTimestamp<>(1, new Change<>("XXX1+null", null), 7));
             // push all four items to the primary stream. this should produce four items.
             for (final int expectedKey : expectedKeys) {
                 driver.pipeInput(recordFactory.create(topic1, expectedKey, "XXXX" + expectedKey, 13L));
             }
             // left: XXXX0:0 (ts: 13), XXXX1:1 (ts: 13), XXXX2:2 (ts: 13), XXXX3:3 (ts: 13)
             // right: YY2:2 (ts: 10), YY3:3 (ts: 15)
-            proc.checkAndClearProcessResult(
-                "0:(XXXX0+null<-null) (ts: 13)", "1:(XXXX1+null<-null) (ts: 13)",
-                "2:(XXXX2+YY2<-null) (ts: 13)", "3:(XXXX3+YY3<-null) (ts: 15)");
-
+            proc.checkAndClearProcessResult(new KeyValueTimestamp<>(0, new Change<>("XXXX0+null", null), 13),
+                    new KeyValueTimestamp<>(1, new Change<>("XXXX1+null", null), 13),
+                    new KeyValueTimestamp<>(2, new Change<>("XXXX2+YY2", null), 13),
+                    new KeyValueTimestamp<>(3, new Change<>("XXXX3+YY3", null), 15));
             // push four items to the primary stream with null. this should produce four items.
             driver.pipeInput(recordFactory.create(topic1, expectedKeys[0], null, 0L));
             driver.pipeInput(recordFactory.create(topic1, expectedKeys[1], null, 42L));
@@ -274,9 +275,10 @@ public class KTableKTableOuterJoinTest {
             driver.pipeInput(recordFactory.create(topic1, expectedKeys[3], null, 20L));
             // left:
             // right: YY2:2 (ts: 10), YY3:3 (ts: 15)
-            proc.checkAndClearProcessResult(
-                "0:(null<-null) (ts: 0)", "1:(null<-null) (ts: 42)",
-                "2:(null+YY2<-null) (ts: 10)", "3:(null+YY3<-null) (ts: 20)");
+            proc.checkAndClearProcessResult(new KeyValueTimestamp<>(0, new Change<>(null, null), 0),
+                    new KeyValueTimestamp<>(1, new Change<>(null, null), 42),
+                    new KeyValueTimestamp<>(2, new Change<>("null+YY2", null), 10),
+                    new KeyValueTimestamp<>(3, new Change<>("null+YY3", null), 20));
         }
     }
 
@@ -315,8 +317,8 @@ public class KTableKTableOuterJoinTest {
             driver.pipeInput(recordFactory.create(topic1, null, "SomeVal", 42L));
             // left: X0:0 (ts: 5), X1:1 (ts: 6)
             // right:
-            proc.checkAndClearProcessResult("0:(X0+null<-null) (ts: 5)", "1:(X1+null<-null) (ts: 6)");
-
+            proc.checkAndClearProcessResult(new KeyValueTimestamp<>(0, new Change<>("X0+null", null), 5),
+                    new KeyValueTimestamp<>(1, new Change<>("X1+null", null), 6));
             // push two items to the other stream. this should produce two items.
             for (int i = 0; i < 2; i++) {
                 driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i], 10L * i));
@@ -325,55 +327,55 @@ public class KTableKTableOuterJoinTest {
             driver.pipeInput(recordFactory.create(topic2, null, "AnotherVal", 73L));
             // left: X0:0 (ts: 5), X1:1 (ts: 6)
             // right: Y0:0 (ts: 0), Y1:1 (ts: 10)
-            proc.checkAndClearProcessResult("0:(X0+Y0<-X0+null) (ts: 5)", "1:(X1+Y1<-X1+null) (ts: 10)");
-
+            proc.checkAndClearProcessResult(new KeyValueTimestamp<>(0, new Change<>("X0+Y0", "X0+null"), 5),
+                    new KeyValueTimestamp<>(1, new Change<>("X1+Y1", "X1+null"), 10));
             // push all four items to the primary stream. this should produce four items.
             for (final int expectedKey : expectedKeys) {
                 driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, 7L));
             }
             // left: XX0:0 (ts: 7), XX1:1 (ts: 7), XX2:2 (ts: 7), XX3:3 (ts: 7)
             // right: Y0:0 (ts: 0), Y1:1 (ts: 10)
-            proc.checkAndClearProcessResult(
-                "0:(XX0+Y0<-X0+Y0) (ts: 7)", "1:(XX1+Y1<-X1+Y1) (ts: 10)",
-                "2:(XX2+null<-null) (ts: 7)", "3:(XX3+null<-null) (ts: 7)");
-
+            proc.checkAndClearProcessResult(new KeyValueTimestamp<>(0, new Change<>("XX0+Y0", "X0+Y0"), 7),
+                    new KeyValueTimestamp<>(1, new Change<>("XX1+Y1", "X1+Y1"), 10),
+                    new KeyValueTimestamp<>(2, new Change<>("XX2+null", null), 7),
+                    new KeyValueTimestamp<>(3, new Change<>("XX3+null", null), 7));
             // push all items to the other stream. this should produce four items.
             for (final int expectedKey : expectedKeys) {
                 driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, expectedKey * 5L));
             }
             // left: XX0:0 (ts: 7), XX1:1 (ts: 7), XX2:2 (ts: 7), XX3:3 (ts: 7)
             // right: YY0:0 (ts: 0), YY1:1 (ts: 5), YY2:2 (ts: 10), YY3:3 (ts: 15)
-            proc.checkAndClearProcessResult(
-                "0:(XX0+YY0<-XX0+Y0) (ts: 7)", "1:(XX1+YY1<-XX1+Y1) (ts: 7)",
-                "2:(XX2+YY2<-XX2+null) (ts: 10)", "3:(XX3+YY3<-XX3+null) (ts: 15)");
-
+            proc.checkAndClearProcessResult(new KeyValueTimestamp<>(0, new Change<>("XX0+YY0", "XX0+Y0"), 7),
+                    new KeyValueTimestamp<>(1, new Change<>("XX1+YY1", "XX1+Y1"), 7),
+                    new KeyValueTimestamp<>(2, new Change<>("XX2+YY2", "XX2+null"), 10),
+                    new KeyValueTimestamp<>(3, new Change<>("XX3+YY3", "XX3+null"), 15));
             // push all four items to the primary stream. this should produce four items.
             for (final int expectedKey : expectedKeys) {
                 driver.pipeInput(recordFactory.create(topic1, expectedKey, "XXX" + expectedKey, 6L));
             }
             // left: XXX0:0 (ts: 6), XXX1:1 (ts: 6), XXX2:2 (ts: 6), XXX3:3 (ts: 6)
             // right: YY0:0 (ts: 0), YY1:1 (ts: 5), YY2:2 (ts: 10), YY3:3 (ts: 15)
-            proc.checkAndClearProcessResult(
-                "0:(XXX0+YY0<-XX0+YY0) (ts: 6)", "1:(XXX1+YY1<-XX1+YY1) (ts: 6)",
-                "2:(XXX2+YY2<-XX2+YY2) (ts: 10)", "3:(XXX3+YY3<-XX3+YY3) (ts: 15)");
-
+            proc.checkAndClearProcessResult(new KeyValueTimestamp<>(0, new Change<>("XXX0+YY0", "XX0+YY0"), 6),
+                    new KeyValueTimestamp<>(1, new Change<>("XXX1+YY1", "XX1+YY1"), 6),
+                    new KeyValueTimestamp<>(2, new Change<>("XXX2+YY2", "XX2+YY2"), 10),
+                    new KeyValueTimestamp<>(3, new Change<>("XXX3+YY3", "XX3+YY3"), 15));
             // push two items with null to the other stream as deletes. this should produce two item.
             driver.pipeInput(recordFactory.create(topic2, expectedKeys[0], null, 5L));
             driver.pipeInput(recordFactory.create(topic2, expectedKeys[1], null, 7L));
             // left: XXX0:0 (ts: 6), XXX1:1 (ts: 6), XXX2:2 (ts: 6), XXX3:3 (ts: 6)
             // right: YY2:2 (ts: 10), YY3:3 (ts: 15)
-            proc.checkAndClearProcessResult("0:(XXX0+null<-XXX0+YY0) (ts: 6)", "1:(XXX1+null<-XXX1+YY1) (ts: 7)");
-
+            proc.checkAndClearProcessResult(new KeyValueTimestamp<>(0, new Change<>("XXX0+null", "XXX0+YY0"), 6),
+                    new KeyValueTimestamp<>(1, new Change<>("XXX1+null", "XXX1+YY1"), 7));
             // push all four items to the primary stream. this should produce four items.
             for (final int expectedKey : expectedKeys) {
                 driver.pipeInput(recordFactory.create(topic1, expectedKey, "XXXX" + expectedKey, 13L));
             }
             // left: XXXX0:0 (ts: 13), XXXX1:1 (ts: 13), XXXX2:2 (ts: 13), XXXX3:3 (ts: 13)
             // right: YY2:2 (ts: 10), YY3:3 (ts: 15)
-            proc.checkAndClearProcessResult(
-                "0:(XXXX0+null<-XXX0+null) (ts: 13)", "1:(XXXX1+null<-XXX1+null) (ts: 13)",
-                "2:(XXXX2+YY2<-XXX2+YY2) (ts: 13)", "3:(XXXX3+YY3<-XXX3+YY3) (ts: 15)");
-
+            proc.checkAndClearProcessResult(new KeyValueTimestamp<>(0, new Change<>("XXXX0+null", "XXX0+null"), 13),
+                    new KeyValueTimestamp<>(1, new Change<>("XXXX1+null", "XXX1+null"), 13),
+                    new KeyValueTimestamp<>(2, new Change<>("XXXX2+YY2", "XXX2+YY2"), 13),
+                    new KeyValueTimestamp<>(3, new Change<>("XXXX3+YY3", "XXX3+YY3"), 15));
             // push four items to the primary stream with null. this should produce four items.
             driver.pipeInput(recordFactory.create(topic1, expectedKeys[0], null, 0L));
             driver.pipeInput(recordFactory.create(topic1, expectedKeys[1], null, 42L));
@@ -381,9 +383,10 @@ public class KTableKTableOuterJoinTest {
             driver.pipeInput(recordFactory.create(topic1, expectedKeys[3], null, 20L));
             // left:
             // right: YY2:2 (ts: 10), YY3:3 (ts: 15)
-            proc.checkAndClearProcessResult(
-                "0:(null<-XXXX0+null) (ts: 0)", "1:(null<-XXXX1+null) (ts: 42)",
-                "2:(null+YY2<-XXXX2+YY2) (ts: 10)", "3:(null+YY3<-XXXX3+YY3) (ts: 20)");
+            proc.checkAndClearProcessResult(new KeyValueTimestamp<>(0, new Change<>(null, "XXXX0+null"), 0),
+                    new KeyValueTimestamp<>(1, new Change<>(null, "XXXX1+null"), 42),
+                    new KeyValueTimestamp<>(2, new Change<>("null+YY2", "XXXX2+YY2"), 10),
+                    new KeyValueTimestamp<>(3, new Change<>("null+YY3", "XXXX3+YY3"), 20));
         }
     }
 
@@ -393,9 +396,9 @@ public class KTableKTableOuterJoinTest {
 
         @SuppressWarnings("unchecked")
         final Processor<String, Change<String>> join = new KTableKTableOuterJoin<>(
-            (KTableImpl<String, String, String>) builder.table("left", Consumed.with(Serdes.String(), Serdes.String())),
-            (KTableImpl<String, String, String>) builder.table("right", Consumed.with(Serdes.String(), Serdes.String())),
-            null
+                (KTableImpl<String, String, String>) builder.table("left", Consumed.with(Serdes.String(), Serdes.String())),
+                (KTableImpl<String, String, String>) builder.table("right", Consumed.with(Serdes.String(), Serdes.String())),
+                null
         ).get();
 
         final MockProcessorContext context = new MockProcessorContext();
@@ -414,10 +417,10 @@ public class KTableKTableOuterJoinTest {
                                                final String expectedValue,
                                                final long expectedTimestamp) {
         OutputVerifier.compareKeyValueTimestamp(
-            driver.readOutput(output, Serdes.Integer().deserializer(), Serdes.String().deserializer()),
-            expectedKey,
-            expectedValue,
-            expectedTimestamp);
+                driver.readOutput(output, Serdes.Integer().deserializer(), Serdes.String().deserializer()),
+                expectedKey,
+                expectedValue,
+                expectedTimestamp);
     }
 
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
index 8b6f27a..b0d81f1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KeyValueTimestamp;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.Consumed;
@@ -54,9 +55,11 @@ public class KTableMapKeysTest {
 
         final KStream<String, String> convertedStream = table1.toStream((key, value) -> keyMap.get(key));
 
-        final String[] expected = new String[]{"ONE:V_ONE (ts: 5)", "TWO:V_TWO (ts: 10)", "THREE:V_THREE (ts: 15)"};
-        final int[] originalKeys = new int[]{1, 2, 3};
-        final String[] values = new String[]{"V_ONE", "V_TWO", "V_THREE"};
+        final KeyValueTimestamp[] expected = new KeyValueTimestamp[] {new KeyValueTimestamp<>("ONE", "V_ONE", 5),
+            new KeyValueTimestamp<>("TWO", "V_TWO", 10),
+            new KeyValueTimestamp<>("THREE", "V_THREE", 15)};
+        final int[] originalKeys = new int[] {1, 2, 3};
+        final String[] values = new String[] {"V_ONE", "V_TWO", "V_THREE"};
 
         final MockProcessorSupplier<String, String> supplier = new MockProcessorSupplier<>();
         convertedStream.process(supplier);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
index 9b78291..a00318e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValueTimestamp;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.TopologyTestDriver;
@@ -59,7 +60,10 @@ public class KTableMapValuesTest {
             driver.pipeInput(recordFactory.create(topic1, "B", "2", 25L));
             driver.pipeInput(recordFactory.create(topic1, "C", "3", 20L));
             driver.pipeInput(recordFactory.create(topic1, "D", "4", 10L));
-            assertEquals(asList("A:1 (ts: 5)", "B:2 (ts: 25)", "C:3 (ts: 20)", "D:4 (ts: 10)"), supplier.theCapturedProcessor().processed);
+            assertEquals(asList(new KeyValueTimestamp<>("A", 1, 5),
+                    new KeyValueTimestamp<>("B", 2, 25),
+                    new KeyValueTimestamp<>("C", 3, 20),
+                    new KeyValueTimestamp<>("D", 4, 10)), supplier.theCapturedProcessor().processed);
         }
     }
 
@@ -212,17 +216,20 @@ public class KTableMapValuesTest {
             driver.pipeInput(recordFactory.create(topic1, "A", "01", 5L));
             driver.pipeInput(recordFactory.create(topic1, "B", "01", 10L));
             driver.pipeInput(recordFactory.create(topic1, "C", "01", 15L));
-            proc.checkAndClearProcessResult("A:(1<-null) (ts: 5)", "B:(1<-null) (ts: 10)", "C:(1<-null) (ts: 15)");
+            proc.checkAndClearProcessResult(new KeyValueTimestamp<>("A", new Change<>(1, null), 5),
+                    new KeyValueTimestamp<>("B", new Change<>(1, null), 10),
+                    new KeyValueTimestamp<>("C", new Change<>(1, null), 15));
 
             driver.pipeInput(recordFactory.create(topic1, "A", "02", 10L));
             driver.pipeInput(recordFactory.create(topic1, "B", "02", 8L));
-            proc.checkAndClearProcessResult("A:(2<-null) (ts: 10)", "B:(2<-null) (ts: 8)");
+            proc.checkAndClearProcessResult(new KeyValueTimestamp<>("A", new Change<>(2, null), 10),
+                    new KeyValueTimestamp<>("B", new Change<>(2, null), 8));
 
             driver.pipeInput(recordFactory.create(topic1, "A", "03", 20L));
-            proc.checkAndClearProcessResult("A:(3<-null) (ts: 20)");
+            proc.checkAndClearProcessResult(new KeyValueTimestamp<>("A", new Change<>(3, null), 20));
 
             driver.pipeInput(recordFactory.create(topic1, "A", (String) null, 30L));
-            proc.checkAndClearProcessResult("A:(null<-null) (ts: 30)");
+            proc.checkAndClearProcessResult(new KeyValueTimestamp<>("A", new Change<>(null, null), 30));
         }
     }
 
@@ -249,17 +256,20 @@ public class KTableMapValuesTest {
             driver.pipeInput(recordFactory.create(topic1, "A", "01", 5L));
             driver.pipeInput(recordFactory.create(topic1, "B", "01", 10L));
             driver.pipeInput(recordFactory.create(topic1, "C", "01", 15L));
-            proc.checkAndClearProcessResult("A:(1<-null) (ts: 5)", "B:(1<-null) (ts: 10)", "C:(1<-null) (ts: 15)");
+            proc.checkAndClearProcessResult(new KeyValueTimestamp<>("A", new Change<>(1, null), 5),
+                    new KeyValueTimestamp<>("B", new Change<>(1, null), 10),
+                    new KeyValueTimestamp<>("C", new Change<>(1, null), 15));
 
             driver.pipeInput(recordFactory.create(topic1, "A", "02", 10L));
             driver.pipeInput(recordFactory.create(topic1, "B", "02", 8L));
-            proc.checkAndClearProcessResult("A:(2<-1) (ts: 10)", "B:(2<-1) (ts: 8)");
+            proc.checkAndClearProcessResult(new KeyValueTimestamp<>("A", new Change<>(2, 1), 10),
+                    new KeyValueTimestamp<>("B", new Change<>(2, 1), 8));
 
             driver.pipeInput(recordFactory.create(topic1, "A", "03", 20L));
-            proc.checkAndClearProcessResult("A:(3<-2) (ts: 20)");
+            proc.checkAndClearProcessResult(new KeyValueTimestamp<>("A", new Change<>(3, 2), 20));
 
             driver.pipeInput(recordFactory.create(topic1, "A", (String) null, 30L));
-            proc.checkAndClearProcessResult("A:(null<-3) (ts: 30)");
+            proc.checkAndClearProcessResult(new KeyValueTimestamp<>("A", new Change<>(null, 3), 30));
         }
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
index 848b290..a936dc0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KeyValueTimestamp;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.TopologyTestDriver;
@@ -74,7 +75,12 @@ public class KTableSourceTest {
         }
 
         assertEquals(
-            asList("A:1 (ts: 10)", "B:2 (ts: 11)", "C:3 (ts: 12)", "D:4 (ts: 13)", "A:null (ts: 14)", "B:null (ts: 15)"),
+            asList(new KeyValueTimestamp<>("A", 1, 10L),
+                new KeyValueTimestamp<>("B", 2, 11L),
+                new KeyValueTimestamp<>("C", 3, 12L),
+                new KeyValueTimestamp<>("D", 4, 13L),
+                new KeyValueTimestamp<>("A", null, 14L),
+                new KeyValueTimestamp<>("B", null, 15L)),
             supplier.theCapturedProcessor().processed);
     }
 
@@ -160,18 +166,22 @@ public class KTableSourceTest {
             driver.pipeInput(recordFactory.create(topic1, "A", "01", 10L));
             driver.pipeInput(recordFactory.create(topic1, "B", "01", 20L));
             driver.pipeInput(recordFactory.create(topic1, "C", "01", 15L));
-            proc1.checkAndClearProcessResult("A:(01<-null) (ts: 10)", "B:(01<-null) (ts: 20)", "C:(01<-null) (ts: 15)");
+            proc1.checkAndClearProcessResult(new KeyValueTimestamp<>("A", new Change<>("01", null), 10),
+                new KeyValueTimestamp<>("B", new Change<>("01", null), 20),
+                new KeyValueTimestamp<>("C", new Change<>("01", null), 15));
 
             driver.pipeInput(recordFactory.create(topic1, "A", "02", 8L));
             driver.pipeInput(recordFactory.create(topic1, "B", "02", 22L));
-            proc1.checkAndClearProcessResult("A:(02<-null) (ts: 8)", "B:(02<-null) (ts: 22)");
+            proc1.checkAndClearProcessResult(new KeyValueTimestamp<>("A", new Change<>("02", null), 8),
+                new KeyValueTimestamp<>("B", new Change<>("02", null), 22));
 
             driver.pipeInput(recordFactory.create(topic1, "A", "03", 12L));
-            proc1.checkAndClearProcessResult("A:(03<-null) (ts: 12)");
+            proc1.checkAndClearProcessResult(new KeyValueTimestamp<>("A", new Change<>("03", null), 12));
 
             driver.pipeInput(recordFactory.create(topic1, "A", (String) null, 15L));
             driver.pipeInput(recordFactory.create(topic1, "B", (String) null, 20L));
-            proc1.checkAndClearProcessResult("A:(null<-null) (ts: 15)", "B:(null<-null) (ts: 20)");
+            proc1.checkAndClearProcessResult(new KeyValueTimestamp<>("A", new Change<>(null, null), 15),
+                new KeyValueTimestamp<>("B", new Change<>(null, null), 20));
         }
     }
 
@@ -194,18 +204,22 @@ public class KTableSourceTest {
             driver.pipeInput(recordFactory.create(topic1, "A", "01", 10L));
             driver.pipeInput(recordFactory.create(topic1, "B", "01", 20L));
             driver.pipeInput(recordFactory.create(topic1, "C", "01", 15L));
-            proc1.checkAndClearProcessResult("A:(01<-null) (ts: 10)", "B:(01<-null) (ts: 20)", "C:(01<-null) (ts: 15)");
+            proc1.checkAndClearProcessResult(new KeyValueTimestamp<>("A", new Change<>("01", null), 10),
+                new KeyValueTimestamp<>("B", new Change<>("01", null), 20),
+                new KeyValueTimestamp<>("C", new Change<>("01", null), 15));
 
             driver.pipeInput(recordFactory.create(topic1, "A", "02", 8L));
             driver.pipeInput(recordFactory.create(topic1, "B", "02", 22L));
-            proc1.checkAndClearProcessResult("A:(02<-01) (ts: 8)", "B:(02<-01) (ts: 22)");
+            proc1.checkAndClearProcessResult(new KeyValueTimestamp<>("A", new Change<>("02", "01"), 8),
+                new KeyValueTimestamp<>("B", new Change<>("02", "01"), 22));
 
             driver.pipeInput(recordFactory.create(topic1, "A", "03", 12L));
-            proc1.checkAndClearProcessResult("A:(03<-02) (ts: 12)");
+            proc1.checkAndClearProcessResult(new KeyValueTimestamp<>("A", new Change<>("03", "02"), 12));
 
             driver.pipeInput(recordFactory.create(topic1, "A", (String) null, 15L));
             driver.pipeInput(recordFactory.create(topic1, "B", (String) null, 20L));
-            proc1.checkAndClearProcessResult("A:(null<-03) (ts: 15)", "B:(null<-02) (ts: 20)");
+            proc1.checkAndClearProcessResult(new KeyValueTimestamp<>("A", new Change<>(null, "03"), 15),
+                new KeyValueTimestamp<>("B", new Change<>(null, "02"), 20));
         }
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
index c2ddb54..ab09139 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
@@ -20,8 +20,9 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.KeyValueTimestamp;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.Grouped;
@@ -331,7 +332,11 @@ public class KTableTransformValuesTest {
         driver.pipeInput(recordFactory.create(INPUT_TOPIC, "B", "b", 10L));
         driver.pipeInput(recordFactory.create(INPUT_TOPIC, "D", (String) null, 15L));
 
-        assertThat(output(), hasItems("A:A->a! (ts: 5)", "B:B->b! (ts: 10)", "D:D->null! (ts: 15)"));
+
+        assertThat(output(), hasItems(new KeyValueTimestamp<>("A", "A->a!", 5),
+                new KeyValueTimestamp<>("B", "B->b!", 10),
+                new KeyValueTimestamp<>("D", "D->null!", 15)
+        ));
         assertNull("Store should not be materialized", driver.getKeyValueStore(QUERYABLE_NAME));
     }
 
@@ -355,7 +360,9 @@ public class KTableTransformValuesTest {
         driver.pipeInput(recordFactory.create(INPUT_TOPIC, "B", "b", 10L));
         driver.pipeInput(recordFactory.create(INPUT_TOPIC, "C", (String) null, 15L));
 
-        assertThat(output(), hasItems("A:A->a! (ts: 5)", "B:B->b! (ts: 10)", "C:C->null! (ts: 15)"));
+        assertThat(output(), hasItems(new KeyValueTimestamp<>("A", "A->a!", 5),
+                new KeyValueTimestamp<>("B", "B->b!", 10),
+                new KeyValueTimestamp<>("C", "C->null!", 15)));
 
         {
             final KeyValueStore<String, String> keyValueStore = driver.getKeyValueStore(QUERYABLE_NAME);
@@ -392,7 +399,11 @@ public class KTableTransformValuesTest {
         driver.pipeInput(recordFactory.create(INPUT_TOPIC, "A", "ignored", 15L));
         driver.pipeInput(recordFactory.create(INPUT_TOPIC, "A", "ignored", 10L));
 
-        assertThat(output(), hasItems("A:1 (ts: 5)", "A:0 (ts: 15)", "A:2 (ts: 15)", "A:0 (ts: 15)", "A:3 (ts: 15)"));
+        assertThat(output(), hasItems(new KeyValueTimestamp<>("A", "1", 5),
+                new KeyValueTimestamp<>("A", "0", 15),
+                new KeyValueTimestamp<>("A", "2", 15),
+                new KeyValueTimestamp<>("A", "0", 15),
+                new KeyValueTimestamp<>("A", "3", 15)));
 
         final KeyValueStore<String, Integer> keyValueStore = driver.getKeyValueStore(QUERYABLE_NAME);
         assertThat(keyValueStore.get("A"), is(3));
@@ -415,10 +426,14 @@ public class KTableTransformValuesTest {
         driver.pipeInput(recordFactory.create(INPUT_TOPIC, "A", "aa", 15L));
         driver.pipeInput(recordFactory.create(INPUT_TOPIC, "A", "aaa", 10));
 
-        assertThat(output(), hasItems("A:1 (ts: 5)", "A:0 (ts: 15)", "A:2 (ts: 15)", "A:0 (ts: 15)", "A:3 (ts: 15)"));
+        assertThat(output(), hasItems(new KeyValueTimestamp<>("A", "1", 5),
+                 new KeyValueTimestamp<>("A", "0", 15),
+                 new KeyValueTimestamp<>("A", "2", 15),
+                 new KeyValueTimestamp<>("A", "0", 15),
+                 new KeyValueTimestamp<>("A", "3", 15)));
     }
 
-    private ArrayList<String> output() {
+    private ArrayList<KeyValueTimestamp<Object, Object>> output() {
         return capture.capturedProcessors(1).get(0).processed;
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessor.java b/streams/src/test/java/org/apache/kafka/test/MockProcessor.java
index f1e0597..182067c 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessor.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessor.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.test;
 
+import org.apache.kafka.streams.KeyValueTimestamp;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Cancellable;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -31,7 +32,7 @@ import static org.junit.Assert.assertEquals;
 
 public class MockProcessor<K, V> extends AbstractProcessor<K, V> {
 
-    public final ArrayList<String> processed = new ArrayList<>();
+    public final ArrayList<KeyValueTimestamp<Object, Object>> processed = new ArrayList<>();
     public final Map<K, ValueAndTimestamp<V>> lastValueAndTimestampPerKey = new HashMap<>();
 
     public final ArrayList<Long> punctuatedStreamTime = new ArrayList<>();
@@ -76,13 +77,15 @@ public class MockProcessor<K, V> extends AbstractProcessor<K, V> {
 
     @Override
     public void process(final K key, final V value) {
+        KeyValueTimestamp<Object, Object> keyValueTimestamp = new KeyValueTimestamp<>(key, value, context().timestamp());
+
         if (value != null) {
             lastValueAndTimestampPerKey.put(key, ValueAndTimestamp.make(value, context().timestamp()));
         } else {
             lastValueAndTimestampPerKey.remove(key);
         }
 
-        processed.add(makeRecord(key, value, context().timestamp()));
+        processed.add(keyValueTimestamp);
 
         if (commitRequested) {
             context().commit();
@@ -90,13 +93,7 @@ public class MockProcessor<K, V> extends AbstractProcessor<K, V> {
         }
     }
 
-    public static String makeRecord(final Object key, final Object value, final long timestamp) {
-        return (key == null ? "null" : key) +
-            ":" + (value == null ? "null" : value) +
-            " (ts: " + timestamp + ")";
-    }
-
-    public void checkAndClearProcessResult(final String... expected) {
+    public void checkAndClearProcessResult(final KeyValueTimestamp... expected) {
         assertEquals("the number of outputs:" + processed, expected.length, processed.size());
         for (int i = 0; i < expected.length; i++) {
             assertEquals("output[" + i + "]:", expected[i], processed.get(i));


Mime
View raw message