kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From damian...@apache.org
Subject kafka git commit: KAFKA:5653: add join overloads to KTable
Date Tue, 12 Sep 2017 15:01:24 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk e1491d4a0 -> 08063f50a


KAFKA:5653: add join overloads to KTable

Add `join`, `leftJoin`, `outerJoin` overloads that use `Materialized` to `KTable`

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>,
Guozhang Wang <wangguoz@gmail.com>

Closes #3826 from dguy/kafka-5653


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/08063f50
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/08063f50
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/08063f50

Branch: refs/heads/trunk
Commit: 08063f50a04fda3e40c6060a432a97f49bb68c8c
Parents: e1491d4
Author: Damian Guy <damian.guy@gmail.com>
Authored: Tue Sep 12 16:01:19 2017 +0100
Committer: Damian Guy <damian.guy@gmail.com>
Committed: Tue Sep 12 16:01:19 2017 +0100

----------------------------------------------------------------------
 .../apache/kafka/streams/kstream/KTable.java    | 290 +++++++++++++++++--
 .../streams/kstream/internals/KTableImpl.java   |  88 +++++-
 .../KTableKTableJoinIntegrationTest.java        |  33 ++-
 .../kstream/internals/KTableImplTest.java       |  24 +-
 4 files changed, 400 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/08063f50/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 2571ac1..6d1d85d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -84,7 +84,7 @@ public interface KTable<K, V> {
      * have delete semantics.
      * Thus, for tombstones the provided filter predicate is not evaluated but the tombstone
record is forwarded
      * directly if required (i.e., if there is anything to be deleted).
-     * Furthermore, for each record that gets dropped (i.e., dot not satisfied the given
predicate) a tombstone record
+     * Furthermore, for each record that gets dropped (i.e., dot not satisfy the given predicate)
a tombstone record
      * is forwarded.
      *
      * @param predicate a filter {@link Predicate} that is applied to each record
@@ -106,7 +106,7 @@ public interface KTable<K, V> {
      * have delete semantics.
      * Thus, for tombstones the provided filter predicate is not evaluated but the tombstone
record is forwarded
      * directly if required (i.e., if there is anything to be deleted).
-     * Furthermore, for each record that gets dropped (i.e., dot not satisfied the given
predicate) a tombstone record
+     * Furthermore, for each record that gets dropped (i.e., dot not satisfy the given predicate)
a tombstone record
      * is forwarded.
      * <p>
      * To query the local {@link KeyValueStore} it must be obtained via
@@ -124,7 +124,7 @@ public interface KTable<K, V> {
      *
      * @param predicate     a filter {@link Predicate} that is applied to each record
      * @param materialized  a {@link Materialized} that describes how the {@link StateStore}
for the resulting {@code KTable}
-     *                      should be materialized
+     *                      should be materialized. Cannot be {@code null}
      * @return a {@code KTable} that contains only those records that satisfy the given predicate
      * @see #filterNot(Predicate, Materialized)
      */
@@ -144,7 +144,7 @@ public interface KTable<K, V> {
      * have delete semantics.
      * Thus, for tombstones the provided filter predicate is not evaluated but the tombstone
record is forwarded
      * directly if required (i.e., if there is anything to be deleted).
-     * Furthermore, for each record that gets dropped (i.e., dot not satisfied the given
predicate) a tombstone record
+     * Furthermore, for each record that gets dropped (i.e., dot not satisfy the given predicate)
a tombstone record
      * is forwarded.
      * <p>
      * To query the local {@link KeyValueStore} it must be obtained via
@@ -184,7 +184,7 @@ public interface KTable<K, V> {
      * have delete semantics.
      * Thus, for tombstones the provided filter predicate is not evaluated but the tombstone
record is forwarded
      * directly if required (i.e., if there is anything to be deleted).
-     * Furthermore, for each record that gets dropped (i.e., dot not satisfied the given
predicate) a tombstone record
+     * Furthermore, for each record that gets dropped (i.e., dot not satisfy the given predicate)
a tombstone record
      * is forwarded.
      * <p>
      * To query the local {@link KeyValueStore} it must be obtained via
@@ -260,7 +260,7 @@ public interface KTable<K, V> {
      * <p>
      * @param predicate a filter {@link Predicate} that is applied to each record
      * @param materialized  a {@link Materialized} that describes how the {@link StateStore}
for the resulting {@code KTable}
-     *                      should be materialized
+     *                      should be materialized. Cannot be {@code null}
      * @return a {@code KTable} that contains only those records that do <em>not</em>
satisfy the given predicate
      * @see #filter(Predicate, Materialized)
      */
@@ -412,7 +412,7 @@ public interface KTable<K, V> {
      *
      * @param mapper a {@link ValueMapper} that computes a new output value
      * @param materialized  a {@link Materialized} that describes how the {@link StateStore}
for the resulting {@code KTable}
-     *                      should be materialized
+     *                      should be materialized. Cannot be {@code null}
      * @param <VR>   the value type of the result {@code KTable}
      *
      * @return a {@code KTable} that contains records with unmodified keys and new values
(possibly of different type)
@@ -1342,6 +1342,82 @@ public interface KTable<K, V> {
      * Both input streams (or to be more precise, their underlying source topics) need to
have the same number of
      * partitions.
      *
+     * @param other         the other {@code KTable} to be joined with this {@code KTable}
+     * @param joiner        a {@link ValueJoiner} that computes the join result for a pair
of matching records
+     * @param materialized  an instance of {@link Materialized} used to describe how the
state store should be materialized.
+     *                      Cannot be {@code null}
+     * @param <VO>          the value type of the other {@code KTable}
+     * @param <VR>          the value type of the result {@code KTable}
+     * @return a {@code KTable} that contains join-records for each key and values computed
by the given
+     * {@link ValueJoiner}, one for each matched record-pair with the same key
+     * @see #leftJoin(KTable, ValueJoiner, Materialized)
+     * @see #outerJoin(KTable, ValueJoiner, Materialized)
+     */
+    <VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
+                                final ValueJoiner<? super V, ? super VO, ? extends VR>
joiner,
+                                final Materialized<K, VR, KeyValueStore<Bytes, byte[]>>
materialized);
+    /**
+     * Join records of this {@code KTable} with another {@code KTable}'s records using non-windowed
inner equi join.
+     * The join is a primary key join with join attribute {@code thisKTable.key == otherKTable.key}.
+     * The result is an ever updating {@code KTable} that represents the <em>current</em>
(i.e., processing time) result
+     * of the join.
+     * <p>
+     * The join is computed by (1) updating the internal state of one {@code KTable} and
(2) performing a lookup for a
+     * matching record in the <em>current</em> (i.e., processing time) internal
state of the other {@code KTable}.
+     * This happens in a symmetric way, i.e., for each update of either {@code this} or the
{@code other} input
+     * {@code KTable} the result gets updated.
+     * <p>
+     * For each {@code KTable} record that finds a corresponding record in the other {@code
KTable} the provided
+     * {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the
result record.
+     * The key of the result record is the same as for both joining input records.
+     * <p>
+     * Note that {@link KeyValue records} with {@code null} values (so-called tombstone records)
have delete semantics.
+     * Thus, for input tombstones the provided value-joiner is not called but a tombstone
record is forwarded
+     * directly to delete a record in the result {@code KTable} if required (i.e., if there
is anything to be deleted).
+     * <p>
+     * Input records with {@code null} key will be dropped and no join computation is performed.
+     * <p>
+     * Example:
+     * <table border='1'>
+     * <tr>
+     * <th>thisKTable</th>
+     * <th>thisState</th>
+     * <th>otherKTable</th>
+     * <th>otherState</th>
+     * <th>result update record</th>
+     * </tr>
+     * <tr>
+     * <td>&lt;K1:A&gt;</td>
+     * <td>&lt;K1:A&gt;</td>
+     * <td></td>
+     * <td></td>
+     * <td></td>
+     * </tr>
+     * <tr>
+     * <td></td>
+     * <td>&lt;K1:A&gt;</td>
+     * <td>&lt;K1:b&gt;</td>
+     * <td>&lt;K1:b&gt;</td>
+     * <td>&lt;K1:ValueJoiner(A,b)&gt;</td>
+     * </tr>
+     * <tr>
+     * <td>&lt;K1:C&gt;</td>
+     * <td>&lt;K1:C&gt;</td>
+     * <td></td>
+     * <td>&lt;K1:b&gt;</td>
+     * <td>&lt;K1:ValueJoiner(C,b)&gt;</td>
+     * </tr>
+     * <tr>
+     * <td></td>
+     * <td>&lt;K1:C&gt;</td>
+     * <td>&lt;K1:null&gt;</td>
+     * <td></td>
+     * <td>&lt;K1:null&gt;</td>
+     * </tr>
+     * </table>
+     * Both input streams (or to be more precise, their underlying source topics) need to
have the same number of
+     * partitions.
+     *
      * @param other  the other {@code KTable} to be joined with this {@code KTable}
      * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching
records
      * @param <VO>   the value type of the other {@code KTable}
@@ -1353,9 +1429,11 @@ public interface KTable<K, V> {
      * (i.e., that would be equivalent to calling {@link KTable#join(KTable, ValueJoiner)}.
      * @return a {@code KTable} that contains join-records for each key and values computed
by the given
      * {@link ValueJoiner}, one for each matched record-pair with the same key
-     * @see #leftJoin(KTable, ValueJoiner)
-     * @see #outerJoin(KTable, ValueJoiner)
+     * @see #leftJoin(KTable, ValueJoiner, Materialized)
+     * @see #outerJoin(KTable, ValueJoiner, Materialized)
+     * @deprecated use {@link #join(KTable, ValueJoiner, Materialized)}
      */
+    @Deprecated
     <VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
                                 final ValueJoiner<? super V, ? super VO, ? extends VR>
joiner,
                                 final Serde<VR> joinSerde,
@@ -1430,9 +1508,11 @@ public interface KTable<K, V> {
      * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
      * @return a {@code KTable} that contains join-records for each key and values computed
by the given
      * {@link ValueJoiner}, one for each matched record-pair with the same key
-     * @see #leftJoin(KTable, ValueJoiner)
-     * @see #outerJoin(KTable, ValueJoiner)
+     * @see #leftJoin(KTable, ValueJoiner, Materialized)
+     * @see #outerJoin(KTable, ValueJoiner, Materialized)
+     * @deprecated use {@link #join(KTable, ValueJoiner, Materialized)}
      */
+    @Deprecated
     <VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
                                 final ValueJoiner<? super V, ? super VO, ? extends VR>
joiner,
                                 final StateStoreSupplier<KeyValueStore> storeSupplier);
@@ -1589,6 +1669,90 @@ public interface KTable<K, V> {
      * Both input streams (or to be more precise, their underlying source topics) need to
have the same number of
      * partitions.
      *
+     * @param other         the other {@code KTable} to be joined with this {@code KTable}
+     * @param joiner        a {@link ValueJoiner} that computes the join result for a pair
of matching records
+     * @param materialized  an instance of {@link Materialized} used to describe how the
state store should be materialized.
+     *                      Cannot be {@code null}
+     * @param <VO>          the value type of the other {@code KTable}
+     * @param <VR>          the value type of the result {@code KTable}
+     * @return a {@code KTable} that contains join-records for each key and values computed
by the given
+     * {@link ValueJoiner}, one for each matched record-pair with the same key plus one for
each non-matching record of
+     * left {@code KTable}
+     * @see #join(KTable, ValueJoiner, Materialized)
+     * @see #outerJoin(KTable, ValueJoiner, Materialized)
+     */
+    <VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
+                                    final ValueJoiner<? super V, ? super VO, ? extends
VR> joiner,
+                                    final Materialized<K, VR, KeyValueStore<Bytes,
byte[]>> materialized);
+    /**
+     * Join records of this {@code KTable} (left input) with another {@code KTable}'s (right
input) records using
+     * non-windowed left equi join.
+     * The join is a primary key join with join attribute {@code thisKTable.key == otherKTable.key}.
+     * In contrast to {@link #join(KTable, ValueJoiner) inner-join}, all records from left
{@code KTable} will produce
+     * an output record (cf. below).
+     * The result is an ever updating {@code KTable} that represents the <em>current</em>
(i.e., processing time) result
+     * of the join.
+     * <p>
+     * The join is computed by (1) updating the internal state of one {@code KTable} and
(2) performing a lookup for a
+     * matching record in the <em>current</em> (i.e., processing time) internal
state of the other {@code KTable}.
+     * This happens in a symmetric way, i.e., for each update of either {@code this} or the
{@code other} input
+     * {@code KTable} the result gets updated.
+     * <p>
+     * For each {@code KTable} record that finds a corresponding record in the other {@code
KTable}'s state the
+     * provided {@link ValueJoiner} will be called to compute a value (with arbitrary type)
for the result record.
+     * Additionally, for each record of left {@code KTable} that does not find a corresponding
record in the
+     * right {@code KTable}'s state the provided {@link ValueJoiner} will be called with
{@code rightValue =
+     * null} to compute a value (with arbitrary type) for the result record.
+     * The key of the result record is the same as for both joining input records.
+     * <p>
+     * Note that {@link KeyValue records} with {@code null} values (so-called tombstone records)
have delete semantics.
+     * For example, for left input tombstones the provided value-joiner is not called but
a tombstone record is
+     * forwarded directly to delete a record in the result {@code KTable} if required (i.e.,
if there is anything to be
+     * deleted).
+     * <p>
+     * Input records with {@code null} key will be dropped and no join computation is performed.
+     * <p>
+     * Example:
+     * <table border='1'>
+     * <tr>
+     * <th>thisKTable</th>
+     * <th>thisState</th>
+     * <th>otherKTable</th>
+     * <th>otherState</th>
+     * <th>result update record</th>
+     * </tr>
+     * <tr>
+     * <td>&lt;K1:A&gt;</td>
+     * <td>&lt;K1:A&gt;</td>
+     * <td></td>
+     * <td></td>
+     * <td>&lt;K1:ValueJoiner(A,null)&gt;</td>
+     * </tr>
+     * <tr>
+     * <td></td>
+     * <td>&lt;K1:A&gt;</td>
+     * <td>&lt;K1:b&gt;</td>
+     * <td>&lt;K1:b&gt;</td>
+     * <td>&lt;K1:ValueJoiner(A,b)&gt;</td>
+     * </tr>
+     * <tr>
+     * <td>&lt;K1:null&gt;</td>
+     * <td></td>
+     * <td></td>
+     * <td>&lt;K1:b&gt;</td>
+     * <td>&lt;K1:null&gt;</td>
+     * </tr>
+     * <tr>
+     * <td></td>
+     * <td></td>
+     * <td>&lt;K1:null&gt;</td>
+     * <td></td>
+     * <td></td>
+     * </tr>
+     * </table>
+     * Both input streams (or to be more precise, their underlying source topics) need to
have the same number of
+     * partitions.
+     *
      * @param other  the other {@code KTable} to be joined with this {@code KTable}
      * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching
records
      * @param <VO>   the value type of the other {@code KTable}
@@ -1601,9 +1765,11 @@ public interface KTable<K, V> {
      * @return a {@code KTable} that contains join-records for each key and values computed
by the given
      * {@link ValueJoiner}, one for each matched record-pair with the same key plus one for
each non-matching record of
      * left {@code KTable}
-     * @see #join(KTable, ValueJoiner)
-     * @see #outerJoin(KTable, ValueJoiner)
+     * @see #join(KTable, ValueJoiner, Materialized)
+     * @see #outerJoin(KTable, ValueJoiner, Materialized)
+     * @deprecated use {@link #leftJoin(KTable, ValueJoiner, Materialized)}
      */
+    @Deprecated
     <VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
                                     final ValueJoiner<? super V, ? super VO, ? extends
VR> joiner,
                                     final Serde<VR> joinSerde,
@@ -1686,9 +1852,11 @@ public interface KTable<K, V> {
      * @return a {@code KTable} that contains join-records for each key and values computed
by the given
      * {@link ValueJoiner}, one for each matched record-pair with the same key plus one for
each non-matching record of
      * left {@code KTable}
-     * @see #join(KTable, ValueJoiner)
-     * @see #outerJoin(KTable, ValueJoiner)
+     * @see #join(KTable, ValueJoiner, Materialized)
+     * @see #outerJoin(KTable, ValueJoiner, Materialized)
+     * @deprecated use {@link #leftJoin(KTable, ValueJoiner, Materialized)}
      */
+    @Deprecated
     <VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
                                     final ValueJoiner<? super V, ? super VO, ? extends
VR> joiner,
                                     final StateStoreSupplier<KeyValueStore> storeSupplier);
@@ -1843,6 +2011,90 @@ public interface KTable<K, V> {
      * Both input streams (or to be more precise, their underlying source topics) need to
have the same number of
      * partitions.
      *
+     * @param other         the other {@code KTable} to be joined with this {@code KTable}
+     * @param joiner        a {@link ValueJoiner} that computes the join result for a pair
of matching records
+     * @param materialized  an instance of {@link Materialized} used to describe how the
state store should be materialized.
+     *                      Cannot be {@code null}
+     * @param <VO>          the value type of the other {@code KTable}
+     * @param <VR>          the value type of the result {@code KTable}
+     * @return a {@code KTable} that contains join-records for each key and values computed
by the given
+     * {@link ValueJoiner}, one for each matched record-pair with the same key plus one for
each non-matching record of
+     * both {@code KTable}s
+     * @see #join(KTable, ValueJoiner)
+     * @see #leftJoin(KTable, ValueJoiner)
+     */
+    <VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
+                                     final ValueJoiner<? super V, ? super VO, ? extends
VR> joiner,
+                                     final Materialized<K, VR, KeyValueStore<Bytes,
byte[]>> materialized);
+
+    /**
+     * Join records of this {@code KTable} (left input) with another {@code KTable}'s (right
input) records using
+     * non-windowed outer equi join.
+     * The join is a primary key join with join attribute {@code thisKTable.key == otherKTable.key}.
+     * In contrast to {@link #join(KTable, ValueJoiner) inner-join} or {@link #leftJoin(KTable,
ValueJoiner) left-join},
+     * all records from both input {@code KTable}s will produce an output record (cf. below).
+     * The result is an ever updating {@code KTable} that represents the <em>current</em>
(i.e., processing time) result
+     * of the join.
+     * <p>
+     * The join is computed by (1) updating the internal state of one {@code KTable} and
(2) performing a lookup for a
+     * matching record in the <em>current</em> (i.e., processing time) internal
state of the other {@code KTable}.
+     * This happens in a symmetric way, i.e., for each update of either {@code this} or the
{@code other} input
+     * {@code KTable} the result gets updated.
+     * <p>
+     * For each {@code KTable} record that finds a corresponding record in the other {@code
KTable}'s state the
+     * provided {@link ValueJoiner} will be called to compute a value (with arbitrary type)
for the result record.
+     * Additionally, for each record that does not find a corresponding record in the corresponding
other
+     * {@code KTable}'s state the provided {@link ValueJoiner} will be called with {@code
null} value for the
+     * corresponding other value to compute a value (with arbitrary type) for the result
record.
+     * The key of the result record is the same as for both joining input records.
+     * <p>
+     * Note that {@link KeyValue records} with {@code null} values (so-called tombstone records)
have delete semantics.
+     * Thus, for input tombstones the provided value-joiner is not called but a tombstone
record is forwarded directly
+     * to delete a record in the result {@code KTable} if required (i.e., if there is anything
to be deleted).
+     * <p>
+     * Input records with {@code null} key will be dropped and no join computation is performed.
+     * <p>
+     * Example:
+     * <table border='1'>
+     * <tr>
+     * <th>thisKTable</th>
+     * <th>thisState</th>
+     * <th>otherKTable</th>
+     * <th>otherState</th>
+     * <th>result update record</th>
+     * </tr>
+     * <tr>
+     * <td>&lt;K1:A&gt;</td>
+     * <td>&lt;K1:A&gt;</td>
+     * <td></td>
+     * <td></td>
+     * <td>&lt;K1:ValueJoiner(A,null)&gt;</td>
+     * </tr>
+     * <tr>
+     * <td></td>
+     * <td>&lt;K1:A&gt;</td>
+     * <td>&lt;K1:b&gt;</td>
+     * <td>&lt;K1:b&gt;</td>
+     * <td>&lt;K1:ValueJoiner(A,b)&gt;</td>
+     * </tr>
+     * <tr>
+     * <td>&lt;K1:null&gt;</td>
+     * <td></td>
+     * <td></td>
+     * <td>&lt;K1:b&gt;</td>
+     * <td>&lt;K1:ValueJoiner(null,b)&gt;</td>
+     * </tr>
+     * <tr>
+     * <td></td>
+     * <td></td>
+     * <td>&lt;K1:null&gt;</td>
+     * <td></td>
+     * <td>&lt;K1:null&gt;</td>
+     * </tr>
+     * </table>
+     * Both input streams (or to be more precise, their underlying source topics) need to
have the same number of
+     * partitions.
+     *
      * @param other  the other {@code KTable} to be joined with this {@code KTable}
      * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching
records
      * @param <VO>   the value type of the other {@code KTable}
@@ -1855,9 +2107,11 @@ public interface KTable<K, V> {
      * @return a {@code KTable} that contains join-records for each key and values computed
by the given
      * {@link ValueJoiner}, one for each matched record-pair with the same key plus one for
each non-matching record of
      * both {@code KTable}s
-     * @see #join(KTable, ValueJoiner)
-     * @see #leftJoin(KTable, ValueJoiner)
+     * @see #join(KTable, ValueJoiner, Materialized)
+     * @see #leftJoin(KTable, ValueJoiner, Materialized)
+     * @deprecated use {@link #outerJoin(KTable, ValueJoiner, Materialized)}
      */
+    @Deprecated
     <VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
                                      final ValueJoiner<? super V, ? super VO, ? extends
VR> joiner,
                                      final Serde<VR> joinSerde,
@@ -1941,7 +2195,9 @@ public interface KTable<K, V> {
      * both {@code KTable}s
      * @see #join(KTable, ValueJoiner)
      * @see #leftJoin(KTable, ValueJoiner)
+     * @deprecated use {@link #outerJoin(KTable, ValueJoiner, Materialized)}
      */
+    @Deprecated
     <VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
                                      final ValueJoiner<? super V, ? super VO, ? extends
VR> joiner,
                                      final StateStoreSupplier<KeyValueStore> storeSupplier);

http://git-wip-us.apache.org/repos/asf/kafka/blob/08063f50/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index ed7abdc..067bcfc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -534,7 +534,17 @@ public class KTableImpl<K, S, V> extends AbstractStream<K>
implements KTable<K,
     @Override
     public <V1, R> KTable<K, R> join(final KTable<K, V1> other,
                                      final ValueJoiner<? super V, ? super V1, ? extends
R> joiner) {
-        return doJoin(other, joiner, false, false, null, null);
+        return doJoin(other, joiner, null, false, false);
+    }
+
+    @Override
+    public <VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
+                                       final ValueJoiner<? super V, ? super VO, ? extends
VR> joiner,
+                                       final Materialized<K, VR, KeyValueStore<Bytes,
byte[]>> materialized) {
+        Objects.requireNonNull(other, "other can't be null");
+        Objects.requireNonNull(joiner, "joiner can't be null");
+        Objects.requireNonNull(materialized, "materialized can't be null");
+        return doJoin(other, joiner, new MaterializedInternal<>(materialized), false,
false);
     }
 
     @Override
@@ -556,7 +566,14 @@ public class KTableImpl<K, S, V> extends AbstractStream<K>
implements KTable<K,
     @Override
     public <V1, R> KTable<K, R> outerJoin(final KTable<K, V1> other,
                                           final ValueJoiner<? super V, ? super V1, ? extends
R> joiner) {
-        return doJoin(other, joiner, true, true, null, null);
+        return doJoin(other, joiner, null, true, true);
+    }
+
+    @Override
+    public <VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
+                                            final ValueJoiner<? super V, ? super VO, ?
extends VR> joiner,
+                                            final Materialized<K, VR, KeyValueStore<Bytes,
byte[]>> materialized) {
+        return doJoin(other, joiner, new MaterializedInternal<>(materialized), true,
true);
     }
 
     @Override
@@ -578,7 +595,18 @@ public class KTableImpl<K, S, V> extends AbstractStream<K>
implements KTable<K,
     @Override
     public <V1, R> KTable<K, R> leftJoin(final KTable<K, V1> other,
                                          final ValueJoiner<? super V, ? super V1, ? extends
R> joiner) {
-        return doJoin(other, joiner, true, false, null, null);
+        return doJoin(other, joiner, null, true, false);
+    }
+
+    @Override
+    public <VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
+                                           final ValueJoiner<? super V, ? super VO, ?
extends VR> joiner,
+                                           final Materialized<K, VR, KeyValueStore<Bytes,
byte[]>> materialized) {
+        return doJoin(other,
+                      joiner,
+                      new MaterializedInternal<>(materialized),
+                      true,
+                      false);
     }
 
     @Override
@@ -619,8 +647,53 @@ public class KTableImpl<K, S, V> extends AbstractStream<K>
implements KTable<K,
                                         final StateStoreSupplier<KeyValueStore> storeSupplier)
{
         Objects.requireNonNull(other, "other can't be null");
         Objects.requireNonNull(joiner, "joiner can't be null");
+        final String joinMergeName = builder.newName(MERGE_NAME);
         final String internalQueryableName = storeSupplier == null ? null : storeSupplier.name();
-        final Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>)
other);
+        final KTable<K, R> result = buildJoin((AbstractStream<K>) other,
+                                              joiner,
+                                              leftOuter,
+                                              rightOuter,
+                                              joinMergeName,
+                                              internalQueryableName);
+
+        if (internalQueryableName != null) {
+            builder.internalTopologyBuilder.addStateStore(storeSupplier, joinMergeName);
+        }
+
+        return result;
+    }
+
+    private <VO, VR> KTable<K, VR> doJoin(final KTable<K, VO> other,
+                                          final ValueJoiner<? super V, ? super VO, ? extends
VR> joiner,
+                                          final MaterializedInternal<K, VR, KeyValueStore<Bytes,
byte[]>> materialized,
+                                          final boolean leftOuter,
+                                          final boolean rightOuter) {
+        Objects.requireNonNull(other, "other can't be null");
+        Objects.requireNonNull(joiner, "joiner can't be null");
+        final String internalQueryableName = materialized == null ? null : materialized.storeName();
+        final String joinMergeName = builder.newName(MERGE_NAME);
+        final KTable<K, VR> result = buildJoin((AbstractStream<K>) other,
+                                               joiner,
+                                               leftOuter,
+                                               rightOuter,
+                                               joinMergeName,
+                                               internalQueryableName);
+
+        if (materialized != null) {
+            final StoreBuilder<KeyValueStore<K, VR>> storeBuilder
+                    = new KeyValueStoreMaterializer<>(materialized).materialize();
+            builder.internalTopologyBuilder.addStateStore(storeBuilder, joinMergeName);
+        }
+        return result;
+    }
+
+    private <V1, R> KTable<K, R> buildJoin(final AbstractStream<K> other,
+                                           final ValueJoiner<? super V, ? super V1, ?
extends R> joiner,
+                                           final boolean leftOuter,
+                                           final boolean rightOuter,
+                                           final String joinMergeName,
+                                           final String internalQueryableName) {
+        final Set<String> allSourceNodes = ensureJoinableWith(other);
 
         if (leftOuter) {
             enableSendingOldValues();
@@ -631,7 +704,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K>
implements KTable<K,
 
         final String joinThisName = builder.newName(JOINTHIS_NAME);
         final String joinOtherName = builder.newName(JOINOTHER_NAME);
-        final String joinMergeName = builder.newName(MERGE_NAME);
+
 
         final KTableKTableAbstractJoin<K, R, V, V1> joinThis;
         final KTableKTableAbstractJoin<K, R, V1, V> joinOther;
@@ -659,11 +732,6 @@ public class KTableImpl<K, S, V> extends AbstractStream<K>
implements KTable<K,
         builder.internalTopologyBuilder.addProcessor(joinMergeName, joinMerge, joinThisName,
joinOtherName);
         builder.internalTopologyBuilder.connectProcessorAndStateStores(joinThisName, ((KTableImpl)
other).valueGetterSupplier().storeNames());
         builder.internalTopologyBuilder.connectProcessorAndStateStores(joinOtherName, valueGetterSupplier().storeNames());
-
-        if (internalQueryableName != null) {
-            builder.internalTopologyBuilder.addStateStore(storeSupplier, joinMergeName);
-        }
-
         return new KTableImpl<>(builder, joinMergeName, joinMerge, allSourceNodes,
internalQueryableName, internalQueryableName != null);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/08063f50/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
index 949f8be..1b45711 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
@@ -29,8 +30,10 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import org.apache.kafka.test.IntegrationTest;
@@ -344,8 +347,15 @@ public class KTableKTableJoinIntegrationTest {
         final KTable<String, String> table2 = builder.table(TABLE_2, TABLE_2);
         final KTable<String, String> table3 = builder.table(TABLE_3, TABLE_3);
 
+        Materialized<String, String, KeyValueStore<Bytes, byte[]>> materialized
= null;
+        if (queryableName != null) {
+            materialized = Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(queryableName)
+                    .withKeySerde(Serdes.String())
+                    .withValueSerde(Serdes.String())
+                    .withCachingDisabled();
+        }
         join(join(table1, table2, joinType1, null /* no need to query intermediate result
*/), table3,
-            joinType2, queryableName).to(OUTPUT);
+            joinType2, materialized).to(OUTPUT);
 
         return new KafkaStreams(builder.build(), new StreamsConfig(streamsConfig));
     }
@@ -353,7 +363,7 @@ public class KTableKTableJoinIntegrationTest {
     private KTable<String, String> join(final KTable<String, String> first,
                                         final KTable<String, String> second,
                                         final JoinType joinType,
-                                        final String queryableName) {
+                                        final Materialized<String, String, KeyValueStore<Bytes,
byte[]>> materialized) {
         final ValueJoiner<String, String, String> joiner = new ValueJoiner<String,
String, String>() {
             @Override
             public String apply(final String value1, final String value2) {
@@ -361,13 +371,26 @@ public class KTableKTableJoinIntegrationTest {
             }
         };
 
+
         switch (joinType) {
             case INNER:
-                return first.join(second, joiner, Serdes.String(), queryableName);
+                if (materialized != null) {
+                    return first.join(second, joiner, materialized);
+                } else {
+                    return first.join(second, joiner);
+                }
             case LEFT:
-                return first.leftJoin(second, joiner, Serdes.String(), queryableName);
+                if (materialized != null) {
+                    return first.leftJoin(second, joiner, materialized);
+                } else {
+                    return first.leftJoin(second, joiner);
+                }
             case OUTER:
-                return first.outerJoin(second, joiner, Serdes.String(), queryableName);
+                if (materialized != null) {
+                    return first.outerJoin(second, joiner, materialized);
+                } else {
+                    return first.outerJoin(second, joiner);
+                }
         }
 
         throw new RuntimeException("Unknown join type.");

http://git-wip-us.apache.org/repos/asf/kafka/blob/08063f50/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
index 64ae6de..6ca38b8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.internals.SinkNode;
 import org.apache.kafka.streams.processor.internals.SourceNode;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -437,19 +438,21 @@ public class KTableImplTest {
         table.join(table, MockValueJoiner.TOSTRING_JOINER, null, null);
     }
 
+    @SuppressWarnings("unchecked")
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullStoreSupplierInJoin() {
-        table.join(table, MockValueJoiner.TOSTRING_JOINER, null);
+        table.join(table, MockValueJoiner.TOSTRING_JOINER, (StateStoreSupplier) null);
     }
 
+    @SuppressWarnings("unchecked")
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullStoreSupplierInLeftJoin() {
-        table.leftJoin(table, MockValueJoiner.TOSTRING_JOINER, null);
+        table.leftJoin(table, MockValueJoiner.TOSTRING_JOINER, (StateStoreSupplier) null);
     }
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullStoreSupplierInOuterJoin() {
-        table.outerJoin(table, MockValueJoiner.TOSTRING_JOINER, null);
+        table.outerJoin(table, MockValueJoiner.TOSTRING_JOINER, (StateStoreSupplier) null);
     }
 
     @Test(expected = NullPointerException.class)
@@ -496,4 +499,19 @@ public class KTableImplTest {
             }
         }, (Materialized<String, String, KeyValueStore<Bytes, byte[]>>) null);
     }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerOnJoinWhenMaterializedIsNull() {
+        table.join(table, MockValueJoiner.TOSTRING_JOINER, (Materialized) null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerOnLeftJoinWhenMaterializedIsNull() {
+        table.leftJoin(table, MockValueJoiner.TOSTRING_JOINER, (Materialized) null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerOnOuterJoinWhenMaterializedIsNull() {
+        table.leftJoin(table, MockValueJoiner.TOSTRING_JOINER, (Materialized) null);
+    }
 }


Mime
View raw message