kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: MINOR: update JavaDoc for simple helper interfaces of KStream and KTable operators
Date Fri, 06 Jan 2017 16:24:12 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk b7da847f6 -> 00964ec74


MINOR: update JavaDoc for simple helper interfaces of KStream and KTable operators

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Damian Guy, Guozhang Wang

Closes #2321 from mjsax/javaDocImprovements3


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/00964ec7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/00964ec7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/00964ec7

Branch: refs/heads/trunk
Commit: 00964ec74d0b3124c88958d5c0db4266e4e38f37
Parents: b7da847
Author: Matthias J. Sax <matthias@confluent.io>
Authored: Fri Jan 6 08:24:09 2017 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Fri Jan 6 08:24:09 2017 -0800

----------------------------------------------------------------------
 .../kafka/streams/kstream/ForeachAction.java    | 29 +++++++-----
 .../kafka/streams/kstream/KeyValueMapper.java   | 47 ++++++++++++++------
 .../apache/kafka/streams/kstream/Predicate.java | 25 ++++++++---
 .../kafka/streams/kstream/ValueJoiner.java      | 38 +++++++++++-----
 .../kafka/streams/kstream/ValueMapper.java      | 27 +++++++----
 5 files changed, 116 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/00964ec7/streams/src/main/java/org/apache/kafka/streams/kstream/ForeachAction.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ForeachAction.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ForeachAction.java
index 4c773c9..59f6fab 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ForeachAction.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ForeachAction.java
@@ -5,35 +5,40 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.kafka.streams.kstream;
 
+import org.apache.kafka.common.annotation.InterfaceStability;
 
 /**
- * The {@link ForeachAction} interface for performing an action on a key-value pair.
- * Note that this action is stateless. If stateful processing is required, consider
- * using {@link KStream#transform(TransformerSupplier, String...)} or
- * {@link KStream#process(org.apache.kafka.streams.processor.ProcessorSupplier, String...)} instead.
+ * The {@link ForeachAction} interface for performing an action on a {@link org.apache.kafka.streams.KeyValue key-value
+ * pair}.
+ * This is a stateless record-by-record operation, i.e., {@link #apply(Object, Object)} is invoked individually for each
+ * record of a stream.
+ * If stateful processing is required, consider using
+ * {@link KStream#process(org.apache.kafka.streams.processor.ProcessorSupplier, String...) KStream#process(...)}.
  *
- * @param <K>   original key type
- * @param <V>   original value type
+ * @param <K> key type
+ * @param <V> value type
+ * @see KStream#foreach(ForeachAction)
+ * @see KTable#foreach(ForeachAction)
  */
