kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mj...@apache.org
Subject [kafka] branch 2.4 updated: KAFKA-9058: Lift queriable and materialized restrictions on FK Join (#7541)
Date Thu, 17 Oct 2019 18:53:43 GMT
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new 0956966  KAFKA-9058: Lift queriable and materialized restrictions on FK Join (#7541)
0956966 is described below

commit 09569668ae2ae3e86f2e6de7819f9496fc12a1f9
Author: John Roesler <vvcephei@users.noreply.github.com>
AuthorDate: Thu Oct 17 13:42:10 2019 -0500

    KAFKA-9058: Lift queriable and materialized restrictions on FK Join (#7541)
    
    Reviewers: Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>
---
 .../org/apache/kafka/streams/kstream/KTable.java   |  90 +++++++++-
 .../streams/kstream/internals/KTableImpl.java      | 123 ++++++++++---
 .../streams/integration/ForeignKeyJoinSuite.java   |   1 +
 .../KTableKTableForeignKeyJoinIntegrationTest.java |  58 +++----
 ...reignKeyJoinMaterializationIntegrationTest.java | 192 +++++++++++++++++++++
 5 files changed, 403 insertions(+), 61 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 5092d35..f9b38bb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -2128,18 +2128,54 @@ public interface KTable<K, V> {
      * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this
table's value (V). If the
      *                            result is null, the update is ignored as invalid.
      * @param joiner              a {@link ValueJoiner} that computes the join result for
a pair of matching records
+     * @param <VR>                the value type of the result {@code KTable}
+     * @param <KO>                the key type of the other {@code KTable}
+     * @param <VO>                the value type of the other {@code KTable}
+     * @return a {@code KTable} that contains the result of joining this table with {@code
other}
+     */
+    <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
+                                    final Function<V, KO> foreignKeyExtractor,
+                                    final ValueJoiner<V, VO, VR> joiner);
+
+    /**
+     * Join records of this {@code KTable} with another {@code KTable} using non-windowed
inner join.
+     * <p>
+     * This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
+     *
+     * @param other               the other {@code KTable} to be joined with this {@code
KTable}. Keyed by KO.
+     * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this
table's value (V). If the
+     *                            result is null, the update is ignored as invalid.
+     * @param joiner              a {@link ValueJoiner} that computes the join result for
a pair of matching records
      * @param named               a {@link Named} config used to name the processor in the
topology
+     * @param <VR>                the value type of the result {@code KTable}
+     * @param <KO>                the key type of the other {@code KTable}
+     * @param <VO>                the value type of the other {@code KTable}
+     * @return a {@code KTable} that contains the result of joining this table with {@code
other}
+     */
+    <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
+                                    final Function<V, KO> foreignKeyExtractor,
+                                    final ValueJoiner<V, VO, VR> joiner,
+                                    final Named named);
+
+    /**
+     * Join records of this {@code KTable} with another {@code KTable} using non-windowed
inner join.
+     * <p>
+     * This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
+     *
+     * @param other               the other {@code KTable} to be joined with this {@code
KTable}. Keyed by KO.
+     * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this
table's value (V). If the
+     *                            result is null, the update is ignored as invalid.
+     * @param joiner              a {@link ValueJoiner} that computes the join result for
a pair of matching records
      * @param materialized        a {@link Materialized} that describes how the {@link StateStore}
for the resulting {@code KTable}
      *                            should be materialized. Cannot be {@code null}
      * @param <VR>                the value type of the result {@code KTable}
      * @param <KO>                the key type of the other {@code KTable}
      * @param <VO>                the value type of the other {@code KTable}
-     * @return
+     * @return a {@code KTable} that contains the result of joining this table with {@code
other}
      */
     <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
                                     final Function<V, KO> foreignKeyExtractor,
                                     final ValueJoiner<V, VO, VR> joiner,
-                                    final Named named,
                                     final Materialized<K, VR, KeyValueStore<Bytes,
byte[]>> materialized);
 
     /**
@@ -2151,16 +2187,18 @@ public interface KTable<K, V> {
      * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this
table's value (V). If the
      *                            result is null, the update is ignored as invalid.
      * @param joiner              a {@link ValueJoiner} that computes the join result for
a pair of matching records
+     * @param named               a {@link Named} config used to name the processor in the
topology
      * @param materialized        a {@link Materialized} that describes how the {@link StateStore}
for the resulting {@code KTable}
      *                            should be materialized. Cannot be {@code null}
      * @param <VR>                the value type of the result {@code KTable}
      * @param <KO>                the key type of the other {@code KTable}
      * @param <VO>                the value type of the other {@code KTable}
-     * @return
+     * @return a {@code KTable} that contains the result of joining this table with {@code
other}
      */
     <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
                                     final Function<V, KO> foreignKeyExtractor,
                                     final ValueJoiner<V, VO, VR> joiner,
+                                    final Named named,
                                     final Materialized<K, VR, KeyValueStore<Bytes,
byte[]>> materialized);
 
     /**
@@ -2169,21 +2207,57 @@ public interface KTable<K, V> {
      * This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
      *
      * @param other               the other {@code KTable} to be joined with this {@code
KTable}. Keyed by KO.
+     * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this
table's value (V). If the
+     *                            result is null, the update is ignored as invalid.
+     * @param joiner              a {@link ValueJoiner} that computes the join result for
a pair of matching records
+     * @param <VR>                the value type of the result {@code KTable}
+     * @param <KO>                the key type of the other {@code KTable}
+     * @param <VO>                the value type of the other {@code KTable}
+     * @return a {@code KTable} that contains the result of joining this table with {@code other}
+     */
+    <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
+                                        final Function<V, KO> foreignKeyExtractor,
+                                        final ValueJoiner<V, VO, VR> joiner);
+
+    /**
+     * Join records of this {@code KTable} with another {@code KTable} using non-windowed
left join.
+     * <p>
+     * This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
+     *
+     * @param other               the other {@code KTable} to be joined with this {@code
KTable}. Keyed by KO.
      * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this
table's value (V) If the
      *                            result is null, the update is ignored as invalid.
      * @param joiner              a {@link ValueJoiner} that computes the join result for
a pair of matching records
      * @param named               a {@link Named} config used to name the processor in the
topology
+     * @param <VR>                the value type of the result {@code KTable}
+     * @param <KO>                the key type of the other {@code KTable}
+     * @param <VO>                the value type of the other {@code KTable}
+     * @return a {@code KTable} that contains the result of joining this table with {@code
other}
+     */
+    <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
+                                        final Function<V, KO> foreignKeyExtractor,
+                                        final ValueJoiner<V, VO, VR> joiner,
+                                        final Named named);
+
+    /**
+     * Join records of this {@code KTable} with another {@code KTable} using non-windowed
left join.
+     * <p>
+     * This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
+     *
+     * @param other               the other {@code KTable} to be joined with this {@code
KTable}. Keyed by KO.
+     * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this
table's value (V). If the
+     *                            result is null, the update is ignored as invalid.
+     * @param joiner              a {@link ValueJoiner} that computes the join result for
a pair of matching records
      * @param materialized        a {@link Materialized} that describes how the {@link StateStore}
for the resulting {@code KTable}
      *                            should be materialized. Cannot be {@code null}
      * @param <VR>                the value type of the result {@code KTable}
      * @param <KO>                the key type of the other {@code KTable}
      * @param <VO>                the value type of the other {@code KTable}
-     * @return a {@code KTable} that contains only those records that satisfy the given predicate
+     * @return a {@code KTable} that contains the result of joining this table with {@code
other}
      */
     <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
                                         final Function<V, KO> foreignKeyExtractor,
                                         final ValueJoiner<V, VO, VR> joiner,
