kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mj...@apache.org
Subject [kafka] branch trunk updated: KAFKA-9290: Update IQ related JavaDocs (#8114)
Date Fri, 08 May 2020 06:14:49 GMT
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 133c2ed  KAFKA-9290: Update IQ related JavaDocs (#8114)
133c2ed is described below

commit 133c2ed58ad3c70df75882728213a86013e17e95
Author: high.lee <yello1109@daum.net>
AuthorDate: Fri May 8 15:14:17 2020 +0900

    KAFKA-9290: Update IQ related JavaDocs (#8114)
    
    Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Matthias J. Sax <matthias@confluent.io>
---
 .../org/apache/kafka/streams/StreamsBuilder.java   | 24 +++++-----
 .../kafka/streams/kstream/CogroupedKStream.java    | 34 +++++++-------
 .../apache/kafka/streams/kstream/GlobalKTable.java |  2 +-
 .../kafka/streams/kstream/KGroupedStream.java      | 50 ++++++++++-----------
 .../kafka/streams/kstream/KGroupedTable.java       | 48 ++++++++++----------
 .../org/apache/kafka/streams/kstream/KTable.java   | 26 +++++------
 .../kstream/TimeWindowedCogroupedKStream.java      | 16 +++----
 .../kafka/streams/kstream/TimeWindowedKStream.java | 52 +++++++++++-----------
 8 files changed, 126 insertions(+), 126 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index 997fe64..f3b08da 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -208,13 +208,13 @@ public class StreamsBuilder {
      * streamBuilder.table(topic, Consumed.with(Serde.String(), Serde.String()), Materialized.<String, String, KeyValueStore<Bytes, byte[]>as(storeName))
      * }
      * </pre>
-     * To query the local {@link KeyValueStore} it must be obtained via
+     * To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
      * {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
      * <pre>{@code
      * KafkaStreams streams = ...
-     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
-     * String key = "some-key";
-     * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<V>>timestampedKeyValueStore());
+     * K key = "some-key";
+     * ValueAndTimestamp<V> valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
      * }</pre>
      * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
      * query the value of the key on a parallel running instance of your Kafka Streams application.
@@ -393,13 +393,13 @@ public class StreamsBuilder {
      * streamBuilder.globalTable(topic, Consumed.with(Serde.String(), Serde.String()), Materialized.<String, String, KeyValueStore<Bytes, byte[]>as(storeName))
      * }
      * </pre>
-     * To query the local {@link KeyValueStore} it must be obtained via
+     * To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
      * {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
      * <pre>{@code
      * KafkaStreams streams = ...
-     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
-     * String key = "some-key";
-     * Long valueForKey = localStore.get(key);
+     * ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<V>>timestampedKeyValueStore());
+     * K key = "some-key";
+     * ValueAndTimestamp<V> valueForKey = localStore.get(key);
      * }</pre>
      * Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"}
      * regardless of the specified value in {@link StreamsConfig} or {@link Consumed}.
@@ -435,13 +435,13 @@ public class StreamsBuilder {
      * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
      * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
      * <p>
-     * To query the local {@link KeyValueStore} it must be obtained via
+     * To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
      * {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
      * <pre>{@code
      * KafkaStreams streams = ...
-     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
-     * String key = "some-key";
-     * Long valueForKey = localStore.get(key);
+     * ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<V>>timestampedKeyValueStore());
+     * K key = "some-key";
+     * ValueAndTimestamp<V> valueForKey = localStore.get(key);
      * }</pre>
      * Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"}
      * regardless of the specified value in {@link StreamsConfig}.
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java
index 53dd318..f9b6fb4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java
@@ -79,14 +79,14 @@ public interface CogroupedKStream<K, VOut> {
      * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}.
      * <p>
-     * To query the local {@link KeyValueStore} it must be obtained via
+     * To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
      * {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
      * <pre>{@code
      * KafkaStreams streams = ... // some aggregation on value type double
      * String queryableStoreName = "storeName" // the store name should be the name of the store as defined by the Materialized instance
-     * KeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
-     * String key = "some-key";
-     * Long aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * ReadOnlyKeyValueStore<K, ValueAndTimestamp<VOut>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<VOut>> timestampedKeyValueStore());
+     * K key = "some-key";
+     * ValueAndTimestamp<VOut> aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
      * }</pre>
      * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to query
      * the value of the key on a parallel running instance of your Kafka Streams application.
@@ -128,14 +128,14 @@ public interface CogroupedKStream<K, VOut> {
      * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}.
      * <p>
-     * To query the local {@link KeyValueStore} it must be obtained via
+     * To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
      * {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
      * <pre>{@code
      * KafkaStreams streams = ... // some aggregation on value type double
      * String queryableStoreName = "storeName" // the store name should be the name of the store as defined by the Materialized instance
-     * KeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
-     * String key = "some-key";
-     * Long aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * ReadOnlyKeyValueStore<K, ValueAndTimestamp<VOut>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<VOut>> timestampedKeyValueStore());
+     * K key = "some-key";
+     * ValueAndTimestamp<VOut> aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
      * }</pre>
      * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to query
      * the value of the key on a parallel running instance of your Kafka Streams application.
@@ -178,14 +178,14 @@ public interface CogroupedKStream<K, VOut> {
      * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}.
      * <p>
-     * To query the local {@link KeyValueStore} it must be obtained via
+     * To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
      * {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
      * <pre>
      * KafkaStreams streams = ... // some aggregation on value type double
      * String queryableStoreName = "storeName" // the store name should be the name of the store as defined by the Materialized instance
-     * KeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
-     * String key = "some-key";
-     * Long aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * ReadOnlyKeyValueStore<K, ValueAndTimestamp<VOut>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<VOut>> timestampedKeyValueStore());
+     * K key = "some-key";
+     * ValueAndTimestamp<VOut> aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
      * }</pre>
      * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to query
      * the value of the key on a parallel running instance of your Kafka Streams application.
@@ -230,14 +230,14 @@ public interface CogroupedKStream<K, VOut> {
      * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}.
      * <p>
-     * To query the local {@link KeyValueStore} it must be obtained via
-     * {@link KafkaStreams#store(StoreQueryParameters)} KafkaStreams#store(...)}:
+     * To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
+     * {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
      * <pre>
      * KafkaStreams streams = ... // some aggregation on value type double
      * String queryableStoreName = "storeName" // the store name should be the name of the store as defined by the Materialized instance
-     * KeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
-     * String key = "some-key";
-     * Long aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * ReadOnlyKeyValueStore<K, ValueAndTimestamp<VOut>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<VOut>> timestampedKeyValueStore());
+     * K key = "some-key";
+     * ValueAndTimestamp<VOut> aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
      * }</pre>
      * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to query
      * the value of the key on a parallel running instance of your Kafka Streams application.
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java
index ba64abe..73efbc3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java
@@ -49,7 +49,7 @@ import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
  * final KafkaStreams streams = ...;
  * streams.start()
  * ...
- * ReadOnlyKeyValueStore view = streams.store("g1-store", QueryableStoreTypes.keyValueStore());
+ * ReadOnlyKeyValueStore view = streams.store("g1-store", QueryableStoreTypes.timestampedKeyValueStore());
  * view.get(key); // can be done on any key, as all keys are present
  *}</pre>
  * Note that in contrast to {@link KTable} a {@code GlobalKTable}'s state holds a full copy of the underlying topic,
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
index bfcbacb..1fcfac9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
@@ -111,14 +111,14 @@ public interface KGroupedStream<K, V> {
      * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}.
      * <p>
-     * To query the local {@link KeyValueStore} it must be obtained via
+     * To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
      * {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}.
      * <pre>{@code
      * KafkaStreams streams = ... // counting words
      * String queryableStoreName = "storeName"; // the store name should be the name of the store as defined by the Materialized instance
-     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
-     * String key = "some-word";
-     * Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * ReadOnlyKeyValueStore<K, ValueAndTimestamp<Long>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<Long>>timestampedKeyValueStore());
+     * K key = "some-word";
+     * ValueAndTimestamp<Long> countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
      * }</pre>
      * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
      * query the value of the key on a parallel running instance of your Kafka Streams application.
@@ -156,14 +156,14 @@ public interface KGroupedStream<K, V> {
      * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}.
      * <p>
-     * To query the local {@link KeyValueStore} it must be obtained via
+     * To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
      * {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}.
      * <pre>{@code
      * KafkaStreams streams = ... // counting words
      * String queryableStoreName = "storeName"; // the store name should be the name of the store as defined by the Materialized instance
-     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
-     * String key = "some-word";
-     * Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * ReadOnlyKeyValueStore<K, ValueAndTimestamp<Long>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<Long>>timestampedKeyValueStore());
+     * K key = "some-word";
+     * ValueAndTimestamp<Long> countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
      * }</pre>
      * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
      * query the value of the key on a parallel running instance of your Kafka Streams application.
