kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mj...@apache.org
Subject [kafka] branch trunk updated: MINOR KAFKA-7406: Follow up and address final comments (#5730)
Date Tue, 02 Oct 2018 22:45:10 GMT
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2a0abe5  MINOR KAFKA-7406: Follow up and address final comments (#5730)
2a0abe5 is described below

commit 2a0abe55f0b13ef0e9d136551e29a8fae137c3b7
Author: Bill Bejeck <bbejeck@gmail.com>
AuthorDate: Tue Oct 2 18:44:33 2018 -0400

    MINOR KAFKA-7406: Follow up and address final comments (#5730)
    
    Reviewers: John Roesler <john@confluent.io>, Matthias J Sax <matthias@confluent.io>
---
 .../org/apache/kafka/streams/kstream/Grouped.java  | 43 +++++++++++-----------
 .../org/apache/kafka/streams/kstream/KStream.java  | 14 +++----
 .../streams/kstream/internals/KStreamImpl.java     |  1 -
 .../streams/kstream/internals/KTableImpl.java      |  2 +-
 4 files changed, 30 insertions(+), 30 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Grouped.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Grouped.java
index 404cbd4..3380fc8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Grouped.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Grouped.java
@@ -24,7 +24,7 @@ import org.apache.kafka.common.serialization.Serde;
  * The class that is used to capture the key and value {@link Serde}s and set the part of
name used for
  * repartition topics when performing {@link KStream#groupBy(KeyValueMapper, Grouped)}, {@link
  * KStream#groupByKey(Grouped)}, or {@link KTable#groupBy(KeyValueMapper, Grouped)} operations.
 Note
- * that Kafka Streams does not always create repartition topic for grouping operations.
+ * that Kafka Streams does not always create repartition topics for grouping operations.
  *
  * @param <K> the key type
  * @param <V> the value type
@@ -49,8 +49,7 @@ public class Grouped<K, V> {
     }
 
     /**
-     * Create a {@code Grouped} instance with the provided name used for a repartition topic
required for
-     * performing the grouping operation.
+     * Create a {@link Grouped} instance with the provided name used as part of the repartition
topic if required.
      *
      * @param name the name used for a repartition topic if required
      * @return a new {@link Grouped} configured with the name
@@ -64,9 +63,9 @@ public class Grouped<K, V> {
 
 
     /**
-     * Create a {@code Grouped} instance with the provided keySerde.
+     * Create a {@link Grouped} instance with the provided keySerde. If {@code null} the
default key serde from config will be used.
      *
-     * @param keySerde the Serde used for serializing the key
+     * @param keySerde the Serde used for serializing the key. If {@code null} the default
key serde from config will be used
      * @return a new {@link Grouped} configured with the keySerde
      * @see KStream#groupByKey(Grouped)
      * @see KStream#groupBy(KeyValueMapper, Grouped)
@@ -78,9 +77,9 @@ public class Grouped<K, V> {
 
 
     /**
-     * Create a {@code Grouped} instance with the provided valueSerde.
+     * Create a {@link Grouped} instance with the provided valueSerde.  If {@code null} the
default value serde from config will be used.
      *
-     * @param valueSerde the Serde used for serializing the value
+     * @param valueSerde the {@link Serde} used for serializing the value. If {@code null}
the default value serde from config will be used
      * @return a new {@link Grouped} configured with the valueSerde
      * @see KStream#groupByKey(Grouped)
      * @see KStream#groupBy(KeyValueMapper, Grouped)
@@ -91,11 +90,12 @@ public class Grouped<K, V> {
     }
 
     /**
-     * Create a {@code Grouped} instance with the provided {@code name}, {@code keySerde},
and {@code valueSerde}.
+     * Create a {@link Grouped} instance with the provided  name, keySerde, and valueSerde.
If the keySerde and/or the valueSerde is
+     * {@code null} the default value for the respective serde from config will be used.
      *
-     * @param name       the name used for part of the repartition topic name if required
-     * @param keySerde   the Serde used for serializing the key
-     * @param valueSerde the Serde used for serializing the value
+     * @param name       the name used as part of the repartition topic name if required
+     * @param keySerde   the {@link Serde} used for serializing the key. If {@code null}
the default key serde from config will be used
+     * @param valueSerde the {@link Serde} used for serializing the value. If {@code null}
the default value serde from config will be used
      * @return a new {@link Grouped} configured with the name, keySerde, and valueSerde
      * @see KStream#groupByKey(Grouped)
      * @see KStream#groupBy(KeyValueMapper, Grouped)
@@ -109,10 +109,11 @@ public class Grouped<K, V> {
 
 
     /**
-     * Create a {@code Grouped} instance with the provided {@code keySerde} and {@code valueSerde}.
+     * Create a {@link Grouped} instance with the provided keySerde and valueSerde.  If the
keySerde and/or the valueSerde is
+     * {@code null} the default value for the respective serde from config will be used.
      *
-     * @param keySerde   the Serde used for serializing the key
-     * @param valueSerde the Serde used for serializing the value
+     * @param keySerde   the {@link Serde} used for serializing the key. If {@code null}
the default key serde from config will be used
+     * @param valueSerde the {@link Serde} used for serializing the value. If {@code null}
the default value serde from config will be used
      * @return a new {@link Grouped} configured with the keySerde, and valueSerde
      * @see KStream#groupByKey(Grouped)
      * @see KStream#groupBy(KeyValueMapper, Grouped)
@@ -125,10 +126,10 @@ public class Grouped<K, V> {
 
     /**
      * Perform the grouping operation with the name for a repartition topic if required.
 Note
-     * that Kafka Streams does not always create a repartition topic for grouping operations.
+     * that Kafka Streams does not always create repartition topics for grouping operations.
      *
-     * @param name the name used for part of the repartition topic if required
-     * @return a new @{Grouped} instance configured with the {@code name}
+     * @param name the name used as part of the repartition topic name if required
+     * @return a new {@link Grouped} instance configured with the name
      * */
     public Grouped<K, V> withName(final String name) {
         return new Grouped<>(name, keySerde, valueSerde);
@@ -137,8 +138,8 @@ public class Grouped<K, V> {
     /**
      * Perform the grouping operation using the provided keySerde for serializing the key.
      *
-     * @param keySerde Serde to use for serializing the key
-     * @return a new {@code Grouped} instance configured with the {@code keySerde}
+     * @param keySerde {@link Serde} to use for serializing the key. If {@code null} the
default key serde from config will be used
+     * @return a new {@link Grouped} instance configured with the keySerde
      */
     public Grouped<K, V> withKeySerde(final Serde<K> keySerde) {
         return new Grouped<>(name, keySerde, valueSerde);
@@ -147,8 +148,8 @@ public class Grouped<K, V> {
     /**
      * Perform the grouping operation using the provided valueSerde for serializing the value.
      *
-     * @param valueSerde Serde to use for serializing the value
-     * @return a new {@code Grouped} instance configured with the {@code valueSerde}
+     * @param valueSerde {@link Serde} to use for serializing the value. If {@code null}
the default value serde from config will be used
+     * @return a new {@link Grouped} instance configured with the valueSerde
      */
     public Grouped<K, V> withValueSerde(final Serde<V> valueSerde) {
         return new Grouped<>(name, keySerde, valueSerde);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 13f1dfc..77987a9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -801,7 +801,7 @@ public interface KStream<K, V> {
      * {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)}, or
      * {@link #transform(TransformerSupplier, String...)}), and no data redistribution happened
afterwards (e.g., via
      * {@link #through(String)}) an internal repartitioning topic may need to be created
in Kafka
-     * if a later operator depends on the newly selected key..
+     * if a later operator depends on the newly selected key.
      * This topic will be named "${applicationId}-&lt;name&gt-repartition", where
"applicationId" is user-specified in
      * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
"&lt;name&gt" is
      * an internally generated name, and "-repartition" is a fixed suffix.
@@ -892,8 +892,8 @@ public interface KStream<K, V> {
      * <p>
      * Because a new key is selected, an internal repartitioning topic may need to be created
in Kafka if a
      * later operator depends on the newly selected key.
-     * This topic will be as "${applicationId}-XXX-repartition", where "applicationId" is
user-specified in
-     * {@link  StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
"XXX" is
+     * This topic will be as "${applicationId}-&lt;name&gt-repartition", where "applicationId"
is user-specified in
+     * {@link  StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
"&lt;name&gt" is
      * an internally generated name, and "-repartition" is a fixed suffix.
      * <p>
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
@@ -990,9 +990,9 @@ public interface KStream<K, V> {
      * Furthermore, both input streams need to be co-partitioned on the join key (i.e., use
the same partitioner).
      * If this requirement is not met, Kafka Streams will automatically repartition the data,
i.e., it will create an
      * internal repartitioning topic in Kafka and write and re-read the data via this topic
before the actual join.
-     * The repartitioning topic will be named "${applicationId}-XXX-repartition", where "applicationId"
is
+     * The repartitioning topic will be named "${applicationId}-&lt;name&gt-repartition",
where "applicationId" is
      * user-specified in {@link  StreamsConfig} via parameter
-     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "XXX" is an internally
generated name, and
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt"
is an internally generated name, and
      * "-repartition" is a fixed suffix.
      *
      * <p>
@@ -1068,9 +1068,9 @@ public interface KStream<K, V> {
      * Furthermore, both input streams need to be co-partitioned on the join key (i.e., use
the same partitioner).
      * If this requirement is not met, Kafka Streams will automatically repartition the data,
i.e., it will create an
      * internal repartitioning topic in Kafka and write and re-read the data via this topic
before the actual join.
-     * The repartitioning topic will be named "${applicationId}-XXX-repartition", where "applicationId"
is
+     * The repartitioning topic will be named "${applicationId}-&lt;name&gt-repartition",
where "applicationId" is
      * user-specified in {@link  StreamsConfig} via parameter
-     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "XXX" is an internally
generated name, and
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt"
is an internally generated name, and
      * "-repartition" is a fixed suffix.
      * <p>
      * Repartitioning can happen for one or both of the joining {@code KStream}s.
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 168e210..a3884c4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -567,7 +567,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V>
implements KStream<K
 
         if (joinThis.repartitionRequired) {
             final String leftJoinRepartitionTopicName = joined.name() != null ? joined.name()
+ "-left" : joinThis.name;
-
             joinThis = joinThis.repartitionForJoin(Joined.with(joined.keySerde(), joined.valueSerde(),
joined.otherValueSerde(), leftJoinRepartitionTopicName));
         }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 3ce962b..c5cd11d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -552,7 +552,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V>
implements KTable<
 
     @Override
     public <K1, V1> KGroupedTable<K1, V1> groupBy(final KeyValueMapper<? super
K, ? super V, KeyValue<K1, V1>> selector) {
-        return this.groupBy(selector, Grouped.with(null, null));
+        return groupBy(selector, Grouped.with(null, null));
     }
 
     @Override


Mime
View raw message