Repository: kafka
Updated Branches:
refs/heads/trunk ffc0965d3 -> 1a36af80b
MINOR: add KStream merge operator
@guozhangwang
Added KStreamBuilder.merge(KStream...).
Author: Yasuhiro Matsuda <yasuhiro@confluent.io>
Reviewers: Guozhang Wang
Closes #536 from ymatsuda/kstream_merge_operator
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1a36af80
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1a36af80
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1a36af80
Branch: refs/heads/trunk
Commit: 1a36af80b79a634a652e594d8240924ac32376ae
Parents: ffc0965
Author: Yasuhiro Matsuda <yasuhiro@confluent.io>
Authored: Tue Nov 17 17:34:54 2015 -0800
Committer: Confluent <confluent@Confluents-MacBook-Pro.local>
Committed: Tue Nov 17 17:34:54 2015 -0800
----------------------------------------------------------------------
.../kafka/streams/kstream/KStreamBuilder.java | 10 +++++++
.../streams/kstream/internals/KStreamImpl.java | 29 ++++++++++++++++++--
.../kstream/internals/KStreamWindowedImpl.java | 2 +-
.../streams/kstream/KStreamBuilderTest.java | 29 ++++++++++++++++++++
4 files changed, 67 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/1a36af80/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
index c8a8bd3..a95d08c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
@@ -68,6 +68,16 @@ public class KStreamBuilder extends TopologyBuilder {
return new KStreamImpl<>(this, name, Collections.singleton(name));
}
+    /**
+     * Creates a new stream by merging the given streams.
+     * <p>
+     * Every record from each of the given streams is forwarded to the returned stream.
+     *
+     * @param streams the streams to be merged
+     * @return a {@link KStream} carrying the records of all given streams
+     */
+    @SafeVarargs
+    public final <K, V> KStream<K, V> merge(KStream<K, V>... streams) {
+        // The varargs array is only read by KStreamImpl.merge (never stored or exposed), so
+        // @SafeVarargs is sound; it removes the "unchecked generic array creation" warning that
+        // every caller would otherwise get. @SafeVarargs requires the method to be final (Java 7).
+        return KStreamImpl.merge(this, streams);
+    }
+
/**
 * Returns a node name made of {@code prefix} followed by the next value of this builder's
 * counter, zero-padded to ten digits (e.g. {@code "X-0000000000"}).
 * <p>
 * A single counter is shared by all prefixes, so every call yields a distinct name.
 *
 * @param prefix the name prefix, e.g. a processor-type marker
 * @return the generated name
 */