@@ -258,14 +258,14 @@ public interface KGroupedStream<K, V> {
      * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}.
      * <p>
-     * To query the local {@link KeyValueStore} it must be obtained via
+     * To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
      * {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}.
      * <pre>{@code
      * KafkaStreams streams = ... // compute sum
      * String queryableStoreName = "storeName" // the store name should be the name of the store as defined by the Materialized instance
-     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
-     * String key = "some-key";
-     * Long sumForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<V>>timestampedKeyValueStore());
+     * K key = "some-key";
+     * ValueAndTimestamp<V> reduceForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
      * }</pre>
      * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
      * query the value of the key on a parallel running instance of your Kafka Streams application.
@@ -321,14 +321,14 @@ public interface KGroupedStream<K, V> {
      * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}.
      * <p>
-     * To query the local {@link KeyValueStore} it must be obtained via
+     * To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
      * {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}.
      * <pre>{@code
      * KafkaStreams streams = ... // compute sum
      * String queryableStoreName = "storeName" // the store name should be the name of the store as defined by the Materialized instance
-     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
-     * String key = "some-key";
-     * Long sumForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<V>>timestampedKeyValueStore());
+     * K key = "some-key";
+     * ValueAndTimestamp<V> reduceForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
      * }</pre>
      * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
      * query the value of the key on a parallel running instance of your Kafka Streams application.
