kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [2/5] kafka git commit: KAFKA-3856 (KIP-120) step two: extract internal functions from public facing TopologyBuilder class
Date Mon, 24 Jul 2017 18:03:31 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/5d798511/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index e526da4..467f8b8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -81,7 +81,7 @@ public class KafkaStreamsTest {
         final KStreamBuilder builder = new KStreamBuilder();
         final KafkaStreams streams = new KafkaStreams(builder, props);
 
-        StateListenerStub stateListener = new StateListenerStub();
+        final StateListenerStub stateListener = new StateListenerStub();
         streams.setStateListener(stateListener);
         Assert.assertEquals(streams.state(), KafkaStreams.State.CREATED);
         Assert.assertEquals(stateListener.numChanges, 0);
@@ -102,7 +102,7 @@ public class KafkaStreamsTest {
         final KStreamBuilder builder = new KStreamBuilder();
         final KafkaStreams streams = new KafkaStreams(builder, props);
 
-        StateListenerStub stateListener = new StateListenerStub();
+        final StateListenerStub stateListener = new StateListenerStub();
         streams.setStateListener(stateListener);
         streams.close();
         Assert.assertEquals(streams.state(), KafkaStreams.State.NOT_RUNNING);
@@ -161,7 +161,7 @@ public class KafkaStreamsTest {
 
         final java.lang.reflect.Field globalThreadField = streams.getClass().getDeclaredField("globalStreamThread");
         globalThreadField.setAccessible(true);
-        GlobalStreamThread globalStreamThread = (GlobalStreamThread) globalThreadField.get(streams);
+        final GlobalStreamThread globalStreamThread = (GlobalStreamThread) globalThreadField.get(streams);
         assertEquals(globalStreamThread, null);
     }
 
@@ -269,8 +269,7 @@ public class KafkaStreamsTest {
         props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "illegalConfig");
         final KStreamBuilder builder = new KStreamBuilder();
-        final KafkaStreams streams = new KafkaStreams(builder, props);
-
+        new KafkaStreams(builder, props);
     }
 
     @Test
@@ -285,8 +284,7 @@ public class KafkaStreamsTest {
 
         props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.DEBUG.toString());
         final KStreamBuilder builder2 = new KStreamBuilder();
-        final KafkaStreams streams2 = new KafkaStreams(builder2, props);
-
+        new KafkaStreams(builder2, props);
     }
 
     @Test(expected = IllegalStateException.class)
@@ -337,7 +335,7 @@ public class KafkaStreamsTest {
                                 while (keepRunning.get()) {
                                     Thread.sleep(10);
                                 }
-                            } catch (InterruptedException e) {
+                            } catch (final InterruptedException e) {
                                 // no-op
                             }
                         }
