Repository: kafka
Updated Branches:
refs/heads/trunk 5e8958a85 -> d07bb1814
MINOR: comments on KStream methods, and fix generics
guozhangwang
Author: Yasuhiro Matsuda <yasuhiro@confluent.io>
Reviewers: Guozhang Wang
Closes #591 from ymatsuda/comments
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d07bb181
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d07bb181
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d07bb181
Branch: refs/heads/trunk
Commit: d07bb1814010ca4d822e44330d1e8ea4b2839c80
Parents: 5e8958a
Author: Yasuhiro Matsuda <yasuhiro@confluent.io>
Authored: Wed Nov 25 16:44:43 2015 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Nov 25 16:44:43 2015 -0800
----------------------------------------------------------------------
.../apache/kafka/streams/kstream/KStream.java | 33 +++++++++-----------
.../kafka/streams/kstream/KStreamBuilder.java | 8 ++---
.../streams/kstream/internals/KStreamImpl.java | 21 +++++--------
.../kstream/internals/KStreamImplTest.java | 2 +-
4 files changed, 25 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/d07bb181/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 8f0794c..992bd75 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -30,7 +30,7 @@ import org.apache.kafka.streams.processor.ProcessorSupplier;
public interface KStream<K, V> {
/**
- * Creates a new stream consists of all elements of this stream which satisfy a predicate
+ * Creates a new instance of KStream consists of all elements of this stream which satisfy
a predicate
*
* @param predicate the instance of Predicate
* @return the stream with only those elements that satisfy the predicate
@@ -38,7 +38,7 @@ public interface KStream<K, V> {
KStream<K, V> filter(Predicate<K, V> predicate);
/**
- * Creates a new stream consists all elements of this stream which do not satisfy a predicate
+ * Creates a new instance of KStream consists all elements of this stream which do not
satisfy a predicate
*
* @param predicate the instance of Predicate
* @return the stream with only those elements that do not satisfy the predicate
@@ -56,30 +56,30 @@ public interface KStream<K, V> {
<K1, V1> KStream<K1, V1> map(KeyValueMapper<K, V, KeyValue<K1, V1>>
mapper);
/**
- * Creates a new stream by applying transforming each value in this stream into a different
value in the new stream.
+ * Creates a new instance of KStream by applying transforming each value in this stream
into a different value in the new stream.
*
* @param mapper the instance of ValueMapper
* @param <V1> the value type of the new stream
- * @return the mapped stream
+ * @return the instance of KStream
*/
<V1> KStream<K, V1> mapValues(ValueMapper<V, V1> mapper);
/**
- * Creates a new stream by applying transforming each element in this stream into zero
or more elements in the new stream.
+ * Creates a new instance of KStream by applying transforming each element in this stream
into zero or more elements in the new stream.
*
* @param mapper the instance of KeyValueMapper
* @param <K1> the key type of the new stream
* @param <V1> the value type of the new stream
- * @return the mapped stream
+ * @return the instance of KStream
*/
<K1, V1> KStream<K1, V1> flatMap(KeyValueMapper<K, V, Iterable<KeyValue<K1,
V1>>> mapper);
/**
- * Creates a new stream by applying transforming each value in this stream into zero
or more values in the new stream.
+ * Creates a new instance of KStream by applying transforming each value in this stream
into zero or more values in the new stream.
*
* @param processor the instance of Processor
* @param <V1> the value type of the new stream
- * @return the mapped stream
+ * @return the instance of KStream
*/
<V1> KStream<K, V1> flatMapValues(ValueMapper<V, Iterable<V1>>
processor);
@@ -98,7 +98,7 @@ public interface KStream<K, V> {
* An element will be dropped if none of the predicates evaluate true.
*
* @param predicates the ordered list of Predicate instances
- * @return the new streams that each contain those elements for which their Predicate
evaluated to true.
+ * @return the instances of KStream that each contain those elements for which their
Predicate evaluated to true.
*/
KStream<K, V>[] branch(Predicate<K, V>... predicates);
@@ -107,14 +107,12 @@ public interface KStream<K, V> {
* This is equivalent to calling to(topic) and from(topic).
*
* @param topic the topic name
- * @param <K1> the key type of the new stream
- * @param <V1> the value type of the new stream
* @return the new stream that consumes the given topic
*/
- <K1, V1> KStream<K1, V1> through(String topic);
+ KStream<K, V> through(String topic);
/**
- * Sends key-value to a topic, also creates a new stream from the topic.
+ * Sends key-value to a topic, also creates a new instance of KStream from the topic.
* This is equivalent to calling to(topic) and from(topic).
*
* @param topic the topic name
@@ -126,11 +124,9 @@ public interface KStream<K, V> {
* if not specified the default key deserializer defined in the
configuration will be used
* @param valDeserializer value deserializer used to create the new KStream,
* if not specified the default value deserializer defined in
the configuration will be used
- * @param <K1> the key type of the new stream
- * @param <V1> the value type of the new stream
* @return the new stream that consumes the given topic
*/
- <K1, V1> KStream<K1, V1> through(String topic, Serializer<K> keySerializer,
Serializer<V> valSerializer, Deserializer<K1> keyDeserializer, Deserializer<V1>
valDeserializer);
+ KStream<K, V> through(String topic, Serializer<K> keySerializer, Serializer<V>
valSerializer, Deserializer<K> keyDeserializer, Deserializer<V> valDeserializer);
/**
* Sends key-value to a topic using default serializers specified in the config.
@@ -155,7 +151,7 @@ public interface KStream<K, V> {
*
* @param transformerSupplier the class of TransformerDef
* @param stateStoreNames the names of the state store used by the processor
- * @return KStream
+ * @return the instance of KStream that contains transformed keys and values
*/
<K1, V1> KStream<K1, V1> transform(TransformerSupplier<K, V, KeyValue<K1,
V1>> transformerSupplier, String... stateStoreNames);
@@ -164,7 +160,7 @@ public interface KStream<K, V> {
*
* @param valueTransformerSupplier the class of TransformerDef
* @param stateStoreNames the names of the state store used by the processor
- * @return KStream
+ * @return the instance of KStream that contains transformed keys and values
*/
<R> KStream<K, R> transformValues(ValueTransformerSupplier<V, R> valueTransformerSupplier,
String... stateStoreNames);
@@ -173,7 +169,6 @@ public interface KStream<K, V> {
*
* @param processorSupplier the supplier of the Processor to use
* @param stateStoreNames the names of the state store used by the processor
- * @return the new stream containing the processed output
*/
void process(ProcessorSupplier<K, V> processorSupplier, String... stateStoreNames);
http://git-wip-us.apache.org/repos/asf/kafka/blob/d07bb181/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
index a95d08c..ae8f694 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
@@ -43,11 +43,7 @@ public class KStreamBuilder extends TopologyBuilder {
* @return KStream
*/
public <K, V> KStream<K, V> from(String... topics) {
- String name = newName(KStreamImpl.SOURCE_NAME);
-
- addSource(name, topics);
-
- return new KStreamImpl<>(this, name, Collections.singleton(name));
+ return from(null, null, topics);
}
/**
@@ -60,7 +56,7 @@ public class KStreamBuilder extends TopologyBuilder {
* @param topics the topic names, if empty default to all the topics in the
config
* @return KStream
*/
- public <K, V> KStream<K, V> from(Deserializer<? extends K> keyDeserializer,
Deserializer<? extends V> valDeserializer, String... topics) {
+ public <K, V> KStream<K, V> from(Deserializer<K> keyDeserializer, Deserializer<V>
valDeserializer, String... topics) {
String name = newName(KStreamImpl.SOURCE_NAME);
addSource(name, keyDeserializer, valDeserializer, topics);
http://git-wip-us.apache.org/repos/asf/kafka/blob/d07bb181/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index a408458..04aa8e9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -32,7 +32,6 @@ import org.apache.kafka.streams.kstream.WindowSupplier;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import java.lang.reflect.Array;
-import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
@@ -187,25 +186,21 @@ public class KStreamImpl<K, V> implements KStream<K, V>
{
}
@Override
- public <K1, V1> KStream<K1, V1> through(String topic,
- Serializer<K> keySerializer,
- Serializer<V> valSerializer,
- Deserializer<K1> keyDeserializer,
- Deserializer<V1> valDeserializer) {
+ public KStream<K, V> through(String topic,
+ Serializer<K> keySerializer,
+ Serializer<V> valSerializer,
+ Deserializer<K> keyDeserializer,
+ Deserializer<V> valDeserializer) {
String sendName = topology.newName(SINK_NAME);
topology.addSink(sendName, topic, keySerializer, valSerializer, this.name);
- String sourceName = topology.newName(SOURCE_NAME);
-
- topology.addSource(sourceName, keyDeserializer, valDeserializer, topic);
-
- return new KStreamImpl<>(topology, sourceName, Collections.singleton(sourceName));
+ return topology.from(keyDeserializer, valDeserializer, topic);
}
@Override
- public <K1, V1> KStream<K1, V1> through(String topic) {
- return through(topic, (Serializer<K>) null, (Serializer<V>) null, (Deserializer<K1>)
null, (Deserializer<V1>) null);
+ public KStream<K, V> through(String topic) {
+ return through(topic, null, null, null, null);
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/d07bb181/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index d924a34..1e775b8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -119,7 +119,7 @@ public class KStreamImplTest {
stream4.to("topic-5");
- stream5.through("topic-6").process(new MockProcessorSupplier<>());
+ stream5.through("topic-6").process(new MockProcessorSupplier<String, Integer>());
assertEquals(2 + // sources
2 + // stream1
|