kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch 2.5 updated: KAFKA-9509; Fixing flakiness of MirrorConnectorsIntegrationTest.testReplication (#8048)
Date Fri, 07 Feb 2020 22:05:01 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new 40183fa  KAFKA-9509; Fixing flakiness of MirrorConnectorsIntegrationTest.testReplication
(#8048)
40183fa is described below

commit 40183fab7e87c52d96c620af0e6e884164177a7d
Author: Sanjana Kaundinya <skaundinya@gmail.com>
AuthorDate: Fri Feb 7 13:58:22 2020 -0800

    KAFKA-9509; Fixing flakiness of MirrorConnectorsIntegrationTest.testReplication (#8048)
    
    The test case `org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testReplication`
has shown to be increasingly flaky recently. This PR aims to make this test more deterministic.
Specifically, the flakiness was due to a timing issue between the tasks not starting up in
time for the test to start running. This PR remediates that by introducing a status check
after every connector is started up. These status checks include that the connector is found
on the connect cluster  [...]
    
    Reviewers: Konstantine Karantasis <konstantine@confluent.io>, Jason Gustafson <jason@confluent.io>
---
 .../mirror/MirrorConnectorsIntegrationTest.java    | 48 ++++++++++++++++++++--
 1 file changed, 45 insertions(+), 3 deletions(-)

diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
index 11abc14..6331a10 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
@@ -16,9 +16,12 @@
  */
 package org.apache.kafka.connect.mirror;
 
+import org.apache.kafka.connect.runtime.AbstractStatus;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
 import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
 import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
 import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.admin.Admin;
@@ -31,12 +34,14 @@ import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.Collections;
 import java.util.Properties;
-import java.util.concurrent.TimeoutException;
 import java.time.Duration;
 
 import static org.junit.Assert.assertEquals;
@@ -66,7 +71,7 @@ public class MirrorConnectorsIntegrationTest {
     private EmbeddedConnectCluster backup;
 
     @Before
-    public void setup() throws IOException {
+    public void setup() throws IOException, InterruptedException {
         Properties brokerProps = new Properties();
         brokerProps.put("auto.create.topics.enable", "false");
 
@@ -150,6 +155,13 @@ public class MirrorConnectorsIntegrationTest {
         mm2Props.put("backup.bootstrap.servers", backup.kafka().bootstrapServers());
         mm2Config = new MirrorMakerConfig(mm2Props);
 
+        // we wait for the connector and tasks to come up for each connector, so that when
we do the
+        // actual testing, we are certain that the tasks are up and running; this will prevent
+        // flaky tests where the connector and tasks didn't start up in time for the tests
to be
+        // run
+        Set<String> connectorNames = new HashSet<>(Arrays.asList("MirrorSourceConnector",
+            "MirrorCheckpointConnector", "MirrorHeartbeatConnector"));
+
         backup.configureConnector("MirrorSourceConnector", mm2Config.connectorBaseConfig(new
SourceAndTarget("primary", "backup"),
             MirrorSourceConnector.class));
 
@@ -159,6 +171,8 @@ public class MirrorConnectorsIntegrationTest {
         backup.configureConnector("MirrorHeartbeatConnector", mm2Config.connectorBaseConfig(new
SourceAndTarget("primary", "backup"),
             MirrorHeartbeatConnector.class));
 
+        waitUntilMirrorMakerIsRunning(backup, connectorNames);
+
         primary.configureConnector("MirrorSourceConnector", mm2Config.connectorBaseConfig(new
SourceAndTarget("backup", "primary"),
             MirrorSourceConnector.class));
 
@@ -167,6 +181,34 @@ public class MirrorConnectorsIntegrationTest {
 
         primary.configureConnector("MirrorHeartbeatConnector", mm2Config.connectorBaseConfig(new
SourceAndTarget("backup", "primary"),
             MirrorHeartbeatConnector.class));
+
+        waitUntilMirrorMakerIsRunning(primary, connectorNames);
+    }
+
+
+    private void waitUntilMirrorMakerIsRunning(EmbeddedConnectCluster connectCluster,
+        Set<String> connNames) throws InterruptedException {
+        for (String connector : connNames) {
+            TestUtils.waitForCondition(() -> areConnectorAndTasksRunning(connectCluster,
+                connector), "Timed out trying to verify connector " +
+                connector + " was up!");
+        }
+    }
+
+    private boolean areConnectorAndTasksRunning(EmbeddedConnectCluster connectCluster,
+        String connectorName) {
+        try {
+            ConnectorStateInfo info = connectCluster.connectorStatus(connectorName);
+            boolean result = info != null
+                && !info.tasks().isEmpty()
+                && info.connector().state().equals(AbstractStatus.State.RUNNING.toString())
+                && info.tasks().stream().allMatch(s -> s.state().equals(AbstractStatus.State.RUNNING.toString()));
+            log.debug("Found connector and tasks running: {}", result);
+            return result;
+        } catch (Exception e) {
+            log.error("Could not check connector state info.", e);
+            return false;
+        }
     }
 
     @After
@@ -184,7 +226,7 @@ public class MirrorConnectorsIntegrationTest {
     }
 
     @Test
-    public void testReplication() throws InterruptedException, TimeoutException {
+    public void testReplication() throws InterruptedException {
         MirrorClient primaryClient = new MirrorClient(mm2Config.clientConfig("primary"));
         MirrorClient backupClient = new MirrorClient(mm2Config.clientConfig("backup"));
 


Mime
View raw message