kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rha...@apache.org
Subject [kafka] branch trunk updated: KAFKA-10811: Correct the MirrorConnectorsIntegrationTest to correctly mask the exit procedures (#9698)
Date Mon, 07 Dec 2020 15:36:49 GMT
This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8db3b1a  KAFKA-10811: Correct the MirrorConnectorsIntegrationTest to correctly mask
the exit procedures (#9698)
8db3b1a is described below

commit 8db3b1a09af0bad274e07161336994610d616b35
Author: Randall Hauch <rhauch@gmail.com>
AuthorDate: Mon Dec 7 09:34:34 2020 -0600

    KAFKA-10811: Correct the MirrorConnectorsIntegrationTest to correctly mask the exit procedures
(#9698)
    
    Normally the `EmbeddedConnectCluster` class masks the `Exit` procedures used within the
Connect worker. This works well when a single instance of the embedded cluster is
used. However, the `MirrorConnectorsIntegrationTest` uses two `EmbeddedConnectCluster` instances;
when the first one is stopped it resets the (static) exit procedures, so any problem
during shutdown of the second embedded Connect cluster would cause the worker to shut down
the JVM running the tests.
    
    Instead, the `MirrorConnectorsIntegrationTest` class should mask the `Exit` procedures
itself and instruct the `EmbeddedConnectCluster` instances (via the existing builder method)
not to mask the procedures.
    
    Author: Randall Hauch <rhauch@gmail.com>
    Reviewers: Mickael Maison <mickael.maison@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
---
 .../mirror/MirrorConnectorsIntegrationTest.java    | 74 +++++++++++++++++-----
 .../util/clusters/EmbeddedConnectCluster.java      |  6 +-
 2 files changed, 61 insertions(+), 19 deletions(-)

diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
index a71b28c..d0715dc 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
 import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
+import org.apache.kafka.connect.util.clusters.UngracefulShutdownException;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.common.utils.Exit;
 import org.junit.After;
@@ -44,13 +45,11 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static org.apache.kafka.test.TestUtils.waitForCondition;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
@@ -75,14 +74,45 @@ public class MirrorConnectorsIntegrationTest {
     private static final int RECORD_CONSUME_DURATION_MS = 20_000;
     private static final int OFFSET_SYNC_DURATION_MS = 30_000;
 
-    private final AtomicBoolean exited = new AtomicBoolean(false);
+    private volatile boolean shuttingDown;
     private Map<String, String> mm2Props;
     private MirrorMakerConfig mm2Config;
     private EmbeddedConnectCluster primary;
     private EmbeddedConnectCluster backup;
 
+    private Exit.Procedure exitProcedure;
+    private Exit.Procedure haltProcedure;
+
     @Before
     public void setup() throws InterruptedException {
+        shuttingDown = false;
+        exitProcedure = (code, message) -> {
+            if (shuttingDown) {
+                // ignore this since we're shutting down Connect and Kafka and timing isn't
always great
+                return;
+            }
+            if (code != 0) {
+                String exitMessage = "Abrupt service exit with code " + code + " and message
" + message;
+                log.warn(exitMessage);
+                throw new UngracefulShutdownException(exitMessage);
+            }
+        };
+        haltProcedure = (code, message) -> {
+            if (shuttingDown) {
+                // ignore this since we're shutting down Connect and Kafka and timing isn't
always great
+                return;
+            }
+            if (code != 0) {
+                String haltMessage = "Abrupt service halt with code " + code + " and message
" + message;
+                log.warn(haltMessage);
+                throw new UngracefulShutdownException(haltMessage);
+            }
+        };
+        // Override the exit and halt procedure that Connect and Kafka will use. For these
integration tests,
+        // we don't want to exit the JVM and instead simply want to fail the test
+        Exit.setExitProcedure(exitProcedure);
+        Exit.setHaltProcedure(haltProcedure);
+
         Properties brokerProps = new Properties();
         brokerProps.put("auto.create.topics.enable", "false");
 
@@ -116,6 +146,7 @@ public class MirrorConnectorsIntegrationTest {
                 .numBrokers(1)
                 .brokerProps(brokerProps)
                 .workerProps(primaryWorkerProps)
+                .maskExitProcedures(false)
                 .build();
 
         backup = new EmbeddedConnectCluster.Builder()
@@ -124,6 +155,7 @@ public class MirrorConnectorsIntegrationTest {
                 .numBrokers(1)
                 .brokerProps(brokerProps)
                 .workerProps(backupWorkerProps)
+                .maskExitProcedures(false)
                 .build();
 
         primary.start();
@@ -164,8 +196,6 @@ public class MirrorConnectorsIntegrationTest {
         mm2Props.put("primary.bootstrap.servers", primary.kafka().bootstrapServers());
         mm2Props.put("backup.bootstrap.servers", backup.kafka().bootstrapServers());
         mm2Config = new MirrorMakerConfig(mm2Props);
-
-        Exit.setExitProcedure((status, errorCode) -> exited.set(true));
     }
 
 
@@ -194,20 +224,27 @@ public class MirrorConnectorsIntegrationTest {
 
     @After
     public void close() {
-        for (String x : primary.connectors()) {
-            primary.deleteConnector(x);
-        }
-        for (String x : backup.connectors()) {
-            backup.deleteConnector(x);
-        }
-        deleteAllTopics(primary.kafka());
-        deleteAllTopics(backup.kafka());
-        primary.stop();
-        backup.stop();
         try {
-            assertFalse(exited.get());
+            for (String x : primary.connectors()) {
+                primary.deleteConnector(x);
+            }
+            for (String x : backup.connectors()) {
+                backup.deleteConnector(x);
+            }
+            deleteAllTopics(primary.kafka());
+            deleteAllTopics(backup.kafka());
         } finally {
-            Exit.resetExitProcedure();
+            shuttingDown = true;
+            try {
+                try {
+                    primary.stop();
+                } finally {
+                    backup.stop();
+                }
+            } finally {
+                Exit.resetExitProcedure();
+                Exit.resetHaltProcedure();
+            }
         }
     }
 
@@ -305,6 +342,9 @@ public class MirrorConnectorsIntegrationTest {
         Map<TopicPartition, OffsetAndMetadata> primaryOffsets = primaryClient.remoteConsumerOffsets(consumerGroupName,
"backup",
                 Duration.ofMillis(CHECKPOINT_DURATION_MS));
 
+        primaryClient.close();
+        backupClient.close();
+
         // Failback consumer group to primary cluster
         primaryConsumer = primary.kafka().createConsumer(consumerProps);
         primaryConsumer.assign(allPartitions("test-topic-1", "backup.test-topic-1"));
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
index 170ad34..4c270d2 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
@@ -153,8 +153,10 @@ public class EmbeddedConnectCluster {
             log.error("Could not stop kafka", e);
             throw new RuntimeException("Could not stop brokers", e);
         } finally {
-            Exit.resetExitProcedure();
-            Exit.resetHaltProcedure();
+            if (maskExitProcedures) {
+                Exit.resetExitProcedure();
+                Exit.resetHaltProcedure();
+            }
         }
     }
 


Mime
View raw message