kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From damian...@apache.org
Subject [2/2] kafka git commit: MINOR: fix JavaDocs warnings
Date Tue, 03 Oct 2017 14:36:36 GMT
MINOR: fix JavaDocs warnings

 - add some missing annotations for deprecated methods

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Michael G. Noll <michael@confluent.io>, Damian Guy <damian.guy@gmail.com>

Closes #4005 from mjsax/minor-fix-javadoc-warnings


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3dcbbf70
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3dcbbf70
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3dcbbf70

Branch: refs/heads/trunk
Commit: 3dcbbf703017985c9e212ad69cc7afdddbf358eb
Parents: 716330a
Author: Matthias J. Sax <matthias@confluent.io>
Authored: Tue Oct 3 07:35:42 2017 -0700
Committer: Damian Guy <damian.guy@gmail.com>
Committed: Tue Oct 3 07:35:42 2017 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/streams/KafkaStreams.java  |   9 +-
 .../kafka/streams/TopologyDescription.java      |   4 +-
 .../kafka/streams/kstream/Aggregator.java       |  12 +-
 .../kafka/streams/kstream/GlobalKTable.java     |   2 +-
 .../kafka/streams/kstream/Initializer.java      |  12 +-
 .../kafka/streams/kstream/JoinWindows.java      |   2 +-
 .../kafka/streams/kstream/KGroupedStream.java   | 124 ++++++------
 .../kafka/streams/kstream/KGroupedTable.java    |  27 +--
 .../apache/kafka/streams/kstream/KStream.java   |  88 ++++-----
 .../kafka/streams/kstream/KStreamBuilder.java   |  15 +-
 .../apache/kafka/streams/kstream/KTable.java    | 189 +++++++++++--------
 .../apache/kafka/streams/kstream/Printed.java   |   1 -
 .../apache/kafka/streams/kstream/Reducer.java   |  12 +-
 .../streams/kstream/SessionWindowedKStream.java |  14 +-
 .../kafka/streams/kstream/SessionWindows.java   |   7 +-
 .../kafka/streams/kstream/TimeWindows.java      |   7 +-
 .../kafka/streams/kstream/UnlimitedWindows.java |   7 +-
 .../apache/kafka/streams/kstream/Windowed.java  |  14 +-
 .../streams/state/QueryableStoreTypes.java      |   6 +-
 .../org/apache/kafka/streams/state/Stores.java  |   2 +-
 20 files changed, 288 insertions(+), 266 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3dcbbf70/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 928d0e9..fd9c729 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -37,6 +37,7 @@ import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.StateStore;
