kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject kafka git commit: KAFKA-2979: Enable authorizer and ACLs in ducktape tests
Date Fri, 08 Jan 2016 04:04:46 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 40d731b87 -> 49778b184


KAFKA-2979: Enable authorizer and ACLs in ducktape tests

Patch by fpj and benstopford.

Author: flavio junqueira <fpj@apache.org>
Author: Flavio Junqueira <fpj@apache.org>
Author: Ben Stopford <benstopford@gmail.com>

Reviewers: Ben Stopford <benstopford@gmail.com>, Geoff Anderson <geoff@confluent.io>,
Ewen Cheslack-Postava <ewen@confluent.io>

Closes #683 from fpj/KAFKA-2979


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/49778b18
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/49778b18
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/49778b18

Branch: refs/heads/trunk
Commit: 49778b18446d691321026415aeaac1b265057ece
Parents: 40d731b
Author: Flavio Junqueira <fpj@apache.org>
Authored: Thu Jan 7 20:04:24 2016 -0800
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Thu Jan 7 20:04:24 2016 -0800

----------------------------------------------------------------------
 tests/kafkatest/services/kafka/kafka.py         |  5 +-
 .../services/kafka/templates/kafka.properties   |  4 ++
 tests/kafkatest/services/security/kafka_acls.py | 75 ++++++++++++++++++++
 .../kafkatest/tests/produce_consume_validate.py |  2 +-
 .../tests/security_rolling_upgrade_test.py      | 14 ++--
 .../tests/zookeeper_security_upgrade_test.py    | 29 +++++---
 6 files changed, 115 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/49778b18/tests/kafkatest/services/kafka/kafka.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index 9a9feda..cb5018c 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -47,6 +47,8 @@ class KafkaService(JmxMixin, Service):
     # Kafka log segments etc go here
     DATA_LOG_DIR = os.path.join(PERSISTENT_ROOT, "kafka-data-logs")
     CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "kafka.properties")
