kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gwens...@apache.org
Subject kafka git commit: KAFKA-2643: Run mirror maker ducktape tests with SSL and SASL
Date Wed, 25 Nov 2015 23:05:41 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 34a6be2cc -> 69a1cced4


KAFKA-2643: Run mirror maker ducktape tests with SSL and SASL

Run tests with SSL, SASL_PLAINTEXT and SASL_SSL. The same security protocol is used for the
source and target Kafka clusters.

Author: Rajini Sivaram <rajinisivaram@googlemail.com>

Reviewers: Geoff Anderson, Ben Stopford

Closes #559 from rajinisivaram/KAFKA-2643


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/69a1cced
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/69a1cced
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/69a1cced

Branch: refs/heads/trunk
Commit: 69a1cced49d7d0c805adeb1dfd327f8bb5c7ce9a
Parents: 34a6be2
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
Authored: Wed Nov 25 15:05:31 2015 -0800
Committer: Gwen Shapira <cshapi@gmail.com>
Committed: Wed Nov 25 15:05:31 2015 -0800

----------------------------------------------------------------------
 tests/kafkatest/services/kafka/kafka.py         |  8 ++---
 tests/kafkatest/services/mirror_maker.py        | 10 +++++++
 tests/kafkatest/services/verifiable_producer.py |  6 ++--
 tests/kafkatest/tests/mirror_maker_test.py      | 31 +++++++++++++++-----
 4 files changed, 41 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/69a1cced/tests/kafkatest/services/kafka/kafka.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index 15f541d..4669a35 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -71,6 +71,7 @@ class KafkaService(JmxMixin, Service):
         self.interbroker_security_protocol = interbroker_security_protocol
         self.sasl_mechanism = sasl_mechanism
         self.topics = topics
+        self.minikdc = None
 
         for node in self.nodes:
             node.version = version
@@ -82,10 +83,9 @@ class KafkaService(JmxMixin, Service):
 
     def start(self):
         if self.security_config.has_sasl_kerberos:
-            self.minikdc = MiniKdc(self.context, self.nodes)
-            self.minikdc.start()
-        else:
-            self.minikdc = None
+            if self.minikdc is None:
+                self.minikdc = MiniKdc(self.context, self.nodes)
+                self.minikdc.start()
         Service.start(self)
 
         # Create topics if necessary

http://git-wip-us.apache.org/repos/asf/kafka/blob/69a1cced/tests/kafkatest/services/mirror_maker.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/mirror_maker.py b/tests/kafkatest/services/mirror_maker.py
index 0bba115..4386788 100644
--- a/tests/kafkatest/services/mirror_maker.py
+++ b/tests/kafkatest/services/mirror_maker.py
@@ -18,6 +18,7 @@ from ducktape.services.service import Service
 from ducktape.utils.util import wait_until
 
 from kafkatest.services.kafka.directory import kafka_dir
+from kafkatest.services.security.security_config import SecurityConfig
 
 import os
 import subprocess
@@ -113,6 +114,7 @@ class MirrorMaker(Service):
     def start_cmd(self, node):
         cmd = "export LOG_DIR=%s;" % MirrorMaker.LOG_DIR
         cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\";" % MirrorMaker.LOG4J_CONFIG
+        cmd += " export KAFKA_OPTS=%s;" % self.security_config.kafka_opts
         cmd += " /opt/%s/bin/kafka-run-class.sh kafka.tools.MirrorMaker" % kafka_dir(node)
         cmd += " --consumer.config %s" % MirrorMaker.CONSUMER_CONFIG
         cmd += " --producer.config %s" % MirrorMaker.PRODUCER_CONFIG
@@ -147,16 +149,23 @@ class MirrorMaker(Service):
         node.account.ssh("mkdir -p %s" % MirrorMaker.PERSISTENT_ROOT, allow_fail=False)
         node.account.ssh("mkdir -p %s" % MirrorMaker.LOG_DIR, allow_fail=False)
 
+        self.security_config = self.source.security_config.client_config()
+        self.security_config.setup_node(node)
+
         # Create, upload one consumer config file for source cluster
         consumer_props = self.render("mirror_maker_consumer.properties")
+        consumer_props += str(self.security_config)
+
         node.account.create_file(MirrorMaker.CONSUMER_CONFIG, consumer_props)
         self.logger.info("Mirrormaker consumer props:\n" + consumer_props)
 
         # Create, upload producer properties file for target cluster
         producer_props = self.render('mirror_maker_producer.properties')
+        producer_props += str(self.security_config)
         self.logger.info("Mirrormaker producer props:\n" + producer_props)
         node.account.create_file(MirrorMaker.PRODUCER_CONFIG, producer_props)
 
+
         # Create and upload log properties
         log_config = self.render('tools_log4j.properties', log_file=MirrorMaker.LOG_FILE)
         node.account.create_file(MirrorMaker.LOG4J_CONFIG, log_config)
@@ -180,3 +189,4 @@ class MirrorMaker(Service):
                              (self.__class__.__name__, node.account))
         node.account.kill_process("java", clean_shutdown=False, allow_fail=True)
         node.account.ssh("rm -rf %s" % MirrorMaker.PERSISTENT_ROOT, allow_fail=False)
+        self.security_config.clean_node(node)

http://git-wip-us.apache.org/repos/asf/kafka/blob/69a1cced/tests/kafkatest/services/verifiable_producer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/verifiable_producer.py b/tests/kafkatest/services/verifiable_producer.py
index b2d2b97..c0dec4d 100644
--- a/tests/kafkatest/services/verifiable_producer.py
+++ b/tests/kafkatest/services/verifiable_producer.py
@@ -55,15 +55,15 @@ class VerifiableProducer(BackgroundThreadService):
             node.version = version
         self.acked_values = []
         self.not_acked_values = []
