kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chia7...@apache.org
Subject [kafka] branch trunk updated: MINOR: TestSecurityRollingUpgrade system test fixes (#10886)
Date Fri, 18 Jun 2021 07:51:37 GMT
This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/trunk by this push:
     new ebef7d0  MINOR: TestSecurityRollingUpgrade system test fixes (#10886)
ebef7d0 is described below

commit ebef7d0c216ab6b86b5743bb29538dfa27224be4
Author: Ron Dagostino <rdagostino@confluent.io>
AuthorDate: Fri Jun 18 03:50:21 2021 -0400

    MINOR: TestSecurityRollingUpgrade system test fixes (#10886)
    The TestSecurityRollingUpgrade.test_disable_separate_interbroker_listener() system test
had a design flaw: it was migrating inter-broker communication from a SASL_SSL listener to
an SSL listener in one roll while immediately removing the SASL_SSL listener in that roll.
This requires two rolls because the existing SASL_SSL listener must remain available throughout
the first roll so that unrolled brokers can continue to communicate with rolled brokers throughout.
This patch adds the se [...]
    The TestSecurityRollingUpgrade.test_rolling_upgrade_phase_two() system test was not explicitly
identifying the SASL mechanism to enable on a third port when that port was using SASL but
the client security protocol was not SASL-based. This was resulting in an empty sasl.enabled.mechanisms
config, which applied to that third port, and then when the cluster was rolled to take advantage
of this third port for inter-broker communication the potential for an inability to communicate
with o [...]
    Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
 .../tests/core/security_rolling_upgrade_test.py    | 32 +++++++++++++++-------
 1 file changed, 22 insertions(+), 10 deletions(-)

diff --git a/tests/kafkatest/tests/core/security_rolling_upgrade_test.py b/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
index fb8812e..aa60878 100644
--- a/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
+++ b/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
@@ -111,11 +111,18 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
         self.kafka.interbroker_sasl_mechanism = broker_sasl_mechanism
-    def remove_separate_broker_listener(self, client_security_protocol, client_sasl_mechanism):
-        # separate interbroker listener port will be closed automatically in setup_interbroker_listener
-        # if not using separate interbroker listener
-        self.kafka.setup_interbroker_listener(client_security_protocol, False)
-        self.kafka.interbroker_sasl_mechanism = client_sasl_mechanism
+    def remove_separate_broker_listener(self, client_security_protocol):
+        # This must be done in two phases: keep listening on the INTERNAL listener while rolling once to switch
+        # the inter-broker security listener, then roll again to remove the INTERNAL listener.
+        orig_inter_broker_security_protocol = self.kafka.interbroker_security_protocol
+        self.kafka.setup_interbroker_listener(client_security_protocol, False)  # this closes the INTERNAL listener
+        # Re-open the INTERNAL listener
+        self.kafka.open_port(KafkaService.INTERBROKER_LISTENER_NAME)
+        self.kafka.port_mappings[KafkaService.INTERBROKER_LISTENER_NAME].security_protocol = orig_inter_broker_security_protocol
+        self.kafka.port_mappings[KafkaService.INTERBROKER_LISTENER_NAME].sasl_mechanism =
+        self.bounce()
+        # Close the INTERNAL listener for good and bounce again to fully migrate to <client_security_protocol>
+        self.kafka.close_port(KafkaService.INTERBROKER_LISTENER_NAME)
@@ -158,6 +165,11 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
         self.kafka.security_protocol = client_protocol
         self.kafka.setup_interbroker_listener(SecurityConfig.PLAINTEXT, use_separate_listener=False)
+        # Set any SASL mechanism explicitly when it isn't already set by the client protocol
+        is_broker_protocol_sasl = broker_protocol in [SecurityConfig.SASL_SSL, SecurityConfig.SASL_PLAINTEXT]
+        is_client_protocol_sasl = client_protocol in [SecurityConfig.SASL_SSL, SecurityConfig.SASL_PLAINTEXT]
+        if is_broker_protocol_sasl and not is_client_protocol_sasl:
+            self.kafka.port_mappings[broker_protocol].sasl_mechanism = SecurityConfig.SASL_MECHANISM_GSSAPI
         #Create Secured Producer and Consumer
@@ -239,12 +251,12 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
         Ensure we can produce and consume via SSL listener throughout.
         client_protocol = SecurityConfig.SSL
-        client_sasl_mechanism = SecurityConfig.SASL_MECHANISM_GSSAPI
+        interbroker_security_protocol = SecurityConfig.SASL_SSL
+        interbroker_sasl_mechanism = SecurityConfig.SASL_MECHANISM_GSSAPI
         self.kafka.security_protocol = client_protocol
-        self.kafka.client_sasl_mechanism = client_sasl_mechanism
-        self.kafka.setup_interbroker_listener(SecurityConfig.SASL_SSL, use_separate_listener=True)
-        self.kafka.interbroker_sasl_mechanism = SecurityConfig.SASL_MECHANISM_GSSAPI
+        self.kafka.setup_interbroker_listener(interbroker_security_protocol, use_separate_listener=True)
+        self.kafka.interbroker_sasl_mechanism = interbroker_sasl_mechanism
         # create producer and consumer via client security protocol
@@ -252,5 +264,5 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
         # run produce/consume/validate loop while disabling a separate interbroker listener via rolling restart
-            self.remove_separate_broker_listener, client_protocol, client_sasl_mechanism)
+            self.remove_separate_broker_listener, client_protocol)

View raw message