kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch 2.4 updated: KAFKA-8743: Flaky Test Repartition{WithMerge}OptimizingIntegrationTest (#7472)
Date Thu, 10 Oct 2019 23:24:38 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new de202a6  KAFKA-8743: Flaky Test Repartition{WithMerge}OptimizingIntegrationTest (#7472)
de202a6 is described below

commit de202a669777becd4a79fcdf6c0597e5a12d2370
Author: A. Sophie Blee-Goldman <sophie@confluent.io>
AuthorDate: Thu Oct 10 16:23:18 2019 -0700

    KAFKA-8743: Flaky Test Repartition{WithMerge}OptimizingIntegrationTest (#7472)
    
    All four flavors of the repartition/optimization tests have been reported as flaky and failed in one place or another:
    * RepartitionOptimizingIntegrationTest.shouldSendCorrectRecords_OPTIMIZED
    * RepartitionOptimizingIntegrationTest.shouldSendCorrectResults_NO_OPTIMIZATION
    * RepartitionWithMergeOptimizingIntegrationTest.shouldSendCorrectRecords_OPTIMIZED
    * RepartitionWithMergeOptimizingIntegrationTest.shouldSendCorrectRecords_NO_OPTIMIZATION
    
    They're pretty similar so it makes sense to knock them all out at once. This PR does three things:
    
    * Switch to in-memory stores wherever possible
    * Name all operators and update the Topology accordingly (not really a flaky test fix, but the topology names had to be updated anyway because of the in-memory stores, so it made sense to do both at once)
    * Port to TopologyTestDriver -- this is the "real" fix and should make a big difference, as these repartition tests required multiple round trips with the Kafka cluster (while using only the default timeout)
    
    Reviewers: Bill Bejeck <bill@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
---
 .../internals/RepartitionOptimizingTest.java}      | 548 +++++++++++----------
 .../RepartitionWithMergeOptimizingTest.java}       | 328 ++++++------
 .../tests/StreamsBrokerDownResilienceTest.java     |  15 +-
 .../tests/streams/streams_upgrade_test.py          |   2 +-
 4 files changed, 465 insertions(+), 428 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionOptimizingIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java
similarity index 54%
rename from streams/src/test/java/org/apache/kafka/streams/integration/RepartitionOptimizingIntegrationTest.java
rename to streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java
index bea32f2..8425ad7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionOptimizingIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java
@@ -15,40 +15,41 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.streams.integration;
+package org.apache.kafka.streams.processor.internals;
 
-
-import kafka.utils.MockTime;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TestOutputTopic;
 import org.apache.kafka.streams.Topology;
-import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
-import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
 import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Named;
 import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.kstream.StreamJoined;
 import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.test.StreamsTestUtils;
-import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
-import org.junit.ClassRule;
 import org.junit.Test;
-import org.junit.experimental.categories.Category;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -57,17 +58,19 @@ import java.util.Locale;
 import java.util.Properties;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import static java.time.Duration.ofDays;
 import static java.time.Duration.ofMillis;
-import static java.time.Duration.ofSeconds;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 
-@Category({IntegrationTest.class})
-public class RepartitionOptimizingIntegrationTest {
+public class RepartitionOptimizingTest {
+
+    private final Logger log = LoggerFactory.getLogger(RepartitionOptimizingTest.class);
 
-    private static final int NUM_BROKERS = 1;
     private static final String INPUT_TOPIC = "input";
     private static final String COUNT_TOPIC = "outputTopic_0";
     private static final String AGGREGATION_TOPIC = "outputTopic_1";
@@ -77,143 +80,168 @@ public class RepartitionOptimizingIntegrationTest {
     private static final int ONE_REPARTITION_TOPIC = 1;
     private static final int FOUR_REPARTITION_TOPICS = 4;
 
+    private final Serializer<String> stringSerializer = new StringSerializer();
+    private final Deserializer<String> stringDeserializer = new StringDeserializer();
+
     private final Pattern repartitionTopicPattern = Pattern.compile("Sink: .*-repartition");
 
     private Properties streamsConfiguration;
+    private TopologyTestDriver topologyTestDriver;
 
+    private final Initializer<Integer> initializer = () -> 0;
+    private final Aggregator<String, String, Integer> aggregator = (k, v, agg) -> agg + v.length();
+    private final Reducer<String> reducer = (v1, v2) -> v1 + ":" + v2;
 
-    @ClassRule
-    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
-    private final MockTime mockTime = CLUSTER.time;
+    private final List<String> processorValueCollector = new ArrayList<>();
+
+    private final List<KeyValue<String, Long>> expectedCountKeyValues =
+        Arrays.asList(KeyValue.pair("A", 3L), KeyValue.pair("B", 3L), KeyValue.pair("C", 3L));
+    private final List<KeyValue<String, Integer>> expectedAggKeyValues =
+        Arrays.asList(KeyValue.pair("A", 9), KeyValue.pair("B", 9), KeyValue.pair("C", 9));
+    private final List<KeyValue<String, String>> expectedReduceKeyValues =
+        Arrays.asList(KeyValue.pair("A", "foo:bar:baz"), KeyValue.pair("B", "foo:bar:baz"), KeyValue.pair("C", "foo:bar:baz"));
+    private final List<KeyValue<String, String>> expectedJoinKeyValues =
+        Arrays.asList(KeyValue.pair("A", "foo:3"), KeyValue.pair("A", "bar:3"), KeyValue.pair("A", "baz:3"));
+    private final List<String> expectedCollectedProcessorValues =
+        Arrays.asList("FOO", "BAR", "BAZ");
 
     @Before
-    public void setUp() throws Exception {
+    public void setUp() {
         final Properties props = new Properties();
         props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 1024 * 10);
         props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5000);
 
         streamsConfiguration = StreamsTestUtils.getStreamsConfig(
             "maybe-optimized-test-app",
-            CLUSTER.bootstrapServers(),
+            "dummy-bootstrap-servers-config",
             Serdes.String().getClass().getName(),
             Serdes.String().getClass().getName(),
             props);
 
-        CLUSTER.createTopics(INPUT_TOPIC,
-                             COUNT_TOPIC,
-                             AGGREGATION_TOPIC,
-                             REDUCE_TOPIC,
-                             JOINED_TOPIC);
-
-        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
+        processorValueCollector.clear();
     }
 
     @After
-    public void tearDown() throws Exception {
-        CLUSTER.deleteAllTopicsAndWait(30_000L);
+    public void tearDown() {
+        try {
+            topologyTestDriver.close();
+        } catch (final RuntimeException e) {
+            log.warn("The following exception was thrown while trying to close the TopologyTestDriver (note that " +
+                "KAFKA-6647 causes this when running on Windows):", e);
+        }
     }
 
     @Test
-    public void shouldSendCorrectRecords_OPTIMIZED() throws Exception {
-        runIntegrationTest(StreamsConfig.OPTIMIZE,
-                           ONE_REPARTITION_TOPIC);
+    public void shouldSendCorrectRecords_OPTIMIZED() {
+        runTest(StreamsConfig.OPTIMIZE, ONE_REPARTITION_TOPIC);
     }
 
     @Test
-    public void shouldSendCorrectResults_NO_OPTIMIZATION() throws Exception {
-        runIntegrationTest(StreamsConfig.NO_OPTIMIZATION,
-                           FOUR_REPARTITION_TOPICS);
+    public void shouldSendCorrectResults_NO_OPTIMIZATION() {
+        runTest(StreamsConfig.NO_OPTIMIZATION, FOUR_REPARTITION_TOPICS);
     }
 
 
-    private void runIntegrationTest(final String optimizationConfig,
-                                    final int expectedNumberRepartitionTopics) throws Exception {
-
-        final Initializer<Integer> initializer = () -> 0;
-        final Aggregator<String, String, Integer> aggregator = (k, v, agg) -> agg + v.length();
-
-        final Reducer<String> reducer = (v1, v2) -> v1 + ":" + v2;
-
-        final List<String> processorValueCollector = new ArrayList<>();
+    private void runTest(final String optimizationConfig, final int expectedNumberRepartitionTopics) {
 
         final StreamsBuilder builder = new StreamsBuilder();
 
-        final KStream<String, String> sourceStream = builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
-
-        final KStream<String, String> mappedStream = sourceStream.map((k, v) -> KeyValue.pair(k.toUpperCase(Locale.getDefault()), v));
-
-        mappedStream.filter((k, v) -> k.equals("B")).mapValues(v -> v.toUpperCase(Locale.getDefault()))
-            .process(() -> new SimpleProcessor(processorValueCollector));
-
-        final KStream<String, Long> countStream = mappedStream.groupByKey().count(Materialized.with(Serdes.String(), Serdes.Long())).toStream();
-
-        countStream.to(COUNT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));
-
-        mappedStream.groupByKey().aggregate(initializer,
-                                            aggregator,
-                                            Materialized.with(Serdes.String(), Serdes.Integer()))
-            .toStream().to(AGGREGATION_TOPIC, Produced.with(Serdes.String(), Serdes.Integer()));
+        final KStream<String, String> sourceStream =
+            builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.String()).withName("sourceStream"));
+
+        final KStream<String, String> mappedStream = sourceStream
+            .map((k, v) -> KeyValue.pair(k.toUpperCase(Locale.getDefault()), v), Named.as("source-map"));
+
+        mappedStream
+            .filter((k, v) -> k.equals("B"), Named.as("process-filter"))
+            .mapValues(v -> v.toUpperCase(Locale.getDefault()), Named.as("process-mapValues"))
+            .process(() -> new SimpleProcessor(processorValueCollector), Named.as("process"));
+
+        final KStream<String, Long> countStream = mappedStream
+            .groupByKey(Grouped.as("count-groupByKey"))
+            .count(Named.as("count"), Materialized.<String, Long>as(Stores.inMemoryKeyValueStore("count-store"))
+                                                                .withKeySerde(Serdes.String())
+                                                                .withValueSerde(Serdes.Long()))
+            .toStream(Named.as("count-toStream"));
+
+        countStream.to(COUNT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()).withName("count-to"));
+
+        mappedStream
+            .groupByKey(Grouped.as("aggregate-groupByKey"))
+            .aggregate(initializer,
+                       aggregator,
+                       Named.as("aggregate"),
+                       Materialized.<String, Integer>as(Stores.inMemoryKeyValueStore("aggregate-store"))
+                                                    .withKeySerde(Serdes.String())
+                                                    .withValueSerde(Serdes.Integer()))
+            .toStream(Named.as("aggregate-toStream"))
+            .to(AGGREGATION_TOPIC, Produced.with(Serdes.String(), Serdes.Integer()).withName("reduce-to"));
 
         // adding operators for case where the repartition node is further downstream
-        mappedStream.filter((k, v) -> true).peek((k, v) -> System.out.println(k + ":" + v)).groupByKey()
-            .reduce(reducer, Materialized.with(Serdes.String(), Serdes.String()))
-            .toStream().to(REDUCE_TOPIC, Produced.with(Serdes.String(), Serdes.String()));
-
-        mappedStream.filter((k, v) -> k.equals("A"))
+        mappedStream
+            .filter((k, v) -> true, Named.as("reduce-filter"))
+            .peek((k, v) -> System.out.println(k + ":" + v), Named.as("reduce-peek"))
+            .groupByKey(Grouped.as("reduce-groupByKey"))
+            .reduce(reducer,
+                    Named.as("reducer"),
+                    Materialized.as(Stores.inMemoryKeyValueStore("reduce-store")))
+            .toStream(Named.as("reduce-toStream"))
+            .to(REDUCE_TOPIC, Produced.with(Serdes.String(), Serdes.String()));
+
+        mappedStream
+            .filter((k, v) -> k.equals("A"), Named.as("join-filter"))
             .join(countStream, (v1, v2) -> v1 + ":" + v2.toString(),
                   JoinWindows.of(ofMillis(5000)),
-                  StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.Long()))
-            .to(JOINED_TOPIC);
+                  StreamJoined.<String, String, Long>with(Stores.inMemoryWindowStore("join-store", ofDays(1), ofMillis(10000), true),
+                                                          Stores.inMemoryWindowStore("other-join-store",  ofDays(1), ofMillis(10000), true))
+                                                    .withName("join")
+                                                    .withKeySerde(Serdes.String())
+                                                    .withValueSerde(Serdes.String())
+                                                    .withOtherValueSerde(Serdes.Long()))
+            .to(JOINED_TOPIC, Produced.as("join-to"));
 
         streamsConfiguration.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, optimizationConfig);
+        final Topology topology = builder.build(streamsConfiguration);
 
-        final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class);
+        topologyTestDriver = new TopologyTestDriver(topology, streamsConfiguration);
 
