kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From damian...@apache.org
Subject kafka git commit: KAFKA-5931; deprecate KTable#through and KTable#to
Date Wed, 20 Sep 2017 11:04:18 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk c58790595 -> 37ec15e96


KAFKA-5931; deprecate KTable#through and KTable#to

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>,
Guozhang Wang <wangguoz@gmail.com>

Closes #3903 from dguy/deprectate-to-through


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/37ec15e9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/37ec15e9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/37ec15e9

Branch: refs/heads/trunk
Commit: 37ec15e9627e2fe68d78eb6d95e9a117e3bca320
Parents: c587905
Author: Damian Guy <damian.guy@gmail.com>
Authored: Wed Sep 20 12:04:13 2017 +0100
Committer: Damian Guy <damian.guy@gmail.com>
Committed: Wed Sep 20 12:04:13 2017 +0100

----------------------------------------------------------------------
 .../apache/kafka/streams/kstream/KTable.java    | 84 +++++++++++++++-----
 1 file changed, 64 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/37ec15e9/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 6d1d85d..66ec0d7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -38,7 +38,7 @@ import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
  * {@code KTable} is an abstraction of a <i>changelog stream</i> from a primary-keyed
table.
  * Each record in this changelog stream is an update on the primary-keyed table with the
record key as the primary key.
  * <p>
- * A {@code KTable} is either {@link StreamsBuilder#table(String, String) defined from a
single Kafka topic} that is
+ * A {@code KTable} is either {@link StreamsBuilder#table(String) defined from a single Kafka
topic} that is
  * consumed message by message or the result of a {@code KTable} transformation.
  * An aggregation of a {@link KStream} also yields a {@code KTable}.
  * <p>
@@ -66,7 +66,7 @@ import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
  * @see KStream
  * @see KGroupedTable
  * @see GlobalKTable
