kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-3890: Streams use same task assignment on cluster rolling restart
Date Wed, 29 Jun 2016 22:19:51 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 96ae0a592 -> a34f78dca


KAFKA-3890: Streams use same task assignment on cluster rolling restart

Current task assignment in TaskAssignor is not deterministic.

During cluster restart or rolling restart, we have the same set of participating worker nodes.
But the current TaskAssignor is not able to maintain a deterministic mapping, so about 20% of the
partitions will be reassigned, which would cause state repopulation.
When the topology of worker nodes (# of worker nodes, the TaskIds they are carrying) has
not changed, we really just want to keep the old task assignment.

Add the code to check whether the node topology is changing or not:
- when the prevAssignedTasks from the old clientStates is the same as the new task list
- when there is no new node joining (its prevAssignedTasks would be either empty or in conflict
with some other nodes)
- when there is no node dropping out (the total of prevAssignedTasks from other nodes would
not be equal to the new task list)

When the topology is not changing, we would just use the old mapping.

I also added the code to check whether the previous assignment is balanced (whether each node's
task list is within [1/2 average -- 2 * average]); if it's not balanced, we will still start
a new task assignment.

Author: Henry Cai <hcai@pinterest.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #1543 from HenryCaiHaiying/upstream


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a34f78dc
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a34f78dc
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a34f78dc

Branch: refs/heads/trunk
Commit: a34f78dcad1d7844a21d0afbf0f6eef183847d0d
Parents: 96ae0a5
Author: Henry Cai <hcai@pinterest.com>
Authored: Wed Jun 29 15:19:47 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Jun 29 15:19:47 2016 -0700

----------------------------------------------------------------------
 .../internals/assignment/ClientState.java       | 10 ++++++++
 .../internals/assignment/TaskAssignor.java      | 26 ++++++++++++++++++++
 2 files changed, 36 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a34f78dc/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
index a0f6179..b59af86 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
@@ -69,4 +69,14 @@ public class ClientState<T> {
         this.cost += cost;
     }
 
+    @Override
+    public String toString() {
+        return "[activeTasks: (" + activeTasks +
+            ") assignedTasks: (" + assignedTasks +
+            ") prevActiveTasks: (" + prevActiveTasks +
+            ") prevAssignedTasks: (" + prevAssignedTasks +
+            ") capacity: " + capacity +
+            " cost: " + cost +
+            "]";
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a34f78dc/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java
index 2501677..e246c4b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java
@@ -40,10 +40,16 @@ public class TaskAssignor<C, T extends Comparable<T>> {
         }
 
         TaskAssignor<C, T> assignor = new TaskAssignor<>(states, tasks, seed);
+        log.info("Assigning tasks to clients: {}, prevAssignmentBalanced: {}, " +
+            "prevClientsUnchangeed: {}, tasks: {}, replicas: {}",
+            states, assignor.prevAssignmentBalanced, assignor.prevClientsUnchanged,
+            tasks, numStandbyReplicas);
+
         assignor.assignTasks();
         if (numStandbyReplicas > 0)
             assignor.assignStandbyTasks(numStandbyReplicas);
 
+        log.info("Assigned with: " + assignor.states);
         return assignor.states;
     }
 
@@ -52,13 +58,29 @@ public class TaskAssignor<C, T extends Comparable<T>> {
     private final Set<TaskPair<T>> taskPairs;
     private final int maxNumTaskPairs;
     private final ArrayList<T> tasks;
+    private boolean prevAssignmentBalanced = true;
+    private boolean prevClientsUnchanged = true;
 
     private TaskAssignor(Map<C, ClientState<T>> states, Set<T> tasks, long
randomSeed) {
         this.rand = new Random(randomSeed);
         this.states = new HashMap<>();
+        int avgNumTasks = tasks.size() / states.size();
+        Set<T> existingTasks = new HashSet<>();
         for (Map.Entry<C, ClientState<T>> entry : states.entrySet()) {
             this.states.put(entry.getKey(), entry.getValue().copy());
+            Set<T> oldTasks = entry.getValue().prevAssignedTasks;
+            // make sure the previous assignment is balanced
+            prevAssignmentBalanced = prevAssignmentBalanced &&
+                oldTasks.size() < 2 * avgNumTasks && oldTasks.size() > avgNumTasks
/ 2;
+            for (T task : oldTasks) {
+                // Make sure there is no duplicates
+                prevClientsUnchanged = prevClientsUnchanged && !existingTasks.contains(task);
+            }
+            existingTasks.addAll(oldTasks);
         }
+        // Make sure the existing assignment didn't miss out any task
+        prevClientsUnchanged = prevClientsUnchanged && existingTasks.equals(tasks);
+
         this.tasks = new ArrayList<>(tasks);
 
         int numTasks = tasks.size();
@@ -112,6 +134,10 @@ public class TaskAssignor<C, T extends Comparable<T>> {
         double candidateAdditionCost = 0d;
 
         for (ClientState<T> state : states.values()) {
+            if (prevAssignmentBalanced && prevClientsUnchanged &&
+                state.prevAssignedTasks.contains(task)) {
+                return state;
+            }
             if (!state.assignedTasks.contains(task)) {
                 // if checkTaskPairs flag is on, skip this client if this task doesn't introduce
a new task combination
                 if (checkTaskPairs && !state.assignedTasks.isEmpty() && !hasNewTaskPair(task,
state))


Mime
View raw message