+@InterfaceStability.Unstable
 public interface ForeachAction<K, V> {
 
     /**
      * Perform an action for each record of a stream.
      *
-     * @param key    the key of the record
-     * @param value  the value of the record
+     * @param key   the key of the record
+     * @param value the value of the record
      */
     void apply(K key, V value);
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/00964ec7/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java
index b36ed63..6fcf61c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java
@@ -5,33 +5,54 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.kafka.streams.kstream;
 
+import org.apache.kafka.common.annotation.InterfaceStability;
+
 /**
- * The {@link KeyValueMapper} interface for mapping a key-value pair to a new value (could be another key-value pair).
+ * The {@link KeyValueMapper} interface for mapping a {@link org.apache.kafka.streams.KeyValue key-value pair} to a new
+ * value of arbitrary type. For example, it can be used to
+ * <ul>
+ * <li>map from an input {@link org.apache.kafka.streams.KeyValue key-value pair} to an output
+ * {@link org.apache.kafka.streams.KeyValue key-value pair} with different key and/or value type (for this case
+ * output type {@code VR == }{@link org.apache.kafka.streams.KeyValue KeyValue&lt;NewKeyType,NewValueType&gt;})</li>
+ * <li>map from an input record to a new key (with arbitrary key type as specified by {@code VR})</li>
+ * </ul>
+ * This is a stateless record-by-record operation, i.e., {@link #apply(Object, Object)} is invoked individually for each
+ * record of a stream.
+ * {@link KeyValueMapper} is a generalization of {@link ValueMapper}.
  *
- * @param <K>   original key type
- * @param <V>   original value type
- * @param <R>   mapped value type
+ * @param <K>  key type
+ * @param <V>  value type
+ * @param <VR> mapped value type
+ * @see ValueMapper
+ * @see KStream#map(KeyValueMapper)
+ * @see KStream#flatMap(KeyValueMapper)
+ * @see KStream#selectKey(KeyValueMapper)
+ * @see KStream#groupBy(KeyValueMapper)
+ * @see KStream#groupBy(KeyValueMapper, org.apache.kafka.common.serialization.Serde, org.apache.kafka.common.serialization.Serde)
+ * @see KTable#groupBy(KeyValueMapper)
+ * @see KTable#groupBy(KeyValueMapper, org.apache.kafka.common.serialization.Serde, org.apache.kafka.common.serialization.Serde)
+ * @see KTable#toStream(KeyValueMapper)
  */
-public interface KeyValueMapper<K, V, R> {
+@InterfaceStability.Unstable
+public interface KeyValueMapper<K, V, VR> {
 
     /**
      * Map a record with the given key and value to a new value.
      *
-     * @param key    the key of the record
-     * @param value  the value of the record
-     * @return       the new value
+     * @param key   the key of the record
+     * @param value the value of the record
+     * @return the new value
      */
-    R apply(K key, V value);
+    VR apply(K key, V value);
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/00964ec7/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java
index 2df2d5f..b46e60f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java
@@ -14,23 +14,34 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.kafka.streams.kstream;
 
+import org.apache.kafka.common.annotation.InterfaceStability;
+
 /**
- * The {@link Predicate} interface represents a predicate (boolean-valued function) of a key-value pair.
+ * The {@link Predicate} interface represents a predicate (boolean-valued function) of a
+ * {@link org.apache.kafka.streams.KeyValue key-value pair}.
+ * This is a stateless record-by-record operation, i.e., {@link #test(Object, Object)} is invoked individually for each
+ * record of a stream.
  *
- * @param <K>   key type
- * @param <V>   value type
+ * @param <K> key type
+ * @param <V> value type
+ * @see KStream#filter(Predicate)
+ * @see KStream#filterNot(Predicate)
+ * @see KStream#branch(Predicate[])
+ * @see KTable#filter(Predicate)
+ * @see KTable#filterNot(Predicate)
  */
+@InterfaceStability.Unstable
 public interface Predicate<K, V> {
 
     /**
      * Test if the record with the given key and value satisfies the predicate.
      *
-     * @param key    the key of the record
-     * @param value  the value of the record
-     * @return       return {@code true} if the key-value pair satisfies the predicate&mdash;{@code false} otherwise
+     * @param key   the key of the record
+     * @param value the value of the record
+     * @return {@code true} if the {@link org.apache.kafka.streams.KeyValue key-value pair} satisfies the
+     * predicate&mdash;{@code false} otherwise
      */
     boolean test(K key, V value);
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/00964ec7/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java
index 8d4a8e7..e51889b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java
@@ -14,24 +14,42 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.kafka.streams.kstream;
 
+import org.apache.kafka.common.annotation.InterfaceStability;
+
 /**
- * The {@link ValueJoiner} interface for joining two values into a new value.
+ * The {@link ValueJoiner} interface for joining two values into a new value of arbitrary type.
+ * This is a stateless operation, i.e., {@link #apply(Object, Object)} is invoked individually for each joining
+ * record-pair of a {@link KStream}-{@link KStream}, {@link KStream}-{@link KTable}, or {@link KTable}-{@link KTable}
+ * join.
  *
- * @param <V1>  first value type
- * @param <V2>  second value type
- * @param <R>   joined value type
+ * @param <V1> first value type
+ * @param <V2> second value type
+ * @param <VR> joined value type
+ * @see KStream#join(KStream, ValueJoiner, JoinWindows)
+ * @see KStream#join(KStream, ValueJoiner, JoinWindows, org.apache.kafka.common.serialization.Serde, org.apache.kafka.common.serialization.Serde, org.apache.kafka.common.serialization.Serde)
+ * @see KStream#leftJoin(KStream, ValueJoiner, JoinWindows)
+ * @see KStream#leftJoin(KStream, ValueJoiner, JoinWindows, org.apache.kafka.common.serialization.Serde, org.apache.kafka.common.serialization.Serde, org.apache.kafka.common.serialization.Serde)
+ * @see KStream#outerJoin(KStream, ValueJoiner, JoinWindows)
+ * @see KStream#outerJoin(KStream, ValueJoiner, JoinWindows, org.apache.kafka.common.serialization.Serde, org.apache.kafka.common.serialization.Serde, org.apache.kafka.common.serialization.Serde)
+ * @see KStream#join(KTable, ValueJoiner)
+ * @see KStream#join(KTable, ValueJoiner, org.apache.kafka.common.serialization.Serde, org.apache.kafka.common.serialization.Serde)
+ * @see KStream#leftJoin(KTable, ValueJoiner)
+ * @see KStream#leftJoin(KTable, ValueJoiner, org.apache.kafka.common.serialization.Serde, org.apache.kafka.common.serialization.Serde)
+ * @see KTable#join(KTable, ValueJoiner)
+ * @see KTable#leftJoin(KTable, ValueJoiner)
+ * @see KTable#outerJoin(KTable, ValueJoiner)
  */
-public interface ValueJoiner<V1, V2, R> {
+@InterfaceStability.Unstable
+public interface ValueJoiner<V1, V2, VR> {
 
     /**
      * Return a joined value consisting of {@code value1} and {@code value2}.
      *
-     * @param value1  the first value for joining
-     * @param value2  the second value for joining
-     * @return        the joined value
+     * @param value1 the first value for joining
+     * @param value2 the second value for joining
+     * @return the joined value
      */
-    R apply(V1 value1, V2 value2);
+    VR apply(V1 value1, V2 value2);
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/00964ec7/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java
index e168e37..7d1a096 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java
@@ -14,22 +14,33 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.kafka.streams.kstream;
 
+import org.apache.kafka.common.annotation.InterfaceStability;
+
 /**
- * The {@link ValueMapper} interface for mapping an original value to a new value (could be another key-value pair).
+ * The {@link ValueMapper} interface for mapping a value to a new value of arbitrary type.
+ * This is a stateless record-by-record operation, i.e., {@link #apply(Object)} is invoked individually for each record
+ * of a stream.
+ * Thus, if {@link ValueMapper} is applied to a {@link org.apache.kafka.streams.KeyValue key-value pair} record the
+ * record's key is preserved.
+ * If a record's key and value should be modified {@link KeyValueMapper} can be used.
  *
- * @param <V1>  original value type
- * @param <V2>  mapped value type
+ * @param <V>  value type
+ * @param <VR> mapped value type
+ * @see KeyValueMapper
+ * @see KStream#mapValues(ValueMapper)
+ * @see KStream#flatMapValues(ValueMapper)
+ * @see KTable#mapValues(ValueMapper)
  */
-public interface ValueMapper<V1, V2> {
+@InterfaceStability.Unstable
+public interface ValueMapper<V, VR> {
 
     /**
      * Map the given value to a new value.
      *
-     * @param value  the value to be mapped
-     * @return       the new value
+     * @param value the value to be mapped
+     * @return the new value
      */
-    V2 apply(V1 value);
+    VR apply(V value);
 }


Mime
View raw message