+    # Kafka Authorizer
+    SIMPLE_AUTHORIZER = "kafka.security.auth.SimpleAclAuthorizer"
 
     logs = {
         "kafka_operational_logs_info": {
@@ -61,7 +63,7 @@ class KafkaService(JmxMixin, Service):
     }
 
     def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAINTEXT,
interbroker_security_protocol=SecurityConfig.PLAINTEXT,
-                 sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI, topics=None, version=TRUNK,
quota_config=None, jmx_object_names=None,
+                 sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI, authorizer_class_name=None,
topics=None, version=TRUNK, quota_config=None, jmx_object_names=None,
                  jmx_attributes=[], zk_connect_timeout=5000):
         """
         :type context
@@ -79,6 +81,7 @@ class KafkaService(JmxMixin, Service):
         self.sasl_mechanism = sasl_mechanism
         self.topics = topics
         self.minikdc = None
+        self.authorizer_class_name = authorizer_class_name
         #
         # In a heavily loaded and not very fast machine, it is
         # sometimes necessary to give more time for the zk client

http://git-wip-us.apache.org/repos/asf/kafka/blob/49778b18/tests/kafkatest/services/kafka/templates/kafka.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/templates/kafka.properties b/tests/kafkatest/services/kafka/templates/kafka.properties
index 228d84b..a718ee2 100644
--- a/tests/kafkatest/services/kafka/templates/kafka.properties
+++ b/tests/kafkatest/services/kafka/templates/kafka.properties
@@ -61,6 +61,10 @@ ssl.truststore.password=test-ts-passwd
 ssl.truststore.type=JKS
 sasl.mechanism={{ sasl_mechanism }}
 sasl.kerberos.service.name=kafka
+{% if authorizer_class_name is not none %}
+ssl.client.auth=required
+authorizer.class.name={{ authorizer_class_name }}
+{% endif %}
 
 {% if zk_set_acl is defined %}
 zookeeper.set.acl={{zk_set_acl}}

http://git-wip-us.apache.org/repos/asf/kafka/blob/49778b18/tests/kafkatest/services/security/kafka_acls.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/security/kafka_acls.py b/tests/kafkatest/services/security/kafka_acls.py
new file mode 100644
index 0000000..eb85354
--- /dev/null
+++ b/tests/kafkatest/services/security/kafka_acls.py
@@ -0,0 +1,75 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from kafkatest.services.kafka.directory import kafka_dir
+
+class ACLs():
+
+    def __init__(self):
+        pass
+
+    def set_acls(self, protocol, kafka, zk, topic, group):
+        node = kafka.nodes[0]
+        setting = zk.connect_setting()
+
+        # Set server ACLs
+        kafka_principal = "User:CN=systemtest" if protocol == "SSL" else "User:kafka"
+        self.acls_command(node, ACLs.add_cluster_acl(setting, kafka_principal))
+        self.acls_command(node, ACLs.broker_read_acl(setting, "*", kafka_principal))
+
+        # Set client ACLs
+        client_principal = "User:CN=systemtest" if protocol == "SSL" else "User:client"
+        self.acls_command(node, ACLs.produce_acl(setting, topic, client_principal))
+        self.acls_command(node, ACLs.consume_acl(setting, topic, group, client_principal))
+
+    def acls_command(self, node, properties):
+        cmd = "/opt/%s/bin/kafka-acls.sh %s" % (kafka_dir(node), properties)
+        node.account.ssh(cmd)
+
+    @staticmethod
+    def add_cluster_acl(zk_connect, principal="User:kafka"):
+        return "--authorizer-properties zookeeper.connect=%(zk_connect)s --add --cluster
" \
+               "--operation=ClusterAction --allow-principal=%(principal)s " % {
+            'zk_connect': zk_connect,
+            'principal': principal
+        }
+
+    @staticmethod
+    def broker_read_acl(zk_connect, topic, principal="User:kafka"):
+        return "--authorizer-properties zookeeper.connect=%(zk_connect)s --add --topic=%(topic)s
" \
+               "--operation=Read --allow-principal=%(principal)s " % {
+            'zk_connect': zk_connect,
+            'topic': topic,
+            'principal': principal
+        }
+
+    @staticmethod
+    def produce_acl(zk_connect, topic, principal="User:client"):
+        return "--authorizer-properties zookeeper.connect=%(zk_connect)s --add --topic=%(topic)s
" \
+               "--producer --allow-principal=%(principal)s " % {
+            'zk_connect': zk_connect,
+            'topic': topic,
+            'principal': principal
+        }
+
+    @staticmethod
+    def consume_acl(zk_connect, topic, group, principal="User:client"):
+        return "--authorizer-properties zookeeper.connect=%(zk_connect)s --add --topic=%(topic)s
" \
+               "--group=%(group)s --consumer --allow-principal=%(principal)s " % {
+            'zk_connect': zk_connect,
+            'topic': topic,
+            'group': group,
+            'principal': principal
+        }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/49778b18/tests/kafkatest/tests/produce_consume_validate.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/produce_consume_validate.py b/tests/kafkatest/tests/produce_consume_validate.py
index f2da000..425b816 100644
--- a/tests/kafkatest/tests/produce_consume_validate.py
+++ b/tests/kafkatest/tests/produce_consume_validate.py
@@ -38,7 +38,7 @@ class ProduceConsumeValidateTest(Test):
         wait_until(lambda: self.producer.num_acked > 5, timeout_sec=10,
              err_msg="Producer failed to start in a reasonable amount of time.")
         self.consumer.start()
-        wait_until(lambda: len(self.consumer.messages_consumed[1]) > 0, timeout_sec=30,
+        wait_until(lambda: len(self.consumer.messages_consumed[1]) > 0, timeout_sec=60,
              err_msg="Consumer failed to start in a reasonable amount of time.")
 
     def check_alive(self):

http://git-wip-us.apache.org/repos/asf/kafka/blob/49778b18/tests/kafkatest/tests/security_rolling_upgrade_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/security_rolling_upgrade_test.py b/tests/kafkatest/tests/security_rolling_upgrade_test.py
index bf01b8a..f19d6b8 100644
--- a/tests/kafkatest/tests/security_rolling_upgrade_test.py
+++ b/tests/kafkatest/tests/security_rolling_upgrade_test.py
@@ -16,12 +16,13 @@
 
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService
+from kafkatest.services.security.security_config import SecurityConfig
 from kafkatest.services.verifiable_producer import VerifiableProducer
 from kafkatest.services.console_consumer import ConsoleConsumer, is_int
 from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
 from ducktape.mark import matrix
+from kafkatest.services.security.kafka_acls import ACLs
 import time
-import random
 
 
 class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
@@ -32,7 +33,9 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
         super(TestSecurityRollingUpgrade, self).__init__(test_context=test_context)
 
     def setUp(self):
+        self.acls = ACLs()
         self.topic = "test_topic"
+        self.group = "group"
         self.producer_throughput = 100
         self.num_producers = 1
         self.num_consumers = 1
@@ -52,7 +55,7 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
             self.test_context, self.num_consumers, self.kafka, self.topic,
             consumer_timeout_ms=60000, message_validator=is_int, new_consumer=True)
 
-        self.consumer.group_id = "unique-test-group-" + str(random.random())
+        self.consumer.group_id = "group"
 
     def bounce(self):
         self.kafka.start_minikdc()
@@ -71,6 +74,9 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
 
         # Roll cluster to disable PLAINTEXT port
         self.kafka.close_port('PLAINTEXT')
+        self.kafka.authorizer_class_name = KafkaService.SIMPLE_AUTHORIZER
+        self.acls.set_acls(client_protocol, self.kafka, self.zk, self.topic, self.group)
+        self.acls.set_acls(broker_protocol, self.kafka, self.zk, self.topic, self.group)
         self.bounce()
 
     def open_secured_port(self, client_protocol):
@@ -105,8 +111,8 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
         """
         Start with a PLAINTEXT cluster with a second Secured port open (i.e. result of phase
one).
         Start an Producer and Consumer via the SECURED port
