kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject kafka git commit: KAFKA-2644; Run relevant ducktape tests with SASL_PLAINTEXT and SASL_SSL
Date Wed, 04 Nov 2015 05:25:19 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 596c203af -> 98db5ea94


KAFKA-2644; Run relevant ducktape tests with SASL_PLAINTEXT and SASL_SSL

Run sanity check, replication tests and benchmarks with SASL/Kerberos using MiniKdc.

Author: Rajini Sivaram <rajinisivaram@googlemail.com>

Reviewers: Geoff Anderson <geoff@confluent.io>, Jun Rao <junrao@gmail.com>

Closes #358 from rajinisivaram/KAFKA-2644


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/98db5ea9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/98db5ea9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/98db5ea9

Branch: refs/heads/trunk
Commit: 98db5ea94fcf7600137b5072453705c2a62e1f54
Parents: 596c203
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
Authored: Tue Nov 3 21:25:15 2015 -0800
Committer: Jun Rao <junrao@gmail.com>
Committed: Tue Nov 3 21:25:15 2015 -0800

----------------------------------------------------------------------
 build.gradle                                    |   9 +-
 .../sanity_checks/test_console_consumer.py      |   8 +-
 tests/kafkatest/services/console_consumer.py    |  16 +-
 tests/kafkatest/services/kafka/kafka.py         |  27 +--
 .../services/kafka/templates/kafka.properties   |  14 +-
 .../kafkatest/services/kafka_log4j_appender.py  |   2 +-
 .../performance/consumer_performance.py         |   8 +-
 .../services/performance/end_to_end_latency.py  |  25 +--
 .../performance/producer_performance.py         |  12 +-
 tests/kafkatest/services/security/__init__.py   |  15 ++
 tests/kafkatest/services/security/minikdc.py    |  79 ++++++++
 .../services/security/security_config.py        | 187 +++++++++++++++++++
 .../security/templates/gssapi_jaas.conf         |  51 +++++
 .../security/templates/minikdc.properties       |  17 ++
 tests/kafkatest/services/verifiable_producer.py |   8 +-
 tests/kafkatest/tests/benchmark_test.py         |  56 +++---
 tests/kafkatest/tests/log4j_appender_test.py    |   6 +-
 tests/kafkatest/tests/quota_test.py             |  10 +-
 tests/kafkatest/tests/replication_test.py       |  12 +-
 tests/kafkatest/utils/security_config.py        | 133 -------------
 20 files changed, 462 insertions(+), 233 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/98db5ea9/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 961aa35..c82ece9 100644
--- a/build.gradle
+++ b/build.gradle
@@ -323,7 +323,7 @@ project(':core') {
   }
 
   jar {
-    dependsOn 'copyDependantLibs'
+    dependsOn('copyDependantLibs', 'copyDependantTestLibs')
   }
 
   jar.manifest {
@@ -347,6 +347,13 @@ project(':core') {
   artifacts {
     archives testJar
   }
+
+  tasks.create(name: "copyDependantTestLibs", type: Copy) {
+    from (configurations.testRuntime) {
+      include('*.jar')
+    }
+    into "$buildDir/dependant-testlibs"
+  }
 }
 
 project(':contrib:hadoop-consumer') {

http://git-wip-us.apache.org/repos/asf/kafka/blob/98db5ea9/tests/kafkatest/sanity_checks/test_console_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/sanity_checks/test_console_consumer.py b/tests/kafkatest/sanity_checks/test_console_consumer.py
index 0d2c1fd..139c74a 100644
--- a/tests/kafkatest/sanity_checks/test_console_consumer.py
+++ b/tests/kafkatest/sanity_checks/test_console_consumer.py
@@ -24,7 +24,7 @@ from kafkatest.services.kafka.version import LATEST_0_8_2
 from kafkatest.services.console_consumer import ConsoleConsumer
 from kafkatest.utils.remote_account import line_count, file_exists
 from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.utils.security_config import SecurityConfig
+from kafkatest.services.security.security_config import SecurityConfig
 
 
 import time
@@ -44,9 +44,9 @@ class ConsoleConsumerTest(Test):
     def setUp(self):
         self.zk.start()
 
-    @parametrize(security_protocol=SecurityConfig.SSL, new_consumer=True)
-    @matrix(security_protocol=[SecurityConfig.PLAINTEXT], new_consumer=[False, True])
-    def test_lifecycle(self, security_protocol, new_consumer):
+    @parametrize(security_protocol='PLAINTEXT', new_consumer=False)
+    @matrix(security_protocol=['PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL'])
+    def test_lifecycle(self, security_protocol, new_consumer=True):
         """Check that console consumer starts/stops properly, and that we are capturing log output."""
 
         self.kafka.security_protocol = security_protocol

http://git-wip-us.apache.org/repos/asf/kafka/blob/98db5ea9/tests/kafkatest/services/console_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py
index 07343e8..18021d9 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -19,7 +19,7 @@ from ducktape.services.background_thread import BackgroundThreadService
 from kafkatest.services.kafka.directory import kafka_dir
 from kafkatest.services.kafka.version import TRUNK, LATEST_0_8_2
 from kafkatest.services.monitor.jmx import JmxMixin
-from kafkatest.utils.security_config import SecurityConfig
+from kafkatest.services.security.security_config import SecurityConfig
 
 import itertools
 import os
@@ -99,7 +99,7 @@ class ConsoleConsumer(JmxMixin, BackgroundThreadService):
             "collect_default": True}
     }
 