@@ -424,14 +424,14 @@ public interface KGroupedStream<K, V> {
      * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}.
      * <p>
-     * To query the local {@link KeyValueStore} it must be obtained via
+     * To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
      * {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
      * <pre>{@code
      * KafkaStreams streams = ... // some aggregation on value type double
      * String queryableStoreName = "storeName" // the store name should be the name of the store as defined by the Materialized instance
-     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
-     * String key = "some-key";
-     * Long aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * ReadOnlyKeyValueStore<K, ValueAndTimestamp<VR>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<VR>>timestampedKeyValueStore());
+     * K key = "some-key";
+     * ValueAndTimestamp<VR> aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
      * }</pre>
      * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
      * query the value of the key on a parallel running instance of your Kafka Streams application.
@@ -482,14 +482,14 @@ public interface KGroupedStream<K, V> {
      * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}.
      * <p>
-     * To query the local {@link KeyValueStore} it must be obtained via
-     * {@link KafkaStreams#store(StoreQueryParameters)}  KafkaStreams#store(...)}:
+     * To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
+     * {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
      * <pre>{@code
      * KafkaStreams streams = ... // some aggregation on value type double
      * String queryableStoreName = "storeName" // the store name should be the name of the store as defined by the Materialized instance
-     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
-     * String key = "some-key";
-     * Long aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * ReadOnlyKeyValueStore<K, ValueAndTimestamp<VR>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<VR>>timestampedKeyValueStore());
+     * K key = "some-key";
+     * ValueAndTimestamp<VR> aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
      * }</pre>
      * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
      * query the value of the key on a parallel running instance of your Kafka Streams application.
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
index 628f458..23cd676 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
@@ -54,13 +54,13 @@ public interface KGroupedTable<K, V> {
      * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}.
      * <p>
-     * To query the local {@link KeyValueStore} it must be obtained via
+     * To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
      * {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
      * <pre>{@code
      * KafkaStreams streams = ... // counting words
-     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
-     * String key = "some-word";
-     * Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * ReadOnlyKeyValueStore<K, ValueAndTimestamp<Long>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<Long>> timestampedKeyValueStore());
+     * K key = "some-word";
+     * ValueAndTimestamp<Long> countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
      * }</pre>
      * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
      * query the value of the key on a parallel running instance of your Kafka Streams application.
@@ -97,13 +97,13 @@ public interface KGroupedTable<K, V> {
      * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}.
      * <p>
-     * To query the local {@link KeyValueStore} it must be obtained via
+     * To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
      * {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
      * <pre>{@code
      * KafkaStreams streams = ... // counting words
-     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
-     * String key = "some-word";
-     * Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * ReadOnlyKeyValueStore<K, ValueAndTimestamp<Long>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<Long>> timestampedKeyValueStore());
+     * K key = "some-word";
+     * ValueAndTimestamp<Long> countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
      * }</pre>
      * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
      * query the value of the key on a parallel running instance of your Kafka Streams application.
@@ -225,13 +225,13 @@ public interface KGroupedTable<K, V> {
      * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}.
      * <p>
-     * To query the local {@link KeyValueStore} it must be obtained via
+     * To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
      * {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
      * <pre>{@code
      * KafkaStreams streams = ... // counting words
-     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
-     * String key = "some-word";
-     * Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<V>> timestampedKeyValueStore());
+     * K key = "some-word";
+     * ValueAndTimestamp<V> reduceForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
      * }</pre>
      * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
      * query the value of the key on a parallel running instance of your Kafka Streams application.
@@ -298,13 +298,13 @@ public interface KGroupedTable<K, V> {
      * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}.
      * <p>
-     * To query the local {@link KeyValueStore} it must be obtained via
+     * To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
      * {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
      * <pre>{@code
      * KafkaStreams streams = ... // counting words
-     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
-     * String key = "some-word";
-     * Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<V>> timestampedKeyValueStore());
+     * K key = "some-word";
+     * ValueAndTimestamp<V> reduceForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
      * }</pre>
      * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
      * query the value of the key on a parallel running instance of your Kafka Streams application.
@@ -436,13 +436,13 @@ public interface KGroupedTable<K, V> {
      * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}.
      * <p>
-     * To query the local {@link KeyValueStore} it must be obtained via
+     * To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
      * {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
      * <pre>{@code
      * KafkaStreams streams = ... // counting words
-     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
-     * String key = "some-word";
-     * Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * ReadOnlyKeyValueStore<K, ValueAndTimestamp<VR>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<VR>> timestampedKeyValueStore());
+     * K key = "some-word";
+     * ValueAndTimestamp<VR> aggregateForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
      * }</pre>
      * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
      * query the value of the key on a parallel running instance of your Kafka Streams application.
@@ -520,13 +520,13 @@ public interface KGroupedTable<K, V> {
      * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}.
      * <p>
-     * To query the local {@link KeyValueStore} it must be obtained via
+     * To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
      * {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
      * <pre>{@code
      * KafkaStreams streams = ... // counting words
-     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
-     * String key = "some-word";
-     * Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * ReadOnlyKeyValueStore<K, ValueAndTimestamp<VR>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<VR>> timestampedKeyValueStore());
+     * K key = "some-word";
+     * ValueAndTimestamp<VR> aggregateForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
      * }</pre>
      * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
      * query the value of the key on a parallel running instance of your Kafka Streams application.
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 1733c20..b7f073b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -53,7 +53,7 @@ import java.util.function.Function;
  *     streams.start()
  *     ...
  *     final String queryableStoreName = table.queryableStoreName(); // returns null if KTable is not queryable
- *     ReadOnlyKeyValueStore view = streams.store(queryableStoreName, QueryableStoreTypes.keyValueStore());
+ *     ReadOnlyKeyValueStore view = streams.store(queryableStoreName, QueryableStoreTypes.timestampedKeyValueStore());
  *     view.get(key);
  *}</pre>
  *<p>
