kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6813: return to double-counting for count topology names (#5075)
Date Mon, 04 Jun 2018 22:34:16 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6f9f365  KAFKA-6813: return to double-counting for count topology names (#5075)
6f9f365 is described below

commit 6f9f3655731ea1d46bd1f0ed0957579d831e2692
Author: John Roesler <vvcephei@users.noreply.github.com>
AuthorDate: Mon Jun 4 17:33:53 2018 -0500

    KAFKA-6813: return to double-counting for count topology names (#5075)
    
    #4919 unintentionally changed the topology naming scheme. This change returns to the prior scheme.
    
    Reviewers: Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
---
 .../org/apache/kafka/streams/StreamsBuilder.java   |  44 ++-
 .../kstream/internals/KGroupedStreamImpl.java      |  35 +-
 .../kstream/internals/KGroupedTableImpl.java       |  15 +-
 .../streams/kstream/internals/KTableImpl.java      |  44 ++-
 .../kstream/internals/MaterializedInternal.java    |  19 +-
 .../internals/SessionWindowedKStreamImpl.java      |  36 ++-
 .../kstream/internals/TimeWindowedKStreamImpl.java |  37 ++-
 .../org/apache/kafka/streams/TopologyTest.java     | 351 ++++++++++++++++++---
 .../internals/InternalStreamsBuilderTest.java      | 108 +++----
 .../internals/MaterializedInternalTest.java        |  15 +-
 .../internals/GlobalStreamThreadTest.java          |  10 +-
 .../internals/KeyValueStoreMaterializerTest.java   |  35 +-
 .../processor/internals/StreamThreadTest.java      |   4 +-
 13 files changed, 544 insertions(+), 209 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index ead8a76..517104d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -224,9 +224,9 @@ public class StreamsBuilder {
         Objects.requireNonNull(materialized, "materialized can't be null");
         final ConsumedInternal<K, V> consumedInternal = new ConsumedInternal<>(consumed);
         materialized.withKeySerde(consumedInternal.keySerde()).withValueSerde(consumedInternal.valueSerde());
-        return internalStreamsBuilder.table(topic,
-                                            consumedInternal,
-                                            new MaterializedInternal<>(materialized, internalStreamsBuilder, topic + "-"));
+        final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
+        materializedInternal.generateStoreNameIfNeeded(internalStreamsBuilder, topic + "-");
+        return internalStreamsBuilder.table(topic, consumedInternal, materializedInternal);
     }
 
     /**
@@ -273,12 +273,10 @@ public class StreamsBuilder {
         Objects.requireNonNull(topic, "topic can't be null");
         Objects.requireNonNull(consumed, "consumed can't be null");
         final ConsumedInternal<K, V> consumedInternal = new ConsumedInternal<>(consumed);
-        return internalStreamsBuilder.table(topic,
-                                            consumedInternal,
-                                            new MaterializedInternal<>(
-                                                    Materialized.<K, V, KeyValueStore<Bytes, byte[]>>with(consumedInternal.keySerde(), consumedInternal.valueSerde()),
-                                                    internalStreamsBuilder,
-                                                    topic + "-"));
+        final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal =
+            new MaterializedInternal<>(Materialized.with(consumedInternal.keySerde(), consumedInternal.valueSerde()));
+        materializedInternal.generateStoreNameIfNeeded(internalStreamsBuilder, topic + "-");
+        return internalStreamsBuilder.table(topic, consumedInternal, materializedInternal);
     }
 
     /**
@@ -302,8 +300,9 @@ public class StreamsBuilder {
                                                   final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
         Objects.requireNonNull(topic, "topic can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
-        final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal
-                = new MaterializedInternal<>(materialized, internalStreamsBuilder, topic + "-");
+        final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
+        materializedInternal.generateStoreNameIfNeeded(internalStreamsBuilder, topic + "-");
+
         return internalStreamsBuilder.table(topic,
                                             new ConsumedInternal<>(Consumed.with(materializedInternal.keySerde(),
                                                                                  materializedInternal.valueSerde())),
@@ -331,14 +330,11 @@ public class StreamsBuilder {
         Objects.requireNonNull(topic, "topic can't be null");
         Objects.requireNonNull(consumed, "consumed can't be null");
         final ConsumedInternal<K, V> consumedInternal = new ConsumedInternal<>(consumed);
-        final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized =
-                new MaterializedInternal<>(
-                        Materialized.<K, V, KeyValueStore<Bytes, byte[]>>with(consumedInternal.keySerde(), consumedInternal.valueSerde()),
-                        internalStreamsBuilder,
-                        topic + "-");
-
+        final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal =
+                new MaterializedInternal<>(Materialized.<K, V, KeyValueStore<Bytes, byte[]>>with(consumedInternal.keySerde(), consumedInternal.valueSerde()));
+        materializedInternal.generateStoreNameIfNeeded(internalStreamsBuilder, topic + "-");
 
-        return internalStreamsBuilder.globalTable(topic, consumedInternal, materialized);
+        return internalStreamsBuilder.globalTable(topic, consumedInternal, materializedInternal);
     }
 
     /**
@@ -402,9 +398,10 @@ public class StreamsBuilder {
         final ConsumedInternal<K, V> consumedInternal = new ConsumedInternal<>(consumed);
         // always use the serdes from consumed
         materialized.withKeySerde(consumedInternal.keySerde()).withValueSerde(consumedInternal.valueSerde());
-        return internalStreamsBuilder.globalTable(topic,
-                                                  consumedInternal,
-                                                  new MaterializedInternal<>(materialized, internalStreamsBuilder, topic + "-"));
+        final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
+        materializedInternal.generateStoreNameIfNeeded(internalStreamsBuilder, topic + "-");
+
+        return internalStreamsBuilder.globalTable(topic, consumedInternal, materializedInternal);
     }
 
     /**
@@ -436,8 +433,9 @@ public class StreamsBuilder {
                                                               final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
         Objects.requireNonNull(topic, "topic can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
-        final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal =
-                new MaterializedInternal<>(materialized, internalStreamsBuilder, topic + "-");
+        final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
+        materializedInternal.generateStoreNameIfNeeded(internalStreamsBuilder, topic + "-");
+
         return internalStreamsBuilder.globalTable(topic,
                                                   new ConsumedInternal<>(Consumed.with(materializedInternal.keySerde(),
                                                                                        materializedInternal.valueSerde())),
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
index 3a9f919..74f930e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
@@ -74,8 +74,10 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
                                final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
         Objects.requireNonNull(reducer, "reducer can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
-        final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal
-                = new MaterializedInternal<>(materialized, builder, REDUCE_NAME);
+
+        final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
+        materializedInternal.generateStoreNameIfNeeded(builder, REDUCE_NAME);
+
         if (materializedInternal.keySerde() == null) {
             materializedInternal.withKeySerde(keySerde);
         }
@@ -97,8 +99,9 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
         Objects.requireNonNull(aggregator, "aggregator can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
 
-        final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal
-                = new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME);
+        final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
+        materializedInternal.generateStoreNameIfNeeded(builder, AGGREGATE_NAME);
+
         if (materializedInternal.keySerde() == null) {
             materializedInternal.withKeySerde(keySerde);
         }
@@ -117,14 +120,26 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
 
     @Override
     public KTable<K, Long> count() {
-        return count(Materialized.<K, Long, KeyValueStore<Bytes, byte[]>>with(keySerde, Serdes.Long()));
+        return doCount(Materialized.with(keySerde, Serdes.Long()));
     }
 
     @Override
     public KTable<K, Long> count(final Materialized<K, Long, KeyValueStore<Bytes, byte[]>> materialized) {
         Objects.requireNonNull(materialized, "materialized can't be null");
-        final MaterializedInternal<K, Long, KeyValueStore<Bytes, byte[]>> materializedInternal
-                = new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME);
+
+        // TODO: remove this when we do a topology-incompatible release
+        // we used to burn a topology name here, so we have to keep doing it for compatibility
+        if (new MaterializedInternal<>(materialized).storeName() == null) {
+            builder.newStoreName(AGGREGATE_NAME);
+        }
+
+        return doCount(materialized);
+    }
+
+    private KTable<K, Long> doCount(final Materialized<K, Long, KeyValueStore<Bytes, byte[]>> materialized) {
+        final MaterializedInternal<K, Long, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
+        materializedInternal.generateStoreNameIfNeeded(builder, AGGREGATE_NAME);
+
         if (materializedInternal.keySerde() == null) {
             materializedInternal.withKeySerde(keySerde);
         }
@@ -133,9 +148,9 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
         }
 
         return doAggregate(
-                new KStreamAggregate<>(materializedInternal.storeName(), aggregateBuilder.countInitializer, aggregateBuilder.countAggregator),
-                AGGREGATE_NAME,
-                materializedInternal);
+            new KStreamAggregate<>(materializedInternal.storeName(), aggregateBuilder.countInitializer, aggregateBuilder.countAggregator),
+            AGGREGATE_NAME,
+            materializedInternal);
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
index db119f3..49f258b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
@@ -128,8 +128,9 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
         Objects.requireNonNull(adder, "adder can't be null");
         Objects.requireNonNull(subtractor, "subtractor can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
-        final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal
-                = new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME);
+        final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
+        materializedInternal.generateStoreNameIfNeeded(builder, AGGREGATE_NAME);
+
         if (materializedInternal.keySerde() == null) {
             materializedInternal.withKeySerde(keySerde);
         }
@@ -150,8 +151,9 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
 
     @Override
     public KTable<K, Long> count(final Materialized<K, Long, KeyValueStore<Bytes, byte[]>> materialized) {
-        final MaterializedInternal<K, Long, KeyValueStore<Bytes, byte[]>> materializedInternal
-                = new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME);
+        final MaterializedInternal<K, Long, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
+        materializedInternal.generateStoreNameIfNeeded(builder, AGGREGATE_NAME);
+
         if (materializedInternal.keySerde() == null) {
             materializedInternal.withKeySerde(keySerde);
         }
@@ -182,8 +184,9 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
         Objects.requireNonNull(subtractor, "subtractor can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
 
-        final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal =
-                new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME);
+        final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
+        materializedInternal.generateStoreNameIfNeeded(builder, AGGREGATE_NAME);
+
         if (materializedInternal.keySerde() == null) {
             materializedInternal.withKeySerde(keySerde);
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index bcd31bb..21c1505 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -154,7 +154,10 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
                                final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
         Objects.requireNonNull(predicate, "predicate can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
-        return doFilter(predicate, new MaterializedInternal<>(materialized, builder, FILTER_NAME), false);
+        final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
+        materializedInternal.generateStoreNameIfNeeded(builder, FILTER_NAME);
+
+        return doFilter(predicate, materializedInternal, false);
     }
 
     @Override
@@ -168,7 +171,10 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
                                   final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
         Objects.requireNonNull(predicate, "predicate can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
-        return doFilter(predicate, new MaterializedInternal<>(materialized, builder, FILTER_NAME), true);
+        final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
+        materializedInternal.generateStoreNameIfNeeded(builder, FILTER_NAME);
+
+        return doFilter(predicate, materializedInternal, true);
     }
 
     private <VR> KTable<K, VR> doMapValues(final ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper,
@@ -210,7 +216,10 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         Objects.requireNonNull(mapper, "mapper can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
 
-        return doMapValues(withKey(mapper), new MaterializedInternal<>(materialized, builder, MAPVALUES_NAME));
+        final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
+        materializedInternal.generateStoreNameIfNeeded(builder, MAPVALUES_NAME);
+
+        return doMapValues(withKey(mapper), materializedInternal);
     }
 
     @Override
@@ -219,7 +228,10 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         Objects.requireNonNull(mapper, "mapper can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
 
-        return doMapValues(mapper, new MaterializedInternal<>(materialized, builder, MAPVALUES_NAME));
+        final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
+        materializedInternal.generateStoreNameIfNeeded(builder, MAPVALUES_NAME);
+
+        return doMapValues(mapper, materializedInternal);
     }
 
     @Override
@@ -233,8 +245,10 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
                                               final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized,
                                               final String... stateStoreNames) {
         Objects.requireNonNull(materialized, "materialized can't be null");
-        return doTransformValues(transformerSupplier,
-            new MaterializedInternal<>(materialized, builder, TRANSFORMVALUES_NAME), stateStoreNames);
+        final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
+        materializedInternal.generateStoreNameIfNeeded(builder, TRANSFORMVALUES_NAME);
+
+        return doTransformValues(transformerSupplier, materializedInternal, stateStoreNames);
     }
 
     private <VR> KTable<K, VR> doTransformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> transformerSupplier,
@@ -304,7 +318,10 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         Objects.requireNonNull(other, "other can't be null");
         Objects.requireNonNull(joiner, "joiner can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
-        return doJoin(other, joiner, new MaterializedInternal<>(materialized, builder, MERGE_NAME), false, false);
+        final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
+        materializedInternal.generateStoreNameIfNeeded(builder, MERGE_NAME);
+
+        return doJoin(other, joiner, materializedInternal, false, false);
     }
 
     @Override
@@ -317,7 +334,10 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     public <VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
                                             final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                             final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
-        return doJoin(other, joiner, new MaterializedInternal<>(materialized, builder, MERGE_NAME), true, true);
+        final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
+        materializedInternal.generateStoreNameIfNeeded(builder, MERGE_NAME);
+
+        return doJoin(other, joiner, materializedInternal, true, true);
     }
 
     @Override
@@ -330,11 +350,9 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     public <VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
                                            final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                            final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
-        return doJoin(other,
-                      joiner,
-                      new MaterializedInternal<>(materialized, builder, MERGE_NAME),
-                      true,
-                      false);
+        final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
+        materializedInternal.generateStoreNameIfNeeded(builder, MERGE_NAME);
+        return doJoin(other, joiner, materializedInternal, true, false);
     }
 
     @SuppressWarnings("unchecked")
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
index c933b86..5361e48 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
@@ -25,18 +25,17 @@ import java.util.Map;
 
 public class MaterializedInternal<K, V, S extends StateStore> extends Materialized<K, V, S> {
 
-    private final boolean queryable;
+    private final boolean queriable;
 
-
-    public MaterializedInternal(final Materialized<K, V, S> materialized,
-                                final InternalNameProvider nameProvider,
-                                final String generatedStorePrefix) {
+    public MaterializedInternal(final Materialized<K, V, S> materialized) {
         super(materialized);
+        queriable = storeName() != null;
+    }
+
+    public void generateStoreNameIfNeeded(final InternalNameProvider nameProvider,
+                                          final String generatedStorePrefix) {
         if (storeName() == null) {
-            queryable = false;
             storeName = nameProvider.newStoreName(generatedStorePrefix);
-        } else {
-            queryable = true;
         }
     }
 
@@ -63,7 +62,7 @@ public class MaterializedInternal<K, V, S extends StateStore> extends Materializ
         return loggingEnabled;
     }
 
-    public Map<String, String> logConfig() {
+    Map<String, String> logConfig() {
         return topicConfig;
     }
 
@@ -72,6 +71,6 @@ public class MaterializedInternal<K, V, S extends StateStore> extends Materializ
     }
 
     boolean isQueryable() {
-        return queryable;
+        return queriable;
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
index c29c656..b3cbacd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
@@ -68,15 +68,26 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K> implemen
 
     @Override
     public KTable<Windowed<K>, Long> count() {
-        return count(Materialized.<K, Long, SessionStore<Bytes, byte[]>>with(keySerde, Serdes.Long()));
+        return doCount(Materialized.with(keySerde, Serdes.Long()));
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public KTable<Windowed<K>, Long> count(final Materialized<K, Long, SessionStore<Bytes, byte[]>> materialized) {
         Objects.requireNonNull(materialized, "materialized can't be null");
-        final MaterializedInternal<K, Long, SessionStore<Bytes, byte[]>> materializedInternal
-                = new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME);
+
+        // TODO: remove this when we do a topology-incompatible release
+        // we used to burn a topology name here, so we have to keep doing it for compatibility
+        if (new MaterializedInternal<>(materialized).storeName() == null) {
+            builder.newStoreName(AGGREGATE_NAME);
+        }
+
+        return doCount(materialized);
+    }
+
+    @SuppressWarnings("unchecked")
+    private KTable<Windowed<K>, Long> doCount(final Materialized<K, Long, SessionStore<Bytes, byte[]>> materialized) {
+        final MaterializedInternal<K, Long, SessionStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
+        materializedInternal.generateStoreNameIfNeeded(builder, AGGREGATE_NAME);
         if (materializedInternal.keySerde() == null) {
             materializedInternal.withKeySerde(keySerde);
         }
@@ -85,10 +96,10 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K> implemen
         }
 
         return (KTable<Windowed<K>, Long>) aggregateBuilder.build(
-                new KStreamSessionWindowAggregate<>(windows, materializedInternal.storeName(), aggregateBuilder.countInitializer, aggregateBuilder.countAggregator, countMerger),
-                AGGREGATE_NAME,
-                materialize(materializedInternal),
-                materializedInternal.isQueryable());
+            new KStreamSessionWindowAggregate<>(windows, materializedInternal.storeName(), aggregateBuilder.countInitializer, aggregateBuilder.countAggregator, countMerger),
+            AGGREGATE_NAME,
+            materialize(materializedInternal),
+            materializedInternal.isQueryable());
     }
 
     @Override
@@ -103,8 +114,8 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K> implemen
         Objects.requireNonNull(reducer, "reducer can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
         final Aggregator<K, V, V> reduceAggregator = aggregatorForReducer(reducer);
-        final MaterializedInternal<K, V, SessionStore<Bytes, byte[]>> materializedInternal
-                = new MaterializedInternal<>(materialized, builder, REDUCE_NAME);
+        final MaterializedInternal<K, V, SessionStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
+        materializedInternal.generateStoreNameIfNeeded(builder, REDUCE_NAME);
         if (materializedInternal.keySerde() == null) {
             materializedInternal.withKeySerde(keySerde);
         }
@@ -136,8 +147,9 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K> implemen
         Objects.requireNonNull(aggregator, "aggregator can't be null");
         Objects.requireNonNull(sessionMerger, "sessionMerger can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
-        final MaterializedInternal<K, VR, SessionStore<Bytes, byte[]>> materializedInternal
-                = new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME);
+        final MaterializedInternal<K, VR, SessionStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
+        materializedInternal.generateStoreNameIfNeeded(builder, AGGREGATE_NAME);
+
         if (materializedInternal.keySerde() == null) {
             materializedInternal.withKeySerde(keySerde);
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
index d1e5a17..4f5301b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
@@ -24,9 +24,9 @@ import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Reducer;
+import org.apache.kafka.streams.kstream.TimeWindowedKStream;
 import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.kstream.TimeWindowedKStream;
 import org.apache.kafka.streams.kstream.Windows;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
@@ -63,15 +63,27 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
 
     @Override
     public KTable<Windowed<K>, Long> count() {
-        return count(Materialized.<K, Long, WindowStore<Bytes, byte[]>>with(keySerde, Serdes.Long()));
+        return doCount(Materialized.with(keySerde, Serdes.Long()));
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public KTable<Windowed<K>, Long> count(final Materialized<K, Long, WindowStore<Bytes, byte[]>> materialized) {
         Objects.requireNonNull(materialized, "materialized can't be null");
-        final MaterializedInternal<K, Long, WindowStore<Bytes, byte[]>> materializedInternal
-                = new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME);
+
+        // TODO: remove this when we do a topology-incompatible release
+        // we used to burn a topology name here, so we have to keep doing it for compatibility
+        if (new MaterializedInternal<>(materialized).storeName() == null) {
+            builder.newStoreName(AGGREGATE_NAME);
+        }
+
+        return doCount(materialized);
+    }
+
+    @SuppressWarnings("unchecked")
+    private KTable<Windowed<K>, Long> doCount(final Materialized<K, Long, WindowStore<Bytes, byte[]>> materialized) {
+        final MaterializedInternal<K, Long, WindowStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
+        materializedInternal.generateStoreNameIfNeeded(builder, AGGREGATE_NAME);
+
         if (materializedInternal.keySerde() == null) {
             materializedInternal.withKeySerde(keySerde);
         }
@@ -80,9 +92,9 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
         }
 
         return (KTable<Windowed<K>, Long>) aggregateBuilder.build(new KStreamWindowAggregate<>(windows, materializedInternal.storeName(), aggregateBuilder.countInitializer, aggregateBuilder.countAggregator),
-                AGGREGATE_NAME,
-                materialize(materializedInternal),
-                materializedInternal.isQueryable());
+            AGGREGATE_NAME,
+            materialize(materializedInternal),
+            materializedInternal.isQueryable());
     }
 
 
@@ -100,8 +112,8 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
         Objects.requireNonNull(initializer, "initializer can't be null");
         Objects.requireNonNull(aggregator, "aggregator can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
-        final MaterializedInternal<K, VR, WindowStore<Bytes, byte[]>> materializedInternal
-                = new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME);
+        final MaterializedInternal<K, VR, WindowStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
+        materializedInternal.generateStoreNameIfNeeded(builder, AGGREGATE_NAME);
         if (materializedInternal.keySerde() == null) {
             materializedInternal.withKeySerde(keySerde);
         }
@@ -122,8 +134,9 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
         Objects.requireNonNull(reducer, "reducer can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
 
-        final MaterializedInternal<K, V, WindowStore<Bytes, byte[]>> materializedInternal
-                = new MaterializedInternal<>(materialized, builder, REDUCE_NAME);
+        final MaterializedInternal<K, V, WindowStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
+        materializedInternal.generateStoreNameIfNeeded(builder, REDUCE_NAME);
+
         if (materializedInternal.keySerde() == null) {
             materializedInternal.withKeySerde(keySerde);
         }
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index a845be3..f8f9c7d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -16,8 +16,12 @@
  */
 package org.apache.kafka.streams;
 
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.SessionWindows;
+import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
@@ -40,6 +44,7 @@ import java.util.regex.Pattern;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
 public class TopologyTest {
@@ -76,12 +81,7 @@ public class TopologyTest {
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullNameWhenAddingProcessor() {
-        topology.addProcessor(null, new ProcessorSupplier() {
-            @Override
-            public Processor get() {
-                return new MockProcessorSupplier().get();
-            }
-        });
+        topology.addProcessor(null, () -> new MockProcessorSupplier().get());
     }
 
     @Test(expected = NullPointerException.class)
@@ -130,7 +130,7 @@ public class TopologyTest {
         try {
             topology.addSource("source", "topic-2");
             fail("Should throw TopologyException for duplicate source name");
-        } catch (TopologyException expected) { }
+        } catch (final TopologyException expected) { }
     }
 
     @Test
@@ -139,7 +139,7 @@ public class TopologyTest {
         try {
             topology.addSource("source-2", "topic-1");
             fail("Should throw TopologyException for already used topic");
-        } catch (TopologyException expected) { }
+        } catch (final TopologyException expected) { }
     }
 
     @Test
@@ -167,7 +167,7 @@ public class TopologyTest {
         try {
             topology.addProcessor("processor", new MockProcessorSupplier(), "source");
             fail("Should throw TopologyException for duplicate processor name");
-        } catch (TopologyException expected) { }
+        } catch (final TopologyException expected) { }
     }
 
     @Test(expected = TopologyException.class)
@@ -187,7 +187,7 @@ public class TopologyTest {
         try {
             topology.addSink("sink", "topic-3", "source");
             fail("Should throw TopologyException for duplicate sink name");
-        } catch (TopologyException expected) { }
+        } catch (final TopologyException expected) { }
     }
 
     @Test(expected = TopologyException.class)
@@ -257,7 +257,7 @@ public class TopologyTest {
     }
 
     @Test
-    public void shouldThrowOnUnassignedStateStoreAccess() throws Exception {
+    public void shouldThrowOnUnassignedStateStoreAccess() {
         final String sourceNodeName = "source";
         final String goodNodeName = "goodGuy";
         final String badNodeName = "badGuy";
@@ -283,7 +283,7 @@ public class TopologyTest {
         } catch (final StreamsException e) {
             final String error = e.toString();
             final String expectedMessage = "org.apache.kafka.streams.errors.StreamsException: failed to initialize processor " + badNodeName;
-            
+
             assertThat(error, equalTo(expectedMessage));
         }
     }
@@ -295,12 +295,12 @@ public class TopologyTest {
         public Processor get() {
             return new Processor() {
                 @Override
-                public void init(ProcessorContext context) {
+                public void init(final ProcessorContext context) {
                     context.getStateStore(STORE_NAME);
                 }
 
                 @Override
-                public void process(Object key, Object value) { }
+                public void process(final Object key, final Object value) { }
 
                 @Override
                 public void close() { }
@@ -324,7 +324,7 @@ public class TopologyTest {
 
     @Test
     public void shouldDescribeEmptyTopology() {
-        assertThat(topology.describe(), equalTo((TopologyDescription) expectedDescription));
+        assertThat(topology.describe(), equalTo(expectedDescription));
     }
 
     @Test
@@ -333,9 +333,9 @@ public class TopologyTest {
 
         expectedDescription.addSubtopology(
             new InternalTopologyBuilder.Subtopology(0,
-                Collections.<TopologyDescription.Node>singleton(expectedSourceNode)));
+                Collections.singleton(expectedSourceNode)));
 
-        assertThat(topology.describe(), equalTo((TopologyDescription) expectedDescription));
+        assertThat(topology.describe(), equalTo(expectedDescription));
     }
 
     @Test
@@ -344,9 +344,9 @@ public class TopologyTest {
 
         expectedDescription.addSubtopology(
             new InternalTopologyBuilder.Subtopology(0,
-                Collections.<TopologyDescription.Node>singleton(expectedSourceNode)));
+                Collections.singleton(expectedSourceNode)));
 
-        assertThat(topology.describe(), equalTo((TopologyDescription) expectedDescription));
+        assertThat(topology.describe(), equalTo(expectedDescription));
     }
 
     @Test
@@ -355,9 +355,9 @@ public class TopologyTest {
 
         expectedDescription.addSubtopology(
             new InternalTopologyBuilder.Subtopology(0,
-                Collections.<TopologyDescription.Node>singleton(expectedSourceNode)));
+                Collections.singleton(expectedSourceNode)));
 
-        assertThat(topology.describe(), equalTo((TopologyDescription) expectedDescription));
+        assertThat(topology.describe(), equalTo(expectedDescription));
     }
 
     @Test
@@ -365,19 +365,19 @@ public class TopologyTest {
         final TopologyDescription.Source expectedSourceNode1 = addSource("source1", "topic1");
         expectedDescription.addSubtopology(
             new InternalTopologyBuilder.Subtopology(0,
-                Collections.<TopologyDescription.Node>singleton(expectedSourceNode1)));
+                Collections.singleton(expectedSourceNode1)));
 
         final TopologyDescription.Source expectedSourceNode2 = addSource("source2", "topic2");
         expectedDescription.addSubtopology(
             new InternalTopologyBuilder.Subtopology(1,
-                Collections.<TopologyDescription.Node>singleton(expectedSourceNode2)));
+                Collections.singleton(expectedSourceNode2)));
 
         final TopologyDescription.Source expectedSourceNode3 = addSource("source3", "topic3");
         expectedDescription.addSubtopology(
             new InternalTopologyBuilder.Subtopology(2,
-                Collections.<TopologyDescription.Node>singleton(expectedSourceNode3)));
+                Collections.singleton(expectedSourceNode3)));
 