@@ -415,29 +413,28 @@ public class KafkaStreamsTest {
     @Test
     public void testToString() {
         streams.start();
-        String streamString = streams.toString();
+        final String streamString = streams.toString();
         streams.close();
-        String appId = streamString.split("\\n")[1].split(":")[1].trim();
+        final String appId = streamString.split("\\n")[1].split(":")[1].trim();
         Assert.assertNotEquals("streamString should not be empty", "", streamString);
         Assert.assertNotNull("streamString should not be null", streamString);
         Assert.assertNotEquals("streamString contains non-empty appId", "", appId);
         Assert.assertNotNull("streamString contains non-null appId", appId);
     }
 
-
     public static class StateListenerStub implements KafkaStreams.StateListener {
-        public int numChanges = 0;
-        public KafkaStreams.State oldState;
-        public KafkaStreams.State newState;
+        int numChanges = 0;
+        KafkaStreams.State oldState;
+        KafkaStreams.State newState;
         public Map<KafkaStreams.State, Long> mapStates = new HashMap<>();
 
         @Override
         public void onChange(final KafkaStreams.State newState, final KafkaStreams.State oldState) {
-            long prevCount = this.mapStates.containsKey(newState) ? this.mapStates.get(newState) : 0;
-            this.numChanges++;
+            final long prevCount = mapStates.containsKey(newState) ? mapStates.get(newState) : 0;
+            numChanges++;
             this.oldState = oldState;
             this.newState = newState;
-            this.mapStates.put(newState, prevCount + 1);
+            mapStates.put(newState, prevCount + 1);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5d798511/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index 0b5c5e9..ad70112 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -36,12 +36,13 @@ import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.processor.internals.StreamTask;
 import org.apache.kafka.streams.processor.internals.StreamThread;
 import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
+import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockStateStoreSupplier;
-import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
@@ -152,9 +153,15 @@ public class RegexSourceIntegrationTest {
         final StreamThread[] streamThreads = (StreamThread[]) streamThreadsField.get(streams);
         final StreamThread originalThread = streamThreads[0];
 
-        final TestStreamThread testStreamThread = new TestStreamThread(builder, streamsConfig,
+        final TestStreamThread testStreamThread = new TestStreamThread(
+            builder.internalTopologyBuilder,
+            streamsConfig,
             new DefaultKafkaClientSupplier(),
-            originalThread.applicationId, originalThread.clientId, originalThread.processId, new Metrics(), Time.SYSTEM);
+            originalThread.applicationId,
+            originalThread.clientId,
+            originalThread.processId,
+            new Metrics(),
+            Time.SYSTEM);
 
         final TestCondition oneTopicAdded = new TestCondition() {
             @Override
@@ -206,9 +213,15 @@ public class RegexSourceIntegrationTest {
         final StreamThread[] streamThreads = (StreamThread[]) streamThreadsField.get(streams);
         final StreamThread originalThread = streamThreads[0];
 
-        final TestStreamThread testStreamThread = new TestStreamThread(builder, streamsConfig,
+        final TestStreamThread testStreamThread = new TestStreamThread(
+            builder.internalTopologyBuilder,
+            streamsConfig,
             new DefaultKafkaClientSupplier(),
-            originalThread.applicationId, originalThread.clientId, originalThread.processId, new Metrics(), Time.SYSTEM);
+            originalThread.applicationId,
+            originalThread.clientId,
+            originalThread.processId,
+            new Metrics(),
+            Time.SYSTEM);
 
         streamThreads[0] = testStreamThread;
 
@@ -347,9 +360,15 @@ public class RegexSourceIntegrationTest {
         final StreamThread[] leaderStreamThreads = (StreamThread[]) leaderStreamThreadsField.get(partitionedStreamsLeader);
         final StreamThread originalLeaderThread = leaderStreamThreads[0];
 
-        final TestStreamThread leaderTestStreamThread = new TestStreamThread(builderLeader, streamsConfig,
-                new DefaultKafkaClientSupplier(),
-                originalLeaderThread.applicationId, originalLeaderThread.clientId, originalLeaderThread.processId, new Metrics(), Time.SYSTEM);
+        final TestStreamThread leaderTestStreamThread = new TestStreamThread(
+            builderLeader.internalTopologyBuilder,
+            streamsConfig,
+            new DefaultKafkaClientSupplier(),
+            originalLeaderThread.applicationId,
+            originalLeaderThread.clientId,
+            originalLeaderThread.processId,
+            new Metrics(),
+            Time.SYSTEM);
 
         leaderStreamThreads[0] = leaderTestStreamThread;
 
@@ -367,9 +386,15 @@ public class RegexSourceIntegrationTest {
         final StreamThread[] followerStreamThreads = (StreamThread[]) followerStreamThreadsField.get(partitionedStreamsFollower);
         final StreamThread originalFollowerThread = followerStreamThreads[0];
 
-        final TestStreamThread followerTestStreamThread = new TestStreamThread(builderFollower, streamsConfig,
-                new DefaultKafkaClientSupplier(),
-                originalFollowerThread.applicationId, originalFollowerThread.clientId, originalFollowerThread.processId, new Metrics(), Time.SYSTEM);
+        final TestStreamThread followerTestStreamThread = new TestStreamThread(
+            builderFollower.internalTopologyBuilder,
+            streamsConfig,
+            new DefaultKafkaClientSupplier(),
+            originalFollowerThread.applicationId,
+            originalFollowerThread.clientId,
+            originalFollowerThread.processId,
+            new Metrics(),
+            Time.SYSTEM);
 
         followerStreamThreads[0] = followerTestStreamThread;
 
@@ -438,7 +463,7 @@ public class RegexSourceIntegrationTest {
     private class TestStreamThread extends StreamThread {
         public volatile List<String> assignedTopicPartitions = new ArrayList<>();
 
-        public TestStreamThread(final TopologyBuilder builder, final StreamsConfig config, final KafkaClientSupplier clientSupplier, final String applicationId, final String clientId, final UUID processId, final Metrics metrics, final Time time) {
+        public TestStreamThread(final InternalTopologyBuilder builder, final StreamsConfig config, final KafkaClientSupplier clientSupplier, final String applicationId, final String clientId, final UUID processId, final Metrics metrics, final Time time) {
             super(builder, config, clientSupplier, applicationId, clientId, processId, metrics, time, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
                   0);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5d798511/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index bad193a..a7ddb7b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -50,12 +50,12 @@ import java.util.regex.Pattern;
 
 import static org.apache.kafka.common.utils.Utils.mkList;
 import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.fail;
 import static org.junit.Assert.assertThat;
-import static org.hamcrest.core.IsInstanceOf.instanceOf;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class TopologyBuilderTest {
 
@@ -611,7 +611,7 @@ public class TopologyBuilderTest {
 
 
     @Test(expected = TopologyBuilderException.class)
-    public void shouldThroughOnUnassignedStateStoreAccess() {
+    public void shouldThroughOnUnassignedStateStoreAccess() throws Exception {
         final String sourceNodeName = "source";
         final String goodNodeName = "goodGuy";
         final String badNodeName = "badGuy";
@@ -631,7 +631,7 @@ public class TopologyBuilderTest {
                     goodNodeName)
                 .addProcessor(badNodeName, new LocalMockProcessorSupplier(), sourceNodeName);
 
-            final ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(streamsConfig, builder);
+            final ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(streamsConfig, builder.internalTopologyBuilder);
             driver.process("topic", null, null);
         } catch (final StreamsException e) {
             final Throwable cause = e.getCause();

http://git-wip-us.apache.org/repos/asf/kafka/blob/5d798511/streams/src/test/java/org/apache/kafka/streams/processor/TopologyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyTest.java
index 17c5640..a541eb3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.streams.processor;
 
+import org.apache.kafka.streams.TopologyDescription;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockStateStoreSupplier;
 import org.junit.Test;
@@ -29,11 +31,11 @@ import java.util.regex.Pattern;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 
-// TODO (remove this comment) Test name ok, we just use TopologyBuilder for now in this test until Topology gets added
+// TODO (remove this comment) Test name ok, we just use InternalTopologyBuilder for now in this test until Topology gets added
 public class TopologyTest {
-    // TODO change from TopologyBuilder to Topology
-    private final TopologyBuilder topology = new TopologyBuilder();
-    private final TopologyDescription expectedDescription = new TopologyDescription();
+    // TODO change from InternalTopologyBuilder to Topology
+    private final InternalTopologyBuilder topology = new InternalTopologyBuilder();
+    private final InternalTopologyBuilder.TopologyDescription expectedDescription = new InternalTopologyBuilder.TopologyDescription();
 
     @Test
     public void shouldDescribeEmptyTopology() {
@@ -45,7 +47,7 @@ public class TopologyTest {
         final TopologyDescription.Source expectedSourceNode = addSource("source", "topic");
 
         expectedDescription.addSubtopology(
-            new TopologyDescription.Subtopology(0,
+            new InternalTopologyBuilder.Subtopology(0,
                 Collections.<TopologyDescription.Node>singleton(expectedSourceNode)));
 
         assertThat(topology.describe(), equalTo(expectedDescription));
@@ -56,7 +58,7 @@ public class TopologyTest {
         final TopologyDescription.Source expectedSourceNode = addSource("source", "topic1", "topic2", "topic3");
 
         expectedDescription.addSubtopology(
-            new TopologyDescription.Subtopology(0,
+            new InternalTopologyBuilder.Subtopology(0,
                 Collections.<TopologyDescription.Node>singleton(expectedSourceNode)));
 
         assertThat(topology.describe(), equalTo(expectedDescription));
@@ -67,7 +69,7 @@ public class TopologyTest {
         final TopologyDescription.Source expectedSourceNode = addSource("source", Pattern.compile("topic[0-9]"));
 
         expectedDescription.addSubtopology(
-            new TopologyDescription.Subtopology(0,
+            new InternalTopologyBuilder.Subtopology(0,
                 Collections.<TopologyDescription.Node>singleton(expectedSourceNode)));
 
         assertThat(topology.describe(), equalTo(expectedDescription));
@@ -77,17 +79,17 @@ public class TopologyTest {
     public void multipleSourcesShouldHaveDistinctSubtopologies() {
         final TopologyDescription.Source expectedSourceNode1 = addSource("source1", "topic1");
         expectedDescription.addSubtopology(
-            new TopologyDescription.Subtopology(0,
+            new InternalTopologyBuilder.Subtopology(0,
                 Collections.<TopologyDescription.Node>singleton(expectedSourceNode1)));
 
         final TopologyDescription.Source expectedSourceNode2 = addSource("source2", "topic2");
         expectedDescription.addSubtopology(
-            new TopologyDescription.Subtopology(1,
+            new InternalTopologyBuilder.Subtopology(1,
                 Collections.<TopologyDescription.Node>singleton(expectedSourceNode2)));
 
         final TopologyDescription.Source expectedSourceNode3 = addSource("source3", "topic3");
         expectedDescription.addSubtopology(
-            new TopologyDescription.Subtopology(2,
+            new InternalTopologyBuilder.Subtopology(2,
                 Collections.<TopologyDescription.Node>singleton(expectedSourceNode3)));
 
         assertThat(topology.describe(), equalTo(expectedDescription));
@@ -101,7 +103,7 @@ public class TopologyTest {
         final Set<TopologyDescription.Node> allNodes = new HashSet<>();
         allNodes.add(expectedSourceNode);
         allNodes.add(expectedProcessorNode);
-        expectedDescription.addSubtopology(new TopologyDescription.Subtopology(0, allNodes));
+        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes));
 
         assertThat(topology.describe(), equalTo(expectedDescription));
     }
@@ -116,7 +118,7 @@ public class TopologyTest {
         final Set<TopologyDescription.Node> allNodes = new HashSet<>();
         allNodes.add(expectedSourceNode);
         allNodes.add(expectedProcessorNode);
-        expectedDescription.addSubtopology(new TopologyDescription.Subtopology(0, allNodes));
+        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes));
 
         assertThat(topology.describe(), equalTo(expectedDescription));
     }
@@ -132,7 +134,7 @@ public class TopologyTest {
         final Set<TopologyDescription.Node> allNodes = new HashSet<>();
         allNodes.add(expectedSourceNode);
         allNodes.add(expectedProcessorNode);
-        expectedDescription.addSubtopology(new TopologyDescription.Subtopology(0, allNodes));
+        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes));
 
         assertThat(topology.describe(), equalTo(expectedDescription));
     }
@@ -147,7 +149,7 @@ public class TopologyTest {
         allNodes.add(expectedSourceNode);
         allNodes.add(expectedProcessorNode1);
         allNodes.add(expectedProcessorNode2);
-        expectedDescription.addSubtopology(new TopologyDescription.Subtopology(0, allNodes));
+        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes));
 
         assertThat(topology.describe(), equalTo(expectedDescription));
     }
@@ -162,7 +164,7 @@ public class TopologyTest {
         allNodes.add(expectedSourceNode1);
         allNodes.add(expectedSourceNode2);
         allNodes.add(expectedProcessorNode);
-        expectedDescription.addSubtopology(new TopologyDescription.Subtopology(0, allNodes));
+        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes));
 
         assertThat(topology.describe(), equalTo(expectedDescription));
     }
@@ -181,17 +183,17 @@ public class TopologyTest {
         final Set<TopologyDescription.Node> allNodes1 = new HashSet<>();
         allNodes1.add(expectedSourceNode1);
         allNodes1.add(expectedProcessorNode1);
-        expectedDescription.addSubtopology(new TopologyDescription.Subtopology(0, allNodes1));
+        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes1));
 
         final Set<TopologyDescription.Node> allNodes2 = new HashSet<>();
         allNodes2.add(expectedSourceNode2);
         allNodes2.add(expectedProcessorNode2);
-        expectedDescription.addSubtopology(new TopologyDescription.Subtopology(1, allNodes2));
+        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(1, allNodes2));
 
         final Set<TopologyDescription.Node> allNodes3 = new HashSet<>();
         allNodes3.add(expectedSourceNode3);
         allNodes3.add(expectedProcessorNode3);
-        expectedDescription.addSubtopology(new TopologyDescription.Subtopology(2, allNodes3));
+        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(2, allNodes3));
 
         assertThat(topology.describe(), equalTo(expectedDescription));
     }
@@ -210,17 +212,17 @@ public class TopologyTest {
         final Set<TopologyDescription.Node> allNodes1 = new HashSet<>();
         allNodes1.add(expectedSourceNode1);
         allNodes1.add(expectedSinkNode1);
-        expectedDescription.addSubtopology(new TopologyDescription.Subtopology(0, allNodes1));
+        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes1));
 
         final Set<TopologyDescription.Node> allNodes2 = new HashSet<>();
         allNodes2.add(expectedSourceNode2);
         allNodes2.add(expectedSinkNode2);
-        expectedDescription.addSubtopology(new TopologyDescription.Subtopology(1, allNodes2));
+        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(1, allNodes2));
 
         final Set<TopologyDescription.Node> allNodes3 = new HashSet<>();
         allNodes3.add(expectedSourceNode3);
         allNodes3.add(expectedSinkNode3);
-        expectedDescription.addSubtopology(new TopologyDescription.Subtopology(2, allNodes3));
+        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(2, allNodes3));
 
         assertThat(topology.describe(), equalTo(expectedDescription));
     }
@@ -251,7 +253,7 @@ public class TopologyTest {
         allNodes.add(expectedSourceNode3);
         allNodes.add(expectedProcessorNode3);
         allNodes.add(expectedSinkNode);
-        expectedDescription.addSubtopology(new TopologyDescription.Subtopology(0, allNodes));
+        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes));
 
         assertThat(topology.describe(), equalTo(expectedDescription));
     }
@@ -281,7 +283,7 @@ public class TopologyTest {
         allNodes.add(expectedProcessorNode2);
         allNodes.add(expectedSourceNode3);
         allNodes.add(expectedProcessorNode3);
-        expectedDescription.addSubtopology(new TopologyDescription.Subtopology(0, allNodes));
+        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes));
 
         assertThat(topology.describe(), equalTo(expectedDescription));
     }
@@ -301,41 +303,41 @@ public class TopologyTest {
 
     private TopologyDescription.Source addSource(final String sourceName,
                                                  final String... sourceTopic) {
-        topology.addSource(sourceName, sourceTopic);
+        topology.addSource(null, sourceName, null, null, null, sourceTopic);
         String allSourceTopics = sourceTopic[0];
         for (int i = 1; i < sourceTopic.length; ++i) {
             allSourceTopics += ", " + sourceTopic[i];
         }
-        return new TopologyDescription.Source(sourceName, allSourceTopics);
+        return new InternalTopologyBuilder.Source(sourceName, allSourceTopics);
     }
 
     private TopologyDescription.Source addSource(final String sourceName,
                                                  final Pattern sourcePattern) {
-        topology.addSource(sourceName, sourcePattern);
-        return new TopologyDescription.Source(sourceName, sourcePattern.toString());
+        topology.addSource(null, sourceName, null, null, null, sourcePattern);
+        return new InternalTopologyBuilder.Source(sourceName, sourcePattern.toString());
     }
 
     private TopologyDescription.Processor addProcessor(final String processorName,
-                                                       final TopologyDescription.AbstractNode... parents) {
+                                                       final TopologyDescription.Node... parents) {
         return addProcessorWithNewStore(processorName, new String[0], parents);
     }
 
     private TopologyDescription.Processor addProcessorWithNewStore(final String processorName,
                                                                    final String[] storeNames,
-                                                                   final TopologyDescription.AbstractNode... parents) {
+                                                                   final TopologyDescription.Node... parents) {
         return addProcessorWithStore(processorName, storeNames, true, parents);
     }
 
     private TopologyDescription.Processor addProcessorWithExistingStore(final String processorName,
                                                                         final String[] storeNames,
-                                                                        final TopologyDescription.AbstractNode... parents) {
+                                                                        final TopologyDescription.Node... parents) {
         return addProcessorWithStore(processorName, storeNames, false, parents);
     }
 
     private TopologyDescription.Processor addProcessorWithStore(final String processorName,
                                                                 final String[] storeNames,
                                                                 final boolean newStores,
-                                                                final TopologyDescription.AbstractNode... parents) {
+                                                                final TopologyDescription.Node... parents) {
         final String[] parentNames = new String[parents.length];
         for (int i = 0; i < parents.length; ++i) {
             parentNames[i] = parents[i].name();
@@ -350,11 +352,11 @@ public class TopologyTest {
             topology.connectProcessorAndStateStores(processorName, storeNames);
         }
         final TopologyDescription.Processor expectedProcessorNode
-            = new TopologyDescription.Processor(processorName, new HashSet<>(Arrays.asList(storeNames)));
+            = new InternalTopologyBuilder.Processor(processorName, new HashSet<>(Arrays.asList(storeNames)));
 
-        for (final TopologyDescription.AbstractNode parent : parents) {
-            parent.addSuccessor(expectedProcessorNode);
-            expectedProcessorNode.addPredecessor(parent);
+        for (final TopologyDescription.Node parent : parents) {
+            ((InternalTopologyBuilder.AbstractNode) parent).addSuccessor(expectedProcessorNode);
+            ((InternalTopologyBuilder.AbstractNode) expectedProcessorNode).addPredecessor(parent);
         }
 
         return expectedProcessorNode;
@@ -362,19 +364,19 @@ public class TopologyTest {
 
     private TopologyDescription.Sink addSink(final String sinkName,
                                              final String sinkTopic,
-                                             final TopologyDescription.AbstractNode... parents) {
+                                             final TopologyDescription.Node... parents) {
         final String[] parentNames = new String[parents.length];
         for (int i = 0; i < parents.length; ++i) {
             parentNames[i] = parents[i].name();
         }
 
-        topology.addSink(sinkName, sinkTopic, parentNames);
+        topology.addSink(sinkName, sinkTopic, null, null, null, parentNames);
         final TopologyDescription.Sink expectedSinkNode
-            = new TopologyDescription.Sink(sinkName, sinkTopic);
+            = new InternalTopologyBuilder.Sink(sinkName, sinkTopic);
 
-        for (final TopologyDescription.AbstractNode parent : parents) {
-            parent.addSuccessor(expectedSinkNode);
-            expectedSinkNode.addPredecessor(parent);
+        for (final TopologyDescription.Node parent : parents) {
+            ((InternalTopologyBuilder.AbstractNode) parent).addSuccessor(expectedSinkNode);
+            ((InternalTopologyBuilder.AbstractNode) expectedSinkNode).addPredecessor(parent);
         }
 
         return expectedSinkNode;
@@ -389,11 +391,12 @@ public class TopologyTest {
             sourceName,
             null,
             null,
+            null,
             globalTopicName,
             processorName,
             new MockProcessorSupplier());
 
-        final TopologyDescription.GlobalStore expectedGlobalStore = new TopologyDescription.GlobalStore(
+        final TopologyDescription.GlobalStore expectedGlobalStore = new InternalTopologyBuilder.GlobalStore(
             sourceName,
             processorName,
             globalStoreName,

http://git-wip-us.apache.org/repos/asf/kafka/blob/5d798511/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
new file mode 100644
index 0000000..b98b756
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -0,0 +1,709 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.errors.TopologyBuilderException;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.SubscriptionUpdates;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockStateStoreSupplier;
+import org.apache.kafka.test.MockTimestampExtractor;
+import org.apache.kafka.test.ProcessorTopologyTestDriver;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import static org.apache.kafka.common.utils.Utils.mkList;
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class InternalTopologyBuilderTest {
+
+    private final InternalTopologyBuilder builder = new InternalTopologyBuilder();
+    private final Serde<String> stringSerde = Serdes.String();
+
+    @Test
+    public void shouldAddSourceWithOffsetReset() {
+        final String earliestTopic = "earliestTopic";
+        final String latestTopic = "latestTopic";
+
+        builder.addSource(TopologyBuilder.AutoOffsetReset.EARLIEST, "source", null, null, null, earliestTopic);
+        builder.addSource(TopologyBuilder.AutoOffsetReset.LATEST, "source2", null, null, null, latestTopic);
+
+        assertTrue(builder.earliestResetTopicsPattern().matcher(earliestTopic).matches());
+        assertTrue(builder.latestResetTopicsPattern().matcher(latestTopic).matches());
+    }
+
+    @Test
+    public void shouldAddSourcePatternWithOffsetReset() {
+        final String earliestTopicPattern = "earliest.*Topic";
+        final String latestTopicPattern = "latest.*Topic";
+
+        builder.addSource(TopologyBuilder.AutoOffsetReset.EARLIEST, "source", null, null, null, Pattern.compile(earliestTopicPattern));
+        builder.addSource(TopologyBuilder.AutoOffsetReset.LATEST, "source2", null, null, null,  Pattern.compile(latestTopicPattern));
+
+        assertTrue(builder.earliestResetTopicsPattern().matcher("earliestTestTopic").matches());
+        assertTrue(builder.latestResetTopicsPattern().matcher("latestTestTopic").matches());
+    }
+
+    @Test
+    public void shouldAddSourceWithoutOffsetReset() {
+        final Pattern expectedPattern = Pattern.compile("test-topic");
+
+        builder.addSource(null, "source", null, stringSerde.deserializer(), stringSerde.deserializer(), "test-topic");
+
+        assertEquals(expectedPattern.pattern(), builder.sourceTopicPattern().pattern());
+        assertEquals(builder.earliestResetTopicsPattern().pattern(), "");
+        assertEquals(builder.latestResetTopicsPattern().pattern(), "");
+    }
+
+    @Test
+    public void shouldAddPatternSourceWithoutOffsetReset() {
+        final Pattern expectedPattern = Pattern.compile("test-.*");
+        
+        builder.addSource(null, "source", null, stringSerde.deserializer(), stringSerde.deserializer(), Pattern.compile("test-.*"));
+
+        assertEquals(expectedPattern.pattern(), builder.sourceTopicPattern().pattern());
+        assertEquals(builder.earliestResetTopicsPattern().pattern(), "");
+        assertEquals(builder.latestResetTopicsPattern().pattern(), "");
+    }
+
+    @Test(expected = TopologyBuilderException.class)
+    public void shouldNotAllowOffsetResetSourceWithoutTopics() {
+        builder.addSource(TopologyBuilder.AutoOffsetReset.EARLIEST, "source", null, stringSerde.deserializer(), stringSerde.deserializer());
+    }
+
+    @Test
+    public void shouldNotAllowOffsetResetSourceWithDuplicateSourceName() {
+        builder.addSource(TopologyBuilder.AutoOffsetReset.EARLIEST, "source", null, stringSerde.deserializer(), stringSerde.deserializer(), "topic-1");
+        try {
+            builder.addSource(TopologyBuilder.AutoOffsetReset.LATEST, "source", null, stringSerde.deserializer(), stringSerde.deserializer(), "topic-2");
+            fail("Should throw TopologyBuilderException for duplicate source name");
+        } catch (final TopologyBuilderException expected) { /* ok */ }
+    }
+
+    @Test
+    public void testAddSourceWithSameName() {
+        builder.addSource(null, "source", null, null, null, "topic-1");
+        try {
+            builder.addSource(null, "source", null, null, null, "topic-2");
+            fail("Should throw TopologyBuilderException with source name conflict");
+        } catch (final TopologyBuilderException expected) { /* ok */ }
+    }
+
+    @Test
+    public void testAddSourceWithSameTopic() {
+        builder.addSource(null, "source", null, null, null, "topic-1");
+        try {
+            builder.addSource(null, "source-2", null, null, null, "topic-1");
+            fail("Should throw TopologyBuilderException with topic conflict");
+        } catch (final TopologyBuilderException expected) { /* ok */ }
+    }
+
+    @Test
+    public void testAddProcessorWithSameName() {
+        builder.addSource(null, "source", null, null, null, "topic-1");
+        builder.addProcessor("processor", new MockProcessorSupplier(), "source");
+        try {
+            builder.addProcessor("processor", new MockProcessorSupplier(), "source");
+            fail("Should throw TopologyBuilderException with processor name conflict");
+        } catch (final TopologyBuilderException expected) { /* ok */ }
+    }
+
+    @Test(expected = TopologyBuilderException.class)
+    public void testAddProcessorWithWrongParent() {
+        builder.addProcessor("processor", new MockProcessorSupplier(), "source");
+    }
+
+    @Test(expected = TopologyBuilderException.class)
+    public void testAddProcessorWithSelfParent() {
+        builder.addProcessor("processor", new MockProcessorSupplier(), "processor");
+    }
+
+    @Test
+    public void testAddSinkWithSameName() {
+        builder.addSource(null, "source", null, null, null, "topic-1");
+        builder.addSink("sink", "topic-2", null, null, null, "source");
+        try {
+            builder.addSink("sink", "topic-3", null, null, null, "source");
+            fail("Should throw TopologyBuilderException with sink name conflict");
+        } catch (final TopologyBuilderException expected) { /* ok */ }
+    }
+
+    @Test(expected = TopologyBuilderException.class)
+    public void testAddSinkWithWrongParent() {
+        builder.addSink("sink", "topic-2", null, null, null, "source");
+    }
+
+    @Test(expected = TopologyBuilderException.class)
+    public void testAddSinkWithSelfParent() {
+        builder.addSink("sink", "topic-2", null, null, null, "sink");
+    }
+
+    @Test
+    public void testAddSinkConnectedWithParent() {
+        builder.addSource(null, "source", null, null, null, "source-topic");
+        builder.addSink("sink", "dest-topic", null, null, null, "source");
+
+        final Map<Integer, Set<String>> nodeGroups = builder.nodeGroups();
+        final Set<String> nodeGroup = nodeGroups.get(0);
+
+        assertTrue(nodeGroup.contains("sink"));
+        assertTrue(nodeGroup.contains("source"));
+    }
+
+    @Test
+    public void testAddSinkConnectedWithMultipleParent() {
+        builder.addSource(null, "source", null, null, null, "source-topic");
+        builder.addSource(null, "sourceII", null, null, null, "source-topicII");
+        builder.addSink("sink", "dest-topic", null, null, null, "source", "sourceII");
+
+        final Map<Integer, Set<String>> nodeGroups = builder.nodeGroups();
+        final Set<String> nodeGroup = nodeGroups.get(0);
+
+        assertTrue(nodeGroup.contains("sink"));
+        assertTrue(nodeGroup.contains("source"));
+        assertTrue(nodeGroup.contains("sourceII"));
+    }
+
+    @Test
+    public void testSourceTopics() {
+        builder.setApplicationId("X");
+        builder.addSource(null, "source-1", null, null, null, "topic-1");
+        builder.addSource(null, "source-2", null, null, null, "topic-2");
+        builder.addSource(null, "source-3", null, null, null, "topic-3");
+        builder.addInternalTopic("topic-3");
+
+        final Pattern expectedPattern = Pattern.compile("X-topic-3|topic-1|topic-2");
+
+        assertEquals(expectedPattern.pattern(), builder.sourceTopicPattern().pattern());
+    }
+
+    @Test
+    public void testPatternSourceTopic() {
+        final Pattern expectedPattern = Pattern.compile("topic-\\d");
+        builder.addSource(null, "source-1", null, null, null, expectedPattern);
+        assertEquals(expectedPattern.pattern(), builder.sourceTopicPattern().pattern());
+    }
+
+    @Test
+    public void testAddMoreThanOnePatternSourceNode() {
+        final Pattern expectedPattern = Pattern.compile("topics[A-Z]|.*-\\d");
+        builder.addSource(null, "source-1", null, null, null, Pattern.compile("topics[A-Z]"));
+        builder.addSource(null, "source-2", null, null, null, Pattern.compile(".*-\\d"));
+        assertEquals(expectedPattern.pattern(), builder.sourceTopicPattern().pattern());
+    }
+
+    @Test
+    public void testSubscribeTopicNameAndPattern() {
+        final Pattern expectedPattern = Pattern.compile("topic-bar|topic-foo|.*-\\d");
+        builder.addSource(null, "source-1", null, null, null, "topic-foo", "topic-bar");
+        builder.addSource(null, "source-2", null, null, null, Pattern.compile(".*-\\d"));
+        assertEquals(expectedPattern.pattern(), builder.sourceTopicPattern().pattern());
+    }
+
+    @Test
+    public void testPatternMatchesAlreadyProvidedTopicSource() {
+        builder.addSource(null, "source-1", null, null, null, "foo");
+        try {
+            builder.addSource(null, "source-2", null, null, null, Pattern.compile("f.*"));
+            fail("Should throw TopologyBuilderException with topic name/pattern conflict");
+        } catch (final TopologyBuilderException expected) { /* ok */ }
+    }
+
+    @Test
+    public void testNamedTopicMatchesAlreadyProvidedPattern() {
+        builder.addSource(null, "source-1", null, null, null, Pattern.compile("f.*"));
+        try {
+            builder.addSource(null, "source-2", null, null, null, "foo");
+            fail("Should throw TopologyBuilderException with topic name/pattern conflict");
+        } catch (final TopologyBuilderException expected) { /* ok */ }
+    }
+
+    @Test(expected = TopologyBuilderException.class)
+    public void testAddStateStoreWithNonExistingProcessor() {
+        builder.addStateStore(new MockStateStoreSupplier("store", false), "no-such-processsor");
+    }
+
+    @Test
+    public void testAddStateStoreWithSource() {
+        builder.addSource(null, "source-1", null, null, null, "topic-1");
+        try {
+            builder.addStateStore(new MockStateStoreSupplier("store", false), "source-1");
+            fail("Should throw TopologyBuilderException with store cannot be added to source");
+        } catch (final TopologyBuilderException expected) { /* ok */ }
+    }
+
+    @Test
+    public void testAddStateStoreWithSink() {
+        builder.addSink("sink-1", "topic-1", null, null, null);
+        try {
+            builder.addStateStore(new MockStateStoreSupplier("store", false), "sink-1");
+            fail("Should throw TopologyBuilderException with store cannot be added to sink");
+        } catch (final TopologyBuilderException expected) { /* ok */ }
+    }
+
+    @Test
+    public void testAddStateStoreWithDuplicates() {
+        builder.addStateStore(new MockStateStoreSupplier("store", false));
+        try {
+            builder.addStateStore(new MockStateStoreSupplier("store", false));
+            fail("Should throw TopologyBuilderException with store name conflict");
+        } catch (final TopologyBuilderException expected) { /* ok */ }
+    }
+
+    @Test
+    public void testAddStateStore() {
+        final StateStoreSupplier supplier = new MockStateStoreSupplier("store-1", false);
+        builder.addStateStore(supplier);
+        builder.setApplicationId("X");
+        builder.addSource(null, "source-1", null, null, null, "topic-1");
+        builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1");
+
+        assertEquals(0, builder.build(null).stateStores().size());
+
+        builder.connectProcessorAndStateStores("processor-1", "store-1");
+
+        final List<StateStore> suppliers = builder.build(null).stateStores();
+        assertEquals(1, suppliers.size());
+        assertEquals(supplier.name(), suppliers.get(0).name());
+    }
+
+    @Test
+    public void testTopicGroups() {
+        builder.setApplicationId("X");
+        builder.addInternalTopic("topic-1x");
+        builder.addSource(null, "source-1", null, null, null, "topic-1", "topic-1x");
+        builder.addSource(null, "source-2", null, null, null, "topic-2");
+        builder.addSource(null, "source-3", null, null, null, "topic-3");
+        builder.addSource(null, "source-4", null, null, null, "topic-4");
+        builder.addSource(null, "source-5", null, null, null, "topic-5");
+
+        builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1");
+
+        builder.addProcessor("processor-2", new MockProcessorSupplier(), "source-2", "processor-1");
+        builder.copartitionSources(mkList("source-1", "source-2"));
+
+        builder.addProcessor("processor-3", new MockProcessorSupplier(), "source-3", "source-4");
+
+        final Map<Integer, TopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
+
+        final Map<Integer, TopologyBuilder.TopicsInfo> expectedTopicGroups = new HashMap<>();
+        expectedTopicGroups.put(0, new TopologyBuilder.TopicsInfo(Collections.<String>emptySet(), mkSet("topic-1", "X-topic-1x", "topic-2"), Collections.<String, InternalTopicConfig>emptyMap(), Collections.<String, InternalTopicConfig>emptyMap()));
+        expectedTopicGroups.put(1, new TopologyBuilder.TopicsInfo(Collections.<String>emptySet(), mkSet("topic-3", "topic-4"), Collections.<String, InternalTopicConfig>emptyMap(), Collections.<String, InternalTopicConfig>emptyMap()));
+        expectedTopicGroups.put(2, new TopologyBuilder.TopicsInfo(Collections.<String>emptySet(), mkSet("topic-5"), Collections.<String, InternalTopicConfig>emptyMap(), Collections.<String, InternalTopicConfig>emptyMap()));
+
+        assertEquals(3, topicGroups.size());
+        assertEquals(expectedTopicGroups, topicGroups);
+
+        final Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
+
+        assertEquals(mkSet(mkSet("topic-1", "X-topic-1x", "topic-2")), new HashSet<>(copartitionGroups));
+    }
+
+    @Test
+    public void testTopicGroupsByStateStore() {
+        builder.setApplicationId("X");
+        builder.addSource(null, "source-1", null, null, null, "topic-1", "topic-1x");
+        builder.addSource(null, "source-2", null, null, null, "topic-2");
+        builder.addSource(null, "source-3", null, null, null, "topic-3");
+        builder.addSource(null, "source-4", null, null, null, "topic-4");
+        builder.addSource(null, "source-5", null, null, null, "topic-5");
+
+        builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1");
+        builder.addProcessor("processor-2", new MockProcessorSupplier(), "source-2");
+        builder.addStateStore(new MockStateStoreSupplier("store-1", false), "processor-1", "processor-2");
+
+        builder.addProcessor("processor-3", new MockProcessorSupplier(), "source-3");
+        builder.addProcessor("processor-4", new MockProcessorSupplier(), "source-4");
+        builder.addStateStore(new MockStateStoreSupplier("store-2", false), "processor-3", "processor-4");
+
+        builder.addProcessor("processor-5", new MockProcessorSupplier(), "source-5");
+        final StateStoreSupplier supplier = new MockStateStoreSupplier("store-3", false);
+        builder.addStateStore(supplier);
+        builder.connectProcessorAndStateStores("processor-5", "store-3");
+
+        final Map<Integer, TopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
+
+        final Map<Integer, TopologyBuilder.TopicsInfo> expectedTopicGroups = new HashMap<>();
+        final String store1 = ProcessorStateManager.storeChangelogTopic("X", "store-1");
+        final String store2 = ProcessorStateManager.storeChangelogTopic("X", "store-2");
+        final String store3 = ProcessorStateManager.storeChangelogTopic("X", "store-3");
+        expectedTopicGroups.put(0, new TopologyBuilder.TopicsInfo(
+            Collections.<String>emptySet(), mkSet("topic-1", "topic-1x", "topic-2"),
+            Collections.<String, InternalTopicConfig>emptyMap(),
+            Collections.singletonMap(
+                store1,
+                new InternalTopicConfig(
+                    store1,
+                    Collections.singleton(InternalTopicConfig.CleanupPolicy.compact),
+                    Collections.<String, String>emptyMap()))));
+        expectedTopicGroups.put(1, new TopologyBuilder.TopicsInfo(
+            Collections.<String>emptySet(), mkSet("topic-3", "topic-4"),
+            Collections.<String, InternalTopicConfig>emptyMap(),
+            Collections.singletonMap(
+                store2,
+                new InternalTopicConfig(
+                    store2,
+                    Collections.singleton(InternalTopicConfig.CleanupPolicy.compact),
+                    Collections.<String, String>emptyMap()))));
+        expectedTopicGroups.put(2, new TopologyBuilder.TopicsInfo(
+            Collections.<String>emptySet(), mkSet("topic-5"),
+            Collections.<String, InternalTopicConfig>emptyMap(),
+            Collections.singletonMap(store3,
+                new InternalTopicConfig(
+                    store3,
+                    Collections.singleton(InternalTopicConfig.CleanupPolicy.compact),
+                    Collections.<String, String>emptyMap()))));
+
+        assertEquals(3, topicGroups.size());
+        assertEquals(expectedTopicGroups, topicGroups);
+    }
+
+    @Test
+    public void testBuild() {
+        builder.addSource(null, "source-1", null, null, null, "topic-1", "topic-1x");
+        builder.addSource(null, "source-2", null, null, null, "topic-2");
+        builder.addSource(null, "source-3", null, null, null, "topic-3");
+        builder.addSource(null, "source-4", null, null, null, "topic-4");
+        builder.addSource(null, "source-5", null, null, null, "topic-5");
+
+        builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1");
+        builder.addProcessor("processor-2", new MockProcessorSupplier(), "source-2", "processor-1");
+        builder.addProcessor("processor-3", new MockProcessorSupplier(), "source-3", "source-4");
+
+        builder.setApplicationId("X");
+        final ProcessorTopology topology0 = builder.build(0);
+        final ProcessorTopology topology1 = builder.build(1);
+        final ProcessorTopology topology2 = builder.build(2);
+
+        assertEquals(mkSet("source-1", "source-2", "processor-1", "processor-2"), nodeNames(topology0.processors()));
+        assertEquals(mkSet("source-3", "source-4", "processor-3"), nodeNames(topology1.processors()));
+        assertEquals(mkSet("source-5"), nodeNames(topology2.processors()));
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullNameWhenAddingSink() throws Exception {
+        builder.addSink(null, "topic", null, null, null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullTopicWhenAddingSink() throws Exception {
+        builder.addSink("name", null, null, null, null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullNameWhenAddingProcessor() throws Exception {
+        builder.addProcessor(null, new ProcessorSupplier() {
+            @Override
+            public Processor get() {
+                return null;
+            }
+        });
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullProcessorSupplier() throws Exception {
+        builder.addProcessor("name", null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullNameWhenAddingSource() throws Exception {
+        builder.addSource(null, null, null, null, null, Pattern.compile(".*"));
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullProcessorNameWhenConnectingProcessorAndStateStores() throws Exception {
+        builder.connectProcessorAndStateStores(null, "store");
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAddNullInternalTopic() throws Exception {
+        builder.addInternalTopic(null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotSetApplicationIdToNull() throws Exception {
+        builder.setApplicationId(null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAddNullStateStoreSupplier() throws Exception {
+        builder.addStateStore(null);
+    }
+
+    private Set<String> nodeNames(final Collection<ProcessorNode> nodes) {
+        final Set<String> nodeNames = new HashSet<>();
+        for (final ProcessorNode node : nodes) {
+            nodeNames.add(node.name());
+        }
+        return nodeNames;
+    }
+
+    @Test
+    public void shouldAssociateStateStoreNameWhenStateStoreSupplierIsInternal() throws Exception {
+        builder.addSource(null, "source", null, null, null, "topic");
+        builder.addProcessor("processor", new MockProcessorSupplier(), "source");
+        builder.addStateStore(new MockStateStoreSupplier("store", false), "processor");
+        final Map<String, List<String>> stateStoreNameToSourceTopic = builder.stateStoreNameToSourceTopics();
+        assertEquals(1, stateStoreNameToSourceTopic.size());
+        assertEquals(Collections.singletonList("topic"), stateStoreNameToSourceTopic.get("store"));
+    }
+
+    @Test
+    public void shouldAssociateStateStoreNameWhenStateStoreSupplierIsExternal() throws Exception {
+        builder.addSource(null, "source", null, null, null, "topic");
+        builder.addProcessor("processor", new MockProcessorSupplier(), "source");
+        builder.addStateStore(new MockStateStoreSupplier("store", false), "processor");
+        final Map<String, List<String>> stateStoreNameToSourceTopic = builder.stateStoreNameToSourceTopics();
+        assertEquals(1, stateStoreNameToSourceTopic.size());
+        assertEquals(Collections.singletonList("topic"), stateStoreNameToSourceTopic.get("store"));
+    }
+
+    @Test
+    public void shouldCorrectlyMapStateStoreToInternalTopics() throws Exception {
+        builder.setApplicationId("appId");
+        builder.addInternalTopic("internal-topic");
+        builder.addSource(null, "source", null, null, null, "internal-topic");
+        builder.addProcessor("processor", new MockProcessorSupplier(), "source");
+        builder.addStateStore(new MockStateStoreSupplier("store", false), "processor");
+        final Map<String, List<String>> stateStoreNameToSourceTopic = builder.stateStoreNameToSourceTopics();
+        assertEquals(1, stateStoreNameToSourceTopic.size());
+        assertEquals(Collections.singletonList("appId-internal-topic"), stateStoreNameToSourceTopic.get("store"));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldAddInternalTopicConfigWithCompactAndDeleteSetForWindowStores() throws Exception {
+        builder.setApplicationId("appId");
+        builder.addSource(null, "source", null, null, null, "topic");
+        builder.addProcessor("processor", new MockProcessorSupplier(), "source");
+        builder.addStateStore(new RocksDBWindowStoreSupplier("store", 30000, 3, false, null, null, 10000, true, Collections.<String, String>emptyMap(), false), "processor");
+        final Map<Integer, TopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
+        final TopologyBuilder.TopicsInfo topicsInfo = topicGroups.values().iterator().next();
+        final InternalTopicConfig topicConfig = topicsInfo.stateChangelogTopics.get("appId-store-changelog");
+        final Properties properties = topicConfig.toProperties(0);
+        final List<String> policies = Arrays.asList(properties.getProperty(InternalTopicManager.CLEANUP_POLICY_PROP).split(","));
+        assertEquals("appId-store-changelog", topicConfig.name());
+        assertTrue(policies.contains("compact"));
+        assertTrue(policies.contains("delete"));
+        assertEquals(2, policies.size());
+        assertEquals("30000", properties.getProperty(InternalTopicManager.RETENTION_MS));
+        assertEquals(2, properties.size());
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldAddInternalTopicConfigWithCompactForNonWindowStores() throws Exception {
+        builder.setApplicationId("appId");
+        builder.addSource(null, "source", null, null, null, "topic");
+        builder.addProcessor("processor", new MockProcessorSupplier(), "source");
+        builder.addStateStore(new MockStateStoreSupplier("name", true), "processor");
+        final Map<Integer, TopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
+        final TopologyBuilder.TopicsInfo topicsInfo = topicGroups.values().iterator().next();
+        final InternalTopicConfig topicConfig = topicsInfo.stateChangelogTopics.get("appId-name-changelog");
+        final Properties properties = topicConfig.toProperties(0);
+        assertEquals("appId-name-changelog", topicConfig.name());
+        assertEquals("compact", properties.getProperty(InternalTopicManager.CLEANUP_POLICY_PROP));
+        assertEquals(1, properties.size());
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldAddInternalTopicConfigWithCleanupPolicyDeleteForInternalTopics() throws Exception {
+        builder.setApplicationId("appId");
+        builder.addInternalTopic("foo");
+        builder.addSource(null, "source", null, null, null, "foo");
+        final TopologyBuilder.TopicsInfo topicsInfo = builder.topicGroups().values().iterator().next();
+        final InternalTopicConfig topicConfig = topicsInfo.repartitionSourceTopics.get("appId-foo");
+        final Properties properties = topicConfig.toProperties(0);
+        assertEquals("appId-foo", topicConfig.name());
+        assertEquals("delete", properties.getProperty(InternalTopicManager.CLEANUP_POLICY_PROP));
+        assertEquals(1, properties.size());
+    }
+
+    @Test(expected = TopologyBuilderException.class)
+    public void shouldThroughOnUnassignedStateStoreAccess() {
+        final String sourceNodeName = "source";
+        final String goodNodeName = "goodGuy";
+        final String badNodeName = "badGuy";
+
+        final Properties config = new Properties();
+        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "host:1");
+        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
+        final StreamsConfig streamsConfig = new StreamsConfig(config);
+
+        try {
+            builder.addSource(null, sourceNodeName, null, null, null, "topic");
+            builder.addProcessor(goodNodeName, new LocalMockProcessorSupplier(), sourceNodeName);
+            builder.addStateStore(
+                    Stores.create(LocalMockProcessorSupplier.STORE_NAME).withStringKeys().withStringValues().inMemory().build(),
+                    goodNodeName);
+            builder.addProcessor(badNodeName, new LocalMockProcessorSupplier(), sourceNodeName);
+
+            final ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(streamsConfig, builder);
+            driver.process("topic", null, null);
+        } catch (final StreamsException e) {
+            final Throwable cause = e.getCause();
+            if (cause != null
+                && cause instanceof TopologyBuilderException
+                && cause.getMessage().equals("Invalid topology building: Processor " + badNodeName + " has no access to StateStore " + LocalMockProcessorSupplier.STORE_NAME)) {
+                throw (TopologyBuilderException) cause;
+            } else {
+                throw new RuntimeException("Did expect different exception. Did catch:", e);
+            }
+        }
+    }
+
+    private static class LocalMockProcessorSupplier implements ProcessorSupplier {
+        final static String STORE_NAME = "store";
+
+        @Override
+        public Processor get() {
+            return new Processor() {
+                @Override
+                public void init(final ProcessorContext context) {
+                    context.getStateStore(STORE_NAME);
+                }
+
+                @Override
+                public void process(final Object key, final Object value) { }
+
+                @Override
+                public void punctuate(final long timestamp) { }
+
+                @Override
+                public void close() {
+                }
+            };
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldSetCorrectSourceNodesWithRegexUpdatedTopics() throws Exception {
+        builder.addSource(null, "source-1", null, null, null, "topic-foo");
+        builder.addSource(null, "source-2", null, null, null, Pattern.compile("topic-[A-C]"));
+        builder.addSource(null, "source-3", null, null, null, Pattern.compile("topic-\\d"));
+
+        final SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates();
+        final Field updatedTopicsField  = subscriptionUpdates.getClass().getDeclaredField("updatedTopicSubscriptions");
+        updatedTopicsField.setAccessible(true);
+
+        final Set<String> updatedTopics = (Set<String>) updatedTopicsField.get(subscriptionUpdates);
+
+        updatedTopics.add("topic-B");
+        updatedTopics.add("topic-3");
+        updatedTopics.add("topic-A");
+
+        builder.updateSubscriptions(subscriptionUpdates, null);
+        builder.setApplicationId("test-id");
+
+        final Map<Integer, TopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
+        assertTrue(topicGroups.get(0).sourceTopics.contains("topic-foo"));
+        assertTrue(topicGroups.get(1).sourceTopics.contains("topic-A"));
+        assertTrue(topicGroups.get(1).sourceTopics.contains("topic-B"));
+        assertTrue(topicGroups.get(2).sourceTopics.contains("topic-3"));
+
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldAddTimestampExtractorPerSource() throws Exception {
+        builder.addSource(null, "source", new MockTimestampExtractor(), null, null, "topic");
+        final ProcessorTopology processorTopology = builder.build(null);
+        assertThat(processorTopology.source("topic").getTimestampExtractor(), instanceOf(MockTimestampExtractor.class));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldAddTimestampExtractorWithPatternPerSource() throws Exception {
+        final Pattern pattern = Pattern.compile("t.*");
+        builder.addSource(null, "source", new MockTimestampExtractor(), null, null, pattern);
+        final ProcessorTopology processorTopology = builder.build(null);
+        assertThat(processorTopology.source(pattern.pattern()).getTimestampExtractor(), instanceOf(MockTimestampExtractor.class));
+    }
+
+    @Test
+    public void shouldConnectRegexMatchedTopicsToStateStore() throws Exception {
+        builder.addSource(null, "ingest", null, null, null, Pattern.compile("topic-\\d+"));
+        builder.addProcessor("my-processor", new MockProcessorSupplier(), "ingest");
+        builder.addStateStore(new MockStateStoreSupplier("testStateStore", false), "my-processor");
+
+        final SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates();
+        final Field updatedTopicsField  = subscriptionUpdates.getClass().getDeclaredField("updatedTopicSubscriptions");
+        updatedTopicsField.setAccessible(true);
+
+        final Set<String> updatedTopics = (Set<String>) updatedTopicsField.get(subscriptionUpdates);
+
+        updatedTopics.add("topic-2");
+        updatedTopics.add("topic-3");
+        updatedTopics.add("topic-A");
+
+        builder.updateSubscriptions(subscriptionUpdates, "test-thread");
+        builder.setApplicationId("test-app");
+
+        final Map<String, List<String>> stateStoreAndTopics = builder.stateStoreNameToSourceTopics();
+        final List<String> topics = stateStoreAndTopics.get("testStateStore");
+
+        assertTrue("Expected to contain two topics", topics.size() == 2);
+
+        assertTrue(topics.contains("topic-2"));
+        assertTrue(topics.contains("topic-3"));
+        assertFalse(topics.contains("topic-A"));
+    }
+
+    @Test(expected = TopologyBuilderException.class)
+    public void shouldNotAllowToAddGlobalStoreWithSourceNameEqualsProcessorName() {
+        final String sameNameForSourceAndProcessor = "sameName";
+        builder.addGlobalStore(
+            new MockStateStoreSupplier("anyName", false, false),
+            sameNameForSourceAndProcessor,
+            null,
+            null,
+            null,
+            "anyTopicName",
+            sameNameForSourceAndProcessor,
+            new MockProcessorSupplier());
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5d798511/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 369c47f..fd3afa8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -121,9 +121,9 @@ public class ProcessorTopologyTest {
     }
 
     @Test
-    public void testDrivingSimpleTopology() {
+    public void testDrivingSimpleTopology() throws Exception {
         int partition = 10;
-        driver = new ProcessorTopologyTestDriver(config, createSimpleTopology(partition));
+        driver = new ProcessorTopologyTestDriver(config, createSimpleTopology(partition).internalTopologyBuilder);
         driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1", partition);
         assertNoOutputRecord(OUTPUT_TOPIC_2);
@@ -143,8 +143,8 @@ public class ProcessorTopologyTest {
 
 
     @Test
-    public void testDrivingMultiplexingTopology() {
-        driver = new ProcessorTopologyTestDriver(config, createMultiplexingTopology());
+    public void testDrivingMultiplexingTopology() throws Exception {
+        driver = new ProcessorTopologyTestDriver(config, createMultiplexingTopology().internalTopologyBuilder);
         driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1(1)");
         assertNextOutputRecord(OUTPUT_TOPIC_2, "key1", "value1(2)");
@@ -165,8 +165,8 @@ public class ProcessorTopologyTest {
     }
 
     @Test
-    public void testDrivingMultiplexByNameTopology() {
-        driver = new ProcessorTopologyTestDriver(config, createMultiplexByNameTopology());
+    public void testDrivingMultiplexByNameTopology() throws Exception {
+        driver = new ProcessorTopologyTestDriver(config, createMultiplexByNameTopology().internalTopologyBuilder);
         driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1(1)");
         assertNextOutputRecord(OUTPUT_TOPIC_2, "key1", "value1(2)");
@@ -187,9 +187,9 @@ public class ProcessorTopologyTest {
     }
 
     @Test
-    public void testDrivingStatefulTopology() {
+    public void testDrivingStatefulTopology() throws Exception {
         String storeName = "entries";
-        driver = new ProcessorTopologyTestDriver(config, createStatefulTopology(storeName));
+        driver = new ProcessorTopologyTestDriver(config, createStatefulTopology(storeName).internalTopologyBuilder);
         driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
         driver.process(INPUT_TOPIC_1, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
         driver.process(INPUT_TOPIC_1, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER);
@@ -213,7 +213,7 @@ public class ProcessorTopologyTest {
         final TopologyBuilder topologyBuilder = this.builder
                 .addGlobalStore(storeSupplier, global, STRING_DESERIALIZER, STRING_DESERIALIZER, topic, "processor", define(new StatefulProcessor("my-store")));
 
-        driver = new ProcessorTopologyTestDriver(config, topologyBuilder);
+        driver = new ProcessorTopologyTestDriver(config, topologyBuilder.internalTopologyBuilder);
         final KeyValueStore<String, String> globalStore = (KeyValueStore<String, String>) topologyBuilder.globalStateStores().get("my-store");
         driver.process(topic, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
         driver.process(topic, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
@@ -222,9 +222,9 @@ public class ProcessorTopologyTest {
     }
 
     @Test
-    public void testDrivingSimpleMultiSourceTopology() {
+    public void testDrivingSimpleMultiSourceTopology() throws Exception {
         int partition = 10;
-        driver = new ProcessorTopologyTestDriver(config, createSimpleMultiSourceTopology(partition));
+        driver = new ProcessorTopologyTestDriver(config, createSimpleMultiSourceTopology(partition).internalTopologyBuilder);
 
         driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1", partition);
@@ -236,8 +236,8 @@ public class ProcessorTopologyTest {
     }
 
     @Test
-    public void testDrivingForwardToSourceTopology() {
-        driver = new ProcessorTopologyTestDriver(config, createForwardToSourceTopology());
+    public void testDrivingForwardToSourceTopology() throws Exception {
+        driver = new ProcessorTopologyTestDriver(config, createForwardToSourceTopology().internalTopologyBuilder);
         driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
         driver.process(INPUT_TOPIC_1, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
         driver.process(INPUT_TOPIC_1, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER);
@@ -247,8 +247,8 @@ public class ProcessorTopologyTest {
     }
 
     @Test
-    public void testDrivingInternalRepartitioningTopology() {
-        driver = new ProcessorTopologyTestDriver(config, createInternalRepartitioningTopology());
+    public void testDrivingInternalRepartitioningTopology() throws Exception {
+        driver = new ProcessorTopologyTestDriver(config, createInternalRepartitioningTopology().internalTopologyBuilder);
         driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
         driver.process(INPUT_TOPIC_1, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
         driver.process(INPUT_TOPIC_1, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER);
@@ -258,8 +258,8 @@ public class ProcessorTopologyTest {
     }
 
     @Test
-    public void testDrivingInternalRepartitioningForwardingTimestampTopology() {
-        driver = new ProcessorTopologyTestDriver(config, createInternalRepartitioningWithValueTimestampTopology());
+    public void testDrivingInternalRepartitioningForwardingTimestampTopology() throws Exception {
+        driver = new ProcessorTopologyTestDriver(config, createInternalRepartitioningWithValueTimestampTopology().internalTopologyBuilder);
         driver.process(INPUT_TOPIC_1, "key1", "value1@1000", STRING_SERIALIZER, STRING_SERIALIZER);
         driver.process(INPUT_TOPIC_1, "key2", "value2@2000", STRING_SERIALIZER, STRING_SERIALIZER);
         driver.process(INPUT_TOPIC_1, "key3", "value3@3000", STRING_SERIALIZER, STRING_SERIALIZER);


Mime
View raw message