kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: MINOR: comments on KStream methods, and fix generics
Date Thu, 26 Nov 2015 00:44:47 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 5e8958a85 -> d07bb1814


MINOR: comments on KStream methods, and fix generics

guozhangwang

Author: Yasuhiro Matsuda <yasuhiro@confluent.io>

Reviewers: Guozhang Wang

Closes #591 from ymatsuda/comments


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d07bb181
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d07bb181
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d07bb181

Branch: refs/heads/trunk
Commit: d07bb1814010ca4d822e44330d1e8ea4b2839c80
Parents: 5e8958a
Author: Yasuhiro Matsuda <yasuhiro@confluent.io>
Authored: Wed Nov 25 16:44:43 2015 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Nov 25 16:44:43 2015 -0800

----------------------------------------------------------------------
 .../apache/kafka/streams/kstream/KStream.java   | 33 +++++++++-----------
 .../kafka/streams/kstream/KStreamBuilder.java   |  8 ++---
 .../streams/kstream/internals/KStreamImpl.java  | 21 +++++--------
 .../kstream/internals/KStreamImplTest.java      |  2 +-
 4 files changed, 25 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d07bb181/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 8f0794c..992bd75 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -30,7 +30,7 @@ import org.apache.kafka.streams.processor.ProcessorSupplier;
 public interface KStream<K, V> {
 
     /**
-     * Creates a new stream consists of all elements of this stream which satisfy a predicate
+     * Creates a new instance of KStream consists of all elements of this stream which satisfy
a predicate
      *
      * @param predicate the instance of Predicate
      * @return the stream with only those elements that satisfy the predicate
@@ -38,7 +38,7 @@ public interface KStream<K, V> {
     KStream<K, V> filter(Predicate<K, V> predicate);
 
     /**
-     * Creates a new stream consists all elements of this stream which do not satisfy a predicate
+     * Creates a new instance of KStream consists all elements of this stream which do not
satisfy a predicate
      *
      * @param predicate the instance of Predicate
      * @return the stream with only those elements that do not satisfy the predicate
@@ -56,30 +56,30 @@ public interface KStream<K, V> {
     <K1, V1> KStream<K1, V1> map(KeyValueMapper<K, V, KeyValue<K1, V1>>
mapper);
 
     /**
-     * Creates a new stream by applying transforming each value in this stream into a different
value in the new stream.
+     * Creates a new instance of KStream by applying transforming each value in this stream
into a different value in the new stream.
      *
      * @param mapper the instance of ValueMapper
      * @param <V1>   the value type of the new stream
-     * @return the mapped stream
+     * @return the instance of KStream
      */
     <V1> KStream<K, V1> mapValues(ValueMapper<V, V1> mapper);
 
     /**
-     * Creates a new stream by applying transforming each element in this stream into zero
or more elements in the new stream.
+     * Creates a new instance of KStream by applying transforming each element in this stream
into zero or more elements in the new stream.
      *
      * @param mapper the instance of KeyValueMapper
      * @param <K1>   the key type of the new stream
      * @param <V1>   the value type of the new stream
-     * @return the mapped stream
+     * @return the instance of KStream
      */
     <K1, V1> KStream<K1, V1> flatMap(KeyValueMapper<K, V, Iterable<KeyValue<K1,
V1>>> mapper);
 
     /**
-     * Creates a new stream by applying transforming each value in this stream into zero
or more values in the new stream.
+     * Creates a new instance of KStream by applying transforming each value in this stream
into zero or more values in the new stream.
      *
      * @param processor the instance of Processor
      * @param <V1>      the value type of the new stream
-     * @return the mapped stream
+     * @return the instance of KStream
      */
     <V1> KStream<K, V1> flatMapValues(ValueMapper<V, Iterable<V1>>
processor);
 
@@ -98,7 +98,7 @@ public interface KStream<K, V> {
      * An element will be dropped if none of the predicates evaluate true.
      *
      * @param predicates the ordered list of Predicate instances
-     * @return the new streams that each contain those elements for which their Predicate
evaluated to true.
+     * @return the instances of KStream that each contain those elements for which their
Predicate evaluated to true.
      */
     KStream<K, V>[] branch(Predicate<K, V>... predicates);
 
@@ -107,14 +107,12 @@ public interface KStream<K, V> {
      * This is equivalent to calling to(topic) and from(topic).
      *
      * @param topic           the topic name
-     * @param <K1>            the key type of the new stream
-     * @param <V1>            the value type of the new stream
      * @return the new stream that consumes the given topic
      */
-    <K1, V1> KStream<K1, V1> through(String topic);
+    KStream<K, V> through(String topic);
 
     /**
-     * Sends key-value to a topic, also creates a new stream from the topic.
+     * Sends key-value to a topic, also creates a new instance of KStream from the topic.
      * This is equivalent to calling to(topic) and from(topic).
      *
      * @param topic           the topic name
@@ -126,11 +124,9 @@ public interface KStream<K, V> {
      *                        if not specified the default key deserializer defined in the
configuration will be used
      * @param valDeserializer value deserializer used to create the new KStream,
      *                        if not specified the default value deserializer defined in
the configuration will be used
-     * @param <K1>            the key type of the new stream
-     * @param <V1>            the value type of the new stream
      * @return the new stream that consumes the given topic
      */
-    <K1, V1> KStream<K1, V1> through(String topic, Serializer<K> keySerializer,
Serializer<V> valSerializer, Deserializer<K1> keyDeserializer, Deserializer<V1>
valDeserializer);
+    KStream<K, V> through(String topic, Serializer<K> keySerializer, Serializer<V>
valSerializer, Deserializer<K> keyDeserializer, Deserializer<V> valDeserializer);
 
     /**
      * Sends key-value to a topic using default serializers specified in the config.
@@ -155,7 +151,7 @@ public interface KStream<K, V> {
      *
      * @param transformerSupplier the class of TransformerDef
      * @param stateStoreNames the names of the state store used by the processor
-     * @return KStream
+     * @return the instance of KStream that contains transformed keys and values
      */
     <K1, V1> KStream<K1, V1> transform(TransformerSupplier<K, V, KeyValue<K1,
V1>> transformerSupplier, String... stateStoreNames);
 
@@ -164,7 +160,7 @@ public interface KStream<K, V> {
      *
      * @param valueTransformerSupplier the class of TransformerDef
      * @param stateStoreNames the names of the state store used by the processor
-     * @return KStream
+     * @return the instance of KStream that contains transformed keys and values
      */
     <R> KStream<K, R> transformValues(ValueTransformerSupplier<V, R> valueTransformerSupplier,
String... stateStoreNames);
 
@@ -173,7 +169,6 @@ public interface KStream<K, V> {
      *
      * @param processorSupplier the supplier of the Processor to use
      * @param stateStoreNames the names of the state store used by the processor
-     * @return the new stream containing the processed output
      */
     void process(ProcessorSupplier<K, V> processorSupplier, String... stateStoreNames);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d07bb181/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
index a95d08c..ae8f694 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
@@ -43,11 +43,7 @@ public class KStreamBuilder extends TopologyBuilder {
      * @return KStream
      */
     public <K, V> KStream<K, V> from(String... topics) {
-        String name = newName(KStreamImpl.SOURCE_NAME);
-
-        addSource(name, topics);
-
-        return new KStreamImpl<>(this, name, Collections.singleton(name));
+        return from(null, null, topics);
     }
 
     /**
@@ -60,7 +56,7 @@ public class KStreamBuilder extends TopologyBuilder {
      * @param topics          the topic names, if empty default to all the topics in the
config
      * @return KStream
      */
-    public <K, V> KStream<K, V> from(Deserializer<? extends K> keyDeserializer,
Deserializer<? extends V> valDeserializer, String... topics) {
+    public <K, V> KStream<K, V> from(Deserializer<K> keyDeserializer, Deserializer<V>
valDeserializer, String... topics) {
         String name = newName(KStreamImpl.SOURCE_NAME);
 
         addSource(name, keyDeserializer, valDeserializer, topics);

http://git-wip-us.apache.org/repos/asf/kafka/blob/d07bb181/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index a408458..04aa8e9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -32,7 +32,6 @@ import org.apache.kafka.streams.kstream.WindowSupplier;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 
 import java.lang.reflect.Array;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 
@@ -187,25 +186,21 @@ public class KStreamImpl<K, V> implements KStream<K, V>
{
     }
 
     @Override
-    public <K1, V1> KStream<K1, V1> through(String topic,
-                                            Serializer<K> keySerializer,
-                                            Serializer<V> valSerializer,
-                                            Deserializer<K1> keyDeserializer,
-                                            Deserializer<V1> valDeserializer) {
+    public KStream<K, V> through(String topic,
+                                 Serializer<K> keySerializer,
+                                 Serializer<V> valSerializer,
+                                 Deserializer<K> keyDeserializer,
+                                 Deserializer<V> valDeserializer) {
         String sendName = topology.newName(SINK_NAME);
 
         topology.addSink(sendName, topic, keySerializer, valSerializer, this.name);
 
-        String sourceName = topology.newName(SOURCE_NAME);
-
-        topology.addSource(sourceName, keyDeserializer, valDeserializer, topic);
-
-        return new KStreamImpl<>(topology, sourceName, Collections.singleton(sourceName));
+        return topology.from(keyDeserializer, valDeserializer, topic);
     }
 
     @Override
-    public <K1, V1> KStream<K1, V1> through(String topic) {
-        return through(topic, (Serializer<K>) null, (Serializer<V>) null, (Deserializer<K1>)
null, (Deserializer<V1>) null);
+    public KStream<K, V> through(String topic) {
+        return through(topic, null, null, null, null);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/d07bb181/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index d924a34..1e775b8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -119,7 +119,7 @@ public class KStreamImplTest {
 
         stream4.to("topic-5");
 
-        stream5.through("topic-6").process(new MockProcessorSupplier<>());
+        stream5.through("topic-6").process(new MockProcessorSupplier<String, Integer>());
 
         assertEquals(2 + // sources
             2 + // stream1


Mime
View raw message