-    def __init__(self, context, num_nodes, kafka, topic, security_protocol=SecurityConfig.PLAINTEXT, new_consumer=False, message_validator=None,
+    def __init__(self, context, num_nodes, kafka, topic, new_consumer=False, message_validator=None,
                  from_beginning=True, consumer_timeout_ms=None, version=TRUNK, client_id="console-consumer", jmx_object_names=None, jmx_attributes=[]):
         """
         Args:
@@ -107,7 +107,6 @@ class ConsoleConsumer(JmxMixin, BackgroundThreadService):
             num_nodes:                  number of nodes to use (this should be 1)
             kafka:                      kafka service
             topic:                      consume from this topic
-            security_protocol:          security protocol for Kafka connections
             new_consumer:               use new Kafka consumer if True
             message_validator:          function which returns message or None
             from_beginning:             consume from beginning if True, else from the end
@@ -132,13 +131,7 @@ class ConsoleConsumer(JmxMixin, BackgroundThreadService):
         self.message_validator = message_validator
         self.messages_consumed = {idx: [] for idx in range(1, num_nodes + 1)}
         self.client_id = client_id
-        self.security_protocol = security_protocol
 
-        # Validate a few configs
-        if self.new_consumer is None:
-            self.new_consumer = self.security_protocol == SecurityConfig.SSL
-        if self.security_protocol == SecurityConfig.SSL and not self.new_consumer:
-            raise Exception("SSL protocol is supported only with the new consumer")
 
     def prop_file(self, node):
         """Return a string which can be used to create a configuration file appropriate for the given node."""
@@ -151,8 +144,7 @@ class ConsoleConsumer(JmxMixin, BackgroundThreadService):
 
         # Add security properties to the config. If security protocol is not specified,
         # use the default in the template properties.
-        self.security_config = SecurityConfig(self.security_protocol, prop_file)
-        self.security_protocol = self.security_config.security_protocol
+        self.security_config = self.kafka.security_config.client_config(prop_file)
 
         prop_file += str(self.security_config)
         return prop_file
@@ -170,10 +162,12 @@ class ConsoleConsumer(JmxMixin, BackgroundThreadService):
         args['jmx_port'] = self.jmx_port
         args['kafka_dir'] = kafka_dir(node)
         args['broker_list'] = self.kafka.bootstrap_servers()
+        args['kafka_opts'] = self.security_config.kafka_opts
 
         cmd = "export JMX_PORT=%(jmx_port)s; " \
               "export LOG_DIR=%(log_dir)s; " \
               "export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j_config)s\"; " \
+              "export KAFKA_OPTS=%(kafka_opts)s; " \
               "/opt/%(kafka_dir)s/bin/kafka-console-consumer.sh " \
               "--topic %(topic)s --consumer.config %(config_file)s" % args
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/98db5ea9/tests/kafkatest/services/kafka/kafka.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index 5e4a1e1..6778b5a 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -22,7 +22,8 @@ from kafkatest.services.kafka.version import TRUNK
 from kafkatest.services.kafka.directory import kafka_dir, KAFKA_TRUNK
 
 from kafkatest.services.monitor.jmx import JmxMixin
-from kafkatest.utils.security_config import SecurityConfig
+from kafkatest.services.security.security_config import SecurityConfig
+from kafkatest.services.security.minikdc import MiniKdc
 import json
 import re
 import signal
@@ -45,7 +46,7 @@ class KafkaService(JmxMixin, Service):
     }
 
     def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAINTEXT, interbroker_security_protocol=SecurityConfig.PLAINTEXT,
-                 topics=None, version=TRUNK, quota_config=None, jmx_object_names=None, jmx_attributes=[]):
+                 sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI, topics=None, version=TRUNK, quota_config=None, jmx_object_names=None, jmx_attributes=[]):
         """
         :type context
         :type zk: ZookeeperService
@@ -59,6 +60,7 @@ class KafkaService(JmxMixin, Service):
 
         self.security_protocol = security_protocol
         self.interbroker_security_protocol = interbroker_security_protocol
+        self.sasl_mechanism = sasl_mechanism
         self.topics = topics
 
         for node in self.nodes:
@@ -67,16 +69,15 @@ class KafkaService(JmxMixin, Service):
 
     @property
     def security_config(self):
-        if self.security_protocol == SecurityConfig.SSL or self.interbroker_security_protocol == SecurityConfig.SSL:
-            return SecurityConfig(SecurityConfig.SSL)
-        else:
-            return SecurityConfig(SecurityConfig.PLAINTEXT)
+        return SecurityConfig(self.security_protocol, self.interbroker_security_protocol, sasl_mechanism=self.sasl_mechanism)
 
-    @property
-    def port(self):
-        return 9092 if self.security_protocol == SecurityConfig.PLAINTEXT else 9093
 
     def start(self):
+        if self.security_config.has_sasl_kerberos:
+            self.minikdc = MiniKdc(self.context, self.nodes)
+            self.minikdc.start()
+        else:
+            self.minikdc = None
         Service.start(self)
 
         # Create topics if necessary
@@ -96,12 +97,15 @@ class KafkaService(JmxMixin, Service):
         # TODO - clean up duplicate configuration logic
         prop_file = cfg.render()
         prop_file += self.render('kafka.properties', node=node, broker_id=self.idx(node),
-                                  security_config=self.security_config, port=self.port)
+                                  security_config=self.security_config, 
+                                  interbroker_security_protocol=self.interbroker_security_protocol,
+                                  sasl_mechanism=self.sasl_mechanism)
         return prop_file
 
     def start_cmd(self, node):
         cmd = "export JMX_PORT=%d; " % self.jmx_port
         cmd += "export LOG_DIR=/mnt/kafka-operational-logs/; "
+        cmd += "export KAFKA_OPTS=%s; " % self.security_config.kafka_opts
         cmd += "/opt/" + kafka_dir(node) + "/bin/kafka-server-start.sh /mnt/kafka.properties 1>> /mnt/kafka.log 2>> /mnt/kafka.log &"
         return cmd
 
@@ -296,8 +300,7 @@ class KafkaService(JmxMixin, Service):
 
     def bootstrap_servers(self):
         """Return comma-delimited list of brokers in this cluster formatted as HOSTNAME1:PORT1,HOSTNAME:PORT2,...
-        using the port for the configured security protocol.
 
         This is the format expected by many config files.
         """
-        return ','.join([node.account.hostname + ":" + str(self.port) for node in self.nodes])
+        return ','.join([node.account.hostname + ":9092" for node in self.nodes])

http://git-wip-us.apache.org/repos/asf/kafka/blob/98db5ea9/tests/kafkatest/services/kafka/templates/kafka.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/templates/kafka.properties b/tests/kafkatest/services/kafka/templates/kafka.properties
index 4db1120..e938ac8 100644
--- a/tests/kafkatest/services/kafka/templates/kafka.properties
+++ b/tests/kafkatest/services/kafka/templates/kafka.properties
@@ -18,11 +18,11 @@
 advertised.host.name={{ node.account.hostname }}
 
 {% if security_protocol == interbroker_security_protocol %}
-listeners={{ security_protocol }}://:{{ port }}
-advertised.listeners={{ security_protocol }}://{{ node.account.hostname }}:{{ port }}
+listeners={{ security_protocol }}://:9092
+advertised.listeners={{ security_protocol }}://{{ node.account.hostname }}:9092
 {% else %}
-listeners=PLAINTEXT://:9092,SSL://:9093
-advertised.listeners=PLAINTEXT://{{ node.account.hostname }}:9092,SSL://{{ node.account.hostname }}:9093
+listeners={{ security_protocol }}://:9092,{{ interbroker_security_protocol }}://:9093
+advertised.listeners={{ security_protocol }}://{{ node.account.hostname }}:9092,{{ interbroker_security_protocol }}://{{ node.account.hostname }}:9093
 {% endif %}
 
 num.network.threads=3
@@ -56,10 +56,12 @@ quota.consumer.bytes.per.second.overrides={{ quota_config.quota_consumer_bytes_p
 
 security.inter.broker.protocol={{ interbroker_security_protocol }}
 
-ssl.keystore.location=/mnt/ssl/test.keystore.jks
+ssl.keystore.location=/mnt/security/test.keystore.jks
 ssl.keystore.password=test-ks-passwd
 ssl.key.password=test-key-passwd
 ssl.keystore.type=JKS
-ssl.truststore.location=/mnt/ssl/test.truststore.jks
+ssl.truststore.location=/mnt/security/test.truststore.jks
 ssl.truststore.password=test-ts-passwd
 ssl.truststore.type=JKS
+sasl.mechanism={{ sasl_mechanism }}
+sasl.kerberos.service.name=kafka

http://git-wip-us.apache.org/repos/asf/kafka/blob/98db5ea9/tests/kafkatest/services/kafka_log4j_appender.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka_log4j_appender.py b/tests/kafkatest/services/kafka_log4j_appender.py
index 39e079f..af65eea 100644
--- a/tests/kafkatest/services/kafka_log4j_appender.py
+++ b/tests/kafkatest/services/kafka_log4j_appender.py
@@ -16,7 +16,7 @@
 from ducktape.services.background_thread import BackgroundThreadService
 
 from kafkatest.services.kafka.directory import kafka_dir
-from kafkatest.utils.security_config import SecurityConfig
+from kafkatest.services.security.security_config import SecurityConfig
 
 
 class KafkaLog4jAppender(BackgroundThreadService):

http://git-wip-us.apache.org/repos/asf/kafka/blob/98db5ea9/tests/kafkatest/services/performance/consumer_performance.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/consumer_performance.py b/tests/kafkatest/services/performance/consumer_performance.py
index e52220c..4d24628 100644
--- a/tests/kafkatest/services/performance/consumer_performance.py
+++ b/tests/kafkatest/services/performance/consumer_performance.py
@@ -15,7 +15,7 @@
 
 from kafkatest.services.performance import PerformanceService
 from kafkatest.services.kafka.directory import kafka_dir
-from kafkatest.utils.security_config import SecurityConfig
+from kafkatest.services.security.security_config import SecurityConfig
 
 import os
 
@@ -69,11 +69,10 @@ class ConsumerPerformanceService(PerformanceService):
             "collect_default": True}
     }
 
