kafka-commits mailing list archives

From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6036: Local Materialization for Source KTable (#5779)
Date Sun, 09 Dec 2018 06:49:57 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c0353d8  KAFKA-6036: Local Materialization for Source KTable (#5779)
c0353d8 is described below

commit c0353d8ddce88bac6fc04f85dd40cb95b8ca5cf9
Author: Guozhang Wang <wangguoz@gmail.com>
AuthorDate: Sat Dec 8 22:49:48 2018 -0800

    KAFKA-6036: Local Materialization for Source KTable (#5779)
    
    Refactor the materialization of source KTables so that:
    
    If Materialized.as(queryableName) is specified, materialize;
    If a downstream operator needs to fetch from this KTable via ValueGetters, materialize;
    If a downstream operator needs the old values to be sent, materialize;
    Otherwise, do not materialize the KTable. E.g. builder.table("topic").filter().toStream().to("topic") would not create any state stores.
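
    As an illustration only, the rule above can be modeled as a single predicate. This is a simplified sketch, not the actual Kafka Streams implementation: the class name, method name, and parameters below are hypothetical.

```java
// Hypothetical model of the materialization rule; none of these names exist in Kafka Streams.
final class MaterializationRule {

    private MaterializationRule() { }

    /**
     * @param queryableName         name passed via Materialized.as(...), or null if none was given
     * @param valueGetterRequired   true if a downstream operator reads this table through a ValueGetter
     * @param sendOldValuesRequired true if a downstream operator needs old values to be sent
     * @return true if the source KTable should be backed by a state store
     */
    static boolean shouldMaterialize(final String queryableName,
                                     final boolean valueGetterRequired,
                                     final boolean sendOldValuesRequired) {
        return queryableName != null || valueGetterRequired || sendOldValuesRequired;
    }

    public static void main(final String[] args) {
        // Analogous to builder.table("topic").filter().toStream().to("topic"): nothing forces a store.
        System.out.println(shouldMaterialize(null, false, false));
        // A user-supplied queryable name always forces materialization.
        System.out.println(shouldMaterialize("my-store", false, false));
        // So does a downstream operator that looks values up, e.g. a join (assumed example).
        System.out.println(shouldMaterialize(null, true, false));
    }
}
```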
    
    There are a couple of minor changes in this PR as well:
    
    KTableImpl's queryableStoreName and isQueryable are merged into queryableStoreName alone: if it is null, the table is not queryable; if it is not null, the table is queryable (i.e. internally generated names will no longer be used for it).
    To achieve this, MaterializedInternal.storeName() and MaterializedInternal.queryableStoreName() are split. The former can be internally generated and is not exposed to users. The queryable name can be set to the internal store name if we decide to materialize the store during the DSL parsing / physical topology generation phase. The corresponding KTable is determined to be materialized only if the queryable name is specified.
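
    A minimal sketch of the storeName / queryableStoreName split, assuming a simplified stand-in for MaterializedInternal. The class StoreNames, its constructor, and the generated-name format are hypothetical and chosen only for illustration.

```java
import java.util.function.UnaryOperator;

// Hypothetical stand-in that only models the two names; it is not MaterializedInternal.
final class StoreNames {

    private final String storeName;
    private final boolean queryable;

    StoreNames(final String userProvidedName,
               final UnaryOperator<String> nameGenerator,
               final String prefix) {
        // Only a user-provided name makes the store queryable.
        this.queryable = userProvidedName != null;
        // Without a user-provided name, fall back to an internally generated one.
        this.storeName = queryable ? userProvidedName : nameGenerator.apply(prefix);
    }

    // May be internally generated; never meant to be exposed to users.
    String storeName() {
        return storeName;
    }

    // Null means "not queryable"; non-null is always the user-provided name.
    String queryableStoreName() {
        return queryable ? storeName : null;
    }

    public static void main(final String[] args) {
        final int[] index = {0};
        // Made-up name format; the real generator is an implementation detail.
        final UnaryOperator<String> generator = prefix -> prefix + "STATE-STORE-" + index[0]++;

        final StoreNames named = new StoreNames("my-store", generator, "topic-");
        final StoreNames internal = new StoreNames(null, generator, "topic-");

        System.out.println(named.storeName() + " / " + named.queryableStoreName());
        System.out.println(internal.storeName() + " / " + internal.queryableStoreName());
    }
}
```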
    
    Removed some overlapping unit tests among KTableImplTest and the KTableXXTest classes.
    
    Fixed a few typing bugs found along the way as well.
    
    -----------------------
    
    This PR illustrates an experimental proof of concept (PoC) towards logical materialization.
    
    Today we already logically materialize the KTable for filter / mapValues / transformValues if queryableName is not specified via Materialized, but whenever users specify queryableName we still always materialize. My original goal was to also consider logical materialization for queryable stores, but when implementing it via a wrapped store that applies the transformations on the fly I realized it is tougher than I thought, because we not only need to support fetch or get, but also need to  [...]
    
    Reviewers: Bill Bejeck <bill@confluent.io>, Matthias J. Sax <mjsax@apache.org>
---
 .../org/apache/kafka/streams/StreamsBuilder.java   |  22 +-
 .../kstream/internals/GlobalKTableImpl.java        |  18 +-
 .../internals/GroupedStreamAggregateBuilder.java   |   6 +-
 .../kstream/internals/InternalStreamsBuilder.java  |  78 +++--
 .../kstream/internals/KGroupedStreamImpl.java      |  16 +-
 .../kstream/internals/KGroupedTableImpl.java       |  15 +-
 .../streams/kstream/internals/KTableImpl.java      |  82 +++---
 .../streams/kstream/internals/KTableMapValues.java |   1 -
 .../streams/kstream/internals/KTableSource.java    |  44 ++-
 .../internals/KTableSourceValueGetterSupplier.java |   2 +-
 .../kstream/internals/MaterializedInternal.java    |  23 +-
 .../internals/SessionWindowedKStreamImpl.java      |  18 +-
 .../kstream/internals/TimeWindowedKStreamImpl.java |  18 +-
 .../kstream/internals/graph/StreamSourceNode.java  |  10 +-
 .../internals/graph/TableProcessorNode.java        |  11 +-
 .../kstream/internals/graph/TableSourceNode.java   |  64 ++--
 .../internals/InternalTopologyBuilder.java         |   4 +-
 .../org/apache/kafka/streams/KafkaStreamsTest.java |   4 +-
 .../apache/kafka/streams/StreamsBuilderTest.java   |  38 +--
 .../org/apache/kafka/streams/TopologyTest.java     |  23 +-
 .../KTableSourceTopicRestartIntegrationTest.java   |   3 +-
 .../integration/RestoreIntegrationTest.java        |   2 +-
 .../internals/InternalStreamsBuilderTest.java      |  46 +--
 .../kstream/internals/KTableFilterTest.java        | 149 ++--------
 .../streams/kstream/internals/KTableImplTest.java  | 136 +--------
 .../internals/KTableKTableInnerJoinTest.java       | 324 +++++++++------------
 .../kstream/internals/KTableMapValuesTest.java     | 166 ++---------
 .../kstream/internals/KTableSourceTest.java        |   3 +-
 .../internals/MaterializedInternalTest.java        |   9 +-
 .../internals/GlobalStreamThreadTest.java          |  34 +--
 .../internals/KeyValueStoreMaterializerTest.java   |  21 +-
 .../processor/internals/StreamThreadTest.java      |   3 +-
 32 files changed, 494 insertions(+), 899 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index 4c9ee93..9e99421 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -225,8 +225,8 @@ public class StreamsBuilder {
         Objects.requireNonNull(materialized, "materialized can't be null");
         final ConsumedInternal<K, V> consumedInternal = new ConsumedInternal<>(consumed);
         materialized.withKeySerde(consumedInternal.keySerde()).withValueSerde(consumedInternal.valueSerde());
-        final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
-        materializedInternal.generateStoreNameIfNeeded(internalStreamsBuilder, topic + "-");
+        final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal =
+            new MaterializedInternal<>(materialized, internalStreamsBuilder, topic + "-");
         return internalStreamsBuilder.table(topic, consumedInternal, materializedInternal);
     }
 
@@ -275,8 +275,7 @@ public class StreamsBuilder {
         Objects.requireNonNull(consumed, "consumed can't be null");
         final ConsumedInternal<K, V> consumedInternal = new ConsumedInternal<>(consumed);
         final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal =
-            new MaterializedInternal<>(Materialized.with(consumedInternal.keySerde(), consumedInternal.valueSerde()));
-        materializedInternal.generateStoreNameIfNeeded(internalStreamsBuilder, topic + "-");
+            new MaterializedInternal<>(Materialized.with(consumedInternal.keySerde(), consumedInternal.valueSerde()), internalStreamsBuilder, topic + "-");
         return internalStreamsBuilder.table(topic, consumedInternal, materializedInternal);
     }
 
@@ -301,8 +300,8 @@ public class StreamsBuilder {
                                                   final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
         Objects.requireNonNull(topic, "topic can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
-        final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
-        materializedInternal.generateStoreNameIfNeeded(internalStreamsBuilder, topic + "-");
+        final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal =
+            new MaterializedInternal<>(materialized, internalStreamsBuilder, topic + "-");
         final ConsumedInternal<K, V> consumedInternal =
                 new ConsumedInternal<>(Consumed.with(materializedInternal.keySerde(), materializedInternal.valueSerde()));
 
@@ -331,8 +330,7 @@ public class StreamsBuilder {
         Objects.requireNonNull(consumed, "consumed can't be null");
         final ConsumedInternal<K, V> consumedInternal = new ConsumedInternal<>(consumed);
         final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal =
-                new MaterializedInternal<>(Materialized.<K, V, KeyValueStore<Bytes, byte[]>>with(consumedInternal.keySerde(), consumedInternal.valueSerde()));
-        materializedInternal.generateStoreNameIfNeeded(internalStreamsBuilder, topic + "-");
+                new MaterializedInternal<>(Materialized.with(consumedInternal.keySerde(), consumedInternal.valueSerde()), internalStreamsBuilder, topic + "-");
 
         return internalStreamsBuilder.globalTable(topic, consumedInternal, materializedInternal);
     }
@@ -398,8 +396,8 @@ public class StreamsBuilder {
         final ConsumedInternal<K, V> consumedInternal = new ConsumedInternal<>(consumed);
         // always use the serdes from consumed
         materialized.withKeySerde(consumedInternal.keySerde()).withValueSerde(consumedInternal.valueSerde());
-        final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
-        materializedInternal.generateStoreNameIfNeeded(internalStreamsBuilder, topic + "-");
+        final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal =
+            new MaterializedInternal<>(materialized, internalStreamsBuilder, topic + "-");
 
         return internalStreamsBuilder.globalTable(topic, consumedInternal, materializedInternal);
     }
@@ -433,8 +431,8 @@ public class StreamsBuilder {
                                                               final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
         Objects.requireNonNull(topic, "topic can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
-        final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
-        materializedInternal.generateStoreNameIfNeeded(internalStreamsBuilder, topic + "-");
+        final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal =
+            new MaterializedInternal<>(materialized, internalStreamsBuilder, topic + "-");
 
         return internalStreamsBuilder.globalTable(topic,
                                                   new ConsumedInternal<>(Consumed.with(materializedInternal.keySerde(),
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GlobalKTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GlobalKTableImpl.java
index 8fcdfed..734ff4a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GlobalKTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GlobalKTableImpl.java
@@ -21,17 +21,12 @@ import org.apache.kafka.streams.kstream.GlobalKTable;
 public class GlobalKTableImpl<K, V> implements GlobalKTable<K, V> {
 
     private final KTableValueGetterSupplier<K, V> valueGetterSupplier;
-    private final boolean queryable;
+    private final String queryableStoreName;
 
-    public GlobalKTableImpl(final KTableValueGetterSupplier<K, V> valueGetterSupplier) {
+    GlobalKTableImpl(final KTableValueGetterSupplier<K, V> valueGetterSupplier,
+                     final String queryableStoreName) {
         this.valueGetterSupplier = valueGetterSupplier;
-        this.queryable = true;
-    }
-    
-    public GlobalKTableImpl(final KTableValueGetterSupplier<K, V> valueGetterSupplier, 
-                            final boolean queryable) {
-        this.valueGetterSupplier = valueGetterSupplier;
-        this.queryable = queryable;
+        this.queryableStoreName = queryableStoreName;
     }
 
     KTableValueGetterSupplier<K, V> valueGetterSupplier() {
@@ -40,10 +35,7 @@ public class GlobalKTableImpl<K, V> implements GlobalKTable<K, V> {
 
     @Override
     public String queryableStoreName() {
-        if (!queryable) {
-            return null;
-        }
-        return valueGetterSupplier.storeNames()[0];
+        return queryableStoreName;
     }
 
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
index d410bce..bb93a4d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
@@ -67,9 +67,10 @@ class GroupedStreamAggregateBuilder<K, V> {
     <KR, VR> KTable<KR, VR> build(final String functionName,
                                   final StoreBuilder<? extends StateStore> storeBuilder,
                                   final KStreamAggProcessorSupplier<K, KR, V, VR> aggregateSupplier,
-                                  final boolean isQueryable,
+                                  final String queryableStoreName,
                                   final Serde<KR> keySerde,
                                   final Serde<VR> valSerde) {
+        assert queryableStoreName == null || queryableStoreName.equals(storeBuilder.name());
 
         final String aggFunctionName = builder.newProcessorName(functionName);
 
@@ -99,8 +100,7 @@ class GroupedStreamAggregateBuilder<K, V> {
                                 keySerde,
                                 valSerde,
                                 sourceName.equals(this.name) ? sourceNodes : Collections.singleton(sourceName),
-                                storeBuilder.name(),
-                                isQueryable,
+                                queryableStoreName,
                                 aggregateSupplier,
                                 statefulProcessorNode,
                                 builder);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index 20df084..e0983eb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -32,7 +32,6 @@ import org.apache.kafka.streams.kstream.internals.graph.StreamSourceNode;
 import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
 import org.apache.kafka.streams.kstream.internals.graph.TableSourceNode;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
@@ -109,67 +108,62 @@ public class InternalStreamsBuilder implements InternalNameProvider {
                                  this);
     }
 
-    @SuppressWarnings("unchecked")
-    public <K, V, S extends StateStore> KTable<K, V> table(final String topic,
-                                                           final ConsumedInternal<K, V> consumed,
-                                                           final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
-
-        final StoreBuilder<S> storeBuilder = (StoreBuilder<S>) new KeyValueStoreMaterializer<>(materialized).materialize();
-
-        final String source = newProcessorName(KStreamImpl.SOURCE_NAME);
-        final String name = newProcessorName(KTableImpl.SOURCE_NAME);
-        final ProcessorSupplier<K, V> processorSupplier = new KTableSource<>(storeBuilder.name());
-        final ProcessorParameters processorParameters = new ProcessorParameters<>(processorSupplier, name);
-
-        final TableSourceNode.TableSourceNodeBuilder<K, V, S> tableSourceNodeBuilder = TableSourceNode.tableSourceNodeBuilder();
-
-        final TableSourceNode<K, V, S> tableSourceNode = tableSourceNodeBuilder.withNodeName(name)
-                                                                               .withSourceName(source)
-                                                                               .withStoreBuilder(storeBuilder)
-                                                                               .withConsumedInternal(consumed)
-                                                                               .withProcessorParameters(processorParameters)
-                                                                               .withTopic(topic)
-                                                                               .build();
+    public <K, V> KTable<K, V> table(final String topic,
+                                     final ConsumedInternal<K, V> consumed,
+                                     final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
+        final String sourceName = newProcessorName(KStreamImpl.SOURCE_NAME);
+        final String tableSourceName = newProcessorName(KTableImpl.SOURCE_NAME);
+        final KTableSource<K, V> tableSource = new KTableSource<>(materialized.storeName(), materialized.queryableStoreName());
+        final ProcessorParameters<K, V> processorParameters = new ProcessorParameters<>(tableSource, tableSourceName);
+
+        final TableSourceNode<K, V> tableSourceNode = TableSourceNode.<K, V>tableSourceNodeBuilder()
+            .withTopic(topic)
+            .withSourceName(sourceName)
+            .withNodeName(tableSourceName)
+            .withConsumedInternal(consumed)
+            .withMaterializedInternal(materialized)
+            .withProcessorParameters(processorParameters)
+            .build();
 
         addGraphNode(root, tableSourceNode);
 
-        return new KTableImpl<>(name,
+        return new KTableImpl<>(tableSourceName,
                                 consumed.keySerde(),
                                 consumed.valueSerde(),
-                                Collections.singleton(source),
-                                storeBuilder.name(),
-                                materialized.isQueryable(),
-                                processorSupplier,
+                                Collections.singleton(sourceName),
+                                materialized.queryableStoreName(),
+                                tableSource,
                                 tableSourceNode,
                                 this);
     }
 
-    @SuppressWarnings("unchecked")
-    public <K, V, S extends StateStore> GlobalKTable<K, V> globalTable(final String topic,
-                                                                       final ConsumedInternal<K, V> consumed,
-                                                                       final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
+    public <K, V> GlobalKTable<K, V> globalTable(final String topic,
+                                                 final ConsumedInternal<K, V> consumed,
+                                                 final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
         Objects.requireNonNull(consumed, "consumed can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
         // explicitly disable logging for global stores
         materialized.withLoggingDisabled();
-        final StoreBuilder storeBuilder = new KeyValueStoreMaterializer<>(materialized).materialize();
         final String sourceName = newProcessorName(KTableImpl.SOURCE_NAME);
         final String processorName = newProcessorName(KTableImpl.SOURCE_NAME);
-        final KTableSource<K, V> tableSource = new KTableSource<>(storeBuilder.name());
+        // enforce store name as queryable name to always materialize global table stores
+        final String storeName = materialized.storeName();
+        final KTableSource<K, V> tableSource = new KTableSource<>(storeName, storeName);
 
-        final ProcessorParameters processorParameters = new ProcessorParameters(tableSource, processorName);
+        final ProcessorParameters<K, V> processorParameters = new ProcessorParameters<>(tableSource, processorName);
 
-        final TableSourceNode<K, V, S> tableSourceNode = TableSourceNode.tableSourceNodeBuilder().withStoreBuilder(storeBuilder)
-                                                                                                 .withSourceName(sourceName)
-                                                                                                 .withConsumedInternal(consumed)
-                                                                                                 .withTopic(topic)
-                                                                                                 .withProcessorParameters(processorParameters)
-                                                                                                 .isGlobalKTable(true)
-                                                                                                 .build();
+        final TableSourceNode<K, V> tableSourceNode = TableSourceNode.<K, V>tableSourceNodeBuilder()
+            .withTopic(topic)
+            .isGlobalKTable(true)
+            .withSourceName(sourceName)
+            .withConsumedInternal(consumed)
+            .withMaterializedInternal(materialized)
+            .withProcessorParameters(processorParameters)
+            .build();
 
         addGraphNode(root, tableSourceNode);
 
-        return new GlobalKTableImpl<>(new KTableSourceValueGetterSupplier<>(storeBuilder.name()), materialized.isQueryable());
+        return new GlobalKTableImpl<>(new KTableSourceValueGetterSupplier<>(storeName), materialized.queryableStoreName());
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
index e53e37f..eab1e8f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
@@ -40,7 +40,6 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K, V> implements KGroupedS
     static final String REDUCE_NAME = "KSTREAM-REDUCE-";
     static final String AGGREGATE_NAME = "KSTREAM-AGGREGATE-";
 
-    private final boolean repartitionRequired;
     private final GroupedStreamAggregateBuilder<K, V> aggregateBuilder;
 
     KGroupedStreamImpl(final String name,
@@ -58,7 +57,6 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K, V> implements KGroupedS
             name,
             streamsGraphNode
         );
-        this.repartitionRequired = repartitionRequired;
     }
 
     @Override
@@ -72,8 +70,8 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K, V> implements KGroupedS
         Objects.requireNonNull(reducer, "reducer can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
 
-        final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
-        materializedInternal.generateStoreNameIfNeeded(builder, REDUCE_NAME);
+        final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal =
+            new MaterializedInternal<>(materialized, builder, REDUCE_NAME);
 
         if (materializedInternal.keySerde() == null) {
             materializedInternal.withKeySerde(keySerde);
@@ -97,8 +95,8 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K, V> implements KGroupedS
         Objects.requireNonNull(aggregator, "aggregator can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
 
-        final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
-        materializedInternal.generateStoreNameIfNeeded(builder, AGGREGATE_NAME);
+        final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal =
+            new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME);
 
         if (materializedInternal.keySerde() == null) {
             materializedInternal.withKeySerde(keySerde);
@@ -136,8 +134,8 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K, V> implements KGroupedS
     }
 
     private KTable<K, Long> doCount(final Materialized<K, Long, KeyValueStore<Bytes, byte[]>> materialized) {
-        final MaterializedInternal<K, Long, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
-        materializedInternal.generateStoreNameIfNeeded(builder, AGGREGATE_NAME);
+        final MaterializedInternal<K, Long, KeyValueStore<Bytes, byte[]>> materializedInternal =
+            new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME);
 
         if (materializedInternal.keySerde() == null) {
             materializedInternal.withKeySerde(keySerde);
@@ -189,7 +187,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K, V> implements KGroupedS
             functionName,
             new KeyValueStoreMaterializer<>(materializedInternal).materialize(),
             aggregateSupplier,
-            materializedInternal.isQueryable(),
+            materializedInternal.queryableStoreName(),
             materializedInternal.keySerde(),
             materializedInternal.valueSerde());
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
index 29a52b1..2eca84e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
@@ -94,8 +94,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K, V> implements KGr
                                 materialized.keySerde(),
                                 materialized.valueSerde(),
                                 Collections.singleton(sourceName),
-                                materialized.storeName(),
-                                materialized.isQueryable(),
+                                materialized.queryableStoreName(),
                                 aggregateSupplier,
                                 statefulProcessorNode,
                                 builder);
@@ -121,8 +120,8 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K, V> implements KGr
         Objects.requireNonNull(adder, "adder can't be null");
         Objects.requireNonNull(subtractor, "subtractor can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
-        final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
-        materializedInternal.generateStoreNameIfNeeded(builder, AGGREGATE_NAME);
+        final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal =
+            new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME);
 
         if (materializedInternal.keySerde() == null) {
             materializedInternal.withKeySerde(keySerde);
@@ -144,8 +143,8 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K, V> implements KGr
 
     @Override
     public KTable<K, Long> count(final Materialized<K, Long, KeyValueStore<Bytes, byte[]>> materialized) {
-        final MaterializedInternal<K, Long, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
-        materializedInternal.generateStoreNameIfNeeded(builder, AGGREGATE_NAME);
+        final MaterializedInternal<K, Long, KeyValueStore<Bytes, byte[]>> materializedInternal =
+            new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME);
 
         if (materializedInternal.keySerde() == null) {
             materializedInternal.withKeySerde(keySerde);
@@ -177,8 +176,8 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K, V> implements KGr
         Objects.requireNonNull(subtractor, "subtractor can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
 
-        final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
-        materializedInternal.generateStoreNameIfNeeded(builder, AGGREGATE_NAME);
+        final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal =
+            new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME);
 
         if (materializedInternal.keySerde() == null) {
             materializedInternal.withKeySerde(keySerde);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 9caac30..d4c1baf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -86,7 +86,6 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
     private final ProcessorSupplier<?, ?> processorSupplier;
 
     private final String queryableStoreName;
-    private final boolean isQueryable;
 
     private boolean sendOldValues = false;
 
@@ -95,39 +94,33 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
                       final Serde<V> valSerde,
                       final Set<String> sourceNodes,
                       final String queryableStoreName,
-                      final boolean isQueryable,
                       final ProcessorSupplier<?, ?> processorSupplier,
                       final StreamsGraphNode streamsGraphNode,
                       final InternalStreamsBuilder builder) {
         super(name, keySerde, valSerde, sourceNodes, streamsGraphNode, builder);
         this.processorSupplier = processorSupplier;
         this.queryableStoreName = queryableStoreName;
-        this.isQueryable = isQueryable;
     }
 
     @Override
     public String queryableStoreName() {
-        if (!isQueryable) {
-            return null;
-        } else {
-            return this.queryableStoreName;
-        }
+        return queryableStoreName;
     }
 
     private KTable<K, V> doFilter(final Predicate<? super K, ? super V> predicate,
                                   final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal,
                                   final boolean filterNot) {
-        final String name = builder.newProcessorName(FILTER_NAME);
+        // we actually do not need to generate store names at all since if it is not specified, we will not
+        // materialize the store; but we still need to burn one index BEFORE generating the processor to keep compatibility.
+        if (materializedInternal != null && materializedInternal.storeName() == null) {
+            builder.newStoreName(FILTER_NAME);
+        }
 
-        // only materialize if the state store is queryable
-        final boolean shouldMaterialize = materializedInternal != null && materializedInternal.isQueryable();
+        final String name = builder.newProcessorName(FILTER_NAME);
 
-        final KTableProcessorSupplier<K, V, V> processorSupplier = new KTableFilter<>(
-            this,
-            predicate,
-            filterNot,
-            shouldMaterialize ? materializedInternal.storeName() : null
-        );
+        // only materialize if the state store has queryable name
+        final String queryableName = materializedInternal != null ? materializedInternal.queryableStoreName() : null;
+        final KTableProcessorSupplier<K, V, V> processorSupplier = new KTableFilter<>(this, predicate, filterNot, queryableName);
 
         final ProcessorParameters<K, V> processorParameters = unsafeCastProcessorParametersToCompletelyDifferentType(
             new ProcessorParameters<>(processorSupplier, name)
@@ -149,8 +142,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
                                 materializedInternal != null && materializedInternal.keySerde() != null ? materializedInternal.keySerde() : keySerde,
                                 materializedInternal != null && materializedInternal.valueSerde() != null ? materializedInternal.valueSerde() : valSerde,
                                 sourceNodes,
-                                shouldMaterialize ? materializedInternal.storeName() : this.queryableStoreName,
-                                shouldMaterialize,
+                                queryableName,
                                 processorSupplier,
                                 tableNode,
                                 builder);
@@ -168,7 +160,6 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
         Objects.requireNonNull(predicate, "predicate can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
         final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
-        materializedInternal.generateStoreNameIfNeeded(builder, FILTER_NAME);
 
         return doFilter(predicate, materializedInternal, false);
     }
@@ -185,23 +176,23 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
         Objects.requireNonNull(predicate, "predicate can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
         final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
-        materializedInternal.generateStoreNameIfNeeded(builder, FILTER_NAME);
 
         return doFilter(predicate, materializedInternal, true);
     }
 
     private <VR> KTable<K, VR> doMapValues(final ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper,
                                            final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal) {
-        final String name = builder.newProcessorName(MAPVALUES_NAME);
+        // we actually do not need generate store names at all since if it is not specified, we will not
+        // materialize the store; but we still need to burn one index BEFORE generating the processor to keep compatibility.
+        if (materializedInternal != null && materializedInternal.storeName() == null) {
+            builder.newStoreName(MAPVALUES_NAME);
+        }
 
-        // only materialize if the state store is queryable
-        final boolean shouldMaterialize = materializedInternal != null && materializedInternal.isQueryable();
+        final String name = builder.newProcessorName(MAPVALUES_NAME);
 
-        final KTableProcessorSupplier<K, V, VR> processorSupplier = new KTableMapValues<>(
-            this,
-            mapper,
-            shouldMaterialize ? materializedInternal.storeName() : null
-        );
+        // only materialize if the state store has queryable name
+        final String queryableName = materializedInternal != null ? materializedInternal.queryableStoreName() : null;
+        final KTableProcessorSupplier<K, V, VR> processorSupplier = new KTableMapValues<>(this, mapper, queryableName);
 
         // leaving in calls to ITB until building topology with graph
 
@@ -225,8 +216,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
             materializedInternal != null && materializedInternal.keySerde() != null ? materializedInternal.keySerde() : keySerde,
             materializedInternal != null ? materializedInternal.valueSerde() : null,
             sourceNodes,
-            shouldMaterialize ? materializedInternal.storeName() : this.queryableStoreName,
-            shouldMaterialize,
+            queryableName,
             processorSupplier,
             tableNode,
             builder
@@ -252,7 +242,6 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
         Objects.requireNonNull(materialized, "materialized can't be null");
 
         final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
-        materializedInternal.generateStoreNameIfNeeded(builder, MAPVALUES_NAME);
 
         return doMapValues(withKey(mapper), materializedInternal);
     }
@@ -264,8 +253,6 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
         Objects.requireNonNull(materialized, "materialized can't be null");
 
         final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
-        materializedInternal.generateStoreNameIfNeeded(builder, MAPVALUES_NAME);
-
         return doMapValues(mapper, materializedInternal);
     }
 
@@ -281,7 +268,6 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
                                               final String... stateStoreNames) {
         Objects.requireNonNull(materialized, "materialized can't be null");
         final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
-        materializedInternal.generateStoreNameIfNeeded(builder, TRANSFORMVALUES_NAME);
 
         return doTransformValues(transformerSupplier, materializedInternal, stateStoreNames);
     }
@@ -293,12 +279,13 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
 
         final String name = builder.newProcessorName(TRANSFORMVALUES_NAME);
 
-        final boolean shouldMaterialize = materialized != null && materialized.isQueryable();
+        // only materialize if users provide a specific queryable name
+        final String queryableStoreName = materialized != null ? materialized.queryableStoreName() : null;
 
         final KTableProcessorSupplier<K, V, VR> processorSupplier = new KTableTransformValues<>(
             this,
             transformerSupplier,
-            shouldMaterialize ? materialized.storeName() : null);
+            queryableStoreName);
 
         final ProcessorParameters<K, VR> processorParameters = unsafeCastProcessorParametersToCompletelyDifferentType(
             new ProcessorParameters<>(processorSupplier, name)
@@ -321,8 +308,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
             materialized != null && materialized.keySerde() != null ? materialized.keySerde() : keySerde,
             materialized != null ? materialized.valueSerde() : null,
             sourceNodes,
-            shouldMaterialize ? materialized.storeName() : this.queryableStoreName,
-            shouldMaterialize,
+            queryableStoreName,
             processorSupplier,
             tableNode,
             builder);
@@ -388,7 +374,6 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
             valSerde,
             Collections.singleton(this.name),
             null,
-            false,
             suppressionSupplier,
             node,
             builder
@@ -424,8 +409,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
                                        final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                        final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
         Objects.requireNonNull(materialized, "materialized can't be null");
-        final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
-        materializedInternal.generateStoreNameIfNeeded(builder, MERGE_NAME);
+        final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal =
+            new MaterializedInternal<>(materialized, builder, MERGE_NAME);
 
         return doJoin(other, joiner, materializedInternal, false, false);
     }
@@ -441,8 +426,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
                                             final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                             final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
         Objects.requireNonNull(materialized, "materialized can't be null");
-        final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
-        materializedInternal.generateStoreNameIfNeeded(builder, MERGE_NAME);
+        final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal =
+            new MaterializedInternal<>(materialized, builder, MERGE_NAME);
 
         return doJoin(other, joiner, materializedInternal, true, true);
     }
@@ -458,8 +443,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
                                            final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                            final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
         Objects.requireNonNull(materialized, "materialized can't be null");
-        final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
-        materializedInternal.generateStoreNameIfNeeded(builder, MERGE_NAME);
+        final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal =
+            new MaterializedInternal<>(materialized, builder, MERGE_NAME);
 
         return doJoin(other, joiner, materializedInternal, true, false);
     }
@@ -552,7 +537,6 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
             materializedInternal != null ? materializedInternal.valueSerde() : null,
             allSourceNodes,
             internalQueryableName,
-            internalQueryableName != null,
             joinMerge,
             kTableKTableJoinNode,
             builder
@@ -604,7 +588,9 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
     KTableValueGetterSupplier<K, V> valueGetterSupplier() {
         if (processorSupplier instanceof KTableSource) {
             final KTableSource<K, V> source = (KTableSource<K, V>) processorSupplier;
-            return new KTableSourceValueGetterSupplier<>(source.storeName);
+            // whenever a source ktable is required for getter, it should be materialized
+            source.materialize();
+            return new KTableSourceValueGetterSupplier<>(source.queryableName());
         } else if (processorSupplier instanceof KStreamAggProcessorSupplier) {
             return ((KStreamAggProcessorSupplier<?, K, S, V>) processorSupplier).view();
         } else {
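
Note on the "burn one index" comments in doFilter() and doMapValues() above: a minimal sketch, assuming processor names and store names are drawn from a single shared counter, so skipping store-name generation would shift every later generated name. The classes SharedIndexNames and BurnIndexSketch, the "KTABLE-FILTER-" prefix and the zero-padded name format are illustrative assumptions, not the actual InternalStreamsBuilder code.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical name generator: both kinds of names consume the same counter.
class SharedIndexNames {
    private final AtomicInteger index = new AtomicInteger(0);

    String newProcessorName(final String prefix) {
        return prefix + String.format("%010d", index.getAndIncrement());
    }

    String newStoreName(final String prefix) {
        return prefix + "STATE-STORE-" + String.format("%010d", index.getAndIncrement());
    }
}

class BurnIndexSketch {
    // Follows the shape of doFilter(): when a Materialized was passed without a
    // user-supplied store name, one index is consumed and the generated name is
    // discarded, so the processor receives the same index as it did before.
    static String filterProcessorName(final SharedIndexNames names,
                                      final boolean materializedGiven,
                                      final String userStoreName) {
        if (materializedGiven && userStoreName == null) {
            names.newStoreName("KTABLE-FILTER-"); // result intentionally unused
        }
        return names.newProcessorName("KTABLE-FILTER-");
    }
}
```

In this sketch the processor name ends in 0000000001 when an unnamed Materialized is passed and in 0000000000 otherwise, which is the numbering the old generateStoreNameIfNeeded() call produced as a side effect.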
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
index 9ae2b65..c2c84d5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
@@ -132,5 +132,4 @@ class KTableMapValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> {
             parentGetter.close();
         }
     }
-
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
index 85ceaae..274d96e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
@@ -25,15 +25,25 @@ import org.apache.kafka.streams.state.KeyValueStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Objects;
+
 public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
     private static final Logger LOG = LoggerFactory.getLogger(KTableSource.class);
 
-    public final String storeName;
+    private final String storeName;
+    private String queryableName;
+    private boolean sendOldValues;
 
-    private boolean sendOldValues = false;
+    public KTableSource(final String storeName, final String queryableName) {
+        Objects.requireNonNull(storeName, "storeName can't be null");
 
-    public KTableSource(final String storeName) {
         this.storeName = storeName;
+        this.queryableName = queryableName;
+        this.sendOldValues = false;
+    }
+
+    public String queryableName() {
+        return queryableName;
     }
 
     @Override
@@ -41,8 +51,17 @@ public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
         return new KTableSourceProcessor();
     }
 
+    // when source ktable requires sending old values, we just
+    // need to set the queryable name as the store name to enforce materialization
     public void enableSendingOldValues() {
-        sendOldValues = true;
+        this.sendOldValues = true;
+        this.queryableName = storeName;
+    }
+
+    // when the source ktable requires materialization from downstream, we just
+    // need to set the queryable name as the store name to enforce materialization
+    public void materialize() {
+        this.queryableName = storeName;
     }
 
     private class KTableSourceProcessor extends AbstractProcessor<K, V> {
@@ -56,8 +75,10 @@ public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
         public void init(final ProcessorContext context) {
             super.init(context);
             metrics = (StreamsMetricsImpl) context.metrics();
-            store = (KeyValueStore<K, V>) context.getStateStore(storeName);
-            tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context, sendOldValues), sendOldValues);
+            if (queryableName != null) {
+                store = (KeyValueStore<K, V>) context.getStateStore(queryableName);
+                tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context, sendOldValues), sendOldValues);
+            }
         }
 
         @Override
@@ -71,9 +92,14 @@ public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
                 metrics.skippedRecordsSensor().record();
                 return;
             }
-            final V oldValue = store.get(key);
-            store.put(key, value);
-            tupleForwarder.maybeForward(key, value, oldValue);
+
+            if (queryableName != null) {
+                final V oldValue = sendOldValues ? store.get(key) : null;
+                store.put(key, value);
+                tupleForwarder.maybeForward(key, value, oldValue);
+            } else {
+                context().forward(key, new Change<>(value, null));
+            }
         }
     }
 }
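
The KTableSource change above can be read as "materialize lazily, only when something needs the store". A minimal, self-contained sketch of that control flow follows. SourceSketch is a hypothetical stand-in rather than the Kafka Streams class: a HashMap plays the state store, and a list of strings plays the downstream forward.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for KTableSource; names and types are illustrative only.
class SourceSketch {
    private final String storeName;   // always set, possibly an internally generated name
    private String queryableName;     // null means "do not materialize"
    private boolean sendOldValues = false;

    final Map<String, String> store = new HashMap<>();  // stand-in for the state store
    final List<String> forwarded = new ArrayList<>();   // entries are "key:newValue:oldValue"

    SourceSketch(final String storeName, final String queryableName) {
        this.storeName = Objects.requireNonNull(storeName, "storeName can't be null");
        this.queryableName = queryableName;
    }

    String queryableName() {
        return queryableName;
    }

    // A downstream operator needs a value getter: force materialization.
    void materialize() {
        queryableName = storeName;
    }

    // A downstream operator needs old values: that also forces materialization.
    void enableSendingOldValues() {
        sendOldValues = true;
        queryableName = storeName;
    }

    void process(final String key, final String value) {
        if (queryableName != null) {
            // Materialized path: read the old value only if someone asked for it.
            final String oldValue = sendOldValues ? store.get(key) : null;
            store.put(key, value);
            forwarded.add(key + ":" + value + ":" + oldValue);
        } else {
            // Pass-through path: the store is never touched.
            forwarded.add(key + ":" + value + ":null");
        }
    }
}
```

With queryableName left null, process() never writes to the store. After enableSendingOldValues(), each update is stored and forwarded together with the previous value for that key.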
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java
index a5f68c3..6882dac 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java
@@ -23,7 +23,7 @@ public class KTableSourceValueGetterSupplier<K, V> implements KTableValueGetterS
 
     private final String storeName;
 
-    public KTableSourceValueGetterSupplier(final String storeName) {
+    KTableSourceValueGetterSupplier(final String storeName) {
         this.storeName = storeName;
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
index 1955dea..3948705 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
@@ -29,17 +29,26 @@ public class MaterializedInternal<K, V, S extends StateStore> extends Materializ
     private final boolean queriable;
 
     public MaterializedInternal(final Materialized<K, V, S> materialized) {
-        super(materialized);
-        queriable = storeName() != null;
+        this(materialized, null, null);
     }
 
-    public void generateStoreNameIfNeeded(final InternalNameProvider nameProvider,
-                                          final String generatedStorePrefix) {
-        if (storeName() == null) {
+    public MaterializedInternal(final Materialized<K, V, S> materialized,
+                                final InternalNameProvider nameProvider,
+                                final String generatedStorePrefix) {
+        super(materialized);
+
+        // if storeName is not provided, the corresponding KTable would never be queryable;
+        // but we still need to provide an internal name for it in case we materialize.
+        queriable = storeName() != null;
+        if (!queriable && nameProvider != null) {
             storeName = nameProvider.newStoreName(generatedStorePrefix);
         }
     }
 
+    public String queryableStoreName() {
+        return queriable ? storeName() : null;
+    }
+
     public String storeName() {
         if (storeSupplier != null) {
             return storeSupplier.name();
@@ -71,10 +80,6 @@ public class MaterializedInternal<K, V, S extends StateStore> extends Materializ
         return cachingEnabled;
     }
 
-    public boolean isQueryable() {
-        return queriable;
-    }
-
     Duration retention() {
         return retention;
     }
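
The MaterializedInternal change above separates two ideas: storeName() may be internally generated, while queryableStoreName() is non-null only when the user supplied a name. A rough sketch of that split follows. NameProviderSketch and MaterializedSketch are hypothetical classes, and the generated-name format is an assumption made for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for InternalNameProvider.
class NameProviderSketch {
    private final AtomicInteger index = new AtomicInteger(0);

    String newStoreName(final String prefix) {
        return prefix + "STATE-STORE-" + String.format("%010d", index.getAndIncrement());
    }
}

// Hypothetical stand-in for MaterializedInternal, reduced to the naming logic.
class MaterializedSketch {
    private String storeName;
    private final boolean queryable;

    MaterializedSketch(final String userStoreName,
                       final NameProviderSketch nameProvider,
                       final String generatedStorePrefix) {
        this.storeName = userStoreName;
        // Only a user-supplied name makes the table queryable.
        this.queryable = userStoreName != null;
        // An internal name is still generated in case the store is materialized later.
        if (!queryable && nameProvider != null) {
            this.storeName = nameProvider.newStoreName(generatedStorePrefix);
        }
    }

    String storeName() {
        return storeName;
    }

    String queryableStoreName() {
        return queryable ? storeName : null;
    }
}
```

Callers that must always create a store (aggregations, joins) would pass a name provider and read storeName(). Callers that materialize only on request (filter, mapValues) would pass none and check queryableStoreName() for null.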
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
index e185f4d..8c731be 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
@@ -79,8 +79,8 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K, V> imple
     }
 
     private KTable<Windowed<K>, Long> doCount(final Materialized<K, Long, SessionStore<Bytes, byte[]>> materialized) {
-        final MaterializedInternal<K, Long, SessionStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
-        materializedInternal.generateStoreNameIfNeeded(builder, AGGREGATE_NAME);
+        final MaterializedInternal<K, Long, SessionStore<Bytes, byte[]>> materializedInternal =
+            new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME);
         if (materializedInternal.keySerde() == null) {
             materializedInternal.withKeySerde(keySerde);
         }
@@ -97,7 +97,7 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K, V> imple
                 aggregateBuilder.countInitializer,
                 aggregateBuilder.countAggregator,
                 countMerger),
-            materializedInternal.isQueryable(),
+            materializedInternal.queryableStoreName(),
             materializedInternal.keySerde() != null ? new WindowedSerdes.SessionWindowedSerde<>(materializedInternal.keySerde()) : null,
             materializedInternal.valueSerde());
     }
@@ -113,8 +113,8 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K, V> imple
         Objects.requireNonNull(reducer, "reducer can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
         final Aggregator<K, V, V> reduceAggregator = aggregatorForReducer(reducer);
-        final MaterializedInternal<K, V, SessionStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
-        materializedInternal.generateStoreNameIfNeeded(builder, REDUCE_NAME);
+        final MaterializedInternal<K, V, SessionStore<Bytes, byte[]>> materializedInternal =
+            new MaterializedInternal<>(materialized, builder, REDUCE_NAME);
         if (materializedInternal.keySerde() == null) {
             materializedInternal.withKeySerde(keySerde);
         }
@@ -132,7 +132,7 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K, V> imple
                 reduceAggregator,
                 mergerForAggregator(reduceAggregator)
             ),
-            materializedInternal.isQueryable(),
+            materializedInternal.queryableStoreName(),
             materializedInternal.keySerde() != null ? new WindowedSerdes.SessionWindowedSerde<>(materializedInternal.keySerde()) : null,
             materializedInternal.valueSerde());
     }
@@ -153,8 +153,8 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K, V> imple
         Objects.requireNonNull(aggregator, "aggregator can't be null");
         Objects.requireNonNull(sessionMerger, "sessionMerger can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
-        final MaterializedInternal<K, VR, SessionStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
-        materializedInternal.generateStoreNameIfNeeded(builder, AGGREGATE_NAME);
+        final MaterializedInternal<K, VR, SessionStore<Bytes, byte[]>> materializedInternal =
+            new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME);
 
         if (materializedInternal.keySerde() == null) {
             materializedInternal.withKeySerde(keySerde);
@@ -169,7 +169,7 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K, V> imple
                 initializer,
                 aggregator,
                 sessionMerger),
-            materializedInternal.isQueryable(),
+            materializedInternal.queryableStoreName(),
             materializedInternal.keySerde() != null ? new WindowedSerdes.SessionWindowedSerde<>(materializedInternal.keySerde()) : null,
             materializedInternal.valueSerde());
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
index dfead3e..c805879 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
@@ -78,8 +78,8 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
     }
 
     private KTable<Windowed<K>, Long> doCount(final Materialized<K, Long, WindowStore<Bytes, byte[]>> materialized) {
-        final MaterializedInternal<K, Long, WindowStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
-        materializedInternal.generateStoreNameIfNeeded(builder, AGGREGATE_NAME);
+        final MaterializedInternal<K, Long, WindowStore<Bytes, byte[]>> materializedInternal =
+            new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME);
 
         if (materializedInternal.keySerde() == null) {
             materializedInternal.withKeySerde(keySerde);
@@ -92,7 +92,7 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
             AGGREGATE_NAME,
             materialize(materializedInternal),
             new KStreamWindowAggregate<>(windows, materializedInternal.storeName(), aggregateBuilder.countInitializer, aggregateBuilder.countAggregator),
-            materializedInternal.isQueryable(),
+            materializedInternal.queryableStoreName(),
             materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : null,
             materializedInternal.valueSerde());
     }
@@ -110,8 +110,8 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
         Objects.requireNonNull(initializer, "initializer can't be null");
         Objects.requireNonNull(aggregator, "aggregator can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
-        final MaterializedInternal<K, VR, WindowStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
-        materializedInternal.generateStoreNameIfNeeded(builder, AGGREGATE_NAME);
+        final MaterializedInternal<K, VR, WindowStore<Bytes, byte[]>> materializedInternal =
+            new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME);
         if (materializedInternal.keySerde() == null) {
             materializedInternal.withKeySerde(keySerde);
         }
@@ -119,7 +119,7 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
             AGGREGATE_NAME,
             materialize(materializedInternal),
             new KStreamWindowAggregate<>(windows, materializedInternal.storeName(), initializer, aggregator),
-            materializedInternal.isQueryable(),
+            materializedInternal.queryableStoreName(),
             materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : null,
             materializedInternal.valueSerde());
     }
@@ -134,8 +134,8 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
         Objects.requireNonNull(reducer, "reducer can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
 
-        final MaterializedInternal<K, V, WindowStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
-        materializedInternal.generateStoreNameIfNeeded(builder, REDUCE_NAME);
+        final MaterializedInternal<K, V, WindowStore<Bytes, byte[]>> materializedInternal =
+            new MaterializedInternal<>(materialized, builder, REDUCE_NAME);
 
         if (materializedInternal.keySerde() == null) {
             materializedInternal.withKeySerde(keySerde);
@@ -148,7 +148,7 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
             REDUCE_NAME,
             materialize(materializedInternal),
             new KStreamWindowAggregate<>(windows, materializedInternal.storeName(), aggregateBuilder.reduceInitializer, aggregatorForReducer(reducer)),
-            materializedInternal.isQueryable(),
+            materializedInternal.queryableStoreName(),
             materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : null,
             materializedInternal.valueSerde());
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java
index 29b6ecf..7d50c2a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java
@@ -35,19 +35,17 @@ public class StreamSourceNode<K, V> extends StreamsGraphNode {
     public StreamSourceNode(final String nodeName,
                             final Collection<String> topicNames,
                             final ConsumedInternal<K, V> consumedInternal) {
-        super(nodeName,
-              false);
+        super(nodeName, false);
 
         this.topicNames = topicNames;
         this.consumedInternal = consumedInternal;
     }
 
     public StreamSourceNode(final String nodeName,
-                     final Pattern topicPattern,
-                     final ConsumedInternal<K, V> consumedInternal) {
+                            final Pattern topicPattern,
+                            final ConsumedInternal<K, V> consumedInternal) {
 
-        super(nodeName,
-              false);
+        super(nodeName, false);
 
         this.topicPattern = topicPattern;
         this.consumedInternal = consumedInternal;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java
index 0e6435a..eb3328a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java
@@ -37,8 +37,7 @@ public class TableProcessorNode<K, V, S extends StateStore> extends StreamsGraph
                               final MaterializedInternal<K, V, S> materializedInternal,
                               final String[] storeNames) {
 
-        super(nodeName,
-              false);
+        super(nodeName, false);
         this.processorParameters = processorParameters;
         this.materializedInternal = materializedInternal;
         this.storeNames = storeNames != null ? storeNames : new String[]{};
@@ -56,18 +55,18 @@ public class TableProcessorNode<K, V, S extends StateStore> extends StreamsGraph
     @SuppressWarnings("unchecked")
     @Override
     public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
-        final boolean shouldMaterialize = materializedInternal != null && materializedInternal.isQueryable();
         final String processorName = processorParameters.processorName();
-
         topologyBuilder.addProcessor(processorName, processorParameters.processorSupplier(), parentNodeNames());
 
         if (storeNames.length > 0) {
             topologyBuilder.connectProcessorAndStateStores(processorName, storeNames);
         }
 
+        // only materialize if materialized is specified and it is queryable
+        final boolean shouldMaterialize = materializedInternal != null && materializedInternal.queryableStoreName() != null;
         if (shouldMaterialize) {
-            topologyBuilder.addStateStore(new KeyValueStoreMaterializer<>(
-                    (MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>>) materializedInternal).materialize(), processorName);
+            // TODO: we are enforcing this as a keyvalue store, but it should go beyond any type of stores
+            topologyBuilder.addStateStore(new KeyValueStoreMaterializer<>((MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>>) materializedInternal).materialize(), processorName);
         }
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
index 90fb5d4..6c213a7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
@@ -17,8 +17,11 @@
 
 package org.apache.kafka.streams.kstream.internals.graph;
 
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
-import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.kstream.internals.KTableSource;
+import org.apache.kafka.streams.kstream.internals.KeyValueStoreMaterializer;
+import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
@@ -29,9 +32,9 @@ import java.util.Collections;
  * Used to represent either a KTable source or a GlobalKTable source. A boolean flag is used to indicate if this represents a GlobalKTable a {@link
  * org.apache.kafka.streams.kstream.GlobalKTable}
  */
-public class TableSourceNode<K, V, S extends StateStore> extends StreamSourceNode<K, V> {
+public class TableSourceNode<K, V> extends StreamSourceNode<K, V> {
 
-    private final StoreBuilder<S> storeBuilder;
+    private final MaterializedInternal<K, V, ?> materializedInternal;
     private final ProcessorParameters<K, V> processorParameters;
     private final String sourceName;
     private final boolean isGlobalKTable;
@@ -40,7 +43,7 @@ public class TableSourceNode<K, V, S extends StateStore> extends StreamSourceNod
                     final String sourceName,
                     final String topic,
                     final ConsumedInternal<K, V> consumedInternal,
-                    final StoreBuilder<S> storeBuilder,
+                    final MaterializedInternal<K, V, ?> materializedInternal,
                     final ProcessorParameters<K, V> processorParameters,
                     final boolean isGlobalKTable) {
 
@@ -48,27 +51,23 @@ public class TableSourceNode<K, V, S extends StateStore> extends StreamSourceNod
               Collections.singletonList(topic),
               consumedInternal);
 
-        this.processorParameters = processorParameters;
         this.sourceName = sourceName;
         this.isGlobalKTable = isGlobalKTable;
-        this.storeBuilder = storeBuilder;
-    }
-
-    public boolean isGlobalKTable() {
-        return isGlobalKTable;
+        this.processorParameters = processorParameters;
+        this.materializedInternal = materializedInternal;
     }
 
     @Override
     public String toString() {
         return "TableSourceNode{" +
-               "storeBuilder=" + storeBuilder +
+               "materializedInternal=" + materializedInternal +
                ", processorParameters=" + processorParameters +
                ", sourceName='" + sourceName + '\'' +
                ", isGlobalKTable=" + isGlobalKTable +
                "} " + super.toString();
     }
 
-    public static <K, V, S extends StateStore> TableSourceNodeBuilder<K, V, S> tableSourceNodeBuilder() {
+    public static <K, V> TableSourceNodeBuilder<K, V> tableSourceNodeBuilder() {
         return new TableSourceNodeBuilder<>();
     }
 
@@ -77,8 +76,13 @@ public class TableSourceNode<K, V, S extends StateStore> extends StreamSourceNod
     public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
         final String topicName = getTopicNames().iterator().next();
 
+        // TODO: we assume source KTables can only be key-value stores for now.
+        // should be expanded for other types of stores as well.
+        final StoreBuilder<KeyValueStore<K, V>> storeBuilder =
+            new KeyValueStoreMaterializer<>((MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>>) materializedInternal).materialize();
+
         if (isGlobalKTable) {
-            topologyBuilder.addGlobalStore((StoreBuilder<KeyValueStore>) storeBuilder,
+            topologyBuilder.addGlobalStore(storeBuilder,
                                            sourceName,
                                            consumedInternal().timestampExtractor(),
                                            consumedInternal().keyDeserializer(),
@@ -96,70 +100,72 @@ public class TableSourceNode<K, V, S extends StateStore> extends StreamSourceNod
 
             topologyBuilder.addProcessor(processorParameters.processorName(), processorParameters.processorSupplier(), sourceName);
 
-            topologyBuilder.addStateStore(storeBuilder, nodeName());
-            topologyBuilder.markSourceStoreAndTopic(storeBuilder, topicName);
+            // only add state store if the source KTable should be materialized
+            final KTableSource<K, V> ktableSource = (KTableSource<K, V>) processorParameters.processorSupplier();
+            if (ktableSource.queryableName() != null) {
+                topologyBuilder.addStateStore(storeBuilder, nodeName());
+                topologyBuilder.markSourceStoreAndTopic(storeBuilder, topicName);
+            }
         }
 
     }
 
-    public static final class TableSourceNodeBuilder<K, V, S extends StateStore> {
+    public static final class TableSourceNodeBuilder<K, V> {
 
         private String nodeName;
         private String sourceName;
         private String topic;
         private ConsumedInternal<K, V> consumedInternal;
-        private StoreBuilder<S> storeBuilder;
+        private MaterializedInternal<K, V, ?> materializedInternal;
         private ProcessorParameters<K, V> processorParameters;
         private boolean isGlobalKTable = false;
 
         private TableSourceNodeBuilder() {
         }
 
-        public TableSourceNodeBuilder<K, V, S> withSourceName(final String sourceName) {
+        public TableSourceNodeBuilder<K, V> withSourceName(final String sourceName) {
             this.sourceName = sourceName;
             return this;
         }
 
-        public TableSourceNodeBuilder<K, V, S> withTopic(final String topic) {
+        public TableSourceNodeBuilder<K, V> withTopic(final String topic) {
             this.topic = topic;
             return this;
         }
 
-        public TableSourceNodeBuilder<K, V, S> withStoreBuilder(final StoreBuilder<S> storeBuilder) {
-            this.storeBuilder = storeBuilder;
+        public TableSourceNodeBuilder<K, V> withMaterializedInternal(final MaterializedInternal<K, V, ?> materializedInternal) {
+            this.materializedInternal = materializedInternal;
             return this;
         }
 
-        public TableSourceNodeBuilder<K, V, S> withConsumedInternal(final ConsumedInternal<K, V> consumedInternal) {
+        public TableSourceNodeBuilder<K, V> withConsumedInternal(final ConsumedInternal<K, V> consumedInternal) {
             this.consumedInternal = consumedInternal;
             return this;
         }
 
-        public TableSourceNodeBuilder<K, V, S> withProcessorParameters(final ProcessorParameters<K, V> processorParameters) {
+        public TableSourceNodeBuilder<K, V> withProcessorParameters(final ProcessorParameters<K, V> processorParameters) {
             this.processorParameters = processorParameters;
             return this;
         }
 
-        public TableSourceNodeBuilder<K, V, S> withNodeName(final String nodeName) {
+        public TableSourceNodeBuilder<K, V> withNodeName(final String nodeName) {
             this.nodeName = nodeName;
             return this;
         }
 
-        public TableSourceNodeBuilder<K, V, S> isGlobalKTable(final boolean isGlobaKTable) {
+        public TableSourceNodeBuilder<K, V> isGlobalKTable(final boolean isGlobaKTable) {
             this.isGlobalKTable = isGlobaKTable;
             return this;
         }
 
-        public TableSourceNode<K, V, S> build() {
+        public TableSourceNode<K, V> build() {
             return new TableSourceNode<>(nodeName,
                                          sourceName,
                                          topic,
                                          consumedInternal,
-                                         storeBuilder,
+                                         materializedInternal,
                                          processorParameters,
                                          isGlobalKTable);
-
-
         }
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 938ed91..b1f9496 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -27,7 +27,6 @@ import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.apache.kafka.streams.processor.TopicNameExtractor;
-import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.internals.SessionStoreBuilder;
 import org.apache.kafka.streams.state.internals.WindowStoreBuilder;
@@ -155,7 +154,6 @@ public class InternalTopologyBuilder {
             return users;
         }
 
-        /** Visible for testing */
         public boolean loggingEnabled() {
             return builder.loggingEnabled();
         }
@@ -538,7 +536,7 @@ public class InternalTopologyBuilder {
         nodeGroups = null;
     }
 
-    public final void addGlobalStore(final StoreBuilder<KeyValueStore> storeBuilder,
+    public final void addGlobalStore(final StoreBuilder storeBuilder,
                                      final String sourceName,
                                      final TimestampExtractor timestampExtractor,
                                      final Deserializer keyDeserializer,
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 3ca0dcf..d2a4ace 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -37,6 +37,7 @@ import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.ThreadMetadata;
 import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
@@ -540,8 +541,7 @@ public class KafkaStreamsTest {
         CLUSTER.createTopic(topic);
         final StreamsBuilder builder = new StreamsBuilder();
 
-        final Consumed<String, String> consumed = Consumed.with(Serdes.String(), Serdes.String());
-        builder.table(topic, consumed);
+        builder.table(topic, Materialized.as("store"));
 
         final KafkaStreams streams = new KafkaStreams(builder.build(), props);
         try {
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index 331323d..894b561 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -40,7 +40,6 @@ import org.junit.Test;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.Properties;
 
@@ -49,7 +48,6 @@ import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 
 public class StreamsBuilderTest {
 
@@ -66,7 +64,7 @@ public class StreamsBuilderTest {
 
     @Test
     public void shouldAllowJoinUnmaterializedFilteredKTable() {
-        final KTable<Bytes, String> filteredKTable = builder.<Bytes, String>table("table-topic").filter(MockPredicate.<Bytes, String>allGoodPredicate());
+        final KTable<Bytes, String> filteredKTable = builder.<Bytes, String>table("table-topic").filter(MockPredicate.allGoodPredicate());
         builder.<Bytes, String>stream("stream-topic").join(filteredKTable, MockValueJoiner.TOSTRING_JOINER);
         builder.build();
 
@@ -80,20 +78,20 @@ public class StreamsBuilderTest {
     @Test
     public void shouldAllowJoinMaterializedFilteredKTable() {
         final KTable<Bytes, String> filteredKTable = builder.<Bytes, String>table("table-topic")
-                .filter(MockPredicate.<Bytes, String>allGoodPredicate(), Materialized.<Bytes, String, KeyValueStore<Bytes, byte[]>>as("store"));
+                .filter(MockPredicate.allGoodPredicate(), Materialized.as("store"));
         builder.<Bytes, String>stream("stream-topic").join(filteredKTable, MockValueJoiner.TOSTRING_JOINER);
         builder.build();
 
         final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
 
-        assertThat(topology.stateStores().size(), equalTo(2));
+        assertThat(topology.stateStores().size(), equalTo(1));
         assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000005"), equalTo(Collections.singleton("store")));
         assertThat(topology.processorConnectedStateStores("KTABLE-FILTER-0000000003"), equalTo(Collections.singleton("store")));
     }
 
     @Test
     public void shouldAllowJoinUnmaterializedMapValuedKTable() {
-        final KTable<Bytes, String> mappedKTable = builder.<Bytes, String>table("table-topic").mapValues(MockMapper.<String>noOpValueMapper());
+        final KTable<Bytes, String> mappedKTable = builder.<Bytes, String>table("table-topic").mapValues(MockMapper.noOpValueMapper());
         builder.<Bytes, String>stream("stream-topic").join(mappedKTable, MockValueJoiner.TOSTRING_JOINER);
         builder.build();
 
@@ -107,13 +105,13 @@ public class StreamsBuilderTest {
     @Test
     public void shouldAllowJoinMaterializedMapValuedKTable() {
         final KTable<Bytes, String> mappedKTable = builder.<Bytes, String>table("table-topic")
-                .mapValues(MockMapper.<String>noOpValueMapper(), Materialized.<Bytes, String, KeyValueStore<Bytes, byte[]>>as("store"));
+                .mapValues(MockMapper.noOpValueMapper(), Materialized.as("store"));
         builder.<Bytes, String>stream("stream-topic").join(mappedKTable, MockValueJoiner.TOSTRING_JOINER);
         builder.build();
 
         final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
 
-        assertThat(topology.stateStores().size(), equalTo(2));
+        assertThat(topology.stateStores().size(), equalTo(1));
         assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000005"), equalTo(Collections.singleton("store")));
         assertThat(topology.processorConnectedStateStores("KTABLE-MAPVALUES-0000000003"), equalTo(Collections.singleton("store")));
     }
@@ -267,25 +265,13 @@ public class StreamsBuilderTest {
     }
 
     @Test
-    public void shouldUseDefaultNodeAndStoreNames() {
+    public void shouldNotMaterializeStoresIfNotRequired() {
         final String topic = "topic";
-        builder.table(topic,
-                Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>with(Serdes.Long(), Serdes.String()));
-
-        final Iterator<TopologyDescription.Subtopology> subtopologies = builder.build().describe().subtopologies().iterator();
-        final TopologyDescription.Subtopology subtopology = subtopologies.next();
-
-        final Iterator<TopologyDescription.Node> nodes = subtopology.nodes().iterator();
-        TopologyDescription.Node node = nodes.next();
-        assertThat(node.name(), equalTo("KSTREAM-SOURCE-0000000001"));
-        node = nodes.next();
-        assertThat(node.name(), equalTo("KTABLE-SOURCE-0000000002"));
-        final Iterator<String> stores = ((TopologyDescription.Processor) node).stores().iterator();
-        assertThat(stores.next(), equalTo(topic + "-STATE-STORE-0000000000"));
-
-        assertFalse(nodes.hasNext());
-        assertFalse(stores.hasNext());
-        assertFalse(subtopologies.hasNext());
+        builder.table(topic, Materialized.with(Serdes.Long(), Serdes.String()));
+
+        final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
+
+        assertThat(topology.stateStores().size(), equalTo(0));
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index 7a58e1f..046ffb0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -860,6 +860,7 @@ public class TopologyTest {
             .groupBy((key, value) -> null)
             .count();
         final TopologyDescription describe = builder.build().describe();
+
         assertEquals(
             "Topologies:\n" +
                 "   Sub-topology: 0\n" +
@@ -955,13 +956,12 @@ public class TopologyTest {
         final KTable<Object, Object> table = builder.table("input-topic");
         table.mapValues((readOnlyKey, value) -> null);
         final TopologyDescription describe = builder.build().describe();
-        System.out.println(describe);
         Assert.assertEquals(
             "Topologies:\n" +
                 "   Sub-topology: 0\n" +
                 "    Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic])\n" +
                 "      --> KTABLE-SOURCE-0000000002\n" +
-                "    Processor: KTABLE-SOURCE-0000000002 (stores: [input-topic-STATE-STORE-0000000000])\n" +
+                "    Processor: KTABLE-SOURCE-0000000002 (stores: [])\n" +
                 "      --> KTABLE-MAPVALUES-0000000003\n" +
                 "      <-- KSTREAM-SOURCE-0000000001\n" +
                 "    Processor: KTABLE-MAPVALUES-0000000003 (stores: [])\n" +
@@ -983,7 +983,7 @@ public class TopologyTest {
                 "   Sub-topology: 0\n" +
                 "    Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic])\n" +
                 "      --> KTABLE-SOURCE-0000000002\n" +
-                "    Processor: KTABLE-SOURCE-0000000002 (stores: [input-topic-STATE-STORE-0000000000])\n" +
+                "    Processor: KTABLE-SOURCE-0000000002 (stores: [])\n" +
                 "      --> KTABLE-MAPVALUES-0000000004\n" +
                 "      <-- KSTREAM-SOURCE-0000000001\n" +
                 // previously, this was
@@ -1009,7 +1009,7 @@ public class TopologyTest {
                 "   Sub-topology: 0\n" +
                 "    Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic])\n" +
                 "      --> KTABLE-SOURCE-0000000002\n" +
-                "    Processor: KTABLE-SOURCE-0000000002 (stores: [input-topic-STATE-STORE-0000000000])\n" +
+                "    Processor: KTABLE-SOURCE-0000000002 (stores: [])\n" +
                 "      --> KTABLE-MAPVALUES-0000000003\n" +
                 "      <-- KSTREAM-SOURCE-0000000001\n" +
                 "    Processor: KTABLE-MAPVALUES-0000000003 (stores: [store-name])\n" +
@@ -1030,7 +1030,7 @@ public class TopologyTest {
                 "   Sub-topology: 0\n" +
                 "    Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic])\n" +
                 "      --> KTABLE-SOURCE-0000000002\n" +
-                "    Processor: KTABLE-SOURCE-0000000002 (stores: [input-topic-STATE-STORE-0000000000])\n" +
+                "    Processor: KTABLE-SOURCE-0000000002 (stores: [])\n" +
                 "      --> KTABLE-FILTER-0000000003\n" +
                 "      <-- KSTREAM-SOURCE-0000000001\n" +
                 "    Processor: KTABLE-FILTER-0000000003 (stores: [])\n" +
@@ -1043,16 +1043,14 @@ public class TopologyTest {
     public void kTableAnonymousMaterializedFilterShouldPreserveTopologyStructure() {
         final StreamsBuilder builder = new StreamsBuilder();
         final KTable<Object, Object> table = builder.table("input-topic");
-        table.filter(
-            (key, value) -> false,
-            Materialized.with(null, null));
+        table.filter((key, value) -> false, Materialized.with(null, null));
         final TopologyDescription describe = builder.build().describe();
         Assert.assertEquals(
             "Topologies:\n" +
                 "   Sub-topology: 0\n" +
                 "    Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic])\n" +
                 "      --> KTABLE-SOURCE-0000000002\n" +
-                "    Processor: KTABLE-SOURCE-0000000002 (stores: [input-topic-STATE-STORE-0000000000])\n" +
+                "    Processor: KTABLE-SOURCE-0000000002 (stores: [])\n" +
                 "      --> KTABLE-FILTER-0000000004\n" +
                 "      <-- KSTREAM-SOURCE-0000000001\n" +
                 // Previously, this was
@@ -1069,16 +1067,15 @@ public class TopologyTest {
     public void kTableNamedMaterializedFilterShouldPreserveTopologyStructure() {
         final StreamsBuilder builder = new StreamsBuilder();
         final KTable<Object, Object> table = builder.table("input-topic");
-        table.filter(
-            (key, value) -> false,
-            Materialized.<Object, Object, KeyValueStore<Bytes, byte[]>>as("store-name").withKeySerde(null).withValueSerde(null));
+        table.filter((key, value) -> false, Materialized.as("store-name"));
         final TopologyDescription describe = builder.build().describe();
+
         Assert.assertEquals(
             "Topologies:\n" +
                 "   Sub-topology: 0\n" +
                 "    Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic])\n" +
                 "      --> KTABLE-SOURCE-0000000002\n" +
-                "    Processor: KTABLE-SOURCE-0000000002 (stores: [input-topic-STATE-STORE-0000000000])\n" +
+                "    Processor: KTABLE-SOURCE-0000000002 (stores: [])\n" +
                 "      --> KTABLE-FILTER-0000000003\n" +
                 "      <-- KSTREAM-SOURCE-0000000001\n" +
                 "    Processor: KTABLE-FILTER-0000000003 (stores: [store-name])\n" +
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
index d5781dd..1707679 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
@@ -32,6 +32,7 @@ import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.WallclockTimestampExtractor;
 import org.apache.kafka.test.IntegrationTest;
@@ -93,7 +94,7 @@ public class KTableSourceTopicRestartIntegrationTest {
 
     @Before
     public void before() {
-        final KTable<String, String> kTable = streamsBuilder.table(SOURCE_TOPIC);
+        final KTable<String, String> kTable = streamsBuilder.table(SOURCE_TOPIC, Materialized.as("store"));
         kTable.toStream().foreach(new ForeachAction<String, String>() {
             @Override
             public void apply(final String key, final String value) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
index c3c45db..31a349b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
@@ -141,7 +141,7 @@ public class RestoreIntegrationTest {
         final CountDownLatch startupLatch = new CountDownLatch(1);
         final CountDownLatch shutdownLatch = new CountDownLatch(1);
 
-        builder.table(INPUT_STREAM, Consumed.with(Serdes.Integer(), Serdes.Integer()))
+        builder.table(INPUT_STREAM, Materialized.<Integer, Integer, KeyValueStore<Bytes, byte[]>>as("store").withKeySerde(Serdes.Integer()).withValueSerde(Serdes.Integer()))
                 .toStream()
                 .foreach(new ForeachAction<Integer, Integer>() {
                     @Override
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
index 7756846..8c14d17 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
@@ -61,11 +61,7 @@ public class InternalStreamsBuilderTest {
     private final InternalStreamsBuilder builder = new InternalStreamsBuilder(new InternalTopologyBuilder());
     private final ConsumedInternal<String, String> consumed = new ConsumedInternal<>();
     private final String storePrefix = "prefix-";
-    private final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized = new MaterializedInternal<>(Materialized.as("test-store"));
-
-    {
-        materialized.generateStoreNameIfNeeded(builder, storePrefix);
-    }
+    private final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized = new MaterializedInternal<>(Materialized.as("test-store"), builder, storePrefix);
 
     @Test
     public void testNewName() {
@@ -128,10 +124,9 @@ public class InternalStreamsBuilderTest {
     }
 
     @Test
-    public void shouldStillMaterializeSourceKTableIfMaterializedIsntQueryable() {
+    public void shouldNotMaterializeSourceKTableIfNotRequired() {
         final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materializedInternal =
-            new MaterializedInternal<>(Materialized.with(null, null));
-        materializedInternal.generateStoreNameIfNeeded(builder, storePrefix);
+            new MaterializedInternal<>(Materialized.with(null, null), builder, storePrefix);
         final KTable table1 = builder.table("topic2", consumed, materializedInternal);
 
         builder.buildAndOptimizeTopology();
@@ -139,20 +134,15 @@ public class InternalStreamsBuilderTest {
             .rewriteTopology(new StreamsConfig(StreamsTestUtils.getStreamsConfig(APP_ID)))
             .build(null);
 
-        assertEquals(1, topology.stateStores().size());
-        final String storeName = "prefix-STATE-STORE-0000000000";
-        assertEquals(storeName, topology.stateStores().get(0).name());
-
-        assertEquals(1, topology.storeToChangelogTopic().size());
-        assertEquals("app-id-prefix-STATE-STORE-0000000000-changelog", topology.storeToChangelogTopic().get(storeName));
+        assertEquals(0, topology.stateStores().size());
+        assertEquals(0, topology.storeToChangelogTopic().size());
         assertNull(table1.queryableStoreName());
     }
     
     @Test
     public void shouldBuildGlobalTableWithNonQueryableStoreName() {
         final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materializedInternal =
-            new MaterializedInternal<>(Materialized.with(null, null));
-        materializedInternal.generateStoreNameIfNeeded(builder, storePrefix);
+            new MaterializedInternal<>(Materialized.with(null, null), builder, storePrefix);
 
         final GlobalKTable<String, String> table1 = builder.globalTable("topic2", consumed, materializedInternal);
 
@@ -162,8 +152,7 @@ public class InternalStreamsBuilderTest {
     @Test
     public void shouldBuildGlobalTableWithQueryaIbleStoreName() {
         final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materializedInternal =
-            new MaterializedInternal<>(Materialized.as("globalTable"));
-        materializedInternal.generateStoreNameIfNeeded(builder, storePrefix);
+            new MaterializedInternal<>(Materialized.as("globalTable"), builder, storePrefix);
         final GlobalKTable<String, String> table1 = builder.globalTable("topic2", consumed, materializedInternal);
 
         assertEquals("globalTable", table1.queryableStoreName());
@@ -172,8 +161,7 @@ public class InternalStreamsBuilderTest {
     @Test
     public void shouldBuildSimpleGlobalTableTopology() {
         final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materializedInternal =
-            new MaterializedInternal<>(Materialized.as("globalTable"));
-        materializedInternal.generateStoreNameIfNeeded(builder, storePrefix);
+            new MaterializedInternal<>(Materialized.as("globalTable"), builder, storePrefix);
         builder.globalTable("table",
                             consumed,
             materializedInternal);
@@ -204,14 +192,12 @@ public class InternalStreamsBuilderTest {
     public void shouldBuildGlobalTopologyWithAllGlobalTables() {
         {
             final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materializedInternal =
-                new MaterializedInternal<>(Materialized.as("global1"));
-            materializedInternal.generateStoreNameIfNeeded(builder, storePrefix);
+                new MaterializedInternal<>(Materialized.as("global1"), builder, storePrefix);
             builder.globalTable("table", consumed, materializedInternal);
         }
         {
             final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materializedInternal =
-                new MaterializedInternal<>(Materialized.as("global2"));
-            materializedInternal.generateStoreNameIfNeeded(builder, storePrefix);
+                new MaterializedInternal<>(Materialized.as("global2"), builder, storePrefix);
             builder.globalTable("table2", consumed, materializedInternal);
         }
 
@@ -225,18 +211,15 @@ public class InternalStreamsBuilderTest {
         final String two = "globalTable2";
 
         final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materializedInternal =
-            new MaterializedInternal<>(Materialized.as(one));
-        materializedInternal.generateStoreNameIfNeeded(builder, storePrefix);
+            new MaterializedInternal<>(Materialized.as(one), builder, storePrefix);
         final GlobalKTable<String, String> globalTable = builder.globalTable("table", consumed, materializedInternal);
 
         final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materializedInternal2 =
-            new MaterializedInternal<>(Materialized.as(two));
-        materializedInternal2.generateStoreNameIfNeeded(builder, storePrefix);
+            new MaterializedInternal<>(Materialized.as(two), builder, storePrefix);
         final GlobalKTable<String, String> globalTable2 = builder.globalTable("table2", consumed, materializedInternal2);
 
         final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materializedInternalNotGlobal =
-            new MaterializedInternal<>(Materialized.as("not-global"));
-        materializedInternalNotGlobal.generateStoreNameIfNeeded(builder, storePrefix);
+            new MaterializedInternal<>(Materialized.as("not-global"), builder, storePrefix);
         builder.table("not-global", consumed, materializedInternalNotGlobal);
 
         final KeyValueMapper<String, String, String> kvMapper = (key, value) -> value;
@@ -266,8 +249,7 @@ public class InternalStreamsBuilderTest {
         final KStream<String, String> playEvents = builder.stream(Collections.singleton("events"), consumed);
 
         final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materializedInternal =
-            new MaterializedInternal<>(Materialized.as("table-store"));
-        materializedInternal.generateStoreNameIfNeeded(builder, storePrefix);
+            new MaterializedInternal<>(Materialized.as("table-store"), builder, storePrefix);
         final KTable<String, String> table = builder.table("table-topic", consumed, materializedInternal);
 
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
index e717b08..8474565 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
@@ -19,7 +19,6 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.TopologyTestDriver;
@@ -30,7 +29,6 @@ import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
-import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockMapper;
 import org.apache.kafka.test.MockProcessor;
@@ -52,6 +50,8 @@ public class KTableFilterTest {
     private final ConsumerRecordFactory<String, Integer> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new IntegerSerializer());
     private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.Integer());
 
+    private final Predicate<String, Integer> predicate = (key, value) -> (value % 2) == 0;
+
     private void doTestKTable(final StreamsBuilder builder,
                               final KTable<String, Integer> table2,
                               final KTable<String, Integer> table3,
@@ -83,18 +83,8 @@ public class KTableFilterTest {
 
         final KTable<String, Integer> table1 = builder.table(topic1, consumed);
 
-        final KTable<String, Integer> table2 = table1.filter(new Predicate<String, Integer>() {
-            @Override
-            public boolean test(final String key, final Integer value) {
-                return (value % 2) == 0;
-            }
-        });
-        final KTable<String, Integer> table3 = table1.filterNot(new Predicate<String, Integer>() {
-            @Override
-            public boolean test(final String key, final Integer value) {
-                return (value % 2) == 0;
-            }
-        });
+        final KTable<String, Integer> table2 = table1.filter(predicate);
+        final KTable<String, Integer> table3 = table1.filterNot(predicate);
 
         doTestKTable(builder, table2, table3, topic1);
     }
@@ -107,18 +97,8 @@ public class KTableFilterTest {
 
         final KTable<String, Integer> table1 = builder.table(topic1, consumed);
 
-        final KTable<String, Integer> table2 = table1.filter(new Predicate<String, Integer>() {
-            @Override
-            public boolean test(final String key, final Integer value) {
-                return (value % 2) == 0;
-            }
-        }, Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("anyStoreNameFilter"));
-        final KTable<String, Integer> table3 = table1.filterNot(new Predicate<String, Integer>() {
-            @Override
-            public boolean test(final String key, final Integer value) {
-                return (value % 2) == 0;
-            }
-        });
+        final KTable<String, Integer> table2 = table1.filter(predicate, Materialized.as("anyStoreNameFilter"));
+        final KTable<String, Integer> table3 = table1.filterNot(predicate);
 
         assertEquals("anyStoreNameFilter", table2.queryableStoreName());
         assertNull(table3.queryableStoreName());
@@ -195,32 +175,6 @@ public class KTableFilterTest {
     }
 
     @Test
-    public void testValueGetter() {
-        final StreamsBuilder builder = new StreamsBuilder();
-
-        final String topic1 = "topic1";
-
-        final KTableImpl<String, Integer, Integer> table1 =
-                (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed);
-        final KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(
-                new Predicate<String, Integer>() {
-                    @Override
-                    public boolean test(final String key, final Integer value) {
-                        return (value % 2) == 0;
-                    }
-                });
-        final KTableImpl<String, Integer, Integer> table3 = (KTableImpl<String, Integer, Integer>) table1.filterNot(
-                new Predicate<String, Integer>() {
-                    @Override
-                    public boolean test(final String key, final Integer value) {
-                        return (value % 2) == 0;
-                    }
-                });
-
-        doTestValueGetter(builder, table2, table3, topic1);
-    }
-
-    @Test
     public void testQueryableValueGetter() {
         final StreamsBuilder builder = new StreamsBuilder();
 
@@ -228,23 +182,16 @@ public class KTableFilterTest {
 
         final KTableImpl<String, Integer, Integer> table1 =
             (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed);
-        final KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(
-            new Predicate<String, Integer>() {
-                @Override
-                public boolean test(final String key, final Integer value) {
-                    return (value % 2) == 0;
-                }
-            }, Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("anyStoreNameFilter"));
-        final KTableImpl<String, Integer, Integer> table3 = (KTableImpl<String, Integer, Integer>) table1.filterNot(
-            new Predicate<String, Integer>() {
-                @Override
-                public boolean test(final String key, final Integer value) {
-                    return (value % 2) == 0;
-                }
-            });
+        final KTableImpl<String, Integer, Integer> table2 =
+            (KTableImpl<String, Integer, Integer>) table1.filter(predicate, Materialized.as("store2"));
+        final KTableImpl<String, Integer, Integer> table3 =
+            (KTableImpl<String, Integer, Integer>) table1.filterNot(predicate, Materialized.as("store3"));
+        final KTableImpl<String, Integer, Integer> table4 =
+            (KTableImpl<String, Integer, Integer>) table1.filterNot(predicate);
 
-        assertEquals("anyStoreNameFilter", table2.queryableStoreName());
-        assertNull(table3.queryableStoreName());
+        assertEquals("store2", table2.queryableStoreName());
+        assertEquals("store3", table3.queryableStoreName());
+        assertNull(table4.queryableStoreName());
 
         doTestValueGetter(builder, table2, table3, topic1);
     }
@@ -297,13 +244,7 @@ public class KTableFilterTest {
 
         final KTableImpl<String, Integer, Integer> table1 =
                 (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed);
-        final KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(
-                new Predicate<String, Integer>() {
-                    @Override
-                    public boolean test(final String key, final Integer value) {
-                        return (value % 2) == 0;
-                    }
-                });
+        final KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(predicate);
 
         doTestNotSendingOldValue(builder, table1, table2, topic1);
     }
@@ -316,13 +257,8 @@ public class KTableFilterTest {
 
         final KTableImpl<String, Integer, Integer> table1 =
             (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed);
-        final KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(
-            new Predicate<String, Integer>() {
-                @Override
-                public boolean test(final String key, final Integer value) {
-                    return (value % 2) == 0;
-                }
-            }, Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("anyStoreNameFilter"));
+        final KTableImpl<String, Integer, Integer> table2 =
+            (KTableImpl<String, Integer, Integer>) table1.filter(predicate, Materialized.as("anyStoreNameFilter"));
 
         doTestNotSendingOldValue(builder, table1, table2, topic1);
     }
@@ -377,13 +313,8 @@ public class KTableFilterTest {
 
         final KTableImpl<String, Integer, Integer> table1 =
                 (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed);
-        final KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(
-                new Predicate<String, Integer>() {
-                    @Override
-                    public boolean test(final String key, final Integer value) {
-                        return (value % 2) == 0;
-                    }
-                });
+        final KTableImpl<String, Integer, Integer> table2 =
+            (KTableImpl<String, Integer, Integer>) table1.filter(predicate);
 
         doTestSendingOldValue(builder, table1, table2, topic1);
     }
@@ -396,13 +327,8 @@ public class KTableFilterTest {
 
         final KTableImpl<String, Integer, Integer> table1 =
             (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed);
-        final KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(
-            new Predicate<String, Integer>() {
-                @Override
-                public boolean test(final String key, final Integer value) {
-                    return (value % 2) == 0;
-                }
-            }, Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("anyStoreNameFilter"));
+        final KTableImpl<String, Integer, Integer> table2 =
+            (KTableImpl<String, Integer, Integer>) table1.filter(predicate, Materialized.as("anyStoreNameFilter"));
 
         doTestSendingOldValue(builder, table1, table2, topic1);
     }
@@ -440,14 +366,10 @@ public class KTableFilterTest {
         final Consumed<String, String> consumed = Consumed.with(Serdes.String(), Serdes.String());
         final KTableImpl<String, String, String> table1 =
             (KTableImpl<String, String, String>) builder.table(topic1, consumed);
-        final KTableImpl<String, String, String> table2 = (KTableImpl<String, String, String>) table1.filter(
-            new Predicate<String, String>() {
-                @Override
-                public boolean test(final String key, final String value) {
-                    return value.equalsIgnoreCase("accept");
-                }
-            }).groupBy(MockMapper.<String, String>noOpKeyValueMapper())
-            .reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER);
+        final KTableImpl<String, String, String> table2 =
+            (KTableImpl<String, String, String>) table1.filter((key, value) -> value.equalsIgnoreCase("accept"))
+                .groupBy(MockMapper.noOpKeyValueMapper())
+                .reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER);
 
         doTestSkipNullOnMaterialization(builder, table1, table2, topic1);
     }
@@ -462,26 +384,17 @@ public class KTableFilterTest {
         final Consumed<String, String> consumed = Consumed.with(Serdes.String(), Serdes.String());
         final KTableImpl<String, String, String> table1 =
             (KTableImpl<String, String, String>) builder.table(topic1, consumed);
-        final KTableImpl<String, String, String> table2 = (KTableImpl<String, String, String>) table1.filter(
-            new Predicate<String, String>() {
-                @Override
-                public boolean test(final String key, final String value) {
-                    return value.equalsIgnoreCase("accept");
-                }
-            }, Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("anyStoreNameFilter")).groupBy(MockMapper.<String, String>noOpKeyValueMapper())
-            .reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("mock-result"));
+        final KTableImpl<String, String, String> table2 =
+            (KTableImpl<String, String, String>) table1.filter((key, value) -> value.equalsIgnoreCase("accept"), Materialized.as("anyStoreNameFilter"))
+                .groupBy(MockMapper.noOpKeyValueMapper())
+                .reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, Materialized.as("mock-result"));
 
         doTestSkipNullOnMaterialization(builder, table1, table2, topic1);
     }
 
     @Test
     public void testTypeVariance() {
-        final Predicate<Number, Object> numberKeyPredicate = new Predicate<Number, Object>() {
-            @Override
-            public boolean test(final Number key, final Object value) {
-                return false;
-            }
-        };
+        final Predicate<Number, Object> numberKeyPredicate = (key, value) -> false;
 
         new StreamsBuilder()
             .<Integer, String>table("empty")
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
index d0ed50b..dd39291 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
@@ -26,7 +26,6 @@ import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.TopologyDescription;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.TopologyTestDriverWrapper;
-import org.apache.kafka.streams.TopologyWrapper;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.Grouped;
 import org.apache.kafka.streams.kstream.KTable;
@@ -40,7 +39,6 @@ import org.apache.kafka.streams.kstream.ValueMapperWithKey;
 import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
 import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
 import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.processor.internals.SinkNode;
 import org.apache.kafka.streams.processor.internals.SourceNode;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -204,138 +202,6 @@ public class KTableImplTest {
     }
 
     @Test
-    public void testValueGetter() {
-        final StreamsBuilder builder = new StreamsBuilder();
-
-        final String topic1 = "topic1";
-        final String topic2 = "topic2";
-
-        final KTableImpl<String, String, String> table1 =
-                (KTableImpl<String, String, String>) builder.table(topic1, consumed);
-        final KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
-                new ValueMapper<String, Integer>() {
-                    @Override
-                    public Integer apply(final String value) {
-                        return new Integer(value);
-                    }
-                });
-        final KTableImpl<String, Integer, Integer> table3 = (KTableImpl<String, Integer, Integer>) table2.filter(
-                new Predicate<String, Integer>() {
-                    @Override
-                    public boolean test(final String key, final Integer value) {
-                        return (value % 2) == 0;
-                    }
-                });
-
-        table1.toStream().to(topic2, produced);
-        final KTableImpl<String, String, String> table4 = (KTableImpl<String, String, String>) builder.table(topic2, consumed);
-
-        final Topology topology = builder.build();
-
-        final KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier();
-        final KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier();
-        final KTableValueGetterSupplier<String, Integer> getterSupplier3 = table3.valueGetterSupplier();
-        final KTableValueGetterSupplier<String, String> getterSupplier4 = table4.valueGetterSupplier();
-
-        final InternalTopologyBuilder topologyBuilder = TopologyWrapper.getInternalTopologyBuilder(topology);
-        topologyBuilder.connectProcessorAndStateStores(table1.name, getterSupplier1.storeNames());
-        topologyBuilder.connectProcessorAndStateStores(table2.name, getterSupplier2.storeNames());
-        topologyBuilder.connectProcessorAndStateStores(table3.name, getterSupplier3.storeNames());
-        topologyBuilder.connectProcessorAndStateStores(table4.name, getterSupplier4.storeNames());
-
-        try (final TopologyTestDriverWrapper driver = new TopologyTestDriverWrapper(topology, props)) {
-
-            assertEquals(2, driver.getAllStateStores().size());
-
-            final KTableValueGetter<String, String> getter1 = getterSupplier1.get();
-            final KTableValueGetter<String, Integer> getter2 = getterSupplier2.get();
-            final KTableValueGetter<String, Integer> getter3 = getterSupplier3.get();
-            final KTableValueGetter<String, String> getter4 = getterSupplier4.get();
-
-            getter1.init(driver.setCurrentNodeForProcessorContext(table1.name));
-            getter2.init(driver.setCurrentNodeForProcessorContext(table2.name));
-            getter3.init(driver.setCurrentNodeForProcessorContext(table3.name));
-            getter4.init(driver.setCurrentNodeForProcessorContext(table4.name));
-
-            driver.pipeInput(recordFactory.create(topic1, "A", "01"));
-            driver.pipeInput(recordFactory.create(topic1, "B", "01"));
-            driver.pipeInput(recordFactory.create(topic1, "C", "01"));
-
-            assertEquals("01", getter1.get("A"));
-            assertEquals("01", getter1.get("B"));
-            assertEquals("01", getter1.get("C"));
-
-            assertEquals(new Integer(1), getter2.get("A"));
-            assertEquals(new Integer(1), getter2.get("B"));
-            assertEquals(new Integer(1), getter2.get("C"));
-
-            assertNull(getter3.get("A"));
-            assertNull(getter3.get("B"));
-            assertNull(getter3.get("C"));
-
-            assertEquals("01", getter4.get("A"));
-            assertEquals("01", getter4.get("B"));
-            assertEquals("01", getter4.get("C"));
-
-            driver.pipeInput(recordFactory.create(topic1, "A", "02"));
-            driver.pipeInput(recordFactory.create(topic1, "B", "02"));
-
-            assertEquals("02", getter1.get("A"));
-            assertEquals("02", getter1.get("B"));
-            assertEquals("01", getter1.get("C"));
-
-            assertEquals(new Integer(2), getter2.get("A"));
-            assertEquals(new Integer(2), getter2.get("B"));
-            assertEquals(new Integer(1), getter2.get("C"));
-
-            assertEquals(new Integer(2), getter3.get("A"));
-            assertEquals(new Integer(2), getter3.get("B"));
-            assertNull(getter3.get("C"));
-
-            assertEquals("02", getter4.get("A"));
-            assertEquals("02", getter4.get("B"));
-            assertEquals("01", getter4.get("C"));
-
-            driver.pipeInput(recordFactory.create(topic1, "A", "03"));
-
-            assertEquals("03", getter1.get("A"));
-            assertEquals("02", getter1.get("B"));
-            assertEquals("01", getter1.get("C"));
-
-            assertEquals(new Integer(3), getter2.get("A"));
-            assertEquals(new Integer(2), getter2.get("B"));
-            assertEquals(new Integer(1), getter2.get("C"));
-
-            assertNull(getter3.get("A"));
-            assertEquals(new Integer(2), getter3.get("B"));
-            assertNull(getter3.get("C"));
-
-            assertEquals("03", getter4.get("A"));
-            assertEquals("02", getter4.get("B"));
-            assertEquals("01", getter4.get("C"));
-
-            driver.pipeInput(recordFactory.create(topic1, "A", (String) null));
-
-            assertNull(getter1.get("A"));
-            assertEquals("02", getter1.get("B"));
-            assertEquals("01", getter1.get("C"));
-
-
-            assertNull(getter2.get("A"));
-            assertEquals(new Integer(2), getter2.get("B"));
-            assertEquals(new Integer(1), getter2.get("C"));
-
-            assertNull(getter3.get("A"));
-            assertEquals(new Integer(2), getter3.get("B"));
-            assertNull(getter3.get("C"));
-
-            assertNull(getter4.get("A"));
-            assertEquals("02", getter4.get("B"));
-            assertEquals("01", getter4.get("C"));
-        }
-    }
-
-    @Test
     public void testStateStoreLazyEval() {
         final String topic1 = "topic1";
         final String topic2 = "topic2";
@@ -362,7 +228,7 @@ public class KTableImplTest {
                 });
 
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
-            assertEquals(2, driver.getAllStateStores().size());
+            assertEquals(0, driver.getAllStateStores().size());
         }
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
index 5ecbe83..63ed53f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
@@ -49,7 +49,6 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
 
 public class KTableKTableInnerJoinTest {
 
@@ -72,105 +71,44 @@ public class KTableKTableInnerJoinTest {
         stateDir = TestUtils.tempDirectory("kafka-test");
     }
 
-    private void doTestJoin(final StreamsBuilder builder,
-                            final int[] expectedKeys,
-                            final MockProcessorSupplier<Integer, String> supplier,
-                            final KTable<Integer, String> joined) {
-        final Collection<Set<String>> copartitionGroups = TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
-
-        assertEquals(1, copartitionGroups.size());
-        assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
-
-        @SuppressWarnings("unchecked")
-        final KTableValueGetterSupplier<Integer, String> getterSupplier = ((KTableImpl<Integer, String, String>) joined).valueGetterSupplier();
-
-        driver.setUp(builder, stateDir, Serdes.Integer(), Serdes.String());
-        driver.setTime(0L);
-
-        final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
-
-        final KTableValueGetter<Integer, String> getter = getterSupplier.get();
-        getter.init(driver.context());
-
-        // push two items to the primary stream. the other table is empty
-
-        for (int i = 0; i < 2; i++) {
-            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-        }
-        // pass tuple with null key, it will be discarded in join process
-        driver.process(topic1, null, "SomeVal");
-        driver.flushState();
-
-        processor.checkAndClearProcessResult();
-
-        // push two items to the other stream. this should produce two items.
-
-        for (int i = 0; i < 2; i++) {
-            driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
-        }
-        // pass tuple with null key, it will be discarded in join process
-        driver.process(topic2, null, "AnotherVal");
-        driver.flushState();
-
-        processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
-        checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"));
-
-        // push all four items to the primary stream. this should produce two items.
-
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic1, expectedKey, "XX" + expectedKey);
-        }
-        driver.flushState();
-
-        processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1");
-        checkJoinedValues(getter, kv(0, "XX0+Y0"), kv(1, "XX1+Y1"));
-
-        // push all items to the other stream. this should produce four items.
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic2, expectedKey, "YY" + expectedKey);
-        }
-        driver.flushState();
-
-        processor.checkAndClearProcessResult("0:XX0+YY0", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");
-        checkJoinedValues(getter, kv(0, "XX0+YY0"), kv(1, "XX1+YY1"), kv(2, "XX2+YY2"), kv(3, "XX3+YY3"));
-
-        // push all four items to the primary stream. this should produce four items.
-
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic1, expectedKey, "X" + expectedKey);
-        }
-        driver.flushState();
-
-        processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
-        checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
-
-        // push two items with null to the other stream as deletes. this should produce two item.
+    @Test
+    public void testJoin() {
+        final StreamsBuilder builder = new StreamsBuilder();
 
-        for (int i = 0; i < 2; i++) {
-            driver.process(topic2, expectedKeys[i], null);
-        }
-        driver.flushState();
+        final int[] expectedKeys = new int[]{0, 1, 2, 3};
 
-        processor.checkAndClearProcessResult("0:null", "1:null");
-        checkJoinedValues(getter, kv(0, null), kv(1, null));
+        final KTable<Integer, String> table1;
+        final KTable<Integer, String> table2;
+        final KTable<Integer, String> joined;
+        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+        table1 = builder.table(topic1, consumed);
+        table2 = builder.table(topic2, consumed);
+        joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER);
+        joined.toStream().process(supplier);
 
-        // push all four items to the primary stream. this should produce two items.
+        doTestJoin(builder, expectedKeys, supplier, joined);
+    }
 
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic1, expectedKey, "XX" + expectedKey);
-        }
-        driver.flushState();
+    @Test
+    public void testQueryableJoin() {
+        final StreamsBuilder builder = new StreamsBuilder();
 
-        processor.checkAndClearProcessResult("2:XX2+YY2", "3:XX3+YY3");
-        checkJoinedValues(getter, kv(2, "XX2+YY2"), kv(3, "XX3+YY3"));
+        final int[] expectedKeys = new int[]{0, 1, 2, 3};
 
-        driver.process(topic1, null, "XX" + 1);
-        checkJoinedValues(getter, kv(2, "XX2+YY2"), kv(3, "XX3+YY3"));
+        final KTable<Integer, String> table1;
+        final KTable<Integer, String> table2;
+        final KTable<Integer, String> table3;
+        final MockProcessorSupplier<Integer, String> processor = new MockProcessorSupplier<>();
+        table1 = builder.table(topic1, consumed);
+        table2 = builder.table(topic2, consumed);
+        table3 = table1.join(table2, MockValueJoiner.TOSTRING_JOINER, materialized);
+        table3.toStream().process(processor);
 
+        doTestJoin(builder, expectedKeys, processor, table3);
     }
 
     @Test
-    public void testJoin() {
+    public void testQueryableNotSendingOldValues() {
         final StreamsBuilder builder = new StreamsBuilder();
 
         final int[] expectedKeys = new int[]{0, 1, 2, 3};
@@ -179,57 +117,72 @@ public class KTableKTableInnerJoinTest {
         final KTable<Integer, String> table2;
         final KTable<Integer, String> joined;
         final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+
         table1 = builder.table(topic1, consumed);
         table2 = builder.table(topic2, consumed);
-        joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER);
-        joined.toStream().process(supplier);
+        joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER, materialized);
+        builder.build().addProcessor("proc", supplier, ((KTableImpl<?, ?, ?>) joined).name);
 
-        doTestJoin(builder, expectedKeys, supplier, joined);
+        doTestNotSendingOldValues(builder, expectedKeys, table1, table2, supplier, joined);
     }
 
     @Test
-    public void testQueryableJoin() {
+    public void testNotSendingOldValues() {
         final StreamsBuilder builder = new StreamsBuilder();
 
         final int[] expectedKeys = new int[]{0, 1, 2, 3};
 
         final KTable<Integer, String> table1;
         final KTable<Integer, String> table2;
-        final KTable<Integer, String> table3;
-        final MockProcessorSupplier<Integer, String> processor;
+        final KTable<Integer, String> joined;
+        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
 
-        processor = new MockProcessorSupplier<>();
         table1 = builder.table(topic1, consumed);
         table2 = builder.table(topic2, consumed);
-        table3 = table1.join(table2, MockValueJoiner.TOSTRING_JOINER, materialized);
-        table3.toStream().process(processor);
+        joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER);
+        builder.build().addProcessor("proc", supplier, ((KTableImpl<?, ?, ?>) joined).name);
 
-        doTestJoin(builder, expectedKeys, processor, table3);
+        doTestNotSendingOldValues(builder, expectedKeys, table1, table2, supplier, joined);
     }
 
-    private void doTestSendingOldValues(final StreamsBuilder builder,
-                                        final int[] expectedKeys,
-                                        final KTable<Integer, String> table1,
-                                        final KTable<Integer, String> table2,
-                                        final MockProcessorSupplier<Integer, String> supplier,
-                                        final KTable<Integer, String> joined,
-                                        final boolean sendOldValues) {
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldLogAndMeterSkippedRecordsDueToNullLeftKey() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final Processor<String, Change<String>> join = new KTableKTableInnerJoin<>(
+            (KTableImpl<String, String, String>) builder.table("left", Consumed.with(stringSerde, stringSerde)),
+            (KTableImpl<String, String, String>) builder.table("right", Consumed.with(stringSerde, stringSerde)),
+            null
+        ).get();
+
+        final MockProcessorContext context = new MockProcessorContext();
+        context.setRecordMetadata("left", -1, -2, null, -3);
+        join.init(context);
+        final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
+        join.process(null, new Change<>("new", "old"));
+        LogCaptureAppender.unregister(appender);
+
+        assertEquals(1.0, getMetricByName(context.metrics().metrics(), "skipped-records-total", "stream-metrics").metricValue());
+        assertThat(appender.getMessages(), hasItem("Skipping record due to null key. change=[(new<-old)] topic=[left] partition=[-1] offset=[-2]"));
+    }
+
+    private void doTestNotSendingOldValues(final StreamsBuilder builder,
+                                           final int[] expectedKeys,
+                                           final KTable<Integer, String> table1,
+                                           final KTable<Integer, String> table2,
+                                           final MockProcessorSupplier<Integer, String> supplier,
+                                           final KTable<Integer, String> joined) {
+
+        assertFalse(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled());
+        assertFalse(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled());
+        assertFalse(((KTableImpl<?, ?, ?>) joined).sendingOldValueEnabled());
 
         driver.setUp(builder, stateDir, Serdes.Integer(), Serdes.String());
         driver.setTime(0L);
 
         final MockProcessor<Integer, String> proc = supplier.theCapturedProcessor();
 
-        if (!sendOldValues) {
-            assertFalse(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled());
-            assertFalse(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled());
-            assertFalse(((KTableImpl<?, ?, ?>) joined).sendingOldValueEnabled());
-        } else {
-            ((KTableImpl<?, ?, ?>) joined).enableSendingOldValues();
-            assertTrue(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled());
-            assertTrue(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled());
-            assertTrue(((KTableImpl<?, ?, ?>) joined).sendingOldValueEnabled());
-        }
         // push two items to the primary stream. the other table is empty
 
         for (int i = 0; i < 2; i++) {
@@ -289,91 +242,104 @@ public class KTableKTableInnerJoinTest {
         proc.checkAndClearProcessResult("2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)");
     }
 
-    @Test
-    public void testNotSendingOldValues() {
-        final StreamsBuilder builder = new StreamsBuilder();
+    @SuppressWarnings("unchecked")
+    private void doTestJoin(final StreamsBuilder builder,
+                            final int[] expectedKeys,
+                            final MockProcessorSupplier<Integer, String> supplier,
+                            final KTable<Integer, String> joined) {
+        final Collection<Set<String>> copartitionGroups = TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
 
-        final int[] expectedKeys = new int[]{0, 1, 2, 3};
+        assertEquals(1, copartitionGroups.size());
+        assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
 
-        final KTable<Integer, String> table1;
-        final KTable<Integer, String> table2;
-        final KTable<Integer, String> joined;
-        final MockProcessorSupplier<Integer, String> supplier;
+        final KTableValueGetterSupplier<Integer, String> getterSupplier = ((KTableImpl<Integer, String, String>) joined).valueGetterSupplier();
 
-        table1 = builder.table(topic1, consumed);
-        table2 = builder.table(topic2, consumed);
-        joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER);
-        supplier = new MockProcessorSupplier<>();
-        builder.build().addProcessor("proc", supplier, ((KTableImpl<?, ?, ?>) joined).name);
+        driver.setUp(builder, stateDir, Serdes.Integer(), Serdes.String());
+        driver.setTime(0L);
 
-        doTestSendingOldValues(builder, expectedKeys, table1, table2, supplier, joined, false);
+        final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
 
-    }
+        final KTableValueGetter<Integer, String> getter = getterSupplier.get();
+        getter.init(driver.context());
 
-    @Test
-    public void testQueryableNotSendingOldValues() {
-        final StreamsBuilder builder = new StreamsBuilder();
+        // push two items to the primary stream. the other table is empty
 
-        final int[] expectedKeys = new int[]{0, 1, 2, 3};
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
+        // pass tuple with null key, it will be discarded in join process
+        driver.process(topic1, null, "SomeVal");
+        driver.flushState();
 
-        final KTable<Integer, String> table1;
-        final KTable<Integer, String> table2;
-        final KTable<Integer, String> joined;
-        final MockProcessorSupplier<Integer, String> supplier;
+        processor.checkAndClearProcessResult();
 
-        table1 = builder.table(topic1, consumed);
-        table2 = builder.table(topic2, consumed);
-        joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER, materialized);
-        supplier = new MockProcessorSupplier<>();
-        builder.build().addProcessor("proc", supplier, ((KTableImpl<?, ?, ?>) joined).name);
+        // push two items to the other stream. this should produce two items.
 
-        doTestSendingOldValues(builder, expectedKeys, table1, table2, supplier, joined, false);
-    }
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+        }
+        // pass tuple with null key, it will be discarded in join process
+        driver.process(topic2, null, "AnotherVal");
+        driver.flushState();
 
-    @Test
-    public void testSendingOldValues() {
-        final StreamsBuilder builder = new StreamsBuilder();
+        processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
+        checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"));
 
-        final int[] expectedKeys = new int[]{0, 1, 2, 3};
+        // push all four items to the primary stream. this should produce two items.
 
-        final KTable<Integer, String> table1;
-        final KTable<Integer, String> table2;
-        final KTable<Integer, String> joined;
-        final MockProcessorSupplier<Integer, String> supplier;
+        for (final int expectedKey : expectedKeys) {
+            driver.process(topic1, expectedKey, "XX" + expectedKey);
+        }
+        driver.flushState();
 
-        table1 = builder.table(topic1, consumed);
-        table2 = builder.table(topic2, consumed);
-        joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER);
+        processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1");
+        checkJoinedValues(getter, kv(0, "XX0+Y0"), kv(1, "XX1+Y1"));
 
-        supplier = new MockProcessorSupplier<>();
-        builder.build().addProcessor("proc", supplier, ((KTableImpl<?, ?, ?>) joined).name);
+        // push all items to the other stream. this should produce four items.
+        for (final int expectedKey : expectedKeys) {
+            driver.process(topic2, expectedKey, "YY" + expectedKey);
+        }
+        driver.flushState();
 
-        doTestSendingOldValues(builder, expectedKeys, table1, table2, supplier, joined, true);
+        processor.checkAndClearProcessResult("0:XX0+YY0", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");
+        checkJoinedValues(getter, kv(0, "XX0+YY0"), kv(1, "XX1+YY1"), kv(2, "XX2+YY2"), kv(3, "XX3+YY3"));
 
-    }
+        // push all four items to the primary stream. this should produce four items.
 
-    @Test
-    public void shouldLogAndMeterSkippedRecordsDueToNullLeftKey() {
-        final StreamsBuilder builder = new StreamsBuilder();
+        for (final int expectedKey : expectedKeys) {
+            driver.process(topic1, expectedKey, "X" + expectedKey);
+        }
+        driver.flushState();
 
-        @SuppressWarnings("unchecked")
-        final Processor<String, Change<String>> join = new KTableKTableInnerJoin<>(
-            (KTableImpl<String, String, String>) builder.table("left", Consumed.with(stringSerde, stringSerde)),
-            (KTableImpl<String, String, String>) builder.table("right", Consumed.with(stringSerde, stringSerde)),
-            null
-        ).get();
+        processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+        checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
 
-        final MockProcessorContext context = new MockProcessorContext();
-        context.setRecordMetadata("left", -1, -2, null, -3);
-        join.init(context);
-        final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
-        join.process(null, new Change<>("new", "old"));
-        LogCaptureAppender.unregister(appender);
+        // push two items with null to the other stream as deletes. this should produce two items.
+
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic2, expectedKeys[i], null);
+        }
+        driver.flushState();
+
+        processor.checkAndClearProcessResult("0:null", "1:null");
+        checkJoinedValues(getter, kv(0, null), kv(1, null));
+
+        // push all four items to the primary stream. this should produce two items.
+
+        for (final int expectedKey : expectedKeys) {
+            driver.process(topic1, expectedKey, "XX" + expectedKey);
+        }
+        driver.flushState();
+
+        processor.checkAndClearProcessResult("2:XX2+YY2", "3:XX3+YY3");
+        checkJoinedValues(getter, kv(2, "XX2+YY2"), kv(3, "XX3+YY3"));
+
+        driver.process(topic1, null, "XX" + 1);
+        checkJoinedValues(getter, kv(2, "XX2+YY2"), kv(3, "XX3+YY3"));
 
-        assertEquals(1.0, getMetricByName(context.metrics().metrics(), "skipped-records-total", "stream-metrics").metricValue());
-        assertThat(appender.getMessages(), hasItem("Skipping record due to null key. change=[(new<-old)] topic=[left] partition=[-1] offset=[-2]"));
     }
 
+
     private KeyValue<Integer, String> kv(final Integer key, final String value) {
         return new KeyValue<>(key, value);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
index 21ef038..00791aa 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
@@ -27,9 +27,6 @@ import org.apache.kafka.streams.TopologyWrapper;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.kstream.Predicate;
-import org.apache.kafka.streams.kstream.Produced;
-import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
@@ -50,7 +47,6 @@ import static org.junit.Assert.assertTrue;
 public class KTableMapValuesTest {
 
     private final Consumed<String, String> consumed = Consumed.with(Serdes.String(), Serdes.String());
-    private final Produced<String, String> produced = Produced.with(Serdes.String(), Serdes.String());
     private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
     private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
 
@@ -71,12 +67,7 @@ public class KTableMapValuesTest {
         final String topic1 = "topic1";
 
         final KTable<String, String> table1 = builder.table(topic1, consumed);
-        final KTable<String, Integer> table2 = table1.mapValues(new ValueMapper<CharSequence, Integer>() {
-            @Override
-            public Integer apply(final CharSequence value) {
-                return value.charAt(0) - 48;
-            }
-        });
+        final KTable<String, Integer> table2 = table1.mapValues(value -> value.charAt(0) - 48);
 
         final MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
         table2.toStream().process(supplier);
@@ -91,12 +82,7 @@ public class KTableMapValuesTest {
         final String topic1 = "topic1";
 
         final KTable<String, String> table1 = builder.table(topic1, consumed);
-        final KTable<String, Integer> table2 = table1.mapValues(new ValueMapper<CharSequence, Integer>() {
-            @Override
-            public Integer apply(final CharSequence value) {
-                return value.charAt(0) - 48;
-            }
-        }, Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("anyName").withValueSerde(Serdes.Integer()));
+        final KTable<String, Integer> table2 = table1.mapValues(value -> value.charAt(0) - 48, Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("anyName").withValueSerde(Serdes.Integer()));
 
         final MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
         table2.toStream().process(supplier);
@@ -106,170 +92,94 @@ public class KTableMapValuesTest {
 
     private void doTestValueGetter(final StreamsBuilder builder,
                                    final String topic1,
-                                   final KTableImpl<String, String, String> table1,
                                    final KTableImpl<String, String, Integer> table2,
-                                   final KTableImpl<String, Integer, Integer> table3,
-                                   final KTableImpl<String, String, String> table4) {
+                                   final KTableImpl<String, String, Integer> table3) {
 
         final Topology topology = builder.build();
 
-        final KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier();
         final KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier();
         final KTableValueGetterSupplier<String, Integer> getterSupplier3 = table3.valueGetterSupplier();
-        final KTableValueGetterSupplier<String, String> getterSupplier4 = table4.valueGetterSupplier();
 
         final InternalTopologyBuilder topologyBuilder = TopologyWrapper.getInternalTopologyBuilder(topology);
-        topologyBuilder.connectProcessorAndStateStores(table1.name, getterSupplier1.storeNames());
         topologyBuilder.connectProcessorAndStateStores(table2.name, getterSupplier2.storeNames());
         topologyBuilder.connectProcessorAndStateStores(table3.name, getterSupplier3.storeNames());
-        topologyBuilder.connectProcessorAndStateStores(table4.name, getterSupplier4.storeNames());
 
         try (final TopologyTestDriverWrapper driver = new TopologyTestDriverWrapper(builder.build(), props)) {
-            final KTableValueGetter<String, String> getter1 = getterSupplier1.get();
             final KTableValueGetter<String, Integer> getter2 = getterSupplier2.get();
             final KTableValueGetter<String, Integer> getter3 = getterSupplier3.get();
-            final KTableValueGetter<String, String> getter4 = getterSupplier4.get();
 
-            getter1.init(driver.setCurrentNodeForProcessorContext(table1.name));
             getter2.init(driver.setCurrentNodeForProcessorContext(table2.name));
             getter3.init(driver.setCurrentNodeForProcessorContext(table3.name));
-            getter4.init(driver.setCurrentNodeForProcessorContext(table4.name));
 
             driver.pipeInput(recordFactory.create(topic1, "A", "01"));
             driver.pipeInput(recordFactory.create(topic1, "B", "01"));
             driver.pipeInput(recordFactory.create(topic1, "C", "01"));
 
-            assertEquals("01", getter1.get("A"));
-            assertEquals("01", getter1.get("B"));
-            assertEquals("01", getter1.get("C"));
-
             assertEquals(new Integer(1), getter2.get("A"));
             assertEquals(new Integer(1), getter2.get("B"));
             assertEquals(new Integer(1), getter2.get("C"));
 
-            assertNull(getter3.get("A"));
-            assertNull(getter3.get("B"));
-            assertNull(getter3.get("C"));
-
-            assertEquals("01", getter4.get("A"));
-            assertEquals("01", getter4.get("B"));
-            assertEquals("01", getter4.get("C"));
+            assertEquals(new Integer(-1), getter3.get("A"));
+            assertEquals(new Integer(-1), getter3.get("B"));
+            assertEquals(new Integer(-1), getter3.get("C"));
 
             driver.pipeInput(recordFactory.create(topic1, "A", "02"));
             driver.pipeInput(recordFactory.create(topic1, "B", "02"));
 
-            assertEquals("02", getter1.get("A"));
-            assertEquals("02", getter1.get("B"));
-            assertEquals("01", getter1.get("C"));
-
             assertEquals(new Integer(2), getter2.get("A"));
             assertEquals(new Integer(2), getter2.get("B"));
             assertEquals(new Integer(1), getter2.get("C"));
 
-            assertEquals(new Integer(2), getter3.get("A"));
-            assertEquals(new Integer(2), getter3.get("B"));
-            assertNull(getter3.get("C"));
-
-            assertEquals("02", getter4.get("A"));
-            assertEquals("02", getter4.get("B"));
-            assertEquals("01", getter4.get("C"));
+            assertEquals(new Integer(-2), getter3.get("A"));
+            assertEquals(new Integer(-2), getter3.get("B"));
+            assertEquals(new Integer(-1), getter3.get("C"));
 
             driver.pipeInput(recordFactory.create(topic1, "A", "03"));
 
-            assertEquals("03", getter1.get("A"));
-            assertEquals("02", getter1.get("B"));
-            assertEquals("01", getter1.get("C"));
-
             assertEquals(new Integer(3), getter2.get("A"));
             assertEquals(new Integer(2), getter2.get("B"));
             assertEquals(new Integer(1), getter2.get("C"));
 
-            assertNull(getter3.get("A"));
-            assertEquals(new Integer(2), getter3.get("B"));
-            assertNull(getter3.get("C"));
-
-            assertEquals("03", getter4.get("A"));
-            assertEquals("02", getter4.get("B"));
-            assertEquals("01", getter4.get("C"));
+            assertEquals(new Integer(-3), getter3.get("A"));
+            assertEquals(new Integer(-2), getter3.get("B"));
+            assertEquals(new Integer(-1), getter3.get("C"));
 
             driver.pipeInput(recordFactory.create(topic1, "A", (String) null));
 
-            assertNull(getter1.get("A"));
-            assertEquals("02", getter1.get("B"));
-            assertEquals("01", getter1.get("C"));
-
             assertNull(getter2.get("A"));
             assertEquals(new Integer(2), getter2.get("B"));
             assertEquals(new Integer(1), getter2.get("C"));
 
             assertNull(getter3.get("A"));
-            assertEquals(new Integer(2), getter3.get("B"));
-            assertNull(getter3.get("C"));
-
-            assertNull(getter4.get("A"));
-            assertEquals("02", getter4.get("B"));
-            assertEquals("01", getter4.get("C"));
+            assertEquals(new Integer(-2), getter3.get("B"));
+            assertEquals(new Integer(-1), getter3.get("C"));
         }
     }
 
     @Test
-    public void testValueGetter() {
-        final StreamsBuilder builder = new StreamsBuilder();
-
-        final String topic1 = "topic1";
-        final String topic2 = "topic2";
-
-        final KTableImpl<String, String, String> table1 =
-                (KTableImpl<String, String, String>) builder.table(topic1, consumed);
-        final KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
-                new ValueMapper<String, Integer>() {
-                    @Override
-                    public Integer apply(final String value) {
-                        return new Integer(value);
-                    }
-                });
-        final KTableImpl<String, Integer, Integer> table3 = (KTableImpl<String, Integer, Integer>) table2.filter(
-                new Predicate<String, Integer>() {
-                    @Override
-                    public boolean test(final String key, final Integer value) {
-                        return (value % 2) == 0;
-                    }
-                });
-        table1.toStream().to(topic2, produced);
-        final KTableImpl<String, String, String> table4 = (KTableImpl<String, String, String>) builder.table(topic2, consumed);
-
-        doTestValueGetter(builder, topic1, table1, table2, table3, table4);
-    }
-
-    @Test
     public void testQueryableValueGetter() {
         final StreamsBuilder builder = new StreamsBuilder();
 
         final String topic1 = "topic1";
-        final String topic2 = "topic2";
-        final String storeName2 = "anyMapName";
-        final String storeName3 = "anyFilterName";
+        final String storeName2 = "store2";
+        final String storeName3 = "store3";
 
         final KTableImpl<String, String, String> table1 =
             (KTableImpl<String, String, String>) builder.table(topic1, consumed);
-        final KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
-            new ValueMapper<String, Integer>() {
-                @Override
-                public Integer apply(final String value) {
-                    return new Integer(value);
-                }
-            }, Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as(storeName2).withValueSerde(Serdes.Integer()));
-        final KTableImpl<String, Integer, Integer> table3 = (KTableImpl<String, Integer, Integer>) table2.filter(
-            new Predicate<String, Integer>() {
-                @Override
-                public boolean test(final String key, final Integer value) {
-                    return (value % 2) == 0;
-                }
-            }, Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as(storeName3).withValueSerde(Serdes.Integer()));
-        table1.toStream().to(topic2, produced);
-        final KTableImpl<String, String, String> table4 = (KTableImpl<String, String, String>) builder.table(topic2, consumed);
-
-        doTestValueGetter(builder, topic1, table1, table2, table3, table4);
+        final KTableImpl<String, String, Integer> table2 =
+            (KTableImpl<String, String, Integer>) table1.mapValues(Integer::new,
+                Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as(storeName2).withValueSerde(Serdes.Integer()));
+        final KTableImpl<String, String, Integer> table3 =
+            (KTableImpl<String, String, Integer>) table1.mapValues(value -> new Integer(value) * (-1),
+                Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as(storeName3).withValueSerde(Serdes.Integer()));
+        final KTableImpl<String, String, Integer> table4 =
+            (KTableImpl<String, String, Integer>) table1.mapValues(Integer::new);
+
+        assertEquals(storeName2, table2.queryableStoreName());
+        assertEquals(storeName3, table3.queryableStoreName());
+        assertNull(table4.queryableStoreName());
+
+        doTestValueGetter(builder, topic1, table2, table3);
     }
 
     @Test
@@ -280,13 +190,7 @@ public class KTableMapValuesTest {
 
         final KTableImpl<String, String, String> table1 =
                 (KTableImpl<String, String, String>) builder.table(topic1, consumed);
-        final KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
-                new ValueMapper<String, Integer>() {
-                    @Override
-                    public Integer apply(final String value) {
-                        return new Integer(value);
-                    }
-                });
+        final KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(Integer::new);
 
         final MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
 
@@ -328,13 +232,7 @@ public class KTableMapValuesTest {
 
         final KTableImpl<String, String, String> table1 =
                 (KTableImpl<String, String, String>) builder.table(topic1, consumed);
-        final KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
-                new ValueMapper<String, Integer>() {
-                    @Override
-                    public Integer apply(final String value) {
-                        return new Integer(value);
-                    }
-                });
+        final KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(Integer::new);
 
         table2.enableSendingOldValues();
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
index c9db537..0d42e44 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
@@ -26,6 +26,7 @@ import org.apache.kafka.streams.TopologyTestDriverWrapper;
 import org.apache.kafka.streams.TopologyWrapper;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
@@ -97,7 +98,7 @@ public class KTableSourceTest {
         final String topic1 = "topic1";
 
         @SuppressWarnings("unchecked")
-        final KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(topic1, stringConsumed);
+        final KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(topic1, stringConsumed, Materialized.as("store"));
 
         final Topology topology = builder.build();
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/MaterializedInternalTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/MaterializedInternalTest.java
index 1a83ef1..5d5e888 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/MaterializedInternalTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/MaterializedInternalTest.java
@@ -50,8 +50,7 @@ public class MaterializedInternalTest {
         EasyMock.replay(nameProvider);
 
         final MaterializedInternal<Object, Object, StateStore> materialized =
-            new MaterializedInternal<>(Materialized.with(null, null));
-        materialized.generateStoreNameIfNeeded(nameProvider, prefix);
+            new MaterializedInternal<>(Materialized.with(null, null), nameProvider, prefix);
 
         assertThat(materialized.storeName(), equalTo(generatedName));
         EasyMock.verify(nameProvider);
@@ -61,8 +60,7 @@ public class MaterializedInternalTest {
     public void shouldUseProvidedStoreNameWhenSet() {
         final String storeName = "store-name";
         final MaterializedInternal<Object, Object, StateStore> materialized =
-            new MaterializedInternal<>(Materialized.as(storeName));
-        materialized.generateStoreNameIfNeeded(nameProvider, prefix);
+            new MaterializedInternal<>(Materialized.as(storeName), nameProvider, prefix);
         assertThat(materialized.storeName(), equalTo(storeName));
     }
 
@@ -72,8 +70,7 @@ public class MaterializedInternalTest {
         EasyMock.expect(supplier.name()).andReturn(storeName).anyTimes();
         EasyMock.replay(supplier);
         final MaterializedInternal<Object, Object, KeyValueStore<Bytes, byte[]>> materialized =
-            new MaterializedInternal<>(Materialized.as(supplier));
-        materialized.generateStoreNameIfNeeded(nameProvider, prefix);
+            new MaterializedInternal<>(Materialized.as(supplier), nameProvider, prefix);
         assertThat(materialized.storeName(), equalTo(storeName));
     }
 }
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
index 387179b..4e6023f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
@@ -35,7 +35,6 @@ import org.apache.kafka.streams.kstream.internals.KeyValueStoreMaterializer;
 import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.test.MockStateRestoreListener;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
@@ -72,32 +71,31 @@ public class GlobalStreamThreadTest {
     @SuppressWarnings("unchecked")
     @Before
     public void before() {
-        final MaterializedInternal<Object, Object, KeyValueStore<Bytes, byte[]>> materialized = new MaterializedInternal<>(
-            Materialized.with(null, null));
-        materialized.generateStoreNameIfNeeded(
-            new InternalNameProvider() {
-                @Override
-                public String newProcessorName(final String prefix) {
-                    return "processorName";
-                }
+        final MaterializedInternal<Object, Object, KeyValueStore<Bytes, byte[]>> materialized =
+            new MaterializedInternal<>(Materialized.with(null, null),
+                new InternalNameProvider() {
+                    @Override
+                    public String newProcessorName(final String prefix) {
+                        return "processorName";
+                    }
 
-                @Override
-                public String newStoreName(final String prefix) {
-                    return GLOBAL_STORE_NAME;
-                }
-            },
-            "store-"
-        );
+                    @Override
+                    public String newStoreName(final String prefix) {
+                        return GLOBAL_STORE_NAME;
+                    }
+                },
+                "store-"
+            );
 
         builder.addGlobalStore(
-            (StoreBuilder) new KeyValueStoreMaterializer<>(materialized).materialize().withLoggingDisabled(),
+            new KeyValueStoreMaterializer<>(materialized).materialize().withLoggingDisabled(),
             "sourceName",
             null,
             null,
             null,
             GLOBAL_STORE_TOPIC_NAME,
             "processorName",
-            new KTableSource<>(GLOBAL_STORE_NAME));
+            new KTableSource<>(GLOBAL_STORE_NAME, GLOBAL_STORE_NAME));
 
         final HashMap<String, Object> properties = new HashMap<>();
         properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "blah");
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
index 3cb6d42..494ae02 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
@@ -54,8 +54,7 @@ public class KeyValueStoreMaterializerTest {
     @Test
     public void shouldCreateBuilderThatBuildsMeteredStoreWithCachingAndLoggingEnabled() {
         final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized =
-            new MaterializedInternal<>(Materialized.as("store"));
-        materialized.generateStoreNameIfNeeded(nameProvider, storePrefix);
+            new MaterializedInternal<>(Materialized.as("store"), nameProvider, storePrefix);
 
         final KeyValueStoreMaterializer<String, String> materializer = new KeyValueStoreMaterializer<>(materialized);
         final StoreBuilder<KeyValueStore<String, String>> builder = materializer.materialize();
@@ -70,9 +69,8 @@ public class KeyValueStoreMaterializerTest {
     @Test
     public void shouldCreateBuilderThatBuildsStoreWithCachingDisabled() {
         final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized = new MaterializedInternal<>(
-            Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store").withCachingDisabled()
+            Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store").withCachingDisabled(), nameProvider, storePrefix
         );
-        materialized.generateStoreNameIfNeeded(nameProvider, storePrefix);
         final KeyValueStoreMaterializer<String, String> materializer = new KeyValueStoreMaterializer<>(materialized);
         final StoreBuilder<KeyValueStore<String, String>> builder = materializer.materialize();
         final KeyValueStore<String, String> store = builder.build();
@@ -83,10 +81,8 @@ public class KeyValueStoreMaterializerTest {
     @Test
     public void shouldCreateBuilderThatBuildsStoreWithLoggingDisabled() {
         final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized = new MaterializedInternal<>(
-            Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store")
-                                                     .withLoggingDisabled()
+            Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store").withLoggingDisabled(), nameProvider, storePrefix
         );
-        materialized.generateStoreNameIfNeeded(nameProvider, storePrefix);
         final KeyValueStoreMaterializer<String, String> materializer = new KeyValueStoreMaterializer<>(materialized);
         final StoreBuilder<KeyValueStore<String, String>> builder = materializer.materialize();
         final KeyValueStore<String, String> store = builder.build();
@@ -98,10 +94,8 @@ public class KeyValueStoreMaterializerTest {
     @Test
     public void shouldCreateBuilderThatBuildsStoreWithCachingAndLoggingDisabled() {
         final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized = new MaterializedInternal<>(
-            Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store")
-                                                     .withCachingDisabled()
-                                                     .withLoggingDisabled());
-        materialized.generateStoreNameIfNeeded(nameProvider, storePrefix);
+            Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store").withCachingDisabled().withLoggingDisabled(), nameProvider, storePrefix
+        );
         final KeyValueStoreMaterializer<String, String> materializer = new KeyValueStoreMaterializer<>(materialized);
         final StoreBuilder<KeyValueStore<String, String>> builder = materializer.materialize();
         final KeyValueStore<String, String> store = builder.build();
@@ -119,14 +113,13 @@ public class KeyValueStoreMaterializerTest {
         EasyMock.replay(supplier);
 
         final MaterializedInternal<String, Integer, KeyValueStore<Bytes, byte[]>> materialized =
-            new MaterializedInternal<>(Materialized.as(supplier));
-        materialized.generateStoreNameIfNeeded(nameProvider, storePrefix);
+            new MaterializedInternal<>(Materialized.as(supplier), nameProvider, storePrefix);
         final KeyValueStoreMaterializer<String, Integer> materializer = new KeyValueStoreMaterializer<>(materialized);
         final StoreBuilder<KeyValueStore<String, Integer>> builder = materializer.materialize();
         final KeyValueStore<String, Integer> built = builder.build();
         final StateStore inner = ((WrappedStateStore) built).inner();
 
-        assertThat(inner, CoreMatchers.<StateStore>equalTo(store));
+        assertThat(inner, CoreMatchers.equalTo(store));
     }
 
 }
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 42e22f5..5f30489 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -972,8 +972,7 @@ public class StreamThreadTest {
         final TopicPartition partition2 = new TopicPartition(changelogName2, 1);
         internalStreamsBuilder.stream(Collections.singleton(topic1), consumed)
             .groupByKey().count(Materialized.<Object, Long, KeyValueStore<Bytes, byte[]>>as(storeName1));
-        final MaterializedInternal materialized = new MaterializedInternal(Materialized.as(storeName2));
-        materialized.generateStoreNameIfNeeded(internalStreamsBuilder, "");
+        final MaterializedInternal materialized = new MaterializedInternal(Materialized.as(storeName2), internalStreamsBuilder, "");
         internalStreamsBuilder.table(topic2, new ConsumedInternal(), materialized);
 
         internalStreamsBuilder.buildAndOptimizeTopology();

