kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [1/3] kafka git commit: KAFKA-5670: (KIP-120) Add Topology and deprecate TopologyBuilder
Date Fri, 28 Jul 2017 23:46:38 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk c50c941af -> 1844bf2b2


http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
new file mode 100644
index 0000000..c163935
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -0,0 +1,671 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.errors.TopologyBuilderException;
+import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockStateStoreSupplier;
+import org.apache.kafka.test.ProcessorTopologyTestDriver;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.fail;
+
+public class TopologyTest {
+
+    // Topology instance that every test in this class builds up and inspects.
+    private final Topology topology = new Topology();
+    // Expected description that the describe tests populate and compare against topology.describe().
+    private final InternalTopologyBuilder.TopologyDescription expectedDescription = new InternalTopologyBuilder.TopologyDescription();
+
+    /** Expects a NullPointerException when a topic-based source is added with a {@code null} node name. */
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullNameWhenAddingSourceWithTopic() {
+        final String missingName = null;
+        topology.addSource(missingName, "topic");
+    }
+
+    /** Expects a NullPointerException when a pattern-based source is added with a {@code null} node name. */
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullNameWhenAddingSourceWithPattern() {
+        final String missingName = null;
+        final Pattern anyTopic = Pattern.compile(".*");
+        topology.addSource(missingName, anyTopic);
+    }
+
+    /** Expects a NullPointerException when the topic array handed to addSource is {@code null}. */
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullTopicsWhenAddingSoureWithTopic() {
+        final String[] missingTopics = null;
+        topology.addSource("source", missingTopics);
+    }
+
+    /** Expects a NullPointerException when the topic pattern handed to addSource is {@code null}. */
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullTopicsWhenAddingSourceWithPattern() {
+        final Pattern missingPattern = null;
+        topology.addSource("source", missingPattern);
+    }
+
+    /** Expects a TopologyException when a source is added without any topic. */
+    @Test(expected = TopologyException.class)
+    public void shouldNotAllowZeroTopicsWhenAddingSource() {
+        // an explicit empty array is what the varargs call without topics passes implicitly
+        final String[] noTopics = new String[0];
+        topology.addSource("source", noTopics);
+    }
+
+    /** Expects a NullPointerException when a processor is added with a {@code null} node name. */
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullNameWhenAddingProcessor() {
+        final ProcessorSupplier delegatingSupplier = new ProcessorSupplier() {
+            @Override
+            public Processor get() {
+                final MockProcessorSupplier mockSupplier = new MockProcessorSupplier();
+                return mockSupplier.get();
+            }
+        };
+        topology.addProcessor(null, delegatingSupplier);
+    }
+
+    /** Expects a NullPointerException when a processor is added with a {@code null} supplier. */
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullProcessorSupplierWhenAddingProcessor() {
+        final ProcessorSupplier missingSupplier = null;
+        topology.addProcessor("name", missingSupplier);
+    }
+
+    /** Expects a NullPointerException when a sink is added with a {@code null} node name. */
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullNameWhenAddingSink() {
+        final String missingName = null;
+        topology.addSink(missingName, "topic");
+    }
+
+    /** Expects a NullPointerException when a sink is added with a {@code null} topic. */
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullTopicWhenAddingSink() {
+        final String missingTopic = null;
+        topology.addSink("name", missingTopic);
+    }
+
+    /** Expects a NullPointerException when stores are connected to a {@code null} processor name. */
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullProcessorNameWhenConnectingProcessorAndStateStores() {
+        final String missingProcessorName = null;
+        topology.connectProcessorAndStateStores(missingProcessorName, "store");
+    }
+
+    /** Expects a NullPointerException when the store-name array to connect is {@code null}. */
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullStoreNameWhenConnectingProcessorAndStateStores() {
+        final String[] missingStoreNames = null;
+        topology.connectProcessorAndStateStores("processor", missingStoreNames);
+    }
+
+    /** Expects a TopologyException when a processor is connected to an empty list of stores. */
+    @Test(expected = TopologyException.class)
+    public void shouldNotAllowZeroStoreNameWhenConnectingProcessorAndStateStores() {
+        // an explicit empty array is what the varargs call without store names passes implicitly
+        final String[] noStoreNames = new String[0];
+        topology.connectProcessorAndStateStores("processor", noStoreNames);
+    }
+
+    /** Expects a NullPointerException when {@code null} is passed as the state store supplier. */
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAddNullStateStoreSupplier() {
+        topology.addStateStore(null);
+    }
+
+    /** A second source that reuses an already registered source name must be rejected with a TopologyException. */
+    @Test
+    public void shouldNotAllowToAddSourcesWithSameName() {
+        topology.addSource("source", "topic-1");
+        try {
+            topology.addSource("source", "topic-2");
+        } catch (final TopologyException expected) {
+            // the duplicate name was rejected, which is the behavior under test
+            return;
+        }
+        fail("Should throw TopologyException for duplicate source name");
+    }
+
+    /** A second source that subscribes to an already used topic must be rejected with a TopologyException. */
+    @Test
+    public void shouldNotAllowToAddTopicTwice() {
+        topology.addSource("source", "topic-1");
+        try {
+            topology.addSource("source-2", "topic-1");
+        } catch (final TopologyException expected) {
+            // the reused topic was rejected, which is the behavior under test
+            return;
+        }
+        fail("Should throw TopologyException for already used topic");
+    }
+
+    /** A pattern source that matches a topic already registered by name must be rejected with a TopologyException. */
+    @Test
+    public void testPatternMatchesAlreadyProvidedTopicSource() {
+        topology.addSource("source-1", "foo");
+        final Pattern overlappingPattern = Pattern.compile("f.*");
+        try {
+            topology.addSource("source-2", overlappingPattern);
+        } catch (final TopologyException expected) {
+            // the overlapping pattern was rejected, which is the behavior under test
+            return;
+        }
+        fail("Should have thrown TopologyException for overlapping pattern with already registered topic");
+    }
+
+    /** A named topic that is matched by an already registered pattern must be rejected with a TopologyException. */
+    @Test
+    public void testNamedTopicMatchesAlreadyProvidedPattern() {
+        final Pattern registeredPattern = Pattern.compile("f.*");
+        topology.addSource("source-1", registeredPattern);
+        try {
+            topology.addSource("source-2", "foo");
+        } catch (final TopologyException expected) {
+            // the overlapping topic was rejected, which is the behavior under test
+            return;
+        }
+        fail("Should have thrown TopologyException for overlapping topic with already registered pattern");
+    }
+
+    /** A second processor that reuses an already registered processor name must be rejected with a TopologyException. */
+    @Test
+    public void shoudNotAllowToAddProcessorWithSameName() {
+        topology.addSource("source", "topic-1");
+        topology.addProcessor("processor", new MockProcessorSupplier(), "source");
+        try {
+            topology.addProcessor("processor", new MockProcessorSupplier(), "source");
+        } catch (final TopologyException expected) {
+            // the duplicate name was rejected, which is the behavior under test
+            return;
+        }
+        fail("Should throw TopologyException for duplicate processor name");
+    }
+
+    /** Expects a TopologyException when a processor names a parent that was never added. */
+    @Test(expected = TopologyException.class)
+    public void shouldFailOnUnknownSource() {
+        final MockProcessorSupplier supplier = new MockProcessorSupplier();
+        topology.addProcessor("processor", supplier, "source");
+    }
+
+    /** Expects a TopologyException when a processor lists its own name as parent. */
+    @Test(expected = TopologyException.class)
+    public void shouldFailIfNodeIsItsOwnParent() {
+        final MockProcessorSupplier supplier = new MockProcessorSupplier();
+        topology.addProcessor("processor", supplier, "processor");
+    }
+
+    /** A second sink that reuses an already registered sink name must be rejected with a TopologyException. */
+    @Test
+    public void shouldNotAllowToAddSinkWithSameName() {
+        topology.addSource("source", "topic-1");
+        topology.addSink("sink", "topic-2", "source");
+        try {
+            topology.addSink("sink", "topic-3", "source");
+        } catch (final TopologyException expected) {
+            // the duplicate name was rejected, which is the behavior under test
+            return;
+        }
+        fail("Should throw TopologyException for duplicate sink name");
+    }
+
+    /** Expects a TopologyException when a sink names a parent that was never added. */
+    @Test(expected = TopologyException.class)
+    public void shouldFailWithUnknownParent() {
+        final String unknownParent = "source";
+        topology.addSink("sink", "topic-2", unknownParent);
+    }
+
+    /** Expects a TopologyException when a sink lists its own name as parent. */
+    @Test(expected = TopologyException.class)
+    public void shouldFailIfSinkIsItsOwnParent() {
+        final String sinkName = "sink";
+        topology.addSink(sinkName, "topic-2", sinkName);
+    }
+
+    /** A sink that names another sink as its parent must be rejected with a TopologyException. */
+    @Test
+    public void shouldFailIfSinkIsParent() {
+        topology.addSource("source", "topic-1");
+        topology.addSink("sink-1", "topic-2", "source");
+        try {
+            topology.addSink("sink-2", "topic-3", "sink-1");
+        } catch (final TopologyException expected) {
+            // the sink parent was rejected, which is the behavior under test
+            return;
+        }
+        fail("Should throw TopologyException for using sink as parent");
+    }
+
+    /** Expects a TopologyException when a store is attached to a processor name that was never added. */
+    @Test(expected = TopologyException.class)
+    public void shouldNotAllowToAddStateStoreToNonExistingProcessor() {
+        final MockStateStoreSupplier storeSupplier = new MockStateStoreSupplier("store", false);
+        topology.addStateStore(storeSupplier, "no-such-processsor");
+    }
+
+    /** Attaching a state store to a source node must be rejected with a TopologyException. */
+    @Test
+    public void shouldNotAllowToAddStateStoreToSource() {
+        topology.addSource("source-1", "topic-1");
+        try {
+            topology.addStateStore(new MockStateStoreSupplier("store", false), "source-1");
+        } catch (final TopologyException expected) {
+            // the store was refused for the source node, which is the behavior under test
+            return;
+        }
+        fail("Should have thrown TopologyException for adding store to source node");
+    }
+
+    /** Attaching a state store to a sink node must be rejected with a TopologyException. */
+    @Test
+    public void shouldNotAllowToAddStateStoreToSink() {
+        topology.addSink("sink-1", "topic-1");
+        try {
+            topology.addStateStore(new MockStateStoreSupplier("store", false), "sink-1");
+        } catch (final TopologyException expected) {
+            // the store was refused for the sink node, which is the behavior under test
+            return;
+        }
+        fail("Should have thrown TopologyException for adding store to sink node");
+    }
+
+    /** A second state store that reuses an already registered store name must be rejected with a TopologyException. */
+    @Test
+    public void shouldNotAllowToAddStoreWithSameName() {
+        topology.addStateStore(new MockStateStoreSupplier("store", false));
+        try {
+            topology.addStateStore(new MockStateStoreSupplier("store", false));
+        } catch (final TopologyException expected) {
+            // the duplicate store name was rejected, which is the behavior under test
+            return;
+        }
+        fail("Should have thrown TopologyException for duplicate store name");
+    }
+
+    /**
+     * Builds a topology in which only {@code goodGuy} is connected to the state store, while
+     * {@code badGuy} uses the same processor implementation without a connection, and expects
+     * that creating the test driver fails with a {@link TopologyBuilderException} naming {@code badGuy}.
+     *
+     * The test catches a {@link StreamsException} from the driver, unwraps its cause and rethrows it
+     * so that {@code @Test(expected = ...)} can match. Any other cause is rethrown wrapped in a
+     * {@link RuntimeException} that carries the caught exception.
+     */
+    @Test(expected = TopologyBuilderException.class)
+    public void shouldThroughOnUnassignedStateStoreAccess() throws Exception {
+        final String sourceNodeName = "source";
+        final String goodNodeName = "goodGuy";
+        final String badNodeName = "badGuy";
+
+        final Properties config = new Properties();
+        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "host:1");
+        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
+        final StreamsConfig streamsConfig = new StreamsConfig(config);
+
+        topology
+            .addSource(sourceNodeName, "topic")
+            .addProcessor(goodNodeName, new LocalMockProcessorSupplier(), sourceNodeName)
+            .addStateStore(
+                Stores.create(LocalMockProcessorSupplier.STORE_NAME).withStringKeys().withStringValues().inMemory().build(),
+                goodNodeName)
+            .addProcessor(badNodeName, new LocalMockProcessorSupplier(), sourceNodeName);
+
+        final String expectedMessage = "Invalid topology building: Processor " + badNodeName
+            + " has no access to StateStore " + LocalMockProcessorSupplier.STORE_NAME;
+
+        try {
+            new ProcessorTopologyTestDriver(streamsConfig, topology.internalTopologyBuilder);
+        } catch (final StreamsException e) {
+            final Throwable cause = e.getCause();
+            // instanceof is already false for null, so no separate null check is needed;
+            // comparing from the constant side avoids an NPE when the cause carries no message
+            if (cause instanceof TopologyBuilderException && expectedMessage.equals(cause.getMessage())) {
+                throw (TopologyBuilderException) cause;
+            }
+            throw new RuntimeException("Did expect different exception. Did catch:", e);
+        }
+    }
+
+    /**
+     * Supplier whose processors look up the state store named {@link #STORE_NAME} from the
+     * context during {@code init}; all other processor callbacks are no-ops.
+     */
+    private static class LocalMockProcessorSupplier implements ProcessorSupplier {
+        static final String STORE_NAME = "store";
+
+        @Override
+        public Processor get() {
+            return new Processor() {
+                @Override
+                public void init(final ProcessorContext processorContext) {
+                    // the lookup itself is the point: it exercises store access for this node
+                    processorContext.getStateStore(STORE_NAME);
+                }
+
+                @Override
+                public void process(final Object key, final Object value) {
+                    // intentionally empty
+                }
+
+                @Override
+                public void punctuate(final long timestamp) {
+                    // intentionally empty
+                }
+
+                @Override
+                public void close() {
+                    // intentionally empty
+                }
+            };
+        }
+    }
+
+    /** Expects a TopologyException when a global store uses the same name for its source and its processor. */
+    @Test(expected = TopologyException.class)
+    public void shouldNotAllowToAddGlobalStoreWithSourceNameEqualsProcessorName() {
+        final MockStateStoreSupplier storeSupplier = new MockStateStoreSupplier("anyName", false, false);
+        final String sharedName = "sameName";
+        final MockProcessorSupplier processorSupplier = new MockProcessorSupplier();
+
+        topology.addGlobalStore(storeSupplier, sharedName, null, null, "anyTopicName", sharedName, processorSupplier);
+    }
+
+    /** An untouched topology must describe itself as equal to an empty expected description. */
+    @Test
+    public void shouldDescribeEmptyTopology() {
+        final TopologyDescription described = topology.describe();
+        assertThat(described, equalTo((TopologyDescription) expectedDescription));
+    }
+
+    /** A single source on one topic is expected to be described as one subtopology with id 0. */
+    @Test
+    public void singleSourceShouldHaveSingleSubtopology() {
+        final TopologyDescription.Source sourceNode = addSource("source", "topic");
+
+        final Set<TopologyDescription.Node> nodes = Collections.<TopologyDescription.Node>singleton(sourceNode);
+        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, nodes));
+
+        final TopologyDescription described = topology.describe();
+        assertThat(described, equalTo((TopologyDescription) expectedDescription));
+    }
+
+    /** A single source on three topics is expected to be described as one subtopology with id 0. */
+    @Test
+    public void singleSourceWithListOfTopicsShouldHaveSingleSubtopology() {
+        final TopologyDescription.Source sourceNode = addSource("source", "topic1", "topic2", "topic3");
+
+        final Set<TopologyDescription.Node> nodes = Collections.<TopologyDescription.Node>singleton(sourceNode);
+        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, nodes));
+
+        final TopologyDescription described = topology.describe();
+        assertThat(described, equalTo((TopologyDescription) expectedDescription));
+    }
+
+    /** A single pattern-based source is expected to be described as one subtopology with id 0. */
+    @Test
+    public void singleSourcePatternShouldHaveSingleSubtopology() {
+        final Pattern topicPattern = Pattern.compile("topic[0-9]");
+        final TopologyDescription.Source sourceNode = addSource("source", topicPattern);
+
+        final Set<TopologyDescription.Node> nodes = Collections.<TopologyDescription.Node>singleton(sourceNode);
+        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, nodes));
+
+        final TopologyDescription described = topology.describe();
+        assertThat(described, equalTo((TopologyDescription) expectedDescription));
+    }
+
+    /** Three unconnected sources are expected to be described as three subtopologies with ids 0, 1 and 2. */
+    @Test
+    public void multipleSourcesShouldHaveDistinctSubtopologies() {
+        final TopologyDescription.Source firstSource = addSource("source1", "topic1");
+        final Set<TopologyDescription.Node> firstNodes = Collections.<TopologyDescription.Node>singleton(firstSource);
+        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, firstNodes));
+
+        final TopologyDescription.Source secondSource = addSource("source2", "topic2");
+        final Set<TopologyDescription.Node> secondNodes = Collections.<TopologyDescription.Node>singleton(secondSource);
+        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(1, secondNodes));
+
+        final TopologyDescription.Source thirdSource = addSource("source3", "topic3");
+        final Set<TopologyDescription.Node> thirdNodes = Collections.<TopologyDescription.Node>singleton(thirdSource);
+        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(2, thirdNodes));
+
+        final TopologyDescription described = topology.describe();
+        assertThat(described, equalTo((TopologyDescription) expectedDescription));
+    }
+
+    /** A source with one downstream processor is expected to be described as a single subtopology. */
+    @Test
+    public void sourceAndProcessorShouldHaveSingleSubtopology() {
+        final TopologyDescription.Source sourceNode = addSource("source", "topic");
+        final TopologyDescription.Processor processorNode = addProcessor("processor", sourceNode);
+
+        final Set<TopologyDescription.Node> subtopologyNodes = new HashSet<TopologyDescription.Node>(
+            Arrays.<TopologyDescription.Node>asList(sourceNode, processorNode));
+        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, subtopologyNodes));
+
+        final TopologyDescription described = topology.describe();
+        assertThat(described, equalTo((TopologyDescription) expectedDescription));
+    }
+
+    /** A source with a processor that owns one newly added store is expected to be described as a single subtopology. */
+    @Test
+    public void sourceAndProcessorWithStateShouldHaveSingleSubtopology() {
+        final TopologyDescription.Source sourceNode = addSource("source", "topic");
+        final String[] storeNames = new String[] {"store"};
+        final TopologyDescription.Processor processorNode = addProcessorWithNewStore("processor", storeNames, sourceNode);
+
+        final Set<TopologyDescription.Node> subtopologyNodes = new HashSet<TopologyDescription.Node>(
+            Arrays.<TopologyDescription.Node>asList(sourceNode, processorNode));
+        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, subtopologyNodes));
+
+        final TopologyDescription described = topology.describe();
+        assertThat(described, equalTo((TopologyDescription) expectedDescription));
+    }
+
+
+    /** A source with a processor that owns two newly added stores is expected to be described as a single subtopology. */
+    @Test
+    public void sourceAndProcessorWithMultipleStatesShouldHaveSingleSubtopology() {
+        final TopologyDescription.Source sourceNode = addSource("source", "topic");
+        final String[] storeNames = new String[] {"store1", "store2"};
+        final TopologyDescription.Processor processorNode = addProcessorWithNewStore("processor", storeNames, sourceNode);
+
+        final Set<TopologyDescription.Node> subtopologyNodes = new HashSet<TopologyDescription.Node>(
+            Arrays.<TopologyDescription.Node>asList(sourceNode, processorNode));
+        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, subtopologyNodes));
+
+        final TopologyDescription described = topology.describe();
+        assertThat(described, equalTo((TopologyDescription) expectedDescription));
+    }
+
+    /** One source feeding two processors is expected to be described as a single subtopology. */
+    @Test
+    public void sourceWithMultipleProcessorsShouldHaveSingleSubtopology() {
+        final TopologyDescription.Source sourceNode = addSource("source", "topic");
+        final TopologyDescription.Processor firstProcessor = addProcessor("processor1", sourceNode);
+        final TopologyDescription.Processor secondProcessor = addProcessor("processor2", sourceNode);
+
+        final Set<TopologyDescription.Node> subtopologyNodes = new HashSet<TopologyDescription.Node>(
+            Arrays.<TopologyDescription.Node>asList(sourceNode, firstProcessor, secondProcessor));
+        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, subtopologyNodes));
+
+        final TopologyDescription described = topology.describe();
+        assertThat(described, equalTo((TopologyDescription) expectedDescription));
+    }
+
+    /** One processor fed by a topic source and a pattern source is expected to be described as a single subtopology. */
+    @Test
+    public void processorWithMultipleSourcesShouldHaveSingleSubtopology() {
+        final TopologyDescription.Source topicSource = addSource("source1", "topic0");
+        final Pattern topicPattern = Pattern.compile("topic[1-9]");
+        final TopologyDescription.Source patternSource = addSource("source2", topicPattern);
+        final TopologyDescription.Processor processorNode = addProcessor("processor", topicSource, patternSource);
+
+        final Set<TopologyDescription.Node> subtopologyNodes = new HashSet<TopologyDescription.Node>(
+            Arrays.<TopologyDescription.Node>asList(topicSource, patternSource, processorNode));
+        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, subtopologyNodes));
+
+        final TopologyDescription described = topology.describe();
+        assertThat(described, equalTo((TopologyDescription) expectedDescription));
+    }
+
+    /** Three independent source/processor pairs are expected to be described as subtopologies 0, 1 and 2. */
+    @Test
+    public void multipleSourcesWithProcessorsShouldHaveDistinctSubtopologies() {
+        final TopologyDescription.Source firstSource = addSource("source1", "topic1");
+        final TopologyDescription.Processor firstProcessor = addProcessor("processor1", firstSource);
+
+        final TopologyDescription.Source secondSource = addSource("source2", "topic2");
+        final TopologyDescription.Processor secondProcessor = addProcessor("processor2", secondSource);
+
+        final TopologyDescription.Source thirdSource = addSource("source3", "topic3");
+        final TopologyDescription.Processor thirdProcessor = addProcessor("processor3", thirdSource);
+
+        // expected node sets are built only after all nodes have been wired together
+        final Set<TopologyDescription.Node> firstNodes = new HashSet<TopologyDescription.Node>(
+            Arrays.<TopologyDescription.Node>asList(firstSource, firstProcessor));
+        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, firstNodes));
+
+        final Set<TopologyDescription.Node> secondNodes = new HashSet<TopologyDescription.Node>(
+            Arrays.<TopologyDescription.Node>asList(secondSource, secondProcessor));
+        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(1, secondNodes));
+
+        final Set<TopologyDescription.Node> thirdNodes = new HashSet<TopologyDescription.Node>(
+            Arrays.<TopologyDescription.Node>asList(thirdSource, thirdProcessor));
+        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(2, thirdNodes));
+
+        final TopologyDescription described = topology.describe();
+        assertThat(described, equalTo((TopologyDescription) expectedDescription));
+    }
+
+    /** Three independent source/sink pairs are expected to be described as subtopologies 0, 1 and 2. */
+    @Test
+    public void multipleSourcesWithSinksShouldHaveDistinctSubtopologies() {
+        final TopologyDescription.Source firstSource = addSource("source1", "topic1");
+        final TopologyDescription.Sink firstSink = addSink("sink1", "sinkTopic1", firstSource);
+
+        final TopologyDescription.Source secondSource = addSource("source2", "topic2");
+        final TopologyDescription.Sink secondSink = addSink("sink2", "sinkTopic2", secondSource);
+
+        final TopologyDescription.Source thirdSource = addSource("source3", "topic3");
+        final TopologyDescription.Sink thirdSink = addSink("sink3", "sinkTopic3", thirdSource);
+
+        // expected node sets are built only after all nodes have been wired together
+        final Set<TopologyDescription.Node> firstNodes = new HashSet<TopologyDescription.Node>(
+            Arrays.<TopologyDescription.Node>asList(firstSource, firstSink));
+        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, firstNodes));
+
+        final Set<TopologyDescription.Node> secondNodes = new HashSet<TopologyDescription.Node>(
+            Arrays.<TopologyDescription.Node>asList(secondSource, secondSink));
+        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(1, secondNodes));
+
+        final Set<TopologyDescription.Node> thirdNodes = new HashSet<TopologyDescription.Node>(
+            Arrays.<TopologyDescription.Node>asList(thirdSource, thirdSink));
+        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(2, thirdNodes));
+
+        final TopologyDescription described = topology.describe();
+        assertThat(described, equalTo((TopologyDescription) expectedDescription));
+    }
+
+    @Test
+    public void processorsWithSameSinkShouldHaveSameSubtopology() {
+        final TopologyDescription.Source expectedSourceNode1 = addSource("source", "topic");
+        final TopologyDescription.Processor expectedProcessorNode1 = addProcessor("processor1", expectedSourceNode1);
+
+        final TopologyDescription.Source expectedSourceNode2 = addSource("source2", "topic2");
+        final TopologyDescription.Processor expectedProcessorNode2 = addProcessor("processor2", expectedSourceNode2);
+
+        final TopologyDescription.Source expectedSourceNode3 = addSource("source3", "topic3");
+        final TopologyDescription.Processor expectedProcessorNode3 = addProcessor("processor3", expectedSourceNode3);
+
+        final TopologyDescription.Sink expectedSinkNode = addSink(
+            "sink",
+            "sinkTopic",
+            expectedProcessorNode1,
+            expectedProcessorNode2,
+            expectedProcessorNode3);
+
+        final Set<TopologyDescription.Node> allNodes = new HashSet<>();
+        allNodes.add(expectedSourceNode1);
+        allNodes.add(expectedProcessorNode1);
+        allNodes.add(expectedSourceNode2);
+        allNodes.add(expectedProcessorNode2);
+        allNodes.add(expectedSourceNode3);
+        allNodes.add(expectedProcessorNode3);
+        allNodes.add(expectedSinkNode);
+        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes));
+
+        assertThat(topology.describe(), equalTo((TopologyDescription) expectedDescription));
+    }
+
+    /**
+     * Two processors each add their own store and a third processor is connected to both existing
+     * stores; all sources and processors are then expected to be described as a single subtopology.
+     */
+    @Test
+    public void processorsWithSharedStateShouldHaveSameSubtopology() {
+        final String[] firstStore = new String[] {"store1"};
+        final String[] secondStore = new String[] {"store2"};
+        final String[] sharedStores = new String[] {firstStore[0], secondStore[0]};
+
+        final TopologyDescription.Source firstSource = addSource("source", "topic");
+        final TopologyDescription.Processor firstProcessor = addProcessorWithNewStore("processor1", firstStore, firstSource);
+
+        final TopologyDescription.Source secondSource = addSource("source2", "topic2");
+        final TopologyDescription.Processor secondProcessor = addProcessorWithNewStore("processor2", secondStore, secondSource);
+
+        final TopologyDescription.Source thirdSource = addSource("source3", "topic3");
+        final TopologyDescription.Processor thirdProcessor = addProcessorWithExistingStore("processor3", sharedStores, thirdSource);
+
+        final Set<TopologyDescription.Node> subtopologyNodes = new HashSet<TopologyDescription.Node>(
+            Arrays.<TopologyDescription.Node>asList(
+                firstSource,
+                firstProcessor,
+                secondSource,
+                secondProcessor,
+                thirdSource,
+                thirdProcessor));
+        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, subtopologyNodes));
+
+        final TopologyDescription described = topology.describe();
+        assertThat(described, equalTo((TopologyDescription) expectedDescription));
+    }
+
+    /** A topology with one global store must describe itself as equal to the expected description holding that store. */
+    @Test
+    public void shouldDescribeGlobalStoreTopology() {
+        addGlobalStoreToTopologyAndExpectedDescription("globalStore", "source", "globalTopic", "processor");
+
+        final TopologyDescription described = topology.describe();
+        assertThat(described, equalTo((TopologyDescription) expectedDescription));
+    }
+
+    /** A topology with two global stores must describe itself as equal to the expected description holding both stores. */
+    @Test
+    public void shouldDescribeMultipleGlobalStoreTopology() {
+        addGlobalStoreToTopologyAndExpectedDescription("globalStore1", "source1", "globalTopic1", "processor1");
+        addGlobalStoreToTopologyAndExpectedDescription("globalStore2", "source2", "globalTopic2", "processor2");
+
+        final TopologyDescription described = topology.describe();
+        assertThat(described, equalTo((TopologyDescription) expectedDescription));
+    }
+
+    private TopologyDescription.Source addSource(final String sourceName,
+                                                 final String... sourceTopic) {
+        topology.addSource(null, sourceName, null, null, null, sourceTopic);
+        String allSourceTopics = sourceTopic[0];
+        for (int i = 1; i < sourceTopic.length; ++i) {
+            allSourceTopics += ", " + sourceTopic[i];
+        }
+        return new InternalTopologyBuilder.Source(sourceName, allSourceTopics);
+    }
+
+    private TopologyDescription.Source addSource(final String sourceName,
+                                                 final Pattern sourcePattern) {
+        topology.addSource(null, sourceName, null, null, null, sourcePattern);
+        return new InternalTopologyBuilder.Source(sourceName, sourcePattern.toString());
+    }
+
+    private TopologyDescription.Processor addProcessor(final String processorName,
+                                                       final TopologyDescription.Node... parents) {
+        return addProcessorWithNewStore(processorName, new String[0], parents);
+    }
+
+    private TopologyDescription.Processor addProcessorWithNewStore(final String processorName,
+                                                                   final String[] storeNames,
+                                                                   final TopologyDescription.Node... parents) {
+        return addProcessorWithStore(processorName, storeNames, true, parents);
+    }
+
+    private TopologyDescription.Processor addProcessorWithExistingStore(final String processorName,
+                                                                        final String[] storeNames,
+                                                                        final TopologyDescription.Node... parents) {
+        return addProcessorWithStore(processorName, storeNames, false, parents);
+    }
+
+    /**
+     * Adds a processor below the given parents to the topology under test, attaches its stores,
+     * and builds the matching expected description node wired to the parents' expected nodes.
+     *
+     * @param processorName name of the processor node
+     * @param storeNames    names of the stores used by the processor
+     * @param newStores     {@code true} to add each store as a new state store for the processor,
+     *                      {@code false} to connect the processor to the named stores instead
+     * @param parents       expected description nodes of the processor's parents
+     * @return expected processor node, registered as successor of every parent
+     */
+    private TopologyDescription.Processor addProcessorWithStore(final String processorName,
+                                                                final String[] storeNames,
+                                                                final boolean newStores,
+                                                                final TopologyDescription.Node... parents) {
+        final String[] parentNames = new String[parents.length];
+        int next = 0;
+        for (final TopologyDescription.Node parent : parents) {
+            parentNames[next++] = parent.name();
+        }
+
+        topology.addProcessor(processorName, new MockProcessorSupplier(), parentNames);
+        if (!newStores) {
+            topology.connectProcessorAndStateStores(processorName, storeNames);
+        } else {
+            for (final String storeName : storeNames) {
+                topology.addStateStore(new MockStateStoreSupplier(storeName, false), processorName);
+            }
+        }
+
+        final Set<String> expectedStores = new HashSet<>(Arrays.asList(storeNames));
+        final TopologyDescription.Processor processorNode
+            = new InternalTopologyBuilder.Processor(processorName, expectedStores);
+
+        // mirror the parent/child links in the expected description graph
+        for (int i = 0; i < parents.length; ++i) {
+            final TopologyDescription.Node parent = parents[i];
+            ((InternalTopologyBuilder.AbstractNode) parent).addSuccessor(processorNode);
+            ((InternalTopologyBuilder.AbstractNode) processorNode).addPredecessor(parent);
+        }
+
+        return processorNode;
+    }
+
+    /**
+     * Adds a sink below the given parents to the topology under test and builds the matching
+     * expected description node wired to the parents' expected nodes.
+     *
+     * @param sinkName  name of the sink node
+     * @param sinkTopic topic the sink writes to
+     * @param parents   expected description nodes of the sink's parents
+     * @return expected sink node, registered as successor of every parent
+     */
+    private TopologyDescription.Sink addSink(final String sinkName,
+                                             final String sinkTopic,
+                                             final TopologyDescription.Node... parents) {
+        final String[] parentNames = new String[parents.length];
+        int next = 0;
+        for (final TopologyDescription.Node parent : parents) {
+            parentNames[next++] = parent.name();
+        }
+
+        topology.addSink(sinkName, sinkTopic, null, null, null, parentNames);
+        final TopologyDescription.Sink sinkNode = new InternalTopologyBuilder.Sink(sinkName, sinkTopic);
+
+        // mirror the parent/child links in the expected description graph
+        for (int i = 0; i < parents.length; ++i) {
+            final TopologyDescription.Node parent = parents[i];
+            ((InternalTopologyBuilder.AbstractNode) parent).addSuccessor(sinkNode);
+            ((InternalTopologyBuilder.AbstractNode) sinkNode).addPredecessor(parent);
+        }
+
+        return sinkNode;
+    }
+
+    /**
+     * Registers a global store with the topology under test and records the matching
+     * global-store entry in the expected description.
+     *
+     * @param globalStoreName name of the global store
+     * @param sourceName      name of the source node feeding the store
+     * @param globalTopicName topic the source node reads from
+     * @param processorName   name of the processor node that maintains the store
+     */
+    private void addGlobalStoreToTopologyAndExpectedDescription(final String globalStoreName,
+                                                                final String sourceName,
+                                                                final String globalTopicName,
+                                                                final String processorName) {
+        final MockStateStoreSupplier storeSupplier = new MockStateStoreSupplier(globalStoreName, false, false);
+        final MockProcessorSupplier processorSupplier = new MockProcessorSupplier();
+        topology.addGlobalStore(
+            storeSupplier, sourceName, null, null, null, globalTopicName, processorName, processorSupplier);
+
+        final TopologyDescription.GlobalStore globalStoreNode
+            = new InternalTopologyBuilder.GlobalStore(sourceName, processorName, globalStoreName, globalTopicName);
+        expectedDescription.addGlobalStore(globalStoreNode);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
index a868839..3fecfc1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
@@ -30,6 +30,7 @@ import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.TopologyBuilderException;
+import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.KStream;
@@ -56,6 +57,7 @@ import java.util.regex.Pattern;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
 
 @Category({IntegrationTest.class})
 public class KStreamsFineGrainedAutoResetIntegrationTest {
@@ -238,26 +240,29 @@ public class KStreamsFineGrainedAutoResetIntegrationTest {
         consumer.close();
     }
 
-    @Test(expected = TopologyBuilderException.class)
+    @Test
     public void shouldThrowExceptionOverlappingPattern() throws  Exception {
         final KStreamBuilder builder = new KStreamBuilder();
         //NOTE this would realistically get caught when building topology, the test is for completeness
         builder.stream(KStreamBuilder.AutoOffsetReset.EARLIEST, Pattern.compile("topic-[A-D]_1"));
         builder.stream(KStreamBuilder.AutoOffsetReset.LATEST, Pattern.compile("topic-[A-D]_1"));
-        builder.stream(TOPIC_Y_1, TOPIC_Z_1);
 
-        builder.earliestResetTopicsPattern();
+        try {
+            builder.earliestResetTopicsPattern();
+            fail("Should have thrown TopologyException");
+        } catch (final TopologyException expected) { }
     }
 
-    @Test(expected = TopologyBuilderException.class)
+    @Test
     public void shouldThrowExceptionOverlappingTopic() throws  Exception {
         final KStreamBuilder builder = new KStreamBuilder();
         //NOTE this would realistically get caught when building topology, the test is for completeness
         builder.stream(KStreamBuilder.AutoOffsetReset.EARLIEST, Pattern.compile("topic-[A-D]_1"));
-        builder.stream(KStreamBuilder.AutoOffsetReset.LATEST, Pattern.compile("topic-\\d_1"));
-        builder.stream(KStreamBuilder.AutoOffsetReset.LATEST, TOPIC_A_1, TOPIC_Z_1);
 
-        builder.latestResetTopicsPattern();
+        try {
+            builder.stream(KStreamBuilder.AutoOffsetReset.LATEST, TOPIC_A_1, TOPIC_Z_1);
+            fail("Should have thrown TopologyBuilderException");
+        } catch (final TopologyBuilderException expected) { }
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index a7ddb7b..0cd2f2a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -57,9 +57,9 @@ import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+@SuppressWarnings("deprecation")
 public class TopologyBuilderTest {
 
-
     @Test
     public void shouldAddSourceWithOffsetReset() {
         final TopologyBuilder builder = new TopologyBuilder();

http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyTest.java
deleted file mode 100644
index a541eb3..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyTest.java
+++ /dev/null
@@ -1,408 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.processor;
-
-import org.apache.kafka.streams.TopologyDescription;
-import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
-import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.MockStateStoreSupplier;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.regex.Pattern;
-
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.MatcherAssert.assertThat;
-
-// TODO (remove this comment) Test name ok, we just use InternalTopologyBuilder for now in this test until Topology gets added
-public class TopologyTest {
-    // TODO change from InternalTopologyBuilder to Topology
-    private final InternalTopologyBuilder topology = new InternalTopologyBuilder();
-    private final InternalTopologyBuilder.TopologyDescription expectedDescription = new InternalTopologyBuilder.TopologyDescription();
-
-    @Test
-    public void shouldDescribeEmptyTopology() {
-        assertThat(topology.describe(), equalTo(expectedDescription));
-    }
-
-    @Test
-    public void singleSourceShouldHaveSingleSubtopology() {
-        final TopologyDescription.Source expectedSourceNode = addSource("source", "topic");
-
-        expectedDescription.addSubtopology(
-            new InternalTopologyBuilder.Subtopology(0,
-                Collections.<TopologyDescription.Node>singleton(expectedSourceNode)));
-
-        assertThat(topology.describe(), equalTo(expectedDescription));
-    }
-
-    @Test
-    public void singleSourceWithListOfTopicsShouldHaveSingleSubtopology() {
-        final TopologyDescription.Source expectedSourceNode = addSource("source", "topic1", "topic2", "topic3");
-
-        expectedDescription.addSubtopology(
-            new InternalTopologyBuilder.Subtopology(0,
-                Collections.<TopologyDescription.Node>singleton(expectedSourceNode)));
-
-        assertThat(topology.describe(), equalTo(expectedDescription));
-    }
-
-    @Test
-    public void singleSourcePatternShouldHaveSingleSubtopology() {
-        final TopologyDescription.Source expectedSourceNode = addSource("source", Pattern.compile("topic[0-9]"));
-
-        expectedDescription.addSubtopology(
-            new InternalTopologyBuilder.Subtopology(0,
-                Collections.<TopologyDescription.Node>singleton(expectedSourceNode)));
-
-        assertThat(topology.describe(), equalTo(expectedDescription));
-    }
-
-    @Test
-    public void multipleSourcesShouldHaveDistinctSubtopologies() {
-        final TopologyDescription.Source expectedSourceNode1 = addSource("source1", "topic1");
-        expectedDescription.addSubtopology(
-            new InternalTopologyBuilder.Subtopology(0,
-                Collections.<TopologyDescription.Node>singleton(expectedSourceNode1)));
-
-        final TopologyDescription.Source expectedSourceNode2 = addSource("source2", "topic2");
-        expectedDescription.addSubtopology(
-            new InternalTopologyBuilder.Subtopology(1,
-                Collections.<TopologyDescription.Node>singleton(expectedSourceNode2)));
-
-        final TopologyDescription.Source expectedSourceNode3 = addSource("source3", "topic3");
-        expectedDescription.addSubtopology(
-            new InternalTopologyBuilder.Subtopology(2,
-                Collections.<TopologyDescription.Node>singleton(expectedSourceNode3)));
-
-        assertThat(topology.describe(), equalTo(expectedDescription));
-    }
-
-    @Test
-    public void sourceAndProcessorShouldHaveSingleSubtopology() {
-        final TopologyDescription.Source expectedSourceNode = addSource("source", "topic");
-        final TopologyDescription.Processor expectedProcessorNode = addProcessor("processor", expectedSourceNode);
-
-        final Set<TopologyDescription.Node> allNodes = new HashSet<>();
-        allNodes.add(expectedSourceNode);
-        allNodes.add(expectedProcessorNode);
-        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes));
-
-        assertThat(topology.describe(), equalTo(expectedDescription));
-    }
-
-    @Test
-    public void sourceAndProcessorWithStateShouldHaveSingleSubtopology() {
-        final TopologyDescription.Source expectedSourceNode = addSource("source", "topic");
-        final String[] store = new String[] {"store"};
-        final TopologyDescription.Processor expectedProcessorNode
-            = addProcessorWithNewStore("processor", store, expectedSourceNode);
-
-        final Set<TopologyDescription.Node> allNodes = new HashSet<>();
-        allNodes.add(expectedSourceNode);
-        allNodes.add(expectedProcessorNode);
-        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes));
-
-        assertThat(topology.describe(), equalTo(expectedDescription));
-    }
-
-
-    @Test
-    public void sourceAndProcessorWithMultipleStatesShouldHaveSingleSubtopology() {
-        final TopologyDescription.Source expectedSourceNode = addSource("source", "topic");
-        final String[] stores = new String[] {"store1", "store2"};
-        final TopologyDescription.Processor expectedProcessorNode
-            = addProcessorWithNewStore("processor", stores, expectedSourceNode);
-
-        final Set<TopologyDescription.Node> allNodes = new HashSet<>();
-        allNodes.add(expectedSourceNode);
-        allNodes.add(expectedProcessorNode);
-        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes));
-
-        assertThat(topology.describe(), equalTo(expectedDescription));
-    }
-
-    @Test
-    public void sourceWithMultipleProcessorsShouldHaveSingleSubtopology() {
-        final TopologyDescription.Source expectedSourceNode = addSource("source", "topic");
-        final TopologyDescription.Processor expectedProcessorNode1 = addProcessor("processor1", expectedSourceNode);
-        final TopologyDescription.Processor expectedProcessorNode2 = addProcessor("processor2", expectedSourceNode);
-
-        final Set<TopologyDescription.Node> allNodes = new HashSet<>();
-        allNodes.add(expectedSourceNode);
-        allNodes.add(expectedProcessorNode1);
-        allNodes.add(expectedProcessorNode2);
-        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes));
-
-        assertThat(topology.describe(), equalTo(expectedDescription));
-    }
-
-    @Test
-    public void processorWithMultipleSourcesShouldHaveSingleSubtopology() {
-        final TopologyDescription.Source expectedSourceNode1 = addSource("source1", "topic0");
-        final TopologyDescription.Source expectedSourceNode2 = addSource("source2", Pattern.compile("topic[1-9]"));
-        final TopologyDescription.Processor expectedProcessorNode = addProcessor("processor", expectedSourceNode1, expectedSourceNode2);
-
-        final Set<TopologyDescription.Node> allNodes = new HashSet<>();
-        allNodes.add(expectedSourceNode1);
-        allNodes.add(expectedSourceNode2);
-        allNodes.add(expectedProcessorNode);
-        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes));
-
-        assertThat(topology.describe(), equalTo(expectedDescription));
-    }
-
-    @Test
-    public void multipleSourcesWithProcessorsShouldHaveDistinctSubtopologies() {
-        final TopologyDescription.Source expectedSourceNode1 = addSource("source1", "topic1");
-        final TopologyDescription.Processor expectedProcessorNode1 = addProcessor("processor1", expectedSourceNode1);
-
-        final TopologyDescription.Source expectedSourceNode2 = addSource("source2", "topic2");
-        final TopologyDescription.Processor expectedProcessorNode2 = addProcessor("processor2", expectedSourceNode2);
-
-        final TopologyDescription.Source expectedSourceNode3 = addSource("source3", "topic3");
-        final TopologyDescription.Processor expectedProcessorNode3 = addProcessor("processor3", expectedSourceNode3);
-
-        final Set<TopologyDescription.Node> allNodes1 = new HashSet<>();
-        allNodes1.add(expectedSourceNode1);
-        allNodes1.add(expectedProcessorNode1);
-        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes1));
-
-        final Set<TopologyDescription.Node> allNodes2 = new HashSet<>();
-        allNodes2.add(expectedSourceNode2);
-        allNodes2.add(expectedProcessorNode2);
-        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(1, allNodes2));
-
-        final Set<TopologyDescription.Node> allNodes3 = new HashSet<>();
-        allNodes3.add(expectedSourceNode3);
-        allNodes3.add(expectedProcessorNode3);
-        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(2, allNodes3));
-
-        assertThat(topology.describe(), equalTo(expectedDescription));
-    }
-
-    @Test
-    public void multipleSourcesWithSinksShouldHaveDistinctSubtopologies() {
-        final TopologyDescription.Source expectedSourceNode1 = addSource("source1", "topic1");
-        final TopologyDescription.Sink expectedSinkNode1 = addSink("sink1", "sinkTopic1", expectedSourceNode1);
-
-        final TopologyDescription.Source expectedSourceNode2 = addSource("source2", "topic2");
-        final TopologyDescription.Sink expectedSinkNode2 = addSink("sink2", "sinkTopic2", expectedSourceNode2);
-
-        final TopologyDescription.Source expectedSourceNode3 = addSource("source3", "topic3");
-        final TopologyDescription.Sink expectedSinkNode3 = addSink("sink3", "sinkTopic3", expectedSourceNode3);
-
-        final Set<TopologyDescription.Node> allNodes1 = new HashSet<>();
-        allNodes1.add(expectedSourceNode1);
-        allNodes1.add(expectedSinkNode1);
-        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes1));
-
-        final Set<TopologyDescription.Node> allNodes2 = new HashSet<>();
-        allNodes2.add(expectedSourceNode2);
-        allNodes2.add(expectedSinkNode2);
-        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(1, allNodes2));
-
-        final Set<TopologyDescription.Node> allNodes3 = new HashSet<>();
-        allNodes3.add(expectedSourceNode3);
-        allNodes3.add(expectedSinkNode3);
-        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(2, allNodes3));
-
-        assertThat(topology.describe(), equalTo(expectedDescription));
-    }
-
-    @Test
-    public void processorsWithSameSinkShouldHaveSameSubtopology() {
-        final TopologyDescription.Source expectedSourceNode1 = addSource("source", "topic");
-        final TopologyDescription.Processor expectedProcessorNode1 = addProcessor("processor1", expectedSourceNode1);
-
-        final TopologyDescription.Source expectedSourceNode2 = addSource("source2", "topic2");
-        final TopologyDescription.Processor expectedProcessorNode2 = addProcessor("processor2", expectedSourceNode2);
-
-        final TopologyDescription.Source expectedSourceNode3 = addSource("source3", "topic3");
-        final TopologyDescription.Processor expectedProcessorNode3 = addProcessor("processor3", expectedSourceNode3);
-
-        final TopologyDescription.Sink expectedSinkNode = addSink(
-            "sink",
-            "sinkTopic",
-            expectedProcessorNode1,
-            expectedProcessorNode2,
-            expectedProcessorNode3);
-
-        final Set<TopologyDescription.Node> allNodes = new HashSet<>();
-        allNodes.add(expectedSourceNode1);
-        allNodes.add(expectedProcessorNode1);
-        allNodes.add(expectedSourceNode2);
-        allNodes.add(expectedProcessorNode2);
-        allNodes.add(expectedSourceNode3);
-        allNodes.add(expectedProcessorNode3);
-        allNodes.add(expectedSinkNode);
-        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes));
-
-        assertThat(topology.describe(), equalTo(expectedDescription));
-    }
-
-    @Test
-    public void processorsWithSharedStateShouldHaveSameSubtopology() {
-        final String[] store1 = new String[] {"store1"};
-        final String[] store2 = new String[] {"store2"};
-        final String[] bothStores = new String[] {store1[0], store2[0]};
-
-        final TopologyDescription.Source expectedSourceNode1 = addSource("source", "topic");
-        final TopologyDescription.Processor expectedProcessorNode1
-            = addProcessorWithNewStore("processor1", store1, expectedSourceNode1);
-
-        final TopologyDescription.Source expectedSourceNode2 = addSource("source2", "topic2");
-        final TopologyDescription.Processor expectedProcessorNode2
-            = addProcessorWithNewStore("processor2", store2, expectedSourceNode2);
-
-        final TopologyDescription.Source expectedSourceNode3 = addSource("source3", "topic3");
-        final TopologyDescription.Processor expectedProcessorNode3
-            = addProcessorWithExistingStore("processor3", bothStores, expectedSourceNode3);
-
-        final Set<TopologyDescription.Node> allNodes = new HashSet<>();
-        allNodes.add(expectedSourceNode1);
-        allNodes.add(expectedProcessorNode1);
-        allNodes.add(expectedSourceNode2);
-        allNodes.add(expectedProcessorNode2);
-        allNodes.add(expectedSourceNode3);
-        allNodes.add(expectedProcessorNode3);
-        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes));
-
-        assertThat(topology.describe(), equalTo(expectedDescription));
-    }
-
-    @Test
-    public void shouldDescribeGlobalStoreTopology() {
-        addGlobalStoreToTopologyAndExpectedDescription("globalStore", "source", "globalTopic", "processor");
-        assertThat(topology.describe(), equalTo(expectedDescription));
-    }
-
-    @Test
-    public void shouldDescribeMultipleGlobalStoreTopology() {
-        addGlobalStoreToTopologyAndExpectedDescription("globalStore1", "source1", "globalTopic1", "processor1");
-        addGlobalStoreToTopologyAndExpectedDescription("globalStore2", "source2", "globalTopic2", "processor2");
-        assertThat(topology.describe(), equalTo(expectedDescription));
-    }
-
-    private TopologyDescription.Source addSource(final String sourceName,
-                                                 final String... sourceTopic) {
-        topology.addSource(null, sourceName, null, null, null, sourceTopic);
-        String allSourceTopics = sourceTopic[0];
-        for (int i = 1; i < sourceTopic.length; ++i) {
-            allSourceTopics += ", " + sourceTopic[i];
-        }
-        return new InternalTopologyBuilder.Source(sourceName, allSourceTopics);
-    }
-
-    private TopologyDescription.Source addSource(final String sourceName,
-                                                 final Pattern sourcePattern) {
-        topology.addSource(null, sourceName, null, null, null, sourcePattern);
-        return new InternalTopologyBuilder.Source(sourceName, sourcePattern.toString());
-    }
-
-    private TopologyDescription.Processor addProcessor(final String processorName,
-                                                       final TopologyDescription.Node... parents) {
-        return addProcessorWithNewStore(processorName, new String[0], parents);
-    }
-
-    private TopologyDescription.Processor addProcessorWithNewStore(final String processorName,
-                                                                   final String[] storeNames,
-                                                                   final TopologyDescription.Node... parents) {
-        return addProcessorWithStore(processorName, storeNames, true, parents);
-    }
-
-    private TopologyDescription.Processor addProcessorWithExistingStore(final String processorName,
-                                                                        final String[] storeNames,
-                                                                        final TopologyDescription.Node... parents) {
-        return addProcessorWithStore(processorName, storeNames, false, parents);
-    }
-
-    private TopologyDescription.Processor addProcessorWithStore(final String processorName,
-                                                                final String[] storeNames,
-                                                                final boolean newStores,
-                                                                final TopologyDescription.Node... parents) {
-        final String[] parentNames = new String[parents.length];
-        for (int i = 0; i < parents.length; ++i) {
-            parentNames[i] = parents[i].name();
-        }
-
-        topology.addProcessor(processorName, new MockProcessorSupplier(), parentNames);
-        if (newStores) {
-            for (final String store : storeNames) {
-                topology.addStateStore(new MockStateStoreSupplier(store, false), processorName);
-            }
-        } else {
-            topology.connectProcessorAndStateStores(processorName, storeNames);
-        }
-        final TopologyDescription.Processor expectedProcessorNode
-            = new InternalTopologyBuilder.Processor(processorName, new HashSet<>(Arrays.asList(storeNames)));
-
-        for (final TopologyDescription.Node parent : parents) {
-            ((InternalTopologyBuilder.AbstractNode) parent).addSuccessor(expectedProcessorNode);
-            ((InternalTopologyBuilder.AbstractNode) expectedProcessorNode).addPredecessor(parent);
-        }
-
-        return expectedProcessorNode;
-    }
-
-    private TopologyDescription.Sink addSink(final String sinkName,
-                                             final String sinkTopic,
-                                             final TopologyDescription.Node... parents) {
-        final String[] parentNames = new String[parents.length];
-        for (int i = 0; i < parents.length; ++i) {
-            parentNames[i] = parents[i].name();
-        }
-
-        topology.addSink(sinkName, sinkTopic, null, null, null, parentNames);
-        final TopologyDescription.Sink expectedSinkNode
-            = new InternalTopologyBuilder.Sink(sinkName, sinkTopic);
-
-        for (final TopologyDescription.Node parent : parents) {
-            ((InternalTopologyBuilder.AbstractNode) parent).addSuccessor(expectedSinkNode);
-            ((InternalTopologyBuilder.AbstractNode) expectedSinkNode).addPredecessor(parent);
-        }
-
-        return expectedSinkNode;
-    }
-
-    private void addGlobalStoreToTopologyAndExpectedDescription(final String globalStoreName,
-                                                                final String sourceName,
-                                                                final String globalTopicName,
-                                                                final String processorName) {
-        topology.addGlobalStore(
-            new MockStateStoreSupplier(globalStoreName, false, false),
-            sourceName,
-            null,
-            null,
-            null,
-            globalTopicName,
-            processorName,
-            new MockProcessorSupplier());
-
-        final TopologyDescription.GlobalStore expectedGlobalStore = new InternalTopologyBuilder.GlobalStore(
-            sourceName,
-            processorName,
-            globalStoreName,
-            globalTopicName);
-
-        expectedDescription.addGlobalStore(expectedGlobalStore);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
index b98b756..9bd8756 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -19,14 +19,15 @@ package org.apache.kafka.streams.processor.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TopologyBuilderException;
+import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
-import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.SubscriptionUpdates;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
@@ -67,8 +68,8 @@ public class InternalTopologyBuilderTest {
         final String earliestTopic = "earliestTopic";
         final String latestTopic = "latestTopic";
 
-        builder.addSource(TopologyBuilder.AutoOffsetReset.EARLIEST, "source", null, null, null, earliestTopic);
-        builder.addSource(TopologyBuilder.AutoOffsetReset.LATEST, "source2", null, null, null, latestTopic);
+        builder.addSource(Topology.AutoOffsetReset.EARLIEST, "source", null, null, null, earliestTopic);
+        builder.addSource(Topology.AutoOffsetReset.LATEST, "source2", null, null, null, latestTopic);
 
         assertTrue(builder.earliestResetTopicsPattern().matcher(earliestTopic).matches());
         assertTrue(builder.latestResetTopicsPattern().matcher(latestTopic).matches());
@@ -79,8 +80,8 @@ public class InternalTopologyBuilderTest {
         final String earliestTopicPattern = "earliest.*Topic";
         final String latestTopicPattern = "latest.*Topic";
 
-        builder.addSource(TopologyBuilder.AutoOffsetReset.EARLIEST, "source", null, null, null, Pattern.compile(earliestTopicPattern));
-        builder.addSource(TopologyBuilder.AutoOffsetReset.LATEST, "source2", null, null, null,  Pattern.compile(latestTopicPattern));
+        builder.addSource(Topology.AutoOffsetReset.EARLIEST, "source", null, null, null, Pattern.compile(earliestTopicPattern));
+        builder.addSource(Topology.AutoOffsetReset.LATEST, "source2", null, null, null,  Pattern.compile(latestTopicPattern));
 
         assertTrue(builder.earliestResetTopicsPattern().matcher("earliestTestTopic").matches());
         assertTrue(builder.latestResetTopicsPattern().matcher("latestTestTopic").matches());
@@ -108,18 +109,18 @@ public class InternalTopologyBuilderTest {
         assertEquals(builder.latestResetTopicsPattern().pattern(), "");
     }
 
-    @Test(expected = TopologyBuilderException.class)
+    @Test(expected = TopologyException.class)
     public void shouldNotAllowOffsetResetSourceWithoutTopics() {
-        builder.addSource(TopologyBuilder.AutoOffsetReset.EARLIEST, "source", null, stringSerde.deserializer(), stringSerde.deserializer());
+        builder.addSource(Topology.AutoOffsetReset.EARLIEST, "source", null, stringSerde.deserializer(), stringSerde.deserializer());
     }
 
     @Test
     public void shouldNotAllowOffsetResetSourceWithDuplicateSourceName() {
-        builder.addSource(TopologyBuilder.AutoOffsetReset.EARLIEST, "source", null, stringSerde.deserializer(), stringSerde.deserializer(), "topic-1");
+        builder.addSource(Topology.AutoOffsetReset.EARLIEST, "source", null, stringSerde.deserializer(), stringSerde.deserializer(), "topic-1");
         try {
-            builder.addSource(TopologyBuilder.AutoOffsetReset.LATEST, "source", null, stringSerde.deserializer(), stringSerde.deserializer(), "topic-2");
-            fail("Should throw TopologyBuilderException for duplicate source name");
-        } catch (final TopologyBuilderException expected) { /* ok */ }
+            builder.addSource(Topology.AutoOffsetReset.LATEST, "source", null, stringSerde.deserializer(), stringSerde.deserializer(), "topic-2");
+            fail("Should throw TopologyException for duplicate source name");
+        } catch (final TopologyException expected) { /* ok */ }
     }
 
     @Test
@@ -127,8 +128,8 @@ public class InternalTopologyBuilderTest {
         builder.addSource(null, "source", null, null, null, "topic-1");
         try {
             builder.addSource(null, "source", null, null, null, "topic-2");
-            fail("Should throw TopologyBuilderException with source name conflict");
-        } catch (final TopologyBuilderException expected) { /* ok */ }
+            fail("Should throw TopologyException with source name conflict");
+        } catch (final TopologyException expected) { /* ok */ }
     }
 
     @Test
@@ -136,8 +137,8 @@ public class InternalTopologyBuilderTest {
         builder.addSource(null, "source", null, null, null, "topic-1");
         try {
             builder.addSource(null, "source-2", null, null, null, "topic-1");
-            fail("Should throw TopologyBuilderException with topic conflict");
-        } catch (final TopologyBuilderException expected) { /* ok */ }
+            fail("Should throw TopologyException with topic conflict");
+        } catch (final TopologyException expected) { /* ok */ }
     }
 
     @Test
