kafka-commits mailing list archives

From kkaranta...@apache.org
Subject [kafka] 02/02: KAFKA-9849: Fix issue with worker.unsync.backoff.ms creating zombie workers when incremental cooperative rebalancing is used (#8827)
Date Wed, 10 Jun 2020 07:15:01 GMT
This is an automated email from the ASF dual-hosted git repository.

kkarantasis pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 71c877ff9f71106478fd7a2a343e7a53f9f25962
Author: Konstantine Karantasis <konstantine@confluent.io>
AuthorDate: Tue Jun 9 13:02:35 2020 -0700

    KAFKA-9849: Fix issue with worker.unsync.backoff.ms creating zombie workers when incremental cooperative rebalancing is used (#8827)
    
    When Incremental Cooperative Rebalancing is enabled and a worker fails to read to the end of the config topic, it needs to voluntarily revoke its locally running tasks in a timely manner, before these tasks get assigned to another worker; otherwise redundant tasks end up running in the Connect cluster.
    
    Additionally, instead of using the delay `worker.unsync.backoff.ms` that was defined for the eager rebalancing protocol and has a long default value (which coincidentally is equal to the default value of the rebalance delay of the incremental cooperative protocol), the worker should quickly attempt to re-read the config topic and back off for a fraction of the rebalance delay. After this fix, the worker will retry up to 5 times before it revokes its running assignment a [...]
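
    As a worked example, assuming the stock defaults of this branch (where
    scheduled.rebalance.max.delay.ms defaults to 300000 ms), the successive
    backoffs come out to 300000 / 10 / 5 = 6000 ms, then 7500 ms, 10000 ms,
    15000 ms, and 30000 ms, roughly 68.5 seconds in total before revocation,
    instead of a single 300000 ms (5 minute) sleep under the default
    worker.unsync.backoff.ms.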
    
    Unit tests are added to cover the backoff logic with incremental cooperative rebalancing.
    
    Reviewers: Randall Hauch <rhauch@gmail.com>
---
 .../runtime/distributed/DistributedHerder.java     |  28 ++-
 .../runtime/distributed/ExtendedAssignment.java    |  15 ++
 .../runtime/distributed/WorkerCoordinator.java     |   4 +
 .../runtime/distributed/WorkerGroupMember.java     |   4 +
 .../integration/ConnectIntegrationTestUtils.java   |   1 +
 .../runtime/distributed/DistributedHerderTest.java | 207 +++++++++++++++++++--
 6 files changed, 246 insertions(+), 13 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 2728f5d..1a1d3fe 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -27,7 +27,6 @@ import org.apache.kafka.common.metrics.stats.Total;
 import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.connector.ConnectorContext;
 import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
@@ -123,6 +122,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
     private static final long START_AND_STOP_SHUTDOWN_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(1);
     private static final long RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MS = 250;
     private static final int START_STOP_THREAD_POOL_SIZE = 8;
+    private static final short BACKOFF_RETRIES = 5;
 
     private final AtomicLong requestSeqNum = new AtomicLong();
 
@@ -162,6 +162,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
     private boolean needsReconfigRebalance;
     private volatile int generation;
     private volatile long scheduledRebalance;
+    private short backoffRetries;
 
     private final DistributedConfig config;
 
@@ -226,6 +227,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
         needsReconfigRebalance = false;
         canReadConfigs = true; // We didn't try yet, but Configs are readable until proven otherwise
         scheduledRebalance = Long.MAX_VALUE;
+        backoffRetries = BACKOFF_RETRIES;
     }
 
     @Override
@@ -973,6 +975,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
             configBackingStore.refresh(timeoutMs, TimeUnit.MILLISECONDS);
             configState = configBackingStore.snapshot();
             log.info("Finished reading to end of log and updated config snapshot, new config
log offset: {}", configState.offset());
+            backoffRetries = BACKOFF_RETRIES;
             return true;
         } catch (TimeoutException e) {
             // in case reading the log takes too long, leave the group to ensure a quick rebalance (although by default we should be out of the group already)
@@ -985,7 +988,28 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
     }
 
     private void backoff(long ms) {
-        Utils.sleep(ms);
+        if (member.currentProtocolVersion() == CONNECT_PROTOCOL_V0) {
+            time.sleep(ms);
+            return;
+        }
+
+        if (backoffRetries > 0) {
+            int rebalanceDelayFraction =
+                    config.getInt(DistributedConfig.SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG) / 10 / backoffRetries;
+            time.sleep(rebalanceDelayFraction);
+            --backoffRetries;
+            return;
+        }
+
+        ExtendedAssignment runningAssignmentSnapshot;
+        synchronized (this) {
+            runningAssignmentSnapshot = ExtendedAssignment.duplicate(runningAssignment);
+        }
+        log.info("Revoking current running assignment {} because after {} retries the worker "
+                + "has not caught up with the latest Connect cluster updates",
+                runningAssignmentSnapshot, BACKOFF_RETRIES);
+        member.revokeAssignment(runningAssignmentSnapshot);
+        backoffRetries = BACKOFF_RETRIES;
     }
 
     private void startAndStop(Collection<Callable<Void>> callables) {
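
A minimal standalone sketch of the schedule that the new backoff() produces under the
incremental cooperative protocol, assuming the default scheduled.rebalance.max.delay.ms
of 300000 ms and the BACKOFF_RETRIES value of 5 introduced above (the class and method
names below are illustrative, not part of the patch):

    public class BackoffScheduleSketch {
        public static void main(String[] args) {
            int maxDelayMs = 300_000;  // scheduled.rebalance.max.delay.ms default
            short backoffRetries = 5;  // mirrors BACKOFF_RETRIES
            long totalMs = 0;
            while (backoffRetries > 0) {
                // Same arithmetic as backoff(): a growing fraction of the rebalance delay
                int sleepMs = maxDelayMs / 10 / backoffRetries;
                totalMs += sleepMs;
                System.out.printf("backoffRetries=%d -> sleep %d ms%n", backoffRetries, sleepMs);
                --backoffRetries;
            }
            // Prints 6000, 7500, 10000, 15000, 30000; 68500 ms in total,
            // after which the worker revokes its running assignment.
            System.out.printf("total backoff before revocation: %d ms%n", totalMs);
        }
    }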
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ExtendedAssignment.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ExtendedAssignment.java
index 7518e06..e544407 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ExtendedAssignment.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ExtendedAssignment.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -85,6 +86,20 @@ public class ExtendedAssignment extends ConnectProtocol.Assignment {
         this.delay = delay;
     }
 
+    public static ExtendedAssignment duplicate(ExtendedAssignment assignment) {
+        return new ExtendedAssignment(
+                assignment.version(),
+                assignment.error(),
+                assignment.leader(),
+                assignment.leaderUrl(),
+                assignment.offset(),
+                new LinkedHashSet<>(assignment.connectors()),
+                new LinkedHashSet<>(assignment.tasks()),
+                new LinkedHashSet<>(assignment.revokedConnectors()),
+                new LinkedHashSet<>(assignment.revokedTasks()),
+                assignment.delay());
+    }
+
     /**
      * Return the version of the connect protocol that this assignment belongs to.
      *
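
The point of duplicate() is to let the herder take a consistent copy of runningAssignment
while holding its monitor and then log and revoke outside the lock, without the copy being
affected by concurrent assignment updates. A minimal, self-contained sketch of that
copy-then-act pattern (the names below are illustrative, not from the patch):

    import java.util.LinkedHashSet;
    import java.util.Set;

    public class SnapshotSketch {
        private final Set<String> runningTasks = new LinkedHashSet<>();

        public void revokeAll() {
            Set<String> snapshot;
            synchronized (this) {
                // Defensive copy, analogous to ExtendedAssignment.duplicate()
                snapshot = new LinkedHashSet<>(runningTasks);
            }
            // Safe to act on the copy without holding the lock; concurrent
            // changes to runningTasks cannot affect it.
            snapshot.forEach(t -> System.out.println("revoking " + t));
        }
    }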
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
index 644f591..2c5ac18 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
@@ -276,6 +276,10 @@ public class WorkerCoordinator extends AbstractCoordinator implements Closeable
         return lastCompletedGenerationId;
     }
 
+    public void revokeAssignment(ExtendedAssignment assignment) {
+        listener.onRevoked(assignment.leader(), assignment.connectors(), assignment.tasks());
+    }
+
     private boolean isLeader() {
         final ExtendedAssignment localAssignmentSnapshot = assignmentSnapshot;
         return localAssignmentSnapshot != null && memberId().equals(localAssignmentSnapshot.leader());
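
The new revokeAssignment() here replays the given assignment through the coordinator's
existing rebalance listener via onRevoked, so the herder's usual revocation path runs
even though no rebalance is in progress; WorkerGroupMember.revokeAssignment below is a
plain pass-through to this method.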
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index ed2015d..d37d625 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -213,6 +213,10 @@ public class WorkerGroupMember {
         return coordinator.currentProtocolVersion();
     }
 
+    public void revokeAssignment(ExtendedAssignment assignment) {
+        coordinator.revokeAssignment(assignment);
+    }
+
     private void stop(boolean swallowException) {
         log.trace("Stopping the Connect group member.");
         AtomicReference<Throwable> firstException = new AtomicReference<>();
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectIntegrationTestUtils.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectIntegrationTestUtils.java
index 1aeaa07..058dbe2 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectIntegrationTestUtils.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectIntegrationTestUtils.java
@@ -22,6 +22,7 @@ import org.junit.runner.Description;
 import org.slf4j.Logger;
 
 /**
+ * A utility class for Connect's integration tests
  */
 public class ConnectIntegrationTestUtils {
     public static TestRule newTestWatcher(Logger log) {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index af8caec..79d6c8a 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -80,6 +80,8 @@ import java.util.concurrent.TimeoutException;
 import static java.util.Collections.singletonList;
 import static org.apache.kafka.connect.runtime.distributed.ConnectProtocol.CONNECT_PROTOCOL_V0;
 import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V1;
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.newCapture;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -193,7 +195,7 @@ public class DistributedHerderTest {
         connectProtocolVersion = CONNECT_PROTOCOL_V0;
 
         herder = PowerMock.createPartialMock(DistributedHerder.class,
-                new String[]{"backoff", "connectorTypeForClass", "updateDeletedConnectorStatus", "updateDeletedTaskStatus"},
+                new String[]{"connectorTypeForClass", "updateDeletedConnectorStatus", "updateDeletedTaskStatus"},
                 new DistributedConfig(HERDER_CONFIG), worker, WORKER_ID, KAFKA_CLUSTER_ID,
                 statusBackingStore, configBackingStore, member, MEMBER_URL, metrics, time, noneConnectorClientConfigOverridePolicy);
 
@@ -521,7 +523,7 @@ public class DistributedHerderTest {
         // config validation
         Connector connectorMock = PowerMock.createMock(SourceConnector.class);
         EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
-        final Capture<Map<String, String>> configCapture = EasyMock.newCapture();
+        final Capture<Map<String, String>> configCapture = newCapture();
         EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
         EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
@@ -568,7 +570,7 @@ public class DistributedHerderTest {
         // config validation
         Connector connectorMock = PowerMock.createMock(SourceConnector.class);
         EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
-        final Capture<Map<String, String>> configCapture = EasyMock.newCapture();
+        final Capture<Map<String, String>> configCapture = newCapture();
         EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
         EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
@@ -582,7 +584,7 @@ public class DistributedHerderTest {
 
         // CONN2 creation should fail
 
-        Capture<Throwable> error = EasyMock.newCapture();
+        Capture<Throwable> error = newCapture();
         putConnectorCallback.onCompletion(EasyMock.capture(error), EasyMock.<Herder.Created<ConnectorInfo>>isNull());
         PowerMock.expectLastCall();
 
@@ -617,7 +619,7 @@ public class DistributedHerderTest {
         // config validation
         Connector connectorMock = PowerMock.createMock(SourceConnector.class);
         EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
-        final Capture<Map<String, String>> configCapture = EasyMock.newCapture();
+        final Capture<Map<String, String>> configCapture = newCapture();
         EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
         EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
@@ -634,7 +636,7 @@ public class DistributedHerderTest {
 
         // CONN2 creation should fail
 
-        Capture<Throwable> error = EasyMock.newCapture();
+        Capture<Throwable> error = newCapture();
         putConnectorCallback.onCompletion(EasyMock.capture(error), EasyMock.<Herder.Created<ConnectorInfo>>isNull());
         PowerMock.expectLastCall();
 
@@ -673,7 +675,7 @@ public class DistributedHerderTest {
         // config validation
         Connector connectorMock = PowerMock.createMock(SinkConnector.class);
         EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
-        final Capture<Map<String, String>> configCapture = EasyMock.newCapture();
+        final Capture<Map<String, String>> configCapture = newCapture();
         EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
         EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
@@ -685,7 +687,7 @@ public class DistributedHerderTest {
         // CONN2 creation should fail because the worker group id (connect-test-group) conflicts with
         // the consumer group id we would use for this sink
 
-        Capture<Throwable> error = EasyMock.newCapture();
+        Capture<Throwable> error = newCapture();
         putConnectorCallback.onCompletion(EasyMock.capture(error), EasyMock.isNull(Herder.Created.class));
         PowerMock.expectLastCall();
 
@@ -712,7 +714,7 @@ public class DistributedHerderTest {
         EasyMock.expect(member.memberId()).andStubReturn("leader");
         EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
         EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
-        final Capture<Map<String, String>> configCapture = EasyMock.newCapture();
+        final Capture<Map<String, String>> configCapture = newCapture();
         EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins);
         EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(null);
@@ -1445,7 +1447,6 @@ public class DistributedHerderTest {
         EasyMock.expectLastCall().andThrow(new TimeoutException());
         member.maybeLeaveGroup();
         EasyMock.expectLastCall();
-        PowerMock.expectPrivate(herder, "backoff", DistributedConfig.WORKER_UNSYNC_BACKOFF_MS_DEFAULT);
         member.requestRejoin();
 
         // After backoff, restart the process and this time succeed
@@ -1467,14 +1468,198 @@ public class DistributedHerderTest {
 
         PowerMock.replayAll();
 
+        long before = time.milliseconds();
+        int workerUnsyncBackoffMs = DistributedConfig.WORKER_UNSYNC_BACKOFF_MS_DEFAULT;
+        int coordinatorDiscoveryTimeoutMs = 100;
         herder.tick();
+        assertEquals(before + coordinatorDiscoveryTimeoutMs + workerUnsyncBackoffMs, time.milliseconds());
+
         time.sleep(1000L);
         assertStatistics("leaderUrl", true, 3, 0, Double.NEGATIVE_INFINITY, Double.POSITIVE_INFINITY);
 
+        before = time.milliseconds();
         herder.tick();
+        assertEquals(before + coordinatorDiscoveryTimeoutMs, time.milliseconds());
         time.sleep(2000L);
         assertStatistics("leaderUrl", false, 3, 1, 100, 2000L);
 
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testJoinLeaderCatchUpRetriesForIncrementalCooperative() throws Exception {
+        connectProtocolVersion = CONNECT_PROTOCOL_V1;
+
+        // Join group and as leader fail to do assignment
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V1);
+        expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1));
+        expectPostRebalanceCatchup(SNAPSHOT);
+
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // The leader got its assignment
+        expectRebalance(Collections.emptyList(), Collections.emptyList(),
+                ConnectProtocol.Assignment.NO_ERROR,
+                1, Arrays.asList(CONN1), Arrays.asList(TASK1), 0);
+
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins);
+        // and the new assignment started
+        worker.startConnector(EasyMock.eq(CONN1), EasyMock.anyObject(), EasyMock.anyObject(),
+                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
+        PowerMock.expectLastCall().andReturn(true);
+        EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
+        EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS);
+
+        worker.startTask(EasyMock.eq(TASK1), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject(),
+                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
+        PowerMock.expectLastCall().andReturn(true);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // Another rebalance is triggered but this time it fails to read to the max offset and
+        // triggers a re-sync
+        expectRebalance(Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList(),
+                ConnectProtocol.Assignment.CONFIG_MISMATCH, 1, Collections.<String>emptyList(),
+                Collections.<ConnectorTaskId>emptyList());
+
+        // The leader will retry a few times to read to the end of the config log
+        int retries = 2;
+        member.requestRejoin();
+        for (int i = retries; i >= 0; --i) {
+            // Reading to end of log times out
+            configBackingStore.refresh(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class));
+            EasyMock.expectLastCall().andThrow(new TimeoutException());
+            member.maybeLeaveGroup();
+            EasyMock.expectLastCall();
+        }
+
+        // After a few retries, the worker succeeds in reading the log to the end
+        expectRebalance(Collections.emptyList(), Collections.emptyList(),
+                ConnectProtocol.Assignment.NO_ERROR,
+                1, Arrays.asList(CONN1), Arrays.asList(TASK1), 0);
+        expectPostRebalanceCatchup(SNAPSHOT);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        assertStatistics(0, 0, 0, Double.POSITIVE_INFINITY);
+        herder.tick();
+
+        time.sleep(2000L);
+        assertStatistics(3, 1, 100, 2000);
+        herder.tick();
+
+        long before;
+        int coordinatorDiscoveryTimeoutMs = 100;
+        int maxRetries = 5;
+        for (int i = maxRetries; i >= maxRetries - retries; --i) {
+            before = time.milliseconds();
+            int workerUnsyncBackoffMs =
+                    DistributedConfig.SCHEDULED_REBALANCE_MAX_DELAY_MS_DEFAULT / 10 / i;
+            herder.tick();
+            assertEquals(before + coordinatorDiscoveryTimeoutMs + workerUnsyncBackoffMs, time.milliseconds());
+            coordinatorDiscoveryTimeoutMs = 0;
+        }
+
+        before = time.milliseconds();
+        coordinatorDiscoveryTimeoutMs = 100;
+        herder.tick();
+        assertEquals(before + coordinatorDiscoveryTimeoutMs, time.milliseconds());
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testJoinLeaderCatchUpFailsForIncrementalCooperative() throws Exception {
+        connectProtocolVersion = CONNECT_PROTOCOL_V1;
+
+        // Join group and as leader fail to do assignment
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V1);
+        expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1));
+        expectPostRebalanceCatchup(SNAPSHOT);
+
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // The leader got its assignment
+        expectRebalance(Collections.emptyList(), Collections.emptyList(),
+                ConnectProtocol.Assignment.NO_ERROR,
+                1, Arrays.asList(CONN1), Arrays.asList(TASK1), 0);
+
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins);
+        // and the new assignment started
+        worker.startConnector(EasyMock.eq(CONN1), EasyMock.anyObject(), EasyMock.anyObject(),
+                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
+        PowerMock.expectLastCall().andReturn(true);
+        EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
+        EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS);
+
+        worker.startTask(EasyMock.eq(TASK1), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject(),
+                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
+        PowerMock.expectLastCall().andReturn(true);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // Another rebalance is triggered but this time it fails to read to the max offset and
+        // triggers a re-sync
+        expectRebalance(Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList(),
+                ConnectProtocol.Assignment.CONFIG_MISMATCH, 1, Collections.<String>emptyList(),
+                Collections.<ConnectorTaskId>emptyList());
+
+        // The leader will exhaust the retries while trying to read to the end of the config log
+        int maxRetries = 5;
+        member.requestRejoin();
+        for (int i = maxRetries; i >= 0; --i) {
+            // Reading to end of log times out
+            configBackingStore.refresh(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class));
+            EasyMock.expectLastCall().andThrow(new TimeoutException());
+            member.maybeLeaveGroup();
+            EasyMock.expectLastCall();
+        }
+
+        Capture<ExtendedAssignment> assignmentCapture = newCapture();
+        member.revokeAssignment(capture(assignmentCapture));
+        PowerMock.expectLastCall();
+
+        // After a complete backoff and a revocation of its running tasks, the worker rejoins and this time succeeds
+        // The worker gets back the assignment that it had given up
+        expectRebalance(Collections.emptyList(), Collections.emptyList(),
+                ConnectProtocol.Assignment.NO_ERROR,
+                1, Arrays.asList(CONN1), Arrays.asList(TASK1), 0);
+        expectPostRebalanceCatchup(SNAPSHOT);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        assertStatistics(0, 0, 0, Double.POSITIVE_INFINITY);
+        herder.tick();
+
+        time.sleep(2000L);
+        assertStatistics(3, 1, 100, 2000);
+        herder.tick();
+
+        long before;
+        int coordinatorDiscoveryTimeoutMs = 100;
+        for (int i = maxRetries; i > 0; --i) {
+            before = time.milliseconds();
+            int workerUnsyncBackoffMs =
+                    DistributedConfig.SCHEDULED_REBALANCE_MAX_DELAY_MS_DEFAULT / 10 / i;
+            herder.tick();
+            assertEquals(before + coordinatorDiscoveryTimeoutMs + workerUnsyncBackoffMs, time.milliseconds());
+            coordinatorDiscoveryTimeoutMs = 0;
+        }
+
+        before = time.milliseconds();
+        herder.tick();
+        assertEquals(before, time.milliseconds());
+        assertEquals(Collections.singleton(CONN1), assignmentCapture.getValue().connectors());
+        assertEquals(Collections.singleton(TASK1), assignmentCapture.getValue().tasks());
+        herder.tick();
 
         PowerMock.verifyAll();
     }
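
The timing assertions in these two tests follow directly from the new backoff(). In
testJoinLeaderCatchUpRetriesForIncrementalCooperative, the first tick after CONFIG_MISMATCH
pays the 100 ms coordinator discovery cost plus 300000 / 10 / 5 = 6000 ms, and the next two
ticks sleep 7500 ms and 10000 ms as i counts down from 5, after which the successful
catch-up resets backoffRetries. In testJoinLeaderCatchUpFailsForIncrementalCooperative, all
five retries are exhausted (6000, 7500, 10000, 15000, and 30000 ms), so the following tick
enters backoff() with backoffRetries at zero and revokes immediately without sleeping, which
is why time.milliseconds() is unchanged across that tick and the captured assignment holds
exactly CONN1 and TASK1.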
@@ -1561,7 +1746,7 @@ public class DistributedHerderTest {
         // config validation
         Connector connectorMock = PowerMock.createMock(SourceConnector.class);
         EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
-        final Capture<Map<String, String>> configCapture = EasyMock.newCapture();
+        final Capture<Map<String, String>> configCapture = newCapture();
         EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins).anyTimes();
         EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);