-        assertThat(topology.describe(), equalTo((TopologyDescription) expectedDescription));
+        assertThat(topology.describe(), equalTo(expectedDescription));
     }
 
     @Test
@@ -390,7 +390,7 @@ public class TopologyTest {
         allNodes.add(expectedProcessorNode);
         expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes));
 
-        assertThat(topology.describe(), equalTo((TopologyDescription) expectedDescription));
+        assertThat(topology.describe(), equalTo(expectedDescription));
     }
 
     @Test
@@ -405,7 +405,7 @@ public class TopologyTest {
         allNodes.add(expectedProcessorNode);
         expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes));
 
-        assertThat(topology.describe(), equalTo((TopologyDescription) expectedDescription));
+        assertThat(topology.describe(), equalTo(expectedDescription));
     }
 
 
@@ -421,7 +421,7 @@ public class TopologyTest {
         allNodes.add(expectedProcessorNode);
         expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes));
 
-        assertThat(topology.describe(), equalTo((TopologyDescription) expectedDescription));
+        assertThat(topology.describe(), equalTo(expectedDescription));
     }
 
     @Test
@@ -436,7 +436,7 @@ public class TopologyTest {
         allNodes.add(expectedProcessorNode2);
         expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes));
 
-        assertThat(topology.describe(), equalTo((TopologyDescription) expectedDescription));
+        assertThat(topology.describe(), equalTo(expectedDescription));
     }
 
     @Test
