kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kkaranta...@apache.org
Subject [kafka] branch 2.5 updated: KAFKA-9848: Avoid triggering scheduled rebalance delay when task assignment fails but Connect workers remain in the group (#8805)
Date Tue, 09 Jun 2020 17:18:41 GMT
This is an automated email from the ASF dual-hosted git repository.

kkarantasis pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new ea14a20  KAFKA-9848: Avoid triggering scheduled rebalance delay when task assignment fails but Connect workers remain in the group (#8805)
ea14a20 is described below

commit ea14a20febfdfa5d1139545c32ccbff872cdc296
Author: Konstantine Karantasis <konstantine@confluent.io>
AuthorDate: Tue Jun 9 09:41:11 2020 -0700

    KAFKA-9848: Avoid triggering scheduled rebalance delay when task assignment fails but Connect workers remain in the group (#8805)
    
    In the first version of the incremental cooperative protocol, in the presence of a failed sync request by the leader, the assignor was designed to treat the unapplied assignments as lost and trigger a rebalance delay.
    
    This commit applies optimizations in these cases to avoid unnecessarily activating the rebalancing delay. First, if the worker that loses the sync group request or response is the leader, it detects this failure by checking what the expected generation is when it performs task assignments. If the generation is not the expected one, the leader resets its view of the previous assignment, because that assignment was not successfully applied and does not represent a correct state. Furthermore, if the worker [...]
    
    Existing unit tests and integration tests are adapted to test the proposed optimizations.
    
    Reviewers: Randall Hauch <rhauch@gmail.com>
---
 checkstyle/suppressions.xml                        |   6 +-
 .../consumer/internals/AbstractCoordinator.java    |   4 +-
 .../runtime/distributed/DistributedConfig.java     |   2 +-
 .../IncrementalCooperativeAssignor.java            |  38 +++-
 .../runtime/distributed/WorkerCoordinator.java     |  13 ++
 .../integration/ConnectIntegrationTestUtils.java   |  42 ++++
 .../integration/ConnectWorkerIntegrationTest.java  |   5 +
 .../integration/ExampleConnectIntegrationTest.java |   5 +
 .../RebalanceSourceConnectorsIntegrationTest.java  |  13 +-
 .../IncrementalCooperativeAssignorTest.java        | 227 ++++++++++++++++-----
 10 files changed, 282 insertions(+), 73 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index e270df6..39c6d4b 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -103,12 +103,10 @@
     <suppress checks="ClassFanOutComplexity"
               files="Worker.java"/>
     <suppress checks="MethodLength"
-              files="(KafkaConfigBackingStore|RequestResponseTest|WorkerSinkTaskTest).java"/>
+              files="(KafkaConfigBackingStore|IncrementalCooperativeAssignor|RequestResponseTest|WorkerSinkTaskTest).java"/>
 
     <suppress checks="ParameterNumber"
-              files="(WorkerSinkTask|WorkerSourceTask).java"/>
-    <suppress checks="ParameterNumber"
-              files="WorkerCoordinator.java"/>
+              files="Worker(SinkTask|SourceTask|Coordinator).java"/>
     <suppress checks="ParameterNumber"
               files="ConfigKeyInfo.java"/>
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 58071ce..e3eeb19 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -426,8 +426,8 @@ public abstract class AbstractCoordinator implements Closeable {
 
                 // Generation data maybe concurrently cleared by Heartbeat thread.
                 // Can't use synchronized for {@code onJoinComplete}, because it can be long enough
-                // and  shouldn't block hearbeat thread.
-                // See {@link PlaintextConsumerTest#testMaxPollIntervalMsDelayInAssignment
+                // and shouldn't block heartbeat thread.
+                // See {@link PlaintextConsumerTest#testMaxPollIntervalMsDelayInAssignment}
                 synchronized (AbstractCoordinator.this) {
                     generationSnapshot = this.generation;
                 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
index c389925..00f6663 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
@@ -146,7 +146,7 @@ public class DistributedConfig extends WorkerConfig {
     public static final String CONNECT_PROTOCOL_DEFAULT = ConnectProtocolCompatibility.SESSIONED.toString();
 
     /**
-     * <code>connect.protocol</code>
+     * <code>scheduled.rebalance.max.delay.ms</code>
      */
     public static final String SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG = "scheduled.rebalance.max.delay.ms";
     public static final String SCHEDULED_REBALANCE_MAX_DELAY_MS_DOC = "The maximum delay that is "
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
index c6d7cc6..d4e9d87 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
@@ -66,6 +66,8 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
     protected final Set<String> candidateWorkersForReassignment;
     protected long scheduledRebalance;
     protected int delay;
+    protected int previousGenerationId;
+    protected Set<String> previousMembers;
 
     public IncrementalCooperativeAssignor(LogContext logContext, Time time, int maxDelay) {
         this.log = logContext.logger(IncrementalCooperativeAssignor.class);
@@ -77,6 +79,8 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
         this.scheduledRebalance = 0;
         this.candidateWorkersForReassignment = new LinkedHashSet<>();
         this.delay = 0;
+        this.previousGenerationId = -1;
+        this.previousMembers = Collections.emptySet();
     }
 
     @Override
@@ -159,6 +163,17 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
         // Base set: The previous assignment of connectors-and-tasks is a standalone snapshot that
         // can be used to calculate derived sets
         log.debug("Previous assignments: {}", previousAssignment);
+        int lastCompletedGenerationId = coordinator.lastCompletedGenerationId();
+        if (previousGenerationId != lastCompletedGenerationId) {
+            log.debug("Clearing the view of previous assignments due to generation mismatch between "
+                    + "previous generation ID {} and last completed generation ID {}. This can "
+                    + "happen if the leader fails to sync the assignment within a rebalancing round. "
+                    + "The following view of previous assignments might be outdated and will be "
+                    + "ignored by the leader in the current computation of new assignments. "
+                    + "Possibly outdated previous assignments: {}",
+                    previousGenerationId, lastCompletedGenerationId, previousAssignment);
+            this.previousAssignment = ConnectorsAndTasks.EMPTY;
+        }
 
         ClusterConfigState snapshot = coordinator.configSnapshot();
         Set<String> configuredConnectors = new TreeSet<>(snapshot.connectors());
@@ -236,7 +251,7 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
         taskAssignments =
                 completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::tasks));
 
-        handleLostAssignments(lostAssignments, newSubmissions, completeWorkerAssignment);
+        handleLostAssignments(lostAssignments, newSubmissions, completeWorkerAssignment, memberConfigs);
 
         // Do not revoke resources for re-assignment while a delayed rebalance is active
         // Also we do not revoke in two consecutive rebalances by the same leader
@@ -267,19 +282,15 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
 
         assignConnectors(completeWorkerAssignment, newSubmissions.connectors());
         assignTasks(completeWorkerAssignment, newSubmissions.tasks());
-
         log.debug("Current complete assignments: {}", currentWorkerAssignment);
         log.debug("New complete assignments: {}", completeWorkerAssignment);
 
         Map<String, Collection<String>> currentConnectorAssignments =
                 currentWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::connectors));
-
         Map<String, Collection<ConnectorTaskId>> currentTaskAssignments =
                 currentWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::tasks));
