kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: MINOR: Small refactorings on KTable joins (#5540)
Date Tue, 21 Aug 2018 21:42:05 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a289865  MINOR: Small refactorings on KTable joins (#5540)
a289865 is described below

commit a289865266618d9736fe49d11edfbc2b146f5148
Author: Joan Goyeau <joan@goyeau.com>
AuthorDate: Tue Aug 21 22:41:59 2018 +0100

    MINOR: Small refactorings on KTable joins (#5540)
    
    Reviewers: Guozhang Wang <wangguoz@gmail.com>
---
 .../java/org/apache/kafka/streams/kstream/internals/KTableImpl.java | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index bbec96c..352e42d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -359,8 +359,6 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     public <VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
                                        final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                        final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
-        Objects.requireNonNull(other, "other can't be null");
-        Objects.requireNonNull(joiner, "joiner can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
         final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
         materializedInternal.generateStoreNameIfNeeded(builder, MERGE_NAME);
@@ -378,6 +376,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     public <VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
                                             final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                             final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
+        Objects.requireNonNull(materialized, "materialized can't be null");
         final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
         materializedInternal.generateStoreNameIfNeeded(builder, MERGE_NAME);
 
@@ -394,8 +393,10 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     public <VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
                                            final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                            final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
+        Objects.requireNonNull(materialized, "materialized can't be null");
         final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
         materializedInternal.generateStoreNameIfNeeded(builder, MERGE_NAME);
+
         return doJoin(other, joiner, materializedInternal, true, false);
     }
 
@@ -410,7 +411,6 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         final String internalQueryableName = materializedInternal == null ? null : materializedInternal.storeName();
         final String joinMergeName = builder.newProcessorName(MERGE_NAME);
 
-
         return buildJoin(
             (AbstractStream<K>) other,
             joiner,


Mime
View raw message