-        Rolling upgrade to add inter-broker be the secure protocol
-        Rolling upgrade again to disable PLAINTEXT
+        Incrementally upgrade to add inter-broker be the secure protocol
+        Incrementally upgrade again to add ACLs as well as disabling the PLAINTEXT port
         Ensure the producer and consumer ran throughout
         """
         #Given we have a broker that has both secure and PLAINTEXT ports open

http://git-wip-us.apache.org/repos/asf/kafka/blob/49778b18/tests/kafkatest/tests/zookeeper_security_upgrade_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/zookeeper_security_upgrade_test.py b/tests/kafkatest/tests/zookeeper_security_upgrade_test.py
index 0714832..3bfc478 100644
--- a/tests/kafkatest/tests/zookeeper_security_upgrade_test.py
+++ b/tests/kafkatest/tests/zookeeper_security_upgrade_test.py
@@ -21,10 +21,9 @@ from kafkatest.services.verifiable_producer import VerifiableProducer
 from kafkatest.services.console_consumer import ConsoleConsumer, is_int
 from kafkatest.services.security.security_config import SecurityConfig
 from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
-
+from kafkatest.services.security.kafka_acls import ACLs
 import time
 
-
 class ZooKeeperSecurityUpgradeTest(ProduceConsumeValidateTest):
     """Tests a rolling upgrade for zookeeper.
     """
@@ -34,9 +33,11 @@ class ZooKeeperSecurityUpgradeTest(ProduceConsumeValidateTest):
 
     def setUp(self):
         self.topic = "test_topic"
+        self.group = "group"
         self.producer_throughput = 100
         self.num_producers = 1
         self.num_consumers = 1
+        self.acls = ACLs()
 
         self.zk = ZookeeperService(self.test_context, num_nodes=3)
 
@@ -54,24 +55,29 @@ class ZooKeeperSecurityUpgradeTest(ProduceConsumeValidateTest):
             self.test_context, self.num_consumers, self.kafka, self.topic,
             consumer_timeout_ms=60000, message_validator=is_int, new_consumer=True)
 
-        self.consumer.group_id = "group"
-
+        self.consumer.group_id = self.group
 
     @property
     def no_sasl(self):
         return self.kafka.security_protocol == "PLAINTEXT" or self.kafka.security_protocol
== "SSL"
 
+    @property
+    def is_secure(self):
+        return self.kafka.security_protocol == "SASL_PLAINTEXT" \
+               or self.kafka.security_protocol == "SSL" \
+               or self.kafka.security_protocol == "SASL_SSL"
+
     def run_zk_migration(self):
         # change zk config (auth provider + jaas login)
         self.zk.kafka_opts = self.zk.security_system_properties
         self.zk.zk_sasl = True
-        if(self.no_sasl):
+        if self.no_sasl:
             self.kafka.start_minikdc(self.zk.zk_principals)
         # restart zk
         for node in self.zk.nodes:
             self.zk.stop_node(node)
             self.zk.start_node(node)
-        
+
         # restart broker with jaas login
         for node in self.kafka.nodes:
             self.kafka.stop_node(node)
@@ -81,16 +87,23 @@ class ZooKeeperSecurityUpgradeTest(ProduceConsumeValidateTest):
         for node in self.zk.nodes:
             self.zk.zookeeper_migration(node, "secure")
 
-        # restart broker with zookeeper.set.acl=true
+        # restart broker with zookeeper.set.acl=true and acls
         self.kafka.zk_set_acl = "true"
         for node in self.kafka.nodes:
             self.kafka.stop_node(node)
             self.kafka.start_node(node)
 
-    @matrix(security_protocol=["SASL_SSL","SSL","SASL_PLAINTEXT","PLAINTEXT"])
+    @matrix(security_protocol=["PLAINTEXT","SSL","SASL_SSL","SASL_PLAINTEXT"])
     def test_zk_security_upgrade(self, security_protocol):
         self.zk.start()
         self.kafka.security_protocol = security_protocol
+        self.kafka.interbroker_security_protocol = security_protocol
+
+        # set acls
+        if self.is_secure:
+            self.kafka.authorizer_class_name = KafkaService.SIMPLE_AUTHORIZER
+            self.acls.set_acls(security_protocol, self.kafka, self.zk, self.topic, self.group)
+
         if(self.no_sasl):
             self.kafka.start()
         else:


Mime
View raw message