kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7144: Fix task assignment to be even (#5390)
Date Wed, 25 Jul 2018 05:00:24 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1d9a427  KAFKA-7144: Fix task assignment to be even (#5390)
1d9a427 is described below

commit 1d9a427225c64e7629a4eb2e2d129d5551185049
Author: Bill Bejeck <bbejeck@gmail.com>
AuthorDate: Wed Jul 25 01:00:18 2018 -0400

    KAFKA-7144: Fix task assignment to be even (#5390)
    
    This PR now justs removes the check in TaskPairs.hasNewPair that was causing the task
assignment issue.
    
    This was done as we need to further refine task assignment strategy and this approach
needs to include the statefulness of tasks and is best done in one pass vs taking a "patchy"
approach.
    
    Updated current tests and ran locally
    
    Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
---
 .../internals/assignment/StickyTaskAssignor.java      |  2 +-
 .../internals/assignment/StickyTaskAssignorTest.java  | 19 +++++++++++++++++++
 2 files changed, 20 insertions(+), 1 deletion(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
index 5b54d08..8767d0f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
@@ -270,7 +270,7 @@ public class StickyTaskAssignor<ID> implements TaskAssignor<ID,
TaskId> {
                 if (!active && !pairs.contains(pair(task1, taskId))) {
                     return true;
                 }
-                if (!pairs.contains(pair(task1, taskId)) && task1.topicGroupId !=
taskId.topicGroupId) {
+                if (!pairs.contains(pair(task1, taskId))) {
                     return true;
                 }
             }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
index ed22e3c..d431dbe 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
@@ -152,6 +152,25 @@ public class StickyTaskAssignorTest {
     }
 
     @Test
+    public void shouldAssignTasksEvenlyWithUnequalTopicGroupSizes() {
+
+        createClientWithPreviousActiveTasks(p1, 1, task00, task01, task02, task03,
+                                                            task04, task05, task10);
+
+        createClient(p2, 1);
+
+        final StickyTaskAssignor taskAssignor = createTaskAssignor(task10, task00, task01,
task02, task03, task04, task05);
+
+        final Set<TaskId> expectedClientITasks = new HashSet<>(Arrays.asList(task00,
task01, task10, task05));
+        final Set<TaskId> expectedClientIITasks = new HashSet<>(Arrays.asList(task02,
task03, task04));
+
+        taskAssignor.assign(0);
+
+        assertThat(clients.get(p1).activeTasks(), equalTo(expectedClientITasks));
+        assertThat(clients.get(p2).activeTasks(), equalTo(expectedClientIITasks));
+    }
+
+    @Test
     public void shouldKeepActiveTaskStickynessWhenMoreClientThanActiveTasks() {
         final int p5 = 5;
         createClientWithPreviousActiveTasks(p1, 1, task00);


Mime
View raw message