From: kkarantasis@apache.org
To: commits@kafka.apache.org
Reply-To: dev@kafka.apache.org
Date: Tue, 09 Jun 2020 20:03:16 +0000
Subject: [kafka] branch trunk updated: KAFKA-9849: Fix issue with worker.unsync.backoff.ms creating zombie workers when incremental cooperative rebalancing is used (#8827)
X-Git-Repo: kafka
X-Git-Refname: refs/heads/trunk
X-Git-Oldrev: 6fa44a3a14aadaa140fb99962823a0fe2cb55f52
X-Git-Newrev: b2d37cc74e9b694713a561d307f52cc0ddf08fe6

This is an automated email from the ASF dual-hosted git repository.

kkarantasis pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b2d37cc  KAFKA-9849: Fix issue with worker.unsync.backoff.ms creating zombie workers when incremental cooperative rebalancing is used (#8827)
b2d37cc is described below

commit b2d37cc74e9b694713a561d307f52cc0ddf08fe6
Author:     Konstantine Karantasis
AuthorDate: Tue Jun 9 13:02:35 2020 -0700

    KAFKA-9849: Fix issue with worker.unsync.backoff.ms creating zombie workers when incremental cooperative rebalancing is used (#8827)

    When incremental cooperative rebalancing is enabled and a worker fails to read to the end
    of the config topic, it needs to voluntarily revoke its locally running tasks in time,
    before those tasks are assigned to another worker; otherwise redundant tasks end up
    running in the Connect cluster.

    Additionally, instead of using the delay `worker.unsync.backoff.ms` that was defined for
    the eager rebalancing protocol and has a long default value (which coincidentally equals
    the default rebalance delay of the incremental cooperative protocol), the worker should
    quickly attempt to re-read the config topic and back off for only a fraction of the
    rebalance delay. After this fix, the worker will retry up to 5 times before it revokes
    its running assignment a [...]

    Unit tests are added to cover the backoff logic with incremental cooperative rebalancing.

    Reviewers: Randall Hauch
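For illustration, the retry-then-revoke behavior described above can be sketched as follows.
This is not part of the patch; the class and method names are invented for the example, and only
the "retry up to 5 times, sleeping for a fraction of the rebalance delay, then revoke" control
flow mirrors the change:

    // Hypothetical, minimal sketch of the new catch-up backoff behavior.
    public class CatchUpBackoffSketch {
        private static final short MAX_RETRIES = 5;
        private final int rebalanceMaxDelayMs;   // value of scheduled.rebalance.max.delay.ms
        private short retriesLeft = MAX_RETRIES;

        public CatchUpBackoffSketch(int rebalanceMaxDelayMs) {
            this.rebalanceMaxDelayMs = rebalanceMaxDelayMs;
        }

        /** Returns how long to sleep after a failed catch-up read, or -1 to signal "revoke now". */
        public long nextBackoffMs() {
            if (retriesLeft > 0) {
                // Sleep for a shrinking fraction of the rebalance delay instead of the long
                // worker.unsync.backoff.ms default used by the eager protocol.
                return rebalanceMaxDelayMs / 10 / retriesLeft--;
            }
            // Retries exhausted: revoke the running assignment and rejoin, instead of leaving
            // possibly duplicated (zombie) tasks running while another worker takes them over.
            retriesLeft = MAX_RETRIES;
            return -1;
        }
    }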
---
 .../runtime/distributed/DistributedHerder.java     |  31 ++-
 .../runtime/distributed/ExtendedAssignment.java    |  15 ++
 .../runtime/distributed/WorkerCoordinator.java     |   4 +
 .../runtime/distributed/WorkerGroupMember.java     |   4 +
 .../integration/ConnectIntegrationTestUtils.java   |   1 +
 .../runtime/distributed/DistributedHerderTest.java | 207 +++++++++++++++++++--
 6 files changed, 248 insertions(+), 14 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 4cc7539..7762f9f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -26,8 +26,8 @@ import org.apache.kafka.common.metrics.stats.CumulativeSum;
 import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.ThreadUtils;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.connector.ConnectorContext;
 import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
@@ -61,7 +61,6 @@ import org.apache.kafka.connect.storage.StatusBackingStore;
 import org.apache.kafka.connect.util.Callback;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.apache.kafka.connect.util.SinkUtils;
-import org.apache.kafka.common.utils.ThreadUtils;
 import org.slf4j.Logger;
 
 import javax.crypto.KeyGenerator;
@@ -93,6 +92,7 @@ import java.util.stream.Collectors;
 
 import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_TRACKING_ENABLE_CONFIG;
 import static org.apache.kafka.connect.runtime.distributed.ConnectProtocol.CONNECT_PROTOCOL_V0;
+import static org.apache.kafka.connect.runtime.distributed.ConnectProtocolCompatibility.EAGER;
 import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V2;
 
 /**
@@ -132,6 +132,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
     private static final long START_AND_STOP_SHUTDOWN_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(1);
     private static final long RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MS = 250;
     private static final int START_STOP_THREAD_POOL_SIZE = 8;
+    private static final short BACKOFF_RETRIES = 5;
 
     private final AtomicLong requestSeqNum = new AtomicLong();
 
@@ -180,6 +181,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
     private SecretKey sessionKey;
     private volatile long keyExpiration;
     private short currentProtocolVersion;
+    private short backoffRetries;
 
     private final DistributedConfig config;
 
@@ -254,6 +256,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
         scheduledRebalance = Long.MAX_VALUE;
         keyExpiration = Long.MAX_VALUE;
         sessionKey = null;
+        backoffRetries = BACKOFF_RETRIES;
 
         currentProtocolVersion = ConnectProtocolCompatibility.compatibility(
             config.getString(DistributedConfig.CONNECT_PROTOCOL_CONFIG)
@@ -1084,6 +1087,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
             configBackingStore.refresh(timeoutMs, TimeUnit.MILLISECONDS);
             configState = configBackingStore.snapshot();
             log.info("Finished reading to end of log and updated config snapshot, new config log offset: {}", configState.offset());
+            backoffRetries = BACKOFF_RETRIES;
             return true;
         } catch (TimeoutException e) {
             // in case reading the log takes too long, leave the group to ensure a quick rebalance (although by default we should be out of the group already)
@@ -1096,7 +1100,28 @@
     }
 
     private void backoff(long ms) {
-        Utils.sleep(ms);
+        if (ConnectProtocolCompatibility.fromProtocolVersion(currentProtocolVersion) == EAGER) {
+            time.sleep(ms);
+            return;
+        }
+
+        if (backoffRetries > 0) {
+            int rebalanceDelayFraction =
+                    config.getInt(DistributedConfig.SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG) / 10 / backoffRetries;
+            time.sleep(rebalanceDelayFraction);
+            --backoffRetries;
+            return;
+        }
+
+        ExtendedAssignment runningAssignmentSnapshot;
+        synchronized (this) {
+            runningAssignmentSnapshot = ExtendedAssignment.duplicate(runningAssignment);
+        }
+        log.info("Revoking current running assignment {} because after {} retries the worker "
+                + "has not caught up with the latest Connect cluster updates",
+                runningAssignmentSnapshot, BACKOFF_RETRIES);
+        member.revokeAssignment(runningAssignmentSnapshot);
+        backoffRetries = BACKOFF_RETRIES;
     }
 
     private void startAndStop(Collection<Callable<Void>> callables) {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ExtendedAssignment.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ExtendedAssignment.java
index 7518e06..e544407 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ExtendedAssignment.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ExtendedAssignment.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -85,6 +86,20 @@ public class ExtendedAssignment extends ConnectProtocol.Assignment {
         this.delay = delay;
     }
 
+    public static ExtendedAssignment duplicate(ExtendedAssignment assignment) {
+        return new ExtendedAssignment(
+                assignment.version(),
+                assignment.error(),
+                assignment.leader(),
+                assignment.leaderUrl(),
+                assignment.offset(),
+                new LinkedHashSet<>(assignment.connectors()),
+                new LinkedHashSet<>(assignment.tasks()),
+                new LinkedHashSet<>(assignment.revokedConnectors()),
+                new LinkedHashSet<>(assignment.revokedTasks()),
+                assignment.delay());
+    }
+
     /**
      * Return the version of the connect protocol that this assignment belongs to.
      *
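The new duplicate() helper above copies every connector and task collection into a fresh
LinkedHashSet, so the herder can take a snapshot of its running assignment inside a synchronized
block and then log it and pass it to revokeAssignment() outside the lock without observing later
mutations. A minimal, self-contained illustration of that copy semantics (not part of the patch):

    import java.util.LinkedHashSet;
    import java.util.Set;

    public class SnapshotCopySketch {
        public static void main(String[] args) {
            Set<String> connectors = new LinkedHashSet<>();
            connectors.add("conn-1");

            // What duplicate() does for each collection-valued field:
            Set<String> snapshot = new LinkedHashSet<>(connectors);

            connectors.clear();             // e.g. a concurrent assignment update
            System.out.println(snapshot);   // still prints [conn-1]; safe to log and revoke
        }
    }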
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
index 17d6690..800c1d3 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
@@ -269,6 +269,10 @@ public class WorkerCoordinator extends AbstractCoordinator implements Closeable
         return lastCompletedGenerationId;
     }
 
+    public void revokeAssignment(ExtendedAssignment assignment) {
+        listener.onRevoked(assignment.leader(), assignment.connectors(), assignment.tasks());
+    }
+
     private boolean isLeader() {
         final ExtendedAssignment localAssignmentSnapshot = assignmentSnapshot;
         return localAssignmentSnapshot != null && memberId().equals(localAssignmentSnapshot.leader());
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index 40f2b07..f9bc9d8 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -222,6 +222,10 @@ public class WorkerGroupMember {
         return coordinator.currentProtocolVersion();
     }
 
+    public void revokeAssignment(ExtendedAssignment assignment) {
+        coordinator.revokeAssignment(assignment);
+    }
+
     private void stop(boolean swallowException) {
         log.trace("Stopping the Connect group member.");
         AtomicReference<Throwable> firstException = new AtomicReference<>();
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectIntegrationTestUtils.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectIntegrationTestUtils.java
index 1aeaa07..058dbe2 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectIntegrationTestUtils.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectIntegrationTestUtils.java
@@ -22,6 +22,7 @@ import org.junit.runner.Description;
 import org.slf4j.Logger;
 
 /**
+ * A utility class for Connect's integration tests
  */
 public class ConnectIntegrationTestUtils {
     public static TestRule newTestWatcher(Logger log) {
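Together with the herder change earlier in this patch, the two small methods above form a single
call path for self-revocation: DistributedHerder.backoff() -> WorkerGroupMember.revokeAssignment()
-> WorkerCoordinator.revokeAssignment() -> the rebalance listener's onRevoked() callback. A
simplified, hypothetical model of that delegation (the names below are stand-ins, not the real
Connect classes):

    import java.util.Arrays;
    import java.util.Collection;

    public class RevocationPathSketch {
        interface RebalanceListener {       // stands in for Connect's worker rebalance listener
            void onRevoked(String leader, Collection<String> connectors, Collection<String> tasks);
        }

        static class Coordinator {          // stands in for WorkerCoordinator
            private final RebalanceListener listener;
            Coordinator(RebalanceListener listener) { this.listener = listener; }
            void revokeAssignment(String leader, Collection<String> connectors, Collection<String> tasks) {
                // Reuses the same callback that a rebalance-triggered revocation would invoke.
                listener.onRevoked(leader, connectors, tasks);
            }
        }

        public static void main(String[] args) {
            Coordinator coordinator = new Coordinator((leader, connectors, tasks) ->
                    System.out.printf("Revoked by %s: connectors=%s tasks=%s%n", leader, connectors, tasks));
            // A worker that gave up catching up with the config topic revokes its own assignment:
            coordinator.revokeAssignment("leader", Arrays.asList("conn-1"), Arrays.asList("conn-1-task-0"));
        }
    }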
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index 879abf8..e0b2b3a 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -88,6 +88,8 @@ import static javax.ws.rs.core.Response.Status.FORBIDDEN;
 import static org.apache.kafka.connect.runtime.distributed.ConnectProtocol.CONNECT_PROTOCOL_V0;
 import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V1;
 import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V2;
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.newCapture;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -208,7 +210,7 @@ public class DistributedHerderTest {
         connectProtocolVersion = CONNECT_PROTOCOL_V0;
 
         herder = PowerMock.createPartialMock(DistributedHerder.class,
-                new String[]{"backoff", "connectorTypeForClass", "updateDeletedConnectorStatus", "updateDeletedTaskStatus"},
+                new String[]{"connectorTypeForClass", "updateDeletedConnectorStatus", "updateDeletedTaskStatus"},
                 new DistributedConfig(HERDER_CONFIG), worker, WORKER_ID, KAFKA_CLUSTER_ID,
                 statusBackingStore, configBackingStore, member, MEMBER_URL, metrics,
                 time, noneConnectorClientConfigOverridePolicy);
@@ -536,7 +538,7 @@ public class DistributedHerderTest {
         // config validation
         Connector connectorMock = PowerMock.createMock(SourceConnector.class);
         EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
-        final Capture<Map<String, String>> configCapture = EasyMock.newCapture();
+        final Capture<Map<String, String>> configCapture = newCapture();
         EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
         EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
@@ -583,7 +585,7 @@ public class DistributedHerderTest {
         // config validation
         Connector connectorMock = PowerMock.createMock(SourceConnector.class);
         EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
-        final Capture<Map<String, String>> configCapture = EasyMock.newCapture();
+        final Capture<Map<String, String>> configCapture = newCapture();
         EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
         EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
@@ -597,7 +599,7 @@ public class DistributedHerderTest {
 
         // CONN2 creation should fail
 
-        Capture<Throwable> error = EasyMock.newCapture();
+        Capture<Throwable> error = newCapture();
         putConnectorCallback.onCompletion(EasyMock.capture(error), EasyMock.<Herder.Created<ConnectorInfo>>isNull());
         PowerMock.expectLastCall();
 
@@ -632,7 +634,7 @@ public class DistributedHerderTest {
         // config validation
         Connector connectorMock = PowerMock.createMock(SourceConnector.class);
         EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
-        final Capture<Map<String, String>> configCapture = EasyMock.newCapture();
+        final Capture<Map<String, String>> configCapture = newCapture();
         EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
         EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
@@ -649,7 +651,7 @@ public class DistributedHerderTest {
 
         // CONN2 creation should fail
 
-        Capture<Throwable> error = EasyMock.newCapture();
+        Capture<Throwable> error = newCapture();
         putConnectorCallback.onCompletion(EasyMock.capture(error), EasyMock.<Herder.Created<ConnectorInfo>>isNull());
         PowerMock.expectLastCall();
 
@@ -688,7 +690,7 @@ public class DistributedHerderTest {
         // config validation
         Connector connectorMock = PowerMock.createMock(SinkConnector.class);
         EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
-        final Capture<Map<String, String>> configCapture = EasyMock.newCapture();
+        final Capture<Map<String, String>> configCapture = newCapture();
         EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
         EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
@@ -700,7 +702,7 @@ public class DistributedHerderTest {
 
         // CONN2 creation should fail because the worker group id (connect-test-group) conflicts with
         // the consumer group id we would use for this sink
-        Capture<Throwable> error = EasyMock.newCapture();
+        Capture<Throwable> error = newCapture();
         putConnectorCallback.onCompletion(EasyMock.capture(error), EasyMock.isNull(Herder.Created.class));
         PowerMock.expectLastCall();
 
@@ -727,7 +729,7 @@ public class DistributedHerderTest {
         EasyMock.expect(member.memberId()).andStubReturn("leader");
         EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
         EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
-        final Capture<Map<String, String>> configCapture = EasyMock.newCapture();
+        final Capture<Map<String, String>> configCapture = newCapture();
         EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins);
         EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(null);
@@ -1482,7 +1484,6 @@ public class DistributedHerderTest {
         EasyMock.expectLastCall().andThrow(new TimeoutException());
         member.maybeLeaveGroup(EasyMock.eq("taking too long to read the log"));
         EasyMock.expectLastCall();
-        PowerMock.expectPrivate(herder, "backoff", DistributedConfig.WORKER_UNSYNC_BACKOFF_MS_DEFAULT);
         member.requestRejoin();
 
         // After backoff, restart the process and this time succeed
@@ -1504,14 +1505,198 @@ public class DistributedHerderTest {
 
         PowerMock.replayAll();
 
+        long before = time.milliseconds();
+        int workerUnsyncBackoffMs = DistributedConfig.WORKER_UNSYNC_BACKOFF_MS_DEFAULT;
+        int coordinatorDiscoveryTimeoutMs = 100;
         herder.tick();
+        assertEquals(before + coordinatorDiscoveryTimeoutMs + workerUnsyncBackoffMs, time.milliseconds());
+
         time.sleep(1000L);
         assertStatistics("leaderUrl", true, 3, 0, Double.NEGATIVE_INFINITY, Double.POSITIVE_INFINITY);
 
+        before = time.milliseconds();
         herder.tick();
+        assertEquals(before + coordinatorDiscoveryTimeoutMs, time.milliseconds());
         time.sleep(2000L);
         assertStatistics("leaderUrl", false, 3, 1, 100, 2000L);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testJoinLeaderCatchUpRetriesForIncrementalCooperative() throws Exception {
+        connectProtocolVersion = CONNECT_PROTOCOL_V1;
+
+        // Join group and as leader fail to do assignment
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V1);
+        expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1));
+        expectPostRebalanceCatchup(SNAPSHOT);
+
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // The leader got its assignment
+        expectRebalance(Collections.emptyList(), Collections.emptyList(),
+                ConnectProtocol.Assignment.NO_ERROR,
+                1, Arrays.asList(CONN1), Arrays.asList(TASK1), 0);
+
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins);
+        // and the new assignment started
+        worker.startConnector(EasyMock.eq(CONN1), EasyMock.anyObject(), EasyMock.anyObject(),
+                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
+        PowerMock.expectLastCall().andReturn(true);
+        EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
+        EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS);
+
+        worker.startTask(EasyMock.eq(TASK1), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject(),
+                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
+        PowerMock.expectLastCall().andReturn(true);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // Another rebalance is triggered but this time it fails to read to the max offset and
+        // triggers a re-sync
+        expectRebalance(Collections.emptyList(), Collections.emptyList(),
+                ConnectProtocol.Assignment.CONFIG_MISMATCH, 1, Collections.emptyList(),
+                Collections.emptyList());
+
+        // The leader will retry a few times to read to the end of the config log
+        int retries = 2;
+        member.requestRejoin();
+        for (int i = retries; i >= 0; --i) {
+            // Reading to end of log times out
+            configBackingStore.refresh(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class));
+            EasyMock.expectLastCall().andThrow(new TimeoutException());
+            member.maybeLeaveGroup(EasyMock.eq("taking too long to read the log"));
+            EasyMock.expectLastCall();
+        }
+
+        // After a few retries succeed to read the log to the end
+        expectRebalance(Collections.emptyList(), Collections.emptyList(),
+                ConnectProtocol.Assignment.NO_ERROR,
+                1, Arrays.asList(CONN1), Arrays.asList(TASK1), 0);
+        expectPostRebalanceCatchup(SNAPSHOT);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        assertStatistics(0, 0, 0, Double.POSITIVE_INFINITY);
+        herder.tick();
+
+        time.sleep(2000L);
+        assertStatistics(3, 1, 100, 2000);
+        herder.tick();
+
+        long before;
+        int coordinatorDiscoveryTimeoutMs = 100;
+        int maxRetries = 5;
+        for (int i = maxRetries; i >= maxRetries - retries; --i) {
+            before = time.milliseconds();
+            int workerUnsyncBackoffMs =
+                    DistributedConfig.SCHEDULED_REBALANCE_MAX_DELAY_MS_DEFAULT / 10 / i;
+            herder.tick();
+            assertEquals(before + coordinatorDiscoveryTimeoutMs + workerUnsyncBackoffMs, time.milliseconds());
+            coordinatorDiscoveryTimeoutMs = 0;
+        }
+
+        before = time.milliseconds();
+        coordinatorDiscoveryTimeoutMs = 100;
+        herder.tick();
+        assertEquals(before + coordinatorDiscoveryTimeoutMs, time.milliseconds());
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testJoinLeaderCatchUpFailsForIncrementalCooperative() throws Exception {
+        connectProtocolVersion = CONNECT_PROTOCOL_V1;
+
+        // Join group and as leader fail to do assignment
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V1);
+        expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1));
+        expectPostRebalanceCatchup(SNAPSHOT);
+
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // The leader got its assignment
+        expectRebalance(Collections.emptyList(), Collections.emptyList(),
+                ConnectProtocol.Assignment.NO_ERROR,
+                1, Arrays.asList(CONN1), Arrays.asList(TASK1), 0);
+
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins);
+        // and the new assignment started
+        worker.startConnector(EasyMock.eq(CONN1), EasyMock.anyObject(), EasyMock.anyObject(),
+                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
+        PowerMock.expectLastCall().andReturn(true);
+        EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
+        EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS);
+
+        worker.startTask(EasyMock.eq(TASK1), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject(),
+                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
+        PowerMock.expectLastCall().andReturn(true);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // Another rebalance is triggered but this time it fails to read to the max offset and
+        // triggers a re-sync
+        expectRebalance(Collections.emptyList(), Collections.emptyList(),
+                ConnectProtocol.Assignment.CONFIG_MISMATCH, 1, Collections.emptyList(),
+                Collections.emptyList());
+
+        // The leader will exhaust the retries while trying to read to the end of the config log
+        int maxRetries = 5;
+        member.requestRejoin();
+        for (int i = maxRetries; i >= 0; --i) {
+            // Reading to end of log times out
+            configBackingStore.refresh(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class));
+            EasyMock.expectLastCall().andThrow(new TimeoutException());
+            member.maybeLeaveGroup(EasyMock.eq("taking too long to read the log"));
+            EasyMock.expectLastCall();
+        }
+
+        Capture<ExtendedAssignment> assignmentCapture = newCapture();
+        member.revokeAssignment(capture(assignmentCapture));
+        PowerMock.expectLastCall();
+
+        // After a complete backoff and a revocation of running tasks rejoin and this time succeed
+        // The worker gets back the assignment that had given up
+        expectRebalance(Collections.emptyList(), Collections.emptyList(),
+                ConnectProtocol.Assignment.NO_ERROR,
+                1, Arrays.asList(CONN1), Arrays.asList(TASK1), 0);
+        expectPostRebalanceCatchup(SNAPSHOT);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        assertStatistics(0, 0, 0, Double.POSITIVE_INFINITY);
+        herder.tick();
+
+        time.sleep(2000L);
+        assertStatistics(3, 1, 100, 2000);
+        herder.tick();
+
+        long before;
+        int coordinatorDiscoveryTimeoutMs = 100;
+        for (int i = maxRetries; i > 0; --i) {
+            before = time.milliseconds();
+            int workerUnsyncBackoffMs =
+                    DistributedConfig.SCHEDULED_REBALANCE_MAX_DELAY_MS_DEFAULT / 10 / i;
+            herder.tick();
+            assertEquals(before + coordinatorDiscoveryTimeoutMs + workerUnsyncBackoffMs, time.milliseconds());
+            coordinatorDiscoveryTimeoutMs = 0;
+        }
+
+        before = time.milliseconds();
+        herder.tick();
+        assertEquals(before, time.milliseconds());
+        assertEquals(Collections.singleton(CONN1), assignmentCapture.getValue().connectors());
+        assertEquals(Collections.singleton(TASK1), assignmentCapture.getValue().tasks());
+        herder.tick();
 
         PowerMock.verifyAll();
     }
 
@@ -1598,7 +1783,7 @@ public class DistributedHerderTest {
         // config validation
         Connector connectorMock = PowerMock.createMock(SourceConnector.class);
         EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
-        final Capture<Map<String, String>> configCapture = EasyMock.newCapture();
+        final Capture<Map<String, String>> configCapture = newCapture();
         EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins).anyTimes();
         EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
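The timing asserted by the new tests follows directly from the backoff formula in
DistributedHerder.backoff(). Assuming the default scheduled.rebalance.max.delay.ms of 300000 ms
(5 minutes), a small, standalone sketch of the resulting schedule (not part of the patch):

    public class BackoffScheduleSketch {
        public static void main(String[] args) {
            int rebalanceMaxDelayMs = 300_000;   // assumed default of scheduled.rebalance.max.delay.ms
            long totalMs = 0;
            for (int retriesLeft = 5; retriesLeft > 0; --retriesLeft) {
                int backoffMs = rebalanceMaxDelayMs / 10 / retriesLeft;  // 6000, 7500, 10000, 15000, 30000
                totalMs += backoffMs;
                System.out.printf("retriesLeft=%d backoffMs=%d%n", retriesLeft, backoffMs);
            }
            // 68500 ms in total before revocation, well under the old 5-minute unsync backoff.
            System.out.println("total before revocation (ms): " + totalMs);
        }
    }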