kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [1/2] kafka git commit: KAFKA-2653: Add KStream/KTable Aggregation and KTable Join APIs
Date Fri, 08 Jan 2016 01:18:39 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 4836e525c -> 40d731b87


http://git-wip-us.apache.org/repos/asf/kafka/blob/40d731b8/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TopKSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TopKSupplier.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TopKSupplier.java
new file mode 100644
index 0000000..00f4b55
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TopKSupplier.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.AggregatorSupplier;
+import org.apache.kafka.streams.kstream.Aggregator;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Set;
+
+/**
+ * NOTE: This is just a demo aggregate supplier that can be implemented by users to add their
own built-in aggregates.
+ * It is highly in-efficient and is not supposed to be merged in.
+ */
+public class TopKSupplier<K, V extends Comparable<V>> implements AggregatorSupplier<K,
V, Collection<V>> {
+
+    private final int k;
+
+    public TopKSupplier(int k) {
+        this.k = k;
+    }
+
+    private class TopK implements Aggregator<K, V, Collection<V>> {
+
+        private final Map<K, PriorityQueue<V>> sorted = new HashMap<>();
+
+        @Override
+        public Collection<V> initialValue() {
+            return Collections.<V>emptySet();
+        }
+
+        @Override
+        public Collection<V> add(K aggKey, V value, Collection<V> aggregate)
{
+            PriorityQueue<V> queue = sorted.get(aggKey);
+            if (queue == null) {
+                queue = new PriorityQueue<>();
+                sorted.put(aggKey, queue);
+            }
+
+            queue.add(value);
+
+            PriorityQueue<V> copy = new PriorityQueue<>(queue);
+
+            Set<V> ret = new HashSet<>();
+            for (int i = 1; i <= k; i++)
+                ret.add(copy.poll());
+
+            return ret;
+        }
+
+        @Override
+        public Collection<V> remove(K aggKey, V value, Collection<V> aggregate)
{
+            PriorityQueue<V> queue = sorted.get(aggKey);
+
+            if (queue == null)
+                throw new IllegalStateException("This should not happen.");
+
+            queue.remove(value);
+
+            PriorityQueue<V> copy = new PriorityQueue<>(queue);
+
+            Set<V> ret = new HashSet<>();
+            for (int i = 1; i <= k; i++)
+                ret.add(copy.poll());
+
+            return ret;
+        }
+
+        @Override
+        public Collection<V> merge(Collection<V> aggr1, Collection<V> aggr2)
{
+            PriorityQueue<V> copy = new PriorityQueue<>(aggr1);
+            copy.addAll(aggr2);
+
+            Set<V> ret = new HashSet<>();
+            for (int i = 1; i <= k; i++)
+                ret.add(copy.poll());
+
+            return ret;
+        }
+    }
+
+    @Override
+    public Aggregator<K, V, Collection<V>> get() {
+        return new TopK();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/40d731b8/streams/src/main/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindow.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindow.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindow.java
new file mode 100644
index 0000000..8ac8f70
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindow.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.Window;
+
+public class UnlimitedWindow extends Window {
+
+    public UnlimitedWindow(long start) {
+        super(start, Long.MAX_VALUE);
+    }
+
+    @Override
+    public boolean overlap(Window other) {
+        return super.overlap(other) && other.getClass().equals(UnlimitedWindow.class);
+    }
+
+    @Override
+    public boolean equalsTo(Window other) {
+        return super.equalsTo(other) && other.getClass().equals(UnlimitedWindow.class);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/40d731b8/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
index d6994a9..f79063f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
@@ -32,7 +32,7 @@ public class KStreamBuilderTest {
     public void testFrom() {
         final KStreamBuilder builder = new KStreamBuilder();
 
-        builder.from("topic-1", "topic-2");
+        builder.stream("topic-1", "topic-2");
 
         builder.addSource(KStreamImpl.SOURCE_NAME + "0000000000", "topic-3");
     }
@@ -59,8 +59,8 @@ public class KStreamBuilderTest {
 
         KStreamBuilder builder = new KStreamBuilder();
 
-        KStream<String, String> source1 = builder.from(topic1);
-        KStream<String, String> source2 = builder.from(topic2);
+        KStream<String, String> source1 = builder.stream(topic1);
+        KStream<String, String> source2 = builder.stream(topic2);
         KStream<String, String> merged = builder.merge(source1, source2);
 
         MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();

http://git-wip-us.apache.org/repos/asf/kafka/blob/40d731b8/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
index 40eba2f..88366fa 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
@@ -67,7 +67,7 @@ public class KStreamBranchTest {
         KStream<Integer, String>[] branches;
         MockProcessorSupplier<Integer, String>[] processors;
 
-        stream = builder.from(keyDeserializer, valDeserializer, topicName);
+        stream = builder.stream(keyDeserializer, valDeserializer, topicName);
         branches = stream.branch(isEven, isMultipleOfThree, isOdd);
 
         assertEquals(3, branches.length);

http://git-wip-us.apache.org/repos/asf/kafka/blob/40d731b8/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
index d1e5d38..3bad041 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
@@ -52,7 +52,7 @@ public class KStreamFilterTest {
         MockProcessorSupplier<Integer, String> processor;
 
         processor = new MockProcessorSupplier<>();
-        stream = builder.from(keyDeserializer, valDeserializer, topicName);
+        stream = builder.stream(keyDeserializer, valDeserializer, topicName);
         stream.filter(isMultipleOfThree).process(processor);
 
         KStreamTestDriver driver = new KStreamTestDriver(builder);
@@ -72,7 +72,7 @@ public class KStreamFilterTest {
         MockProcessorSupplier<Integer, String> processor;
 
         processor = new MockProcessorSupplier<>();
-        stream = builder.from(keyDeserializer, valDeserializer, topicName);
+        stream = builder.stream(keyDeserializer, valDeserializer, topicName);
         stream.filterOut(isMultipleOfThree).process(processor);
 
         KStreamTestDriver driver = new KStreamTestDriver(builder);

http://git-wip-us.apache.org/repos/asf/kafka/blob/40d731b8/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
index 61b5ccd..a55fd30 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
@@ -60,7 +60,7 @@ public class KStreamFlatMapTest {
         MockProcessorSupplier<String, String> processor;
 
         processor = new MockProcessorSupplier<>();
-        stream = builder.from(keyDeserializer, valDeserializer, topicName);
+        stream = builder.stream(keyDeserializer, valDeserializer, topicName);
         stream.flatMap(mapper).process(processor);
 
         KStreamTestDriver driver = new KStreamTestDriver(builder);

http://git-wip-us.apache.org/repos/asf/kafka/blob/40d731b8/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
index 66faf07..eef7933 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
@@ -58,7 +58,7 @@ public class KStreamFlatMapValuesTest {
         MockProcessorSupplier<Integer, String> processor;
 
         processor = new MockProcessorSupplier<>();
-        stream = builder.from(keyDeserializer, valDeserializer, topicName);
+        stream = builder.stream(keyDeserializer, valDeserializer, topicName);
         stream.flatMapValues(mapper).process(processor);
 
         KStreamTestDriver driver = new KStreamTestDriver(builder);

http://git-wip-us.apache.org/repos/asf/kafka/blob/40d731b8/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 108bf3c..1ce56ff 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -18,10 +18,16 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.junit.Test;
@@ -35,12 +41,16 @@ public class KStreamImplTest {
 
     @Test
     public void testNumProcesses() {
-        final Deserializer<String> deserializer = new StringDeserializer();
+        final Serializer<String> stringSerializer = new StringSerializer();
+        final Deserializer<String> stringDeserializer = new StringDeserializer();
+        final Serializer<Integer> integerSerializer = new IntegerSerializer();
+        final Deserializer<Integer> integerDeserializer = new IntegerDeserializer();
+
         final KStreamBuilder builder = new KStreamBuilder();
 
-        KStream<String, String> source1 = builder.from(deserializer, deserializer,
"topic-1", "topic-2");
+        KStream<String, String> source1 = builder.stream(stringDeserializer, stringDeserializer,
"topic-1", "topic-2");
 
-        KStream<String, String> source2 = builder.from(deserializer, deserializer,
"topic-3", "topic-4");
+        KStream<String, String> source2 = builder.stream(stringDeserializer, stringDeserializer,
"topic-3", "topic-4");
 
         KStream<String, String> stream1 =
             source1.filter(new Predicate<String, String>() {
@@ -99,7 +109,21 @@ public class KStreamImplTest {
                 }
         );
 
-        streams2[0].to("topic-5");
+        KStream<String, Integer> stream4 = streams2[0].join(streams3[0], new ValueJoiner<Integer,
Integer, Integer>() {
+            @Override
+            public Integer apply(Integer value1, Integer value2) {
+                return value1 + value2;
+            }
+        }, JoinWindows.of("join-0"), stringSerializer, integerSerializer, integerSerializer,
stringDeserializer, integerDeserializer, integerDeserializer);
+
+        KStream<String, Integer> stream5 = streams2[1].join(streams3[1], new ValueJoiner<Integer,
Integer, Integer>() {
+            @Override
+            public Integer apply(Integer value1, Integer value2) {
+                return value1 + value2;
+            }
+        }, JoinWindows.of("join-1"), stringSerializer, integerSerializer, integerSerializer,
stringDeserializer, integerDeserializer, integerDeserializer);
+
+        stream4.to("topic-5");
 
         streams2[1].through("topic-6").process(new MockProcessorSupplier<String, Integer>());
 
@@ -109,6 +133,7 @@ public class KStreamImplTest {
             1 + // stream3
             1 + 2 + // streams2
             1 + 2 + // streams3
+            5 * 2 + // stream2-stream3 joins
             1 + // to
             2 + // through
             1, // process

http://git-wip-us.apache.org/repos/asf/kafka/blob/40d731b8/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
index 5a937af..90341a8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
@@ -22,7 +22,7 @@ import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.kstream.JoinWindowSpec;
+import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.ValueJoiner;
@@ -71,9 +71,9 @@ public class KStreamKStreamJoinTest {
             MockProcessorSupplier<Integer, String> processor;
 
             processor = new MockProcessorSupplier<>();
-            stream1 = builder.from(keyDeserializer, valDeserializer, topic1);
-            stream2 = builder.from(keyDeserializer, valDeserializer, topic2);
-            joined = stream1.join(stream2, joiner, JoinWindowSpec.of("test").within(100),
+            stream1 = builder.stream(keyDeserializer, valDeserializer, topic1);
+            stream2 = builder.stream(keyDeserializer, valDeserializer, topic2);
+            joined = stream1.join(stream2, joiner, JoinWindows.of("test").within(100),
                     keySerializer, valSerializer, valSerializer, keyDeserializer, valDeserializer,
valDeserializer);
             joined.process(processor);
 
@@ -177,9 +177,9 @@ public class KStreamKStreamJoinTest {
             MockProcessorSupplier<Integer, String> processor;
 
             processor = new MockProcessorSupplier<>();
-            stream1 = builder.from(keyDeserializer, valDeserializer, topic1);
-            stream2 = builder.from(keyDeserializer, valDeserializer, topic2);
-            joined = stream1.outerJoin(stream2, joiner, JoinWindowSpec.of("test").within(100),
+            stream1 = builder.stream(keyDeserializer, valDeserializer, topic1);
+            stream2 = builder.stream(keyDeserializer, valDeserializer, topic2);
+            joined = stream1.outerJoin(stream2, joiner, JoinWindows.of("test").within(100),
                     keySerializer, valSerializer, valSerializer, keyDeserializer, valDeserializer,
valDeserializer);
             joined.process(processor);
 
@@ -285,9 +285,9 @@ public class KStreamKStreamJoinTest {
             MockProcessorSupplier<Integer, String> processor;
 
             processor = new MockProcessorSupplier<>();
-            stream1 = builder.from(keyDeserializer, valDeserializer, topic1);
-            stream2 = builder.from(keyDeserializer, valDeserializer, topic2);
-            joined = stream1.join(stream2, joiner, JoinWindowSpec.of("test").within(100),
+            stream1 = builder.stream(keyDeserializer, valDeserializer, topic1);
+            stream2 = builder.stream(keyDeserializer, valDeserializer, topic2);
+            joined = stream1.join(stream2, joiner, JoinWindows.of("test").within(100),
                     keySerializer, valSerializer, valSerializer, keyDeserializer, valDeserializer,
valDeserializer);
             joined.process(processor);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/40d731b8/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
index dbb5515..8c6e43b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
@@ -22,7 +22,7 @@ import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.kstream.JoinWindowSpec;
+import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.ValueJoiner;
@@ -71,9 +71,9 @@ public class KStreamKStreamLeftJoinTest {
             MockProcessorSupplier<Integer, String> processor;
 
             processor = new MockProcessorSupplier<>();
-            stream1 = builder.from(keyDeserializer, valDeserializer, topic1);
-            stream2 = builder.from(keyDeserializer, valDeserializer, topic2);
-            joined = stream1.leftJoin(stream2, joiner, JoinWindowSpec.of("test").within(100),
+            stream1 = builder.stream(keyDeserializer, valDeserializer, topic1);
+            stream2 = builder.stream(keyDeserializer, valDeserializer, topic2);
+            joined = stream1.leftJoin(stream2, joiner, JoinWindows.of("test").within(100),
                     keySerializer, valSerializer, keyDeserializer, valDeserializer);
             joined.process(processor);
 
@@ -157,9 +157,9 @@ public class KStreamKStreamLeftJoinTest {
             MockProcessorSupplier<Integer, String> processor;
 
             processor = new MockProcessorSupplier<>();
-            stream1 = builder.from(keyDeserializer, valDeserializer, topic1);
-            stream2 = builder.from(keyDeserializer, valDeserializer, topic2);
-            joined = stream1.leftJoin(stream2, joiner, JoinWindowSpec.of("test").within(100),
+            stream1 = builder.stream(keyDeserializer, valDeserializer, topic1);
+            stream2 = builder.stream(keyDeserializer, valDeserializer, topic2);
+            joined = stream1.leftJoin(stream2, joiner, JoinWindows.of("test").within(100),
                     keySerializer, valSerializer, keyDeserializer, valDeserializer);
             joined.process(processor);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/40d731b8/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
index adcf63a..880adce 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
@@ -81,7 +81,7 @@ public class KStreamKTableLeftJoinTest {
             MockProcessorSupplier<Integer, String> processor;
 
             processor = new MockProcessorSupplier<>();
-            stream = builder.from(keyDeserializer, valDeserializer, topic1);
+            stream = builder.stream(keyDeserializer, valDeserializer, topic1);
             table = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer,
topic2);
             stream.leftJoin(table, joiner).process(processor);
 
@@ -162,7 +162,7 @@ public class KStreamKTableLeftJoinTest {
         MockProcessorSupplier<Integer, String> processor;
 
         processor = new MockProcessorSupplier<>();
-        stream = builder.from(keyDeserializer, valDeserializer, topic1).map(keyValueMapper);
+        stream = builder.stream(keyDeserializer, valDeserializer, topic1).map(keyValueMapper);
         table = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer,
topic2);
 
         stream.leftJoin(table, joiner).process(processor);

http://git-wip-us.apache.org/repos/asf/kafka/blob/40d731b8/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
index 2ae8a97..0f7cb6a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
@@ -54,7 +54,7 @@ public class KStreamMapTest {
         MockProcessorSupplier<String, Integer> processor;
 
         processor = new MockProcessorSupplier<>();
-        stream = builder.from(keyDeserializer, valDeserializer, topicName);
+        stream = builder.stream(keyDeserializer, valDeserializer, topicName);
         stream.map(mapper).process(processor);
 
         KStreamTestDriver driver = new KStreamTestDriver(builder);

http://git-wip-us.apache.org/repos/asf/kafka/blob/40d731b8/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
index f830c00..68fd285 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
@@ -51,7 +51,7 @@ public class KStreamMapValuesTest {
 
         KStream<Integer, String> stream;
         MockProcessorSupplier<Integer, Integer> processor = new MockProcessorSupplier<>();
-        stream = builder.from(keyDeserializer, valDeserializer, topicName);
+        stream = builder.stream(keyDeserializer, valDeserializer, topicName);
         stream.mapValues(mapper).process(processor);
 
         KStreamTestDriver driver = new KStreamTestDriver(builder);

http://git-wip-us.apache.org/repos/asf/kafka/blob/40d731b8/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
index e397dd1..0b7b1e7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
@@ -73,7 +73,7 @@ public class KStreamTransformTest {
 
         KStream<Integer, Integer> stream;
         MockProcessorSupplier<Integer, Integer> processor = new MockProcessorSupplier<>();
-        stream = builder.from(keyDeserializer, valDeserializer, topicName);
+        stream = builder.stream(keyDeserializer, valDeserializer, topicName);
         stream.transform(transformerSupplier).process(processor);
 
         KStreamTestDriver driver = new KStreamTestDriver(builder);

http://git-wip-us.apache.org/repos/asf/kafka/blob/40d731b8/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
index c5c9b39..7def9db 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
@@ -72,7 +72,7 @@ public class KStreamTransformValuesTest {
 
         KStream<Integer, Integer> stream;
         MockProcessorSupplier<Integer, Integer> processor = new MockProcessorSupplier<>();
-        stream = builder.from(keyDeserializer, valDeserializer, topicName);
+        stream = builder.stream(keyDeserializer, valDeserializer, topicName);
         stream.transformValues(valueTransformerSupplier).process(processor);
 
         KStreamTestDriver driver = new KStreamTestDriver(builder);


Mime
View raw message