@@ -451,7 +451,7 @@ public class TopologyTest {
         allNodes.add(expectedProcessorNode);
         expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes));
 
-        assertThat(topology.describe(), equalTo((TopologyDescription) expectedDescription));
+        assertThat(topology.describe(), equalTo(expectedDescription));
     }
 
     @Test
@@ -480,7 +480,7 @@ public class TopologyTest {
         allNodes3.add(expectedProcessorNode3);
         expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(2, allNodes3));
 
-        assertThat(topology.describe(), equalTo((TopologyDescription) expectedDescription));
+        assertThat(topology.describe(), equalTo(expectedDescription));
     }
 
     @Test
@@ -509,7 +509,7 @@ public class TopologyTest {
         allNodes3.add(expectedSinkNode3);
         expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(2, allNodes3));
 
-        assertThat(topology.describe(), equalTo((TopologyDescription) expectedDescription));
+        assertThat(topology.describe(), equalTo(expectedDescription));
     }
 
     @Test
@@ -540,7 +540,7 @@ public class TopologyTest {
         allNodes.add(expectedSinkNode);
         expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes));
 
-        assertThat(topology.describe(), equalTo((TopologyDescription) expectedDescription));
+        assertThat(topology.describe(), equalTo(expectedDescription));
     }
 
     @Test