-
         Map<String, Collection<String>> incrementalConnectorAssignments =
                 diff(connectorAssignments, currentConnectorAssignments);
-
         Map<String, Collection<ConnectorTaskId>> incrementalTaskAssignments =
                 diff(taskAssignments, currentTaskAssignments);
 
@@ -292,9 +303,9 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
                 fillAssignments(memberConfigs.keySet(), Assignment.NO_ERROR, leaderId,
                                 memberConfigs.get(leaderId).url(), maxOffset, incrementalConnectorAssignments,
                                 incrementalTaskAssignments, toRevoke, delay, protocolVersion);
-
         previousAssignment = computePreviousAssignment(toRevoke, connectorAssignments, taskAssignments, lostAssignments);
-
+        previousGenerationId = coordinator.generationId();
+        previousMembers = memberConfigs.keySet();
         log.debug("Actual assignments: {}", assignments);
         return serializeAssignments(assignments);
     }
@@ -351,7 +362,8 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
     // visible for testing
     protected void handleLostAssignments(ConnectorsAndTasks lostAssignments,
                                          ConnectorsAndTasks newSubmissions,
-                                         List<WorkerLoad> completeWorkerAssignment) {
+                                         List<WorkerLoad> completeWorkerAssignment,
+                                         Map<String, ExtendedWorkerState> memberConfigs) {
         if (lostAssignments.isEmpty()) {
             resetDelay();
             return;
@@ -361,6 +373,16 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
         log.debug("Found the following connectors and tasks missing from previous assignments: "
                 + lostAssignments);
 
+        if (scheduledRebalance <= 0 && memberConfigs.keySet().containsAll(previousMembers)) {
+            log.debug("No worker seems to have departed the group during the rebalance. The "
+                    + "missing assignments that the leader is detecting are probably due to some "
+                    + "workers failing to receive the new assignments in the previous rebalance. "
+                    + "Will reassign missing tasks as new tasks");
+            newSubmissions.connectors().addAll(lostAssignments.connectors());
+            newSubmissions.tasks().addAll(lostAssignments.tasks());
+            return;
+        }
+
         if (scheduledRebalance > 0 && now >= scheduledRebalance) {
             // delayed rebalance expired and it's time to assign resources
             log.debug("Delayed rebalance expired. Reassigning lost tasks");
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
index 71f351d..17d6690 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
@@ -64,6 +64,7 @@ public class WorkerCoordinator extends AbstractCoordinator implements Closeable
 
     private boolean rejoinRequested;
     private volatile ConnectProtocolCompatibility currentConnectProtocol;
+    private volatile int lastCompletedGenerationId;
     private final ConnectAssignor eagerAssignor;
     private final ConnectAssignor incrementalAssignor;
     private final int coordinatorDiscoveryTimeoutMs;
@@ -100,6 +101,7 @@ public class WorkerCoordinator extends AbstractCoordinator implements Closeable
         this.eagerAssignor = new EagerAssignor(logContext);
         this.currentConnectProtocol = protocolCompatibility;
         this.coordinatorDiscoveryTimeoutMs = config.heartbeatIntervalMs;
+        this.lastCompletedGenerationId = Generation.NO_GENERATION.generationId;
     }
 
     @Override
@@ -207,6 +209,7 @@ public class WorkerCoordinator extends AbstractCoordinator implements Closeable
             log.debug("Augmented new assignment: {}", newAssignment);
         }
         assignmentSnapshot = newAssignment;
+        lastCompletedGenerationId = generation;
         listener.onAssigned(newAssignment, generation);
     }
 
@@ -256,6 +259,16 @@ public class WorkerCoordinator extends AbstractCoordinator implements Closeable
         return super.generation().generationId;
     }
 
