kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-3311; Prepare internal source topics before calling partition grouper
Date Wed, 02 Mar 2016 21:43:53 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 002b377da -> 2a58ba9fd


KAFKA-3311; Prepare internal source topics before calling partition grouper

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Yasuhiro Matsuda <yasuhiro.matsuda@gmail.com>, Jun Rao <junrao@gmail.com>

Closes #990 from guozhangwang/K3311


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2a58ba9f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2a58ba9f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2a58ba9f

Branch: refs/heads/trunk
Commit: 2a58ba9fd893979f89aec251579b10f5cda41d10
Parents: 002b377
Author: Guozhang Wang <wangguoz@gmail.com>
Authored: Wed Mar 2 13:43:48 2016 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Mar 2 13:43:48 2016 -0800

----------------------------------------------------------------------
 .../java/org/apache/kafka/common/Cluster.java   |  36 ++++++
 .../streams/processor/TopologyBuilder.java      |  29 +++--
 .../internals/StreamPartitionAssignor.java      | 113 ++++++++++++++++++-
 .../processor/internals/StreamThread.java       |  30 -----
 .../streams/processor/TopologyBuilderTest.java  |  12 +-
 5 files changed, 169 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/2a58ba9f/clients/src/main/java/org/apache/kafka/common/Cluster.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java
