kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-3890: Streams use same task assignment on cluster rolling restart
Date Wed, 29 Jun 2016 22:20:05 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.0 f630cc79a -> 73e2f090c


KAFKA-3890: Streams use same task assignment on cluster rolling restart

Current task assignment in TaskAssignor is not deterministic.

During a cluster restart or rolling restart, we have the same set of participating worker nodes.
But the current TaskAssignor is not able to maintain a deterministic mapping, so about 20% of
partitions will be reassigned, which causes state repopulation.
When the topology of worker nodes (the number of worker nodes and the TaskIds each one carries)
has not changed, we just want to keep the old task assignment.

Add code to check whether the node topology is changing. The topology is treated as unchanged only:
- when the combined prevAssignedTasks from the old clientStates equals the new task list, with no
task assigned to more than one client;
- when no new node is joining (a new node's prevAssignedTasks would either be empty or conflict
with some other node's);
- when no node is dropping out (otherwise the total of prevAssignedTasks from the remaining nodes
would not equal the new task list).

When the topology is not changing, we simply reuse the old mapping.

I also added code to check whether the previous assignment is balanced, i.e. whether each node's
number of previously assigned tasks lies strictly between average / 2 and 2 * average (using an
integer average). If it is not balanced, we still perform a new task assignment.

Author: Henry Cai <hcai@pinterest.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #1543 from HenryCaiHaiying/upstream

(cherry picked from commit a34f78dcad1d7844a21d0afbf0f6eef183847d0d)
Signed-off-by: Guozhang Wang <wangguoz@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/73e2f090
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/73e2f090
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/73e2f090

Branch: refs/heads/0.10.0
Commit: 73e2f090c58873d19d0fd8b568d733ec84effb92
Parents: f630cc7
Author: Henry Cai <hcai@pinterest.com>
Authored: Wed Jun 29 15:19:47 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Jun 29 15:20:02 2016 -0700

----------------------------------------------------------------------
 .../internals/assignment/ClientState.java       | 10 ++++++++
 .../internals/assignment/TaskAssignor.java      | 26 ++++++++++++++++++++
 2 files changed, 36 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/73e2f090/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
index a0f6179..b59af86 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
@@ -69,4 +69,14 @@ public class ClientState<T> {
         this.cost += cost;
     }
 
+    @Override
+    public String toString() {
+        return "[activeTasks: (" + activeTasks +
+            ") assignedTasks: (" + assignedTasks +
+            ") prevActiveTasks: (" + prevActiveTasks +
+            ") prevAssignedTasks: (" + prevAssignedTasks +
+            ") capacity: " + capacity +
+            " cost: " + cost +
+            "]";
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/73e2f090/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java
index 2501677..e246c4b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java
@@ -40,10 +40,16 @@ public class TaskAssignor<C, T extends Comparable<T>> {
         }
 
         TaskAssignor<C, T> assignor = new TaskAssignor<>(states, tasks, seed);
+        log.info("Assigning tasks to clients: {}, prevAssignmentBalanced: {}, " +
+            "prevClientsUnchangeed: {}, tasks: {}, replicas: {}",
+            states, assignor.prevAssignmentBalanced, assignor.prevClientsUnchanged,
+            tasks, numStandbyReplicas);
+
         assignor.assignTasks();
         if (numStandbyReplicas > 0)
             assignor.assignStandbyTasks(numStandbyReplicas);
 
+        log.info("Assigned with: " + assignor.states);
         return assignor.states;
     }
 
@@ -52,13 +58,29 @@ public class TaskAssignor<C, T extends Comparable<T>> {
     private final Set<TaskPair<T>> taskPairs;
     private final int maxNumTaskPairs;
     private final ArrayList<T> tasks;
+    private boolean prevAssignmentBalanced = true;
+    private boolean prevClientsUnchanged = true;
 
     private TaskAssignor(Map<C, ClientState<T>> states, Set<T> tasks, long
randomSeed) {
         this.rand = new Random(randomSeed);
         this.states = new HashMap<>();
+        int avgNumTasks = tasks.size() / states.size();
+        Set<T> existingTasks = new HashSet<>();
         for (Map.Entry<C, ClientState<T>> entry : states.entrySet()) {
             this.states.put(entry.getKey(), entry.getValue().copy());
+            Set<T> oldTasks = entry.getValue().prevAssignedTasks;
+            // make sure the previous assignment is balanced
+            prevAssignmentBalanced = prevAssignmentBalanced &&
+                oldTasks.size() < 2 * avgNumTasks && oldTasks.size() > avgNumTasks
/ 2;
+            for (T task : oldTasks) {
+                // Make sure there is no duplicates
+                prevClientsUnchanged = prevClientsUnchanged && !existingTasks.contains(task);
+            }
+            existingTasks.addAll(oldTasks);
         }
+        // Make sure the existing assignment didn't miss out any task
+        prevClientsUnchanged = prevClientsUnchanged && existingTasks.equals(tasks);
+
         this.tasks = new ArrayList<>(tasks);
 
         int numTasks = tasks.size();
@@ -112,6 +134,10 @@ public class TaskAssignor<C, T extends Comparable<T>> {
         double candidateAdditionCost = 0d;
 
         for (ClientState<T> state : states.values()) {
+            if (prevAssignmentBalanced && prevClientsUnchanged &&
+                state.prevAssignedTasks.contains(task)) {
+                return state;
+            }
             if (!state.assignedTasks.contains(task)) {
                 // if checkTaskPairs flag is on, skip this client if this task doesn't introduce
a new task combination
                 if (checkTaskPairs && !state.assignedTasks.isEmpty() && !hasNewTaskPair(task,
state))


Mime
View raw message