kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject kafka git commit: MINOR: Some more Kafka Streams Javadocs
Date Wed, 03 Feb 2016 19:35:08 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 1d80f563b -> 79eacf6c9


MINOR: Some more Kafka Streams Javadocs

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Yasuhiro Matsuda <yasuhiro@confluent.io>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes #853 from guozhangwang/KJavaDoc


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/79eacf6c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/79eacf6c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/79eacf6c

Branch: refs/heads/trunk
Commit: 79eacf6c95506d5d6819add5a1256681b13170b1
Parents: 1d80f56
Author: Guozhang Wang <wangguoz@gmail.com>
Authored: Wed Feb 3 11:31:32 2016 -0800
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Wed Feb 3 11:31:32 2016 -0800

----------------------------------------------------------------------
 .../org/apache/kafka/streams/KafkaStreams.java  | 35 ++++++++++++++------
 .../java/org/apache/kafka/streams/KeyValue.java |  8 +++++
 .../apache/kafka/streams/StreamsMetrics.java    |  3 ++
 .../streams/errors/ProcessorStateException.java |  3 ++
 .../streams/errors/TaskAssignmentException.java |  2 +-
 .../streams/errors/TaskIdFormatException.java   |  3 ++
 .../errors/TopologyBuilderException.java        |  3 ++
 .../kafka/streams/kstream/Aggregator.java       |  7 ++++
 .../kafka/streams/kstream/HoppingWindows.java   |  3 ++
 .../kafka/streams/kstream/Initializer.java      |  5 +++
 .../kafka/streams/kstream/JoinWindows.java      |  2 +-
 .../apache/kafka/streams/kstream/KStream.java   |  2 +-
 .../kafka/streams/kstream/KStreamBuilder.java   | 11 +++++-
 .../apache/kafka/streams/kstream/KTable.java    |  1 +
 .../kafka/streams/kstream/KeyValueMapper.java   |  7 ++++
 .../apache/kafka/streams/kstream/Predicate.java |  7 ++--
 .../apache/kafka/streams/kstream/Reducer.java   |  5 +++
 .../kafka/streams/kstream/TumblingWindows.java  |  4 ++-
 .../kafka/streams/kstream/ValueJoiner.java      |  7 ++++
 .../kafka/streams/kstream/ValueMapper.java      |  6 ++++
 .../kstream/ValueTransformerSupplier.java       |  1 -
 .../apache/kafka/streams/kstream/Window.java    |  3 ++
 .../apache/kafka/streams/kstream/Windows.java   |  5 +++
 .../processor/DefaultPartitionGrouper.java      | 12 +++++--
 .../kafka/streams/processor/Processor.java      |  2 +-
 .../streams/processor/ProcessorContext.java     |  3 ++
 .../streams/processor/StateStoreSupplier.java   |  3 ++
 .../apache/kafka/streams/processor/TaskId.java  |  3 ++
 .../kafka/streams/state/KeyValueIterator.java   |  6 ++++
 .../apache/kafka/streams/state/WindowStore.java | 17 ++++++++++
 30 files changed, 155 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 071cef6..a19f697 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.streams;
 
+import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
@@ -38,15 +39,18 @@ import java.util.concurrent.atomic.AtomicInteger;
  * Kafka Streams allows for performing continuous computation on input coming from one or
more input topics and
  * sends output to zero or more output topics.
  * <p>
- * This processing is defined by using the {@link TopologyBuilder} class or its superclass
KStreamBuilder to specify
- * the transformation.
- * The {@link KafkaStreams} instance will be responsible for the lifecycle of these processors.
It will instantiate and
- * start one or more of these processors to process the Kafka partitions assigned to this
particular instance.
+ * The computational logic can be specified either by using the {@link TopologyBuilder} class
to define a DAG topology of
+ * {@link org.apache.kafka.streams.processor.Processor}s or by using the {@link org.apache.kafka.streams.kstream.KStreamBuilder}
+ * class which provides the high-level {@link org.apache.kafka.streams.kstream.KStream} DSL
to define the transformation.
+ *
+ * The {@link KafkaStreams} class manages the lifecycle of a Kafka Streams instance. One
stream instance can contain one or
+ * more threads specified in the configs for the processing work.
  * <p>