@@ -130,13 +130,13 @@ public interface KTable<K, V> {
      * Furthermore, for each record that gets dropped (i.e., does not satisfy the given predicate) a tombstone record
      * is forwarded.
      * <p>
-     * To query the local {@link KeyValueStore} it must be obtained via
+     * To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
      * {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
      * <pre>{@code
      * KafkaStreams streams = ... // filtering words
-     * ReadOnlyKeyValueStore<K,V> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, V>keyValueStore());
+     * ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<V>>timestampedKeyValueStore());
      * K key = "some-word";
-     * V valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * ValueAndTimestamp<V> valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
      * }</pre>
      * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
      * query the value of the key on a parallel running instance of your Kafka Streams application.
@@ -169,13 +169,13 @@ public interface KTable<K, V> {
      * Furthermore, for each record that gets dropped (i.e., does not satisfy the given predicate) a tombstone record
      * is forwarded.
      * <p>
-     * To query the local {@link KeyValueStore} it must be obtained via
+     * To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
      * {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
      * <pre>{@code
      * KafkaStreams streams = ... // filtering words
-     * ReadOnlyKeyValueStore<K,V> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, V>keyValueStore());
+     * ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<V>>timestampedKeyValueStore());
      * K key = "some-word";
-     * V valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * ValueAndTimestamp<V> valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
      * }</pre>
      * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
      * query the value of the key on a parallel running instance of your Kafka Streams application.
