kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bbej...@apache.org
Subject [kafka] branch trunk updated: MINOR: improve JavaDocs about global state stores (#6359)
Date Thu, 07 Mar 2019 02:19:57 GMT
This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 240d758  MINOR: improve JavaDocs about global state stores (#6359)
240d758 is described below

commit 240d7589d624b72fa95e2f84a84778bc3a127927
Author: Matthias J. Sax <mjsax@apache.org>
AuthorDate: Wed Mar 6 18:19:47 2019 -0800

    MINOR: improve JavaDocs about global state stores (#6359)
    
    Improve JavaDocs about global state stores.
    
    Reviewers: Guozhang Wang <wangguoz@gmail.com>, Sophie Blee-Goldman <sophie@confluent.io>,
 Bill Bejeck <bbejeck@gmail.com>
---
 .../org/apache/kafka/streams/StreamsBuilder.java   |   9 ++
 .../org/apache/kafka/streams/kstream/KStream.java  | 142 +++++++++++----------
 .../kafka/streams/scala/StreamsBuilder.scala       |  10 +-
 .../kafka/streams/scala/kstream/KStream.scala      |  25 ++--
 4 files changed, 110 insertions(+), 76 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index 1b3b4a2..9e89d7a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -25,9 +25,12 @@ import org.apache.kafka.streams.kstream.KGroupedTable;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.kstream.ValueTransformer;
 import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
 import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
 import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
+import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TimestampExtractor;
@@ -447,6 +450,9 @@ public class StreamsBuilder {
 
     /**
      * Adds a state store to the underlying {@link Topology}.
+     * <p>
+     * It is required to connect state stores to {@link Processor Processors}, {@link Transformer
Transformers},
+     * or {@link ValueTransformer ValueTransformers} before they can be used.
      *
      * @param builder the builder used to obtain this state store {@link StateStore} instance
      * @return itself
@@ -492,6 +498,9 @@ public class StreamsBuilder {
      * records forwarded from the {@link SourceNode}.
      * This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date.
      * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config}
is used.
+     * <p>
+     * It is not required to connect a global store to {@link Processor Processors}, {@link
Transformer Transformers},
+     * or {@link ValueTransformer ValueTransformer}; those have read-only access to all global
stores by default.
      *
      * @param storeBuilder          user defined {@link StoreBuilder}; can't be {@code null}
      * @param topic                 the topic to source the data from
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 5138917..df001d0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -229,18 +229,19 @@ public interface KStream<K, V> {
      * and emit a record {@code <word:1>} for each word.
      * <pre>{@code
      * KStream<byte[], String> inputStream = builder.stream("topic");
-     * KStream<String, Integer> outputStream = inputStream.flatMap(new KeyValueMapper<byte[],
String, Iterable<KeyValue<String, Integer>>> {
-     *     Iterable<KeyValue<String, Integer>> apply(byte[] key, String value)
{
-     *         String[] tokens = value.split(" ");
-     *         List<KeyValue<String, Integer>> result = new ArrayList<>(tokens.length);
+     * KStream<String, Integer> outputStream = inputStream.flatMap(
+     *     new KeyValueMapper<byte[], String, Iterable<KeyValue<String, Integer>>>
{
+     *         Iterable<KeyValue<String, Integer>> apply(byte[] key, String value)
{
+     *             String[] tokens = value.split(" ");
+     *             List<KeyValue<String, Integer>> result = new ArrayList<>(tokens.length);
+     *
+     *             for(String token : tokens) {
+     *                 result.add(new KeyValue<>(token, 1));
+     *             }
      *
-     *         for(String token : tokens) {
-     *             result.add(new KeyValue<>(token, 1));
+     *             return result;
      *         }
-     *
-     *         return result;
-     *     }
-     * });
+     *     });
      * }</pre>
      * The provided {@link KeyValueMapper} must return an {@link Iterable} (e.g., any {@link
java.util.Collection} type)
      * and the return value must not be {@code null}.
@@ -497,7 +498,8 @@ public interface KStream<K, V> {
      * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)},
the processing progress
      * can be observed and additional periodic actions can be performed.
      * <p>
-     * In order to assign a state, the state must be created and registered beforehand:
+     * In order to assign a state, the state must be created and registered beforehand (it's
not required to connect
+     * global state stores; read-only access to global state stores is available by default):
      * <pre>{@code
      * // create store
      * StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
@@ -580,7 +582,8 @@ public interface KStream<K, V> {
      * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)}
the processing progress
      * can be observed and additional periodic actions can be performed.
      * <p>
-     * In order to assign a state, the state must be created and registered beforehand:
+     * In order to assign a state, the state must be created and registered beforehand (it's
not required to connect
+     * global state stores; read-only access to global state stores is available by default):
      * <pre>{@code
      * // create store
      * StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
@@ -652,10 +655,11 @@ public interface KStream<K, V> {
      * record value and computes a new value for it.
      * Thus, an input record {@code <K,V>} can be transformed into an output record
{@code <K:V'>}.
      * This is a stateful record-by-record operation (cf. {@link #mapValues(ValueMapper)}).
-     * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)}
the processing progress can be observed and additional
-     * periodic actions can be performed.
+     * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)}
the processing progress
+     * can be observed and additional periodic actions can be performed.
      * <p>
-     * In order to assign a state, the state must be created and registered beforehand:
+     * In order to assign a state, the state must be created and registered beforehand (it's
not required to connect
+     * global state stores; read-only access to global state stores is available by default):
      * <pre>{@code
      * // create store
      * StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
@@ -669,8 +673,8 @@ public interface KStream<K, V> {
      * }</pre>
      * Within the {@link ValueTransformer}, the state is obtained via the
      * {@link ProcessorContext}.
-     * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)
punctuate()}, a schedule must be
-     * registered.
+     * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)
punctuate()},
+     * a schedule must be registered.
      * In contrast to {@link #transform(TransformerSupplier, String...) transform()}, no
additional {@link KeyValue}
      * pairs should be emitted via {@link ProcessorContext#forward(Object, Object)
      * ProcessorContext.forward()}.
@@ -682,7 +686,8 @@ public interface KStream<K, V> {
      *
      *             void init(ProcessorContext context) {
      *                 this.state = context.getStateStore("myValueTransformState");
-     *                 context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME,
new Punctuator(..)); // punctuate each 1000ms, can access this.state
+     *                 // punctuate each 1000ms, can access this.state
+     *                 context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME,
new Punctuator(..));
      *             }
      *
      *             NewValueType transform(V value) {
@@ -718,14 +723,15 @@ public interface KStream<K, V> {
 
     /**
      * Transform the value of each input record into a new value (with possible new type)
of the output record.
-     * A {@link ValueTransformerWithKey} (provided by the given {@link ValueTransformerWithKeySupplier})
is applied to each input
-     * record value and computes a new value for it.
+     * A {@link ValueTransformerWithKey} (provided by the given {@link ValueTransformerWithKeySupplier})
is applied to
+     * each input record value and computes a new value for it.
      * Thus, an input record {@code <K,V>} can be transformed into an output record
{@code <K:V'>}.
      * This is a stateful record-by-record operation (cf. {@link #mapValues(ValueMapperWithKey)}).
-     * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)}
the processing progress can be observed and additional
-     * periodic actions can be performed.
+     * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)}
the processing progress
+     * can be observed and additional periodic actions can be performed.
      * <p>
-     * In order to assign a state, the state must be created and registered beforehand:
+     * In order to assign a state, the state must be created and registered beforehand (it's
not required to connect
+     * global state stores; read-only access to global state stores is available by default):
      * <pre>{@code
      * // create store
      * StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
@@ -752,7 +758,8 @@ public interface KStream<K, V> {
      *
      *             void init(ProcessorContext context) {
      *                 this.state = context.getStateStore("myValueTransformState");
-     *                 context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME,
new Punctuator(..)); // punctuate each 1000ms, can access this.state
+     *                 // punctuate each 1000ms, can access this.state
+     *                 context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME,
new Punctuator(..));
      *             }
      *
      *             NewValueType transform(K readOnlyKey, V value) {
@@ -791,11 +798,12 @@ public interface KStream<K, V> {
      * Process all records in this stream, one record at a time, by applying a {@link Processor}
(provided by the given
      * {@link ProcessorSupplier}).
      * This is a stateful record-by-record operation (cf. {@link #foreach(ForeachAction)}).
-     * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)}
the processing progress can be observed and additional
-     * periodic actions can be performed.
+     * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)}
the processing progress
+     * can be observed and additional periodic actions can be performed.
      * Note that this is a terminal operation that returns void.
      * <p>
-     * In order to assign a state, the state must be created and registered beforehand:
+     * In order to assign a state, the state must be created and registered beforehand (it's
not required to connect
+     * global state stores; read-only access to global state stores is available by default):
      * <pre>{@code
      * // create store
      * StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
@@ -819,7 +827,8 @@ public interface KStream<K, V> {
      *
      *             void init(ProcessorContext context) {
      *                 this.state = context.getStateStore("myProcessorState");
-     *                 context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME,
new Punctuator(..)); // punctuate each 1000ms, can access this.state
+     *                 // punctuate each 1000ms, can access this.state
+     *                 context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME,
new Punctuator(..));
      *             }
      *
      *             void process(K key, V value) {
@@ -857,8 +866,8 @@ public interface KStream<K, V> {
      * {@link #through(String)}) an internal repartitioning topic may need to be created
in Kafka if a later
      * operator depends on the newly selected key.
      * This topic will be named "${applicationId}-&lt;name&gt;-repartition", where
"applicationId" is user-specified in
-     * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
"&lt;name&gt;" is
-     * an internally generated name, and "-repartition" is a fixed suffix.
+     * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
+     * "&lt;name&gt;" is an internally generated name, and "-repartition" is a fixed
suffix.
      * <p>
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
      * <p>
@@ -886,8 +895,8 @@ public interface KStream<K, V> {
      * {@link #through(String)}) an internal repartitioning topic may need to be created
in Kafka
      * if a later operator depends on the newly selected key.
      * This topic will be named "${applicationId}-&lt;name&gt;-repartition", where
"applicationId" is user-specified in
-     * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
"&lt;name&gt;" is
-     * an internally generated name, and "-repartition" is a fixed suffix.
+     * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
+     * "&lt;name&gt;" is an internally generated name, and "-repartition" is a fixed
suffix.
      * <p>
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
      * <p>
@@ -916,9 +925,9 @@ public interface KStream<K, V> {
      * {@link #through(String)}) an internal repartitioning topic may need to be created
in Kafka if a later operator
      * depends on the newly selected key.
      * This topic will be named "${applicationId}-&lt;name&gt;-repartition", where
"applicationId" is user-specified in
-     * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
&lt;name&gt; is
-     * either provided via {@link org.apache.kafka.streams.kstream.Grouped#as(String)} or
an internally generated name,
-     * and "-repartition" is a fixed suffix.
+     * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
+     * &lt;name&gt; is either provided via {@link org.apache.kafka.streams.kstream.Grouped#as(String)}
or an internally
+     * generated name, and "-repartition" is a fixed suffix.
      * <p>
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
      * <p>
@@ -938,14 +947,15 @@ public interface KStream<K, V> {
      * and default serializers and deserializers.
      * Grouping a stream on the record key is required before an aggregation operator can
be applied to the data
      * (cf. {@link KGroupedStream}).
-     * The {@link KeyValueMapper} selects a new key (which may or may not be of the same
type) while preserving the original values.
+     * The {@link KeyValueMapper} selects a new key (which may or may not be of the same
type) while preserving the
+     * original values.
      * If the new record key is {@code null} the record will not be included in the resulting
{@link KGroupedStream}
      * <p>
      * Because a new key is selected, an internal repartitioning topic may need to be created
in Kafka if a
      * later operator depends on the newly selected key.
      * This topic will be named "${applicationId}-&lt;name&gt;-repartition", where
"applicationId" is user-specified in
-     * {@link  StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
"&lt;name&gt;" is
-     * an internally generated name, and "-repartition" is a fixed suffix.
+     * {@link  StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
+     * "&lt;name&gt;" is an internally generated name, and "-repartition" is a fixed
suffix.
      * <p>
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
      * <p>
@@ -966,14 +976,15 @@ public interface KStream<K, V> {
      * and {@link Serde}s as specified by {@link Serialized}.
      * Grouping a stream on the record key is required before an aggregation operator can
be applied to the data
      * (cf. {@link KGroupedStream}).
-     * The {@link KeyValueMapper} selects a new key (which may or may not be of the same
type) while preserving the original values.
+     * The {@link KeyValueMapper} selects a new key (which may or may not be of the same
type) while preserving the
+     * original values.
      * If the new record key is {@code null} the record will not be included in the resulting
{@link KGroupedStream}.
      * <p>
      * Because a new key is selected, an internal repartitioning topic may need to be created
in Kafka if a
      * later operator depends on the newly selected key.
      * This topic will be as "${applicationId}-&lt;name&gt;-repartition", where "applicationId"
is user-specified in
-     * {@link  StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
"&lt;name&gt;" is
-     * an internally generated name, and "-repartition" is a fixed suffix.
+     * {@link  StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
+     * "&lt;name&gt;" is an internally generated name, and "-repartition" is a fixed
suffix.
      * <p>
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
      * <p>
@@ -997,14 +1008,16 @@ public interface KStream<K, V> {
      * and {@link Serde}s as specified by {@link Grouped}.
      * Grouping a stream on the record key is required before an aggregation operator can
be applied to the data
      * (cf. {@link KGroupedStream}).
-     * The {@link KeyValueMapper} selects a new key (which may or may not be of the same
type) while preserving the original values.
+     * The {@link KeyValueMapper} selects a new key (which may or may not be of the same
type) while preserving the
+     * original values.
      * If the new record key is {@code null} the record will not be included in the resulting
{@link KGroupedStream}.
      * <p>
      * Because a new key is selected, an internal repartitioning topic may need to be created
in Kafka if a later
      * operator depends on the newly selected key.
      * This topic will be named "${applicationId}-&lt;name&gt;-repartition", where
"applicationId" is user-specified in
-     * {@link  StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
"&lt;name&gt;" is
-     * either provided via {@link org.apache.kafka.streams.kstream.Grouped#as(String)} or
an internally generated name.
+     * {@link  StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
+     * "&lt;name&gt;" is either provided via {@link org.apache.kafka.streams.kstream.Grouped#as(String)}
or an
+     * internally generated name.
      * <p>
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
      * <p>
@@ -1067,8 +1080,8 @@ public interface KStream<K, V> {
      * internal repartitioning topic in Kafka and write and re-read the data via this topic
before the actual join.
      * The repartitioning topic will be named "${applicationId}-&lt;name&gt;-repartition",
where "applicationId" is
      * user-specified in {@link  StreamsConfig} via parameter
-     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;"
is an internally generated name, and
-     * "-repartition" is a fixed suffix.
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;"
is an internally generated
+     * name, and "-repartition" is a fixed suffix.
      * <p>
      * Repartitioning can happen for one or both of the joining {@code KStream}s.
      * For this case, all data of the stream will be redistributed through the repartitioning
topic by writing all
@@ -1144,8 +1157,8 @@ public interface KStream<K, V> {
      * internal repartitioning topic in Kafka and write and re-read the data via this topic
before the actual join.
      * The repartitioning topic will be named "${applicationId}-&lt;name&gt;-repartition",
where "applicationId" is
      * user-specified in {@link  StreamsConfig} via parameter
-     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;"
is an internally generated name, and
-     * "-repartition" is a fixed suffix.
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;"
is an internally generated
+     * name, and "-repartition" is a fixed suffix.
      * <p>
      * Repartitioning can happen for one or both of the joining {@code KStream}s.
      * For this case, all data of the stream will be redistributed through the repartitioning
topic by writing all
@@ -1227,8 +1240,8 @@ public interface KStream<K, V> {
      * internal repartitioning topic in Kafka and write and re-read the data via this topic
before the actual join.
      * The repartitioning topic will be named "${applicationId}-&lt;name&gt;-repartition",
where "applicationId" is
      * user-specified in {@link StreamsConfig} via parameter
-     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;"
is an internally generated name, and
-     * "-repartition" is a fixed suffix.
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;"
is an internally generated
+     * name, and "-repartition" is a fixed suffix.
      * <p>
      * Repartitioning can happen for one or both of the joining {@code KStream}s.
      * For this case, all data of the stream will be redistributed through the repartitioning
topic by writing all
@@ -1308,8 +1321,8 @@ public interface KStream<K, V> {
      * internal repartitioning topic in Kafka and write and re-read the data via this topic
before the actual join.
      * The repartitioning topic will be named "${applicationId}-&lt;name&gt;-repartition",
where "applicationId" is
      * user-specified in {@link StreamsConfig} via parameter
-     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;"
is an internally generated name, and
-     * "-repartition" is a fixed suffix.
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;"
is an internally generated
+     * name, and "-repartition" is a fixed suffix.
      * <p>
      * Repartitioning can happen for one or both of the joining {@code KStream}s.
      * For this case, all data of the stream will be redistributed through the repartitioning
topic by writing all
@@ -1392,8 +1405,8 @@ public interface KStream<K, V> {
      * internal repartitioning topic in Kafka and write and re-read the data via this topic
before the actual join.
      * The repartitioning topic will be named "${applicationId}-&lt;name&gt;-repartition",
where "applicationId" is
      * user-specified in {@link StreamsConfig} via parameter
-     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;"
is an internally generated name, and
-     * "-repartition" is a fixed suffix.
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;"
is an internally generated
+     * name, and "-repartition" is a fixed suffix.
      * <p>
      * Repartitioning can happen for one or both of the joining {@code KStream}s.
      * For this case, all data of the stream will be redistributed through the repartitioning
topic by writing all
@@ -1474,8 +1487,8 @@ public interface KStream<K, V> {
      * internal repartitioning topic in Kafka and write and re-read the data via this topic
before the actual join.
      * The repartitioning topic will be named "${applicationId}-&lt;name&gt;-repartition",
where "applicationId" is
      * user-specified in {@link StreamsConfig} via parameter
-     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;"
is an internally generated name, and
-     * "-repartition" is a fixed suffix.
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;"
is an internally generated
+     * name, and "-repartition" is a fixed suffix.
      * <p>
      * Repartitioning can happen for one or both of the joining {@code KStream}s.
      * For this case, all data of the stream will be redistributed through the repartitioning
topic by writing all
@@ -1561,8 +1574,8 @@ public interface KStream<K, V> {
      * internal repartitioning topic in Kafka and write and re-read the data via this topic
before the actual join.
      * The repartitioning topic will be named "${applicationId}-&lt;name&gt;-repartition",
where "applicationId" is
      * user-specified in {@link StreamsConfig} via parameter
-     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;"
is an internally generated name, and
-     * "-repartition" is a fixed suffix.
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;"
is an internally generated
+     * name, and "-repartition" is a fixed suffix.
      * <p>
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
      * <p>
@@ -1636,8 +1649,8 @@ public interface KStream<K, V> {
      * internal repartitioning topic in Kafka and write and re-read the data via this topic
before the actual join.
      * The repartitioning topic will be named "${applicationId}-&lt;name&gt;-repartition",
where "applicationId" is
      * user-specified in {@link StreamsConfig} via parameter
-     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;"
is an internally generated name, and
-     * "-repartition" is a fixed suffix.
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;"
is an internally generated
+     * name, and "-repartition" is a fixed suffix.
      * <p>
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
      * <p>
@@ -1717,8 +1730,8 @@ public interface KStream<K, V> {
      * internal repartitioning topic in Kafka and write and re-read the data via this topic
before the actual join.
      * The repartitioning topic will be named "${applicationId}-&lt;name&gt;-repartition",
where "applicationId" is
      * user-specified in {@link StreamsConfig} via parameter
-     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;"
is an internally generated name, and
-     * "-repartition" is a fixed suffix.
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;"
is an internally generated
+     * name, and "-repartition" is a fixed suffix.
      * <p>
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
      * <p>
@@ -1795,8 +1808,8 @@ public interface KStream<K, V> {
      * internal repartitioning topic in Kafka and write and re-read the data via this topic
before the actual join.
      * The repartitioning topic will be named "${applicationId}-&lt;name&gt;-repartition",
where "applicationId" is
      * user-specified in {@link StreamsConfig} via parameter
-     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;"
is an internally generated name, and
-     * "-repartition" is a fixed suffix.
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;"
is an internally generated
+     * name, and "-repartition" is a fixed suffix.
      * <p>
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
      * <p>
@@ -1889,5 +1902,4 @@ public interface KStream<K, V> {
     <GK, GV, RV> KStream<K, RV> leftJoin(final GlobalKTable<GK, GV> globalKTable,
                                          final KeyValueMapper<? super K, ? super V, ?
extends GK> keyValueMapper,
                                          final ValueJoiner<? super V, ? super GV, ? extends
RV> valueJoiner);
-
 }
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
index 8c5a9b3..4a1df92 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
@@ -27,6 +27,7 @@ import org.apache.kafka.streams.state.StoreBuilder
 import org.apache.kafka.streams.{Topology, StreamsBuilder => StreamsBuilderJ}
 import org.apache.kafka.streams.scala.kstream._
 import ImplicitConversions._
+import org.apache.kafka.streams.errors.TopologyException
 
 import scala.collection.JavaConverters._
 
@@ -77,7 +78,7 @@ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) {
   /**
    * Create a [[kstream.KStream]] from the specified topic pattern.
    *
-   * @param topics the topic name pattern
+   * @param topicPattern the topic name pattern
    * @return a [[kstream.KStream]] for the specified topics
    * @see #stream(String)
    * @see `org.apache.kafka.streams.StreamsBuilder#stream`
@@ -157,6 +158,8 @@ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) {
   /**
    * Adds a state store to the underlying `Topology`. The store must still be "connected"
to a `Processor`,
    * `Transformer`, or `ValueTransformer` before it can be used.
+   * <p>
+   * It is required to connect state stores to `Processor`, `Transformer`, or `ValueTransformer`
before they can be used.
    *
    * @param builder the builder used to obtain this state store `StateStore` instance
    * @return the underlying Java abstraction `StreamsBuilder` after adding the `StateStore`
@@ -166,8 +169,11 @@ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) {
   def addStateStore(builder: StoreBuilder[_ <: StateStore]): StreamsBuilderJ = inner.addStateStore(builder)
 
   /**
-   * Adds a global `StateStore` to the topology. Global stores should not be added to `Processor,
`Transformer`,
+   * Adds a global `StateStore` to the topology. Global stores should not be added to `Processor`,
`Transformer`,
    * or `ValueTransformer` (in contrast to regular stores).
+   * <p>
+   * It is not required to connect a global store to `Processor`, `Transformer`, or `ValueTransformer`;
+   * those have read-only access to all global stores by default.
    *
    * @see `org.apache.kafka.streams.StreamsBuilder#addGlobalStore`
    */
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
index 635975b..5df9de8 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
@@ -285,9 +285,10 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * can be altered arbitrarily).
    * A `Transformer` (provided by the given `TransformerSupplier`) is applied to each input
record
    * and computes zero or more output records.
-   * In order to assign a state, the state must be created and registered
-   * beforehand via stores added via `addStateStore` or `addGlobalStore`
-   * before they can be connected to the `Transformer`
+   * In order to assign a state, the state must be created and added via `addStateStore`
before they can be connected
+   * to the `Transformer`.
+   * It's not required to connect global state stores that are added via `addGlobalStore`;
+   * read-only access to global state stores is available by default.
    *
    * @param transformerSupplier the `TransformerSuplier` that generates `Transformer`
    * @param stateStoreNames     the names of the state stores used by the processor
@@ -302,8 +303,10 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * Transform the value of each input record into a new value (with possible new type) of
the output record.
    * A `ValueTransformer` (provided by the given `ValueTransformerSupplier`) is applied to
each input
    * record value and computes a new value for it.
-   * In order to assign a state, the state must be created and registered
-   * beforehand via stores added via `addStateStore` or `addGlobalStore` before they can
be connected to the `Transformer`
+   * In order to assign a state, the state must be created and added via `addStateStore`
before they can be connected
+   * to the `ValueTransformer`.
+   * It's not required to connect global state stores that are added via `addGlobalStore`;
+   * read-only access to global state stores is available by default.
    *
    * @param valueTransformerSupplier a instance of `ValueTransformerSupplier` that generates
a `ValueTransformer`
    * @param stateStoreNames          the names of the state stores used by the processor
@@ -318,8 +321,10 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * Transform the value of each input record into a new value (with possible new type) of
the output record.
    * A `ValueTransformer` (provided by the given `ValueTransformerSupplier`) is applied to
each input
    * record value and computes a new value for it.
-   * In order to assign a state, the state must be created and registered
-   * beforehand via stores added via `addStateStore` or `addGlobalStore` before they can
be connected to the `Transformer`
+   * In order to assign a state, the state must be created and added via `addStateStore`
before they can be connected
+   * to the `ValueTransformer`.
+   * It's not required to connect global state stores that are added via `addGlobalStore`;
+   * read-only access to global state stores is available by default.
    *
    * @param valueTransformerSupplier a instance of `ValueTransformerWithKeySupplier` that
generates a `ValueTransformerWithKey`
    * @param stateStoreNames          the names of the state stores used by the processor
@@ -333,8 +338,10 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
   /**
    * Process all records in this stream, one record at a time, by applying a `Processor`
(provided by the given
    * `processorSupplier`).
-   * In order to assign a state, the state must be created and registered
-   * beforehand via stores added via `addStateStore` or `addGlobalStore` before they can
be connected to the `Transformer`
+   * In order to assign a state, the state must be created and added via `addStateStore`
before they can be connected
+   * to the `Processor`.
+   * It's not required to connect global state stores that are added via `addGlobalStore`;
+   * read-only access to global state stores is available by default.
    *
    * @param processorSupplier a function that generates a [[org.apache.kafka.streams.processor.Processor]]
    * @param stateStoreNames   the names of the state store used by the processor


Mime
View raw message