-
         self.prop_file = ""
-        self.security_config = kafka.security_config.client_config(self.prop_file)
-        self.prop_file += str(self.security_config)
+
 
     def _worker(self, idx, node):
         node.account.ssh("mkdir -p %s" % VerifiableProducer.PERSISTENT_ROOT, allow_fail=False)
 
         # Create and upload log properties
+        self.security_config = self.kafka.security_config.client_config(self.prop_file)
+        self.prop_file += str(self.security_config)
         log_config = self.render('tools_log4j.properties', log_file=VerifiableProducer.LOG_FILE)
         node.account.create_file(VerifiableProducer.LOG4J_CONFIG, log_config)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/69a1cced/tests/kafkatest/tests/mirror_maker_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/mirror_maker_test.py b/tests/kafkatest/tests/mirror_maker_test.py
index ad252ee..0244f81 100644
--- a/tests/kafkatest/tests/mirror_maker_test.py
+++ b/tests/kafkatest/tests/mirror_maker_test.py
@@ -21,6 +21,7 @@ from kafkatest.services.kafka import KafkaService
 from kafkatest.services.console_consumer import ConsoleConsumer, is_int
 from kafkatest.services.verifiable_producer import VerifiableProducer
 from kafkatest.services.mirror_maker import MirrorMaker
+from kafkatest.services.security.minikdc import MiniKdc
 from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
 
 import time
@@ -39,7 +40,6 @@ class TestMirrorMakerService(ProduceConsumeValidateTest):
                                  topics={self.topic: {"partitions": 1, "replication-factor": 1}})
         self.target_kafka = KafkaService(test_context, num_nodes=1, zk=self.target_zk,
                                  topics={self.topic: {"partitions": 1, "replication-factor": 1}})
-
         # This will produce to source kafka cluster
         self.producer = VerifiableProducer(test_context, num_nodes=1, kafka=self.source_kafka, topic=self.topic,
                                            throughput=1000)
@@ -52,10 +52,21 @@ class TestMirrorMakerService(ProduceConsumeValidateTest):
     def setUp(self):
         # Source cluster
         self.source_zk.start()
-        self.source_kafka.start()
 
         # Target cluster
         self.target_zk.start()
+
+    def start_kafka(self, security_protocol):
+        self.source_kafka.security_protocol = security_protocol
+        self.source_kafka.interbroker_security_protocol = security_protocol
+        self.target_kafka.security_protocol = security_protocol
+        self.target_kafka.interbroker_security_protocol = security_protocol
+        if self.source_kafka.security_config.has_sasl_kerberos:
+            minikdc = MiniKdc(self.source_kafka.context, self.source_kafka.nodes + self.target_kafka.nodes)
+            self.source_kafka.minikdc = minikdc
+            self.target_kafka.minikdc = minikdc
+            minikdc.start()
+        self.source_kafka.start()
         self.target_kafka.start()
 
     def bounce(self, clean_shutdown=True):
@@ -98,9 +109,9 @@ class TestMirrorMakerService(ProduceConsumeValidateTest):
         wait_until(lambda: self.producer.num_acked > n_messages, timeout_sec=10,
                      err_msg="Producer failed to produce %d messages in a reasonable amount of time." % n_messages)
 
-    @parametrize(new_consumer=True)
-    @parametrize(new_consumer=False)
-    def test_simple_end_to_end(self, new_consumer):
+    @parametrize(security_protocol='PLAINTEXT', new_consumer=False)
+    @matrix(security_protocol=['PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL'], new_consumer=[True])
+    def test_simple_end_to_end(self, security_protocol, new_consumer):
         """
         Test end-to-end behavior under non-failure conditions.
 
@@ -112,6 +123,9 @@ class TestMirrorMakerService(ProduceConsumeValidateTest):
         - Consume messages from target.
         - Verify that number of consumed messages matches the number produced.
         """
+        self.start_kafka(security_protocol)
+        self.consumer.new_consumer = new_consumer
+
         self.mirror_maker.new_consumer = new_consumer
         self.mirror_maker.start()
 
@@ -126,8 +140,8 @@ class TestMirrorMakerService(ProduceConsumeValidateTest):
         self.mirror_maker.stop()
 
     @matrix(offsets_storage=["kafka", "zookeeper"], new_consumer=[False], clean_shutdown=[True, False])
-    @matrix(new_consumer=[True], clean_shutdown=[True, False])
-    def test_bounce(self, offsets_storage="kafka", new_consumer=True, clean_shutdown=True):
+    @matrix(new_consumer=[True], clean_shutdown=[True, False], security_protocol=['PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL'])
+    def test_bounce(self, offsets_storage="kafka", new_consumer=True, clean_shutdown=True, security_protocol='PLAINTEXT'):
         """
         Test end-to-end behavior under failure conditions.
 
@@ -145,6 +159,9 @@ class TestMirrorMakerService(ProduceConsumeValidateTest):
             # the group until the previous session times out
             self.consumer.consumer_timeout_ms = 60000
 
+        self.start_kafka(security_protocol)
+        self.consumer.new_consumer = new_consumer
+
         self.mirror_maker.offsets_storage = offsets_storage
         self.mirror_maker.new_consumer = new_consumer
         self.mirror_maker.start()


Mime
View raw message