kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: MINOR: add KStream merge operator
Date Wed, 18 Nov 2015 01:35:04 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk ffc0965d3 -> 1a36af80b


MINOR: add KStream merge operator

guozhangwang

Added KStreamBuilder.merge(KStream...).

Author: Yasuhiro Matsuda <yasuhiro@confluent.io>

Reviewers: Guozhang Wang

Closes #536 from ymatsuda/kstream_merge_operator


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1a36af80
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1a36af80
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1a36af80

Branch: refs/heads/trunk
Commit: 1a36af80b79a634a652e594d8240924ac32376ae
Parents: ffc0965
Author: Yasuhiro Matsuda <yasuhiro@confluent.io>
Authored: Tue Nov 17 17:34:54 2015 -0800
Committer: Confluent <confluent@Confluents-MacBook-Pro.local>
Committed: Tue Nov 17 17:34:54 2015 -0800

----------------------------------------------------------------------
 .../kafka/streams/kstream/KStreamBuilder.java   | 10 +++++++
 .../streams/kstream/internals/KStreamImpl.java  | 29 ++++++++++++++++++--
 .../kstream/internals/KStreamWindowedImpl.java  |  2 +-
 .../streams/kstream/KStreamBuilderTest.java     | 29 ++++++++++++++++++++
 4 files changed, 67 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1a36af80/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
index c8a8bd3..a95d08c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
@@ -68,6 +68,16 @@ public class KStreamBuilder extends TopologyBuilder {
         return new KStreamImpl<>(this, name, Collections.singleton(name));
     }
 
+    /**
+     * Creates a new stream by merging the given streams
+     *
+     * @param streams the streams to be merged
+     * @return KStream
+     */
+    public <K, V> KStream<K, V> merge(KStream<K, V>... streams) {
+        return KStreamImpl.merge(this, streams);
+    }
+
     public String newName(String prefix) {
         return prefix + String.format("%010d", index.getAndIncrement());
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1a36af80/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 0986405..1ac23b2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -33,6 +33,7 @@ import org.apache.kafka.streams.processor.ProcessorSupplier;
 
 import java.lang.reflect.Array;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.Set;
 
 public class KStreamImpl<K, V> implements KStream<K, V> {
@@ -65,12 +66,12 @@ public class KStreamImpl<K, V> implements KStream<K, V> {
 
     public static final String JOINOTHER_NAME = "KAFKA-JOINOTHER-";
 
-    public static final String JOINMERGE_NAME = "KAFKA-JOINMERGE-";
+    public static final String MERGE_NAME = "KAFKA-MERGE-";
 
     public static final String SOURCE_NAME = "KAFKA-SOURCE-";
 
     protected final KStreamBuilder topology;
-    protected final String name;
+    public final String name;
     protected final Set<String> sourceNodes;
 
     public KStreamImpl(KStreamBuilder topology, String name, Set<String> sourceNodes) {
@@ -161,6 +162,30 @@ public class KStreamImpl<K, V> implements KStream<K, V> {
         return branchChildren;
     }
 
+    public static <K, V> KStream<K, V> merge(KStreamBuilder topology, KStream<K, V>[] streams) {
+        String name = topology.newName(MERGE_NAME);
+        String[] parentNames = new String[streams.length];
+        Set<String> allSourceNodes = new HashSet<>();
+
+        for (int i = 0; i < streams.length; i++) {
+            KStreamImpl stream = (KStreamImpl) streams[i];
+
+            parentNames[i] = stream.name;
+
+            if (allSourceNodes != null) {
+                if (stream.sourceNodes != null)
+                    allSourceNodes.addAll(stream.sourceNodes);
+                else
+                    allSourceNodes = null;
+            }
+
+        }
+
+        topology.addProcessor(name, new KStreamPassThrough<>(), parentNames);
+
+        return new KStreamImpl<>(topology, name, allSourceNodes);
+    }
+
     @Override
     public <K1, V1> KStream<K1, V1> through(String topic,
                                             Serializer<K> keySerializer,

http://git-wip-us.apache.org/repos/asf/kafka/blob/1a36af80/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java
index cb49873..100fbee 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java
@@ -55,7 +55,7 @@ public final class KStreamWindowedImpl<K, V> extends KStreamImpl<K, V> implement
 
         String joinThisName = topology.newName(JOINTHIS_NAME);
         String joinOtherName = topology.newName(JOINOTHER_NAME);
-        String joinMergeName = topology.newName(JOINMERGE_NAME);
+        String joinMergeName = topology.newName(MERGE_NAME);
 
         topology.addProcessor(joinThisName, joinThis, this.name);
         topology.addProcessor(joinOtherName, joinOther, ((KStreamImpl) other).name);

http://git-wip-us.apache.org/repos/asf/kafka/blob/1a36af80/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
index cf1cfaa..d6994a9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
@@ -17,8 +17,11 @@
 
 package org.apache.kafka.streams.kstream;
 
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.kstream.internals.KStreamImpl;
 import org.apache.kafka.streams.processor.TopologyException;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockProcessorSupplier;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -48,4 +51,30 @@ public class KStreamBuilderTest {
         assertEquals("Y-0000000001", builder.newName("Y-"));
         assertEquals("Z-0000000002", builder.newName("Z-"));
     }
+
+    @Test
+    public void testMerge() {
+        String topic1 = "topic-1";
+        String topic2 = "topic-2";
+
+        KStreamBuilder builder = new KStreamBuilder();
+
+        KStream<String, String> source1 = builder.from(topic1);
+        KStream<String, String> source2 = builder.from(topic2);
+        KStream<String, String> merged = builder.merge(source1, source2);
+
+        MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
+        merged.process(processorSupplier);
+
+        KStreamTestDriver driver = new KStreamTestDriver(builder);
+        driver.setTime(0L);
+
+        driver.process(topic1, "A", "aa");
+        driver.process(topic2, "B", "bb");
+        driver.process(topic2, "C", "cc");
+        driver.process(topic1, "D", "dd");
+
+        assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd"), processorSupplier.processed);
+    }
+
 }


Mime
View raw message