@@ -146,16 +147,16 @@ public class InternalTopologyBuilderTest {
         builder.addProcessor("processor", new MockProcessorSupplier(), "source");
         try {
             builder.addProcessor("processor", new MockProcessorSupplier(), "source");
-            fail("Should throw TopologyBuilderException with processor name conflict");
-        } catch (final TopologyBuilderException expected) { /* ok */ }
+            fail("Should throw TopologyException with processor name conflict");
+        } catch (final TopologyException expected) { /* ok */ }
     }
 
-    @Test(expected = TopologyBuilderException.class)
+    @Test(expected = TopologyException.class)
     public void testAddProcessorWithWrongParent() {
         builder.addProcessor("processor", new MockProcessorSupplier(), "source");
     }
 
-    @Test(expected = TopologyBuilderException.class)
+    @Test(expected = TopologyException.class)
     public void testAddProcessorWithSelfParent() {
         builder.addProcessor("processor", new MockProcessorSupplier(), "processor");
     }
@@ -166,16 +167,16 @@ public class InternalTopologyBuilderTest {
         builder.addSink("sink", "topic-2", null, null, null, "source");
         try {
             builder.addSink("sink", "topic-3", null, null, null, "source");
-            fail("Should throw TopologyBuilderException with sink name conflict");
-        } catch (final TopologyBuilderException expected) { /* ok */ }
+            fail("Should throw TopologyException with sink name conflict");
+        } catch (final TopologyException expected) { /* ok */ }
     }
 
