kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: MINOR: Add InterfaceStability.Unstable annotations to some Kafka Streams public APIs
Date Mon, 21 Mar 2016 19:06:25 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.0 1b5879653 -> 4f8b3aed8


MINOR: Add InterfaceStability.Unstable annotations to some Kafka Streams public APIs

Also improves Java docs for the Streams high-level DSL.

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Ismael Juma, Michael G. Noll

Closes #1097 from guozhangwang/KNewJavaDoc

(cherry picked from commit b6c29e3810bd59f39fa93c429817396cf8c324b7)
Signed-off-by: Guozhang Wang <wangguoz@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4f8b3aed
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4f8b3aed
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4f8b3aed

Branch: refs/heads/0.10.0
Commit: 4f8b3aed8dc6a77293c526253123d999a23af149
Parents: 1b58796
Author: Guozhang Wang <wangguoz@gmail.com>
Authored: Mon Mar 21 12:06:07 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Mon Mar 21 12:06:17 2016 -0700

----------------------------------------------------------------------
 .../kafka/streams/kstream/Aggregator.java       |   6 +-
 .../kafka/streams/kstream/Initializer.java      |   4 +-
 .../kafka/streams/kstream/JoinWindows.java      |  15 +-
 .../apache/kafka/streams/kstream/KStream.java   | 270 ++++++++++---------
 .../kafka/streams/kstream/KStreamBuilder.java   |  44 ++-
 .../apache/kafka/streams/kstream/KTable.java    | 165 ++++++------
 .../kafka/streams/kstream/KeyValueMapper.java   |   6 +-
 .../apache/kafka/streams/kstream/Predicate.java |   4 +-
 .../apache/kafka/streams/kstream/Reducer.java   |   2 +-
 .../kafka/streams/kstream/Transformer.java      |   6 +-
 .../kafka/streams/kstream/ValueJoiner.java      |   6 +-
 .../kafka/streams/kstream/ValueMapper.java      |   4 +-
 .../kafka/streams/kstream/ValueTransformer.java |   4 +-
 .../apache/kafka/streams/kstream/Windowed.java  |   5 +-
 .../apache/kafka/streams/kstream/Windows.java   |  33 ++-
 .../processor/DefaultPartitionGrouper.java      |   8 +-
 .../streams/processor/ProcessorContext.java     |   2 +
 .../kafka/streams/state/KeyValueStore.java      |   2 +
 .../apache/kafka/streams/state/WindowStore.java |   2 +
 19 files changed, 303 insertions(+), 285 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/4f8b3aed/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java