- * @see StreamsBuilder#table(String, String)
+ * @see StreamsBuilder#table(String)
  */
 @InterfaceStability.Evolving
 public interface KTable<K, V> {
@@ -763,17 +763,20 @@ public interface KTable<K, V> {
      * started).
      * <p>
      * This is equivalent to calling {@link #to(String) #to(someTopicName)} and
-     * {@link StreamsBuilder#table(String, String) StreamsBuilder#table(someTopicName, queryableStoreName)}.
+     * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(someTopicName,
queryableStoreName)}.
      * <p>
      * The resulting {@code KTable} will be materialized in a local state store with the
given store name (cf.
-     * {@link StreamsBuilder#table(String, String)})
+     * {@link StreamsBuilder#table(String, Materialized)})
      * The store name must be a valid Kafka topic name and cannot contain characters other
than ASCII alphanumerics, '.', '_' and '-'.
      *
      * @param topic     the topic name
      * @param queryableStoreName the state store name used for the result {@code KTable};
valid characters are ASCII
      *                  alphanumerics, '.', '_' and '-'. If {@code null} this is the equivalent of {@link KTable#through(String)}
      * @return a {@code KTable} that contains the exact same (and potentially repartitioned)
records as this {@code KTable}
+     * @deprecated use {@link #toStream()} followed by {@link KStream#to(String)}
+     * and {@link StreamsBuilder#table(String)} to read back as a {@code KTable}
      */
+    @Deprecated
     KTable<K, V> through(final String topic,
                          final String queryableStoreName);
 
@@ -784,16 +787,19 @@ public interface KTable<K, V> {
      * started).
      * <p>
      * This is equivalent to calling {@link #to(String) #to(someTopicName)} and
-     * {@link StreamsBuilder#table(String, String) StreamsBuilder#table(someTopicName, queryableStoreName)}.
+     * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(someTopicName,
queryableStoreName)}.
      * <p>
      * The resulting {@code KTable} will be materialized in a local state store with the
given store name (cf.
-     * {@link StreamsBuilder#table(String, String)})
+     * {@link StreamsBuilder#table(String, Materialized)})
      * The store name must be a valid Kafka topic name and cannot contain characters other
than ASCII alphanumerics, '.', '_' and '-'.
      *
      * @param topic     the topic name
      * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
      * @return a {@code KTable} that contains the exact same (and potentially repartitioned)
records as this {@code KTable}
+     * @deprecated use {@link #toStream()} followed by {@link KStream#to(String)}
+     * and {@link StreamsBuilder#table(String)} to read back as a {@code KTable}
      */
+    @Deprecated
     KTable<K, V> through(final String topic,
                          final StateStoreSupplier<KeyValueStore> storeSupplier);
 
@@ -804,14 +810,17 @@ public interface KTable<K, V> {
      * started).
      * <p>
      * This is equivalent to calling {@link #to(String) #to(someTopicName)} and
-     * {@link StreamsBuilder#table(String, String) StreamsBuilder#table(someTopicName)}.
+     * {@link StreamsBuilder#table(String) StreamsBuilder#table(someTopicName)}.
      * <p>
      * The resulting {@code KTable} will be materialized in a local state store with an internal
store name (cf.
      * {@link StreamsBuilder#table(String)})
      *
      * @param topic     the topic name
      * @return a {@code KTable} that contains the exact same (and potentially repartitioned)
records as this {@code KTable}
+     * @deprecated use {@link #toStream()} followed by {@link KStream#to(String)}
+     * and {@link StreamsBuilder#table(String)} to read back as a {@code KTable}
      */
+    @Deprecated
     KTable<K, V> through(final String topic);
 
     /**
@@ -822,7 +831,7 @@ public interface KTable<K, V> {
      * started).
      * <p>
      * This is equivalent to calling {@link #to(StreamPartitioner, String) #to(partitioner,
someTopicName)} and
-     * {@link StreamsBuilder#table(String, String) StreamsBuilder#table(someTopicName)}.
+     * {@link StreamsBuilder#table(String) StreamsBuilder#table(someTopicName)}.
      * <p>
      * The resulting {@code KTable} will be materialized in a local state store with an internal
store name (cf.
      * {@link StreamsBuilder#table(String)})
@@ -831,7 +840,10 @@ public interface KTable<K, V> {
      *                    if not specified producer's {@link DefaultPartitioner} will be
used
      * @param topic       the topic name
      * @return a {@code KTable} that contains the exact same (and potentially repartitioned)
records as this {@code KTable}
+     * @deprecated use {@link #toStream()} followed by {@link KStream#to(String, Produced)}
+     * and {@link StreamsBuilder#table(String)} to read back as a {@code KTable}
      */
+    @Deprecated
     KTable<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner,
                          final String topic);
 
@@ -843,10 +855,10 @@ public interface KTable<K, V> {
      * started).
      * <p>
      * This is equivalent to calling {@link #to(StreamPartitioner, String) #to(partitioner,
someTopicName)} and
-     * {@link StreamsBuilder#table(String, String) StreamsBuilder#table(someTopicName, queryableStoreName)}.
+     * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(someTopicName,
queryableStoreName)}.
      * <p>
      * The resulting {@code KTable} will be materialized in a local state store with the
given store name (cf.
-     * {@link StreamsBuilder#table(String, String)})
+     * {@link StreamsBuilder#table(String, Materialized)})
      *
      * @param partitioner the function used to determine how records are distributed among
partitions of the topic,
      *                    if not specified producer's {@link DefaultPartitioner} will be
used
@@ -854,7 +866,10 @@ public interface KTable<K, V> {
      * @param queryableStoreName   the state store name used for the result {@code KTable}.
      *                             If {@code null} this is the equivalent of {@link KTable#through(StreamPartitioner,
String)}
      * @return a {@code KTable} that contains the exact same (and potentially repartitioned)
records as this {@code KTable}
+     * @deprecated use {@link #toStream()} followed by {@link KStream#to(String, Produced)}
+     * and {@link StreamsBuilder#table(String)} to read back as a {@code KTable}
      */
+    @Deprecated
     KTable<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner,
                          final String topic,
                          final String queryableStoreName);
@@ -867,17 +882,20 @@ public interface KTable<K, V> {
      * started).
      * <p>
      * This is equivalent to calling {@link #to(StreamPartitioner, String) #to(partitioner,
someTopicName)} and
-     * {@link StreamsBuilder#table(String, String) StreamsBuilder#table(someTopicName, queryableStoreName)}.
+     * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(someTopicName,
queryableStoreName)}.
      * <p>
      * The resulting {@code KTable} will be materialized in a local state store with the
given store name (cf.
-     * {@link StreamsBuilder#table(String, String)})
+     * {@link StreamsBuilder#table(String, Materialized)})
      *
      * @param partitioner the function used to determine how records are distributed among
partitions of the topic,
      *                    if not specified producer's {@link DefaultPartitioner} will be
used
      * @param topic       the topic name
      * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
      * @return a {@code KTable} that contains the exact same (and potentially repartitioned)
records as this {@code KTable}
+     * @deprecated use {@link #toStream()} followed by {@link KStream#to(String, Produced)}
+     * and {@link StreamsBuilder#table(String)} to read back as a {@code KTable}
      */
+    @Deprecated
     KTable<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner,
                          final String topic,
                          final StateStoreSupplier<KeyValueStore> storeSupplier);
@@ -891,10 +909,10 @@ public interface KTable<K, V> {
      * used&mdash;otherwise producer's {@link DefaultPartitioner} is used.
      * <p>
      * This is equivalent to calling {@link #to(Serde, Serde, String) #to(keySerde, valueSerde,
someTopicName)} and
-     * {@link StreamsBuilder#table(String, String) StreamsBuilder#table(someTopicName, queryableStoreName)}.
+     * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(someTopicName,
queryableStoreName)}.
      * <p>
      * The resulting {@code KTable} will be materialized in a local state store with the
given store name (cf.
-     * {@link StreamsBuilder#table(String, String)})
+     * {@link StreamsBuilder#table(String, Materialized)})
      *
      * @param keySerde  key serde used to send key-value pairs,
      *                  if not specified the default key serde defined in the configuration
will be used
@@ -904,7 +922,10 @@ public interface KTable<K, V> {
      * @param queryableStoreName the state store name used for the result {@code KTable}.
      *                           If {@code null} this is the equivalent of {@link KTable#through(Serde, Serde, String)}
      * @return a {@code KTable} that contains the exact same (and potentially repartitioned)
records as this {@code KTable}
+     * @deprecated use {@link #toStream()} followed by {@link KStream#to(String, Produced)}
+     * and {@link StreamsBuilder#table(String)} to read back as a {@code KTable}
      */
+    @Deprecated
     KTable<K, V> through(final Serde<K> keySerde, Serde<V> valSerde,
                          final String topic,
                          final String queryableStoreName);
@@ -918,10 +939,10 @@ public interface KTable<K, V> {
      * used&mdash;otherwise producer's {@link DefaultPartitioner} is used.
      * <p>
      * This is equivalent to calling {@link #to(Serde, Serde, String) #to(keySerde, valueSerde,
someTopicName)} and
-     * {@link StreamsBuilder#table(String, String) StreamsBuilder#table(someTopicName, queryableStoreName)}.
+     * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(someTopicName,
queryableStoreName)}.
      * <p>
      * The resulting {@code KTable} will be materialized in a local state store with the
given store name (cf.
-     * {@link StreamsBuilder#table(String, String)})
+     * {@link StreamsBuilder#table(String, Materialized)})
      *
      * @param keySerde  key serde used to send key-value pairs,
      *                  if not specified the default key serde defined in the configuration
will be used
@@ -930,7 +951,10 @@ public interface KTable<K, V> {
      * @param topic     the topic name
      * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
      * @return a {@code KTable} that contains the exact same (and potentially repartitioned)
records as this {@code KTable}
+     * @deprecated use {@link #toStream()} followed by {@link KStream#to(String, Produced)}
+     * and {@link StreamsBuilder#table(String)} to read back as a {@code KTable}
      */
+    @Deprecated
     KTable<K, V> through(final Serde<K> keySerde, Serde<V> valSerde,
                          final String topic,
                          final StateStoreSupplier<KeyValueStore> storeSupplier);
@@ -955,7 +979,10 @@ public interface KTable<K, V> {
      *                  if not specified the default value serde defined in the configuration
will be used
      * @param topic     the topic name
      * @return a {@code KTable} that contains the exact same (and potentially repartitioned)
records as this {@code KTable}
+     * @deprecated use {@link #toStream()} followed by {@link KStream#to(String, Produced)}
+     * and {@link StreamsBuilder#table(String)} to read back as a {@code KTable}
      */
+    @Deprecated
     KTable<K, V> through(final Serde<K> keySerde, Serde<V> valSerde,
                          final String topic);
 
@@ -967,10 +994,10 @@ public interface KTable<K, V> {
      * <p>
      * This is equivalent to calling {@link #to(Serde, Serde, StreamPartitioner, String)
      * #to(keySerde, valueSerde, partitioner, someTopicName)} and
-     * {@link StreamsBuilder#table(String, String) StreamsBuilder#table(someTopicName, queryableStoreName)}.
+     * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(someTopicName,
queryableStoreName)}.
      * <p>
      * The resulting {@code KTable} will be materialized in a local state store with the
given store name (cf.
-     * {@link StreamsBuilder#table(String, String)})
+     * {@link StreamsBuilder#table(String, Materialized)})
      *
      * @param keySerde    key serde used to send key-value pairs,
      *                    if not specified the default key serde defined in the configuration
will be used
@@ -984,7 +1011,10 @@ public interface KTable<K, V> {
      * @param queryableStoreName  the state store name used for the result {@code KTable}.
      *                            If {@code null} this is the equivalent of {@link KTable#through(Serde, Serde, StreamPartitioner, String)}
      * @return a {@code KTable} that contains the exact same (and potentially repartitioned)
records as this {@code KTable}
+     * @deprecated use {@link #toStream()} followed by {@link KStream#to(String, Produced)}
+     * and {@link StreamsBuilder#table(String)} to read back as a {@code KTable}
      */
+    @Deprecated
     KTable<K, V> through(final Serde<K> keySerde,
                          final Serde<V> valSerde,
                          final StreamPartitioner<? super K, ? super V> partitioner,
@@ -999,10 +1029,10 @@ public interface KTable<K, V> {
      * <p>
      * This is equivalent to calling {@link #to(Serde, Serde, StreamPartitioner, String)
      * #to(keySerde, valueSerde, partitioner, someTopicName)} and
-     * {@link StreamsBuilder#table(String, String) StreamsBuilder#table(someTopicName, queryableStoreName)}.
+     * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(someTopicName,
queryableStoreName)}.
      * <p>
      * The resulting {@code KTable} will be materialized in a local state store with the
given store name (cf.
-     * {@link StreamsBuilder#table(String, String)})
+     * {@link StreamsBuilder#table(String, Materialized)})
      *
      * @param keySerde    key serde used to send key-value pairs,
      *                    if not specified the default key serde defined in the configuration
will be used
@@ -1015,7 +1045,10 @@ public interface KTable<K, V> {
      * @param topic      the topic name
      * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
      * @return a {@code KTable} that contains the exact same (and potentially repartitioned)
records as this {@code KTable}
+     * @deprecated use {@link #toStream()} followed by {@link KStream#to(String, Produced)}
+     * and {@link StreamsBuilder#table(String)} to read back as a {@code KTable}
      */
+    @Deprecated
     KTable<K, V> through(final Serde<K> keySerde,
                          final Serde<V> valSerde,
                          final StreamPartitioner<? super K, ? super V> partitioner,
@@ -1045,7 +1078,10 @@ public interface KTable<K, V> {
      *                    be used
      * @param topic      the topic name
      * @return a {@code KTable} that contains the exact same (and potentially repartitioned)
records as this {@code KTable}
+     * @deprecated use {@link #toStream()} followed by {@link KStream#to(String, Produced)}
+     * and {@link StreamsBuilder#table(String)} to read back as a {@code KTable}
      */
+    @Deprecated
     KTable<K, V> through(final Serde<K> keySerde,
                          final Serde<V> valSerde,
                          final StreamPartitioner<? super K, ? super V> partitioner,
@@ -1058,7 +1094,9 @@ public interface KTable<K, V> {
      * started).
      *
      * @param topic the topic name
+     * @deprecated use {@link #toStream()} followed by {@link KStream#to(String)}
      */
+    @Deprecated
     void to(final String topic);
 
     /**
@@ -1070,7 +1108,9 @@ public interface KTable<K, V> {
      * @param partitioner the function used to determine how records are distributed among
partitions of the topic,
      *                    if not specified producer's {@link DefaultPartitioner} will be
used
      * @param topic       the topic name
+     * @deprecated use {@link #toStream()} followed by {@link KStream#to(String, Produced)}
      */
+    @Deprecated
     void to(final StreamPartitioner<? super K, ? super V> partitioner,
             final String topic);
 
@@ -1087,7 +1127,9 @@ public interface KTable<K, V> {
      * @param valSerde value serde used to send key-value pairs,
      *                 if not specified the default value serde defined in the configuration
will be used
      * @param topic    the topic name
+     * @deprecated use {@link #toStream()} followed by {@link KStream#to(String, Produced)}
      */
+    @Deprecated
     void to(final Serde<K> keySerde,
             final Serde<V> valSerde,
             final String topic);
@@ -1107,7 +1149,9 @@ public interface KTable<K, V> {
      *                    {@link WindowedStreamPartitioner} will be used&mdash;otherwise
{@link DefaultPartitioner} will
      *                    be used
      * @param topic      the topic name
+     * @deprecated use {@link #toStream()} followed by {@link KStream#to(String, Produced)}
      */
+    @Deprecated
     void to(final Serde<K> keySerde,
             final Serde<V> valSerde,
             final StreamPartitioner<? super K, ? super V> partitioner,


Mime
View raw message