- * This {@link KafkaStreams} instance will co-ordinate with any other instances (whether
in this same process, on other processes
- * on this machine, or on remote machines). These processes will divide up the work so that
all partitions are being
- * consumed. If instances are added or die, the corresponding {@link StreamThread} instances
will be shutdown or
- * started in the appropriate processes to balance processing load.
+ * A {@link KafkaStreams} instance can co-ordinate with any other instances with the same
job ID (whether in this same process, on other processes
+ * on this machine, or on remote machines) as a single (possibly distributed) stream processing
client. These instances will divide up the work
+ * based on the assignment of the input topic partitions so that all partitions are being
+ * consumed. If instances are added or fail, all instances will rebalance the partition
assignment among themselves
+ * to balance processing load.
  * <p>
  * Internally the {@link KafkaStreams} instance contains a normal {@link org.apache.kafka.clients.producer.KafkaProducer
KafkaProducer}
  * and {@link org.apache.kafka.clients.consumer.KafkaConsumer KafkaConsumer} instance that
is used for reading input and writing output.
@@ -70,6 +74,9 @@ import java.util.concurrent.atomic.AtomicInteger;
  * </pre>
  *
  */
+// TODO: above example may need to be updated after KAFKA-3153
+
+@InterfaceStability.Unstable
 public class KafkaStreams {
 
     private static final Logger log = LoggerFactory.getLogger(KafkaStreams.class);
@@ -94,6 +101,12 @@ public class KafkaStreams {
         this(builder, new StreamsConfig(props));
     }
 
+    /**
+     * Construct the stream instance.
+     *
+     * @param builder The processor topology builder specifying the computational logic
+     * @param config The stream configs
+     */
     public KafkaStreams(TopologyBuilder builder, StreamsConfig config) {
         // create the metrics
         Time time = new SystemTime();
@@ -124,7 +137,7 @@ public class KafkaStreams {
     }
 
     /**
-     * Start the stream process by starting all its threads
+     * Start the stream instance by starting all its threads.
      */
     public synchronized void start() {
         log.debug("Starting Kafka Stream process");
@@ -142,8 +155,8 @@ public class KafkaStreams {
     }
 
     /**
-     * Shutdown this stream process by signaling the threads to stop,
-     * wait for them to join and clean up the process instance.
+     * Shutdown this stream instance by signaling all the threads to stop,
+     * and then wait for them to join.
      */
     public synchronized void close() {
         log.debug("Stopping Kafka Stream process");

http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/KeyValue.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KeyValue.java b/streams/src/main/java/org/apache/kafka/streams/KeyValue.java
index 472e677..d813c47 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KeyValue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KeyValue.java
@@ -17,6 +17,14 @@
 
 package org.apache.kafka.streams;
 
+/**
+ * A key-value pair defined for a single Kafka Streams record.
+ * If the record comes directly from a Kafka topic then its
+ * key / value are defined as the message key / value.
+ *
+ * @param <K> Key type
+ * @param <V> Value type
+ */
 public class KeyValue<K, V> {
 
     public final K key;

http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java b/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java
index a151392..d392eef 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java
@@ -19,6 +19,9 @@ package org.apache.kafka.streams;
 
 import org.apache.kafka.common.metrics.Sensor;
 
+/**
+ * The stream metrics interface for adding metric sensors and collecting metric values.
+ */
 public interface StreamsMetrics {
 
     Sensor addLatencySensor(String scopeName, String entityName, String operationName, String...
tags);

http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/errors/ProcessorStateException.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/ProcessorStateException.java
b/streams/src/main/java/org/apache/kafka/streams/errors/ProcessorStateException.java
index 6434d04..e6f872a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/errors/ProcessorStateException.java
+++ b/streams/src/main/java/org/apache/kafka/streams/errors/ProcessorStateException.java
@@ -17,6 +17,9 @@
 
 package org.apache.kafka.streams.errors;
 
+/**
+ * Indicates a processor state operation (e.g. put, get) has failed.
+ */
 public class ProcessorStateException extends StreamsException {
 
     private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/errors/TaskAssignmentException.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/TaskAssignmentException.java
b/streams/src/main/java/org/apache/kafka/streams/errors/TaskAssignmentException.java
index 3ae8503..3936ef5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/errors/TaskAssignmentException.java
+++ b/streams/src/main/java/org/apache/kafka/streams/errors/TaskAssignmentException.java
@@ -17,7 +17,7 @@
 package org.apache.kafka.streams.errors;
 
 /**
- * The run time exception class for stream task assignments
+ * Indicates a run time error incurred while trying to assign stream tasks to threads
  */
 public class TaskAssignmentException extends StreamsException {
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/errors/TaskIdFormatException.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/TaskIdFormatException.java
b/streams/src/main/java/org/apache/kafka/streams/errors/TaskIdFormatException.java
index bf0ebf5..576b972 100644
--- a/streams/src/main/java/org/apache/kafka/streams/errors/TaskIdFormatException.java
+++ b/streams/src/main/java/org/apache/kafka/streams/errors/TaskIdFormatException.java
@@ -17,6 +17,9 @@
 
 package org.apache.kafka.streams.errors;
 
+/**
+ * Indicates a run time error incurred while trying to parse the task id from the read string
+ */
 public class TaskIdFormatException extends StreamsException {
 
     private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/errors/TopologyBuilderException.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/TopologyBuilderException.java
b/streams/src/main/java/org/apache/kafka/streams/errors/TopologyBuilderException.java
index 9dd740b..8745693 100644
--- a/streams/src/main/java/org/apache/kafka/streams/errors/TopologyBuilderException.java
+++ b/streams/src/main/java/org/apache/kafka/streams/errors/TopologyBuilderException.java
@@ -17,6 +17,9 @@
 
 package org.apache.kafka.streams.errors;
 
+/**
+ * Indicates a pre-run time error incurred while parsing the builder to construct the processor
topology
+ */
 public class TopologyBuilderException extends StreamsException {
 
     private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java
index e3eb18f..0d29409 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java
@@ -17,6 +17,13 @@
 
 package org.apache.kafka.streams.kstream;
 
+/**
+ * The Aggregator interface for aggregating values of the given key.
+ *
+ * @param <K> Key type.
+ * @param <V> Receiving value type.
+ * @param <T> Aggregate value type.
+ */
 public interface Aggregator<K, V, T> {
 
     T apply(K aggKey, V value, T aggregate);

http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/kstream/HoppingWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/HoppingWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/HoppingWindows.java
index f354ef9..aa866e4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/HoppingWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/HoppingWindows.java
@@ -22,6 +22,9 @@ import org.apache.kafka.streams.kstream.internals.HoppingWindow;
 import java.util.HashMap;
 import java.util.Map;
 
+/**
+ * The hopping window specifications used for aggregations.
+ */
 public class HoppingWindows extends Windows<HoppingWindow> {
 
     private static final long DEFAULT_SIZE_MS = 1000L;

http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java
index 0aeddc9..fdd5220 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java
@@ -17,6 +17,11 @@
 
 package org.apache.kafka.streams.kstream;
 
+/**
+ * The Initializer interface for creating an initial value for aggregations.
+ *
+ * @param <T> Aggregate value type.
+ */
 public interface Initializer<T> {
 
     T apply();

http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
index ffc1c1c..70294a8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
@@ -23,7 +23,7 @@ import org.apache.kafka.streams.kstream.internals.TumblingWindow;
 import java.util.Map;
 
 /**
- * This class is used to specify the behaviour of windowed joins.
+ * The window specifications used for joins.
  */
 public class JoinWindows extends Windows<TumblingWindow> {
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index f6fa48d..b83298f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -23,7 +23,7 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 
 
-
+// TODO: Javadoc needs to be updated
 /**
  * KStream is an abstraction of a stream of key-value pairs.
  *

http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
index b50cffb..3cf198c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
@@ -29,7 +29,8 @@ import java.util.Collections;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
- * KStreamBuilder is the class to create KStream instances.
+ * KStreamBuilder is a subclass of {@link TopologyBuilder} that provides the {@link KStream}
DSL
+ * for users to specify computational logic and translates the given logic to a processor
topology.
  */
 public class KStreamBuilder extends TopologyBuilder {
 
@@ -39,6 +40,7 @@ public class KStreamBuilder extends TopologyBuilder {
         super();
     }
 
+    // TODO: needs to be updated
     /**
      * Creates a KStream instance for the specified topic.
      * The default deserializers specified in the config are used.
@@ -115,6 +117,13 @@ public class KStreamBuilder extends TopologyBuilder {
         return KStreamImpl.merge(this, streams);
     }
 
+    /**
+     * Create a unique processor name used for translation into the processor topology.
+     * This function is only for internal usage.
+     *
+     * @param prefix Processor name prefix.
+     * @return The unique processor name.
+     */
     public String newName(String prefix) {
         return prefix + String.format("%010d", index.getAndIncrement());
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 5cd9d9c..b83b0de 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.KeyValue;
 
+// TODO: Javadoc needs to be updated.
 /**
  * KTable is an abstraction of a change log stream.
  *

http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java
index 62b07f6..9c04ef5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java
@@ -17,6 +17,13 @@
 
 package org.apache.kafka.streams.kstream;
 
+/**
+ * The KeyValueMapper interface for mapping a key-value pair to a new value (could be another
key-value pair).
+ *
+ * @param <K> Original key type.
+ * @param <V> Original value type.
+ * @param <R> Mapped value type.
+ */
 public interface KeyValueMapper<K, V, R> {
 
     R apply(K key, V value);

http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java
index c73622e..784f5b1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java
@@ -18,13 +18,12 @@
 package org.apache.kafka.streams.kstream;
 
 /**
- * Represents a predicate (boolean-valued function) of two arguments.
+ * The Predicate interface represents a predicate (boolean-valued function) of a key-value
pair.
  *
- * @param <K> the type of key
- * @param <V> the type of value
+ * @param <K> Key type.
+ * @param <V> Value type.
  */
 public interface Predicate<K, V> {
 
     boolean test(K key, V value);
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java
index 418f442..bf25f73 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java
@@ -17,6 +17,11 @@
 
 package org.apache.kafka.streams.kstream;
 
+/**
+ * The Reducer interface for combining two values of the same type into a new value.
+ *
+ * @param <V> Value type.
+ */
 public interface Reducer<V> {
 
     V apply(V value1, V value2);

http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/kstream/TumblingWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TumblingWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TumblingWindows.java
index 188fe66..cadedba 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TumblingWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TumblingWindows.java
@@ -17,12 +17,14 @@
 
 package org.apache.kafka.streams.kstream;
 
-
 import org.apache.kafka.streams.kstream.internals.TumblingWindow;
 
 import java.util.HashMap;
 import java.util.Map;
 
+/**
+ * The tumbling window specifications used for aggregations.
+ */
 public class TumblingWindows extends Windows<TumblingWindow> {
 
     private static final long DEFAULT_SIZE_MS = 1000L;

http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java
index 93fc359..41005b3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java
@@ -17,6 +17,13 @@
 
 package org.apache.kafka.streams.kstream;
 
+/**
+ * The ValueJoiner interface for joining two values and returning the joined new value.
+ *
+ * @param <V1> First value type.
+ * @param <V2> Second value type.
+ * @param <R> Joined value type.
+ */
 public interface ValueJoiner<V1, V2, R> {
 
     R apply(V1 value1, V2 value2);

http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java
index a32423d..d507c87 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java
@@ -17,6 +17,12 @@
 
 package org.apache.kafka.streams.kstream;
 
+/**
+ * The ValueMapper interface for mapping an original value to a new value (could be another
key-value pair).
+ *
+ * @param <V1> Original value type.
+ * @param <V2> Mapped value type.
+ */
 public interface ValueMapper<V1, V2> {
 
     V2 apply(V1 value);

http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java
index 5c053c7..04fa9eb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java
@@ -20,5 +20,4 @@ package org.apache.kafka.streams.kstream;
 public interface ValueTransformerSupplier<V, R> {
 
     ValueTransformer<V, R> get();
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
index b9401b0..f2965dc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
@@ -17,6 +17,9 @@
 
 package org.apache.kafka.streams.kstream;
 
+/**
+ * A single window instance, defined by its start and end timestamp.
+ */
 public abstract class Window {
 
     private long start;

http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
index e4d7d9d..678e351 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
@@ -21,6 +21,11 @@ package org.apache.kafka.streams.kstream;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
+/**
+ * The window specification interface that can be extended for windowing operation in joins
and aggregations.
+ *
+ * @param <W> Type of the window instance
+ */
 public abstract class Windows<W extends Window> {
 
     private static final int DEFAULT_NUM_SEGMENTS = 3;

http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
index 57df685..dad5c6f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
@@ -31,12 +31,20 @@ import java.util.Set;
 
 /**
  * DefaultPartitionGrouper groups partitions by the partition id. This behavior is assumed
by the join processing in KStream.
- * Join processing requires that topics are copartitoned, i.e., being partitioned by the
same key and having the same
- * number of partitions, are grouped together. Copartitioning is ensured by having the same
number of partitions on
+ *
+ * Join operations require that topics of the joining entities are copartitioned, i.e., being
partitioned by the same key and having the same
+ * number of partitions. Copartitioning is ensured by having the same number of partitions
on
  * joined topics, and by using the serialization and Producer's default partitioner.
  */
 public class DefaultPartitionGrouper implements PartitionGrouper {
 
+    /**
+     * Generate tasks with the assigned topic partitions
+     *
+     * @param topicGroups {@link TopologyBuilder#topicGroups()} where topics of the same
group need to be joined together
+     * @param metadata Metadata of the consuming cluster
+     * @return The map from generated task ids to the assigned partitions.
+     */
     public Map<TaskId, Set<TopicPartition>> partitionGroups(Map<Integer, Set<String>>
topicGroups, Cluster metadata) {
         Map<TaskId, Set<TopicPartition>> groups = new HashMap<>();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java b/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java
index 3cade3a..fbd72f0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java
@@ -18,7 +18,7 @@
 package org.apache.kafka.streams.processor;
 
 /**
- * A processor of messages.
+ * A processor of key-value pair records.
  *
  * @param <K> the type of keys
  * @param <V> the type of values

http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
index af98300..9740fa3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
@@ -23,6 +23,9 @@ import org.apache.kafka.streams.StreamsMetrics;
 
 import java.io.File;
 
+/**
+ * Processor context interface.
+ */
 public interface ProcessorContext {
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java
b/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java
index 11545c5..993500d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java
@@ -17,6 +17,9 @@
 
 package org.apache.kafka.streams.processor;
 
+/**
+ * A state store supplier which can create one or more {@link StateStore} instances.
+ */
 public interface StateStoreSupplier {
 
     String name();

http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java b/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
index 6e7150e..69b29bf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
@@ -24,6 +24,9 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
+/**
+ * The task id representation composed as topic group id plus the assigned partition id.
+ */
 public class TaskId implements Comparable<TaskId> {
 
     public final int topicGroupId;

http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java
index bd118a2..55ec8cf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java
@@ -24,6 +24,12 @@ import org.apache.kafka.streams.KeyValue;
 import java.io.Closeable;
 import java.util.Iterator;
 
+/**
+ * Iterator interface of {@link KeyValue}.
+ *
+ * @param <K> Type of keys
+ * @param <V> Type of values
+ */
 public interface KeyValueIterator<K, V> extends Iterator<KeyValue<K, V>>,
Closeable {
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
index 1d806e0..cbd373c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
@@ -21,11 +21,28 @@ package org.apache.kafka.streams.state;
 
 import org.apache.kafka.streams.processor.StateStore;
 
+/**
+ * A windowed store interface extending {@link StateStore}
+ *
+ * @param <K> Type of keys
+ * @param <V> Type of values
+ */
 public interface WindowStore<K, V> extends StateStore {
 
+    /**
+     * Put a key-value pair with the current wall-clock time as the timestamp
+     * into the corresponding window
+     */
     void put(K key, V value);
 
+    /**
+     * Put a key-value pair with the given timestamp into the corresponding window
+     */
     void put(K key, V value, long timestamp);
 
+    /**
+     * Get all the key-value pairs with the given key and the time range from all
+     * the existing windows.
+     */
     WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo);
 }


Mime
View raw message