kafka-commits mailing list archives

From: kkaranta...@apache.org
Subject: [kafka] branch trunk updated: KAFKA-9849: Fix issue with worker.unsync.backoff.ms creating zombie workers when incremental cooperative rebalancing is used (#8827)
Date: Tue, 09 Jun 2020 20:03:16 GMT
This is an automated email from the ASF dual-hosted git repository.

kkarantasis pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b2d37cc  KAFKA-9849: Fix issue with worker.unsync.backoff.ms creating zombie workers when incremental cooperative rebalancing is used (#8827)
b2d37cc is described below

commit b2d37cc74e9b694713a561d307f52cc0ddf08fe6
Author: Konstantine Karantasis <konstantine@confluent.io>
AuthorDate: Tue Jun 9 13:02:35 2020 -0700

    KAFKA-9849: Fix issue with worker.unsync.backoff.ms creating zombie workers when incremental cooperative rebalancing is used (#8827)
    
    When Incremental Cooperative Rebalancing is enabled and a worker fails to read to the end of the config topic, it needs to voluntarily revoke its locally running tasks in time, before those tasks are assigned to another worker and redundant tasks end up running in the Connect cluster.
    
    Additionally, instead of using the delay `worker.unsync.backoff.ms`, which was defined for the eager rebalancing protocol and has a long default value (coincidentally equal to the default rebalance delay of the incremental cooperative protocol), the worker should quickly attempt to re-read the config topic and back off for only a fraction of the rebalance delay. After this fix, the worker will retry up to 5 times before it revokes its running assignment a [...]
    
    Unit tests are added to cover the backoff logic with incremental cooperative rebalancing.
    
    Reviewers: Randall Hauch <rhauch@gmail.com>
---
 .../runtime/distributed/DistributedHerder.java     |  31 ++-
 .../runtime/distributed/ExtendedAssignment.java    |  15 ++
 .../runtime/distributed/WorkerCoordinator.java     |   4 +
 .../runtime/distributed/WorkerGroupMember.java     |   4 +
 .../integration/ConnectIntegrationTestUtils.java   |   1 +
 .../runtime/distributed/DistributedHerderTest.java | 207 +++++++++++++++++++--
 6 files changed, 248 insertions(+), 14 deletions(-)
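
For orientation, here is a minimal, self-contained sketch of the retry/backoff policy this change introduces. The class, field, and method names (BackoffSketch, eagerProtocol, nextBackoffMs) are illustrative placeholders rather than the real API; the authoritative implementation is the DistributedHerder.backoff() hunk in the diff that follows.

// Illustrative sketch of the backoff behavior introduced by this commit; names are
// placeholders, not the actual DistributedHerder members.
class BackoffSketch {
    static final short BACKOFF_RETRIES = 5;       // mirrors the new constant in DistributedHerder
    short backoffRetries = BACKOFF_RETRIES;
    final boolean eagerProtocol;                  // true when the eager rebalance protocol is in use
    final int scheduledRebalanceMaxDelayMs;       // value of scheduled.rebalance.max.delay.ms

    BackoffSketch(boolean eagerProtocol, int scheduledRebalanceMaxDelayMs) {
        this.eagerProtocol = eagerProtocol;
        this.scheduledRebalanceMaxDelayMs = scheduledRebalanceMaxDelayMs;
    }

    // Returns the milliseconds to sleep before re-reading the config topic, or -1 to
    // signal that retries are exhausted and the running assignment should be revoked.
    long nextBackoffMs(long workerUnsyncBackoffMs) {
        if (eagerProtocol) {
            return workerUnsyncBackoffMs;         // old behavior: full worker.unsync.backoff.ms
        }
        if (backoffRetries > 0) {
            // Sleep only a fraction of the rebalance delay, then retry the read.
            int fraction = scheduledRebalanceMaxDelayMs / 10 / backoffRetries;
            backoffRetries--;
            return fraction;
        }
        backoffRetries = BACKOFF_RETRIES;         // reset for the next cycle
        return -1;                                // caller revokes the running assignment
    }
}
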

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 4cc7539..7762f9f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -26,8 +26,8 @@ import org.apache.kafka.common.metrics.stats.CumulativeSum;
 import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.ThreadUtils;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.connector.ConnectorContext;
 import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
@@ -61,7 +61,6 @@ import org.apache.kafka.connect.storage.StatusBackingStore;
 import org.apache.kafka.connect.util.Callback;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.apache.kafka.connect.util.SinkUtils;
-import org.apache.kafka.common.utils.ThreadUtils;
 import org.slf4j.Logger;
 
 import javax.crypto.KeyGenerator;
@@ -93,6 +92,7 @@ import java.util.stream.Collectors;
 
 import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_TRACKING_ENABLE_CONFIG;
 import static org.apache.kafka.connect.runtime.distributed.ConnectProtocol.CONNECT_PROTOCOL_V0;
+import static org.apache.kafka.connect.runtime.distributed.ConnectProtocolCompatibility.EAGER;
 import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V2;
 
 /**
@@ -132,6 +132,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
     private static final long START_AND_STOP_SHUTDOWN_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(1);
     private static final long RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MS = 250;
     private static final int START_STOP_THREAD_POOL_SIZE = 8;
+    private static final short BACKOFF_RETRIES = 5;
 
     private final AtomicLong requestSeqNum = new AtomicLong();
 
@@ -180,6 +181,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
     private SecretKey sessionKey;
     private volatile long keyExpiration;
     private short currentProtocolVersion;
+    private short backoffRetries;
 
     private final DistributedConfig config;
 
@@ -254,6 +256,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
         scheduledRebalance = Long.MAX_VALUE;
         keyExpiration = Long.MAX_VALUE;
         sessionKey = null;
+        backoffRetries = BACKOFF_RETRIES;
 
         currentProtocolVersion = ConnectProtocolCompatibility.compatibility(
             config.getString(DistributedConfig.CONNECT_PROTOCOL_CONFIG)
@@ -1084,6 +1087,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
             configBackingStore.refresh(timeoutMs, TimeUnit.MILLISECONDS);
             configState = configBackingStore.snapshot();
             log.info("Finished reading to end of log and updated config snapshot, new config
log offset: {}", configState.offset());
+            backoffRetries = BACKOFF_RETRIES;
             return true;
         } catch (TimeoutException e) {
             // in case reading the log takes too long, leave the group to ensure a quick rebalance (although by default we should be out of the group already)
@@ -1096,7 +1100,28 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
     }
 
     private void backoff(long ms) {
-        Utils.sleep(ms);
+        if (ConnectProtocolCompatibility.fromProtocolVersion(currentProtocolVersion) == EAGER) {
+            time.sleep(ms);
+            return;
+        }
+
+        if (backoffRetries > 0) {
+            int rebalanceDelayFraction =
+                    config.getInt(DistributedConfig.SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG) / 10 / backoffRetries;
+            time.sleep(rebalanceDelayFraction);
+            --backoffRetries;
+            return;
+        }
+
+        ExtendedAssignment runningAssignmentSnapshot;
+        synchronized (this) {
+            runningAssignmentSnapshot = ExtendedAssignment.duplicate(runningAssignment);
+        }
+        log.info("Revoking current running assignment {} because after {} retries the worker
"
+                + "has not caught up with the latest Connect cluster updates",
+                runningAssignmentSnapshot, BACKOFF_RETRIES);
+        member.revokeAssignment(runningAssignmentSnapshot);
+        backoffRetries = BACKOFF_RETRIES;
     }
 
     private void startAndStop(Collection<Callable<Void>> callables) {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ExtendedAssignment.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ExtendedAssignment.java
index 7518e06..e544407 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ExtendedAssignment.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ExtendedAssignment.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -85,6 +86,20 @@ public class ExtendedAssignment extends ConnectProtocol.Assignment {
         this.delay = delay;
     }
 
+    public static ExtendedAssignment duplicate(ExtendedAssignment assignment) {
+        return new ExtendedAssignment(
+                assignment.version(),
+                assignment.error(),
+                assignment.leader(),
+                assignment.leaderUrl(),
+                assignment.offset(),
+                new LinkedHashSet<>(assignment.connectors()),
+                new LinkedHashSet<>(assignment.tasks()),
+                new LinkedHashSet<>(assignment.revokedConnectors()),
+                new LinkedHashSet<>(assignment.revokedTasks()),
+                assignment.delay());
+    }
+
     /**
      * Return the version of the connect protocol that this assignment belongs to.
      *
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
index 17d6690..800c1d3 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
@@ -269,6 +269,10 @@ public class WorkerCoordinator extends AbstractCoordinator implements Closeable
         return lastCompletedGenerationId;
     }
 
+    public void revokeAssignment(ExtendedAssignment assignment) {
+        listener.onRevoked(assignment.leader(), assignment.connectors(), assignment.tasks());
+    }
+
     private boolean isLeader() {
         final ExtendedAssignment localAssignmentSnapshot = assignmentSnapshot;
         return localAssignmentSnapshot != null && memberId().equals(localAssignmentSnapshot.leader());
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index 40f2b07..f9bc9d8 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -222,6 +222,10 @@ public class WorkerGroupMember {
         return coordinator.currentProtocolVersion();
     }
 
+    public void revokeAssignment(ExtendedAssignment assignment) {
+        coordinator.revokeAssignment(assignment);
+    }
+
     private void stop(boolean swallowException) {
         log.trace("Stopping the Connect group member.");
         AtomicReference<Throwable> firstException = new AtomicReference<>();
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectIntegrationTestUtils.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectIntegrationTestUtils.java
index 1aeaa07..058dbe2 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectIntegrationTestUtils.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectIntegrationTestUtils.java
@@ -22,6 +22,7 @@ import org.junit.runner.Description;
 import org.slf4j.Logger;
 
 /**
+ * A utility class for Connect's integration tests
  */
 public class ConnectIntegrationTestUtils {
     public static TestRule newTestWatcher(Logger log) {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index 879abf8..e0b2b3a 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -88,6 +88,8 @@ import static javax.ws.rs.core.Response.Status.FORBIDDEN;
 import static org.apache.kafka.connect.runtime.distributed.ConnectProtocol.CONNECT_PROTOCOL_V0;
 import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V1;
 import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V2;
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.newCapture;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -208,7 +210,7 @@ public class DistributedHerderTest {
         connectProtocolVersion = CONNECT_PROTOCOL_V0;
 
         herder = PowerMock.createPartialMock(DistributedHerder.class,
-                new String[]{"backoff", "connectorTypeForClass", "updateDeletedConnectorStatus", "updateDeletedTaskStatus"},
+                new String[]{"connectorTypeForClass", "updateDeletedConnectorStatus", "updateDeletedTaskStatus"},
                 new DistributedConfig(HERDER_CONFIG), worker, WORKER_ID, KAFKA_CLUSTER_ID,
                 statusBackingStore, configBackingStore, member, MEMBER_URL, metrics, time, noneConnectorClientConfigOverridePolicy);
 
@@ -536,7 +538,7 @@ public class DistributedHerderTest {
         // config validation
         Connector connectorMock = PowerMock.createMock(SourceConnector.class);
         EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
-        final Capture<Map<String, String>> configCapture = EasyMock.newCapture();
+        final Capture<Map<String, String>> configCapture = newCapture();
         EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
         EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
@@ -583,7 +585,7 @@ public class DistributedHerderTest {
         // config validation
         Connector connectorMock = PowerMock.createMock(SourceConnector.class);
         EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
-        final Capture<Map<String, String>> configCapture = EasyMock.newCapture();
+        final Capture<Map<String, String>> configCapture = newCapture();
         EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
         EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
@@ -597,7 +599,7 @@ public class DistributedHerderTest {
 
         // CONN2 creation should fail
 
-        Capture<Throwable> error = EasyMock.newCapture();
+        Capture<Throwable> error = newCapture();
         putConnectorCallback.onCompletion(EasyMock.capture(error), EasyMock.<Herder.Created<ConnectorInfo>>isNull());
         PowerMock.expectLastCall();
 
@@ -632,7 +634,7 @@ public class DistributedHerderTest {
         // config validation
         Connector connectorMock = PowerMock.createMock(SourceConnector.class);
         EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
-        final Capture<Map<String, String>> configCapture = EasyMock.newCapture();
+        final Capture<Map<String, String>> configCapture = newCapture();
         EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
         EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
@@ -649,7 +651,7 @@ public class DistributedHerderTest {
 
         // CONN2 creation should fail
 
-        Capture<Throwable> error = EasyMock.newCapture();
+        Capture<Throwable> error = newCapture();
         putConnectorCallback.onCompletion(EasyMock.capture(error), EasyMock.<Herder.Created<ConnectorInfo>>isNull());
         PowerMock.expectLastCall();
 
@@ -688,7 +690,7 @@ public class DistributedHerderTest {
         // config validation
         Connector connectorMock = PowerMock.createMock(SinkConnector.class);
         EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
-        final Capture<Map<String, String>> configCapture = EasyMock.newCapture();
+        final Capture<Map<String, String>> configCapture = newCapture();
         EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
         EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
@@ -700,7 +702,7 @@ public class DistributedHerderTest {
         // CONN2 creation should fail because the worker group id (connect-test-group) conflicts with
         // the consumer group id we would use for this sink
 
-        Capture<Throwable> error = EasyMock.newCapture();
+        Capture<Throwable> error = newCapture();
         putConnectorCallback.onCompletion(EasyMock.capture(error), EasyMock.isNull(Herder.Created.class));
         PowerMock.expectLastCall();
 
@@ -727,7 +729,7 @@ public class DistributedHerderTest {
         EasyMock.expect(member.memberId()).andStubReturn("leader");
         EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
         EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
-        final Capture<Map<String, String>> configCapture = EasyMock.newCapture();
+        final Capture<Map<String, String>> configCapture = newCapture();
         EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins);
         EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(null);
@@ -1482,7 +1484,6 @@ public class DistributedHerderTest {
         EasyMock.expectLastCall().andThrow(new TimeoutException());
         member.maybeLeaveGroup(EasyMock.eq("taking too long to read the log"));
         EasyMock.expectLastCall();
-        PowerMock.expectPrivate(herder, "backoff", DistributedConfig.WORKER_UNSYNC_BACKOFF_MS_DEFAULT);
         member.requestRejoin();
 
         // After backoff, restart the process and this time succeed
@@ -1504,14 +1505,198 @@ public class DistributedHerderTest {
 
         PowerMock.replayAll();
 
+        long before = time.milliseconds();
+        int workerUnsyncBackoffMs = DistributedConfig.WORKER_UNSYNC_BACKOFF_MS_DEFAULT;
+        int coordinatorDiscoveryTimeoutMs = 100;
         herder.tick();
+        assertEquals(before + coordinatorDiscoveryTimeoutMs + workerUnsyncBackoffMs, time.milliseconds());
+
         time.sleep(1000L);
         assertStatistics("leaderUrl", true, 3, 0, Double.NEGATIVE_INFINITY, Double.POSITIVE_INFINITY);
 
+        before = time.milliseconds();
         herder.tick();
+        assertEquals(before + coordinatorDiscoveryTimeoutMs, time.milliseconds());
         time.sleep(2000L);
         assertStatistics("leaderUrl", false, 3, 1, 100, 2000L);
 
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testJoinLeaderCatchUpRetriesForIncrementalCooperative() throws Exception {
+        connectProtocolVersion = CONNECT_PROTOCOL_V1;
+
+        // Join group and as leader fail to do assignment
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V1);
+        expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1));
+        expectPostRebalanceCatchup(SNAPSHOT);
+
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // The leader got its assignment
+        expectRebalance(Collections.emptyList(), Collections.emptyList(),
+                ConnectProtocol.Assignment.NO_ERROR,
+                1, Arrays.asList(CONN1), Arrays.asList(TASK1), 0);
+
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins);
+        // and the new assignment started
+        worker.startConnector(EasyMock.eq(CONN1), EasyMock.anyObject(), EasyMock.anyObject(),
+                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
+        PowerMock.expectLastCall().andReturn(true);
+        EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
+        EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS);
+
+        worker.startTask(EasyMock.eq(TASK1), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject(),
+                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
+        PowerMock.expectLastCall().andReturn(true);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // Another rebalance is triggered but this time it fails to read to the max offset and
+        // triggers a re-sync
+        expectRebalance(Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList(),
+                ConnectProtocol.Assignment.CONFIG_MISMATCH, 1, Collections.<String>emptyList(),
+                Collections.<ConnectorTaskId>emptyList());
+
+        // The leader will retry a few times to read to the end of the config log
+        int retries = 2;
+        member.requestRejoin();
+        for (int i = retries; i >= 0; --i) {
+            // Reading to end of log times out
+            configBackingStore.refresh(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class));
+            EasyMock.expectLastCall().andThrow(new TimeoutException());
+            member.maybeLeaveGroup(EasyMock.eq("taking too long to read the log"));
+            EasyMock.expectLastCall();
+        }
+
+        // After a few retries succeed to read the log to the end
+        expectRebalance(Collections.emptyList(), Collections.emptyList(),
+                ConnectProtocol.Assignment.NO_ERROR,
+                1, Arrays.asList(CONN1), Arrays.asList(TASK1), 0);
+        expectPostRebalanceCatchup(SNAPSHOT);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        assertStatistics(0, 0, 0, Double.POSITIVE_INFINITY);
+        herder.tick();
+
+        time.sleep(2000L);
+        assertStatistics(3, 1, 100, 2000);
+        herder.tick();
+
+        long before;
+        int coordinatorDiscoveryTimeoutMs = 100;
+        int maxRetries = 5;
+        for (int i = maxRetries; i >= maxRetries - retries; --i) {
+            before = time.milliseconds();
+            int workerUnsyncBackoffMs =
+                    DistributedConfig.SCHEDULED_REBALANCE_MAX_DELAY_MS_DEFAULT / 10 / i;
+            herder.tick();
+            assertEquals(before + coordinatorDiscoveryTimeoutMs + workerUnsyncBackoffMs, time.milliseconds());
+            coordinatorDiscoveryTimeoutMs = 0;
+        }
+
+        before = time.milliseconds();
+        coordinatorDiscoveryTimeoutMs = 100;
+        herder.tick();
+        assertEquals(before + coordinatorDiscoveryTimeoutMs, time.milliseconds());
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testJoinLeaderCatchUpFailsForIncrementalCooperative() throws Exception {
+        connectProtocolVersion = CONNECT_PROTOCOL_V1;
+
+        // Join group and as leader fail to do assignment
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V1);
+        expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1));
+        expectPostRebalanceCatchup(SNAPSHOT);
+
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // The leader got its assignment
+        expectRebalance(Collections.emptyList(), Collections.emptyList(),
+                ConnectProtocol.Assignment.NO_ERROR,
+                1, Arrays.asList(CONN1), Arrays.asList(TASK1), 0);
+
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins);
+        // and the new assignment started
+        worker.startConnector(EasyMock.eq(CONN1), EasyMock.anyObject(), EasyMock.anyObject(),
+                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
+        PowerMock.expectLastCall().andReturn(true);
+        EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
+        EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS);
+
+        worker.startTask(EasyMock.eq(TASK1), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject(),
+                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
+        PowerMock.expectLastCall().andReturn(true);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // Another rebalance is triggered but this time it fails to read to the max offset and
+        // triggers a re-sync
+        expectRebalance(Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList(),
+                ConnectProtocol.Assignment.CONFIG_MISMATCH, 1, Collections.<String>emptyList(),
+                Collections.<ConnectorTaskId>emptyList());
+
+        // The leader will exhaust the retries while trying to read to the end of the config log
+        int maxRetries = 5;
+        member.requestRejoin();
+        for (int i = maxRetries; i >= 0; --i) {
+            // Reading to end of log times out
+            configBackingStore.refresh(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class));
+            EasyMock.expectLastCall().andThrow(new TimeoutException());
+            member.maybeLeaveGroup(EasyMock.eq("taking too long to read the log"));
+            EasyMock.expectLastCall();
+        }
+
+        Capture<ExtendedAssignment> assignmentCapture = newCapture();
+        member.revokeAssignment(capture(assignmentCapture));
+        PowerMock.expectLastCall();
+
+        // After a complete backoff and a revocation of running tasks rejoin and this time succeed
+        // The worker gets back the assignment that had given up
+        expectRebalance(Collections.emptyList(), Collections.emptyList(),
+                ConnectProtocol.Assignment.NO_ERROR,
+                1, Arrays.asList(CONN1), Arrays.asList(TASK1), 0);
+        expectPostRebalanceCatchup(SNAPSHOT);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        assertStatistics(0, 0, 0, Double.POSITIVE_INFINITY);
+        herder.tick();
+
+        time.sleep(2000L);
+        assertStatistics(3, 1, 100, 2000);
+        herder.tick();
+
+        long before;
+        int coordinatorDiscoveryTimeoutMs = 100;
+        for (int i = maxRetries; i > 0; --i) {
+            before = time.milliseconds();
+            int workerUnsyncBackoffMs =
+                    DistributedConfig.SCHEDULED_REBALANCE_MAX_DELAY_MS_DEFAULT / 10 / i;
+            herder.tick();
+            assertEquals(before + coordinatorDiscoveryTimeoutMs + workerUnsyncBackoffMs, time.milliseconds());
+            coordinatorDiscoveryTimeoutMs = 0;
+        }
+
+        before = time.milliseconds();
+        herder.tick();
+        assertEquals(before, time.milliseconds());
+        assertEquals(Collections.singleton(CONN1), assignmentCapture.getValue().connectors());
+        assertEquals(Collections.singleton(TASK1), assignmentCapture.getValue().tasks());
+        herder.tick();
 
         PowerMock.verifyAll();
     }
@@ -1598,7 +1783,7 @@ public class DistributedHerderTest {
         // config validation
         Connector connectorMock = PowerMock.createMock(SourceConnector.class);
         EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
-        final Capture<Map<String, String>> configCapture = EasyMock.newCapture();
+        final Capture<Map<String, String>> configCapture = newCapture();
         EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins).anyTimes();
         EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);