public String newName(String prefix) {
    // Locale.ROOT pins the digits to ASCII: java.util.Formatter otherwise localizes %d output
    // using the default locale's zero digit, which would produce non-ASCII node names.
    return prefix + String.format(java.util.Locale.ROOT, "%010d", index.getAndIncrement());
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1a36af80/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 0986405..1ac23b2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -33,6 +33,7 @@ import org.apache.kafka.streams.processor.ProcessorSupplier;
import java.lang.reflect.Array;
import java.util.Collections;
+import java.util.HashSet;
import java.util.Set;
public class KStreamImpl<K, V> implements KStream<K, V> {
@@ -65,12 +66,12 @@ public class KStreamImpl<K, V> implements KStream<K, V> {
public static final String JOINOTHER_NAME = "KAFKA-JOINOTHER-";
- public static final String JOINMERGE_NAME = "KAFKA-JOINMERGE-";
+ public static final String MERGE_NAME = "KAFKA-MERGE-";
public static final String SOURCE_NAME = "KAFKA-SOURCE-";
protected final KStreamBuilder topology;
- protected final String name;
+ public final String name;
protected final Set<String> sourceNodes;
public KStreamImpl(KStreamBuilder topology, String name, Set<String> sourceNodes)
{
@@ -161,6 +162,30 @@ public class KStreamImpl<K, V> implements KStream<K, V> {
return branchChildren;
}
+    /**
+     * Registers a pass-through processor whose parents are all of the given streams and returns
+     * the resulting stream, which therefore carries the records of every input.
+     *
+     * @param topology the builder the new processor is added to
+     * @param streams  the streams to merge; each is cast to {@link KStreamImpl}
+     * @return the merged stream. Its source-node set is the union of the inputs' source-node sets,
+     *         or {@code null} if any input's set is {@code null}
+     * @throws IllegalArgumentException if {@code streams} is null or empty
+     */
+    public static <K, V> KStream<K, V> merge(KStreamBuilder topology, KStream<K, V>[] streams) {
+        // With no parents the processor would be registered with no upstream node at all,
+        // i.e. a stream that can never receive a record; reject that up front.
+        if (streams == null || streams.length == 0)
+            throw new IllegalArgumentException("merge requires at least one stream");
+
+        String name = topology.newName(MERGE_NAME);
+        String[] parentNames = new String[streams.length];
+        Set<String> allSourceNodes = new HashSet<>();
+
+        for (int i = 0; i < streams.length; i++) {
+            KStreamImpl<K, V> stream = (KStreamImpl<K, V>) streams[i];
+            parentNames[i] = stream.name;
+
+            // A null source-node set on any input makes the merged set null as well; once null,
+            // no further union is attempted.
+            if (allSourceNodes != null) {
+                if (stream.sourceNodes != null)
+                    allSourceNodes.addAll(stream.sourceNodes);
+                else
+                    allSourceNodes = null;
+            }
+        }
+
+        topology.addProcessor(name, new KStreamPassThrough<>(), parentNames);
+
+        return new KStreamImpl<>(topology, name, allSourceNodes);
+    }
+
@Override
public <K1, V1> KStream<K1, V1> through(String topic,
Serializer<K> keySerializer,
http://git-wip-us.apache.org/repos/asf/kafka/blob/1a36af80/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java
index cb49873..100fbee 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java
@@ -55,7 +55,7 @@ public final class KStreamWindowedImpl<K, V> extends KStreamImpl<K, V> implement
String joinThisName = topology.newName(JOINTHIS_NAME);
String joinOtherName = topology.newName(JOINOTHER_NAME);
- String joinMergeName = topology.newName(JOINMERGE_NAME);
+ String joinMergeName = topology.newName(MERGE_NAME);
topology.addProcessor(joinThisName, joinThis, this.name);
topology.addProcessor(joinOtherName, joinOther, ((KStreamImpl) other).name);
http://git-wip-us.apache.org/repos/asf/kafka/blob/1a36af80/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
index cf1cfaa..d6994a9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
@@ -17,8 +17,11 @@
package org.apache.kafka.streams.kstream;
+import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.kstream.internals.KStreamImpl;
import org.apache.kafka.streams.processor.TopologyException;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockProcessorSupplier;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
@@ -48,4 +51,30 @@ public class KStreamBuilderTest {
assertEquals("Y-0000000001", builder.newName("Y-"));
assertEquals("Z-0000000002", builder.newName("Z-"));
}
+
+    @Test
+    public void testMerge() {
+        final KStreamBuilder builder = new KStreamBuilder();
+
+        final KStream<String, String> first = builder.from("topic-1");
+        final KStream<String, String> second = builder.from("topic-2");
+
+        final MockProcessorSupplier<String, String> sink = new MockProcessorSupplier<>();
+        builder.merge(first, second).process(sink);
+
+        final KStreamTestDriver driver = new KStreamTestDriver(builder);
+        driver.setTime(0L);
+
+        // Records from the two topics are interleaved; the merged stream must observe every one
+        // of them, in the order they were fed to the driver.
+        final String[][] records = {
+            {"topic-1", "A", "aa"},
+            {"topic-2", "B", "bb"},
+            {"topic-2", "C", "cc"},
+            {"topic-1", "D", "dd"}
+        };
+        for (String[] record : records)
+            driver.process(record[0], record[1], record[2]);
+
+        assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd"), sink.processed);
+    }
+
}
|