kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6998: Disable Caching when max.cache.bytes are zero. (#5488)
Date Fri, 17 Aug 2018 16:35:46 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0d77898  KAFKA-6998: Disable Caching when max.cache.bytes are zero. (#5488)
0d77898 is described below

commit 0d778987ee3b920b5c876696385c0792671adef8
Author: Guozhang Wang <wangguoz@gmail.com>
AuthorDate: Fri Aug 17 09:35:38 2018 -0700

    KAFKA-6998: Disable Caching when max.cache.bytes are zero. (#5488)
    
    1) As titled, add a rewriteTopology that 1) sets application id, 2) maybe disable caching, 3) adjust for source KTable. This optimization can hence be applied for both DSL or PAPI generated Topology.
    
    2) Defer the building of globalStateStores in rewriteTopology so that we can also disable caching. But we still need to build the state stores before InternalTopologyBuilder.build() since we should only build global stores once for all threads.
    
    3) Added withCachingDisabled to StoreBuilder, it is a public API change.
    
    4) [Optional] Fixed unit test config setting functionalities, and set the necessary config to shorten the unit test latency (now it reduces from 5min to 3.5min on my laptop).
    
    Reviewers: Matthias J. Sax <matthias@confluent.io>, John Roesler <john@confluent.io>, Bill Bejeck <bill@confluent.io>, Ted Yu <yuzhihong@gmail.com>
---
 .../kafka/clients/consumer/KafkaConsumer.java      |  2 +-
 .../org/apache/kafka/streams/KafkaStreams.java     |  7 +--
 .../internals/InternalTopologyBuilder.java         | 59 ++++++++++++++++++----
 .../apache/kafka/streams/state/StoreBuilder.java   |  6 +++
 .../state/internals/AbstractStoreBuilder.java      |  6 +++
 .../org/apache/kafka/streams/KafkaStreamsTest.java | 17 ++++---
 .../apache/kafka/streams/StreamsBuilderTest.java   | 20 ++++----
 .../apache/kafka/streams/StreamsConfigTest.java    | 10 ++--
 .../org/apache/kafka/streams/TopologyWrapper.java  |  9 ++--
 .../streams/integration/EosIntegrationTest.java    | 13 ++++-
 .../FineGrainedAutoResetIntegrationTest.java       |  6 +++
 .../integration/InternalTopicIntegrationTest.java  | 33 ++++--------
 .../PurgeRepartitionTopicIntegrationTest.java      |  3 +-
 .../integration/RegexSourceIntegrationTest.java    |  7 +++
 ...artitionWithMergeOptimizingIntegrationTest.java |  4 +-
 .../integration/RestoreIntegrationTest.java        |  1 +
 .../kstream/internals/GlobalKTableJoinsTest.java   |  2 +-
 .../internals/InternalStreamsBuilderTest.java      | 31 +++++++++---
 .../kstream/internals/KGroupedStreamImplTest.java  |  2 +-
 .../kstream/internals/KGroupedTableImplTest.java   |  2 +-
 .../kstream/internals/KStreamBranchTest.java       |  2 +-
 .../kstream/internals/KStreamFilterTest.java       |  2 +-
 .../kstream/internals/KStreamFlatMapTest.java      |  2 +-
 .../internals/KStreamFlatMapValuesTest.java        |  2 +-
 .../kstream/internals/KStreamForeachTest.java      |  2 +-
 .../internals/KStreamGlobalKTableJoinTest.java     |  2 +-
 .../internals/KStreamGlobalKTableLeftJoinTest.java |  2 +-
 .../streams/kstream/internals/KStreamImplTest.java |  2 +-
 .../kstream/internals/KStreamKStreamJoinTest.java  |  2 +-
 .../internals/KStreamKStreamLeftJoinTest.java      |  2 +-
 .../kstream/internals/KStreamKTableJoinTest.java   |  2 +-
 .../internals/KStreamKTableLeftJoinTest.java       |  2 +-
 .../streams/kstream/internals/KStreamMapTest.java  |  2 +-
 .../kstream/internals/KStreamMapValuesTest.java    |  2 +-
 .../streams/kstream/internals/KStreamPeekTest.java |  2 +-
 .../kstream/internals/KStreamSelectKeyTest.java    |  2 +-
 ...KStreamSessionWindowAggregateProcessorTest.java | 24 ++-------
 .../kstream/internals/KStreamTransformTest.java    |  2 +-
 .../internals/KStreamTransformValuesTest.java      |  2 +-
 .../internals/KStreamWindowAggregateTest.java      |  2 +-
 .../kstream/internals/KStreamWindowReduceTest.java |  2 +-
 .../kstream/internals/KTableFilterTest.java        |  2 +-
 .../streams/kstream/internals/KTableImplTest.java  |  2 +-
 .../kstream/internals/KTableMapKeysTest.java       |  2 +-
 .../kstream/internals/KTableMapValuesTest.java     |  2 +-
 .../kstream/internals/KTableSourceTest.java        |  2 +-
 .../internals/SessionWindowedKStreamImplTest.java  |  2 +-
 .../internals/TimeWindowedKStreamImplTest.java     |  2 +-
 .../internals/AbstractProcessorContextTest.java    |  4 +-
 .../internals/GlobalStreamThreadTest.java          |  2 +-
 .../internals/InternalTopologyBuilderTest.java     |  6 ++-
 .../processor/internals/ProcessorTopologyTest.java |  4 +-
 .../internals/StreamsMetadataStateTest.java        | 50 +++++++++---------
 .../state/internals/MeteredWindowStoreTest.java    |  2 +-
 .../streams/state/internals/RocksDBStoreTest.java  |  4 +-
 .../StreamThreadStateStoreProviderTest.java        |  3 +-
 .../kafka/test/InternalMockProcessorContext.java   | 12 ++---
 .../org/apache/kafka/test/StreamsTestUtils.java    | 55 ++++++++------------
 .../apache/kafka/streams/TopologyTestDriver.java   |  2 +-
 59 files changed, 254 insertions(+), 208 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 9071c9d..33e037c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1749,7 +1749,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             if (!parts.isEmpty())
                 return parts;
 