-    @Test(expected = TopologyBuilderException.class)
+    @Test(expected = TopologyException.class)
     public void testAddSinkWithWrongParent() {
         builder.addSink("sink", "topic-2", null, null, null, "source");
     }
 
-    @Test(expected = TopologyBuilderException.class)
+    @Test(expected = TopologyException.class)
     public void testAddSinkWithSelfParent() {
         builder.addSink("sink", "topic-2", null, null, null, "sink");
     }
@@ -247,8 +248,8 @@ public class InternalTopologyBuilderTest {
         builder.addSource(null, "source-1", null, null, null, "foo");
         try {
             builder.addSource(null, "source-2", null, null, null, Pattern.compile("f.*"));
-            fail("Should throw TopologyBuilderException with topic name/pattern conflict");
-        } catch (final TopologyBuilderException expected) { /* ok */ }
+            fail("Should throw TopologyException with topic name/pattern conflict");
+        } catch (final TopologyException expected) { /* ok */ }
     }
 
     @Test
@@ -256,11 +257,11 @@ public class InternalTopologyBuilderTest {
         builder.addSource(null, "source-1", null, null, null, Pattern.compile("f.*"));
         try {
             builder.addSource(null, "source-2", null, null, null, "foo");
-            fail("Should throw TopologyBuilderException with topic name/pattern conflict");
-        } catch (final TopologyBuilderException expected) { /* ok */ }
+            fail("Should throw TopologyException with topic name/pattern conflict");
+        } catch (final TopologyException expected) { /* ok */ }
     }
 
