kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: HOTFIX: Auto-repartitioning for merge() and code simplifications
Date Tue, 05 Jul 2016 16:16:48 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 87b3ce16c -> 12fa188b9


HOTFIX: Auto-repartitioning for merge() and code simplifications

follow-up to auto-through feature:
 - add sourceNode to transform()
 - enable auto-repartitioning in merge()
 - null check not required anymore (always join-able due to auto-through)

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #1580 from mjsax/hotfix


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/12fa188b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/12fa188b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/12fa188b

Branch: refs/heads/trunk
Commit: 12fa188b9aaeafa337c813f4a84a3117d6bf403f
Parents: 87b3ce1
Author: Matthias J. Sax <matthias@confluent.io>
Authored: Tue Jul 5 09:16:45 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Tue Jul 5 09:16:45 2016 -0700

----------------------------------------------------------------------
 .../kstream/internals/AbstractStream.java        | 14 ++++++--------
 .../streams/kstream/internals/KStreamImpl.java   | 19 +++++++++----------
 2 files changed, 15 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/12fa188b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
index ebada92..b0c1111 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
@@ -31,6 +31,10 @@ public abstract class AbstractStream<K> {
     protected final Set<String> sourceNodes;
 
     public AbstractStream(KStreamBuilder topology, String name, Set<String> sourceNodes) {
+        if (sourceNodes == null || sourceNodes.isEmpty()) {
+            throw new IllegalArgumentException("parameter <sourceNodes> must not be null or empty");
+        }
+
         this.topology = topology;
         this.name = name;
         this.sourceNodes = sourceNodes;
@@ -40,15 +44,9 @@ public abstract class AbstractStream<K> {
      * @throws TopologyBuilderException if the streams are not joinable
      */
     protected Set<String> ensureJoinableWith(AbstractStream<K> other) {
-        Set<String> thisSourceNodes = sourceNodes;
-        Set<String> otherSourceNodes = other.sourceNodes;
-
-        if (thisSourceNodes == null || otherSourceNodes == null)
-            throw new TopologyBuilderException(this.name + " and " + other.name + " are not joinable");
-
         Set<String> allSourceNodes = new HashSet<>();
-        allSourceNodes.addAll(thisSourceNodes);
-        allSourceNodes.addAll(otherSourceNodes);
+        allSourceNodes.addAll(sourceNodes);
+        allSourceNodes.addAll(other.sourceNodes);
 
         topology.copartitionSources(allSourceNodes);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/12fa188b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 79ff842..ca2e944 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -228,27 +228,26 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     }
 
     public static <K, V> KStream<K, V> merge(KStreamBuilder topology, KStream<K, V>[] streams) {
+        if (streams == null || streams.length == 0) {
+            throw new IllegalArgumentException("Parameter <streams> must not be null or has length zero");
+        }
+
         String name = topology.newName(MERGE_NAME);
         String[] parentNames = new String[streams.length];
         Set<String> allSourceNodes = new HashSet<>();
+        boolean requireRepartitioning = false;
 
         for (int i = 0; i < streams.length; i++) {
             KStreamImpl stream = (KStreamImpl) streams[i];
 
             parentNames[i] = stream.name;
-
-            if (allSourceNodes != null) {
-                if (stream.sourceNodes != null)
-                    allSourceNodes.addAll(stream.sourceNodes);
-                else
-                    allSourceNodes = null;
-            }
-
+            requireRepartitioning |= stream.repartitionRequired;
+            allSourceNodes.addAll(stream.sourceNodes);
         }
 
         topology.addProcessor(name, new KStreamPassThrough<>(), parentNames);
 
-        return new KStreamImpl<>(topology, name, allSourceNodes, false);
+        return new KStreamImpl<>(topology, name, allSourceNodes, requireRepartitioning);
     }
 
     @Override
@@ -318,7 +317,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         topology.addProcessor(name, new KStreamTransform<>(transformerSupplier), this.name);
         topology.connectProcessorAndStateStores(name, stateStoreNames);
 
-        return new KStreamImpl<>(topology, name, null, true);
+        return new KStreamImpl<>(topology, name, sourceNodes, true);
     }
 
     @Override


Mime
View raw message