+    /**
+     * Return the ID of the group generation that was active when this member's last join completed successfully
+     *
+     * @return the generation ID of the last group that was joined successfully by this member or -1 if no generation
+     * was stable at that point
+     */
+    public int lastCompletedGenerationId() {
+        return lastCompletedGenerationId;
+    }
+
     private boolean isLeader() {
         final ExtendedAssignment localAssignmentSnapshot = assignmentSnapshot;
         return localAssignmentSnapshot != null && memberId().equals(localAssignmentSnapshot.leader());
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectIntegrationTestUtils.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectIntegrationTestUtils.java
new file mode 100644
index 0000000..1aeaa07
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectIntegrationTestUtils.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.integration;
+
+import org.junit.rules.TestRule;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+import org.slf4j.Logger;
+
+/** Helpers shared by Connect integration tests. {@link #newTestWatcher(Logger)} returns a JUnit
+ * rule that logs, via the given logger, when each test method starts and when it finishes. */
+public class ConnectIntegrationTestUtils {
+    public static TestRule newTestWatcher(Logger log) {
+        return new TestWatcher() {
+            @Override
+            protected void starting(Description description) {
+                super.starting(description);
+                log.info("Starting test {}", description.getMethodName());
+            }
+
+            @Override
+            protected void finished(Description description) {
+                super.finished(description);
+                log.info("Finished test {}", description.getMethodName());
+            }
+        };
+    }
+}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
index a0f672e..99c8c4c 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
@@ -23,8 +23,10 @@ import org.apache.kafka.connect.util.clusters.WorkerHandle;
 import org.apache.kafka.test.IntegrationTest;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.TestRule;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -65,6 +67,9 @@ public class ConnectWorkerIntegrationTest {
     Map<String, String> workerProps = new HashMap<>();
     Properties brokerProps = new Properties();
 
+    @Rule
+    public TestRule watcher = ConnectIntegrationTestUtils.newTestWatcher(log);
+
     @Before
     public void setup() {
         // setup Connect worker properties
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java
index f4cba87..5d36486 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java
@@ -22,8 +22,10 @@ import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
 import org.apache.kafka.test.IntegrationTest;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.TestRule;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -64,6 +66,9 @@ public class ExampleConnectIntegrationTest {
     private EmbeddedConnectCluster connect;
     private ConnectorHandle connectorHandle;
 
+    @Rule
+    public TestRule watcher = ConnectIntegrationTestUtils.newTestWatcher(log);
+
     @Before
     public void setup() {
         // setup Connect worker properties
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
index 19e7863..be8ce61 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
@@ -22,9 +22,10 @@ import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
 import org.apache.kafka.test.IntegrationTest;
 import org.junit.After;
 import org.junit.Before;
-import org.junit.Ignore;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.TestRule;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,6 +46,7 @@ import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_C
 import static org.apache.kafka.connect.runtime.WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG;
 import static org.apache.kafka.connect.runtime.distributed.ConnectProtocolCompatibility.COMPATIBLE;
 import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.CONNECT_PROTOCOL_CONFIG;
+import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG;
 import static org.apache.kafka.test.TestUtils.waitForCondition;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
@@ -68,12 +70,16 @@ public class RebalanceSourceConnectorsIntegrationTest {
 
     private EmbeddedConnectCluster connect;
 
+    @Rule
+    public TestRule watcher = ConnectIntegrationTestUtils.newTestWatcher(log);
+
     @Before
     public void setup() {
         // setup Connect worker properties
         Map<String, String> workerProps = new HashMap<>();
         workerProps.put(CONNECT_PROTOCOL_CONFIG, COMPATIBLE.toString());
-        workerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, "30000");
+        workerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(TimeUnit.SECONDS.toMillis(30)));
+        workerProps.put(SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG, String.valueOf(TimeUnit.SECONDS.toMillis(30)));
 
         // setup Kafka broker properties
         Properties brokerProps = new Properties();
@@ -98,7 +104,6 @@ public class RebalanceSourceConnectorsIntegrationTest {
         connect.stop();
     }
 
-    @Ignore("Flaky and disruptive. See KAFKA-8391, KAFKA-8661 for details.")
     @Test
     public void testStartTwoConnectors() throws Exception {
         // create test topic
@@ -133,7 +138,6 @@ public class RebalanceSourceConnectorsIntegrationTest {
                 "Connector tasks did not start in time.");
     }
 
-    @Ignore("Flaky and disruptive. See KAFKA-8391, KAFKA-8661 for details.")
     @Test
     public void testReconfigConnector() throws Exception {
         ConnectorHandle connectorHandle = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME);
@@ -192,7 +196,6 @@ public class RebalanceSourceConnectorsIntegrationTest {
                 recordNum >= numRecordsProduced);
     }
 
-    @Ignore("Flaky and disruptive. See KAFKA-8391, KAFKA-8661 for details.")
     @Test
     public void testDeleteConnector() throws Exception {
         // create test topic
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java
index a5e4ef0..3e72ce9 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java
@@ -118,6 +118,7 @@ public class IncrementalCooperativeAssignorTest {
                 new LogContext(),
                 time,
                 rebalanceDelay));
+        assignor.previousGenerationId = 1000;
     }
 
     @Test
@@ -126,6 +127,7 @@ public class IncrementalCooperativeAssignorTest {
         doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
 
         // First assignment with 1 worker and 2 connectors configured but not yet assigned
+        expectGeneration();
         assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion);
         ++rebalanceNum;
         returnedAssignments = assignmentsCapture.getValue();
@@ -138,6 +140,7 @@ public class IncrementalCooperativeAssignorTest {
         applyAssignments(returnedAssignments);
         memberConfigs = memberConfigs(leader, offset, assignments);
         memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, offset, null));
+        expectGeneration();
         assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion);
         ++rebalanceNum;
         returnedAssignments = assignmentsCapture.getValue();
@@ -149,6 +152,7 @@ public class IncrementalCooperativeAssignorTest {
         // Third assignment after revocations
         applyAssignments(returnedAssignments);
         memberConfigs = memberConfigs(leader, offset, assignments);
+        expectGeneration();
         assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion);
         ++rebalanceNum;
         returnedAssignments = assignmentsCapture.getValue();
@@ -160,6 +164,7 @@ public class IncrementalCooperativeAssignorTest {
         // A fourth rebalance should not change assignments
         applyAssignments(returnedAssignments);
         memberConfigs = memberConfigs(leader, offset, assignments);
+        expectGeneration();
         assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion);
         ++rebalanceNum;
         returnedAssignments = assignmentsCapture.getValue();
@@ -170,8 +175,9 @@ public class IncrementalCooperativeAssignorTest {
 
         verify(coordinator, times(rebalanceNum)).configSnapshot();
         verify(coordinator, times(rebalanceNum)).leaderState(any());
-        verify(coordinator, times(rebalanceNum)).generationId();
+        verify(coordinator, times(2 * rebalanceNum)).generationId();
         verify(coordinator, times(rebalanceNum)).memberId();
+        verify(coordinator, times(rebalanceNum)).lastCompletedGenerationId();
     }
 
     @Test
@@ -185,6 +191,7 @@ public class IncrementalCooperativeAssignorTest {
 
         // First assignment with 2 workers and 2 connectors configured but not yet assigned
         memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, offset, null));
+        expectGeneration();
         assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion);
         ++rebalanceNum;
         returnedAssignments = assignmentsCapture.getValue();
@@ -199,6 +206,7 @@ public class IncrementalCooperativeAssignorTest {
         applyAssignments(returnedAssignments);
         assignments.remove("worker2");
         memberConfigs = memberConfigs(leader, offset, assignments);
+        expectGeneration();
         assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion);
         ++rebalanceNum;
         returnedAssignments = assignmentsCapture.getValue();
@@ -213,6 +221,7 @@ public class IncrementalCooperativeAssignorTest {
         // been reached yet
         applyAssignments(returnedAssignments);
         memberConfigs = memberConfigs(leader, offset, assignments);
+        expectGeneration();
         assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion);
         ++rebalanceNum;
         returnedAssignments = assignmentsCapture.getValue();
@@ -226,6 +235,7 @@ public class IncrementalCooperativeAssignorTest {
         // Fourth assignment after delay expired
         applyAssignments(returnedAssignments);
         memberConfigs = memberConfigs(leader, offset, assignments);
+        expectGeneration();
         assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion);
         ++rebalanceNum;
         returnedAssignments = assignmentsCapture.getValue();
@@ -236,8 +246,9 @@ public class IncrementalCooperativeAssignorTest {
 
         verify(coordinator, times(rebalanceNum)).configSnapshot();
         verify(coordinator, times(rebalanceNum)).leaderState(any());
-        verify(coordinator, times(rebalanceNum)).generationId();
+        verify(coordinator, times(2 * rebalanceNum)).generationId();
         verify(coordinator, times(rebalanceNum)).memberId();
+        verify(coordinator, times(rebalanceNum)).lastCompletedGenerationId();
     }
 
     @Test
