kafka-commits mailing list archives

From: guozh...@apache.org
Subject: [2/2] kafka git commit: KAFKA-4117: Stream PartitionAssignor cleanup
Date: Sat, 29 Oct 2016 17:48:33 GMT
KAFKA-4117: Stream PartitionAssignor cleanup

1. Create a new `ClientMetadata` class that collapses `Set<String> consumerMemberIds`, `ClientState<TaskId> state`, and `HostInfo hostInfo` into a single per-client structure.

2. For clarity, stop reusing `stateChangelogTopicToTaskIds` and `internalSourceTopicToTaskIds` to access the (sub-)topology's internal repartition and changelog topics; also use the source topics' number of partitions to set the number of partitions for repartition topics, and document that the topology must NOT contain cycles, since otherwise the resolution while-loop would never terminate (see the first sketch after this list).

3. Call `ensure-copartition` at the end to adjust the number of partitions for repartition topics, where necessary, so that they match the other topics in their co-partition group (see the second sketch after this list).

4. Refactor `ClientState` and update the logic of `TaskAssignor` for clarity.

5. Change the default `clientId` from `applicationId-suffix` to `applicationId-processId`, where `processId` is a UUID, to avoid clientId conflicts (and hence metrics conflicts) between different JVMs.

6. Enforce that the `assignment` partition list and the `activeTask` taskId list have the same size, giving a one-to-one mapping between them.

7. Remove the `AssignmentSupplier` class by always constructing `partitionsByHostState` before assigning tasks to consumers within a client.

8. Remove all unnecessary member variables in `StreamPartitionAssignor`.

9. Some other minor fixes to unit tests, e.g. removing `test only` functions in favor of Java field reflection.
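
A minimal, hypothetical sketch of the fixed-point computation described in point 2: a repartition topic inherits the maximum partition count of the source topics feeding it, which may themselves be repartition topics. All topic names and counts below are made up for illustration; the real logic lives in `StreamPartitionAssignor.assign()` in the diff. If the topology contained a cycle, some count would stay unresolved (-1) forever and the loop would never terminate, hence the no-cycles rule.

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class RepartitionPartitionCountSketch {
        public static void main(String[] args) {
            // toy topology (assumed names): which topics feed each repartition topic
            Map<String, List<String>> upstream = new HashMap<>();
            upstream.put("app-repartition-1", Arrays.asList("input-a", "input-b"));
            upstream.put("app-repartition-2", Arrays.asList("app-repartition-1"));

            // known partition counts; external topics come from cluster metadata
            Map<String, Integer> numPartitions = new HashMap<>();
            numPartitions.put("input-a", 4);
            numPartitions.put("input-b", 6);

            boolean unresolved;
            do {
                unresolved = false;
                for (Map.Entry<String, List<String>> entry : upstream.entrySet()) {
                    String topic = entry.getKey();
                    if (numPartitions.containsKey(topic))
                        continue; // already resolved in an earlier pass
                    int max = -1;
                    for (String source : entry.getValue()) {
                        // null if the source is a not-yet-resolved repartition topic
                        Integer count = numPartitions.get(source);
                        if (count != null && count > max)
                            max = count;
                    }
                    if (max == -1)
                        unresolved = true; // no upstream resolved yet; retry next pass
                    else
                        numPartitions.put(topic, max); // max of resolved upstream counts
                }
            } while (unresolved);

            // both repartition topics end up with 6 partitions
            System.out.println(numPartitions);
        }
    }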
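A second hypothetical sketch, for point 3: topics in a co-partition group must agree on their partition count. External topics must already match (otherwise the topology is invalid), and repartition topics in the group are then forced to the agreed count. The all-repartition case, where the group takes the maximum of the repartition counts, is omitted here; see `ensureCopartitioning` in the diff for the full logic.

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    public class CopartitionSketch {
        public static void main(String[] args) {
            Set<String> group = new HashSet<>(Arrays.asList("orders", "customers", "app-repartition-0"));

            Map<String, Integer> externalCounts = new HashMap<>();    // from cluster metadata
            externalCounts.put("orders", 4);
            externalCounts.put("customers", 4);

            Map<String, Integer> repartitionCounts = new HashMap<>(); // computed as in the previous sketch
            repartitionCounts.put("app-repartition-0", 2);

            int numPartitions = -1;
            for (String topic : group) {
                Integer count = externalCounts.get(topic);
                if (count == null)
                    continue; // a repartition topic; its count is adjusted below
                if (numPartitions == -1)
                    numPartitions = count;
                else if (numPartitions != count)
                    throw new IllegalStateException("Topics not co-partitioned: " + group);
            }

            // enforce the agreed count on every repartition topic in the group
            for (String topic : group) {
                if (repartitionCounts.containsKey(topic)) {
                    repartitionCounts.put(topic, numPartitions);
                }
            }

            System.out.println(repartitionCounts); // prints {app-repartition-0=4}
        }
    }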

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Xavier Léauté, Matthias J. Sax, Eno Thereska, Jason Gustafson

Closes #2012 from guozhangwang/K4117-stream-partitionassignro-cleanup


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a4ab9d02
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a4ab9d02
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a4ab9d02

Branch: refs/heads/trunk
Commit: a4ab9d02a22e77f0ebca0450e608898d83d4fe18
Parents: 0dd9607
Author: Guozhang Wang <wangguoz@gmail.com>
Authored: Sat Oct 29 10:48:17 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Sat Oct 29 10:48:17 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/kafka/common/Cluster.java   |  20 +-
 .../org/apache/kafka/streams/KafkaStreams.java  |   4 +-
 .../streams/processor/TopologyBuilder.java      |   8 +-
 .../internals/InternalTopicConfig.java          |   3 +-
 .../streams/processor/internals/SinkNode.java   |   5 -
 .../streams/processor/internals/SourceNode.java |   5 -
 .../internals/StreamPartitionAssignor.java      | 717 ++++++++++---------
 .../processor/internals/StreamThread.java       |  39 +-
 .../internals/assignment/AssignmentInfo.java    |  14 +-
 .../internals/assignment/ClientState.java       |  16 +-
 .../internals/assignment/TaskAssignor.java      |  37 +-
 .../apache/kafka/streams/state/HostInfo.java    |   2 +-
 .../state/internals/StoreChangeLogger.java      |  10 -
 .../integration/FanoutIntegrationTest.java      |   1 -
 .../kstream/internals/KStreamImplTest.java      |   2 +-
 .../kstream/internals/KTableImplTest.java       |  16 +-
 .../streams/processor/TopologyBuilderTest.java  |   4 +-
 .../internals/StreamPartitionAssignorTest.java  | 148 ++--
 .../internals/StreamsMetadataStateTest.java     |   2 +-
 .../assignment/AssignmentInfoTest.java          |   2 +-
 .../internals/assignment/TaskAssignorTest.java  | 115 +--
 .../state/internals/StoreChangeLoggerTest.java  |  21 +-
 22 files changed, 642 insertions(+), 549 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/clients/src/main/java/org/apache/kafka/common/Cluster.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java
index 471ae26..3c6475d 100644
--- a/clients/src/main/java/org/apache/kafka/common/Cluster.java
+++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java
@@ -207,6 +207,16 @@ public final class Cluster {
     }
 
     /**
+     * Get the number of partitions for the given topic
+     * @param topic The topic to get the number of partitions for
+     * @return The number of partitions or null if there is no corresponding metadata
+     */
+    public Integer partitionCountForTopic(String topic) {
+        List<PartitionInfo> partitions = this.partitionsByTopic.get(topic);
+        return partitions == null ? null : partitions.size();
+    }
+
+    /**
      * Get the list of available partitions for this topic
      * @param topic The topic name
      * @return A list of partitions
@@ -225,16 +235,6 @@ public final class Cluster {
     }
 
     /**
-     * Get the number of partitions for the given topic
-     * @param topic The topic to get the number of partitions for
-     * @return The number of partitions or null if there is no corresponding metadata
-     */
-    public Integer partitionCountForTopic(String topic) {
-        List<PartitionInfo> partitionInfos = this.partitionsByTopic.get(topic);
-        return partitionInfos == null ? null : partitionInfos.size();
-    }
-
-    /**
      * Get all topics.
      * @return a set of all topics
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 2333db7..e120b31 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -47,7 +47,6 @@ import java.util.List;
 import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Kafka Streams allows for performing continuous computation on input coming from one or more input topics and
@@ -91,7 +90,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 public class KafkaStreams {
 
     private static final Logger log = LoggerFactory.getLogger(KafkaStreams.class);
-    private static final AtomicInteger STREAM_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
     private static final String JMX_PREFIX = "kafka.streams";
 
     // container states
@@ -156,7 +154,7 @@ public class KafkaStreams {
 
         String clientId = config.getString(StreamsConfig.CLIENT_ID_CONFIG);
         if (clientId.length() <= 0)
-            clientId = applicationId + "-" + STREAM_CLIENT_ID_SEQUENCE.getAndIncrement();
+            clientId = applicationId + "-" + processId;
 
         final List<MetricsReporter> reporters = config.getConfiguredInstances(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG,
             MetricsReporter.class);

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index 81f1f63..ecac8c9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -227,14 +227,14 @@ public class TopologyBuilder {
     public static class TopicsInfo {
         public Set<String> sinkTopics;
         public Set<String> sourceTopics;
-        public Map<String, InternalTopicConfig> interSourceTopics;
         public Map<String, InternalTopicConfig> stateChangelogTopics;
+        public Map<String, InternalTopicConfig> repartitionSourceTopics;
 
-        public TopicsInfo(Set<String> sinkTopics, Set<String> sourceTopics, Map<String, InternalTopicConfig> interSourceTopics, Map<String, InternalTopicConfig> stateChangelogTopics) {
+        public TopicsInfo(Set<String> sinkTopics, Set<String> sourceTopics, Map<String, InternalTopicConfig> repartitionSourceTopics, Map<String, InternalTopicConfig> stateChangelogTopics) {
             this.sinkTopics = sinkTopics;
             this.sourceTopics = sourceTopics;
-            this.interSourceTopics = interSourceTopics;
             this.stateChangelogTopics = stateChangelogTopics;
+            this.repartitionSourceTopics = repartitionSourceTopics;
         }
 
         @Override
@@ -258,7 +258,7 @@ public class TopologyBuilder {
             return "TopicsInfo{" +
                     "sinkTopics=" + sinkTopics +
                     ", sourceTopics=" + sourceTopics +
-                    ", interSourceTopics=" + interSourceTopics +
+                    ", repartitionSourceTopics=" + repartitionSourceTopics +
                     ", stateChangelogTopics=" + stateChangelogTopics +
                     '}';
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java
index 45016c8..c23f134 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java
@@ -30,9 +30,10 @@ public class InternalTopicConfig {
 
     private final String name;
     private final Map<String, String> logConfig;
-    private Long retentionMs;
     private final Set<CleanupPolicy> cleanupPolicies;
 
+    private Long retentionMs;
+
     public InternalTopicConfig(final String name, final Set<CleanupPolicy> defaultCleanupPolicies, final Map<String, String> logConfig) {
         Objects.requireNonNull(name, "name can't be null");
         if (defaultCleanupPolicies.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
index 2b5692d..4e56f61 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
@@ -77,11 +77,6 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> {
         // do nothing
     }
 
-    // for test only
-    public Serializer<V> valueSerializer() {
-        return valSerializer;
-    }
-
     /**
      * @return a string representation of this node, useful for debugging.
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
index 4bc3a53..e17509b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
@@ -71,11 +71,6 @@ public class SourceNode<K, V> extends ProcessorNode<K, V> {
         // do nothing
     }
 
-    // for test only
-    public Deserializer<V> valueDeserializer() {
-        return valDeserializer;
-    }
-
     /**
      * @return a string representation of this node, useful for debugging.
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index dcba543..009ba1b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -46,37 +46,92 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 
+import static org.apache.kafka.common.utils.Utils.getHost;
+import static org.apache.kafka.common.utils.Utils.getPort;
 import static org.apache.kafka.streams.processor.internals.InternalTopicManager.WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT;
 
 public class StreamPartitionAssignor implements PartitionAssignor, Configurable {
 
     private static final Logger log = LoggerFactory.getLogger(StreamPartitionAssignor.class);
-    private String userEndPointConfig;
-    private Map<HostInfo, Set<TopicPartition>> partitionsByHostState;
-    private Cluster metadataWithInternalTopics;
-
 
     private static class AssignedPartition implements Comparable<AssignedPartition> {
         public final TaskId taskId;
         public final TopicPartition partition;
 
-        public AssignedPartition(TaskId taskId, TopicPartition partition) {
+        AssignedPartition(final TaskId taskId, final TopicPartition partition) {
             this.taskId = taskId;
             this.partition = partition;
         }
 
         @Override
-        public int compareTo(AssignedPartition that) {
+        public int compareTo(final AssignedPartition that) {
             return PARTITION_COMPARATOR.compare(this.partition, that.partition);
         }
     }
 
+    private static class ClientMetadata {
+        final HostInfo hostInfo;
+        final Set<String> consumers;
+        final ClientState<TaskId> state;
+
+        ClientMetadata(final String endPoint) {
+
+            // get the host info if possible
+            if (endPoint != null) {
+                final String host = getHost(endPoint);
+                final Integer port = getPort(endPoint);
+
+                if (host == null || port == null)
+                    throw new ConfigException(String.format("Error parsing host address %s. Expected format host:port.", endPoint));
+
+                hostInfo = new HostInfo(host, port);
+            } else {
+                hostInfo = null;
+            }
+
+            // initialize the consumer memberIds
+            consumers = new HashSet<>();
+
+            // initialize the client state
+            state = new ClientState<>();
+        }
+
+        void addConsumer(final String consumerMemberId, final SubscriptionInfo info) {
+
+            consumers.add(consumerMemberId);
+
+            state.prevActiveTasks.addAll(info.prevTasks);
+            state.prevAssignedTasks.addAll(info.prevTasks);
+            state.prevAssignedTasks.addAll(info.standbyTasks);
+            state.capacity = state.capacity + 1d;
+        }
+
+        @Override
+        public String toString() {
+            return "ClientMetadata{" +
+                    "hostInfo=" + hostInfo +
+                    ", consumers=" + consumers +
+                    ", state=" + state +
+                    '}';
+        }
+    }
+
+    private static class InternalTopicMetadata {
+        public final InternalTopicConfig config;
+
+        public int numPartitions;
+
+        InternalTopicMetadata(final InternalTopicConfig config) {
+            this.config = config;
+            this.numPartitions = -1;
+        }
+    }
+
     private static final Comparator<TopicPartition> PARTITION_COMPARATOR = new Comparator<TopicPartition>() {
         @Override
         public int compare(TopicPartition p1, TopicPartition p2) {
@@ -92,12 +147,14 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
 
     private StreamThread streamThread;
 
+    private String userEndPoint;
     private int numStandbyReplicas;
-    private Map<Integer, TopologyBuilder.TopicsInfo> topicGroups;
-    private Map<TopicPartition, Set<TaskId>> partitionToTaskIds;
-    private Map<InternalTopicConfig, Set<TaskId>> stateChangelogTopicToTaskIds;
-    private Map<InternalTopicConfig, Set<TaskId>> internalSourceTopicToTaskIds;
+
+    private Cluster metadataWithInternalTopics;
+    private Map<HostInfo, Set<TopicPartition>> partitionsByHostState;
+
     private Map<TaskId, Set<TopicPartition>> standbyTasks;
+    private Map<TaskId, Set<TopicPartition>> activeTasks;
 
     private InternalTopicManager internalTopicManager;
 
@@ -111,7 +168,6 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
     public void configure(Map<String, ?> configs) {
         numStandbyReplicas = (Integer) configs.get(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG);
 
-
         Object o = configs.get(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE);
         if (o == null) {
             KafkaException ex = new KafkaException("StreamThread is not specified");
@@ -130,21 +186,20 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
 
         String userEndPoint = (String) configs.get(StreamsConfig.APPLICATION_SERVER_CONFIG);
         if (userEndPoint != null && !userEndPoint.isEmpty()) {
-            final String[] hostPort = userEndPoint.split(":");
-            if (hostPort.length != 2) {
-                throw new ConfigException(String.format("stream-thread [%s] Config %s isn't in the correct format. Expected a host:port pair" +
-                                                       " but received %s",
-                        streamThread.getName(), StreamsConfig.APPLICATION_SERVER_CONFIG, userEndPoint));
-            } else {
-                try {
-                    Integer.valueOf(hostPort[1]);
-                    this.userEndPointConfig = userEndPoint;
-                } catch (NumberFormatException nfe) {
-                    throw new ConfigException(String.format("stream-thread [%s] Invalid port %s supplied in %s for config %s",
-                            streamThread.getName(), hostPort[1], userEndPoint, StreamsConfig.APPLICATION_SERVER_CONFIG));
-                }
+            try {
+                String host = getHost(userEndPoint);
+                Integer port = getPort(userEndPoint);
+
+                if (host == null || port == null)
+                    throw new ConfigException(String.format("stream-thread [%s] Config %s isn't in the correct format. Expected a host:port pair" +
+                                    " but received %s",
+                            streamThread.getName(), StreamsConfig.APPLICATION_SERVER_CONFIG, userEndPoint));
+            } catch (NumberFormatException nfe) {
+                throw new ConfigException(String.format("stream-thread [%s] Invalid port supplied in %s for config %s",
+                        streamThread.getName(), userEndPoint, StreamsConfig.APPLICATION_SERVER_CONFIG));
             }
 
+            this.userEndPoint = userEndPoint;
         }
 
         if (configs.containsKey(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG)) {
@@ -174,7 +229,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         Set<TaskId> prevTasks = streamThread.prevTasks();
         Set<TaskId> standbyTasks = streamThread.cachedTasks();
         standbyTasks.removeAll(prevTasks);
-        SubscriptionInfo data = new SubscriptionInfo(streamThread.processId, prevTasks, standbyTasks, this.userEndPointConfig);
+        SubscriptionInfo data = new SubscriptionInfo(streamThread.processId, prevTasks, standbyTasks, this.userEndPoint);
 
         if (streamThread.builder.sourceTopicPattern() != null) {
             SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates();
@@ -187,238 +242,252 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         return new Subscription(new ArrayList<>(topics), data.encode());
     }
 
-    /**
-     * Internal helper function that creates a Kafka topic
-     * @param topicToTaskIds Map that contains the topic names to be created
-     * @param postPartitionPhase If true, the computation for calculating the number of partitions
-     *                           is slightly different. Set to true after the initial topic-to-partition
-     *                           assignment.
-     * @return
+    /*
+     * This assigns tasks to consumer clients in the following steps.
+     *
+     * 0. check all repartition source topics and use internal topic manager to make sure
+     *    they have been created with the right number of partitions.
+     *
+     * 1. using user customized partition grouper to generate tasks along with their
+     *    assigned partitions; also make sure that the task's corresponding changelog topics
+     *    have been created with the right number of partitions.
+     *
+     * 2. using TaskAssignor to assign tasks to consumer clients.
+     *    - Assign a task to a client which was running it previously.
+     *      If there is no such client, assign a task to a client which has its valid local state.
+     *    - A client may have more than one stream threads.
+     *      The assignor tries to assign tasks to a client proportionally to the number of threads.
+     *    - We try not to assign the same set of tasks to two different clients
+     *    We do the assignment in one-pass. The result may not satisfy above all.
+     *
+     * 3. within each client, tasks are assigned to consumer clients in round-robin manner.
      */
-    private Map<TopicPartition, PartitionInfo> prepareTopic(Map<InternalTopicConfig, Set<TaskId>> topicToTaskIds,
-                                                            boolean postPartitionPhase) {
-        Map<TopicPartition, PartitionInfo> partitionInfos = new HashMap<>();
-        // if ZK is specified, prepare the internal source topic before calling partition grouper
-        if (internalTopicManager != null) {
-            log.debug("stream-thread [{}] Starting to validate internal topics in partition assignor.", streamThread.getName());
-
-            for (Map.Entry<InternalTopicConfig, Set<TaskId>> entry : topicToTaskIds.entrySet()) {
-                InternalTopicConfig topic = entry.getKey();
-                int numPartitions = 0;
-                if (postPartitionPhase) {
-                    // the expected number of partitions is the max value of TaskId.partition + 1
-                    for (TaskId task : entry.getValue()) {
-                        if (numPartitions < task.partition + 1)
-                            numPartitions = task.partition + 1;
-                    }
-                } else {
-                    // should have size 1 only
-                    numPartitions = -1;
-                    for (TaskId task : entry.getValue()) {
-                        numPartitions = task.partition;
-                    }
-                }
-
-                internalTopicManager.makeReady(topic, numPartitions);
-
-                // wait until the topic metadata has been propagated to all brokers
-                List<PartitionInfo> partitions;
-                do {
-                    partitions = streamThread.restoreConsumer.partitionsFor(topic.name());
-                } while (partitions == null || partitions.size() != numPartitions);
-
-                for (PartitionInfo partition : partitions)
-                    partitionInfos.put(new TopicPartition(partition.topic(), partition.partition()), partition);
-            }
-
-            log.info("stream-thread [{}] Completed validating internal topics in partition assignor", streamThread.getName());
-        } else {
-            List<String> missingTopics = new ArrayList<>();
-            for (InternalTopicConfig topic : topicToTaskIds.keySet()) {
-                List<PartitionInfo> partitions = streamThread.restoreConsumer.partitionsFor(topic.name());
-                if (partitions == null) {
-                    missingTopics.add(topic.name());
-                }
-            }
-            if (!missingTopics.isEmpty()) {
-                log.warn("stream-thread [{}] Topic {} do not exists but couldn't created as the config '{}' isn't supplied",
-                        streamThread.getName(), missingTopics, StreamsConfig.ZOOKEEPER_CONNECT_CONFIG);
-
-            }
-        }
-
-        return partitionInfos;
-    }
-
     @Override
     public Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions) {
-        // This assigns tasks to consumer clients in two steps.
-        // 1. using TaskAssignor to assign tasks to consumer clients.
-        //    - Assign a task to a client which was running it previously.
-        //      If there is no such client, assign a task to a client which has its valid local state.
-        //    - A client may have more than one stream threads.
-        //      The assignor tries to assign tasks to a client proportionally to the number of threads.
-        //    - We try not to assign the same set of tasks to two different clients
-        //    We do the assignment in one-pass. The result may not satisfy above all.
-        // 2. within each client, tasks are assigned to consumer clients in round-robin manner.
-        Map<UUID, Set<String>> consumersByClient = new HashMap<>();
-        Map<UUID, ClientState<TaskId>> states = new HashMap<>();
-        Map<UUID, HostInfo> consumerEndPointMap = new HashMap<>();
-        // decode subscription info
+
+        // construct the client metadata from the decoded subscription info
+        Map<UUID, ClientMetadata> clientsMetadata = new HashMap<>();
+
         for (Map.Entry<String, Subscription> entry : subscriptions.entrySet()) {
             String consumerId = entry.getKey();
             Subscription subscription = entry.getValue();
 
-
             SubscriptionInfo info = SubscriptionInfo.decode(subscription.userData());
-            if (info.userEndPoint != null) {
-                final String[] hostPort = info.userEndPoint.split(":");
-                consumerEndPointMap.put(info.processId, new HostInfo(hostPort[0], Integer.valueOf(hostPort[1])));
-            }
-            Set<String> consumers = consumersByClient.get(info.processId);
-            if (consumers == null) {
-                consumers = new HashSet<>();
-                consumersByClient.put(info.processId, consumers);
-            }
-            consumers.add(consumerId);
 
-            ClientState<TaskId> state = states.get(info.processId);
-            if (state == null) {
-                state = new ClientState<>();
-                states.put(info.processId, state);
+            // create the new client metadata if necessary
+            ClientMetadata clientMetadata = clientsMetadata.get(info.processId);
+
+            if (clientMetadata == null) {
+                clientMetadata = new ClientMetadata(info.userEndPoint);
+                clientsMetadata.put(info.processId, clientMetadata);
             }
 
-            state.prevActiveTasks.addAll(info.prevTasks);
-            state.prevAssignedTasks.addAll(info.prevTasks);
-            state.prevAssignedTasks.addAll(info.standbyTasks);
-            state.capacity = state.capacity + 1d;
+            // add the consumer to the client
+            clientMetadata.addConsumer(consumerId, info);
         }
 
+        log.info("stream-thread [{}] Constructed client metadata {} from the member subscriptions.", streamThread.getName(), clientsMetadata);
 
-        this.topicGroups = streamThread.builder.topicGroups();
+        // ---------------- Step Zero ---------------- //
 
-        // ensure the co-partitioning topics within the group have the same number of partitions,
-        // and enforce the number of partitions for those internal topics.
-        internalSourceTopicToTaskIds = new HashMap<>();
-        Map<Integer, Set<String>> sourceTopicGroups = new HashMap<>();
-        Map<Integer, Collection<InternalTopicConfig>> internalSourceTopicGroups = new HashMap<>();
-        for (Map.Entry<Integer, TopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
-            sourceTopicGroups.put(entry.getKey(), entry.getValue().sourceTopics);
-            internalSourceTopicGroups.put(entry.getKey(), entry.getValue().interSourceTopics.values());
+        // parse the topology to determine the repartition source topics,
+        // making sure they are created with the number of partitions as
+        // the maximum of the depending sub-topologies source topics' number of partitions
+        Map<Integer, TopologyBuilder.TopicsInfo> topicGroups = streamThread.builder.topicGroups();
+
+        Map<String, InternalTopicMetadata> repartitionTopicMetadata = new HashMap<>();
+        for (TopologyBuilder.TopicsInfo topicsInfo : topicGroups.values()) {
+            for (InternalTopicConfig topic: topicsInfo.repartitionSourceTopics.values()) {
+                repartitionTopicMetadata.put(topic.name(), new InternalTopicMetadata(topic));
+            }
         }
 
+        boolean numPartitionsNeeded;
+        do {
+            numPartitionsNeeded = false;
+
+            for (TopologyBuilder.TopicsInfo topicsInfo : topicGroups.values()) {
+                for (String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
+                    int numPartitions = repartitionTopicMetadata.get(topicName).numPartitions;
+
+                    // try set the number of partitions for this repartition topic if it is not set yet
+                    if (numPartitions == -1) {
+                        for (TopologyBuilder.TopicsInfo otherTopicsInfo : topicGroups.values()) {
+                            Set<String> otherSinkTopics = otherTopicsInfo.sinkTopics;
+
+                            if (otherSinkTopics.contains(topicName)) {
+                                // if this topic is one of the sink topics of this topology,
+                                // use the maximum of all its source topic partitions as the number of partitions
+                                for (String sourceTopicName : otherTopicsInfo.sourceTopics) {
+                                    Integer numPartitionsCandidate;
+                                    // It is possible the sourceTopic is another internal topic, i.e,
+                                    // map().join().join(map())
+                                    if (repartitionTopicMetadata.containsKey(sourceTopicName)) {
+                                        numPartitionsCandidate = repartitionTopicMetadata.get(sourceTopicName).numPartitions;
+                                    } else {
+                                        numPartitionsCandidate = metadata.partitionCountForTopic(sourceTopicName);
+                                    }
 
-        // for all internal source topics
-        // set the number of partitions to the maximum of the depending sub-topologies source topics
-        Map<TopicPartition, PartitionInfo> internalPartitionInfos = new HashMap<>();
-        Map<String, InternalTopicConfig> allInternalTopics = new HashMap<>();
-        for (Map.Entry<Integer, TopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
-            Map<String, InternalTopicConfig> internalTopics = entry.getValue().interSourceTopics;
-            allInternalTopics.putAll(internalTopics);
-            for (InternalTopicConfig internalTopic : internalTopics.values()) {
-                Set<TaskId> tasks = internalSourceTopicToTaskIds.get(internalTopic);
-
-                if (tasks == null) {
-                    int numPartitions = -1;
-                    for (Map.Entry<Integer, TopologyBuilder.TopicsInfo> other : topicGroups.entrySet()) {
-                        Set<String> otherSinkTopics = other.getValue().sinkTopics;
-
-                        if (otherSinkTopics.contains(internalTopic.name())) {
-                            for (String topic : other.getValue().sourceTopics) {
-                                Integer partitions = null;
-                                // It is possible the sourceTopic is another internal topic, i.e,
-                                // map().join().join(map())
-                                if (allInternalTopics.containsKey(topic)) {
-                                    Set<TaskId> taskIds = internalSourceTopicToTaskIds.get(allInternalTopics.get(topic));
-                                    if (taskIds != null) {
-                                        for (TaskId taskId : taskIds) {
-                                            partitions = taskId.partition;
-                                        }
+                                    if (numPartitionsCandidate != null && numPartitionsCandidate > numPartitions) {
+                                        numPartitions = numPartitionsCandidate;
                                     }
-                                } else {
-                                    partitions = metadata.partitionCountForTopic(topic);
-                                }
-                                if (partitions != null && partitions > numPartitions) {
-                                    numPartitions = partitions;
                                 }
                             }
                         }
-                    }
-                    internalSourceTopicToTaskIds.put(internalTopic, Collections.singleton(new TaskId(entry.getKey(), numPartitions)));
-                    for (int partition = 0; partition < numPartitions; partition++) {
-                        internalPartitionInfos.put(new TopicPartition(internalTopic.name(), partition),
-                                                   new PartitionInfo(internalTopic.name(), partition, null, new Node[0], new Node[0]));
+
+                        // if we still have not find the right number of partitions,
+                        // another iteration is needed
+                        if (numPartitions == -1)
+                            numPartitionsNeeded = true;
+                        else
+                            repartitionTopicMetadata.get(topicName).numPartitions = numPartitions;
                     }
                 }
             }
+        } while (numPartitionsNeeded);
+
+        // augment the metadata with the newly computed number of partitions for all the
+        // repartition source topics
+        Map<TopicPartition, PartitionInfo> allRepartitionTopicPartitions = new HashMap<>();
+        for (Map.Entry<String, InternalTopicMetadata> entry : repartitionTopicMetadata.entrySet()) {
+            String topic = entry.getKey();
+            Integer numPartitions = entry.getValue().numPartitions;
+
+            for (int partition = 0; partition < numPartitions; partition++) {
+                allRepartitionTopicPartitions.put(new TopicPartition(topic, partition),
+                        new PartitionInfo(topic, partition, null, new Node[0], new Node[0]));
+            }
         }
 
+        // ensure the co-partitioning topics within the group have the same number of partitions,
+        // and enforce the number of partitions for those repartition topics to be the same if they
+        // are co-partitioned as well.
+        ensureCopartitioning(streamThread.builder.copartitionGroups(), repartitionTopicMetadata, metadata);
 
-        Collection<Set<String>> copartitionTopicGroups = streamThread.builder.copartitionGroups();
-        ensureCopartitioning(copartitionTopicGroups, internalSourceTopicGroups,
-                             metadata.withPartitions(internalPartitionInfos));
-
-
-        internalPartitionInfos = prepareTopic(internalSourceTopicToTaskIds, false);
-        internalSourceTopicToTaskIds.clear();
+        // make sure the repartition source topics exist with the right number of partitions,
+        // create these topics if necessary
+        prepareTopic(repartitionTopicMetadata);
 
         metadataWithInternalTopics = metadata;
         if (internalTopicManager != null)
-            metadataWithInternalTopics = metadata.withPartitions(internalPartitionInfos);
+            metadataWithInternalTopics = metadata.withPartitions(allRepartitionTopicPartitions);
+
+        log.debug("stream-thread [{}] Created repartition topics {} from the parsed topology.", streamThread.getName(), allRepartitionTopicPartitions.values());
+
+        // ---------------- Step One ---------------- //
 
         // get the tasks as partition groups from the partition grouper
-        Map<TaskId, Set<TopicPartition>> partitionsForTask = streamThread.partitionGrouper.partitionGroups(
-                sourceTopicGroups, metadataWithInternalTopics);
+        Set<String> allSourceTopics = new HashSet<>();
+        Map<Integer, Set<String>> sourceTopicsByGroup = new HashMap<>();
+        for (Map.Entry<Integer, TopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
+            allSourceTopics.addAll(entry.getValue().sourceTopics);
+            sourceTopicsByGroup.put(entry.getKey(), entry.getValue().sourceTopics);
+        }
 
-        // add tasks to state change log topic subscribers
-        stateChangelogTopicToTaskIds = new HashMap<>();
-        for (TaskId task : partitionsForTask.keySet()) {
-            final Map<String, InternalTopicConfig> stateChangelogTopics = topicGroups.get(task.topicGroupId).stateChangelogTopics;
-            for (InternalTopicConfig topic : stateChangelogTopics.values()) {
-                Set<TaskId> tasks = stateChangelogTopicToTaskIds.get(topic);
-                if (tasks == null) {
-                    tasks = new HashSet<>();
-                    stateChangelogTopicToTaskIds.put(topic, tasks);
+        Map<TaskId, Set<TopicPartition>> partitionsForTask = streamThread.partitionGrouper.partitionGroups(
+                sourceTopicsByGroup, metadataWithInternalTopics);
+
+        // check if all partitions are assigned, and there are no duplicates of partitions in multiple tasks
+        Set<TopicPartition> allAssignedPartitions = new HashSet<>();
+        Map<Integer, Set<TaskId>> tasksByTopicGroup = new HashMap<>();
+        for (Map.Entry<TaskId, Set<TopicPartition>> entry : partitionsForTask.entrySet()) {
+            Set<TopicPartition> partitions = entry.getValue();
+            for (TopicPartition partition : partitions) {
+                if (allAssignedPartitions.contains(partition)) {
+                    log.warn("stream-thread [{}] Partition {} is assigned to more than one tasks: {}", streamThread.getName(), partition, partitionsForTask);
                 }
+            }
+            allAssignedPartitions.addAll(partitions);
 
-                tasks.add(task);
+            TaskId id = entry.getKey();
+            Set<TaskId> ids = tasksByTopicGroup.get(id.topicGroupId);
+            if (ids == null) {
+                ids = new HashSet<>();
+                tasksByTopicGroup.put(id.topicGroupId, ids);
+            }
+            ids.add(id);
+        }
+        for (String topic : allSourceTopics) {
+            for (PartitionInfo partitionInfo : metadataWithInternalTopics.partitionsForTopic(topic)) {
+                TopicPartition partition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
+                if (!allAssignedPartitions.contains(partition)) {
+                    log.warn("stream-thread [{}] Partition {} is not assigned to any tasks: {}", streamThread.getName(), partition, partitionsForTask);
+                }
             }
+        }
 
-            final Map<String, InternalTopicConfig> interSourceTopics = topicGroups.get(task.topicGroupId).interSourceTopics;
-            for (InternalTopicConfig topic : interSourceTopics.values()) {
-                Set<TaskId> tasks = internalSourceTopicToTaskIds.get(topic);
-                if (tasks == null) {
-                    tasks = new HashSet<>();
-                    internalSourceTopicToTaskIds.put(topic, tasks);
+        // add tasks to state change log topic subscribers
+        Map<String, InternalTopicMetadata> changelogTopicMetadata = new HashMap<>();
+        for (Map.Entry<Integer, TopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
+            final int topicGroupId = entry.getKey();
+            final Map<String, InternalTopicConfig> stateChangelogTopics = entry.getValue().stateChangelogTopics;
+
+            for (InternalTopicConfig topicConfig : stateChangelogTopics.values()) {
+                // the expected number of partitions is the max value of TaskId.partition + 1
+                int numPartitions = -1;
+                for (TaskId task : tasksByTopicGroup.get(topicGroupId)) {
+                    if (numPartitions < task.partition + 1)
+                        numPartitions = task.partition + 1;
                 }
 
-                tasks.add(task);
+                InternalTopicMetadata topicMetadata = new InternalTopicMetadata(topicConfig);
+                topicMetadata.numPartitions = numPartitions;
+
+                changelogTopicMetadata.put(topicConfig.name(), topicMetadata);
             }
         }
 
+        prepareTopic(changelogTopicMetadata);
+
+        log.debug("stream-thread [{}] Created state changelog topics {} from the parsed topology.", streamThread.getName(), changelogTopicMetadata);
+
+        // ---------------- Step Two ---------------- //
+
         // assign tasks to clients
-        states = TaskAssignor.assign(states, partitionsForTask.keySet(), numStandbyReplicas, streamThread.getName());
+        Map<UUID, ClientState<TaskId>> states = new HashMap<>();
+        for (Map.Entry<UUID, ClientMetadata> entry : clientsMetadata.entrySet()) {
+            states.put(entry.getKey(), entry.getValue().state);
+        }
+
+        log.debug("stream-thread [{}] Assigning tasks {} to clients {} with number of replicas {}",
+                streamThread.getName(), partitionsForTask.keySet(), states, numStandbyReplicas);
+
+        TaskAssignor.assign(states, partitionsForTask.keySet(), numStandbyReplicas);
+
+        log.info("stream-thread [{}] Assigned tasks to clients as {}.", streamThread.getName(), states);
 
-        final List<AssignmentSupplier> assignmentSuppliers = new ArrayList<>();
+        // ---------------- Step Three ---------------- //
+
+        // construct the global partition assignment per host map
+        partitionsByHostState = new HashMap<>();
+        for (Map.Entry<UUID, ClientMetadata> entry : clientsMetadata.entrySet()) {
+            HostInfo hostInfo = entry.getValue().hostInfo;
+
+            if (hostInfo != null) {
+                final Set<TopicPartition> topicPartitions = new HashSet<>();
+                final ClientState<TaskId> state = entry.getValue().state;
+
+                for (TaskId id : state.assignedTasks) {
+                    topicPartitions.addAll(partitionsForTask.get(id));
+                }
+
+                partitionsByHostState.put(hostInfo, topicPartitions);
+            }
+        }
 
-        final Map<HostInfo, Set<TopicPartition>> endPointMap = new HashMap<>();
-        for (Map.Entry<UUID, Set<String>> entry : consumersByClient.entrySet()) {
-            UUID processId = entry.getKey();
-            Set<String> consumers = entry.getValue();
-            ClientState<TaskId> state = states.get(processId);
+        // within the client, distribute tasks to its owned consumers
+        Map<String, Assignment> assignment = new HashMap<>();
+        for (Map.Entry<UUID, ClientMetadata> entry : clientsMetadata.entrySet()) {
+            Set<String> consumers = entry.getValue().consumers;
+            ClientState<TaskId> state = entry.getValue().state;
 
             ArrayList<TaskId> taskIds = new ArrayList<>(state.assignedTasks.size());
             final int numActiveTasks = state.activeTasks.size();
-            for (TaskId taskId : state.activeTasks) {
-                taskIds.add(taskId);
-            }
-            for (TaskId id : state.assignedTasks) {
-                if (!state.activeTasks.contains(id))
-                    taskIds.add(id);
-            }
 
-            final int numConsumers = consumers.size();
+            taskIds.addAll(state.activeTasks);
+            taskIds.addAll(state.standbyTasks);
 
+            final int numConsumers = consumers.size();
 
             int i = 0;
             for (String consumer : consumers) {
@@ -448,100 +517,58 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
                 for (AssignedPartition partition : assignedPartitions) {
                     active.add(partition.taskId);
                     activePartitions.add(partition.partition);
-                    HostInfo hostInfo = consumerEndPointMap.get(processId);
-                    if (hostInfo != null) {
-                        if (!endPointMap.containsKey(hostInfo)) {
-                            endPointMap.put(hostInfo, new HashSet<TopicPartition>());
-                        }
-                        final Set<TopicPartition> topicPartitions = endPointMap.get(hostInfo);
-                        topicPartitions.add(partition.partition);
-                    }
                 }
 
-
-                assignmentSuppliers.add(new AssignmentSupplier(consumer,
-                                                               active,
-                                                               standby,
-                                                               endPointMap,
-                                                               activePartitions));
+                // finally, encode the assignment before sending back to coordinator
+                assignment.put(consumer, new Assignment(activePartitions, new AssignmentInfo(active, standby, partitionsByHostState).encode()));
 
                 i++;
             }
         }
 
-        // if ZK is specified, validate the internal topics again
-        prepareTopic(internalSourceTopicToTaskIds,  /* compactTopic */ true);
-        // change log topics should be compacted
-        prepareTopic(stateChangelogTopicToTaskIds,  /* compactTopic */ true);
-
-        Map<String, Assignment> assignment = new HashMap<>();
-        for (AssignmentSupplier assignmentSupplier : assignmentSuppliers) {
-            assignment.put(assignmentSupplier.consumer, assignmentSupplier.get());
-        }
         return assignment;
     }
 
-    class AssignmentSupplier {
-        private final String consumer;
-        private final List<TaskId> active;
-        private final Map<TaskId, Set<TopicPartition>> standby;
-        private final Map<HostInfo, Set<TopicPartition>> endPointMap;
-        private final List<TopicPartition> activePartitions;
-
-        AssignmentSupplier(final String consumer,
-                           final List<TaskId> active,
-                           final Map<TaskId, Set<TopicPartition>> standby,
-                           final Map<HostInfo, Set<TopicPartition>> endPointMap,
-                           final List<TopicPartition> activePartitions) {
-            this.consumer = consumer;
-            this.active = active;
-            this.standby = standby;
-            this.endPointMap = endPointMap;
-            this.activePartitions = activePartitions;
-        }
-
-        Assignment get() {
-            return new Assignment(activePartitions, new AssignmentInfo(active,
-                                                                       standby,
-                                                                       endPointMap).encode());
-        }
-    }
-
     /**
      * @throws TaskAssignmentException if there is no task id for one of the partitions specified
      */
     @Override
     public void onAssignment(Assignment assignment) {
         List<TopicPartition> partitions = new ArrayList<>(assignment.partitions());
-
         Collections.sort(partitions, PARTITION_COMPARATOR);
 
         AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
+
         this.standbyTasks = info.standbyTasks;
+        this.activeTasks = new HashMap<>();
+
+        // the number of assigned partitions should be the same as number of active tasks, which
+        // could be duplicated if one task has more than one assigned partitions
+        if (partitions.size() != info.activeTasks.size()) {
+            throw new TaskAssignmentException(
+                    String.format("stream-thread [%s] Number of assigned partitions %d is not equal to the number of active taskIds %d" +
+                            ", assignmentInfo=%s", streamThread.getName(), partitions.size(), info.activeTasks.size(), info.toString())
+            );
+        }
 
-        Map<TopicPartition, Set<TaskId>> partitionToTaskIds = new HashMap<>();
-        Iterator<TaskId> iter = info.activeTasks.iterator();
-        for (TopicPartition partition : partitions) {
-            Set<TaskId> taskIds = partitionToTaskIds.get(partition);
-            if (taskIds == null) {
-                taskIds = new HashSet<>();
-                partitionToTaskIds.put(partition, taskIds);
-            }
+        for (int i = 0; i < partitions.size(); i++) {
+            TopicPartition partition = partitions.get(i);
+            TaskId id = info.activeTasks.get(i);
 
-            if (iter.hasNext()) {
-                taskIds.add(iter.next());
-            } else {
-                TaskAssignmentException ex = new TaskAssignmentException(
-                        String.format("stream-thread [%s] failed to find a task id for the partition=%s" +
-                        ", partitions=%d, assignmentInfo=%s", streamThread.getName(), partition.toString(), partitions.size(), info.toString())
-                );
-                log.error(ex.getMessage(), ex);
-                throw ex;
+            Set<TopicPartition> assignedPartitions = activeTasks.get(id);
+            if (assignedPartitions == null) {
+                assignedPartitions = new HashSet<>();
+                activeTasks.put(id, assignedPartitions);
             }
+            assignedPartitions.add(partition);
+        }
+
+        // only need to update the host partitions map if it is not leader
+        if (this.partitionsByHostState == null) {
+            this.partitionsByHostState = info.partitionsByHost;
         }
-        this.partitionToTaskIds = partitionToTaskIds;
-        this.partitionsByHostState = info.partitionsByHostState;
-        // only need to build when not coordinator
+
+        // only need to build if it is not leader
         if (metadataWithInternalTopics == null) {
             final Collection<Set<TopicPartition>> values = partitionsByHostState.values();
             final Map<TopicPartition, PartitionInfo> topicToPartitionInfo = new HashMap<>();
@@ -558,92 +585,129 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         }
     }
 
-    public Map<HostInfo, Set<TopicPartition>> getPartitionsByHostState() {
-        if (partitionsByHostState == null) {
-            return Collections.emptyMap();
-        }
-        return Collections.unmodifiableMap(partitionsByHostState);
-    }
+    /**
+     * Internal helper function that creates a Kafka topic
+     *
+     * @param topicPartitions Map that contains the topic names to be created with the number of partitions
+     */
+    private void prepareTopic(Map<String, InternalTopicMetadata> topicPartitions) {
+        log.debug("stream-thread [{}] Starting to validate internal topics in partition assignor.", streamThread.getName());
 
-    public Cluster clusterMetadata() {
-        if (metadataWithInternalTopics == null) {
-            return Cluster.empty();
-        }
-        return metadataWithInternalTopics;
-    }
+        // if ZK is specified, prepare the internal source topic before calling partition grouper
+        if (internalTopicManager != null) {
+            for (Map.Entry<String, InternalTopicMetadata> entry : topicPartitions.entrySet()) {
+                InternalTopicConfig topic = entry.getValue().config;
+                Integer numPartitions = entry.getValue().numPartitions;
 
-    private void ensureCopartitioning(Collection<Set<String>> copartitionGroups, Map<Integer, Collection<InternalTopicConfig>> internalTopicGroups, Cluster metadata) {
-        Map<String, InternalTopicConfig> internalTopics = new HashMap<>();
-        for (Collection<InternalTopicConfig> topics : internalTopicGroups.values()) {
-            for (InternalTopicConfig topic : topics) {
-                internalTopics.put(topic.name(), topic);
+                if (numPartitions < 0)
+                    throw new TopologyBuilderException(String.format("stream-thread [%s] Topic [%s] number of partitions not defined", streamThread.getName(), topic.name()));
+
+                internalTopicManager.makeReady(topic, numPartitions);
+
+                // wait until the topic metadata has been propagated to all brokers
+                List<PartitionInfo> partitions;
+                do {
+                    partitions = streamThread.restoreConsumer.partitionsFor(topic.name());
+                } while (partitions == null || partitions.size() != numPartitions);
+            }
+        } else {
+            List<String> missingTopics = new ArrayList<>();
+            for (String topic : topicPartitions.keySet()) {
+                List<PartitionInfo> partitions = streamThread.restoreConsumer.partitionsFor(topic);
+                if (partitions == null) {
+                    missingTopics.add(topic);
+                }
+            }
+
+            if (!missingTopics.isEmpty()) {
+                log.warn("stream-thread [{}] Topic {} do not exists but couldn't created as the config '{}' isn't supplied",
+                        streamThread.getName(), missingTopics, StreamsConfig.ZOOKEEPER_CONNECT_CONFIG);
             }
         }
 
+        log.info("stream-thread [{}] Completed validating internal topics in partition assignor", streamThread.getName());
+    }
+
+    private void ensureCopartitioning(Collection<Set<String>> copartitionGroups,
+                                      Map<String, InternalTopicMetadata> allRepartitionTopicsNumPartitions,
+                                      Cluster metadata) {
         for (Set<String> copartitionGroup : copartitionGroups) {
-            ensureCopartitioning(copartitionGroup, internalTopics, metadata);
+            ensureCopartitioning(copartitionGroup, allRepartitionTopicsNumPartitions, metadata);
         }
     }
 
-    private void ensureCopartitioning(Set<String> copartitionGroup, Map<String, InternalTopicConfig> internalTopics, Cluster metadata) {
+    private void ensureCopartitioning(Set<String> copartitionGroup,
+                                      Map<String, InternalTopicMetadata> allRepartitionTopicsNumPartitions,
+                                      Cluster metadata) {
         int numPartitions = -1;
 
         for (String topic : copartitionGroup) {
-            if (!internalTopics.containsKey(topic)) {
-                List<PartitionInfo> infos = metadata.partitionsForTopic(topic);
+            if (!allRepartitionTopicsNumPartitions.containsKey(topic)) {
+                Integer partitions = metadata.partitionCountForTopic(topic);
 
-                if (infos == null)
-                    throw new TopologyBuilderException(String.format("stream-thread [%s] External source topic not found: %s", streamThread.getName(), topic));
+                if (partitions == null)
+                    throw new TopologyBuilderException(String.format("stream-thread [%s] Topic not found: %s", streamThread.getName(), topic));
 
                 if (numPartitions == -1) {
-                    numPartitions = infos.size();
-                } else if (numPartitions != infos.size()) {
+                    numPartitions = partitions;
+                } else if (numPartitions != partitions) {
                     String[] topics = copartitionGroup.toArray(new String[copartitionGroup.size()]);
                     Arrays.sort(topics);
-                    throw new TopologyBuilderException(String.format("stream-thread [%s] Topics not copartitioned: [%s]", streamThread.getName(), Utils.mkString(Arrays.asList(topics), ",")));
+                    throw new TopologyBuilderException(String.format("stream-thread [%s] Topics not co-partitioned: [%s]", streamThread.getName(), Utils.mkString(Arrays.asList(topics), ",")));
                 }
             }
         }
 
+        // if all topics for this co-partition group are repartition topics,
+        // then set the number of partitions to be the maximum across them.
         if (numPartitions == -1) {
-            for (InternalTopicConfig topic : internalTopics.values()) {
-                if (copartitionGroup.contains(topic.name())) {
-                    Integer partitions = metadata.partitionCountForTopic(topic.name());
-                    if (partitions != null && partitions > numPartitions) {
+            for (Map.Entry<String, InternalTopicMetadata> entry: allRepartitionTopicsNumPartitions.entrySet()) {
+                if (copartitionGroup.contains(entry.getKey())) {
+                    int partitions = entry.getValue().numPartitions;
+                    if (partitions > numPartitions) {
                         numPartitions = partitions;
                     }
                 }
             }
         }
-        // enforce co-partitioning restrictions to internal topics reusing internalSourceTopicToTaskIds
-        for (InternalTopicConfig topic : internalTopics.values()) {
-            if (copartitionGroup.contains(topic.name())) {
-                internalSourceTopicToTaskIds
-                    .put(topic, Collections.singleton(new TaskId(-1, numPartitions)));
+
+        // enforce co-partitioning restrictions to repartition topics by updating their number of partitions
+        for (Map.Entry<String, InternalTopicMetadata> entry : allRepartitionTopicsNumPartitions.entrySet()) {
+            if (copartitionGroup.contains(entry.getKey())) {
+                entry.getValue().numPartitions = numPartitions;
             }
         }
     }
 
-    /* For Test Only */
-    public Set<TaskId> tasksForState(String stateName) {
-        final String changeLogName = ProcessorStateManager.storeChangelogTopic(streamThread.applicationId, stateName);
-        for (InternalTopicConfig internalTopicConfig : stateChangelogTopicToTaskIds.keySet()) {
-            if (internalTopicConfig.name().equals(changeLogName)) {
-                return stateChangelogTopicToTaskIds.get(internalTopicConfig);
-            }
+    Map<HostInfo, Set<TopicPartition>> getPartitionsByHostState() {
+        if (partitionsByHostState == null) {
+            return Collections.emptyMap();
         }
-        return Collections.emptySet();
+        return Collections.unmodifiableMap(partitionsByHostState);
+    }
+
+    Cluster clusterMetadata() {
+        if (metadataWithInternalTopics == null) {
+            return Cluster.empty();
+        }
+        return metadataWithInternalTopics;
     }
 
-    public Set<TaskId> tasksForPartition(TopicPartition partition) {
-        return partitionToTaskIds.get(partition);
+    Map<TaskId, Set<TopicPartition>> activeTasks() {
+        if (activeTasks == null) {
+            return Collections.emptyMap();
+        }
+        return Collections.unmodifiableMap(activeTasks);
     }
 
-    public Map<TaskId, Set<TopicPartition>> standbyTasks() {
-        return standbyTasks;
+    Map<TaskId, Set<TopicPartition>> standbyTasks() {
+        if (standbyTasks == null) {
+            return Collections.emptyMap();
+        }
+        return Collections.unmodifiableMap(standbyTasks);
     }
 
-    public void setInternalTopicManager(InternalTopicManager internalTopicManager) {
+    void setInternalTopicManager(InternalTopicManager internalTopicManager) {
         this.internalTopicManager = internalTopicManager;
     }
 
@@ -651,11 +715,10 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
      * Used to capture subscribed topic via Patterns discovered during the
      * partition assignment process.
      */
-    public static  class SubscriptionUpdates {
+    public static class SubscriptionUpdates {
 
         private final Set<String> updatedTopicSubscriptions = new HashSet<>();
 
-
         private  void updateTopics(Collection<String> topicNames) {
             updatedTopicSubscriptions.clear();
             updatedTopicSubscriptions.addAll(topicNames);

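A quick worked example makes the co-partitioning rules above concrete. When a group contains a topic that is not a repartition topic, its partition count from the cluster metadata is authoritative and any mismatch throws; when the group consists entirely of repartition topics, the maximum count wins and is written back to every member. The following self-contained sketch mirrors that second branch (the plain map stands in for allRepartitionTopicsNumPartitions; topic names and counts are illustrative):

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    public class CopartitionSketch {
        public static void main(String[] args) {
            // hypothetical stand-in for allRepartitionTopicsNumPartitions
            Map<String, Integer> repartitionTopics = new HashMap<>();
            repartitionTopics.put("appId-repartition-a", 4);
            repartitionTopics.put("appId-repartition-b", 6);

            Set<String> copartitionGroup = new HashSet<>(repartitionTopics.keySet());

            // every member is a repartition topic, so take the maximum count
            int numPartitions = -1;
            for (String topic : copartitionGroup)
                numPartitions = Math.max(numPartitions, repartitionTopics.get(topic));

            // propagate the decision back so all members end up with 6 partitions
            for (String topic : copartitionGroup)
                repartitionTopics.put(topic, numPartitions);

            System.out.println(repartitionTopics);
        }
    }
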
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index d7bb98c..7a04339 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -111,7 +111,6 @@ public class StreamThread extends Thread {
     private boolean processStandbyRecords = false;
     private AtomicBoolean initialized = new AtomicBoolean(false);
 
-    private final long cacheSizeBytes;
     private ThreadCache cache;
 
     final ConsumerRebalanceListener rebalanceListener = new ConsumerRebalanceListener() {
@@ -184,7 +183,7 @@ public class StreamThread extends Thread {
         if (config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) < 0) {
             log.warn("Negative cache size passed in thread [{}]. Reverting to cache size of 0 bytes.", threadName);
         }
-        this.cacheSizeBytes = Math.max(0, config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) /
+        long cacheSizeBytes = Math.max(0, config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) /
             config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG));
         this.cache = new ThreadCache(threadClientId, cacheSizeBytes, this.sensors);
 
@@ -637,34 +636,24 @@ public class StreamThread extends Thread {
         if (partitionAssignor == null)
             throw new IllegalStateException(logPrefix + " Partition assignor has not been initialized while adding stream tasks: this should not happen.");
 
-        HashMap<TaskId, Set<TopicPartition>> partitionsForTask = new HashMap<>();
-
-        for (TopicPartition partition : assignment) {
-            Set<TaskId> taskIds = partitionAssignor.tasksForPartition(partition);
-            for (TaskId taskId : taskIds) {
-                Set<TopicPartition> partitions = partitionsForTask.get(taskId);
-                if (partitions == null) {
-                    partitions = new HashSet<>();
-                    partitionsForTask.put(taskId, partitions);
-                }
-                partitions.add(partition);
-            }
-        }
-
         // create the active tasks
-        for (Map.Entry<TaskId, Set<TopicPartition>> entry : partitionsForTask.entrySet()) {
+        for (Map.Entry<TaskId, Set<TopicPartition>> entry : partitionAssignor.activeTasks().entrySet()) {
             TaskId taskId = entry.getKey();
             Set<TopicPartition> partitions = entry.getValue();
 
-            try {
-                StreamTask task = createStreamTask(taskId, partitions);
-                activeTasks.put(taskId, task);
+            if (assignment.containsAll(partitions)) {
+                try {
+                    StreamTask task = createStreamTask(taskId, partitions);
+                    activeTasks.put(taskId, task);
 
-                for (TopicPartition partition : partitions)
-                    activeTasksByPartition.put(partition, task);
-            } catch (StreamsException e) {
-                log.error("{} Failed to create an active task %s: ", logPrefix, taskId, e);
-                throw e;
+                    for (TopicPartition partition : partitions)
+                        activeTasksByPartition.put(partition, task);
+                } catch (StreamsException e) {
+                    log.error("{} Failed to create an active task {}: ", logPrefix, taskId, e);
+                    throw e;
+                }
+            } else {
+                log.warn("{} Task {}'s owned partitions {} are not all contained in the assignment {}", logPrefix, taskId, partitions, assignment);
             }
         }
     }

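The key behavioral change in addStreamTasks above is the containment guard: a task is created only if every one of its partitions is present in this consumer's assignment, and otherwise skipped with a warning. A minimal standalone sketch of that guard, with strings standing in for TaskId and TopicPartition:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    public class ContainmentGuardSketch {
        public static void main(String[] args) {
            Set<String> assignment = new HashSet<>(Arrays.asList("topic1-0", "topic1-1"));

            Map<String, Set<String>> activeTasks = new HashMap<>();
            activeTasks.put("0_0", new HashSet<>(Arrays.asList("topic1-0")));
            activeTasks.put("0_2", new HashSet<>(Arrays.asList("topic1-2")));

            for (Map.Entry<String, Set<String>> entry : activeTasks.entrySet()) {
                if (assignment.containsAll(entry.getValue()))
                    System.out.println("creating task " + entry.getKey());
                else
                    System.out.println("skipping task " + entry.getKey()
                            + ": partitions " + entry.getValue() + " not all assigned");
            }
        }
    }
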
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
index 6569f85..ddbd67d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
@@ -41,7 +41,7 @@ public class AssignmentInfo {
 
     private static final Logger log = LoggerFactory.getLogger(AssignmentInfo.class);
     /**
-     * A new field was added, partitionsByHostState. CURRENT_VERSION
+     * A new field was added, partitionsByHost. CURRENT_VERSION
      * is required so we can decode the previous version. For example, this may occur
      * during a rolling upgrade
      */
@@ -49,7 +49,7 @@ public class AssignmentInfo {
     public final int version;
     public final List<TaskId> activeTasks; // each element corresponds to a partition
     public final Map<TaskId, Set<TopicPartition>> standbyTasks;
-    public final Map<HostInfo, Set<TopicPartition>> partitionsByHostState;
+    public final Map<HostInfo, Set<TopicPartition>> partitionsByHost;
 
     public AssignmentInfo(List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks,
                           Map<HostInfo, Set<TopicPartition>> hostState) {
@@ -61,7 +61,7 @@ public class AssignmentInfo {
         this.version = version;
         this.activeTasks = activeTasks;
         this.standbyTasks = standbyTasks;
-        this.partitionsByHostState = hostState;
+        this.partitionsByHost = hostState;
     }
 
     /**
@@ -89,8 +89,8 @@ public class AssignmentInfo {
                 Set<TopicPartition> partitions = entry.getValue();
                 writeTopicPartitions(out, partitions);
             }
-            out.writeInt(partitionsByHostState.size());
-            for (Map.Entry<HostInfo, Set<TopicPartition>> entry : partitionsByHostState
+            out.writeInt(partitionsByHost.size());
+            for (Map.Entry<HostInfo, Set<TopicPartition>> entry : partitionsByHost
                     .entrySet()) {
                 final HostInfo hostInfo = entry.getKey();
                 out.writeUTF(hostInfo.host());
@@ -174,7 +174,7 @@ public class AssignmentInfo {
 
     @Override
     public int hashCode() {
-        return version ^ activeTasks.hashCode() ^ standbyTasks.hashCode() ^ partitionsByHostState.hashCode();
+        return version ^ activeTasks.hashCode() ^ standbyTasks.hashCode() ^ partitionsByHost.hashCode();
     }
 
     @Override
@@ -184,7 +184,7 @@ public class AssignmentInfo {
             return this.version == other.version &&
                     this.activeTasks.equals(other.activeTasks) &&
                     this.standbyTasks.equals(other.standbyTasks) &&
-                    this.partitionsByHostState.equals(other.partitionsByHostState);
+                    this.partitionsByHost.equals(other.partitionsByHost);
         } else {
             return false;
         }

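The javadoc above ties CURRENT_VERSION to rolling upgrades: a decoder must be able to recognize data produced by a different release. A common way to implement that (assumed here, not shown in the hunk) is to write the version before everything else; this standalone sketch shows the pattern with an invented single-int payload:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class VersionedCodecSketch {
        static final int CURRENT_VERSION = 2;

        static byte[] encode(int payload) throws IOException {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(CURRENT_VERSION); // version goes first, always
            out.writeInt(payload);
            out.flush();
            return bytes.toByteArray();
        }

        static int decode(byte[] data) throws IOException {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            int version = in.readInt();
            if (version > CURRENT_VERSION)
                throw new IOException("unable to decode version " + version);
            return in.readInt();
        }

        public static void main(String[] args) throws IOException {
            System.out.println(decode(encode(42))); // prints 42
        }
    }
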
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
index b59af86..0746cab 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
@@ -22,11 +22,12 @@ import java.util.Set;
 
 public class ClientState<T> {
 
-    public final static double COST_ACTIVE = 0.1;
-    public final static double COST_STANDBY  = 0.2;
-    public final static double COST_LOAD = 0.5;
+    final static double COST_ACTIVE = 0.1;
+    final static double COST_STANDBY = 0.2;
+    final static double COST_LOAD = 0.5;
 
     public final Set<T> activeTasks;
+    public final Set<T> standbyTasks;
     public final Set<T> assignedTasks;
     public final Set<T> prevActiveTasks;
     public final Set<T> prevAssignedTasks;
@@ -39,11 +40,12 @@ public class ClientState<T> {
     }
 
     public ClientState(double capacity) {
-        this(new HashSet<T>(), new HashSet<T>(), new HashSet<T>(), new HashSet<T>(), capacity);
+        this(new HashSet<T>(), new HashSet<T>(), new HashSet<T>(), new HashSet<T>(), new HashSet<T>(), capacity);
     }
 
-    private ClientState(Set<T> activeTasks, Set<T> assignedTasks, Set<T> prevActiveTasks, Set<T> prevAssignedTasks, double capacity) {
+    private ClientState(Set<T> activeTasks, Set<T> standbyTasks, Set<T> assignedTasks, Set<T> prevActiveTasks, Set<T> prevAssignedTasks, double capacity) {
         this.activeTasks = activeTasks;
+        this.standbyTasks = standbyTasks;
         this.assignedTasks = assignedTasks;
         this.prevActiveTasks = prevActiveTasks;
         this.prevAssignedTasks = prevAssignedTasks;
@@ -52,13 +54,15 @@ public class ClientState<T> {
     }
 
     public ClientState<T> copy() {
-        return new ClientState<>(new HashSet<>(activeTasks), new HashSet<>(assignedTasks),
+        return new ClientState<>(new HashSet<>(activeTasks), new HashSet<>(standbyTasks), new HashSet<>(assignedTasks),
                 new HashSet<>(prevActiveTasks), new HashSet<>(prevAssignedTasks), capacity);
     }
 
     public void assign(T taskId, boolean active) {
         if (active)
             activeTasks.add(taskId);
+        else
+            standbyTasks.add(taskId);
 
         assignedTasks.add(taskId);
 

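With the new standbyTasks set, assign routes every task into exactly one of the active or standby sets while assignedTasks keeps the union, and copy() then deep-copies all five sets. The routing in isolation (string task ids for brevity):

    import java.util.HashSet;
    import java.util.Set;

    public class ClientStateSketch {
        static Set<String> activeTasks = new HashSet<>();
        static Set<String> standbyTasks = new HashSet<>();
        static Set<String> assignedTasks = new HashSet<>();

        static void assign(String taskId, boolean active) {
            if (active)
                activeTasks.add(taskId);
            else
                standbyTasks.add(taskId);
            assignedTasks.add(taskId); // union of the two sets above
        }

        public static void main(String[] args) {
            assign("0_0", true);
            assign("0_1", false);
            System.out.println(activeTasks);   // [0_0]
            System.out.println(standbyTasks);  // [0_1]
            System.out.println(assignedTasks); // [0_0, 0_1]
        }
    }
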
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java
index fadb43f..e807c4e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java
@@ -23,7 +23,6 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Random;
@@ -33,24 +32,20 @@ public class TaskAssignor<C, T extends Comparable<T>> {
 
     private static final Logger log = LoggerFactory.getLogger(TaskAssignor.class);
 
-    public static <C, T extends Comparable<T>> Map<C, ClientState<T>> assign(Map<C, ClientState<T>> states, Set<T> tasks, int numStandbyReplicas, String streamThreadId) {
+    public static <C, T extends Comparable<T>> void assign(Map<C, ClientState<T>> states, Set<T> tasks, int numStandbyReplicas) {
         long seed = 0L;
         for (C client : states.keySet()) {
             seed += client.hashCode();
         }
 
         TaskAssignor<C, T> assignor = new TaskAssignor<>(states, tasks, seed);
-        log.info("stream-thread [{}] Assigning tasks to clients: {}, prevAssignmentBalanced: {}, " +
-            "prevClientsUnchanged: {}, tasks: {}, replicas: {}",
-            streamThreadId, states, assignor.prevAssignmentBalanced, assignor.prevClientsUnchanged,
-            tasks, numStandbyReplicas);
 
+        // assign active tasks
         assignor.assignTasks();
+
+        // assign standby tasks
         if (numStandbyReplicas > 0)
             assignor.assignStandbyTasks(numStandbyReplicas);
-
-        log.info("stream-thread [{}] Assigned with: {}", streamThreadId, assignor.states);
-        return assignor.states;
     }
 
     private final Random rand;
@@ -63,36 +58,38 @@ public class TaskAssignor<C, T extends Comparable<T>> {
 
     private TaskAssignor(Map<C, ClientState<T>> states, Set<T> tasks, long randomSeed) {
         this.rand = new Random(randomSeed);
-        this.states = new HashMap<>();
+        this.tasks = new ArrayList<>(tasks);
+        this.states = states;
+
         int avgNumTasks = tasks.size() / states.size();
         Set<T> existingTasks = new HashSet<>();
         for (Map.Entry<C, ClientState<T>> entry : states.entrySet()) {
-            this.states.put(entry.getKey(), entry.getValue().copy());
             Set<T> oldTasks = entry.getValue().prevAssignedTasks;
+
             // make sure the previous assignment is balanced
             prevAssignmentBalanced = prevAssignmentBalanced &&
                 oldTasks.size() < 2 * avgNumTasks && oldTasks.size() > avgNumTasks / 2;
+
+            // make sure there are no duplicates
             for (T task : oldTasks) {
-                // Make sure there is no duplicates
                 prevClientsUnchanged = prevClientsUnchanged && !existingTasks.contains(task);
             }
             existingTasks.addAll(oldTasks);
         }
-        // Make sure the existing assignment didn't miss out any task
-        prevClientsUnchanged = prevClientsUnchanged && existingTasks.equals(tasks);
 
-        this.tasks = new ArrayList<>(tasks);
+        // make sure the existing assignment didn't miss any tasks
+        prevClientsUnchanged = prevClientsUnchanged && existingTasks.equals(tasks);
 
         int numTasks = tasks.size();
         this.maxNumTaskPairs = numTasks * (numTasks - 1) / 2;
         this.taskPairs = new HashSet<>(this.maxNumTaskPairs);
     }
 
-    public void assignTasks() {
+    private void assignTasks() {
         assignTasks(true);
     }
 
-    public void assignStandbyTasks(int numStandbyReplicas) {
+    private void assignStandbyTasks(int numStandbyReplicas) {
         int numReplicas = Math.min(numStandbyReplicas, states.size() - 1);
         for (int i = 0; i < numReplicas; i++) {
             assignTasks(false);
@@ -195,10 +192,10 @@ public class TaskAssignor<C, T extends Comparable<T>> {
     }
 
     private static class TaskPair<T> {
-        public final T task1;
-        public final T task2;
+        final T task1;
+        final T task2;
 
-        public TaskPair(T task1, T task2) {
+        TaskPair(T task1, T task2) {
             this.task1 = task1;
             this.task2 = task2;
         }

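Two pieces of arithmetic in the constructor are worth a worked check: with n tasks there are n * (n - 1) / 2 unordered task pairs, and a previous assignment counts as balanced only when each client's old task count lies strictly between avgNumTasks / 2 and 2 * avgNumTasks. With illustrative numbers:

    public class AssignorArithmeticSketch {
        public static void main(String[] args) {
            int numTasks = 6;
            int numClients = 3;

            // 6 * 5 / 2 = 15 unordered task pairs
            System.out.println(numTasks * (numTasks - 1) / 2);

            // avgNumTasks = 2: a client that previously held 3 tasks is balanced
            // (1 < 3 < 4), one that held 4 is not (4 is not < 4)
            int avgNumTasks = numTasks / numClients;
            for (int oldTasks : new int[]{3, 4}) {
                boolean balanced = oldTasks < 2 * avgNumTasks
                        && oldTasks > avgNumTasks / 2;
                System.out.println(oldTasks + " -> balanced: " + balanced);
            }
        }
    }
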
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/main/java/org/apache/kafka/streams/state/HostInfo.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/HostInfo.java b/streams/src/main/java/org/apache/kafka/streams/state/HostInfo.java
index 37a15e1..ce75d91 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/HostInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/HostInfo.java
@@ -74,7 +74,7 @@ public class HostInfo {
     @Override
     public String toString() {
         return "HostInfo{" +
-                "host='" + host + '\'' +
+                "host=\'" + host + '\'' +
                 ", port=" + port +
                 '}';
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
index 41f9ae2..83e3d10 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
@@ -120,14 +120,4 @@ public class StoreChangeLogger<K, V> {
         this.removed.clear();
         this.dirty.clear();
     }
-
-    // this is for test only
-    public int numDirty() {
-        return this.dirty.size();
-    }
-
-    // this is for test only
-    public int numRemoved() {
-        return this.removed.size();
-    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
index a5fb076..03df4cc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
@@ -85,7 +85,6 @@ public class FanoutIntegrationTest {
         CLUSTER.createTopic(OUTPUT_TOPIC_C);
     }
 
-
     @Parameter
     public long cacheSizeBytes;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index e5e334c..8c96ecb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -206,7 +206,7 @@ public class KStreamImplTest {
 
     @Test(expected = NullPointerException.class)
     public void shouldCantHaveNullPredicate() throws Exception {
-        testStream.branch(null);
+        testStream.branch((Predicate) null);
     }
 
     @Test(expected = NullPointerException.class)

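The added (Predicate) cast in the test is not cosmetic: when a varargs method receives an uncast null literal, the null binds to the array parameter itself, whereas a cast null becomes a one-element array whose single element is null, which is the case the expected NullPointerException is meant to exercise. A standalone demonstration (the branch-like method here is invented; both calls are cast explicitly, since an uncast null draws an ambiguity warning but resolves like the array form):

    public class VarargsNullSketch {
        static int branch(Object... predicates) {
            return predicates == null ? -1 : predicates.length;
        }

        public static void main(String[] args) {
            System.out.println(branch((Object[]) null)); // -1: the array itself is null
            System.out.println(branch((Object) null));   // 1: array holding one null
        }
    }
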
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
index afa1033..49dcbd0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
@@ -40,6 +40,7 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.lang.reflect.Field;
 import java.io.File;
 import java.io.IOException;
 
@@ -337,7 +338,7 @@ public class KTableImplTest {
     }
 
     @Test
-    public void testRepartition() throws IOException {
+    public void testRepartition() throws Exception {
         String topic1 = "topic1";
         String storeName1 = "storeName1";
 
@@ -367,10 +368,15 @@ public class KTableImplTest {
         assertTrue(driver.allProcessorNames().contains("KSTREAM-SINK-0000000007"));
         assertTrue(driver.allProcessorNames().contains("KSTREAM-SOURCE-0000000008"));
 
-        assertNotNull(((ChangedSerializer) ((SinkNode) driver.processor("KSTREAM-SINK-0000000003")).valueSerializer()).inner());
-        assertNotNull(((ChangedDeserializer) ((SourceNode) driver.processor("KSTREAM-SOURCE-0000000004")).valueDeserializer()).inner());
-        assertNotNull(((ChangedSerializer) ((SinkNode) driver.processor("KSTREAM-SINK-0000000007")).valueSerializer()).inner());
-        assertNotNull(((ChangedDeserializer) ((SourceNode) driver.processor("KSTREAM-SOURCE-0000000008")).valueDeserializer()).inner());
+        Field valSerializerField  = ((SinkNode) driver.processor("KSTREAM-SINK-0000000003")).getClass().getDeclaredField("valSerializer");
+        Field valDeserializerField  = ((SourceNode) driver.processor("KSTREAM-SOURCE-0000000004")).getClass().getDeclaredField("valDeserializer");
+        valSerializerField.setAccessible(true);
+        valDeserializerField.setAccessible(true);
+
+        assertNotNull(((ChangedSerializer) valSerializerField.get(driver.processor("KSTREAM-SINK-0000000003"))).inner());
+        assertNotNull(((ChangedDeserializer) valDeserializerField.get(driver.processor("KSTREAM-SOURCE-0000000004"))).inner());
+        assertNotNull(((ChangedSerializer) valSerializerField.get(driver.processor("KSTREAM-SINK-0000000007"))).inner());
+        assertNotNull(((ChangedDeserializer) valDeserializerField.get(driver.processor("KSTREAM-SOURCE-0000000008"))).inner());
     }
 
     @Test(expected = NullPointerException.class)

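Rather than keeping test-only accessors on SinkNode and SourceNode, the test now reaches their private serializer fields through reflection. The general pattern in a self-contained form (the class and field names below are invented):

    import java.lang.reflect.Field;

    public class ReflectionSketch {
        static class Node {
            private final String valSerializer = "ChangedSerializer";
        }

        public static void main(String[] args) throws Exception {
            Node node = new Node();
            Field field = node.getClass().getDeclaredField("valSerializer");
            field.setAccessible(true); // allow reads of the private field
            System.out.println(field.get(node)); // prints ChangedSerializer
        }
    }
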
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index d260937..c402c9b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -156,7 +156,7 @@ public class TopologyBuilderTest {
         builder.addSource("source-3", "topic-3");
         builder.addInternalTopic("topic-3");
 
-        Set<String> expected = new HashSet<String>();
+        Set<String> expected = new HashSet<>();
         expected.add("topic-1");
         expected.add("topic-2");
         expected.add("X-topic-3");
@@ -516,7 +516,7 @@ public class TopologyBuilderTest {
         builder.addInternalTopic("foo");
         builder.addSource("source", "foo");
         final TopicsInfo topicsInfo = builder.topicGroups().values().iterator().next();
-        final InternalTopicConfig topicConfig = topicsInfo.interSourceTopics.get("appId-foo");
+        final InternalTopicConfig topicConfig = topicsInfo.repartitionSourceTopics.get("appId-foo");
         final Properties properties = topicConfig.toProperties(0);
         assertEquals("appId-foo", topicConfig.name());
         assertEquals("delete", properties.getProperty(InternalTopicManager.CLEANUP_POLICY_PROP));

