kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-4144 Follow-up: add one missing overload function to maintain backward compatibility
Date Thu, 25 May 2017 02:01:09 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.11.0 9a21bf20b -> a8dbce47f


KAFKA-4144 Follow-up: add one missing overload function to maintain backward compatibility

A follow up RP to fix [issue](https://github.com/confluentinc/examples/commit/2cd0b87bc8a7eab0e7199fa0079db6417f0e6b63#commitcomment-22200864)

Author: Jeyhun Karimov <je.karimov@gmail.com>

Reviewers: Matthias J. Sax, Eno Thereska, Bill Bejeck, Guozhang Wang

Closes #3109 from jeyhunkarimov/KAFKA-4144-follow-up

(cherry picked from commit c5d44af77473abb36cb9bf7ca3dead36490b8320)
Signed-off-by: Guozhang Wang <wangguoz@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a8dbce47
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a8dbce47
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a8dbce47

Branch: refs/heads/0.11.0
Commit: a8dbce47f5a39ebc0fb21b7e0ef43f837b252313
Parents: 9a21bf2
Author: Jeyhun Karimov <je.karimov@gmail.com>
Authored: Wed May 24 19:00:37 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed May 24 19:00:45 2017 -0700

----------------------------------------------------------------------
 .../kafka/streams/kstream/KStreamBuilder.java   | 39 +++++++++++++++++++-
 1 file changed, 38 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a8dbce47/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
index fb05e4d..59b8c6f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
@@ -1088,7 +1088,44 @@ public class KStreamBuilder extends TopologyBuilder {
                                                  final StateStoreSupplier<KeyValueStore>
storeSupplier) {
         return doGlobalTable(keySerde, valSerde, null, topic, storeSupplier);
     }
-
+    
+    /**
+     * Create a {@link GlobalKTable} for the specified topic.
+     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config}
is used.
+     * Input {@link KeyValue} pairs with {@code null} key will be dropped.
+     * <p>
+     * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore}
with the given
+     * {@code queryableStoreName}.
+     * However, no internal changelog topic is created since the original input topic can
be used for recovery (cf.
+     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+     * <p>
+     * To query the local {@link KeyValueStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ...
+     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName,
QueryableStoreTypes.<String, Long>keyValueStore());
+     * String key = "some-key";
+     * Long valueForKey = localStore.get(key);
+     * }</pre>
+     * Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy
{@code "earliest"}
+     * regardless of the specified value in {@link StreamsConfig}.
+     *
+     * @param keySerde           key serde used to send key-value pairs,
+     *                           if not specified the default key serde defined in the configuration
will be used
+     * @param valSerde           value serde used to send key-value pairs,
+     *                           if not specified the default value serde defined in the
configuration will be used
+     * @param topic              the topic name; cannot be {@code null}
+     * @param queryableStoreName the state store name; If {@code null} this is the equivalent
of
+     *                           {@link KStreamBuilder#globalTable(Serde, Serde, String)}
()}
+     * @return a {@link GlobalKTable} for the specified topic
+     */
+    @SuppressWarnings("unchecked")
+    public <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
+                                                 final Serde<V> valSerde,
+                                                 final String topic,
+                                                 final String queryableStoreName) {
+        return globalTable(keySerde, valSerde, null, topic, queryableStoreName);
+    }
 
     /**
      * Create a {@link GlobalKTable} for the specified topic.


Mime
View raw message