@@ -251,6 +262,7 @@ public class IncrementalCooperativeAssignorTest {
 
         // First assignment with 2 workers and 2 connectors configured but not yet assigned
         memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, offset, null));
+        expectGeneration();
         assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion);
         ++rebalanceNum;
         returnedAssignments = assignmentsCapture.getValue();
@@ -265,6 +277,7 @@ public class IncrementalCooperativeAssignorTest {
         applyAssignments(returnedAssignments);
         assignments.remove("worker2");
         memberConfigs = memberConfigs(leader, offset, assignments);
+        expectGeneration();
         assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion);
         ++rebalanceNum;
         returnedAssignments = assignmentsCapture.getValue();
@@ -279,6 +292,7 @@ public class IncrementalCooperativeAssignorTest {
         // been reached yet
         applyAssignments(returnedAssignments);
         memberConfigs = memberConfigs(leader, offset, assignments);
+        expectGeneration();
         assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion);
         ++rebalanceNum;
         returnedAssignments = assignmentsCapture.getValue();
@@ -294,6 +308,7 @@ public class IncrementalCooperativeAssignorTest {
         applyAssignments(returnedAssignments);
         memberConfigs = memberConfigs(leader, offset, assignments);
         memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, offset, null));
+        expectGeneration();
         assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion);
         ++rebalanceNum;
         returnedAssignments = assignmentsCapture.getValue();
@@ -308,6 +323,7 @@ public class IncrementalCooperativeAssignorTest {
         // assignments ought to be assigned to the worker that has appeared as returned.
         applyAssignments(returnedAssignments);
         memberConfigs = memberConfigs(leader, offset, assignments);
+        expectGeneration();
         assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion);
         ++rebalanceNum;
         returnedAssignments = assignmentsCapture.getValue();
@@ -318,8 +334,9 @@ public class IncrementalCooperativeAssignorTest {
 
         verify(coordinator, times(rebalanceNum)).configSnapshot();
         verify(coordinator, times(rebalanceNum)).leaderState(any());
-        verify(coordinator, times(rebalanceNum)).generationId();
+        verify(coordinator, times(2 * rebalanceNum)).generationId();
         verify(coordinator, times(rebalanceNum)).memberId();
+        verify(coordinator, times(rebalanceNum)).lastCompletedGenerationId();
     }
 
     @Test
@@ -334,6 +351,7 @@ public class IncrementalCooperativeAssignorTest {
         // First assignment with 3 workers and 2 connectors configured but not yet assigned
         memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, offset, null));
         memberConfigs.put("worker3", new ExtendedWorkerState(leaderUrl, offset, null));
+        expectGeneration();
         assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion);
         ++rebalanceNum;
         returnedAssignments = assignmentsCapture.getValue();
@@ -355,6 +373,7 @@ public class IncrementalCooperativeAssignorTest {
 
         // Capture needs to be reset to point to the new assignor
         doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
+        expectGeneration();
         assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion);
         ++rebalanceNum;
         returnedAssignments = assignmentsCapture.getValue();
@@ -366,6 +385,7 @@ public class IncrementalCooperativeAssignorTest {
         // Third (incidental) assignment with still only one worker in the group.
         applyAssignments(returnedAssignments);
         memberConfigs = memberConfigs(leader, offset, assignments);
+        expectGeneration();
         assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion);
         ++rebalanceNum;
         returnedAssignments = assignmentsCapture.getValue();
@@ -375,8 +395,9 @@ public class IncrementalCooperativeAssignorTest {
 
         verify(coordinator, times(rebalanceNum)).configSnapshot();
         verify(coordinator, times(rebalanceNum)).leaderState(any());
-        verify(coordinator, times(rebalanceNum)).generationId();
+        verify(coordinator, times(2 * rebalanceNum)).generationId();
         verify(coordinator, times(rebalanceNum)).memberId();
+        verify(coordinator, times(rebalanceNum)).lastCompletedGenerationId();
     }
 
     @Test
@@ -391,6 +412,7 @@ public class IncrementalCooperativeAssignorTest {
         // First assignment with 3 workers and 2 connectors configured but not yet assigned
         memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, offset, null));
         memberConfigs.put("worker3", new ExtendedWorkerState(leaderUrl, offset, null));
+        expectGeneration();
         assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion);
         ++rebalanceNum;
         returnedAssignments = assignmentsCapture.getValue();
@@ -412,6 +434,7 @@ public class IncrementalCooperativeAssignorTest {
 
         // Capture needs to be reset to point to the new assignor
         doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
+        expectGeneration();
         assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion);
         ++rebalanceNum;
         returnedAssignments = assignmentsCapture.getValue();
@@ -426,6 +449,7 @@ public class IncrementalCooperativeAssignorTest {
         applyAssignments(returnedAssignments);
         memberConfigs = memberConfigs(leader, offset, assignments);
         memberConfigs.put("worker1", new ExtendedWorkerState(leaderUrl, offset, null));
+        expectGeneration();
         assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion);
         ++rebalanceNum;
         returnedAssignments = assignmentsCapture.getValue();
@@ -436,6 +460,7 @@ public class IncrementalCooperativeAssignorTest {
         // Fourth assignment after revocations
         applyAssignments(returnedAssignments);
         memberConfigs = memberConfigs(leader, offset, assignments);
+        expectGeneration();
         assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion);
         ++rebalanceNum;
         returnedAssignments = assignmentsCapture.getValue();
@@ -446,8 +471,9 @@ public class IncrementalCooperativeAssignorTest {
 
         verify(coordinator, times(rebalanceNum)).configSnapshot();
         verify(coordinator, times(rebalanceNum)).leaderState(any());
-        verify(coordinator, times(rebalanceNum)).generationId();
+        verify(coordinator, times(2 * rebalanceNum)).generationId();
         verify(coordinator, times(rebalanceNum)).memberId();
+        verify(coordinator, times(rebalanceNum)).lastCompletedGenerationId();
     }
 
     @Test
