kafka-commits mailing list archives

From bbej...@apache.org
Subject [kafka] branch trunk updated: MINOR: improve JavaDocs about auto-repartitioning in Streams DSL (#6269)
Date Mon, 18 Feb 2019 19:22:52 GMT
This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9fe89f3  MINOR: improve JavaDocs about auto-repartitioning in Streams DSL (#6269)
9fe89f3 is described below

commit 9fe89f357ced1f75d5c7053979e8ec6430041885
Author: Matthias J. Sax <mjsax@apache.org>
AuthorDate: Mon Feb 18 11:22:41 2019 -0800

    MINOR: improve JavaDocs about auto-repartitioning in Streams DSL (#6269)
---
 .../org/apache/kafka/streams/kstream/KStream.java  | 59 +++++++---------------
 1 file changed, 19 insertions(+), 40 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 44778f0..5138917 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -95,7 +95,6 @@ public interface KStream<K, V> {
      *     }
      * });
      * }</pre>
-     * <p>
      * Setting a new key might result in an internal data redistribution if a key based operator (like an aggregation or
      * join) is applied to the result {@code KStream}.
      *
@@ -128,7 +127,6 @@ public interface KStream<K, V> {
      *     }
      * });
      * }</pre>
-     * <p>
      * The provided {@link KeyValueMapper} must return a {@link KeyValue} type and must not return {@code null}.
      * <p>
      * Mapping records might result in an internal data redistribution if a key based operator (like an aggregation or
@@ -166,7 +164,6 @@ public interface KStream<K, V> {
      *     }
      * });
      * }</pre>
-     * <p>
      * Setting a new value preserves data co-location with respect to the key.
      * Thus, <em>no</em> internal data redistribution is required if a key based operator (like an aggregation or join)
      * is applied to the result {@code KStream}. (cf. {@link #map(KeyValueMapper)})
@@ -201,7 +198,6 @@ public interface KStream<K, V> {
      *     }
      * });
      * }</pre>
-     * <p>
      * Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.
      * So, setting a new value preserves data co-location with respect to the key.
      * Thus, <em>no</em> internal data redistribution is required if a key based operator (like an aggregation or join)
@@ -246,7 +242,6 @@ public interface KStream<K, V> {
      *     }
      * });
      * }</pre>
-     * <p>
      * The provided {@link KeyValueMapper} must return an {@link Iterable} (e.g., any {@link java.util.Collection} type)
      * and the return value must not be {@code null}.
      * <p>
@@ -289,7 +284,6 @@ public interface KStream<K, V> {
      *     }
      * });
      * }</pre>
-     * <p>
      * The provided {@link ValueMapper} must return an {@link Iterable} (e.g., any {@link java.util.Collection} type)
      * and the return value must not be {@code null}.
      * <p>
@@ -336,7 +330,6 @@ public interface KStream<K, V> {
      *     }
      * });
      * }</pre>
-     * <p>
      * The provided {@link ValueMapperWithKey} must return an {@link Iterable} (e.g., any {@link java.util.Collection} type)
      * and the return value must not be {@code null}.
      * <p>
@@ -503,7 +496,6 @@ public interface KStream<K, V> {
      * This is a stateful record-by-record operation (cf. {@link #map(KeyValueMapper)}).
      * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)}, the processing progress
      * can be observed and additional periodic actions can be performed.
-     *
      * <p>
      * In order to assign a state, the state must be created and registered beforehand:
      * <pre>{@code
@@ -517,7 +509,6 @@ public interface KStream<K, V> {
      *
      * KStream outputStream = inputStream.transform(new TransformerSupplier() { ... }, "myTransformState");
      * }</pre>
-     * <p>
      * Within the {@link Transformer}, the state is obtained via the {@link  ProcessorContext}.
      * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
      * a schedule must be registered.
@@ -549,12 +540,12 @@ public interface KStream<K, V> {
      *     }
      * }
      * }</pre>
+     * Even if any upstream operation was key-changing, no auto-repartition is triggered.
+     * If repartitioning is required, a call to {@link #through(String)} should be performed before {@code transform()}.
      * <p>
      * Transforming records might result in an internal data redistribution if a key based operator (like an aggregation
      * or join) is applied to the result {@code KStream}.
      * (cf. {@link #transformValues(ValueTransformerSupplier, String...)})
-     * </p>
-     *
      * <p>
      * Note that it is possible to emit multiple records for each input record by using
      * {@link ProcessorContext#forward(Object, Object) context#forward()} in {@link Transformer#transform(K, V)}.
@@ -563,7 +554,6 @@ public interface KStream<K, V> {
      * To ensure type-safety at compile-time, it is recommended to use
      * {@link #flatTransform(TransformerSupplier, String...)} if multiple records need to be emitted for each input
      * record.
-     * </p>
      *
      * @param transformerSupplier an instance of {@link TransformerSupplier} that generates a {@link Transformer}
      * @param stateStoreNames     the names of the state stores used by the processor
@@ -589,7 +579,6 @@ public interface KStream<K, V> {
      * transformation).
      * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress
      * can be observed and additional periodic actions can be performed.
-     *
      * <p>
      * In order to assign a state, the state must be created and registered beforehand:
      * <pre>{@code
@@ -603,7 +592,6 @@ public interface KStream<K, V> {
      *
      * KStream outputStream = inputStream.flatTransform(new TransformerSupplier() { ... }, "myTransformState");
      * }</pre>
-     * <p>
      * Within the {@link Transformer}, the state is obtained via the {@link ProcessorContext}.
      * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
      * a schedule must be registered.
@@ -637,6 +625,8 @@ public interface KStream<K, V> {
      *     }
      * }
      * }</pre>
+     * Even if any upstream operation was key-changing, no auto-repartition is triggered.
+     * If repartitioning is required, a call to {@link #through(String)} should be performed before {@code transform()}.
      * <p>
      * Transforming records might result in an internal data redistribution if a key based operator (like an aggregation
      * or join) is applied to the result {@code KStream}.
@@ -677,7 +667,6 @@ public interface KStream<K, V> {
      *
      * KStream outputStream = inputStream.transformValues(new ValueTransformerSupplier() { ... }, "myValueTransformState");
      * }</pre>
-     * <p>
      * Within the {@link ValueTransformer}, the state is obtained via the
      * {@link ProcessorContext}.
      * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()}, a schedule must be
@@ -708,6 +697,8 @@ public interface KStream<K, V> {
      *     }
      * }
      * }</pre>
+     * Even if any upstream operation was key-changing, no auto-repartition is triggered.
+     * If repartitioning is required, a call to {@link #through(String)} should be performed before {@code transform()}.
      * <p>
      * Setting a new value preserves data co-location with respect to the key.
      * Thus, <em>no</em> internal data redistribution is required if a key based operator (like an aggregation or join)
@@ -746,7 +737,6 @@ public interface KStream<K, V> {
      *
      * KStream outputStream = inputStream.transformValues(new ValueTransformerWithKeySupplier() { ... }, "myValueTransformState");
      * }</pre>
-     * <p>
      * Within the {@link ValueTransformerWithKey}, the state is obtained via the
      * {@link ProcessorContext}.
      * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
@@ -777,6 +767,8 @@ public interface KStream<K, V> {
      *     }
      * }
      * }</pre>
+     * Even if any upstream operation was key-changing, no auto-repartition is triggered.
+     * If repartitioning is required, a call to {@link #through(String)} should be performed before {@code transform()}.
      * <p>
      * Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.
      * So, setting a new value preserves data co-location with respect to the key.
@@ -815,7 +807,6 @@ public interface KStream<K, V> {
      *
      * inputStream.process(new ProcessorSupplier() { ... }, "myProcessorState");
      * }</pre>
-     * <p>
      * Within the {@link Processor}, the state is obtained via the
      * {@link ProcessorContext}.
      * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
@@ -842,6 +833,8 @@ public interface KStream<K, V> {
      *     }
      * }
      * }</pre>
+     * Even if any upstream operation was key-changing, no auto-repartition is triggered.
+     * If repartitioning is required, a call to {@link #through(String)} should be performed before {@code transform()}.
      *
      * @param processorSupplier a instance of {@link ProcessorSupplier} that generates a {@link Processor}
      * @param stateStoreNames   the names of the state store used by the processor
@@ -868,7 +861,6 @@ public interface KStream<K, V> {
      * an internally generated name, and "-repartition" is a fixed suffix.
      * <p>
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
-     *
      * <p>
      * For this case, all data of this stream will be redistributed through the repartitioning topic by writing all
      * records to it, and rereading all records from it, such that the resulting {@link KGroupedStream} is partitioned
@@ -898,7 +890,6 @@ public interface KStream<K, V> {
      * an internally generated name, and "-repartition" is a fixed suffix.
      * <p>
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
-     *
      * <p>
      * For this case, all data of this stream will be redistributed through the repartitioning topic by writing all
      * records to it, and rereading all records from it, such that the resulting {@link KGroupedStream} is partitioned
@@ -928,10 +919,8 @@ public interface KStream<K, V> {
      * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, &lt;name&gt; is
      * either provided via {@link org.apache.kafka.streams.kstream.Grouped#as(String)} or an internally generated name,
      * and "-repartition" is a fixed suffix.
-     *
      * <p>
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
-     *
      * <p>
      * For this case, all data of this stream will be redistributed through the repartitioning topic by writing all
      * records to it, and rereading all records from it, such that the resulting {@link KGroupedStream} is partitioned
@@ -959,7 +948,6 @@ public interface KStream<K, V> {
      * an internally generated name, and "-repartition" is a fixed suffix.
      * <p>
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
-     *
      * <p>
      * All data of this stream will be redistributed through the repartitioning topic by writing all records to it,
      * and rereading all records from it, such that the resulting {@link KGroupedStream} is partitioned on the new key.
@@ -988,7 +976,6 @@ public interface KStream<K, V> {
      * an internally generated name, and "-repartition" is a fixed suffix.
      * <p>
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
-     *
      * <p>
      * All data of this stream will be redistributed through the repartitioning topic by writing all records to it,
      * and rereading all records from it, such that the resulting {@link KGroupedStream} is partitioned on the new key.
@@ -1020,7 +1007,6 @@ public interface KStream<K, V> {
      * either provided via {@link org.apache.kafka.streams.kstream.Grouped#as(String)} or an internally generated name.
      * <p>
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
-     *
      * <p>
      * All data of this stream will be redistributed through the repartitioning topic by writing all records to it,
      * and rereading all records from it, such that the resulting {@link KGroupedStream} is partitioned on the new key.
@@ -1036,8 +1022,6 @@ public interface KStream<K, V> {
     <KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<? super K, ? super V, KR> selector,
                                        final Grouped<KR, V> grouped);
 
-
-
     /**
      * Join records of this stream with another {@code KStream}'s records using windowed inner equi join with default
      * serializers and deserializers.
@@ -1085,7 +1069,6 @@ public interface KStream<K, V> {
      * user-specified in {@link  StreamsConfig} via parameter
      * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is an internally generated name, and
      * "-repartition" is a fixed suffix.
-     *
      * <p>
      * Repartitioning can happen for one or both of the joining {@code KStream}s.
      * For this case, all data of the stream will be redistributed through the repartitioning topic by writing all
@@ -1175,7 +1158,7 @@ public interface KStream<K, V> {
      * in {@link StreamsConfig} via parameter
      * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is an
      * internally generated name, and "-changelog" is a fixed suffix.
-     *
+     * <p>
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
      *
      * @param otherStream the {@code KStream} to be joined with this stream
@@ -1257,7 +1240,7 @@ public interface KStream<K, V> {
      * The changelog topic will be named "${applicationId}-storeName-changelog", where "applicationId" is user-specified
      * in {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
      * "storeName" is an internally generated name, and "-changelog" is a fixed suffix.
-     *
+     * <p>
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
      *
      * @param otherStream the {@code KStream} to be joined with this stream
@@ -1338,7 +1321,7 @@ public interface KStream<K, V> {
      * The changelog topic will be named "${applicationId}-storeName-changelog", where "applicationId" is user-specified
      * in {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
      * "storeName" is an internally generated name, and "-changelog" is a fixed suffix.
-     *
+     * <p>
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
      *
      * @param otherStream the {@code KStream} to be joined with this stream
@@ -1422,7 +1405,7 @@ public interface KStream<K, V> {
      * The changelog topic will be named "${applicationId}-storeName-changelog", where "applicationId" is user-specified
      * in {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
      * "storeName" is an internally generated name, and "-changelog" is a fixed suffix.
-     *
+     * <p>
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
      *
      * @param otherStream the {@code KStream} to be joined with this stream
@@ -1504,7 +1487,7 @@ public interface KStream<K, V> {
      * The changelog topic will be named "${applicationId}-storeName-changelog", where "applicationId" is user-specified
      * in {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
      * "storeName" is an internally generated name, and "-changelog" is a fixed suffix.
-     *
+     * <p>
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
      *
      * @param otherStream the {@code KStream} to be joined with this stream
@@ -1580,9 +1563,8 @@ public interface KStream<K, V> {
      * user-specified in {@link StreamsConfig} via parameter
      * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is an internally generated name, and
      * "-repartition" is a fixed suffix.
-     *
+     * <p>
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
-     *
      * <p>
      * Repartitioning can happen only for this {@code KStream} but not for the provided {@link KTable}.
      * For this case, all data of the stream will be redistributed through the repartitioning topic by writing all
@@ -1656,9 +1638,8 @@ public interface KStream<K, V> {
      * user-specified in {@link StreamsConfig} via parameter
      * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is an internally generated name, and
      * "-repartition" is a fixed suffix.
-     *
+     * <p>
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
-     *
      * <p>
      * Repartitioning can happen only for this {@code KStream} but not for the provided {@link KTable}.
      * For this case, all data of the stream will be redistributed through the repartitioning topic by writing all
@@ -1738,9 +1719,8 @@ public interface KStream<K, V> {
      * user-specified in {@link StreamsConfig} via parameter
      * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is an internally generated name, and
      * "-repartition" is a fixed suffix.
-     *
+     * <p>
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
-     *
      * <p>
      * Repartitioning can happen only for this {@code KStream} but not for the provided {@link KTable}.
      * For this case, all data of the stream will be redistributed through the repartitioning topic by writing all
@@ -1817,9 +1797,8 @@ public interface KStream<K, V> {
      * user-specified in {@link StreamsConfig} via parameter
      * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is an internally generated name, and
      * "-repartition" is a fixed suffix.
-     *
+     * <p>
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
-     *
      * <p>
      * Repartitioning can happen only for this {@code KStream} but not for the provided {@link KTable}.
      * For this case, all data of the stream will be redistributed through the repartitioning topic by writing all

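The JavaDoc changes in this commit spell out a rule about repartitioning. Key-changing operations defer repartitioning to the next key-based operator. By contrast, transform(), transformValues(), and process() never repartition their own input. A small model can illustrate this rule. It is a hypothetical sketch, not Kafka Streams code:

- RepartitionModel, Op, and analyze() are illustrative names only.
- The chain of operations is linear.
- The flag rules follow the JavaDocs in the diff above.
- Value-only operations are assumed to leave an upstream key change pending.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical toy model, not Kafka Streams code. It tracks a single
// "repartition required" flag along a linear chain of DSL-like operations.
class RepartitionModel {

    enum Op {
        SELECT_KEY,       // key-changing
        MAP,              // key-changing
        TRANSFORM,        // may change the key; never repartitions its own input
        TRANSFORM_VALUES, // value-only; never repartitions its own input
        PROCESS,          // terminal in the real DSL; never repartitions its input
        MAP_VALUES,       // value-only
        THROUGH,          // write to and re-read from a topic, partitioned by the current key
        GROUP_BY_KEY,     // key-based: repartitions automatically if needed
        JOIN              // key-based: repartitions automatically if needed
    }

    /** Positions where the model inserts an automatic repartition topic. */
    final List<Integer> autoRepartitions = new ArrayList<>();
    /** Positions of transform-like ops that see key-changed, not-yet-repartitioned input. */
    final List<Integer> unrepartitionedInputs = new ArrayList<>();

    static RepartitionModel analyze(List<Op> ops) {
        RepartitionModel result = new RepartitionModel();
        boolean repartitionRequired = false;
        for (int i = 0; i < ops.size(); i++) {
            Op op = ops.get(i);
            switch (op) {
                case GROUP_BY_KEY:
                case JOIN:
                    if (repartitionRequired) {
                        result.autoRepartitions.add(i);
                    }
                    repartitionRequired = false;
                    break;
                case TRANSFORM:
                case TRANSFORM_VALUES:
                case PROCESS:
                    // No auto-repartition here, even after an upstream key change.
                    if (repartitionRequired) {
                        result.unrepartitionedInputs.add(i);
                    }
                    // transform() may change the key, so its result is marked;
                    // the value-only variants keep the upstream flag as it is.
                    if (op == Op.TRANSFORM) {
                        repartitionRequired = true;
                    }
                    break;
                case SELECT_KEY:
                case MAP:
                    repartitionRequired = true;
                    break;
                case THROUGH:
                    repartitionRequired = false;
                    break;
                case MAP_VALUES:
                    break; // value-only: flag unchanged
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // selectKey -> transform -> groupByKey
        RepartitionModel a = analyze(Arrays.asList(Op.SELECT_KEY, Op.TRANSFORM, Op.GROUP_BY_KEY));
        System.out.println(a.autoRepartitions + " " + a.unrepartitionedInputs); // [2] [1]
        // selectKey -> through -> transform -> groupByKey
        RepartitionModel b = analyze(Arrays.asList(Op.SELECT_KEY, Op.THROUGH, Op.TRANSFORM, Op.GROUP_BY_KEY));
        System.out.println(b.autoRepartitions + " " + b.unrepartitionedInputs); // [3] []
    }
}
```

The first chain mirrors selectKey(...).transform(...).groupByKey(). Here the transform step runs on records that were never repartitioned by their new key.

The second chain inserts through() first. In the real DSL, through() writes to and re-reads from a user-managed topic, so the transform step sees input partitioned by the current key. groupByKey() still repartitions afterwards, because transform() may itself have changed the key.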

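A toy partitioner shows why a key change needs an internal data redistribution at all, as the selectKey() and map() notes say, while mapValues() preserves co-location. This is a hypothetical sketch with these assumptions:

- String.hashCode() stands in for the real partitioner. Kafka's default partitioner instead applies murmur2 to the serialized key bytes.
- The records and the partition count are made up.

The sketch only shows one thing: records sharing a new key can sit in different partitions until they are re-written by that key.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical illustration of why changing the key breaks co-location.
// String.hashCode() stands in for Kafka's default partitioner, which hashes
// the serialized key bytes with murmur2; the modulo idea is the same.
class CoLocationDemo {

    static int partitionFor(String key, int numPartitions) {
        // Clear the sign bit so the modulus is never negative.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    /**
     * Each record is {originalKey, newKey}. Without repartitioning, a record stays in the
     * partition chosen by its original key (a key change does not move data); with
     * repartitioning, it is re-written to the partition chosen by its new key.
     * Returns, for every new key, the set of partitions holding its records.
     */
    static Map<String, Set<Integer>> partitionsPerNewKey(
            List<String[]> records, int numPartitions, boolean repartitioned) {
        Map<String, Set<Integer>> result = new TreeMap<>();
        for (String[] record : records) {
            String placementKey = repartitioned ? record[1] : record[0];
            result.computeIfAbsent(record[1], k -> new TreeSet<>())
                  .add(partitionFor(placementKey, numPartitions));
        }
        return result;
    }

    public static void main(String[] args) {
        // Two hypothetical users ("a", "b") re-keyed to the same region "x".
        List<String[]> records = Arrays.asList(new String[] {"a", "x"}, new String[] {"b", "x"});
        System.out.println(partitionsPerNewKey(records, 2, false)); // {x=[0, 1]}
        System.out.println(partitionsPerNewKey(records, 2, true));  // {x=[0]}
    }
}
```

With repartitioned set to false, key "x" is spread over both partitions, so a per-key aggregation running on one task would see only part of it. After re-writing by the new key, all of its records land in one partition. That is the work done by the automatic "-repartition" topic, or by an explicit through().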