-                                        final Named named,
                                         final Materialized<K, VR, KeyValueStore<Bytes,
byte[]>> materialized);
 
     /**
@@ -2192,19 +2266,21 @@ public interface KTable<K, V> {
      * This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
      *
      * @param other               the other {@code KTable} to be joined with this {@code
KTable}. Keyed by KO.
-     * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this
table's value (V). If the
+     * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this
table's value (V) If the
      *                            result is null, the update is ignored as invalid.
      * @param joiner              a {@link ValueJoiner} that computes the join result for
a pair of matching records
+     * @param named               a {@link Named} config used to name the processor in the
topology
      * @param materialized        a {@link Materialized} that describes how the {@link StateStore}
for the resulting {@code KTable}
      *                            should be materialized. Cannot be {@code null}
      * @param <VR>                the value type of the result {@code KTable}
      * @param <KO>                the key type of the other {@code KTable}
      * @param <VO>                the value type of the other {@code KTable}
-     * @return a {@code KTable} that contains only those records that satisfy the given predicate
+     * @return a {@code KTable} that contains the result of joining this table with {@code
other}
      */
     <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
                                         final Function<V, KO> foreignKeyExtractor,
                                         final ValueJoiner<V, VO, VR> joiner,
+                                        final Named named,
                                         final Materialized<K, VR, KeyValueStore<Bytes,