-    def __init__(self, context, num_nodes, kafka, security_protocol, topic, messages, new_consumer=False, settings={}):
+    def __init__(self, context, num_nodes, kafka, topic, messages, new_consumer=False, settings={}):
         super(ConsumerPerformanceService, self).__init__(context, num_nodes)
         self.kafka = kafka
-        self.security_config = SecurityConfig(security_protocol)
-        self.security_protocol = security_protocol
+        self.security_config = kafka.security_config.client_config()
         self.topic = topic
         self.messages = messages
         self.new_consumer = new_consumer
@@ -123,6 +122,7 @@ class ConsumerPerformanceService(PerformanceService):
 
     def start_cmd(self, node):
         cmd = "export LOG_DIR=%s;" % ConsumerPerformanceService.LOG_DIR
+        cmd += " export KAFKA_OPTS=%s;" % self.security_config.kafka_opts
         cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\";" % ConsumerPerformanceService.LOG4J_CONFIG
         cmd += " /opt/%s/bin/kafka-consumer-perf-test.sh" % kafka_dir(node)
         for key, value in self.args.items():

http://git-wip-us.apache.org/repos/asf/kafka/blob/98db5ea9/tests/kafkatest/services/performance/end_to_end_latency.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/end_to_end_latency.py b/tests/kafkatest/services/performance/end_to_end_latency.py
index 2be1621..e7147c8 100644
--- a/tests/kafkatest/services/performance/end_to_end_latency.py
+++ b/tests/kafkatest/services/performance/end_to_end_latency.py
@@ -14,7 +14,7 @@
 # limitations under the License.
 
 from kafkatest.services.performance import PerformanceService
-from kafkatest.utils.security_config import SecurityConfig
+from kafkatest.services.security.security_config import SecurityConfig
 
 from kafkatest.services.kafka.directory import kafka_dir
 
@@ -27,34 +27,35 @@ class EndToEndLatencyService(PerformanceService):
             "collect_default": True},
     }
 
-    def __init__(self, context, num_nodes, kafka, security_protocol, topic, num_records, consumer_fetch_max_wait=100, acks=1):
+    def __init__(self, context, num_nodes, kafka, topic, num_records, consumer_fetch_max_wait=100, acks=1):
         super(EndToEndLatencyService, self).__init__(context, num_nodes)
         self.kafka = kafka
-        self.security_config = SecurityConfig(security_protocol)
-        self.security_protocol = security_protocol
+        self.security_config = kafka.security_config.client_config()
         self.args = {
             'topic': topic,
             'num_records': num_records,
             'consumer_fetch_max_wait': consumer_fetch_max_wait,
-            'acks': acks
+            'acks': acks,
+            'kafka_opts': self.security_config.kafka_opts
         }
 
     def _worker(self, idx, node):
         args = self.args.copy()
         self.security_config.setup_node(node)
-        if self.security_protocol == SecurityConfig.SSL:
-            ssl_config_file = SecurityConfig.SSL_DIR + "/security.properties"
-            node.account.create_file(ssl_config_file, str(self.security_config))
+        if self.security_config.security_protocol != SecurityConfig.PLAINTEXT:
+            security_config_file = SecurityConfig.CONFIG_DIR + "/security.properties"
+            node.account.create_file(security_config_file, str(self.security_config))
         else:
-            ssl_config_file = ""
+            security_config_file = ""
         args.update({
             'zk_connect': self.kafka.zk.connect_setting(),
             'bootstrap_servers': self.kafka.bootstrap_servers(),
-            'ssl_config_file': ssl_config_file
+            'security_config_file': security_config_file,
+            'kafka_dir': kafka_dir(node)
         })
 
-        cmd = "/opt/%s/bin/kafka-run-class.sh kafka.tools.EndToEndLatency " % kafka_dir(node)
-        cmd += "%(bootstrap_servers)s %(topic)s %(num_records)d %(acks)d 20 %(ssl_config_file)s" % args
+        cmd = "KAFKA_OPTS=%(kafka_opts)s /opt/%(kafka_dir)s/bin/kafka-run-class.sh kafka.tools.EndToEndLatency " % args
+        cmd += "%(bootstrap_servers)s %(topic)s %(num_records)d %(acks)d 20 %(security_config_file)s" % args
         cmd += " | tee /mnt/end-to-end-latency.log"
 
         self.logger.debug("End-to-end latency %d command: %s", idx, cmd)

http://git-wip-us.apache.org/repos/asf/kafka/blob/98db5ea9/tests/kafkatest/services/performance/producer_performance.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/producer_performance.py b/tests/kafkatest/services/performance/producer_performance.py
index a3b1d0e..b94aab6 100644
--- a/tests/kafkatest/services/performance/producer_performance.py
+++ b/tests/kafkatest/services/performance/producer_performance.py
@@ -16,7 +16,7 @@
 from kafkatest.services.monitor.jmx import JmxMixin
 from kafkatest.services.performance import PerformanceService
 import itertools
-from kafkatest.utils.security_config import SecurityConfig
+from kafkatest.services.security.security_config import SecurityConfig
 from kafkatest.services.kafka.directory import kafka_dir
 
 class ProducerPerformanceService(JmxMixin, PerformanceService):
@@ -27,15 +27,15 @@ class ProducerPerformanceService(JmxMixin, PerformanceService):
             "collect_default": True},
     }
 
-    def __init__(self, context, num_nodes, kafka, security_protocol, topic, num_records, record_size, throughput, settings={},
+    def __init__(self, context, num_nodes, kafka, topic, num_records, record_size, throughput, settings={},
                  intermediate_stats=False, client_id="producer-performance", jmx_object_names=None, jmx_attributes=[]):
         JmxMixin.__init__(self, num_nodes, jmx_object_names, jmx_attributes)
         PerformanceService.__init__(self, context, num_nodes)
         self.kafka = kafka
-        self.security_config = SecurityConfig(security_protocol)
-        self.security_protocol = security_protocol
+        self.security_config = kafka.security_config.client_config()
         self.args = {
             'topic': topic,
+            'kafka_opts': self.security_config.kafka_opts,
             'num_records': num_records,
             'record_size': record_size,
             'throughput': throughput
@@ -52,11 +52,11 @@ class ProducerPerformanceService(JmxMixin, PerformanceService):
             'client_id': self.client_id,
             'kafka_directory': kafka_dir(node)
             })