-    @Test(expected = TopologyBuilderException.class)
+    @Test(expected = TopologyException.class)
     public void testAddStateStoreWithNonExistingProcessor() {
         builder.addStateStore(new MockStateStoreSupplier("store", false), "no-such-processsor");
     }
@@ -270,8 +271,8 @@ public class InternalTopologyBuilderTest {
         builder.addSource(null, "source-1", null, null, null, "topic-1");
         try {
             builder.addStateStore(new MockStateStoreSupplier("store", false), "source-1");
-            fail("Should throw TopologyBuilderException with store cannot be added to source");
-        } catch (final TopologyBuilderException expected) { /* ok */ }
+            fail("Should throw TopologyException with store cannot be added to source");
+        } catch (final TopologyException expected) { /* ok */ }
     }
 
     @Test
@@ -279,8 +280,8 @@ public class InternalTopologyBuilderTest {
         builder.addSink("sink-1", "topic-1", null, null, null);
         try {
             builder.addStateStore(new MockStateStoreSupplier("store", false), "sink-1");
-            fail("Should throw TopologyBuilderException with store cannot be added to sink");
-        } catch (final TopologyBuilderException expected) { /* ok */ }
+            fail("Should throw TopologyException with store cannot be added to sink");
+        } catch (final TopologyException expected) { /* ok */ }
     }
 
     @Test