@@ -255,13 +255,13 @@ public interface KTable<K, V> {
      * Furthermore, for each record that gets dropped (i.e., does satisfy the given predicate) a tombstone record is
      * forwarded.
      * <p>
-     * To query the local {@link KeyValueStore} it must be obtained via
+     * To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
      * {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
      * <pre>{@code
      * KafkaStreams streams = ... // filtering words
-     * ReadOnlyKeyValueStore<K,V> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, V>keyValueStore());
+     * ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<V>>timestampedKeyValueStore());
      * K key = "some-word";
-     * V valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * ValueAndTimestamp<V> valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
      * }</pre>
      * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
      * query the value of the key on a parallel running instance of your Kafka Streams application.
@@ -293,13 +293,13 @@ public interface KTable<K, V> {
      * Furthermore, for each record that gets dropped (i.e., does satisfy the given predicate) a tombstone record is
      * forwarded.
      * <p>
-     * To query the local {@link KeyValueStore} it must be obtained via
+     * To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
      * {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
      * <pre>{@code
      * KafkaStreams streams = ... // filtering words
-     * ReadOnlyKeyValueStore<K,V> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, V>keyValueStore());
+     * ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<V>>timestampedKeyValueStore());
      * K key = "some-word";
-     * V valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * ValueAndTimestamp<V> valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
      * }</pre>
      * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
      * query the value of the key on a parallel running instance of your Kafka Streams application.
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedCogroupedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedCogroupedKStream.java
index fd292c5..d99f3fa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedCogroupedKStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedCogroupedKStream.java
@@ -155,17 +155,17 @@ public interface TimeWindowedCogroupedKStream<K, V> {
      * parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
      * <p>
-     * To query the local {@link WindowStore} it must be obtained via
+     * To query the local {@link ReadOnlyWindowStore} it must be obtained via
      * {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
      * <pre>{@code
      * KafkaStreams streams = ... // counting words
      * Store queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
-     * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>windowStore());
+     * ReadOnlyWindowStore<K, ValueAndTimestamp<V>> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<V>>timestampedWindowStore());
      *
-     * String key = "some-word";
+     * K key = "some-word";
      * long timeFrom = ...;
      * long timeTo = ...;
-     * WindowStoreIterator<Long> aggregateStore = localWindowStore.fetch(key, timeFrom, timeTo); // key must be local (application state is shared over all running Kafka Streams instances)
+     * WindowStoreIterator<ValueAndTimestamp<V>> aggregateStore = localWindowStore.fetch(key, timeFrom, timeTo); // key must be local (application state is shared over all running Kafka Streams instances)
      * }</pre>
      * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
      * query the value of the key on a parallel running instance of your Kafka Streams application.