@@ -570,30 +570,303 @@ public class TopologyTest {
         allNodes.add(expectedProcessorNode3);
         expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes));
 
-        assertThat(topology.describe(), equalTo((TopologyDescription) expectedDescription));
+        assertThat(topology.describe(), equalTo(expectedDescription));
     }
 
     @Test
     public void shouldDescribeGlobalStoreTopology() {
         addGlobalStoreToTopologyAndExpectedDescription("globalStore", "source", "globalTopic", "processor", 0);
-        assertThat(topology.describe(), equalTo((TopologyDescription) expectedDescription));
+        assertThat(topology.describe(), equalTo(expectedDescription));
     }
 
     @Test
     public void shouldDescribeMultipleGlobalStoreTopology() {
         addGlobalStoreToTopologyAndExpectedDescription("globalStore1", "source1", "globalTopic1", "processor1", 0);
         addGlobalStoreToTopologyAndExpectedDescription("globalStore2", "source2", "globalTopic2", "processor2", 1);
-        assertThat(topology.describe(), equalTo((TopologyDescription) expectedDescription));
+        assertThat(topology.describe(), equalTo(expectedDescription));
+    }
+
+    @Test
+    public void kGroupedStreamZeroArgCountShouldPreserveTopologyStructure() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder.stream("input-topic")
+            .groupByKey()
+            .count();
+        final TopologyDescription describe = builder.build().describe();
+        assertEquals(
+            "Topologies:\n" +
+                "   Sub-topology: 0\n" +
+                "    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])\n" +
+                "      --> KSTREAM-AGGREGATE-0000000002\n" +
+                "    Processor: KSTREAM-AGGREGATE-0000000002 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000001])\n" +
+                "      --> none\n" +
+                "      <-- KSTREAM-SOURCE-0000000000\n\n",
+            describe.toString()
+        );
+    }
+
+    @Test
+    public void kGroupedStreamNamedMaterializedCountShouldPreserveTopologyStructure() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder.stream("input-topic")
+            .groupByKey()
+            .count(Materialized.as("count-store"));
+        final TopologyDescription describe = builder.build().describe();
+        assertEquals(
+            "Topologies:\n" +
+                "   Sub-topology: 0\n" +
+                "    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])\n" +
+                "      --> KSTREAM-AGGREGATE-0000000001\n" +
+                "    Processor: KSTREAM-AGGREGATE-0000000001 (stores: [count-store])\n" +
+                "      --> none\n" +
+                "      <-- KSTREAM-SOURCE-0000000000\n\n",
+            describe.toString()
+        );
+    }
+
+    @Test
+    public void kGroupedStreamAnonymousMaterializedCountShouldPreserveTopologyStructure() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder.stream("input-topic")
+            .groupByKey()
+            .count(Materialized.with(null, Serdes.Long()));
+        final TopologyDescription describe = builder.build().describe();
+        assertEquals(
+            "Topologies:\n" +
+                "   Sub-topology: 0\n" +
+                "    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])\n" +
+                "      --> KSTREAM-AGGREGATE-0000000003\n" +
+                "    Processor: KSTREAM-AGGREGATE-0000000003 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000002])\n" +
+                "      --> none\n" +
+                "      <-- KSTREAM-SOURCE-0000000000\n\n",
+            describe.toString()
+        );
+    }
+
+    @Test
+    public void timeWindowZeroArgCountShouldPreserveTopologyStructure() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder.stream("input-topic")
+            .groupByKey()
+            .windowedBy(TimeWindows.of(1))
+            .count();
+        final TopologyDescription describe = builder.build().describe();
+        assertEquals(
+            "Topologies:\n" +
+                "   Sub-topology: 0\n" +
+                "    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])\n" +
+                "      --> KSTREAM-AGGREGATE-0000000002\n" +
+                "    Processor: KSTREAM-AGGREGATE-0000000002 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000001])\n" +
+                "      --> none\n" +
+                "      <-- KSTREAM-SOURCE-0000000000\n\n",
+            describe.toString()
+        );
+    }
+
+    @Test
+    public void timeWindowNamedMaterializedCountShouldPreserveTopologyStructure() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder.stream("input-topic")
+            .groupByKey()
+            .windowedBy(TimeWindows.of(1))
+            .count(Materialized.as("count-store"));
+        final TopologyDescription describe = builder.build().describe();
+        assertEquals(
+            "Topologies:\n" +
+                "   Sub-topology: 0\n" +
+                "    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])\n" +
+                "      --> KSTREAM-AGGREGATE-0000000001\n" +
+                "    Processor: KSTREAM-AGGREGATE-0000000001 (stores: [count-store])\n" +
+                "      --> none\n" +
+                "      <-- KSTREAM-SOURCE-0000000000\n\n",
+            describe.toString()
+        );
+    }
+
+    @Test
+    public void timeWindowAnonymousMaterializedCountShouldPreserveTopologyStructure() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder.stream("input-topic")
+            .groupByKey()
+            .windowedBy(TimeWindows.of(1))
+            .count(Materialized.with(null, Serdes.Long()));
+        final TopologyDescription describe = builder.build().describe();
+        assertEquals(
+            "Topologies:\n" +
+                "   Sub-topology: 0\n" +
+                "    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])\n" +
+                "      --> KSTREAM-AGGREGATE-0000000003\n" +
+                "    Processor: KSTREAM-AGGREGATE-0000000003 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000002])\n" +
+                "      --> none\n" +
+                "      <-- KSTREAM-SOURCE-0000000000\n\n",
+            describe.toString()
+        );
+    }
+
+    @Test
+    public void sessionWindowZeroArgCountShouldPreserveTopologyStructure() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder.stream("input-topic")
+            .groupByKey()
+            .windowedBy(SessionWindows.with(1))
+            .count();
+        final TopologyDescription describe = builder.build().describe();
+        assertEquals(
+            "Topologies:\n" +
+                "   Sub-topology: 0\n" +
+                "    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])\n" +
+                "      --> KSTREAM-AGGREGATE-0000000002\n" +
+                "    Processor: KSTREAM-AGGREGATE-0000000002 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000001])\n" +
+                "      --> none\n" +
+                "      <-- KSTREAM-SOURCE-0000000000\n\n",
+            describe.toString()
+        );
+    }
+
+    @Test
+    public void sessionWindowNamedMaterializedCountShouldPreserveTopologyStructure() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder.stream("input-topic")
+            .groupByKey()
+            .windowedBy(SessionWindows.with(1))
+            .count(Materialized.as("count-store"));
+        final TopologyDescription describe = builder.build().describe();
+        assertEquals(
+            "Topologies:\n" +
+                "   Sub-topology: 0\n" +
+                "    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])\n" +
+                "      --> KSTREAM-AGGREGATE-0000000001\n" +
+                "    Processor: KSTREAM-AGGREGATE-0000000001 (stores: [count-store])\n" +
+                "      --> none\n" +
+                "      <-- KSTREAM-SOURCE-0000000000\n\n",
+            describe.toString()
+        );
+    }
+
+    @Test
+    public void sessionWindowAnonymousMaterializedCountShouldPreserveTopologyStructure() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder.stream("input-topic")
+            .groupByKey()
+            .windowedBy(SessionWindows.with(1))
+            .count(Materialized.with(null, Serdes.Long()));
+        final TopologyDescription describe = builder.build().describe();
+        assertEquals(
+            "Topologies:\n" +
+                "   Sub-topology: 0\n" +
+                "    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])\n" +
+                "      --> KSTREAM-AGGREGATE-0000000003\n" +
+                "    Processor: KSTREAM-AGGREGATE-0000000003 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000002])\n" +
+                "      --> none\n" +
+                "      <-- KSTREAM-SOURCE-0000000000\n\n",
+            describe.toString()
+        );
+    }
+
+    @Test
+    public void tableZeroArgCountShouldPreserveTopologyStructure() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder.table("input-topic")
+            .groupBy((key, value) -> null)
+            .count();
+        final TopologyDescription describe = builder.build().describe();
+        assertEquals(
+            "Topologies:\n" +
+                "   Sub-topology: 0\n" +
+                "    Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic])\n" +
+                "      --> KTABLE-SOURCE-0000000002\n" +
+                "    Processor: KTABLE-SOURCE-0000000002 (stores: [input-topic-STATE-STORE-0000000000])\n" +
+                "      --> KTABLE-SELECT-0000000003\n" +
+                "      <-- KSTREAM-SOURCE-0000000001\n" +
+                "    Processor: KTABLE-SELECT-0000000003 (stores: [])\n" +
+                "      --> KSTREAM-SINK-0000000005\n" +
+                "      <-- KTABLE-SOURCE-0000000002\n" +
+                "    Sink: KSTREAM-SINK-0000000005 (topic: KTABLE-AGGREGATE-STATE-STORE-0000000004-repartition)\n" +
+                "      <-- KTABLE-SELECT-0000000003\n" +
+                "\n" +
+                "  Sub-topology: 1\n" +
+                "    Source: KSTREAM-SOURCE-0000000006 (topics: [KTABLE-AGGREGATE-STATE-STORE-0000000004-repartition])\n" +
+                "      --> KTABLE-AGGREGATE-0000000007\n" +
+                "    Processor: KTABLE-AGGREGATE-0000000007 (stores: [KTABLE-AGGREGATE-STATE-STORE-0000000004])\n" +
+                "      --> none\n" +
+                "      <-- KSTREAM-SOURCE-0000000006\n" +
+                "\n",
+            describe.toString()
+        );
+    }
+
+    @Test
+    public void tableNamedMaterializedCountShouldPreserveTopologyStructure() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder.table("input-topic")
+            .groupBy((key, value) -> null)
+            .count(Materialized.as("count-store"));
+        final TopologyDescription describe = builder.build().describe();
+        assertEquals(
+            "Topologies:\n" +
+                "   Sub-topology: 0\n" +
+                "    Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic])\n" +
+                "      --> KTABLE-SOURCE-0000000002\n" +
+                "    Processor: KTABLE-SOURCE-0000000002 (stores: [input-topic-STATE-STORE-0000000000])\n" +
+                "      --> KTABLE-SELECT-0000000003\n" +
+                "      <-- KSTREAM-SOURCE-0000000001\n" +
+                "    Processor: KTABLE-SELECT-0000000003 (stores: [])\n" +
+                "      --> KSTREAM-SINK-0000000004\n" +
+                "      <-- KTABLE-SOURCE-0000000002\n" +
+                "    Sink: KSTREAM-SINK-0000000004 (topic: count-store-repartition)\n" +
+                "      <-- KTABLE-SELECT-0000000003\n" +
+                "\n" +
+                "  Sub-topology: 1\n" +
+                "    Source: KSTREAM-SOURCE-0000000005 (topics: [count-store-repartition])\n" +
+                "      --> KTABLE-AGGREGATE-0000000006\n" +
+                "    Processor: KTABLE-AGGREGATE-0000000006 (stores: [count-store])\n" +
+                "      --> none\n" +
+                "      <-- KSTREAM-SOURCE-0000000005\n" +
+                "\n",
+            describe.toString()
+        );
+    }
+
+    @Test
+    public void tableAnonymousMaterializedCountShouldPreserveTopologyStructure() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder.table("input-topic")
+            .groupBy((key, value) -> null)
+            .count(Materialized.with(null, Serdes.Long()));
+        final TopologyDescription describe = builder.build().describe();
+        assertEquals(
+            "Topologies:\n" +
+                "   Sub-topology: 0\n" +
+                "    Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic])\n" +
+                "      --> KTABLE-SOURCE-0000000002\n" +
+                "    Processor: KTABLE-SOURCE-0000000002 (stores: [input-topic-STATE-STORE-0000000000])\n" +
+                "      --> KTABLE-SELECT-0000000003\n" +
+                "      <-- KSTREAM-SOURCE-0000000001\n" +
+                "    Processor: KTABLE-SELECT-0000000003 (stores: [])\n" +
+                "      --> KSTREAM-SINK-0000000005\n" +
+                "      <-- KTABLE-SOURCE-0000000002\n" +
+                "    Sink: KSTREAM-SINK-0000000005 (topic: KTABLE-AGGREGATE-STATE-STORE-0000000004-repartition)\n" +
+                "      <-- KTABLE-SELECT-0000000003\n" +
+                "\n" +
+                "  Sub-topology: 1\n" +
+                "    Source: KSTREAM-SOURCE-0000000006 (topics: [KTABLE-AGGREGATE-STATE-STORE-0000000004-repartition])\n" +
+                "      --> KTABLE-AGGREGATE-0000000007\n" +
+                "    Processor: KTABLE-AGGREGATE-0000000007 (stores: [KTABLE-AGGREGATE-STATE-STORE-0000000004])\n" +
+                "      --> none\n" +
+                "      <-- KSTREAM-SOURCE-0000000006\n" +
+                "\n",
+            describe.toString()
+        );
     }
 
     private TopologyDescription.Source addSource(final String sourceName,
                                                  final String... sourceTopic) {
         topology.addSource(null, sourceName, null, null, null, sourceTopic);
-        String allSourceTopics = sourceTopic[0];
+        final StringBuilder allSourceTopics = new StringBuilder(sourceTopic[0]);
         for (int i = 1; i < sourceTopic.length; ++i) {
-            allSourceTopics += ", " + sourceTopic[i];
+            allSourceTopics.append(", ").append(sourceTopic[i]);
         }
-        return new InternalTopologyBuilder.Source(sourceName, allSourceTopics);
+        return new InternalTopologyBuilder.Source(sourceName, allSourceTopics.toString());
     }
 
     private TopologyDescription.Source addSource(final String sourceName,
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
index 79bf81e..63432ff 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
@@ -33,7 +33,6 @@ import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.test.MockMapper;
 import org.apache.kafka.test.MockTimestampExtractor;
 import org.apache.kafka.test.MockValueJoiner;
-import org.junit.Before;
 import org.junit.Test;
 
 import java.util.Collections;
@@ -58,12 +57,11 @@ public class InternalStreamsBuilderTest {
     private final InternalStreamsBuilder builder = new InternalStreamsBuilder(new InternalTopologyBuilder());
     private final ConsumedInternal<String, String> consumed = new ConsumedInternal<>();
     private final String storePrefix = "prefix-";
-    private MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized
-            = new MaterializedInternal<>(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("test-store"), builder, storePrefix);
+    private final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized = new MaterializedInternal<>(Materialized.as("test-store"));
 
-    @Before
-    public void setUp() {
+    {
         builder.internalTopologyBuilder.setApplicationId(APP_ID);
+        materialized.generateStoreNameIfNeeded(builder, storePrefix);
     }
 
     @Test
@@ -127,12 +125,10 @@ public class InternalStreamsBuilderTest {
 
     @Test
     public void shouldStillMaterializeSourceKTableIfMaterializedIsntQueryable() {
-        KTable table1 = builder.table("topic2",
-                                      consumed,
-                                      new MaterializedInternal<>(
-                                              Materialized.<String, String, KeyValueStore<Bytes, byte[]>>with(null, null),
-                                              builder,
-                                              storePrefix));
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materializedInternal =
+            new MaterializedInternal<>(Materialized.with(null, null));
+        materializedInternal.generateStoreNameIfNeeded(builder, storePrefix);
+        final KTable table1 = builder.table("topic2", consumed, materializedInternal);
 
         final ProcessorTopology topology = builder.internalTopologyBuilder.build(null);
 
@@ -147,38 +143,33 @@ public class InternalStreamsBuilderTest {
     
     @Test
     public void shouldBuildGlobalTableWithNonQueryableStoreName() {
-        final GlobalKTable<String, String> table1 = builder.globalTable(
-            "topic2",
-            consumed,
-            new MaterializedInternal<>(
-                Materialized.<String, String, KeyValueStore<Bytes, byte[]>>with(null, null),
-                builder,
-                storePrefix));
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materializedInternal =
+            new MaterializedInternal<>(Materialized.with(null, null));
+        materializedInternal.generateStoreNameIfNeeded(builder, storePrefix);
+
+        final GlobalKTable<String, String> table1 = builder.globalTable("topic2", consumed, materializedInternal);
 
         assertNull(table1.queryableStoreName());
     }
 
     @Test
     public void shouldBuildGlobalTableWithQueryaIbleStoreName() {
-        final GlobalKTable<String, String> table1 = builder.globalTable(
-            "topic2",
-            consumed,
-            new MaterializedInternal<>(
-                Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("globalTable"),
-                builder,
-                storePrefix));
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materializedInternal =
+            new MaterializedInternal<>(Materialized.as("globalTable"));
+        materializedInternal.generateStoreNameIfNeeded(builder, storePrefix);
+        final GlobalKTable<String, String> table1 = builder.globalTable("topic2", consumed, materializedInternal);
 
         assertEquals("globalTable", table1.queryableStoreName());
     }
 
     @Test
     public void shouldBuildSimpleGlobalTableTopology() {
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materializedInternal =
+            new MaterializedInternal<>(Materialized.as("globalTable"));
+        materializedInternal.generateStoreNameIfNeeded(builder, storePrefix);
         builder.globalTable("table",
                             consumed,
-                            new MaterializedInternal<>(
-                                    Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("globalTable"),
-                                    builder,
-                                    storePrefix));
+            materializedInternal);
 
         final ProcessorTopology topology = builder.internalTopologyBuilder.buildGlobalStateTopology();
         final List<StateStore> stateStores = topology.globalStateStores();
@@ -199,15 +190,18 @@ public class InternalStreamsBuilderTest {
 
     @Test
     public void shouldBuildGlobalTopologyWithAllGlobalTables() {
-        builder.globalTable("table",
-                            consumed,
-                            new MaterializedInternal<>(
-                                    Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("global1"), builder, storePrefix));
-        builder.globalTable("table2",
-                            consumed,
-                            new MaterializedInternal<>(
-                                    Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("global2"), builder, storePrefix));
-
+        {
+            final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materializedInternal =
+                new MaterializedInternal<>(Materialized.as("global1"));
+            materializedInternal.generateStoreNameIfNeeded(builder, storePrefix);
+            builder.globalTable("table", consumed, materializedInternal);
+        }
+        {
+            final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materializedInternal =
+                new MaterializedInternal<>(Materialized.as("global2"));
+            materializedInternal.generateStoreNameIfNeeded(builder, storePrefix);
+            builder.globalTable("table2", consumed, materializedInternal);
+        }
         doBuildGlobalTopologyWithAllGlobalTables();
     }
 
@@ -216,25 +210,22 @@ public class InternalStreamsBuilderTest {
         final String one = "globalTable";
         final String two = "globalTable2";
 
-        final GlobalKTable<String, String> globalTable = builder.globalTable("table",
-                                                                             consumed,
-                                                                             new MaterializedInternal<>(
-                                                                                     Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(one), builder, storePrefix));
-        final GlobalKTable<String, String> globalTable2 = builder.globalTable("table2",
-                                                                              consumed,
-                                                                              new MaterializedInternal<>(
-                                                                                      Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(two), builder, storePrefix));
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materializedInternal =
+            new MaterializedInternal<>(Materialized.as(one));
+        materializedInternal.generateStoreNameIfNeeded(builder, storePrefix);
+        final GlobalKTable<String, String> globalTable = builder.globalTable("table", consumed, materializedInternal);
 
-        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized
-                = new MaterializedInternal<>(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("not-global"), builder, storePrefix);
-        builder.table("not-global", consumed, materialized);
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materializedInternal2 =
+            new MaterializedInternal<>(Materialized.as(two));
+        materializedInternal2.generateStoreNameIfNeeded(builder, storePrefix);
+        final GlobalKTable<String, String> globalTable2 = builder.globalTable("table2", consumed, materializedInternal2);
 
-        final KeyValueMapper<String, String, String> kvMapper = new KeyValueMapper<String, String, String>() {
-            @Override
-            public String apply(final String key, final String value) {
-                return value;
-            }
-        };
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materializedInternalNotGlobal =
+            new MaterializedInternal<>(Materialized.as("not-global"));
+        materializedInternalNotGlobal.generateStoreNameIfNeeded(builder, storePrefix);
+        builder.table("not-global", consumed, materializedInternalNotGlobal);
+
+        final KeyValueMapper<String, String, String> kvMapper = (key, value) -> value;
 
         final KStream<String, String> stream = builder.stream(Collections.singleton("t1"), consumed);
         stream.leftJoin(globalTable, kvMapper, MockValueJoiner.TOSTRING_JOINER);
@@ -260,9 +251,10 @@ public class InternalStreamsBuilderTest {
     public void shouldMapStateStoresToCorrectSourceTopics() {
         final KStream<String, String> playEvents = builder.stream(Collections.singleton("events"), consumed);
 
-        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized
-                = new MaterializedInternal<>(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("table-store"), builder, storePrefix);
-        final KTable<String, String> table = builder.table("table-topic", consumed, materialized);
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materializedInternal =
+            new MaterializedInternal<>(Materialized.as("table-store"));
+        materializedInternal.generateStoreNameIfNeeded(builder, storePrefix);
+        final KTable<String, String> table = builder.table("table-topic", consumed, materializedInternal);
         assertEquals(Collections.singletonList("table-topic"), builder.internalTopologyBuilder.stateStoreNameToSourceTopics().get("table-store"));
 
         final KStream<String, String> mapped = playEvents.map(MockMapper.<String, String>selectValueKeyValueMapper());
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/MaterializedInternalTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/MaterializedInternalTest.java
index 5fd76f3..1a83ef1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/MaterializedInternalTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/MaterializedInternalTest.java
@@ -49,8 +49,9 @@ public class MaterializedInternalTest {
 
         EasyMock.replay(nameProvider);
 
-        final MaterializedInternal<Object, Object, StateStore> materialized
-                = new MaterializedInternal<>(Materialized.with(null, null), nameProvider, prefix);
+        final MaterializedInternal<Object, Object, StateStore> materialized =
+            new MaterializedInternal<>(Materialized.with(null, null));
+        materialized.generateStoreNameIfNeeded(nameProvider, prefix);
 
         assertThat(materialized.storeName(), equalTo(generatedName));
         EasyMock.verify(nameProvider);
@@ -59,8 +60,9 @@ public class MaterializedInternalTest {
     @Test
     public void shouldUseProvidedStoreNameWhenSet() {
         final String storeName = "store-name";
-        final MaterializedInternal<Object, Object, StateStore> materialized
-                = new MaterializedInternal<>(Materialized.as(storeName), nameProvider, prefix);
+        final MaterializedInternal<Object, Object, StateStore> materialized =
+            new MaterializedInternal<>(Materialized.as(storeName));
+        materialized.generateStoreNameIfNeeded(nameProvider, prefix);
         assertThat(materialized.storeName(), equalTo(storeName));
     }
 
@@ -69,8 +71,9 @@ public class MaterializedInternalTest {
         final String storeName = "other-store-name";
         EasyMock.expect(supplier.name()).andReturn(storeName).anyTimes();
         EasyMock.replay(supplier);
-        final MaterializedInternal<Object, Object, KeyValueStore<Bytes, byte[]>> materialized
-                = new MaterializedInternal<>(Materialized.as(supplier), nameProvider, prefix);
+        final MaterializedInternal<Object, Object, KeyValueStore<Bytes, byte[]>> materialized =
+            new MaterializedInternal<>(Materialized.as(supplier));
+        materialized.generateStoreNameIfNeeded(nameProvider, prefix);
         assertThat(materialized.storeName(), equalTo(storeName));
     }
 }
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
index c71f469..f3e9299 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
@@ -73,19 +73,21 @@ public class GlobalStreamThreadTest {
     @Before
     public void before() {
         final MaterializedInternal<Object, Object, KeyValueStore<Bytes, byte[]>> materialized = new MaterializedInternal<>(
-            Materialized.<Object, Object, KeyValueStore<Bytes, byte[]>>with(null, null),
+            Materialized.with(null, null));
+        materialized.generateStoreNameIfNeeded(
             new InternalNameProvider() {
                 @Override
-                public String newProcessorName(String prefix) {
+                public String newProcessorName(final String prefix) {
                     return "processorName";
                 }
 
                 @Override
-                public String newStoreName(String prefix) {
+                public String newStoreName(final String prefix) {
                     return GLOBAL_STORE_NAME;
                 }
             },
-            "store-");
+            "store-"
+        );
 
         builder.addGlobalStore(
             (StoreBuilder) new KeyValueStoreMaterializer<>(materialized).materialize().withLoggingDisabled(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
index 9ba86ac..fc243c6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
@@ -53,10 +53,10 @@ public class KeyValueStoreMaterializerTest {
 
     @Test
     public void shouldCreateBuilderThatBuildsMeteredStoreWithCachingAndLoggingEnabled() {
-        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized
-                = new MaterializedInternal<>(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store"),
-                                             nameProvider,
-                                             storePrefix);
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized =
+            new MaterializedInternal<>(Materialized.as("store"));
+        materialized.generateStoreNameIfNeeded(nameProvider, storePrefix);
+
         final KeyValueStoreMaterializer<String, String> materializer = new KeyValueStoreMaterializer<>(materialized);
         final StoreBuilder<KeyValueStore<String, String>> builder = materializer.materialize();
         final KeyValueStore<String, String> store = builder.build();
@@ -69,9 +69,10 @@ public class KeyValueStoreMaterializerTest {
 
     @Test
     public void shouldCreateBuilderThatBuildsStoreWithCachingDisabled() {
-        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized
-                = new MaterializedInternal<>(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store")
-                                                     .withCachingDisabled(), nameProvider, storePrefix);
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized = new MaterializedInternal<>(
+            Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store").withCachingDisabled()
+        );
+        materialized.generateStoreNameIfNeeded(nameProvider, storePrefix);
         final KeyValueStoreMaterializer<String, String> materializer = new KeyValueStoreMaterializer<>(materialized);
         final StoreBuilder<KeyValueStore<String, String>> builder = materializer.materialize();
         final KeyValueStore<String, String> store = builder.build();
@@ -81,9 +82,11 @@ public class KeyValueStoreMaterializerTest {
 
     @Test
     public void shouldCreateBuilderThatBuildsStoreWithLoggingDisabled() {
-        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized
-                = new MaterializedInternal<>(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store")
-                                                     .withLoggingDisabled(), nameProvider, storePrefix);
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized = new MaterializedInternal<>(
+            Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store")
+                                                     .withLoggingDisabled()
+        );
+        materialized.generateStoreNameIfNeeded(nameProvider, storePrefix);
         final KeyValueStoreMaterializer<String, String> materializer = new KeyValueStoreMaterializer<>(materialized);
         final StoreBuilder<KeyValueStore<String, String>> builder = materializer.materialize();
         final KeyValueStore<String, String> store = builder.build();
@@ -94,10 +97,11 @@ public class KeyValueStoreMaterializerTest {
 
     @Test
     public void shouldCreateBuilderThatBuildsStoreWithCachingAndLoggingDisabled() {
-        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized
-                = new MaterializedInternal<>(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store")
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized = new MaterializedInternal<>(
+            Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store")
                                                      .withCachingDisabled()
-                                                     .withLoggingDisabled(), nameProvider, storePrefix);
+                                                     .withLoggingDisabled());
+        materialized.generateStoreNameIfNeeded(nameProvider, storePrefix);
         final KeyValueStoreMaterializer<String, String> materializer = new KeyValueStoreMaterializer<>(materialized);
         final StoreBuilder<KeyValueStore<String, String>> builder = materializer.materialize();
         final KeyValueStore<String, String> store = builder.build();
@@ -114,8 +118,9 @@ public class KeyValueStoreMaterializerTest {
         EasyMock.expect(supplier.get()).andReturn(store);
         EasyMock.replay(supplier);
 
-        final MaterializedInternal<String, Integer, KeyValueStore<Bytes, byte[]>> materialized
-                = new MaterializedInternal<>(Materialized.<String, Integer>as(supplier), nameProvider, storePrefix);
+        final MaterializedInternal<String, Integer, KeyValueStore<Bytes, byte[]>> materialized =
+            new MaterializedInternal<>(Materialized.as(supplier));
+        materialized.generateStoreNameIfNeeded(nameProvider, storePrefix);
         final KeyValueStoreMaterializer<String, Integer> materializer = new KeyValueStoreMaterializer<>(materialized);
         final StoreBuilder<KeyValueStore<String, Integer>> builder = materializer.materialize();
         final KeyValueStore<String, Integer> built = builder.build();
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index f3dce52..936c67b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -829,7 +829,9 @@ public class StreamThreadTest {
         final TopicPartition partition2 = t2p1;
         internalStreamsBuilder.stream(Collections.singleton(topic1), consumed)
             .groupByKey().count(Materialized.<Object, Long, KeyValueStore<Bytes, byte[]>>as(storeName1));
-        internalStreamsBuilder.table(topic2, new ConsumedInternal(), new MaterializedInternal(Materialized.as(storeName2), internalStreamsBuilder, ""));
+        final MaterializedInternal materialized = new MaterializedInternal(Materialized.as(storeName2));
+        materialized.generateStoreNameIfNeeded(internalStreamsBuilder, "");
+        internalStreamsBuilder.table(topic2, new ConsumedInternal(), materialized);
 
         final StreamThread thread = createStreamThread(clientId, config, false);
         final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer;

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.

Mime
View raw message