KAFKA-4117: Stream partition assignor cleanup
1. Create a new `ClientMetadata` class to collapse `Set<String> consumerMemberIds`, `ClientState<TaskId> state`, and `HostInfo hostInfo` into a single per-client object.
2. Stop reusing `stateChangelogTopicToTaskIds` and `internalSourceTopicToTaskIds` to access the (sub-)topology's internal repartition and changelog topics, for clarity; also use the source topics' number of partitions to set the number of partitions for repartition topics, and clarify that the topology must NOT contain cycles, since otherwise the resolution while-loop would never terminate (see the first sketch after this list).
3. Run `ensure-copartition` at the end to adjust the number of partitions for repartition topics, where necessary, so that they match the other topics in their co-partition group (see the second sketch after this list).
4. Refactor `ClientState` and update the `TaskAssignor` logic, also for clarity.
5. Change the default `clientId` from `applicationId-suffix` to `applicationId-processId`, where `processId` is a UUID, to avoid clientId conflicts between instances in different JVMs, and hence conflicts in metrics.
6. Enforce that the `assignment` partitions list and the `activeTask` taskIds have the same size, and hence a one-to-one mapping (see the third sketch after this list).
7. Remove the `AssignmentSupplier` class by always constructing the `partitionsByHostState` map before assigning tasks to consumers within a client.
8. Remove all unnecessary member variables in `StreamPartitionAssignor`.
9. Various other minor fixes to unit tests, e.g. removing `test only` functions that used Java field reflection.
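
To make point 2 concrete, here is a minimal, self-contained sketch of the fixed-point iteration that sizes repartition topics, simplified from the patch: sub-topologies are modeled as plain source/sink topic sets, a `Map<String, Integer>` stands in for both the `Cluster` metadata and `InternalTopicMetadata`, and all identifiers (`RepartitionSizingSketch`, `SubTopology`, the topic names) are illustrative rather than the patch's own:

```java
import java.util.*;

public class RepartitionSizingSketch {

    static class SubTopology {
        final Set<String> sourceTopics;
        final Set<String> sinkTopics;
        SubTopology(Set<String> sources, Set<String> sinks) {
            this.sourceTopics = sources;
            this.sinkTopics = sinks;
        }
    }

    public static void main(String[] args) {
        // external topic metadata (stand-in for Cluster.partitionCountForTopic)
        Map<String, Integer> externalPartitions = Map.of("input", 4);

        // repartition topics start with an unknown partition count (-1)
        Map<String, Integer> repartitionPartitions = new HashMap<>();
        repartitionPartitions.put("app-repartition", -1);

        List<SubTopology> groups = List.of(
            new SubTopology(Set.of("input"), Set.of("app-repartition")),
            new SubTopology(Set.of("app-repartition"), Set.of()));

        // iterate until every repartition topic has a resolved count
        boolean unresolved;
        do {
            unresolved = false;
            for (Map.Entry<String, Integer> entry : repartitionPartitions.entrySet()) {
                if (entry.getValue() != -1)
                    continue;
                int numPartitions = -1;
                for (SubTopology upstream : groups) {
                    if (!upstream.sinkTopics.contains(entry.getKey()))
                        continue;
                    // take the max over the upstream group's source topics, which
                    // may themselves be (possibly unresolved) repartition topics
                    for (String source : upstream.sourceTopics) {
                        Integer candidate = repartitionPartitions.containsKey(source)
                            ? repartitionPartitions.get(source)
                            : externalPartitions.get(source);
                        if (candidate != null && candidate > numPartitions)
                            numPartitions = candidate;
                    }
                }
                if (numPartitions == -1)
                    unresolved = true;   // try again on the next pass
                else
                    entry.setValue(numPartitions);
            }
        } while (unresolved);

        System.out.println(repartitionPartitions); // {app-repartition=4}
    }
}
```

If two repartition topics were fed only by each other, neither count could ever resolve and the `do`/`while` would spin forever, which is why the topology is required to be acyclic.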
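
Point 3's co-partitioning rule can be sketched the same way; again plain maps replace `Cluster` and `InternalTopicMetadata`, a generic `IllegalStateException` stands in for `TopologyBuilderException`, and the method mirrors the patch's `ensureCopartitioning` only in outline:

```java
import java.util.*;

public class CopartitionSketch {

    static void ensureCopartitioning(Set<String> group,
                                     Map<String, Integer> repartitionPartitions,
                                     Map<String, Integer> externalPartitions) {
        int numPartitions = -1;

        // all external (non-repartition) topics in the group must already agree
        for (String topic : group) {
            if (repartitionPartitions.containsKey(topic))
                continue;
            Integer partitions = externalPartitions.get(topic);
            if (partitions == null)
                throw new IllegalStateException("Topic not found: " + topic);
            if (numPartitions == -1)
                numPartitions = partitions;
            else if (numPartitions != partitions)
                throw new IllegalStateException("Topics not co-partitioned: " + group);
        }

        // if the group is all repartition topics, use the max of their counts
        if (numPartitions == -1) {
            for (Map.Entry<String, Integer> entry : repartitionPartitions.entrySet()) {
                if (group.contains(entry.getKey()) && entry.getValue() > numPartitions)
                    numPartitions = entry.getValue();
            }
        }

        // enforce the agreed count on every repartition topic in the group
        for (Map.Entry<String, Integer> entry : repartitionPartitions.entrySet()) {
            if (group.contains(entry.getKey()))
                entry.setValue(numPartitions);
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> repartition = new HashMap<>(Map.of("app-repartition", 2));
        Map<String, Integer> external = Map.of("orders", 6);

        ensureCopartitioning(Set.of("orders", "app-repartition"), repartition, external);
        System.out.println(repartition); // {app-repartition=6}
    }
}
```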
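
And for point 6, a sketch of the one-to-one pairing now enforced in `onAssignment`: plain strings stand in for `TopicPartition` and `TaskId`, and the task-id list is assumed to be encoded by the leader in the same sorted partition order that the member reconstructs locally:

```java
import java.util.*;

public class OnAssignmentSketch {

    public static void main(String[] args) {
        // partitions as handed to onAssignment, and the task ids the leader
        // encoded in sorted partition order; task "0_0" owns two partitions,
        // so it simply appears twice in the task-id list
        List<String> partitions = new ArrayList<>(List.of("topic-1", "topic-0", "topic-2"));
        List<String> activeTaskIds = List.of("0_0", "0_1", "0_0");

        Collections.sort(partitions); // now: topic-0, topic-1, topic-2

        // the change enforces equal sizes, i.e. a strict one-to-one mapping
        if (partitions.size() != activeTaskIds.size())
            throw new IllegalStateException(
                "Number of assigned partitions is not equal to the number of active taskIds");

        // pair index i of the sorted partitions with index i of the task ids
        Map<String, Set<String>> activeTasks = new TreeMap<>();
        for (int i = 0; i < partitions.size(); i++) {
            activeTasks.computeIfAbsent(activeTaskIds.get(i), k -> new TreeSet<>())
                       .add(partitions.get(i));
        }

        System.out.println(activeTasks); // {0_0=[topic-0, topic-2], 0_1=[topic-1]}
    }
}
```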
Author: Guozhang Wang <wangguoz@gmail.com>
Reviewers: Xavier Léauté, Matthias J. Sax, Eno Thereska, Jason Gustafson
Closes #2012 from guozhangwang/K4117-stream-partitionassignro-cleanup
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a4ab9d02
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a4ab9d02
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a4ab9d02
Branch: refs/heads/trunk
Commit: a4ab9d02a22e77f0ebca0450e608898d83d4fe18
Parents: 0dd9607
Author: Guozhang Wang <wangguoz@gmail.com>
Authored: Sat Oct 29 10:48:17 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Sat Oct 29 10:48:17 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/kafka/common/Cluster.java | 20 +-
.../org/apache/kafka/streams/KafkaStreams.java | 4 +-
.../streams/processor/TopologyBuilder.java | 8 +-
.../internals/InternalTopicConfig.java | 3 +-
.../streams/processor/internals/SinkNode.java | 5 -
.../streams/processor/internals/SourceNode.java | 5 -
.../internals/StreamPartitionAssignor.java | 717 ++++++++++---------
.../processor/internals/StreamThread.java | 39 +-
.../internals/assignment/AssignmentInfo.java | 14 +-
.../internals/assignment/ClientState.java | 16 +-
.../internals/assignment/TaskAssignor.java | 37 +-
.../apache/kafka/streams/state/HostInfo.java | 2 +-
.../state/internals/StoreChangeLogger.java | 10 -
.../integration/FanoutIntegrationTest.java | 1 -
.../kstream/internals/KStreamImplTest.java | 2 +-
.../kstream/internals/KTableImplTest.java | 16 +-
.../streams/processor/TopologyBuilderTest.java | 4 +-
.../internals/StreamPartitionAssignorTest.java | 148 ++--
.../internals/StreamsMetadataStateTest.java | 2 +-
.../assignment/AssignmentInfoTest.java | 2 +-
.../internals/assignment/TaskAssignorTest.java | 115 +--
.../state/internals/StoreChangeLoggerTest.java | 21 +-
22 files changed, 642 insertions(+), 549 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/clients/src/main/java/org/apache/kafka/common/Cluster.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java
index 471ae26..3c6475d 100644
--- a/clients/src/main/java/org/apache/kafka/common/Cluster.java
+++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java
@@ -207,6 +207,16 @@ public final class Cluster {
}
/**
+ * Get the number of partitions for the given topic
+ * @param topic The topic to get the number of partitions for
+ * @return The number of partitions or null if there is no corresponding metadata
+ */
+ public Integer partitionCountForTopic(String topic) {
+ List<PartitionInfo> partitions = this.partitionsByTopic.get(topic);
+ return partitions == null ? null : partitions.size();
+ }
+
+ /**
* Get the list of available partitions for this topic
* @param topic The topic name
* @return A list of partitions
@@ -225,16 +235,6 @@ public final class Cluster {
}
/**
- * Get the number of partitions for the given topic
- * @param topic The topic to get the number of partitions for
- * @return The number of partitions or null if there is no corresponding metadata
- */
- public Integer partitionCountForTopic(String topic) {
- List<PartitionInfo> partitionInfos = this.partitionsByTopic.get(topic);
- return partitionInfos == null ? null : partitionInfos.size();
- }
-
- /**
* Get all topics.
* @return a set of all topics
*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 2333db7..e120b31 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -47,7 +47,6 @@ import java.util.List;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
/**
* Kafka Streams allows for performing continuous computation on input coming from one or more input topics and
@@ -91,7 +90,6 @@ import java.util.concurrent.atomic.AtomicInteger;
public class KafkaStreams {
private static final Logger log = LoggerFactory.getLogger(KafkaStreams.class);
- private static final AtomicInteger STREAM_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
private static final String JMX_PREFIX = "kafka.streams";
// container states
@@ -156,7 +154,7 @@ public class KafkaStreams {
String clientId = config.getString(StreamsConfig.CLIENT_ID_CONFIG);
if (clientId.length() <= 0)
- clientId = applicationId + "-" + STREAM_CLIENT_ID_SEQUENCE.getAndIncrement();
+ clientId = applicationId + "-" + processId;
final List<MetricsReporter> reporters = config.getConfiguredInstances(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG,
MetricsReporter.class);
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index 81f1f63..ecac8c9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -227,14 +227,14 @@ public class TopologyBuilder {
public static class TopicsInfo {
public Set<String> sinkTopics;
public Set<String> sourceTopics;
- public Map<String, InternalTopicConfig> interSourceTopics;
public Map<String, InternalTopicConfig> stateChangelogTopics;
+ public Map<String, InternalTopicConfig> repartitionSourceTopics;
- public TopicsInfo(Set<String> sinkTopics, Set<String> sourceTopics, Map<String, InternalTopicConfig> interSourceTopics, Map<String, InternalTopicConfig> stateChangelogTopics) {
+ public TopicsInfo(Set<String> sinkTopics, Set<String> sourceTopics, Map<String, InternalTopicConfig> repartitionSourceTopics, Map<String, InternalTopicConfig> stateChangelogTopics) {
this.sinkTopics = sinkTopics;
this.sourceTopics = sourceTopics;
- this.interSourceTopics = interSourceTopics;
this.stateChangelogTopics = stateChangelogTopics;
+ this.repartitionSourceTopics = repartitionSourceTopics;
}
@Override
@@ -258,7 +258,7 @@ public class TopologyBuilder {
return "TopicsInfo{" +
"sinkTopics=" + sinkTopics +
", sourceTopics=" + sourceTopics +
- ", interSourceTopics=" + interSourceTopics +
+ ", repartitionSourceTopics=" + repartitionSourceTopics +
", stateChangelogTopics=" + stateChangelogTopics +
'}';
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java
index 45016c8..c23f134 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java
@@ -30,9 +30,10 @@ public class InternalTopicConfig {
private final String name;
private final Map<String, String> logConfig;
- private Long retentionMs;
private final Set<CleanupPolicy> cleanupPolicies;
+ private Long retentionMs;
+
public InternalTopicConfig(final String name, final Set<CleanupPolicy> defaultCleanupPolicies, final Map<String, String> logConfig) {
Objects.requireNonNull(name, "name can't be null");
if (defaultCleanupPolicies.isEmpty()) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
index 2b5692d..4e56f61 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
@@ -77,11 +77,6 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> {
// do nothing
}
- // for test only
- public Serializer<V> valueSerializer() {
- return valSerializer;
- }
-
/**
* @return a string representation of this node, useful for debugging.
*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
index 4bc3a53..e17509b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
@@ -71,11 +71,6 @@ public class SourceNode<K, V> extends ProcessorNode<K, V> {
// do nothing
}
- // for test only
- public Deserializer<V> valueDeserializer() {
- return valDeserializer;
- }
-
/**
* @return a string representation of this node, useful for debugging.
*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index dcba543..009ba1b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -46,37 +46,92 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import static org.apache.kafka.common.utils.Utils.getHost;
+import static org.apache.kafka.common.utils.Utils.getPort;
import static org.apache.kafka.streams.processor.internals.InternalTopicManager.WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT;
public class StreamPartitionAssignor implements PartitionAssignor, Configurable {
private static final Logger log = LoggerFactory.getLogger(StreamPartitionAssignor.class);
- private String userEndPointConfig;
- private Map<HostInfo, Set<TopicPartition>> partitionsByHostState;
- private Cluster metadataWithInternalTopics;
-
private static class AssignedPartition implements Comparable<AssignedPartition> {
public final TaskId taskId;
public final TopicPartition partition;
- public AssignedPartition(TaskId taskId, TopicPartition partition) {
+ AssignedPartition(final TaskId taskId, final TopicPartition partition) {
this.taskId = taskId;
this.partition = partition;
}
@Override
- public int compareTo(AssignedPartition that) {
+ public int compareTo(final AssignedPartition that) {
return PARTITION_COMPARATOR.compare(this.partition, that.partition);
}
}
+ private static class ClientMetadata {
+ final HostInfo hostInfo;
+ final Set<String> consumers;
+ final ClientState<TaskId> state;
+
+ ClientMetadata(final String endPoint) {
+
+ // get the host info if possible
+ if (endPoint != null) {
+ final String host = getHost(endPoint);
+ final Integer port = getPort(endPoint);
+
+ if (host == null || port == null)
+ throw new ConfigException(String.format("Error parsing host address %s. Expected format host:port.", endPoint));
+
+ hostInfo = new HostInfo(host, port);
+ } else {
+ hostInfo = null;
+ }
+
+ // initialize the consumer memberIds
+ consumers = new HashSet<>();
+
+ // initialize the client state
+ state = new ClientState<>();
+ }
+
+ void addConsumer(final String consumerMemberId, final SubscriptionInfo info) {
+
+ consumers.add(consumerMemberId);
+
+ state.prevActiveTasks.addAll(info.prevTasks);
+ state.prevAssignedTasks.addAll(info.prevTasks);
+ state.prevAssignedTasks.addAll(info.standbyTasks);
+ state.capacity = state.capacity + 1d;
+ }
+
+ @Override
+ public String toString() {
+ return "ClientMetadata{" +
+ "hostInfo=" + hostInfo +
+ ", consumers=" + consumers +
+ ", state=" + state +
+ '}';
+ }
+ }
+
+ private static class InternalTopicMetadata {
+ public final InternalTopicConfig config;
+
+ public int numPartitions;
+
+ InternalTopicMetadata(final InternalTopicConfig config) {
+ this.config = config;
+ this.numPartitions = -1;
+ }
+ }
+
private static final Comparator<TopicPartition> PARTITION_COMPARATOR = new Comparator<TopicPartition>() {
@Override
public int compare(TopicPartition p1, TopicPartition p2) {
@@ -92,12 +147,14 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
private StreamThread streamThread;
+ private String userEndPoint;
private int numStandbyReplicas;
- private Map<Integer, TopologyBuilder.TopicsInfo> topicGroups;
- private Map<TopicPartition, Set<TaskId>> partitionToTaskIds;
- private Map<InternalTopicConfig, Set<TaskId>> stateChangelogTopicToTaskIds;
- private Map<InternalTopicConfig, Set<TaskId>> internalSourceTopicToTaskIds;
+
+ private Cluster metadataWithInternalTopics;
+ private Map<HostInfo, Set<TopicPartition>> partitionsByHostState;
+
private Map<TaskId, Set<TopicPartition>> standbyTasks;
+ private Map<TaskId, Set<TopicPartition>> activeTasks;
private InternalTopicManager internalTopicManager;
@@ -111,7 +168,6 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
public void configure(Map<String, ?> configs) {
numStandbyReplicas = (Integer) configs.get(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG);
-
Object o = configs.get(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE);
if (o == null) {
KafkaException ex = new KafkaException("StreamThread is not specified");
@@ -130,21 +186,20 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
String userEndPoint = (String) configs.get(StreamsConfig.APPLICATION_SERVER_CONFIG);
if (userEndPoint != null && !userEndPoint.isEmpty()) {
- final String[] hostPort = userEndPoint.split(":");
- if (hostPort.length != 2) {
- throw new ConfigException(String.format("stream-thread [%s] Config %s isn't in the correct format. Expected a host:port pair" +
- " but received %s",
- streamThread.getName(), StreamsConfig.APPLICATION_SERVER_CONFIG, userEndPoint));
- } else {
- try {
- Integer.valueOf(hostPort[1]);
- this.userEndPointConfig = userEndPoint;
- } catch (NumberFormatException nfe) {
- throw new ConfigException(String.format("stream-thread [%s] Invalid port %s supplied in %s for config %s",
- streamThread.getName(), hostPort[1], userEndPoint, StreamsConfig.APPLICATION_SERVER_CONFIG));
- }
+ try {
+ String host = getHost(userEndPoint);
+ Integer port = getPort(userEndPoint);
+
+ if (host == null || port == null)
+ throw new ConfigException(String.format("stream-thread [%s] Config %s isn't in the correct format. Expected a host:port pair" +
+ " but received %s",
+ streamThread.getName(), StreamsConfig.APPLICATION_SERVER_CONFIG, userEndPoint));
+ } catch (NumberFormatException nfe) {
+ throw new ConfigException(String.format("stream-thread [%s] Invalid port supplied in %s for config %s",
+ streamThread.getName(), userEndPoint, StreamsConfig.APPLICATION_SERVER_CONFIG));
}
+ this.userEndPoint = userEndPoint;
}
if (configs.containsKey(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG)) {
@@ -174,7 +229,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
Set<TaskId> prevTasks = streamThread.prevTasks();
Set<TaskId> standbyTasks = streamThread.cachedTasks();
standbyTasks.removeAll(prevTasks);
- SubscriptionInfo data = new SubscriptionInfo(streamThread.processId, prevTasks, standbyTasks, this.userEndPointConfig);
+ SubscriptionInfo data = new SubscriptionInfo(streamThread.processId, prevTasks, standbyTasks, this.userEndPoint);
if (streamThread.builder.sourceTopicPattern() != null) {
SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates();
@@ -187,238 +242,252 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
return new Subscription(new ArrayList<>(topics), data.encode());
}
- /**
- * Internal helper function that creates a Kafka topic
- * @param topicToTaskIds Map that contains the topic names to be created
- * @param postPartitionPhase If true, the computation for calculating the number of partitions
- * is slightly different. Set to true after the initial topic-to-partition
- * assignment.
- * @return
+ /*
+ * This assigns tasks to consumer clients in the following steps.
+ *
+ * 0. check all repartition source topics and use internal topic manager to make sure
+ * they have been created with the right number of partitions.
+ *
+ * 1. using user customized partition grouper to generate tasks along with their
+ * assigned partitions; also make sure that the task's corresponding changelog topics
+ * have been created with the right number of partitions.
+ *
+ * 2. using TaskAssignor to assign tasks to consumer clients.
+ * - Assign a task to a client which was running it previously.
+ * If there is no such client, assign a task to a client which has its valid local state.
+ * - A client may have more than one stream threads.
+ * The assignor tries to assign tasks to a client proportionally to the number of threads.
+ * - We try not to assign the same set of tasks to two different clients
+ * We do the assignment in one pass. The result may not satisfy all of the above.
+ *
+ * 3. within each client, tasks are assigned to consumer clients in round-robin manner.
*/
- private Map<TopicPartition, PartitionInfo> prepareTopic(Map<InternalTopicConfig, Set<TaskId>> topicToTaskIds,
- boolean postPartitionPhase) {
- Map<TopicPartition, PartitionInfo> partitionInfos = new HashMap<>();
- // if ZK is specified, prepare the internal source topic before calling partition grouper
- if (internalTopicManager != null) {
- log.debug("stream-thread [{}] Starting to validate internal topics in partition assignor.", streamThread.getName());
-
- for (Map.Entry<InternalTopicConfig, Set<TaskId>> entry : topicToTaskIds.entrySet()) {
- InternalTopicConfig topic = entry.getKey();
- int numPartitions = 0;
- if (postPartitionPhase) {
- // the expected number of partitions is the max value of TaskId.partition + 1
- for (TaskId task : entry.getValue()) {
- if (numPartitions < task.partition + 1)
- numPartitions = task.partition + 1;
- }
- } else {
- // should have size 1 only
- numPartitions = -1;
- for (TaskId task : entry.getValue()) {
- numPartitions = task.partition;
- }
- }
-
- internalTopicManager.makeReady(topic, numPartitions);
-
- // wait until the topic metadata has been propagated to all brokers
- List<PartitionInfo> partitions;
- do {
- partitions = streamThread.restoreConsumer.partitionsFor(topic.name());
- } while (partitions == null || partitions.size() != numPartitions);
-
- for (PartitionInfo partition : partitions)
- partitionInfos.put(new TopicPartition(partition.topic(), partition.partition()), partition);
- }
-
- log.info("stream-thread [{}] Completed validating internal topics in partition assignor", streamThread.getName());
- } else {
- List<String> missingTopics = new ArrayList<>();
- for (InternalTopicConfig topic : topicToTaskIds.keySet()) {
- List<PartitionInfo> partitions = streamThread.restoreConsumer.partitionsFor(topic.name());
- if (partitions == null) {
- missingTopics.add(topic.name());
- }
- }
- if (!missingTopics.isEmpty()) {
- log.warn("stream-thread [{}] Topic {} do not exists but couldn't created as the config '{}' isn't supplied",
- streamThread.getName(), missingTopics, StreamsConfig.ZOOKEEPER_CONNECT_CONFIG);
-
- }
- }
-
- return partitionInfos;
- }
-
@Override
public Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions) {
- // This assigns tasks to consumer clients in two steps.
- // 1. using TaskAssignor to assign tasks to consumer clients.
- // - Assign a task to a client which was running it previously.
- // If there is no such client, assign a task to a client which has its valid local state.
- // - A client may have more than one stream threads.
- // The assignor tries to assign tasks to a client proportionally to the number of threads.
- // - We try not to assign the same set of tasks to two different clients
- // We do the assignment in one-pass. The result may not satisfy above all.
- // 2. within each client, tasks are assigned to consumer clients in round-robin manner.
- Map<UUID, Set<String>> consumersByClient = new HashMap<>();
- Map<UUID, ClientState<TaskId>> states = new HashMap<>();
- Map<UUID, HostInfo> consumerEndPointMap = new HashMap<>();
- // decode subscription info
+
+ // construct the client metadata from the decoded subscription info
+ Map<UUID, ClientMetadata> clientsMetadata = new HashMap<>();
+
for (Map.Entry<String, Subscription> entry : subscriptions.entrySet()) {
String consumerId = entry.getKey();
Subscription subscription = entry.getValue();
-
SubscriptionInfo info = SubscriptionInfo.decode(subscription.userData());
- if (info.userEndPoint != null) {
- final String[] hostPort = info.userEndPoint.split(":");
- consumerEndPointMap.put(info.processId, new HostInfo(hostPort[0], Integer.valueOf(hostPort[1])));
- }
- Set<String> consumers = consumersByClient.get(info.processId);
- if (consumers == null) {
- consumers = new HashSet<>();
- consumersByClient.put(info.processId, consumers);
- }
- consumers.add(consumerId);
- ClientState<TaskId> state = states.get(info.processId);
- if (state == null) {
- state = new ClientState<>();
- states.put(info.processId, state);
+ // create the new client metadata if necessary
+ ClientMetadata clientMetadata = clientsMetadata.get(info.processId);
+
+ if (clientMetadata == null) {
+ clientMetadata = new ClientMetadata(info.userEndPoint);
+ clientsMetadata.put(info.processId, clientMetadata);
}
- state.prevActiveTasks.addAll(info.prevTasks);
- state.prevAssignedTasks.addAll(info.prevTasks);
- state.prevAssignedTasks.addAll(info.standbyTasks);
- state.capacity = state.capacity + 1d;
+ // add the consumer to the client
+ clientMetadata.addConsumer(consumerId, info);
}
+ log.info("stream-thread [{}] Constructed client metadata {} from the member subscriptions.", streamThread.getName(), clientsMetadata);
- this.topicGroups = streamThread.builder.topicGroups();
+ // ---------------- Step Zero ---------------- //
- // ensure the co-partitioning topics within the group have the same number of partitions,
- // and enforce the number of partitions for those internal topics.
- internalSourceTopicToTaskIds = new HashMap<>();
- Map<Integer, Set<String>> sourceTopicGroups = new HashMap<>();
- Map<Integer, Collection<InternalTopicConfig>> internalSourceTopicGroups = new HashMap<>();
- for (Map.Entry<Integer, TopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
- sourceTopicGroups.put(entry.getKey(), entry.getValue().sourceTopics);
- internalSourceTopicGroups.put(entry.getKey(), entry.getValue().interSourceTopics.values());
+ // parse the topology to determine the repartition source topics,
+ // making sure they are created with the number of partitions as
+ // the maximum of the depending sub-topologies source topics' number of partitions
+ Map<Integer, TopologyBuilder.TopicsInfo> topicGroups = streamThread.builder.topicGroups();
+
+ Map<String, InternalTopicMetadata> repartitionTopicMetadata = new HashMap<>();
+ for (TopologyBuilder.TopicsInfo topicsInfo : topicGroups.values()) {
+ for (InternalTopicConfig topic: topicsInfo.repartitionSourceTopics.values()) {
+ repartitionTopicMetadata.put(topic.name(), new InternalTopicMetadata(topic));
+ }
}
+ boolean numPartitionsNeeded;
+ do {
+ numPartitionsNeeded = false;
+
+ for (TopologyBuilder.TopicsInfo topicsInfo : topicGroups.values()) {
+ for (String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
+ int numPartitions = repartitionTopicMetadata.get(topicName).numPartitions;
+
+ // try set the number of partitions for this repartition topic if it is not set yet
+ if (numPartitions == -1) {
+ for (TopologyBuilder.TopicsInfo otherTopicsInfo : topicGroups.values()) {
+ Set<String> otherSinkTopics = otherTopicsInfo.sinkTopics;
+
+ if (otherSinkTopics.contains(topicName)) {
+ // if this topic is one of the sink topics of this topology,
+ // use the maximum of all its source topic partitions as the number of partitions
+ for (String sourceTopicName : otherTopicsInfo.sourceTopics) {
+ Integer numPartitionsCandidate;
+ // It is possible the sourceTopic is another internal topic, i.e,
+ // map().join().join(map())
+ if (repartitionTopicMetadata.containsKey(sourceTopicName)) {
+ numPartitionsCandidate = repartitionTopicMetadata.get(sourceTopicName).numPartitions;
+ } else {
+ numPartitionsCandidate = metadata.partitionCountForTopic(sourceTopicName);
+ }
- // for all internal source topics
- // set the number of partitions to the maximum of the depending sub-topologies source topics
- Map<TopicPartition, PartitionInfo> internalPartitionInfos = new HashMap<>();
- Map<String, InternalTopicConfig> allInternalTopics = new HashMap<>();
- for (Map.Entry<Integer, TopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
- Map<String, InternalTopicConfig> internalTopics = entry.getValue().interSourceTopics;
- allInternalTopics.putAll(internalTopics);
- for (InternalTopicConfig internalTopic : internalTopics.values()) {
- Set<TaskId> tasks = internalSourceTopicToTaskIds.get(internalTopic);
-
- if (tasks == null) {
- int numPartitions = -1;
- for (Map.Entry<Integer, TopologyBuilder.TopicsInfo> other : topicGroups.entrySet()) {
- Set<String> otherSinkTopics = other.getValue().sinkTopics;
-
- if (otherSinkTopics.contains(internalTopic.name())) {
- for (String topic : other.getValue().sourceTopics) {
- Integer partitions = null;
- // It is possible the sourceTopic is another internal topic, i.e,
- // map().join().join(map())
- if (allInternalTopics.containsKey(topic)) {
- Set<TaskId> taskIds = internalSourceTopicToTaskIds.get(allInternalTopics.get(topic));
- if (taskIds != null) {
- for (TaskId taskId : taskIds) {
- partitions = taskId.partition;
- }
+ if (numPartitionsCandidate != null && numPartitionsCandidate > numPartitions) {
+ numPartitions = numPartitionsCandidate;
}
- } else {
- partitions = metadata.partitionCountForTopic(topic);
- }
- if (partitions != null && partitions > numPartitions) {
- numPartitions = partitions;
}
}
}
- }
- internalSourceTopicToTaskIds.put(internalTopic, Collections.singleton(new TaskId(entry.getKey(), numPartitions)));
- for (int partition = 0; partition < numPartitions; partition++) {
- internalPartitionInfos.put(new TopicPartition(internalTopic.name(), partition),
- new PartitionInfo(internalTopic.name(), partition, null, new Node[0], new Node[0]));
+
+ // if we still have not found the right number of partitions,
+ // another iteration is needed
+ if (numPartitions == -1)
+ numPartitionsNeeded = true;
+ else
+ repartitionTopicMetadata.get(topicName).numPartitions = numPartitions;
}
}
}
+ } while (numPartitionsNeeded);
+
+ // augment the metadata with the newly computed number of partitions for all the
+ // repartition source topics
+ Map<TopicPartition, PartitionInfo> allRepartitionTopicPartitions = new HashMap<>();
+ for (Map.Entry<String, InternalTopicMetadata> entry : repartitionTopicMetadata.entrySet()) {
+ String topic = entry.getKey();
+ Integer numPartitions = entry.getValue().numPartitions;
+
+ for (int partition = 0; partition < numPartitions; partition++) {
+ allRepartitionTopicPartitions.put(new TopicPartition(topic, partition),
+ new PartitionInfo(topic, partition, null, new Node[0], new Node[0]));
+ }
}
+ // ensure the co-partitioning topics within the group have the same number of partitions,
+ // and enforce the number of partitions for those repartition topics to be the same if they
+ // are co-partitioned as well.
+ ensureCopartitioning(streamThread.builder.copartitionGroups(), repartitionTopicMetadata, metadata);
- Collection<Set<String>> copartitionTopicGroups = streamThread.builder.copartitionGroups();
- ensureCopartitioning(copartitionTopicGroups, internalSourceTopicGroups,
- metadata.withPartitions(internalPartitionInfos));
-
-
- internalPartitionInfos = prepareTopic(internalSourceTopicToTaskIds, false);
- internalSourceTopicToTaskIds.clear();
+ // make sure the repartition source topics exist with the right number of partitions,
+ // create these topics if necessary
+ prepareTopic(repartitionTopicMetadata);
metadataWithInternalTopics = metadata;
if (internalTopicManager != null)
- metadataWithInternalTopics = metadata.withPartitions(internalPartitionInfos);
+ metadataWithInternalTopics = metadata.withPartitions(allRepartitionTopicPartitions);
+
+ log.debug("stream-thread [{}] Created repartition topics {} from the parsed topology.", streamThread.getName(), allRepartitionTopicPartitions.values());
+
+ // ---------------- Step One ---------------- //
// get the tasks as partition groups from the partition grouper
- Map<TaskId, Set<TopicPartition>> partitionsForTask = streamThread.partitionGrouper.partitionGroups(
- sourceTopicGroups, metadataWithInternalTopics);
+ Set<String> allSourceTopics = new HashSet<>();
+ Map<Integer, Set<String>> sourceTopicsByGroup = new HashMap<>();
+ for (Map.Entry<Integer, TopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
+ allSourceTopics.addAll(entry.getValue().sourceTopics);
+ sourceTopicsByGroup.put(entry.getKey(), entry.getValue().sourceTopics);
+ }
- // add tasks to state change log topic subscribers
- stateChangelogTopicToTaskIds = new HashMap<>();
- for (TaskId task : partitionsForTask.keySet()) {
- final Map<String, InternalTopicConfig> stateChangelogTopics = topicGroups.get(task.topicGroupId).stateChangelogTopics;
- for (InternalTopicConfig topic : stateChangelogTopics.values()) {
- Set<TaskId> tasks = stateChangelogTopicToTaskIds.get(topic);
- if (tasks == null) {
- tasks = new HashSet<>();
- stateChangelogTopicToTaskIds.put(topic, tasks);
+ Map<TaskId, Set<TopicPartition>> partitionsForTask = streamThread.partitionGrouper.partitionGroups(
+ sourceTopicsByGroup, metadataWithInternalTopics);
+
+ // check if all partitions are assigned, and there are no duplicates of partitions in multiple tasks
+ Set<TopicPartition> allAssignedPartitions = new HashSet<>();
+ Map<Integer, Set<TaskId>> tasksByTopicGroup = new HashMap<>();
+ for (Map.Entry<TaskId, Set<TopicPartition>> entry : partitionsForTask.entrySet()) {
+ Set<TopicPartition> partitions = entry.getValue();
+ for (TopicPartition partition : partitions) {
+ if (allAssignedPartitions.contains(partition)) {
+ log.warn("stream-thread [{}] Partition {} is assigned to more than one tasks: {}", streamThread.getName(), partition, partitionsForTask);
}
+ }
+ allAssignedPartitions.addAll(partitions);
- tasks.add(task);
+ TaskId id = entry.getKey();
+ Set<TaskId> ids = tasksByTopicGroup.get(id.topicGroupId);
+ if (ids == null) {
+ ids = new HashSet<>();
+ tasksByTopicGroup.put(id.topicGroupId, ids);
+ }
+ ids.add(id);
+ }
+ for (String topic : allSourceTopics) {
+ for (PartitionInfo partitionInfo : metadataWithInternalTopics.partitionsForTopic(topic)) {
+ TopicPartition partition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
+ if (!allAssignedPartitions.contains(partition)) {
+ log.warn("stream-thread [{}] Partition {} is not assigned to any tasks: {}", streamThread.getName(), partition, partitionsForTask);
+ }
}
+ }
- final Map<String, InternalTopicConfig> interSourceTopics = topicGroups.get(task.topicGroupId).interSourceTopics;
- for (InternalTopicConfig topic : interSourceTopics.values()) {
- Set<TaskId> tasks = internalSourceTopicToTaskIds.get(topic);
- if (tasks == null) {
- tasks = new HashSet<>();
- internalSourceTopicToTaskIds.put(topic, tasks);
+ // add tasks to state change log topic subscribers
+ Map<String, InternalTopicMetadata> changelogTopicMetadata = new HashMap<>();
+ for (Map.Entry<Integer, TopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
+ final int topicGroupId = entry.getKey();
+ final Map<String, InternalTopicConfig> stateChangelogTopics = entry.getValue().stateChangelogTopics;
+
+ for (InternalTopicConfig topicConfig : stateChangelogTopics.values()) {
+ // the expected number of partitions is the max value of TaskId.partition + 1
+ int numPartitions = -1;
+ for (TaskId task : tasksByTopicGroup.get(topicGroupId)) {
+ if (numPartitions < task.partition + 1)
+ numPartitions = task.partition + 1;
}
- tasks.add(task);
+ InternalTopicMetadata topicMetadata = new InternalTopicMetadata(topicConfig);
+ topicMetadata.numPartitions = numPartitions;
+
+ changelogTopicMetadata.put(topicConfig.name(), topicMetadata);
}
}
+ prepareTopic(changelogTopicMetadata);
+
+ log.debug("stream-thread [{}] Created state changelog topics {} from the parsed topology.", streamThread.getName(), changelogTopicMetadata);
+
+ // ---------------- Step Two ---------------- //
+
// assign tasks to clients
- states = TaskAssignor.assign(states, partitionsForTask.keySet(), numStandbyReplicas, streamThread.getName());
+ Map<UUID, ClientState<TaskId>> states = new HashMap<>();
+ for (Map.Entry<UUID, ClientMetadata> entry : clientsMetadata.entrySet()) {
+ states.put(entry.getKey(), entry.getValue().state);
+ }
+
+ log.debug("stream-thread [{}] Assigning tasks {} to clients {} with number of replicas {}",
+ streamThread.getName(), partitionsForTask.keySet(), states, numStandbyReplicas);
+
+ TaskAssignor.assign(states, partitionsForTask.keySet(), numStandbyReplicas);
+
+ log.info("stream-thread [{}] Assigned tasks to clients as {}.", streamThread.getName(), states);
- final List<AssignmentSupplier> assignmentSuppliers = new ArrayList<>();
+ // ---------------- Step Three ---------------- //
+
+ // construct the global partition assignment per host map
+ partitionsByHostState = new HashMap<>();
+ for (Map.Entry<UUID, ClientMetadata> entry : clientsMetadata.entrySet()) {
+ HostInfo hostInfo = entry.getValue().hostInfo;
+
+ if (hostInfo != null) {
+ final Set<TopicPartition> topicPartitions = new HashSet<>();
+ final ClientState<TaskId> state = entry.getValue().state;
+
+ for (TaskId id : state.assignedTasks) {
+ topicPartitions.addAll(partitionsForTask.get(id));
+ }
+
+ partitionsByHostState.put(hostInfo, topicPartitions);
+ }
+ }
- final Map<HostInfo, Set<TopicPartition>> endPointMap = new HashMap<>();
- for (Map.Entry<UUID, Set<String>> entry : consumersByClient.entrySet()) {
- UUID processId = entry.getKey();
- Set<String> consumers = entry.getValue();
- ClientState<TaskId> state = states.get(processId);
+ // within the client, distribute tasks to its owned consumers
+ Map<String, Assignment> assignment = new HashMap<>();
+ for (Map.Entry<UUID, ClientMetadata> entry : clientsMetadata.entrySet()) {
+ Set<String> consumers = entry.getValue().consumers;
+ ClientState<TaskId> state = entry.getValue().state;
ArrayList<TaskId> taskIds = new ArrayList<>(state.assignedTasks.size());
final int numActiveTasks = state.activeTasks.size();
- for (TaskId taskId : state.activeTasks) {
- taskIds.add(taskId);
- }
- for (TaskId id : state.assignedTasks) {
- if (!state.activeTasks.contains(id))
- taskIds.add(id);
- }
- final int numConsumers = consumers.size();
+ taskIds.addAll(state.activeTasks);
+ taskIds.addAll(state.standbyTasks);
+ final int numConsumers = consumers.size();
int i = 0;
for (String consumer : consumers) {
@@ -448,100 +517,58 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
for (AssignedPartition partition : assignedPartitions) {
active.add(partition.taskId);
activePartitions.add(partition.partition);
- HostInfo hostInfo = consumerEndPointMap.get(processId);
- if (hostInfo != null) {
- if (!endPointMap.containsKey(hostInfo)) {
- endPointMap.put(hostInfo, new HashSet<TopicPartition>());
- }
- final Set<TopicPartition> topicPartitions = endPointMap.get(hostInfo);
- topicPartitions.add(partition.partition);
- }
}
-
- assignmentSuppliers.add(new AssignmentSupplier(consumer,
- active,
- standby,
- endPointMap,
- activePartitions));
+ // finally, encode the assignment before sending back to coordinator
+ assignment.put(consumer, new Assignment(activePartitions, new AssignmentInfo(active, standby, partitionsByHostState).encode()));
i++;
}
}
- // if ZK is specified, validate the internal topics again
- prepareTopic(internalSourceTopicToTaskIds, /* compactTopic */ true);
- // change log topics should be compacted
- prepareTopic(stateChangelogTopicToTaskIds, /* compactTopic */ true);
-
- Map<String, Assignment> assignment = new HashMap<>();
- for (AssignmentSupplier assignmentSupplier : assignmentSuppliers) {
- assignment.put(assignmentSupplier.consumer, assignmentSupplier.get());
- }
return assignment;
}
- class AssignmentSupplier {
- private final String consumer;
- private final List<TaskId> active;
- private final Map<TaskId, Set<TopicPartition>> standby;
- private final Map<HostInfo, Set<TopicPartition>> endPointMap;
- private final List<TopicPartition> activePartitions;
-
- AssignmentSupplier(final String consumer,
- final List<TaskId> active,
- final Map<TaskId, Set<TopicPartition>> standby,
- final Map<HostInfo, Set<TopicPartition>> endPointMap,
- final List<TopicPartition> activePartitions) {
- this.consumer = consumer;
- this.active = active;
- this.standby = standby;
- this.endPointMap = endPointMap;
- this.activePartitions = activePartitions;
- }
-
- Assignment get() {
- return new Assignment(activePartitions, new AssignmentInfo(active,
- standby,
- endPointMap).encode());
- }
- }
-
/**
* @throws TaskAssignmentException if there is no task id for one of the partitions specified
*/
@Override
public void onAssignment(Assignment assignment) {
List<TopicPartition> partitions = new ArrayList<>(assignment.partitions());
-
Collections.sort(partitions, PARTITION_COMPARATOR);
AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
+
this.standbyTasks = info.standbyTasks;
+ this.activeTasks = new HashMap<>();
+
+ // the number of assigned partitions should be the same as number of active tasks, which
+ // could be duplicated if one task has more than one assigned partitions
+ if (partitions.size() != info.activeTasks.size()) {
+ throw new TaskAssignmentException(
+ String.format("stream-thread [%s] Number of assigned partitions %d is not equal to the number of active taskIds %d" +
+ ", assignmentInfo=%s", streamThread.getName(), partitions.size(), info.activeTasks.size(), info.toString())
+ );
+ }
- Map<TopicPartition, Set<TaskId>> partitionToTaskIds = new HashMap<>();
- Iterator<TaskId> iter = info.activeTasks.iterator();
- for (TopicPartition partition : partitions) {
- Set<TaskId> taskIds = partitionToTaskIds.get(partition);
- if (taskIds == null) {
- taskIds = new HashSet<>();
- partitionToTaskIds.put(partition, taskIds);
- }
+ for (int i = 0; i < partitions.size(); i++) {
+ TopicPartition partition = partitions.get(i);
+ TaskId id = info.activeTasks.get(i);
- if (iter.hasNext()) {
- taskIds.add(iter.next());
- } else {
- TaskAssignmentException ex = new TaskAssignmentException(
- String.format("stream-thread [%s] failed to find a task id for the partition=%s" +
- ", partitions=%d, assignmentInfo=%s", streamThread.getName(), partition.toString(), partitions.size(), info.toString())
- );
- log.error(ex.getMessage(), ex);
- throw ex;
+ Set<TopicPartition> assignedPartitions = activeTasks.get(id);
+ if (assignedPartitions == null) {
+ assignedPartitions = new HashSet<>();
+ activeTasks.put(id, assignedPartitions);
}
+ assignedPartitions.add(partition);
+ }
+
+ // only need to update the host partitions map if it is not leader
+ if (this.partitionsByHostState == null) {
+ this.partitionsByHostState = info.partitionsByHost;
}
- this.partitionToTaskIds = partitionToTaskIds;
- this.partitionsByHostState = info.partitionsByHostState;
- // only need to build when not coordinator
+
+ // only need to build if it is not leader
if (metadataWithInternalTopics == null) {
final Collection<Set<TopicPartition>> values = partitionsByHostState.values();
final Map<TopicPartition, PartitionInfo> topicToPartitionInfo = new HashMap<>();
@@ -558,92 +585,129 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
}
}
- public Map<HostInfo, Set<TopicPartition>> getPartitionsByHostState() {
- if (partitionsByHostState == null) {
- return Collections.emptyMap();
- }
- return Collections.unmodifiableMap(partitionsByHostState);
- }
+ /**
+ * Internal helper function that creates a Kafka topic
+ *
+ * @param topicPartitions Map that contains the topic names to be created with the number of partitions
+ */
+ private void prepareTopic(Map<String, InternalTopicMetadata> topicPartitions) {
+ log.debug("stream-thread [{}] Starting to validate internal topics in partition assignor.", streamThread.getName());
- public Cluster clusterMetadata() {
- if (metadataWithInternalTopics == null) {
- return Cluster.empty();
- }
- return metadataWithInternalTopics;
- }
+ // if ZK is specified, prepare the internal source topic before calling partition grouper
+ if (internalTopicManager != null) {
+ for (Map.Entry<String, InternalTopicMetadata> entry : topicPartitions.entrySet()) {
+ InternalTopicConfig topic = entry.getValue().config;
+ Integer numPartitions = entry.getValue().numPartitions;
- private void ensureCopartitioning(Collection<Set<String>> copartitionGroups, Map<Integer, Collection<InternalTopicConfig>> internalTopicGroups, Cluster metadata) {
- Map<String, InternalTopicConfig> internalTopics = new HashMap<>();
- for (Collection<InternalTopicConfig> topics : internalTopicGroups.values()) {
- for (InternalTopicConfig topic : topics) {
- internalTopics.put(topic.name(), topic);
+ if (numPartitions < 0)
+ throw new TopologyBuilderException(String.format("stream-thread [%s] Topic [%s] number of partitions not defined", streamThread.getName(), topic.name()));
+
+ internalTopicManager.makeReady(topic, numPartitions);
+
+ // wait until the topic metadata has been propagated to all brokers
+ List<PartitionInfo> partitions;
+ do {
+ partitions = streamThread.restoreConsumer.partitionsFor(topic.name());
+ } while (partitions == null || partitions.size() != numPartitions);
+ }
+ } else {
+ List<String> missingTopics = new ArrayList<>();
+ for (String topic : topicPartitions.keySet()) {
+ List<PartitionInfo> partitions = streamThread.restoreConsumer.partitionsFor(topic);
+ if (partitions == null) {
+ missingTopics.add(topic);
+ }
+ }
+
+ if (!missingTopics.isEmpty()) {
+ log.warn("stream-thread [{}] Topic {} do not exists but couldn't created as the config '{}' isn't supplied",
+ streamThread.getName(), missingTopics, StreamsConfig.ZOOKEEPER_CONNECT_CONFIG);
}
}
+ log.info("stream-thread [{}] Completed validating internal topics in partition assignor", streamThread.getName());
+ }
+
+ private void ensureCopartitioning(Collection<Set<String>> copartitionGroups,
+ Map<String, InternalTopicMetadata> allRepartitionTopicsNumPartitions,
+ Cluster metadata) {
for (Set<String> copartitionGroup : copartitionGroups) {
- ensureCopartitioning(copartitionGroup, internalTopics, metadata);
+ ensureCopartitioning(copartitionGroup, allRepartitionTopicsNumPartitions, metadata);
}
}
- private void ensureCopartitioning(Set<String> copartitionGroup, Map<String, InternalTopicConfig> internalTopics, Cluster metadata) {
+ private void ensureCopartitioning(Set<String> copartitionGroup,
+ Map<String, InternalTopicMetadata> allRepartitionTopicsNumPartitions,
+ Cluster metadata) {
int numPartitions = -1;
for (String topic : copartitionGroup) {
- if (!internalTopics.containsKey(topic)) {
- List<PartitionInfo> infos = metadata.partitionsForTopic(topic);
+ if (!allRepartitionTopicsNumPartitions.containsKey(topic)) {
+ Integer partitions = metadata.partitionCountForTopic(topic);
- if (infos == null)
- throw new TopologyBuilderException(String.format("stream-thread [%s] External source topic not found: %s", streamThread.getName(), topic));
+ if (partitions == null)
+ throw new TopologyBuilderException(String.format("stream-thread [%s] Topic not found: %s", streamThread.getName(), topic));
if (numPartitions == -1) {
- numPartitions = infos.size();
- } else if (numPartitions != infos.size()) {
+ numPartitions = partitions;
+ } else if (numPartitions != partitions) {
String[] topics = copartitionGroup.toArray(new String[copartitionGroup.size()]);
Arrays.sort(topics);
- throw new TopologyBuilderException(String.format("stream-thread [%s] Topics not copartitioned: [%s]", streamThread.getName(), Utils.mkString(Arrays.asList(topics), ",")));
+ throw new TopologyBuilderException(String.format("stream-thread [%s] Topics not co-partitioned: [%s]", streamThread.getName(), Utils.mkString(Arrays.asList(topics), ",")));
}
}
}
+ // if all topics for this co-partition group is repartition topics,
+ // then set the number of partitions to be the maximum of the number of partitions.
if (numPartitions == -1) {
- for (InternalTopicConfig topic : internalTopics.values()) {
- if (copartitionGroup.contains(topic.name())) {
- Integer partitions = metadata.partitionCountForTopic(topic.name());
- if (partitions != null && partitions > numPartitions) {
+ for (Map.Entry<String, InternalTopicMetadata> entry: allRepartitionTopicsNumPartitions.entrySet()) {
+ if (copartitionGroup.contains(entry.getKey())) {
+ int partitions = entry.getValue().numPartitions;
+ if (partitions > numPartitions) {
numPartitions = partitions;
}
}
}
}
- // enforce co-partitioning restrictions to internal topics reusing internalSourceTopicToTaskIds
- for (InternalTopicConfig topic : internalTopics.values()) {
- if (copartitionGroup.contains(topic.name())) {
- internalSourceTopicToTaskIds
- .put(topic, Collections.singleton(new TaskId(-1, numPartitions)));
+
+ // enforce co-partitioning restrictions to repartition topics by updating their number of partitions
+ for (Map.Entry<String, InternalTopicMetadata> entry : allRepartitionTopicsNumPartitions.entrySet()) {
+ if (copartitionGroup.contains(entry.getKey())) {
+ entry.getValue().numPartitions = numPartitions;
}
}
}
- /* For Test Only */
- public Set<TaskId> tasksForState(String stateName) {
- final String changeLogName = ProcessorStateManager.storeChangelogTopic(streamThread.applicationId, stateName);
- for (InternalTopicConfig internalTopicConfig : stateChangelogTopicToTaskIds.keySet()) {
- if (internalTopicConfig.name().equals(changeLogName)) {
- return stateChangelogTopicToTaskIds.get(internalTopicConfig);
- }
+ Map<HostInfo, Set<TopicPartition>> getPartitionsByHostState() {
+ if (partitionsByHostState == null) {
+ return Collections.emptyMap();
}
- return Collections.emptySet();
+ return Collections.unmodifiableMap(partitionsByHostState);
+ }
+
+ Cluster clusterMetadata() {
+ if (metadataWithInternalTopics == null) {
+ return Cluster.empty();
+ }
+ return metadataWithInternalTopics;
}
- public Set<TaskId> tasksForPartition(TopicPartition partition) {
- return partitionToTaskIds.get(partition);
+ Map<TaskId, Set<TopicPartition>> activeTasks() {
+ if (activeTasks == null) {
+ return Collections.emptyMap();
+ }
+ return Collections.unmodifiableMap(activeTasks);
}
- public Map<TaskId, Set<TopicPartition>> standbyTasks() {
- return standbyTasks;
+ Map<TaskId, Set<TopicPartition>> standbyTasks() {
+ if (standbyTasks == null) {
+ return Collections.emptyMap();
+ }
+ return Collections.unmodifiableMap(standbyTasks);
}
- public void setInternalTopicManager(InternalTopicManager internalTopicManager) {
+ void setInternalTopicManager(InternalTopicManager internalTopicManager) {
this.internalTopicManager = internalTopicManager;
}
@@ -651,11 +715,10 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
* Used to capture subscribed topic via Patterns discovered during the
* partition assignment process.
*/
- public static class SubscriptionUpdates {
+ public static class SubscriptionUpdates {
private final Set<String> updatedTopicSubscriptions = new HashSet<>();
-
private void updateTopics(Collection<String> topicNames) {
updatedTopicSubscriptions.clear();
updatedTopicSubscriptions.addAll(topicNames);
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index d7bb98c..7a04339 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -111,7 +111,6 @@ public class StreamThread extends Thread {
private boolean processStandbyRecords = false;
private AtomicBoolean initialized = new AtomicBoolean(false);
- private final long cacheSizeBytes;
private ThreadCache cache;
final ConsumerRebalanceListener rebalanceListener = new ConsumerRebalanceListener() {
@@ -184,7 +183,7 @@ public class StreamThread extends Thread {
if (config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) < 0) {
log.warn("Negative cache size passed in thread [{}]. Reverting to cache size of 0 bytes.", threadName);
}
- this.cacheSizeBytes = Math.max(0, config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) /
+ long cacheSizeBytes = Math.max(0, config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) /
config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG));
this.cache = new ThreadCache(threadClientId, cacheSizeBytes, this.sensors);
@@ -637,34 +636,24 @@ public class StreamThread extends Thread {
if (partitionAssignor == null)
throw new IllegalStateException(logPrefix + " Partition assignor has not been initialized while adding stream tasks: this should not happen.");
- HashMap<TaskId, Set<TopicPartition>> partitionsForTask = new HashMap<>();
-
- for (TopicPartition partition : assignment) {
- Set<TaskId> taskIds = partitionAssignor.tasksForPartition(partition);
- for (TaskId taskId : taskIds) {
- Set<TopicPartition> partitions = partitionsForTask.get(taskId);
- if (partitions == null) {
- partitions = new HashSet<>();
- partitionsForTask.put(taskId, partitions);
- }
- partitions.add(partition);
- }
- }
-
// create the active tasks
- for (Map.Entry<TaskId, Set<TopicPartition>> entry : partitionsForTask.entrySet()) {
+ for (Map.Entry<TaskId, Set<TopicPartition>> entry : partitionAssignor.activeTasks().entrySet()) {
TaskId taskId = entry.getKey();
Set<TopicPartition> partitions = entry.getValue();
- try {
- StreamTask task = createStreamTask(taskId, partitions);
- activeTasks.put(taskId, task);
+ if (assignment.containsAll(partitions)) {
+ try {
+ StreamTask task = createStreamTask(taskId, partitions);
+ activeTasks.put(taskId, task);
- for (TopicPartition partition : partitions)
- activeTasksByPartition.put(partition, task);
- } catch (StreamsException e) {
- log.error("{} Failed to create an active task %s: ", logPrefix, taskId, e);
- throw e;
+ for (TopicPartition partition : partitions)
+ activeTasksByPartition.put(partition, task);
+ } catch (StreamsException e) {
+ log.error("{} Failed to create an active task %s: ", logPrefix, taskId, e);
+ throw e;
+ }
+ } else {
+ log.warn("{} Task {} owned partitions {} are not contained in the assignment {}", logPrefix, taskId, partitions, assignment);
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
index 6569f85..ddbd67d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
@@ -41,7 +41,7 @@ public class AssignmentInfo {
private static final Logger log = LoggerFactory.getLogger(AssignmentInfo.class);
/**
- * A new field was added, partitionsByHostState. CURRENT_VERSION
+ * A new field was added, partitionsByHost. CURRENT_VERSION
* is required so we can decode the previous version. For example, this may occur
* during a rolling upgrade
*/
@@ -49,7 +49,7 @@ public class AssignmentInfo {
public final int version;
public final List<TaskId> activeTasks; // each element corresponds to a partition
public final Map<TaskId, Set<TopicPartition>> standbyTasks;
- public final Map<HostInfo, Set<TopicPartition>> partitionsByHostState;
+ public final Map<HostInfo, Set<TopicPartition>> partitionsByHost;
public AssignmentInfo(List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks,
Map<HostInfo, Set<TopicPartition>> hostState) {
@@ -61,7 +61,7 @@ public class AssignmentInfo {
this.version = version;
this.activeTasks = activeTasks;
this.standbyTasks = standbyTasks;
- this.partitionsByHostState = hostState;
+ this.partitionsByHost = hostState;
}
/**
@@ -89,8 +89,8 @@ public class AssignmentInfo {
Set<TopicPartition> partitions = entry.getValue();
writeTopicPartitions(out, partitions);
}
- out.writeInt(partitionsByHostState.size());
- for (Map.Entry<HostInfo, Set<TopicPartition>> entry : partitionsByHostState
+ out.writeInt(partitionsByHost.size());
+ for (Map.Entry<HostInfo, Set<TopicPartition>> entry : partitionsByHost
.entrySet()) {
final HostInfo hostInfo = entry.getKey();
out.writeUTF(hostInfo.host());
@@ -174,7 +174,7 @@ public class AssignmentInfo {
@Override
public int hashCode() {
- return version ^ activeTasks.hashCode() ^ standbyTasks.hashCode() ^ partitionsByHostState.hashCode();
+ return version ^ activeTasks.hashCode() ^ standbyTasks.hashCode() ^ partitionsByHost.hashCode();
}
@Override
@@ -184,7 +184,7 @@ public class AssignmentInfo {
return this.version == other.version &&
this.activeTasks.equals(other.activeTasks) &&
this.standbyTasks.equals(other.standbyTasks) &&
- this.partitionsByHostState.equals(other.partitionsByHostState);
+ this.partitionsByHost.equals(other.partitionsByHost);
} else {
return false;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
index b59af86..0746cab 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
@@ -22,11 +22,12 @@ import java.util.Set;
public class ClientState<T> {
- public final static double COST_ACTIVE = 0.1;
- public final static double COST_STANDBY = 0.2;
- public final static double COST_LOAD = 0.5;
+ final static double COST_ACTIVE = 0.1;
+ final static double COST_STANDBY = 0.2;
+ final static double COST_LOAD = 0.5;
public final Set<T> activeTasks;
+ public final Set<T> standbyTasks;
public final Set<T> assignedTasks;
public final Set<T> prevActiveTasks;
public final Set<T> prevAssignedTasks;
@@ -39,11 +40,12 @@ public class ClientState<T> {
}
public ClientState(double capacity) {
- this(new HashSet<T>(), new HashSet<T>(), new HashSet<T>(), new HashSet<T>(), capacity);
+ this(new HashSet<T>(), new HashSet<T>(), new HashSet<T>(), new HashSet<T>(), new HashSet<T>(), capacity);
}
- private ClientState(Set<T> activeTasks, Set<T> assignedTasks, Set<T> prevActiveTasks, Set<T> prevAssignedTasks, double capacity) {
+ private ClientState(Set<T> activeTasks, Set<T> standbyTasks, Set<T> assignedTasks, Set<T> prevActiveTasks, Set<T> prevAssignedTasks, double capacity) {
this.activeTasks = activeTasks;
+ this.standbyTasks = standbyTasks;
this.assignedTasks = assignedTasks;
this.prevActiveTasks = prevActiveTasks;
this.prevAssignedTasks = prevAssignedTasks;
@@ -52,13 +54,15 @@ public class ClientState<T> {
}
public ClientState<T> copy() {
- return new ClientState<>(new HashSet<>(activeTasks), new HashSet<>(assignedTasks),
+ return new ClientState<>(new HashSet<>(activeTasks), new HashSet<>(standbyTasks), new HashSet<>(assignedTasks),
new HashSet<>(prevActiveTasks), new HashSet<>(prevAssignedTasks), capacity);
}
public void assign(T taskId, boolean active) {
if (active)
activeTasks.add(taskId);
+ else
+ standbyTasks.add(taskId);
assignedTasks.add(taskId);
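The new `standbyTasks` set makes `assign()` total over both task flavors: an active assignment lands in `activeTasks`, a standby one in `standbyTasks`, and either way the task is recorded in `assignedTasks`. A self-contained sketch of just that bookkeeping; field and method names mirror the diff, the rest is illustrative:

import java.util.HashSet;
import java.util.Set;

class ClientStateSketch<T> {
    final Set<T> activeTasks = new HashSet<>();
    final Set<T> standbyTasks = new HashSet<>();
    final Set<T> assignedTasks = new HashSet<>();

    void assign(T taskId, boolean active) {
        if (active)
            activeTasks.add(taskId);
        else
            standbyTasks.add(taskId);
        assignedTasks.add(taskId); // recorded regardless of flavor
    }

    public static void main(String[] args) {
        ClientStateSketch<String> state = new ClientStateSketch<>();
        state.assign("0_0", true);   // active
        state.assign("1_0", false);  // standby
        System.out.println(state.assignedTasks); // holds both task ids
    }
}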
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java
index fadb43f..e807c4e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java
@@ -23,7 +23,6 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Random;
@@ -33,24 +32,20 @@ public class TaskAssignor<C, T extends Comparable<T>> {
private static final Logger log = LoggerFactory.getLogger(TaskAssignor.class);
- public static <C, T extends Comparable<T>> Map<C, ClientState<T>> assign(Map<C, ClientState<T>> states, Set<T> tasks, int numStandbyReplicas, String streamThreadId) {
+ public static <C, T extends Comparable<T>> void assign(Map<C, ClientState<T>> states, Set<T> tasks, int numStandbyReplicas) {
long seed = 0L;
for (C client : states.keySet()) {
seed += client.hashCode();
}
TaskAssignor<C, T> assignor = new TaskAssignor<>(states, tasks, seed);
- log.info("stream-thread [{}] Assigning tasks to clients: {}, prevAssignmentBalanced: {}, " +
- "prevClientsUnchanged: {}, tasks: {}, replicas: {}",
- streamThreadId, states, assignor.prevAssignmentBalanced, assignor.prevClientsUnchanged,
- tasks, numStandbyReplicas);
+ // assign active tasks
assignor.assignTasks();
+
+ // assign standby tasks
if (numStandbyReplicas > 0)
assignor.assignStandbyTasks(numStandbyReplicas);
-
- log.info("stream-thread [{}] Assigned with: {}", streamThreadId, assignor.states);
- return assignor.states;
}
private final Random rand;
@@ -63,36 +58,38 @@ public class TaskAssignor<C, T extends Comparable<T>> {
private TaskAssignor(Map<C, ClientState<T>> states, Set<T> tasks, long randomSeed) {
this.rand = new Random(randomSeed);
- this.states = new HashMap<>();
+ this.tasks = new ArrayList<>(tasks);
+ this.states = states;
+
int avgNumTasks = tasks.size() / states.size();
Set<T> existingTasks = new HashSet<>();
for (Map.Entry<C, ClientState<T>> entry : states.entrySet()) {
- this.states.put(entry.getKey(), entry.getValue().copy());
Set<T> oldTasks = entry.getValue().prevAssignedTasks;
+
// make sure the previous assignment is balanced
prevAssignmentBalanced = prevAssignmentBalanced &&
oldTasks.size() < 2 * avgNumTasks && oldTasks.size() > avgNumTasks / 2;
+
+ // make sure there are no duplicates
for (T task : oldTasks) {
- // Make sure there is no duplicates
prevClientsUnchanged = prevClientsUnchanged && !existingTasks.contains(task);
}
existingTasks.addAll(oldTasks);
}
- // Make sure the existing assignment didn't miss out any task
- prevClientsUnchanged = prevClientsUnchanged && existingTasks.equals(tasks);
- this.tasks = new ArrayList<>(tasks);
+ // make sure the existing assignment didn't miss out any task
+ prevClientsUnchanged = prevClientsUnchanged && existingTasks.equals(tasks);
int numTasks = tasks.size();
this.maxNumTaskPairs = numTasks * (numTasks - 1) / 2;
this.taskPairs = new HashSet<>(this.maxNumTaskPairs);
}
- public void assignTasks() {
+ private void assignTasks() {
assignTasks(true);
}
- public void assignStandbyTasks(int numStandbyReplicas) {
+ private void assignStandbyTasks(int numStandbyReplicas) {
int numReplicas = Math.min(numStandbyReplicas, states.size() - 1);
for (int i = 0; i < numReplicas; i++) {
assignTasks(false);
@@ -195,10 +192,10 @@ public class TaskAssignor<C, T extends Comparable<T>> {
}
private static class TaskPair<T> {
- public final T task1;
- public final T task2;
+ final T task1;
+ final T task2;
- public TaskPair(T task1, T task2) {
+ TaskPair(T task1, T task2) {
this.task1 = task1;
this.task2 = task2;
}
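Two of the constructor's checks above are easy to misread. `prevAssignmentBalanced` holds only if every client's previous task count stays strictly between half and twice the average, and `maxNumTaskPairs` is simply n-choose-2 over the task set. A small worked sketch with invented client and task names:

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class AssignorChecksSketch {
    public static void main(String[] args) {
        Map<String, Set<Integer>> prevAssignedByClient = new HashMap<>();
        prevAssignedByClient.put("client-1", new HashSet<>(Arrays.asList(0, 1)));
        prevAssignedByClient.put("client-2", new HashSet<>(Arrays.asList(2, 3, 4, 5)));
        Set<Integer> tasks = new HashSet<>(Arrays.asList(0, 1, 2, 3, 4, 5));

        int avgNumTasks = tasks.size() / prevAssignedByClient.size(); // 3
        boolean balanced = true;
        for (Set<Integer> oldTasks : prevAssignedByClient.values())
            balanced = balanced && oldTasks.size() < 2 * avgNumTasks
                                && oldTasks.size() > avgNumTasks / 2;
        System.out.println("prevAssignmentBalanced = " + balanced); // true: 2 and 4 both lie in (1, 6)

        int numTasks = tasks.size();
        int maxNumTaskPairs = numTasks * (numTasks - 1) / 2;
        System.out.println("maxNumTaskPairs = " + maxNumTaskPairs); // 15 unordered pairs
    }
}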
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/main/java/org/apache/kafka/streams/state/HostInfo.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/HostInfo.java b/streams/src/main/java/org/apache/kafka/streams/state/HostInfo.java
index 37a15e1..ce75d91 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/HostInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/HostInfo.java
@@ -74,7 +74,7 @@ public class HostInfo {
@Override
public String toString() {
return "HostInfo{" +
- "host='" + host + '\'' +
+ "host=\'" + host + '\'' +
", port=" + port +
'}';
}
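The escaping change above is cosmetic: inside a double-quoted Java string literal, `\'` and `'` denote the same character, so the rendered `toString()` output is identical either way. A two-line check:

class EscapeSketch {
    public static void main(String[] args) {
        String a = "host='" + "localhost" + '\'';
        String b = "host=\'" + "localhost" + '\''; // redundant but legal escape
        System.out.println(a.equals(b)); // true
    }
}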
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
index 41f9ae2..83e3d10 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
@@ -120,14 +120,4 @@ public class StoreChangeLogger<K, V> {
this.removed.clear();
this.dirty.clear();
}
-
- // this is for test only
- public int numDirty() {
- return this.dirty.size();
- }
-
- // this is for test only
- public int numRemoved() {
- return this.removed.size();
- }
}
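With the test-only accessors gone, a test that still needs to observe the private `dirty` and `removed` collections can reach them through field reflection, the same pattern the KTableImplTest change further down adopts. A minimal sketch against a stand-in class, since this hunk does not show the real field types:

import java.lang.reflect.Field;
import java.util.HashSet;
import java.util.Set;

class ReflectionProbeSketch {
    static class ChangeLoggerStandIn {
        private final Set<Integer> dirty = new HashSet<>();
    }

    public static void main(String[] args) throws Exception {
        ChangeLoggerStandIn logger = new ChangeLoggerStandIn();
        Field dirtyField = ChangeLoggerStandIn.class.getDeclaredField("dirty");
        dirtyField.setAccessible(true); // bypass private access for the test
        @SuppressWarnings("unchecked")
        Set<Integer> dirty = (Set<Integer>) dirtyField.get(logger);
        System.out.println(dirty.size()); // 0
    }
}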
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
index a5fb076..03df4cc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
@@ -85,7 +85,6 @@ public class FanoutIntegrationTest {
CLUSTER.createTopic(OUTPUT_TOPIC_C);
}
-
@Parameter
public long cacheSizeBytes;
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index e5e334c..8c96ecb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -206,7 +206,7 @@ public class KStreamImplTest {
@Test(expected = NullPointerException.class)
public void shouldCantHaveNullPredicate() throws Exception {
- testStream.branch(null);
+ testStream.branch((Predicate) null);
}
@Test(expected = NullPointerException.class)
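The cast added above matters because `branch()` takes a varargs predicate parameter: a bare `null` is passed as a null array (and draws a compiler warning about the inexact varargs call), whereas `(Predicate) null` passes a one-element array holding a single null predicate, which is what the null-check under test should reject. A sketch of the distinction with a stand-in varargs method:

class VarargsNullSketch {
    @SafeVarargs
    static <T> int count(T... items) {
        return items == null ? -1 : items.length;
    }

    public static void main(String[] args) {
        System.out.println(count((Object[]) null)); // -1: the array itself is null
        System.out.println(count((Object) null));   //  1: one null element
    }
}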
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
index afa1033..49dcbd0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
@@ -40,6 +40,7 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import java.lang.reflect.Field;
import java.io.File;
import java.io.IOException;
@@ -337,7 +338,7 @@ public class KTableImplTest {
}
@Test
- public void testRepartition() throws IOException {
+ public void testRepartition() throws Exception {
String topic1 = "topic1";
String storeName1 = "storeName1";
@@ -367,10 +368,15 @@ public class KTableImplTest {
assertTrue(driver.allProcessorNames().contains("KSTREAM-SINK-0000000007"));
assertTrue(driver.allProcessorNames().contains("KSTREAM-SOURCE-0000000008"));
- assertNotNull(((ChangedSerializer) ((SinkNode) driver.processor("KSTREAM-SINK-0000000003")).valueSerializer()).inner());
- assertNotNull(((ChangedDeserializer) ((SourceNode) driver.processor("KSTREAM-SOURCE-0000000004")).valueDeserializer()).inner());
- assertNotNull(((ChangedSerializer) ((SinkNode) driver.processor("KSTREAM-SINK-0000000007")).valueSerializer()).inner());
- assertNotNull(((ChangedDeserializer) ((SourceNode) driver.processor("KSTREAM-SOURCE-0000000008")).valueDeserializer()).inner());
+ Field valSerializerField = ((SinkNode) driver.processor("KSTREAM-SINK-0000000003")).getClass().getDeclaredField("valSerializer");
+ Field valDeserializerField = ((SourceNode) driver.processor("KSTREAM-SOURCE-0000000004")).getClass().getDeclaredField("valDeserializer");
+ valSerializerField.setAccessible(true);
+ valDeserializerField.setAccessible(true);
+
+ assertNotNull(((ChangedSerializer) valSerializerField.get(driver.processor("KSTREAM-SINK-0000000003"))).inner());
+ assertNotNull(((ChangedDeserializer) valDeserializerField.get(driver.processor("KSTREAM-SOURCE-0000000004"))).inner());
+ assertNotNull(((ChangedSerializer) valSerializerField.get(driver.processor("KSTREAM-SINK-0000000007"))).inner());
+ assertNotNull(((ChangedDeserializer) valDeserializerField.get(driver.processor("KSTREAM-SOURCE-0000000008"))).inner());
}
@Test(expected = NullPointerException.class)
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index d260937..c402c9b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -156,7 +156,7 @@ public class TopologyBuilderTest {
builder.addSource("source-3", "topic-3");
builder.addInternalTopic("topic-3");
- Set<String> expected = new HashSet<String>();
+ Set<String> expected = new HashSet<>();
expected.add("topic-1");
expected.add("topic-2");
expected.add("X-topic-3");
@@ -516,7 +516,7 @@ public class TopologyBuilderTest {
builder.addInternalTopic("foo");
builder.addSource("source", "foo");
final TopicsInfo topicsInfo = builder.topicGroups().values().iterator().next();
- final InternalTopicConfig topicConfig = topicsInfo.interSourceTopics.get("appId-foo");
+ final InternalTopicConfig topicConfig = topicsInfo.repartitionSourceTopics.get("appId-foo");
final Properties properties = topicConfig.toProperties(0);
assertEquals("appId-foo", topicConfig.name());
assertEquals("delete", properties.getProperty(InternalTopicManager.CLEANUP_POLICY_PROP));