@@ -211,17 +211,17 @@ public interface TimeWindowedCogroupedKStream<K, V> {
      * parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
      * <p>
-     * To query the local {@link WindowStore} it must be obtained via
+     * To query the local {@link ReadOnlyWindowStore} it must be obtained via
      * {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
      * <pre>{@code
      * KafkaStreams streams = ... // counting words
      * Store queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
-     * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>windowStore());
+     * ReadOnlyWindowStore<K, ValueAndTimestamp<V>> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<V>>timestampedWindowStore());
      *
-     * String key = "some-word";
+     * K key = "some-word";
      * long timeFrom = ...;
      * long timeTo = ...;
-     * WindowStoreIterator<Long> aggregateStore = localWindowStore.fetch(key, timeFrom, timeTo); // key must be local (application state is shared over all running Kafka Streams instances)
+     * WindowStoreIterator<ValueAndTimestamp<V>> aggregateStore = localWindowStore.fetch(key, timeFrom, timeTo); // key must be local (application state is shared over all running Kafka Streams instances)
      * }</pre>
      * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
      * query the value of the key on a parallel running instance of your Kafka Streams application.
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
index 1ee4f1f..71b7bca 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
@@ -125,17 +125,17 @@ public interface TimeWindowedKStream<K, V> {
      * parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
      * <p>
-     * To query the local {@link WindowStore} it must be obtained via
-     * {@link KafkaStreams#store(StoreQueryParameters)}  KafkaStreams#store(...)}:
+     * To query the local {@link ReadOnlyWindowStore} it must be obtained via
+     * {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
      * <pre>{@code
      * KafkaStreams streams = ... // counting words
      * Store queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
-     * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>windowStore());
+     * ReadOnlyWindowStore<K, ValueAndTimestamp<Long>> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<Long>>timestampedWindowStore());
      *
-     * String key = "some-word";
+     * K key = "some-word";
      * long timeFrom = ...;
      * long timeTo = ...;
-     * WindowStoreIterator<Long> countForWordsForWindows = localWindowStore.fetch(key, timeFrom, timeTo); // key must be local (application state is shared over all running Kafka Streams instances)
+     * WindowStoreIterator<ValueAndTimestamp<Long>> countForWordsForWindows = localWindowStore.fetch(key, timeFrom, timeTo); // key must be local (application state is shared over all running Kafka Streams instances)
      * }</pre>
      * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
      * query the value of the key on a parallel running instance of your Kafka Streams application.
@@ -173,17 +173,17 @@ public interface TimeWindowedKStream<K, V> {
      * parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
      * <p>
-     * To query the local {@link WindowStore} it must be obtained via
+     * To query the local {@link ReadOnlyWindowStore} it must be obtained via
      * {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
      * <pre>{@code
      * KafkaStreams streams = ... // counting words
      * Store queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
-     * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>windowStore());
+     * ReadOnlyWindowStore<K, ValueAndTimestamp<Long>> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<Long>>timestampedWindowStore());
      *
-     * String key = "some-word";
+     * K key = "some-word";
      * long timeFrom = ...;
      * long timeTo = ...;
-     * WindowStoreIterator<Long> countForWordsForWindows = localWindowStore.fetch(key, timeFrom, timeTo); // key must be local (application state is shared over all running Kafka Streams instances)
+     * WindowStoreIterator<ValueAndTimestamp<Long>> countForWordsForWindows = localWindowStore.fetch(key, timeFrom, timeTo); // key must be local (application state is shared over all running Kafka Streams instances)
      * }</pre>
      * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
      * query the value of the key on a parallel running instance of your Kafka Streams application.
@@ -321,17 +321,17 @@ public interface TimeWindowedKStream<K, V> {
      * parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
      * <p>
-     * To query the local {@link WindowStore} it must be obtained via
+     * To query the local {@link ReadOnlyWindowStore} it must be obtained via
      * {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
      * <pre>{@code
      * KafkaStreams streams = ... // counting words
      * Store queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
-     * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>windowStore());
+     * ReadOnlyWindowStore<K, ValueAndTimestamp<VR>> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<VR>>timestampedWindowStore());
      *
-     * String key = "some-word";
+     * K key = "some-word";
      * long timeFrom = ...;
      * long timeTo = ...;
-     * WindowStoreIterator<Long> aggregateStore = localWindowStore.fetch(key, timeFrom, timeTo); // key must be local (application state is shared over all running Kafka Streams instances)
+     * WindowStoreIterator<ValueAndTimestamp<VR>> aggregateStore = localWindowStore.fetch(key, timeFrom, timeTo); // key must be local (application state is shared over all running Kafka Streams instances)
      * }</pre>
      * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
      * query the value of the key on a parallel running instance of your Kafka Streams application.
@@ -381,17 +381,17 @@ public interface TimeWindowedKStream<K, V> {
      * parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
      * <p>
-     * To query the local {@link WindowStore} it must be obtained via
+     * To query the local {@link ReadOnlyWindowStore} it must be obtained via
      * {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
      * <pre>{@code
      * KafkaStreams streams = ... // counting words
      * Store queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
-     * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>windowStore());
+     * ReadOnlyWindowStore<K, ValueAndTimestamp<VR>> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<VR>>timestampedWindowStore());
      *
-     * String key = "some-word";
+     * K key = "some-word";
      * long timeFrom = ...;
      * long timeTo = ...;
-     * WindowStoreIterator<Long> aggregateStore = localWindowStore.fetch(key, timeFrom, timeTo); // key must be local (application state is shared over all running Kafka Streams instances)
+     * WindowStoreIterator<ValueAndTimestamp<VR>> aggregateStore = localWindowStore.fetch(key, timeFrom, timeTo); // key must be local (application state is shared over all running Kafka Streams instances)
      * }</pre>
      * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
      * query the value of the key on a parallel running instance of your Kafka Streams application.
@@ -538,17 +538,17 @@ public interface TimeWindowedKStream<K, V> {
      * parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
      * <p>
-     * To query the local {@link WindowStore} it must be obtained via
+     * To query the local {@link ReadOnlyWindowStore} it must be obtained via
      * {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
      * <pre>{@code
      * KafkaStreams streams = ... // counting words
      * Store queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
-     * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>windowStore());
+     * ReadOnlyWindowStore<K, ValueAndTimestamp<V>> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<V>>timestampedWindowStore());
      *
-     * String key = "some-word";
+     * K key = "some-word";
      * long timeFrom = ...;
      * long timeTo = ...;
-     * WindowStoreIterator<Long> reduceStore = localWindowStore.fetch(key, timeFrom, timeTo); // key must be local (application state is shared over all running Kafka Streams instances)
+     * WindowStoreIterator<ValueAndTimestamp<V>> reduceStore = localWindowStore.fetch(key, timeFrom, timeTo); // key must be local (application state is shared over all running Kafka Streams instances)
      * }</pre>
      * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
      * query the value of the key on a parallel running instance of your Kafka Streams application.
@@ -600,17 +600,17 @@ public interface TimeWindowedKStream<K, V> {
      * parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
      * <p>
-     * To query the local {@link WindowStore} it must be obtained via
-     * {@link KafkaStreams#store(StoreQueryParameters)}  KafkaStreams#store(...)}:
+     * To query the local {@link ReadOnlyWindowStore} it must be obtained via
+     * {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
      * <pre>{@code
      * KafkaStreams streams = ... // counting words
      * Store queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
-     * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>windowStore());
+     * ReadOnlyWindowStore<K, ValueAndTimestamp<V>> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<V>>timestampedWindowStore());
      *
-     * String key = "some-word";
+     * K key = "some-word";
      * long timeFrom = ...;
      * long timeTo = ...;
-     * WindowStoreIterator<Long> reduceStore = localWindowStore.fetch(key, timeFrom, timeTo); // key must be local (application state is shared over all running Kafka Streams instances)
+     * WindowStoreIterator<ValueAndTimestamp<V>> reduceStore = localWindowStore.fetch(key, timeFrom, timeTo); // key must be local (application state is shared over all running Kafka Streams instances)
      * }</pre>
      * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
      * query the value of the key on a parallel running instance of your Kafka Streams application.


Mime
View raw message