kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bbej...@apache.org
Subject [kafka] branch 2.4 updated: MINOR: Improve FK Join docs and optimize null-fk case (#7536)
Date Thu, 17 Oct 2019 04:08:43 GMT
This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new 40b8433  MINOR: Improve FK Join docs and optimize null-fk case (#7536)
40b8433 is described below

commit 40b843310de1b18943387fa4c548c498f2def0c5
Author: John Roesler <vvcephei@users.noreply.github.com>
AuthorDate: Wed Oct 16 22:55:20 2019 -0500

    MINOR: Improve FK Join docs and optimize null-fk case (#7536)
    
    Fix the formatting and wording of the foreign-key join javadoc
    Optimize handling of null extracted foreign keys
    
    Reviewers: Sophie Blee-Goldman <sophie@confluent.io>, Bill Bejeck <bbejeck@gmail.com>
---
 .../org/apache/kafka/streams/kstream/KTable.java   | 96 +++++++++++-----------
 ...reignJoinSubscriptionSendProcessorSupplier.java | 49 +++++++++--
 2 files changed, 91 insertions(+), 54 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 21e42df..5092d35 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -2120,19 +2120,20 @@ public interface KTable<K, V> {
                                      final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
 
     /**
+     * Join records of this {@code KTable} with another {@code KTable} using non-windowed inner join.
+     * <p>
+     * This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
      *
-     * Join records of this [[KTable]] with another [[KTable]]'s records using non-windowed inner join. Records from this
-     * table are joined according to the result of keyExtractor on the other KTable.
-     *
-     * @param other  the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
-     * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V)
-     * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
-     * @param named     a {@link Named} config used to name the processor in the topology
-     * @param materialized  a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
-     *                      should be materialized. Cannot be {@code null}
-     * @param <VR> the value type of the result {@code KTable}
-     * @param <KO> the key type of the other {@code KTable}
-     * @param <VO> the value type of the other {@code KTable}
+     * @param other               the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
+     * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the
+     *                            result is null, the update is ignored as invalid.
+     * @param joiner              a {@link ValueJoiner} that computes the join result for a pair of matching records
+     * @param named               a {@link Named} config used to name the processor in the topology
+     * @param materialized        a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
+     *                            should be materialized. Cannot be {@code null}
+     * @param <VR>                the value type of the result {@code KTable}
+     * @param <KO>                the key type of the other {@code KTable}
+     * @param <VO>                the value type of the other {@code KTable}
      * @return
      */
     <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
@@ -2142,18 +2143,19 @@ public interface KTable<K, V> {
                                     final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
 
     /**
+     * Join records of this {@code KTable} with another {@code KTable} using non-windowed inner join.
+     * <p>
+     * This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
      *
-     * Join records of this [[KTable]] with another [[KTable]]'s records using non-windowed inner join. Records from this
-     * table are joined according to the result of keyExtractor on the other KTable.
-     *
-     * @param other  the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
-     * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V)
-     * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
-     * @param materialized  a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
-     *                      should be materialized. Cannot be {@code null}
-     * @param <VR> the value type of the result {@code KTable}
-     * @param <KO> the key type of the other {@code KTable}
-     * @param <VO> the value type of the other {@code KTable}
+     * @param other               the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
+     * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the
+     *                            result is null, the update is ignored as invalid.
+     * @param joiner              a {@link ValueJoiner} that computes the join result for a pair of matching records
+     * @param materialized        a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
+     *                            should be materialized. Cannot be {@code null}
+     * @param <VR>                the value type of the result {@code KTable}
+     * @param <KO>                the key type of the other {@code KTable}
+     * @param <VO>                the value type of the other {@code KTable}
      * @return
      */
     <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
@@ -2162,20 +2164,20 @@ public interface KTable<K, V> {
                                     final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
 
     /**
+     * Join records of this {@code KTable} with another {@code KTable} using non-windowed left join.
+     * <p>
+     * This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
      *
-     * Join records of this [[KTable]] with another [[KTable]]'s records using non-windowed left join. Records from this
-     * table are joined according to the result of keyExtractor on the other KTable.
-     *
-     * @param other  the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
-     * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the
-     *      *                            resultant foreignKey is null, the record will not propagate to the output.
-     * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
-     * @param named     a {@link Named} config used to name the processor in the topology
-     * @param materialized  a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
-     *                      should be materialized. Cannot be {@code null}
-     * @param <VR> the value type of the result {@code KTable}
-     * @param <KO> the key type of the other {@code KTable}
-     * @param <VO> the value type of the other {@code KTable}
+     * @param other               the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
+     * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V) If the
+     *                            result is null, the update is ignored as invalid.
+     * @param joiner              a {@link ValueJoiner} that computes the join result for a pair of matching records
+     * @param named               a {@link Named} config used to name the processor in the topology
+     * @param materialized        a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
+     *                            should be materialized. Cannot be {@code null}
+     * @param <VR>                the value type of the result {@code KTable}
+     * @param <KO>                the key type of the other {@code KTable}
+     * @param <VO>                the value type of the other {@code KTable}
      * @return a {@code KTable} that contains only those records that satisfy the given predicate
      */
     <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
@@ -2185,19 +2187,19 @@ public interface KTable<K, V> {
                                         final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
 
     /**
+     * Join records of this {@code KTable} with another {@code KTable} using non-windowed left join.
+     * <p>
+     * This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
      *
-     * Join records of this [[KTable]] with another [[KTable]]'s records using non-windowed left join. Records from this
-     * table are joined according to the result of keyExtractor on the other KTable.
-     *
-     * @param other  the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
+     * @param other               the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
      * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the
-     *                            resultant foreignKey is null, the record will not propagate to the output.
-     * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
-     * @param materialized  a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
-     *                      should be materialized. Cannot be {@code null}
-     * @param <VR> the value type of the result {@code KTable}
-     * @param <KO> the key type of the other {@code KTable}
-     * @param <VO> the value type of the other {@code KTable}
+     *                            result is null, the update is ignored as invalid.
+     * @param joiner              a {@link ValueJoiner} that computes the join result for a pair of matching records
+     * @param materialized        a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
+     *                            should be materialized. Cannot be {@code null}
+     * @param <VR>                the value type of the result {@code KTable}
+     * @param <KO>                the key type of the other {@code KTable}
+     * @param <VO>                the value type of the other {@code KTable}
      * @return a {@code KTable} that contains only those records that satisfy the given predicate
      */
     <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionSendProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionSendProcessorSupplier.java
index f122258..806cf5c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionSendProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionSendProcessorSupplier.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
 
+import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.kstream.internals.Change;
@@ -24,17 +25,22 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
 import org.apache.kafka.streams.state.internals.Murmur3;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.util.function.Function;
 import java.util.Arrays;
+import java.util.function.Function;
 
-import static org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE;
-import static org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper.Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE;
 import static org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE;
 import static org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper.Instruction.DELETE_KEY_NO_PROPAGATE;
+import static org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper.Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE;
+import static org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE;
 
 public class ForeignJoinSubscriptionSendProcessorSupplier<K, KO, V> implements ProcessorSupplier<K, Change<V>> {
+    private static final Logger LOG = LoggerFactory.getLogger(ForeignJoinSubscriptionSendProcessorSupplier.class);
 
     private final Function<V, KO> foreignKeyExtractor;
     private final String repartitionTopicName;
@@ -60,7 +66,9 @@ public class ForeignJoinSubscriptionSendProcessorSupplier<K, KO, V> implements P
     }
 
     private class UnbindChangeProcessor extends AbstractProcessor<K, Change<V>> {
-        
+
+        private Sensor skippedRecordsSensor;
+
         @SuppressWarnings("unchecked")
         @Override
         public void init(final ProcessorContext context) {
@@ -69,18 +77,36 @@ public class ForeignJoinSubscriptionSendProcessorSupplier<K, KO, V> implements P
             if (foreignKeySerializer == null) {
                 foreignKeySerializer = (Serializer<KO>) context.keySerde().serializer();
             }
+            skippedRecordsSensor = ThreadMetrics.skipRecordSensor(Thread.currentThread().getName(),
+                                                                  (StreamsMetricsImpl) context.metrics());
         }
 
         @Override
         public void process(final K key, final Change<V> change) {
             final long[] currentHash = change.newValue == null ?
-                    null :
-                    Murmur3.hash128(valueSerializer.serialize(repartitionTopicName, change.newValue));
+                null :
+                Murmur3.hash128(valueSerializer.serialize(repartitionTopicName, change.newValue));
 
             if (change.oldValue != null) {
                 final KO oldForeignKey = foreignKeyExtractor.apply(change.oldValue);
+                if (oldForeignKey == null) {
+                    LOG.warn(
+                        "Skipping record due to null foreign key. value=[{}] topic=[{}] partition=[{}] offset=[{}]",
+                        change.oldValue, context().topic(), context().partition(), context().offset()
+                    );
+                    skippedRecordsSensor.record();
+                    return;
+                }
                 if (change.newValue != null) {
                     final KO newForeignKey = foreignKeyExtractor.apply(change.newValue);
+                    if (newForeignKey == null) {
+                        LOG.warn(
+                            "Skipping record due to null foreign key. value=[{}] topic=[{}] partition=[{}] offset=[{}]",
+                            change.newValue, context().topic(), context().partition(), context().offset()
+                        );
+                        skippedRecordsSensor.record();
+                        return;
+                    }
 
                     final byte[] serialOldForeignKey = foreignKeySerializer.serialize(repartitionTopicName, oldForeignKey);
                     final byte[] serialNewForeignKey = foreignKeySerializer.serialize(repartitionTopicName, newForeignKey);
@@ -109,7 +135,16 @@ public class ForeignJoinSubscriptionSendProcessorSupplier<K, KO, V> implements P
                 } else {
                     instruction = PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE;
                 }
-                context().forward(foreignKeyExtractor.apply(change.newValue), new SubscriptionWrapper<>(currentHash, instruction, key));
+                final KO newForeignKey = foreignKeyExtractor.apply(change.newValue);
+                if (newForeignKey == null) {
+                    LOG.warn(
+                        "Skipping record due to null foreign key. value=[{}] topic=[{}] partition=[{}] offset=[{}]",
+                        change.newValue, context().topic(), context().partition(), context().offset()
+                    );
+                    skippedRecordsSensor.record();
+                } else {
+                    context().forward(newForeignKey, new SubscriptionWrapper<>(currentHash, instruction, key));
+                }
             }
         }
     }


Mime
View raw message