@@ -948,10 +949,10 @@ public class KafkaStreams {
      * <p>
      * This will use the default Kafka Streams partitioner to locate the partition.
      * If a {@link StreamPartitioner custom partitioner} has been
-     * {@link ProducerConfig#PARTITIONER_CLASS_CONFIG configured} via {@link StreamsConfig},
-     * {@link KStream#through(StreamPartitioner, String)}, or {@link KTable#through(StreamPartitioner, String, String)},
-     * or if the original {@link KTable}'s input {@link StreamsBuilder#table(String, String) topic} is partitioned
-     * differently, please use {@link #metadataForKey(String, Object, StreamPartitioner)}.
+     * {@link ProducerConfig#PARTITIONER_CLASS_CONFIG configured} via {@link StreamsConfig} or
+     * {@link KStream#through(String, Produced)}, or if the original {@link KTable}'s input
+     * {@link StreamsBuilder#table(String) topic} is partitioned differently, please use
+     * {@link #metadataForKey(String, Object, StreamPartitioner)}.
      * <p>
      * Note:
      * <ul>

http://git-wip-us.apache.org/repos/asf/kafka/blob/3dcbbf70/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java b/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
index 1b520c6..01af8bf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
+++ b/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
@@ -55,9 +55,9 @@ public interface TopologyDescription {
     }
 
     /**
-     * Represents a {@link Topology#addGlobalStore(org.apache.kafka.streams.processor.StateStoreSupplier, String,
+     * Represents a {@link Topology#addGlobalStore(org.apache.kafka.streams.state.StoreBuilder, String,
      * org.apache.kafka.common.serialization.Deserializer, org.apache.kafka.common.serialization.Deserializer, String,
-     * String, org.apache.kafka.streams.processor.ProcessorSupplier)} global store}.
+     * String, org.apache.kafka.streams.processor.ProcessorSupplier) global store}.
      * Adding a global store results in adding a source node and one stateful processor node.
      * Note, that all added global stores form a single unit (similar to a {@link Subtopology}) even if different
      * global stores are not connected to each other.

http://git-wip-us.apache.org/repos/asf/kafka/blob/3dcbbf70/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java
index 4eec4f5..217a145 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java
@@ -29,12 +29,12 @@ package org.apache.kafka.streams.kstream;
  * @param <V> input value type
  * @param <VA> aggregate value type
  * @see Initializer
- * @see KGroupedStream#aggregate(Initializer, Aggregator, org.apache.kafka.common.serialization.Serde, String)
- * @see KGroupedStream#aggregate(Initializer, Aggregator, org.apache.kafka.streams.processor.StateStoreSupplier)
- * @see KGroupedStream#aggregate(Initializer, Aggregator, Windows, org.apache.kafka.common.serialization.Serde, String)
- * @see KGroupedStream#aggregate(Initializer, Aggregator, Windows, org.apache.kafka.streams.processor.StateStoreSupplier)
- * @see KGroupedStream#aggregate(Initializer, Aggregator, Merger, SessionWindows, org.apache.kafka.common.serialization.Serde, String)
- * @see KGroupedStream#aggregate(Initializer, Aggregator, Merger, SessionWindows, org.apache.kafka.common.serialization.Serde, org.apache.kafka.streams.processor.StateStoreSupplier)
+ * @see KGroupedStream#aggregate(Initializer, Aggregator)
+ * @see KGroupedStream#aggregate(Initializer, Aggregator, Materialized)
+ * @see TimeWindowedKStream#aggregate(Initializer, Aggregator)
+ * @see TimeWindowedKStream#aggregate(Initializer, Aggregator, Materialized)
+ * @see SessionWindowedKStream#aggregate(Initializer, Aggregator, Merger)
+ * @see SessionWindowedKStream#aggregate(Initializer, Aggregator, Merger, Materialized)
  * @see Reducer
  */
 public interface Aggregator<K, V, VA> {

http://git-wip-us.apache.org/repos/asf/kafka/blob/3dcbbf70/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java
index 81aa405..72286c2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java
@@ -61,7 +61,7 @@ import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
  * @param <K> Type of primary keys
  * @param <V> Type of value changes
  * @see KTable
- * @see StreamsBuilder#globalTable(String, String)
+ * @see StreamsBuilder#globalTable(String)
  * @see KStream#join(GlobalKTable, KeyValueMapper, ValueJoiner)
  * @see KStream#leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner)
  */

http://git-wip-us.apache.org/repos/asf/kafka/blob/3dcbbf70/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java
index d41c638..1b59c64 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java
@@ -23,12 +23,12 @@ package org.apache.kafka.streams.kstream;
  *
  * @param <VA> aggregate value type
  * @see Aggregator
- * @see KGroupedStream#aggregate(Initializer, Aggregator, org.apache.kafka.common.serialization.Serde, String)
- * @see KGroupedStream#aggregate(Initializer, Aggregator, org.apache.kafka.streams.processor.StateStoreSupplier)
- * @see KGroupedStream#aggregate(Initializer, Aggregator, Windows, org.apache.kafka.common.serialization.Serde, String)
- * @see KGroupedStream#aggregate(Initializer, Aggregator, Windows, org.apache.kafka.streams.processor.StateStoreSupplier)
- * @see KGroupedStream#aggregate(Initializer, Aggregator, Merger, SessionWindows, org.apache.kafka.common.serialization.Serde, String)
- * @see KGroupedStream#aggregate(Initializer, Aggregator, Merger, SessionWindows, org.apache.kafka.common.serialization.Serde, org.apache.kafka.streams.processor.StateStoreSupplier)
+ * @see KGroupedStream#aggregate(Initializer, Aggregator)
+ * @see KGroupedStream#aggregate(Initializer, Aggregator, Materialized)
+ * @see TimeWindowedKStream#aggregate(Initializer, Aggregator)
+ * @see TimeWindowedKStream#aggregate(Initializer, Aggregator, Materialized)
+ * @see SessionWindowedKStream#aggregate(Initializer, Aggregator, Merger)
+ * @see SessionWindowedKStream#aggregate(Initializer, Aggregator, Merger, Materialized)
  */
 public interface Initializer<VA> {
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/3dcbbf70/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
index ef9ed01..863ae95 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
@@ -59,7 +59,7 @@ import java.util.Map;
  * @see KStream#leftJoin(KStream, ValueJoiner, JoinWindows)
  * @see KStream#leftJoin(KStream, ValueJoiner, JoinWindows, Joined)
  * @see KStream#outerJoin(KStream, ValueJoiner, JoinWindows)
- * @see KStream#outerJoin(KStream, ValueJoiner, JoinWindows)
+ * @see KStream#outerJoin(KStream, ValueJoiner, JoinWindows, Joined)
  * @see TimestampExtractor
  */
 public final class JoinWindows extends Windows<Window> {

http://git-wip-us.apache.org/repos/asf/kafka/blob/3dcbbf70/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
index 1c72ebf..17f2db4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
@@ -85,7 +85,7 @@ public interface KGroupedStream<K, V> {
      * alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#count()}.
      * @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that
      * represent the latest (rolling) count (i.e., number of records) for each key
-     * @deprecated use {@link #count(Materialized)
+     * @deprecated use {@link #count(Materialized) count(Materialized.as(queryableStoreName))}
      */
     @Deprecated
     KTable<K, Long> count(final String queryableStoreName);
@@ -146,7 +146,7 @@ public interface KGroupedStream<K, V> {
      * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
      * @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that
      * represent the latest (rolling) count (i.e., number of records) for each key
-     * @deprecated use {@link #count(Materialized)}
+     * @deprecated use {@link #count(Materialized) count(Materialized.as(KeyValueByteStoreSupplier))}
      */
     @Deprecated
     KTable<K, Long> count(final StateStoreSupplier<KeyValueStore> storeSupplier);
@@ -229,7 +229,8 @@ public interface KGroupedStream<K, V> {
      * alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#count(Windows)}.
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys and {@link Long} values
      * that represent the latest (rolling) count (i.e., number of records) for each key within a window.
-     * @deprecated use {@link #windowedBy(Windows)}
+     * @deprecated use {@link #windowedBy(Windows) windowedBy(windows)} followed by
+     * {@link TimeWindowedKStream#count(Materialized) count(Materialized.as(queryableStoreName))}
      */
     @Deprecated
     <W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows,
@@ -264,7 +265,7 @@ public interface KGroupedStream<K, V> {
      * @param windows   the specification of the aggregation {@link Windows}
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys and {@link Long} values
      * that represent the latest (rolling) count (i.e., number of records) for each key within a window
-     * @deprecated use {@link #windowedBy(Windows)}
+     * @deprecated use {@link #windowedBy(Windows) windowedBy(windows)} followed by {@link TimeWindowedKStream#count() count()}
      */
     @Deprecated
     <W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows);
@@ -306,7 +307,8 @@ public interface KGroupedStream<K, V> {
      * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys and {@link Long} values
      * that represent the latest (rolling) count (i.e., number of records) for each key within a window
-     * @deprecated use {@link #windowedBy(Windows)}
+     * @deprecated use {@link #windowedBy(Windows) windowedBy(windows)} followed by
+     * {@link TimeWindowedKStream#count(Materialized) count(Materialized.as(KeyValueByteStoreSupplier))}
      */
     @Deprecated
     <W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows,
@@ -344,10 +346,11 @@ public interface KGroupedStream<K, V> {
      *
      * @param sessionWindows the specification of the aggregation {@link SessionWindows}
      * @param queryableStoreName  the name of the state store created from this operation; valid characters are ASCII
-     * alphanumerics, '.', '_' and '-. If {@code null} then this will be equivalent to {@link KGroupedStream#count(SessionWindows)} ()}.
+     * alphanumerics, '.', '_' and '-. If {@code null} then this will be equivalent to {@link KGroupedStream#count(SessionWindows)}.
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys and {@link Long} values
      * that represent the latest (rolling) count (i.e., number of records) for each key within a window
-     * @deprecated use {@link #windowedBy(SessionWindows)}
+     * @deprecated use {@link #windowedBy(SessionWindows) windowedBy(sessionWindows)} followed by
+     * {@link SessionWindowedKStream#count(Materialized) count(Materialized.as(queryableStoreName))}
      */
     @Deprecated
     KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows, final String queryableStoreName);
@@ -371,7 +374,8 @@ public interface KGroupedStream<K, V> {
      * @param sessionWindows the specification of the aggregation {@link SessionWindows}
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys and {@link Long} values
      * that represent the latest (rolling) count (i.e., number of records) for each key within a window
-     * @deprecated use {@link #windowedBy(SessionWindows)}
+     * @deprecated use {@link #windowedBy(SessionWindows) windowedBy(sessionWindows)} followed by
+     * {@link SessionWindowedKStream#count() count()}
      */
     @Deprecated
     KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows);
@@ -409,7 +413,8 @@ public interface KGroupedStream<K, V> {
      * @param storeSupplier  user defined state store supplier. Cannot be {@code null}.
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys and {@link Long} values
      * that represent the latest (rolling) count (i.e., number of records) for each key within a window
-     * @deprecated use {@link #windowedBy(SessionWindows)}
+     * @deprecated use {@link #windowedBy(SessionWindows) windowedBy(sessionWindows)} followed by
+     * {@link SessionWindowedKStream#count(Materialized) count(Materialized.as(KeyValueByteStoreSupplier))}
      */
     @Deprecated
     KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows,
@@ -419,7 +424,7 @@ public interface KGroupedStream<K, V> {
      * Combine the values of records in this stream by the grouped key.
      * Records with {@code null} key or value are ignored.
      * Combining implies that the type of the aggregate result is the same as the type of the input value
-     * (c.f. {@link #aggregate(Initializer, Aggregator, Serde, String)}).
+     * (c.f. {@link #aggregate(Initializer, Aggregator)}).
      * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
      * that can be queried using the provided {@code queryableStoreName}.
      * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
@@ -464,10 +469,10 @@ public interface KGroupedStream<K, V> {
      * <pre>{@code
      * // At the example of a Reducer<Long>
      * new Reducer<Long>() {
-     *   @Override
      *   public Long apply(Long aggValue, Long currValue) {
      *     return aggValue + currValue;
      *   }
+     * }
      * }</pre>
      * <p>
      * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
@@ -506,7 +511,7 @@ public interface KGroupedStream<K, V> {
      * alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#reduce(Reducer)} ()}.
      * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
      * latest (rolling) aggregate for each key
-     * @deprectated use {@link #reduce(Reducer, Materialized)}
+     * @deprecated  use {@link #reduce(Reducer, Materialized) reduce(reducer, Materialized.as(queryableStoreName))}
      */
     @Deprecated
     KTable<K, V> reduce(final Reducer<V> reducer,
@@ -527,10 +532,10 @@ public interface KGroupedStream<K, V> {
      * <pre>{@code
      * // At the example of a Reducer<Long>
      * new Reducer<Long>() {
-     *   @Override
      *   public Long apply(Long aggValue, Long currValue) {
      *     return aggValue + currValue;
      *   }
+     * }
      * }</pre>
      * <p>
      * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
@@ -562,7 +567,7 @@ public interface KGroupedStream<K, V> {
      * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
      * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
      * latest (rolling) aggregate for each key
-     * @deprectated use {@link #reduce(Reducer, Materialized)}
+     * @deprecated use {@link #reduce(Reducer, Materialized) reduce(reducer, Materialized.as(KeyValueByteStoreSupplier))}
      */
     @Deprecated
     KTable<K, V> reduce(final Reducer<V> reducer,
@@ -582,10 +587,10 @@ public interface KGroupedStream<K, V> {
      * <pre>{@code
      * // At the example of a Reducer<Long>
      * new Reducer<Long>() {
-     *   @Override
      *   public Long apply(Long aggValue, Long currValue) {
      *     return aggValue + currValue;
      *   }
+     * }
      * }</pre>
      * <p>
      * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
@@ -638,10 +643,10 @@ public interface KGroupedStream<K, V> {
      * <pre>{@code
      * // At the example of a Reducer<Long>
      * new Reducer<Long>() {
-     *   @Override
      *   public Long apply(Long aggValue, Long currValue) {
      *     return aggValue + currValue;
      *   }
+     * }
      * }</pre>
      * <p>
      * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
@@ -680,10 +685,11 @@ public interface KGroupedStream<K, V> {
      * @param reducer   a {@link Reducer} that computes a new aggregate result
      * @param windows   the specification of the aggregation {@link Windows}
      * @param queryableStoreName the name of the state store created from this operation; valid characters are ASCII
-     * alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#reduce(Reducer, Windows)} ()}.
+     * alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#reduce(Reducer, Windows)}.
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
      * the latest (rolling) aggregate for each key within a window
-     * @deprecated use {@link #windowedBy(Windows)}
+     * @deprecated use {@link #windowedBy(Windows) windowedBy(windows)} followed by
+     * {@link TimeWindowedKStream#reduce(Reducer, Materialized) reduce(reducer, Materialized.as(queryableStoreName))}
      */
     @Deprecated
     <W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
@@ -728,7 +734,8 @@ public interface KGroupedStream<K, V> {
      * @param windows   the specification of the aggregation {@link Windows}
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
      * the latest (rolling) aggregate for each key within a window
-     * @deprecated use {@link #windowedBy(Windows)}
+     * @deprecated use {@link #windowedBy(Windows) windowedBy(windows)} followed by
+     * {@link TimeWindowedKStream#reduce(Reducer) reduce(reducer)}
      */
     @Deprecated
     <W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
@@ -752,10 +759,10 @@ public interface KGroupedStream<K, V> {
      * <pre>{@code
      * // At the example of a Reducer<Long>
      * new Reducer<Long>() {
-     *   @Override
      *   public Long apply(Long aggValue, Long currValue) {
      *     return aggValue + currValue;
      *   }
+     * }
      * }</pre>
      * <p>
      * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
@@ -790,7 +797,8 @@ public interface KGroupedStream<K, V> {
      * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
      * the latest (rolling) aggregate for each key within a window
-     * @deprecated use {@link #windowedBy(Windows)}
+     * @deprecated use {@link #windowedBy(Windows) windowedBy(windows)} followed by
+     * {@link TimeWindowedKStream#reduce(Reducer, Materialized) reduce(reducer, Materialized.as(KeyValueByteStoreSupplier))}
      */
     @Deprecated
     <W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
@@ -813,10 +821,10 @@ public interface KGroupedStream<K, V> {
      * <pre>{@code
      * // At the example of a Reducer<Long>
      * new Reducer<Long>() {
-     *   @Override
      *   public Long apply(Long aggValue, Long currValue) {
      *     return aggValue + currValue;
      *   }
+     * }
      * }</pre>
      * <p>
      * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
@@ -855,10 +863,11 @@ public interface KGroupedStream<K, V> {
      * @param reducer           a {@link Reducer} that computes a new aggregate result. Cannot be {@code null}.
      * @param sessionWindows    the specification of the aggregation {@link SessionWindows}
      * @param queryableStoreName     the name of the state store created from this operation; valid characters are ASCII
-     * alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#reduce(Reducer, SessionWindows)} ()}.
+     * alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#reduce(Reducer, SessionWindows)}.
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
      * the latest (rolling) aggregate for each key within a window
-     * @deprecated use {@link #windowedBy(SessionWindows)}
+     * @deprecated use {@link #windowedBy(SessionWindows) windowedBy(sessionWindows)} followed by
+     * {@link SessionWindowedKStream#reduce(Reducer, Materialized) reduce(reducer, Materialized.as(queryableStoreName))}
      */
     @Deprecated
     KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
@@ -893,7 +902,8 @@ public interface KGroupedStream<K, V> {
      * @param sessionWindows    the specification of the aggregation {@link SessionWindows}
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
      * the latest (rolling) aggregate for each key within a window
-     * @deprecated use {@link #windowedBy(SessionWindows)}
+     * @deprecated use {@link #windowedBy(SessionWindows) windowedBy(sessionWindows)} followed by
+     * {@link SessionWindowedKStream#reduce(Reducer) reduce(reducer)}
      */
     @Deprecated
     KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
@@ -915,10 +925,10 @@ public interface KGroupedStream<K, V> {
      * <pre>{@code
      * // At the example of a Reducer<Long>
      * new Reducer<Long>() {
-     *   @Override
      *   public Long apply(Long aggValue, Long currValue) {
      *     return aggValue + currValue;
      *   }
+     * }
      * }</pre>
      * <p>
      * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
@@ -959,7 +969,8 @@ public interface KGroupedStream<K, V> {
      * @param storeSupplier     user defined state store supplier. Cannot be {@code null}.
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
      * the latest (rolling) aggregate for each key within a window
-     * @deprecated use {@link #windowedBy(SessionWindows)}
+     * @deprecated use {@link #windowedBy(SessionWindows) windowedBy(sessionWindows)} followed by
+     * {@link SessionWindowedKStream#reduce(Reducer, Materialized) reduce(reducer, Materialized.as(KeyValueByteStoreSupplier))}
      */
     @Deprecated
     KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
@@ -1016,11 +1027,11 @@ public interface KGroupedStream<K, V> {
      * @param aggValueSerde aggregate value serdes for materializing the aggregated table,
      *                      if not specified the default serdes defined in the configs will be used
      * @param queryableStoreName the name of the state store created from this operation; valid characters are ASCII
-     * alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#aggregate(Initializer, Aggregator, Serde)} ()} ()}.
+     * alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#aggregate(Initializer, Aggregator, Serde)}.
      * @param <VR>          the value type of the resulting {@link KTable}
      * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
      * latest (rolling) aggregate for each key
-     * @deprecated use {@link #aggregate(Initializer, Aggregator, Materialized)}
+     * @deprecated use {@link #aggregate(Initializer, Aggregator, Materialized) aggregate(initializer, aggregator, Materialized.as(queryableStoreName).withValueSerde(aggValueSerde))}
      */
     @Deprecated
     <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
@@ -1031,7 +1042,7 @@ public interface KGroupedStream<K, V> {
     /**
      * Aggregate the values of records in this stream by the grouped key.
      * Records with {@code null} key or value are ignored.
-     * Aggregating is a generalization of {@link #reduce(Reducer, String) combining via reduce(...)} as it, for example,
+     * Aggregating is a generalization of {@link #reduce(Reducer) combining via reduce(...)} as it, for example,
      * allows the result to have a different type than the input values.
      * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
      * that can be queried using the provided {@code queryableStoreName}.
@@ -1043,7 +1054,7 @@ public interface KGroupedStream<K, V> {
      * aggregate (or for the very first record using the intermediate aggregation result provided via the
      * {@link Initializer}) and the record's value.
      * Thus, {@code aggregate(Initializer, Aggregator, Serde, String)} can be used to compute aggregate functions like
-     * count (c.f. {@link #count(String)}).
+     * count (c.f. {@link #count()}).
      * <p>
      * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
      * the same key.
@@ -1087,7 +1098,7 @@ public interface KGroupedStream<K, V> {
     /**
      * Aggregate the values of records in this stream by the grouped key.
      * Records with {@code null} key or value are ignored.
-     * Aggregating is a generalization of {@link #reduce(Reducer, String) combining via reduce(...)} as it, for example,
+     * Aggregating is a generalization of {@link #reduce(Reducer) combining via reduce(...)} as it, for example,
      * allows the result to have a different type than the input values.
      * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
      * that can be queried using the provided {@code queryableStoreName}.
@@ -1099,7 +1110,7 @@ public interface KGroupedStream<K, V> {
      * aggregate (or for the very first record using the intermediate aggregation result provided via the
      * {@link Initializer}) and the record's value.
      * Thus, {@code aggregate(Initializer, Aggregator)} can be used to compute aggregate functions like
-     * count (c.f. {@link #count(String)}).
+     * count (c.f. {@link #count()}).
      * <p>
      * The default value serde from config will be used for serializing the result.
      * If a different serde is required then you should use {@link #aggregate(Initializer, Aggregator, Materialized)}.
@@ -1127,10 +1138,11 @@ public interface KGroupedStream<K, V> {
      */
     <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                                  final Aggregator<? super K, ? super V, VR> aggregator);
+
     /**
      * Aggregate the values of records in this stream by the grouped key.
      * Records with {@code null} key or value are ignored.
-     * Aggregating is a generalization of {@link #reduce(Reducer, String) combining via reduce(...)} as it, for example,
+     * Aggregating is a generalization of {@link #reduce(Reducer) combining via reduce(...)} as it, for example,
      * allows the result to have a different type than the input values.
      * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
      * that can be queried using the provided {@code queryableStoreName}.
@@ -1142,7 +1154,7 @@ public interface KGroupedStream<K, V> {
      * aggregate (or for the very first record using the intermediate aggregation result provided via the
      * {@link Initializer}) and the record's value.
      * Thus, {@code aggregate(Initializer, Aggregator, Serde, String)} can be used to compute aggregate functions like
-     * count (c.f. {@link #count(String)}).
+     * count (c.f. {@link #count()}).
      * <p>
      * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
      * the same key.
@@ -1166,14 +1178,13 @@ public interface KGroupedStream<K, V> {
      * @param <VR>          the value type of the resulting {@link KTable}
      * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
      * latest (rolling) aggregate for each key
-     * @deprecated use {@link #aggregate(Initializer, Aggregator, Materialized)}
+     * @deprecated use {@link #aggregate(Initializer, Aggregator, Materialized) aggregate(initializer, aggregator, Materialized.as("someStoreName").withValueSerde(aggValueSerde))}
      */
     @Deprecated
     <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                                  final Aggregator<? super K, ? super V, VR> aggregator,
                                  final Serde<VR> aggValueSerde);
 
-
     /**
      * Aggregate the values of records in this stream by the grouped key.
      * Records with {@code null} key or value are ignored.
@@ -1189,7 +1200,7 @@ public interface KGroupedStream<K, V> {
      * aggregate (or for the very first record using the intermediate aggregation result provided via the
      * {@link Initializer}) and the record's value.
      * Thus, {@code aggregate(Initializer, Aggregator, StateStoreSupplier)} can be used to compute aggregate functions
-     * like count (c.f. {@link #count(String)}).
+     * like count (c.f. {@link #count()}).
      * <p>
      * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
      * the same key.
@@ -1217,7 +1228,7 @@ public interface KGroupedStream<K, V> {
      * @param <VR>          the value type of the resulting {@link KTable}
      * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
      * latest (rolling) aggregate for each key
-     * @deprecated use {@link #aggregate(Initializer, Aggregator, Materialized)}
+     * @deprecated use {@link #aggregate(Initializer, Aggregator, Materialized) aggregate(initializer, aggregator, Materialized.as(KeyValueByteStoreSupplier))}
      */
     @Deprecated
     <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
@@ -1243,7 +1254,7 @@ public interface KGroupedStream<K, V> {
      * aggregate (or for the very first record using the intermediate aggregation result provided via the
      * {@link Initializer}) and the record's value.
      * Thus, {@code aggregate(Initializer, Aggregator, Windows, Serde, String)} can be used to compute aggregate
-     * functions like count (c.f. {@link #count(String)}).
+     * functions like count (c.f. {@link #count(Windows)}).
      * <p>
      * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
      * the same window and key.
@@ -1282,10 +1293,11 @@ public interface KGroupedStream<K, V> {
      *                      if not specified the default serdes defined in the configs will be used
      * @param <VR>          the value type of the resulting {@link KTable}
      * @param queryableStoreName the name of the state store created from this operation; valid characters are ASCII
-     * alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#aggregate(Initializer, Aggregator, Windows, Serde)} ()} ()}.
+     * alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#aggregate(Initializer, Aggregator, Windows, Serde)}.
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
      * the latest (rolling) aggregate for each key within a window
-     * @deprecated use {@link #windowedBy(Windows)}
+     * @deprecated use {@link #windowedBy(Windows) windowedBy(windows)} followed by
+     * {@link TimeWindowedKStream#aggregate(Initializer, Aggregator, Materialized) aggregate(initializer, aggregator, Materialized.as(queryableStoreName).withValueSerde(aggValueSerde))}
      */
     @Deprecated
     <W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
@@ -1313,7 +1325,7 @@ public interface KGroupedStream<K, V> {
      * aggregate (or for the very first record using the intermediate aggregation result provided via the
      * {@link Initializer}) and the record's value.
      * Thus, {@code aggregate(Initializer, Aggregator, Windows, Serde, String)} can be used to compute aggregate
-     * functions like count (c.f. {@link #count(String)}).
+     * functions like count (c.f. {@link #count(Windows)}).
      * <p>
      * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
      * the same window and key.
@@ -1338,7 +1350,8 @@ public interface KGroupedStream<K, V> {
      * @param <VR>          the value type of the resulting {@link KTable}
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
      * the latest (rolling) aggregate for each key within a window
-     * @deprecated use {@link #windowedBy(Windows)}
+     * @deprecated use {@link #windowedBy(Windows) windowedBy(windows)} followed by
+     * {@link TimeWindowedKStream#aggregate(Initializer, Aggregator, Materialized)} aggregate(initializer, aggregator, Materialized.as("someStoreName").withValueSerde(aggValueSerde))}
      */
     @Deprecated
     <W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
@@ -1346,7 +1359,6 @@ public interface KGroupedStream<K, V> {
                                                              final Windows<W> windows,
                                                              final Serde<VR> aggValueSerde);
 
-
     /**
      * Aggregate the values of records in this stream by the grouped key and defined windows.
      * Records with {@code null} key or value are ignored.
@@ -1366,7 +1378,7 @@ public interface KGroupedStream<K, V> {
      * aggregate (or for the very first record using the intermediate aggregation result provided via the
      * {@link Initializer}) and the record's value.
      * Thus, {@code aggregate(Initializer, Aggregator, Windows, StateStoreSupplier)} can be used to compute aggregate
-     * functions like count (c.f. {@link #count(String)}).
+     * functions like count (c.f. {@link #count(Windows)}).
      * <p>
      * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
      * the same window and key.
@@ -1397,7 +1409,8 @@ public interface KGroupedStream<K, V> {
      * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
      * the latest (rolling) aggregate for each key within a window
-     * @deprecated use {@link #windowedBy(Windows)}
+     * @deprecated use {@link #windowedBy(Windows) windowedBy(windows)} followed by
+     * {@link TimeWindowedKStream#aggregate(Initializer, Aggregator, Materialized) aggregate(initializer, aggregator, Materialized.as(KeyValueByteStoreSupplier))}
      */
     @Deprecated
     <W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
@@ -1422,7 +1435,7 @@ public interface KGroupedStream<K, V> {
      * aggregate (or for the very first record using the intermediate aggregation result provided via the
      * {@link Initializer}) and the record's value.
      * Thus, {@code aggregate(Initializer, Aggregator, Merger, SessionWindows, Serde, String)} can be used to compute
-     * aggregate functions like count (c.f. {@link #count(String)})
+     * aggregate functions like count (c.f. {@link #count(SessionWindows)})
      * <p>
      * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
      * the same window and key.
@@ -1452,10 +1465,11 @@ public interface KGroupedStream<K, V> {
      *                      if not specified the default serdes defined in the configs will be used
      * @param <T>           the value type of the resulting {@link KTable}
      * @param queryableStoreName the name of the state store created from this operation; valid characters are ASCII
-     * alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#aggregate(Initializer, Aggregator, Merger, SessionWindows, Serde)} ()} ()}.
+     * alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#aggregate(Initializer, Aggregator, Merger, SessionWindows, Serde)}.
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
      * the latest (rolling) aggregate for each key within a window
-     * @deprecated use {@link #windowedBy(SessionWindows)}
+     * @deprecated use {@link #windowedBy(SessionWindows) windowedBy(sessionWindows)} followed by
+     * {@link SessionWindowedKStream#aggregate(Initializer, Aggregator, Merger, Materialized) aggregate(initializer, aggregator, sessionMerger, Materialized.as(queryableStoreName).withValueSerde(aggValueSerde))}
      */
     @Deprecated
     <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
@@ -1482,7 +1496,7 @@ public interface KGroupedStream<K, V> {
      * aggregate (or for the very first record using the intermediate aggregation result provided via the
      * {@link Initializer}) and the record's value.
      * Thus, {@code aggregate(Initializer, Aggregator, Merger, SessionWindows, Serde, String)} can be used to compute
-     * aggregate functions like count (c.f. {@link #count(String)})
+     * aggregate functions like count (c.f. {@link #count(SessionWindows)})
      * <p>
      * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
      * the same window and key.
@@ -1500,7 +1514,8 @@ public interface KGroupedStream<K, V> {
      * @param <T>           the value type of the resulting {@link KTable}
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
      * the latest (rolling) aggregate for each key within a window
-     * @deprecated use {@link #windowedBy(SessionWindows)}
+     * @deprecated use {@link #windowedBy(SessionWindows) windowedBy(sessionWindows)} followed by
+     * {@link SessionWindowedKStream#aggregate(Initializer, Aggregator, Merger, Materialized) aggregate(initializer, aggregator, sessionMerger, Materialized.as("someStoreName").withValueSerde(aggValueSerde))}
      */
     @Deprecated
     <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
@@ -1526,7 +1541,7 @@ public interface KGroupedStream<K, V> {
      * aggregate (or for the very first record using the intermediate aggregation result provided via the
      * {@link Initializer}) and the record's value.
      * Thus, {@code #aggregate(Initializer, Aggregator, Merger, SessionWindows, Serde, StateStoreSupplier)} can be used
-     * to compute aggregate functions like count (c.f. {@link #count(String)}).
+     * to compute aggregate functions like count (c.f. {@link #count(SessionWindows)}).
      * <p>
      * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
      * the same window and key.
@@ -1559,7 +1574,8 @@ public interface KGroupedStream<K, V> {
      * @param <T>           the value type of the resulting {@link KTable}
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
      * the latest (rolling) aggregate for each key within a window
-     * @deprecated use {@link #windowedBy(SessionWindows)}
+     * @deprecated use {@link #windowedBy(SessionWindows) windowedBy(sessionWindows)} followed by
+     * {@link SessionWindowedKStream#aggregate(Initializer, Aggregator, Merger, Materialized) aggregate(initializer, aggregator, sessionMerger, Materialized.as(KeyValueByteStoreSupplier).withValueSerde(aggValueSerde))}
      */
     @Deprecated
     <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,

http://git-wip-us.apache.org/repos/asf/kafka/blob/3dcbbf70/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
index f854320..f814eaf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
@@ -81,7 +81,7 @@ public interface KGroupedTable<K, V> {
      * alphanumerics, '.', '_' and '-'. If {@code null} this is the equivalent of {@link KGroupedTable#count()}.
      * @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that
      * represent the latest (rolling) count (i.e., number of records) for each key
-     * @deprecated use {@link #count(Materialized)}
+     * @deprecated use {@link #count(Materialized) count(Materialized.as(queryableStoreName))}
      */
     @Deprecated
     KTable<K, Long> count(final String queryableStoreName);
@@ -192,7 +192,7 @@ public interface KGroupedTable<K, V> {
      * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
      * @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that
      * represent the latest (rolling) count (i.e., number of records) for each key
-     * @deprecated use {@link #count(Materialized)}
+     * @deprecated use {@link #count(Materialized) count(Materialized.as(KeyValueByteStoreSupplier)}
      */
     @Deprecated
     KTable<K, Long> count(final StateStoreSupplier<KeyValueStore> storeSupplier);
@@ -264,7 +264,7 @@ public interface KGroupedTable<K, V> {
      * '.', '_' and '-'. If {@code null} this is the equivalent of {@link KGroupedTable#reduce(Reducer, Reducer)} ()}.
      * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
      * latest (rolling) aggregate for each key
-     * @deprecated use {@link #reduce(Reducer, Reducer, Materialized)}
+     * @deprecated use {@link #reduce(Reducer, Reducer, Materialized) reduce(adder, subtractor, Materialized.as(queryableStoreName))}
      */
     @Deprecated
     KTable<K, V> reduce(final Reducer<V> adder,
@@ -346,7 +346,7 @@ public interface KGroupedTable<K, V> {
      * mapped} to the same key into a new instance of {@link KTable}.
      * Records with {@code null} key are ignored.
      * Combining implies that the type of the aggregate result is the same as the type of the input value
-     * (c.f. {@link #aggregate(Initializer, Aggregator, Aggregator, Serde, String)}).
+     * (c.f. {@link #aggregate(Initializer, Aggregator, Aggregator)}).
      * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
      * that can be queried using the provided {@code queryableStoreName}.
      * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
@@ -462,7 +462,7 @@ public interface KGroupedTable<K, V> {
      * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
      * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
      * latest (rolling) aggregate for each key
-     * @deprecated use {@link #reduce(Reducer, Reducer, Materialized)}
+     * @deprecated use {@link #reduce(Reducer, Reducer, Materialized) reduce(adder, subtractor, Materialized.as(KeyValueByteStoreSupplier))}
      */
     @Deprecated
     KTable<K, V> reduce(final Reducer<V> adder,
@@ -547,7 +547,7 @@ public interface KGroupedTable<K, V> {
      * @param <VR>        the value type of the aggregated {@link KTable}
      * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
      * latest (rolling) aggregate for each key
-     * @deprecated use {@link #aggregate(Initializer, Aggregator, Aggregator, Materialized)}
+     * @deprecated use {@link #aggregate(Initializer, Aggregator, Aggregator, Materialized) aggregate(initializer, adder, subtractor, Materialized.as(queryableStoreName))}
      */
     @Deprecated
     <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
@@ -559,7 +559,7 @@ public interface KGroupedTable<K, V> {
      * Aggregate the value of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper)
      * mapped} to the same key into a new instance of {@link KTable} using default serializers and deserializers.
      * Records with {@code null} key are ignored.
-     * Aggregating is a generalization of {@link #reduce(Reducer, Reducer, String) combining via reduce(...)} as it,
+     * Aggregating is a generalization of {@link #reduce(Reducer, Reducer, Materialized) combining via reduce(...)} as it,
      * for example, allows the result to have a different type than the input values.
      * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
      * provided by the given {@code storeSupplier}.
@@ -639,11 +639,10 @@ public interface KGroupedTable<K, V> {
      * Aggregate the value of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper)
      * mapped} to the same key into a new instance of {@link KTable} using default serializers and deserializers.
      * Records with {@code null} key are ignored.
-     * Aggregating is a generalization of {@link #reduce(Reducer, Reducer, String) combining via reduce(...)} as it,
+     * Aggregating is a generalization of {@link #reduce(Reducer, Reducer) combining via reduce(...)} as it,
      * for example, allows the result to have a different type than the input values.
      * If the result value type does not match the {@link StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value
-     * serde} you should use {@link KGroupedTable#aggregate(Initializer, Aggregator, Aggregator, Serde, String)
-     * aggregate(Initializer, Aggregator, Aggregator, Serde, String)}.
+     * serde} you should use {@link #aggregate(Initializer, Aggregator, Aggregator, Serde)}.
      * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
      * provided by the given {@code storeSupplier}.
      * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
@@ -784,7 +783,7 @@ public interface KGroupedTable<K, V> {
      * @param <VR>          the value type of the aggregated {@link KTable}
      * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
      * latest (rolling) aggregate for each key
-     * @deprecated use {@link #aggregate(Initializer, Aggregator, Aggregator, Materialized)}
+     * @deprecated use {@link #aggregate(Initializer, Aggregator, Aggregator, Materialized) aggregate(initializer, adder, subtractor, Materialized.as(queryableStoreName).withValueSerde(aggValueSerde))}
      */
     @Deprecated
     <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
@@ -797,7 +796,7 @@ public interface KGroupedTable<K, V> {
      * Aggregate the value of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper)
      * mapped} to the same key into a new instance of {@link KTable} using default serializers and deserializers.
      * Records with {@code null} key are ignored.
-     * Aggregating is a generalization of {@link #reduce(Reducer, Reducer, String) combining via reduce(...)} as it,
+     * Aggregating is a generalization of {@link #reduce(Reducer, Reducer, Materialized) combining via reduce(...)} as it,
      * for example, allows the result to have a different type than the input values.
      * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
      * that can be queried using the provided {@code queryableStoreName}.
@@ -857,7 +856,9 @@ public interface KGroupedTable<K, V> {
      * @param <VR>          the value type of the aggregated {@link KTable}
      * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
      * latest (rolling) aggregate for each key
+     * @deprecated use {@link #aggregate(Initializer, Aggregator, Aggregator, Materialized) aggregate(initializer, adder, subtractor, Materialized.as("someStoreName").withValueSerde(aggValueSerde))}
      */
+    @Deprecated
     <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                                  final Aggregator<? super K, ? super V, VR> adder,
                                  final Aggregator<? super K, ? super V, VR> subtractor,
@@ -938,7 +939,9 @@ public interface KGroupedTable<K, V> {
      * @param <VR>          the value type of the aggregated {@link KTable}
      * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
      * latest (rolling) aggregate for each key
+     * @deprecated use {@link #aggregate(Initializer, Aggregator, Aggregator, Materialized) aggregate(initializer, adder, subtractor, Materialized.as(KeyValueByteStoreSupplier))}
      */
+    @Deprecated
     <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                                  final Aggregator<? super K, ? super V, VR> adder,
                                  final Aggregator<? super K, ? super V, VR> subtractor,

http://git-wip-us.apache.org/repos/asf/kafka/blob/3dcbbf70/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index c56a4ed..0d1d201 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream;
 import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
@@ -290,7 +291,7 @@ public interface KStream<K, V> {
      * {@link Integer} etc. to get meaningful information.
      *
      * @param label the name used to label the key/value pairs printed to the console
-     * @deprecated use {@code print(Printed)}
+     * @deprecated use {@link #print(Printed) print(Printed.toSysOut()}
      */
     @Deprecated
     void print(final String label);
@@ -308,7 +309,7 @@ public interface KStream<K, V> {
      *
      * @param keySerde key serde used to deserialize key if type is {@code byte[]},
      * @param valSerde value serde used to deserialize value if type is {@code byte[]},
-     * @deprecated use {@code print(Printed)}
+     * @deprecated use {@link #print(Printed) print(Printed.toSysOut().withKeyValueMapper(...)}
      */
     @Deprecated
     void print(final Serde<K> keySerde,
@@ -326,7 +327,7 @@ public interface KStream<K, V> {
      * @param keySerde   key serde used to deserialize key if type is {@code byte[]},
      * @param valSerde   value serde used to deserialize value if type is {@code byte[]},
      * @param label the name used to label the key/value pairs printed to the console
-     * @deprecated use {@code print(Printed)}
+     * @deprecated use {@link #print(Printed) print(Printed.toSysOut().withLabel(label).withKeyValueMapper(...)}
      */
     @Deprecated
     void print(final Serde<K> keySerde,
@@ -342,7 +343,6 @@ public interface KStream<K, V> {
      * The example below shows the way to customize output data.
      * <pre>{@code
      * final KeyValueMapper<Integer, String, String> mapper = new KeyValueMapper<Integer, String, String>() {
-     *     @Override
      *     public String apply(Integer key, String value) {
      *         return String.format("(%d, %s)", key, value);
      *     }
@@ -352,7 +352,7 @@ public interface KStream<K, V> {
      * The KeyValueMapper's mapped value type must be {@code String}.
      *
      * @param mapper a {@link KeyValueMapper} that computes output type {@code String}.
-     * @deprecated use {@code print(Printed)}
+     * @deprecated use {@link #print(Printed) print(Printed.toSysOut().withKeyValueMapper(mapper)}
      */
     @Deprecated
     void print(final KeyValueMapper<? super K, ? super V, String> mapper);
@@ -366,7 +366,6 @@ public interface KStream<K, V> {
      * The example below shows the way to customize output data.
      * <pre>{@code
      * final KeyValueMapper<Integer, String, String> mapper = new KeyValueMapper<Integer, String, String>() {
-     *     @Override
      *     public String apply(Integer key, String value) {
      *         return String.format("(%d, %s)", key, value);
      *     }
@@ -377,7 +376,7 @@ public interface KStream<K, V> {
      *
      * @param mapper a {@link KeyValueMapper} that computes output type {@code String}.
      * @param label The given name which labels output will be printed.
-     * @deprecated use {@code print(Printed)}
+     * @deprecated use {@link #print(Printed) print(Printed.toSysOut().withLabel(label).withKeyValueMapper(mapper)}
      */
     @Deprecated
     void print(final KeyValueMapper<? super K, ? super V, String> mapper, final String label);
@@ -391,7 +390,6 @@ public interface KStream<K, V> {
      * The example below shows the way to customize output data.
      * <pre>{@code
      * final KeyValueMapper<Integer, String, String> mapper = new KeyValueMapper<Integer, String, String>() {
-     *     @Override
      *     public String apply(Integer key, String value) {
      *         return String.format("(%d, %s)", key, value);
      *     }
@@ -404,9 +402,9 @@ public interface KStream<K, V> {
      * {@link Integer} etc. to get meaningful information.
      *
      * @param mapper a {@link KeyValueMapper} that computes output type {@code String}.
-     * @param keySerde a {@link Serde<K>} used to deserialize key if type is {@code byte[]}.
-     * @param valSerde a {@link Serde<V>} used to deserialize value if type is {@code byte[]}.
-     * @deprecated use {@code print(Printed)}
+     * @param keySerde a {@link Serde} used to deserialize key if type is {@code byte[]}.
+     * @param valSerde a {@link Serde} used to deserialize value if type is {@code byte[]}.
+     * @deprecated use {@link #print(Printed) print(Printed.toSysOut().withKeyValueMapper(mapper)}
      */
     @Deprecated
     void print(final KeyValueMapper<? super K, ? super V, String> mapper, final Serde<K> keySerde, final Serde<V> valSerde);
@@ -420,7 +418,6 @@ public interface KStream<K, V> {
      * The example below shows the way to customize output data.
      * <pre>{@code
      * final KeyValueMapper<Integer, String, String> mapper = new KeyValueMapper<Integer, String, String>() {
-     *     @Override
      *     public String apply(Integer key, String value) {
      *         return String.format("(%d, %s)", key, value);
      *     }
@@ -433,10 +430,10 @@ public interface KStream<K, V> {
      * {@link Integer} etc. to get meaningful information.
      *
      * @param mapper a {@link KeyValueMapper} that computes output type {@code String}.
-     * @param keySerde a {@link Serde<K>} used to deserialize key if type is {@code byte[]}.
-     * @param valSerde a {@link Serde<V>} used to deserialize value if type is {@code byte[]}.
+     * @param keySerde a {@link Serde} used to deserialize key if type is {@code byte[]}.
+     * @param valSerde a {@link Serde} used to deserialize value if type is {@code byte[]}.
      * @param label The given name which labels output will be printed.
-     * @deprecated use {@code print(Printed)}
+     * @deprecated use {@link #print(Printed) print(Printed.toSysOut().withLabel(label).withKeyValueMapper(mapper)}
      */
     @Deprecated
     void print(final KeyValueMapper<? super K, ? super V, String> mapper, final Serde<K> keySerde, final Serde<V> valSerde, final String label);
@@ -455,7 +452,7 @@ public interface KStream<K, V> {
      * Relative order is preserved within each input stream though (ie, records within one input
      * stream are processed in order).
      *
-     * @param a stream which is to be merged into this stream
+     * @param stream a stream which is to be merged into this stream
      * @return a merged stream containing all records from this and the provided {@code KStream}
      */
     KStream<K, V> merge(final KStream<K, V> stream);
@@ -472,7 +469,7 @@ public interface KStream<K, V> {
      * {@link Integer} etc. to get meaningful information.
      *
      * @param filePath name of the file to write to
-     * @deprecated use {@code print(Printed)}
+     * @deprecated use {@link #print(Printed) print(Printed.toFile(filePath)}
      */
     @Deprecated
     void writeAsText(final String filePath);
@@ -489,7 +486,7 @@ public interface KStream<K, V> {
      *
      * @param filePath   name of the file to write to
      * @param label the name used to label the key/value pairs written to the file
-     * @deprecated use {@code print(Printed)}
+     * @deprecated use {@link #print(Printed) print(Printed.toFile(filePath).withLabel(label)}
      */
     @Deprecated
     void writeAsText(final String filePath,
@@ -509,7 +506,7 @@ public interface KStream<K, V> {
      * @param filePath name of the file to write to
      * @param keySerde key serde used to deserialize key if type is {@code byte[]},
      * @param valSerde value serde used to deserialize value if type is {@code byte[]},
-     * @deprecated use {@code print(Printed)}
+     * @deprecated use {@link #print(Printed) print(Printed.toFile(filePath).withKeyValueMapper(...)}
      */
     @Deprecated
     void writeAsText(final String filePath,
@@ -530,7 +527,7 @@ public interface KStream<K, V> {
      * @param label the name used to label the key/value pairs written to the file
      * @param keySerde   key serde used to deserialize key if type is {@code byte[]},
      * @param valSerde   value serde used deserialize value if type is {@code byte[]},
-     * @deprecated use {@code print(Printed)}
+     * @deprecated use {@link #print(Printed) print(Printed.toFile(filePath).withLabel(label).withKeyValueMapper(...)}
      */
     @Deprecated
     void writeAsText(final String filePath,
@@ -549,7 +546,6 @@ public interface KStream<K, V> {
      * The example below shows the way to customize output data.
      * <pre>{@code
      * final KeyValueMapper<Integer, String, String> mapper = new KeyValueMapper<Integer, String, String>() {
-     *     @Override
      *     public String apply(Integer key, String value) {
      *         return String.format("(%d, %s)", key, value);
      *     }
@@ -560,7 +556,7 @@ public interface KStream<K, V> {
      *
      * @param filePath path of the file to write to.
      * @param mapper a {@link KeyValueMapper} that computes output type {@code String}.
-     * @deprecated use {@code print(Printed)}
+     * @deprecated use {@link #print(Printed) print(Printed.toFile(filePath).withKeyValueMapper(mapper)}
      */
     @Deprecated
     void writeAsText(final String filePath, final KeyValueMapper<? super K, ? super V, String> mapper);
@@ -576,7 +572,6 @@ public interface KStream<K, V> {
      * The example below shows the way to customize output data.
      * <pre>{@code
      * final KeyValueMapper<Integer, String, String> mapper = new KeyValueMapper<Integer, String, String>() {
-     *     @Override
      *     public String apply(Integer key, String value) {
      *         return String.format("(%d, %s)", key, value);
      *     }
@@ -588,7 +583,7 @@ public interface KStream<K, V> {
      * @param filePath path of the file to write to.
      * @param label the name used to label records written to file.
      * @param mapper a {@link KeyValueMapper} that computes output type {@code String}.
-     * @deprecated use {@code print(Printed)}
+     * @deprecated use {@link #print(Printed) print(Printed.toFile(filePath).withLabel(label).withKeyValueMapper(mapper)}
      */
     @Deprecated
     void writeAsText(final String filePath, final String label, final KeyValueMapper<? super K, ? super V, String> mapper);
@@ -604,7 +599,6 @@ public interface KStream<K, V> {
      * The example below shows the way to customize output data.
      * <pre>{@code
      * final KeyValueMapper<Integer, String, String> mapper = new KeyValueMapper<Integer, String, String>() {
-     *     @Override
      *     public String apply(Integer key, String value) {
      *         return String.format("(%d, %s)", key, value);
      *     }
@@ -620,7 +614,7 @@ public interface KStream<K, V> {
      * @param keySerde key serde used to deserialize key if type is {@code byte[]}.
      * @param valSerde value serde used to deserialize value if type is {@code byte[]}.
      * @param mapper a {@link KeyValueMapper} that computes output type {@code String}.
-     * @deprecated use {@code print(Printed)}
+     * @deprecated use {@link #print(Printed) print(Printed.toFile(filePath).withKeyValueMapper(mapper)}
      */
     @Deprecated
     void writeAsText(final String filePath, final Serde<K> keySerde, final Serde<V> valSerde, final KeyValueMapper<? super K, ? super V, String> mapper);
@@ -636,7 +630,6 @@ public interface KStream<K, V> {
      * The example below shows the way to customize output data.
      * <pre>{@code
      * final KeyValueMapper<Integer, String, String> mapper = new KeyValueMapper<Integer, String, String>() {
-     *     @Override
      *     public String apply(Integer key, String value) {
      *         return String.format("(%d, %s)", key, value);
      *     }
@@ -653,8 +646,9 @@ public interface KStream<K, V> {
      * @param keySerde key serde used to deserialize key if type is {@code byte[]}.
      * @param valSerde value serde used to deserialize value if type is {@code byte[]}.
      * @param mapper a {@link KeyValueMapper} that computes output type {@code String}.
-     * @deprecated use {@code print(Printed)}
+     * @deprecated use {@link #print(Printed) print(Printed.toFile(filePath).withLabel(label).withKeyValueMapper(mapper)}
      */
+    @Deprecated
     void writeAsText(final String filePath, final String label, final Serde<K> keySerde, final Serde<V> valSerde, final KeyValueMapper<? super K, ? super V, String> mapper);
 
     /**
@@ -704,8 +698,7 @@ public interface KStream<K, V> {
      * started).
      * <p>
      * This is equivalent to calling {@link #to(String) #to(someTopicName)} and
-     * {@link StreamsBuilder#stream(String)
-     * StreamsBuilder#stream(someTopicName)}.
+     * {@link StreamsBuilder#stream(String) StreamsBuilder#stream(someTopicName)}.
      *
      * @param topic the topic name
      * @return a {@code KStream} that contains the exact same (and potentially repartitioned) records as this {@code KStream}
@@ -725,7 +718,7 @@ public interface KStream<K, V> {
      *                    if not specified producer's {@link DefaultPartitioner} will be used
      * @param topic       the topic name
      * @return a {@code KStream} that contains the exact same (and potentially repartitioned) records as this {@code KStream}
-     * @deprecated use {@code through(String, Produced)}
+     * @deprecated use {@link #through(String, Produced) through(topic, Produced.withStreamPartitioner(partitioner))}
      */
     @Deprecated
     KStream<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner,
@@ -740,7 +733,7 @@ public interface KStream<K, V> {
      * used&mdash;otherwise producer's {@link DefaultPartitioner} is used.
      * <p>
      * This is equivalent to calling {@link #to(Serde, Serde, String) #to(keySerde, valSerde, someTopicName)} and
-     * {@link StreamsBuilder#stream(Serde, Serde, String...) StreamsBuilder#stream(keySerde, valSerde, someTopicName)}.
+     * {@link KStreamBuilder#stream(Serde, Serde, String...) KStreamBuilder#stream(keySerde, valSerde, someTopicName)}.
      *
      * @param keySerde key serde used to send key-value pairs,
      *                 if not specified the default key serde defined in the configuration will be used
@@ -748,7 +741,7 @@ public interface KStream<K, V> {
      *                 if not specified the default value serde defined in the configuration will be used
      * @param topic    the topic name
      * @return a {@code KStream} that contains the exact same (and potentially repartitioned) records as this {@code KStream}
-     * @deprecated use {@code through(String, Produced)}
+     * @deprecated use {@link #through(String, Produced) through(topic, Produced.with(keySerde, valSerde))}
      */
     @Deprecated
     KStream<K, V> through(final Serde<K> keySerde,
@@ -762,8 +755,8 @@ public interface KStream<K, V> {
      * started).
      * <p>
      * This is equivalent to calling {@link #to(Serde, Serde, StreamPartitioner, String) #to(keySerde, valSerde,
-     * StreamPartitioner, someTopicName)} and {@link StreamsBuilder#stream(Serde, Serde, String...)
-     * StreamsBuilder#stream(keySerde, valSerde, someTopicName)}.
+     * StreamPartitioner, someTopicName)} and {@link KStreamBuilder#stream(Serde, Serde, String...)
+     * KStreamBuilder#stream(keySerde, valSerde, someTopicName)}.
      *
      * @param keySerde    key serde used to send key-value pairs,
      *                    if not specified the default key serde defined in the configuration will be used
@@ -775,7 +768,7 @@ public interface KStream<K, V> {
      *                    be used
      * @param topic       the topic name
      * @return a {@code KStream} that contains the exact same (and potentially repartitioned) records as this {@code KStream}
-     * @deprecated use {@code through(String, Produced)}
+     * @deprecated use {@link #through(String, Produced) through(topic, Produced.with(keySerde, valSerde, partitioner))}
      */
     @Deprecated
     KStream<K, V> through(final Serde<K> keySerde,
@@ -791,8 +784,7 @@ public interface KStream<K, V> {
      * started).
      * <p>
      * This is equivalent to calling {@link #to(String, Produced) to(someTopic, Produced.with(keySerde, valueSerde)}
-     * and {@link StreamsBuilder#stream(Serde, Serde, String...)
-     * StreamsBuilder#stream(keySerde, valSerde, someTopicName)}.
+     * and {@link StreamsBuilder#stream(String, Consumed) StreamsBuilder#stream(someTopicName, Consumed.with(keySerde, valueSerde))}.
      *
      * @param topic
      * @param produced
@@ -820,7 +812,7 @@ public interface KStream<K, V> {
      * @param partitioner the function used to determine how records are distributed among partitions of the topic,
      *                    if not specified producer's {@link DefaultPartitioner} will be used
      * @param topic       the topic name
-     * @deprecated use {@code to(String, Produced}
+     * @deprecated use {@link #to(String, Produced) to(topic, Produced.withStreamPartitioner(partitioner))}
      */
     @Deprecated
     void to(final StreamPartitioner<? super K, ? super V> partitioner,
@@ -838,7 +830,7 @@ public interface KStream<K, V> {
      * @param valSerde value serde used to send key-value pairs,
      *                 if not specified the default serde defined in the configs will be used
      * @param topic    the topic name
-     * @deprecated use {@code to(String, Produced}
+     * @deprecated use {@link #to(String, Produced) to(topic, Produced.with(keySerde, valSerde))}
      */
     @Deprecated
     void to(final Serde<K> keySerde,
@@ -860,7 +852,7 @@ public interface KStream<K, V> {
      *                    {@link WindowedStreamPartitioner} will be used&mdash;otherwise {@link DefaultPartitioner} will
      *                    be used
      * @param topic       the topic name
-     * @deprecated use {@code to(String, Produced}
+     * @deprecated use {@link #to(String, Produced) to(topic, Produced.with(keySerde, valSerde, partitioner)}
      */
     @Deprecated
     void to(final Serde<K> keySerde,
@@ -1159,7 +1151,7 @@ public interface KStream<K, V> {
      * @param valSerde value serdes for materializing this stream,
      *                 if not specified the default serdes defined in the configs will be used
      * @return a {@link KGroupedStream} that contains the grouped records of the original {@code KStream}
-     * @deprecated use {@code groupByKey(Serialized)}
+     * @deprecated use {@link #groupByKey(Serialized) groupByKey(Serialized.with(keySerde, valSerde))}
      */
     @Deprecated
     KGroupedStream<K, V> groupByKey(final Serde<K> keySerde,
@@ -1244,7 +1236,7 @@ public interface KStream<K, V> {
      * @param <KR>     the key type of the result {@link KGroupedStream}
      * @return a {@link KGroupedStream} that contains the grouped records of the original {@code KStream}
      * @see #groupByKey()
-     * @deprecated use {@code groupBy(KeyValueMapper, Serialized}
+     * @deprecated use {@link #groupBy(KeyValueMapper, Serialized) groupBy(selector, Serialized.with(keySerde, valSerde))}
      */
     @Deprecated
     <KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<? super K, ? super V, KR> selector,
@@ -1480,7 +1472,7 @@ public interface KStream<K, V> {
      * {@link ValueJoiner}, one for each matched record-pair with the same key and within the joining window intervals
      * @see #leftJoin(KStream, ValueJoiner, JoinWindows, Joined)
      * @see #outerJoin(KStream, ValueJoiner, JoinWindows, Joined)
-     * @deprecated use {@link #join(KStream, ValueJoiner, JoinWindows, Joined)}
+     * @deprecated use {@link #join(KStream, ValueJoiner, JoinWindows, Joined) join(otherStream, joiner, windows, Joined.with(keySerde, thisValueSerde, otherValueSerde))}
      */
     @Deprecated
     <VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream,
@@ -1734,7 +1726,7 @@ public interface KStream<K, V> {
      * this {@code KStream} and within the joining window intervals
      * @see #join(KStream, ValueJoiner, JoinWindows, Joined)
      * @see #outerJoin(KStream, ValueJoiner, JoinWindows, Joined)
-     * @deprecated use {@link #leftJoin(KStream, ValueJoiner, JoinWindows, Joined}
+     * @deprecated use {@link #leftJoin(KStream, ValueJoiner, JoinWindows, Joined) leftJoin(otherStream, joiner, windows, Joined.with(keySerde, thisValSerde, otherValueSerde))}
      */
     @Deprecated
     <VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream,
@@ -1988,7 +1980,7 @@ public interface KStream<K, V> {
      * both {@code KStream}s and within the joining window intervals
      * @see #join(KStream, ValueJoiner, JoinWindows, Joined)
      * @see #leftJoin(KStream, ValueJoiner, JoinWindows, Joined)
-     * @deprecated use {@link #outerJoin(KStream, ValueJoiner, JoinWindows, Joined)}
+     * @deprecated use {@link #outerJoin(KStream, ValueJoiner, JoinWindows, Joined) outerJoin(otherStream, joiner, windows, Joined.with(keySerde, thisValueSerde, otherValueSerde))}
      */
     @Deprecated
     <VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> otherStream,
@@ -2222,7 +2214,7 @@ public interface KStream<K, V> {
      * {@link ValueJoiner}, one for each matched record-pair with the same key
      * @see #leftJoin(KTable, ValueJoiner, Joined)
      * @see #join(GlobalKTable, KeyValueMapper, ValueJoiner)
-     * @deprecated use {@link #join(KTable, ValueJoiner, Joined)}
+     * @deprecated use {@link #join(KTable, ValueJoiner, Joined) join(table, joiner, Joined.with(keySerde, valSerde, null))}
      */
     @Deprecated
     <VT, VR> KStream<K, VR> join(final KTable<K, VT> table,
@@ -2461,7 +2453,9 @@ public interface KStream<K, V> {
      * {@link ValueJoiner}, one output for each input {@code KStream} record
      * @see #join(KTable, ValueJoiner, Serde, Serde)
      * @see #leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner)
+     * @deprecated use {@link #leftJoin(KTable, ValueJoiner, Joined) leftJoin(table, joiner, Joined.with(keySerde, valSerde, null))}
      */
+    @Deprecated
     <VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table,
                                      final ValueJoiner<? super V, ? super VT, ? extends VR> joiner,
                                      final Serde<K> keySerde,

http://git-wip-us.apache.org/repos/asf/kafka/blob/3dcbbf70/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
index ab666ba..d4642da 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
@@ -479,10 +479,11 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB
      * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
      * query the value of the key on a parallel running instance of your Kafka Streams application.
      *
-     * @param offsetReset       the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
-     *                          offsets are available
-     * @param topic             the topic name; cannot be {@code null}
-     * @param queryableStoreName the state store name; If {@code null} this is the equivalent of {@link KStreamBuilder#table(AutoOffsetReset, String)} ()}.
+     * @param offsetReset        the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
+     *                           offsets are available
+     * @param topic              the topic name; cannot be {@code null}
+     * @param queryableStoreName the state store name; If {@code null} this is the equivalent of
+     * {@link #table(org.apache.kafka.streams.processor.TopologyBuilder.AutoOffsetReset, String) table(AutoOffsetReset, String)}.
      * @return a {@link KTable} for the specified topic
      */
     public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset,
@@ -791,7 +792,8 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB
      * @param valSerde           value serde used to send key-value pairs,
      *                           if not specified the default value serde defined in the configuration will be used
      * @param topic              the topic name; cannot be {@code null}
-     * @param queryableStoreName the state store name; If {@code null} this is the equivalent of {@link KStreamBuilder#table(AutoOffsetReset, Serde, Serde, String)} ()} ()}.
+     * @param queryableStoreName the state store name; If {@code null} this is the equivalent of
+     * {@link #table(org.apache.kafka.streams.processor.TopologyBuilder.AutoOffsetReset, Serde, Serde, String) table(AutoOffsetReset, Serde, Serde, String)}
      * @return a {@link KTable} for the specified topic
      */
     public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset,
@@ -876,7 +878,8 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB
      * @param valSerde           value serde used to send key-value pairs,
      *                           if not specified the default value serde defined in the configuration will be used
      * @param topic              the topic name; cannot be {@code null}
-     * @param queryableStoreName the state store name; If {@code null} this is the equivalent of {@link KStreamBuilder#table(AutoOffsetReset, Serde, Serde, String)} ()} ()}.
+     * @param queryableStoreName the state store name; If {@code null} this is the equivalent of
+     * {@link #table(org.apache.kafka.streams.processor.TopologyBuilder.AutoOffsetReset, Serde, Serde, String) table(AutoOffsetReset, Serde, Serde, String)}
      * @return a {@link KTable} for the specified topic
      */
     public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset,


Mime
View raw message