kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rsiva...@apache.org
Subject [kafka] 01/02: MINOR: ACLs for secured cluster system tests (#9378)
Date Thu, 15 Oct 2020 21:15:17 GMT
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 48ed6de7150ef58e95be03ddd906aadd07a83a3c
Author: Ron Dagostino <rdagostino@confluent.io>
AuthorDate: Fri Oct 9 10:34:53 2020 -0400

    MINOR: ACLs for secured cluster system tests (#9378)
    
    This PR adds missing broker ACLs required to create topics and SCRAM credentials when
ACLs are enabled for a system test. This PR also adds support for using PLAINTEXT as the
inter-broker security protocol when using SCRAM from the client in a system test with a secured
cluster -- without this it would always be necessary to set both the inter-broker and client
mechanisms to a SCRAM mechanism. Also contains some refactoring to make assumptions clearer.
    
    Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
---
 tests/kafkatest/services/kafka/kafka.py            | 232 ++++++++++++++-------
 tests/kafkatest/services/security/kafka_acls.py    |  82 +++-----
 .../kafkatest/services/security/security_config.py |  37 +---
 .../templates/admin_client_as_broker_jaas.conf     |  24 +++
 .../tests/core/zookeeper_security_upgrade_test.py  |   3 +-
 5 files changed, 223 insertions(+), 155 deletions(-)

diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index be56b85..22f0f74 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -71,6 +71,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
     HEAP_DUMP_FILE = os.path.join(PERSISTENT_ROOT, "kafka_heap_dump.bin")
     INTERBROKER_LISTENER_NAME = 'INTERNAL'
     JAAS_CONF_PROPERTY = "java.security.auth.login.config=/mnt/security/jaas.conf"
+    ADMIN_CLIENT_AS_BROKER_JAAS_CONF_PROPERTY = "java.security.auth.login.config=/mnt/security/admin_client_as_broker_jaas.conf"
     KRB5_CONF = "java.security.krb5.conf=/mnt/security/krb5.conf"
 
     logs = {
@@ -372,7 +373,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         node.account.mkdirs(KafkaService.PERSISTENT_ROOT)
 
         self.security_config.setup_node(node)
-        self.security_config.maybe_setup_broker_scram_credentials(node, self.path, "--zookeeper
%s %s" % (self.zk_connect_setting(), self.zk.zkTlsConfigFileOption()))
+        self.maybe_setup_broker_scram_credentials(node)
 
         prop_file = self.prop_file(node)
         self.logger.info("kafka.properties:")
@@ -393,7 +394,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         # existing credentials from ZK and dynamic update of credentials in Kafka are tested.
         # We use the admin client and connect as the broker user when creating the client
(non-broker) credentials
         # if Kafka supports KIP-554, otherwise we use ZooKeeper.
-        self.security_config.maybe_setup_client_scram_credentials(node, self.path, self._connect_setting_kafka_configs_scram(node))
+        self.maybe_setup_client_scram_credentials(node)
 
         self.start_jmx_tool(self.idx(node), node)
         if len(self.pids(node)) == 0:
@@ -445,23 +446,113 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
                                          clean_shutdown=False, allow_fail=True)
         node.account.ssh("sudo rm -rf -- %s" % KafkaService.PERSISTENT_ROOT, allow_fail=False)
 
-    def _kafka_topics_cmd(self, node, force_use_zk_connection):
-        """
-        Returns kafka-topics.sh command path with jaas configuration and krb5 environment
variable
-        set. If Admin client is not going to be used, don't set the environment variable.
-        """
+    def kafka_topics_cmd_with_optional_security_settings(self, node, force_use_zk_connection,
kafka_security_protocol = None):
+        if force_use_zk_connection:
+            bootstrap_server_or_zookeeper = "--zookeeper %s" % (self.zk_connect_setting())
+            skip_optional_security_settings = True
+        else:
+            if kafka_security_protocol is None:
+                # it wasn't specified, so use the inter-broker security protocol if it is
PLAINTEXT,
+                # otherwise use the client security protocol
+                if self.interbroker_security_protocol == SecurityConfig.PLAINTEXT:
+                    security_protocol_to_use = SecurityConfig.PLAINTEXT
+                else:
+                    security_protocol_to_use = self.security_protocol
+            else:
+                security_protocol_to_use = kafka_security_protocol
+            bootstrap_server_or_zookeeper = "--bootstrap-server %s" % (self.bootstrap_servers(security_protocol_to_use))
+            skip_optional_security_settings = security_protocol_to_use == SecurityConfig.PLAINTEXT
+        if skip_optional_security_settings:
+            optional_jass_krb_system_props_prefix = ""
+            optional_command_config_suffix = ""
+        else:
+            # we need security configs because aren't going to ZooKeeper and we aren't using
PLAINTEXT
+            if (security_protocol_to_use == self.interbroker_security_protocol):
+                # configure JAAS to provide the broker's credentials
+                # since this is an authenticating cluster and we are going to use the inter-broker
security protocol
+                jaas_conf_prop = KafkaService.ADMIN_CLIENT_AS_BROKER_JAAS_CONF_PROPERTY
+                use_inter_broker_mechanism_for_client = True
+            else:
+                # configure JAAS to provide the typical client credentials
+                jaas_conf_prop = KafkaService.JAAS_CONF_PROPERTY
+                use_inter_broker_mechanism_for_client = False
+            optional_jass_krb_system_props_prefix = "KAFKA_OPTS='-D%s -D%s' " % (jaas_conf_prop,
KafkaService.KRB5_CONF)
+            optional_command_config_suffix = " --command-config <(echo '%s')" % (self.security_config.client_config(use_inter_broker_mechanism_for_client
= use_inter_broker_mechanism_for_client))
         kafka_topic_script = self.path.script("kafka-topics.sh", node)
-        skip_security_settings = force_use_zk_connection or not self.all_nodes_topic_command_supports_bootstrap_server()
-        return kafka_topic_script if skip_security_settings else \
-            "KAFKA_OPTS='-D%s -D%s' %s" % (KafkaService.JAAS_CONF_PROPERTY, KafkaService.KRB5_CONF,
kafka_topic_script)
+        return "%s%s %s%s" % \
+               (optional_jass_krb_system_props_prefix, kafka_topic_script,
+                bootstrap_server_or_zookeeper, optional_command_config_suffix)
+
+    def kafka_configs_cmd_with_optional_security_settings(self, node, force_use_zk_connection,
kafka_security_protocol = None):
+        if force_use_zk_connection:
+            # kafka-configs supports a TLS config file, so include it if there is one
+            bootstrap_server_or_zookeeper = "--zookeeper %s %s" % (self.zk_connect_setting(),
self.zk.zkTlsConfigFileOption())
+            skip_optional_security_settings = True
+        else:
+            if kafka_security_protocol is None:
+                # it wasn't specified, so use the inter-broker security protocol if it is
PLAINTEXT,
+                # otherwise use the client security protocol
+                if self.interbroker_security_protocol == SecurityConfig.PLAINTEXT:
+                    security_protocol_to_use = SecurityConfig.PLAINTEXT
+                else:
+                    security_protocol_to_use = self.security_protocol
+            else:
+                security_protocol_to_use = kafka_security_protocol
+            bootstrap_server_or_zookeeper = "--bootstrap-server %s" % (self.bootstrap_servers(security_protocol_to_use))
+            skip_optional_security_settings = security_protocol_to_use == SecurityConfig.PLAINTEXT
+        if skip_optional_security_settings:
+            optional_jass_krb_system_props_prefix = ""
+            optional_command_config_suffix = ""
+        else:
+            # we need security configs because aren't going to ZooKeeper and we aren't using
PLAINTEXT
+            if (security_protocol_to_use == self.interbroker_security_protocol):
+                # configure JAAS to provide the broker's credentials
+                # since this is an authenticating cluster and we are going to use the inter-broker
security protocol
+                jaas_conf_prop = KafkaService.ADMIN_CLIENT_AS_BROKER_JAAS_CONF_PROPERTY
+                use_inter_broker_mechanism_for_client = True
+            else:
+                # configure JAAS to provide the typical client credentials
+                jaas_conf_prop = KafkaService.JAAS_CONF_PROPERTY
+                use_inter_broker_mechanism_for_client = False
+            optional_jass_krb_system_props_prefix = "KAFKA_OPTS='-D%s -D%s' " % (jaas_conf_prop,
KafkaService.KRB5_CONF)
+            optional_command_config_suffix = " --command-config <(echo '%s')" % (self.security_config.client_config(use_inter_broker_mechanism_for_client
= use_inter_broker_mechanism_for_client))
+        kafka_config_script = self.path.script("kafka-configs.sh", node)
+        return "%s%s %s%s" % \
+               (optional_jass_krb_system_props_prefix, kafka_config_script,
+                bootstrap_server_or_zookeeper, optional_command_config_suffix)
+
+    def maybe_setup_broker_scram_credentials(self, node):
+        security_config = self.security_config
+        # we only need to create broker credentials when the broker mechanism is SASL/SCRAM
+        if security_config.is_sasl(self.interbroker_security_protocol) and security_config.is_sasl_scram(self.interbroker_sasl_mechanism):
+            force_use_zk_connection = True # we are bootstrapping these credentials before
Kafka is started
+            cmd = fix_opts_for_new_jvm(node)
+            cmd += "%(kafka_configs_cmd)s --entity-name %(user)s --entity-type users --alter
--add-config %(mechanism)s=[password=%(password)s]" % {
+                'kafka_configs_cmd': self.kafka_configs_cmd_with_optional_security_settings(node,
force_use_zk_connection),
+                'user': SecurityConfig.SCRAM_BROKER_USER,
+                'mechanism': self.interbroker_sasl_mechanism,
+                'password': SecurityConfig.SCRAM_BROKER_PASSWORD
+            }
+            node.account.ssh(cmd)
 
-    def _kafka_topics_cmd_config(self, node, force_use_zk_connection):
-        """
-        Return --command-config parameter to the kafka-topics.sh command. The config parameter
specifies
-        the security settings that AdminClient uses to connect to a secure kafka server.
-        """
-        skip_command_config = force_use_zk_connection or not self.all_nodes_topic_command_supports_bootstrap_server()
-        return "" if skip_command_config else " --command-config <(echo '%s')" % (self.security_config.client_config())
+    def maybe_setup_client_scram_credentials(self, node):
+        security_config = self.security_config
+        # we only need to create client credentials when the client mechanism is SASL/SCRAM
+        if security_config.is_sasl(self.security_protocol) and security_config.is_sasl_scram(self.client_sasl_mechanism):
+            force_use_zk_connection = not self.all_nodes_configs_command_uses_bootstrap_server_scram()
+            # ignored if forcing the use of Zookeeper, but we need a value to send, so calculate
it anyway
+            if self.interbroker_security_protocol == SecurityConfig.PLAINTEXT:
+                kafka_security_protocol = self.interbroker_security_protocol
+            else:
+                kafka_security_protocol = self.security_protocol
+            cmd = fix_opts_for_new_jvm(node)
+            cmd += "%(kafka_configs_cmd)s --entity-name %(user)s --entity-type users --alter
--add-config %(mechanism)s=[password=%(password)s]" % {
+                'kafka_configs_cmd': self.kafka_configs_cmd_with_optional_security_settings(node,
force_use_zk_connection, kafka_security_protocol),
+                'user': SecurityConfig.SCRAM_CLIENT_USER,
+                'mechanism': self.client_sasl_mechanism,
+                'password': SecurityConfig.SCRAM_CLIENT_PASSWORD
+            }
+            node.account.ssh(cmd)
 
     def all_nodes_topic_command_supports_bootstrap_server(self):
         for node in self.nodes:
@@ -515,9 +606,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
                             (topic_cfg.get('if-not-exists', False) and not self.all_nodes_topic_command_supports_if_not_exists_with_bootstrap_server())
 
         cmd = fix_opts_for_new_jvm(node)
-        cmd += "%(kafka_topics_cmd)s %(connection_string)s --create --topic %(topic)s " %
{
-            'kafka_topics_cmd': self._kafka_topics_cmd(node, force_use_zk_connection),
-            'connection_string': self._topic_command_connect_setting(node, force_use_zk_connection),
+        cmd += "%(kafka_topics_cmd)s --create --topic %(topic)s " % {
+            'kafka_topics_cmd': self.kafka_topics_cmd_with_optional_security_settings(node,
force_use_zk_connection),
             'topic': topic_cfg.get("topic"),
         }
         if 'replica-assignment' in topic_cfg:
@@ -537,8 +627,6 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
             for config_name, config_value in topic_cfg["configs"].items():
                 cmd += " --config %s=%s" % (config_name, str(config_value))
 
-        cmd += self._kafka_topics_cmd_config(node, force_use_zk_connection)
-
         self.logger.info("Running topic creation command...\n%s" % cmd)
         node.account.ssh(cmd)
 
@@ -556,10 +644,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         force_use_zk_connection = not self.all_nodes_topic_command_supports_bootstrap_server()
 
         cmd = fix_opts_for_new_jvm(node)
-        cmd += "%s %s --topic %s --delete %s" % \
-               (self._kafka_topics_cmd(node, force_use_zk_connection),
-                self._topic_command_connect_setting(node, force_use_zk_connection),
-                topic, self._kafka_topics_cmd_config(node, force_use_zk_connection))
+        cmd += "%s --topic %s --delete" % \
+               (self.kafka_topics_cmd_with_optional_security_settings(node, force_use_zk_connection),
topic)
         self.logger.info("Running topic delete command...\n%s" % cmd)
         node.account.ssh(cmd)
 
@@ -570,10 +656,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         force_use_zk_connection = not self.all_nodes_topic_command_supports_bootstrap_server()
 
         cmd = fix_opts_for_new_jvm(node)
-        cmd += "%s %s --topic %s --describe %s" % \
-              (self._kafka_topics_cmd(node, force_use_zk_connection),
-               self._topic_command_connect_setting(node, force_use_zk_connection),
-               topic, self._kafka_topics_cmd_config(node, force_use_zk_connection))
+        cmd += "%s --topic %s --describe" % \
+               (self.kafka_topics_cmd_with_optional_security_settings(node, force_use_zk_connection),
topic)
 
         self.logger.info("Running topic describe command...\n%s" % cmd)
         output = ""
@@ -588,9 +672,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         force_use_zk_connection = not self.all_nodes_topic_command_supports_bootstrap_server()
 
         cmd = fix_opts_for_new_jvm(node)
-        cmd += "%s %s --list %s" % (self._kafka_topics_cmd(node, force_use_zk_connection),
-                                   self._topic_command_connect_setting(node, force_use_zk_connection),
-                                   self._kafka_topics_cmd_config(node, force_use_zk_connection))
+        cmd += "%s --list" % (self.kafka_topics_cmd_with_optional_security_settings(node,
force_use_zk_connection))
         for line in node.account.ssh_capture(cmd):
             if not line.startswith("SLF4J"):
                 yield line.rstrip()
@@ -600,9 +682,11 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
             node = self.nodes[0]
         self.logger.info("Altering message format version for topic %s with format %s", topic,
msg_format_version)
 
+        force_use_zk_connection = not self.all_nodes_configs_command_uses_bootstrap_server()
+
         cmd = fix_opts_for_new_jvm(node)
-        cmd += "%s %s --entity-name %s --entity-type topics --alter --add-config message.format.version=%s"
% \
-              (self.path.script("kafka-configs.sh", node), self._connect_setting_kafka_configs(node),
topic, msg_format_version)
+        cmd += "%s --entity-name %s --entity-type topics --alter --add-config message.format.version=%s"
% \
+              (self.kafka_configs_cmd_with_optional_security_settings(node, force_use_zk_connection),
topic, msg_format_version)
         self.logger.info("Running alter message format command...\n%s" % cmd)
         node.account.ssh(cmd)
 
@@ -614,38 +698,54 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         else:
             self.logger.info("Disabling unclean leader election for topic %s", topic)
 
+        force_use_zk_connection = not self.all_nodes_configs_command_uses_bootstrap_server()
+
         cmd = fix_opts_for_new_jvm(node)
-        cmd += "%s %s --entity-name %s --entity-type topics --alter --add-config unclean.leader.election.enable=%s"
% \
-              (self.path.script("kafka-configs.sh", node), self._connect_setting_kafka_configs(node),
topic, str(value).lower())
+        cmd += "%s --entity-name %s --entity-type topics --alter --add-config unclean.leader.election.enable=%s"
% \
+              (self.kafka_configs_cmd_with_optional_security_settings(node, force_use_zk_connection),
topic, str(value).lower())
         self.logger.info("Running alter unclean leader command...\n%s" % cmd)
         node.account.ssh(cmd)
 
-    def _connect_setting_kafka_configs(self, node):
-        # Use this for everything related to kafka-configs except User SCRAM Credentials
-        if self.all_nodes_configs_command_uses_bootstrap_server():
-            return "--bootstrap-server %s --command-config <(echo '%s')" % (self.bootstrap_servers(self.security_protocol),
-                                                                            self.security_config.client_config())
+    def kafka_acls_cmd_with_optional_security_settings(self, node, force_use_zk_connection,
kafka_security_protocol = None, override_command_config = None):
+        force_use_zk_connection = force_use_zk_connection or not self.all_nodes_acl_command_supports_bootstrap_server
+        if force_use_zk_connection:
+            bootstrap_server_or_authorizer_zk_props = "--authorizer-properties zookeeper.connect=%s"
% (self.zk_connect_setting())
+            skip_optional_security_settings = True
         else:
-            return "--zookeeper %s %s" % (self.zk_connect_setting(), self.zk.zkTlsConfigFileOption())
-
-    def _connect_setting_kafka_configs_scram(self, node):
-        # Use this for kafka-configs when operating on User SCRAM Credentials
-        if self.all_nodes_configs_command_uses_bootstrap_server_scram():
-            return "--bootstrap-server %s --command-config <(echo '%s')" %\
-                   (self.bootstrap_servers(self.security_protocol),
-                    self.security_config.client_config(use_inter_broker_mechanism_for_client
= True))
+            if kafka_security_protocol is None:
+                # it wasn't specified, so use the inter-broker security protocol if it is
PLAINTEXT,
+                # otherwise use the client security protocol
+                if self.interbroker_security_protocol == SecurityConfig.PLAINTEXT:
+                    security_protocol_to_use = SecurityConfig.PLAINTEXT
+                else:
+                    security_protocol_to_use = self.security_protocol
+            else:
+                security_protocol_to_use = kafka_security_protocol
+            bootstrap_server_or_authorizer_zk_props = "--bootstrap-server %s" % (self.bootstrap_servers(security_protocol_to_use))
+            skip_optional_security_settings = security_protocol_to_use == SecurityConfig.PLAINTEXT
+        if skip_optional_security_settings:
+            optional_jass_krb_system_props_prefix = ""
+            optional_command_config_suffix = ""
         else:
-            return "--zookeeper %s %s" % (self.zk_connect_setting(), self.zk.zkTlsConfigFileOption())
-
-    def kafka_acls_cmd(self, node, force_use_zk_connection):
-        """
-        Returns kafka-acls.sh command path with jaas configuration and krb5 environment variable
-        set. If Admin client is not going to be used, don't set the environment variable.
-        """
+            # we need security configs because aren't going to ZooKeeper and we aren't using
PLAINTEXT
+            if (security_protocol_to_use == self.interbroker_security_protocol):
+                # configure JAAS to provide the broker's credentials
+                # since this is an authenticating cluster and we are going to use the inter-broker
security protocol
+                jaas_conf_prop = KafkaService.ADMIN_CLIENT_AS_BROKER_JAAS_CONF_PROPERTY
+                use_inter_broker_mechanism_for_client = True
+            else:
+                # configure JAAS to provide the typical client credentials
+                jaas_conf_prop = KafkaService.JAAS_CONF_PROPERTY
+                use_inter_broker_mechanism_for_client = False
+            optional_jass_krb_system_props_prefix = "KAFKA_OPTS='-D%s -D%s' " % (jaas_conf_prop,
KafkaService.KRB5_CONF)
+            if override_command_config is None:
+                optional_command_config_suffix = " --command-config <(echo '%s')" % (self.security_config.client_config(use_inter_broker_mechanism_for_client
= use_inter_broker_mechanism_for_client))
+            else:
+                optional_command_config_suffix = " --command-config %s" % (override_command_config)
         kafka_acls_script = self.path.script("kafka-acls.sh", node)
-        skip_security_settings = force_use_zk_connection or not self.all_nodes_acl_command_supports_bootstrap_server()
-        return kafka_acls_script if skip_security_settings else \
-            "KAFKA_OPTS='-D%s -D%s' %s" % (KafkaService.JAAS_CONF_PROPERTY, KafkaService.KRB5_CONF,
kafka_acls_script)
+        return "%s%s %s%s" % \
+               (optional_jass_krb_system_props_prefix, kafka_acls_script,
+                bootstrap_server_or_authorizer_zk_props, optional_command_config_suffix)
 
     def run_cli_tool(self, node, cmd):
         output = ""
@@ -977,18 +1077,6 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
     def zk_connect_setting(self):
         return self.zk.connect_setting(self.zk_chroot, self.zk_client_secure)
 
-    def _topic_command_connect_setting(self, node, force_use_zk_connection):
-        """
-        Checks if --bootstrap-server config is supported, if yes then returns a string with
-        bootstrap server, otherwise returns zookeeper connection string.
-        """
-        if not force_use_zk_connection and self.all_nodes_topic_command_supports_bootstrap_server():
-            connection_setting = "--bootstrap-server %s" % (self.bootstrap_servers(self.security_protocol))
-        else:
-            connection_setting = "--zookeeper %s" % (self.zk_connect_setting())
-
-        return connection_setting
-
     def __bootstrap_servers(self, port, validate=True, offline_nodes=[]):
         if validate and not port.open:
             raise ValueError("We are retrieving bootstrap servers for the port: %s which
is not currently open. - " %
diff --git a/tests/kafkatest/services/security/kafka_acls.py b/tests/kafkatest/services/security/kafka_acls.py
index 3bb3e6f..96438bc 100644
--- a/tests/kafkatest/services/security/kafka_acls.py
+++ b/tests/kafkatest/services/security/kafka_acls.py
@@ -13,18 +13,27 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
-from kafkatest.services.kafka.util import fix_opts_for_new_jvm
 
-
-class ACLs(KafkaPathResolverMixin):
+class ACLs:
     def __init__(self, context):
         self.context = context
 
-    def set_acls(self, protocol, kafka, topic, group, force_use_zk_connection=False):
+    def set_acls(self, protocol, kafka, topic, group, force_use_zk_connection=False, additional_cluster_operations_to_grant
= []):
+        """
+        Creates ACls for the Kafka Broker principal that brokers use in tests
+
+        :param protocol: the security protocol to use (e.g. PLAINTEXT, SASL_PLAINTEXT, etc.)
+        :param kafka: Kafka cluster upon which ClusterAction ACL is created
+        :param topic: topic for which produce and consume ACLs are created
+        :param group: consumer group for which consume ACL is created
+        :param force_use_zk_connection: forces the use of ZooKeeper when true, otherwise
AdminClient is used when available.
+               This is necessary for the case where we are bootstrapping ACLs before Kafka
is started or before authorizer is enabled
+        :param additional_cluster_operations_to_grant may be set to ['Alter', 'Create'] if
the cluster is secured since these are required
+               to create SCRAM credentials and topics, respectively
+        """
         # Set server ACLs
         kafka_principal = "User:CN=systemtest" if protocol == "SSL" else "User:kafka"
-        self.add_cluster_acl(kafka, kafka_principal, force_use_zk_connection=force_use_zk_connection)
+        self.add_cluster_acl(kafka, kafka_principal, force_use_zk_connection=force_use_zk_connection,
additional_cluster_operations_to_grant = additional_cluster_operations_to_grant)
         self.add_read_acl(kafka, kafka_principal, "*", force_use_zk_connection=force_use_zk_connection)
 
         # Set client ACLs
@@ -32,39 +41,6 @@ class ACLs(KafkaPathResolverMixin):
         self.add_produce_acl(kafka, client_principal, topic, force_use_zk_connection=force_use_zk_connection)
         self.add_consume_acl(kafka, client_principal, topic, group, force_use_zk_connection=force_use_zk_connection)
 
-    def _acl_command_connect_setting(self, kafka, node, force_use_zk_connection):
-        """
-        Checks if --bootstrap-server config is supported, if yes then returns a string with
-        bootstrap server, otherwise returns authorizer properties for zookeeper connection.
-        """
-        if not force_use_zk_connection and kafka.all_nodes_acl_command_supports_bootstrap_server():
-            connection_setting = "--bootstrap-server %s" % (kafka.bootstrap_servers(kafka.security_protocol))
-        else:
-            connection_setting = "--authorizer-properties zookeeper.connect=%s" % (kafka.zk_connect_setting())
-
-        return connection_setting
-
-    def _kafka_acls_cmd_config(self, kafka, node, force_use_zk_connection):
-        """
-        Return --command-config parameter to the kafka-acls.sh command. The config parameter
specifies
-        the security settings that AdminClient uses to connect to a secure kafka server.
-        """
-        skip_command_config = force_use_zk_connection or not kafka.all_nodes_acl_command_supports_bootstrap_server()
-        return "" if skip_command_config else " --command-config <(echo '%s')" % (kafka.security_config.client_config())
-
-    def _acl_cmd_prefix(self, kafka, node, force_use_zk_connection):
-        """
-        :param node: Node to use when determining connection settings
-        :param force_use_zk_connection: forces the use of ZooKeeper when true, otherwise
AdminClient is used when available
-        :return command prefix for running kafka-acls
-        """
-        cmd = fix_opts_for_new_jvm(node)
-        cmd += "%s %s %s" % (
-            kafka.kafka_acls_cmd(node, force_use_zk_connection),
-            self._acl_command_connect_setting(kafka, node, force_use_zk_connection),
-            self._kafka_acls_cmd_config(kafka, node, force_use_zk_connection))
-        return cmd
-
     def _add_acl_on_topic(self, kafka, principal, topic, operation_flag, node, force_use_zk_connection):
         """
         :param principal: principal for which ACL is created
@@ -74,30 +50,32 @@ class ACLs(KafkaPathResolverMixin):
         :param force_use_zk_connection: forces the use of ZooKeeper when true, otherwise
AdminClient is used when available
         """
         cmd = "%(cmd_prefix)s --add --topic=%(topic)s %(operation_flag)s --allow-principal=%(principal)s"
% {
-            'cmd_prefix': self._acl_cmd_prefix(kafka, node, force_use_zk_connection),
+            'cmd_prefix': kafka.kafka_acls_cmd_with_optional_security_settings(kafka, node,
force_use_zk_connection),
             'topic': topic,
             'operation_flag': operation_flag,
             'principal': principal
         }
         kafka.run_cli_tool(node, cmd)
 
-    def add_cluster_acl(self, kafka, principal, force_use_zk_connection=False):
+    def add_cluster_acl(self, kafka, principal, force_use_zk_connection=False, additional_cluster_operations_to_grant
= []):
         """
         :param kafka: Kafka cluster upon which ClusterAction ACL is created
         :param principal: principal for which ClusterAction ACL is created
         :param node: Node to use when determining connection settings
         :param force_use_zk_connection: forces the use of ZooKeeper when true, otherwise
AdminClient is used when available.
                This is necessary for the case where we are bootstrapping ACLs before Kafka
is started or before authorizer is enabled
+        :param additional_cluster_operations_to_grant may be set to ['Alter', 'Create'] if
the cluster is secured since these are required
+               to create SCRAM credentials and topics, respectively
         """
         node = kafka.nodes[0]
 
-        force_use_zk_connection = force_use_zk_connection or not kafka.all_nodes_acl_command_supports_bootstrap_server()
-
-        cmd = "%(cmd_prefix)s --add --cluster --operation=ClusterAction --allow-principal=%(principal)s"
% {
-            'cmd_prefix': self._acl_cmd_prefix(kafka, node, force_use_zk_connection),
-            'principal': principal
-        }
-        kafka.run_cli_tool(node, cmd)
+        for operation in ['ClusterAction'] + additional_cluster_operations_to_grant:
+            cmd = "%(cmd_prefix)s --add --cluster --operation=%(operation)s --allow-principal=%(principal)s"
% {
+                'cmd_prefix': kafka.kafka_acls_cmd_with_optional_security_settings(kafka,
node, force_use_zk_connection),
+                'operation': operation,
+                'principal': principal
+            }
+            kafka.run_cli_tool(node, cmd)
 
     def add_read_acl(self, kafka, principal, topic, force_use_zk_connection=False):
         """
@@ -110,8 +88,6 @@ class ACLs(KafkaPathResolverMixin):
         """
         node = kafka.nodes[0]
 
-        force_use_zk_connection = force_use_zk_connection or not kafka.all_nodes_acl_command_supports_bootstrap_server()
-
         self._add_acl_on_topic(kafka, principal, topic, "--operation=Read", node, force_use_zk_connection)
 
     def add_produce_acl(self, kafka, principal, topic, force_use_zk_connection=False):
@@ -125,8 +101,6 @@ class ACLs(KafkaPathResolverMixin):
         """
         node = kafka.nodes[0]
 
-        force_use_zk_connection = force_use_zk_connection or not kafka.all_nodes_acl_command_supports_bootstrap_server()
-
         self._add_acl_on_topic(kafka, principal, topic, "--producer", node, force_use_zk_connection)
 
     def add_consume_acl(self, kafka, principal, topic, group, force_use_zk_connection=False):
@@ -141,10 +115,8 @@ class ACLs(KafkaPathResolverMixin):
         """
         node = kafka.nodes[0]
 
-        force_use_zk_connection = force_use_zk_connection or not kafka.all_nodes_acl_command_supports_bootstrap_server()
-
         cmd = "%(cmd_prefix)s --add --topic=%(topic)s --group=%(group)s --consumer --allow-principal=%(principal)s" % {
-            'cmd_prefix': self._acl_cmd_prefix(kafka, node, force_use_zk_connection),
+            'cmd_prefix': kafka.kafka_acls_cmd_with_optional_security_settings(kafka, node, force_use_zk_connection),
             'topic': topic,
             'group': group,
             'principal': principal
diff --git a/tests/kafkatest/services/security/security_config.py b/tests/kafkatest/services/security/security_config.py
index 437084b..f68b93d 100644
--- a/tests/kafkatest/services/security/security_config.py
+++ b/tests/kafkatest/services/security/security_config.py
@@ -259,8 +259,16 @@ class SecurityConfig(TemplateRenderer):
         if self.static_jaas_conf:
             node.account.create_file(SecurityConfig.JAAS_CONF_PATH, jaas_conf)
             node.account.create_file(SecurityConfig.ADMIN_CLIENT_AS_BROKER_JAAS_CONF_PATH,
-                                     self.render_jaas_config("admin_client_as_broker_jaas.conf",
-                                                             {'SecurityConfig': SecurityConfig}))
+                                     self.render_jaas_config(
+                                         "admin_client_as_broker_jaas.conf",
+                                         {
+                                             'node': node,
+                                             'is_ibm_jdk': any('IBM' in line for line in java_version),
+                                             'SecurityConfig': SecurityConfig,
+                                             'client_sasl_mechanism': self.client_sasl_mechanism,
+                                             'enabled_sasl_mechanisms': self.enabled_sasl_mechanisms
+                                         }
+                                     ))
 
         elif 'sasl.jaas.config' not in self.properties:
             self.properties['sasl.jaas.config'] = jaas_conf.replace("\n", " \\\n")
@@ -290,23 +298,6 @@ class SecurityConfig(TemplateRenderer):
         if java_version(node) <= 11 and self.properties.get('tls.version') == 'TLSv1.3':
             self.properties.update({'tls.version': 'TLSv1.2'})
 
-    def maybe_setup_broker_scram_credentials(self, node, path, connect):
-        self.maybe_create_scram_credentials(node, connect, path, self.interbroker_sasl_mechanism,
-                                            SecurityConfig.SCRAM_BROKER_USER, SecurityConfig.SCRAM_BROKER_PASSWORD)
-
-    def maybe_setup_client_scram_credentials(self, node, path, connect):
-        self.maybe_create_scram_credentials(node, connect, path, self.client_sasl_mechanism,
-                                            SecurityConfig.SCRAM_CLIENT_USER, SecurityConfig.SCRAM_CLIENT_PASSWORD,
-                                            self.export_kafka_opts_for_admin_client_as_broker())
-
-    def maybe_create_scram_credentials(self, node, connect, path, mechanism, user_name, password, kafka_opts_for_admin_client_as_broker = ""):
-        # we only need to create these credentials when the client and broker mechanisms are both SASL/SCRAM
-        if self.has_sasl and self.is_sasl_scram(mechanism) and self.is_sasl_scram(self.interbroker_sasl_mechanism):
-            cmd = "%s %s %s --entity-name %s --entity-type users --alter --add-config %s=[password=%s]" % \
-                  (kafka_opts_for_admin_client_as_broker, path.script("kafka-configs.sh", node), connect,
-                  user_name, mechanism, password)
-            node.account.ssh(cmd)
-
     def clean_node(self, node):
         if self.security_protocol != SecurityConfig.PLAINTEXT:
             node.account.ssh("rm -rf %s" % SecurityConfig.CONFIG_DIR, allow_fail=False)
@@ -366,14 +357,6 @@ class SecurityConfig(TemplateRenderer):
         else:
             return ""
 
-    def export_kafka_opts_for_admin_client_as_broker(self):
-        if self.has_sasl and self.static_jaas_conf:
-            kafka_opts_to_use = "\"-Djava.security.auth.login.config=%s -Djava.security.krb5.conf=%s\""\
-                                % (SecurityConfig.ADMIN_CLIENT_AS_BROKER_JAAS_CONF_PATH, SecurityConfig.KRB5CONF_PATH)
-        else:
-            kafka_opts_to_use = self.kafka_opts
-        return "export KAFKA_OPTS=%s;" % kafka_opts_to_use
-
     def props(self, prefix=''):
         """
         Return properties as string with line separators, optionally with a prefix.
diff --git a/tests/kafkatest/services/security/templates/admin_client_as_broker_jaas.conf b/tests/kafkatest/services/security/templates/admin_client_as_broker_jaas.conf
index b21d5da..b53da11 100644
--- a/tests/kafkatest/services/security/templates/admin_client_as_broker_jaas.conf
+++ b/tests/kafkatest/services/security/templates/admin_client_as_broker_jaas.conf
@@ -13,7 +13,31 @@
 
 
 KafkaClient {
+{% if "GSSAPI" in enabled_sasl_mechanisms %}
+{% if is_ibm_jdk %}
+    com.ibm.security.auth.module.Krb5LoginModule required debug=false
+    credsType=both
+    useKeytab="file:/mnt/security/keytab"
+    principal="kafka/{{ node.account.hostname }}@EXAMPLE.COM";
+{% else %}
+    com.sun.security.auth.module.Krb5LoginModule required debug=false
+    doNotPrompt=true
+    useKeyTab=true
+    storeKey=true
+    keyTab="/mnt/security/keytab"
+    principal="kafka/{{ node.account.hostname }}@EXAMPLE.COM";
+{% endif %}
+{% endif %}
+{% if "PLAIN" in enabled_sasl_mechanisms %}
+	org.apache.kafka.common.security.plain.PlainLoginModule required
+	username="kafka"
+	password="kafka-secret"
+	user_client="client-secret"
+	user_kafka="kafka-secret";
+{% endif %}
+{% if "SCRAM-SHA-256" in client_sasl_mechanism or "SCRAM-SHA-512" in client_sasl_mechanism %}
 	org.apache.kafka.common.security.scram.ScramLoginModule required
 	username="{{ SecurityConfig.SCRAM_BROKER_USER }}"
 	password="{{ SecurityConfig.SCRAM_BROKER_PASSWORD }}";
+{% endif %}
 };
diff --git a/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py b/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py
index 15eac3b..241c381 100644
--- a/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py
+++ b/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py
@@ -97,7 +97,8 @@ class ZooKeeperSecurityUpgradeTest(ProduceConsumeValidateTest):
         if self.is_secure:
             self.kafka.authorizer_class_name = KafkaService.ACL_AUTHORIZER
             # Force use of direct ZooKeeper access because Kafka is not yet started
-            self.acls.set_acls(security_protocol, self.kafka, self.topic, self.group, force_use_zk_connection=True)
+            self.acls.set_acls(security_protocol, self.kafka, self.topic, self.group, force_use_zk_connection=True,
+                               additional_cluster_operations_to_grant=['Create'])
 
         if self.no_sasl:
             self.kafka.start()


Mime
View raw message