index 8883e45..d86e3a4 100644
--- a/clients/src/main/java/org/apache/kafka/common/Cluster.java
+++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java
@@ -118,6 +118,42 @@ public final class Cluster {
     }
 
     /**
+     * Update the cluster information for specific topic with new partition information
+     */
+    public Cluster update(String topic, Collection<PartitionInfo> partitions) {
+
+        // re-index the partitions by topic/partition for quick lookup
+        for (PartitionInfo p : partitions)
+            this.partitionsByTopicPartition.put(new TopicPartition(p.topic(), p.partition()), p);
+
+        // re-index the partitions by topic and node respectively
+        this.partitionsByTopic.put(topic, Collections.unmodifiableList(new ArrayList<>(partitions)));
+
+        List<PartitionInfo> availablePartitions = new ArrayList<>();
+        for (PartitionInfo part : partitions) {
+            if (part.leader() != null)
+                availablePartitions.add(part);
+        }
+        this.availablePartitionsByTopic.put(topic, Collections.unmodifiableList(availablePartitions));
+
+        HashMap<Integer, List<PartitionInfo>> partsForNode = new HashMap<>();
+        for (Node n : this.nodes) {
+            partsForNode.put(n.id(), new ArrayList<PartitionInfo>());
+        }
+        for (PartitionInfo p : partitions) {
+            if (p.leader() != null) {
+                List<PartitionInfo> psNode = Utils.notNull(partsForNode.get(p.leader().id()));
+                psNode.add(p);
+            }
+        }
+
+        for (Map.Entry<Integer, List<PartitionInfo>> entry : partsForNode.entrySet())
+            this.partitionsByNode.put(entry.getKey(), Collections.unmodifiableList(entry.getValue()));
+
+        return this;
+    }
+
+    /**
      * @return The known set of nodes
      */
     public List<Node> nodes() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/2a58ba9f/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index be5c728..3ef1b0a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -58,12 +58,11 @@ public class TopologyBuilder {
     private final Map<String, StateStoreFactory> stateFactories = new HashMap<>();
 
     private final Set<String> sourceTopicNames = new HashSet<>();
-
     private final Set<String> internalTopicNames = new HashSet<>();
-
     private final QuickUnion<String> nodeGrouper = new QuickUnion<>();
     private final List<Set<String>> copartitionSourceGroups = new ArrayList<>();
-    private final HashMap<String, String[]> nodeToTopics = new HashMap<>();
+    private final HashMap<String, String[]> nodeToSourceTopics = new HashMap<>();
+    private final HashMap<String, String> nodeToSinkTopic = new HashMap<>();
     private Map<Integer, Set<String>> nodeGroups = null;
 
     private static class StateStoreFactory {
@@ -154,11 +153,13 @@ public class TopologyBuilder {
     }
 
     public static class TopicsInfo {
+        public Set<String> sinkTopics;
         public Set<String> sourceTopics;
         public Set<String> interSourceTopics;
         public Set<String> stateChangelogTopics;
 
-        public TopicsInfo(Set<String> sourceTopics, Set<String> interSourceTopics, Set<String> stateChangelogTopics) {
+        public TopicsInfo(Set<String> sinkTopics, Set<String> sourceTopics, Set<String> interSourceTopics, Set<String> stateChangelogTopics) {
+            this.sinkTopics = sinkTopics;
             this.sourceTopics = sourceTopics;
             this.interSourceTopics = interSourceTopics;
             this.stateChangelogTopics = stateChangelogTopics;
@@ -228,7 +229,7 @@ public class TopologyBuilder {
         }
 
         nodeFactories.put(name, new SourceNodeFactory(name, topics, keyDeserializer, valDeserializer));
-        nodeToTopics.put(name, topics.clone());
+        nodeToSourceTopics.put(name, topics.clone());
         nodeGrouper.add(name);
 
         return this;
@@ -345,6 +346,7 @@ public class TopologyBuilder {
         }
 
+        nodeFactories.put(name, new SinkNodeFactory(name, parentNames, topic, keySerializer, valSerializer, partitioner));
+        nodeToSinkTopic.put(name, topic);
         nodeGrouper.add(name);
         nodeGrouper.unite(name, parentNames);
         return this;
@@ -502,12 +504,13 @@ public class TopologyBuilder {
             nodeGroups = makeNodeGroups();
 
         for (Map.Entry<Integer, Set<String>> entry : nodeGroups.entrySet()) {
+            Set<String> sinkTopics = new HashSet<>();
             Set<String> sourceTopics = new HashSet<>();
             Set<String> internalSourceTopics = new HashSet<>();
             Set<String> stateChangelogTopics = new HashSet<>();
             for (String node : entry.getValue()) {
                 // if the node is a source node, add to the source topics
-                String[] topics = nodeToTopics.get(node);
+                String[] topics = nodeToSourceTopics.get(node);
                 if (topics != null) {
                     sourceTopics.addAll(Arrays.asList(topics));
 
@@ -518,6 +521,11 @@ public class TopologyBuilder {
                     }
                 }
 
+                // if the node is a sink node, add to the sink topics
+                String topic = nodeToSinkTopic.get(node);
+                if (topic != null)
+                    sinkTopics.add(topic);
+
                 // if the node is connected to a state, add to the state topics
                 for (StateStoreFactory stateFactory : stateFactories.values()) {
 
@@ -529,6 +537,7 @@ public class TopologyBuilder {
                 }
             }
             topicGroups.put(entry.getKey(), new TopicsInfo(
+                    Collections.unmodifiableSet(sinkTopics),
                     Collections.unmodifiableSet(sourceTopics),
                     Collections.unmodifiableSet(internalSourceTopics),
                     Collections.unmodifiableSet(stateChangelogTopics)));
@@ -556,7 +565,7 @@ public class TopologyBuilder {
         int nodeGroupId = 0;
 
         // Go through source nodes first. This makes the group id assignment easy to predict in tests
-        for (String nodeName : Utils.sorted(nodeToTopics.keySet())) {
+        for (String nodeName : Utils.sorted(nodeToSourceTopics.keySet())) {
             String root = nodeGrouper.root(nodeName);
             Set<String> nodeGroup = rootToNodeGroup.get(root);
             if (nodeGroup == null) {
@@ -569,7 +578,7 @@ public class TopologyBuilder {
 
         // Go through non-source nodes
         for (String nodeName : Utils.sorted(nodeFactories.keySet())) {
-            if (!nodeToTopics.containsKey(nodeName)) {
+            if (!nodeToSourceTopics.containsKey(nodeName)) {
                 String root = nodeGrouper.root(nodeName);
                 Set<String> nodeGroup = rootToNodeGroup.get(root);
                 if (nodeGroup == null) {
@@ -597,7 +606,7 @@ public class TopologyBuilder {
 
     /**
      * Returns the copartition groups.
-     * A copartition group is a group of topics that are required to be copartitioned.
+     * A copartition group is a group of source topics that are required to be copartitioned.
      *
      * @return groups of topic names
      */
@@ -606,7 +615,7 @@ public class TopologyBuilder {
         for (Set<String> nodeNames : copartitionSourceGroups) {
             Set<String> copartitionGroup = new HashSet<>();
             for (String node : nodeNames) {
-                String[] topics = nodeToTopics.get(node);
+                String[] topics = nodeToSourceTopics.get(node);
                 if (topics != null)
                     copartitionGroup.addAll(Arrays.asList(topics));
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/2a58ba9f/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 1cc5287..55cbb0e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -23,18 +23,22 @@ import org.apache.kafka.common.Configurable;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.errors.TaskAssignmentException;
+import org.apache.kafka.streams.errors.TopologyBuilderException;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
 import org.apache.kafka.streams.processor.internals.assignment.ClientState;
 import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
-import org.apache.kafka.streams.errors.TaskAssignmentException;
 import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
+import org.apache.kafka.streams.StreamsConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -156,7 +160,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         Map<UUID, Set<String>> consumersByClient = new HashMap<>();
         Map<UUID, ClientState<TaskId>> states = new HashMap<>();
 
-        // Decode subscription info
+        // decode subscription info
         for (Map.Entry<String, Subscription> entry : subscriptions.entrySet()) {
             String consumerId = entry.getKey();
             Subscription subscription = entry.getValue();
@@ -182,11 +186,73 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
             state.capacity = state.capacity + 1d;
         }
 
-        // get the tasks as partition groups from the partition grouper
+        // ensure the co-partitioning topics within the group have the same number of partitions,
+        // and enforce the number of partitions for those internal topics.
         Map<Integer, Set<String>> sourceTopicGroups = new HashMap<>();
+        Map<Integer, Set<String>> internalSourceTopicGroups = new HashMap<>();
         for (Map.Entry<Integer, TopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
             sourceTopicGroups.put(entry.getKey(), entry.getValue().sourceTopics);
+            internalSourceTopicGroups.put(entry.getKey(), entry.getValue().interSourceTopics);
         }
+        Collection<Set<String>> copartitionTopicGroups = streamThread.builder.copartitionGroups();
+
+        ensureCopartitioning(copartitionTopicGroups, internalSourceTopicGroups, metadata);
+
+        // for those internal source topics that do not have co-partition enforcement,
+        // set the number of partitions to the maximum of the depending sub-topologies source topics
+        for (Map.Entry<Integer, TopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
+            Set<String> internalTopics = entry.getValue().interSourceTopics;
+            for (String internalTopic : internalTopics) {
+                Set<TaskId> tasks = internalSourceTopicToTaskIds.get(internalTopic);
+
+                if (tasks == null) {
+                    int numPartitions = -1;
+                    for (Map.Entry<Integer, TopologyBuilder.TopicsInfo> other : topicGroups.entrySet()) {
+                        Set<String> otherSinkTopics = other.getValue().sinkTopics;
+
+                        if (otherSinkTopics.contains(internalTopic)) {
+                            for (String topic : other.getValue().sourceTopics) {
+                                List<PartitionInfo> infos = metadata.partitionsForTopic(topic);
+
+                                if (infos != null && infos.size() > numPartitions)
+                                    numPartitions = infos.size();
+                            }
+                        }
+                    }
+
+                    internalSourceTopicToTaskIds.put(internalTopic, Collections.singleton(new TaskId(entry.getKey(), numPartitions)));
+                }
+            }
+        }
+
+        // if ZK is specified, prepare the internal source topic before calling partition grouper
+        if (internalTopicManager != null) {
+            log.debug("Starting to validate internal source topics in partition assignor.");
+
+            for (Map.Entry<String, Set<TaskId>> entry : internalSourceTopicToTaskIds.entrySet()) {
+                String topic = streamThread.jobId + "-" + entry.getKey();
+
+                // should have size 1 only
+                int numPartitions = -1;
+                for (TaskId task : entry.getValue()) {
+                    numPartitions = task.partition;
+                }
+
+                internalTopicManager.makeReady(topic, numPartitions);
+
+                // wait until the topic metadata has been propagated to all brokers
+                List<PartitionInfo> partitions;
+                do {
+                    partitions = streamThread.restoreConsumer.partitionsFor(topic);
+                } while (partitions == null || partitions.size() != numPartitions);
+
+                metadata.update(topic, partitions);
+            }
+
+            log.info("Completed validating internal source topics in partition assignor.");
+        }
+
+        // get the tasks as partition groups from the partition grouper
         Map<TaskId, Set<TopicPartition>> partitionsForTask = streamThread.partitionGrouper.partitionGroups(sourceTopicGroups, metadata);
 
         // add tasks to state topic subscribers
@@ -274,7 +340,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
             }
         }
 
-        // if ZK is specified, get the tasks / internal topics for each state topic and validate the topic partitions
+        // if ZK is specified, validate the internal source topics and the state changelog topics
         if (internalTopicManager != null) {
             log.debug("Starting to validate changelog topics in partition assignor.");
 
@@ -339,6 +405,43 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         this.partitionToTaskIds = partitionToTaskIds;
     }
 
+    private void ensureCopartitioning(Collection<Set<String>> copartitionGroups, Map<Integer, Set<String>> internalTopicGroups, Cluster metadata) {
+        Set<String> internalTopics = new HashSet<>();
+        for (Set<String> topics : internalTopicGroups.values())
+            internalTopics.addAll(topics);
+
+        for (Set<String> copartitionGroup : copartitionGroups) {
+            ensureCopartitioning(copartitionGroup, internalTopics, metadata);
+        }
+    }
+
+    private void ensureCopartitioning(Set<String> copartitionGroup, Set<String> internalTopics, Cluster metadata) {
+        int numPartitions = -1;
+
+        for (String topic : copartitionGroup) {
+            if (!internalTopics.contains(topic)) {
+                List<PartitionInfo> infos = metadata.partitionsForTopic(topic);
+
+                if (infos == null)
+                    throw new TopologyBuilderException("External source topic not found: " + topic);
+
+                if (numPartitions == -1) {
+                    numPartitions = infos.size();
+                } else if (numPartitions != infos.size()) {
+                    String[] topics = copartitionGroup.toArray(new String[copartitionGroup.size()]);
+                    Arrays.sort(topics);
+                    throw new TopologyBuilderException("Topics not copartitioned: [" + Utils.mkString(Arrays.asList(topics), ",") + "]");
+                }
+            }
+        }
+
+        // enforce co-partitioning restrictions to internal topics reusing internalSourceTopicToTaskIds
+        for (String topic : internalTopics) {
+            if (copartitionGroup.contains(topic))
+                internalSourceTopicToTaskIds.put(topic, Collections.singleton(new TaskId(-1, numPartitions)));
+        }
+    }
+
     /* For Test Only */
     public Set<TaskId> tasksForState(String stateName) {
         return stateChangelogTopicToTaskIds.get(stateName + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX);

http://git-wip-us.apache.org/repos/asf/kafka/blob/2a58ba9f/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index b8ff135..4ce86ac 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -27,7 +27,6 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.MeasurableStat;
 import org.apache.kafka.common.metrics.Metrics;
@@ -44,7 +43,6 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskIdFormatException;
-import org.apache.kafka.streams.errors.TopologyBuilderException;
 import org.apache.kafka.streams.processor.PartitionGrouper;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TopologyBuilder;
@@ -56,7 +54,6 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.nio.channels.FileLock;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -316,8 +313,6 @@ public class StreamThread extends Thread {
         long lastPoll = 0L;
         boolean requiresPoll = true;
 
-        ensureCopartitioning(builder.copartitionGroups());
-
         consumer.subscribe(new ArrayList<>(sourceTopics), rebalanceListener);
 
         while (stillRunning()) {
@@ -720,31 +715,6 @@ public class StreamThread extends Thread {
         }
     }
 
-    private void ensureCopartitioning(Collection<Set<String>> copartitionGroups) {
-        for (Set<String> copartitionGroup : copartitionGroups) {
-            ensureCopartitioning(copartitionGroup);
-        }
-    }
-
-    private void ensureCopartitioning(Set<String> copartitionGroup) {
-        int numPartitions = -1;
-
-        for (String topic : copartitionGroup) {
-            List<PartitionInfo> infos = consumer.partitionsFor(topic);
-
-            if (infos == null)
-                throw new TopologyBuilderException("Topic not found: " + topic);
-
-            if (numPartitions == -1) {
-                numPartitions = infos.size();
-            } else if (numPartitions != infos.size()) {
-                String[] topics = copartitionGroup.toArray(new String[copartitionGroup.size()]);
-                Arrays.sort(topics);
-                throw new TopologyBuilderException("Topics not copartitioned: [" + Utils.mkString(Arrays.asList(topics), ",") + "]");
-            }
-        }
-    }
-
     private class StreamsMetricsImpl implements StreamsMetrics {
         final Metrics metrics;
         final String metricGrpName;

http://git-wip-us.apache.org/repos/asf/kafka/blob/2a58ba9f/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index a93b8ab..0635bd2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -215,9 +215,9 @@ public class TopologyBuilderTest {
         Map<Integer, TopicsInfo> topicGroups = builder.topicGroups();
 
         Map<Integer, TopicsInfo> expectedTopicGroups = new HashMap<>();
-        expectedTopicGroups.put(0, new TopicsInfo(mkSet("topic-1", "topic-1x", "topic-2"), Collections.<String>emptySet(), Collections.<String>emptySet()));
-        expectedTopicGroups.put(1, new TopicsInfo(mkSet("topic-3", "topic-4"), Collections.<String>emptySet(), Collections.<String>emptySet()));
-        expectedTopicGroups.put(2, new TopicsInfo(mkSet("topic-5"), Collections.<String>emptySet(), Collections.<String>emptySet()));
+        expectedTopicGroups.put(0, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-1", "topic-1x", "topic-2"), Collections.<String>emptySet(), Collections.<String>emptySet()));
+        expectedTopicGroups.put(1, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-3", "topic-4"), Collections.<String>emptySet(), Collections.<String>emptySet()));
+        expectedTopicGroups.put(2, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-5"), Collections.<String>emptySet(), Collections.<String>emptySet()));
 
         assertEquals(3, topicGroups.size());
         assertEquals(expectedTopicGroups, topicGroups);
@@ -253,9 +253,9 @@ public class TopologyBuilderTest {
         Map<Integer, TopicsInfo> topicGroups = builder.topicGroups();
 
         Map<Integer, TopicsInfo> expectedTopicGroups = new HashMap<>();
-        expectedTopicGroups.put(0, new TopicsInfo(mkSet("topic-1", "topic-1x", "topic-2"), Collections.<String>emptySet(), mkSet("store-1" + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX)));
-        expectedTopicGroups.put(1, new TopicsInfo(mkSet("topic-3", "topic-4"), Collections.<String>emptySet(), mkSet("store-2" + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX)));
-        expectedTopicGroups.put(2, new TopicsInfo(mkSet("topic-5"), Collections.<String>emptySet(), mkSet("store-3" + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX)));
+        expectedTopicGroups.put(0, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-1", "topic-1x", "topic-2"), Collections.<String>emptySet(), mkSet("store-1" + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX)));
+        expectedTopicGroups.put(1, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-3", "topic-4"), Collections.<String>emptySet(), mkSet("store-2" + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX)));
+        expectedTopicGroups.put(2, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-5"), Collections.<String>emptySet(), mkSet("store-3" + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX)));
 
         assertEquals(3, topicGroups.size());
         assertEquals(expectedTopicGroups, topicGroups);


Mime
View raw message