@@ -463,6 +489,7 @@ public class IncrementalCooperativeAssignorTest {
         // First assignment with 2 workers and 2 connectors configured but not yet assigned
         memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, offset, null));
         try {
+            expectGeneration();
             assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion);
         } catch (RuntimeException e) {
             RequestFuture.failure(e);
@@ -470,68 +497,106 @@ public class IncrementalCooperativeAssignorTest {
         ++rebalanceNum;
         returnedAssignments = assignmentsCapture.getValue();
         expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
-        // This was the assignment that should have been sent, but didn't make it after all the way
+        // This was the assignment that should have been sent, but didn't make it all the way
         assertDelay(0, returnedAssignments);
         assertNoReassignments(memberConfigs, expectedMemberConfigs);
         assertAssignment(2, 8, 0, 0, "worker1", "worker2");
 
         // Second assignment happens with members returning the same assignments (memberConfigs)
-        // as the first time. The assignor can not tell whether it was the assignment that failed
-        // or the workers that were bounced. Therefore it goes into assignment freeze for
-        // the new assignments for a rebalance delay period
+        // as the first time. The assignor detects that the number of members did not change and
+        // avoids the rebalance delay, treating the lost assignments as new assignments.
         doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
+        expectGeneration();
         assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion);
         ++rebalanceNum;
         returnedAssignments = assignmentsCapture.getValue();
+        assertDelay(0, returnedAssignments);
         expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
-        assertDelay(rebalanceDelay, returnedAssignments);
         assertNoReassignments(memberConfigs, expectedMemberConfigs);
-        assertAssignment(0, 0, 0, 0, "worker1", "worker2");
+        assertAssignment(2, 8, 0, 0, "worker1", "worker2");
 
-        time.sleep(rebalanceDelay / 2);
+        verify(coordinator, times(rebalanceNum)).configSnapshot();
+        verify(coordinator, times(rebalanceNum)).leaderState(any());
+        verify(coordinator, times(2 * rebalanceNum)).generationId();
+        verify(coordinator, times(rebalanceNum)).memberId();
+        verify(coordinator, times(rebalanceNum)).lastCompletedGenerationId();
+    }
 
-        // Third (incidental) assignment with still only one worker in the group. Max delay has not
-        // been reached yet
-        applyAssignments(returnedAssignments);
-        memberConfigs = memberConfigs(leader, offset, assignments);
+    @Test
+    public void testTaskAssignmentWhenSubsequentAssignmentAttemptFails() {
+        // Customize assignor for this test case
+        time = new MockTime();
+        initAssignor();
+
+        when(coordinator.configSnapshot()).thenReturn(configState);
+        doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
+
+        // First assignment with 2 workers and 2 connectors configured but not yet assigned
+        memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, offset, null));
+        expectGeneration();
         assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion);
         ++rebalanceNum;
         returnedAssignments = assignmentsCapture.getValue();
-        assertDelay(rebalanceDelay / 2, returnedAssignments);
+        assertDelay(0, returnedAssignments);
         expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
         assertNoReassignments(memberConfigs, expectedMemberConfigs);
-        assertAssignment(0, 0, 0, 0, "worker1", "worker2");
+        assertAssignment(2, 8, 0, 0, "worker1", "worker2");
 
-        time.sleep(rebalanceDelay / 2 + 1);
+        when(coordinator.configSnapshot()).thenReturn(configState);
+        doThrow(new RuntimeException("Unable to send computed assignment with SyncGroupRequest"))
+                .when(assignor).serializeAssignments(assignmentsCapture.capture());
 
-        // Fourth assignment after delay expired. Finally all the new assignments are assigned
+        // Second assignment triggered by a third worker joining. The computed assignment should
+        // revoke tasks from the existing group. But the assignment won't be correctly delivered.
         applyAssignments(returnedAssignments);
         memberConfigs = memberConfigs(leader, offset, assignments);
-        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion);
+        memberConfigs.put("worker3", new ExtendedWorkerState(leaderUrl, offset, null));
+        try {
+            expectGeneration();
+            assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion);
+        } catch (RuntimeException e) {
+            RequestFuture.failure(e);
+        }
         ++rebalanceNum;
         returnedAssignments = assignmentsCapture.getValue();
+        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        // This was the assignment that should have been sent, but didn't make it all the way
         assertDelay(0, returnedAssignments);
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
+        assertAssignment(0, 0, 0, 2, "worker1", "worker2", "worker3");
+
+        // Third assignment happens with members returning the same assignments (memberConfigs)
+        // as the first time.
+        doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
+        expectGeneration();
+        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion);
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
         expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertDelay(0, returnedAssignments);
         assertNoReassignments(memberConfigs, expectedMemberConfigs);
-        assertAssignment(2, 8, 0, 0, "worker1", "worker2");
+        assertAssignment(0, 0, 0, 2, "worker1", "worker2", "worker3");
 
         verify(coordinator, times(rebalanceNum)).configSnapshot();
         verify(coordinator, times(rebalanceNum)).leaderState(any());
-        verify(coordinator, times(rebalanceNum)).generationId();
+        verify(coordinator, times(2 * rebalanceNum)).generationId();
         verify(coordinator, times(rebalanceNum)).memberId();
+        verify(coordinator, times(rebalanceNum)).lastCompletedGenerationId();
     }
 
     @Test
-    public void testTaskAssignmentWhenSubsequentAssignmentAttemptFails() {
+    public void testTaskAssignmentWhenSubsequentAssignmentAttemptFailsOutsideTheAssignor() {
         // Customize assignor for this test case
         time = new MockTime();
         initAssignor();
 
+        expectGeneration();
         when(coordinator.configSnapshot()).thenReturn(configState);
         doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
 
         // First assignment with 2 workers and 2 connectors configured but not yet assigned
         memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, offset, null));
+        expectGeneration();
         assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion);
         ++rebalanceNum;
         returnedAssignments = assignmentsCapture.getValue();
@@ -540,31 +605,30 @@ public class IncrementalCooperativeAssignorTest {
         assertNoReassignments(memberConfigs, expectedMemberConfigs);
         assertAssignment(2, 8, 0, 0, "worker1", "worker2");
 
-        when(coordinator.configSnapshot()).thenReturn(configState);
-        doThrow(new RuntimeException("Unable to send computed assignment with SyncGroupRequest"))
-                .when(assignor).serializeAssignments(assignmentsCapture.capture());
-
         // Second assignment triggered by a third worker joining. The computed assignment should
-        // revoke tasks from the existing group. But the assignment won't be correctly delivered.
+        // revoke tasks from the existing group. But the assignment won't be correctly delivered
+        // and sync group will fail on the leader worker.
         applyAssignments(returnedAssignments);
         memberConfigs = memberConfigs(leader, offset, assignments);
         memberConfigs.put("worker3", new ExtendedWorkerState(leaderUrl, offset, null));
-        try {
-            assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion);
-        } catch (RuntimeException e) {
-            RequestFuture.failure(e);
-        }
+        when(coordinator.generationId())
+                .thenReturn(assignor.previousGenerationId + 1)
+                .thenReturn(assignor.previousGenerationId + 1);
+        when(coordinator.lastCompletedGenerationId()).thenReturn(assignor.previousGenerationId - 1);
+        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion);
         ++rebalanceNum;
         returnedAssignments = assignmentsCapture.getValue();
         expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