-        cmd = "JMX_PORT=%(jmx_port)d /opt/%(kafka_directory)s/bin/kafka-run-class.sh org.apache.kafka.tools.ProducerPerformance " \
+        cmd = "JMX_PORT=%(jmx_port)d KAFKA_OPTS=%(kafka_opts)s /opt/%(kafka_directory)s/bin/kafka-run-class.sh org.apache.kafka.tools.ProducerPerformance " \
               "--topic %(topic)s --num-records %(num_records)d --record-size %(record_size)d --throughput %(throughput)d --producer-props bootstrap.servers=%(bootstrap_servers)s client.id=%(client_id)s" % args
 
         self.security_config.setup_node(node)
-        if self.security_protocol == SecurityConfig.SSL:
+        if self.security_config.security_protocol != SecurityConfig.PLAINTEXT:
             self.settings.update(self.security_config.properties)
         for key, value in self.settings.items():
             cmd += " %s=%s" % (str(key), str(value))

http://git-wip-us.apache.org/repos/asf/kafka/blob/98db5ea9/tests/kafkatest/services/security/__init__.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/security/__init__.py b/tests/kafkatest/services/security/__init__.py
new file mode 100644
index 0000000..e556dc9
--- /dev/null
+++ b/tests/kafkatest/services/security/__init__.py
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/98db5ea9/tests/kafkatest/services/security/minikdc.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/security/minikdc.py b/tests/kafkatest/services/security/minikdc.py
new file mode 100644
index 0000000..e1b4b95
--- /dev/null
+++ b/tests/kafkatest/services/security/minikdc.py
@@ -0,0 +1,79 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.services.service import Service
+from kafkatest.services.kafka.directory import kafka_dir
+
+import os
+
+
+class MiniKdc(Service):
+
+    logs = {
+        "minikdc_log": {
+            "path": "/mnt/minikdc/minikdc.log",
+            "collect_default": True}
+    }
+
+    WORK_DIR = "/mnt/minikdc"
+    PROPS_FILE = "/mnt/minikdc/minikdc.properties"
+    KEYTAB_FILE = "/mnt/minikdc/keytab"
+    KRB5CONF_FILE = "/mnt/minikdc/krb5.conf"
+    LOG_FILE = "/mnt/minikdc/minikdc.log"
+    LOCAL_KEYTAB_FILE = "/tmp/keytab"
+    LOCAL_KRB5CONF_FILE = "/tmp/krb5.conf"
+
+    def __init__(self, context, kafka_nodes):
+        super(MiniKdc, self).__init__(context, 1)
+        self.kafka_nodes = kafka_nodes
+
+
+    def start_node(self, node):
+
+        node.account.ssh("mkdir -p %s" % MiniKdc.WORK_DIR, allow_fail=False)
+        props_file = self.render('minikdc.properties',  node=node)
+        node.account.create_file(MiniKdc.PROPS_FILE, props_file)
+        self.logger.info("minikdc.properties")
+        self.logger.info(props_file)
+
+        kafka_principals = ' '.join(['kafka/' + kafka_node.account.hostname for kafka_node in self.kafka_nodes])
+        principals = 'client ' + kafka_principals
+        self.logger.info("Starting MiniKdc with principals " + principals)
+
+        lib_dir = "/opt/%s/core/build/dependant-testlibs" % kafka_dir(node)
+        kdc_jars = node.account.ssh_capture("ls " + lib_dir)
+        classpath = ":".join([os.path.join(lib_dir, jar.strip()) for jar in kdc_jars])
+        cmd = "CLASSPATH=%s /opt/%s/bin/kafka-run-class.sh org.apache.hadoop.minikdc.MiniKdc %s %s %s %s 1>> %s 2>> %s &" % (classpath, kafka_dir(node), MiniKdc.WORK_DIR, MiniKdc.PROPS_FILE, MiniKdc.KEYTAB_FILE, principals, MiniKdc.LOG_FILE, MiniKdc.LOG_FILE)
+        self.logger.debug("Attempting to start MiniKdc on %s with command: %s" % (str(node.account), cmd))
+        with node.account.monitor_log(MiniKdc.LOG_FILE) as monitor:
+            node.account.ssh(cmd)
+            monitor.wait_until("MiniKdc Running", timeout_sec=60, backoff_sec=1, err_msg="MiniKdc didn't finish startup")
+        node.account.scp_from(MiniKdc.KEYTAB_FILE, MiniKdc.LOCAL_KEYTAB_FILE)
+        node.account.scp_from(MiniKdc.KRB5CONF_FILE, MiniKdc.LOCAL_KRB5CONF_FILE)
+
+
+    def stop_node(self, node):
+        self.logger.info("Stopping %s on %s" % (type(self).__name__, node.account.hostname))
+        node.account.kill_process("apacheds", allow_fail=False)
+
+    def clean_node(self, node):
+        node.account.kill_process("apacheds", clean_shutdown=False, allow_fail=False)
+        node.account.ssh("rm -rf " + MiniKdc.WORK_DIR, allow_fail=False)
+        if os.path.exists(MiniKdc.LOCAL_KEYTAB_FILE):
+            os.remove(MiniKdc.LOCAL_KEYTAB_FILE)
+        if os.path.exists(MiniKdc.LOCAL_KRB5CONF_FILE):
+            os.remove(MiniKdc.LOCAL_KRB5CONF_FILE)
+
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/98db5ea9/tests/kafkatest/services/security/security_config.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/security/security_config.py b/tests/kafkatest/services/security/security_config.py
new file mode 100644
index 0000000..912801d
--- /dev/null
+++ b/tests/kafkatest/services/security/security_config.py
@@ -0,0 +1,187 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import subprocess
+from ducktape.template import TemplateRenderer
+from kafkatest.services.security.minikdc import MiniKdc
+
+class Keytool(object):
+
+    @staticmethod
+    def generate_keystore_truststore(ssl_dir='.'):
+        """
+        Generate JKS keystore and truststore and return
+        Kafka SSL properties with these stores.
+        """
+        ks_path = os.path.join(ssl_dir, 'test.keystore.jks')
+        ks_password = 'test-ks-passwd'
+        key_password = 'test-key-passwd'
+        ts_path = os.path.join(ssl_dir, 'test.truststore.jks')
+        ts_password = 'test-ts-passwd'
+        if os.path.exists(ks_path):
+            os.remove(ks_path)
+        if os.path.exists(ts_path):
+            os.remove(ts_path)
+        
+        Keytool.runcmd("keytool -genkeypair -alias test -keyalg RSA -keysize 2048 -keystore %s -storetype JKS -keypass %s -storepass %s -dname CN=systemtest" % (ks_path, key_password, ks_password))
+        Keytool.runcmd("keytool -export -alias test -keystore %s -storepass %s -storetype JKS -rfc -file test.crt" % (ks_path, ks_password))
+        Keytool.runcmd("keytool -import -alias test -file test.crt -keystore %s -storepass %s -storetype JKS -noprompt" % (ts_path, ts_password))
+        os.remove('test.crt')
+
+        return {
+            'ssl.keystore.location' : ks_path,
+            'ssl.keystore.password' : ks_password,
+            'ssl.key.password' : key_password,
+            'ssl.truststore.location' : ts_path,
+            'ssl.truststore.password' : ts_password
+        }
+
+    @staticmethod
+    def runcmd(cmd):
+        proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+        proc.communicate()
+        if proc.returncode != 0:
+            raise subprocess.CalledProcessError(proc.returncode, cmd)
+
+
+class SecurityConfig(TemplateRenderer):
+
+    PLAINTEXT = 'PLAINTEXT'
+    SSL = 'SSL'
+    SASL_PLAINTEXT = 'SASL_PLAINTEXT'
+    SASL_SSL = 'SASL_SSL'
+    SASL_MECHANISM_GSSAPI = 'GSSAPI'
+    SASL_MECHANISM_PLAIN = 'PLAIN'
+    CONFIG_DIR = "/mnt/security"
+    KEYSTORE_PATH = "/mnt/security/test.keystore.jks"
+    TRUSTSTORE_PATH = "/mnt/security/test.truststore.jks"
+    JAAS_CONF_PATH = "/mnt/security/jaas.conf"
+    KRB5CONF_PATH = "/mnt/security/krb5.conf"
+    KEYTAB_PATH = "/mnt/security/keytab"
+
+    ssl_stores = Keytool.generate_keystore_truststore('.')
+
+    def __init__(self, security_protocol, interbroker_security_protocol=None, sasl_mechanism=SASL_MECHANISM_GSSAPI, template_props=""):
+        """
+        Initialize the security properties for the node and copy
+        keystore and truststore to the remote node if the transport protocol 
+        is SSL. If security_protocol is None, the protocol specified in the
+        template properties file is used. If no protocol is specified in the
+        template properties either, PLAINTEXT is used as default.
+        """
+
+        if security_protocol is None:
+            security_protocol = self.get_property('security.protocol', template_props)
+        if security_protocol is None:
+            security_protocol = SecurityConfig.PLAINTEXT
+        elif security_protocol not in [SecurityConfig.PLAINTEXT, SecurityConfig.SSL, SecurityConfig.SASL_PLAINTEXT, SecurityConfig.SASL_SSL]:
+            raise Exception("Invalid security.protocol in template properties: " + security_protocol)
+
+        if interbroker_security_protocol is None:
+            interbroker_security_protocol = security_protocol
+        self.interbroker_security_protocol = interbroker_security_protocol
+        self.has_sasl = self.is_sasl(security_protocol) or self.is_sasl(interbroker_security_protocol)
+        self.has_ssl = self.is_ssl(security_protocol) or self.is_ssl(interbroker_security_protocol)
+        self.properties = {
+            'security.protocol' : security_protocol,
+            'ssl.keystore.location' : SecurityConfig.KEYSTORE_PATH,
+            'ssl.keystore.password' : SecurityConfig.ssl_stores['ssl.keystore.password'],
+            'ssl.key.password' : SecurityConfig.ssl_stores['ssl.key.password'],
+            'ssl.truststore.location' : SecurityConfig.TRUSTSTORE_PATH,
+            'ssl.truststore.password' : SecurityConfig.ssl_stores['ssl.truststore.password'],
+            'sasl.mechanism' : sasl_mechanism,
+            'sasl.kerberos.service.name' : 'kafka'
+        }
+
+
+    def client_config(self, template_props=""):
+        return SecurityConfig(self.security_protocol, sasl_mechanism=self.sasl_mechanism, template_props=template_props)
+
+    def setup_node(self, node):
+        if self.has_ssl:
+            node.account.ssh("mkdir -p %s" % SecurityConfig.CONFIG_DIR, allow_fail=False)
+            node.account.scp_to(SecurityConfig.ssl_stores['ssl.keystore.location'], SecurityConfig.KEYSTORE_PATH)
+            node.account.scp_to(SecurityConfig.ssl_stores['ssl.truststore.location'], SecurityConfig.TRUSTSTORE_PATH)
+
+        if self.has_sasl:
+            node.account.ssh("mkdir -p %s" % SecurityConfig.CONFIG_DIR, allow_fail=False)
+            jaas_conf_file = self.sasl_mechanism.lower() + "_jaas.conf"
+            java_version = node.account.ssh_capture("java -version")
+            if any('IBM' in line for line in java_version):
+                is_ibm_jdk = True
+            else:
+                is_ibm_jdk = False
+            jaas_conf = self.render(jaas_conf_file,  node=node, is_ibm_jdk=is_ibm_jdk)
+            node.account.create_file(SecurityConfig.JAAS_CONF_PATH, jaas_conf)
+            if self.has_sasl_kerberos:
+                node.account.scp_to(MiniKdc.LOCAL_KEYTAB_FILE, SecurityConfig.KEYTAB_PATH)
+                node.account.scp_to(MiniKdc.LOCAL_KRB5CONF_FILE, SecurityConfig.KRB5CONF_PATH)
+
+    def clean_node(self, node):
+        if self.security_protocol != SecurityConfig.PLAINTEXT:
+            node.account.ssh("rm -rf %s" % SecurityConfig.CONFIG_DIR, allow_fail=False)
+
+    def get_property(self, prop_name, template_props=""):
+        """
+        Get property value from the string representation of
+        a properties file.
+        """
+        value = None
+        for line in template_props.split("\n"):
+            items = line.split("=")
+            if len(items) == 2 and items[0].strip() == prop_name:
+                value = str(items[1].strip())
+        return value
+
+    def is_ssl(self, security_protocol):
+        return security_protocol == SecurityConfig.SSL or security_protocol == SecurityConfig.SASL_SSL
+
+    def is_sasl(self, security_protocol):
+        return security_protocol == SecurityConfig.SASL_PLAINTEXT or security_protocol == SecurityConfig.SASL_SSL
+
+    @property
+    def security_protocol(self):
+        return self.properties['security.protocol']
+
+    @property
+    def sasl_mechanism(self):
+        return self.properties['sasl.mechanism']
+
+    @property
+    def has_sasl_kerberos(self):
+        return self.has_sasl and self.sasl_mechanism == SecurityConfig.SASL_MECHANISM_GSSAPI
+
+    @property
+    def kafka_opts(self):
+        if self.has_sasl:
+            return "\"-Djava.security.auth.login.config=%s -Djava.security.krb5.conf=%s\"" % (SecurityConfig.JAAS_CONF_PATH, SecurityConfig.KRB5CONF_PATH)
+        else:
+            return ""
+
+    def __str__(self):
+        """
+        Return properties as string with line separators.
+        This is used to append security config properties to
+        a properties file.
+        """
+
+        prop_str = ""
+        if self.security_protocol != SecurityConfig.PLAINTEXT:
+            for key, value in self.properties.items():
+                prop_str += ("\n" + key + "=" + value)
+            prop_str += "\n"
+        return prop_str
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/98db5ea9/tests/kafkatest/services/security/templates/gssapi_jaas.conf
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/security/templates/gssapi_jaas.conf b/tests/kafkatest/services/security/templates/gssapi_jaas.conf
new file mode 100644
index 0000000..9582056
--- /dev/null
+++ b/tests/kafkatest/services/security/templates/gssapi_jaas.conf
@@ -0,0 +1,51 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+  * License. You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+  * specific language governing permissions and limitations under the License.
+  */
+
+{% if is_ibm_jdk %}
+
+KafkaClient {
+    com.ibm.security.auth.module.Krb5LoginModule required debug=false
+    credsType=both
+    useKeytab="file:/mnt/security/keytab"
+    principal="client@EXAMPLE.COM";
+};
+
+KafkaServer {
+    com.ibm.security.auth.module.Krb5LoginModule required debug=false
+    credsType=both
+    useKeytab="file:/mnt/security/keytab"
+    principal="kafka/{{ node.account.hostname }}@EXAMPLE.COM";
+};
+
+{% else %}
+
+KafkaClient {
+    com.sun.security.auth.module.Krb5LoginModule required debug=false
+    doNotPrompt=true
+    useKeyTab=true
+    storeKey=true
+    keyTab="/mnt/security/keytab"
+    principal="client@EXAMPLE.COM";
+};
+
+KafkaServer {
+    com.sun.security.auth.module.Krb5LoginModule required debug=false
+    doNotPrompt=true
+    useKeyTab=true
+    storeKey=true
+    keyTab="/mnt/security/keytab"
+    principal="kafka/{{ node.account.hostname }}@EXAMPLE.COM";
+};
+
+{% endif %}
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/98db5ea9/tests/kafkatest/services/security/templates/minikdc.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/security/templates/minikdc.properties b/tests/kafkatest/services/security/templates/minikdc.properties
new file mode 100644
index 0000000..fce8f83
--- /dev/null
+++ b/tests/kafkatest/services/security/templates/minikdc.properties
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+kdc.bind.address={{ node.account.hostname }}
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/98db5ea9/tests/kafkatest/services/verifiable_producer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/verifiable_producer.py b/tests/kafkatest/services/verifiable_producer.py
index a95a0d6..33c1e24 100644
--- a/tests/kafkatest/services/verifiable_producer.py
+++ b/tests/kafkatest/services/verifiable_producer.py
@@ -17,7 +17,7 @@ from ducktape.services.background_thread import BackgroundThreadService
 
 from kafkatest.services.kafka.directory import kafka_dir, KAFKA_TRUNK
 from kafkatest.services.kafka.version import TRUNK, LATEST_0_8_2
