kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [1/5] kafka git commit: KAFKA-3856 (KIP-120) step two: extract internal functions from public facing TopologyBuilder class
Date Mon, 24 Jul 2017 18:03:30 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk fc93fb4b6 -> 5d798511b


http://git-wip-us.apache.org/repos/asf/kafka/blob/5d798511/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index a6d1179..0a08b7c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -68,20 +68,20 @@ import static org.junit.Assert.assertThat;
 
 public class StreamPartitionAssignorTest {
 
-    private TopicPartition t1p0 = new TopicPartition("topic1", 0);
-    private TopicPartition t1p1 = new TopicPartition("topic1", 1);
-    private TopicPartition t1p2 = new TopicPartition("topic1", 2);
-    private TopicPartition t2p0 = new TopicPartition("topic2", 0);
-    private TopicPartition t2p1 = new TopicPartition("topic2", 1);
-    private TopicPartition t2p2 = new TopicPartition("topic2", 2);
-    private TopicPartition t3p0 = new TopicPartition("topic3", 0);
-    private TopicPartition t3p1 = new TopicPartition("topic3", 1);
-    private TopicPartition t3p2 = new TopicPartition("topic3", 2);
-    private TopicPartition t3p3 = new TopicPartition("topic3", 3);
-
-    private Set<String> allTopics = Utils.mkSet("topic1", "topic2");
-
-    private List<PartitionInfo> infos = Arrays.asList(
+    private final TopicPartition t1p0 = new TopicPartition("topic1", 0);
+    private final TopicPartition t1p1 = new TopicPartition("topic1", 1);
+    private final TopicPartition t1p2 = new TopicPartition("topic1", 2);
+    private final TopicPartition t2p0 = new TopicPartition("topic2", 0);
+    private final TopicPartition t2p1 = new TopicPartition("topic2", 1);
+    private final TopicPartition t2p2 = new TopicPartition("topic2", 2);
+    private final TopicPartition t3p0 = new TopicPartition("topic3", 0);
+    private final TopicPartition t3p1 = new TopicPartition("topic3", 1);
+    private final TopicPartition t3p2 = new TopicPartition("topic3", 2);
+    private final TopicPartition t3p3 = new TopicPartition("topic3", 3);
+
+    private final Set<String> allTopics = Utils.mkSet("topic1", "topic2");
+
+    private final List<PartitionInfo> infos = Arrays.asList(
             new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]),
             new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]),
             new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0]),
@@ -94,8 +94,11 @@ public class StreamPartitionAssignorTest {
             new PartitionInfo("topic3", 3, Node.noNode(), new Node[0], new Node[0])
     );
 
-    private Cluster metadata = new Cluster("cluster", Collections.singletonList(Node.noNode()), infos, Collections.<String>emptySet(),
-            Collections.<String>emptySet());
+    private final Cluster metadata = new Cluster(
+        "cluster",
+        Collections.singletonList(Node.noNode()),
+        infos, Collections.<String>emptySet(),
+        Collections.<String>emptySet());
 
     private final TaskId task0 = new TaskId(0, 0);
     private final TaskId task1 = new TaskId(0, 1);
