kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-4677 Follow Up: add optimization to StickyTaskAssignor for rolling bounce
Date Wed, 01 Mar 2017 19:21:44 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk c9872cb21 -> 0fba52960


KAFKA-4677 Follow Up: add optimization to StickyTaskAssignor for rolling bounce

Detect when a rebalance has happened because one or more existing nodes bounced. Keep the assignment
of previously active tasks unchanged, and assign only the tasks that were not previously active to the new
clients.

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #2609 from dguy/kstreams-575


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0fba5296
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0fba5296
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0fba5296

Branch: refs/heads/trunk
Commit: 0fba529608a5eb829feb66a499c89ead40b79694
Parents: c9872cb
Author: Damian Guy <damian.guy@gmail.com>
Authored: Wed Mar 1 11:21:41 2017 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Mar 1 11:21:41 2017 -0800

----------------------------------------------------------------------
 .../internals/assignment/ClientState.java       |   4 +
 .../assignment/StickyTaskAssignor.java          |  71 +++++++----
 .../internals/assignment/ClientStateTest.java   |  13 ++
 .../assignment/StickyTaskAssignorTest.java      | 121 ++++++++++++++++++-
 4 files changed, 182 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/0fba5296/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
index 3e9a521..d5f8ccf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
@@ -150,4 +150,8 @@ public class ClientState<T> {
     int capacity() {
         return capacity;
     }
+
+    boolean hasUnfulfilledQuota(final int tasksPerThread) {
+        return activeTasks.size() < capacity * tasksPerThread;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0fba5296/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
index f06ecae..81c9305 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
@@ -23,6 +23,7 @@ import org.slf4j.LoggerFactory;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
@@ -35,16 +36,12 @@ public class StickyTaskAssignor<ID> implements TaskAssignor<ID,
TaskId> {
     private final Map<TaskId, ID> previousActiveTaskAssignment = new HashMap<>();
     private final Map<TaskId, Set<ID>> previousStandbyTaskAssignment = new HashMap<>();
     private final TaskPairs taskPairs;
-    private final int availableCapacity;
-    private final boolean hasNewTasks;
 
     public StickyTaskAssignor(final Map<ID, ClientState<TaskId>> clients, final
Set<TaskId> taskIds) {
         this.clients = clients;
         this.taskIds = taskIds;
-        this.availableCapacity = sumCapacity(clients.values());
         taskPairs = new TaskPairs(taskIds.size() * (taskIds.size() - 1) / 2);
         mapPreviousTaskAssignment(clients);
-        this.hasNewTasks = !previousActiveTaskAssignment.keySet().containsAll(taskIds);
     }
 
     @Override
@@ -66,35 +63,69 @@ public class StickyTaskAssignor<ID> implements TaskAssignor<ID,
TaskId> {
                              numStandbyReplicas, taskId);
                     break;
                 }
-                assign(taskId, ids, false);
+                allocateTaskWithClientCandidates(taskId, ids, false);
             }
         }
     }
 
     private void assignActive() {
-        final Set<TaskId> previouslyAssignedTaskIds = new HashSet<>(previousActiveTaskAssignment.keySet());
-        previouslyAssignedTaskIds.addAll(previousStandbyTaskAssignment.keySet());
-        previouslyAssignedTaskIds.retainAll(taskIds);
-
-        // assign previously assigned tasks first
-        for (final TaskId taskId : previouslyAssignedTaskIds) {
-            assign(taskId, clients.keySet(), true);
+        final int totalCapacity = sumCapacity(clients.values());
+        final int tasksPerThread = taskIds.size() / totalCapacity;
+        final Set<TaskId> assigned = new HashSet<>();
+
+        // first try and re-assign existing active tasks to clients that previously had
+        // the same active task
+        for (final Map.Entry<TaskId, ID> entry : previousActiveTaskAssignment.entrySet())
{
+            final TaskId taskId = entry.getKey();
+            if (taskIds.contains(taskId)) {
+                final ClientState<TaskId> client = clients.get(entry.getValue());
+                if (client.hasUnfulfilledQuota(tasksPerThread)) {
+                    assignTaskToClient(assigned, taskId, client);
+                }
+            }
         }
 
-        final Set<TaskId> newTasks  = new HashSet<>(taskIds);
-        newTasks.removeAll(previouslyAssignedTaskIds);
+        final Set<TaskId> unassigned = new HashSet<>(taskIds);
+        unassigned.removeAll(assigned);
+
+        // try and assign any remaining unassigned tasks to clients that previously
+        // have seen the task.
+        for (final Iterator<TaskId> iterator = unassigned.iterator(); iterator.hasNext();
) {
+            final TaskId taskId = iterator.next();
+            final Set<ID> clientIds = previousStandbyTaskAssignment.get(taskId);
+            if (clientIds != null) {
+                for (final ID clientId : clientIds) {
+                    final ClientState<TaskId> client = clients.get(clientId);
+                    if (client.hasUnfulfilledQuota(tasksPerThread)) {
+                        assignTaskToClient(assigned, taskId, client);
+                        iterator.remove();
+                        break;
+                    }
+                }
+            }
+        }
 
-        for (final TaskId taskId : newTasks) {
-            assign(taskId, clients.keySet(), true);
+        // assign any remaining unassigned tasks
+        for (final TaskId taskId : unassigned) {
+            allocateTaskWithClientCandidates(taskId, clients.keySet(), true);
         }
+
     }
 
-    private void assign(final TaskId taskId, final Set<ID> clientsWithin, final boolean
active) {
+
+
+    private void allocateTaskWithClientCandidates(final TaskId taskId, final Set<ID>
clientsWithin, final boolean active) {
         final ClientState<TaskId> client = findClient(taskId, clientsWithin);
         taskPairs.addPairs(taskId, client.assignedTasks());
         client.assign(taskId, active);
     }
 
+    private void assignTaskToClient(final Set<TaskId> assigned, final TaskId taskId,
final ClientState<TaskId> client) {
+        taskPairs.addPairs(taskId, client.assignedTasks());
+        client.assign(taskId, true);
+        assigned.add(taskId);
+    }
+
     private Set<ID> findClientsWithoutAssignedTask(final TaskId taskId) {
         final Set<ID> clientIds = new HashSet<>();
         for (final Map.Entry<ID, ClientState<TaskId>> client : clients.entrySet())
{
@@ -131,9 +162,7 @@ public class StickyTaskAssignor<ID> implements TaskAssignor<ID,
TaskId> {
     }
 
     private boolean shouldBalanceLoad(final ClientState<TaskId> client) {
-        return !hasNewTasks
-                && client.reachedCapacity()
-                && hasClientsWithMoreAvailableCapacity(client);
+        return client.reachedCapacity() && hasClientsWithMoreAvailableCapacity(client);
     }
 
     private boolean hasClientsWithMoreAvailableCapacity(final ClientState<TaskId> client)
{
@@ -208,6 +237,7 @@ public class StickyTaskAssignor<ID> implements TaskAssignor<ID,
TaskId> {
                 previousStandbyTaskAssignment.get(prevAssignedTask).add(clientState.getKey());
             }
         }
+
     }
 
     private int sumCapacity(final Collection<ClientState<TaskId>> values) {
@@ -218,7 +248,6 @@ public class StickyTaskAssignor<ID> implements TaskAssignor<ID,
TaskId> {
         return capacity;
     }
 
-
     private static class TaskPairs {
         private final Set<Pair> pairs;
         private final int maxPairs;

http://git-wip-us.apache.org/repos/asf/kafka/blob/0fba5296/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java
index 6692844..af2c9e3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java
@@ -146,5 +146,18 @@ public class ClientStateTest {
         c1.hasMoreAvailableCapacityThan(new ClientState<Integer>(0));
     }
 
+    @Test
+    public void shouldHaveUnfulfilledQuotaWhenActiveTaskSizeLessThanCapacityTimesTasksPerThread()
throws Exception {
+        final ClientState<Integer> client = new ClientState<>(1);
+        client.assign(1, true);
+        assertTrue(client.hasUnfulfilledQuota(2));
+    }
+
+    @Test
+    public void shouldNotHaveUnfulfilledQuotaWhenActiveTaskSizeGreaterEqualThanCapacityTimesTasksPerThread()
throws Exception {
+        final ClientState<Integer> client = new ClientState<>(1);
+        client.assign(1, true);
+        assertFalse(client.hasUnfulfilledQuota(1));
+    }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/0fba5296/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
index a782ea3..f37bf7d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
@@ -31,6 +31,7 @@ import java.util.TreeMap;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsCollectionContaining.hasItem;
 import static org.hamcrest.core.IsCollectionContaining.hasItems;
 import static org.hamcrest.core.IsNot.not;
 import static org.junit.Assert.assertTrue;
@@ -41,6 +42,8 @@ public class StickyTaskAssignorTest {
     private final TaskId task01 = new TaskId(0, 1);
     private final TaskId task02 = new TaskId(0, 2);
     private final TaskId task03 = new TaskId(0, 3);
+    private final TaskId task04 = new TaskId(0, 4);
+    private final TaskId task05 = new TaskId(0, 5);
     private final Map<Integer, ClientState<TaskId>> clients = new TreeMap<>();
     private final Integer p1 = 1;
     private final Integer p2 = 2;
@@ -448,9 +451,6 @@ public class StickyTaskAssignorTest {
 
     @Test
     public void shouldNotMoveAnyTasksWhenNewTasksAdded() throws Exception {
-        final TaskId task04 = new TaskId(0, 4);
-        final TaskId task05 = new TaskId(0, 5);
-
         createClientWithPreviousActiveTasks(p1, 1, task00, task01);
         createClientWithPreviousActiveTasks(p2, 1, task02, task03);
 
@@ -463,8 +463,6 @@ public class StickyTaskAssignorTest {
 
     @Test
     public void shouldAssignNewTasksToNewClientWhenPreviousTasksAssignedToOldClients() throws
Exception {
-        final TaskId task04 = new TaskId(0, 4);
-        final TaskId task05 = new TaskId(0, 5);
 
         createClientWithPreviousActiveTasks(p1, 1, task02, task01);
         createClientWithPreviousActiveTasks(p2, 1, task00, task03);
@@ -478,9 +476,120 @@ public class StickyTaskAssignorTest {
         assertThat(clients.get(p3).activeTasks(), hasItems(task04, task05));
     }
 
+    @Test
+    public void shouldAssignTasksNotPreviouslyActiveToNewClient() throws Exception {
+        final TaskId task10 = new TaskId(0, 10);
+        final TaskId task11 = new TaskId(0, 11);
+        final TaskId task12 = new TaskId(1, 2);
+        final TaskId task13 = new TaskId(1, 3);
+        final TaskId task20 = new TaskId(2, 0);
+        final TaskId task21 = new TaskId(2, 1);
+        final TaskId task22 = new TaskId(2, 2);
+        final TaskId task23 = new TaskId(2, 3);
+
+        final ClientState<TaskId> c1 = createClientWithPreviousActiveTasks(p1, 1, task01,
task12, task13);
+        c1.addPreviousStandbyTasks(Utils.mkSet(task00, task11, task20, task21, task23));
+        final ClientState<TaskId> c2 = createClientWithPreviousActiveTasks(p2, 1, task00,
task11, task22);
+        c2.addPreviousStandbyTasks(Utils.mkSet(task01, task10, task02, task20, task03, task12,
task21, task13, task23));
+        final ClientState<TaskId> c3 = createClientWithPreviousActiveTasks(p3, 1, task20,
task21, task23);
+        c3.addPreviousStandbyTasks(Utils.mkSet(task02, task12));
+
+        final ClientState<TaskId> newClient = createClient(p4, 1);
+        newClient.addPreviousStandbyTasks(Utils.mkSet(task00, task10, task01, task02, task11,
task20, task03, task12, task21, task13, task22, task23));
+
+        final StickyTaskAssignor<Integer> taskAssignor = createTaskAssignor(task00,
task10, task01, task02, task11, task20, task03, task12, task21, task13, task22, task23);
+        taskAssignor.assign(0);
+
+        assertThat(c1.activeTasks(), equalTo(Utils.mkSet(task01, task12, task13)));
+        assertThat(c2.activeTasks(), equalTo(Utils.mkSet(task00, task11, task22)));
+        assertThat(c3.activeTasks(), equalTo(Utils.mkSet(task20, task21, task23)));
+        assertThat(newClient.activeTasks(), equalTo(Utils.mkSet(task02, task03, task10)));
+    }
+
+    @Test
+    public void shouldAssignTasksNotPreviouslyActiveToMultipleNewClients() throws Exception
{
+        final TaskId task10 = new TaskId(0, 10);
+        final TaskId task11 = new TaskId(0, 11);
+        final TaskId task12 = new TaskId(1, 2);
+        final TaskId task13 = new TaskId(1, 3);
+        final TaskId task20 = new TaskId(2, 0);
+        final TaskId task21 = new TaskId(2, 1);
+        final TaskId task22 = new TaskId(2, 2);
+        final TaskId task23 = new TaskId(2, 3);
+
+        final ClientState<TaskId> c1 = createClientWithPreviousActiveTasks(p1, 1, task01,
task12, task13);
+        c1.addPreviousStandbyTasks(Utils.mkSet(task00, task11, task20, task21, task23));
+        final ClientState<TaskId> c2 = createClientWithPreviousActiveTasks(p2, 1, task00,
task11, task22);
+        c2.addPreviousStandbyTasks(Utils.mkSet(task01, task10, task02, task20, task03, task12,
task21, task13, task23));
+
+        final ClientState<TaskId> bounce1 = createClient(p3, 1);
+        bounce1.addPreviousStandbyTasks(Utils.mkSet(task20, task21, task23));
+
+        final ClientState<TaskId> bounce2 = createClient(p4, 1);
+        bounce2.addPreviousStandbyTasks(Utils.mkSet(task02, task03, task10));
+
+        final StickyTaskAssignor<Integer> taskAssignor = createTaskAssignor(task00,
task10, task01, task02, task11, task20, task03, task12, task21, task13, task22, task23);
+        taskAssignor.assign(0);
+
+        assertThat(c1.activeTasks(), equalTo(Utils.mkSet(task01, task12, task13)));
+        assertThat(c2.activeTasks(), equalTo(Utils.mkSet(task00, task11, task22)));
+        assertThat(bounce1.activeTasks(), equalTo(Utils.mkSet(task20, task21, task23)));
+        assertThat(bounce2.activeTasks(), equalTo(Utils.mkSet(task02, task03, task10)));
+    }
+
+    @Test
+    public void shouldAssignTasksToNewClient() throws Exception {
+        createClientWithPreviousActiveTasks(p1, 1, task01, task02);
+        createClient(p2, 1);
+        createTaskAssignor(task01, task02).assign(0);
+        assertThat(clients.get(p1).activeTaskCount(), equalTo(1));
+    }
+
+    @Test
+    public void shouldAssignTasksToNewClientWithoutFlippingAssignmentBetweenExistingClients()
throws Exception {
+        final ClientState<TaskId> c1 = createClientWithPreviousActiveTasks(p1, 1, task00,
task01, task02);
+        final ClientState<TaskId> c2 = createClientWithPreviousActiveTasks(p2, 1, task03,
task04, task05);
+        final ClientState<TaskId> newClient = createClient(p3, 1);
+
+        final StickyTaskAssignor<Integer> taskAssignor = createTaskAssignor(task00,
task01, task02, task03, task04, task05);
+        taskAssignor.assign(0);
+        assertThat(c1.activeTasks(), not(hasItem(task03)));
+        assertThat(c1.activeTasks(), not(hasItem(task04)));
+        assertThat(c1.activeTasks(), not(hasItem(task05)));
+        assertThat(c1.activeTaskCount(), equalTo(2));
+        assertThat(c2.activeTasks(), not(hasItems(task00)));
+        assertThat(c2.activeTasks(), not(hasItems(task01)));
+        assertThat(c2.activeTasks(), not(hasItems(task02)));
+        assertThat(c2.activeTaskCount(), equalTo(2));
+        assertThat(newClient.activeTaskCount(), equalTo(2));
+    }
+
+    @Test
+    public void shouldAssignTasksToNewClientWithoutFlippingAssignmentBetweenExistingAndBouncedClients()
throws Exception {
+        final TaskId task06 = new TaskId(0, 6);
+        final ClientState<TaskId> c1 = createClientWithPreviousActiveTasks(p1, 1, task00,
task01, task02, task06);
+        final ClientState<TaskId> c2 = createClient(p2, 1);
+        c2.addPreviousStandbyTasks(Utils.mkSet(task03, task04, task05));
+        final ClientState<TaskId> newClient = createClient(p3, 1);
+
+        final StickyTaskAssignor<Integer> taskAssignor = createTaskAssignor(task00,
task01, task02, task03, task04, task05, task06);
+        taskAssignor.assign(0);
+        assertThat(c1.activeTasks(), not(hasItem(task03)));
+        assertThat(c1.activeTasks(), not(hasItem(task04)));
+        assertThat(c1.activeTasks(), not(hasItem(task05)));
+        assertThat(c1.activeTaskCount(), equalTo(3));
+        assertThat(c2.activeTasks(), not(hasItems(task00)));
+        assertThat(c2.activeTasks(), not(hasItems(task01)));
+        assertThat(c2.activeTasks(), not(hasItems(task02)));
+        assertThat(c2.activeTaskCount(), equalTo(2));
+        assertThat(newClient.activeTaskCount(), equalTo(2));
+    }
+
     private StickyTaskAssignor<Integer> createTaskAssignor(final TaskId... tasks) {
+        final List<TaskId> taskIds = Arrays.asList(tasks);
+        Collections.shuffle(taskIds);
         return new StickyTaskAssignor<>(clients,
-                                        new HashSet<>(Arrays.asList(tasks)));
+                                        new HashSet<>(taskIds));
     }
 
     private List<TaskId> allActiveTasks() {


Mime
View raw message