index 0d29409..9ec9f96 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java
@@ -20,9 +20,9 @@ package org.apache.kafka.streams.kstream;
 /**
  * The Aggregator interface for aggregating values of the given key.
  *
- * @param <K> Key type.
- * @param <V> Receiving value type.
- * @param <T> Aggregate value type.
+ * @param <K>   key type
+ * @param <V>   original value type
+ * @param <T>   aggregate value type
  */
 public interface Aggregator<K, V, T> {
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/4f8b3aed/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java
index fdd5220..67c1c21 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java
@@ -18,9 +18,9 @@
 package org.apache.kafka.streams.kstream;
 
 /**
- * The Initializer interface for creating an initial value for aggregations.
+ * The Initializer interface for creating an initial value in aggregations.
  *
- * @param <T> Aggregate value type.
+ * @param <T>   aggregate value type
  */
 public interface Initializer<T> {
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/4f8b3aed/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
index 70294a8..24dbdd3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
@@ -45,8 +45,7 @@ public class JoinWindows extends Windows<TumblingWindow> {
      * Specifies that records of the same key are joinable if their timestamp stamps are within
      * timeDifference.
      *
-     * @param timeDifference
-     * @return
+     * @param timeDifference    join window interval in milliseconds
      */
     public JoinWindows within(long timeDifference) {
         return new JoinWindows(this.name, timeDifference, timeDifference);
@@ -54,11 +53,10 @@ public class JoinWindows extends Windows<TumblingWindow> {
 
     /**
      * Specifies that records of the same key are joinable if their timestamp stamps are within
-     * timeDifference, and if the timestamp of a record from the secondary stream is
-     * is earlier than or equal to the timestamp of a record from the first stream.
+     * the join window interval, and if the timestamp of a record from the secondary stream is
+     * earlier than or equal to the timestamp of a record from the first stream.
      *
-     * @param timeDifference
-     * @return
+     * @param timeDifference    join window interval in milliseconds
      */
     public JoinWindows before(long timeDifference) {
         return new JoinWindows(this.name, timeDifference, this.after);
@@ -66,11 +64,10 @@ public class JoinWindows extends Windows<TumblingWindow> {
 
     /**
      * Specifies that records of the same key are joinable if their timestamp stamps are within
-     * timeDifference, and if the timestamp of a record from the secondary stream is
+     * the join window interval, and if the timestamp of a record from the secondary stream
      * is later than or equal to the timestamp of a record from the first stream.
      *
-     * @param timeDifference
-     * @return
+     * @param timeDifference    join window interval in milliseconds
      */
     public JoinWindows after(long timeDifference) {
         return new JoinWindows(this.name, this.before, timeDifference);

http://git-wip-us.apache.org/repos/asf/kafka/blob/4f8b3aed/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 6f05c3b..c4188de 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -17,116 +17,110 @@
 
 package org.apache.kafka.streams.kstream;
 
+import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 
 /**
- * KStream is an abstraction of an event stream in key-value pairs.
+ * KStream is an abstraction of a <i>record stream</i> of key-value pairs.
  *
  * @param <K> Type of keys
  * @param <V> Type of values
  */
+@InterfaceStability.Unstable
 public interface KStream<K, V> {
 
     /**
-     * Creates a new instance of KStream consists of all elements of this stream which satisfy a predicate
+     * Create a new instance of {@link KStream} that consists of all elements of this stream which satisfy a predicate.
      *
-     * @param predicate the instance of Predicate
-     * @return the instance of KStream with only those elements that satisfy the predicate
+     * @param predicate     the instance of {@link Predicate}
      */
     KStream<K, V> filter(Predicate<K, V> predicate);
 
     /**
-     * Creates a new instance of KStream consists all elements of this stream which do not satisfy a predicate
+     * Create a new instance of {@link KStream} that consists of all elements of this stream which do not satisfy a predicate.
      *
-     * @param predicate the instance of Predicate
-     * @return the instance of KStream with only those elements that do not satisfy the predicate
+     * @param predicate     the instance of {@link Predicate}
      */
     KStream<K, V> filterOut(Predicate<K, V> predicate);
 
     /**
-     * Creates a new instance of KStream by applying transforming each element in this stream into a different element in the new stream.
+     * Create a new instance of {@link KStream} by transforming each element in this stream into a different element in the new stream.
      *
-     * @param mapper the instance of KeyValueMapper
-     * @param <K1>   the key type of the new stream
-     * @param <V1>   the value type of the new stream
-     * @return the instance of KStream
+     * @param mapper        the instance of {@link KeyValueMapper}
+     * @param <K1>          the key type of the new stream
+     * @param <V1>          the value type of the new stream
      */
     <K1, V1> KStream<K1, V1> map(KeyValueMapper<K, V, KeyValue<K1, V1>> mapper);
 
     /**
-     * Creates a new instance of KStream by transforming each value in this stream into a different value in the new stream.
+     * Create a new instance of {@link KStream} by transforming the value of each element in this stream into a new value in the new stream.
      *
-     * @param mapper the instance of ValueMapper
-     * @param <V1>   the value type of the new stream
-     * @return the instance of KStream
+     * @param mapper        the instance of {@link ValueMapper}
+     * @param <V1>          the value type of the new stream
      */
     <V1> KStream<K, V1> mapValues(ValueMapper<V, V1> mapper);
 
     /**
-     * Creates a new instance of KStream by transforming each element in this stream into zero or more elements in the new stream.
+     * Create a new instance of {@link KStream} by transforming each element in this stream into zero or more elements in the new stream.
      *
-     * @param mapper the instance of KeyValueMapper
-     * @param <K1>   the key type of the new stream
-     * @param <V1>   the value type of the new stream
-     * @return the instance of KStream
+     * @param mapper        the instance of {@link KeyValueMapper}
+     * @param <K1>          the key type of the new stream
+     * @param <V1>          the value type of the new stream
      */
     <K1, V1> KStream<K1, V1> flatMap(KeyValueMapper<K, V, Iterable<KeyValue<K1, V1>>> mapper);
 
     /**
-     * Creates a new stream by transforming each value in this stream into zero or more values in the new stream.
+     * Create a new instance of {@link KStream} by transforming the value of each element in this stream into zero or more values with the same key in the new stream.
      *
-     * @param processor the instance of Processor
-     * @param <V1>      the value type of the new stream
-     * @return the instance of KStream
+     * @param processor     the instance of {@link ValueMapper}
+     * @param <V1>          the value type of the new stream
      */
     <V1> KStream<K, V1> flatMapValues(ValueMapper<V, Iterable<V1>> processor);
 
     /**
-     * Creates an array of streams from this stream. Each stream in the array corresponds to a predicate in
-     * supplied predicates in the same order. Predicates are evaluated in order. An element is streamed to
-     * a corresponding stream for the first predicate is evaluated true.
-     * An element will be dropped if none of the predicates evaluate true.
+     * Creates an array of {@link KStream} from this stream by branching the elements in the original stream based on the supplied predicates.
+     * Each element is evaluated against the supplied predicates, and predicates are evaluated in order. Each stream in the result array
+     * corresponds position-wise (index) to the predicate in the supplied predicates. The branching happens on first-match: An element
+     * in the original stream is assigned to the corresponding result stream for the first predicate that evaluates to true, and
+     * assigned to this stream only. An element will be dropped if none of the predicates evaluate to true.
      *
-     * @param predicates the ordered list of Predicate instances
-     * @return the instances of KStream that each contain those elements for which their Predicate evaluated to true.
+     * @param predicates    the ordered list of {@link Predicate} instances
      */
     KStream<K, V>[] branch(Predicate<K, V>... predicates);
 
     /**
-     * Sends key-value to a topic, also creates a new instance of KStream from the topic.
-     * This is equivalent to calling to(topic) and from(topic).
+     * Materialize this stream to a topic, and create a new instance of {@link KStream} from the topic
+     * using default serializers and deserializers.
+     * This is equivalent to calling {@link #to(String)} and {@link org.apache.kafka.streams.kstream.KStreamBuilder#stream(String...)}.
      *
      * @param topic     the topic name
-     *
-     * @return          the instance of {@link KStream} that consumes the given topic
      */
     KStream<K, V> through(String topic);
 
     /**
-     * Sends key-value to a topic, also creates a new instance of KStream from the topic.
-     * This is equivalent to calling to(topic) and from(topic).
+     * Materialize this stream to a topic, and create a new instance of {@link KStream} from the topic.
+     * This is equivalent to calling {@link #to(Serde, Serde, String)} and
+     * {@link org.apache.kafka.streams.kstream.KStreamBuilder#stream(Serde, Serde, String...)}.
      *
      * @param keySerde  key serde used to send key-value pairs,
      *                  if not specified the default key serde defined in the configuration will be used
      * @param valSerde  value serde used to send key-value pairs,
      *                  if not specified the default value serde defined in the configuration will be used
      * @param topic     the topic name
-     *
-     * @return          the instance of {@link KStream} that consumes the given topic
      */
     KStream<K, V> through(Serde<K> keySerde, Serde<V> valSerde, String topic);
 
     /**
-     * Sends key-value to a topic using default serializers specified in the config.
+     * Materialize this stream to a topic using default serializers specified in the config.
      *
      * @param topic     the topic name
      */
     void to(String topic);
 
     /**
-     * Sends key-value to a topic.
+     * Materialize this stream to a topic.
      *
      * @param keySerde  key serde used to send key-value pairs,
      *                  if not specified the default serde defined in the configs will be used
@@ -137,45 +131,43 @@ public interface KStream<K, V> {
     void to(Serde<K> keySerde, Serde<V> valSerde, String topic);
 
     /**
-     * Applies a stateful transformation to all elements in this stream.
+     * Create a new {@link KStream} instance by applying a {@link org.apache.kafka.streams.kstream.Transformer} to all elements in this stream, one element at a time.
      *
-     * @param transformerSupplier the class of valueTransformerSupplier
-     * @param stateStoreNames the names of the state store used by the processor
-     * @return the instance of KStream that contains transformed keys and values
+     * @param transformerSupplier   the instance of {@link TransformerSupplier} that generates {@link org.apache.kafka.streams.kstream.Transformer}
+     * @param stateStoreNames       the names of the state store used by the processor
      */
     <K1, V1> KStream<K1, V1> transform(TransformerSupplier<K, V, KeyValue<K1, V1>> transformerSupplier, String... stateStoreNames);
 
     /**
-     * Applies a stateful transformation to all values in this stream.
+     * Create a new {@link KStream} instance by applying a {@link org.apache.kafka.streams.kstream.ValueTransformer} to all values in this stream, one element at a time.
      *
-     * @param valueTransformerSupplier the class of valueTransformerSupplier
-     * @param stateStoreNames the names of the state store used by the processor
-     * @return the instance of KStream that contains the keys and transformed values
+     * @param valueTransformerSupplier  the instance of {@link ValueTransformerSupplier} that generates {@link org.apache.kafka.streams.kstream.ValueTransformer}
+     * @param stateStoreNames           the names of the state store used by the processor
      */
     <R> KStream<K, R> transformValues(ValueTransformerSupplier<V, R> valueTransformerSupplier, String... stateStoreNames);
 
     /**
-     * Processes all elements in this stream by applying a processor.
+     * Process all elements in this stream, one element at a time, by applying a {@link org.apache.kafka.streams.processor.Processor}.
      *
-     * @param processorSupplier the supplier of the Processor to use
-     * @param stateStoreNames the names of the state store used by the processor
+     * @param processorSupplier         the instance of {@link ProcessorSupplier} that generates {@link org.apache.kafka.streams.processor.Processor}
+     * @param stateStoreNames           the names of the state store used by the processor
      */
     void process(ProcessorSupplier<K, V> processorSupplier, String... stateStoreNames);
 
     /**
-     * Combines values of this stream with another KStream using Windowed Inner Join.
+     * Combine element values of this stream with another {@link KStream}'s elements of the same key using windowed Inner Join.
      *
-     * @param otherStream the instance of KStream joined with this stream
-     * @param joiner ValueJoiner
-     * @param windows the specification of the join window
-     * @param keySerde key serdes,
-     *                      if not specified the default serdes defined in the configs will be used
-     * @param thisValueSerde value serdes for this stream,
-     *                      if not specified the default serdes defined in the configs will be used
-     * @param otherValueSerde value serdes for other stream,
-     *                      if not specified the default serdes defined in the configs will be used
-     * @param <V1>   the value type of the other stream
-     * @param <R>   the value type of the new stream
+     * @param otherStream       the instance of {@link KStream} joined with this stream
+     * @param joiner            the instance of {@link ValueJoiner}
+     * @param windows           the specification of the {@link JoinWindows}
+     * @param keySerde          key serdes for materializing both streams,
+     *                          if not specified the default serdes defined in the configs will be used
+     * @param thisValueSerde    value serdes for materializing this stream,
+     *                          if not specified the default serdes defined in the configs will be used
+     * @param otherValueSerde   value serdes for materializing the other stream,
+     *                          if not specified the default serdes defined in the configs will be used
+     * @param <V1>              the value type of the other stream
+     * @param <R>               the value type of the new stream
      */
     <V1, R> KStream<K, R> join(
             KStream<K, V1> otherStream,
@@ -186,7 +178,8 @@ public interface KStream<K, V> {
             Serde<V1> otherValueSerde);
 
     /**
-     * Combines values of this stream with another KStream using Windowed Inner Join.
+     * Combine element values of this stream with another {@link KStream}'s elements of the same key using windowed Inner Join
+     * with default serializers and deserializers.
      *
      * @param otherStream   the instance of {@link KStream} joined with this stream
      * @param joiner        the instance of {@link ValueJoiner}
@@ -200,19 +193,19 @@ public interface KStream<K, V> {
             JoinWindows windows);
 
     /**
-     * Combines values of this stream with another KStream using Windowed Outer Join.
+     * Combine values of this stream with another {@link KStream}'s elements of the same key using windowed Outer Join.
      *
-     * @param otherStream the instance of KStream joined with this stream
-     * @param joiner ValueJoiner
-     * @param windows the specification of the join window
-     * @param keySerde key serdes,
-     *                      if not specified the default serdes defined in the configs will be used
-     * @param thisValueSerde value serdes for this stream,
-     *                      if not specified the default serdes defined in the configs will be used
-     * @param otherValueSerde value serdes for other stream,
-     *                      if not specified the default serdes defined in the configs will be used
-     * @param <V1>   the value type of the other stream
-     * @param <R>   the value type of the new stream
+     * @param otherStream       the instance of {@link KStream} joined with this stream
+     * @param joiner            the instance of {@link ValueJoiner}
+     * @param windows           the specification of the {@link JoinWindows}
+     * @param keySerde          key serdes for materializing both streams,
+     *                          if not specified the default serdes defined in the configs will be used
+     * @param thisValueSerde    value serdes for materializing this stream,
+     *                          if not specified the default serdes defined in the configs will be used
+     * @param otherValueSerde   value serdes for materializing the other stream,
+     *                          if not specified the default serdes defined in the configs will be used
+     * @param <V1>              the value type of the other stream
+     * @param <R>               the value type of the new stream
      */
     <V1, R> KStream<K, R> outerJoin(
             KStream<K, V1> otherStream,
@@ -223,7 +216,8 @@ public interface KStream<K, V> {
             Serde<V1> otherValueSerde);
 
     /**
-     * Combines values of this stream with another KStream using Windowed Outer Join.
+     * Combine values of this stream with another {@link KStream}'s elements of the same key using windowed Outer Join
+     * with default serializers and deserializers.
      *
      * @param otherStream   the instance of {@link KStream} joined with this stream
      * @param joiner        the instance of {@link ValueJoiner}
@@ -237,17 +231,17 @@ public interface KStream<K, V> {
             JoinWindows windows);
 
     /**
-     * Combines values of this stream with another KStream using Windowed Left Join.
+     * Combine values of this stream with another {@link KStream}'s elements of the same key using windowed Left Join.
      *
-     * @param otherStream the instance of KStream joined with this stream
-     * @param joiner ValueJoiner
-     * @param windows the specification of the join window
-     * @param keySerde key serdes,
-     *                      if not specified the default serdes defined in the configs will be used
-     * @param otherValueSerde value serdes for other stream,
-     *                      if not specified the default serdes defined in the configs will be used
-     * @param <V1>   the value type of the other stream
-     * @param <R>   the value type of the new stream
+     * @param otherStream       the instance of {@link KStream} joined with this stream
+     * @param joiner            the instance of {@link ValueJoiner}
+     * @param windows           the specification of the {@link JoinWindows}
+     * @param keySerde          key serdes for materializing the other stream,
+     *                          if not specified the default serdes defined in the configs will be used
+     * @param otherValueSerde   value serdes for materializing the other stream,
+     *                          if not specified the default serdes defined in the configs will be used
+     * @param <V1>              the value type of the other stream
+     * @param <R>               the value type of the new stream
      */
     <V1, R> KStream<K, R> leftJoin(
             KStream<K, V1> otherStream,
@@ -257,7 +251,8 @@ public interface KStream<K, V> {
             Serde<V1> otherValueSerde);
 
     /**
-     * Combines values of this stream with another KStream using Windowed Left Join.
+     * Combine values of this stream with another {@link KStream}'s elements of the same key using windowed Left Join
+     * with default serializers and deserializers.
      *
      * @param otherStream   the instance of {@link KStream} joined with this stream
      * @param joiner        the instance of {@link ValueJoiner}
@@ -271,7 +266,7 @@ public interface KStream<K, V> {
             JoinWindows windows);
 
     /**
-     * Combines values of this stream with KTable using Left Join.
+     * Combine values of this stream with {@link KTable}'s elements of the same key using non-windowed Left Join.
      *
      * @param table     the instance of {@link KTable} joined with this stream
      * @param joiner    the instance of {@link ValueJoiner}
@@ -281,48 +276,63 @@ public interface KStream<K, V> {
     <V1, V2> KStream<K, V2> leftJoin(KTable<K, V1> table, ValueJoiner<V, V1, V2> joiner);
 
     /**
-     * Aggregate values of this stream by key on a window basis.
+     * Combine values of this stream by key on a window basis into a new instance of windowed {@link KTable}.
      *
-     * @param reducer the class of {@link Reducer}
-     * @param windows the specification of the aggregation {@link Windows}
+     * @param reducer           the instance of {@link Reducer}
+     * @param windows           the specification of the aggregation {@link Windows}
+     * @param keySerde          key serdes for materializing the aggregated table,
+     *                          if not specified the default serdes defined in the configs will be used
+     * @param valueSerde        value serdes for materializing the aggregated table,
+     *                          if not specified the default serdes defined in the configs will be used
      */
     <W extends Window> KTable<Windowed<K>, V> reduceByKey(Reducer<V> reducer,
                                                           Windows<W> windows,
                                                           Serde<K> keySerde,
-                                                          Serde<V> aggValueSerde);
+                                                          Serde<V> valueSerde);
 
     /**
-     * Aggregate values of this stream by key on a window basis.
+     * Combine values of this stream by key on a window basis into a new instance of windowed {@link KTable}
+     * with default serializers and deserializers.
      *
-     * @param reducer the class of {@link Reducer}
+     * @param reducer the instance of {@link Reducer}
      * @param windows the specification of the aggregation {@link Windows}
      */
     <W extends Window> KTable<Windowed<K>, V> reduceByKey(Reducer<V> reducer, Windows<W> windows);
 
     /**
-     * Aggregate values of this stream by key on a window basis.
+     * Combine values of this stream by key into a new instance of ever-updating {@link KTable}.
      *
-     * @param reducer the class of Reducer
+     * @param reducer           the instance of {@link Reducer}
+     * @param keySerde          key serdes for materializing the aggregated table,
+     *                          if not specified the default serdes defined in the configs will be used
+     * @param valueSerde        value serdes for materializing the aggregated table,
+     *                          if not specified the default serdes defined in the configs will be used
+     * @param name              the name of the resulting {@link KTable}
      */
     KTable<K, V> reduceByKey(Reducer<V> reducer,
                              Serde<K> keySerde,
-                             Serde<V> aggValueSerde,
+                             Serde<V> valueSerde,
                              String name);
 
     /**
-     * Aggregate values of this stream by key on a window basis.
+     * Combine values of this stream by key into a new instance of ever-updating {@link KTable} with default serializers and deserializers.
      *
-     * @param reducer the class of {@link Reducer}
+     * @param reducer the instance of {@link Reducer}
+     * @param name    the name of the resulting {@link KTable}
      */
     KTable<K, V> reduceByKey(Reducer<V> reducer, String name);
 
     /**
-     * Aggregate values of this stream by key on a window basis.
+     * Aggregate values of this stream by key on a window basis into a new instance of windowed {@link KTable}.
      *
-     * @param initializer the class of Initializer
-     * @param aggregator the class of Aggregator
-     * @param windows the specification of the aggregation {@link Windows}
-     * @param <T>   the value type of the aggregated table
+     * @param initializer   the instance of {@link Initializer}
+     * @param aggregator    the instance of {@link Aggregator}
+     * @param windows       the specification of the aggregation {@link Windows}
+     * @param keySerde      key serdes for materializing the aggregated table,
+     *                      if not specified the default serdes defined in the configs will be used
+     * @param aggValueSerde aggregate value serdes for materializing the aggregated table,
+     *                      if not specified the default serdes defined in the configs will be used
+     * @param <T>           the value type of the resulting {@link KTable}
      */
     <T, W extends Window> KTable<Windowed<K>, T> aggregateByKey(Initializer<T> initializer,
                                                                 Aggregator<K, V, T> aggregator,
@@ -331,25 +341,29 @@ public interface KStream<K, V> {
                                                                 Serde<T> aggValueSerde);
 
     /**
-     * Aggregate values of this stream by key on a window basis.
+     * Aggregate values of this stream by key on a window basis into a new instance of windowed {@link KTable}
+     * with default serializers and deserializers.
      *
-     * @param initializer   the class of {@link Initializer}
-     * @param aggregator    the class of {@link Aggregator}
+     * @param initializer   the instance of {@link Initializer}
+     * @param aggregator    the instance of {@link Aggregator}
      * @param windows       the specification of the aggregation {@link Windows}
-     * @param <T>           the value type of the aggregated table
+     * @param <T>           the value type of the resulting {@link KTable}
      */
     <T, W extends Window> KTable<Windowed<K>, T> aggregateByKey(Initializer<T> initializer,
                                                                 Aggregator<K, V, T> aggregator,
                                                                 Windows<W> windows);
 
     /**
-     * Aggregate values of this stream by key without a window basis, and hence
-     * return an ever updating table
+     * Aggregate values of this stream by key into a new instance of ever-updating {@link KTable}.
      *
      * @param initializer   the class of {@link Initializer}
      * @param aggregator    the class of {@link Aggregator}
-     * @param name          the name of the aggregated table
-     * @param <T>           the value type of the aggregated table
+     * @param keySerde      key serdes for materializing the aggregated table,
+     *                      if not specified the default serdes defined in the configs will be used
+     * @param aggValueSerde aggregate value serdes for materializing the aggregated table,
+     *                      if not specified the default serdes defined in the configs will be used
+     * @param name          the name of the resulting {@link KTable}
+     * @param <T>           the value type of the resulting {@link KTable}
      */
     <T> KTable<K, T> aggregateByKey(Initializer<T> initializer,
                                     Aggregator<K, V, T> aggregator,
@@ -358,45 +372,49 @@ public interface KStream<K, V> {
                                     String name);
 
     /**
-     * Aggregate values of this stream by key without a window basis, and hence
-     * return an ever updating table
+     * Aggregate values of this stream by key into a new instance of ever-updating {@link KTable}
+     * with default serializers and deserializers.
      *
      * @param initializer   the class of {@link Initializer}
      * @param aggregator    the class of {@link Aggregator}
-     * @param name          the name of the aggregated table
-     * @param <T>           the value type of the aggregated table
+     * @param name          the name of the resulted {@link KTable}
+     * @param <T>           the value type of the resulted {@link KTable}
      */
     <T> KTable<K, T> aggregateByKey(Initializer<T> initializer,
                                     Aggregator<K, V, T> aggregator,
                                     String name);
 
     /**
-     * Count number of messages of this stream by key on a window basis.
+     * Count number of messages of this stream by key on a window basis into a new instance of windowed {@link KTable}.
      *
      * @param windows       the specification of the aggregation {@link Windows}
+     * @param keySerde      key serdes for materializing the counting table,
+     *                      if not specified the default serdes defined in the configs will be used
      */
     <W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> windows, Serde<K> keySerde);
 
     /**
-     * Count number of messages of this stream by key on a window basis.
+     * Count number of messages of this stream by key on a window basis into a new instance of windowed {@link KTable}
+     * with default serializers and deserializers.
      *
      * @param windows       the specification of the aggregation {@link Windows}
      */
     <W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> windows);
 
     /**
-     * Count number of messages of this stream by key without a window basis, and hence
-     * return a ever updating counting table.
+     * Count number of messages of this stream by key into a new instance of ever-updating {@link KTable}.
      *
-     * @param name          the name of the aggregated table
+     * @param keySerde      key serdes for materializing the counting table,
+     *                      if not specified the default serdes defined in the configs will be used
+     * @param name          the name of the resulted {@link KTable}
      */
     KTable<K, Long> countByKey(Serde<K> keySerde, String name);
 
     /**
-     * Count number of messages of this stream by key without a window basis, and hence
-     * return a ever updating counting table.
+     * Count number of messages of this stream by key into a new instance of ever-updating {@link KTable}
+     * with default serializers and deserializers.
      *
-     * @param name          the name of the aggregated table
+     * @param name          the name of the resulted {@link KTable}
      */
     KTable<K, Long> countByKey(String name);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/4f8b3aed/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
index dfd9281..6b770b4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
@@ -28,8 +28,8 @@ import java.util.Collections;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
- * KStreamBuilder is a subclass of {@link TopologyBuilder} that provides the {@link KStream} DSL
- * for users to specify computational logic and translates the given logic to a processor topology.
+ * KStreamBuilder is a subclass of {@link TopologyBuilder} that provides the Kafka Streams DSL
+ * for users to specify computational logic and translates the given logic to a {@link org.apache.kafka.streams.processor.internals.ProcessorTopology}.
  */
 public class KStreamBuilder extends TopologyBuilder {
 
@@ -40,25 +40,23 @@ public class KStreamBuilder extends TopologyBuilder {
     }
 
     /**
-     * Creates a KStream instance for the specified topic.
+     * Creates a {@link KStream} instance from the specified topics.
      * The default deserializers specified in the config are used.
      *
-     * @param topics          the topic names, if empty default to all the topics in the config
-     * @return KStream
+     * @param topics    the topic names; must contain at least one topic name
      */
     public <K, V> KStream<K, V> stream(String... topics) {
         return stream(null, null, topics);
     }
 
     /**
-     * Creates a KStream instance for the specified topic.
+     * Creates a {@link KStream} instance for the specified topics.
      *
-     * @param keySerde key serde used to read this source KStream,
-     *                 if not specified the default serde defined in the configs will be used
-     * @param valSerde value serde used to read this source KStream,
-     *                 if not specified the default serde defined in the configs will be used
-     * @param topics   the topic names, if empty default to all the topics in the config
-     * @return KStream
+     * @param keySerde  key serde used to read this source {@link KStream},
+     *                  if not specified the default serde defined in the configs will be used
+     * @param valSerde  value serde used to read this source {@link KStream},
+     *                  if not specified the default serde defined in the configs will be used
+     * @param topics    the topic names; must contain at least one topic name
      */
     public <K, V> KStream<K, V> stream(Serde<K> keySerde, Serde<V> valSerde, String... topics) {
         String name = newName(KStreamImpl.SOURCE_NAME);
@@ -69,25 +67,23 @@ public class KStreamBuilder extends TopologyBuilder {
     }
 
     /**
-     * Creates a KTable instance for the specified topic.
+     * Creates a {@link KTable} instance for the specified topic.
      * The default deserializers specified in the config are used.
      *
-     * @param topic          the topic name
-     * @return KTable
+     * @param topic     the topic name; cannot be null
      */
     public <K, V> KTable<K, V> table(String topic) {
         return table(null, null, topic);
     }
 
     /**
-     * Creates a KTable instance for the specified topic.
+     * Creates a {@link KTable} instance for the specified topic.
      *
      * @param keySerde   key serde used to send key-value pairs,
-     *                        if not specified the default key serde defined in the configuration will be used
+     *                   if not specified the default key serde defined in the configuration will be used
      * @param valSerde   value serde used to send key-value pairs,
-     *                        if not specified the default value serde defined in the configuration will be used
-     * @param topic          the topic name
-     * @return KStream
+     *                   if not specified the default value serde defined in the configuration will be used
+     * @param topic      the topic name; cannot be null
      */
     public <K, V> KTable<K, V> table(Serde<K> keySerde, Serde<V> valSerde, String topic) {
         String source = newName(KStreamImpl.SOURCE_NAME);
@@ -102,10 +98,9 @@ public class KStreamBuilder extends TopologyBuilder {
     }
 
     /**
-     * Creates a new stream by merging the given streams
+     * Creates a new instance of {@link KStream} by merging the given streams.
      *
-     * @param streams the streams to be merged
-     * @return KStream
+     * @param streams   the instances of {@link KStream} to be merged
      */
     public <K, V> KStream<K, V> merge(KStream<K, V>... streams) {
         return KStreamImpl.merge(this, streams);
@@ -115,8 +110,7 @@ public class KStreamBuilder extends TopologyBuilder {
      * Create a unique processor name used for translation into the processor topology.
      * This function is only for internal usage.
      *
-     * @param prefix Processor name prefix.
-     * @return The unique processor name.
+     * @param prefix    processor name prefix
      */
     public String newName(String prefix) {
         return prefix + String.format("%010d", index.getAndIncrement());

http://git-wip-us.apache.org/repos/asf/kafka/blob/4f8b3aed/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 997cb4d..9a2a8a8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -17,73 +17,72 @@
 
 package org.apache.kafka.streams.kstream;
 
+import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.KeyValue;
 
 /**
- * KTable is an abstraction of a change log stream from a primary-keyed table.
+ * KTable is an abstraction of a <i>changelog stream</i> from a primary-keyed table.
  *
  * @param <K> Type of primary keys
  * @param <V> Type of value changes
  */
+@InterfaceStability.Unstable
 public interface KTable<K, V> {
 
     /**
-     * Creates a new instance of KTable consists of all elements of this stream which satisfy a predicate
+     * Create a new instance of {@link KTable} that consists of all elements of this stream which satisfy a predicate.
      *
-     * @param predicate the instance of Predicate
-     * @return the instance of KTable with only those elements that satisfy the predicate
+     * @param predicate     the instance of {@link Predicate}
      */
     KTable<K, V> filter(Predicate<K, V> predicate);
 
     /**
-     * Creates a new instance of KTable consists all elements of this stream which do not satisfy a predicate
+     * Create a new instance of {@link KTable} that consists of all elements of this stream which do not satisfy a predicate.
      *
-     * @param predicate the instance of Predicate
-     * @return the instance of KTable with only those elements that do not satisfy the predicate
+     * @param predicate     the instance of {@link Predicate}
      */
     KTable<K, V> filterOut(Predicate<K, V> predicate);
 
     /**
-     * Creates a new instance of KTable by transforming each value in this stream into a different value in the new stream.
+     * Create a new instance of {@link KTable} by transforming the value of each element in this stream into a new value in the new stream.
      *
-     * @param mapper the instance of ValueMapper
-     * @param <V1>   the value type of the new stream
-     * @return the instance of KTable
+     * @param mapper        the instance of {@link ValueMapper}
+     * @param <V1>          the value type of the new stream
      */
     <V1> KTable<K, V1> mapValues(ValueMapper<V, V1> mapper);
 
     /**
-     * Sends key-value to a topic, also creates a new instance of KTable from the topic.
-     * This is equivalent to calling to(topic) and table(topic).
+     * Materialize this stream to a topic and create a new instance of {@link KTable} from the topic
+     * using default serializers and deserializers.
+     * This is equivalent to calling {@link #to(String)} and {@link org.apache.kafka.streams.kstream.KStreamBuilder#table(String)}.
      *
-     * @param topic           the topic name
-     * @return the instance of KTable that consumes the given topic
+     * @param topic         the topic name
      */
     KTable<K, V> through(String topic);
 
     /**
-     * Sends key-value to a topic, also creates a new instance of KTable from the topic.
-     * This is equivalent to calling to(topic) and table(topic).
+     * Materialize this stream to a topic and create a new instance of {@link KTable} from the topic.
+     * This is equivalent to calling {@link #to(Serde, Serde, String)} and
+     * {@link org.apache.kafka.streams.kstream.KStreamBuilder#table(Serde, Serde, String)}.
      *
      * @param keySerde  key serde used to send key-value pairs,
      *                  if not specified the default key serde defined in the configuration will be used
      * @param valSerde  value serde used to send key-value pairs,
      *                  if not specified the default value serde defined in the configuration will be used
      * @param topic     the topic name
-     * @return the new stream that consumes the given topic
      */
     KTable<K, V> through(Serde<K> keySerde, Serde<V> valSerde, String topic);
 
     /**
-     * Sends key-value to a topic using default serializers specified in the config.
+     * Materialize this stream to a topic using default serializers specified in the config.
      *
      * @param topic         the topic name
      */
     void to(String topic);
 
     /**
-     * Sends key-value to a topic.
+     * Materialize this stream to a topic.
      *
      * @param keySerde  key serde used to send key-value pairs,
      *                  if not specified the default serde defined in the configs will be used
@@ -94,55 +93,53 @@ public interface KTable<K, V> {
     void to(Serde<K> keySerde, Serde<V> valSerde, String topic);
 
     /**
-     * Creates a new instance of KStream from this KTable
-     *
-     * @return the instance of KStream
+     * Convert this stream to a new instance of {@link KStream}.
      */
     KStream<K, V> toStream();
 
     /**
-     * Combines values of this KTable with another KTable using Inner Join.
+     * Combine values of this stream with another {@link KTable} stream's elements of the same key using Inner Join.
      *
-     * @param other the instance of KTable joined with this stream
-     * @param joiner ValueJoiner
-     * @param <V1>   the value type of the other stream
-     * @param <R>   the value type of the new stream
-     * @return the instance of KTable
+     * @param other         the instance of {@link KTable} joined with this stream
+     * @param joiner        the instance of {@link ValueJoiner}
+     * @param <V1>          the value type of the other stream
+     * @param <R>           the value type of the new stream
      */
     <V1, R> KTable<K, R> join(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner);
 
     /**
-     * Combines values of this KTable with another KTable using Outer Join.
+     * Combine values of this stream with another {@link KTable} stream's elements of the same key using Outer Join.
      *
-     * @param other the instance of KTable joined with this stream
-     * @param joiner ValueJoiner
-     * @param <V1>   the value type of the other stream
-     * @param <R>   the value type of the new stream
-     * @return the instance of KTable
+     * @param other         the instance of {@link KTable} joined with this stream
+     * @param joiner        the instance of {@link ValueJoiner}
+     * @param <V1>          the value type of the other stream
+     * @param <R>           the value type of the new stream
      */
     <V1, R> KTable<K, R> outerJoin(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner);
 
     /**
-     * Combines values of this KTable with another KTable using Left Join.
+     * Combine values of this stream with another {@link KTable} stream's elements of the same key using Left Join.
      *
-     * @param other the instance of KTable joined with this stream
-     * @param joiner ValueJoiner
-     * @param <V1>   the value type of the other stream
-     * @param <R>   the value type of the new stream
-     * @return the instance of KTable
+     * @param other         the instance of {@link KTable} joined with this stream
+     * @param joiner        the instance of {@link ValueJoiner}
+     * @param <V1>          the value type of the other stream
+     * @param <R>           the value type of the new stream
      */
     <V1, R> KTable<K, R> leftJoin(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner);
 
     /**
-     * Reduce values of this table by the selected key.
+     * Combine updating values of this stream by the selected key into a new instance of {@link KTable}.
      *
-     * @param adder the class of Reducer
-     * @param subtractor the class of Reducer
-     * @param selector the KeyValue mapper that select the aggregate key
-     * @param name the name of the resulted table
-     * @param <K1>   the key type of the aggregated table
-     * @param <V1>   the value type of the aggregated table
-     * @return the instance of KTable
+     * @param adder             the instance of {@link Reducer} for addition
+     * @param subtractor        the instance of {@link Reducer} for subtraction
+     * @param selector          the instance of {@link KeyValueMapper} that select the aggregate key
+     * @param keySerde          key serdes for materializing the aggregated table,
+     *                          if not specified the default serdes defined in the configs will be used
+     * @param valueSerde        value serdes for materializing the aggregated table,
+     *                          if not specified the default serdes defined in the configs will be used
+     * @param name              the name of the resulted {@link KTable}
+     * @param <K1>              the key type of the aggregated {@link KTable}
+     * @param <V1>              the value type of the aggregated {@link KTable}
      */
     <K1, V1> KTable<K1, V1> reduce(Reducer<V1> adder,
                                    Reducer<V1> subtractor,
@@ -152,15 +149,15 @@ public interface KTable<K, V> {
                                    String name);
 
     /**
-     * Reduce values of this table by the selected key.
+     * Combine updating values of this stream by the selected key into a new instance of {@link KTable}
+     * using default serializers and deserializers.
      *
      * @param adder         the instance of {@link Reducer} for addition
      * @param subtractor    the instance of {@link Reducer} for subtraction
      * @param selector      the instance of {@link KeyValueMapper} that select the aggregate key
-     * @param name          the name of the resulted table
-     * @param <K1>          the key type of the aggregated table
-     * @param <V1>          the value type of the aggregated table
-     * @return              the instance of KTable
+     * @param name          the name of the resulted {@link KTable}
+     * @param <K1>          the key type of the aggregated {@link KTable}
+     * @param <V1>          the value type of the aggregated {@link KTable}
      */
     <K1, V1> KTable<K1, V1> reduce(Reducer<V1> adder,
                                    Reducer<V1> subtractor,
@@ -168,20 +165,26 @@ public interface KTable<K, V> {
                                    String name);
 
     /**
-     * Aggregate values of this table by the selected key.
+     * Aggregate updating values of this stream by the selected key into a new instance of {@link KTable}.
      *
-     * @param initializer the class of Initializer
-     * @param add the class of Aggregator
-     * @param remove the class of Aggregator
-     * @param selector the KeyValue mapper that select the aggregate key
-     * @param name the name of the resulted table
-     * @param <K1>   the key type of the aggregated table
-     * @param <V1>   the value type of the aggregated table
-     * @return the instance of KTable
+     * @param initializer   the instance of {@link Initializer}
+     * @param adder         the instance of {@link Aggregator} for addition
+     * @param substractor   the instance of {@link Aggregator} for subtraction
+     * @param selector      the instance of {@link KeyValueMapper} that select the aggregate key
+     * @param keySerde      key serdes for materializing this stream and the aggregated table,
+     *                      if not specified the default serdes defined in the configs will be used
+     * @param valueSerde    value serdes for materializing this stream,
+     *                      if not specified the default serdes defined in the configs will be used
+     * @param aggValueSerde value serdes for materializing the aggregated table,
+     *                      if not specified the default serdes defined in the configs will be used
+     * @param name          the name of the resulted table
+     * @param <K1>          the key type of the aggregated {@link KTable}
+     * @param <V1>          the value type produced by the selector, i.e. the input value type of the aggregators
+     * @param <T>           the value type of the aggregated {@link KTable}
      */
     <K1, V1, T> KTable<K1, T> aggregate(Initializer<T> initializer,
-                                        Aggregator<K1, V1, T> add,
-                                        Aggregator<K1, V1, T> remove,
+                                        Aggregator<K1, V1, T> adder,
+                                        Aggregator<K1, V1, T> substractor,
                                         KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
                                         Serde<K1> keySerde,
                                         Serde<V1> valueSerde,
@@ -189,16 +192,17 @@ public interface KTable<K, V> {
                                         String name);
 
     /**
-     * Aggregate values of this table by the selected key.
+     * Aggregate updating values of this stream by the selected key into a new instance of {@link KTable}
+     * using default serializers and deserializers.
      *
      * @param initializer   the instance of {@link Initializer}
      * @param adder         the instance of {@link Aggregator} for addition
      * @param substractor   the instance of {@link Aggregator} for subtraction
      * @param selector      the instance of {@link KeyValueMapper} that select the aggregate key
-     * @param name          the name of the resulted table
-     * @param <K1>          the key type of the aggregated table
-     * @param <V1>          the value type of the aggregated table
-     * @return              the instance of aggregated {@link KTable}
+     * @param name          the name of the resulted {@link KTable}
+     * @param <K1>          the key type of the aggregated {@link KTable}
+     * @param <V1>          the value type of the elements passed to the aggregators
+     * @param <T>           the value type of the aggregated {@link KTable}
      */
     <K1, V1, T> KTable<K1, T> aggregate(Initializer<T> initializer,
                                         Aggregator<K1, V1, T> adder,
@@ -207,12 +211,15 @@ public interface KTable<K, V> {
                                         String name);
 
     /**
-     * Count number of records of this table by the selected key.
+     * Count number of records of this stream by the selected key into a new instance of {@link KTable}.
      *
-     * @param selector the KeyValue mapper that select the aggregate key
-     * @param name the name of the resulted table
-     * @param <K1>   the key type of the aggregated table
-     * @return the instance of KTable
+     * @param selector      the instance of {@link KeyValueMapper} that select the aggregate key
+     * @param keySerde      key serdes for materializing this stream,
+     *                      if not specified the default serdes defined in the configs will be used
+     * @param valueSerde    value serdes for materializing this stream,
+     *                      if not specified the default serdes defined in the configs will be used
+     * @param name          the name of the resulted table
+     * @param <K1>          the key type of the aggregated {@link KTable}
      */
     <K1> KTable<K1, Long> count(KeyValueMapper<K, V, K1> selector,
                                 Serde<K1> keySerde,
@@ -220,12 +227,12 @@ public interface KTable<K, V> {
                                 String name);
 
     /**
-     * Count number of records of this table by the selected key.
+     * Count number of records of this stream by the selected key into a new instance of {@link KTable}
+     * using default serializers and deserializers.
      *
      * @param selector      the instance of {@link KeyValueMapper} that select the aggregate key
-     * @param name          the name of the resulted table
-     * @param <K1>          the key type of the aggregated table
-     * @return              the instance of aggregated {@link KTable}
+     * @param name          the name of the resulted {@link KTable}
+     * @param <K1>          the key type of the aggregated {@link KTable}
      */
     <K1> KTable<K1, Long> count(KeyValueMapper<K, V, K1> selector, String name);
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4f8b3aed/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java
index 9c04ef5..a4aed91 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java
@@ -20,9 +20,9 @@ package org.apache.kafka.streams.kstream;
 /**
  * The KeyValueMapper interface for mapping a key-value pair to a new value (could be another key-value pair).
  *
- * @param <K> Original key type.
- * @param <V> Original value type.
- * @param <R> Mapped value type.
+ * @param <K>   original key type
+ * @param <V>   original value type
+ * @param <R>   mapped value type
  */
 public interface KeyValueMapper<K, V, R> {
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/4f8b3aed/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java
index 784f5b1..c90554b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java
@@ -20,8 +20,8 @@ package org.apache.kafka.streams.kstream;
 /**
  * The Predicate interface represents a predicate (boolean-valued function) of a key-value pair.
  *
- * @param <K> Key type.
- * @param <V> Value type.
+ * @param <K>   key type
+ * @param <V>   value type
  */
 public interface Predicate<K, V> {
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/4f8b3aed/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java
index bf25f73..551a672 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java
@@ -20,7 +20,7 @@ package org.apache.kafka.streams.kstream;
 /**
  * The Reducer interface for combining two values of the same type into a new value.
  *
- * @param <V> Value type.
+ * @param <V>   value type
  */
 public interface Reducer<V> {
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/4f8b3aed/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
index 47198e4..8069dca 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
@@ -22,9 +22,9 @@ import org.apache.kafka.streams.processor.ProcessorContext;
 /**
  * A stateful Transformer interface for transform a key-value pair into a new value.
  *
- * @param <K> Key type.
- * @param <V> Value type.
- * @param <R> Return type.
+ * @param <K>   key type
+ * @param <V>   value type
+ * @param <R>   return type
  */
 public interface Transformer<K, V, R> {
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/4f8b3aed/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java
index 41005b3..5f00a1a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java
@@ -20,9 +20,9 @@ package org.apache.kafka.streams.kstream;
 /**
  * The ValueJoiner interface for joining two values and return a the joined new value.
  *
- * @param <V1> First value type.
- * @param <V2> Second value type.
- * @param <R> Joined value type.
+ * @param <V1>  first value type
+ * @param <V2>  second value type
+ * @param <R>   joined value type
  */
 public interface ValueJoiner<V1, V2, R> {
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/4f8b3aed/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java
index d507c87..6e62a55 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java
@@ -20,8 +20,8 @@ package org.apache.kafka.streams.kstream;
 /**
  * The KeyValueMapper interface for mapping an original value to a new value (could be another key-value pair).
  *
- * @param <V1> Original value type.
- * @param <V2> Mapped value type.
+ * @param <V1>  original value type
+ * @param <V2>  mapped value type
  */
 public interface ValueMapper<V1, V2> {
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/4f8b3aed/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
index b4d2b38..1a0679d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
@@ -22,8 +22,8 @@ import org.apache.kafka.streams.processor.ProcessorContext;
 /**
  * A stateful Value Transformer interface for transform a value into a new value.
  *
- * @param <V> Value type.
- * @param <R> Return type.
+ * @param <V>   value type
+ * @param <R>   return type
  */
 public interface ValueTransformer<V, R> {
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/4f8b3aed/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java
index eed5fe1..3691282 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java
@@ -19,9 +19,8 @@ package org.apache.kafka.streams.kstream;
 
 /**
  * The windowed key interface used in {@link KTable}, used for representing a windowed table result from windowed stream aggregations,
- * i.e. {@link KStream#aggregateByKey(Initializer, Aggregator, Windows, org.apache.kafka.common.serialization.Serializer,
- * org.apache.kafka.common.serialization.Serializer, org.apache.kafka.common.serialization.Deserializer,
- * org.apache.kafka.common.serialization.Deserializer)}
+ * i.e. {@link KStream#aggregateByKey(Initializer, Aggregator, Windows, org.apache.kafka.common.serialization.Serde,
+ * org.apache.kafka.common.serialization.Serde)}
  *
  * @param <T> Type of the key
  */

http://git-wip-us.apache.org/repos/asf/kafka/blob/4f8b3aed/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
index 678e351..e7dc23e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
@@ -24,7 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 /**
  * The window specification interface that can be extended for windowing operation in joins and aggregations.
  *
- * @param <W> Type of the window instance
+ * @param <W>   type of the window instance
  */
 public abstract class Windows<W extends Window> {
 
@@ -38,17 +38,17 @@ public abstract class Windows<W extends Window> {
 
     protected String name;
 
-    private long emitDuration;
+    private long emitDurationMs;
 
-    private long maintainDuration;
+    private long maintainDurationMs;
 
     public int segments;
 
     protected Windows(String name) {
         this.name = name;
         this.segments = DEFAULT_NUM_SEGMENTS;
-        this.emitDuration = DEFAULT_EMIT_DURATION;
-        this.maintainDuration = DEFAULT_MAINTAIN_DURATION;
+        this.emitDurationMs = DEFAULT_EMIT_DURATION;
+        this.maintainDurationMs = DEFAULT_MAINTAIN_DURATION;
     }
 
     public String name() {
@@ -56,29 +56,26 @@ public abstract class Windows<W extends Window> {
     }
 
     /**
-     * Set the window emit duration in milliseconds of system time
+     * Set the window emit duration in milliseconds of system time.
      */
-    public Windows emit(long duration) {
-        this.emitDuration = duration;
+    public Windows emit(long durationMs) {
+        this.emitDurationMs = durationMs;
 
         return this;
     }
 
     /**
-     * Set the window maintain duration in milliseconds of system time
+     * Set the window maintain duration in milliseconds of system time.
      */
-    public Windows until(long duration) {
-        this.maintainDuration = duration;
+    public Windows until(long durationMs) {
+        this.maintainDurationMs = durationMs;
 
         return this;
     }
 
     /**
-     * Specifies the number of segments to be used for rolling the window store,
-     * this function is not exposed to users but can be called by developers that extend this JoinWindows specs
-     *
-     * @param segments
-     * @return
+     * Specify the number of segments to be used for rolling the window store.
+     * This function is not exposed to users but can be called by developers who extend this window specification.
      */
     protected Windows segments(int segments) {
         this.segments = segments;
@@ -87,11 +84,11 @@ public abstract class Windows<W extends Window> {
     }
 
     public long emitEveryMs() {
-        return this.emitDuration;
+        return this.emitDurationMs;
     }
 
     public long maintainMs() {
-        return this.maintainDuration;
+        return this.maintainDurationMs;
     }
 
     protected String newName(String prefix) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/4f8b3aed/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
index 06681ac..999f6a9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
@@ -39,11 +39,11 @@ import java.util.Set;
 public class DefaultPartitionGrouper implements PartitionGrouper {
 
     /**
-     * Generate tasks with the assigned topic partitions
+     * Generate tasks with the assigned topic partitions.
      *
-     * @param topicGroups {@link TopologyBuilder#topicGroups()} where topics of the same group need to be joined together
-     * @param metadata Metadata of the consuming cluster
-     * @return The map from generated task ids to the assigned partitions.
+     * @param topicGroups   group of topics that need to be joined together
+     * @param metadata      metadata of the consuming cluster
+     * @return The map from generated task ids to the assigned partitions
      */
     public Map<TaskId, Set<TopicPartition>> partitionGroups(Map<Integer, Set<String>> topicGroups, Cluster metadata) {
         Map<TaskId, Set<TopicPartition>> groups = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/kafka/blob/4f8b3aed/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
index fdcff19..434996e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.streams.processor;
 
+import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.StreamsMetrics;
 
@@ -25,6 +26,7 @@ import java.io.File;
 /**
  * Processor context interface.
  */
+@InterfaceStability.Unstable
 public interface ProcessorContext {
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/4f8b3aed/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java
index 3e7f6fb..908e116 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java
@@ -19,6 +19,7 @@
 
 package org.apache.kafka.streams.state;
 
+import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.StateStore;
 
@@ -30,6 +31,7 @@ import java.util.List;
  * @param <K> The key type
  * @param <V> The value type
  */
+@InterfaceStability.Unstable
 public interface KeyValueStore<K, V> extends StateStore {
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/4f8b3aed/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
index cbd373c..c7a882f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
@@ -19,6 +19,7 @@
 
 package org.apache.kafka.streams.state;
 
+import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.streams.processor.StateStore;
 
 /**
@@ -27,6 +28,7 @@ import org.apache.kafka.streams.processor.StateStore;
  * @param <K> Type of keys
  * @param <V> Type of values
  */
+@InterfaceStability.Unstable
 public interface WindowStore<K, V> extends StateStore {
 
     /**


Mime
View raw message