-from kafkatest.utils.security_config import SecurityConfig
+from kafkatest.services.security.security_config import SecurityConfig
 
 import json
 import os
@@ -46,7 +46,7 @@ class VerifiableProducer(BackgroundThreadService):
             "collect_default": True}
         }
 
-    def __init__(self, context, num_nodes, kafka, topic, security_protocol=SecurityConfig.PLAINTEXT, max_messages=-1, throughput=100000, version=TRUNK):
+    def __init__(self, context, num_nodes, kafka, topic, max_messages=-1, throughput=100000, version=TRUNK):
         super(VerifiableProducer, self).__init__(context, num_nodes)
         self.log_level = "TRACE"
 
@@ -61,8 +61,7 @@ class VerifiableProducer(BackgroundThreadService):
         self.not_acked_values = []
 
         self.prop_file = ""
-        self.security_config = SecurityConfig(security_protocol, self.prop_file)
-        self.security_protocol = self.security_config.security_protocol
+        self.security_config = kafka.security_config.client_config(self.prop_file)
         self.prop_file += str(self.security_config)
 
     def _worker(self, idx, node):
@@ -120,6 +119,7 @@ class VerifiableProducer(BackgroundThreadService):
             cmd += "export CLASSPATH; "
 
         cmd += "export LOG_DIR=%s;" % VerifiableProducer.LOG_DIR
