kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [3/8] kafka git commit: KAFKA-5671: Add StreamsBuilder and Deprecate KStreamBuilder
Date Mon, 31 Jul 2017 22:29:06 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
index 75b7910..719bab3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
@@ -49,6 +49,7 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 import static org.hamcrest.core.IsInstanceOf.instanceOf;
 
+@SuppressWarnings("deprecation")
 public class KStreamBuilderTest {
 
     private static final String APP_ID = "app-id";
@@ -92,7 +93,7 @@ public class KStreamBuilderTest {
 
 
     @Test
-    public void shouldNotTryProcessingFromSinkTopic() {
+    public void shouldProcessFromSinkTopic() {
         final KStream<String, String> source = builder.stream("topic-source");
         source.to("topic-sink");
 
@@ -110,7 +111,7 @@ public class KStreamBuilderTest {
     }
 
     @Test
-    public void shouldTryProcessingFromThoughTopic() {
+    public void shouldProcessViaThroughTopic() {
         final KStream<String, String> source = builder.stream("topic-source");
         final KStream<String, String> through = source.through("topic-sink");
 
@@ -310,13 +311,13 @@ public class KStreamBuilderTest {
         final KStream<String, String> mapped = playEvents.map(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper());
         mapped.leftJoin(table, MockValueJoiner.TOSTRING_JOINER).groupByKey().count("count");
         assertEquals(Collections.singletonList("table-topic"), builder.stateStoreNameToSourceTopics().get("table-store"));
-        assertEquals(Collections.singletonList(APP_ID + "-KSTREAM-MAP-0000000003-repartition"), builder.stateStoreNameToSourceTopics().get("count"));
+        assertEquals(Collections.singletonList(APP_ID + "-KSTREAM-MAP-0000000000-repartition"), builder.stateStoreNameToSourceTopics().get("count"));
     }
 
     @Test
     public void shouldAddTopicToEarliestAutoOffsetResetList() {
         final String topicName = "topic-1";
-        
+
         builder.stream(TopologyBuilder.AutoOffsetReset.EARLIEST, topicName);
 
         assertTrue(builder.earliestResetTopicsPattern().matcher(topicName).matches());

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
index d482182..96e1373 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
@@ -17,8 +17,8 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
@@ -28,12 +28,11 @@ import org.junit.After;
 import org.junit.Test;
 
 import java.util.Random;
+
 import static org.junit.Assert.assertTrue;
 
 public class AbstractStreamTest {
 
-    private final String topicName = "topic";
-
     private KStreamTestDriver driver;
 
     @After
@@ -45,9 +44,10 @@ public class AbstractStreamTest {
 
     @Test
     public void testShouldBeExtensible() {
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
         final int[] expectedKeys = new int[]{1, 2, 3, 4, 5, 6, 7};
         final MockProcessorSupplier<Integer, String> processor = new MockProcessorSupplier<>();
+        final String topicName = "topic";
 
         ExtendedKStream<Integer, String> stream = new ExtendedKStream<>(builder.stream(Serdes.Integer(), Serdes.String(), topicName));
 
@@ -63,14 +63,14 @@ public class AbstractStreamTest {
 
     private class ExtendedKStream<K, V> extends AbstractStream<K> {
 
-        ExtendedKStream(KStream<K, V> stream) {
+        ExtendedKStream(final KStream<K, V> stream) {
             super((KStreamImpl<K, V>) stream);
         }
 
         KStream<K, V> randomFilter() {
-            String name = this.topology.newName("RANDOM-FILTER-");
-            this.topology.addProcessor(name, new ExtendedKStreamDummy(), this.name);
-            return new KStreamImpl<>(topology, name, sourceNodes, false);
+            String name = builder.newName("RANDOM-FILTER-");
+            builder.internalTopologyBuilder.addProcessor(name, new ExtendedKStreamDummy(), this.name);
+            return new KStreamImpl<>(builder, name, sourceNodes, false);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
index 69b328d..9f6c023 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
@@ -17,10 +17,10 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockValueJoiner;
@@ -38,7 +38,7 @@ import static org.junit.Assert.assertEquals;
 
 public class GlobalKTableJoinsTest {
 
-    private final KStreamBuilder builder = new KStreamBuilder();
+    private final StreamsBuilder builder = new StreamsBuilder();
     private final Map<String, String> results = new HashMap<>();
     private final String streamTopic = "stream";
     private final String globalTopic = "global";

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
new file mode 100644
index 0000000..a29d4af
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.kstream.GlobalKTable;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+import org.apache.kafka.streams.processor.internals.ProcessorTopology;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockKeyValueMapper;
+import org.apache.kafka.test.MockTimestampExtractor;
+import org.apache.kafka.test.MockValueJoiner;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import static org.apache.kafka.streams.Topology.AutoOffsetReset;
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+public class InternalStreamsBuilderTest {
+
+    private static final String APP_ID = "app-id";
+
+    private final InternalStreamsBuilder builder = new InternalStreamsBuilder(new InternalTopologyBuilder());
+
+    private KStreamTestDriver driver = null;
+
+    public static InternalTopologyBuilder internalTopologyBuilder(final StreamsBuilder streamsBuilder) throws NoSuchFieldException, IllegalAccessException {
+        final Field internalStreamsBuilderField = streamsBuilder.getClass().getDeclaredField("internalStreamsBuilder");
+        internalStreamsBuilderField.setAccessible(true);
+        final InternalStreamsBuilder internalStreamsBuilder = (InternalStreamsBuilder) internalStreamsBuilderField.get(streamsBuilder);
+
+        return internalTopologyBuilder(internalStreamsBuilder);
+    }
+
+    public static InternalTopologyBuilder internalTopologyBuilder(final InternalStreamsBuilder internalStreamsBuilder) throws NoSuchFieldException, IllegalAccessException {
+        final Field internalTopologyBuilderField = internalStreamsBuilder.getClass().getDeclaredField("internalTopologyBuilder");
+        internalTopologyBuilderField.setAccessible(true);
+        return (InternalTopologyBuilder) internalTopologyBuilderField.get(internalStreamsBuilder);
+    }
+
+    @Before
+    public void setUp() {
+        builder.internalTopologyBuilder.setApplicationId(APP_ID);
+    }
+
+    @After
+    public void cleanup() {
+        if (driver != null) {
+            driver.close();
+        }
+        driver = null;
+    }
+
+    @Test
+    public void testNewName() {
+        assertEquals("X-0000000000", builder.newName("X-"));
+        assertEquals("Y-0000000001", builder.newName("Y-"));
+        assertEquals("Z-0000000002", builder.newName("Z-"));
+
+        final InternalStreamsBuilder newBuilder = new InternalStreamsBuilder(new InternalTopologyBuilder());
+
+        assertEquals("X-0000000000", newBuilder.newName("X-"));
+        assertEquals("Y-0000000001", newBuilder.newName("Y-"));
+        assertEquals("Z-0000000002", newBuilder.newName("Z-"));
+    }
+
+    @Test
+    public void testNewStoreName() {
+        assertEquals("X-STATE-STORE-0000000000", builder.newStoreName("X-"));
+        assertEquals("Y-STATE-STORE-0000000001", builder.newStoreName("Y-"));
+        assertEquals("Z-STATE-STORE-0000000002", builder.newStoreName("Z-"));
+
+        final InternalStreamsBuilder newBuilder = new InternalStreamsBuilder(new InternalTopologyBuilder());
+
+        assertEquals("X-STATE-STORE-0000000000", newBuilder.newStoreName("X-"));
+        assertEquals("Y-STATE-STORE-0000000001", newBuilder.newStoreName("Y-"));
+        assertEquals("Z-STATE-STORE-0000000002", newBuilder.newStoreName("Z-"));
+    }
+
+    @Test
+    public void shouldHaveCorrectSourceTopicsForTableFromMergedStream() throws Exception {
+        final String topic1 = "topic-1";
+        final String topic2 = "topic-2";
+        final String topic3 = "topic-3";
+        final KStream<String, String> source1 = builder.stream(null, null, null, null, topic1);
+        final KStream<String, String> source2 = builder.stream(null, null, null, null, topic2);
+        final KStream<String, String> source3 = builder.stream(null, null, null, null, topic3);
+        final KStream<String, String> processedSource1 =
+                source1.mapValues(new ValueMapper<String, String>() {
+                    @Override
+                    public String apply(final String value) {
+                        return value;
+                    }
+                }).filter(new Predicate<String, String>() {
+                    @Override
+                    public boolean test(final String key, final String value) {
+                        return true;
+                    }
+                });
+        final KStream<String, String> processedSource2 = source2.filter(new Predicate<String, String>() {
+            @Override
+            public boolean test(final String key, final String value) {
+                return true;
+            }
+        });
+
+        final KStream<String, String> merged = builder.merge(processedSource1, processedSource2, source3);
+        merged.groupByKey().count("my-table");
+        final Map<String, List<String>> actual = builder.internalTopologyBuilder.stateStoreNameToSourceTopics();
+        assertEquals(Utils.mkList("topic-1", "topic-2", "topic-3"), actual.get("my-table"));
+    }
+
+    @Test
+    public void shouldStillMaterializeSourceKTableIfStateNameNotSpecified() throws Exception {
+        KTable table1 = builder.table(null, null, null, null, "topic1", "table1");
+        KTable table2 = builder.table(null, null, null, null, "topic2", (String) null);
+
+        final ProcessorTopology topology = builder.internalTopologyBuilder.build(null);
+
+        assertEquals(2, topology.stateStores().size());
+        assertEquals("table1", topology.stateStores().get(0).name());
+
+        final String internalStoreName = topology.stateStores().get(1).name();
+        assertTrue(internalStoreName.contains(KTableImpl.STATE_STORE_NAME));
+        assertEquals(2, topology.storeToChangelogTopic().size());
+        assertEquals("topic1", topology.storeToChangelogTopic().get("table1"));
+        assertEquals("topic2", topology.storeToChangelogTopic().get(internalStoreName));
+        assertEquals(table1.queryableStoreName(), "table1");
+        assertNull(table2.queryableStoreName());
+    }
+
+    @Test
+    public void shouldBuildSimpleGlobalTableTopology() throws Exception {
+        builder.globalTable(null, null, null, "table", "globalTable");
+
+        final ProcessorTopology topology = builder.internalTopologyBuilder.buildGlobalStateTopology();
+        final List<StateStore> stateStores = topology.globalStateStores();
+
+        assertEquals(1, stateStores.size());
+        assertEquals("globalTable", stateStores.get(0).name());
+    }
+
+    private void doBuildGlobalTopologyWithAllGlobalTables() throws Exception {
+        final ProcessorTopology topology = builder.internalTopologyBuilder.buildGlobalStateTopology();
+
+        final List<StateStore> stateStores = topology.globalStateStores();
+        final Set<String> sourceTopics = topology.sourceTopics();
+
+        assertEquals(Utils.mkSet("table", "table2"), sourceTopics);
+        assertEquals(2, stateStores.size());
+    }
+
+    @Test
+    public void shouldBuildGlobalTopologyWithAllGlobalTables() throws Exception {
+        builder.globalTable(null, null, null, "table", "globalTable");
+        builder.globalTable(null, null, null, "table2", "globalTable2");
+
+        doBuildGlobalTopologyWithAllGlobalTables();
+    }
+
+    @Test
+    public void shouldBuildGlobalTopologyWithAllGlobalTablesWithInternalStoreName() throws Exception {
+        builder.globalTable(null, null, null, "table", null);
+        builder.globalTable(null, null, null, "table2", null);
+
+        doBuildGlobalTopologyWithAllGlobalTables();
+    }
+
+    @Test
+    public void shouldAddGlobalTablesToEachGroup() throws Exception {
+        final String one = "globalTable";
+        final String two = "globalTable2";
+        final GlobalKTable<String, String> globalTable = builder.globalTable(null, null, null, "table", one);
+        final GlobalKTable<String, String> globalTable2 = builder.globalTable(null, null, null, "table2", two);
+
+        builder.table(null, null, null, null, "not-global", "not-global");
+
+        final KeyValueMapper<String, String, String> kvMapper = new KeyValueMapper<String, String, String>() {
+            @Override
+            public String apply(final String key, final String value) {
+                return value;
+            }
+        };
+
+        final KStream<String, String> stream = builder.stream(null, null, null, null, "t1");
+        stream.leftJoin(globalTable, kvMapper, MockValueJoiner.TOSTRING_JOINER);
+        final KStream<String, String> stream2 = builder.stream(null, null, null, null, "t2");
+        stream2.leftJoin(globalTable2, kvMapper, MockValueJoiner.TOSTRING_JOINER);
+
+        final Map<Integer, Set<String>> nodeGroups = builder.internalTopologyBuilder.nodeGroups();
+        for (Integer groupId : nodeGroups.keySet()) {
+            final ProcessorTopology topology = builder.internalTopologyBuilder.build(groupId);
+            final List<StateStore> stateStores = topology.globalStateStores();
+            final Set<String> names = new HashSet<>();
+            for (StateStore stateStore : stateStores) {
+                names.add(stateStore.name());
+            }
+
+            assertEquals(2, stateStores.size());
+            assertTrue(names.contains(one));
+            assertTrue(names.contains(two));
+        }
+    }
+
+    @Test
+    public void shouldMapStateStoresToCorrectSourceTopics() throws Exception {
+        final KStream<String, String> playEvents = builder.stream(null, null, null, null, "events");
+
+        final KTable<String, String> table = builder.table(null, null, null, null, "table-topic", "table-store");
+        assertEquals(Collections.singletonList("table-topic"), builder.internalTopologyBuilder.stateStoreNameToSourceTopics().get("table-store"));
+
+        final KStream<String, String> mapped = playEvents.map(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper());
+        mapped.leftJoin(table, MockValueJoiner.TOSTRING_JOINER).groupByKey().count("count");
+        assertEquals(Collections.singletonList("table-topic"), builder.internalTopologyBuilder.stateStoreNameToSourceTopics().get("table-store"));
+        assertEquals(Collections.singletonList(APP_ID + "-KSTREAM-MAP-0000000003-repartition"), builder.internalTopologyBuilder.stateStoreNameToSourceTopics().get("count"));
+    }
+
+    @Test
+    public void shouldAddTopicToEarliestAutoOffsetResetList() {
+        final String topicName = "topic-1";
+        
+        builder.stream(AutoOffsetReset.EARLIEST, null, null, null, topicName);
+
+        assertTrue(builder.internalTopologyBuilder.earliestResetTopicsPattern().matcher(topicName).matches());
+        assertFalse(builder.internalTopologyBuilder.latestResetTopicsPattern().matcher(topicName).matches());
+    }
+
+    @Test
+    public void shouldAddTopicToLatestAutoOffsetResetList() {
+        final String topicName = "topic-1";
+
+        builder.stream(AutoOffsetReset.LATEST, null, null, null, topicName);
+
+        assertTrue(builder.internalTopologyBuilder.latestResetTopicsPattern().matcher(topicName).matches());
+        assertFalse(builder.internalTopologyBuilder.earliestResetTopicsPattern().matcher(topicName).matches());
+    }
+
+    @Test
+    public void shouldAddTableToEarliestAutoOffsetResetList() {
+        final String topicName = "topic-1";
+        final String storeName = "test-store";
+
+        builder.table(AutoOffsetReset.EARLIEST, null, null, null, topicName, storeName);
+
+        assertTrue(builder.internalTopologyBuilder.earliestResetTopicsPattern().matcher(topicName).matches());
+        assertFalse(builder.internalTopologyBuilder.latestResetTopicsPattern().matcher(topicName).matches());
+    }
+
+    @Test
+    public void shouldAddTableToLatestAutoOffsetResetList() {
+        final String topicName = "topic-1";
+        final String storeName = "test-store";
+
+        builder.table(AutoOffsetReset.LATEST, null, null, null, topicName, storeName);
+
+        assertTrue(builder.internalTopologyBuilder.latestResetTopicsPattern().matcher(topicName).matches());
+        assertFalse(builder.internalTopologyBuilder.earliestResetTopicsPattern().matcher(topicName).matches());
+    }
+
+    @Test
+    public void shouldNotAddTableToOffsetResetLists() {
+        final String topicName = "topic-1";
+        final String storeName = "test-store";
+        final Serde<String> stringSerde = Serdes.String();
+
+        builder.table(null, null, stringSerde, stringSerde, topicName, storeName);
+
+        assertFalse(builder.internalTopologyBuilder.latestResetTopicsPattern().matcher(topicName).matches());
+        assertFalse(builder.internalTopologyBuilder.earliestResetTopicsPattern().matcher(topicName).matches());
+    }
+
+    @Test
+    public void shouldNotAddRegexTopicsToOffsetResetLists() {
+        final Pattern topicPattern = Pattern.compile("topic-\\d");
+        final String topic = "topic-5";
+
+        builder.stream(null, null, null, null, topicPattern);
+
+        assertFalse(builder.internalTopologyBuilder.latestResetTopicsPattern().matcher(topic).matches());
+        assertFalse(builder.internalTopologyBuilder.earliestResetTopicsPattern().matcher(topic).matches());
+
+    }
+
+    @Test
+    public void shouldAddRegexTopicToEarliestAutoOffsetResetList() {
+        final Pattern topicPattern = Pattern.compile("topic-\\d+");
+        final String topicTwo = "topic-500000";
+
+        builder.stream(AutoOffsetReset.EARLIEST, null, null, null,  topicPattern);
+
+        assertTrue(builder.internalTopologyBuilder.earliestResetTopicsPattern().matcher(topicTwo).matches());
+        assertFalse(builder.internalTopologyBuilder.latestResetTopicsPattern().matcher(topicTwo).matches());
+    }
+
+    @Test
+    public void shouldAddRegexTopicToLatestAutoOffsetResetList() {
+        final Pattern topicPattern = Pattern.compile("topic-\\d+");
+        final String topicTwo = "topic-1000000";
+
+        builder.stream(AutoOffsetReset.LATEST, null, null, null, topicPattern);
+
+        assertTrue(builder.internalTopologyBuilder.latestResetTopicsPattern().matcher(topicTwo).matches());
+        assertFalse(builder.internalTopologyBuilder.earliestResetTopicsPattern().matcher(topicTwo).matches());
+    }
+
+    @Test
+    public void shouldHaveNullTimestampExtractorWhenNoneSupplied() throws Exception {
+        builder.stream(null, null, null, null, "topic");
+        final ProcessorTopology processorTopology = builder.internalTopologyBuilder.build(null);
+        assertNull(processorTopology.source("topic").getTimestampExtractor());
+    }
+
+    @Test
+    public void shouldUseProvidedTimestampExtractor() throws Exception {
+        builder.stream(null, new MockTimestampExtractor(), null, null, "topic");
+        final ProcessorTopology processorTopology = builder.internalTopologyBuilder.build(null);
+        assertThat(processorTopology.source("topic").getTimestampExtractor(), instanceOf(MockTimestampExtractor.class));
+    }
+
+    @Test
+    public void ktableShouldHaveNullTimestampExtractorWhenNoneSupplied() throws Exception {
+        builder.table(null, null, null, null, "topic", "store");
+        final ProcessorTopology processorTopology = builder.internalTopologyBuilder.build(null);
+        assertNull(processorTopology.source("topic").getTimestampExtractor());
+    }
+
+    @Test
+    public void ktableShouldUseProvidedTimestampExtractor() throws Exception {
+        builder.table(null, new MockTimestampExtractor(), null, null, "topic", "store");
+        final ProcessorTopology processorTopology = builder.internalTopologyBuilder.build(null);
+        assertThat(processorTopology.source("topic").getTimestampExtractor(), instanceOf(MockTimestampExtractor.class));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
index f21c7d3..5a7147f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
@@ -19,20 +19,20 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.KGroupedStream;
 import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.Reducer;
-import org.apache.kafka.streams.kstream.Merger;
 import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Merger;
+import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.kstream.SessionWindows;
 import org.apache.kafka.streams.kstream.TimeWindows;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.Windows;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.SessionStore;
 import org.apache.kafka.streams.state.WindowStore;
@@ -60,7 +60,7 @@ public class KGroupedStreamImplTest {
 
     private static final String TOPIC = "topic";
     private static final String INVALID_STORE_NAME = "~foo bar~";
-    private final KStreamBuilder builder = new KStreamBuilder();
+    private final StreamsBuilder builder = new StreamsBuilder();
     private KGroupedStream<String, String> groupedStream;
     private KStreamTestDriver driver = null;
 
@@ -218,7 +218,7 @@ public class KGroupedStreamImplTest {
                 return aggOne + aggTwo;
             }
         }, SessionWindows.with(30), Serdes.Integer(), "session-store");
-        table.foreach(new ForeachAction<Windowed<String>, Integer>() {
+        table.toStream().foreach(new ForeachAction<Windowed<String>, Integer>() {
             @Override
             public void apply(final Windowed<String> key, final Integer value) {
                 results.put(key, value);
@@ -248,7 +248,7 @@ public class KGroupedStreamImplTest {
                 return aggOne + aggTwo;
             }
         }, SessionWindows.with(30), Serdes.Integer());
-        table.foreach(new ForeachAction<Windowed<String>, Integer>() {
+        table.toStream().foreach(new ForeachAction<Windowed<String>, Integer>() {
             @Override
             public void apply(final Windowed<String> key, final Integer value) {
                 results.put(key, value);
@@ -283,7 +283,7 @@ public class KGroupedStreamImplTest {
     public void shouldCountSessionWindows() throws Exception {
         final Map<Windowed<String>, Long> results = new HashMap<>();
         KTable table = groupedStream.count(SessionWindows.with(30), "session-store");
-        table.foreach(new ForeachAction<Windowed<String>, Long>() {
+        table.toStream().foreach(new ForeachAction<Windowed<String>, Long>() {
             @Override
             public void apply(final Windowed<String> key, final Long value) {
                 results.put(key, value);
@@ -297,7 +297,7 @@ public class KGroupedStreamImplTest {
     public void shouldCountSessionWindowsWithInternalStoreName() throws Exception {
         final Map<Windowed<String>, Long> results = new HashMap<>();
         KTable table = groupedStream.count(SessionWindows.with(30));
-        table.foreach(new ForeachAction<Windowed<String>, Long>() {
+        table.toStream().foreach(new ForeachAction<Windowed<String>, Long>() {
             @Override
             public void apply(final Windowed<String> key, final Long value) {
                 results.put(key, value);
@@ -338,7 +338,7 @@ public class KGroupedStreamImplTest {
                     }
                 }, SessionWindows.with(30),
                 "session-store");
-        table.foreach(new ForeachAction<Windowed<String>, String>() {
+        table.toStream().foreach(new ForeachAction<Windowed<String>, String>() {
             @Override
             public void apply(final Windowed<String> key, final String value) {
                 results.put(key, value);
@@ -358,7 +358,7 @@ public class KGroupedStreamImplTest {
                         return value1 + ":" + value2;
                     }
                 }, SessionWindows.with(30));
-        table.foreach(new ForeachAction<Windowed<String>, String>() {
+        table.toStream().foreach(new ForeachAction<Windowed<String>, String>() {
             @Override
             public void apply(final Windowed<String> key, final String value) {
                 results.put(key, value);
@@ -512,6 +512,7 @@ public class KGroupedStreamImplTest {
         groupedStream.count(
                 TimeWindows.of(500L),
                 "aggregate-by-key-windowed")
+                .toStream()
                 .foreach(new ForeachAction<Windowed<String>, Long>() {
                     @Override
                     public void apply(final Windowed<String> key, final Long value) {
@@ -527,6 +528,7 @@ public class KGroupedStreamImplTest {
         final List<KeyValue<Windowed<String>, Long>> results = new ArrayList<>();
         groupedStream.count(
                 TimeWindows.of(500L))
+                .toStream()
                 .foreach(new ForeachAction<Windowed<String>, Long>() {
                     @Override
                     public void apply(final Windowed<String> key, final Long value) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
index 1e49b22..5dce4c6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
@@ -19,9 +19,9 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.KGroupedTable;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
@@ -45,7 +45,7 @@ import static org.junit.Assert.assertNull;
 
 public class KGroupedTableImplTest {
 
-    private final KStreamBuilder builder = new KStreamBuilder();
+    private final StreamsBuilder builder = new StreamsBuilder();
     private static final String INVALID_STORE_NAME = "~foo bar~";
     private KGroupedTable<String, String> groupedTable;
     private KStreamTestDriver driver = null;

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
index f9f09ea..b199c34 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
@@ -17,8 +17,8 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
@@ -46,8 +46,7 @@ public class KStreamBranchTest {
     @SuppressWarnings("unchecked")
     @Test
     public void testKStreamBranch() {
-        KStreamBuilder builder = new KStreamBuilder();
-        builder.setApplicationId("X");
+        final StreamsBuilder builder = new StreamsBuilder();
 
         Predicate<Integer, String> isEven = new Predicate<Integer, String>() {
             @Override
@@ -112,7 +111,7 @@ public class KStreamBranchTest {
         };
 
         @SuppressWarnings("unchecked")
-        final KStream<Integer, String>[] branches = new KStreamBuilder()
+        final KStream<Integer, String>[] branches = new StreamsBuilder()
             .<Integer, String>stream("empty")
             .branch(positive, negative);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
index 7faf6d5..199d8b2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
@@ -17,8 +17,8 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
@@ -50,7 +50,7 @@ public class KStreamFilterTest {
 
     @Test
     public void testFilter() {
-        KStreamBuilder builder = new KStreamBuilder();
+        StreamsBuilder builder = new StreamsBuilder();
         final int[] expectedKeys = new int[]{1, 2, 3, 4, 5, 6, 7};
 
         KStream<Integer, String> stream;
@@ -70,7 +70,7 @@ public class KStreamFilterTest {
 
     @Test
     public void testFilterNot() {
-        KStreamBuilder builder = new KStreamBuilder();
+        StreamsBuilder builder = new StreamsBuilder();
         final int[] expectedKeys = new int[]{1, 2, 3, 4, 5, 6, 7};
 
         KStream<Integer, String> stream;
@@ -97,7 +97,7 @@ public class KStreamFilterTest {
             }
         };
 
-        new KStreamBuilder()
+        new StreamsBuilder()
             .<Integer, String>stream("empty")
             .filter(numberKeyPredicate)
             .filterNot(numberKeyPredicate)

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
index 29ea685..72fb547 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
@@ -17,19 +17,19 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.junit.After;
 import org.junit.Test;
 
-import static org.junit.Assert.assertEquals;
-
 import java.util.ArrayList;
 
+import static org.junit.Assert.assertEquals;
+
 public class KStreamFlatMapTest {
 
     private String topicName = "topic";
@@ -46,7 +46,7 @@ public class KStreamFlatMapTest {
 
     @Test
     public void testFlatMap() {
-        KStreamBuilder builder = new KStreamBuilder();
+        StreamsBuilder builder = new StreamsBuilder();
 
         KeyValueMapper<Number, Object, Iterable<KeyValue<String, String>>> mapper =
             new KeyValueMapper<Number, Object, Iterable<KeyValue<String, String>>>() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
index a51c6a4..48fd219 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
@@ -17,8 +17,8 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
@@ -45,7 +45,7 @@ public class KStreamFlatMapValuesTest {
 
     @Test
     public void testFlatMapValues() {
-        KStreamBuilder builder = new KStreamBuilder();
+        StreamsBuilder builder = new StreamsBuilder();
 
         ValueMapper<Number, Iterable<String>> mapper =
             new ValueMapper<Number, Iterable<String>>() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java
index ab56ad9..b3a9eb1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java
@@ -18,17 +18,18 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.junit.After;
 import org.junit.Test;
-import java.util.List;
-import java.util.Locale;
+
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
 
 import static org.junit.Assert.assertEquals;
 
@@ -76,7 +77,7 @@ public class KStreamForeachTest {
             };
 
         // When
-        KStreamBuilder builder = new KStreamBuilder();
+        StreamsBuilder builder = new StreamsBuilder();
         KStream<Integer, String> stream = builder.stream(intSerde, stringSerde, topicName);
         stream.foreach(action);
 
@@ -101,7 +102,7 @@ public class KStreamForeachTest {
             public void apply(Number key, Object value) {}
         };
 
-        new KStreamBuilder()
+        new StreamsBuilder()
             .<Integer, String>stream("emptyTopic")
             .foreach(consume);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index abe9924..afe116e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -19,19 +19,20 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.errors.TopologyBuilderException;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueMapper;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
-import org.apache.kafka.streams.processor.internals.SourceNode;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
+import org.apache.kafka.streams.processor.internals.SourceNode;
 import org.apache.kafka.test.MockKeyValueMapper;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
@@ -41,9 +42,9 @@ import org.junit.Test;
 import java.util.Collections;
 import java.util.concurrent.TimeUnit;
 
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
-import static org.hamcrest.core.IsInstanceOf.instanceOf;
 
 
 public class KStreamImplTest {
@@ -51,17 +52,17 @@ public class KStreamImplTest {
     final private Serde<String> stringSerde = Serdes.String();
     final private Serde<Integer> intSerde = Serdes.Integer();
     private KStream<String, String> testStream;
-    private KStreamBuilder builder;
+    private StreamsBuilder builder;
 
     @Before
     public void before() {
-        builder = new KStreamBuilder();
+        builder = new StreamsBuilder();
         testStream = builder.stream("source");
     }
 
     @Test
-    public void testNumProcesses() {
-        final KStreamBuilder builder = new KStreamBuilder();
+    public void testNumProcesses() throws Exception {
+        final StreamsBuilder builder = new StreamsBuilder();
 
         KStream<String, String> source1 = builder.stream(stringSerde, stringSerde, "topic-1", "topic-2");
 
@@ -153,19 +154,19 @@ public class KStreamImplTest {
             1 + // to
             2 + // through
             1, // process
-            builder.setApplicationId("X").build(null).processors().size());
+            InternalStreamsBuilderTest.internalTopologyBuilder(builder).setApplicationId("X").build(null).processors().size());
     }
 
     @Test
-    public void shouldUseRecordMetadataTimestampExtractorWithThrough() {
-        final KStreamBuilder builder = new KStreamBuilder();
+    public void shouldUseRecordMetadataTimestampExtractorWithThrough() throws Exception {
+        final StreamsBuilder builder = new StreamsBuilder();
         KStream<String, String> stream1 = builder.stream(stringSerde, stringSerde, "topic-1", "topic-2");
         KStream<String, String> stream2 = builder.stream(stringSerde, stringSerde, "topic-3", "topic-4");
 
         stream1.to("topic-5");
         stream2.through("topic-6");
 
-        ProcessorTopology processorTopology = builder.setApplicationId("X").build(null);
+        ProcessorTopology processorTopology = InternalStreamsBuilderTest.internalTopologyBuilder(builder).setApplicationId("X").build(null);
         assertThat(processorTopology.source("topic-6").getTimestampExtractor(), instanceOf(FailOnInvalidTimestamp.class));
         assertEquals(processorTopology.source("topic-4").getTimestampExtractor(), null);
         assertEquals(processorTopology.source("topic-3").getTimestampExtractor(), null);
@@ -208,7 +209,7 @@ public class KStreamImplTest {
     
     @Test
     public void testToWithNullValueSerdeDoesntNPE() {
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
         final KStream<String, String> inputStream = builder.stream(stringSerde, stringSerde, "input");
         inputStream.to(stringSerde, null, "output");
     }
@@ -243,7 +244,7 @@ public class KStreamImplTest {
         testStream.writeAsText(null);
     }
 
-    @Test(expected = TopologyBuilderException.class)
+    @Test(expected = TopologyException.class)
     public void shouldNotAllowEmptyFilePathOnWriteAsText() throws Exception {
         testStream.writeAsText("\t    \t");
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
index aad2351..2a8b260 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
@@ -18,9 +18,9 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorContext;
@@ -66,7 +66,7 @@ public class KStreamKStreamJoinTest {
 
     @Test
     public void testJoin() throws Exception {
-        KStreamBuilder builder = new KStreamBuilder();
+        StreamsBuilder builder = new StreamsBuilder();
 
         final int[] expectedKeys = new int[]{0, 1, 2, 3};
 
@@ -81,7 +81,7 @@ public class KStreamKStreamJoinTest {
         joined = stream1.join(stream2, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(100), intSerde, stringSerde, stringSerde);
         joined.process(processor);
 
-        Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
+        Collection<Set<String>> copartitionGroups = KTableKTableJoinTest.getCopartitionedGroups(builder);
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
@@ -164,7 +164,7 @@ public class KStreamKStreamJoinTest {
 
     @Test
     public void testOuterJoin() throws Exception {
-        KStreamBuilder builder = new KStreamBuilder();
+        StreamsBuilder builder = new StreamsBuilder();
 
         final int[] expectedKeys = new int[]{0, 1, 2, 3};
 
@@ -179,7 +179,7 @@ public class KStreamKStreamJoinTest {
         joined = stream1.outerJoin(stream2, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(100), intSerde, stringSerde, stringSerde);
         joined.process(processor);
 
-        Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
+        Collection<Set<String>> copartitionGroups = KTableKTableJoinTest.getCopartitionedGroups(builder);
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
@@ -264,7 +264,7 @@ public class KStreamKStreamJoinTest {
     public void testWindowing() throws Exception {
         long time = 0L;
 
-        KStreamBuilder builder = new KStreamBuilder();
+        StreamsBuilder builder = new StreamsBuilder();
 
         final int[] expectedKeys = new int[]{0, 1, 2, 3};
 
@@ -280,7 +280,7 @@ public class KStreamKStreamJoinTest {
         joined = stream1.join(stream2, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(100), intSerde, stringSerde, stringSerde);
         joined.process(processor);
 
-        Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
+        Collection<Set<String>> copartitionGroups = KTableKTableJoinTest.getCopartitionedGroups(builder);
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
@@ -491,7 +491,7 @@ public class KStreamKStreamJoinTest {
     public void testAsymetricWindowingAfter() throws Exception {
         long time = 1000L;
 
-        KStreamBuilder builder = new KStreamBuilder();
+        StreamsBuilder builder = new StreamsBuilder();
 
         final int[] expectedKeys = new int[]{0, 1, 2, 3};
 
@@ -507,7 +507,7 @@ public class KStreamKStreamJoinTest {
         joined = stream1.join(stream2, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(0).after(100), intSerde, stringSerde, stringSerde);
         joined.process(processor);
 
-        Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
+        Collection<Set<String>> copartitionGroups = KTableKTableJoinTest.getCopartitionedGroups(builder);
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
@@ -600,7 +600,7 @@ public class KStreamKStreamJoinTest {
     public void testAsymetricWindowingBefore() throws Exception {
         long time = 1000L;
 
-        KStreamBuilder builder = new KStreamBuilder();
+        StreamsBuilder builder = new StreamsBuilder();
 
         final int[] expectedKeys = new int[]{0, 1, 2, 3};
 
@@ -616,7 +616,7 @@ public class KStreamKStreamJoinTest {
         joined = stream1.join(stream2, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(0).before(100), intSerde, stringSerde, stringSerde);
         joined.process(processor);
 
-        Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
+        Collection<Set<String>> copartitionGroups = KTableKTableJoinTest.getCopartitionedGroups(builder);
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
index f9189ea..35ee44c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
@@ -18,9 +18,9 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorContext;
@@ -67,7 +67,7 @@ public class KStreamKStreamLeftJoinTest {
 
     @Test
     public void testLeftJoin() throws Exception {
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
 
         final int[] expectedKeys = new int[]{0, 1, 2, 3};
 
@@ -83,7 +83,7 @@ public class KStreamKStreamLeftJoinTest {
         joined = stream1.leftJoin(stream2, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(100), intSerde, stringSerde, stringSerde);
         joined.process(processor);
 
-        final Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
+        final Collection<Set<String>> copartitionGroups = KTableKTableJoinTest.getCopartitionedGroups(builder);
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
@@ -155,7 +155,7 @@ public class KStreamKStreamLeftJoinTest {
 
     @Test
     public void testWindowing() throws Exception {
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
 
         final int[] expectedKeys = new int[]{0, 1, 2, 3};
 
@@ -173,7 +173,7 @@ public class KStreamKStreamLeftJoinTest {
         joined = stream1.leftJoin(stream2, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(100), intSerde, stringSerde, stringSerde);
         joined.process(processor);
 
-        final Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
+        final Collection<Set<String>> copartitionGroups = KTableKTableJoinTest.getCopartitionedGroups(builder);
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
index 662b6e2..666a82d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
@@ -18,8 +18,8 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
@@ -64,7 +64,7 @@ public class KStreamKTableJoinTest {
 
     @Test
     public void testJoin() throws Exception {
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
 
         final int[] expectedKeys = new int[]{0, 1, 2, 3};
 
@@ -77,7 +77,7 @@ public class KStreamKTableJoinTest {
         table = builder.table(intSerde, stringSerde, topic2, "anyStoreName");
         stream.join(table, MockValueJoiner.TOSTRING_JOINER).process(processor);
 
-        final Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
+        final Collection<Set<String>> copartitionGroups = KTableKTableJoinTest.getCopartitionedGroups(builder);
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
index 0f79002..2610c7e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
@@ -18,8 +18,8 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
@@ -64,7 +64,7 @@ public class KStreamKTableLeftJoinTest {
 
     @Test
     public void testJoin() throws Exception {
-        KStreamBuilder builder = new KStreamBuilder();
+        StreamsBuilder builder = new StreamsBuilder();
 
         final int[] expectedKeys = new int[]{0, 1, 2, 3};
 
@@ -77,7 +77,7 @@ public class KStreamKTableLeftJoinTest {
         table = builder.table(intSerde, stringSerde, topic2, "anyStoreName");
         stream.leftJoin(table, MockValueJoiner.TOSTRING_JOINER).process(processor);
 
-        Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
+        Collection<Set<String>> copartitionGroups = KTableKTableJoinTest.getCopartitionedGroups(builder);
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
index b55a60a..4c29218 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
@@ -19,8 +19,8 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
@@ -48,7 +48,7 @@ public class KStreamMapTest {
 
     @Test
     public void testMap() {
-        KStreamBuilder builder = new KStreamBuilder();
+        StreamsBuilder builder = new StreamsBuilder();
 
         KeyValueMapper<Integer, String, KeyValue<String, Integer>> mapper =
             new KeyValueMapper<Integer, String, KeyValue<String, Integer>>() {
@@ -89,7 +89,7 @@ public class KStreamMapTest {
             }
         };
 
-        new KStreamBuilder()
+        new StreamsBuilder()
             .<Integer, String>stream("numbers")
             .map(stringify)
             .to("strings");

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
index acef6e4..9c3dfdd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
@@ -18,8 +18,8 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
@@ -47,7 +47,7 @@ public class KStreamMapValuesTest {
 
     @Test
     public void testFlatMapValues() {
-        KStreamBuilder builder = new KStreamBuilder();
+        StreamsBuilder builder = new StreamsBuilder();
 
         ValueMapper<CharSequence, Integer> mapper =
             new ValueMapper<CharSequence, Integer>() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java
index df6f765..bf60a61 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java
@@ -19,11 +19,10 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.test.KStreamTestDriver;
-
 import org.junit.After;
 import org.junit.Test;
 
@@ -49,7 +48,7 @@ public class KStreamPeekTest {
 
     @Test
     public void shouldObserveStreamElements() {
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
         final KStream<Integer, String> stream = builder.stream(intSerd, stringSerd, topicName);
         final List<KeyValue<Integer, String>> peekObserved = new ArrayList<>(), streamObserved = new ArrayList<>();
         stream.peek(collect(peekObserved)).foreach(collect(streamObserved));
@@ -68,7 +67,7 @@ public class KStreamPeekTest {
 
     @Test
     public void shouldNotAllowNullAction() {
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
         final KStream<Integer, String> stream = builder.stream(intSerd, stringSerd, topicName);
         try {
             stream.peek(null);

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java
index c94b868..f548511 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java
@@ -19,19 +19,18 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.PrintForeachAction;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.test.KStreamTestDriver;
-
-import org.junit.Before;
 import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
-import java.io.PrintWriter;
 import java.io.ByteArrayOutputStream;
 import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
@@ -80,7 +79,7 @@ public class KStreamPrintTest {
         
         final String[] expectedResult = {"[test-stream]: 0, zero", "[test-stream]: 1, one", "[test-stream]: 2, two", "[test-stream]: 3, three"};
         
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
         final KStream<Integer, String> stream = builder.stream(intSerd, stringSerd, topicName);
         stream.process(kStreamPrint);
         
@@ -113,7 +112,7 @@ public class KStreamPrintTest {
 
         final String[] expectedResult = {"[test-stream]: (0, zero)", "[test-stream]: (1, one)", "[test-stream]: (2, two)", "[test-stream]: (3, three)"};
 
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
         final KStream<Integer, String> stream = builder.stream(intSerd, stringSerd, topicName);
         stream.process(kStreamPrint);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
index fe6f98b..5d9f6c4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
@@ -18,9 +18,9 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
@@ -51,7 +51,7 @@ public class KStreamSelectKeyTest {
 
     @Test
     public void testSelectKey() {
-        KStreamBuilder builder = new KStreamBuilder();
+        StreamsBuilder builder = new StreamsBuilder();
 
         final Map<Number, String> keyMap = new HashMap<>();
         keyMap.put(1, "ONE");
@@ -96,7 +96,7 @@ public class KStreamSelectKeyTest {
             public void apply(Number key, Object value) {}
         };
 
-        new KStreamBuilder()
+        new StreamsBuilder()
             .<Integer, String>stream("empty")
             .foreach(consume);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
index 6db1d8e..c3573f3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
@@ -18,9 +18,9 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Transformer;
 import org.apache.kafka.streams.kstream.TransformerSupplier;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -49,7 +49,7 @@ public class KStreamTransformTest {
 
     @Test
     public void testTransform() {
-        KStreamBuilder builder = new KStreamBuilder();
+        StreamsBuilder builder = new StreamsBuilder();
 
         TransformerSupplier<Number, Number, KeyValue<Integer, Integer>> transformerSupplier =
             new TransformerSupplier<Number, Number, KeyValue<Integer, Integer>>() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
index 0a03e6b..def79ed 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
@@ -18,9 +18,9 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.ValueTransformer;
 import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
 import org.apache.kafka.streams.processor.Processor;
@@ -51,7 +51,7 @@ public class KStreamTransformValuesTest {
 
     @Test
     public void testTransform() {
-        KStreamBuilder builder = new KStreamBuilder();
+        StreamsBuilder builder = new StreamsBuilder();
 
         ValueTransformerSupplier<Number, Integer> valueTransformerSupplier =
             new ValueTransformerSupplier<Number, Integer>() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
index 97a1408..181e8cf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
@@ -19,8 +19,8 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.ValueJoiner;
@@ -64,7 +64,7 @@ public class KStreamWindowAggregateTest {
 
     @Test
     public void testAggBasic() throws Exception {
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
         String topic1 = "topic1";
 
         KStream<String, String> stream1 = builder.stream(strSerde, strSerde, topic1);
@@ -156,7 +156,7 @@ public class KStreamWindowAggregateTest {
 
     @Test
     public void testJoin() throws Exception {
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
         String topic1 = "topic1";
         String topic2 = "topic2";
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
index 2c37230..f4ad346 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
@@ -20,11 +20,11 @@ import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.Initializer;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Reducer;
@@ -41,7 +41,6 @@ import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 
-
 import java.io.File;
 import java.io.IOException;
 import java.util.HashMap;
@@ -74,7 +73,7 @@ public class KTableAggregateTest {
 
     @Test
     public void testAggBasic() throws Exception {
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
         final String topic1 = "topic1";
         final MockProcessorSupplier<String, String> proc = new MockProcessorSupplier<>();
 
@@ -123,7 +122,7 @@ public class KTableAggregateTest {
 
     @Test
     public void testAggCoalesced() throws Exception {
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
         final String topic1 = "topic1";
         final MockProcessorSupplier<String, String> proc = new MockProcessorSupplier<>();
 
@@ -152,7 +151,7 @@ public class KTableAggregateTest {
 
     @Test
     public void testAggRepartition() throws Exception {
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
         final String topic1 = "topic1";
         final MockProcessorSupplier<String, String> proc = new MockProcessorSupplier<>();
 
@@ -212,7 +211,7 @@ public class KTableAggregateTest {
                 ), proc.processed);
     }
 
-    private void testCountHelper(final KStreamBuilder builder, final String input, final MockProcessorSupplier<String, Long> proc) throws IOException {
+    private void testCountHelper(final StreamsBuilder builder, final String input, final MockProcessorSupplier<String, Long> proc) throws IOException {
         driver = new KStreamTestDriver(builder, stateDir);
 
         driver.process(input, "A", "green");
@@ -239,7 +238,7 @@ public class KTableAggregateTest {
 
     @Test
     public void testCount() throws IOException {
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
         final String input = "count-test-input";
         final MockProcessorSupplier<String, Long> proc = new MockProcessorSupplier<>();
 
@@ -254,7 +253,7 @@ public class KTableAggregateTest {
 
     @Test
     public void testCountWithInternalStore() throws IOException {
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
         final String input = "count-test-input";
         final MockProcessorSupplier<String, Long> proc = new MockProcessorSupplier<>();
 
@@ -269,7 +268,7 @@ public class KTableAggregateTest {
 
     @Test
     public void testCountCoalesced() throws IOException {
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
         final String input = "count-test-input";
         final MockProcessorSupplier<String, Long> proc = new MockProcessorSupplier<>();
 
@@ -298,7 +297,7 @@ public class KTableAggregateTest {
     
     @Test
     public void testRemoveOldBeforeAddNew() throws IOException {
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
         final String input = "count-test-input";
         final MockProcessorSupplier<String, String> proc = new MockProcessorSupplier<>();
 
@@ -355,7 +354,7 @@ public class KTableAggregateTest {
     public void shouldForwardToCorrectProcessorNodeWhenMultiCacheEvictions() throws Exception {
         final String tableOne = "tableOne";
         final String tableTwo = "tableTwo";
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
         final String reduceTopic = "TestDriver-reducer-store-repartition";
         final Map<String, Long> reduceResults = new HashMap<>();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
index c6721f7..a53f61e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
@@ -18,7 +18,7 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.test.KStreamTestDriver;
@@ -57,7 +57,7 @@ public class KTableFilterTest {
         stateDir = TestUtils.tempDirectory("kafka-test");
     }
 
-    private void doTestKTable(final KStreamBuilder builder, final KTable<String, Integer> table2,
+    private void doTestKTable(final StreamsBuilder builder, final KTable<String, Integer> table2,
                               final KTable<String, Integer> table3, final String topic1) {
         MockProcessorSupplier<String, Integer> proc2 = new MockProcessorSupplier<>();
         MockProcessorSupplier<String, Integer> proc3 = new MockProcessorSupplier<>();
@@ -81,7 +81,7 @@ public class KTableFilterTest {
 
     @Test
     public void testKTable() {
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
 
         String topic1 = "topic1";
 
@@ -105,7 +105,7 @@ public class KTableFilterTest {
 
     @Test
     public void testQueryableKTable() {
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
 
         String topic1 = "topic1";
 
@@ -127,7 +127,7 @@ public class KTableFilterTest {
         doTestKTable(builder, table2, table3, topic1);
     }
 
-    private void doTestValueGetter(final KStreamBuilder builder,
+    private void doTestValueGetter(final StreamsBuilder builder,
                                    final KTableImpl<String, Integer, Integer> table2,
                                    final KTableImpl<String, Integer, Integer> table3,
                                    final String topic1) throws IOException {
@@ -189,7 +189,7 @@ public class KTableFilterTest {
 
     @Test
     public void testValueGetter() throws IOException {
-        KStreamBuilder builder = new KStreamBuilder();
+        StreamsBuilder builder = new StreamsBuilder();
 
         String topic1 = "topic1";
 
@@ -215,7 +215,7 @@ public class KTableFilterTest {
 
     @Test
     public void testQueryableValueGetter() throws IOException {
-        KStreamBuilder builder = new KStreamBuilder();
+        StreamsBuilder builder = new StreamsBuilder();
 
         String topic1 = "topic1";
 
@@ -239,15 +239,15 @@ public class KTableFilterTest {
         doTestValueGetter(builder, table2, table3, topic1);
     }
 
-    private void doTestNotSendingOldValue(final KStreamBuilder builder,
+    private void doTestNotSendingOldValue(final StreamsBuilder builder,
                                           final KTableImpl<String, Integer, Integer> table1,
                                           final KTableImpl<String, Integer, Integer> table2,
                                           final String topic1) throws IOException {
         MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>();
         MockProcessorSupplier<String, Integer> proc2 = new MockProcessorSupplier<>();
 
-        builder.addProcessor("proc1", proc1, table1.name);
-        builder.addProcessor("proc2", proc2, table2.name);
+        builder.build().addProcessor("proc1", proc1, table1.name);
+        builder.build().addProcessor("proc2", proc2, table2.name);
 
         driver = new KStreamTestDriver(builder, stateDir, Serdes.String(), Serdes.Integer());
 
@@ -280,7 +280,7 @@ public class KTableFilterTest {
 
     @Test
     public void testNotSendingOldValue() throws IOException {
-        KStreamBuilder builder = new KStreamBuilder();
+        StreamsBuilder builder = new StreamsBuilder();
 
         String topic1 = "topic1";
 
@@ -299,7 +299,7 @@ public class KTableFilterTest {
 
     @Test
     public void testQueryableNotSendingOldValue() throws IOException {
-        KStreamBuilder builder = new KStreamBuilder();
+        StreamsBuilder builder = new StreamsBuilder();
 
         String topic1 = "topic1";
 
@@ -316,7 +316,7 @@ public class KTableFilterTest {
         doTestNotSendingOldValue(builder, table1, table2, topic1);
     }
 
-    private void doTestSendingOldValue(final KStreamBuilder builder,
+    private void doTestSendingOldValue(final StreamsBuilder builder,
                                        final KTableImpl<String, Integer, Integer> table1,
                                        final KTableImpl<String, Integer, Integer> table2,
                                        final String topic1) throws IOException {
@@ -325,8 +325,8 @@ public class KTableFilterTest {
         MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>();
         MockProcessorSupplier<String, Integer> proc2 = new MockProcessorSupplier<>();
 
-        builder.addProcessor("proc1", proc1, table1.name);
-        builder.addProcessor("proc2", proc2, table2.name);
+        builder.build().addProcessor("proc1", proc1, table1.name);
+        builder.build().addProcessor("proc2", proc2, table2.name);
 
         driver = new KStreamTestDriver(builder, stateDir, Serdes.String(), Serdes.Integer());
 
@@ -358,7 +358,7 @@ public class KTableFilterTest {
 
     @Test
     public void testSendingOldValue() throws IOException {
-        KStreamBuilder builder = new KStreamBuilder();
+        StreamsBuilder builder = new StreamsBuilder();
 
         String topic1 = "topic1";
 
@@ -377,7 +377,7 @@ public class KTableFilterTest {
 
     @Test
     public void testQueryableSendingOldValue() throws IOException {
-        KStreamBuilder builder = new KStreamBuilder();
+        StreamsBuilder builder = new StreamsBuilder();
 
         String topic1 = "topic1";
 
@@ -394,15 +394,15 @@ public class KTableFilterTest {
         doTestSendingOldValue(builder, table1, table2, topic1);
     }
 
-    private void doTestSkipNullOnMaterialization(final KStreamBuilder builder,
+    private void doTestSkipNullOnMaterialization(final StreamsBuilder builder,
                                                  final KTableImpl<String, String, String> table1,
                                                  final KTableImpl<String, String, String> table2,
                                                  final String topic1) throws IOException {
         MockProcessorSupplier<String, String> proc1 = new MockProcessorSupplier<>();
         MockProcessorSupplier<String, String> proc2 = new MockProcessorSupplier<>();
 
-        builder.addProcessor("proc1", proc1, table1.name);
-        builder.addProcessor("proc2", proc2, table2.name);
+        builder.build().addProcessor("proc1", proc1, table1.name);
+        builder.build().addProcessor("proc2", proc2, table2.name);
 
         driver = new KStreamTestDriver(builder, stateDir, stringSerde, stringSerde);
 
@@ -417,7 +417,7 @@ public class KTableFilterTest {
     @Test
     public void testSkipNullOnMaterialization() throws IOException {
         // Do not explicitly set enableSendingOldValues. Let a further downstream stateful operator trigger it instead.
-        KStreamBuilder builder = new KStreamBuilder();
+        StreamsBuilder builder = new StreamsBuilder();
 
         String topic1 = "topic1";
 
@@ -438,7 +438,7 @@ public class KTableFilterTest {
     @Test
     public void testQueryableSkipNullOnMaterialization() throws IOException {
         // Do not explicitly set enableSendingOldValues. Let a further downstream stateful operator trigger it instead.
-        KStreamBuilder builder = new KStreamBuilder();
+        StreamsBuilder builder = new StreamsBuilder();
 
         String topic1 = "topic1";
 
@@ -465,7 +465,7 @@ public class KTableFilterTest {
             }
         };
 
-        new KStreamBuilder()
+        new StreamsBuilder()
             .<Integer, String>table("empty", "emptyStore")
             .filter(numberKeyPredicate)
             .filterNot(numberKeyPredicate)


Mime
View raw message