-        // This was the assignment that should have been sent, but didn't make it after all the way
+        // This was the assignment that should have been sent, but didn't make it all the way
         assertDelay(0, returnedAssignments);
         assertNoReassignments(memberConfigs, expectedMemberConfigs);
         assertAssignment(0, 0, 0, 2, "worker1", "worker2", "worker3");
 
         // Third assignment happens with members returning the same assignments (memberConfigs)
         // as the first time.
+        when(coordinator.lastCompletedGenerationId()).thenReturn(assignor.previousGenerationId - 1);
         doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
+        expectGeneration();
         assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion);
         ++rebalanceNum;
         returnedAssignments = assignmentsCapture.getValue();
@@ -575,8 +639,9 @@ public class IncrementalCooperativeAssignorTest {
 
         verify(coordinator, times(rebalanceNum)).configSnapshot();
         verify(coordinator, times(rebalanceNum)).leaderState(any());
-        verify(coordinator, times(rebalanceNum)).generationId();
+        verify(coordinator, times(2 * rebalanceNum)).generationId();
         verify(coordinator, times(rebalanceNum)).memberId();
+        verify(coordinator, times(rebalanceNum)).lastCompletedGenerationId();
     }
 
     @Test
@@ -587,6 +652,7 @@ public class IncrementalCooperativeAssignorTest {
 
         // First assignment with 1 worker and 2 connectors configured but not yet assigned
         memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, offset, null));
+        expectGeneration();
         assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion);
         ++rebalanceNum;
         returnedAssignments = assignmentsCapture.getValue();
@@ -600,6 +666,7 @@ public class IncrementalCooperativeAssignorTest {
         when(coordinator.configSnapshot()).thenReturn(configState);
         applyAssignments(returnedAssignments);
         memberConfigs = memberConfigs(leader, offset, assignments);
+        expectGeneration();
         assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion);
         ++rebalanceNum;
         returnedAssignments = assignmentsCapture.getValue();
@@ -610,8 +677,9 @@ public class IncrementalCooperativeAssignorTest {
 
         verify(coordinator, times(rebalanceNum)).configSnapshot();
         verify(coordinator, times(rebalanceNum)).leaderState(any());
-        verify(coordinator, times(rebalanceNum)).generationId();
+        verify(coordinator, times(2 * rebalanceNum)).generationId();
         verify(coordinator, times(rebalanceNum)).memberId();
+        verify(coordinator, times(rebalanceNum)).lastCompletedGenerationId();
     }
 
     @Test