+        cmd += " export KAFKA_OPTS=%s;" % self.security_config.kafka_opts
         cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % VerifiableProducer.LOG4J_CONFIG
         cmd += "/opt/" + kafka_dir(node) + "/bin/kafka-run-class.sh org.apache.kafka.tools.VerifiableProducer" \
               " --topic %s --broker-list %s" % (self.topic, self.kafka.bootstrap_servers())

http://git-wip-us.apache.org/repos/asf/kafka/blob/98db5ea9/tests/kafkatest/tests/benchmark_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/benchmark_test.py b/tests/kafkatest/tests/benchmark_test.py
index 7219c0a..02f4e4d 100644
--- a/tests/kafkatest/tests/benchmark_test.py
+++ b/tests/kafkatest/tests/benchmark_test.py
@@ -80,7 +80,7 @@ class Benchmark(Test):
         nrecords = int(self.target_data_size / message_size)
 
         self.producer = ProducerPerformanceService(
-            self.test_context, num_producers, self.kafka, security_protocol=security_protocol, topic=topic,
+            self.test_context, num_producers, self.kafka, topic=topic,
             num_records=nrecords, record_size=message_size,  throughput=-1,
             settings={
                 'acks': acks,
@@ -89,9 +89,9 @@ class Benchmark(Test):
         self.producer.run()
         return compute_aggregate_throughput(self.producer)
 
-    @parametrize(security_protocol='PLAINTEXT', interbroker_security_protocol='PLAINTEXT')
-    @matrix(security_protocol=['SSL'], interbroker_security_protocol=['PLAINTEXT', 'SSL'])
-    def test_long_term_producer_throughput(self, security_protocol, interbroker_security_protocol):
+    @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
+    @matrix(security_protocol=['PLAINTEXT', 'SSL'])
+    def test_long_term_producer_throughput(self, security_protocol, interbroker_security_protocol=None):
         """
         Setup: 1 node zk + 3 node kafka cluster
         Produce 10e6 100 byte messages to a topic with 6 partitions, replication-factor 3, and acks=1.
@@ -100,9 +100,11 @@ class Benchmark(Test):
 
         (This runs ProducerPerformance.java under the hood)
         """
-        self.start_kafka(security_protocol, security_protocol)
+        if interbroker_security_protocol is None:
+            interbroker_security_protocol = security_protocol
+        self.start_kafka(security_protocol, interbroker_security_protocol)
         self.producer = ProducerPerformanceService(
-            self.test_context, 1, self.kafka, security_protocol=security_protocol,
+            self.test_context, 1, self.kafka,
             topic=TOPIC_REP_THREE, num_records=self.msgs_large, record_size=DEFAULT_RECORD_SIZE,
             throughput=-1, settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory},
             intermediate_stats=True
@@ -132,10 +134,10 @@ class Benchmark(Test):
         self.logger.info("\n".join(summary))
         return data
 
-    
-    @parametrize(security_protocol='PLAINTEXT', interbroker_security_protocol='PLAINTEXT')
-    @matrix(security_protocol=['SSL'], interbroker_security_protocol=['PLAINTEXT', 'SSL'])
-    def test_end_to_end_latency(self, security_protocol, interbroker_security_protocol):
+
+    @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
+    @matrix(security_protocol=['PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL'])
+    def test_end_to_end_latency(self, security_protocol, interbroker_security_protocol=None):
         """
         Setup: 1 node zk + 3 node kafka cluster
         Produce (acks = 1) and consume 10e3 messages to a topic with 6 partitions and replication-factor 3,
@@ -145,19 +147,21 @@ class Benchmark(Test):
 
         (Under the hood, this simply runs EndToEndLatency.scala)
         """
+        if interbroker_security_protocol is None:
+            interbroker_security_protocol = security_protocol
         self.start_kafka(security_protocol, interbroker_security_protocol)
         self.logger.info("BENCHMARK: End to end latency")
         self.perf = EndToEndLatencyService(
             self.test_context, 1, self.kafka,
-            topic=TOPIC_REP_THREE, security_protocol=security_protocol, num_records=10000
+            topic=TOPIC_REP_THREE, num_records=10000
         )
         self.perf.run()
         return latency(self.perf.results[0]['latency_50th_ms'],  self.perf.results[0]['latency_99th_ms'], self.perf.results[0]['latency_999th_ms'])
 
-    @parametrize(new_consumer=True, security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
-    @parametrize(new_consumer=True, security_protocol='SSL', interbroker_security_protocol='SSL')
-    @matrix(new_consumer=[True, False], security_protocol=['PLAINTEXT'])
-    def test_producer_and_consumer(self, new_consumer, security_protocol, interbroker_security_protocol='PLAINTEXT'):
+    @parametrize(security_protocol='PLAINTEXT', new_consumer=False)
+    @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
+    @matrix(security_protocol=['PLAINTEXT', 'SSL'])
+    def test_producer_and_consumer(self, security_protocol, interbroker_security_protocol=None, new_consumer=True):
         """
         Setup: 1 node zk + 3 node kafka cluster
         Concurrently produce and consume 10e6 messages with a single producer and a single consumer,
@@ -167,17 +171,19 @@ class Benchmark(Test):
 
         (Under the hood, this runs ProducerPerformance.java, and ConsumerPerformance.scala)
         """
+        if interbroker_security_protocol is None:
+            interbroker_security_protocol = security_protocol
         self.start_kafka(security_protocol, interbroker_security_protocol)
         num_records = 10 * 1000 * 1000  # 10e6
 
         self.producer = ProducerPerformanceService(
-            self.test_context, 1, self.kafka, 
-            topic=TOPIC_REP_THREE, security_protocol=security_protocol,
+            self.test_context, 1, self.kafka,
+            topic=TOPIC_REP_THREE,
             num_records=num_records, record_size=DEFAULT_RECORD_SIZE, throughput=-1,
             settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory}
         )
         self.consumer = ConsumerPerformanceService(
-            self.test_context, 1, self.kafka, security_protocol, topic=TOPIC_REP_THREE, new_consumer=new_consumer, messages=num_records)
+            self.test_context, 1, self.kafka, topic=TOPIC_REP_THREE, new_consumer=new_consumer, messages=num_records)
         Service.run_parallel(self.producer, self.consumer)
 
         data = {
@@ -190,21 +196,23 @@ class Benchmark(Test):
         self.logger.info("\n".join(summary))
         return data
 
-    @parametrize(new_consumer=True, security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
-    @parametrize(new_consumer=True, security_protocol='SSL', interbroker_security_protocol='SSL')
-    @matrix(new_consumer=[True, False], security_protocol=['PLAINTEXT'])
-    def test_consumer_throughput(self, new_consumer, security_protocol, interbroker_security_protocol='PLAINTEXT', num_consumers=1):
+    @parametrize(security_protocol='PLAINTEXT', new_consumer=False)
+    @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
+    @matrix(security_protocol=['PLAINTEXT', 'SSL'])
+    def test_consumer_throughput(self, security_protocol, interbroker_security_protocol=None, new_consumer=True, num_consumers=1):
         """
         Consume 10e6 100-byte messages with 1 or more consumers from a topic with 6 partitions
         (using new consumer iff new_consumer == True), and report throughput.
         """
+        if interbroker_security_protocol is None:
+            interbroker_security_protocol = security_protocol
         self.start_kafka(security_protocol, interbroker_security_protocol)
         num_records = 10 * 1000 * 1000  # 10e6
 
         # seed kafka w/messages
         self.producer = ProducerPerformanceService(
             self.test_context, 1, self.kafka,
-            topic=TOPIC_REP_THREE, security_protocol=security_protocol,
+            topic=TOPIC_REP_THREE,
             num_records=num_records, record_size=DEFAULT_RECORD_SIZE, throughput=-1,
             settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory}
         )
@@ -213,7 +221,7 @@ class Benchmark(Test):
         # consume
         self.consumer = ConsumerPerformanceService(
             self.test_context, num_consumers, self.kafka,
-            topic=TOPIC_REP_THREE, security_protocol=security_protocol, new_consumer=new_consumer, messages=num_records)
+            topic=TOPIC_REP_THREE, new_consumer=new_consumer, messages=num_records)
         self.consumer.group = "test-consumer-group"
         self.consumer.run()
         return compute_aggregate_throughput(self.consumer)

http://git-wip-us.apache.org/repos/asf/kafka/blob/98db5ea9/tests/kafkatest/tests/log4j_appender_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/log4j_appender_test.py b/tests/kafkatest/tests/log4j_appender_test.py
index 315ee34..db33d76 100644
--- a/tests/kafkatest/tests/log4j_appender_test.py
+++ b/tests/kafkatest/tests/log4j_appender_test.py
@@ -22,7 +22,7 @@ from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService
 from kafkatest.services.console_consumer import ConsoleConsumer
 from kafkatest.services.kafka_log4j_appender import KafkaLog4jAppender
-from kafkatest.utils.security_config import SecurityConfig
+from kafkatest.services.security.security_config import SecurityConfig
 
 TOPIC = "topic-log4j-appender"
 MAX_MESSAGES = 100
@@ -59,7 +59,7 @@ class Log4jAppenderTest(Test):
     def start_consumer(self, security_protocol):
         enable_new_consumer = security_protocol == SecurityConfig.SSL
         self.consumer = ConsoleConsumer(self.test_context, num_nodes=self.num_brokers, kafka=self.kafka, topic=TOPIC,
-                                        consumer_timeout_ms=1000, new_consumer=enable_new_consumer, security_protocol=security_protocol)
+                                        consumer_timeout_ms=1000, new_consumer=enable_new_consumer)
         self.consumer.start()
 
     @matrix(security_protocol=['PLAINTEXT', 'SSL'])
