kafka-commits mailing list archives

From: damian...@apache.org
Subject: [2/2] kafka git commit: KAFKA-5873; add materialized overloads to StreamsBuilder
Date: Mon, 18 Sep 2017 14:53:50 GMT
KAFKA-5873; add materialized overloads to StreamsBuilder

Add overloads for `table` and `globalTable` that use `Materialized`

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>

Closes #3837 from dguy/kafka-5873
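
For reference, a minimal sketch of how the new overloads are used (topic and store names below are illustrative assumptions, not taken from the patch):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class MaterializedOverloadsSketch {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();

        // table(topic, Consumed, Materialized): the serdes from Consumed are
        // copied onto the Materialized instance before the store is created.
        final KTable<String, String> table = builder.table(
                "user-profiles",                                   // hypothetical topic
                Consumed.with(Serdes.String(), Serdes.String()),
                Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("profile-store"));

        // globalTable(topic, Materialized): default serdes from StreamsConfig apply.
        final GlobalKTable<String, String> globalTable = builder.globalTable(
                "reference-data",                                  // hypothetical topic
                Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("reference-store"));
    }
}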


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f2b74aa1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f2b74aa1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f2b74aa1

Branch: refs/heads/trunk
Commit: f2b74aa1c36bf2882006c14f7cbd56b493f39d26
Parents: 52d7b67
Author: Damian Guy <damian.guy@gmail.com>
Authored: Mon Sep 18 15:53:44 2017 +0100
Committer: Damian Guy <damian.guy@gmail.com>
Committed: Mon Sep 18 15:53:44 2017 +0100

----------------------------------------------------------------------
 .../examples/pageview/PageViewTypedDemo.java    |   7 +-
 .../examples/pageview/PageViewUntypedDemo.java  |   9 +-
 .../apache/kafka/streams/StreamsBuilder.java    | 700 +++----------------
 .../java/org/apache/kafka/streams/Topology.java |   2 +-
 .../internals/InternalStreamsBuilder.java       | 152 ++--
 .../streams/kstream/internals/KTableImpl.java   |   9 +-
 .../kstream/internals/MaterializedInternal.java |   7 +-
 .../internals/InternalTopologyBuilder.java      |   3 +-
 .../apache/kafka/streams/KafkaStreamsTest.java  |   3 +-
 .../streams/integration/EosIntegrationTest.java |  12 +-
 .../GlobalKTableIntegrationTest.java            |  13 +-
 .../integration/JoinIntegrationTest.java        |   4 +-
 .../KStreamKTableJoinIntegrationTest.java       |   3 +-
 .../KTableKTableJoinIntegrationTest.java        |   6 +-
 .../integration/RestoreIntegrationTest.java     |   3 +-
 .../internals/GlobalKTableJoinsTest.java        |   5 +-
 .../internals/InternalStreamsBuilderTest.java   |  87 ++-
 .../internals/KGroupedTableImplTest.java        |  25 +-
 .../kstream/internals/KStreamImplTest.java      |  11 +-
 .../internals/KStreamKStreamLeftJoinTest.java   |   2 +-
 .../internals/KStreamKTableJoinTest.java        |   7 +-
 .../internals/KStreamKTableLeftJoinTest.java    |   5 +-
 .../kstream/internals/KTableAggregateTest.java  |  22 +-
 .../kstream/internals/KTableFilterTest.java     |  28 +-
 .../kstream/internals/KTableForeachTest.java    |  12 +-
 .../kstream/internals/KTableImplTest.java       |  29 +-
 .../kstream/internals/KTableKTableJoinTest.java |  22 +-
 .../internals/KTableKTableLeftJoinTest.java     |  29 +-
 .../internals/KTableKTableOuterJoinTest.java    |  14 +-
 .../kstream/internals/KTableMapKeysTest.java    |   3 +-
 .../kstream/internals/KTableMapValuesTest.java  |  16 +-
 .../kstream/internals/KTableSourceTest.java     |  10 +-
 .../kafka/streams/perf/SimpleBenchmark.java     |  13 +-
 .../kafka/streams/perf/YahooBenchmark.java      |   3 +-
 .../internals/StreamsMetadataStateTest.java     |   8 +-
 .../kafka/streams/tests/SmokeTestClient.java    |  13 +-
 36 files changed, 443 insertions(+), 854 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
index 72f9be8..068eece 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
@@ -29,6 +29,7 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.Serialized;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.ValueJoiner;
@@ -145,8 +146,8 @@ public class PageViewTypedDemo {
 
         KStream<String, PageView> views = builder.stream("streams-pageview-input", Consumed.with(Serdes.String(), pageViewSerde));
 
-        KTable<String, UserProfile> users = builder.table(Serdes.String(), userProfileSerde,
-            "streams-userprofile-input", "streams-userprofile-store-name");
+        KTable<String, UserProfile> users = builder.table("streams-userprofile-input",
+                                                          Consumed.with(Serdes.String(), userProfileSerde));
 
         KStream<WindowedPageViewByRegion, RegionCount> regionCount = views
                 .leftJoin(users, new ValueJoiner<PageView, UserProfile, PageViewByRegion>() {
@@ -190,7 +191,7 @@ public class PageViewTypedDemo {
                 });
 
         // write to the result topic
-        regionCount.to(wPageViewByRegionSerde, regionCountSerde, "streams-pageviewstats-typed-output");
+        regionCount.to("streams-pageviewstats-typed-output", Produced.with(wPageViewByRegionSerde, regionCountSerde));
 
         KafkaStreams streams = new KafkaStreams(builder.build(), props);
         streams.start();

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
index e8787af..c20c077 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
@@ -34,6 +34,7 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.Serialized;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.ValueJoiner;
@@ -73,10 +74,10 @@ public class PageViewUntypedDemo {
         final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
         final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
 
-        KStream<String, JsonNode> views = builder.stream("streams-pageview-input", Consumed.with(Serdes.String(), jsonSerde));
+        final Consumed<String, JsonNode> consumed = Consumed.with(Serdes.String(), jsonSerde);
+        KStream<String, JsonNode> views = builder.stream("streams-pageview-input", consumed);
 
-        KTable<String, JsonNode> users = builder.table(Serdes.String(), jsonSerde,
-            "streams-userprofile-input", "streams-userprofile-store-name");
+        KTable<String, JsonNode> users = builder.table("streams-userprofile-input", consumed);
 
         KTable<String, String> userRegions = users.mapValues(new ValueMapper<JsonNode, String>() {
             @Override
@@ -121,7 +122,7 @@ public class PageViewUntypedDemo {
                 });
 
         // write to the result topic
-        regionCount.to(jsonSerde, jsonSerde, "streams-pageviewstats-untyped-output");
+        regionCount.to("streams-pageviewstats-untyped-output", Produced.with(jsonSerde, jsonSerde));
 
         KafkaStreams streams = new KafkaStreams(builder.build(), props);
         streams.start();
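
As a quick illustration of the pattern both demos now follow, here is a minimal self-contained sketch; the topic names and String serdes are assumptions for illustration:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class ConsumedProducedSketch {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();

        // Source serdes now travel with the source call via Consumed...
        final KStream<String, String> stream =
                builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));

        // ...and sink serdes travel with the sink call via Produced, replacing
        // the old to(keySerde, valueSerde, topic) overload.
        stream.to("output-topic", Produced.with(Serdes.String(), Serdes.String()));
    }
}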

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index a26822a..a272ec4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -16,28 +16,30 @@
  */
 package org.apache.kafka.streams;
 
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.KGroupedStream;
 import org.apache.kafka.streams.kstream.KGroupedTable;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
 import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
+import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.SourceNode;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.QueryableStoreType;
+import org.apache.kafka.streams.state.StoreBuilder;
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Objects;
 import java.util.regex.Pattern;
 
 /**
@@ -129,6 +131,8 @@ public class StreamsBuilder {
      */
     public synchronized <K, V> KStream<K, V> stream(final Collection<String> topics,
                                                     final Consumed<K, V> consumed) {
+        Objects.requireNonNull(topics, "topics can't be null");
+        Objects.requireNonNull(consumed, "consumed can't be null");
         return internalStreamsBuilder.stream(topics, new ConsumedInternal<>(consumed));
     }
 
@@ -170,6 +174,8 @@ public class StreamsBuilder {
      */
     public synchronized <K, V> KStream<K, V> stream(final Pattern topicPattern,
                                                     final Consumed<K, V> consumed) {
+        Objects.requireNonNull(topicPattern, "topicPattern can't be null");
+        Objects.requireNonNull(consumed, "consumed can't be null");
         return internalStreamsBuilder.stream(topicPattern, new ConsumedInternal<>(consumed));
     }
 
@@ -182,11 +188,17 @@ public class StreamsBuilder {
      * Note that the specified input topic must be partitioned by key.
      * If this is not the case the returned {@link KTable} will be corrupted.
      * <p>
-     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
-     * {@code queryableStoreName}.
+     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} using the given
+     * {@code Materialized} instance.
      * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
      * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
      * <p>
+     * Serdes should only be specified in the {@link Consumed} instance, as they will also be used to overwrite any
+     * serdes set in {@link Materialized}, e.g.:
+     * <pre>{@code
+     * streamsBuilder.table(topic, Consumed.with(Serdes.String(), Serdes.String()), Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(storeName));
+     * }</pre>
      * To query the local {@link KeyValueStore} it must be obtained via
      * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
      * <pre>{@code
@@ -199,46 +211,20 @@ public class StreamsBuilder {
      * query the value of the key on a parallel running instance of your Kafka Streams application.
      *
      * @param topic              the topic name; cannot be {@code null}
-     * @param queryableStoreName the state store name; if {@code null} this is the equivalent of {@link #table(String)}
+     * @param consumed           the instance of {@link Consumed} used to define optional parameters; cannot be {@code null}
+     * @param materialized       the instance of {@link Materialized} used to materialize a state store; cannot be {@code null}
      * @return a {@link KTable} for the specified topic
      */
     public synchronized <K, V> KTable<K, V> table(final String topic,
-                                                  final String queryableStoreName) {
-        return internalStreamsBuilder.table(topic, new ConsumedInternal<K, V>(), queryableStoreName);
-    }
-
-    /**
-     * Create a {@link KTable} for the specified topic.
-     * The default {@code "auto.offset.reset"} strategy and default key and value deserializers as specified in the
-     * {@link StreamsConfig config} are used.
-     * Input {@link KeyValue records} with {@code null} key will be dropped.
-     * <p>
-     * Note that the specified input topics must be partitioned by key.
-     * If this is not the case the returned {@link KTable} will be corrupted.
-     * <p>
-     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
-     * {@code queryableStoreName}.
-     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
-     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
-     * <p>
-     * To query the local {@link KeyValueStore} it must be obtained via
-     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
-     * <pre>{@code
-     * KafkaStreams streams = ...
-     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
-     * String key = "some-key";
-     * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
-     * }</pre>
-     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
-     * query the value of the key on a parallel running instance of your Kafka Streams application.
-     *
-     * @param topic         the topic name; cannot be {@code null}
-     * @param storeSupplier user defined state store supplier; cannot be {@code null}
-     * @return a {@link KTable} for the specified topic
-     */
-    public synchronized <K, V> KTable<K, V> table(final String topic,
-                                                  final StateStoreSupplier<KeyValueStore> storeSupplier) {
-        return internalStreamsBuilder.table(null, null, null, null, topic, storeSupplier);
+                                                  final Consumed<K, V> consumed,
+                                                  final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
+        Objects.requireNonNull(topic, "topic can't be null");
+        Objects.requireNonNull(consumed, "consumed can't be null");
+        Objects.requireNonNull(materialized, "materialized can't be null");
+        materialized.withKeySerde(consumed.keySerde).withValueSerde(consumed.valueSerde);
+        return internalStreamsBuilder.table(topic,
+                                            new ConsumedInternal<>(consumed),
+                                            new MaterializedInternal<>(materialized));
     }
 
     /**
@@ -259,7 +245,7 @@ public class StreamsBuilder {
      * @return a {@link KTable} for the specified topic
      */
     public synchronized <K, V> KTable<K, V> table(final String topic) {
-        return internalStreamsBuilder.table(topic, new ConsumedInternal<K, V>(), null);
+        return table(topic, new ConsumedInternal<K, V>());
     }
 
     /**
@@ -277,450 +263,47 @@ public class StreamsBuilder {
      * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
      *
      * @param topic     the topic name; cannot be {@code null}
-     * @param consumed  the instance of {@link Consumed} used to define optional parameters
+     * @param consumed  the instance of {@link Consumed} used to define optional parameters; cannot be {@code null}
      * @return a {@link KTable} for the specified topic
      */
     public synchronized <K, V> KTable<K, V> table(final String topic,
                                                   final Consumed<K, V> consumed) {
-        return internalStreamsBuilder.table(topic, new ConsumedInternal<>(consumed), null);
-    }
-
-    /**
-     * Create a {@link KTable} for the specified topic.
-     * The default key and value deserializers as specified in the {@link StreamsConfig config} are used.
-     * Input {@link KeyValue records} with {@code null} key will be dropped.
-     * <p>
-     * Note that the specified input topics must be partitioned by key.
-     * If this is not the case the returned {@link KTable} will be corrupted.
-     * <p>
-     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
-     * {@code queryableStoreName}.
-     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
-     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
-     * <p>
-     * To query the local {@link KeyValueStore} it must be obtained via
-     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
-     * <pre>{@code
-     * KafkaStreams streams = ...
-     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
-     * String key = "some-key";
-     * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
-     * }</pre>
-     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
-     * query the value of the key on a parallel running instance of your Kafka Streams application.
-     *
-     * @param offsetReset        the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
-     *                           offsets are available
-     * @param topic              the topic name; cannot be {@code null}
-     * @param queryableStoreName the state store name; if {@code null} this is the equivalent of
-     *                           {@link #table(String, Consumed)}
-     * @return a {@link KTable} for the specified topic
-     */
-    public synchronized <K, V> KTable<K, V> table(final Topology.AutoOffsetReset offsetReset,
-                                                  final String topic,
-                                                  final String queryableStoreName) {
-        return internalStreamsBuilder.table(topic, new ConsumedInternal<>(Consumed.<K, V>with(offsetReset)), queryableStoreName);
-    }
-
-    /**
-     * Create a {@link KTable} for the specified topic.
-     * The default {@link TimestampExtractor} and default key and value deserializers
-     * as specified in the {@link StreamsConfig config} are used.
-     * Input {@link KeyValue records} with {@code null} key will be dropped.
-     * <p>
-     * Note that the specified input topic must be partitioned by key.
-     * If this is not the case the returned {@link KTable} will be corrupted.
-     * <p>
-     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
-     * {@code queryableStoreName}.
-     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
-     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
-     * <p>
-     * To query the local {@link KeyValueStore} it must be obtained via
-     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
-     * <pre>{@code
-     * KafkaStreams streams = ...
-     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
-     * String key = "some-key";
-     * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
-     * }</pre>
-     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
-     * query the value of the key on a parallel running instance of your Kafka Streams application.
-     *
-     * @param offsetReset   the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
-     *                      offsets are available
-     * @param topic         the topic name; cannot be {@code null}
-     * @param storeSupplier user defined state store supplier; cannot be {@code null}
-     * @return a {@link KTable} for the specified topic
-     */
-    public synchronized <K, V> KTable<K, V> table(final Topology.AutoOffsetReset offsetReset,
-                                                  final String topic,
-                                                  final StateStoreSupplier<KeyValueStore> storeSupplier) {
-        return internalStreamsBuilder.table(offsetReset, null, null, null, topic, storeSupplier);
-    }
-
-    /**
-     * Create a {@link KTable} for the specified topic.
-     * The default {@code "auto.offset.reset"} strategy and default key and value deserializers
-     * as specified in the {@link StreamsConfig config} are used.
-     * Input {@link KeyValue} pairs with {@code null} key will be dropped.
-     * <p>
-     * Note that the specified input topic must be partitioned by key.
-     * If this is not the case the returned {@link KTable} will be corrupted.
-     * <p>
-     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
-     * {@code storeName}.
-     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
-     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
-     * <p>
-     * To query the local {@link KeyValueStore} it must be obtained via
-     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
-     * <pre>{@code
-     * KafkaStreams streams = ...
-     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
-     * String key = "some-key";
-     * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
-     * }</pre>
-     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
-     * query the value of the key on a parallel running instance of your Kafka Streams application.
-     *
-     * @param timestampExtractor the stateless timestamp extractor used for this source {@link KTable},
-     *                           if not specified the default extractor defined in the configs will be used
-     * @param topic              the topic name; cannot be {@code null}
-     * @param queryableStoreName the state store name; if {@code null} an internal store name will be automatically given
-     * @return a {@link KTable} for the specified topic
-     */
-    public synchronized <K, V> KTable<K, V> table(final TimestampExtractor timestampExtractor,
-                                                  final String topic,
-                                                  final String queryableStoreName) {
-        return internalStreamsBuilder.table(topic, new ConsumedInternal<>(Consumed.<K, V>with(timestampExtractor)), queryableStoreName);
-    }
-
-    /**
-     * Create a {@link KTable} for the specified topic.
-     * The default key and value deserializers as specified in the {@link StreamsConfig config} are used.
-     * Input {@link KeyValue} pairs with {@code null} key will be dropped.
-     * <p>
-     * Note that the specified input topic must be partitioned by key.
-     * If this is not the case the returned {@link KTable} will be corrupted.
-     * <p>
-     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
-     * {@code storeName}.
-     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
-     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
-     * <p>
-     * To query the local {@link KeyValueStore} it must be obtained via
-     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
-     * <pre>{@code
-     * KafkaStreams streams = ...
-     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
-     * String key = "some-key";
-     * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
-     * }</pre>
-     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
-     * query the value of the key on a parallel running instance of your Kafka Streams application.
-     *
-     * @param offsetReset        the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
-     *                           offsets are available
-     * @param timestampExtractor the stateless timestamp extractor used for this source {@link KTable},
-     *                           if not specified the default extractor defined in the configs will be used
-     * @param topic              the topic name; cannot be {@code null}
-     * @param queryableStoreName the state store name; if {@code null} an internal store name will be automatically given
-     * @return a {@link KTable} for the specified topic
-     */
-    public synchronized <K, V> KTable<K, V> table(final Topology.AutoOffsetReset offsetReset,
-                                                  final TimestampExtractor timestampExtractor,
-                                                  final String topic,
-                                                  final String queryableStoreName) {
-        final Consumed<K, V> consumed = Consumed.<K, V>with(offsetReset).withTimestampExtractor(timestampExtractor);
-        return internalStreamsBuilder.table(topic, new ConsumedInternal<>(consumed), queryableStoreName);
-    }
-
-    /**
-     * Create a {@link KTable} for the specified topic.
-     * The default {@code "auto.offset.reset"} strategy and default {@link TimestampExtractor}
-     * as specified in the {@link StreamsConfig config} are used.
-     * Input {@link KeyValue records} with {@code null} key will be dropped.
-     * <p>
-     * Note that the specified input topic must be partitioned by key.
-     * If this is not the case the returned {@link KTable} will be corrupted.
-     * <p>
-     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
-     * {@code queryableStoreName}.
-     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
-     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
-     * <p>
-     * To query the local {@link KeyValueStore} it must be obtained via
-     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
-     * <pre>{@code
-     * KafkaStreams streams = ...
-     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
-     * String key = "some-key";
-     * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
-     * }</pre>
-     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
-     * query the value of the key on a parallel running instance of your Kafka Streams application.
-     *
-     * @param keySerde           key serde used to send key-value pairs,
-     *                           if not specified the default key serde defined in the configuration will be used
-     * @param valueSerde         value serde used to send key-value pairs,
-     *                           if not specified the default value serde defined in the configuration will be used
-     * @param topic              the topic name; cannot be {@code null}
-     * @param queryableStoreName the state store name; if {@code null} an internal store name will be automatically given
-     * @return a {@link KTable} for the specified topic
-     */
-    public synchronized <K, V> KTable<K, V> table(final Serde<K> keySerde,
-                                                  final Serde<V> valueSerde,
-                                                  final String topic,
-                                                  final String queryableStoreName) {
+        Objects.requireNonNull(topic, "topic can't be null");
+        Objects.requireNonNull(consumed, "consumed can't be null");
         return internalStreamsBuilder.table(topic,
-                                            new ConsumedInternal<>(keySerde, valueSerde, null, null),
-                                            queryableStoreName);
-    }
-
-    /**
-     * Create a {@link KTable} for the specified topic.
-     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
-     * The default {@code "auto.offset.reset"} strategy as specified in the {@link StreamsConfig config} is used.
-     * Input {@link KeyValue records} with {@code null} key will be dropped.
-     * <p>
-     * Note that the specified input topic must be partitioned by key.
-     * If this is not the case the returned {@link KTable} will be corrupted.
-     * <p>
-     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
-     * {@code queryableStoreName}.
-     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
-     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
-     * <p>
-     * To query the local {@link KeyValueStore} it must be obtained via
-     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
-     * <pre>{@code
-     * KafkaStreams streams = ...
-     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
-     * String key = "some-key";
-     * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
-     * }</pre>
-     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
-     * query the value of the key on a parallel running instance of your Kafka Streams application.
-     *
-     * @param keySerde      key serde used to send key-value pairs,
-     *                      if not specified the default key serde defined in the configuration will be used
-     * @param valueSerde    value serde used to send key-value pairs,
-     *                      if not specified the default value serde defined in the configuration will be used
-     * @param topic         the topic name; cannot be {@code null}
-     * @param storeSupplier user defined state store supplier; cannot be {@code null}
-     * @return a {@link KTable} for the specified topic
-     */
-    public synchronized <K, V> KTable<K, V> table(final Serde<K> keySerde,
-                                                  final Serde<V> valueSerde,
-                                                  final String topic,
-                                                  final StateStoreSupplier<KeyValueStore> storeSupplier) {
-        return internalStreamsBuilder.table(null, null, keySerde, valueSerde, topic, storeSupplier);
+                                            new ConsumedInternal<>(consumed),
+                                            new MaterializedInternal<>(
+                                                    Materialized.<K, V, KeyValueStore<Bytes, byte[]>>as(
+                                                            internalStreamsBuilder.newStoreName(topic))
+                                                    .withKeySerde(consumed.keySerde)
+                                                    .withValueSerde(consumed.valueSerde),
+                                                    false));
     }
 
     /**
      * Create a {@link KTable} for the specified topic.
+     * The default {@code "auto.offset.reset"} strategy and default key and value deserializers as specified in the
+     * {@link StreamsConfig config} are used.
      * Input {@link KeyValue records} with {@code null} key will be dropped.
      * <p>
      * Note that the specified input topics must be partitioned by key.
      * If this is not the case the returned {@link KTable} will be corrupted.
      * <p>
-     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
-     * {@code queryableStoreName}.
-     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
-     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
-     * <p>
-     * To query the local {@link KeyValueStore} it must be obtained via
-     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
-     * <pre>{@code
-     * KafkaStreams streams = ...
-     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
-     * String key = "some-key";
-     * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
-     * }</pre>
-     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
-     * query the value of the key on a parallel running instance of your Kafka Streams application.
-     *
-     * @param offsetReset        the {@code "auto.offset.reset"} policy to use for the specified topic if no valid
-     *                           committed offsets are available
-     * @param keySerde           key serde used to send key-value pairs,
-     *                           if not specified the default key serde defined in the configuration will be used
-     * @param valueSerde         value serde used to send key-value pairs,
-     *                           if not specified the default value serde defined in the configuration will be used
-     * @param topic              the topic name; cannot be {@code null}
-     * @param queryableStoreName the state store name; if {@code null} an internal store name will be automatically given
-     * @return a {@link KTable} for the specified topic
-     */
-    public synchronized <K, V> KTable<K, V> table(final Topology.AutoOffsetReset offsetReset,
-                                                  final Serde<K> keySerde,
-                                                  final Serde<V> valueSerde,
-                                                  final String topic,
-                                                  final String queryableStoreName) {
-        final ConsumedInternal<K, V> consumed = new ConsumedInternal<>(keySerde, valueSerde, null, offsetReset);
-        return internalStreamsBuilder.table(topic, consumed, queryableStoreName);
-    }
-
-    /**
-     * Create a {@link KTable} for the specified topic.
-     * The default {@code "auto.offset.reset"} strategy as specified in the {@link StreamsConfig config} is used.
-     * Input {@link KeyValue} pairs with {@code null} key will be dropped.
-     * <p>
-     * Note that the specified input topic must be partitioned by key.
-     * If this is not the case the returned {@link KTable} will be corrupted.
-     * <p>
-     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
-     * {@code storeName}.
-     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
-     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
-     * <p>
-     * To query the local {@link KeyValueStore} it must be obtained via
-     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
-     * <pre>{@code
-     * KafkaStreams streams = ...
-     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
-     * String key = "some-key";
-     * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
-     * }</pre>
-     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
-     * query the value of the key on a parallel running instance of your Kafka Streams application.
-     *
-     * @param timestampExtractor the stateless timestamp extractor used for this source {@link KTable},
-     *                           if not specified the default extractor defined in the configs will be used
-     * @param keySerde           key serde used to send key-value pairs,
-     *                           if not specified the default key serde defined in the configuration will be used
-     * @param valueSerde         value serde used to send key-value pairs,
-     *                           if not specified the default value serde defined in the configuration will be used
-     * @param topic              the topic name; cannot be {@code null}
-     * @param queryableStoreName the state store name; if {@code null} an internal store name will be automatically given
-     * @return a {@link KTable} for the specified topic
-     */
-    public synchronized <K, V> KTable<K, V> table(final TimestampExtractor timestampExtractor,
-                                                  final Serde<K> keySerde,
-                                                  final Serde<V> valueSerde,
-                                                  final String topic,
-                                                  final String queryableStoreName) {
-        final ConsumedInternal<K, V> consumed = new ConsumedInternal<>(keySerde, valueSerde, timestampExtractor, null);
-        return internalStreamsBuilder.table(topic, consumed, queryableStoreName);
-    }
-
-    /**
-     * Create a {@link KTable} for the specified topic.
-     * Input {@link KeyValue} pairs with {@code null} key will be dropped.
-     * <p>
-     * Note that the specified input topic must be partitioned by key.
-     * If this is not the case the returned {@link KTable} will be corrupted.
-     * <p>
-     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
-     * {@code storeName}.
-     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
+     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} using the {@link Materialized} instance.
+     * No internal changelog topic is created since the original input topic can be used for recovery (cf.
      * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
-     * <p>
-     * To query the local {@link KeyValueStore} it must be obtained via
-     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
-     * <pre>{@code
-     * KafkaStreams streams = ...
-     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
-     * String key = "some-key";
-     * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
-     * }</pre>
-     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
-     * query the value of the key on a parallel running instance of your Kafka Streams application.
      *
-     * @param offsetReset        the {@code "auto.offset.reset"} policy to use for the specified topic if no valid
-     *                           committed offsets are available
-     * @param timestampExtractor the stateless timestamp extractor used for this source {@link KTable},
-     *                           if not specified the default extractor defined in the configs will be used
-     * @param keySerde           key serde used to send key-value pairs,
-     *                           if not specified the default key serde defined in the configuration will be used
-     * @param valueSerde         value serde used to send key-value pairs,
-     *                           if not specified the default value serde defined in the configuration will be used
-     * @param topic              the topic name; cannot be {@code null}
-     * @param queryableStoreName the state store name; if {@code null} an internal store name will be automatically given
+     * @param topic         the topic name; cannot be {@code null}
+     * @param materialized  the instance of {@link Materialized} used to materialize a state store; cannot be {@code null}
      * @return a {@link KTable} for the specified topic
      */
-    public synchronized <K, V> KTable<K, V> table(final Topology.AutoOffsetReset offsetReset,
-                                                  final TimestampExtractor timestampExtractor,
-                                                  final Serde<K> keySerde,
-                                                  final Serde<V> valueSerde,
-                                                  final String topic,
-                                                  final String queryableStoreName) {
+    public synchronized <K, V> KTable<K, V> table(final String topic,
+                                                  final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
+        Objects.requireNonNull(topic, "topic can't be null");
+        Objects.requireNonNull(materialized, "materialized can't be null");
         return internalStreamsBuilder.table(topic,
-                                            new ConsumedInternal<>(keySerde, valueSerde, timestampExtractor, offsetReset),
-                                            queryableStoreName);
-    }
-
-    /**
-     * Create a {@link KTable} for the specified topic.
-     * Input {@link KeyValue records} with {@code null} key will be dropped.
-     * <p>
-     * Note that the specified input topics must be partitioned by key.
-     * If this is not the case the returned {@link KTable} will be corrupted.
-     * <p>
-     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
-     * {@code queryableStoreName}.
-     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
-     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
-     * <p>
-     * To query the local {@link KeyValueStore} it must be obtained via
-     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
-     * <pre>{@code
-     * KafkaStreams streams = ...
-     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
-     * String key = "some-key";
-     * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
-     * }</pre>
-     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
-     * query the value of the key on a parallel running instance of your Kafka Streams application.
-     *
-     * @param offsetReset        the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
-     *                           offsets are available
-     * @param timestampExtractor the stateless timestamp extractor used for this source {@link KTable},
-     *                           if not specified the default extractor defined in the configs will be used
-     * @param keySerde           key serde used to send key-value pairs,
-     *                           if not specified the default key serde defined in the configuration will be used
-     * @param valueSerde         value serde used to send key-value pairs,
-     *                           if not specified the default value serde defined in the configuration will be used
-     * @param topic              the topic name; cannot be {@code null}
-     * @param storeSupplier      user defined state store supplier; cannot be {@code null}
-     * @return a {@link KTable} for the specified topic
-     */
-    public synchronized <K, V> KTable<K, V> table(final Topology.AutoOffsetReset offsetReset,
-                                                  final TimestampExtractor timestampExtractor,
-                                                  final Serde<K> keySerde,
-                                                  final Serde<V> valueSerde,
-                                                  final String topic,
-                                                  final StateStoreSupplier<KeyValueStore> storeSupplier) {
-        return internalStreamsBuilder.table(offsetReset, timestampExtractor, keySerde, valueSerde, topic, storeSupplier);
-    }
-
-    /**
-     * Create a {@link GlobalKTable} for the specified topic.
-     * The default key and value deserializers as specified in the {@link StreamsConfig config} are used.
-     * Input {@link KeyValue records} with {@code null} key will be dropped.
-     * <p>
-     * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with the given
-     * {@code queryableStoreName}.
-     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
-     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
-     * <p>
-     * To query the local {@link KeyValueStore} it must be obtained via
-     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
-     * <pre>{@code
-     * KafkaStreams streams = ...
-     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
-     * String key = "some-key";
-     * Long valueForKey = localStore.get(key);
-     * }</pre>
-     * Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"}
-     * regardless of the specified value in {@link StreamsConfig}.
-     *
-     * @param topic              the topic name; cannot be {@code null}
-     * @param queryableStoreName the state store name; if {@code null} an internal store name will be automatically given
-     * @return a {@link GlobalKTable} for the specified topic
-     */
-    public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic,
-                                                              final String queryableStoreName) {
-        return internalStreamsBuilder.globalTable(topic, new ConsumedInternal<K, V>(), queryableStoreName);
+                                            new ConsumedInternal<K, V>(),
+                                            new MaterializedInternal<>(materialized));
     }
 
     /**
@@ -741,7 +324,15 @@ public class StreamsBuilder {
      */
     public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic,
                                                               final Consumed<K, V> consumed) {
-        return internalStreamsBuilder.globalTable(topic, new ConsumedInternal<>(consumed), null);
+        Objects.requireNonNull(topic, "topic can't be null");
+        Objects.requireNonNull(consumed, "consumed can't be null");
+        final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized =
+                new MaterializedInternal<>(Materialized.<K, V, KeyValueStore<Bytes, byte[]>>as(
+                        internalStreamsBuilder.newStoreName(topic))
+                                                   .withKeySerde(consumed.keySerde)
+                                                   .withValueSerde(consumed.valueSerde),
+                                           false);
+        return internalStreamsBuilder.globalTable(topic, new ConsumedInternal<>(consumed), materialized);
     }
 
     /**
@@ -761,61 +352,25 @@ public class StreamsBuilder {
      * @return a {@link GlobalKTable} for the specified topic
      */
     public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic) {
-        return internalStreamsBuilder.globalTable(topic, new ConsumedInternal<K, V>(), null);
+        return globalTable(topic, Consumed.<K, V>with(null, null));
     }
 
     /**
      * Create a {@link GlobalKTable} for the specified topic.
-     * The default {@link TimestampExtractor} and default key and value deserializers as specified in
-     * the {@link StreamsConfig config} are used.
-     * Input {@link KeyValue} pairs with {@code null} key will be dropped.
-     * <p>
-     * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with the given
-     * {@code queryableStoreName}.
-     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
-     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
-     * <p>
-     * To query the local {@link KeyValueStore} it must be obtained via
-     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
-     * <pre>{@code
-     * KafkaStreams streams = ...
-     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
-     * String key = "some-key";
-     * Long valueForKey = localStore.get(key);
-     * }</pre>
-     * Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"}
-     * regardless of the specified value in {@link StreamsConfig}.
      *
-     * @param keySerde           key serde used to send key-value pairs,
-     *                           if not specified the default key serde defined in the configuration will be used
-     * @param valueSerde         value serde used to send key-value pairs,
-     *                           if not specified the default value serde defined in the configuration will be used
-     * @param timestampExtractor the stateless timestamp extractor used for this source {@link KTable},
-     *                           if not specified the default extractor defined in the configs will be used
-     * @param topic              the topic name; cannot be {@code null}
-     * @param queryableStoreName the state store name; if {@code null} an internal store name will be automatically given
-     * @return a {@link GlobalKTable} for the specified topic
-     */
-    public synchronized <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
-                                                              final Serde<V> valueSerde,
-                                                              final TimestampExtractor timestampExtractor,
-                                                              final String topic,
-                                                              final String queryableStoreName) {
-        return internalStreamsBuilder.globalTable(topic,
-                                                  new ConsumedInternal<>(keySerde, valueSerde, timestampExtractor, null),
-                                                  queryableStoreName);
-    }
-
-    /**
-     * Create a {@link GlobalKTable} for the specified topic.
-     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
      * Input {@link KeyValue} pairs with {@code null} key will be dropped.
      * <p>
-     * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with the given
-     * {@code queryableStoreName}.
+     * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} configured with
+     * the provided instance of {@link Materialized}.
      * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
      * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
      * <p>
+     * Serdes should only be specified in the {@link Consumed} instance, as they will also be used to overwrite any
+     * serdes set in {@link Materialized}, e.g.:
+     * <pre>{@code
+     * streamsBuilder.globalTable(topic, Consumed.with(Serdes.String(), Serdes.String()), Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(storeName));
+     * }</pre>
      * To query the local {@link KeyValueStore} it must be obtained via
      * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
      * <pre>{@code
@@ -827,28 +382,31 @@ public class StreamsBuilder {
      * Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"}
      * regardless of the specified value in {@link StreamsConfig}.
      *
-     * @param keySerde      key serde used to send key-value pairs,
-     *                      if not specified the default key serde defined in the configuration will be used
-     * @param valueSerde    value serde used to send key-value pairs,
-     *                      if not specified the default value serde defined in the configuration will be used
      * @param topic         the topic name; cannot be {@code null}
-     * @param storeSupplier user defined state store supplier; Cannot be {@code null}
+     * @param consumed      the instance of {@link Consumed} used to define optional parameters; cannot be {@code null}
+     * @param materialized  the instance of {@link Materialized} used to materialize a state store; cannot be {@code null}
      * @return a {@link GlobalKTable} for the specified topic
      */
-    public synchronized <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
-                                                              final Serde<V> valueSerde,
-                                                              final String topic,
-                                                              final StateStoreSupplier<KeyValueStore> storeSupplier) {
-        return internalStreamsBuilder.globalTable(keySerde, valueSerde, topic, storeSupplier);
+    public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic,
+                                                              final Consumed<K, V> consumed,
+                                                              final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
+        Objects.requireNonNull(topic, "topic can't be null");
+        Objects.requireNonNull(consumed, "consumed can't be null");
+        Objects.requireNonNull(materialized, "materialized can't be null");
+        // always use the serdes from consumed
+        materialized.withKeySerde(consumed.keySerde).withValueSerde(consumed.valueSerde);
+        return internalStreamsBuilder.globalTable(topic,
+                                                  new ConsumedInternal<>(consumed),
+                                                  new MaterializedInternal<>(materialized));
     }
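
A minimal usage sketch of this overload (the topic and store names below are hypothetical; as the Javadoc above notes, the serdes given in Consumed overwrite those in Materialized):

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.common.utils.Bytes;
    import org.apache.kafka.streams.Consumed;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.GlobalKTable;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.state.KeyValueStore;

    final StreamsBuilder streamsBuilder = new StreamsBuilder();
    final GlobalKTable<String, String> pageViews = streamsBuilder.globalTable(
        "page-views",                                     // hypothetical topic
        Consumed.with(Serdes.String(), Serdes.String()),  // serdes used for reading and for the store
        Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("view-store"));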
 
     /**
      * Create a {@link GlobalKTable} for the specified topic.
-     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
+     *
      * Input {@link KeyValue} pairs with {@code null} key will be dropped.
      * <p>
-     * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with the given
-     * {@code queryableStoreName}.
+     * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} configured with
+     * the provided instance of {@link Materialized}.
      * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
      * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
      * <p>
@@ -863,34 +421,30 @@ public class StreamsBuilder {
      * Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"}
      * regardless of the specified value in {@link StreamsConfig}.
      *
-     * @param keySerde           key serde used to send key-value pairs,
-     *                           if not specified the default key serde defined in the configuration will be used
-     * @param valueSerde         value serde used to send key-value pairs,
-     *                           if not specified the default value serde defined in the configuration will be used
-     * @param topic              the topic name; cannot be {@code null}
-     * @param queryableStoreName the state store name; if {@code null} an internal store name will be automatically given
+     * @param topic         the topic name; cannot be {@code null}
+     * @param materialized  the instance of {@link Materialized} used to materialize a state store; cannot be {@code null}
      * @return a {@link GlobalKTable} for the specified topic
      */
-    public synchronized <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
-                                                              final Serde<V> valueSerde,
-                                                              final String topic,
-                                                              final String queryableStoreName) {
+    public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic,
+                                                              final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
+        Objects.requireNonNull(topic, "topic can't be null");
+        Objects.requireNonNull(materialized, "materialized can't be null");
         return internalStreamsBuilder.globalTable(topic,
-                                                  new ConsumedInternal<>(Consumed.with(keySerde, valueSerde)),
-                                                  queryableStoreName);
+                                                  new ConsumedInternal<K, V>(),
+                                                  new MaterializedInternal<>(materialized));
     }
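
A sketch of this two-argument variant, which falls back to the default serdes from the StreamsConfig; the materialized store can then be queried through KafkaStreams#store(...) as described above (imports as in the sketch further up; names hypothetical, and streams is assumed to be a running KafkaStreams instance):

    final GlobalKTable<String, String> pageViews = streamsBuilder.globalTable(
        "page-views",
        Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("view-store"));

    // ReadOnlyKeyValueStore and QueryableStoreTypes come from org.apache.kafka.streams.state
    final ReadOnlyKeyValueStore<String, String> view =
        streams.store("view-store", QueryableStoreTypes.keyValueStore());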
 
+
     /**
      * Adds a state store to the underlying {@link Topology}.
      *
-     * @param supplier the supplier used to obtain this state store {@link StateStore} instance
-     * @param processorNames the names of the processors that should be able to access the provided store
+     * @param builder the {@link StoreBuilder} used to construct the {@link StateStore} instance; can't be {@code null}
      * @return itself
      * @throws TopologyException if state store supplier is already added
      */
-    public synchronized StreamsBuilder addStateStore(final StateStoreSupplier supplier,
-                                                     final String... processorNames) {
-        internalStreamsBuilder.addStateStore(supplier, processorNames);
+    public synchronized StreamsBuilder addStateStore(final StoreBuilder builder) {
+        Objects.requireNonNull(builder, "builder can't be null");
+        internalStreamsBuilder.addStateStore(builder);
         return this;
     }
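
A sketch of the builder-based call, mirroring the Stores usage in the EosIntegrationTest change further down (store name hypothetical):

    import org.apache.kafka.streams.state.StoreBuilder;
    import org.apache.kafka.streams.state.Stores;

    final StoreBuilder<KeyValueStore<Long, Long>> storeBuilder =
        Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("counts"),
                                    Serdes.Long(),
                                    Serdes.Long());
    streamsBuilder.addStateStore(storeBuilder);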
 
@@ -907,62 +461,30 @@ public class StreamsBuilder {
      * This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date.
      * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
      *
-     * @param storeSupplier         user defined state store supplier
+     * @param storeBuilder          user defined {@link StoreBuilder}; can't be {@code null}
      * @param sourceName            name of the {@link SourceNode} that will be automatically added
-     * @param keyDeserializer       the {@link Deserializer} to deserialize keys with
-     * @param valueDeserializer     the {@link Deserializer} to deserialize values with
      * @param topic                 the topic to source the data from
+     * @param consumed              the instance of {@link Consumed} used to define optional parameters; can't be {@code null}
      * @param processorName         the name of the {@link ProcessorSupplier}
      * @param stateUpdateSupplier   the instance of {@link ProcessorSupplier}
      * @return itself
      * @throws TopologyException if the processor of state is already registered
      */
-    public synchronized StreamsBuilder addGlobalStore(final StateStoreSupplier<KeyValueStore> storeSupplier,
-                                                      final String sourceName,
-                                                      final Deserializer keyDeserializer,
-                                                      final Deserializer valueDeserializer,
+    @SuppressWarnings("unchecked")
+    public synchronized StreamsBuilder addGlobalStore(final StoreBuilder storeBuilder,
                                                       final String topic,
-                                                      final String processorName,
-                                                      final ProcessorSupplier stateUpdateSupplier) {
-        internalStreamsBuilder.addGlobalStore(storeSupplier, sourceName, null, keyDeserializer,
-            valueDeserializer, topic, processorName, stateUpdateSupplier);
-        return this;
-    }
-
-    /**
-     * Adds a global {@link StateStore} to the topology.
-     * The {@link StateStore} sources its data from all partitions of the provided input topic.
-     * There will be exactly one instance of this {@link StateStore} per Kafka Streams instance.
-     * <p>
-     * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving from the partitions
-     * of the input topic.
-     * <p>
-     * The provided {@link ProcessorSupplier} will be used to create an {@link ProcessorNode} that will receive all
-     * records forwarded from the {@link SourceNode}.
-     * This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date.
-     *
-     * @param storeSupplier         user defined state store supplier
-     * @param sourceName            name of the {@link SourceNode} that will be automatically added
-     * @param timestampExtractor    the stateless timestamp extractor used for this source,
-     *                              if not specified the default extractor defined in the configs will be used
-     * @param keyDeserializer       the {@link Deserializer} to deserialize keys with
-     * @param valueDeserializer     the {@link Deserializer} to deserialize values with
-     * @param topic                 the topic to source the data from
-     * @param processorName         the name of the {@link ProcessorSupplier}
-     * @param stateUpdateSupplier   the instance of {@link ProcessorSupplier}
-     * @return itself
-     * @throws TopologyException if the processor of state is already registered
-     */
-    public synchronized StreamsBuilder addGlobalStore(final StateStoreSupplier<KeyValueStore> storeSupplier,
                                                       final String sourceName,
-                                                      final TimestampExtractor timestampExtractor,
-                                                      final Deserializer keyDeserializer,
-                                                      final Deserializer valueDeserializer,
-                                                      final String topic,
+                                                      final Consumed consumed,
                                                       final String processorName,
                                                       final ProcessorSupplier stateUpdateSupplier) {
-        internalStreamsBuilder.addGlobalStore(storeSupplier, sourceName, timestampExtractor, keyDeserializer,
-            valueDeserializer, topic, processorName, stateUpdateSupplier);
+        Objects.requireNonNull(storeBuilder, "storeBuilder can't be null");
+        Objects.requireNonNull(consumed, "consumed can't be null");
+        internalStreamsBuilder.addGlobalStore(storeBuilder,
+                                              sourceName,
+                                              topic,
+                                              new ConsumedInternal<>(consumed),
+                                              processorName,
+                                              stateUpdateSupplier);
         return this;
     }
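
A sketch of the new signature in the parameter order shown above; all names are hypothetical, and the ProcessorSupplier is assumed to return a processor that writes each incoming record into the store to keep it up-to-date:

    streamsBuilder.addGlobalStore(
        Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("config-store"),
                                    Serdes.String(),
                                    Serdes.String()),
        "config-topic",                                    // topic
        "config-source",                                   // sourceName
        Consumed.with(Serdes.String(), Serdes.String()),
        "config-processor",                                // processorName
        new ConfigStoreUpdater());                         // hypothetical ProcessorSupplier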
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/main/java/org/apache/kafka/streams/Topology.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/Topology.java b/streams/src/main/java/org/apache/kafka/streams/Topology.java
index 386aacf..85d769f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/Topology.java
+++ b/streams/src/main/java/org/apache/kafka/streams/Topology.java
@@ -571,7 +571,7 @@ public class Topology {
      * @return itself
      * @throws TopologyException if the processor of state is already registered
      */
-    public synchronized Topology addGlobalStore(final KeyValueStoreBuilder storeBuilder,
+    public synchronized Topology addGlobalStore(final StoreBuilder storeBuilder,
                                                 final String sourceName,
                                                 final Deserializer keyDeserializer,
                                                 final Deserializer valueDeserializer,

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index 4da9906..3963657 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -17,17 +17,15 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.streams.Topology;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
-import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreSupplier;
+import org.apache.kafka.streams.state.StoreBuilder;
 
 import java.util.Collection;
 import java.util.Collections;
@@ -72,37 +70,55 @@ public class InternalStreamsBuilder {
         return new KStreamImpl<>(this, name, Collections.singleton(name), false);
     }
 
-    @SuppressWarnings("unchecked")
     public <K, V> KTable<K, V> table(final String topic,
                                      final ConsumedInternal<K, V> consumed,
-                                     final String queryableStoreName) {
-        final String internalStoreName = queryableStoreName != null ? queryableStoreName : newStoreName(KTableImpl.SOURCE_NAME);
-        final StateStoreSupplier storeSupplier = new RocksDBKeyValueStoreSupplier<>(internalStoreName,
-            consumed.keySerde(),
-            consumed.valueSerde(),
-            false,
-            Collections.<String, String>emptyMap(),
-            true);
-        return doTable(consumed, topic, storeSupplier, queryableStoreName != null);
-    }
-
-    public <K, V> KTable<K, V> table(final Topology.AutoOffsetReset offsetReset,
-                                     final TimestampExtractor timestampExtractor,
-                                     final Serde<K> keySerde,
-                                     final Serde<V> valSerde,
-                                     final String topic,
                                      final StateStoreSupplier<KeyValueStore> storeSupplier) {
         Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
-        return doTable(new ConsumedInternal<>(keySerde, valSerde, timestampExtractor, offsetReset), topic, storeSupplier, true);
+        final String source = newName(KStreamImpl.SOURCE_NAME);
+        final String name = newName(KTableImpl.SOURCE_NAME);
+
+        final KTable<K, V> kTable = createKTable(consumed,
+                                                 topic,
+                                                 storeSupplier.name(),
+                                                 true,
+                                                 source,
+                                                 name);
+
+        internalTopologyBuilder.addStateStore(storeSupplier, name);
+        internalTopologyBuilder.connectSourceStoreAndTopic(storeSupplier.name(), topic);
+
+        return kTable;
     }
 
-    private <K, V> KTable<K, V> doTable(final ConsumedInternal<K, V> consumed,
-                                        final String topic,
-                                        final StateStoreSupplier<KeyValueStore> storeSupplier,
-                                        final boolean isQueryable) {
+    @SuppressWarnings("unchecked")
+    public <K, V> KTable<K, V> table(final String topic,
+                                     final ConsumedInternal<K, V> consumed,
+                                     final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
+        final StoreBuilder<KeyValueStore<K, V>> storeBuilder = new KeyValueStoreMaterializer<>(materialized).materialize();
+
         final String source = newName(KStreamImpl.SOURCE_NAME);
         final String name = newName(KTableImpl.SOURCE_NAME);
-        final ProcessorSupplier<K, V> processorSupplier = new KTableSource<>(storeSupplier.name());
+        final KTable<K, V> kTable = createKTable(consumed,
+                                                 topic,
+                                                 storeBuilder.name(),
+                                                 materialized.isQueryable(),
+                                                 source,
+                                                 name);
+
+        internalTopologyBuilder.addStateStore(storeBuilder, name);
+        internalTopologyBuilder.connectSourceStoreAndTopic(storeBuilder.name(), topic);
+
+        return kTable;
+    }
+
+
+    private <K, V> KTable<K, V> createKTable(final ConsumedInternal<K, V> consumed,
+                                             final String topic,
+                                             final String storeName,
+                                             final boolean isQueryable,
+                                             final String source,
+                                             final String name) {
+        final ProcessorSupplier<K, V> processorSupplier = new KTableSource<>(storeName);
 
         internalTopologyBuilder.addSource(consumed.offsetResetPolicy(),
                                           source,
@@ -112,48 +128,27 @@ public class InternalStreamsBuilder {
                                           topic);
         internalTopologyBuilder.addProcessor(name, processorSupplier, source);
 
-        final KTableImpl<K, ?, V> kTable = new KTableImpl<>(this, name, processorSupplier,
-            consumed.keySerde(), consumed.valueSerde(), Collections.singleton(source), storeSupplier.name(), isQueryable);
-
-        internalTopologyBuilder.addStateStore(storeSupplier, name);
-        internalTopologyBuilder.connectSourceStoreAndTopic(storeSupplier.name(), topic);
-
-        return kTable;
+        return new KTableImpl<>(this, name, processorSupplier,
+                                consumed.keySerde(), consumed.valueSerde(), Collections.singleton(source), storeName, isQueryable);
     }
 
     public <K, V> GlobalKTable<K, V> globalTable(final String topic,
                                                  final ConsumedInternal<K, V> consumed,
-                                                 final String queryableStoreName) {
-        final String internalStoreName = queryableStoreName != null ? queryableStoreName : newStoreName(KTableImpl.SOURCE_NAME);
-        return doGlobalTable(consumed, topic, new RocksDBKeyValueStoreSupplier<>(internalStoreName,
-            consumed.keySerde(),
-            consumed.valueSerde(),
-            false,
-            Collections.<String, String>emptyMap(),
-            true));
-    }
-
-    public <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
-                                                 final Serde<V> valSerde,
-                                                 final String topic,
-                                                 final StateStoreSupplier<KeyValueStore> storeSupplier) {
-        return doGlobalTable(new ConsumedInternal<>(keySerde, valSerde, null, null), topic, storeSupplier);
-    }
-
-    @SuppressWarnings("unchecked")
-    private <K, V> GlobalKTable<K, V> doGlobalTable(final ConsumedInternal<K, V> consumed,
-                                                    final String topic,
-                                                    final StateStoreSupplier<KeyValueStore> storeSupplier) {
-        Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
+                                                 final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
+        Objects.requireNonNull(consumed, "consumed can't be null");
+        Objects.requireNonNull(materialized, "materialized can't be null");
+        // explicitly disable logging for global stores
+        materialized.withLoggingDisabled();
+        final StoreBuilder storeBuilder = new KeyValueStoreMaterializer<>(materialized).materialize();
         final String sourceName = newName(KStreamImpl.SOURCE_NAME);
         final String processorName = newName(KTableImpl.SOURCE_NAME);
-        final KTableSource<K, V> tableSource = new KTableSource<>(storeSupplier.name());
+        final KTableSource<K, V> tableSource = new KTableSource<>(storeBuilder.name());
 
 
         final Deserializer<K> keyDeserializer = consumed.keySerde() == null ? null : consumed.keySerde().deserializer();
         final Deserializer<V> valueDeserializer = consumed.valueSerde() == null ? null : consumed.valueSerde().deserializer();
 
-        internalTopologyBuilder.addGlobalStore(storeSupplier,
+        internalTopologyBuilder.addGlobalStore(storeBuilder,
                                                sourceName,
                                                consumed.timestampExtractor(),
                                                keyDeserializer,
@@ -161,9 +156,10 @@ public class InternalStreamsBuilder {
                                                topic,
                                                processorName,
                                                tableSource);
-        return new GlobalKTableImpl(new KTableSourceValueGetterSupplier<>(storeSupplier.name()));
+        return new GlobalKTableImpl<>(new KTableSourceValueGetterSupplier<K, V>(storeBuilder.name()));
     }
 
+
     public <K, V> KStream<K, V> merge(final KStream<K, V>... streams) {
         return KStreamImpl.merge(this, streams);
     }
@@ -172,35 +168,31 @@ public class InternalStreamsBuilder {
         return prefix + String.format("%010d", index.getAndIncrement());
     }
 
-    String newStoreName(final String prefix) {
+    public String newStoreName(final String prefix) {
         return prefix + String.format(KTableImpl.STATE_STORE_NAME + "%010d", index.getAndIncrement());
     }
 
-    public synchronized void addStateStore(final StateStoreSupplier supplier,
-                                           final String... processorNames) {
-        internalTopologyBuilder.addStateStore(supplier, processorNames);
-    }
-
-    public synchronized void addGlobalStore(final StateStoreSupplier<KeyValueStore> storeSupplier,
-                                            final String sourceName,
-                                            final Deserializer keyDeserializer,
-                                            final Deserializer valueDeserializer,
-                                            final String topic,
-                                            final String processorName,
-                                            final ProcessorSupplier stateUpdateSupplier) {
-        internalTopologyBuilder.addGlobalStore(storeSupplier, sourceName, null, keyDeserializer,
-            valueDeserializer, topic, processorName, stateUpdateSupplier);
+    public synchronized void addStateStore(final StoreBuilder builder) {
+        internalTopologyBuilder.addStateStore(builder);
     }
 
-    public synchronized void addGlobalStore(final StateStoreSupplier<KeyValueStore> storeSupplier,
+    public synchronized void addGlobalStore(final StoreBuilder<KeyValueStore> storeBuilder,
                                             final String sourceName,
-                                            final TimestampExtractor timestampExtractor,
-                                            final Deserializer keyDeserializer,
-                                            final Deserializer valueDeserializer,
                                             final String topic,
+                                            final ConsumedInternal consumed,
                                             final String processorName,
                                             final ProcessorSupplier stateUpdateSupplier) {
-        internalTopologyBuilder.addGlobalStore(storeSupplier, sourceName, timestampExtractor, keyDeserializer,
-            valueDeserializer, topic, processorName, stateUpdateSupplier);
+        // explicitly disable logging for global stores
+        storeBuilder.withLoggingDisabled();
+        final Deserializer keyDeserializer = consumed.keySerde() == null ? null : consumed.keySerde().deserializer();
+        final Deserializer valueDeserializer = consumed.valueSerde() == null ? null : consumed.valueSerde().deserializer();
+        internalTopologyBuilder.addGlobalStore(storeBuilder,
+                                               sourceName,
+                                               consumed.timestampExtractor(),
+                                               keyDeserializer,
+                                               valueDeserializer,
+                                               topic,
+                                               processorName,
+                                               stateUpdateSupplier);
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 067bcfc..a42db0b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.kstream.ForeachAction;
@@ -401,7 +402,10 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
 
         return builder.table(topic,
                              new ConsumedInternal<>(keySerde, valSerde, new FailOnInvalidTimestamp(), null),
-                             internalStoreName);
+                             new MaterializedInternal<>(Materialized.<K, V, KeyValueStore<Bytes, byte[]>>as(internalStoreName)
+                                     .withKeySerde(keySerde)
+                                     .withValueSerde(valSerde),
+                                     queryableStoreName != null));
     }
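
The MaterializedInternal built here wraps the same fluent chain that is available on the public API; for reference, a sketch of that chain on its own (types and store name hypothetical):

    final Materialized<String, Long, KeyValueStore<Bytes, byte[]>> materialized =
        Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("through-store")
            .withKeySerde(Serdes.String())
            .withValueSerde(Serdes.Long());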
 
     @Override
@@ -413,7 +417,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
         to(keySerde, valSerde, partitioner, topic);
 
-        return builder.table(null, new FailOnInvalidTimestamp(), keySerde, valSerde, topic, storeSupplier);
+        final ConsumedInternal<K, V> consumed = new ConsumedInternal<>(Consumed.with(keySerde, valSerde, new FailOnInvalidTimestamp(), null));
+        return builder.table(topic, consumed, storeSupplier);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
index 0ee610f..9f186fd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
@@ -30,8 +30,9 @@ public class MaterializedInternal<K, V, S extends StateStore> extends Materializ
     public MaterializedInternal(final Materialized<K, V, S> materialized) {
         this(materialized, true);
     }
-
-    MaterializedInternal(final Materialized<K, V, S> materialized, final boolean queryable) {
+
+    public MaterializedInternal(final Materialized<K, V, S> materialized,
+                                final boolean queryable) {
         super(materialized);
         this.queryable = queryable;
     }
@@ -67,7 +68,7 @@ public class MaterializedInternal<K, V, S extends StateStore> extends Materializ
         return cachingEnabled;
     }
 
-    public boolean isQueryable() {
+    boolean isQueryable() {
         return queryable;
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 193d0e1..d47af88 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -29,7 +29,6 @@ import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.SubscriptionUpdates;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
-import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
 import org.apache.kafka.streams.state.internals.WindowStoreBuilder;
 import org.apache.kafka.streams.state.internals.WindowStoreSupplier;
 import org.slf4j.Logger;
@@ -557,7 +556,7 @@ public class InternalTopologyBuilder {
     }
 
 
-    public final void addGlobalStore(final KeyValueStoreBuilder storeBuilder,
+    public final void addGlobalStore(final StoreBuilder<KeyValueStore> storeBuilder,
                                      final String sourceName,
                                      final TimestampExtractor timestampExtractor,
                                      final Deserializer keyDeserializer,

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 73c5484..f1ae6da 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -505,7 +505,8 @@ public class KafkaStreamsTest {
         CLUSTER.createTopic(topic);
         final StreamsBuilder builder = new StreamsBuilder();
 
-        builder.table(Serdes.String(), Serdes.String(), topic, topic);
+        final Consumed<String, String> consumed = Consumed.with(Serdes.String(), Serdes.String());
+        builder.table(topic, consumed);
 
         final KafkaStreams streams = new KafkaStreams(builder.build(), props);
         final CountDownLatch latch = new CountDownLatch(1);
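
Given the table(topic, consumed, materialized) path added to InternalStreamsBuilder above, the equivalent materialized call at this level would presumably look like the following sketch (store name hypothetical):

    builder.table(topic,
                  consumed,
                  Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("table-store"));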

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
index e50f4d0..6c7b2b4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
@@ -32,11 +32,11 @@ import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Transformer;
 import org.apache.kafka.streams.kstream.TransformerSupplier;
 import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.StreamsTestUtils;
@@ -583,13 +583,11 @@ public class EosIntegrationTest {
         String[] storeNames = null;
         if (withState) {
             storeNames = new String[] {storeName};
-            final StateStoreSupplier storeSupplier = Stores.create(storeName)
-                .withLongKeys()
-                .withLongValues()
-                .persistent()
-                .build();
+            final StoreBuilder<KeyValueStore<Long, Long>> storeBuilder
+                    = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName), Serdes.Long(), Serdes.Long())
+                    .withCachingEnabled();
 
-            builder.addStateStore(storeSupplier);
+            builder.addStateStore(storeBuilder);
         }
 
         final KStream<Long, Long> input = builder.stream(MULTI_PARTITION_INPUT_TOPIC);