-        IntegrationTestUtils.produceKeyValuesSynchronously(INPUT_TOPIC, getKeyValues(), producerConfig, mockTime);
+        final TestInputTopic<String, String> inputTopicA = topologyTestDriver.createInputTopic(INPUT_TOPIC, stringSerializer, stringSerializer);
+        final TestOutputTopic<String, Long> countOutputTopic = topologyTestDriver.createOutputTopic(COUNT_TOPIC, stringDeserializer, new LongDeserializer());
+        final TestOutputTopic<String, Integer> aggregationOutputTopic = topologyTestDriver.createOutputTopic(AGGREGATION_TOPIC, stringDeserializer, new IntegerDeserializer());
+        final TestOutputTopic<String, String> reduceOutputTopic = topologyTestDriver.createOutputTopic(REDUCE_TOPIC, stringDeserializer, stringDeserializer);
+        final TestOutputTopic<String, String> joinedOutputTopic = topologyTestDriver.createOutputTopic(JOINED_TOPIC, stringDeserializer, stringDeserializer);
 
-        final Properties consumerConfig1 = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, LongDeserializer.class);
-        final Properties consumerConfig2 = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, IntegerDeserializer.class);
-        final Properties consumerConfig3 = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class);
+        inputTopicA.pipeKeyValueList(getKeyValues());
 
-        final Topology topology = builder.build(streamsConfiguration);
+        // Verify the topology
         final String topologyString = topology.describe().toString();
-
         if (optimizationConfig.equals(StreamsConfig.OPTIMIZE)) {
             assertEquals(EXPECTED_OPTIMIZED_TOPOLOGY, topologyString);
         } else {
             assertEquals(EXPECTED_UNOPTIMIZED_TOPOLOGY, topologyString);
         }
 
-
-        /*
-           confirming number of expected repartition topics here
-         */
+        // Verify the number of repartition topics
         assertEquals(expectedNumberRepartitionTopics, getCountOfRepartitionTopicsFound(topologyString));
 
-        final KafkaStreams streams = new KafkaStreams(topology, streamsConfiguration);
-        streams.start();
-
-        final List<KeyValue<String, Long>> expectedCountKeyValues = Arrays.asList(KeyValue.pair("A", 3L), KeyValue.pair("B", 3L), KeyValue.pair("C", 3L));
-        IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(consumerConfig1, COUNT_TOPIC, expectedCountKeyValues);
-
-        final List<KeyValue<String, Integer>> expectedAggKeyValues = Arrays.asList(KeyValue.pair("A", 9), KeyValue.pair("B", 9), KeyValue.pair("C", 9));
-        IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(consumerConfig2, AGGREGATION_TOPIC, expectedAggKeyValues);
-
-        final List<KeyValue<String, String>> expectedReduceKeyValues = Arrays.asList(KeyValue.pair("A", "foo:bar:baz"), KeyValue.pair("B", "foo:bar:baz"), KeyValue.pair("C", "foo:bar:baz"));
-        IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(consumerConfig3, REDUCE_TOPIC, expectedReduceKeyValues);
-
-        final List<KeyValue<String, String>> expectedJoinKeyValues = Arrays.asList(KeyValue.pair("A", "foo:3"), KeyValue.pair("A", "bar:3"), KeyValue.pair("A", "baz:3"));
-        IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(consumerConfig3, JOINED_TOPIC, expectedJoinKeyValues);
-
-
-        final List<String> expectedCollectedProcessorValues = Arrays.asList("FOO", "BAR", "BAZ");
-
+        // Verify the values collected by the processor
         assertThat(3, equalTo(processorValueCollector.size()));
         assertThat(processorValueCollector, equalTo(expectedCollectedProcessorValues));
 
-        streams.close(ofSeconds(5));
+        // Verify the expected output
+        assertThat(countOutputTopic.readKeyValuesToMap(), equalTo(keyValueListToMap(expectedCountKeyValues)));
+        assertThat(aggregationOutputTopic.readKeyValuesToMap(), equalTo(keyValueListToMap(expectedAggKeyValues)));
+        assertThat(reduceOutputTopic.readKeyValuesToMap(), equalTo(keyValueListToMap(expectedReduceKeyValues)));
+        assertThat(joinedOutputTopic.readKeyValuesToMap(), equalTo(keyValueListToMap(expectedJoinKeyValues)));
     }
 
