kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [1/2] kafka git commit: KAFKA-4117: Stream partitionassignro cleanup
Date Sat, 29 Oct 2016 17:48:32 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 0dd9607f9 -> a4ab9d02a


http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index e46a016..cd9b7a5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -90,13 +90,13 @@ public class StreamPartitionAssignorTest {
     private final TaskId task1 = new TaskId(0, 1);
     private final TaskId task2 = new TaskId(0, 2);
     private final TaskId task3 = new TaskId(0, 3);
-    private String userEndPoint = null;
+    private String userEndPoint = "localhost:2171";
 
     private Properties configProps() {
         return new Properties() {
             {
                 setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "stream-partition-assignor-test");
-                setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
+                setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, userEndPoint);
                 setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
                 setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName());
             }
@@ -297,12 +297,12 @@ public class StreamPartitionAssignorTest {
         TaskId task10 = new TaskId(1, 0);
         TaskId task11 = new TaskId(1, 1);
         TaskId task12 = new TaskId(1, 2);
+        List<TaskId> tasks = Utils.mkList(task00, task01, task02, task10, task11, task12);
 
         UUID uuid1 = UUID.randomUUID();
         UUID uuid2 = UUID.randomUUID();
         String client1 = "client1";
 
-
         StreamThread thread10 = new StreamThread(builder, config, new MockClientSupplier(),
applicationId, client1, uuid1, new Metrics(), new SystemTime(), new StreamsMetadataState(builder));
 
         StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
@@ -323,14 +323,43 @@ public class StreamPartitionAssignorTest {
         assertEquals(2, assignments.get("consumer11").partitions().size());
         assertEquals(2, assignments.get("consumer20").partitions().size());
 
-        assertEquals(2, AssignmentInfo.decode(assignments.get("consumer10").userData()).activeTasks.size());
-        assertEquals(2, AssignmentInfo.decode(assignments.get("consumer11").userData()).activeTasks.size());
-        assertEquals(2, AssignmentInfo.decode(assignments.get("consumer20").userData()).activeTasks.size());
+        AssignmentInfo info10 = AssignmentInfo.decode(assignments.get("consumer10").userData());
+        AssignmentInfo info11 = AssignmentInfo.decode(assignments.get("consumer11").userData());
+        AssignmentInfo info20 = AssignmentInfo.decode(assignments.get("consumer20").userData());
+
+        assertEquals(2, info10.activeTasks.size());
+        assertEquals(2, info11.activeTasks.size());
+        assertEquals(2, info20.activeTasks.size());
+
+        Set<TaskId> allTasks = new HashSet<>();
+        allTasks.addAll(info10.activeTasks);
+        allTasks.addAll(info11.activeTasks);
+        allTasks.addAll(info20.activeTasks);
+        assertEquals(new HashSet<>(tasks), allTasks);
 
         // check tasks for state topics
-        assertEquals(Utils.mkSet(task00, task01, task02), partitionAssignor.tasksForState("store1"));
-        assertEquals(Utils.mkSet(task10, task11, task12), partitionAssignor.tasksForState("store2"));
-        assertEquals(Utils.mkSet(task10, task11, task12), partitionAssignor.tasksForState("store3"));
+        Map<Integer, TopologyBuilder.TopicsInfo> topicGroups = thread10.builder.topicGroups();
+
+        assertEquals(Utils.mkSet(task00, task01, task02), tasksForState(applicationId, "store1",
tasks, topicGroups));
+        assertEquals(Utils.mkSet(task10, task11, task12), tasksForState(applicationId, "store2",
tasks, topicGroups));
+        assertEquals(Utils.mkSet(task10, task11, task12), tasksForState(applicationId, "store3",
tasks, topicGroups));
+    }
+
+    private Set<TaskId> tasksForState(String applicationId, String storeName, List<TaskId>
tasks, Map<Integer, TopologyBuilder.TopicsInfo> topicGroups) {
+        final String changelogTopic = ProcessorStateManager.storeChangelogTopic(applicationId,
storeName);
+
+        Set<TaskId> ids = new HashSet<>();
+        for (Map.Entry<Integer, TopologyBuilder.TopicsInfo> entry : topicGroups.entrySet())
{
+            Set<String> stateChangelogTopics = entry.getValue().stateChangelogTopics.keySet();
+
+            if (stateChangelogTopics.contains(changelogTopic)) {
+                for (TaskId id : tasks) {
+                    if (id.topicGroupId == entry.getKey())
+                        ids.add(id);
+                }
+            }
+        }
+        return ids;
     }
 
     @Test
@@ -406,48 +435,6 @@ public class StreamPartitionAssignorTest {
         assertEquals(allTasks, allStandbyTasks);
     }
 
-    private AssignmentInfo checkAssignment(PartitionAssignor.Assignment assignment) {
-
-        // This assumed 1) DefaultPartitionGrouper is used, and 2) there is a only one topic
group.
-
-        AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
-
-        // check if the number of assigned partitions == the size of active task id list
-        assertEquals(assignment.partitions().size(), info.activeTasks.size());
-
-        // check if active tasks are consistent
-        List<TaskId> activeTasks = new ArrayList<>();
-        Set<String> activeTopics = new HashSet<>();
-        for (TopicPartition partition : assignment.partitions()) {
-            // since default grouper, taskid.partition == partition.partition()
-            activeTasks.add(new TaskId(0, partition.partition()));
-            activeTopics.add(partition.topic());
-        }
-        assertEquals(activeTasks, info.activeTasks);
-
-        // check if active partitions cover all topics
-        assertEquals(allTopics, activeTopics);
-
-        // check if standby tasks are consistent
-        Set<String> standbyTopics = new HashSet<>();
-        for (Map.Entry<TaskId, Set<TopicPartition>> entry : info.standbyTasks.entrySet())
{
-            TaskId id = entry.getKey();
-            Set<TopicPartition> partitions = entry.getValue();
-            for (TopicPartition partition : partitions) {
-                // since default grouper, taskid.partition == partition.partition()
-                assertEquals(id.partition, partition.partition());
-
-                standbyTopics.add(partition.topic());
-            }
-        }
-
-        if (info.standbyTasks.size() > 0)
-            // check if standby partitions cover all topics
-            assertEquals(allTopics, standbyTopics);
-
-        return info;
-    }
-
     @Test
     public void testOnAssignment() throws Exception {
         StreamsConfig config = new StreamsConfig(configProps());
@@ -468,16 +455,18 @@ public class StreamPartitionAssignorTest {
         partitionAssignor.configure(config.getConsumerConfigs(thread, "test", client1));
 
         List<TaskId> activeTaskList = Utils.mkList(task0, task3);
+        Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
         Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
-        standbyTasks.put(task1, Utils.mkSet(new TopicPartition("t1", 0)));
-        standbyTasks.put(task2, Utils.mkSet(new TopicPartition("t2", 0)));
+        activeTasks.put(task0, Utils.mkSet(t1p0));
+        activeTasks.put(task3, Utils.mkSet(t2p3));
+        standbyTasks.put(task1, Utils.mkSet(t1p0));
+        standbyTasks.put(task2, Utils.mkSet(t2p0));
 
         AssignmentInfo info = new AssignmentInfo(activeTaskList, standbyTasks, new HashMap<HostInfo,
Set<TopicPartition>>());
         PartitionAssignor.Assignment assignment = new PartitionAssignor.Assignment(Utils.mkList(t1p0,
t2p3), info.encode());
         partitionAssignor.onAssignment(assignment);
 
-        assertEquals(Utils.mkSet(task0), partitionAssignor.tasksForPartition(t1p0));
-        assertEquals(Utils.mkSet(task3), partitionAssignor.tasksForPartition(t2p3));
+        assertEquals(activeTasks, partitionAssignor.activeTasks());
         assertEquals(standbyTasks, partitionAssignor.standbyTasks());
     }
 
@@ -621,7 +610,7 @@ public class StreamPartitionAssignorTest {
         final Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata,
subscriptions);
         final PartitionAssignor.Assignment consumerAssignment = assignments.get("consumer1");
         final AssignmentInfo assignmentInfo = AssignmentInfo.decode(consumerAssignment.userData());
-        final Set<TopicPartition> topicPartitions = assignmentInfo.partitionsByHostState.get(new
HostInfo("localhost", 8080));
+        final Set<TopicPartition> topicPartitions = assignmentInfo.partitionsByHost.get(new
HostInfo("localhost", 8080));
         assertEquals(Utils.mkSet(new TopicPartition("topic1", 0),
                 new TopicPartition("topic1", 1),
                 new TopicPartition("topic1", 2)), topicPartitions);
@@ -725,13 +714,54 @@ public class StreamPartitionAssignorTest {
 
     }
 
+    private AssignmentInfo checkAssignment(PartitionAssignor.Assignment assignment) {
+
+        // This assumed 1) DefaultPartitionGrouper is used, and 2) there is a only one topic
group.
+
+        AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
+
+        // check if the number of assigned partitions == the size of active task id list
+        assertEquals(assignment.partitions().size(), info.activeTasks.size());
+
+        // check if active tasks are consistent
+        List<TaskId> activeTasks = new ArrayList<>();
+        Set<String> activeTopics = new HashSet<>();
+        for (TopicPartition partition : assignment.partitions()) {
+            // since default grouper, taskid.partition == partition.partition()
+            activeTasks.add(new TaskId(0, partition.partition()));
+            activeTopics.add(partition.topic());
+        }
+        assertEquals(activeTasks, info.activeTasks);
+
+        // check if active partitions cover all topics
+        assertEquals(allTopics, activeTopics);
+
+        // check if standby tasks are consistent
+        Set<String> standbyTopics = new HashSet<>();
+        for (Map.Entry<TaskId, Set<TopicPartition>> entry : info.standbyTasks.entrySet())
{
+            TaskId id = entry.getKey();
+            Set<TopicPartition> partitions = entry.getValue();
+            for (TopicPartition partition : partitions) {
+                // since default grouper, taskid.partition == partition.partition()
+                assertEquals(id.partition, partition.partition());
+
+                standbyTopics.add(partition.topic());
+            }
+        }
+
+        if (info.standbyTasks.size() > 0)
+            // check if standby partitions cover all topics
+            assertEquals(allTopics, standbyTopics);
+
+        return info;
+    }
 
     private class MockInternalTopicManager extends InternalTopicManager {
 
-        public Map<String, Integer> readyTopics = new HashMap<>();
-        public MockConsumer<byte[], byte[]> restoreConsumer;
+        Map<String, Integer> readyTopics = new HashMap<>();
+        MockConsumer<byte[], byte[]> restoreConsumer;
 
-        public MockInternalTopicManager(MockConsumer<byte[], byte[]> restoreConsumer)
{
+        MockInternalTopicManager(MockConsumer<byte[], byte[]> restoreConsumer) {
             super();
 
             this.restoreConsumer = restoreConsumer;

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
index 411e02d..f4772f0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
@@ -109,7 +109,7 @@ public class StreamsMetadataStateTest {
                 new PartitionInfo("topic-three", 0, null, null, null),
                 new PartitionInfo("topic-four", 0, null, null, null));
 
-        cluster = new Cluster(Collections.<Node>emptyList(), partitionInfos, Collections.<String>emptySet());
+        cluster = new Cluster(null, Collections.<Node>emptyList(), partitionInfos,
Collections.<String>emptySet(), Collections.<String>emptySet());
         discovery = new StreamsMetadataState(builder);
         discovery.onChange(hostToPartitions, cluster);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
index ce94a23..cfa0e61 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
@@ -64,7 +64,7 @@ public class AssignmentInfoTest {
         final AssignmentInfo decoded = AssignmentInfo.decode(encodeV1(oldVersion));
         assertEquals(oldVersion.activeTasks, decoded.activeTasks);
         assertEquals(oldVersion.standbyTasks, decoded.standbyTasks);
-        assertEquals(0, decoded.partitionsByHostState.size()); // should be empty as wasn't
in V1
+        assertEquals(0, decoded.partitionsByHost.size()); // should be empty as wasn't in
V1
         assertEquals(2, decoded.version); // automatically upgraded to v2 on decode;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorTest.java
index 4333087..52ca0a4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorTest.java
@@ -33,23 +33,34 @@ import static org.junit.Assert.assertTrue;
 
 public class TaskAssignorTest {
 
+    private static Map<Integer, ClientState<Integer>> copyStates(Map<Integer,
ClientState<Integer>> states) {
+        Map<Integer, ClientState<Integer>> copy = new HashMap<>();
+        for (Map.Entry<Integer, ClientState<Integer>> entry : states.entrySet())
{
+            copy.put(entry.getKey(), entry.getValue().copy());
+        }
+
+        return copy;
+    }
+
     @Test
     public void testAssignWithoutStandby() {
-        HashMap<Integer, ClientState<Integer>> states = new HashMap<>();
+        HashMap<Integer, ClientState<Integer>> statesWithNoPrevTasks = new HashMap<>();
         for (int i = 0; i < 6; i++) {
-            states.put(i, new ClientState<Integer>(1d));
+            statesWithNoPrevTasks.put(i, new ClientState<Integer>(1d));
         }
         Set<Integer> tasks;
-        Map<Integer, ClientState<Integer>> assignments;
         int numActiveTasks;
         int numAssignedTasks;
 
+        Map<Integer, ClientState<Integer>> states;
+
         // # of clients and # of tasks are equal.
+        states = copyStates(statesWithNoPrevTasks);
         tasks = mkSet(0, 1, 2, 3, 4, 5);
-        assignments = TaskAssignor.assign(states, tasks, 0, "TaskAssignorTest-TestAssignWithoutStandby");
+        TaskAssignor.assign(states, tasks, 0);
         numActiveTasks = 0;
         numAssignedTasks = 0;
-        for (ClientState<Integer> assignment : assignments.values()) {
+        for (ClientState<Integer> assignment : states.values()) {
             numActiveTasks += assignment.activeTasks.size();
             numAssignedTasks += assignment.assignedTasks.size();
             assertEquals(1, assignment.activeTasks.size());
@@ -60,10 +71,11 @@ public class TaskAssignorTest {
 
         // # of clients < # of tasks
         tasks = mkSet(0, 1, 2, 3, 4, 5, 6, 7);
-        assignments = TaskAssignor.assign(states, tasks, 0, "TaskAssignorTest-TestAssignWithoutStandby");
+        states = copyStates(statesWithNoPrevTasks);
+        TaskAssignor.assign(states, tasks, 0);
         numActiveTasks = 0;
         numAssignedTasks = 0;
-        for (ClientState<Integer> assignment : assignments.values()) {
+        for (ClientState<Integer> assignment : states.values()) {
             numActiveTasks += assignment.activeTasks.size();
             numAssignedTasks += assignment.assignedTasks.size();
             assertTrue(1 <= assignment.activeTasks.size());
@@ -76,10 +88,11 @@ public class TaskAssignorTest {
 
         // # of clients > # of tasks
         tasks = mkSet(0, 1, 2, 3);
-        assignments = TaskAssignor.assign(states, tasks, 0, "TaskAssignorTest-TestAssignWithoutStandby");
+        states = copyStates(statesWithNoPrevTasks);
+        TaskAssignor.assign(states, tasks, 0);
         numActiveTasks = 0;
         numAssignedTasks = 0;
-        for (ClientState<Integer> assignment : assignments.values()) {
+        for (ClientState<Integer> assignment : states.values()) {
             numActiveTasks += assignment.activeTasks.size();
             numAssignedTasks += assignment.assignedTasks.size();
             assertTrue(0 <= assignment.activeTasks.size());
@@ -93,12 +106,12 @@ public class TaskAssignorTest {
 
     @Test
     public void testAssignWithStandby() {
-        HashMap<Integer, ClientState<Integer>> states = new HashMap<>();
+        HashMap<Integer, ClientState<Integer>> statesWithNoPrevTasks = new HashMap<>();
         for (int i = 0; i < 6; i++) {
-            states.put(i, new ClientState<Integer>(1d));
+            statesWithNoPrevTasks.put(i, new ClientState<Integer>(1d));
         }
         Set<Integer> tasks;
-        Map<Integer, ClientState<Integer>> assignments;
+        Map<Integer, ClientState<Integer>> states;
         int numActiveTasks;
         int numAssignedTasks;
 
@@ -108,8 +121,9 @@ public class TaskAssignorTest {
         // 1 standby replicas.
         numActiveTasks = 0;
         numAssignedTasks = 0;
-        assignments = TaskAssignor.assign(states, tasks, 1, "TaskAssignorTest-TestAssignWithStandby");
-        for (ClientState<Integer> assignment : assignments.values()) {
+        states = copyStates(statesWithNoPrevTasks);
+        TaskAssignor.assign(states, tasks, 1);
+        for (ClientState<Integer> assignment : states.values()) {
             numActiveTasks += assignment.activeTasks.size();
             numAssignedTasks += assignment.assignedTasks.size();
             assertEquals(1, assignment.activeTasks.size());
@@ -122,10 +136,11 @@ public class TaskAssignorTest {
         tasks = mkSet(0, 1, 2, 3, 4, 5, 6, 7);
 
         // 1 standby replicas.
-        assignments = TaskAssignor.assign(states, tasks, 1, "TaskAssignorTest-TestAssignWithStandby");
+        states = copyStates(statesWithNoPrevTasks);
+        TaskAssignor.assign(states, tasks, 1);
         numActiveTasks = 0;
         numAssignedTasks = 0;
-        for (ClientState<Integer> assignment : assignments.values()) {
+        for (ClientState<Integer> assignment : states.values()) {
             numActiveTasks += assignment.activeTasks.size();
             numAssignedTasks += assignment.assignedTasks.size();
             assertTrue(1 <= assignment.activeTasks.size());
@@ -140,10 +155,11 @@ public class TaskAssignorTest {
         tasks = mkSet(0, 1, 2, 3);
 
         // 1 standby replicas.
-        assignments = TaskAssignor.assign(states, tasks, 1, "TaskAssignorTest-TestAssignWithStandby");
+        states = copyStates(statesWithNoPrevTasks);
+        TaskAssignor.assign(states, tasks, 1);
         numActiveTasks = 0;
         numAssignedTasks = 0;
-        for (ClientState<Integer> assignment : assignments.values()) {
+        for (ClientState<Integer> assignment : states.values()) {
             numActiveTasks += assignment.activeTasks.size();
             numAssignedTasks += assignment.assignedTasks.size();
             assertTrue(0 <= assignment.activeTasks.size());
@@ -158,10 +174,11 @@ public class TaskAssignorTest {
         tasks = mkSet(0, 1);
 
         // 1 standby replicas.
-        assignments = TaskAssignor.assign(states, tasks, 1, "TaskAssignorTest-TestAssignWithStandby");
+        states = copyStates(statesWithNoPrevTasks);
+        TaskAssignor.assign(states, tasks, 1);
         numActiveTasks = 0;
         numAssignedTasks = 0;
-        for (ClientState<Integer> assignment : assignments.values()) {
+        for (ClientState<Integer> assignment : states.values()) {
             numActiveTasks += assignment.activeTasks.size();
             numAssignedTasks += assignment.assignedTasks.size();
             assertTrue(0 <= assignment.activeTasks.size());
@@ -173,10 +190,11 @@ public class TaskAssignorTest {
         assertEquals(tasks.size() * 2, numAssignedTasks);
 
         // 2 standby replicas.
-        assignments = TaskAssignor.assign(states, tasks, 2, "TaskAssignorTest-TestAssignWithStandby");
+        states = copyStates(statesWithNoPrevTasks);
+        TaskAssignor.assign(states, tasks, 2);
         numActiveTasks = 0;
         numAssignedTasks = 0;
-        for (ClientState<Integer> assignment : assignments.values()) {
+        for (ClientState<Integer> assignment : states.values()) {
             numActiveTasks += assignment.activeTasks.size();
             numAssignedTasks += assignment.assignedTasks.size();
             assertTrue(0 <= assignment.activeTasks.size());
@@ -187,10 +205,11 @@ public class TaskAssignorTest {
         assertEquals(tasks.size() * 3, numAssignedTasks);
 
         // 3 standby replicas.
-        assignments = TaskAssignor.assign(states, tasks, 3, "TaskAssignorTest-TestAssignWithStandby");
+        states = copyStates(statesWithNoPrevTasks);
+        TaskAssignor.assign(states, tasks, 3);
         numActiveTasks = 0;
         numAssignedTasks = 0;
-        for (ClientState<Integer> assignment : assignments.values()) {
+        for (ClientState<Integer> assignment : states.values()) {
             numActiveTasks += assignment.activeTasks.size();
             numAssignedTasks += assignment.assignedTasks.size();
             assertTrue(0 <= assignment.activeTasks.size());
@@ -205,27 +224,29 @@ public class TaskAssignorTest {
     @Test
     public void testStickiness() {
         List<Integer> tasks;
-        Map<Integer, ClientState<Integer>> states;
+        Map<Integer, ClientState<Integer>> statesWithPrevTasks;
         Map<Integer, ClientState<Integer>> assignments;
         int i;
 
         // # of clients and # of tasks are equal.
+        Map<Integer, ClientState<Integer>> states;
         tasks = mkList(0, 1, 2, 3, 4, 5);
         Collections.shuffle(tasks);
-        states = new HashMap<>();
+        statesWithPrevTasks = new HashMap<>();
         i = 0;
         for (int task : tasks) {
             ClientState<Integer> state = new ClientState<>(1d);
             state.prevActiveTasks.add(task);
             state.prevAssignedTasks.add(task);
-            states.put(i++, state);
+            statesWithPrevTasks.put(i++, state);
         }
-        assignments = TaskAssignor.assign(states, mkSet(0, 1, 2, 3, 4, 5), 0, "TaskAssignorTest-TestStickiness");
+        states = copyStates(statesWithPrevTasks);
+        TaskAssignor.assign(states, mkSet(0, 1, 2, 3, 4, 5), 0);
         for (int client : states.keySet()) {
-            Set<Integer> oldActive = states.get(client).prevActiveTasks;
-            Set<Integer> oldAssigned = states.get(client).prevAssignedTasks;
-            Set<Integer> newActive = assignments.get(client).activeTasks;
-            Set<Integer> newAssigned = assignments.get(client).assignedTasks;
+            Set<Integer> oldActive = statesWithPrevTasks.get(client).prevActiveTasks;
+            Set<Integer> oldAssigned = statesWithPrevTasks.get(client).prevAssignedTasks;
+            Set<Integer> newActive = states.get(client).activeTasks;
+            Set<Integer> newAssigned = states.get(client).assignedTasks;
 
             assertEquals(oldActive, newActive);
             assertEquals(oldAssigned, newAssigned);
@@ -234,7 +255,7 @@ public class TaskAssignorTest {
         // # of clients > # of tasks
         tasks = mkList(0, 1, 2, 3, -1, -1);
         Collections.shuffle(tasks);
-        states = new HashMap<>();
+        statesWithPrevTasks = new HashMap<>();
         i = 0;
         for (int task : tasks) {
             ClientState<Integer> state = new ClientState<>(1d);
@@ -242,14 +263,15 @@ public class TaskAssignorTest {
                 state.prevActiveTasks.add(task);
                 state.prevAssignedTasks.add(task);
             }
-            states.put(i++, state);
+            statesWithPrevTasks.put(i++, state);
         }
-        assignments = TaskAssignor.assign(states, mkSet(0, 1, 2, 3), 0, "TaskAssignorTest-TestStickiness");
+        states = copyStates(statesWithPrevTasks);
+        TaskAssignor.assign(states, mkSet(0, 1, 2, 3), 0);
         for (int client : states.keySet()) {
-            Set<Integer> oldActive = states.get(client).prevActiveTasks;
-            Set<Integer> oldAssigned = states.get(client).prevAssignedTasks;
-            Set<Integer> newActive = assignments.get(client).activeTasks;
-            Set<Integer> newAssigned = assignments.get(client).assignedTasks;
+            Set<Integer> oldActive = statesWithPrevTasks.get(client).prevActiveTasks;
+            Set<Integer> oldAssigned = statesWithPrevTasks.get(client).prevAssignedTasks;
+            Set<Integer> newActive = states.get(client).activeTasks;
+            Set<Integer> newAssigned = states.get(client).assignedTasks;
 
             assertEquals(oldActive, newActive);
             assertEquals(oldAssigned, newAssigned);
@@ -258,20 +280,21 @@ public class TaskAssignorTest {
         // # of clients < # of tasks
         List<Set<Integer>> taskSets = mkList(mkSet(0, 1), mkSet(2, 3), mkSet(4,
5), mkSet(6, 7), mkSet(8, 9), mkSet(10, 11));
         Collections.shuffle(taskSets);
-        states = new HashMap<>();
+        statesWithPrevTasks = new HashMap<>();
         i = 0;
         for (Set<Integer> taskSet : taskSets) {
             ClientState<Integer> state = new ClientState<>(1d);
             state.prevActiveTasks.addAll(taskSet);
             state.prevAssignedTasks.addAll(taskSet);
-            states.put(i++, state);
+            statesWithPrevTasks.put(i++, state);
         }
-        assignments = TaskAssignor.assign(states, mkSet(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
11), 0, "TaskAssignorTest-TestStickiness");
+        states = copyStates(statesWithPrevTasks);
+        TaskAssignor.assign(states, mkSet(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11), 0);
         for (int client : states.keySet()) {
-            Set<Integer> oldActive = states.get(client).prevActiveTasks;
-            Set<Integer> oldAssigned = states.get(client).prevAssignedTasks;
-            Set<Integer> newActive = assignments.get(client).activeTasks;
-            Set<Integer> newAssigned = assignments.get(client).assignedTasks;
+            Set<Integer> oldActive = statesWithPrevTasks.get(client).prevActiveTasks;
+            Set<Integer> oldAssigned = statesWithPrevTasks.get(client).prevAssignedTasks;
+            Set<Integer> newActive = states.get(client).activeTasks;
+            Set<Integer> newAssigned = states.get(client).assignedTasks;
 
             Set<Integer> intersection = new HashSet<>();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
index 7675f9b..4e8a497 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
@@ -66,8 +66,10 @@ public class StoreChangeLoggerTest {
         }
     };
 
+    @SuppressWarnings("unchecked")
     @Test
-    public void testAddRemove() {
+    public void testAddRemove() throws Exception {
+
         context.setTime(1);
         written.put(0, "zero");
         changeLogger.add(0);
@@ -75,26 +77,27 @@ public class StoreChangeLoggerTest {
         changeLogger.add(1);
         written.put(2, "two");
         changeLogger.add(2);
-        assertEquals(3, changeLogger.numDirty());
-        assertEquals(0, changeLogger.numRemoved());
+
+        assertEquals(3, changeLogger.dirty.size());
+        assertEquals(0, changeLogger.removed.size());
 
         changeLogger.delete(0);
         changeLogger.delete(1);
         written.put(3, "three");
         changeLogger.add(3);
-        assertEquals(2, changeLogger.numDirty());
-        assertEquals(2, changeLogger.numRemoved());
+        assertEquals(2, changeLogger.dirty.size());
+        assertEquals(2, changeLogger.removed.size());
 
         written.put(0, "zero-again");
         changeLogger.add(0);
-        assertEquals(3, changeLogger.numDirty());
-        assertEquals(1, changeLogger.numRemoved());
+        assertEquals(3, changeLogger.dirty.size());
+        assertEquals(1, changeLogger.removed.size());
 
         written.put(4, "four");
         changeLogger.add(4);
         changeLogger.maybeLogChange(getter);
-        assertEquals(0, changeLogger.numDirty());
-        assertEquals(0, changeLogger.numRemoved());
+        assertEquals(0, changeLogger.dirty.size());
+        assertEquals(0, changeLogger.removed.size());
         assertEquals(5, logged.size());
         assertEquals("zero-again", logged.get(0));
         assertEquals(null, logged.get(1));


Mime
View raw message