-            Timer timer = time.timer(requestTimeoutMs);
+            Timer timer = time.timer(timeout);
             Map<String, List<PartitionInfo>> topicMetadata = fetcher.getTopicMetadata(
                     new MetadataRequest.Builder(Collections.singletonList(topic), true), timer);
             return topicMetadata.get(topic);
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 2cefb35..82323d9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -635,10 +635,6 @@ public class KafkaStreams {
         this.config = config;
         this.time = time;
 
-        // adjust the topology if optimization is turned on.
-        // TODO: to be removed post 2.0
-        internalTopologyBuilder.adjust(config);
-
         // The application ID is a required config and hence should always have value
         processId = UUID.randomUUID();
         final String userClientId = config.getString(StreamsConfig.CLIENT_ID_CONFIG);
@@ -667,7 +663,8 @@ public class KafkaStreams {
         reporters.add(new JmxReporter(JMX_PREFIX));
         metrics = new Metrics(metricConfig, reporters, time);
 
-        internalTopologyBuilder.setApplicationId(applicationId);
+        // re-write the physical topology according to the config
+        internalTopologyBuilder.rewriteTopology(config);
 
         // sanity check to fail-fast in case we cannot build a ProcessorTopology due to an exception
         internalTopologyBuilder.build();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 99d616f..edff470 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -66,7 +66,10 @@ public class InternalTopologyBuilder {
     // state factories
     private final Map<String, StateStoreFactory> stateFactories = new HashMap<>();
 
-    // global state factories
+    // built global state stores
+    private final Map<String, StoreBuilder> globalStateBuilders = new LinkedHashMap<>();
+
+    // built global state stores
     private final Map<String, StateStore> globalStateStores = new LinkedHashMap<>();
 
     // all topics subscribed from source processors (without application-id prefix for internal topics)
@@ -326,6 +329,7 @@ public class InternalTopologyBuilder {
         }
     }
 
+    // public for testing only
     public synchronized final InternalTopologyBuilder setApplicationId(final String applicationId) {
         Objects.requireNonNull(applicationId, "applicationId can't be null");
         this.applicationId = applicationId;
@@ -333,6 +337,35 @@ public class InternalTopologyBuilder {
         return this;
     }
 
+    public synchronized final InternalTopologyBuilder rewriteTopology(final StreamsConfig config) {
+        Objects.requireNonNull(config, "config can't be null");
+
+        // set application id
+        setApplicationId(config.getString(StreamsConfig.APPLICATION_ID_CONFIG));
+
+        // maybe strip out caching layers
+        if (config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) == 0L) {
+            for (final StateStoreFactory storeFactory : stateFactories.values()) {
+                storeFactory.builder.withCachingDisabled();
+            }
+
+            for (final StoreBuilder storeBuilder : globalStateBuilders.values()) {
+                storeBuilder.withCachingDisabled();
+            }
+        }
+
+        // build global state stores
+        for (final StoreBuilder storeBuilder : globalStateBuilders.values()) {
+            globalStateStores.put(storeBuilder.name(), storeBuilder.build());
+        }
+
+        // adjust the topology if optimization is turned on.
+        // TODO: to be removed post 2.0
+        adjust(config);
+
+        return this;
+    }
+
     public final void addSource(final Topology.AutoOffsetReset offsetReset,
                                 final String name,
                                 final TimestampExtractor timestampExtractor,
@@ -524,7 +557,7 @@ public class InternalTopologyBuilder {
                        processorName,
                        stateUpdateSupplier,
                        storeBuilder.name(),
-                       storeBuilder.build());
+                       storeBuilder);
     }
 
     private void validateTopicNotAlreadyRegistered(final String topic) {
@@ -552,8 +585,8 @@ public class InternalTopologyBuilder {
         }
     }
 
-    public final void connectSourceStoreAndTopic(final String sourceStoreName,
-                                                 final String topic) {
+    private void connectSourceStoreAndTopic(final String sourceStoreName,
+                                            final String topic) {
         if (storeToChangelogTopic.containsKey(sourceStoreName)) {
             throw new TopologyException("Source store " + sourceStoreName + " is already added.");
         }
@@ -593,7 +626,7 @@ public class InternalTopologyBuilder {
         if (nodeFactories.containsKey(processorName)) {
             throw new TopologyException("Processor " + processorName + " is already added.");
         }
-        if (stateFactories.containsKey(storeName) || globalStateStores.containsKey(storeName)) {
+        if (stateFactories.containsKey(storeName) || globalStateBuilders.containsKey(storeName)) {
             throw new TopologyException("StateStore " + storeName + " is already added.");
         }
         if (loggingEnabled) {
@@ -612,7 +645,7 @@ public class InternalTopologyBuilder {
                                 final String processorName,
                                 final ProcessorSupplier stateUpdateSupplier,
                                 final String name,
-                                final KeyValueStore store) {
+                                final StoreBuilder<KeyValueStore> storeBuilder) {
         final String[] topics = {topic};
         final String[] predecessors = {sourceName};
         final ProcessorNodeFactory nodeFactory = new ProcessorNodeFactory(processorName,
@@ -631,13 +664,13 @@ public class InternalTopologyBuilder {
         nodeFactories.put(processorName, nodeFactory);
         nodeGrouper.add(processorName);
         nodeGrouper.unite(processorName, predecessors);
-        globalStateStores.put(name, store);
+        globalStateBuilders.put(name, storeBuilder);
         connectSourceStoreAndTopic(name, topic);
     }
 
     private void connectProcessorAndStateStore(final String processorName,
                                                final String stateStoreName) {
-        if (globalStateStores.containsKey(stateStoreName)) {
+        if (globalStateBuilders.containsKey(stateStoreName)) {
             throw new TopologyException("Global StateStore " + stateStoreName +
                     " can be used by a Processor without being specified; it should not be explicitly passed.");
         }
@@ -804,6 +837,8 @@ public class InternalTopologyBuilder {
      * @return ProcessorTopology
      */
     public synchronized ProcessorTopology buildGlobalStateTopology() {
+        Objects.requireNonNull(applicationId, "topology has not completed optimization");
+
         final Set<String> globalGroups = globalNodeGroups();
         if (globalGroups.isEmpty()) {
             return null;
@@ -825,6 +860,8 @@ public class InternalTopologyBuilder {
     }
 
     private ProcessorTopology build(final Set<String> nodeGroup) {
+        Objects.requireNonNull(applicationId, "topology has not completed optimization");
+
         final Map<String, ProcessorNode> processorMap = new LinkedHashMap<>();
         final Map<String, SourceNode> topicSourceMap = new HashMap<>();
         final Map<String, SinkNode> topicSinkMap = new HashMap<>();
@@ -950,10 +987,14 @@ public class InternalTopologyBuilder {
      * @return map containing all global {@link StateStore}s
      */
     public Map<String, StateStore> globalStateStores() {
+        Objects.requireNonNull(applicationId, "topology has not completed optimization");
+
         return Collections.unmodifiableMap(globalStateStores);
     }
 
     public Set<String> allStateStoreName() {
+        Objects.requireNonNull(applicationId, "topology has not completed optimization");
+
         final Set<String> allNames = new HashSet<>(stateFactories.keySet());
         allNames.addAll(globalStateStores.keySet());
         return Collections.unmodifiableSet(allNames);
@@ -1036,7 +1077,7 @@ public class InternalTopologyBuilder {
 
     // Adjust the generated topology based on the configs.
     // Not exposed as public API and should be removed post 2.0
-    public void adjust(final StreamsConfig config) {
+    private void adjust(final StreamsConfig config) {
         final boolean enableOptimization20 = config.getString(StreamsConfig.TOPOLOGY_OPTIMIZATION).equals(StreamsConfig.OPTIMIZE);
 
         if (enableOptimization20) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/StoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/StoreBuilder.java
index 2d1b241..a930468 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/StoreBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/StoreBuilder.java
@@ -33,6 +33,12 @@ public interface StoreBuilder<T extends StateStore> {
     StoreBuilder<T> withCachingEnabled();
 
     /**
+     * Disable caching on the store.
+     * @return  this
+     */
+    StoreBuilder<T> withCachingDisabled();
+
+    /**
      * Maintain a changelog for any changes made to the store.
      * Use the provided config to set the config of the changelog topic.
      * @param config  config applied to the changelog topic
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreBuilder.java
index fdcd2e7..898db9e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreBuilder.java
@@ -53,6 +53,12 @@ abstract public class AbstractStoreBuilder<K, V, T extends StateStore> implement
     }
 
     @Override
+    public StoreBuilder<T> withCachingDisabled() {
+        enableCaching = false;
+        return this;
+    }
+
+    @Override
     public StoreBuilder<T> withLoggingEnabled(final Map<String, String> config) {
         Objects.requireNonNull(config, "config can't be null");
         enableLogging = true;
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 28e3d55..481c860 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -81,11 +81,12 @@ public class KafkaStreamsTest {
     @Before
     public void before() {
         props = new Properties();
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        props.setProperty(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
-        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        props.put(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
+        props.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, NUM_THREADS);
+        props.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
         streams = new KafkaStreams(builder.build(), props);
     }
 
@@ -238,10 +239,10 @@ public class KafkaStreamsTest {
     @Test
     public void globalThreadShouldTimeoutWhenBrokerConnectionCannotBeEstablished() {
         final Properties props = new Properties();
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:1");
-        props.setProperty(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
-        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:1");
+        props.put(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
+        props.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, NUM_THREADS);
 
         props.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 200);
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index 56e6a6d..4fab4ff 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -53,7 +53,7 @@ import static org.junit.Assert.assertFalse;
 public class StreamsBuilderTest {
 
     private final StreamsBuilder builder = new StreamsBuilder();
-    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
+    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
 
     @Test
     public void shouldAllowJoinUnmaterializedFilteredKTable() {
@@ -61,7 +61,7 @@ public class StreamsBuilderTest {
         builder.<Bytes, String>stream("stream-topic").join(filteredKTable, MockValueJoiner.TOSTRING_JOINER);
         builder.build();
 
-        final ProcessorTopology topology = builder.internalTopologyBuilder.build();
+        final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
 
         assertThat(topology.stateStores().size(), equalTo(1));
         assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000005"), equalTo(Collections.singleton(topology.stateStores().get(0).name())));
@@ -75,7 +75,7 @@ public class StreamsBuilderTest {
         builder.<Bytes, String>stream("stream-topic").join(filteredKTable, MockValueJoiner.TOSTRING_JOINER);
         builder.build();
 
-        final ProcessorTopology topology = builder.internalTopologyBuilder.build();
+        final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
 
         assertThat(topology.stateStores().size(), equalTo(2));
         assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000005"), equalTo(Collections.singleton("store")));
@@ -88,7 +88,7 @@ public class StreamsBuilderTest {
         builder.<Bytes, String>stream("stream-topic").join(mappedKTable, MockValueJoiner.TOSTRING_JOINER);
         builder.build();
 
-        final ProcessorTopology topology = builder.internalTopologyBuilder.build();
+        final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
 
         assertThat(topology.stateStores().size(), equalTo(1));
         assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000005"), equalTo(Collections.singleton(topology.stateStores().get(0).name())));
@@ -102,7 +102,7 @@ public class StreamsBuilderTest {
         builder.<Bytes, String>stream("stream-topic").join(mappedKTable, MockValueJoiner.TOSTRING_JOINER);
         builder.build();
 
-        final ProcessorTopology topology = builder.internalTopologyBuilder.build();
+        final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
 
         assertThat(topology.stateStores().size(), equalTo(2));
         assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000005"), equalTo(Collections.singleton("store")));
@@ -116,7 +116,7 @@ public class StreamsBuilderTest {
         builder.<Bytes, String>stream("stream-topic").join(table1.join(table2, MockValueJoiner.TOSTRING_JOINER), MockValueJoiner.TOSTRING_JOINER);
         builder.build();
 
-        final ProcessorTopology topology = builder.internalTopologyBuilder.build();
+        final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
 
         assertThat(topology.stateStores().size(), equalTo(2));
         assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000010"), equalTo(Utils.mkSet(topology.stateStores().get(0).name(), topology.stateStores().get(1).name())));
@@ -130,7 +130,7 @@ public class StreamsBuilderTest {
         builder.<Bytes, String>stream("stream-topic").join(table1.join(table2, MockValueJoiner.TOSTRING_JOINER, Materialized.as("store")), MockValueJoiner.TOSTRING_JOINER);
         builder.build();
 
-        final ProcessorTopology topology = builder.internalTopologyBuilder.build();
+        final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
 
         assertThat(topology.stateStores().size(), equalTo(3));
         assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000010"), equalTo(Collections.singleton("store")));
@@ -143,7 +143,7 @@ public class StreamsBuilderTest {
         builder.<Bytes, String>stream("stream-topic").join(table, MockValueJoiner.TOSTRING_JOINER);
         builder.build();
 
-        final ProcessorTopology topology = builder.internalTopologyBuilder.build();
+        final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
 
         assertThat(topology.stateStores().size(), equalTo(1));
         assertThat(topology.processorConnectedStateStores("KTABLE-SOURCE-0000000002"), equalTo(Collections.singleton(topology.stateStores().get(0).name())));
@@ -284,11 +284,11 @@ public class StreamsBuilderTest {
         final String topic = "topic";
         builder.table(topic, Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as("store"));
         final Topology topology = builder.build();
-        final Properties props = StreamsTestUtils.minimalStreamsConfig();
+        final Properties props = StreamsTestUtils.getStreamsConfig();
         props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
 
         final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(topology);
-        internalTopologyBuilder.adjust(new StreamsConfig(props));
+        internalTopologyBuilder.rewriteTopology(new StreamsConfig(props));
 
         assertThat(internalTopologyBuilder.build().storeToChangelogTopic(), equalTo(Collections.singletonMap("store", "topic")));
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 1791672..83279cc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -49,7 +49,7 @@ import static org.apache.kafka.streams.StreamsConfig.TOPOLOGY_OPTIMIZATION;
 import static org.apache.kafka.streams.StreamsConfig.adminClientPrefix;
 import static org.apache.kafka.streams.StreamsConfig.consumerPrefix;
 import static org.apache.kafka.streams.StreamsConfig.producerPrefix;
-import static org.apache.kafka.test.StreamsTestUtils.minimalStreamsConfig;
+import static org.apache.kafka.test.StreamsTestUtils.getStreamsConfig;
 import static org.hamcrest.core.IsEqual.equalTo;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
@@ -523,7 +523,7 @@ public class StreamsConfigTest {
 
     @Test
     public void shouldUseNewConfigsWhenPresent() {
-        final Properties props = minimalStreamsConfig();
+        final Properties props = getStreamsConfig();
         props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
         props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
         props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class);
@@ -536,7 +536,7 @@ public class StreamsConfigTest {
 
     @Test
     public void shouldUseCorrectDefaultsWhenNoneSpecified() {
-        final StreamsConfig config = new StreamsConfig(minimalStreamsConfig());
+        final StreamsConfig config = new StreamsConfig(getStreamsConfig());
         assertTrue(config.defaultKeySerde() instanceof Serdes.ByteArraySerde);
         assertTrue(config.defaultValueSerde() instanceof Serdes.ByteArraySerde);
         assertTrue(config.defaultTimestampExtractor() instanceof FailOnInvalidTimestamp);
@@ -544,7 +544,7 @@ public class StreamsConfigTest {
 
     @Test
     public void shouldSpecifyCorrectKeySerdeClassOnError() {
-        final Properties props = minimalStreamsConfig();
+        final Properties props = getStreamsConfig();
         props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, MisconfiguredSerde.class);
         final StreamsConfig config = new StreamsConfig(props);
         try {
@@ -558,7 +558,7 @@ public class StreamsConfigTest {
     @SuppressWarnings("deprecation")
     @Test
     public void shouldSpecifyCorrectValueSerdeClassOnError() {
-        final Properties props = minimalStreamsConfig();
+        final Properties props = getStreamsConfig();
         props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, MisconfiguredSerde.class);
         final StreamsConfig config = new StreamsConfig(props);
         try {
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyWrapper.java b/streams/src/test/java/org/apache/kafka/streams/TopologyWrapper.java
index 940ec0e..e1c7c11 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyWrapper.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyWrapper.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams;
 
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+import org.apache.kafka.test.StreamsTestUtils;
 
 /**
  *  This class allows to access the {@link InternalTopologyBuilder} a {@link Topology} object.
@@ -25,14 +26,14 @@ import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 public class TopologyWrapper extends Topology {
 
     static public InternalTopologyBuilder getInternalTopologyBuilder(final Topology topology) {
-        return topology.internalTopologyBuilder;
+        return topology.internalTopologyBuilder.rewriteTopology(new StreamsConfig(StreamsTestUtils.getStreamsConfig()));
     }
 
     public InternalTopologyBuilder getInternalBuilder() {
-        return internalTopologyBuilder;
+        return internalTopologyBuilder.rewriteTopology(new StreamsConfig(StreamsTestUtils.getStreamsConfig()));
     }
 
-    public void setApplicationId(final String applicationId) {
-        internalTopologyBuilder.setApplicationId(applicationId);
+    public InternalTopologyBuilder getInternalBuilder(final String applicationId) {
+        return internalTopologyBuilder.rewriteTopology(new StreamsConfig(StreamsTestUtils.getStreamsConfig(applicationId)));
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
index 770f579..9aa0772 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
@@ -163,8 +163,13 @@ public class EosIntegrationTest {
                     Serdes.LongSerde.class.getName(),
                     new Properties() {
                         {
-                            put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1);
                             put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
+                            put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+                            put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
+                            put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1);
+                            put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), "1000");
+                            put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
+                            put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
                         }
                     }));
 
@@ -248,6 +253,10 @@ public class EosIntegrationTest {
                 new Properties() {
                     {
                         put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
+                        put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+                        put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
+                        put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000");
+                        put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
                     }
                 }));
 
@@ -669,6 +678,8 @@ public class EosIntegrationTest {
                         put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
                         put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numberOfStreamsThreads);
                         put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, -1);
+                        put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), "1000");
+                        put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
                         put(StreamsConfig.consumerPrefix(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), 5 * 1000);
                         put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 5 * 1000 - 1);
                         put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), MAX_POLL_INTERVAL_MS);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java
index 08ab120..ac5a418 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java
@@ -140,6 +140,9 @@ public class FineGrainedAutoResetIntegrationTest {
     public void setUp() throws IOException {
 
         final Properties props = new Properties();
+        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
+        props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000");
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         props.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
 
@@ -277,6 +280,9 @@ public class FineGrainedAutoResetIntegrationTest {
     @Test
     public void shouldThrowStreamsExceptionNoResetSpecified() throws InterruptedException {
         final Properties props = new Properties();
+        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
+        props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000");
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
 
         final Properties localConfig = StreamsTestUtils.getStreamsConfig(
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index fe7ee26..153c5a1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
@@ -35,9 +36,7 @@ import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.TimeWindows;
-import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
-import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.MockMapper;
@@ -89,10 +88,10 @@ public class InternalTopicIntegrationTest {
         streamsProp.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         streamsProp.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         streamsProp.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
-        streamsProp.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        streamsProp.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
         streamsProp.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
         streamsProp.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        streamsProp.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsProp.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
     }
 
     @After
@@ -147,14 +146,9 @@ public class InternalTopicIntegrationTest {
         final StreamsBuilder builder = new StreamsBuilder();
         final KStream<String, String> textLines = builder.stream(DEFAULT_INPUT_TOPIC);
 
-        textLines.flatMapValues(new ValueMapper<String, Iterable<String>>() {
-            @Override
-            public Iterable<String> apply(final String value) {
-                return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
-            }
-        })
-                .groupBy(MockMapper.<String, String>selectValueMapper())
-                .count(Materialized.<String, Long, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>>as("Counts"));
+        textLines.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
+            .groupBy(MockMapper.selectValueMapper())
+            .count(Materialized.as("Counts"));
 
         final KafkaStreams streams = new KafkaStreams(builder.build(), streamsProp);
         streams.start();
@@ -191,19 +185,10 @@ public class InternalTopicIntegrationTest {
 
         final int durationMs = 2000;
 
-        textLines.flatMapValues(new ValueMapper<String, Iterable<String>>() {
-            @Override
-            public Iterable<String> apply(final String value) {
-                return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
-            }
-        })
-            .groupBy(MockMapper.<String, String>selectValueMapper())
+        textLines.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
+            .groupBy(MockMapper.selectValueMapper())
             .windowedBy(TimeWindows.of(1000).grace(0L))
-            .count(
-                Materialized
-                    .<String, Long, WindowStore<org.apache.kafka.common.utils.Bytes, byte[]>>as("CountWindows")
-                    .withRetention(2_000L)
-            );
+            .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("CountWindows").withRetention(2_000L));
 
         final KafkaStreams streams = new KafkaStreams(builder.build(), streamsProp);
         streams.start();
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
index 29f077f..2269a5d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
@@ -151,6 +151,7 @@ public class PurgeRepartitionTopicIntegrationTest {
 
         final Properties streamsConfiguration = new Properties();
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
+        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, purgeIntervalMs);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
         streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
@@ -158,7 +159,7 @@ public class PurgeRepartitionTopicIntegrationTest {
         streamsConfiguration.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_MS_CONFIG), purgeIntervalMs);
         streamsConfiguration.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG), purgeSegmentBytes);
         streamsConfiguration.put(StreamsConfig.producerPrefix(ProducerConfig.BATCH_SIZE_CONFIG), purgeSegmentBytes / 2);    // we cannot allow batch size larger than segment size
-        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, purgeIntervalMs);
+        streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
 
         final StreamsBuilder builder = new StreamsBuilder();
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index d3a7aee..7cfde61 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.integration;
 
 import kafka.utils.MockTime;
 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
@@ -29,6 +30,7 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TopologyWrapper;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
@@ -115,7 +117,12 @@ public class RegexSourceIntegrationTest {
         CLUSTER.deleteAndRecreateTopics(DEFAULT_OUTPUT_TOPIC);
 
         final Properties properties = new Properties();
+        properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
+        properties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000");
+        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         properties.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
+
         streamsConfiguration = StreamsTestUtils.getStreamsConfig("regex-source-integration-test",
                                                                  CLUSTER.bootstrapServers(),
                                                                  STRING_SERDE_CLASSNAME,
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionWithMergeOptimizingIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionWithMergeOptimizingIntegrationTest.java
index 200062e..58d903a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionWithMergeOptimizingIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionWithMergeOptimizingIntegrationTest.java
@@ -165,10 +165,10 @@ public class RepartitionWithMergeOptimizingIntegrationTest {
         streams.start();
 
         final List<KeyValue<String, Long>> expectedCountKeyValues = Arrays.asList(KeyValue.pair("A", 6L), KeyValue.pair("B", 6L), KeyValue.pair("C", 6L));
-        final List<KeyValue<String, Long>> receivedCountKeyValues = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig1, COUNT_TOPIC, expectedCountKeyValues.size());
+        final List<KeyValue<String, Long>> receivedCountKeyValues = IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(consumerConfig1, COUNT_TOPIC, expectedCountKeyValues);
 
         final List<KeyValue<String, String>> expectedStringCountKeyValues = Arrays.asList(KeyValue.pair("A", "6"), KeyValue.pair("B", "6"), KeyValue.pair("C", "6"));
-        final List<KeyValue<String, String>> receivedCountStringKeyValues = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig2, COUNT_STRING_TOPIC, expectedStringCountKeyValues.size());
+        final List<KeyValue<String, String>> receivedCountStringKeyValues = IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(consumerConfig2, COUNT_STRING_TOPIC, expectedStringCountKeyValues);
 
         assertThat(receivedCountKeyValues, equalTo(expectedCountKeyValues));
         assertThat(receivedCountStringKeyValues, equalTo(expectedStringCountKeyValues));
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
index c1d07dc..5eb4fc7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
@@ -106,6 +106,7 @@ public class RestoreIntegrationTest {
         streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
         return streamsConfiguration;
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
index 6af3331..c43fcd8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
@@ -99,7 +99,7 @@ public class GlobalKTableJoinsTest {
 
     private void verifyJoin(final Map<String, String> expected) {
         final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
-        final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
+        final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
 
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
             // write some data to the global table
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
index c6c4996..2bf6971 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.KStream;
@@ -33,6 +34,7 @@ import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.test.MockMapper;
 import org.apache.kafka.test.MockTimestampExtractor;
 import org.apache.kafka.test.MockValueJoiner;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Test;
 
 import java.util.Collections;
@@ -60,7 +62,6 @@ public class InternalStreamsBuilderTest {
     private final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized = new MaterializedInternal<>(Materialized.as("test-store"));
 
     {
-        builder.internalTopologyBuilder.setApplicationId(APP_ID);
         materialized.generateStoreNameIfNeeded(builder, storePrefix);
     }
 
@@ -132,7 +133,9 @@ public class InternalStreamsBuilderTest {
         final KTable table1 = builder.table("topic2", consumed, materializedInternal);
 
         builder.buildAndOptimizeTopology();
-        final ProcessorTopology topology = builder.internalTopologyBuilder.build(null);
+        final ProcessorTopology topology = builder.internalTopologyBuilder
+            .rewriteTopology(new StreamsConfig(StreamsTestUtils.getStreamsConfig(APP_ID)))
+            .build(null);
 
         assertEquals(1, topology.stateStores().size());
         final String storeName = "prefix-STATE-STORE-0000000000";
@@ -174,7 +177,9 @@ public class InternalStreamsBuilderTest {
             materializedInternal);
 
         builder.buildAndOptimizeTopology();
-        final ProcessorTopology topology = builder.internalTopologyBuilder.buildGlobalStateTopology();
+        final ProcessorTopology topology = builder.internalTopologyBuilder
+            .rewriteTopology(new StreamsConfig(StreamsTestUtils.getStreamsConfig(APP_ID)))
+            .buildGlobalStateTopology();
         final List<StateStore> stateStores = topology.globalStateStores();
 
         assertEquals(1, stateStores.size());
@@ -182,7 +187,9 @@ public class InternalStreamsBuilderTest {
     }
 
     private void doBuildGlobalTopologyWithAllGlobalTables() {
-        final ProcessorTopology topology = builder.internalTopologyBuilder.buildGlobalStateTopology();
+        final ProcessorTopology topology = builder.internalTopologyBuilder
+            .rewriteTopology(new StreamsConfig(StreamsTestUtils.getStreamsConfig(APP_ID)))
+            .buildGlobalStateTopology();
 
         final List<StateStore> stateStores = topology.globalStateStores();
         final Set<String> sourceTopics = topology.sourceTopics();
@@ -263,8 +270,9 @@ public class InternalStreamsBuilderTest {
 
 
         final KStream<String, String> mapped = playEvents.map(MockMapper.<String, String>selectValueKeyValueMapper());
-        mapped.leftJoin(table, MockValueJoiner.TOSTRING_JOINER).groupByKey().count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("count"));
+        mapped.leftJoin(table, MockValueJoiner.TOSTRING_JOINER).groupByKey().count(Materialized.as("count"));
         builder.buildAndOptimizeTopology();
+        builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(StreamsTestUtils.getStreamsConfig(APP_ID)));
         assertEquals(Collections.singletonList("table-topic"), builder.internalTopologyBuilder.stateStoreNameToSourceTopics().get("table-store"));
         assertEquals(Collections.singletonList(APP_ID + "-KSTREAM-MAP-0000000003-repartition"), builder.internalTopologyBuilder.stateStoreNameToSourceTopics().get("count"));
     }
@@ -359,6 +367,7 @@ public class InternalStreamsBuilderTest {
     public void shouldHaveNullTimestampExtractorWhenNoneSupplied() {
         builder.stream(Collections.singleton("topic"), consumed);
         builder.buildAndOptimizeTopology();
+        builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(StreamsTestUtils.getStreamsConfig(APP_ID)));
         final ProcessorTopology processorTopology = builder.internalTopologyBuilder.build(null);
         assertNull(processorTopology.source("topic").getTimestampExtractor());
     }
@@ -368,7 +377,9 @@ public class InternalStreamsBuilderTest {
         final ConsumedInternal consumed = new ConsumedInternal<>(Consumed.with(new MockTimestampExtractor()));
         builder.stream(Collections.singleton("topic"), consumed);
         builder.buildAndOptimizeTopology();
-        final ProcessorTopology processorTopology = builder.internalTopologyBuilder.build(null);
+        final ProcessorTopology processorTopology = builder.internalTopologyBuilder
+            .rewriteTopology(new StreamsConfig(StreamsTestUtils.getStreamsConfig(APP_ID)))
+            .build(null);
         assertThat(processorTopology.source("topic").getTimestampExtractor(), instanceOf(MockTimestampExtractor.class));
     }
 
@@ -376,7 +387,9 @@ public class InternalStreamsBuilderTest {
     public void ktableShouldHaveNullTimestampExtractorWhenNoneSupplied() {
         builder.table("topic", consumed, materialized);
         builder.buildAndOptimizeTopology();
-        final ProcessorTopology processorTopology = builder.internalTopologyBuilder.build(null);
+        final ProcessorTopology processorTopology = builder.internalTopologyBuilder
+            .rewriteTopology(new StreamsConfig(StreamsTestUtils.getStreamsConfig(APP_ID)))
+            .build(null);
         assertNull(processorTopology.source("topic").getTimestampExtractor());
     }
 
@@ -385,7 +398,9 @@ public class InternalStreamsBuilderTest {
         final ConsumedInternal<String, String> consumed = new ConsumedInternal<>(Consumed.<String, String>with(new MockTimestampExtractor()));
         builder.table("topic", consumed, materialized);
         builder.buildAndOptimizeTopology();
-        final ProcessorTopology processorTopology = builder.internalTopologyBuilder.build(null);
+        final ProcessorTopology processorTopology = builder.internalTopologyBuilder
+            .rewriteTopology(new StreamsConfig(StreamsTestUtils.getStreamsConfig(APP_ID)))
+            .build(null);
         assertThat(processorTopology.source("topic").getTimestampExtractor(), instanceOf(MockTimestampExtractor.class));
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
index 1517f0e..a1f8b27 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
@@ -75,7 +75,7 @@ public class KGroupedStreamImplTest {
     private KGroupedStream<String, String> groupedStream;
 
     private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
-    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
+    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
 
     @Before
     public void before() {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
index 31863f2..662ede7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
@@ -56,7 +56,7 @@ public class KGroupedTableImplTest {
     private final StreamsBuilder builder = new StreamsBuilder();
     private static final String INVALID_STORE_NAME = "~foo bar~";
     private KGroupedTable<String, String> groupedTable;
-    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.Integer());
+    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.Integer());
     private final String topic = "input";
 
     @Before
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
index b7035aa..b977346 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
@@ -39,7 +39,7 @@ public class KStreamBranchTest {
 
     private final String topicName = "topic";
     private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
-    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
+    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
 
     @SuppressWarnings("unchecked")
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
index 6306d2e..d7e6240 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
@@ -37,7 +37,7 @@ public class KStreamFilterTest {
 
     private final String topicName = "topic";
     private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
-    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String());
+    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
 
     private final Predicate<Integer, String> isMultipleOfThree = new Predicate<Integer, String>() {
         @Override
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
index 8daad99..7feb18f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
@@ -39,7 +39,7 @@ public class KStreamFlatMapTest {
 
     private String topicName = "topic";
     private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
-    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String());
+    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
 
     @Test
     public void testFlatMap() {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
index aca0911..0c4495e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
@@ -38,7 +38,7 @@ public class KStreamFlatMapValuesTest {
 
     private String topicName = "topic";
     private final ConsumerRecordFactory<Integer, Integer> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new IntegerSerializer());
-    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String());
+    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
 
     @Test
     public void testFlatMapValues() {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java
index fbcd6db..feed2fb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java
@@ -41,7 +41,7 @@ public class KStreamForeachTest {
 
     private final String topicName = "topic";
     private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
-    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String());
+    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
 
     @Test
     public void testForeach() {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
index d5c5a54..a68583e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
@@ -74,7 +74,7 @@ public class KStreamGlobalKTableJoinTest {
         };
         stream.join(table, keyMapper, MockValueJoiner.TOSTRING_JOINER).process(supplier);
 
-        final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String());
+        final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
         driver = new TopologyTestDriver(builder.build(), props);
 
         processor = supplier.theCapturedProcessor();
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
index 248c3ee..aebc2a5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
@@ -76,7 +76,7 @@ public class KStreamGlobalKTableLeftJoinTest {
         };
         stream.leftJoin(table, keyMapper, MockValueJoiner.TOSTRING_JOINER).process(supplier);
 
-        final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String());
+        final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
         driver = new TopologyTestDriver(builder.build(), props);
 
         processor = supplier.theCapturedProcessor();
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 354fa0a..bce7fc8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -74,7 +74,7 @@ public class KStreamImplTest {
     private StreamsBuilder builder;
 
     private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
-    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
+    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
 
     @Before
     public void before() {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
index 963b681..971ee62 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
@@ -53,7 +53,7 @@ public class KStreamKStreamJoinTest {
 
     private final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String());
     private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
-    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
+    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
 
     @Test
     public void shouldLogAndMeterOnSkippedRecordsWithNullValue() {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
index 338b96a..856de3d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
@@ -48,7 +48,7 @@ public class KStreamKStreamLeftJoinTest {
 
     private final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String());
     private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
-    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
+    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
 
     @Test
     public void testLeftJoin() {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
index 6ffce04..eb6536d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
@@ -72,7 +72,7 @@ public class KStreamKTableJoinTest {
         table = builder.table(tableTopic, consumed);
         stream.join(table, MockValueJoiner.TOSTRING_JOINER).process(supplier);
 
-        final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String());
+        final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
         driver = new TopologyTestDriver(builder.build(), props, 0L);
 
         processor = supplier.theCapturedProcessor();
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
index 1c3e027..8c57038 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
@@ -68,7 +68,7 @@ public class KStreamKTableLeftJoinTest {
         table = builder.table(tableTopic, consumed);
         stream.leftJoin(table, MockValueJoiner.TOSTRING_JOINER).process(supplier);
 
-        final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String());
+        final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
         driver = new TopologyTestDriver(builder.build(), props, 0L);
 
         processor = supplier.theCapturedProcessor();
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
index e1851c1..2692c17 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
@@ -38,7 +38,7 @@ public class KStreamMapTest {
 
     private String topicName = "topic";
     private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
-    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String());
+    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
 
     @Test
     public void testMap() {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
index 8de8a81..67891be 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
@@ -39,7 +39,7 @@ public class KStreamMapValuesTest {
     private String topicName = "topic";
     private final MockProcessorSupplier<Integer, Integer> supplier = new MockProcessorSupplier<>();
     private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
-    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String());
+    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
 
     @Test
     public void testFlatMapValues() {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java
index 3e4012e..780bcae 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java
@@ -40,7 +40,7 @@ public class KStreamPeekTest {
 
     private final String topicName = "topic";
     private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
-    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String());
+    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
 
     @Test
     public void shouldObserveStreamElements() {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
index 62f7677..5f804d4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
@@ -41,7 +41,7 @@ public class KStreamSelectKeyTest {
     private String topicName = "topic_key_select";
 
     private final ConsumerRecordFactory<String, Integer> recordFactory = new ConsumerRecordFactory<>(topicName, new StringSerializer(), new IntegerSerializer());
-    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.Integer());
+    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.Integer());
 
     @Test
     public void testSelectKey() {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index 74cd7bd..c7fd7cd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -64,24 +64,9 @@ public class KStreamSessionWindowAggregateProcessorTest {
     private static final long GAP_MS = 5 * 60 * 1000L;
     private static final String STORE_NAME = "session-store";
 
-    private final Initializer<Long> initializer = new Initializer<Long>() {
-        @Override
-        public Long apply() {
-            return 0L;
-        }
-    };
-    private final Aggregator<String, String, Long> aggregator = new Aggregator<String, String, Long>() {
-        @Override
-        public Long apply(final String aggKey, final String value, final Long aggregate) {
-            return aggregate + 1;
-        }
-    };
-    private final Merger<String, Long> sessionMerger = new Merger<String, Long>() {
-        @Override
-        public Long apply(final String aggKey, final Long aggOne, final Long aggTwo) {
-            return aggOne + aggTwo;
-        }
-    };
+    private final Initializer<Long> initializer = () -> 0L;
+    private final Aggregator<String, String, Long> aggregator = (aggKey, value, aggregate) -> aggregate + 1;
+    private final Merger<String, Long> sessionMerger = (aggKey, aggOne, aggTwo) -> aggOne + aggTwo;
     private final KStreamSessionWindowAggregate<String, String, Long> sessionAggregator =
         new KStreamSessionWindowAggregate<>(
             SessionWindows.with(GAP_MS),
@@ -96,7 +81,6 @@ public class KStreamSessionWindowAggregateProcessorTest {
     private InternalMockProcessorContext context;
     private Metrics metrics;
 
-
     @Before
     public void initializeStore() {
         final File stateDir = TestUtils.tempDirectory();
@@ -107,7 +91,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
             Serdes.String(),
             Serdes.String(),
             metrics,
-            new StreamsConfig(StreamsTestUtils.minimalStreamsConfig()),
+            new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
             NoOpRecordCollector::new,
             new ThreadCache(new LogContext("testCache "), 100000, metrics)
         ) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
index 9a61874..a8ee681 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
@@ -44,7 +44,7 @@ public class KStreamTransformTest {
     private String topicName = "topic";
 
     private final ConsumerRecordFactory<Integer, Integer> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new IntegerSerializer());
-    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.Integer());
+    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.Integer());
 
     @Rule
     public final KStreamTestDriver kstreamDriver = new KStreamTestDriver();
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
index c399650..570053c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
@@ -51,7 +51,7 @@ public class KStreamTransformValuesTest {
     private String topicName = "topic";
     private final MockProcessorSupplier<Integer, Integer> supplier = new MockProcessorSupplier<>();
     private final ConsumerRecordFactory<Integer, Integer> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new IntegerSerializer());
-    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.Integer());
+    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.Integer());
     @Mock(MockType.NICE)
     private ProcessorContext context;
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
index af7cff6..5a295b8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
@@ -60,7 +60,7 @@ import static org.junit.Assert.assertEquals;
 public class KStreamWindowAggregateTest {
 
     private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
-    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
+    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
 
     @Test
     public void testAggBasic() {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java
index bc8ca95..3746ae9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java
@@ -48,7 +48,7 @@ import static org.junit.Assert.assertEquals;
 
 public class KStreamWindowReduceTest {
 
-    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
+    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
     private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
index 2995b23..3e143f5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
@@ -49,7 +49,7 @@ public class KTableFilterTest {
 
     private final Consumed<String, Integer> consumed = Consumed.with(Serdes.String(), Serdes.Integer());
     private final ConsumerRecordFactory<String, Integer> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new IntegerSerializer());
-    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.Integer());
+    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.Integer());
 
     private void doTestKTable(final StreamsBuilder builder,
                               final KTable<String, Integer> table2,
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
index 60d7426..eb586e6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
@@ -64,7 +64,7 @@ public class KTableImplTest {
 
     private final Consumed<String, String> consumed = Consumed.with(Serdes.String(), Serdes.String());
     private final Produced<String, String> produced = Produced.with(Serdes.String(), Serdes.String());
-    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
+    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
     private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
 
     private StreamsBuilder builder;
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
index bf5660a..ce7de16 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
@@ -40,7 +40,7 @@ import static org.junit.Assert.assertEquals;
 public class KTableMapKeysTest {
 
     private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
-    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String());
+    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
 
     @Test
     public void testMapKeysConvertingToStream() {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
index ddfd5a5..0f56043 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
@@ -51,7 +51,7 @@ public class KTableMapValuesTest {
     private final Consumed<String, String> consumed = Consumed.with(Serdes.String(), Serdes.String());
     private final Produced<String, String> produced = Produced.with(Serdes.String(), Serdes.String());
     private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
-    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
+    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
 
     private void doTestKTable(final StreamsBuilder builder, final String topic1, final MockProcessorSupplier<String, Integer> supplier) {
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
index 80a60ab..2055f9c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
@@ -48,7 +48,7 @@ public class KTableSourceTest {
 
     private final Consumed<String, String> stringConsumed = Consumed.with(Serdes.String(), Serdes.String());
     private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
-    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
+    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
 
     @Test
     public void testKTable() {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
index 825edb3..34a235a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
@@ -55,7 +55,7 @@ public class SessionWindowedKStreamImplTest {
     private static final String TOPIC = "input";
     private final StreamsBuilder builder = new StreamsBuilder();
     private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
-    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
+    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
 
     private final Merger<String, String> sessionMerger = new Merger<String, String>() {
         @Override
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
index 7b885b2..0e541c9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
@@ -54,7 +54,7 @@ public class TimeWindowedKStreamImplTest {
     private static final String TOPIC = "input";
     private final StreamsBuilder builder = new StreamsBuilder();
     private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
-    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
+    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
     private TimeWindowedKStream<String, String> windowedStream;
 
     @Before
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
index 54a927c..c4699ec 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
@@ -37,7 +37,7 @@ import org.junit.Test;
 
 import java.util.Properties;
 
-import static org.apache.kafka.test.StreamsTestUtils.minimalStreamsConfig;
+import static org.apache.kafka.test.StreamsTestUtils.getStreamsConfig;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -182,7 +182,7 @@ public class AbstractProcessorContextTest {
     private static class TestProcessorContext extends AbstractProcessorContext {
         static Properties config;
         static {
-            config = minimalStreamsConfig();
+            config = getStreamsConfig();
             // Value must be a string to test className -> class conversion
             config.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, RocksDBConfigSetter.class.getName());
             config.put("user.supplied.config", "user-suppplied-value");
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
index 95c6943..37a6fdb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
@@ -104,7 +104,7 @@ public class GlobalStreamThreadTest {
         properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "blah");
         properties.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
         config = new StreamsConfig(properties);
-        globalStreamThread = new GlobalStreamThread(builder.buildGlobalStateTopology(),
+        globalStreamThread = new GlobalStreamThread(builder.rewriteTopology(config).buildGlobalStateTopology(),
                                                     config,
                                                     mockConsumer,
                                                     new StateDirectory(config, time),
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
index 78c217d..7230e5f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals;
 import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.TopologyDescription;
 import org.apache.kafka.streams.errors.TopologyException;
@@ -32,6 +33,7 @@ import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockStoreBuilder;
 import org.apache.kafka.test.MockTimestampExtractor;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Test;
 
 import java.lang.reflect.Field;
@@ -626,7 +628,7 @@ public class InternalTopologyBuilderTest {
     @Test
     public void shouldAddTimestampExtractorPerSource() {
         builder.addSource(null, "source", new MockTimestampExtractor(), null, null, "topic");
-        final ProcessorTopology processorTopology = builder.build(null);
+        final ProcessorTopology processorTopology = builder.rewriteTopology(new StreamsConfig(StreamsTestUtils.getStreamsConfig())).build(null);
         assertThat(processorTopology.source("topic").getTimestampExtractor(), instanceOf(MockTimestampExtractor.class));
     }
 
@@ -634,7 +636,7 @@ public class InternalTopologyBuilderTest {
     public void shouldAddTimestampExtractorWithPatternPerSource() {
         final Pattern pattern = Pattern.compile("t.*");
         builder.addSource(null, "source", new MockTimestampExtractor(), null, null, pattern);
-        final ProcessorTopology processorTopology = builder.build(null);
+        final ProcessorTopology processorTopology = builder.rewriteTopology(new StreamsConfig(StreamsTestUtils.getStreamsConfig())).build(null);
         assertThat(processorTopology.source(pattern.pattern()).getTimestampExtractor(), instanceOf(MockTimestampExtractor.class));
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 3495e80..587cae2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -102,8 +102,6 @@ public class ProcessorTopologyTest {
 
     @Test
     public void testTopologyMetadata() {
-        topology.setApplicationId("X");
-
         topology.addSource("source-1", "topic-1");
         topology.addSource("source-2", "topic-2", "topic-3");
         topology.addProcessor("processor-1", new MockProcessorSupplier<>(), "source-1");
@@ -111,7 +109,7 @@ public class ProcessorTopologyTest {
         topology.addSink("sink-1", "topic-3", "processor-1");
         topology.addSink("sink-2", "topic-4", "processor-1", "processor-2");
 
-        final ProcessorTopology processorTopology = topology.getInternalBuilder().build();
+        final ProcessorTopology processorTopology = topology.getInternalBuilder("X").build();
 
         assertEquals(6, processorTopology.processors().size());
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
index 0cf147c..5b1da16 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
@@ -53,7 +53,7 @@ import static org.junit.Assert.assertTrue;
 
 public class StreamsMetadataStateTest {
 
-    private StreamsMetadataState discovery;
+    private StreamsMetadataState metadataState;
     private HostInfo hostOne;
     private HostInfo hostTwo;
     private HostInfo hostThree;
@@ -121,8 +121,8 @@ public class StreamsMetadataStateTest {
                 new PartitionInfo("topic-four", 0, null, null, null));
 
         cluster = new Cluster(null, Collections.<Node>emptyList(), partitionInfos, Collections.<String>emptySet(), Collections.<String>emptySet());
-        discovery = new StreamsMetadataState(TopologyWrapper.getInternalTopologyBuilder(builder.build()), hostOne);
-        discovery.onChange(hostToPartitions, cluster);
+        metadataState = new StreamsMetadataState(TopologyWrapper.getInternalTopologyBuilder(builder.build()), hostOne);
+        metadataState.onChange(hostToPartitions, cluster);
         partitioner = new StreamPartitioner<String, Object>() {
             @Override
             public Integer partition(final String topic, final String key, final Object value, final int numPartitions) {
@@ -145,7 +145,7 @@ public class StreamsMetadataStateTest {
         final StreamsMetadata three = new StreamsMetadata(hostThree, Utils.mkSet(globalTable, "table-three"),
                 Collections.singleton(topic3P0));
 
-        final Collection<StreamsMetadata> actual = discovery.getAllMetadata();
+        final Collection<StreamsMetadata> actual = metadataState.getAllMetadata();
         assertEquals(3, actual.size());
         assertTrue("expected " + actual + " to contain " + one, actual.contains(one));
         assertTrue("expected " + actual + " to contain " + two, actual.contains(two));
@@ -165,11 +165,11 @@ public class StreamsMetadataStateTest {
         final HostInfo hostFour = new HostInfo("host-four", 8080);
         hostToPartitions.put(hostFour, Utils.mkSet(tp5));
 
-        discovery.onChange(hostToPartitions, cluster.withPartitions(Collections.singletonMap(tp5, new PartitionInfo("topic-five", 1, null, null, null))));
+        metadataState.onChange(hostToPartitions, cluster.withPartitions(Collections.singletonMap(tp5, new PartitionInfo("topic-five", 1, null, null, null))));
 
         final StreamsMetadata expected = new StreamsMetadata(hostFour, Collections.singleton(globalTable),
                 Collections.singleton(tp5));
-        final Collection<StreamsMetadata> actual = discovery.getAllMetadata();
+        final Collection<StreamsMetadata> actual = metadataState.getAllMetadata();
         assertTrue("expected " + actual + " to contain " + expected, actual.contains(expected));
     }
 
@@ -179,7 +179,7 @@ public class StreamsMetadataStateTest {
                 Utils.mkSet(topic1P0, topic2P1, topic4P0));
         final StreamsMetadata two = new StreamsMetadata(hostTwo, Utils.mkSet(globalTable, "table-two", "table-one", "merged-table"),
                 Utils.mkSet(topic2P0, topic1P1));
-        final Collection<StreamsMetadata> actual = discovery.getAllMetadataForStore("table-one");
+        final Collection<StreamsMetadata> actual = metadataState.getAllMetadataForStore("table-one");
         assertEquals(2, actual.size());
         assertTrue("expected " + actual + " to contain " + one, actual.contains(one));
         assertTrue("expected " + actual + " to contain " + two, actual.contains(two));
@@ -187,12 +187,12 @@ public class StreamsMetadataStateTest {
 
     @Test(expected = NullPointerException.class)
     public void shouldThrowIfStoreNameIsNullOnGetAllInstancesWithStore() {
-        discovery.getAllMetadataForStore(null);
+        metadataState.getAllMetadataForStore(null);
     }
 
     @Test
     public void shouldReturnEmptyCollectionOnGetAllInstancesWithStoreWhenStoreDoesntExist() {
-        final Collection<StreamsMetadata> actual = discovery.getAllMetadataForStore("not-a-store");
+        final Collection<StreamsMetadata> actual = metadataState.getAllMetadataForStore("not-a-store");
         assertTrue(actual.isEmpty());
     }
 
@@ -201,12 +201,12 @@ public class StreamsMetadataStateTest {
         final TopicPartition tp4 = new TopicPartition("topic-three", 1);
         hostToPartitions.put(hostTwo, Utils.mkSet(topic2P0, tp4));
 
-        discovery.onChange(hostToPartitions, cluster.withPartitions(Collections.singletonMap(tp4, new PartitionInfo("topic-three", 1, null, null, null))));
+        metadataState.onChange(hostToPartitions, cluster.withPartitions(Collections.singletonMap(tp4, new PartitionInfo("topic-three", 1, null, null, null))));
 
         final StreamsMetadata expected = new StreamsMetadata(hostThree, Utils.mkSet(globalTable, "table-three"),
                 Collections.singleton(topic3P0));
 
-        final StreamsMetadata actual = discovery.getMetadataWithKey("table-three",
+        final StreamsMetadata actual = metadataState.getMetadataWithKey("table-three",
                                                                     "the-key",
                                                                     Serdes.String().serializer());
 
@@ -218,19 +218,19 @@ public class StreamsMetadataStateTest {
         final TopicPartition tp4 = new TopicPartition("topic-three", 1);
         hostToPartitions.put(hostTwo, Utils.mkSet(topic2P0, tp4));
 
-        discovery.onChange(hostToPartitions, cluster.withPartitions(Collections.singletonMap(tp4, new PartitionInfo("topic-three", 1, null, null, null))));
+        metadataState.onChange(hostToPartitions, cluster.withPartitions(Collections.singletonMap(tp4, new PartitionInfo("topic-three", 1, null, null, null))));
 
         final StreamsMetadata expected = new StreamsMetadata(hostTwo, Utils.mkSet(globalTable, "table-two", "table-three", "merged-table"),
                 Utils.mkSet(topic2P0, tp4));
 
-        final StreamsMetadata actual = discovery.getMetadataWithKey("table-three", "the-key", partitioner);
+        final StreamsMetadata actual = metadataState.getMetadataWithKey("table-three", "the-key", partitioner);
         assertEquals(expected, actual);
     }
 
     @Test
     public void shouldReturnNotAvailableWhenClusterIsEmpty() {
-        discovery.onChange(Collections.<HostInfo, Set<TopicPartition>>emptyMap(), Cluster.empty());
-        final StreamsMetadata result = discovery.getMetadataWithKey("table-one", "a", Serdes.String().serializer());
+        metadataState.onChange(Collections.<HostInfo, Set<TopicPartition>>emptyMap(), Cluster.empty());
+        final StreamsMetadata result = metadataState.getMetadataWithKey("table-one", "a", Serdes.String().serializer());
         assertEquals(StreamsMetadata.NOT_AVAILABLE, result);
     }
 
@@ -238,12 +238,12 @@ public class StreamsMetadataStateTest {
     public void shouldGetInstanceWithKeyWithMergedStreams() {
         final TopicPartition topic2P2 = new TopicPartition("topic-two", 2);
         hostToPartitions.put(hostTwo, Utils.mkSet(topic2P0, topic1P1, topic2P2));
-        discovery.onChange(hostToPartitions, cluster.withPartitions(Collections.singletonMap(topic2P2, new PartitionInfo("topic-two", 2, null, null, null))));
+        metadataState.onChange(hostToPartitions, cluster.withPartitions(Collections.singletonMap(topic2P2, new PartitionInfo("topic-two", 2, null, null, null))));
 
         final StreamsMetadata expected = new StreamsMetadata(hostTwo, Utils.mkSet("global-table", "table-two", "table-one", "merged-table"),
                 Utils.mkSet(topic2P0, topic1P1, topic2P2));
 
-        final StreamsMetadata actual = discovery.getMetadataWithKey("merged-table", "123", new StreamPartitioner<String, Object>() {
+        final StreamsMetadata actual = metadataState.getMetadataWithKey("merged-table", "123", new StreamPartitioner<String, Object>() {
             @Override
             public Integer partition(final String topic, final String key, final Object value, final int numPartitions) {
                 return 2;
@@ -256,7 +256,7 @@ public class StreamsMetadataStateTest {
 
     @Test
     public void shouldReturnNullOnGetWithKeyWhenStoreDoesntExist() {
-        final StreamsMetadata actual = discovery.getMetadataWithKey("not-a-store",
+        final StreamsMetadata actual = metadataState.getMetadataWithKey("not-a-store",
                 "key",
                 Serdes.String().serializer());
         assertNull(actual);
@@ -264,28 +264,28 @@ public class StreamsMetadataStateTest {
 
     @Test(expected = NullPointerException.class)
     public void shouldThrowWhenKeyIsNull() {
-        discovery.getMetadataWithKey("table-three", null, Serdes.String().serializer());
+        metadataState.getMetadataWithKey("table-three", null, Serdes.String().serializer());
     }
 
     @Test(expected = NullPointerException.class)
     public void shouldThrowWhenSerializerIsNull() {
-        discovery.getMetadataWithKey("table-three", "key", (Serializer) null);
+        metadataState.getMetadataWithKey("table-three", "key", (Serializer) null);
     }
 
     @Test(expected = NullPointerException.class)
     public void shouldThrowIfStoreNameIsNull() {
-        discovery.getMetadataWithKey(null, "key", Serdes.String().serializer());
+        metadataState.getMetadataWithKey(null, "key", Serdes.String().serializer());
     }
 
     @SuppressWarnings("unchecked")
     @Test(expected = NullPointerException.class)
     public void shouldThrowIfStreamPartitionerIsNull() {
-        discovery.getMetadataWithKey(null, "key", (StreamPartitioner) null);
+        metadataState.getMetadataWithKey(null, "key", (StreamPartitioner) null);
     }
 
     @Test
     public void shouldHaveGlobalStoreInAllMetadata() {
-        final Collection<StreamsMetadata> metadata = discovery.getAllMetadataForStore(globalTable);
+        final Collection<StreamsMetadata> metadata = metadataState.getAllMetadataForStore(globalTable);
         assertEquals(3, metadata.size());
         for (final StreamsMetadata streamsMetadata : metadata) {
             assertTrue(streamsMetadata.stateStoreNames().contains(globalTable));
@@ -294,7 +294,7 @@ public class StreamsMetadataStateTest {
 
     @Test
     public void shouldGetMyMetadataForGlobalStoreWithKey() {
-        final StreamsMetadata metadata = discovery.getMetadataWithKey(globalTable, "key", Serdes.String().serializer());
+        final StreamsMetadata metadata = metadataState.getMetadataWithKey(globalTable, "key", Serdes.String().serializer());
         assertEquals(hostOne, metadata.hostInfo());
     }
 
@@ -307,7 +307,7 @@ public class StreamsMetadataStateTest {
 
     @Test
     public void shouldGetMyMetadataForGlobalStoreWithKeyAndPartitioner() {
-        final StreamsMetadata metadata = discovery.getMetadataWithKey(globalTable, "key", partitioner);
+        final StreamsMetadata metadata = metadataState.getMetadataWithKey(globalTable, "key", partitioner);
         assertEquals(hostOne, metadata.hostInfo());
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
index 19bd523..1ac6d94 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
@@ -71,7 +71,7 @@ public class MeteredWindowStoreTest {
             Serdes.String(),
             Serdes.Long(),
             streamsMetrics,
-            new StreamsConfig(StreamsTestUtils.minimalStreamsConfig()),
+            new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
             new RecordCollector.Supplier() {
                 @Override
                 public RecordCollector recordCollector() {
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index 113e3e1..5ae32eb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -63,7 +63,7 @@ public class RocksDBStoreTest {
 
     @Before
     public void setUp() {
-        final Properties props = StreamsTestUtils.minimalStreamsConfig();
+        final Properties props = StreamsTestUtils.getStreamsConfig();
         props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, MockRocksDbConfigSetter.class);
         rocksDBStore = new RocksDBStore("test");
         dir = TestUtils.tempDirectory();
@@ -131,7 +131,7 @@ public class RocksDBStoreTest {
     @Test
     public void shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir() {
         final File tmpDir = TestUtils.tempDirectory();
-        final InternalMockProcessorContext tmpContext = new InternalMockProcessorContext(tmpDir, new StreamsConfig(StreamsTestUtils.minimalStreamsConfig()));
+        final InternalMockProcessorContext tmpContext = new InternalMockProcessorContext(tmpDir, new StreamsConfig(StreamsTestUtils.getStreamsConfig()));
 
         assertTrue(tmpDir.setReadOnly());
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index 33dce97..711cdc1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -96,8 +96,7 @@ public class StreamThreadStateStoreProviderTest {
         configureRestoreConsumer(clientSupplier, "applicationId-kv-store-changelog");
         configureRestoreConsumer(clientSupplier, "applicationId-window-store-changelog");
 
-        topology.setApplicationId(applicationId);
-        final ProcessorTopology processorTopology = topology.getInternalBuilder().build();
+        final ProcessorTopology processorTopology = topology.getInternalBuilder(applicationId).build();
 
         tasks = new HashMap<>();
         stateDirectory = new StateDirectory(streamsConfig, new MockTime());
diff --git a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
index ec8d328..6d4f5e2 100644
--- a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
@@ -68,7 +68,7 @@ public class InternalMockProcessorContext extends AbstractProcessorContext imple
             null,
             null,
             new StreamsMetricsImpl(new Metrics(), "mock"),
-            new StreamsConfig(StreamsTestUtils.minimalStreamsConfig()),
+            new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
             null,
             null
         );
@@ -99,12 +99,8 @@ public class InternalMockProcessorContext extends AbstractProcessorContext imple
             serdes.keySerde(),
             serdes.valueSerde(),
             new StreamsMetricsImpl(metrics, "mock"),
-            new StreamsConfig(StreamsTestUtils.minimalStreamsConfig()), new RecordCollector.Supplier() {
-                @Override
-                public RecordCollector recordCollector() {
-                    return collector;
-                }
-            },
+            new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
+            () -> collector,
             null
         );
     }
@@ -118,7 +114,7 @@ public class InternalMockProcessorContext extends AbstractProcessorContext imple
             keySerde,
             valSerde,
             new StreamsMetricsImpl(new Metrics(), "mock"),
-            new StreamsConfig(StreamsTestUtils.minimalStreamsConfig()),
+            new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
             () -> collector,
             cache
         );
diff --git a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
index 9406519..1d64316 100644
--- a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
@@ -16,10 +16,10 @@
  */
 package org.apache.kafka.test;
 
-import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
@@ -44,48 +44,37 @@ public final class StreamsTestUtils {
                                               final String valueSerdeClassName,
                                               final Properties additional) {
 
-        final Properties streamsConfiguration = new Properties();
-        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
-        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
-        streamsConfiguration.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000");
-        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, keySerdeClassName);
-        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, valueSerdeClassName);
-        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
-        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
-        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
-        streamsConfiguration.putAll(additional);
-        return streamsConfiguration;
-
-    }
-
-    public static Properties topologyTestConfig(final String applicationId,
-                                                final String bootstrapServers,
-                                                final String keyDeserializer,
-                                                final String valueDeserializer) {
         final Properties props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
-        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
-        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, keyDeserializer);
-        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, valueDeserializer);
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, keySerdeClassName);
+        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, valueSerdeClassName);
+        props.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+        props.putAll(additional);
         return props;
     }
 
-    public static Properties topologyTestConfig(final Serde keyDeserializer,
-                                                final Serde valueDeserializer) {
-        return topologyTestConfig(
+    public static Properties getStreamsConfig(final Serde keyDeserializer,
+                                              final Serde valueDeserializer) {
+        return getStreamsConfig(
                 UUID.randomUUID().toString(),
                 "localhost:9091",
                 keyDeserializer.getClass().getName(),
-                valueDeserializer.getClass().getName());
+                valueDeserializer.getClass().getName(),
+                new Properties());
+    }
+
+    public static Properties getStreamsConfig(final String applicationId) {
+        return getStreamsConfig(
+            applicationId,
+            "localhost:9091",
+            Serdes.ByteArraySerde.class.getName(),
+            Serdes.ByteArraySerde.class.getName(),
+            new Properties());
     }
 
-    public static Properties minimalStreamsConfig() {
-        final Properties properties = new Properties();
-        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, UUID.randomUUID().toString());
-        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "anyserver:9092");
-        return properties;
+    public static Properties getStreamsConfig() {
+        return getStreamsConfig(UUID.randomUUID().toString());
     }
 
     public static <K, V> List<KeyValue<K, V>> toList(final Iterator<KeyValue<K, V>> iterator) {
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 12974ae..05a128b 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -239,7 +239,7 @@ public class TopologyTestDriver implements Closeable {
         mockWallClockTime = new MockTime(initialWallClockTimeMs);
 
         internalTopologyBuilder = builder;
-        internalTopologyBuilder.setApplicationId(streamsConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG));
+        internalTopologyBuilder.rewriteTopology(streamsConfig);
 
         processorTopology = internalTopologyBuilder.build(null);
         globalTopology = internalTopologyBuilder.buildGlobalStateTopology();


Mime
View raw message