Repository: kafka
Updated Branches:
refs/heads/trunk 0dd9607f9 -> a4ab9d02a
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index e46a016..cd9b7a5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -90,13 +90,13 @@ public class StreamPartitionAssignorTest {
private final TaskId task1 = new TaskId(0, 1);
private final TaskId task2 = new TaskId(0, 2);
private final TaskId task3 = new TaskId(0, 3);
- private String userEndPoint = null;
+ private String userEndPoint = "localhost:2171";
private Properties configProps() {
return new Properties() {
{
setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "stream-partition-assignor-test");
- setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
+ setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, userEndPoint);
setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName());
}
@@ -297,12 +297,12 @@ public class StreamPartitionAssignorTest {
TaskId task10 = new TaskId(1, 0);
TaskId task11 = new TaskId(1, 1);
TaskId task12 = new TaskId(1, 2);
+ List<TaskId> tasks = Utils.mkList(task00, task01, task02, task10, task11, task12);
UUID uuid1 = UUID.randomUUID();
UUID uuid2 = UUID.randomUUID();
String client1 = "client1";
-
StreamThread thread10 = new StreamThread(builder, config, new MockClientSupplier(),
applicationId, client1, uuid1, new Metrics(), new SystemTime(), new StreamsMetadataState(builder));
StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
@@ -323,14 +323,43 @@ public class StreamPartitionAssignorTest {
assertEquals(2, assignments.get("consumer11").partitions().size());
assertEquals(2, assignments.get("consumer20").partitions().size());
- assertEquals(2, AssignmentInfo.decode(assignments.get("consumer10").userData()).activeTasks.size());
- assertEquals(2, AssignmentInfo.decode(assignments.get("consumer11").userData()).activeTasks.size());
- assertEquals(2, AssignmentInfo.decode(assignments.get("consumer20").userData()).activeTasks.size());
+ AssignmentInfo info10 = AssignmentInfo.decode(assignments.get("consumer10").userData());
+ AssignmentInfo info11 = AssignmentInfo.decode(assignments.get("consumer11").userData());
+ AssignmentInfo info20 = AssignmentInfo.decode(assignments.get("consumer20").userData());
+
+ assertEquals(2, info10.activeTasks.size());
+ assertEquals(2, info11.activeTasks.size());
+ assertEquals(2, info20.activeTasks.size());
+
+ Set<TaskId> allTasks = new HashSet<>();
+ allTasks.addAll(info10.activeTasks);
+ allTasks.addAll(info11.activeTasks);
+ allTasks.addAll(info20.activeTasks);
+ assertEquals(new HashSet<>(tasks), allTasks);
// check tasks for state topics
- assertEquals(Utils.mkSet(task00, task01, task02), partitionAssignor.tasksForState("store1"));
- assertEquals(Utils.mkSet(task10, task11, task12), partitionAssignor.tasksForState("store2"));
- assertEquals(Utils.mkSet(task10, task11, task12), partitionAssignor.tasksForState("store3"));
+ Map<Integer, TopologyBuilder.TopicsInfo> topicGroups = thread10.builder.topicGroups();
+
+ assertEquals(Utils.mkSet(task00, task01, task02), tasksForState(applicationId, "store1", tasks, topicGroups));
+ assertEquals(Utils.mkSet(task10, task11, task12), tasksForState(applicationId, "store2", tasks, topicGroups));
+ assertEquals(Utils.mkSet(task10, task11, task12), tasksForState(applicationId, "store3", tasks, topicGroups));
+ }
+
+ private Set<TaskId> tasksForState(String applicationId, String storeName, List<TaskId> tasks, Map<Integer, TopologyBuilder.TopicsInfo> topicGroups) {
+ final String changelogTopic = ProcessorStateManager.storeChangelogTopic(applicationId, storeName);
+
+ Set<TaskId> ids = new HashSet<>();
+ for (Map.Entry<Integer, TopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
+ Set<String> stateChangelogTopics = entry.getValue().stateChangelogTopics.keySet();
+
+ if (stateChangelogTopics.contains(changelogTopic)) {
+ for (TaskId id : tasks) {
+ if (id.topicGroupId == entry.getKey())
+ ids.add(id);
+ }
+ }
+ }
+ return ids;
}
@Test
@@ -406,48 +435,6 @@ public class StreamPartitionAssignorTest {
assertEquals(allTasks, allStandbyTasks);
}
- private AssignmentInfo checkAssignment(PartitionAssignor.Assignment assignment) {
-
- // This assumed 1) DefaultPartitionGrouper is used, and 2) there is a only one topic group.
-
- AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
-
- // check if the number of assigned partitions == the size of active task id list
- assertEquals(assignment.partitions().size(), info.activeTasks.size());
-
- // check if active tasks are consistent
- List<TaskId> activeTasks = new ArrayList<>();
- Set<String> activeTopics = new HashSet<>();
- for (TopicPartition partition : assignment.partitions()) {
- // since default grouper, taskid.partition == partition.partition()
- activeTasks.add(new TaskId(0, partition.partition()));
- activeTopics.add(partition.topic());
- }
- assertEquals(activeTasks, info.activeTasks);
-
- // check if active partitions cover all topics
- assertEquals(allTopics, activeTopics);
-
- // check if standby tasks are consistent
- Set<String> standbyTopics = new HashSet<>();
- for (Map.Entry<TaskId, Set<TopicPartition>> entry : info.standbyTasks.entrySet()) {
- TaskId id = entry.getKey();
- Set<TopicPartition> partitions = entry.getValue();
- for (TopicPartition partition : partitions) {
- // since default grouper, taskid.partition == partition.partition()
- assertEquals(id.partition, partition.partition());
-
- standbyTopics.add(partition.topic());
- }
- }
-
- if (info.standbyTasks.size() > 0)
- // check if standby partitions cover all topics
- assertEquals(allTopics, standbyTopics);
-
- return info;
- }
-
@Test
public void testOnAssignment() throws Exception {
StreamsConfig config = new StreamsConfig(configProps());
@@ -468,16 +455,18 @@ public class StreamPartitionAssignorTest {
partitionAssignor.configure(config.getConsumerConfigs(thread, "test", client1));
List<TaskId> activeTaskList = Utils.mkList(task0, task3);
+ Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
- standbyTasks.put(task1, Utils.mkSet(new TopicPartition("t1", 0)));
- standbyTasks.put(task2, Utils.mkSet(new TopicPartition("t2", 0)));
+ activeTasks.put(task0, Utils.mkSet(t1p0));
+ activeTasks.put(task3, Utils.mkSet(t2p3));
+ standbyTasks.put(task1, Utils.mkSet(t1p0));
+ standbyTasks.put(task2, Utils.mkSet(t2p0));
AssignmentInfo info = new AssignmentInfo(activeTaskList, standbyTasks, new HashMap<HostInfo, Set<TopicPartition>>());
PartitionAssignor.Assignment assignment = new PartitionAssignor.Assignment(Utils.mkList(t1p0, t2p3), info.encode());
partitionAssignor.onAssignment(assignment);
- assertEquals(Utils.mkSet(task0), partitionAssignor.tasksForPartition(t1p0));
- assertEquals(Utils.mkSet(task3), partitionAssignor.tasksForPartition(t2p3));
+ assertEquals(activeTasks, partitionAssignor.activeTasks());
assertEquals(standbyTasks, partitionAssignor.standbyTasks());
}
@@ -621,7 +610,7 @@ public class StreamPartitionAssignorTest {
final Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
final PartitionAssignor.Assignment consumerAssignment = assignments.get("consumer1");
final AssignmentInfo assignmentInfo = AssignmentInfo.decode(consumerAssignment.userData());
- final Set<TopicPartition> topicPartitions = assignmentInfo.partitionsByHostState.get(new HostInfo("localhost", 8080));
+ final Set<TopicPartition> topicPartitions = assignmentInfo.partitionsByHost.get(new HostInfo("localhost", 8080));
assertEquals(Utils.mkSet(new TopicPartition("topic1", 0),
new TopicPartition("topic1", 1),
new TopicPartition("topic1", 2)), topicPartitions);
@@ -725,13 +714,54 @@ public class StreamPartitionAssignorTest {
}
+ private AssignmentInfo checkAssignment(PartitionAssignor.Assignment assignment) {
+
+ // This assumed 1) DefaultPartitionGrouper is used, and 2) there is a only one topic group.
+
+ AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
+
+ // check if the number of assigned partitions == the size of active task id list
+ assertEquals(assignment.partitions().size(), info.activeTasks.size());
+
+ // check if active tasks are consistent
+ List<TaskId> activeTasks = new ArrayList<>();
+ Set<String> activeTopics = new HashSet<>();
+ for (TopicPartition partition : assignment.partitions()) {
+ // since default grouper, taskid.partition == partition.partition()
+ activeTasks.add(new TaskId(0, partition.partition()));
+ activeTopics.add(partition.topic());
+ }
+ assertEquals(activeTasks, info.activeTasks);
+
+ // check if active partitions cover all topics
+ assertEquals(allTopics, activeTopics);
+
+ // check if standby tasks are consistent
+ Set<String> standbyTopics = new HashSet<>();
+ for (Map.Entry<TaskId, Set<TopicPartition>> entry : info.standbyTasks.entrySet()) {
+ TaskId id = entry.getKey();
+ Set<TopicPartition> partitions = entry.getValue();
+ for (TopicPartition partition : partitions) {
+ // since default grouper, taskid.partition == partition.partition()
+ assertEquals(id.partition, partition.partition());
+
+ standbyTopics.add(partition.topic());
+ }
+ }
+
+ if (info.standbyTasks.size() > 0)
+ // check if standby partitions cover all topics
+ assertEquals(allTopics, standbyTopics);
+
+ return info;
+ }
private class MockInternalTopicManager extends InternalTopicManager {
- public Map<String, Integer> readyTopics = new HashMap<>();
- public MockConsumer<byte[], byte[]> restoreConsumer;
+ Map<String, Integer> readyTopics = new HashMap<>();
+ MockConsumer<byte[], byte[]> restoreConsumer;
- public MockInternalTopicManager(MockConsumer<byte[], byte[]> restoreConsumer) {
+ MockInternalTopicManager(MockConsumer<byte[], byte[]> restoreConsumer) {
super();
this.restoreConsumer = restoreConsumer;
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
index 411e02d..f4772f0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
@@ -109,7 +109,7 @@ public class StreamsMetadataStateTest {
new PartitionInfo("topic-three", 0, null, null, null),
new PartitionInfo("topic-four", 0, null, null, null));
- cluster = new Cluster(Collections.<Node>emptyList(), partitionInfos, Collections.<String>emptySet());
+ cluster = new Cluster(null, Collections.<Node>emptyList(), partitionInfos, Collections.<String>emptySet(), Collections.<String>emptySet());
discovery = new StreamsMetadataState(builder);
discovery.onChange(hostToPartitions, cluster);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
index ce94a23..cfa0e61 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
@@ -64,7 +64,7 @@ public class AssignmentInfoTest {
final AssignmentInfo decoded = AssignmentInfo.decode(encodeV1(oldVersion));
assertEquals(oldVersion.activeTasks, decoded.activeTasks);
assertEquals(oldVersion.standbyTasks, decoded.standbyTasks);
- assertEquals(0, decoded.partitionsByHostState.size()); // should be empty as wasn't in V1
+ assertEquals(0, decoded.partitionsByHost.size()); // should be empty as wasn't in V1
assertEquals(2, decoded.version); // automatically upgraded to v2 on decode;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorTest.java
index 4333087..52ca0a4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorTest.java
@@ -33,23 +33,34 @@ import static org.junit.Assert.assertTrue;
public class TaskAssignorTest {
+ private static Map<Integer, ClientState<Integer>> copyStates(Map<Integer, ClientState<Integer>> states) {
+ Map<Integer, ClientState<Integer>> copy = new HashMap<>();
+ for (Map.Entry<Integer, ClientState<Integer>> entry : states.entrySet()) {
+ copy.put(entry.getKey(), entry.getValue().copy());
+ }
+
+ return copy;
+ }
+
@Test
public void testAssignWithoutStandby() {
- HashMap<Integer, ClientState<Integer>> states = new HashMap<>();
+ HashMap<Integer, ClientState<Integer>> statesWithNoPrevTasks = new HashMap<>();
for (int i = 0; i < 6; i++) {
- states.put(i, new ClientState<Integer>(1d));
+ statesWithNoPrevTasks.put(i, new ClientState<Integer>(1d));
}
Set<Integer> tasks;
- Map<Integer, ClientState<Integer>> assignments;
int numActiveTasks;
int numAssignedTasks;
+ Map<Integer, ClientState<Integer>> states;
+
// # of clients and # of tasks are equal.
+ states = copyStates(statesWithNoPrevTasks);
tasks = mkSet(0, 1, 2, 3, 4, 5);
- assignments = TaskAssignor.assign(states, tasks, 0, "TaskAssignorTest-TestAssignWithoutStandby");
+ TaskAssignor.assign(states, tasks, 0);
numActiveTasks = 0;
numAssignedTasks = 0;
- for (ClientState<Integer> assignment : assignments.values()) {
+ for (ClientState<Integer> assignment : states.values()) {
numActiveTasks += assignment.activeTasks.size();
numAssignedTasks += assignment.assignedTasks.size();
assertEquals(1, assignment.activeTasks.size());
@@ -60,10 +71,11 @@ public class TaskAssignorTest {
// # of clients < # of tasks
tasks = mkSet(0, 1, 2, 3, 4, 5, 6, 7);
- assignments = TaskAssignor.assign(states, tasks, 0, "TaskAssignorTest-TestAssignWithoutStandby");
+ states = copyStates(statesWithNoPrevTasks);
+ TaskAssignor.assign(states, tasks, 0);
numActiveTasks = 0;
numAssignedTasks = 0;
- for (ClientState<Integer> assignment : assignments.values()) {
+ for (ClientState<Integer> assignment : states.values()) {
numActiveTasks += assignment.activeTasks.size();
numAssignedTasks += assignment.assignedTasks.size();
assertTrue(1 <= assignment.activeTasks.size());
@@ -76,10 +88,11 @@ public class TaskAssignorTest {
// # of clients > # of tasks
tasks = mkSet(0, 1, 2, 3);
- assignments = TaskAssignor.assign(states, tasks, 0, "TaskAssignorTest-TestAssignWithoutStandby");
+ states = copyStates(statesWithNoPrevTasks);
+ TaskAssignor.assign(states, tasks, 0);
numActiveTasks = 0;
numAssignedTasks = 0;
- for (ClientState<Integer> assignment : assignments.values()) {
+ for (ClientState<Integer> assignment : states.values()) {
numActiveTasks += assignment.activeTasks.size();
numAssignedTasks += assignment.assignedTasks.size();
assertTrue(0 <= assignment.activeTasks.size());
@@ -93,12 +106,12 @@ public class TaskAssignorTest {
@Test
public void testAssignWithStandby() {
- HashMap<Integer, ClientState<Integer>> states = new HashMap<>();
+ HashMap<Integer, ClientState<Integer>> statesWithNoPrevTasks = new HashMap<>();
for (int i = 0; i < 6; i++) {
- states.put(i, new ClientState<Integer>(1d));
+ statesWithNoPrevTasks.put(i, new ClientState<Integer>(1d));
}
Set<Integer> tasks;
- Map<Integer, ClientState<Integer>> assignments;
+ Map<Integer, ClientState<Integer>> states;
int numActiveTasks;
int numAssignedTasks;
@@ -108,8 +121,9 @@ public class TaskAssignorTest {
// 1 standby replicas.
numActiveTasks = 0;
numAssignedTasks = 0;
- assignments = TaskAssignor.assign(states, tasks, 1, "TaskAssignorTest-TestAssignWithStandby");
- for (ClientState<Integer> assignment : assignments.values()) {
+ states = copyStates(statesWithNoPrevTasks);
+ TaskAssignor.assign(states, tasks, 1);
+ for (ClientState<Integer> assignment : states.values()) {
numActiveTasks += assignment.activeTasks.size();
numAssignedTasks += assignment.assignedTasks.size();
assertEquals(1, assignment.activeTasks.size());
@@ -122,10 +136,11 @@ public class TaskAssignorTest {
tasks = mkSet(0, 1, 2, 3, 4, 5, 6, 7);
// 1 standby replicas.
- assignments = TaskAssignor.assign(states, tasks, 1, "TaskAssignorTest-TestAssignWithStandby");
+ states = copyStates(statesWithNoPrevTasks);
+ TaskAssignor.assign(states, tasks, 1);
numActiveTasks = 0;
numAssignedTasks = 0;
- for (ClientState<Integer> assignment : assignments.values()) {
+ for (ClientState<Integer> assignment : states.values()) {
numActiveTasks += assignment.activeTasks.size();
numAssignedTasks += assignment.assignedTasks.size();
assertTrue(1 <= assignment.activeTasks.size());
@@ -140,10 +155,11 @@ public class TaskAssignorTest {
tasks = mkSet(0, 1, 2, 3);
// 1 standby replicas.
- assignments = TaskAssignor.assign(states, tasks, 1, "TaskAssignorTest-TestAssignWithStandby");
+ states = copyStates(statesWithNoPrevTasks);
+ TaskAssignor.assign(states, tasks, 1);
numActiveTasks = 0;
numAssignedTasks = 0;
- for (ClientState<Integer> assignment : assignments.values()) {
+ for (ClientState<Integer> assignment : states.values()) {
numActiveTasks += assignment.activeTasks.size();
numAssignedTasks += assignment.assignedTasks.size();
assertTrue(0 <= assignment.activeTasks.size());
@@ -158,10 +174,11 @@ public class TaskAssignorTest {
tasks = mkSet(0, 1);
// 1 standby replicas.
- assignments = TaskAssignor.assign(states, tasks, 1, "TaskAssignorTest-TestAssignWithStandby");
+ states = copyStates(statesWithNoPrevTasks);
+ TaskAssignor.assign(states, tasks, 1);
numActiveTasks = 0;
numAssignedTasks = 0;
- for (ClientState<Integer> assignment : assignments.values()) {
+ for (ClientState<Integer> assignment : states.values()) {
numActiveTasks += assignment.activeTasks.size();
numAssignedTasks += assignment.assignedTasks.size();
assertTrue(0 <= assignment.activeTasks.size());
@@ -173,10 +190,11 @@ public class TaskAssignorTest {
assertEquals(tasks.size() * 2, numAssignedTasks);
// 2 standby replicas.
- assignments = TaskAssignor.assign(states, tasks, 2, "TaskAssignorTest-TestAssignWithStandby");
+ states = copyStates(statesWithNoPrevTasks);
+ TaskAssignor.assign(states, tasks, 2);
numActiveTasks = 0;
numAssignedTasks = 0;
- for (ClientState<Integer> assignment : assignments.values()) {
+ for (ClientState<Integer> assignment : states.values()) {
numActiveTasks += assignment.activeTasks.size();
numAssignedTasks += assignment.assignedTasks.size();
assertTrue(0 <= assignment.activeTasks.size());
@@ -187,10 +205,11 @@ public class TaskAssignorTest {
assertEquals(tasks.size() * 3, numAssignedTasks);
// 3 standby replicas.
- assignments = TaskAssignor.assign(states, tasks, 3, "TaskAssignorTest-TestAssignWithStandby");
+ states = copyStates(statesWithNoPrevTasks);
+ TaskAssignor.assign(states, tasks, 3);
numActiveTasks = 0;
numAssignedTasks = 0;
- for (ClientState<Integer> assignment : assignments.values()) {
+ for (ClientState<Integer> assignment : states.values()) {
numActiveTasks += assignment.activeTasks.size();
numAssignedTasks += assignment.assignedTasks.size();
assertTrue(0 <= assignment.activeTasks.size());
@@ -205,27 +224,29 @@ public class TaskAssignorTest {
@Test
public void testStickiness() {
List<Integer> tasks;
- Map<Integer, ClientState<Integer>> states;
+ Map<Integer, ClientState<Integer>> statesWithPrevTasks;
Map<Integer, ClientState<Integer>> assignments;
int i;
// # of clients and # of tasks are equal.
+ Map<Integer, ClientState<Integer>> states;
tasks = mkList(0, 1, 2, 3, 4, 5);
Collections.shuffle(tasks);
- states = new HashMap<>();
+ statesWithPrevTasks = new HashMap<>();
i = 0;
for (int task : tasks) {
ClientState<Integer> state = new ClientState<>(1d);
state.prevActiveTasks.add(task);
state.prevAssignedTasks.add(task);
- states.put(i++, state);
+ statesWithPrevTasks.put(i++, state);
}
- assignments = TaskAssignor.assign(states, mkSet(0, 1, 2, 3, 4, 5), 0, "TaskAssignorTest-TestStickiness");
+ states = copyStates(statesWithPrevTasks);
+ TaskAssignor.assign(states, mkSet(0, 1, 2, 3, 4, 5), 0);
for (int client : states.keySet()) {
- Set<Integer> oldActive = states.get(client).prevActiveTasks;
- Set<Integer> oldAssigned = states.get(client).prevAssignedTasks;
- Set<Integer> newActive = assignments.get(client).activeTasks;
- Set<Integer> newAssigned = assignments.get(client).assignedTasks;
+ Set<Integer> oldActive = statesWithPrevTasks.get(client).prevActiveTasks;
+ Set<Integer> oldAssigned = statesWithPrevTasks.get(client).prevAssignedTasks;
+ Set<Integer> newActive = states.get(client).activeTasks;
+ Set<Integer> newAssigned = states.get(client).assignedTasks;
assertEquals(oldActive, newActive);
assertEquals(oldAssigned, newAssigned);
@@ -234,7 +255,7 @@ public class TaskAssignorTest {
// # of clients > # of tasks
tasks = mkList(0, 1, 2, 3, -1, -1);
Collections.shuffle(tasks);
- states = new HashMap<>();
+ statesWithPrevTasks = new HashMap<>();
i = 0;
for (int task : tasks) {
ClientState<Integer> state = new ClientState<>(1d);
@@ -242,14 +263,15 @@ public class TaskAssignorTest {
state.prevActiveTasks.add(task);
state.prevAssignedTasks.add(task);
}
- states.put(i++, state);
+ statesWithPrevTasks.put(i++, state);
}
- assignments = TaskAssignor.assign(states, mkSet(0, 1, 2, 3), 0, "TaskAssignorTest-TestStickiness");
+ states = copyStates(statesWithPrevTasks);
+ TaskAssignor.assign(states, mkSet(0, 1, 2, 3), 0);
for (int client : states.keySet()) {
- Set<Integer> oldActive = states.get(client).prevActiveTasks;
- Set<Integer> oldAssigned = states.get(client).prevAssignedTasks;
- Set<Integer> newActive = assignments.get(client).activeTasks;
- Set<Integer> newAssigned = assignments.get(client).assignedTasks;
+ Set<Integer> oldActive = statesWithPrevTasks.get(client).prevActiveTasks;
+ Set<Integer> oldAssigned = statesWithPrevTasks.get(client).prevAssignedTasks;
+ Set<Integer> newActive = states.get(client).activeTasks;
+ Set<Integer> newAssigned = states.get(client).assignedTasks;
assertEquals(oldActive, newActive);
assertEquals(oldAssigned, newAssigned);
@@ -258,20 +280,21 @@ public class TaskAssignorTest {
// # of clients < # of tasks
List<Set<Integer>> taskSets = mkList(mkSet(0, 1), mkSet(2, 3), mkSet(4, 5), mkSet(6, 7), mkSet(8, 9), mkSet(10, 11));
Collections.shuffle(taskSets);
- states = new HashMap<>();
+ statesWithPrevTasks = new HashMap<>();
i = 0;
for (Set<Integer> taskSet : taskSets) {
ClientState<Integer> state = new ClientState<>(1d);
state.prevActiveTasks.addAll(taskSet);
state.prevAssignedTasks.addAll(taskSet);
- states.put(i++, state);
+ statesWithPrevTasks.put(i++, state);
}
- assignments = TaskAssignor.assign(states, mkSet(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11), 0, "TaskAssignorTest-TestStickiness");
+ states = copyStates(statesWithPrevTasks);
+ TaskAssignor.assign(states, mkSet(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11), 0);
for (int client : states.keySet()) {
- Set<Integer> oldActive = states.get(client).prevActiveTasks;
- Set<Integer> oldAssigned = states.get(client).prevAssignedTasks;
- Set<Integer> newActive = assignments.get(client).activeTasks;
- Set<Integer> newAssigned = assignments.get(client).assignedTasks;
+ Set<Integer> oldActive = statesWithPrevTasks.get(client).prevActiveTasks;
+ Set<Integer> oldAssigned = statesWithPrevTasks.get(client).prevAssignedTasks;
+ Set<Integer> newActive = states.get(client).activeTasks;
+ Set<Integer> newAssigned = states.get(client).assignedTasks;
Set<Integer> intersection = new HashSet<>();
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
index 7675f9b..4e8a497 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
@@ -66,8 +66,10 @@ public class StoreChangeLoggerTest {
}
};
+ @SuppressWarnings("unchecked")
@Test
- public void testAddRemove() {
+ public void testAddRemove() throws Exception {
+
context.setTime(1);
written.put(0, "zero");
changeLogger.add(0);
@@ -75,26 +77,27 @@ public class StoreChangeLoggerTest {
changeLogger.add(1);
written.put(2, "two");
changeLogger.add(2);
- assertEquals(3, changeLogger.numDirty());
- assertEquals(0, changeLogger.numRemoved());
+
+ assertEquals(3, changeLogger.dirty.size());
+ assertEquals(0, changeLogger.removed.size());
changeLogger.delete(0);
changeLogger.delete(1);
written.put(3, "three");
changeLogger.add(3);
- assertEquals(2, changeLogger.numDirty());
- assertEquals(2, changeLogger.numRemoved());
+ assertEquals(2, changeLogger.dirty.size());
+ assertEquals(2, changeLogger.removed.size());
written.put(0, "zero-again");
changeLogger.add(0);
- assertEquals(3, changeLogger.numDirty());
- assertEquals(1, changeLogger.numRemoved());
+ assertEquals(3, changeLogger.dirty.size());
+ assertEquals(1, changeLogger.removed.size());
written.put(4, "four");
changeLogger.add(4);
changeLogger.maybeLogChange(getter);
- assertEquals(0, changeLogger.numDirty());
- assertEquals(0, changeLogger.numRemoved());
+ assertEquals(0, changeLogger.dirty.size());
+ assertEquals(0, changeLogger.removed.size());
assertEquals(5, logged.size());
assertEquals("zero-again", logged.get(0));
assertEquals(null, logged.get(1));
|