byte[]>> materialized);
 
     /**
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 05e04e8..301710d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -110,12 +110,14 @@ public class KTableImpl<K, S, V> extends AbstractStream<K,
V> implements KTable<
 
     private static final String TRANSFORMVALUES_NAME = "KTABLE-TRANSFORMVALUES-";
 
-    private static final String FK_JOIN_STATE_STORE_NAME = "KTABLE-INTERNAL-SUBSCRIPTION-STATE-STORE-";
-    private static final String SUBSCRIPTION_REGISTRATION = "KTABLE-SUBSCRIPTION-REGISTRATION-";
-    private static final String SUBSCRIPTION_RESPONSE = "KTABLE-SUBSCRIPTION-RESPONSE-";
-    private static final String SUBSCRIPTION_PROCESSOR = "KTABLE-SUBSCRIPTION-PROCESSOR-";
-    private static final String SUBSCRIPTION_RESPONSE_RESOLVER_PROCESSOR = "KTABLE-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-";
-    private static final String FK_JOIN_OUTPUT_PROCESSOR = "KTABLE-OUTPUT-PROCESSOR-";
+    private static final String FK_JOIN = "KTABLE-FK-JOIN-";
+    private static final String FK_JOIN_STATE_STORE_NAME = FK_JOIN + "SUBSCRIPTION-STATE-STORE-";
+    private static final String SUBSCRIPTION_REGISTRATION = FK_JOIN + "SUBSCRIPTION-REGISTRATION-";
+    private static final String SUBSCRIPTION_RESPONSE = FK_JOIN + "SUBSCRIPTION-RESPONSE-";
+    private static final String SUBSCRIPTION_PROCESSOR = FK_JOIN + "SUBSCRIPTION-PROCESSOR-";
+    private static final String SUBSCRIPTION_RESPONSE_RESOLVER_PROCESSOR = FK_JOIN + "SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-";
+    private static final String FK_JOIN_OUTPUT_NAME = FK_JOIN + "OUTPUT-";
+
     private static final String TOPIC_SUFFIX = "-topic";
     private static final String SINK_NAME = "KTABLE-SINK-";
 
@@ -836,20 +838,76 @@ public class KTableImpl<K, S, V> extends AbstractStream<K,
V> implements KTable<
     @Override
     public <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
                                            final Function<V, KO> foreignKeyExtractor,
+                                           final ValueJoiner<V, VO, VR> joiner) {
+        return doJoinOnForeignKey(
+            other,
+            foreignKeyExtractor,
+            joiner,
+            NamedInternal.empty(),
+            Materialized.with(null, null),
+            false
+        );
+    }
+
+    @Override
+    public <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
+                                           final Function<V, KO> foreignKeyExtractor,
                                            final ValueJoiner<V, VO, VR> joiner,
-                                           final Named named,
-                                           final Materialized<K, VR, KeyValueStore<Bytes,
byte[]>> materialized) {
+                                           final Named named) {
+        return doJoinOnForeignKey(
+            other,
+            foreignKeyExtractor,
+            joiner,
+            named,
+            Materialized.with(null, null),
+            false
+        );
+    }
 
-        return doJoinOnForeignKey(other, foreignKeyExtractor, joiner, named, new MaterializedInternal<>(materialized),
false);
+    @Override
+    public <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
+                                           final Function<V, KO> foreignKeyExtractor,
+                                           final ValueJoiner<V, VO, VR> joiner,
+                                           final Materialized<K, VR, KeyValueStore<Bytes,
byte[]>> materialized) {
+        return doJoinOnForeignKey(other, foreignKeyExtractor, joiner, NamedInternal.empty(),
materialized, false);
     }
 
     @Override
     public <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
                                            final Function<V, KO> foreignKeyExtractor,
                                            final ValueJoiner<V, VO, VR> joiner,
+                                           final Named named,
                                            final Materialized<K, VR, KeyValueStore<Bytes,
byte[]>> materialized) {
+        return doJoinOnForeignKey(other, foreignKeyExtractor, joiner, named, materialized,
false);
+    }
+
+    @Override
+    public <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
+                                               final Function<V, KO> foreignKeyExtractor,
+                                               final ValueJoiner<V, VO, VR> joiner)
{
+        return doJoinOnForeignKey(
+            other,
+            foreignKeyExtractor,
+            joiner,
+            NamedInternal.empty(),
+            Materialized.with(null, null),
+            true
+        );
+    }
 
-        return doJoinOnForeignKey(other, foreignKeyExtractor, joiner, NamedInternal.empty(),
new MaterializedInternal<>(materialized), false);
+    @Override
+    public <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
+                                               final Function<V, KO> foreignKeyExtractor,
+                                               final ValueJoiner<V, VO, VR> joiner,
+                                               final Named named) {
+        return doJoinOnForeignKey(
+            other,
+            foreignKeyExtractor,
+            joiner,
+            named,
+            Materialized.with(null, null),
+            true
+        );
     }
 
     @Override
@@ -858,7 +916,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V>
implements KTable<
                                                final ValueJoiner<V, VO, VR> joiner,
                                                final Named named,
                                                final Materialized<K, VR, KeyValueStore<Bytes,
byte[]>> materialized) {
-        return doJoinOnForeignKey(other, foreignKeyExtractor, joiner, named, new MaterializedInternal<>(materialized),
true);
+        return doJoinOnForeignKey(other, foreignKeyExtractor, joiner, named, materialized,
true);
     }
 
     @Override
@@ -866,23 +924,21 @@ public class KTableImpl<K, S, V> extends AbstractStream<K,
V> implements KTable<
                                                final Function<V, KO> foreignKeyExtractor,
                                                final ValueJoiner<V, VO, VR> joiner,
                                                final Materialized<K, VR, KeyValueStore<Bytes,
byte[]>> materialized) {
-
-        return doJoinOnForeignKey(other, foreignKeyExtractor, joiner, NamedInternal.empty(),
new MaterializedInternal<>(materialized), true);
+        return doJoinOnForeignKey(other, foreignKeyExtractor, joiner, NamedInternal.empty(),
materialized, true);
     }
 
-
     @SuppressWarnings("unchecked")
     private <VR, KO, VO> KTable<K, VR> doJoinOnForeignKey(final KTable<KO,
VO> foreignKeyTable,
                                                           final Function<V, KO> foreignKeyExtractor,
                                                           final ValueJoiner<V, VO, VR>
joiner,
                                                           final Named joinName,
-                                                          final MaterializedInternal<K,
VR, KeyValueStore<Bytes, byte[]>> materializedInternal,
+                                                          final Materialized<K, VR, KeyValueStore<Bytes,
byte[]>> materialized,
                                                           final boolean leftJoin) {
         Objects.requireNonNull(foreignKeyTable, "foreignKeyTable can't be null");
         Objects.requireNonNull(foreignKeyExtractor, "foreignKeyExtractor can't be null");
         Objects.requireNonNull(joiner, "joiner can't be null");
         Objects.requireNonNull(joinName, "joinName can't be null");
-        Objects.requireNonNull(materializedInternal, "materialized can't be null");
+        Objects.requireNonNull(materialized, "materialized can't be null");
 
         //Old values are a useful optimization. The old values from the foreignKeyTable table
are compared to the new values,
         //such that identical values do not cause a prefixScan. PrefixScan and propagation
can be expensive and should
@@ -1011,14 +1067,15 @@ public class KTableImpl<K, S, V> extends AbstractStream<K,
V> implements KTable<
         builder.internalTopologyBuilder.copartitionSources(resultSourceNodes);
 
         final KTableValueGetterSupplier<K, V> primaryKeyValueGetter = valueGetterSupplier();
+        final SubscriptionResolverJoinProcessorSupplier<K, V, VO, VR> resolverProcessorSupplier
= new SubscriptionResolverJoinProcessorSupplier<>(
+            primaryKeyValueGetter,
+            valueSerde().serializer(),
+            joiner,
+            leftJoin
+        );
         final StatefulProcessorNode<K, SubscriptionResponseWrapper<VO>> resolverNode
= new StatefulProcessorNode<>(
             new ProcessorParameters<>(
-                new SubscriptionResolverJoinProcessorSupplier<>(
-                    primaryKeyValueGetter,
-                    valueSerde().serializer(),
-                    joiner,
-                    leftJoin
-                ),
+                resolverProcessorSupplier,
                 renamed.suffixWithOrElseGet("-subscription-response-resolver", builder, SUBSCRIPTION_RESPONSE_RESOLVER_PROCESSOR)
             ),
             Collections.emptySet(),
@@ -1026,8 +1083,26 @@ public class KTableImpl<K, S, V> extends AbstractStream<K,
V> implements KTable<
         );
         builder.addGraphNode(foreignResponseSource, resolverNode);
 
-        final String resultProcessorName = renamed.suffixWithOrElseGet("-result", builder,
FK_JOIN_OUTPUT_PROCESSOR);
-        final KTableSource<K, VR> resultProcessorSupplier = new KTableSource<>(materializedInternal.storeName(),
materializedInternal.queryableStoreName());
+        final String resultProcessorName = renamed.suffixWithOrElseGet("-result", builder,
FK_JOIN_OUTPUT_NAME);
+
+        final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal
=
+            new MaterializedInternal<>(
+                materialized,
+                builder,
+                FK_JOIN_OUTPUT_NAME
+            );
+
+        // If we have a key serde, it's still valid, but we don't know the value serde, since it's the result
+        // of the joiner (VR).
+        if (materializedInternal.keySerde() == null) {
+            materializedInternal.withKeySerde(keySerde);
+        }
+
+        final KTableSource<K, VR> resultProcessorSupplier = new KTableSource<>(
+            materializedInternal.storeName(),
+            materializedInternal.queryableStoreName()
+        );
+
         final StoreBuilder<TimestampedKeyValueStore<K, VR>> resultStore =
             materializedInternal.queryableStoreName() == null
                 ? null
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ForeignKeyJoinSuite.java
b/streams/src/test/java/org/apache/kafka/streams/integration/ForeignKeyJoinSuite.java
index ac6adb8..47e2d95 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ForeignKeyJoinSuite.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ForeignKeyJoinSuite.java
@@ -38,6 +38,7 @@ import org.junit.runners.Suite;
     BytesTest.class,
     KTableKTableForeignKeyInnerJoinMultiIntegrationTest.class,
     KTableKTableForeignKeyJoinIntegrationTest.class,
+    KTableKTableForeignKeyJoinMaterializationIntegrationTest.class,
     CombinedKeySchemaTest.class,
     SubscriptionWrapperSerdeTest.class,
     SubscriptionResponseWrapperSerdeTest.class,
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
index 14a39b5..80c0f52 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
@@ -20,7 +20,6 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TestInputTopic;
@@ -30,7 +29,6 @@ import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.Stores;
@@ -54,7 +52,7 @@ import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 
 
-@RunWith(value = Parameterized.class)
+@RunWith(Parameterized.class)
 public class KTableKTableForeignKeyJoinIntegrationTest {
 
     private static final String LEFT_TABLE = "left_table";
@@ -265,10 +263,9 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
     }
 
     @Test
-    public void shouldEmitTombstonedWhenDeletingNonJoiningRecords() {
+    public void shouldEmitTombstoneWhenDeletingNonJoiningRecords() {
         final Topology topology = getTopology(streamsConfig, "store", leftJoin);
         try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig))
{
-            final TestInputTopic<String, String> right = driver.createInputTopic(RIGHT_TABLE,
new StringSerializer(), new StringSerializer());
             final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE,
new StringSerializer(), new StringSerializer());
             final TestOutputTopic<String, String> outputTopic = driver.createOutputTopic(OUTPUT,
new StringDeserializer(), new StringDeserializer());
             final KeyValueStore<String, String> store = driver.getKeyValueStore("store");
@@ -295,7 +292,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
             {
                 assertThat(
                     outputTopic.readKeyValuesToMap(),
-                    is(Utils.<String, String>mkMap(mkEntry("lhs1", null)))
+                    is(mkMap(mkEntry("lhs1", null)))
                 );
                 assertThat(
                     asMap(store),
@@ -322,7 +319,6 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
     public void shouldNotEmitTombstonesWhenDeletingNonExistingRecords() {
         final Topology topology = getTopology(streamsConfig, "store", leftJoin);
         try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig))
{
-            final TestInputTopic<String, String> right = driver.createInputTopic(RIGHT_TABLE,
new StringSerializer(), new StringSerializer());
             final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE,
new StringSerializer(), new StringSerializer());
             final TestOutputTopic<String, String> outputTopic = driver.createOutputTopic(OUTPUT,
new StringDeserializer(), new StringDeserializer());
             final KeyValueStore<String, String> store = driver.getKeyValueStore("store");
@@ -369,9 +365,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
             left.pipeInput("lhs1", "lhsValue1|rhs2");
             assertThat(
                 outputTopic.readKeyValuesToMap(),
-                is(mkMap(
-                    mkEntry("lhs1", leftJoin ? "(lhsValue1|rhs2,null)" : null)
-                ))
+                is(mkMap(mkEntry("lhs1", leftJoin ? "(lhsValue1|rhs2,null)" : null)))
             );
             assertThat(
                 asMap(store),
@@ -381,9 +375,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
             left.pipeInput("lhs1", "lhsValue1|rhs3");
             assertThat(
                 outputTopic.readKeyValuesToMap(),
-                is(mkMap(
-                    mkEntry("lhs1", leftJoin ? "(lhsValue1|rhs3,null)" : null)
-                ))
+                is(mkMap(mkEntry("lhs1", leftJoin ? "(lhsValue1|rhs3,null)" : null)))
             );
             assertThat(
                 asMap(store),
@@ -448,29 +440,35 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
         final KTable<String, String> left = builder.table(LEFT_TABLE, Consumed.with(Serdes.String(),
Serdes.String()));
         final KTable<String, String> right = builder.table(RIGHT_TABLE, Consumed.with(Serdes.String(),
Serdes.String()));
 
+        final Function<String, String> extractor = value -> value.split("\\|")[1];
+        final ValueJoiner<String, String, String> joiner = (value1, value2) -> "("
+ value1 + "," + value2 + ")";
+
         final Materialized<String, String, KeyValueStore<Bytes, byte[]>> materialized
=
             Materialized.<String, String>as(Stores.inMemoryKeyValueStore(queryableStoreName))
-                .withKeySerde(Serdes.String())
                 .withValueSerde(Serdes.String())
+                // the cache suppresses some of the unnecessary tombstones we want to make assertions about
                 .withCachingDisabled();
 
-        final Function<String, String> extractor = value -> value.split("\\|")[1];
-        final ValueJoiner<String, String, String> joiner = (value1, value2) -> "("
+ value1 + "," + value2 + ")";
+        final KTable<String, String> joinResult;
+        if (leftJoin) {
+            joinResult = left.leftJoin(
+                right,
+                extractor,
+                joiner,
+                materialized
+            );
+        } else {
+            joinResult = left.join(
+                right,
+                extractor,
+                joiner,
+                materialized
+            );
+        }
 
-        if (leftJoin)
-            left.leftJoin(right,
-                          extractor,
-                          joiner,
-                          materialized)
-                .toStream()
-                .to(OUTPUT, Produced.with(Serdes.String(), Serdes.String()));
-        else
-            left.join(right,
-                      extractor,
-                      joiner,
-                      materialized)
-                .toStream()
-                .to(OUTPUT, Produced.with(Serdes.String(), Serdes.String()));
+        joinResult
+            .toStream()
+            .to(OUTPUT);
 
         return builder.build(streamsConfig);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinMaterializationIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinMaterializationIntegrationTest.java
new file mode 100644
index 0000000..6abedb2
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinMaterializationIntegrationTest.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TestOutputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.function.Function;
+
+import static java.util.Collections.emptyMap;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+
+@RunWith(Parameterized.class)
+public class KTableKTableForeignKeyJoinMaterializationIntegrationTest {
+
+    private static final String LEFT_TABLE = "left_table";
+    private static final String RIGHT_TABLE = "right_table";
+    private static final String OUTPUT = "output-topic";
+    private final Properties streamsConfig;
+    private final boolean materialized;
+    private final boolean queriable;
+
+    public KTableKTableForeignKeyJoinMaterializationIntegrationTest(final boolean materialized, final boolean queriable) {
+        this.materialized = materialized;
+        this.queriable = queriable;
+        streamsConfig = mkProperties(mkMap(
+            mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "ktable-ktable-joinOnForeignKey"),
+            mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "asdf:0000"),
+            mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath())
+        ));
+    }
+
+    @Parameterized.Parameters(name = "materialized={0}, queriable={1}")
+    public static Collection<Object[]> data() {
+        return Arrays.asList(
+            new Object[] {false, false},
+            new Object[] {true, false},
+            new Object[] {true, true}
+        );
+    }
+
+    @Test
+    public void shouldEmitTombstoneWhenDeletingNonJoiningRecords() {
+        final Topology topology = getTopology(streamsConfig, "store");
+            try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) {
+            final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
+            final TestOutputTopic<String, String> outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());
+            final KeyValueStore<String, String> store = driver.getKeyValueStore("store");
+
+            left.pipeInput("lhs1", "lhsValue1|rhs1");
+
+            assertThat(
+                outputTopic.readKeyValuesToMap(),
+                is(emptyMap())
+            );
+            if (materialized && queriable) {
+                assertThat(
+                    asMap(store),
+                    is(emptyMap())
+                );
+            }
+
+            // Deleting a non-joining record produces an unnecessary tombstone for inner joins, because
+            // it's not possible to know whether a result was previously emitted.
+            left.pipeInput("lhs1", (String) null);
+            {
+                if (materialized && queriable) {
+                    // in only this specific case, the record cache will actually be activated and
+                    // suppress the unnecessary tombstone. This is because the cache is able to determine
+                    // for sure that there has never been a previous result. (Because the "old" and "new" values
+                    // are both null, and the underlying store is also missing the record in question).
+                    assertThat(
+                        outputTopic.readKeyValuesToMap(),
+                        is(emptyMap())
+                    );
+
+                    assertThat(
+                        asMap(store),
+                        is(emptyMap())
+                    );
+                } else {
+                    assertThat(
+                        outputTopic.readKeyValuesToMap(),
+                        is(mkMap(mkEntry("lhs1", null)))
+                    );
+                }
+            }
+
+            // Deleting a non-existing record is idempotent
+            left.pipeInput("lhs1", (String) null);
+            {
+                assertThat(
+                    outputTopic.readKeyValuesToMap(),
+                    is(emptyMap())
+                );
+                if (materialized && queriable) {
+                    assertThat(
+                        asMap(store),
+                        is(emptyMap())
+                    );
+                }
+            }
+        }
+    }
+
+    private static Map<String, String> asMap(final KeyValueStore<String, String> store) {
+        final HashMap<String, String> result = new HashMap<>();
+        store.all().forEachRemaining(kv -> result.put(kv.key, kv.value));
+        return result;
+    }
+
+    private Topology getTopology(final Properties streamsConfig,
+                                 final String queryableStoreName) {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KTable<String, String> left = builder.table(LEFT_TABLE, Consumed.with(Serdes.String(), Serdes.String()));
+        final KTable<String, String> right = builder.table(RIGHT_TABLE, Consumed.with(Serdes.String(), Serdes.String()));
+
+        final Function<String, String> extractor = value -> value.split("\\|")[1];
+        final ValueJoiner<String, String, String> joiner = (value1, value2) -> "(" + value1 + "," + value2 + ")";
+
+        final Materialized<String, String, KeyValueStore<Bytes, byte[]>> materialized;
+        if (queriable) {
+            materialized = Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(queryableStoreName).withValueSerde(Serdes.String());
+        } else {
+            materialized = Materialized.with(null, Serdes.String());
+        }
+
+        final KTable<String, String> joinResult;
+        if (this.materialized) {
+            joinResult = left.join(
+                right,
+                extractor,
+                joiner,
+                materialized
+            );
+        } else {
+            joinResult = left.join(
+                right,
+                extractor,
+                joiner
+            );
+        }
+
+        joinResult
+            .toStream()
+            .to(OUTPUT);
+
+        return builder.build(streamsConfig);
+    }
+}


Mime
View raw message