@@ -83,4 +83,4 @@ class Log4jAppenderTest(Test):
         wait_until(lambda: len(self.consumer.messages_consumed[1]) == expected_lines_count, timeout_sec=10,
                    err_msg="Timed out waiting to consume expected number of messages.")
 
-        self.consumer.stop()
\ No newline at end of file
+        self.consumer.stop()

http://git-wip-us.apache.org/repos/asf/kafka/blob/98db5ea9/tests/kafkatest/tests/quota_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/quota_test.py b/tests/kafkatest/tests/quota_test.py
index 6ba6aa7..2649d7d 100644
--- a/tests/kafkatest/tests/quota_test.py
+++ b/tests/kafkatest/tests/quota_test.py
@@ -45,13 +45,11 @@ class QuotaTest(Test):
         self.maximum_broker_deviation_percentage = 5.0
         self.num_records = 100000
         self.record_size = 3000
-        self.security_protocol = 'PLAINTEXT'
-        self.interbroker_security_protocol = 'PLAINTEXT'
 
         self.zk = ZookeeperService(test_context, num_nodes=1)
         self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk,
-                                  security_protocol=self.security_protocol,
-                                  interbroker_security_protocol=self.interbroker_security_protocol,
+                                  security_protocol='PLAINTEXT',
+                                  interbroker_security_protocol='PLAINTEXT',
                                   topics={self.topic: {'partitions': 6, 'replication-factor': 1, 'min.insync.replicas': 1}},
                                   quota_config=self.quota_config,
                                   jmx_object_names=['kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec',
@@ -74,7 +72,7 @@ class QuotaTest(Test):
     def test_quota(self, producer_id='default_id', producer_num=1, consumer_id='default_id', consumer_num=1):
         # Produce all messages
         producer = ProducerPerformanceService(
-            self.test_context, producer_num, self.kafka, security_protocol=self.security_protocol,
+            self.test_context, producer_num, self.kafka,
             topic=self.topic, num_records=self.num_records, record_size=self.record_size, throughput=-1, client_id=producer_id,
             jmx_object_names=['kafka.producer:type=producer-metrics,client-id=%s' % producer_id], jmx_attributes=['outgoing-byte-rate'])
 
@@ -82,7 +80,7 @@ class QuotaTest(Test):
 
         # Consume all messages
         consumer = ConsoleConsumer(self.test_context, consumer_num, self.kafka, self.topic,
-            security_protocol=self.security_protocol, new_consumer=False,
+            new_consumer=False,
             consumer_timeout_ms=60000, client_id=consumer_id,
             jmx_object_names=['kafka.consumer:type=ConsumerTopicMetrics,name=BytesPerSec,clientId=%s' % consumer_id],
             jmx_attributes=['OneMinuteRate'])

http://git-wip-us.apache.org/repos/asf/kafka/blob/98db5ea9/tests/kafkatest/tests/replication_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/replication_test.py b/tests/kafkatest/tests/replication_test.py
index 16aa944..6633a4f 100644
--- a/tests/kafkatest/tests/replication_test.py
+++ b/tests/kafkatest/tests/replication_test.py
@@ -108,8 +108,8 @@ class ReplicationTest(ProduceConsumeValidateTest):
 
 
     @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
-            interbroker_security_protocol=["PLAINTEXT", "SSL"])
-    def test_replication_with_broker_failure(self, failure_mode, interbroker_security_protocol="PLAINTEXT"):
+            security_protocol=["PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"])
+    def test_replication_with_broker_failure(self, failure_mode, security_protocol):
         """Replication tests.
         These tests verify that replication provides simple durability guarantees by checking that data acked by
         brokers is still available for consumption in the face of various failure scenarios.
@@ -122,11 +122,11 @@ class ReplicationTest(ProduceConsumeValidateTest):
             - When done driving failures, stop producing, and finish consuming
             - Validate that every acked message was consumed
         """
