kafka-commits mailing list archives

From guozh...@apache.org
Subject [kafka] branch 2.4 updated: KAFKA-9000: fix flaky FK join test by using TTD (#7517)
Date Thu, 17 Oct 2019 05:48:53 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new 2a4b27c  KAFKA-9000: fix flaky FK join test by using TTD (#7517)
2a4b27c is described below

commit 2a4b27c02ad487954eb971e8e28bade475d6d816
Author: John Roesler <vvcephei@users.noreply.github.com>
AuthorDate: Thu Oct 17 00:40:57 2019 -0500

    KAFKA-9000: fix flaky FK join test by using TTD (#7517)
    
    Migrate this integration test to use TopologyTestDriver instead of running 3 Streams instances.
    
    Dropped one test that attempted to produce specific interleavings; if anything, such interleavings should be verified deterministically by unit tests.
    
    Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
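
For context, below is a minimal sketch (not part of this commit) of the TopologyTestDriver
pattern the migrated test relies on: the driver processes records synchronously in a single
JVM, so no brokers, consumer groups, or wall-clock waits are involved, which is what removes
the timing-dependent flakiness. The trivial pass-through topology and topic names here are
illustrative only; the real test builds the foreign-key join topology shown in the diff below.

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

public class TopologyTestDriverSketch {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        // Illustrative pass-through topology; the real test wires up a KTable-KTable FK join.
        builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
               .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));
        final Topology topology = builder.build();

        final Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "ttd-sketch");
        // bootstrap.servers is required by StreamsConfig but never contacted by the test driver.
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

        try (final TopologyTestDriver driver = new TopologyTestDriver(topology, config)) {
            final TestInputTopic<String, String> input =
                driver.createInputTopic("input-topic", new StringSerializer(), new StringSerializer());
            final TestOutputTopic<String, String> output =
                driver.createOutputTopic("output-topic", new StringDeserializer(), new StringDeserializer());

            input.pipeInput("k", "v");                        // processed synchronously, no polling
            System.out.println(output.readKeyValuesToMap());  // prints {k=v}
        }
    }
}
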
---
 checkstyle/suppressions.xml                        |    4 +-
 .../streams/integration/ForeignKeyJoinSuite.java   |    4 +-
 .../KTableKTableForeignKeyJoinIntegrationTest.java | 1024 ++++++++------------
 ...scriptionResolverJoinProcessorSupplierTest.java |  223 +++++
 .../streams/processor/MockProcessorContext.java    |    9 +
 5 files changed, 637 insertions(+), 627 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 2f21309..c0aeb95 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -217,9 +217,7 @@
               files="SmokeTestDriver.java"/>
 
     <suppress checks="NPathComplexity"
-              files="EosTestDriver|KStreamKStreamJoinTest.java|SmokeTestDriver.java"/>
-    <suppress checks="NPathComplexity"
-              files="KStreamKStreamLeftJoinTest.java"/>
+              files="EosTestDriver|KStreamKStreamJoinTest.java|SmokeTestDriver.java|KStreamKStreamLeftJoinTest.java|KTableKTableForeignKeyJoinIntegrationTest.java"/>
 
     <suppress checks="(FinalLocalVariable|WhitespaceAround|LocalVariableName|ImportControl|AvoidStarImport)"
               files="Murmur3Test.java"/>
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ForeignKeyJoinSuite.java b/streams/src/test/java/org/apache/kafka/streams/integration/ForeignKeyJoinSuite.java
index 6245b11..ac6adb8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ForeignKeyJoinSuite.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ForeignKeyJoinSuite.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.integration;
 
 import org.apache.kafka.common.utils.BytesTest;
 import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.CombinedKeySchemaTest;
+import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionResolverJoinProcessorSupplierTest;
 import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionResponseWrapperSerdeTest;
 import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapperSerdeTest;
 import org.junit.runner.RunWith;
@@ -39,7 +40,8 @@ import org.junit.runners.Suite;
     KTableKTableForeignKeyJoinIntegrationTest.class,
     CombinedKeySchemaTest.class,
     SubscriptionWrapperSerdeTest.class,
-    SubscriptionResponseWrapperSerdeTest.class
+    SubscriptionResponseWrapperSerdeTest.class,
+    SubscriptionResolverJoinProcessorSupplierTest.class
 })
 public class ForeignKeyJoinSuite {
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
index ae0f3c2..14a39b5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
@@ -16,684 +16,462 @@
  */
 package org.apache.kafka.streams.integration;
 
-import kafka.utils.MockTime;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.serialization.FloatSerializer;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TestOutputTopic;
 import org.apache.kafka.streams.Topology;
-import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
-import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.kstream.Named;
 import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.ValueJoiner;
-import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.QueryableStoreTypes;
-import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
-import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
 import org.junit.Test;
-import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
-import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.Set;
 import java.util.function.Function;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertArrayEquals;
+import static java.util.Collections.emptyMap;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
 
 
-@Category({IntegrationTest.class})
+@RunWith(value = Parameterized.class)
 public class KTableKTableForeignKeyJoinIntegrationTest {
-    private final static int NUM_BROKERS = 1;
-
-    @ClassRule
-    public final static EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
-    private final static MockTime MOCK_TIME = CLUSTER.time;
-    private final static String LEFT_TABLE = "left_table";
-    private final static String RIGHT_TABLE = "right_table";
-    private final static String OUTPUT = "output-topic";
-    private static Properties streamsConfig;
-    private KafkaStreams streams;
-    private KafkaStreams streamsTwo;
-    private KafkaStreams streamsThree;
-    private static final  Properties CONSUMER_CONFIG = new Properties();
-    private static final Properties LEFT_PROD_CONF = new Properties();
-    private static final Properties RIGHT_PROD_CONF = new Properties();
-
-    @BeforeClass
-    public static void beforeTest() {
-        //Use multiple partitions to ensure distribution of keys.
-        LEFT_PROD_CONF.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        LEFT_PROD_CONF.put(ProducerConfig.ACKS_CONFIG, "all");
-        LEFT_PROD_CONF.put(ProducerConfig.RETRIES_CONFIG, 0);
-        LEFT_PROD_CONF.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
-        LEFT_PROD_CONF.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, FloatSerializer.class);
-
-        RIGHT_PROD_CONF.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        RIGHT_PROD_CONF.put(ProducerConfig.ACKS_CONFIG, "all");
-        RIGHT_PROD_CONF.put(ProducerConfig.RETRIES_CONFIG, 0);
-        RIGHT_PROD_CONF.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        RIGHT_PROD_CONF.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
-
-        streamsConfig = new Properties();
-        streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
-        streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
-        streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
-        streamsConfig.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
-
-        CONSUMER_CONFIG.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG, "ktable-ktable-consumer");
-        CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
-        CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-    }
-
-    @Before
-    public void before() throws IOException, InterruptedException {
-        CLUSTER.deleteTopicsAndWait(LEFT_TABLE);
-        CLUSTER.deleteTopicsAndWait(RIGHT_TABLE);
-        CLUSTER.deleteTopicsAndWait(OUTPUT);
-
-        CLUSTER.createTopic(LEFT_TABLE, 3, 1);
-        CLUSTER.createTopic(RIGHT_TABLE, 3, 1);
-        CLUSTER.createTopic(OUTPUT, 3, 1);
-
-        IntegrationTestUtils.purgeLocalStreamsState(streamsConfig);
-    }
 
-    @After
-    public void after() throws IOException {
-        if (streams != null) {
-            streams.close();
-            streams = null;
-        }
-        if (streamsTwo != null) {
-            streamsTwo.close();
-            streamsTwo = null;
-        }
-        if (streamsThree != null) {
-            streamsThree.close();
-            streamsThree = null;
-        }
-        IntegrationTestUtils.purgeLocalStreamsState(streamsConfig);
+    private static final String LEFT_TABLE = "left_table";
+    private static final String RIGHT_TABLE = "right_table";
+    private static final String OUTPUT = "output-topic";
+    private final Properties streamsConfig;
+    private final boolean leftJoin;
+
+    public KTableKTableForeignKeyJoinIntegrationTest(final boolean leftJoin, final String optimization) {
+        this.leftJoin = leftJoin;
+        streamsConfig = mkProperties(mkMap(
+            mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "ktable-ktable-joinOnForeignKey"),
+            mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "asdf:0000"),
+            mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
+            mkEntry(StreamsConfig.TOPOLOGY_OPTIMIZATION, optimization)
+        ));
     }
 
-    @Test
-    public void doInnerJoinFromLeftThenDeleteLeftEntity() throws Exception {
-        final List<KeyValue<String, Long>> rightTableEvents = Arrays.asList(new KeyValue<>("1", 10L), new KeyValue<>("2", 20L)); //partition 0
-        IntegrationTestUtils.produceKeyValuesSynchronously(RIGHT_TABLE, rightTableEvents, RIGHT_PROD_CONF, MOCK_TIME);
-
-        final String currentMethodName = new Object() { }
-                .getClass()
-                .getEnclosingMethod()
-                .getName();
-        createAndStartStreamsApplication(currentMethodName, false);
-
-        final List<KeyValue<Integer, Float>> leftTableEvents = Arrays.asList(new KeyValue<>(1, 1.33f), new KeyValue<>(2, 2.77f));
-        IntegrationTestUtils.produceKeyValuesSynchronously(LEFT_TABLE, leftTableEvents, LEFT_PROD_CONF, MOCK_TIME);
-
-        final Set<KeyValue<Integer, String>> expected = new HashSet<>();
-        expected.add(new KeyValue<>(1, "value1=1.33,value2=10"));
-        expected.add(new KeyValue<>(2, "value1=2.77,value2=20"));
-
-        final Set<KeyValue<Integer, String>> result = new HashSet<>(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
-                CONSUMER_CONFIG,
-                OUTPUT,
-                expected.size()));
-        assertEquals(expected, result);
-
-        //Now delete one LHS entity such that one delete is propagated down to the output.
-        final Set<KeyValue<Integer, String>> expectedDeleted = new HashSet<>();
-        expectedDeleted.add(new KeyValue<>(1, null));
-
-        final List<KeyValue<Integer, Float>> rightTableDeleteEvents = Arrays.asList(new KeyValue<>(1, null));
-        IntegrationTestUtils.produceKeyValuesSynchronously(LEFT_TABLE, rightTableDeleteEvents, LEFT_PROD_CONF, MOCK_TIME);
-        final Set<KeyValue<Integer, String>> resultDeleted = new HashSet<>(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
-                CONSUMER_CONFIG,
-                OUTPUT,
-                expectedDeleted.size()));
-        assertEquals(expectedDeleted, resultDeleted);
-
-        //Ensure the state stores have the correct values within:
-        final Set<KeyValue<Integer, String>> expMatResults = new HashSet<>();
-        expMatResults.add(new KeyValue<>(2, "value1=2.77,value2=20"));
-        validateQueryableStoresContainExpectedKeyValues(expMatResults, currentMethodName);
-    }
-
-    @Test
-    public void doLeftJoinFromLeftThenDeleteLeftEntity() throws Exception {
-        final List<KeyValue<String, Long>> rightTableEvents = Arrays.asList(new KeyValue<>("1", 10L), new KeyValue<>("2", 20L)); //partition 0
-        IntegrationTestUtils.produceKeyValuesSynchronously(RIGHT_TABLE, rightTableEvents, RIGHT_PROD_CONF, MOCK_TIME);
-
-        final String currentMethodName = new Object() { }
-                .getClass()
-                .getEnclosingMethod()
-                .getName();
-        createAndStartStreamsApplication(currentMethodName, true);
-
-        final List<KeyValue<Integer, Float>> leftTableEvents = Arrays.asList(new KeyValue<>(1, 1.33f), new KeyValue<>(2, 2.77f));
-        IntegrationTestUtils.produceKeyValuesSynchronously(LEFT_TABLE, leftTableEvents, LEFT_PROD_CONF, MOCK_TIME);
-
-        final Set<KeyValue<Integer, String>> expected = new HashSet<>();
-        expected.add(new KeyValue<>(1, "value1=1.33,value2=10"));
-        expected.add(new KeyValue<>(2, "value1=2.77,value2=20"));
-
-        final Set<KeyValue<Integer, String>> result = new HashSet<>(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
-                CONSUMER_CONFIG,
-                OUTPUT,
-                expected.size()));
-        assertEquals(expected, result);
-
-        //Now delete one LHS entity such that one delete is propagated down to the output.
-        final Set<KeyValue<Integer, String>> expectedDeleted = new HashSet<>();
-        expectedDeleted.add(new KeyValue<>(1, null));
-
-        final List<KeyValue<Integer, Float>> rightTableDeleteEvents = Arrays.asList(new KeyValue<>(1, null));
-        IntegrationTestUtils.produceKeyValuesSynchronously(LEFT_TABLE, rightTableDeleteEvents, LEFT_PROD_CONF, MOCK_TIME);
-        final Set<KeyValue<Integer, String>> resultDeleted = new HashSet<>(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
-                CONSUMER_CONFIG,
-                OUTPUT,
-                expectedDeleted.size()));
-        assertEquals(expectedDeleted, resultDeleted);
-
-        //Ensure the state stores have the correct values within:
-        final Set<KeyValue<Integer, String>> expMatResults = new HashSet<>();
-        expMatResults.add(new KeyValue<>(2, "value1=2.77,value2=20"));
-        validateQueryableStoresContainExpectedKeyValues(expMatResults, currentMethodName);
-    }
-
-    @Test
-    public void doInnerJoinFromRightThenDeleteRightEntity() throws Exception {
-        final List<KeyValue<Integer, Float>> leftTableEvents = Arrays.asList(
-                new KeyValue<>(1, 1.33f),
-                new KeyValue<>(2, 1.77f),
-                new KeyValue<>(3, 3.77f));
-        IntegrationTestUtils.produceKeyValuesSynchronously(LEFT_TABLE, leftTableEvents, LEFT_PROD_CONF, MOCK_TIME);
-        final String currentMethodName = new Object() { }
-                .getClass()
-                .getEnclosingMethod()
-                .getName();
-        createAndStartStreamsApplication(currentMethodName, false);
-
-        final List<KeyValue<String, Long>> rightTableEvents = Arrays.asList(
-                new KeyValue<>("1", 10L),  //partition 0
-                new KeyValue<>("2", 20L),  //partition 2
-                new KeyValue<>("3", 30L)); //partition 2
-        IntegrationTestUtils.produceKeyValuesSynchronously(RIGHT_TABLE, rightTableEvents, RIGHT_PROD_CONF, MOCK_TIME);
-
-        //Ensure that the joined values exist in the output
-        final Set<KeyValue<Integer, String>> expected = new HashSet<>();
-        expected.add(new KeyValue<>(1, "value1=1.33,value2=10"));   //Will be deleted.
-        expected.add(new KeyValue<>(2, "value1=1.77,value2=10"));   //Will be deleted.
-        expected.add(new KeyValue<>(3, "value1=3.77,value2=30"));   //Will not be deleted.
-
-        final Set<KeyValue<Integer, String>> result = new HashSet<>(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
-                CONSUMER_CONFIG,
-                OUTPUT,
-                expected.size()));
-        assertEquals(expected, result);
-
-        //Now delete the RHS entity such that all matching keys have deletes propagated.
-        final Set<KeyValue<Integer, String>> expectedDeleted = new HashSet<>();
-        expectedDeleted.add(new KeyValue<>(1, null));
-        expectedDeleted.add(new KeyValue<>(2, null));
-
-        final List<KeyValue<String, Long>> rightTableDeleteEvents = Arrays.asList(new KeyValue<>("1", null));
-        IntegrationTestUtils.produceKeyValuesSynchronously(RIGHT_TABLE, rightTableDeleteEvents, RIGHT_PROD_CONF, MOCK_TIME);
-        final Set<KeyValue<Integer, String>> resultDeleted = new HashSet<>(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
-                CONSUMER_CONFIG,
-                OUTPUT,
-                expectedDeleted.size()));
-        assertEquals(expectedDeleted, resultDeleted);
-
-        //Ensure the state stores have the correct values within:
-        final Set<KeyValue<Integer, String>> expMatResults = new HashSet<>();
-        expMatResults.add(new KeyValue<>(3, "value1=3.77,value2=30"));
-        validateQueryableStoresContainExpectedKeyValues(expMatResults, currentMethodName);
+    @Parameterized.Parameters(name = "leftJoin={0}, optimization={1}")
+    public static Collection<Object[]> data() {
+        return Arrays.asList(
+            new Object[] {false, StreamsConfig.OPTIMIZE},
+            new Object[] {false, StreamsConfig.NO_OPTIMIZATION},
+            new Object[] {true, StreamsConfig.OPTIMIZE},
+            new Object[] {true, StreamsConfig.NO_OPTIMIZATION}
+        );
     }
 
     @Test
-    public void doLeftJoinFromRightThenDeleteRightEntity() throws Exception {
-        final List<KeyValue<Integer, Float>> leftTableEvents = Arrays.asList(
-                new KeyValue<>(1, 1.33f),
-                new KeyValue<>(2, 1.77f),
-                new KeyValue<>(3, 3.77f));
-        IntegrationTestUtils.produceKeyValuesSynchronously(LEFT_TABLE, leftTableEvents, LEFT_PROD_CONF, MOCK_TIME);
-        final String currentMethodName = new Object() { }
-                .getClass()
-                .getEnclosingMethod()
-                .getName();
-        createAndStartStreamsApplication(currentMethodName, true);
-
-        final List<KeyValue<String, Long>> rightTableEvents = Arrays.asList(
-                new KeyValue<>("1", 10L),  //partition 0
-                new KeyValue<>("2", 20L),  //partition 2
-                new KeyValue<>("3", 30L)); //partition 2
-        IntegrationTestUtils.produceKeyValuesSynchronously(RIGHT_TABLE, rightTableEvents, RIGHT_PROD_CONF, MOCK_TIME);
-
-        //Ensure that the joined values exist in the output
-        final Set<KeyValue<Integer, String>> expected = new HashSet<>();
-        expected.add(new KeyValue<>(1, "value1=1.33,value2=10"));   //Will be deleted.
-        expected.add(new KeyValue<>(2, "value1=1.77,value2=10"));   //Will be deleted.
-        expected.add(new KeyValue<>(3, "value1=3.77,value2=30"));   //Will not be deleted.
-        //final HashSet<KeyValue<Integer, String>> expected = new HashSet<>(buildExpectedResults(leftTableEvents, rightTableEvents, false));
-
-        final Set<KeyValue<Integer, String>> result = new HashSet<>(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
-                CONSUMER_CONFIG,
-                OUTPUT,
-                expected.size()));
-        assertEquals(expected, result);
-
-        //Now delete the RHS entity such that all matching keys have deletes propagated.
-        //This will exercise the joiner with the RHS value == null.
-        final Set<KeyValue<Integer, String>> expectedDeleted = new HashSet<>();
-        expectedDeleted.add(new KeyValue<>(1, "value1=1.33,value2=null"));
-        expectedDeleted.add(new KeyValue<>(2, "value1=1.77,value2=null"));
-
-        final List<KeyValue<String, Long>> rightTableDeleteEvents = Arrays.asList(new KeyValue<>("1", null));
-        IntegrationTestUtils.produceKeyValuesSynchronously(RIGHT_TABLE, rightTableDeleteEvents, RIGHT_PROD_CONF, MOCK_TIME);
-        final Set<KeyValue<Integer, String>> resultDeleted = new HashSet<>(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
-                CONSUMER_CONFIG,
-                OUTPUT,
-                expectedDeleted.size()));
-        assertEquals(expectedDeleted, resultDeleted);
-
-        //Ensure the state stores have the correct values within:
-        final Set<KeyValue<Integer, String>> expMatResults = new HashSet<>();
-        expMatResults.add(new KeyValue<>(1, "value1=1.33,value2=null"));
-        expMatResults.add(new KeyValue<>(2, "value1=1.77,value2=null"));
-        expMatResults.add(new KeyValue<>(3, "value1=3.77,value2=30"));
-        validateQueryableStoresContainExpectedKeyValues(expMatResults, currentMethodName);
+    public void doJoinFromLeftThenDeleteLeftEntity() {
+        final Topology topology = getTopology(streamsConfig, "store", leftJoin);
+        try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) {
+            final TestInputTopic<String, String> right = driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer());
+            final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
+            final TestOutputTopic<String, String> outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());
+            final KeyValueStore<String, String> store = driver.getKeyValueStore("store");
+
+            // Pre-populate the RHS records. This test is all about what happens when we add/remove LHS records
+            right.pipeInput("rhs1", "rhsValue1");
+            right.pipeInput("rhs2", "rhsValue2");
+            right.pipeInput("rhs3", "rhsValue3"); // this unreferenced FK won't show up in any results
+
+            assertThat(
+                outputTopic.readKeyValuesToMap(),
+                is(emptyMap())
+            );
+            assertThat(
+                asMap(store),
+                is(emptyMap())
+            );
+
+            left.pipeInput("lhs1", "lhsValue1|rhs1");
+            left.pipeInput("lhs2", "lhsValue2|rhs2");
+
+            {
+                final Map<String, String> expected = mkMap(
+                    mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
+                    mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)")
+                );
+                assertThat(
+                    outputTopic.readKeyValuesToMap(),
+                    is(expected)
+                );
+                assertThat(
+                    asMap(store),
+                    is(expected)
+                );
+            }
+
+            // Add another reference to an existing FK
+            left.pipeInput("lhs3", "lhsValue3|rhs1");
+            {
+                assertThat(
+                    outputTopic.readKeyValuesToMap(),
+                    is(mkMap(
+                        mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)")
+                    ))
+                );
+                assertThat(
+                    asMap(store),
+                    is(mkMap(
+                        mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
+                        mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)"),
+                        mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)")
+                    ))
+                );
+            }
+            // Now delete one LHS entity such that one delete is propagated down to the output.
+
+            left.pipeInput("lhs1", (String) null);
+            assertThat(
+                outputTopic.readKeyValuesToMap(),
+                is(mkMap(
+                    mkEntry("lhs1", null)
+                ))
+            );
+            assertThat(
+                asMap(store),
+                is(mkMap(
+                    mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)"),
+                    mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)")
+                ))
+            );
+        }
     }
 
     @Test
-    public void doInnerJoinProduceNullsWhenValueHasNonMatchingForeignKey() throws Exception {
-        //There is no matching extracted foreign-key of 8 anywhere. Should not produce any output for INNER JOIN, only
-        //because the state is transitioning from oldValue=null -> newValue=8.33.
-        List<KeyValue<Integer, Float>> leftTableEvents = Arrays.asList(new KeyValue<>(1, 8.33f));
-        final List<KeyValue<String, Long>> rightTableEvents = Arrays.asList(new KeyValue<>("1", 10L)); //partition 0
-        IntegrationTestUtils.produceKeyValuesSynchronously(LEFT_TABLE, leftTableEvents, LEFT_PROD_CONF, MOCK_TIME);
-        IntegrationTestUtils.produceKeyValuesSynchronously(RIGHT_TABLE, rightTableEvents, RIGHT_PROD_CONF, MOCK_TIME);
-
-        final String currentMethodName = new Object() { }
-                .getClass()
-                .getEnclosingMethod()
-                .getName();
-        createAndStartStreamsApplication(currentMethodName, false);
-
-        //There is also no matching extracted foreign-key for 18 anywhere. This WILL produce a null output for INNER JOIN,
-        //since we cannot remember (maintain state) that the FK=8 also produced a null result.
-        leftTableEvents = Arrays.asList(new KeyValue<>(1, 18.00f));
-        IntegrationTestUtils.produceKeyValuesSynchronously(LEFT_TABLE, leftTableEvents, LEFT_PROD_CONF, MOCK_TIME);
-
-        final List<KeyValue<Integer, String>> expected = new LinkedList<>();
-        expected.add(new KeyValue<>(1, null));
-
-        final List<KeyValue<Integer, String>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
-                CONSUMER_CONFIG,
-                OUTPUT,
-                expected.size());
-        assertEquals(result, expected);
-
-        //Another change to FK that has no match on the RHS will result in another null
-        leftTableEvents = Arrays.asList(new KeyValue<>(1, 100.00f));
-        IntegrationTestUtils.produceKeyValuesSynchronously(LEFT_TABLE, leftTableEvents, LEFT_PROD_CONF, MOCK_TIME);
-        //Consume the next event - note that we are using the same consumerGroupId, so this will consume a new event.
-        final List<KeyValue<Integer, String>> result2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
-                CONSUMER_CONFIG,
-                OUTPUT,
-                expected.size());
-        assertEquals(result2, expected);
-
-        //Now set the LHS event FK to match the rightTableEvents key-value.
-        leftTableEvents = Arrays.asList(new KeyValue<>(1, 1.11f));
-
-        final List<KeyValue<Integer, String>> expected3 = new LinkedList<>();
-        expected3.add(new KeyValue<>(1, "value1=1.11,value2=10"));
-
-        IntegrationTestUtils.produceKeyValuesSynchronously(LEFT_TABLE, leftTableEvents, LEFT_PROD_CONF, MOCK_TIME);
-        final List<KeyValue<Integer, String>> result3 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
-                CONSUMER_CONFIG,
-                OUTPUT,
-                expected3.size());
-        assertEquals(result3, expected3);
-
-        //Ensure the state stores have the correct values within:
-        final Set<KeyValue<Integer, String>> expMatResults = new HashSet<>();
-        expMatResults.add(new KeyValue<>(1, "value1=1.11,value2=10"));
-        validateQueryableStoresContainExpectedKeyValues(expMatResults, currentMethodName);
+    public void doJoinFromRightThenDeleteRightEntity() {
+        final Topology topology = getTopology(streamsConfig, "store", leftJoin);
+        try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) {
+            final TestInputTopic<String, String> right = driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer());
+            final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
+            final TestOutputTopic<String, String> outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());
+            final KeyValueStore<String, String> store = driver.getKeyValueStore("store");
+
+            // Pre-populate the LHS records. This test is all about what happens when we add/remove RHS records
+            left.pipeInput("lhs1", "lhsValue1|rhs1");
+            left.pipeInput("lhs2", "lhsValue2|rhs2");
+            left.pipeInput("lhs3", "lhsValue3|rhs1");
+
+            assertThat(
+                outputTopic.readKeyValuesToMap(),
+                is(leftJoin
+                       ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,null)"),
+                               mkEntry("lhs2", "(lhsValue2|rhs2,null)"),
+                               mkEntry("lhs3", "(lhsValue3|rhs1,null)"))
+                       : emptyMap()
+                )
+            );
+            assertThat(
+                asMap(store),
+                is(leftJoin
+                       ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,null)"),
+                               mkEntry("lhs2", "(lhsValue2|rhs2,null)"),
+                               mkEntry("lhs3", "(lhsValue3|rhs1,null)"))
+                       : emptyMap()
+                )
+            );
+
+            right.pipeInput("rhs1", "rhsValue1");
+
+            assertThat(
+                outputTopic.readKeyValuesToMap(),
+                is(mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
+                         mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)"))
+                )
+            );
+            assertThat(
+                asMap(store),
+                is(leftJoin
+                       ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
+                               mkEntry("lhs2", "(lhsValue2|rhs2,null)"),
+                               mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)"))
+
+                       : mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
+                               mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)"))
+                )
+            );
+
+            right.pipeInput("rhs2", "rhsValue2");
+
+            assertThat(
+                outputTopic.readKeyValuesToMap(),
+                is(mkMap(mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)")))
+            );
+            assertThat(
+                asMap(store),
+                is(mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
+                         mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)"),
+                         mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)"))
+                )
+            );
+
+            right.pipeInput("rhs3", "rhsValue3"); // this unreferenced FK won't show up in any results
+
+            assertThat(
+                outputTopic.readKeyValuesToMap(),
+                is(emptyMap())
+            );
+            assertThat(
+                asMap(store),
+                is(mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
+                         mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)"),
+                         mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)"))
+                )
+            );
+
+            // Now delete the RHS entity such that all matching keys have deletes propagated.
+            right.pipeInput("rhs1", (String) null);
+
+            assertThat(
+                outputTopic.readKeyValuesToMap(),
+                is(mkMap(mkEntry("lhs1", leftJoin ? "(lhsValue1|rhs1,null)" : null),
+                         mkEntry("lhs3", leftJoin ? "(lhsValue3|rhs1,null)" : null))
+                )
+            );
+            assertThat(
+                asMap(store),
+                is(leftJoin
+                       ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,null)"),
+                               mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)"),
+                               mkEntry("lhs3", "(lhsValue3|rhs1,null)"))
+
+                       : mkMap(mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)"))
+                )
+            );
+        }
     }
 
     @Test
-    public void doLeftJoinProduceJoinedResultsWhenValueHasNonMatchingForeignKey() throws Exception {
-        //There is no matching extracted foreign-key of 8 anywhere.
-        //However, it will still run the join function since this is LEFT join.
-        List<KeyValue<Integer, Float>> leftTableEvents = Arrays.asList(new KeyValue<>(1, 8.33f));
-        final List<KeyValue<String, Long>> rightTableEvents = Arrays.asList(new KeyValue<>("1", 10L)); //partition 0
-        IntegrationTestUtils.produceKeyValuesSynchronously(LEFT_TABLE, leftTableEvents, LEFT_PROD_CONF, MOCK_TIME);
-        IntegrationTestUtils.produceKeyValuesSynchronously(RIGHT_TABLE, rightTableEvents, RIGHT_PROD_CONF, MOCK_TIME);
-
-        final String currentMethodName = new Object() { }
-                .getClass()
-                .getEnclosingMethod()
-                .getName();
-        createAndStartStreamsApplication(currentMethodName, true);
-
-        final List<KeyValue<Integer, String>> expected = new LinkedList<>();
-        expected.add(new KeyValue<>(1, "value1=8.33,value2=null"));
-        final List<KeyValue<Integer, String>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
-                CONSUMER_CONFIG,
-                OUTPUT,
-                expected.size());
-        assertEquals(expected, result);
-
-        //There is also no matching extracted foreign-key for 18 anywhere.
-        //However, it will still run the join function since this if LEFT join.
-        leftTableEvents = Arrays.asList(new KeyValue<>(1, 18.0f));
-        IntegrationTestUtils.produceKeyValuesSynchronously(LEFT_TABLE, leftTableEvents, LEFT_PROD_CONF, MOCK_TIME);
-
-        final List<KeyValue<Integer, String>> expected2 = new LinkedList<>();
-        expected2.add(new KeyValue<>(1, "value1=18.0,value2=null"));
-        final List<KeyValue<Integer, String>> result2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
-                CONSUMER_CONFIG,
-                OUTPUT,
-                expected2.size());
-        assertEquals(expected2, result2);
-
-
-        leftTableEvents = Arrays.asList(new KeyValue<>(1, 1.11f));
-        IntegrationTestUtils.produceKeyValuesSynchronously(LEFT_TABLE, leftTableEvents, LEFT_PROD_CONF, MOCK_TIME);
-
-        final List<KeyValue<Integer, String>> expected3 = new LinkedList<>();
-        expected3.add(new KeyValue<>(1, "value1=1.11,value2=10"));
-        final List<KeyValue<Integer, String>> result3 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
-                CONSUMER_CONFIG,
-                OUTPUT,
-                expected3.size());
-        assertEquals(expected3, result3);
-
-        //Ensure the state stores have the correct values within:
-        final Set<KeyValue<Integer, String>> expMatResults = new HashSet<>();
-        expMatResults.add(new KeyValue<>(1, "value1=1.11,value2=10"));
-        validateQueryableStoresContainExpectedKeyValues(expMatResults, currentMethodName);
+    public void shouldEmitTombstoneWhenDeletingNonJoiningRecords() {
+        final Topology topology = getTopology(streamsConfig, "store", leftJoin);
+        try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) {
+            final TestInputTopic<String, String> right = driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer());
+            final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
+            final TestOutputTopic<String, String> outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());
+            final KeyValueStore<String, String> store = driver.getKeyValueStore("store");
+
+            left.pipeInput("lhs1", "lhsValue1|rhs1");
+
+            {
+                final Map<String, String> expected =
+                    leftJoin ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,null)")) : emptyMap();
+                assertThat(
+                    outputTopic.readKeyValuesToMap(),
+                    is(expected)
+                );
+                assertThat(
+                    asMap(store),
+                    is(expected)
+                );
+            }
+
+            // Deleting a non-joining record produces an unnecessary tombstone for inner joins, because
+            // it's not possible to know whether a result was previously emitted.
+            // For the left join, the tombstone is necessary.
+            left.pipeInput("lhs1", (String) null);
+            {
+                assertThat(
+                    outputTopic.readKeyValuesToMap(),
+                    is(Utils.<String, String>mkMap(mkEntry("lhs1", null)))
+                );
+                assertThat(
+                    asMap(store),
+                    is(emptyMap())
+                );
+            }
+
+            // Deleting a non-existing record is idempotent
+            left.pipeInput("lhs1", (String) null);
+            {
+                assertThat(
+                    outputTopic.readKeyValuesToMap(),
+                    is(emptyMap())
+                );
+                assertThat(
+                    asMap(store),
+                    is(emptyMap())
+                );
+            }
+        }
     }
 
     @Test
-    public void doInnerJoinFilterOutRapidlyChangingForeignKeyValues() throws Exception {
-        final List<KeyValue<Integer, Float>> leftTableEvents = Arrays.asList(
-                new KeyValue<>(1, 1.33f),
-                new KeyValue<>(2, 2.22f),
-                new KeyValue<>(3, -1.22f), //Won't be joined in
-                new KeyValue<>(4, -2.22f), //Won't be joined in
-                new KeyValue<>(5, 2.22f)
-        );
-
-        //Partitions pre-computed using the default Murmur2 hash, just to ensure that all 3 partitions will be exercised.
-        final List<KeyValue<String, Long>> rightTableEvents = Arrays.asList(
-                new KeyValue<>("0", 0L),  //partition 2
-                new KeyValue<>("1", 10L), //partition 0
-                new KeyValue<>("2", 20L), //partition 2
-                new KeyValue<>("3", 30L), //partition 2
-                new KeyValue<>("4", 40L), //partition 1
-                new KeyValue<>("5", 50L), //partition 0
-                new KeyValue<>("6", 60L), //partition 1
-                new KeyValue<>("7", 70L), //partition 0
-                new KeyValue<>("8", 80L), //partition 0
-                new KeyValue<>("9", 90L)  //partition 2
-        );
-
-        IntegrationTestUtils.produceKeyValuesSynchronously(LEFT_TABLE, leftTableEvents, LEFT_PROD_CONF, MOCK_TIME);
-        IntegrationTestUtils.produceKeyValuesSynchronously(RIGHT_TABLE, rightTableEvents, RIGHT_PROD_CONF, MOCK_TIME);
-
-        final Set<KeyValue<Integer, String>> expected = new HashSet<>();
-        expected.add(new KeyValue<>(1, "value1=1.33,value2=10"));
-        expected.add(new KeyValue<>(2, "value1=2.22,value2=20"));
-        expected.add(new KeyValue<>(5, "value1=2.22,value2=20"));
-
-        final String currentMethodName = new Object() { }
-                .getClass()
-                .getEnclosingMethod()
-                .getName();
-        createAndStartStreamsApplication(currentMethodName, false);
-
-        final Set<KeyValue<Integer, String>> result = new HashSet<>(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
-                CONSUMER_CONFIG,
-                OUTPUT,
-                expected.size()));
-
-        assertEquals(result, expected);
-
-        //Rapidly change the foreign key, to validate that the hashing prevents incorrect results from being output,
-        //and that eventually the correct value is output.
-        final List<KeyValue<Integer, Float>> table1ForeignKeyChange = Arrays.asList(
-                new KeyValue<>(3, 2.22f), //Partition 2
-                new KeyValue<>(3, 3.33f), //Partition 2
-                new KeyValue<>(3, 4.44f), //Partition 1
-                new KeyValue<>(3, 5.55f), //Partition 0
-                new KeyValue<>(3, 9.99f), //Partition 2
-                new KeyValue<>(3, 8.88f), //Partition 0
-                new KeyValue<>(3, 0.23f), //Partition 2
-                new KeyValue<>(3, 7.77f), //Partition 0
-                new KeyValue<>(3, 6.66f), //Partition 1
-                new KeyValue<>(3, 1.11f)  //Partition 0 - This will be the final result.
-        );
-
-        IntegrationTestUtils.produceKeyValuesSynchronously(LEFT_TABLE, table1ForeignKeyChange, LEFT_PROD_CONF, MOCK_TIME);
-        final List<KeyValue<Integer, String>> resultTwo = IntegrationTestUtils.readKeyValues(OUTPUT, CONSUMER_CONFIG, 15 * 1000L, Integer.MAX_VALUE);
-
-        final List<KeyValue<Integer, String>> expectedTwo = new LinkedList<>();
-        expectedTwo.add(new KeyValue<>(3, "value1=1.11,value2=10"));
-        assertArrayEquals(resultTwo.toArray(), expectedTwo.toArray());
-
-        //Ensure the state stores have the correct values within:
-        final Set<KeyValue<Integer, String>> expMatResults = new HashSet<>();
-        expMatResults.addAll(expected);
-        expMatResults.addAll(expectedTwo);
-        validateQueryableStoresContainExpectedKeyValues(expMatResults, currentMethodName);
+    public void shouldNotEmitTombstonesWhenDeletingNonExistingRecords() {
+        final Topology topology = getTopology(streamsConfig, "store", leftJoin);
+        try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) {
+            final TestInputTopic<String, String> right = driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer());
+            final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
+            final TestOutputTopic<String, String> outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());
+            final KeyValueStore<String, String> store = driver.getKeyValueStore("store");
+
+            // Deleting a record that never existed doesn't need to emit tombstones.
+            left.pipeInput("lhs1", (String) null);
+            {
+                assertThat(
+                    outputTopic.readKeyValuesToMap(),
+                    is(emptyMap())
+                );
+                assertThat(
+                    asMap(store),
+                    is(emptyMap())
+                );
+            }
+        }
     }
 
     @Test
-    public void doLeftJoinFilterOutRapidlyChangingForeignKeyValues() throws Exception {
-        final List<KeyValue<Integer, Float>> leftTableEvents = Arrays.asList(
-                new KeyValue<>(1, 1.33f),
-                new KeyValue<>(2, 2.22f)
-        );
-
-        //Partitions pre-computed using the default Murmur2 hash, just to ensure that all 3 partitions will be exercised.
-        final List<KeyValue<String, Long>> rightTableEvents = Arrays.asList(
-                new KeyValue<>("0", 0L),  //partition 2
-                new KeyValue<>("1", 10L), //partition 0
-                new KeyValue<>("2", 20L), //partition 2
-                new KeyValue<>("3", 30L), //partition 2
-                new KeyValue<>("4", 40L), //partition 1
-                new KeyValue<>("5", 50L), //partition 0
-                new KeyValue<>("6", 60L), //partition 1
-                new KeyValue<>("7", 70L), //partition 0
-                new KeyValue<>("8", 80L), //partition 0
-                new KeyValue<>("9", 90L)  //partition 2
-        );
-
-        IntegrationTestUtils.produceKeyValuesSynchronously(LEFT_TABLE, leftTableEvents, LEFT_PROD_CONF, MOCK_TIME);
-        IntegrationTestUtils.produceKeyValuesSynchronously(RIGHT_TABLE, rightTableEvents, RIGHT_PROD_CONF, MOCK_TIME);
-
-        final Set<KeyValue<Integer, String>> expected = new HashSet<>();
-        expected.add(new KeyValue<>(1, "value1=1.33,value2=10"));
-        expected.add(new KeyValue<>(2, "value1=2.22,value2=20"));
-
-        final String currentMethodName = new Object() { }
-                .getClass()
-                .getEnclosingMethod()
-                .getName();
-        createAndStartStreamsApplication(currentMethodName, false);
-
-        final Set<KeyValue<Integer, String>> result = new HashSet<>(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
-                CONSUMER_CONFIG,
-                OUTPUT,
-                expected.size()));
-
-        assertEquals(result, expected);
-
-        //Rapidly change the foreign key, to validate that the hashing prevents incorrect results from being output,
-        //and that eventually the correct value is output.
-        final List<KeyValue<Integer, Float>> table1ForeignKeyChange = Arrays.asList(
-                new KeyValue<>(3, 2.22f), //Partition 2
-                new KeyValue<>(3, 3.33f), //Partition 2
-                new KeyValue<>(3, 4.44f), //Partition 1
-                new KeyValue<>(3, 5.55f), //Partition 0
-                new KeyValue<>(3, 9.99f), //Partition 2
-                new KeyValue<>(3, 8.88f), //Partition 0
-                new KeyValue<>(3, 0.23f), //Partition 2
-                new KeyValue<>(3, 7.77f), //Partition 0
-                new KeyValue<>(3, 6.66f), //Partition 1
-                new KeyValue<>(3, 1.11f)  //Partition 0 - This will be the final result.
-        );
-
-        IntegrationTestUtils.produceKeyValuesSynchronously(LEFT_TABLE, table1ForeignKeyChange, LEFT_PROD_CONF, MOCK_TIME);
-        final List<KeyValue<Integer, String>> resultTwo = IntegrationTestUtils.readKeyValues(OUTPUT, CONSUMER_CONFIG, 15 * 1000L, Integer.MAX_VALUE);
-
-        final List<KeyValue<Integer, String>> expectedTwo = new LinkedList<>();
-        expectedTwo.add(new KeyValue<>(3, "value1=1.11,value2=10"));
-
-        assertArrayEquals(resultTwo.toArray(), expectedTwo.toArray());
-
-        //Ensure the state stores have the correct values within:
-        final Set<KeyValue<Integer, String>> expMatResults = new HashSet<>();
-        expMatResults.addAll(expected);
-        expMatResults.addAll(expectedTwo);
-        validateQueryableStoresContainExpectedKeyValues(expMatResults, currentMethodName);
+    public void joinShouldProduceNullsWhenValueHasNonMatchingForeignKey() {
+        final Topology topology = getTopology(streamsConfig, "store", leftJoin);
+        try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) {
+            final TestInputTopic<String, String> right = driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer());
+            final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
+            final TestOutputTopic<String, String> outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());
+            final KeyValueStore<String, String> store = driver.getKeyValueStore("store");
+
+            left.pipeInput("lhs1", "lhsValue1|rhs1");
+            // no output for a new inner join on a non-existent FK
+            // the left join of course emits the half-joined output
+            assertThat(
+                outputTopic.readKeyValuesToMap(),
+                is(leftJoin ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,null)")) : emptyMap())
+            );
+            assertThat(
+                asMap(store),
+                is(leftJoin ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,null)")) : emptyMap())
+            );
+            // "moving" our subscription to another non-existent FK results in an unnecessary tombstone for inner join,
+            // since it is impossible to know whether the prior FK existed or not (and thus whether any results have
+            // previously been emitted)
+            // The left join emits a _necessary_ update (since the lhs record has actually changed)
+            left.pipeInput("lhs1", "lhsValue1|rhs2");
+            assertThat(
+                outputTopic.readKeyValuesToMap(),
+                is(mkMap(
+                    mkEntry("lhs1", leftJoin ? "(lhsValue1|rhs2,null)" : null)
+                ))
+            );
+            assertThat(
+                asMap(store),
+                is(leftJoin ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs2,null)")) : emptyMap())
+            );
+            // of course, moving it again to yet another non-existent FK has the same effect
+            left.pipeInput("lhs1", "lhsValue1|rhs3");
+            assertThat(
+                outputTopic.readKeyValuesToMap(),
+                is(mkMap(
+                    mkEntry("lhs1", leftJoin ? "(lhsValue1|rhs3,null)" : null)
+                ))
+            );
+            assertThat(
+                asMap(store),
+                is(leftJoin ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs3,null)")) : emptyMap())
+            );
+
+            // Adding an RHS record now, so that we can demonstrate "moving" from a non-existent FK to an existent one
+            // This RHS key was previously referenced, but it's not referenced now, so adding this record should
+            // result in no changes whatsoever.
+            right.pipeInput("rhs1", "rhsValue1");
+            assertThat(
+                outputTopic.readKeyValuesToMap(),
+                is(emptyMap())
+            );
+            assertThat(
+                asMap(store),
+                is(leftJoin ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs3,null)")) : emptyMap())
+            );
+
+            // now, we change to a FK that exists, and see that the join completes
+            left.pipeInput("lhs1", "lhsValue1|rhs1");
+            assertThat(
+                outputTopic.readKeyValuesToMap(),
+                is(mkMap(
+                    mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)")
+                ))
+            );
+            assertThat(
+                asMap(store),
+                is(mkMap(
+                    mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)")
+                ))
+            );
+
+            // but if we update it again to a non-existent one, we'll get a tombstone for the inner join, and the
+            // left join updates appropriately.
+            left.pipeInput("lhs1", "lhsValue1|rhs2");
+            assertThat(
+                outputTopic.readKeyValuesToMap(),
+                is(mkMap(
+                    mkEntry("lhs1", leftJoin ? "(lhsValue1|rhs2,null)" : null)
+                ))
+            );
+            assertThat(
+                asMap(store),
+                is(leftJoin ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs2,null)")) : emptyMap())
+            );
+        }
     }
 
-    private void createAndStartStreamsApplication(final String queryableStoreName, final boolean leftJoin) {
-        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "ktable-ktable-joinOnForeignKey-" + queryableStoreName);
-        streams = prepareTopology(queryableStoreName, leftJoin);
-        streamsTwo = prepareTopology(queryableStoreName, leftJoin);
-        streamsThree = prepareTopology(queryableStoreName, leftJoin);
-        streams.start();
-        streamsTwo.start();
-        streamsThree.start();
+    private static Map<String, String> asMap(final KeyValueStore<String, String> store) {
+        final HashMap<String, String> result = new HashMap<>();
+        store.all().forEachRemaining(kv -> result.put(kv.key, kv.value));
+        return result;
     }
 
-    // These are hardwired into the test logic for readability sake.
-    // Do not change unless you want to change all the test results as well.
-    private ValueJoiner<Float, Long, String> joiner = (value1, value2) -> "value1=" + value1 + ",value2=" + value2;
-    //Do not change. See above comment.
-    private Function<Float, String> tableOneKeyExtractor = value -> Integer.toString((int) value.floatValue());
-
-    private void validateQueryableStoresContainExpectedKeyValues(final Set<KeyValue<Integer, String>> expectedResult,
-                                                                 final String queryableStoreName) {
-        final ReadOnlyKeyValueStore<Integer, String> myJoinStoreOne = streams.store(queryableStoreName,
-                QueryableStoreTypes.keyValueStore());
-
-        final ReadOnlyKeyValueStore<Integer, String> myJoinStoreTwo = streamsTwo.store(queryableStoreName,
-                QueryableStoreTypes.keyValueStore());
-
-        final ReadOnlyKeyValueStore<Integer, String> myJoinStoreThree = streamsThree.store(queryableStoreName,
-                QueryableStoreTypes.keyValueStore());
-
-        // store only keeps last set of values, not entire stream of value changes
-        final Map<Integer, String> expectedInStore = new HashMap<>();
-        for (final KeyValue<Integer, String> expected : expectedResult) {
-            expectedInStore.put(expected.key, expected.value);
-        }
-
-        // depending on partition assignment, the values will be in one of the three stream clients.
-        for (final Map.Entry<Integer, String> expected : expectedInStore.entrySet()) {
-            final String one = myJoinStoreOne.get(expected.getKey());
-            final String two = myJoinStoreTwo.get(expected.getKey());
-            final String three = myJoinStoreThree.get(expected.getKey());
-
-            String result;
-            if (one != null)
-                result = one;
-            else if (two != null)
-                result = two;
-            else if (three != null)
-                result = three;
-            else
-                throw new RuntimeException("Cannot find key " + expected.getKey() + " in any of the state stores");
-            assertEquals(expected.getValue(), result);
-        }
-
-        //Merge all the iterators together to ensure that their sum equals the total set of expected elements.
-        final KeyValueIterator<Integer, String> allOne = myJoinStoreOne.all();
-        final KeyValueIterator<Integer, String> allTwo = myJoinStoreTwo.all();
-        final KeyValueIterator<Integer, String> allThree = myJoinStoreThree.all();
-
-        final List<KeyValue<Integer, String>> all = new LinkedList<>();
-
-        while (allOne.hasNext()) {
-            all.add(allOne.next());
-        }
-        while (allTwo.hasNext()) {
-            all.add(allTwo.next());
-        }
-        while (allThree.hasNext()) {
-            all.add(allThree.next());
-        }
-        allOne.close();
-        allTwo.close();
-        allThree.close();
+    private static Topology getTopology(final Properties streamsConfig,
+                                        final String queryableStoreName,
+                                        final boolean leftJoin) {
+        final StreamsBuilder builder = new StreamsBuilder();
 
-        for (final KeyValue<Integer, String> elem : all) {
-            assertTrue(expectedResult.contains(elem));
-        }
-    }
+        final KTable<String, String> left = builder.table(LEFT_TABLE, Consumed.with(Serdes.String(), Serdes.String()));
+        final KTable<String, String> right = builder.table(RIGHT_TABLE, Consumed.with(Serdes.String(), Serdes.String()));
 
-    private KafkaStreams prepareTopology(final String queryableStoreName, final boolean leftJoin) {
-        final StreamsBuilder builder = new StreamsBuilder();
+        final Materialized<String, String, KeyValueStore<Bytes, byte[]>> materialized =
+            Materialized.<String, String>as(Stores.inMemoryKeyValueStore(queryableStoreName))
+                .withKeySerde(Serdes.String())
+                .withValueSerde(Serdes.String())
+                .withCachingDisabled();
 
-        final KTable<Integer, Float> left = builder.table(LEFT_TABLE, Consumed.with(Serdes.Integer(), Serdes.Float()));
-        final KTable<String, Long> right = builder.table(RIGHT_TABLE, Consumed.with(Serdes.String(), Serdes.Long()));
-
-        final Materialized<Integer, String, KeyValueStore<Bytes, byte[]>> materialized;
-        if (queryableStoreName != null) {
-            materialized = Materialized.<Integer, String, KeyValueStore<Bytes, byte[]>>as(queryableStoreName)
-                    .withKeySerde(Serdes.Integer())
-                    .withValueSerde(Serdes.String())
-                    .withCachingDisabled();
-        } else {
-            throw new RuntimeException("Current implementation of join on foreign key requires a materialized store");
-        }
+        final Function<String, String> extractor = value -> value.split("\\|")[1];
+        final ValueJoiner<String, String, String> joiner = (value1, value2) -> "(" + value1 + "," + value2 + ")";
 
         if (leftJoin)
-            left.leftJoin(right, tableOneKeyExtractor, joiner, Named.as("customName"), materialized)
+            left.leftJoin(right,
+                          extractor,
+                          joiner,
+                          materialized)
                 .toStream()
-                .to(OUTPUT, Produced.with(Serdes.Integer(), Serdes.String()));
+                .to(OUTPUT, Produced.with(Serdes.String(), Serdes.String()));
         else
-            left.join(right, tableOneKeyExtractor, joiner, materialized)
+            left.join(right,
+                      extractor,
+                      joiner,
+                      materialized)
                 .toStream()
-                .to(OUTPUT, Produced.with(Serdes.Integer(), Serdes.String()));
-
-        final Topology topology = builder.build(streamsConfig);
+                .to(OUTPUT, Produced.with(Serdes.String(), Serdes.String()));
 
-        return new KafkaStreams(topology, streamsConfig);
+        return builder.build(streamsConfig);
     }
 }
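
For context on the migration, here is a minimal sketch of how a topology built by getTopology(...) above can be driven with TopologyTestDriver instead of the three live Streams instances removed by this patch. It is not part of the commit: the application id, store name, and record values are illustrative, and it assumes the LEFT_TABLE/RIGHT_TABLE topic constants, the asMap() helper, and the usual hamcrest and Utils.mkMap/mkEntry static imports available in this test class.

    // Illustrative only; "fk-join-ttd-sketch", "store", and the record values are made up.
    final Properties streamsConfig = new Properties();
    streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "fk-join-ttd-sketch");
    streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "ignored:9092");

    final Topology topology = getTopology(streamsConfig, "store", false);
    try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) {
        final TestInputTopic<String, String> right =
            driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer());
        final TestInputTopic<String, String> left =
            driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());

        right.pipeInput("rhs1", "rhsValue1");
        // The extractor above treats the part after '|' as the foreign key.
        left.pipeInput("lhs1", "lhsValue1|rhs1");

        // Processing is synchronous, so the materialized join result can be checked immediately.
        final KeyValueStore<String, String> store = driver.getKeyValueStore("store");
        assertThat(asMap(store), is(mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"))));
    }

Because TopologyTestDriver also drives the join's internal subscription and response topics in the same synchronous loop, no polling, retries, or partition-assignment reasoning is needed, which is what removes the flakiness this change targets.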
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplierTest.java
new file mode 100644
index 0000000..3ec19de
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplierTest.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
+
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.kstream.internals.KTableValueGetter;
+import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier;
+import org.apache.kafka.streams.processor.MockProcessorContext;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.internals.Murmur3;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.collection.IsEmptyCollection.empty;
+
+public class SubscriptionResolverJoinProcessorSupplierTest {
+    private static final StringSerializer STRING_SERIALIZER = new StringSerializer();
+    private static final ValueJoiner<String, String, String> JOINER =
+        (value1, value2) -> "(" + value1 + "," + value2 + ")";
+
+    private static class TestKTableValueGetterSupplier<K, V> implements KTableValueGetterSupplier<K, V> {
+        private final Map<K, V> map = new HashMap<>();
+
+        @Override
+        public KTableValueGetter<K, V> get() {
+            return new KTableValueGetter<K, V>() {
+                @Override
+                public void init(final ProcessorContext context) {
+                }
+
+                @Override
+                public ValueAndTimestamp<V> get(final K key) {
+                    return ValueAndTimestamp.make(map.get(key), -1);
+                }
+
+                @Override
+                public void close() {}
+            };
+        }
+
+        @Override
+        public String[] storeNames() {
+            return new String[0];
+        }
+
+        void put(final K key, final V value) {
+            map.put(key, value);
+        }
+    }
+
+    @Test
+    public void shouldNotForwardWhenHashDoesNotMatch() {
+        final TestKTableValueGetterSupplier<String, String> valueGetterSupplier =
+            new TestKTableValueGetterSupplier<>();
+        final boolean leftJoin = false;
+        final SubscriptionResolverJoinProcessorSupplier<String, String, String, String> processorSupplier =
+            new SubscriptionResolverJoinProcessorSupplier<>(
+                valueGetterSupplier,
+                STRING_SERIALIZER,
+                JOINER,
+                leftJoin
+            );
+        final Processor<String, SubscriptionResponseWrapper<String>> processor = processorSupplier.get();
+        final MockProcessorContext context = new MockProcessorContext();
+        processor.init(context);
+        context.setRecordMetadata("topic", 0, 0, new RecordHeaders(), 0);
+
+        valueGetterSupplier.put("lhs1", "lhsValue");
+        final long[] oldHash = Murmur3.hash128(STRING_SERIALIZER.serialize("topic-join-resolver", "oldLhsValue"));
+        processor.process("lhs1", new SubscriptionResponseWrapper<>(oldHash, "rhsValue"));
+        final List<MockProcessorContext.CapturedForward> forwarded = context.forwarded();
+        assertThat(forwarded, empty());
+    }
+
+    @Test
+    public void shouldIgnoreUpdateWhenLeftHasBecomeNull() {
+        final TestKTableValueGetterSupplier<String, String> valueGetterSupplier =
+            new TestKTableValueGetterSupplier<>();
+        final boolean leftJoin = false;
+        final SubscriptionResolverJoinProcessorSupplier<String, String, String, String> processorSupplier =
+            new SubscriptionResolverJoinProcessorSupplier<>(
+                valueGetterSupplier,
+                STRING_SERIALIZER,
+                JOINER,
+                leftJoin
+            );
+        final Processor<String, SubscriptionResponseWrapper<String>> processor = processorSupplier.get();
+        final MockProcessorContext context = new MockProcessorContext();
+        processor.init(context);
+        context.setRecordMetadata("topic", 0, 0, new RecordHeaders(), 0);
+
+        valueGetterSupplier.put("lhs1", null);
+        final long[] hash = Murmur3.hash128(STRING_SERIALIZER.serialize("topic-join-resolver", "lhsValue"));
+        processor.process("lhs1", new SubscriptionResponseWrapper<>(hash, "rhsValue"));
+        final List<MockProcessorContext.CapturedForward> forwarded = context.forwarded();
+        assertThat(forwarded, empty());
+    }
+
+    @Test
+    public void shouldForwardWhenHashMatches() {
+        final TestKTableValueGetterSupplier<String, String> valueGetterSupplier =
+            new TestKTableValueGetterSupplier<>();
+        final boolean leftJoin = false;
+        final SubscriptionResolverJoinProcessorSupplier<String, String, String, String> processorSupplier =
+            new SubscriptionResolverJoinProcessorSupplier<>(
+                valueGetterSupplier,
+                STRING_SERIALIZER,
+                JOINER,
+                leftJoin
+            );
+        final Processor<String, SubscriptionResponseWrapper<String>> processor = processorSupplier.get();
+        final MockProcessorContext context = new MockProcessorContext();
+        processor.init(context);
+        context.setRecordMetadata("topic", 0, 0, new RecordHeaders(), 0);
+
+        valueGetterSupplier.put("lhs1", "lhsValue");
+        final long[] hash = Murmur3.hash128(STRING_SERIALIZER.serialize("topic-join-resolver", "lhsValue"));
+        processor.process("lhs1", new SubscriptionResponseWrapper<>(hash, "rhsValue"));
+        final List<MockProcessorContext.CapturedForward> forwarded = context.forwarded();
+        assertThat(forwarded.size(), is(1));
+        assertThat(forwarded.get(0).keyValue(), is(new KeyValue<>("lhs1", "(lhsValue,rhsValue)")));
+    }
+
+    @Test
+    public void shouldEmitTombstoneForInnerJoinWhenRightIsNull() {
+        final TestKTableValueGetterSupplier<String, String> valueGetterSupplier =
+            new TestKTableValueGetterSupplier<>();
+        final boolean leftJoin = false;
+        final SubscriptionResolverJoinProcessorSupplier<String, String, String, String> processorSupplier =
+            new SubscriptionResolverJoinProcessorSupplier<>(
+                valueGetterSupplier,
+                STRING_SERIALIZER,
+                JOINER,
+                leftJoin
+            );
+        final Processor<String, SubscriptionResponseWrapper<String>> processor = processorSupplier.get();
+        final MockProcessorContext context = new MockProcessorContext();
+        processor.init(context);
+        context.setRecordMetadata("topic", 0, 0, new RecordHeaders(), 0);
+
+        valueGetterSupplier.put("lhs1", "lhsValue");
+        final long[] hash = Murmur3.hash128(STRING_SERIALIZER.serialize("topic-join-resolver", "lhsValue"));
+        processor.process("lhs1", new SubscriptionResponseWrapper<>(hash, null));
+        final List<MockProcessorContext.CapturedForward> forwarded = context.forwarded();
+        assertThat(forwarded.size(), is(1));
+        assertThat(forwarded.get(0).keyValue(), is(new KeyValue<>("lhs1", null)));
+    }
+
+    @Test
+    public void shouldEmitResultForLeftJoinWhenRightIsNull() {
+        final TestKTableValueGetterSupplier<String, String> valueGetterSupplier =
+            new TestKTableValueGetterSupplier<>();
+        final boolean leftJoin = true;
+        final SubscriptionResolverJoinProcessorSupplier<String, String, String, String> processorSupplier =
+            new SubscriptionResolverJoinProcessorSupplier<>(
+                valueGetterSupplier,
+                STRING_SERIALIZER,
+                JOINER,
+                leftJoin
+            );
+        final Processor<String, SubscriptionResponseWrapper<String>> processor = processorSupplier.get();
+        final MockProcessorContext context = new MockProcessorContext();
+        processor.init(context);
+        context.setRecordMetadata("topic", 0, 0, new RecordHeaders(), 0);
+
+        valueGetterSupplier.put("lhs1", "lhsValue");
+        final long[] hash = Murmur3.hash128(STRING_SERIALIZER.serialize("topic-join-resolver", "lhsValue"));
+        processor.process("lhs1", new SubscriptionResponseWrapper<>(hash, null));
+        final List<MockProcessorContext.CapturedForward> forwarded = context.forwarded();
+        assertThat(forwarded.size(), is(1));
+        assertThat(forwarded.get(0).keyValue(), is(new KeyValue<>("lhs1", "(lhsValue,null)")));
+    }
+
+    @Test
+    public void shouldEmitTombstoneForLeftJoinWhenRightIsNullAndLeftIsNull() {
+        final TestKTableValueGetterSupplier<String, String> valueGetterSupplier =
+            new TestKTableValueGetterSupplier<>();
+        final boolean leftJoin = true;
+        final SubscriptionResolverJoinProcessorSupplier<String, String, String, String> processorSupplier =
+            new SubscriptionResolverJoinProcessorSupplier<>(
+                valueGetterSupplier,
+                STRING_SERIALIZER,
+                JOINER,
+                leftJoin
+            );
+        final Processor<String, SubscriptionResponseWrapper<String>> processor = processorSupplier.get();
+        final MockProcessorContext context = new MockProcessorContext();
+        processor.init(context);
+        context.setRecordMetadata("topic", 0, 0, new RecordHeaders(), 0);
+
+        valueGetterSupplier.put("lhs1", null);
+        final long[] hash = null;
+        processor.process("lhs1", new SubscriptionResponseWrapper<>(hash, null));
+        final List<MockProcessorContext.CapturedForward> forwarded = context.forwarded();
+        assertThat(forwarded.size(), is(1));
+        assertThat(forwarded.get(0).keyValue(), is(new KeyValue<>("lhs1", null)));
+    }
+}
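
The processor class under test is not part of this diff. As a rough guide to what the cases above exercise, the stale-response check amounts to something like the following simplified reconstruction (illustrative method and parameter names, assuming java.util.Arrays; not the actual implementation):

    // Simplified reconstruction, not the real SubscriptionResolverJoinProcessorSupplier code:
    // a response is joined and forwarded only if the hash it carries still matches the hash of
    // the current left-hand-side value; otherwise the response is stale and is dropped.
    private static boolean responseIsCurrent(final long[] responseHash,
                                             final String currentLeftValue,
                                             final StringSerializer serializer) {
        final long[] currentHash = currentLeftValue == null
            ? null
            : Murmur3.hash128(serializer.serialize("topic-join-resolver", currentLeftValue));
        // Arrays.equals treats two nulls as equal, which matches the tombstone case above.
        return Arrays.equals(responseHash, currentHash);
    }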
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
index 947e65e..7fb8b68 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
@@ -161,6 +161,15 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
         public KeyValue keyValue() {
             return keyValue;
         }
+
+        @Override
+        public String toString() {
+            return "CapturedForward{" +
+                "childName='" + childName + '\'' +
+                ", timestamp=" + timestamp +
+                ", keyValue=" + keyValue +
+                '}';
+        }
     }
 
     // constructors ================================================
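
A quick illustration, not part of the commit, of what the added toString() buys: captured forwards now print readably when inspecting or asserting on context.forwarded(), instead of rendering as opaque object references.

    // Illustrative snippet; the printed form is approximate.
    final MockProcessorContext context = new MockProcessorContext();
    context.setRecordMetadata("topic", 0, 0, new RecordHeaders(), 0);
    context.forward("lhs1", "(lhsValue,rhsValue)");
    // Prints something like:
    // [CapturedForward{childName='null', timestamp=0, keyValue=KeyValue(lhs1, (lhsValue,rhsValue))}]
    System.out.println(context.forwarded());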