@@ -689,14 +757,17 @@ public class IncrementalCooperativeAssignorTest {
 
         Map<String, WorkerLoad> configuredAssignment = new HashMap<>();
         configuredAssignment.put("worker0", workerLoad("worker0", 0, 2, 0, 4));
+        configuredAssignment.put("worker1", workerLoad("worker1", 2, 2, 4, 4));
         configuredAssignment.put("worker2", workerLoad("worker2", 4, 2, 8, 4));
+        memberConfigs = memberConfigs(leader, offset, 0, 2);
 
         ConnectorsAndTasks newSubmissions = new ConnectorsAndTasks.Builder().build();
 
         // No lost assignments
         assignor.handleLostAssignments(new ConnectorsAndTasks.Builder().build(),
                 newSubmissions,
-                new ArrayList<>(configuredAssignment.values()));
+                new ArrayList<>(configuredAssignment.values()),
+                memberConfigs);
 
         assertThat("Wrong set of workers for reassignments",
                 Collections.emptySet(),
@@ -704,14 +775,17 @@ public class IncrementalCooperativeAssignorTest {
         assertEquals(0, assignor.scheduledRebalance);
         assertEquals(0, assignor.delay);
 
+        assignor.previousMembers = new HashSet<>(memberConfigs.keySet());
         String flakyWorker = "worker1";
         WorkerLoad lostLoad = workerLoad(flakyWorker, 2, 2, 4, 4);
+        memberConfigs.remove(flakyWorker);
 
         ConnectorsAndTasks lostAssignments = new ConnectorsAndTasks.Builder()
                 .withCopies(lostLoad.connectors(), lostLoad.tasks()).build();
 
         // Lost assignments detected - No candidate worker has appeared yet (worker with no assignments)
-        assignor.handleLostAssignments(lostAssignments, newSubmissions, new ArrayList<>(configuredAssignment.values()));
+        assignor.handleLostAssignments(lostAssignments, newSubmissions,
+                new ArrayList<>(configuredAssignment.values()), memberConfigs);
 
         assertThat("Wrong set of workers for reassignments",
                 Collections.emptySet(),
@@ -719,12 +793,15 @@ public class IncrementalCooperativeAssignorTest {
         assertEquals(time.milliseconds() + rebalanceDelay, assignor.scheduledRebalance);
         assertEquals(rebalanceDelay, assignor.delay);
 
+        assignor.previousMembers = new HashSet<>(memberConfigs.keySet());
         time.sleep(rebalanceDelay / 2);
         rebalanceDelay /= 2;
 
         // A new worker (probably returning worker) has joined
         configuredAssignment.put(flakyWorker, new WorkerLoad.Builder(flakyWorker).build());
-        assignor.handleLostAssignments(lostAssignments, newSubmissions, new ArrayList<>(configuredAssignment.values()));
+        memberConfigs.put(flakyWorker, new ExtendedWorkerState(leaderUrl, offset, null));
+        assignor.handleLostAssignments(lostAssignments, newSubmissions,
+                new ArrayList<>(configuredAssignment.values()), memberConfigs);
 
         assertThat("Wrong set of workers for reassignments",
                 Collections.singleton(flakyWorker),
@@ -732,10 +809,12 @@ public class IncrementalCooperativeAssignorTest {
         assertEquals(time.milliseconds() + rebalanceDelay, assignor.scheduledRebalance);
         assertEquals(rebalanceDelay, assignor.delay);
 
+        assignor.previousMembers = new HashSet<>(memberConfigs.keySet());
         time.sleep(rebalanceDelay);
 
         // The new worker has still no assignments
-        assignor.handleLostAssignments(lostAssignments, newSubmissions, new ArrayList<>(configuredAssignment.values()));
+        assignor.handleLostAssignments(lostAssignments, newSubmissions,
+                new ArrayList<>(configuredAssignment.values()), memberConfigs);
 
         assertTrue("Wrong assignment of lost connectors",
                 configuredAssignment.getOrDefault(flakyWorker, new WorkerLoad.Builder(flakyWorker).build())
@@ -764,14 +843,17 @@ public class IncrementalCooperativeAssignorTest {
 
         Map<String, WorkerLoad> configuredAssignment = new HashMap<>();
         configuredAssignment.put("worker0", workerLoad("worker0", 0, 2, 0, 4));
+        configuredAssignment.put("worker1", workerLoad("worker1", 2, 2, 4, 4));
         configuredAssignment.put("worker2", workerLoad("worker2", 4, 2, 8, 4));
+        memberConfigs = memberConfigs(leader, offset, 0, 2);
 
         ConnectorsAndTasks newSubmissions = new ConnectorsAndTasks.Builder().build();
 
         // No lost assignments
         assignor.handleLostAssignments(new ConnectorsAndTasks.Builder().build(),
                 newSubmissions,
-                new ArrayList<>(configuredAssignment.values()));
+                new ArrayList<>(configuredAssignment.values()),
+                memberConfigs);
 
         assertThat("Wrong set of workers for reassignments",
                 Collections.emptySet(),
@@ -779,14 +861,17 @@ public class IncrementalCooperativeAssignorTest {
         assertEquals(0, assignor.scheduledRebalance);
         assertEquals(0, assignor.delay);
 
+        assignor.previousMembers = new HashSet<>(memberConfigs.keySet());
         String removedWorker = "worker1";
         WorkerLoad lostLoad = workerLoad(removedWorker, 2, 2, 4, 4);
+        memberConfigs.remove(removedWorker);
 
         ConnectorsAndTasks lostAssignments = new ConnectorsAndTasks.Builder()
                 .withCopies(lostLoad.connectors(), lostLoad.tasks()).build();
 
         // Lost assignments detected - No candidate worker has appeared yet (worker with no assignments)
-        assignor.handleLostAssignments(lostAssignments, newSubmissions, new ArrayList<>(configuredAssignment.values()));
+        assignor.handleLostAssignments(lostAssignments, newSubmissions,
+                new ArrayList<>(configuredAssignment.values()), memberConfigs);
 
         assertThat("Wrong set of workers for reassignments",
                 Collections.emptySet(),
@@ -794,11 +879,13 @@ public class IncrementalCooperativeAssignorTest {
         assertEquals(time.milliseconds() + rebalanceDelay, assignor.scheduledRebalance);
         assertEquals(rebalanceDelay, assignor.delay);
 
+        assignor.previousMembers = new HashSet<>(memberConfigs.keySet());
         time.sleep(rebalanceDelay / 2);
         rebalanceDelay /= 2;
 
         // No new worker has joined
-        assignor.handleLostAssignments(lostAssignments, newSubmissions, new ArrayList<>(configuredAssignment.values()));
+        assignor.handleLostAssignments(lostAssignments, newSubmissions,
+                new ArrayList<>(configuredAssignment.values()), memberConfigs);
 
         assertThat("Wrong set of workers for reassignments",
                 Collections.emptySet(),
@@ -808,7 +895,8 @@ public class IncrementalCooperativeAssignorTest {
 
         time.sleep(rebalanceDelay);
 
-        assignor.handleLostAssignments(lostAssignments, newSubmissions, new ArrayList<>(configuredAssignment.values()));
+        assignor.handleLostAssignments(lostAssignments, newSubmissions,
+                new ArrayList<>(configuredAssignment.values()), memberConfigs);
 
         assertTrue("Wrong assignment of lost connectors",
                 newSubmissions.connectors().containsAll(lostAssignments.connectors()));
@@ -833,14 +921,17 @@ public class IncrementalCooperativeAssignorTest {
 
         Map<String, WorkerLoad> configuredAssignment = new HashMap<>();
         configuredAssignment.put("worker0", workerLoad("worker0", 0, 2, 0, 4));
+        configuredAssignment.put("worker1", workerLoad("worker1", 2, 2, 4, 4));
         configuredAssignment.put("worker2", workerLoad("worker2", 4, 2, 8, 4));
+        memberConfigs = memberConfigs(leader, offset, 0, 2);
 
         ConnectorsAndTasks newSubmissions = new ConnectorsAndTasks.Builder().build();
 
         // No lost assignments
         assignor.handleLostAssignments(new ConnectorsAndTasks.Builder().build(),
                 newSubmissions,
-                new ArrayList<>(configuredAssignment.values()));
+                new ArrayList<>(configuredAssignment.values()),
+                memberConfigs);
 
         assertThat("Wrong set of workers for reassignments",
                 Collections.emptySet(),
@@ -848,8 +939,10 @@ public class IncrementalCooperativeAssignorTest {
         assertEquals(0, assignor.scheduledRebalance);
         assertEquals(0, assignor.delay);
 
+        assignor.previousMembers = new HashSet<>(memberConfigs.keySet());
         String flakyWorker = "worker1";
         WorkerLoad lostLoad = workerLoad(flakyWorker, 2, 2, 4, 4);
+        memberConfigs.remove(flakyWorker);
         String newWorker = "worker3";
 
         ConnectorsAndTasks lostAssignments = new ConnectorsAndTasks.Builder()
@@ -857,7 +950,9 @@ public class IncrementalCooperativeAssignorTest {
 
         // Lost assignments detected - A new worker also has joined that is not the returning worker
         configuredAssignment.put(newWorker, new WorkerLoad.Builder(newWorker).build());
-        assignor.handleLostAssignments(lostAssignments, newSubmissions, new ArrayList<>(configuredAssignment.values()));
+        memberConfigs.put(newWorker, new ExtendedWorkerState(leaderUrl, offset, null));
+        assignor.handleLostAssignments(lostAssignments, newSubmissions,
+                new ArrayList<>(configuredAssignment.values()), memberConfigs);
 
         assertThat("Wrong set of workers for reassignments",
                 Collections.singleton(newWorker),
@@ -865,12 +960,15 @@ public class IncrementalCooperativeAssignorTest {
         assertEquals(time.milliseconds() + rebalanceDelay, assignor.scheduledRebalance);
         assertEquals(rebalanceDelay, assignor.delay);
 
+        assignor.previousMembers = new HashSet<>(memberConfigs.keySet());
         time.sleep(rebalanceDelay / 2);
         rebalanceDelay /= 2;
 
         // Now two new workers have joined
         configuredAssignment.put(flakyWorker, new WorkerLoad.Builder(flakyWorker).build());
-        assignor.handleLostAssignments(lostAssignments, newSubmissions, new ArrayList<>(configuredAssignment.values()));
+        memberConfigs.put(flakyWorker, new ExtendedWorkerState(leaderUrl, offset, null));
+        assignor.handleLostAssignments(lostAssignments, newSubmissions,
+                new ArrayList<>(configuredAssignment.values()), memberConfigs);
 
         Set<String> expectedWorkers = new HashSet<>();
         expectedWorkers.addAll(Arrays.asList(newWorker, flakyWorker));
@@ -880,12 +978,16 @@ public class IncrementalCooperativeAssignorTest {
         assertEquals(time.milliseconds() + rebalanceDelay, assignor.scheduledRebalance);
         assertEquals(rebalanceDelay, assignor.delay);
 
+        assignor.previousMembers = new HashSet<>(memberConfigs.keySet());
         time.sleep(rebalanceDelay);
 
         // The new workers have new assignments, other than the lost ones
         configuredAssignment.put(flakyWorker, workerLoad(flakyWorker, 6, 2, 8, 4));
         configuredAssignment.put(newWorker, workerLoad(newWorker, 8, 2, 12, 4));
-        assignor.handleLostAssignments(lostAssignments, newSubmissions, new ArrayList<>(configuredAssignment.values()));
+        // we don't reflect these new assignments in memberConfigs currently because they are not
+        // used in handleLostAssignments method
+        assignor.handleLostAssignments(lostAssignments, newSubmissions,
+                new ArrayList<>(configuredAssignment.values()), memberConfigs);
 
         // newWorker joined first, so should be picked up first as a candidate for reassignment
         assertTrue("Wrong assignment of lost connectors",
@@ -915,14 +1017,17 @@ public class IncrementalCooperativeAssignorTest {
 
         Map<String, WorkerLoad> configuredAssignment = new HashMap<>();
         configuredAssignment.put("worker0", workerLoad("worker0", 0, 2, 0, 4));
+        configuredAssignment.put("worker1", workerLoad("worker1", 2, 2, 4, 4));
         configuredAssignment.put("worker2", workerLoad("worker2", 4, 2, 8, 4));
+        memberConfigs = memberConfigs(leader, offset, 0, 2);
 
         ConnectorsAndTasks newSubmissions = new ConnectorsAndTasks.Builder().build();
 
         // No lost assignments
         assignor.handleLostAssignments(new ConnectorsAndTasks.Builder().build(),
                 newSubmissions,
-                new ArrayList<>(configuredAssignment.values()));
+                new ArrayList<>(configuredAssignment.values()),
+                memberConfigs);
 
         assertThat("Wrong set of workers for reassignments",
                 Collections.emptySet(),
@@ -930,14 +1035,17 @@ public class IncrementalCooperativeAssignorTest {
         assertEquals(0, assignor.scheduledRebalance);
         assertEquals(0, assignor.delay);
 
+        assignor.previousMembers = new HashSet<>(memberConfigs.keySet());
         String veryFlakyWorker = "worker1";
         WorkerLoad lostLoad = workerLoad(veryFlakyWorker, 2, 2, 4, 4);
+        memberConfigs.remove(veryFlakyWorker);
 
         ConnectorsAndTasks lostAssignments = new ConnectorsAndTasks.Builder()
                 .withCopies(lostLoad.connectors(), lostLoad.tasks()).build();
 
         // Lost assignments detected - No candidate worker has appeared yet (worker with no assignments)
-        assignor.handleLostAssignments(lostAssignments, newSubmissions, new ArrayList<>(configuredAssignment.values()));
+        assignor.handleLostAssignments(lostAssignments, newSubmissions,
+                new ArrayList<>(configuredAssignment.values()), memberConfigs);
 
         assertThat("Wrong set of workers for reassignments",
                 Collections.emptySet(),
@@ -945,12 +1053,15 @@ public class IncrementalCooperativeAssignorTest {
         assertEquals(time.milliseconds() + rebalanceDelay, assignor.scheduledRebalance);
         assertEquals(rebalanceDelay, assignor.delay);
 
+        assignor.previousMembers = new HashSet<>(memberConfigs.keySet());
         time.sleep(rebalanceDelay / 2);
         rebalanceDelay /= 2;
 
         // A new worker (probably returning worker) has joined
         configuredAssignment.put(veryFlakyWorker, new WorkerLoad.Builder(veryFlakyWorker).build());
-        assignor.handleLostAssignments(lostAssignments, newSubmissions, new ArrayList<>(configuredAssignment.values()));
+        memberConfigs.put(veryFlakyWorker, new ExtendedWorkerState(leaderUrl, offset, null));
+        assignor.handleLostAssignments(lostAssignments, newSubmissions,
+                new ArrayList<>(configuredAssignment.values()), memberConfigs);
 
         assertThat("Wrong set of workers for reassignments",
                 Collections.singleton(veryFlakyWorker),
@@ -958,11 +1069,14 @@ public class IncrementalCooperativeAssignorTest {
         assertEquals(time.milliseconds() + rebalanceDelay, assignor.scheduledRebalance);
         assertEquals(rebalanceDelay, assignor.delay);
 
+        assignor.previousMembers = new HashSet<>(memberConfigs.keySet());
         time.sleep(rebalanceDelay);
 
         // The returning worker leaves permanently after joining briefly during the delay
         configuredAssignment.remove(veryFlakyWorker);
-        assignor.handleLostAssignments(lostAssignments, newSubmissions, new ArrayList<>(configuredAssignment.values()));
+        memberConfigs.remove(veryFlakyWorker);
+        assignor.handleLostAssignments(lostAssignments, newSubmissions,
+                new ArrayList<>(configuredAssignment.values()), memberConfigs);
 
         assertTrue("Wrong assignment of lost connectors",
                 newSubmissions.connectors().containsAll(lostAssignments.connectors()));
@@ -1185,4 +1299,11 @@ public class IncrementalCooperativeAssignorTest {
                 Collections.emptyList(),
                 is(existingTasks));
     }
+
+    private void expectGeneration() { // stub generations for a leader whose previous sync completed
+        when(coordinator.generationId())
+                .thenReturn(assignor.previousGenerationId + 1) // covers the 2 reads verified per rebalance
+                .thenReturn(assignor.previousGenerationId + 1);
+        when(coordinator.lastCompletedGenerationId()).thenReturn(assignor.previousGenerationId); // vs. - 1 for a failed sync
+    }
 }


Mime
View raw message