kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch 2.0 updated: KAFKA-7144: Fix task assignment to be even (#5390)
Date Wed, 01 Aug 2018 16:48:46 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new 81a5e22  KAFKA-7144: Fix task assignment to be even (#5390)
81a5e22 is described below

commit 81a5e223f2f354456861ce4d7fe3cb4c6b019fa0
Author: Bill Bejeck <bbejeck@gmail.com>
AuthorDate: Wed Jul 25 01:00:18 2018 -0400

    KAFKA-7144: Fix task assignment to be even (#5390)
    
    This PR now justs removes the check in TaskPairs.hasNewPair that was causing the task
assignment issue.
    
    This was done as we need to further refine task assignment strategy and this approach
needs to include the statefulness of tasks and is best done in one pass vs taking a "patchy"
approach.
    
    Updated current tests and ran locally
    
    Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
---
 .../internals/assignment/StickyTaskAssignor.java      |  2 +-
 .../internals/assignment/StickyTaskAssignorTest.java  | 19 +++++++++++++++++++
 2 files changed, 20 insertions(+), 1 deletion(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
index 5b54d08..8767d0f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
@@ -270,7 +270,7 @@ public class StickyTaskAssignor<ID> implements TaskAssignor<ID,
TaskId> {
                 if (!active && !pairs.contains(pair(task1, taskId))) {
                     return true;
                 }
-                if (!pairs.contains(pair(task1, taskId)) && task1.topicGroupId !=
taskId.topicGroupId) {
+                if (!pairs.contains(pair(task1, taskId))) {
                     return true;
                 }
             }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
index ed22e3c..d431dbe 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
@@ -152,6 +152,25 @@ public class StickyTaskAssignorTest {
     }
 
     @Test
+    public void shouldAssignTasksEvenlyWithUnequalTopicGroupSizes() {
+
+        createClientWithPreviousActiveTasks(p1, 1, task00, task01, task02, task03,
+                                                            task04, task05, task10);
+
+        createClient(p2, 1);
+
+        final StickyTaskAssignor taskAssignor = createTaskAssignor(task10, task00, task01,
task02, task03, task04, task05);
+
+        final Set<TaskId> expectedClientITasks = new HashSet<>(Arrays.asList(task00,
task01, task10, task05));
+        final Set<TaskId> expectedClientIITasks = new HashSet<>(Arrays.asList(task02,
task03, task04));
+
+        taskAssignor.assign(0);
+
+        assertThat(clients.get(p1).activeTasks(), equalTo(expectedClientITasks));
+        assertThat(clients.get(p2).activeTasks(), equalTo(expectedClientIITasks));
+    }
+
+    @Test
     public void shouldKeepActiveTaskStickynessWhenMoreClientThanActiveTasks() {
         final int p5 = 5;
         createClientWithPreviousActiveTasks(p1, 1, task00);


Mime
View raw message