@@ -104,7 +107,7 @@ public class StreamPartitionAssignorTest {
     private final String userEndPoint = "localhost:2171";
     private final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
     private final MockClientSupplier mockClientSupplier = new MockClientSupplier();
-    private final TopologyBuilder builder = new TopologyBuilder();
+    private final InternalTopologyBuilder builder = new InternalTopologyBuilder();
     private final StreamsConfig config = new StreamsConfig(configProps());
     private final StreamThread mockStreamThread = new StreamThread(builder, config,
                                                                    mockClientSupplier, "appID",
@@ -133,8 +136,8 @@ public class StreamPartitionAssignorTest {
 
     @Test
     public void testSubscription() throws Exception {
-        builder.addSource("source1", "topic1");
-        builder.addSource("source2", "topic2");
+        builder.addSource(null, "source1", null, null, null, "topic1");
+        builder.addSource(null, "source2", null, null, null, "topic2");
         builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");
 
         final Set<TaskId> prevTasks = Utils.mkSet(
@@ -145,8 +148,19 @@ public class StreamPartitionAssignorTest {
 
         String clientId = "client-id";
         UUID processId = UUID.randomUUID();
-        StreamThread thread = new StreamThread(builder, config, new MockClientSupplier(), "test", clientId, processId, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
-                                               0) {
+        StreamThread thread = new StreamThread(
+            builder,
+            config,
+            new MockClientSupplier(),
+            "test",
+            clientId,
+            processId,
+            new Metrics(),
+            Time.SYSTEM,
+            new StreamsMetadataState(builder,
+                StreamsMetadataState.UNKNOWN_HOST),
+            0) {
+
             @Override
             public Set<TaskId> prevActiveTasks() {
                 return prevTasks;
@@ -173,8 +187,8 @@ public class StreamPartitionAssignorTest {
 
     @Test
     public void testAssignBasic() throws Exception {
-        builder.addSource("source1", "topic1");
-        builder.addSource("source2", "topic2");
+        builder.addSource(null, "source1", null, null, null, "topic1");
+        builder.addSource(null, "source2", null, null, null, "topic2");
         builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");
         List<String> topics = Utils.mkList("topic1", "topic2");
         Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
@@ -191,8 +205,17 @@ public class StreamPartitionAssignorTest {
         String client1 = "client1";
 
 
-        StreamThread thread10 = new StreamThread(builder, config, mockClientSupplier, "test", client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
-                                                 0);
+        StreamThread thread10 = new StreamThread(
+            builder,
+            config,
+            mockClientSupplier,
+            "test",
+            client1,
+            uuid1,
+            new Metrics(),
+            Time.SYSTEM,
+            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+            0);
 
 
         partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
@@ -245,10 +268,10 @@ public class StreamPartitionAssignorTest {
         props.put(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, SingleGroupPartitionGrouperStub.class);
         StreamsConfig config = new StreamsConfig(props);
 
-        builder.addSource("source1", "topic1");
+        builder.addSource(null, "source1", null, null, null, "topic1");
         builder.addProcessor("processor1", new MockProcessorSupplier(), "source1");
         builder.addStateStore(new MockStateStoreSupplier("store1", false), "processor1");
-        builder.addSource("source2", "topic2");
+        builder.addSource(null, "source2", null, null, null, "topic2");
         builder.addProcessor("processor2", new MockProcessorSupplier(), "source2");
         builder.addStateStore(new MockStateStoreSupplier("store2", false), "processor2");
         List<String> topics = Utils.mkList("topic1", "topic2");
@@ -257,7 +280,17 @@ public class StreamPartitionAssignorTest {
         UUID uuid1 = UUID.randomUUID();
         String client1 = "client1";
 
-        StreamThread thread10 = new StreamThread(builder, config, mockClientSupplier, "test", client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0);
+        StreamThread thread10 = new StreamThread(
+            builder,
+            config,
+            mockClientSupplier,
+            "test",
+            client1,
+            uuid1,
+            new Metrics(),
+            Time.SYSTEM,
+            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+            0);
 
         partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
         partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(thread10.config, mockClientSupplier.restoreConsumer));
@@ -281,8 +314,8 @@ public class StreamPartitionAssignorTest {
 
     @Test
     public void testAssignEmptyMetadata() throws Exception {
-        builder.addSource("source1", "topic1");
-        builder.addSource("source2", "topic2");
+        builder.addSource(null, "source1", null, null, null, "topic1");
+        builder.addSource(null, "source2", null, null, null, "topic2");
         builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");
         List<String> topics = Utils.mkList("topic1", "topic2");
         Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
@@ -296,7 +329,17 @@ public class StreamPartitionAssignorTest {
         UUID uuid1 = UUID.randomUUID();
         String client1 = "client1";
 
-        StreamThread thread10 = new StreamThread(builder, config, new MockClientSupplier(), "test", client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0);
+        StreamThread thread10 = new StreamThread(
+            builder,
+            config,
+            new MockClientSupplier(),
+            "test",
+            client1,
+            uuid1,
+            new Metrics(),
+            Time.SYSTEM,
+            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+            0);
 
         partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
 
@@ -338,9 +381,9 @@ public class StreamPartitionAssignorTest {
 
     @Test
     public void testAssignWithNewTasks() throws Exception {
-        builder.addSource("source1", "topic1");
-        builder.addSource("source2", "topic2");
-        builder.addSource("source3", "topic3");
+        builder.addSource(null, "source1", null, null, null, "topic1");
+        builder.addSource(null, "source2", null, null, null, "topic2");
+        builder.addSource(null, "source3", null, null, null, "topic3");
         builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2", "source3");
         List<String> topics = Utils.mkList("topic1", "topic2", "topic3");
         Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2, task3);
@@ -354,8 +397,17 @@ public class StreamPartitionAssignorTest {
         UUID uuid2 = UUID.randomUUID();
         String client1 = "client1";
 
-        StreamThread thread10 = new StreamThread(builder, config, mockClientSupplier, "test", client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
-                                                 0);
+        StreamThread thread10 = new StreamThread(
+            builder,
+            config,
+            mockClientSupplier,
+            "test",
+            client1,
+            uuid1,
+            new Metrics(),
+            Time.SYSTEM,
+            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+            0);
 
         partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
         partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(thread10.config, mockClientSupplier.restoreConsumer));
@@ -397,8 +449,8 @@ public class StreamPartitionAssignorTest {
     public void testAssignWithStates() throws Exception {
         String applicationId = "test";
         builder.setApplicationId(applicationId);
-        builder.addSource("source1", "topic1");
-        builder.addSource("source2", "topic2");
+        builder.addSource(null, "source1", null, null, null, "topic1");
+        builder.addSource(null, "source2", null, null, null, "topic2");
 
         builder.addProcessor("processor-1", new MockProcessorSupplier(), "source1");
         builder.addStateStore(new MockStateStoreSupplier("store1", false), "processor-1");
@@ -422,8 +474,17 @@ public class StreamPartitionAssignorTest {
         String client1 = "client1";
 
 
-        StreamThread thread10 = new StreamThread(builder, config, mockClientSupplier, applicationId, client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
-                                                 0);
+        StreamThread thread10 = new StreamThread(
+            builder,
+            config,
+            mockClientSupplier,
+            applicationId,
+            client1,
+            uuid1,
+            new Metrics(),
+            Time.SYSTEM,
+            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+            0);
 
         partitionAssignor.configure(config.getConsumerConfigs(thread10, applicationId, client1));
         partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(thread10.config, mockClientSupplier.restoreConsumer));
@@ -488,8 +549,8 @@ public class StreamPartitionAssignorTest {
         props.setProperty(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "1");
         StreamsConfig config = new StreamsConfig(props);
 
-        builder.addSource("source1", "topic1");
-        builder.addSource("source2", "topic2");
+        builder.addSource(null, "source1", null, null, null, "topic1");
+        builder.addSource(null, "source2", null, null, null, "topic2");
         builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");
         List<String> topics = Utils.mkList("topic1", "topic2");
         Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
@@ -506,8 +567,17 @@ public class StreamPartitionAssignorTest {
         UUID uuid2 = UUID.randomUUID();
         String client1 = "client1";
 
-        StreamThread thread10 = new StreamThread(builder, config, mockClientSupplier, "test", client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
-                                                 0);
+        StreamThread thread10 = new StreamThread(
+            builder,
+            config,
+            mockClientSupplier,
+            "test",
+            client1,
+            uuid1,
+            new Metrics(),
+            Time.SYSTEM,
+            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+            0);
 
         partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
         partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(thread10.config, mockClientSupplier.restoreConsumer));
@@ -559,16 +629,24 @@ public class StreamPartitionAssignorTest {
     public void testOnAssignment() throws Exception {
         TopicPartition t2p3 = new TopicPartition("topic2", 3);
 
-        TopologyBuilder builder = new TopologyBuilder();
-        builder.addSource("source1", "topic1");
-        builder.addSource("source2", "topic2");
+        builder.addSource(null, "source1", null, null, null, "topic1");
+        builder.addSource(null, "source2", null, null, null, "topic2");
         builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");
 
         UUID uuid = UUID.randomUUID();
         String client1 = "client1";
 
-        StreamThread thread = new StreamThread(builder, config, mockClientSupplier, "test", client1, uuid, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
-                                               0);
+        StreamThread thread = new StreamThread(
+            builder,
+            config,
+            mockClientSupplier,
+            "test",
+            client1,
+            uuid,
+            new Metrics(),
+            Time.SYSTEM,
+            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+            0);
 
         partitionAssignor.configure(config.getConsumerConfigs(thread, "test", client1));
 
@@ -593,10 +671,10 @@ public class StreamPartitionAssignorTest {
         String applicationId = "test";
         builder.setApplicationId(applicationId);
         builder.addInternalTopic("topicX");
-        builder.addSource("source1", "topic1");
+        builder.addSource(null, "source1", null, null, null, "topic1");
         builder.addProcessor("processor1", new MockProcessorSupplier(), "source1");
-        builder.addSink("sink1", "topicX", "processor1");
-        builder.addSource("source2", "topicX");
+        builder.addSink("sink1", "topicX", null, null, null, "processor1");
+        builder.addSource(null, "source2", null, null, null, "topicX");
         builder.addProcessor("processor2", new MockProcessorSupplier(), "source2");
         List<String> topics = Utils.mkList("topic1", "test-topicX");
         Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
@@ -605,8 +683,17 @@ public class StreamPartitionAssignorTest {
         String client1 = "client1";
 
 
-        StreamThread thread10 = new StreamThread(builder, config, mockClientSupplier, applicationId, client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
-                                                 0);
+        StreamThread thread10 = new StreamThread(
+            builder,
+            config,
+            mockClientSupplier,
+            applicationId,
+            client1,
+            uuid1,
+            new Metrics(),
+            Time.SYSTEM,
+            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+            0);
 
         partitionAssignor.configure(config.getConsumerConfigs(thread10, applicationId, client1));
         MockInternalTopicManager internalTopicManager = new MockInternalTopicManager(thread10.config, mockClientSupplier.restoreConsumer);
@@ -629,14 +716,14 @@ public class StreamPartitionAssignorTest {
         String applicationId = "test";
         builder.setApplicationId(applicationId);
         builder.addInternalTopic("topicX");
-        builder.addSource("source1", "topic1");
+        builder.addSource(null, "source1", null, null, null, "topic1");
         builder.addProcessor("processor1", new MockProcessorSupplier(), "source1");
-        builder.addSink("sink1", "topicX", "processor1");
-        builder.addSource("source2", "topicX");
+        builder.addSink("sink1", "topicX", null, null, null, "processor1");
+        builder.addSource(null, "source2", null, null, null, "topicX");
         builder.addInternalTopic("topicZ");
         builder.addProcessor("processor2", new MockProcessorSupplier(), "source2");
-        builder.addSink("sink2", "topicZ", "processor2");
-        builder.addSource("source3", "topicZ");
+        builder.addSink("sink2", "topicZ", null, null, null, "processor2");
+        builder.addSource(null, "source3", null, null, null, "topicZ");
         List<String> topics = Utils.mkList("topic1", "test-topicX", "test-topicZ");
         Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
 
@@ -669,9 +756,9 @@ public class StreamPartitionAssignorTest {
         final StreamsConfig config = new StreamsConfig(properties);
         final String applicationId = "application-id";
         builder.setApplicationId(applicationId);
-        builder.addSource("source", "input");
+        builder.addSource(null, "source", null, null, null, "input");
         builder.addProcessor("processor", new MockProcessorSupplier(), "source");
-        builder.addSink("sink", "output", "processor");
+        builder.addSink("sink", "output", null, null, null, "processor");
 
         final UUID uuid1 = UUID.randomUUID();
         final String client1 = "client1";
@@ -693,17 +780,26 @@ public class StreamPartitionAssignorTest {
         final StreamsConfig config = new StreamsConfig(properties);
         final String applicationId = "application-id";
         builder.setApplicationId(applicationId);
-        builder.addSource("source", "topic1");
+        builder.addSource(null, "source", null, null, null, "topic1");
         builder.addProcessor("processor", new MockProcessorSupplier(), "source");
-        builder.addSink("sink", "output", "processor");
+        builder.addSink("sink", "output", null, null, null, "processor");
 
         final List<String> topics = Utils.mkList("topic1");
 
         final UUID uuid1 = UUID.randomUUID();
         final String client1 = "client1";
 
-        final StreamThread streamThread = new StreamThread(builder, config, mockClientSupplier, applicationId, client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
-                                                           0);
+        final StreamThread streamThread = new StreamThread(
+            builder,
+            config,
+            mockClientSupplier,
+            applicationId,
+            client1,
+            uuid1,
+            new Metrics(),
+            Time.SYSTEM,
+            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+            0);
 
         final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
         partitionAssignor.configure(config.getConsumerConfigs(streamThread, applicationId, client1));
@@ -734,9 +830,17 @@ public class StreamPartitionAssignorTest {
         final String applicationId = "application-id";
         builder.setApplicationId(applicationId);
 
-        final StreamThread streamThread = new StreamThread(builder, config, mockClientSupplier, applicationId, client1, uuid1,
-                                                           new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
-                                                           0);
+        final StreamThread streamThread = new StreamThread(
+            builder,
+            config,
+            mockClientSupplier,
+            applicationId,
+            client1,
+            uuid1,
+            new Metrics(),
+            Time.SYSTEM,
+            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+            0);
 
         partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamThread.config, mockClientSupplier.restoreConsumer));
 
@@ -760,9 +864,17 @@ public class StreamPartitionAssignorTest {
         builder.setApplicationId(applicationId);
 
 
-        final StreamThread streamThread = new StreamThread(builder, config, mockClientSupplier, applicationId, client1, uuid1,
-                                                           new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
-                                                           0);
+        final StreamThread streamThread = new StreamThread(
+            builder,
+            config,
+            mockClientSupplier,
+            applicationId,
+            client1,
+            uuid1,
+            new Metrics(),
+            Time.SYSTEM,
+            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+            0);
 
         try {
             partitionAssignor.configure(config.getConsumerConfigs(streamThread, applicationId, client1));
@@ -874,10 +986,22 @@ public class StreamPartitionAssignorTest {
         final UUID uuid = UUID.randomUUID();
         final String client = "client1";
 
-        final StreamThread streamThread = new StreamThread(builder, config, mockClientSupplier, applicationId, client, uuid, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0);
+        final StreamThread streamThread = new StreamThread(
+            builder.internalTopologyBuilder,
+            config,
+            mockClientSupplier,
+            applicationId,
+            client,
+            uuid,
+            new Metrics(),
+            Time.SYSTEM,
+            new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
+            0);
 
         partitionAssignor.configure(config.getConsumerConfigs(streamThread, applicationId, client));
-        final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager(streamThread.config, mockClientSupplier.restoreConsumer);
+        final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager(
+            streamThread.config,
+            mockClientSupplier.restoreConsumer);
         partitionAssignor.setInternalTopicManager(mockInternalTopicManager);
 
         final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
@@ -953,10 +1077,22 @@ public class StreamPartitionAssignorTest {
         final UUID uuid = UUID.randomUUID();
         final String client = "client1";
 
-        final StreamThread streamThread = new StreamThread(builder, config, mockClientSupplier, applicationId, client, uuid, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0);
+        final StreamThread streamThread = new StreamThread(
+            builder.internalTopologyBuilder,
+            config,
+            mockClientSupplier,
+            applicationId,
+            client,
+            uuid,
+            new Metrics(),
+            Time.SYSTEM,
+            new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
+            0);
 
         partitionAssignor.configure(config.getConsumerConfigs(streamThread, applicationId, client));
-        partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamThread.config, mockClientSupplier.restoreConsumer));
+        partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(
+            streamThread.config,
+            mockClientSupplier.restoreConsumer));
 
         final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
         final Set<TaskId> emptyTasks = Collections.emptySet();
@@ -1046,9 +1182,10 @@ public class StreamPartitionAssignorTest {
             }
         }
 
-        if (info.standbyTasks.size() > 0)
+        if (info.standbyTasks.size() > 0) {
             // check if standby partitions cover all topics
             assertEquals(expectedTopics, standbyTopics);
+        }
 
         return info;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5d798511/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index a7f1db1..9b9d6cd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -36,7 +36,6 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
 import org.apache.kafka.streams.state.HostInfo;
 import org.apache.kafka.streams.state.Stores;
@@ -98,6 +97,7 @@ public class StreamThreadTest {
     @Before
     public void setUp() throws Exception {
         processId = UUID.randomUUID();
+        builder.setApplicationId(applicationId);
     }
 
     private final TopicPartition t1p1 = new TopicPartition("topic1", 1);
@@ -442,7 +442,7 @@ public class StreamThreadTest {
     public void testStateChangeStartClose() throws InterruptedException {
 
         final StreamThread thread = new StreamThread(
-            builder,
+            builder.internalTopologyBuilder,
             config,
             clientSupplier,
             applicationId,
@@ -450,7 +450,7 @@ public class StreamThreadTest {
             processId,
             metrics,
             Time.SYSTEM,
-            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+            new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
             0);
 
         final StateListenerStub stateListener = new StateListenerStub();
@@ -494,7 +494,7 @@ public class StreamThreadTest {
         //clientSupplier.consumer.assign(Arrays.asList(new TopicPartition(TOPIC, 0), new TopicPartition(TOPIC, 1)));
 
         final StreamThread thread1 = new StreamThread(
-            builder,
+            builder.internalTopologyBuilder,
             config,
             clientSupplier,
             applicationId,
@@ -502,10 +502,10 @@ public class StreamThreadTest {
             processId,
             metrics,
             Time.SYSTEM,
-            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+            new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
             0);
         final StreamThread thread2 = new StreamThread(
-            builder,
+            builder.internalTopologyBuilder,
             config,
             clientSupplier,
             applicationId,
@@ -513,7 +513,7 @@ public class StreamThreadTest {
             processId,
             metrics,
             Time.SYSTEM,
-            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+            new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
             0);
 
         final Map<TaskId, Set<TopicPartition>> task0 = Collections.singletonMap(new TaskId(0, 0), task0Assignment);
@@ -617,7 +617,7 @@ public class StreamThreadTest {
     @Test
     public void testMetrics() {
         final StreamThread thread = new StreamThread(
-            builder,
+            builder.internalTopologyBuilder,
             config,
             clientSupplier,
             applicationId,
@@ -625,7 +625,7 @@ public class StreamThreadTest {
             processId,
             metrics,
             mockTime,
-            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+            new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
             0);
         final String defaultGroupName = "stream-metrics";
         final String defaultPrefix = "thread." + thread.threadClientId();
@@ -678,7 +678,7 @@ public class StreamThreadTest {
             extraDir.mkdir();
             builder.addSource("source1", "topic1");
             final StreamThread thread = new StreamThread(
-                builder,
+                builder.internalTopologyBuilder,
                 config,
                 clientSupplier,
                 applicationId,
@@ -686,7 +686,7 @@ public class StreamThreadTest {
                 processId,
                 metrics,
                 mockTime,
-                new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+                new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
                 0) {
                 @Override
                 public void maybeClean(final long now) {
@@ -815,7 +815,7 @@ public class StreamThreadTest {
             builder.addSource("source1", "topic1");
 
             final StreamThread thread = new StreamThread(
-                builder,
+                builder.internalTopologyBuilder,
                 config,
                 clientSupplier,
                 applicationId,
@@ -823,7 +823,7 @@ public class StreamThreadTest {
                 processId,
                 metrics,
                 mockTime,
-                new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+                new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
                 0) {
 
                 @Override
@@ -907,7 +907,7 @@ public class StreamThreadTest {
         builder.addSource("source1", "someTopic");
 
         final StreamThread thread = new StreamThread(
-            builder,
+            builder.internalTopologyBuilder,
             config,
             clientSupplier,
             applicationId,
@@ -915,7 +915,7 @@ public class StreamThreadTest {
             processId,
             metrics,
             mockTime,
-            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+            new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
             0);
 
         final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>();
@@ -943,7 +943,7 @@ public class StreamThreadTest {
 
         final MockClientSupplier clientSupplier = new MockClientSupplier(applicationId);
         final StreamThread thread = new StreamThread(
-            builder,
+            builder.internalTopologyBuilder,
             new StreamsConfig(configProps(true)),
             clientSupplier,
             applicationId,
@@ -951,7 +951,7 @@ public class StreamThreadTest {
             processId,
             metrics,
             mockTime,
-            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+            new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
             0);
 
         final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>();
@@ -982,7 +982,7 @@ public class StreamThreadTest {
 
         final MockClientSupplier clientSupplier = new MockClientSupplier(applicationId);
         final StreamThread thread = new StreamThread(
-            builder,
+            builder.internalTopologyBuilder,
             new StreamsConfig(configProps(true)),
             clientSupplier,
             applicationId,
@@ -990,7 +990,7 @@ public class StreamThreadTest {
             processId,
             metrics,
             mockTime,
-            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+            new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
             0);
 
         final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>();
@@ -1015,7 +1015,7 @@ public class StreamThreadTest {
         builder.addSource("source1", "someTopic");
 
         final StreamThread thread = new StreamThread(
-            builder,
+            builder.internalTopologyBuilder,
             config,
             clientSupplier,
             applicationId,
@@ -1023,7 +1023,7 @@ public class StreamThreadTest {
             processId,
             metrics,
             mockTime,
-            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+            new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
             0);
 
         final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>();
@@ -1046,7 +1046,7 @@ public class StreamThreadTest {
         builder.addSource("name", "topic").addSink("out", "output");
 
         final StreamThread thread = new StreamThread(
-            builder,
+            builder.internalTopologyBuilder,
             config,
             clientSupplier,
             applicationId,
@@ -1054,7 +1054,7 @@ public class StreamThreadTest {
             processId,
             metrics,
             mockTime,
-            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+            new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
             0);
 
         thread.setPartitionAssignor(new StreamPartitionAssignor() {
@@ -1086,7 +1086,7 @@ public class StreamThreadTest {
                 new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG), mockTime));
 
         final StreamThread thread = new StreamThread(
-                builder,
+                builder.internalTopologyBuilder,
                 config,
                 clientSupplier,
                 applicationId,
@@ -1094,7 +1094,7 @@ public class StreamThreadTest {
                 processId,
                 metrics,
                 mockTime,
-                new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+                new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
                 0) {
 
             @Override
@@ -1133,13 +1133,11 @@ public class StreamThreadTest {
 
     @Test
     public void shouldInitializeRestoreConsumerWithOffsetsFromStandbyTasks()  {
-        final KStreamBuilder builder = new KStreamBuilder();
-        builder.setApplicationId(applicationId);
         builder.stream("t1").groupByKey().count("count-one");
         builder.stream("t2").groupByKey().count("count-two");
 
         final StreamThread thread = new StreamThread(
-                builder,
+                builder.internalTopologyBuilder,
                 config,
                 clientSupplier,
                 applicationId,
@@ -1147,7 +1145,7 @@ public class StreamThreadTest {
                 processId,
                 metrics,
                 mockTime,
-                new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+                new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
                 0);
 
         final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer;
@@ -1190,18 +1188,13 @@ public class StreamThreadTest {
                 new TopicPartition("stream-thread-test-count-two-changelog", 0))));
     }
 
-
-
-
     @Test
     public void shouldCloseSuspendedTasksThatAreNoLongerAssignedToThisStreamThreadBeforeCreatingNewTasks() throws Exception {
-        final KStreamBuilder builder = new KStreamBuilder();
-        builder.setApplicationId(applicationId);
         builder.stream("t1").groupByKey().count("count-one");
         builder.stream("t2").groupByKey().count("count-two");
 
         final StreamThread thread = new StreamThread(
-            builder,
+            builder.internalTopologyBuilder,
             config,
             clientSupplier,
             applicationId,
@@ -1209,7 +1202,7 @@ public class StreamThreadTest {
             processId,
             metrics,
             mockTime,
-            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+            new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
             0);
         final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer;
         restoreConsumer.updatePartitions("stream-thread-test-count-one-changelog",
@@ -1268,14 +1261,12 @@ public class StreamThreadTest {
 
     @Test
     public void shouldCloseActiveTasksThatAreAssignedToThisStreamThreadButAssignmentHasChangedBeforeCreatingNewTasks() throws Exception {
-        final KStreamBuilder builder = new KStreamBuilder();
-        builder.setApplicationId(applicationId);
         builder.stream(Pattern.compile("t.*")).to("out");
 
         final Map<Collection<TopicPartition>, TestStreamTask> createdTasks = new HashMap<>();
 
         final StreamThread thread = new StreamThread(
-            builder,
+            builder.internalTopologyBuilder,
             config,
             clientSupplier,
             applicationId,
@@ -1283,7 +1274,7 @@ public class StreamThreadTest {
             processId,
             metrics,
             mockTime,
-            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+            new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
             0) {
 
             @Override
@@ -1355,7 +1346,7 @@ public class StreamThreadTest {
 
         final MockClientSupplier clientSupplier = new MockClientSupplier(applicationId);
         final StreamThread thread = new StreamThread(
-            builder,
+            builder.internalTopologyBuilder,
             new StreamsConfig(configProps(true)),
             clientSupplier,
             applicationId,
@@ -1363,7 +1354,7 @@ public class StreamThreadTest {
             processId,
             metrics,
             mockTime,
-            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+            new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
             0);
 
         final MockConsumer consumer = clientSupplier.consumer;
@@ -1451,7 +1442,7 @@ public class StreamThreadTest {
 
         final MockClientSupplier clientSupplier = new MockClientSupplier(applicationId);
         final StreamThread thread = new StreamThread(
-            builder,
+            builder.internalTopologyBuilder,
             new StreamsConfig(configProps(true)),
             clientSupplier,
             applicationId,
@@ -1459,7 +1450,7 @@ public class StreamThreadTest {
             processId,
             new Metrics(),
             new MockTime(),
-            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+            new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
             0);
 
         final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
@@ -1484,8 +1475,6 @@ public class StreamThreadTest {
 
     @Test
     public void shouldNotViolateAtLeastOnceWhenAnExceptionOccursOnTaskCloseDuringShutdown() throws Exception {
-        final KStreamBuilder builder = new KStreamBuilder();
-        builder.setApplicationId(applicationId);
         builder.stream("t1").groupByKey();
 
         final TestStreamTask testStreamTask = new TestStreamTask(
@@ -1507,7 +1496,7 @@ public class StreamThreadTest {
         };
 
         final StreamThread thread = new StreamThread(
-            builder,
+            builder.internalTopologyBuilder,
             config,
             clientSupplier,
             applicationId,
@@ -1515,7 +1504,7 @@ public class StreamThreadTest {
             processId,
             metrics,
             mockTime,
-            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+            new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
             0) {
 
             @Override
@@ -1561,7 +1550,7 @@ public class StreamThreadTest {
         };
 
         final StreamThread thread = new StreamThread(
-            builder,
+            builder.internalTopologyBuilder,
             config,
             clientSupplier,
             applicationId,
@@ -1569,7 +1558,7 @@ public class StreamThreadTest {
             processId,
             metrics,
             mockTime,
-            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+            new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
             0) {
 
             @Override
@@ -1606,8 +1595,6 @@ public class StreamThreadTest {
 
     @Test
     public void shouldNotViolateAtLeastOnceWhenExceptionOccursDuringTaskSuspension() throws Exception {
-        final KStreamBuilder builder = new KStreamBuilder();
-        builder.setApplicationId(applicationId);
         builder.stream("t1").groupByKey();
 
         final TestStreamTask testStreamTask = new TestStreamTask(
@@ -1629,7 +1616,7 @@ public class StreamThreadTest {
         };
 
         final StreamThread thread = new StreamThread(
-            builder,
+            builder.internalTopologyBuilder,
             config,
             clientSupplier,
             applicationId,
@@ -1637,7 +1624,7 @@ public class StreamThreadTest {
             processId,
             metrics,
             mockTime,
-            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+            new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
             0) {
 
             @Override
@@ -1667,8 +1654,6 @@ public class StreamThreadTest {
 
     @Test
     public void shouldNotViolateAtLeastOnceWhenExceptionOccursDuringFlushStateWhileSuspendingState() throws Exception {
-        final KStreamBuilder builder = new KStreamBuilder();
-        builder.setApplicationId(applicationId);
         builder.stream("t1").groupByKey();
 
         final TestStreamTask testStreamTask = new TestStreamTask(
@@ -1690,7 +1675,7 @@ public class StreamThreadTest {
         };
 
         final StreamThread thread = new StreamThread(
-            builder,
+            builder.internalTopologyBuilder,
             config,
             clientSupplier,
             applicationId,
@@ -1698,7 +1683,7 @@ public class StreamThreadTest {
             processId,
             metrics,
             mockTime,
-            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0) {
+            new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST), 0) {
 
             @Override
             protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) {
@@ -1729,12 +1714,11 @@ public class StreamThreadTest {
     @Test
     @SuppressWarnings("unchecked")
     public void shouldAlwaysUpdateWithLatestTopicsFromStreamPartitionAssignor() throws Exception {
-        final TopologyBuilder topologyBuilder = new TopologyBuilder();
-        topologyBuilder.addSource("source", Pattern.compile("t.*"));
-        topologyBuilder.addProcessor("processor", new MockProcessorSupplier(), "source");
+        builder.addSource("source", Pattern.compile("t.*"));
+        builder.addProcessor("processor", new MockProcessorSupplier(), "source");
 
         final StreamThread thread = new StreamThread(
-            topologyBuilder,
+            builder.internalTopologyBuilder,
             config,
             clientSupplier,
             applicationId,
@@ -1742,7 +1726,7 @@ public class StreamThreadTest {
             processId,
             metrics,
             mockTime,
-            new StreamsMetadataState(topologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
+            new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
             0);
 
         final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
@@ -1756,11 +1740,11 @@ public class StreamThreadTest {
 
         final Field
             nodeToSourceTopicsField =
-            topologyBuilder.getClass().getDeclaredField("nodeToSourceTopics");
+            builder.internalTopologyBuilder.getClass().getDeclaredField("nodeToSourceTopics");
         nodeToSourceTopicsField.setAccessible(true);
         final Map<String, List<String>>
             nodeToSourceTopics =
-            (Map<String, List<String>>) nodeToSourceTopicsField.get(topologyBuilder);
+            (Map<String, List<String>>) nodeToSourceTopicsField.get(builder.internalTopologyBuilder);
         final List<TopicPartition> topicPartitions = new ArrayList<>();
 
         final TopicPartition topicPartition1 = new TopicPartition("topic-1", 0);
@@ -1856,8 +1840,6 @@ public class StreamThreadTest {
     }
 
     private StreamThread setupTest(final TaskId taskId) throws InterruptedException {
-        final TopologyBuilder builder = new TopologyBuilder();
-        builder.setApplicationId(applicationId);
         builder.addSource("source", "topic");
 
         final MockClientSupplier clientSupplier = new MockClientSupplier();
@@ -1883,9 +1865,18 @@ public class StreamThreadTest {
             }
         };
 
-        final StreamThread thread = new StreamThread(builder, config, clientSupplier, applicationId,
-            clientId, processId, new Metrics(), new MockTime(),
-            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0) {
+        final StreamThread thread = new StreamThread(
+            builder.internalTopologyBuilder,
+            config,
+            clientSupplier,
+            applicationId,
+            clientId,
+            processId,
+            new Metrics(),
+            new MockTime(),
+            new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
+            0) {
+
             @Override
             protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) {
                 return testStreamTask;
@@ -1968,8 +1959,6 @@ public class StreamThreadTest {
         final String storeName = "store";
         final String changelogTopic = applicationId + "-" + storeName + "-changelog";
 
-        final KStreamBuilder builder = new KStreamBuilder();
-        builder.setApplicationId(applicationId);
         builder.stream("topic1").groupByKey().count(storeName);
 
         final MockClientSupplier clientSupplier = new MockClientSupplier();
@@ -1986,9 +1975,17 @@ public class StreamThreadTest {
             }
         });
 
-        final StreamThread thread = new StreamThread(builder, config, clientSupplier, applicationId,
-            clientId, processId, new Metrics(), new MockTime(),
-            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0) {
+        final StreamThread thread = new StreamThread(
+            builder.internalTopologyBuilder,
+            config,
+            clientSupplier,
+            applicationId,
+            clientId,
+            processId,
+            new Metrics(),
+            new MockTime(),
+            new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
+            0) {
 
             @Override
             protected StandbyTask createStandbyTask(final TaskId id, final Collection<TopicPartition> partitions) {
@@ -2060,7 +2057,7 @@ public class StreamThreadTest {
 
     private StreamThread getStreamThread() {
         return new StreamThread(
-            builder,
+            builder.internalTopologyBuilder,
             config,
             clientSupplier,
             applicationId,
@@ -2068,7 +2065,7 @@ public class StreamThreadTest {
             processId,
             metrics,
             Time.SYSTEM,
-            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+            new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
             0) {
 
             @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/5d798511/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
index c8ab6f1..8ee1d6e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
@@ -115,7 +115,7 @@ public class StreamsMetadataStateTest {
                 new PartitionInfo("topic-four", 0, null, null, null));
 
         cluster = new Cluster(null, Collections.<Node>emptyList(), partitionInfos, Collections.<String>emptySet(), Collections.<String>emptySet());
-        discovery = new StreamsMetadataState(builder, hostOne);
+        discovery = new StreamsMetadataState(builder.internalTopologyBuilder, hostOne);
         discovery.onChange(hostToPartitions, cluster);
         partitioner = new StreamPartitioner<String, Object>() {
             @Override
@@ -127,7 +127,7 @@ public class StreamsMetadataStateTest {
 
     @Test
     public void shouldNotThrowNPEWhenOnChangeNotCalled() throws Exception {
-        new StreamsMetadataState(builder, hostOne).getAllMetadataForStore("store");
+        new StreamsMetadataState(builder.internalTopologyBuilder, hostOne).getAllMetadataForStore("store");
     }
 
     @Test
@@ -294,7 +294,7 @@ public class StreamsMetadataStateTest {
 
     @Test
     public void shouldGetAnyHostForGlobalStoreByKeyIfMyHostUnknown() throws Exception {
-        final StreamsMetadataState streamsMetadataState = new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST);
+        final StreamsMetadataState streamsMetadataState = new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST);
         streamsMetadataState.onChange(hostToPartitions, cluster);
         assertNotNull(streamsMetadataState.getMetadataWithKey(globalTable, "key", Serdes.String().serializer()));
     }
@@ -307,7 +307,7 @@ public class StreamsMetadataStateTest {
 
     @Test
     public void shouldGetAnyHostForGlobalStoreByKeyAndPartitionerIfMyHostUnknown() throws Exception {
-        final StreamsMetadataState streamsMetadataState = new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST);
+        final StreamsMetadataState streamsMetadataState = new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST);
         streamsMetadataState.onChange(hostToPartitions, cluster);
         assertNotNull(streamsMetadataState.getMetadataWithKey(globalTable, "key", partitioner));
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5d798511/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index bf55b47..d2f236f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -113,7 +113,7 @@ public class StreamThreadStateStoreProviderTest {
         storesAvailable = true;
         provider = new StreamThreadStateStoreProvider(
             new StreamThread(
-                builder,
+                builder.internalTopologyBuilder,
                 streamsConfig,
                 clientSupplier,
                 applicationId,
@@ -121,7 +121,7 @@ public class StreamThreadStateStoreProviderTest {
                 UUID.randomUUID(),
                 new Metrics(),
                 Time.SYSTEM,
-                new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+                new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
                 0) {
 
                 @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/5d798511/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index c59113e..d026c60 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -41,6 +41,7 @@ import org.apache.kafka.streams.processor.internals.GlobalProcessorContextImpl;
 import org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl;
 import org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
@@ -160,7 +161,7 @@ public class ProcessorTopologyTestDriver {
      * @param builder the topology builder that will be used to create the topology instance
      */
     public ProcessorTopologyTestDriver(final StreamsConfig config,
-                                       final TopologyBuilder builder) {
+                                       final InternalTopologyBuilder builder) {
         topology = builder.setApplicationId(APPLICATION_ID).build(null);
         final ProcessorTopology globalTopology  = builder.buildGlobalStateTopology();
 
@@ -351,7 +352,7 @@ public class ProcessorTopologyTestDriver {
 
     /**
      * Get the {@link StateStore} with the given name. The name should have been supplied via
-     * {@link #ProcessorTopologyTestDriver(StreamsConfig, TopologyBuilder) this object's constructor}, and is
+     * {@link #ProcessorTopologyTestDriver(StreamsConfig, InternalTopologyBuilder) this object's constructor}, and is
      * presumed to be used by a Processor within the topology.
      * <p>
      * This is often useful in test cases to pre-populate the store before the test case instructs the topology to
@@ -367,7 +368,7 @@ public class ProcessorTopologyTestDriver {
 
     /**
      * Get the {@link KeyValueStore} with the given name. The name should have been supplied via
-     * {@link #ProcessorTopologyTestDriver(StreamsConfig, TopologyBuilder) this object's constructor}, and is
+     * {@link #ProcessorTopologyTestDriver(StreamsConfig, InternalTopologyBuilder) this object's constructor}, and is
      * presumed to be used by a Processor within the topology.
      * <p>
      * This is often useful in test cases to pre-populate the store before the test case instructs the topology to


Mime
View raw message