+    private <K, V> Map<K, V> keyValueListToMap(final List<KeyValue<K, V>> keyValuePairs) {
+        final Map<K, V> map = new HashMap<>();
+        for (final KeyValue<K, V> pair : keyValuePairs) {
+            map.put(pair.key, pair.value);
+        }
+        return map;
+    }
 
     private int getCountOfRepartitionTopicsFound(final String topologyString) {
         final Matcher matcher = repartitionTopicPattern.matcher(topologyString);
@@ -224,7 +252,6 @@ public class RepartitionOptimizingIntegrationTest {
         return repartitionTopicsFound.size();
     }
 
-
     private List<KeyValue<String, String>> getKeyValues() {
         final List<KeyValue<String, String>> keyValueList = new ArrayList<>();
         final String[] keys = new String[]{"a", "b", "c"};
@@ -237,7 +264,6 @@ public class RepartitionOptimizingIntegrationTest {
         return keyValueList;
     }
 
-
     private static class SimpleProcessor extends AbstractProcessor<String, String> {
 
         final List<String> valueList;
@@ -252,183 +278,183 @@ public class RepartitionOptimizingIntegrationTest {
         }
     }
 
-
     private static final String EXPECTED_OPTIMIZED_TOPOLOGY = "Topologies:\n"
                                                               + "   Sub-topology: 0\n"
-                                                              + "    Source: KSTREAM-SOURCE-0000000000 (topics: [input])\n"
-                                                              + "      --> KSTREAM-MAP-0000000001\n"
-                                                              + "    Processor: KSTREAM-MAP-0000000001 (stores: [])\n"
-                                                              + "      --> KSTREAM-FILTER-0000000002, KSTREAM-FILTER-0000000040\n"
-                                                              + "      <-- KSTREAM-SOURCE-0000000000\n"
-                                                              + "    Processor: KSTREAM-FILTER-0000000002 (stores: [])\n"
-                                                              + "      --> KSTREAM-MAPVALUES-0000000003\n"
-                                                              + "      <-- KSTREAM-MAP-0000000001\n"
-                                                              + "    Processor: KSTREAM-FILTER-0000000040 (stores: [])\n"
-                                                              + "      --> KSTREAM-SINK-0000000039\n"
-                                                              + "      <-- KSTREAM-MAP-0000000001\n"
-                                                              + "    Processor: KSTREAM-MAPVALUES-0000000003 (stores: [])\n"
-                                                              + "      --> KSTREAM-PROCESSOR-0000000004\n"
-                                                              + "      <-- KSTREAM-FILTER-0000000002\n"
-                                                              + "    Processor: KSTREAM-PROCESSOR-0000000004 (stores: [])\n"
-                                                              + "      --> none\n"
-                                                              + "      <-- KSTREAM-MAPVALUES-0000000003\n"
-                                                              + "    Sink: KSTREAM-SINK-0000000039 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000006-repartition)\n"
-                                                              + "      <-- KSTREAM-FILTER-0000000040\n"
+                                                              + "    Source: KSTREAM-SOURCE-0000000036 (topics: [count-groupByKey-repartition])\n"
+                                                              + "      --> aggregate, count, join-filter, reduce-filter\n"
+                                                              + "    Processor: count (stores: [count-store])\n"
+                                                              + "      --> count-toStream\n"
+                                                              + "      <-- KSTREAM-SOURCE-0000000036\n"
+                                                              + "    Processor: count-toStream (stores: [])\n"
+                                                              + "      --> join-other-windowed, count-to\n"
+                                                              + "      <-- count\n"
+                                                              + "    Processor: join-filter (stores: [])\n"
+                                                              + "      --> join-this-windowed\n"
+                                                              + "      <-- KSTREAM-SOURCE-0000000036\n"
+                                                              + "    Processor: reduce-filter (stores: [])\n"
+                                                              + "      --> reduce-peek\n"
+                                                              + "      <-- KSTREAM-SOURCE-0000000036\n"
+                                                              + "    Processor: join-other-windowed (stores: [other-join-store])\n"
+                                                              + "      --> join-other-join\n"
+                                                              + "      <-- count-toStream\n"
+                                                              + "    Processor: join-this-windowed (stores: [join-store])\n"
+                                                              + "      --> join-this-join\n"
+                                                              + "      <-- join-filter\n"
+                                                              + "    Processor: reduce-peek (stores: [])\n"
+                                                              + "      --> reducer\n"
+                                                              + "      <-- reduce-filter\n"
+                                                              + "    Processor: aggregate (stores: [aggregate-store])\n"
+                                                              + "      --> aggregate-toStream\n"
+                                                              + "      <-- KSTREAM-SOURCE-0000000036\n"
+                                                              + "    Processor: join-other-join (stores: [join-store])\n"
+                                                              + "      --> join-merge\n"
+                                                              + "      <-- join-other-windowed\n"
+                                                              + "    Processor: join-this-join (stores: [other-join-store])\n"
+                                                              + "      --> join-merge\n"
+                                                              + "      <-- join-this-windowed\n"
+                                                              + "    Processor: reducer (stores: [reduce-store])\n"
+                                                              + "      --> reduce-toStream\n"
+                                                              + "      <-- reduce-peek\n"
+                                                              + "    Processor: aggregate-toStream (stores: [])\n"
+                                                              + "      --> reduce-to\n"
+                                                              + "      <-- aggregate\n"
+                                                              + "    Processor: join-merge (stores: [])\n"
+                                                              + "      --> join-to\n"
+                                                              + "      <-- join-this-join, join-other-join\n"
+                                                              + "    Processor: reduce-toStream (stores: [])\n"
+                                                              + "      --> KSTREAM-SINK-0000000023\n"
+                                                              + "      <-- reducer\n"
+                                                              + "    Sink: KSTREAM-SINK-0000000023 (topic: outputTopic_2)\n"
+                                                              + "      <-- reduce-toStream\n"
+                                                              + "    Sink: count-to (topic: outputTopic_0)\n"
+                                                              + "      <-- count-toStream\n"
+                                                              + "    Sink: join-to (topic: joinedOutputTopic)\n"
+                                                              + "      <-- join-merge\n"
+                                                              + "    Sink: reduce-to (topic: outputTopic_1)\n"
+                                                              + "      <-- aggregate-toStream\n"
                                                               + "\n"
                                                               + "  Sub-topology: 1\n"
-                                                              + "    Source: KSTREAM-SOURCE-0000000041 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000006-repartition])\n"
-                                                              + "      --> KSTREAM-FILTER-0000000020, KSTREAM-AGGREGATE-0000000007, KSTREAM-AGGREGATE-0000000014, KSTREAM-FILTER-0000000029\n"
-                                                              + "    Processor: KSTREAM-AGGREGATE-0000000007 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000006])\n"
-                                                              + "      --> KTABLE-TOSTREAM-0000000011\n"
-                                                              + "      <-- KSTREAM-SOURCE-0000000041\n"
-                                                              + "    Processor: KTABLE-TOSTREAM-0000000011 (stores: [])\n"
-                                                              + "      --> KSTREAM-SINK-0000000012, KSTREAM-WINDOWED-0000000034\n"
-                                                              + "      <-- KSTREAM-AGGREGATE-0000000007\n"
-                                                              + "    Processor: KSTREAM-FILTER-0000000020 (stores: [])\n"
-                                                              + "      --> KSTREAM-PEEK-0000000021\n"
-                                                              + "      <-- KSTREAM-SOURCE-0000000041\n"
-                                                              + "    Processor: KSTREAM-FILTER-0000000029 (stores: [])\n"
-                                                              + "      --> KSTREAM-WINDOWED-0000000033\n"
-                                                              + "      <-- KSTREAM-SOURCE-0000000041\n"
-                                                              + "    Processor: KSTREAM-PEEK-0000000021 (stores: [])\n"
-                                                              + "      --> KSTREAM-REDUCE-0000000023\n"
-                                                              + "      <-- KSTREAM-FILTER-0000000020\n"
-                                                              + "    Processor: KSTREAM-WINDOWED-0000000033 (stores: [KSTREAM-JOINTHIS-0000000035-store])\n"
-                                                              + "      --> KSTREAM-JOINTHIS-0000000035\n"
-                                                              + "      <-- KSTREAM-FILTER-0000000029\n"
-                                                              + "    Processor: KSTREAM-WINDOWED-0000000034 (stores: [KSTREAM-JOINOTHER-0000000036-store])\n"
-                                                              + "      --> KSTREAM-JOINOTHER-0000000036\n"
-                                                              + "      <-- KTABLE-TOSTREAM-0000000011\n"
-                                                              + "    Processor: KSTREAM-AGGREGATE-0000000014 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000013])\n"
-                                                              + "      --> KTABLE-TOSTREAM-0000000018\n"
-                                                              + "      <-- KSTREAM-SOURCE-0000000041\n"
-                                                              + "    Processor: KSTREAM-JOINOTHER-0000000036 (stores: [KSTREAM-JOINTHIS-0000000035-store])\n"
-                                                              + "      --> KSTREAM-MERGE-0000000037\n"
-                                                              + "      <-- KSTREAM-WINDOWED-0000000034\n"
-                                                              + "    Processor: KSTREAM-JOINTHIS-0000000035 (stores: [KSTREAM-JOINOTHER-0000000036-store])\n"
-                                                              + "      --> KSTREAM-MERGE-0000000037\n"
-                                                              + "      <-- KSTREAM-WINDOWED-0000000033\n"
-                                                              + "    Processor: KSTREAM-REDUCE-0000000023 (stores: [KSTREAM-REDUCE-STATE-STORE-0000000022])\n"
-                                                              + "      --> KTABLE-TOSTREAM-0000000027\n"
-                                                              + "      <-- KSTREAM-PEEK-0000000021\n"
-                                                              + "    Processor: KSTREAM-MERGE-0000000037 (stores: [])\n"
-                                                              + "      --> KSTREAM-SINK-0000000038\n"
-                                                              + "      <-- KSTREAM-JOINTHIS-0000000035, KSTREAM-JOINOTHER-0000000036\n"
-                                                              + "    Processor: KTABLE-TOSTREAM-0000000018 (stores: [])\n"
-                                                              + "      --> KSTREAM-SINK-0000000019\n"
-                                                              + "      <-- KSTREAM-AGGREGATE-0000000014\n"
-                                                              + "    Processor: KTABLE-TOSTREAM-0000000027 (stores: [])\n"
-                                                              + "      --> KSTREAM-SINK-0000000028\n"
-                                                              + "      <-- KSTREAM-REDUCE-0000000023\n"
-                                                              + "    Sink: KSTREAM-SINK-0000000012 (topic: outputTopic_0)\n"
-                                                              + "      <-- KTABLE-TOSTREAM-0000000011\n"
-                                                              + "    Sink: KSTREAM-SINK-0000000019 (topic: outputTopic_1)\n"
-                                                              + "      <-- KTABLE-TOSTREAM-0000000018\n"
-                                                              + "    Sink: KSTREAM-SINK-0000000028 (topic: outputTopic_2)\n"
-                                                              + "      <-- KTABLE-TOSTREAM-0000000027\n"
-                                                              + "    Sink: KSTREAM-SINK-0000000038 (topic: joinedOutputTopic)\n"
-                                                              + "      <-- KSTREAM-MERGE-0000000037\n\n";
+                                                              + "    Source: sourceStream (topics: [input])\n"
+                                                              + "      --> source-map\n"
+                                                              + "    Processor: source-map (stores: [])\n"
+                                                              + "      --> process-filter, KSTREAM-FILTER-0000000035\n"
+                                                              + "      <-- sourceStream\n"
+                                                              + "    Processor: process-filter (stores: [])\n"
+                                                              + "      --> process-mapValues\n"
+                                                              + "      <-- source-map\n"
+                                                              + "    Processor: KSTREAM-FILTER-0000000035 (stores: [])\n"
+                                                              + "      --> KSTREAM-SINK-0000000034\n"
+                                                              + "      <-- source-map\n"
+                                                              + "    Processor: process-mapValues (stores: [])\n"
+                                                              + "      --> process\n"
+                                                              + "      <-- process-filter\n"
+                                                              + "    Sink: KSTREAM-SINK-0000000034 (topic: count-groupByKey-repartition)\n"
+                                                              + "      <-- KSTREAM-FILTER-0000000035\n"
+                                                              + "    Processor: process (stores: [])\n"
+                                                              + "      --> none\n"
+                                                              + "      <-- process-mapValues\n\n";
+
 
 
     private static final String EXPECTED_UNOPTIMIZED_TOPOLOGY = "Topologies:\n"
                                                                 + "   Sub-topology: 0\n"
-                                                                + "    Source: KSTREAM-SOURCE-0000000000 (topics: [input])\n"
-                                                                + "      --> KSTREAM-MAP-0000000001\n"
-                                                                + "    Processor: KSTREAM-MAP-0000000001 (stores: [])\n"
-                                                                + "      --> KSTREAM-FILTER-0000000020, KSTREAM-FILTER-0000000002, KSTREAM-FILTER-0000000009, KSTREAM-FILTER-0000000016, KSTREAM-FILTER-0000000029\n"
-                                                                + "      <-- KSTREAM-SOURCE-0000000000\n"
-                                                                + "    Processor: KSTREAM-FILTER-0000000020 (stores: [])\n"
-                                                                + "      --> KSTREAM-PEEK-0000000021\n"
-                                                                + "      <-- KSTREAM-MAP-0000000001\n"
-                                                                + "    Processor: KSTREAM-FILTER-0000000002 (stores: [])\n"
-                                                                + "      --> KSTREAM-MAPVALUES-0000000003\n"
-                                                                + "      <-- KSTREAM-MAP-0000000001\n"
-                                                                + "    Processor: KSTREAM-FILTER-0000000029 (stores: [])\n"
-                                                                + "      --> KSTREAM-FILTER-0000000031\n"
-                                                                + "      <-- KSTREAM-MAP-0000000001\n"
-                                                                + "    Processor: KSTREAM-PEEK-0000000021 (stores: [])\n"
-                                                                + "      --> KSTREAM-FILTER-0000000025\n"
-                                                                + "      <-- KSTREAM-FILTER-0000000020\n"
-                                                                + "    Processor: KSTREAM-FILTER-0000000009 (stores: [])\n"
-                                                                + "      --> KSTREAM-SINK-0000000008\n"
-                                                                + "      <-- KSTREAM-MAP-0000000001\n"
-                                                                + "    Processor: KSTREAM-FILTER-0000000016 (stores: [])\n"
-                                                                + "      --> KSTREAM-SINK-0000000015\n"
-                                                                + "      <-- KSTREAM-MAP-0000000001\n"
-                                                                + "    Processor: KSTREAM-FILTER-0000000025 (stores: [])\n"
-                                                                + "      --> KSTREAM-SINK-0000000024\n"
-                                                                + "      <-- KSTREAM-PEEK-0000000021\n"
-                                                                + "    Processor: KSTREAM-FILTER-0000000031 (stores: [])\n"
-                                                                + "      --> KSTREAM-SINK-0000000030\n"
-                                                                + "      <-- KSTREAM-FILTER-0000000029\n"
-                                                                + "    Processor: KSTREAM-MAPVALUES-0000000003 (stores: [])\n"
-                                                                + "      --> KSTREAM-PROCESSOR-0000000004\n"
-                                                                + "      <-- KSTREAM-FILTER-0000000002\n"
-                                                                + "    Processor: KSTREAM-PROCESSOR-0000000004 (stores: [])\n"
-                                                                + "      --> none\n"
-                                                                + "      <-- KSTREAM-MAPVALUES-0000000003\n"
-                                                                + "    Sink: KSTREAM-SINK-0000000008 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000006-repartition)\n"
-                                                                + "      <-- KSTREAM-FILTER-0000000009\n"
-                                                                + "    Sink: KSTREAM-SINK-0000000015 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000013-repartition)\n"
-                                                                + "      <-- KSTREAM-FILTER-0000000016\n"
-                                                                + "    Sink: KSTREAM-SINK-0000000024 (topic: KSTREAM-REDUCE-STATE-STORE-0000000022-repartition)\n"
-                                                                + "      <-- KSTREAM-FILTER-0000000025\n"
-                                                                + "    Sink: KSTREAM-SINK-0000000030 (topic: KSTREAM-FILTER-0000000029-repartition)\n"
-                                                                + "      <-- KSTREAM-FILTER-0000000031\n"
+                                                                + "    Source: KSTREAM-SOURCE-0000000007 (topics: [count-groupByKey-repartition])\n"
+                                                                + "      --> count\n"
+                                                                + "    Processor: count (stores: [count-store])\n"
+                                                                + "      --> count-toStream\n"
+                                                                + "      <-- KSTREAM-SOURCE-0000000007\n"
+                                                                + "    Processor: count-toStream (stores: [])\n"
+                                                                + "      --> join-other-windowed, count-to\n"
+                                                                + "      <-- count\n"
+                                                                + "    Source: KSTREAM-SOURCE-0000000027 (topics: [join-left-repartition])\n"
+                                                                + "      --> join-this-windowed\n"
+                                                                + "    Processor: join-other-windowed (stores: [other-join-store])\n"
+                                                                + "      --> join-other-join\n"
+                                                                + "      <-- count-toStream\n"
+                                                                + "    Processor: join-this-windowed (stores: [join-store])\n"
+                                                                + "      --> join-this-join\n"
+                                                                + "      <-- KSTREAM-SOURCE-0000000027\n"
+                                                                + "    Processor: join-other-join (stores: [join-store])\n"
+                                                                + "      --> join-merge\n"
+                                                                + "      <-- join-other-windowed\n"
+                                                                + "    Processor: join-this-join (stores: [other-join-store])\n"
+                                                                + "      --> join-merge\n"
+                                                                + "      <-- join-this-windowed\n"
+                                                                + "    Processor: join-merge (stores: [])\n"
+                                                                + "      --> join-to\n"
+                                                                + "      <-- join-this-join, join-other-join\n"
+                                                                + "    Sink: count-to (topic: outputTopic_0)\n"
+                                                                + "      <-- count-toStream\n"
+                                                                + "    Sink: join-to (topic: joinedOutputTopic)\n"
+                                                                + "      <-- join-merge\n"
                                                                 + "\n"
                                                                 + "  Sub-topology: 1\n"
-                                                                + "    Source: KSTREAM-SOURCE-0000000010 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000006-repartition])\n"
-                                                                + "      --> KSTREAM-AGGREGATE-0000000007\n"
-                                                                + "    Processor: KSTREAM-AGGREGATE-0000000007 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000006])\n"
-                                                                + "      --> KTABLE-TOSTREAM-0000000011\n"
-                                                                + "      <-- KSTREAM-SOURCE-0000000010\n"
-                                                                + "    Processor: KTABLE-TOSTREAM-0000000011 (stores: [])\n"
-                                                                + "      --> KSTREAM-SINK-0000000012, KSTREAM-WINDOWED-0000000034\n"
-                                                                + "      <-- KSTREAM-AGGREGATE-0000000007\n"
-                                                                + "    Source: KSTREAM-SOURCE-0000000032 (topics: [KSTREAM-FILTER-0000000029-repartition])\n"
-                                                                + "      --> KSTREAM-WINDOWED-0000000033\n"
-                                                                + "    Processor: KSTREAM-WINDOWED-0000000033 (stores: [KSTREAM-JOINTHIS-0000000035-store])\n"
-                                                                + "      --> KSTREAM-JOINTHIS-0000000035\n"
-                                                                + "      <-- KSTREAM-SOURCE-0000000032\n"
-                                                                + "    Processor: KSTREAM-WINDOWED-0000000034 (stores: [KSTREAM-JOINOTHER-0000000036-store])\n"
-                                                                + "      --> KSTREAM-JOINOTHER-0000000036\n"
-                                                                + "      <-- KTABLE-TOSTREAM-0000000011\n"
-                                                                + "    Processor: KSTREAM-JOINOTHER-0000000036 (stores: [KSTREAM-JOINTHIS-0000000035-store])\n"
-                                                                + "      --> KSTREAM-MERGE-0000000037\n"
-                                                                + "      <-- KSTREAM-WINDOWED-0000000034\n"
-                                                                + "    Processor: KSTREAM-JOINTHIS-0000000035 (stores: [KSTREAM-JOINOTHER-0000000036-store])\n"
-                                                                + "      --> KSTREAM-MERGE-0000000037\n"
-                                                                + "      <-- KSTREAM-WINDOWED-0000000033\n"
-                                                                + "    Processor: KSTREAM-MERGE-0000000037 (stores: [])\n"
-                                                                + "      --> KSTREAM-SINK-0000000038\n"
-                                                                + "      <-- KSTREAM-JOINTHIS-0000000035, KSTREAM-JOINOTHER-0000000036\n"
-                                                                + "    Sink: KSTREAM-SINK-0000000012 (topic: outputTopic_0)\n"
-                                                                + "      <-- KTABLE-TOSTREAM-0000000011\n"
-                                                                + "    Sink: KSTREAM-SINK-0000000038 (topic: joinedOutputTopic)\n"
-                                                                + "      <-- KSTREAM-MERGE-0000000037\n"
+                                                                + "    Source: KSTREAM-SOURCE-0000000013 (topics: [aggregate-groupByKey-repartition])\n"
+                                                                + "      --> aggregate\n"
+                                                                + "    Processor: aggregate (stores: [aggregate-store])\n"
+                                                                + "      --> aggregate-toStream\n"
+                                                                + "      <-- KSTREAM-SOURCE-0000000013\n"
+                                                                + "    Processor: aggregate-toStream (stores: [])\n"
+                                                                + "      --> reduce-to\n"
+                                                                + "      <-- aggregate\n"
+                                                                + "    Sink: reduce-to (topic: outputTopic_1)\n"
+                                                                + "      <-- aggregate-toStream\n"
                                                                 + "\n"
                                                                 + "  Sub-topology: 2\n"
-                                                                + "    Source: KSTREAM-SOURCE-0000000017 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000013-repartition])\n"
-                                                                + "      --> KSTREAM-AGGREGATE-0000000014\n"
-                                                                + "    Processor: KSTREAM-AGGREGATE-0000000014 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000013])\n"
-                                                                + "      --> KTABLE-TOSTREAM-0000000018\n"
-                                                                + "      <-- KSTREAM-SOURCE-0000000017\n"
-                                                                + "    Processor: KTABLE-TOSTREAM-0000000018 (stores: [])\n"
-                                                                + "      --> KSTREAM-SINK-0000000019\n"
-                                                                + "      <-- KSTREAM-AGGREGATE-0000000014\n"
-                                                                + "    Sink: KSTREAM-SINK-0000000019 (topic: outputTopic_1)\n"
-                                                                + "      <-- KTABLE-TOSTREAM-0000000018\n"
+                                                                + "    Source: KSTREAM-SOURCE-0000000021 (topics: [reduce-groupByKey-repartition])\n"
+                                                                + "      --> reducer\n"
+                                                                + "    Processor: reducer (stores: [reduce-store])\n"
+                                                                + "      --> reduce-toStream\n"
+                                                                + "      <-- KSTREAM-SOURCE-0000000021\n"
+                                                                + "    Processor: reduce-toStream (stores: [])\n"
+                                                                + "      --> KSTREAM-SINK-0000000023\n"
+                                                                + "      <-- reducer\n"
+                                                                + "    Sink: KSTREAM-SINK-0000000023 (topic: outputTopic_2)\n"
+                                                                + "      <-- reduce-toStream\n"
                                                                 + "\n"
                                                                 + "  Sub-topology: 3\n"
-                                                                + "    Source: KSTREAM-SOURCE-0000000026 (topics: [KSTREAM-REDUCE-STATE-STORE-0000000022-repartition])\n"
-                                                                + "      --> KSTREAM-REDUCE-0000000023\n"
-                                                                + "    Processor: KSTREAM-REDUCE-0000000023 (stores: [KSTREAM-REDUCE-STATE-STORE-0000000022])\n"
-                                                                + "      --> KTABLE-TOSTREAM-0000000027\n"
-                                                                + "      <-- KSTREAM-SOURCE-0000000026\n"
-                                                                + "    Processor: KTABLE-TOSTREAM-0000000027 (stores: [])\n"
-                                                                + "      --> KSTREAM-SINK-0000000028\n"
-                                                                + "      <-- KSTREAM-REDUCE-0000000023\n"
-                                                                + "    Sink: KSTREAM-SINK-0000000028 (topic: outputTopic_2)\n"
-                                                                + "      <-- KTABLE-TOSTREAM-0000000027\n\n";
+                                                                + "    Source: sourceStream (topics: [input])\n"
+                                                                + "      --> source-map\n"
+                                                                + "    Processor: source-map (stores: [])\n"
+                                                                + "      --> reduce-filter, process-filter, KSTREAM-FILTER-0000000006, join-filter, KSTREAM-FILTER-0000000012\n"
+                                                                + "      <-- sourceStream\n"
+                                                                + "    Processor: reduce-filter (stores: [])\n"
+                                                                + "      --> reduce-peek\n"
+                                                                + "      <-- source-map\n"
+                                                                + "    Processor: join-filter (stores: [])\n"
+                                                                + "      --> KSTREAM-FILTER-0000000026\n"
+                                                                + "      <-- source-map\n"
+                                                                + "    Processor: process-filter (stores: [])\n"
+                                                                + "      --> process-mapValues\n"
+                                                                + "      <-- source-map\n"
+                                                                + "    Processor: reduce-peek (stores: [])\n"
+                                                                + "      --> KSTREAM-FILTER-0000000020\n"
+                                                                + "      <-- reduce-filter\n"
+                                                                + "    Processor: KSTREAM-FILTER-0000000006 (stores: [])\n"
+                                                                + "      --> KSTREAM-SINK-0000000005\n"
+                                                                + "      <-- source-map\n"
+                                                                + "    Processor: KSTREAM-FILTER-0000000012 (stores: [])\n"
+                                                                + "      --> KSTREAM-SINK-0000000011\n"
+                                                                + "      <-- source-map\n"
+                                                                + "    Processor: KSTREAM-FILTER-0000000020 (stores: [])\n"
+                                                                + "      --> KSTREAM-SINK-0000000019\n"
+                                                                + "      <-- reduce-peek\n"
+                                                                + "    Processor: KSTREAM-FILTER-0000000026 (stores: [])\n"
+                                                                + "      --> KSTREAM-SINK-0000000025\n"
+                                                                + "      <-- join-filter\n"
+                                                                + "    Processor: process-mapValues (stores: [])\n"
+                                                                + "      --> process\n"
+                                                                + "      <-- process-filter\n"
+                                                                + "    Sink: KSTREAM-SINK-0000000005 (topic: count-groupByKey-repartition)\n"
+                                                                + "      <-- KSTREAM-FILTER-0000000006\n"
+                                                                + "    Sink: KSTREAM-SINK-0000000011 (topic: aggregate-groupByKey-repartition)\n"
+                                                                + "      <-- KSTREAM-FILTER-0000000012\n"
+                                                                + "    Sink: KSTREAM-SINK-0000000019 (topic: reduce-groupByKey-repartition)\n"
+                                                                + "      <-- KSTREAM-FILTER-0000000020\n"
+                                                                + "    Sink: KSTREAM-SINK-0000000025 (topic: join-left-repartition)\n"
+                                                                + "      <-- KSTREAM-FILTER-0000000026\n"
+                                                                + "    Processor: process (stores: [])\n"
+                                                                + "      --> none\n"
+                                                                + "      <-- process-mapValues\n\n";
 
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionWithMergeOptimizingIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionWithMergeOptimizingTest.java
similarity index 54%
rename from streams/src/test/java/org/apache/kafka/streams/integration/RepartitionWithMergeOptimizingIntegrationTest.java
rename to streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionWithMergeOptimizingTest.java
index 473a626..0d081f7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionWithMergeOptimizingIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionWithMergeOptimizingTest.java
@@ -15,33 +15,35 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.streams.integration;
+package org.apache.kafka.streams.processor.internals;
 
-
-import java.time.Duration;
-import kafka.utils.MockTime;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TestOutputTopic;
 import org.apache.kafka.streams.Topology;
-import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
-import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
 import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Named;
 import org.apache.kafka.streams.kstream.Produced;
-import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.test.StreamsTestUtils;
-import org.apache.kafka.test.TestUtils;
+
 import org.junit.After;
 import org.junit.Before;
-import org.junit.ClassRule;
 import org.junit.Test;
-import org.junit.experimental.categories.Category;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -49,126 +51,142 @@ import java.util.List;
 import java.util.Properties;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 
-@Category({IntegrationTest.class})
-public class RepartitionWithMergeOptimizingIntegrationTest {
+public class RepartitionWithMergeOptimizingTest {
+
+    private final Logger log = LoggerFactory.getLogger(RepartitionWithMergeOptimizingTest.class);
 
-    private static final int NUM_BROKERS = 1;
     private static final String INPUT_A_TOPIC = "inputA";
     private static final String INPUT_B_TOPIC = "inputB";
     private static final String COUNT_TOPIC = "outputTopic_0";
-    private static final String COUNT_STRING_TOPIC = "outputTopic_1";
-
+    private static final String STRING_COUNT_TOPIC = "outputTopic_1";
 
     private static final int ONE_REPARTITION_TOPIC = 1;
     private static final int TWO_REPARTITION_TOPICS = 2;
 
+    private final Serializer<String> stringSerializer = new StringSerializer();
+    private final Deserializer<String> stringDeserializer = new StringDeserializer();
+
     private final Pattern repartitionTopicPattern = Pattern.compile("Sink: .*-repartition");
 
     private Properties streamsConfiguration;
+    private TopologyTestDriver topologyTestDriver;
 
-
-    @ClassRule
-    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
-    private final MockTime mockTime = CLUSTER.time;
+    private final List<KeyValue<String, Long>> expectedCountKeyValues =
+        Arrays.asList(KeyValue.pair("A", 6L), KeyValue.pair("B", 6L), KeyValue.pair("C", 6L));
+    private final List<KeyValue<String, String>> expectedStringCountKeyValues =
+        Arrays.asList(KeyValue.pair("A", "6"), KeyValue.pair("B", "6"), KeyValue.pair("C", "6"));
 
     @Before
-    public void setUp() throws Exception {
+    public void setUp() {
         final Properties props = new Properties();
         props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 1024 * 10);
         props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5000);
 
         streamsConfiguration = StreamsTestUtils.getStreamsConfig(
             "maybe-optimized-with-merge-test-app",
-            CLUSTER.bootstrapServers(),
+            "dummy-bootstrap-servers-config",
             Serdes.String().getClass().getName(),
             Serdes.String().getClass().getName(),
             props);
-
-        CLUSTER.createTopics(COUNT_TOPIC,
-                             COUNT_STRING_TOPIC,
-                             INPUT_A_TOPIC,
-                             INPUT_B_TOPIC);
-
-        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
     }
 
     @After
-    public void tearDown() throws Exception {
-        CLUSTER.deleteAllTopicsAndWait(30_000L);
+    public void tearDown() {
+        try {
+            topologyTestDriver.close();
+        } catch (final RuntimeException e) {
+            log.warn("The following exception was thrown while trying to close the TopologyTestDriver (note that " +
+                "KAFKA-6647 causes this when running on Windows):", e);
+        }
     }
 
     @Test
-    public void shouldSendCorrectRecords_OPTIMIZED() throws Exception {
-        runIntegrationTest(StreamsConfig.OPTIMIZE,
-                           ONE_REPARTITION_TOPIC);
+    public void shouldSendCorrectRecords_OPTIMIZED() {
+        runTest(StreamsConfig.OPTIMIZE, ONE_REPARTITION_TOPIC);
     }
 
     @Test
-    public void shouldSendCorrectResults_NO_OPTIMIZATION() throws Exception {
-        runIntegrationTest(StreamsConfig.NO_OPTIMIZATION,
-                           TWO_REPARTITION_TOPICS);
+    public void shouldSendCorrectResults_NO_OPTIMIZATION() {
+        runTest(StreamsConfig.NO_OPTIMIZATION, TWO_REPARTITION_TOPICS);
     }
 
 
-    private void runIntegrationTest(final String optimizationConfig,
-                                    final int expectedNumberRepartitionTopics) throws Exception {
+    private void runTest(final String optimizationConfig, final int expectedNumberRepartitionTopics) {
 
+        streamsConfiguration.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, optimizationConfig);
 
         final StreamsBuilder builder = new StreamsBuilder();
 
-        final KStream<String, String> sourceAStream = builder.stream(INPUT_A_TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
+        final KStream<String, String> sourceAStream =
+            builder.stream(INPUT_A_TOPIC, Consumed.with(Serdes.String(), Serdes.String()).withName("sourceAStream"));
 
-        final KStream<String, String> sourceBStream = builder.stream(INPUT_B_TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
+        final KStream<String, String> sourceBStream =
+            builder.stream(INPUT_B_TOPIC, Consumed.with(Serdes.String(), Serdes.String()).withName("sourceBStream"));
 
-        final KStream<String, String> mappedAStream = sourceAStream.map((k, v) -> KeyValue.pair(v.split(":")[0], v));
-        final KStream<String, String> mappedBStream = sourceBStream.map((k, v) -> KeyValue.pair(v.split(":")[0], v));
+        final KStream<String, String> mappedAStream =
+            sourceAStream.map((k, v) -> KeyValue.pair(v.split(":")[0], v), Named.as("mappedAStream"));
+        final KStream<String, String> mappedBStream =
+            sourceBStream.map((k, v) -> KeyValue.pair(v.split(":")[0], v), Named.as("mappedBStream"));
 
-        final KStream<String, String> mergedStream = mappedAStream.merge(mappedBStream);
+        final KStream<String, String> mergedStream = mappedAStream.merge(mappedBStream, Named.as("mergedStream"));
 
-        mergedStream.groupByKey().count().toStream().to(COUNT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));
-        mergedStream.groupByKey().count().toStream().mapValues(v -> v.toString()).to(COUNT_STRING_TOPIC, Produced.with(Serdes.String(), Serdes.String()));
+        mergedStream
+            .groupByKey(Grouped.as("long-groupByKey"))
+            .count(Named.as("long-count"), Materialized.as(Stores.inMemoryKeyValueStore("long-store")))
+            .toStream(Named.as("long-toStream"))
+            .to(COUNT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()).withName("long-to"));
 
-        streamsConfiguration.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, optimizationConfig);
+        mergedStream
+            .groupByKey(Grouped.as("string-groupByKey"))
+            .count(Named.as("string-count"), Materialized.as(Stores.inMemoryKeyValueStore("string-store")))
+            .toStream(Named.as("string-toStream"))
+            .mapValues(v -> v.toString(), Named.as("string-mapValues"))
+            .to(STRING_COUNT_TOPIC, Produced.with(Serdes.String(), Serdes.String()).withName("string-to"));
 
-        final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class);
+        final Topology topology = builder.build(streamsConfiguration);
 
-        IntegrationTestUtils.produceKeyValuesSynchronously(INPUT_A_TOPIC, getKeyValues(), producerConfig, mockTime);
-        IntegrationTestUtils.produceKeyValuesSynchronously(INPUT_B_TOPIC, getKeyValues(), producerConfig, mockTime);
+        topologyTestDriver = new TopologyTestDriver(topology, streamsConfiguration);
 
-        final Properties consumerConfig1 = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, LongDeserializer.class);
-        final Properties consumerConfig2 = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class);
+        final TestInputTopic<String, String> inputTopicA = topologyTestDriver.createInputTopic(INPUT_A_TOPIC, stringSerializer, stringSerializer);
+        final TestInputTopic<String, String> inputTopicB = topologyTestDriver.createInputTopic(INPUT_B_TOPIC, stringSerializer, stringSerializer);
+
+        final TestOutputTopic<String, Long> countOutputTopic = topologyTestDriver.createOutputTopic(COUNT_TOPIC, stringDeserializer, new LongDeserializer());
+        final TestOutputTopic<String, String> stringCountOutputTopic = topologyTestDriver.createOutputTopic(STRING_COUNT_TOPIC, stringDeserializer, stringDeserializer);
+
+        inputTopicA.pipeKeyValueList(getKeyValues());
+        inputTopicB.pipeKeyValueList(getKeyValues());
 
-        final Topology topology = builder.build(streamsConfiguration);
         final String topologyString = topology.describe().toString();
-        System.out.println(topologyString);
 
+        // Verify the topology
         if (optimizationConfig.equals(StreamsConfig.OPTIMIZE)) {
             assertEquals(EXPECTED_OPTIMIZED_TOPOLOGY, topologyString);
         } else {
             assertEquals(EXPECTED_UNOPTIMIZED_TOPOLOGY, topologyString);
         }
 
-
-        /*
-           confirming number of expected repartition topics here
-         */
+        // Verify the number of repartition topics
         assertEquals(expectedNumberRepartitionTopics, getCountOfRepartitionTopicsFound(topologyString));
 
-        final KafkaStreams streams = new KafkaStreams(topology, streamsConfiguration);
-        streams.start();
-
-        final List<KeyValue<String, Long>> expectedCountKeyValues = Arrays.asList(KeyValue.pair("A", 6L), KeyValue.pair("B", 6L), KeyValue.pair("C", 6L));
-        IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(consumerConfig1, COUNT_TOPIC, expectedCountKeyValues);
-
-        final List<KeyValue<String, String>> expectedStringCountKeyValues = Arrays.asList(KeyValue.pair("A", "6"), KeyValue.pair("B", "6"), KeyValue.pair("C", "6"));
-        IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(consumerConfig2, COUNT_STRING_TOPIC, expectedStringCountKeyValues);
-
-        streams.close(Duration.ofSeconds(5));
+        // Verify the expected output
+        assertThat(countOutputTopic.readKeyValuesToMap(), equalTo(keyValueListToMap(expectedCountKeyValues)));
+        assertThat(stringCountOutputTopic.readKeyValuesToMap(), equalTo(keyValueListToMap(expectedStringCountKeyValues)));
     }
 
+    private <K, V> Map<K, V> keyValueListToMap(final List<KeyValue<K, V>> keyValuePairs) {
+        final Map<K, V> map = new HashMap<>();
+        for (final KeyValue<K, V> pair : keyValuePairs) {
+            map.put(pair.key, pair.value);
+        }
+        return map;
+    }
 
     private int getCountOfRepartitionTopicsFound(final String topologyString) {
         final Matcher matcher = repartitionTopicPattern.matcher(topologyString);
@@ -179,7 +197,6 @@ public class RepartitionWithMergeOptimizingIntegrationTest {
         return repartitionTopicsFound.size();
     }
 
-
     private List<KeyValue<String, String>> getKeyValues() {
         final List<KeyValue<String, String>> keyValueList = new ArrayList<>();
         final String[] keys = new String[]{"X", "Y", "Z"};
@@ -192,104 +209,103 @@ public class RepartitionWithMergeOptimizingIntegrationTest {
         return keyValueList;
     }
 
-
-
     private static final String EXPECTED_OPTIMIZED_TOPOLOGY = "Topologies:\n"
                                                               + "   Sub-topology: 0\n"
-                                                              + "    Source: KSTREAM-SOURCE-0000000000 (topics: [inputA])\n"
-                                                              + "      --> KSTREAM-MAP-0000000002\n"
-                                                              + "    Source: KSTREAM-SOURCE-0000000001 (topics: [inputB])\n"
-                                                              + "      --> KSTREAM-MAP-0000000003\n"
-                                                              + "    Processor: KSTREAM-MAP-0000000002 (stores: [])\n"
-                                                              + "      --> KSTREAM-MERGE-0000000004\n"
-                                                              + "      <-- KSTREAM-SOURCE-0000000000\n"
-                                                              + "    Processor: KSTREAM-MAP-0000000003 (stores: [])\n"
-                                                              + "      --> KSTREAM-MERGE-0000000004\n"
-                                                              + "      <-- KSTREAM-SOURCE-0000000001\n"
-                                                              + "    Processor: KSTREAM-MERGE-0000000004 (stores: [])\n"
-                                                              + "      --> KSTREAM-FILTER-0000000021\n"
-                                                              + "      <-- KSTREAM-MAP-0000000002, KSTREAM-MAP-0000000003\n"
-                                                              + "    Processor: KSTREAM-FILTER-0000000021 (stores: [])\n"
-                                                              + "      --> KSTREAM-SINK-0000000020\n"
-                                                              + "      <-- KSTREAM-MERGE-0000000004\n"
-                                                              + "    Sink: KSTREAM-SINK-0000000020 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000005-repartition)\n"
-                                                              + "      <-- KSTREAM-FILTER-0000000021\n"
+                                                              + "    Source: KSTREAM-SOURCE-0000000020 (topics: [long-groupByKey-repartition])\n"
+                                                              + "      --> long-count, string-count\n"
+                                                              + "    Processor: string-count (stores: [string-store])\n"
+                                                              + "      --> string-toStream\n"
+                                                              + "      <-- KSTREAM-SOURCE-0000000020\n"
+                                                              + "    Processor: long-count (stores: [long-store])\n"
+                                                              + "      --> long-toStream\n"
+                                                              + "      <-- KSTREAM-SOURCE-0000000020\n"
+                                                              + "    Processor: string-toStream (stores: [])\n"
+                                                              + "      --> string-mapValues\n"
+                                                              + "      <-- string-count\n"
+                                                              + "    Processor: long-toStream (stores: [])\n"
+                                                              + "      --> long-to\n"
+                                                              + "      <-- long-count\n"
+                                                              + "    Processor: string-mapValues (stores: [])\n"
+                                                              + "      --> string-to\n"
+                                                              + "      <-- string-toStream\n"
+                                                              + "    Sink: long-to (topic: outputTopic_0)\n"
+                                                              + "      <-- long-toStream\n"
+                                                              + "    Sink: string-to (topic: outputTopic_1)\n"
+                                                              + "      <-- string-mapValues\n"
                                                               + "\n"
                                                               + "  Sub-topology: 1\n"
-                                                              + "    Source: KSTREAM-SOURCE-0000000022 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000005-repartition])\n"
-                                                              + "      --> KSTREAM-AGGREGATE-0000000006, KSTREAM-AGGREGATE-0000000013\n"
-                                                              + "    Processor: KSTREAM-AGGREGATE-0000000013 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000012])\n"
-                                                              + "      --> KTABLE-TOSTREAM-0000000017\n"
-                                                              + "      <-- KSTREAM-SOURCE-0000000022\n"
-                                                              + "    Processor: KSTREAM-AGGREGATE-0000000006 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000005])\n"
-                                                              + "      --> KTABLE-TOSTREAM-0000000010\n"
-                                                              + "      <-- KSTREAM-SOURCE-0000000022\n"
-                                                              + "    Processor: KTABLE-TOSTREAM-0000000017 (stores: [])\n"
-                                                              + "      --> KSTREAM-MAPVALUES-0000000018\n"
-                                                              + "      <-- KSTREAM-AGGREGATE-0000000013\n"
-                                                              + "    Processor: KSTREAM-MAPVALUES-0000000018 (stores: [])\n"
-                                                              + "      --> KSTREAM-SINK-0000000019\n"
-                                                              + "      <-- KTABLE-TOSTREAM-0000000017\n"
-                                                              + "    Processor: KTABLE-TOSTREAM-0000000010 (stores: [])\n"
-                                                              + "      --> KSTREAM-SINK-0000000011\n"
-                                                              + "      <-- KSTREAM-AGGREGATE-0000000006\n"
-                                                              + "    Sink: KSTREAM-SINK-0000000011 (topic: outputTopic_0)\n"
-                                                              + "      <-- KTABLE-TOSTREAM-0000000010\n"
-                                                              + "    Sink: KSTREAM-SINK-0000000019 (topic: outputTopic_1)\n"
-                                                              + "      <-- KSTREAM-MAPVALUES-0000000018\n\n";
+                                                              + "    Source: sourceAStream (topics: [inputA])\n"
+                                                              + "      --> mappedAStream\n"
+                                                              + "    Source: sourceBStream (topics: [inputB])\n"
+                                                              + "      --> mappedBStream\n"
+                                                              + "    Processor: mappedAStream (stores: [])\n"
+                                                              + "      --> mergedStream\n"
+                                                              + "      <-- sourceAStream\n"
+                                                              + "    Processor: mappedBStream (stores: [])\n"
+                                                              + "      --> mergedStream\n"
+                                                              + "      <-- sourceBStream\n"
+                                                              + "    Processor: mergedStream (stores: [])\n"
+                                                              + "      --> KSTREAM-FILTER-0000000019\n"
+                                                              + "      <-- mappedAStream, mappedBStream\n"
+                                                              + "    Processor: KSTREAM-FILTER-0000000019 (stores: [])\n"
+                                                              + "      --> KSTREAM-SINK-0000000018\n"
+                                                              + "      <-- mergedStream\n"
+                                                              + "    Sink: KSTREAM-SINK-0000000018 (topic: long-groupByKey-repartition)\n"
+                                                              + "      <-- KSTREAM-FILTER-0000000019\n\n";
 
 
     private static final String EXPECTED_UNOPTIMIZED_TOPOLOGY = "Topologies:\n"
                                                                 + "   Sub-topology: 0\n"
-                                                                + "    Source: KSTREAM-SOURCE-0000000000 (topics: [inputA])\n"
-                                                                + "      --> KSTREAM-MAP-0000000002\n"
-                                                                + "    Source: KSTREAM-SOURCE-0000000001 (topics: [inputB])\n"
-                                                                + "      --> KSTREAM-MAP-0000000003\n"
-                                                                + "    Processor: KSTREAM-MAP-0000000002 (stores: [])\n"
-                                                                + "      --> KSTREAM-MERGE-0000000004\n"
-                                                                + "      <-- KSTREAM-SOURCE-0000000000\n"
-                                                                + "    Processor: KSTREAM-MAP-0000000003 (stores: [])\n"
-                                                                + "      --> KSTREAM-MERGE-0000000004\n"
-                                                                + "      <-- KSTREAM-SOURCE-0000000001\n"
-                                                                + "    Processor: KSTREAM-MERGE-0000000004 (stores: [])\n"
-                                                                + "      --> KSTREAM-FILTER-0000000008, KSTREAM-FILTER-0000000015\n"
-                                                                + "      <-- KSTREAM-MAP-0000000002, KSTREAM-MAP-0000000003\n"
-                                                                + "    Processor: KSTREAM-FILTER-0000000008 (stores: [])\n"
-                                                                + "      --> KSTREAM-SINK-0000000007\n"
-                                                                + "      <-- KSTREAM-MERGE-0000000004\n"
-                                                                + "    Processor: KSTREAM-FILTER-0000000015 (stores: [])\n"
-                                                                + "      --> KSTREAM-SINK-0000000014\n"
-                                                                + "      <-- KSTREAM-MERGE-0000000004\n"
-                                                                + "    Sink: KSTREAM-SINK-0000000007 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000005-repartition)\n"
-                                                                + "      <-- KSTREAM-FILTER-0000000008\n"
-                                                                + "    Sink: KSTREAM-SINK-0000000014 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000012-repartition)\n"
-                                                                + "      <-- KSTREAM-FILTER-0000000015\n"
+                                                                + "    Source: KSTREAM-SOURCE-0000000008 (topics: [long-groupByKey-repartition])\n"
+                                                                + "      --> long-count\n"
+                                                                + "    Processor: long-count (stores: [long-store])\n"
+                                                                + "      --> long-toStream\n"
+                                                                + "      <-- KSTREAM-SOURCE-0000000008\n"
+                                                                + "    Processor: long-toStream (stores: [])\n"
+                                                                + "      --> long-to\n"
+                                                                + "      <-- long-count\n"
+                                                                + "    Sink: long-to (topic: outputTopic_0)\n"
+                                                                + "      <-- long-toStream\n"
                                                                 + "\n"
                                                                 + "  Sub-topology: 1\n"
-                                                                + "    Source: KSTREAM-SOURCE-0000000009 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000005-repartition])\n"
-                                                                + "      --> KSTREAM-AGGREGATE-0000000006\n"
-                                                                + "    Processor: KSTREAM-AGGREGATE-0000000006 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000005])\n"
-                                                                + "      --> KTABLE-TOSTREAM-0000000010\n"
-                                                                + "      <-- KSTREAM-SOURCE-0000000009\n"
-                                                                + "    Processor: KTABLE-TOSTREAM-0000000010 (stores: [])\n"
-                                                                + "      --> KSTREAM-SINK-0000000011\n"
-                                                                + "      <-- KSTREAM-AGGREGATE-0000000006\n"
-                                                                + "    Sink: KSTREAM-SINK-0000000011 (topic: outputTopic_0)\n"
-                                                                + "      <-- KTABLE-TOSTREAM-0000000010\n"
+                                                                + "    Source: KSTREAM-SOURCE-0000000014 (topics: [string-groupByKey-repartition])\n"
+                                                                + "      --> string-count\n"
+                                                                + "    Processor: string-count (stores: [string-store])\n"
+                                                                + "      --> string-toStream\n"
+                                                                + "      <-- KSTREAM-SOURCE-0000000014\n"
+                                                                + "    Processor: string-toStream (stores: [])\n"
+                                                                + "      --> string-mapValues\n"
+                                                                + "      <-- string-count\n"
+                                                                + "    Processor: string-mapValues (stores: [])\n"
+                                                                + "      --> string-to\n"
+                                                                + "      <-- string-toStream\n"
+                                                                + "    Sink: string-to (topic: outputTopic_1)\n"
+                                                                + "      <-- string-mapValues\n"
                                                                 + "\n"
                                                                 + "  Sub-topology: 2\n"
-                                                                + "    Source: KSTREAM-SOURCE-0000000016 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000012-repartition])\n"
-                                                                + "      --> KSTREAM-AGGREGATE-0000000013\n"
-                                                                + "    Processor: KSTREAM-AGGREGATE-0000000013 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000012])\n"
-                                                                + "      --> KTABLE-TOSTREAM-0000000017\n"
-                                                                + "      <-- KSTREAM-SOURCE-0000000016\n"
-                                                                + "    Processor: KTABLE-TOSTREAM-0000000017 (stores: [])\n"
-                                                                + "      --> KSTREAM-MAPVALUES-0000000018\n"
-                                                                + "      <-- KSTREAM-AGGREGATE-0000000013\n"
-                                                                + "    Processor: KSTREAM-MAPVALUES-0000000018 (stores: [])\n"
-                                                                + "      --> KSTREAM-SINK-0000000019\n"
-                                                                + "      <-- KTABLE-TOSTREAM-0000000017\n"
-                                                                + "    Sink: KSTREAM-SINK-0000000019 (topic: outputTopic_1)\n"
-                                                                + "      <-- KSTREAM-MAPVALUES-0000000018\n\n";
+                                                                + "    Source: sourceAStream (topics: [inputA])\n"
+                                                                + "      --> mappedAStream\n"
+                                                                + "    Source: sourceBStream (topics: [inputB])\n"
+                                                                + "      --> mappedBStream\n"
+                                                                + "    Processor: mappedAStream (stores: [])\n"
+                                                                + "      --> mergedStream\n"
+                                                                + "      <-- sourceAStream\n"
+                                                                + "    Processor: mappedBStream (stores: [])\n"
+                                                                + "      --> mergedStream\n"
+                                                                + "      <-- sourceBStream\n"
+                                                                + "    Processor: mergedStream (stores: [])\n"
+                                                                + "      --> KSTREAM-FILTER-0000000007, KSTREAM-FILTER-0000000013\n"
+                                                                + "      <-- mappedAStream, mappedBStream\n"
+                                                                + "    Processor: KSTREAM-FILTER-0000000007 (stores: [])\n"
+                                                                + "      --> KSTREAM-SINK-0000000006\n"
+                                                                + "      <-- mergedStream\n"
+                                                                + "    Processor: KSTREAM-FILTER-0000000013 (stores: [])\n"
+                                                                + "      --> KSTREAM-SINK-0000000012\n"
+                                                                + "      <-- mergedStream\n"
+                                                                + "    Sink: KSTREAM-SINK-0000000006 (topic: long-groupByKey-repartition)\n"
+                                                                + "      <-- KSTREAM-FILTER-0000000007\n"
+                                                                + "    Sink: KSTREAM-SINK-0000000012 (topic: string-groupByKey-repartition)\n"
+                                                                + "      <-- KSTREAM-FILTER-0000000013\n\n";
+
 
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java
index 25c642e..4dd4954 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java
@@ -104,27 +104,22 @@ public class StreamsBrokerDownResilienceTest {
 
         final KafkaStreams streams = new KafkaStreams(builder.build(), streamsProperties);
 
-        streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
-            @Override
-            public void uncaughtException(final Thread t, final Throwable e) {
+        streams.setUncaughtExceptionHandler( (t,e) -> {
                 System.err.println("FATAL: An unexpected exception " + e);
                 System.err.flush();
                 streams.close(Duration.ofSeconds(30));
             }
-        });
+        );
+
         System.out.println("Start Kafka Streams");
         streams.start();
 
-        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
-            @Override
-            public void run() {
+        Runtime.getRuntime().addShutdownHook(new Thread( () -> {
                 streams.close(Duration.ofSeconds(30));
                 System.out.println("Complete shutdown of streams resilience test app now");
                 System.out.flush();
             }
-        }));
-
-
+        ));
     }
 
     private static boolean confirmCorrectConfigs(final Properties properties) {
diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py
index ab57090..00dbe35 100644
--- a/tests/kafkatest/tests/streams/streams_upgrade_test.py
+++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py
@@ -441,7 +441,7 @@ class StreamsUpgradeTest(Test):
         if upgrade_from is None:  # upgrade disabled -- second round of rolling bounces
             roll_counter = ".1-"  # second round of rolling bounces
         else:
-            roll_counter = ".0-"  # first  round of rolling boundes
+            roll_counter = ".0-"  # first  round of rolling bounces
 
         node.account.ssh("mv " + processor.STDOUT_FILE + " " + processor.STDOUT_FILE + roll_counter + str(counter), allow_fail=False)
         node.account.ssh("mv " + processor.STDERR_FILE + " " + processor.STDERR_FILE + roll_counter + str(counter), allow_fail=False)


Mime
View raw message