@@ -288,8 +289,8 @@ public class InternalTopologyBuilderTest {
         builder.addStateStore(new MockStateStoreSupplier("store", false));
         try {
             builder.addStateStore(new MockStateStoreSupplier("store", false));
-            fail("Should throw TopologyBuilderException with store name conflict");
-        } catch (final TopologyBuilderException expected) { /* ok */ }
+            fail("Should throw TopologyException with store name conflict");
+        } catch (final TopologyException expected) { /* ok */ }
     }
 
     @Test
@@ -326,12 +327,12 @@ public class InternalTopologyBuilderTest {
 
         builder.addProcessor("processor-3", new MockProcessorSupplier(), "source-3", "source-4");
 
-        final Map<Integer, TopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
+        final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
 
-        final Map<Integer, TopologyBuilder.TopicsInfo> expectedTopicGroups = new HashMap<>();
-        expectedTopicGroups.put(0, new TopologyBuilder.TopicsInfo(Collections.<String>emptySet(), mkSet("topic-1", "X-topic-1x", "topic-2"), Collections.<String, InternalTopicConfig>emptyMap(), Collections.<String, InternalTopicConfig>emptyMap()));
-        expectedTopicGroups.put(1, new TopologyBuilder.TopicsInfo(Collections.<String>emptySet(), mkSet("topic-3", "topic-4"), Collections.<String, InternalTopicConfig>emptyMap(), Collections.<String, InternalTopicConfig>emptyMap()));
-        expectedTopicGroups.put(2, new TopologyBuilder.TopicsInfo(Collections.<String>emptySet(), mkSet("topic-5"), Collections.<String, InternalTopicConfig>emptyMap(), Collections.<String, InternalTopicConfig>emptyMap()));
+        final Map<Integer, InternalTopologyBuilder.TopicsInfo> expectedTopicGroups = new HashMap<>();
+        expectedTopicGroups.put(0, new InternalTopologyBuilder.TopicsInfo(Collections.<String>emptySet(), mkSet("topic-1", "X-topic-1x", "topic-2"), Collections.<String, InternalTopicConfig>emptyMap(), Collections.<String, InternalTopicConfig>emptyMap()));
+        expectedTopicGroups.put(1, new InternalTopologyBuilder.TopicsInfo(Collections.<String>emptySet(), mkSet("topic-3", "topic-4"), Collections.<String, InternalTopicConfig>emptyMap(), Collections.<String, InternalTopicConfig>emptyMap()));
+        expectedTopicGroups.put(2, new InternalTopologyBuilder.TopicsInfo(Collections.<String>emptySet(), mkSet("topic-5"), Collections.<String, InternalTopicConfig>emptyMap(), Collections.<String, InternalTopicConfig>emptyMap()));
 
         assertEquals(3, topicGroups.size());
         assertEquals(expectedTopicGroups, topicGroups);
@@ -363,13 +364,13 @@ public class InternalTopologyBuilderTest {
         builder.addStateStore(supplier);
         builder.connectProcessorAndStateStores("processor-5", "store-3");
 
-        final Map<Integer, TopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
+        final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
 
-        final Map<Integer, TopologyBuilder.TopicsInfo> expectedTopicGroups = new HashMap<>();
+        final Map<Integer, InternalTopologyBuilder.TopicsInfo> expectedTopicGroups = new HashMap<>();
         final String store1 = ProcessorStateManager.storeChangelogTopic("X", "store-1");
         final String store2 = ProcessorStateManager.storeChangelogTopic("X", "store-2");
         final String store3 = ProcessorStateManager.storeChangelogTopic("X", "store-3");
-        expectedTopicGroups.put(0, new TopologyBuilder.TopicsInfo(
+        expectedTopicGroups.put(0, new InternalTopologyBuilder.TopicsInfo(
             Collections.<String>emptySet(), mkSet("topic-1", "topic-1x", "topic-2"),
             Collections.<String, InternalTopicConfig>emptyMap(),
             Collections.singletonMap(
@@ -378,7 +379,7 @@ public class InternalTopologyBuilderTest {
                     store1,
                     Collections.singleton(InternalTopicConfig.CleanupPolicy.compact),
                     Collections.<String, String>emptyMap()))));
-        expectedTopicGroups.put(1, new TopologyBuilder.TopicsInfo(
+        expectedTopicGroups.put(1, new InternalTopologyBuilder.TopicsInfo(
             Collections.<String>emptySet(), mkSet("topic-3", "topic-4"),
             Collections.<String, InternalTopicConfig>emptyMap(),
             Collections.singletonMap(
@@ -387,7 +388,7 @@ public class InternalTopologyBuilderTest {
                     store2,
                     Collections.singleton(InternalTopicConfig.CleanupPolicy.compact),
                     Collections.<String, String>emptyMap()))));
-        expectedTopicGroups.put(2, new TopologyBuilder.TopicsInfo(
+        expectedTopicGroups.put(2, new InternalTopologyBuilder.TopicsInfo(
             Collections.<String>emptySet(), mkSet("topic-5"),
             Collections.<String, InternalTopicConfig>emptyMap(),
             Collections.singletonMap(store3,
@@ -519,8 +520,8 @@ public class InternalTopologyBuilderTest {
         builder.addSource(null, "source", null, null, null, "topic");
         builder.addProcessor("processor", new MockProcessorSupplier(), "source");
         builder.addStateStore(new RocksDBWindowStoreSupplier("store", 30000, 3, false, null, null, 10000, true, Collections.<String, String>emptyMap(), false), "processor");
-        final Map<Integer, TopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
-        final TopologyBuilder.TopicsInfo topicsInfo = topicGroups.values().iterator().next();
+        final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
+        final InternalTopologyBuilder.TopicsInfo topicsInfo = topicGroups.values().iterator().next();
         final InternalTopicConfig topicConfig = topicsInfo.stateChangelogTopics.get("appId-store-changelog");
         final Properties properties = topicConfig.toProperties(0);
         final List<String> policies = Arrays.asList(properties.getProperty(InternalTopicManager.CLEANUP_POLICY_PROP).split(","));
@@ -539,8 +540,8 @@ public class InternalTopologyBuilderTest {
         builder.addSource(null, "source", null, null, null, "topic");
         builder.addProcessor("processor", new MockProcessorSupplier(), "source");
         builder.addStateStore(new MockStateStoreSupplier("name", true), "processor");
-        final Map<Integer, TopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
-        final TopologyBuilder.TopicsInfo topicsInfo = topicGroups.values().iterator().next();
+        final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
+        final InternalTopologyBuilder.TopicsInfo topicsInfo = topicGroups.values().iterator().next();
         final InternalTopicConfig topicConfig = topicsInfo.stateChangelogTopics.get("appId-name-changelog");
         final Properties properties = topicConfig.toProperties(0);
         assertEquals("appId-name-changelog", topicConfig.name());
@@ -554,7 +555,7 @@ public class InternalTopologyBuilderTest {
         builder.setApplicationId("appId");
         builder.addInternalTopic("foo");
         builder.addSource(null, "source", null, null, null, "foo");
-        final TopologyBuilder.TopicsInfo topicsInfo = builder.topicGroups().values().iterator().next();
+        final InternalTopologyBuilder.TopicsInfo topicsInfo = builder.topicGroups().values().iterator().next();
         final InternalTopicConfig topicConfig = topicsInfo.repartitionSourceTopics.get("appId-foo");
         final Properties properties = topicConfig.toProperties(0);
         assertEquals("appId-foo", topicConfig.name());
@@ -562,8 +563,9 @@ public class InternalTopologyBuilderTest {
         assertEquals(1, properties.size());
     }
 
-    @Test(expected = TopologyBuilderException.class)
-    public void shouldThroughOnUnassignedStateStoreAccess() {
+    @SuppressWarnings("deprecation")
+    @Test
+    public void shouldThrowOnUnassignedStateStoreAccess() {
         final String sourceNodeName = "source";
         final String goodNodeName = "goodGuy";
         final String badNodeName = "badGuy";
@@ -573,24 +575,22 @@ public class InternalTopologyBuilderTest {
         config.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
         final StreamsConfig streamsConfig = new StreamsConfig(config);
 
+        builder.addSource(null, sourceNodeName, null, null, null, "topic");
+        builder.addProcessor(goodNodeName, new LocalMockProcessorSupplier(), sourceNodeName);
+        builder.addStateStore(
+            Stores.create(LocalMockProcessorSupplier.STORE_NAME).withStringKeys().withStringValues().inMemory().build(),
+            goodNodeName);
+        builder.addProcessor(badNodeName, new LocalMockProcessorSupplier(), sourceNodeName);
+
         try {
-            builder.addSource(null, sourceNodeName, null, null, null, "topic");
-            builder.addProcessor(goodNodeName, new LocalMockProcessorSupplier(), sourceNodeName);
-            builder.addStateStore(
-                    Stores.create(LocalMockProcessorSupplier.STORE_NAME).withStringKeys().withStringValues().inMemory().build(),
-                    goodNodeName);
-            builder.addProcessor(badNodeName, new LocalMockProcessorSupplier(), sourceNodeName);
-
-            final ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(streamsConfig, builder);
-            driver.process("topic", null, null);
-        } catch (final StreamsException e) {
-            final Throwable cause = e.getCause();
-            if (cause != null
-                && cause instanceof TopologyBuilderException
-                && cause.getMessage().equals("Invalid topology building: Processor " + badNodeName + " has no access to StateStore " + LocalMockProcessorSupplier.STORE_NAME)) {
-                throw (TopologyBuilderException) cause;
-            } else {
-                throw new RuntimeException("Did expect different exception. Did catch:", e);
+            new ProcessorTopologyTestDriver(streamsConfig, builder);
+            fail("Should have throw StreamsException");
+        } catch (final StreamsException expected) {
+            final Throwable cause = expected.getCause();
+            if (cause == null
+                || !(cause instanceof TopologyBuilderException)
+                || !cause.getMessage().equals("Invalid topology building: Processor " + badNodeName + " has no access to StateStore " + LocalMockProcessorSupplier.STORE_NAME)) {
+                throw new RuntimeException("Did expect different exception. Did catch:", expected);
             }
         }
     }
@@ -639,7 +639,7 @@ public class InternalTopologyBuilderTest {
         builder.updateSubscriptions(subscriptionUpdates, null);
         builder.setApplicationId("test-id");
 
-        final Map<Integer, TopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
+        final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
         assertTrue(topicGroups.get(0).sourceTopics.contains("topic-foo"));
         assertTrue(topicGroups.get(1).sourceTopics.contains("topic-A"));
         assertTrue(topicGroups.get(1).sourceTopics.contains("topic-B"));
@@ -693,7 +693,7 @@ public class InternalTopologyBuilderTest {
         assertFalse(topics.contains("topic-A"));
     }
 
-    @Test(expected = TopologyBuilderException.class)
+    @Test(expected = TopologyException.class)
     public void shouldNotAllowToAddGlobalStoreWithSourceNameEqualsProcessorName() {
         final String sameNameForSourceAndProcessor = "sameName";
         builder.addGlobalStore(

http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index f173a65..33e43e2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -35,7 +35,6 @@ import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
 import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
 import org.apache.kafka.streams.state.HostInfo;
@@ -526,18 +525,18 @@ public class StreamPartitionAssignorTest {
         assertEquals(new HashSet<>(tasks), allTasks);
 
         // check tasks for state topics
-        Map<Integer, TopologyBuilder.TopicsInfo> topicGroups = thread10.builder.topicGroups();
+        Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = thread10.builder.topicGroups();
 
         assertEquals(Utils.mkSet(task00, task01, task02), tasksForState(applicationId, "store1", tasks, topicGroups));
         assertEquals(Utils.mkSet(task10, task11, task12), tasksForState(applicationId, "store2", tasks, topicGroups));
         assertEquals(Utils.mkSet(task10, task11, task12), tasksForState(applicationId, "store3", tasks, topicGroups));
     }
 
-    private Set<TaskId> tasksForState(String applicationId, String storeName, List<TaskId> tasks, Map<Integer, TopologyBuilder.TopicsInfo> topicGroups) {
+    private Set<TaskId> tasksForState(String applicationId, String storeName, List<TaskId> tasks, Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups) {
         final String changelogTopic = ProcessorStateManager.storeChangelogTopic(applicationId, storeName);
 
         Set<TaskId> ids = new HashSet<>();
-        for (Map.Entry<Integer, TopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
+        for (Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
             Set<String> stateChangelogTopics = entry.getValue().stateChangelogTopics.keySet();
 
             if (stateChangelogTopics.contains(changelogTopic)) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index ce25e67..55f8728 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -178,7 +178,7 @@ public class ProcessorTopologyTestDriver {
         };
 
         // Identify internal topics for forwarding in process ...
-        for (final TopologyBuilder.TopicsInfo topicsInfo : builder.topicGroups().values()) {
+        for (final InternalTopologyBuilder.TopicsInfo topicsInfo : builder.topicGroups().values()) {
             internalTopics.addAll(topicsInfo.repartitionSourceTopics.keySet());
         }
 


Mime
View raw message