-        client_security_protocol = 'PLAINTEXT'
-        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, security_protocol=client_security_protocol, throughput=self.producer_throughput)
-        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, security_protocol=client_security_protocol, consumer_timeout_ms=60000, message_validator=is_int)
 
-        self.kafka.interbroker_security_protocol = interbroker_security_protocol
+        self.kafka.security_protocol = 'PLAINTEXT'
+        self.kafka.interbroker_security_protocol = security_protocol
+        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, throughput=self.producer_throughput)
+        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, consumer_timeout_ms=60000, message_validator=is_int)
         self.kafka.start()
         
         self.run_produce_consume_validate(core_test_action=lambda: failures[failure_mode](self))

http://git-wip-us.apache.org/repos/asf/kafka/blob/98db5ea9/tests/kafkatest/utils/security_config.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/utils/security_config.py b/tests/kafkatest/utils/security_config.py
deleted file mode 100644
index 965f209..0000000
--- a/tests/kafkatest/utils/security_config.py
+++ /dev/null
@@ -1,133 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import os
-import subprocess
-
-
-class Keytool(object):
-
-    @staticmethod
-    def generate_keystore_truststore(ssl_dir='.'):
-        """
-        Generate JKS keystore and truststore and return
-        Kafka SSL properties with these stores.
-        """
-        ks_path = os.path.join(ssl_dir, 'test.keystore.jks')
-        ks_password = 'test-ks-passwd'
-        key_password = 'test-key-passwd'
-        ts_path = os.path.join(ssl_dir, 'test.truststore.jks')
-        ts_password = 'test-ts-passwd'
-        if os.path.exists(ks_path):
-            os.remove(ks_path)
-        if os.path.exists(ts_path):
-            os.remove(ts_path)
-        
-        Keytool.runcmd("keytool -genkeypair -alias test -keyalg RSA -keysize 2048 -keystore %s -storetype JKS -keypass %s -storepass %s -dname CN=systemtest" % (ks_path, key_password, ks_password))
-        Keytool.runcmd("keytool -export -alias test -keystore %s -storepass %s -storetype JKS -rfc -file test.crt" % (ks_path, ks_password))
-        Keytool.runcmd("keytool -import -alias test -file test.crt -keystore %s -storepass %s -storetype JKS -noprompt" % (ts_path, ts_password))
-        os.remove('test.crt')
-
-        return {
-            'ssl.keystore.location' : ks_path,
-            'ssl.keystore.password' : ks_password,
-            'ssl.key.password' : key_password,
-            'ssl.truststore.location' : ts_path,
-            'ssl.truststore.password' : ts_password
-        }
-
-    @staticmethod
-    def runcmd(cmd):
-        proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
-        proc.communicate()
-        if proc.returncode != 0:
-            raise subprocess.CalledProcessError(proc.returncode, cmd)
-
-
-class SecurityConfig(object):
-
-    PLAINTEXT = 'PLAINTEXT'
-    SSL = 'SSL'
-    SSL_DIR = "/mnt/ssl"
-    KEYSTORE_PATH = "/mnt/ssl/test.keystore.jks"
-    TRUSTSTORE_PATH = "/mnt/ssl/test.truststore.jks"
-
-    ssl_stores = Keytool.generate_keystore_truststore('.')
-
-    def __init__(self, security_protocol, template_props=""):
-        """
-        Initialize the security properties for the node and copy
-        keystore and truststore to the remote node if the transport protocol 
-        is SSL. If security_protocol is None, the protocol specified in the
-        template properties file is used. If no protocol is specified in the
-        template properties either, PLAINTEXT is used as default.
-        """
-
-        if security_protocol is None:
-            security_protocol = self.get_property('security.protocol', template_props)
-        if security_protocol is None:
-            security_protocol = SecurityConfig.PLAINTEXT
-        elif security_protocol not in [SecurityConfig.PLAINTEXT, SecurityConfig.SSL]:
-            raise Exception("Invalid security.protocol in template properties: " + security_protocol)
-
-        self.properties = {
-            'security.protocol' : security_protocol,
-            'ssl.keystore.location' : SecurityConfig.KEYSTORE_PATH,
-            'ssl.keystore.password' : SecurityConfig.ssl_stores['ssl.keystore.password'],
-            'ssl.key.password' : SecurityConfig.ssl_stores['ssl.key.password'],
-            'ssl.truststore.location' : SecurityConfig.TRUSTSTORE_PATH,
-            'ssl.truststore.password' : SecurityConfig.ssl_stores['ssl.truststore.password']
-        }
-    
-    def setup_node(self, node):
-        if self.security_protocol == SecurityConfig.SSL:
-            node.account.ssh("mkdir -p %s" % SecurityConfig.SSL_DIR, allow_fail=False)
-            node.account.scp_to(SecurityConfig.ssl_stores['ssl.keystore.location'], SecurityConfig.KEYSTORE_PATH)
-            node.account.scp_to(SecurityConfig.ssl_stores['ssl.truststore.location'], SecurityConfig.TRUSTSTORE_PATH)
-
-    def clean_node(self, node):
-        if self.security_protocol == SecurityConfig.SSL:
-            node.account.ssh("rm -rf %s" % SecurityConfig.SSL_DIR, allow_fail=False)
-
-    def get_property(self, prop_name, template_props=""):
-        """
-        Get property value from the string representation of
-        a properties file.
-        """
-        value = None
-        for line in template_props.split("\n"):
-            items = line.split("=")
-            if len(items) == 2 and items[0].strip() == prop_name:
-                value = str(items[1].strip())
-        return value
-
-    @property
-    def security_protocol(self):
-        return self.properties['security.protocol']
-
-    def __str__(self):
-        """
-        Return properties as string with line separators.
-        This is used to append security config properties to
-        a properties file.
-        """
-
-        prop_str = ""
-        if self.security_protocol == SecurityConfig.SSL:
-            for key, value in self.properties.items():
-                prop_str += ("\n" + key + "=" + value)
-            prop_str += "\n"
-        return prop_str
-


Mime
View raw message