kafka-commits mailing list archives

From guozh...@apache.org
Subject kafka git commit: KAFKA-2581: Run some existing ducktape tests with SSL
Date Tue, 13 Oct 2015 00:15:12 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 7915396ee -> dd514b2bb


KAFKA-2581: Run some existing ducktape tests with SSL

Parametrize the console consumer sanity test, replication tests, and benchmark tests to run with both PLAINTEXT and SSL.

Author: Rajini Sivaram <rajinisivaram@googlemail.com>

Reviewers: Geoff Anderson, Ewen Cheslack-Postava, Guozhang Wang

Closes #271 from rajinisivaram/KAFKA-2581


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/dd514b2b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/dd514b2b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/dd514b2b

Branch: refs/heads/trunk
Commit: dd514b2bb8c476642952bf923de8d4c81bbeca7d
Parents: 7915396
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
Authored: Mon Oct 12 17:19:45 2015 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Mon Oct 12 17:19:45 2015 -0700

----------------------------------------------------------------------
 .../scala/kafka/tools/ConsumerPerformance.scala |  10 +-
 .../sanity_checks/test_console_consumer.py      |  14 +-
 tests/kafkatest/services/console_consumer.py    |  37 ++++--
 tests/kafkatest/services/kafka.py               |  20 ++-
 .../performance/consumer_performance.py         |  10 +-
 .../services/performance/end_to_end_latency.py  |  14 +-
 .../performance/producer_performance.py         |  13 +-
 .../services/templates/kafka.properties         |  17 +++
 tests/kafkatest/services/verifiable_producer.py |  19 ++-
 tests/kafkatest/tests/benchmark_test.py         |  83 ++++++++----
 tests/kafkatest/tests/replication_test.py       |  45 ++++---
 tests/kafkatest/utils/security_config.py        | 133 +++++++++++++++++++
 .../kafka/clients/tools/VerifiableProducer.java |  16 +++
 13 files changed, 363 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/dd514b2b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
index 8267030..c7f9072 100644
--- a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
@@ -27,6 +27,7 @@ import java.nio.channels.ClosedByInterruptException
 import org.apache.log4j.Logger
 import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer}
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
+import org.apache.kafka.common.utils.Utils
 import kafka.utils.CommandLineUtils
 import java.util.{ Random, Properties }
 import kafka.consumer.Consumer
@@ -203,6 +204,10 @@ object ConsumerPerformance {
       .ofType(classOf[java.lang.Integer])
       .defaultsTo(1)
     val useNewConsumerOpt = parser.accepts("new-consumer", "Use the new consumer implementation.")
+    val consumerConfigOpt = parser.accepts("consumer.config", "Consumer config properties file.")
+      .withRequiredArg
+      .describedAs("config file")
+      .ofType(classOf[String])
 
     val options = parser.parse(args: _*)
 
@@ -210,7 +215,10 @@ object ConsumerPerformance {
    
     val useNewConsumer = options.has(useNewConsumerOpt)
     
-    val props = new Properties
+    val props = if (options.has(consumerConfigOpt))
+      Utils.loadProps(options.valueOf(consumerConfigOpt))
+    else
+      new Properties
     if(useNewConsumer) {
       import org.apache.kafka.clients.consumer.ConsumerConfig
       props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, options.valueOf(bootstrapServersOpt))
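
The new --consumer.config option loads client properties from a file before the individual command-line options are applied (so explicit options like the bootstrap servers still win). This is how the system tests inject SSL settings. For reference, the file the tests generate is just the string form of SecurityConfig, added later in this patch; for SSL it contains exactly these keys, using the patch's test fixtures:

    security.protocol=SSL
    ssl.keystore.location=/mnt/ssl/test.keystore.jks
    ssl.keystore.password=test-ks-passwd
    ssl.key.password=test-key-passwd
    ssl.truststore.location=/mnt/ssl/test.truststore.jks
    ssl.truststore.password=test-ts-passwd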

http://git-wip-us.apache.org/repos/asf/kafka/blob/dd514b2b/tests/kafkatest/sanity_checks/test_console_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/sanity_checks/test_console_consumer.py b/tests/kafkatest/sanity_checks/test_console_consumer.py
index 4544c00..a9c4d53 100644
--- a/tests/kafkatest/sanity_checks/test_console_consumer.py
+++ b/tests/kafkatest/sanity_checks/test_console_consumer.py
@@ -15,6 +15,8 @@
 
 from ducktape.tests.test import Test
 from ducktape.utils.util import wait_until
+from ducktape.mark import parametrize
+from ducktape.mark import matrix
 
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService
@@ -30,16 +32,20 @@ class ConsoleConsumerTest(Test):
 
         self.topic = "topic"
         self.zk = ZookeeperService(test_context, num_nodes=1)
-        self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk,
-                                  topics={self.topic: {"partitions": 1, "replication-factor": 1}})
-        self.consumer = ConsoleConsumer(test_context, num_nodes=1, kafka=self.kafka, topic=self.topic)
 
     def setUp(self):
         self.zk.start()
+
+    @parametrize(security_protocol='SSL', new_consumer=True)
+    @matrix(security_protocol=['PLAINTEXT'], new_consumer=[False, True])
+    def test_lifecycle(self, security_protocol, new_consumer):
+        self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk,
+                                  security_protocol=security_protocol,
+                                  topics={self.topic: {"partitions": 1, "replication-factor": 1}})
         self.kafka.start()
 
-    def test_lifecycle(self):
         t0 = time.time()
+        self.consumer = ConsoleConsumer(self.test_context, num_nodes=1, kafka=self.kafka, topic=self.topic, security_protocol=security_protocol, new_consumer=new_consumer)
         self.consumer.start()
         node = self.consumer.nodes[0]
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/dd514b2b/tests/kafkatest/services/console_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py
index 9286654..2f1e70e 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -15,6 +15,7 @@
 
 from ducktape.services.background_thread import BackgroundThreadService
 from ducktape.utils.util import wait_until
+from kafkatest.utils.security_config import SecurityConfig
 
 import os
 import subprocess
@@ -93,13 +94,15 @@ class ConsoleConsumer(BackgroundThreadService):
             "collect_default": True}
         }
 
-    def __init__(self, context, num_nodes, kafka, topic, message_validator=None, from_beginning=True, consumer_timeout_ms=None):
+    def __init__(self, context, num_nodes, kafka, topic, security_protocol=None, new_consumer=None, message_validator=None, from_beginning=True, consumer_timeout_ms=None):
         """
         Args:
             context:                    standard context
             num_nodes:                  number of nodes to use (this should be 1)
             kafka:                      kafka service
             topic:                      consume from this topic
+            security_protocol:          security protocol for Kafka connections
+            new_consumer:               use new Kafka consumer if True
             message_validator:          function which returns message or None
             from_beginning:             consume from beginning if True, else from the end
             consumer_timeout_ms:        corresponds to consumer.timeout.ms. consumer process ends if time between
@@ -109,6 +112,7 @@ class ConsoleConsumer(BackgroundThreadService):
         """
         super(ConsoleConsumer, self).__init__(context, num_nodes)
         self.kafka = kafka
+        self.new_consumer = new_consumer
         self.args = {
             'topic': topic,
         }
@@ -119,6 +123,19 @@ class ConsoleConsumer(BackgroundThreadService):
         self.message_validator = message_validator
         self.messages_consumed = {idx: [] for idx in range(1, num_nodes + 1)}
 
+        # Process client configuration
+        self.prop_file = self.render('console_consumer.properties', consumer_timeout_ms=self.consumer_timeout_ms)
+
+        # Add security properties to the config. If security protocol is not specified,
+        # use the default in the template properties.
+        self.security_config = SecurityConfig(security_protocol, self.prop_file)
+        self.security_protocol = self.security_config.security_protocol
+        if self.new_consumer is None:
+            self.new_consumer = self.security_protocol == SecurityConfig.SSL
+        if self.security_protocol == SecurityConfig.SSL and not self.new_consumer:
+            raise Exception("SSL protocol is supported only with the new consumer")
+        self.prop_file += str(self.security_config)
+
     @property
     def start_cmd(self):
         args = self.args.copy()
@@ -129,9 +146,13 @@ class ConsoleConsumer(BackgroundThreadService):
 
         cmd = "export LOG_DIR=%s;" % ConsoleConsumer.LOG_DIR
         cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\";" % ConsoleConsumer.LOG4J_CONFIG
-        cmd += " /opt/kafka/bin/kafka-console-consumer.sh --topic %(topic)s --zookeeper %(zk_connect)s" \
+        cmd += " /opt/kafka/bin/kafka-console-consumer.sh --topic %(topic)s" \
             " --consumer.config %(config_file)s" % args
 
+        if self.new_consumer:
+            cmd += " --new-consumer --bootstrap-server %s"  % self.kafka.bootstrap_servers()
+        else:
+            cmd += " --zookeeper %(zk_connect)s" % args
         if self.from_beginning:
             cmd += " --from-beginning"
 
@@ -152,15 +173,10 @@ class ConsoleConsumer(BackgroundThreadService):
     def _worker(self, idx, node):
         node.account.ssh("mkdir -p %s" % ConsoleConsumer.PERSISTENT_ROOT, allow_fail=False)
 
-        # Create and upload config file
-        if self.consumer_timeout_ms is not None:
-            prop_file = self.render('console_consumer.properties', consumer_timeout_ms=self.consumer_timeout_ms)
-        else:
-            prop_file = self.render('console_consumer.properties')
-
         self.logger.info("console_consumer.properties:")
-        self.logger.info(prop_file)
-        node.account.create_file(ConsoleConsumer.CONFIG_FILE, prop_file)
+        self.logger.info(self.prop_file)
+        node.account.create_file(ConsoleConsumer.CONFIG_FILE, self.prop_file)
+        self.security_config.setup_node(node)
 
         # Create and upload log properties
         log_config = self.render('tools_log4j.properties', log_file=ConsoleConsumer.LOG_FILE)
@@ -190,4 +206,5 @@ class ConsoleConsumer(BackgroundThreadService):
                              (self.__class__.__name__, node.account))
         node.account.kill_process("java", clean_shutdown=False, allow_fail=True)
         node.account.ssh("rm -rf %s" % ConsoleConsumer.PERSISTENT_ROOT, allow_fail=False)
+        self.security_config.clean_node(node)
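
With this change a test can ask for an SSL consumer in one line; new_consumer defaults to True when the protocol is SSL, since the old zookeeper-based consumer has no SSL support (hence the exception above). A usage sketch, assuming the usual test_context and kafka fixtures:

    consumer = ConsoleConsumer(test_context, num_nodes=1, kafka=kafka,
                               topic="topic", security_protocol="SSL")
    assert consumer.new_consumer  # defaulted to True because the protocol is SSL
    # start_cmd now uses --new-consumer --bootstrap-server instead of --zookeeper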
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/dd514b2b/tests/kafkatest/services/kafka.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka.py b/tests/kafkatest/services/kafka.py
index 5ff8047..2c200f3 100644
--- a/tests/kafkatest/services/kafka.py
+++ b/tests/kafkatest/services/kafka.py
@@ -15,6 +15,7 @@
 
 from ducktape.services.service import Service
 from ducktape.utils.util import wait_until
+from kafkatest.utils.security_config import SecurityConfig
 
 import json
 import re
@@ -33,7 +34,7 @@ class KafkaService(Service):
             "collect_default": False}
     }
 
-    def __init__(self, context, num_nodes, zk, topics=None):
+    def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAINTEXT, interbroker_security_protocol=SecurityConfig.PLAINTEXT, topics=None):
         """
         :type context
         :type zk: ZookeeperService
@@ -41,6 +42,13 @@ class KafkaService(Service):
         """
         super(KafkaService, self).__init__(context, num_nodes)
         self.zk = zk
+        if security_protocol == SecurityConfig.SSL or interbroker_security_protocol == SecurityConfig.SSL:
+            self.security_config = SecurityConfig(SecurityConfig.SSL)
+        else:
+            self.security_config = SecurityConfig(SecurityConfig.PLAINTEXT)
+        self.security_protocol = security_protocol
+        self.interbroker_security_protocol = interbroker_security_protocol
+        self.port = 9092 if security_protocol == SecurityConfig.PLAINTEXT else 9093
         self.topics = topics
 
     def start(self):
@@ -56,10 +64,13 @@ class KafkaService(Service):
                 self.create_topic(topic_cfg)
 
     def start_node(self, node):
-        props_file = self.render('kafka.properties', node=node, broker_id=self.idx(node))
+        props_file = self.render('kafka.properties', node=node, broker_id=self.idx(node),
+            port = self.port, security_protocol = self.security_protocol, 
+            interbroker_security_protocol=self.interbroker_security_protocol)
         self.logger.info("kafka.properties:")
         self.logger.info(props_file)
         node.account.create_file("/mnt/kafka.properties", props_file)
+        self.security_config.setup_node(node)
 
         cmd = "/opt/kafka/bin/kafka-server-start.sh /mnt/kafka.properties 1>> /mnt/kafka.log 2>> /mnt/kafka.log & echo $! > /mnt/kafka.pid"
         self.logger.debug("Attempting to start KafkaService on %s with command: %s" % (str(node.account), cmd))
@@ -97,6 +108,7 @@ class KafkaService(Service):
     def clean_node(self, node):
         node.account.kill_process("kafka", clean_shutdown=False, allow_fail=True)
         node.account.ssh("rm -rf /mnt/kafka-logs /mnt/kafka.properties /mnt/kafka.log /mnt/kafka.pid", allow_fail=False)
+        self.security_config.clean_node(node)
 
     def create_topic(self, topic_cfg):
         node = self.nodes[0] # any node is fine here
@@ -227,4 +239,6 @@ class KafkaService(Service):
         return self.get_node(leader_idx)
 
     def bootstrap_servers(self):
-        return ','.join([node.account.hostname + ":9092" for node in self.nodes])
+        """Get the broker list to connect to Kafka using the specified security protocol
+        """
+        return ','.join([node.account.hostname + ":" + `self.port` for node in self.nodes])
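
The broker port is now derived from the client-facing protocol, so bootstrap_servers() follows it automatically. A sketch (hostnames illustrative):

    kafka = KafkaService(test_context, num_nodes=3, zk=zk, security_protocol='SSL')
    # port is chosen once per cluster: 9092 for PLAINTEXT clients, 9093 otherwise
    kafka.bootstrap_servers()  # -> "worker1:9093,worker2:9093,worker3:9093"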

http://git-wip-us.apache.org/repos/asf/kafka/blob/dd514b2b/tests/kafkatest/services/performance/consumer_performance.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/consumer_performance.py b/tests/kafkatest/services/performance/consumer_performance.py
index ecaef43..b8eab22 100644
--- a/tests/kafkatest/services/performance/consumer_performance.py
+++ b/tests/kafkatest/services/performance/consumer_performance.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 from kafkatest.services.performance import PerformanceService
+from kafkatest.utils.security_config import SecurityConfig
 
 import os
 
@@ -43,6 +44,7 @@ class ConsumerPerformanceService(PerformanceService):
         "num-fetch-threads", "Number of fetcher threads. Defaults to 1"
 
         "new-consumer", "Use the new consumer implementation."
+        "consumer.config", "Consumer config properties file."
     """
 
     # Root directory for persistent output
@@ -51,6 +53,7 @@ class ConsumerPerformanceService(PerformanceService):
     STDOUT_CAPTURE = os.path.join(PERSISTENT_ROOT, "consumer_performance.stdout")
     LOG_FILE = os.path.join(LOG_DIR, "consumer_performance.log")
     LOG4J_CONFIG = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties")
+    CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "consumer.properties")
 
     logs = {
         "consumer_performance_output": {
@@ -62,9 +65,11 @@ class ConsumerPerformanceService(PerformanceService):
             "collect_default": True}
     }
 
-    def __init__(self, context, num_nodes, kafka, topic, messages, new_consumer=False, settings={}):
+    def __init__(self, context, num_nodes, kafka, security_protocol, topic, messages, new_consumer=False, settings={}):
         super(ConsumerPerformanceService, self).__init__(context, num_nodes)
         self.kafka = kafka
+        self.security_config = SecurityConfig(security_protocol)
+        self.security_protocol = security_protocol
         self.topic = topic
         self.messages = messages
         self.new_consumer = new_consumer
@@ -119,6 +124,7 @@ class ConsumerPerformanceService(PerformanceService):
         cmd += " /opt/kafka/bin/kafka-consumer-perf-test.sh"
         for key, value in self.args.items():
             cmd += " --%s %s" % (key, value)
+        cmd += " --consumer.config %s" % ConsumerPerformanceService.CONFIG_FILE
 
         for key, value in self.settings.items():
             cmd += " %s=%s" % (str(key), str(value))
@@ -131,6 +137,8 @@ class ConsumerPerformanceService(PerformanceService):
 
         log_config = self.render('tools_log4j.properties', log_file=ConsumerPerformanceService.LOG_FILE)
         node.account.create_file(ConsumerPerformanceService.LOG4J_CONFIG, log_config)
+        node.account.create_file(ConsumerPerformanceService.CONFIG_FILE, str(self.security_config))
+        self.security_config.setup_node(node)
 
         cmd = self.start_cmd
         self.logger.debug("Consumer performance %d command: %s", idx, cmd)
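
What actually lands in consumer.properties on each worker node is the string form of the security config, which is empty for PLAINTEXT, so passing --consumer.config unconditionally is safe. A sketch:

    config = SecurityConfig('PLAINTEXT')
    str(config)  # -> "" (the file is created but empty)
    config = SecurityConfig('SSL')
    str(config)  # -> the security.protocol/ssl.* block shown earlier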

http://git-wip-us.apache.org/repos/asf/kafka/blob/dd514b2b/tests/kafkatest/services/performance/end_to_end_latency.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/end_to_end_latency.py b/tests/kafkatest/services/performance/end_to_end_latency.py
index 4c61a93..0559a43 100644
--- a/tests/kafkatest/services/performance/end_to_end_latency.py
+++ b/tests/kafkatest/services/performance/end_to_end_latency.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 from kafkatest.services.performance import PerformanceService
+from kafkatest.utils.security_config import SecurityConfig
 
 
 class EndToEndLatencyService(PerformanceService):
@@ -24,9 +25,11 @@ class EndToEndLatencyService(PerformanceService):
             "collect_default": True},
     }
 
-    def __init__(self, context, num_nodes, kafka, topic, num_records, consumer_fetch_max_wait=100, acks=1):
+    def __init__(self, context, num_nodes, kafka, security_protocol, topic, num_records, consumer_fetch_max_wait=100, acks=1):
         super(EndToEndLatencyService, self).__init__(context, num_nodes)
         self.kafka = kafka
+        self.security_config = SecurityConfig(security_protocol)
+        self.security_protocol = security_protocol
         self.args = {
             'topic': topic,
             'num_records': num_records,
@@ -36,14 +39,21 @@ class EndToEndLatencyService(PerformanceService):
 
     def _worker(self, idx, node):
         args = self.args.copy()
+        self.security_config.setup_node(node)
+        if self.security_protocol == SecurityConfig.SSL:
+            ssl_config_file = SecurityConfig.SSL_DIR + "/security.properties"
+            node.account.create_file(ssl_config_file, str(self.security_config))
+        else:
+            ssl_config_file = ""
         args.update({
             'zk_connect': self.kafka.zk.connect_setting(),
             'bootstrap_servers': self.kafka.bootstrap_servers(),
+            'ssl_config_file': ssl_config_file
         })
 
         cmd = "/opt/kafka/bin/kafka-run-class.sh kafka.tools.EndToEndLatency "\
               "%(bootstrap_servers)s %(topic)s %(num_records)d "\
-              "%(acks)d 20" % args
+              "%(acks)d 20 %(ssl_config_file)s" % args
 
         cmd += " | tee /mnt/end-to-end-latency.log"
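
The resulting invocation passes the SSL properties file as an extra trailing positional argument, empty for PLAINTEXT. Filled in from the format string above (hostname illustrative, numbers from the benchmark defaults):

    /opt/kafka/bin/kafka-run-class.sh kafka.tools.EndToEndLatency \
        worker1:9093 topic-replication-factor-three 10000 1 20 /mnt/ssl/security.properties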
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/dd514b2b/tests/kafkatest/services/performance/producer_performance.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/producer_performance.py b/tests/kafkatest/services/performance/producer_performance.py
index c46a910..7a026fc 100644
--- a/tests/kafkatest/services/performance/producer_performance.py
+++ b/tests/kafkatest/services/performance/producer_performance.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 from kafkatest.services.performance import PerformanceService
+from kafkatest.utils.security_config import SecurityConfig
 
 
 class ProducerPerformanceService(PerformanceService):
@@ -24,9 +25,11 @@ class ProducerPerformanceService(PerformanceService):
             "collect_default": True},
     }
 
-    def __init__(self, context, num_nodes, kafka, topic, num_records, record_size, throughput, settings={}, intermediate_stats=False):
+    def __init__(self, context, num_nodes, kafka, security_protocol, topic, num_records, record_size, throughput, settings={}, intermediate_stats=False):
         super(ProducerPerformanceService, self).__init__(context, num_nodes)
         self.kafka = kafka
+        self.security_config = SecurityConfig(security_protocol)
+        self.security_protocol = security_protocol
         self.args = {
             'topic': topic,
             'num_records': num_records,
@@ -40,11 +43,15 @@ class ProducerPerformanceService(PerformanceService):
         args = self.args.copy()
         args.update({'bootstrap_servers': self.kafka.bootstrap_servers()})
         cmd = "/opt/kafka/bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance "\
-              "%(topic)s %(num_records)d %(record_size)d %(throughput)d bootstrap.servers=%(bootstrap_servers)s"\
-              " | tee /mnt/producer-performance.log" % args
+              "%(topic)s %(num_records)d %(record_size)d %(throughput)d bootstrap.servers=%(bootstrap_servers)s" % args
 
+        self.security_config.setup_node(node)
+        if self.security_protocol == SecurityConfig.SSL:
+            self.settings.update(self.security_config.properties)
         for key, value in self.settings.items():
             cmd += " %s=%s" % (str(key), str(value))
+        cmd += " | tee /mnt/producer-performance.log"
+
         self.logger.debug("Producer performance %d command: %s", idx, cmd)
 
         def parse_stats(line):
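
Unlike the consumer path, ProducerPerformance receives its SSL settings inline: the security properties are merged into self.settings and emitted as key=value arguments, so for SSL the tail of the command looks roughly like this (values are this patch's fixtures, hostname illustrative):

    ... bootstrap.servers=worker1:9093 acks=1 batch.size=8192 security.protocol=SSL \
        ssl.keystore.location=/mnt/ssl/test.keystore.jks ... | tee /mnt/producer-performance.log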

http://git-wip-us.apache.org/repos/asf/kafka/blob/dd514b2b/tests/kafkatest/services/templates/kafka.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/templates/kafka.properties b/tests/kafkatest/services/templates/kafka.properties
index 6650d23..036a8db 100644
--- a/tests/kafkatest/services/templates/kafka.properties
+++ b/tests/kafkatest/services/templates/kafka.properties
@@ -20,6 +20,13 @@ port=9092
 #host.name=localhost
 advertised.host.name={{ node.account.hostname }}
 #advertised.port=<port accessible by clients>
+{% if security_protocol == interbroker_security_protocol %}
+listeners={{ security_protocol }}://:{{ port }}
+advertised.listeners={{ security_protocol }}://{{ node.account.hostname }}:{{ port }}
+{% else %}
+listeners=PLAINTEXT://:9092,SSL://:9093
+advertised.listeners=PLAINTEXT://{{ node.account.hostname }}:9092,SSL://{{ node.account.hostname }}:9093
+{% endif %}
 num.network.threads=3
 num.io.threads=8
 socket.send.buffer.bytes=102400
@@ -39,3 +46,13 @@ log.cleaner.enable=false
 
 zookeeper.connect={{ zk.connect_setting() }}
 zookeeper.connection.timeout.ms=2000
+
+security.inter.broker.protocol={{ interbroker_security_protocol }}
+ssl.keystore.location=/mnt/ssl/test.keystore.jks
+ssl.keystore.password=test-ks-passwd
+ssl.key.password=test-key-passwd
+ssl.keystore.type=JKS
+ssl.truststore.location=/mnt/ssl/test.truststore.jks
+ssl.truststore.password=test-ts-passwd
+ssl.truststore.type=JKS
+
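
When the client and inter-broker protocols differ, the template opens both listeners so that, for example, SSL clients and PLAINTEXT replication can coexist. Rendered with security_protocol=SSL and interbroker_security_protocol=PLAINTEXT this yields (hostname illustrative):

    listeners=PLAINTEXT://:9092,SSL://:9093
    advertised.listeners=PLAINTEXT://worker1:9092,SSL://worker1:9093
    security.inter.broker.protocol=PLAINTEXT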

http://git-wip-us.apache.org/repos/asf/kafka/blob/dd514b2b/tests/kafkatest/services/verifiable_producer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/verifiable_producer.py b/tests/kafkatest/services/verifiable_producer.py
index 158db7a..7ae7988 100644
--- a/tests/kafkatest/services/verifiable_producer.py
+++ b/tests/kafkatest/services/verifiable_producer.py
@@ -14,19 +14,21 @@
 # limitations under the License.
 
 from ducktape.services.background_thread import BackgroundThreadService
+from kafkatest.utils.security_config import SecurityConfig
 
 import json
 
 
 class VerifiableProducer(BackgroundThreadService):
 
+    CONFIG_FILE = "/mnt/verifiable_producer.properties"
     logs = {
         "producer_log": {
             "path": "/mnt/producer.log",
             "collect_default": False}
     }
 
-    def __init__(self, context, num_nodes, kafka, topic, max_messages=-1, throughput=100000):
+    def __init__(self, context, num_nodes, kafka, topic, security_protocol=None, max_messages=-1, throughput=100000):
         super(VerifiableProducer, self).__init__(context, num_nodes)
 
         self.kafka = kafka
@@ -37,7 +39,18 @@ class VerifiableProducer(BackgroundThreadService):
         self.acked_values = []
         self.not_acked_values = []
 
+        self.prop_file = ""
+        self.security_config = SecurityConfig(security_protocol, self.prop_file)
+        self.security_protocol = self.security_config.security_protocol
+        self.prop_file += str(self.security_config)
+
     def _worker(self, idx, node):
+        # Create and upload config file
+        self.logger.info("verifiable_producer.properties:")
+        self.logger.info(self.prop_file)
+        node.account.create_file(VerifiableProducer.CONFIG_FILE, self.prop_file)
+        self.security_config.setup_node(node)
+
         cmd = self.start_cmd
         self.logger.debug("VerifiableProducer %d command: %s" % (idx, cmd))
 
@@ -64,6 +77,7 @@ class VerifiableProducer(BackgroundThreadService):
         if self.throughput > 0:
             cmd += " --throughput %s" % str(self.throughput)
 
+        cmd += " --producer.config %s" % VerifiableProducer.CONFIG_FILE
         cmd += " 2>> /mnt/producer.log | tee -a /mnt/producer.log &"
         return cmd
 
@@ -99,7 +113,8 @@ class VerifiableProducer(BackgroundThreadService):
 
     def clean_node(self, node):
         node.account.kill_process("VerifiableProducer", clean_shutdown=False, allow_fail=False)
-        node.account.ssh("rm -rf /mnt/producer.log", allow_fail=False)
+        node.account.ssh("rm -rf /mnt/producer.log /mnt/verifiable_producer.properties", allow_fail=False)
+        self.security_config.clean_node(node)
 
     def try_parse_json(self, string):
         """Try to parse a string as json. Return None if not parseable."""

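Usage mirrors the console consumer: each worker now gets /mnt/verifiable_producer.properties (empty for PLAINTEXT, the SSL block otherwise), and the Java tool picks it up through the new --producer.config option. A sketch, assuming the usual fixtures:

    producer = VerifiableProducer(test_context, num_nodes=1, kafka=kafka,
                                  topic="test_topic", security_protocol="SSL",
                                  throughput=10000)
    producer.start()  # command now includes --producer.config /mnt/verifiable_producer.properties
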
http://git-wip-us.apache.org/repos/asf/kafka/blob/dd514b2b/tests/kafkatest/tests/benchmark_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/benchmark_test.py b/tests/kafkatest/tests/benchmark_test.py
index 02503ec..7219c0a 100644
--- a/tests/kafkatest/tests/benchmark_test.py
+++ b/tests/kafkatest/tests/benchmark_test.py
@@ -14,10 +14,12 @@
 # limitations under the License.
 
 from ducktape.services.service import Service
+from ducktape.tests.test import Test
 from ducktape.mark import parametrize
 from ducktape.mark import matrix
 
-from kafkatest.tests.kafka_test import KafkaTest
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
 from kafkatest.services.performance import ProducerPerformanceService, EndToEndLatencyService, ConsumerPerformanceService
 
 
@@ -26,16 +28,21 @@ TOPIC_REP_THREE = "topic-replication-factor-three"
 DEFAULT_RECORD_SIZE = 100  # bytes
 
 
-class Benchmark(KafkaTest):
+class Benchmark(Test):
     """A benchmark of Kafka producer/consumer performance. This replicates the test
     run here:
     https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
     """
     def __init__(self, test_context):
-        super(Benchmark, self).__init__(test_context, num_zk=1, num_brokers=3, topics={
+        super(Benchmark, self).__init__(test_context)
+        self.num_zk = 1
+        self.num_brokers = 3
+        self.topics = {
             TOPIC_REP_ONE: {'partitions': 6, 'replication-factor': 1},
             TOPIC_REP_THREE: {'partitions': 6, 'replication-factor': 3}
-        })
+        }
+
+        self.zk = ZookeeperService(test_context, self.num_zk)
 
         self.msgs_large = 10000000
         self.batch_size = 8*1024
@@ -44,25 +51,36 @@ class Benchmark(KafkaTest):
         self.target_data_size = 128*1024*1024
         self.target_data_size_gb = self.target_data_size/float(1024*1024*1024)
 
-    @parametrize(acks=1, topic=TOPIC_REP_ONE, num_producers=1, message_size=DEFAULT_RECORD_SIZE)
-    @parametrize(acks=1, topic=TOPIC_REP_THREE, num_producers=1, message_size=DEFAULT_RECORD_SIZE)
-    @parametrize(acks=-1, topic=TOPIC_REP_THREE, num_producers=1, message_size=DEFAULT_RECORD_SIZE)
-    @parametrize(acks=1, topic=TOPIC_REP_THREE, num_producers=3, message_size=DEFAULT_RECORD_SIZE)
-    @matrix(acks=[1], topic=[TOPIC_REP_THREE], num_producers=[1], message_size=[10, 100, 1000, 10000, 100000])
-    def test_producer_throughput(self, acks, topic, num_producers, message_size):
+    def setUp(self):
+        self.zk.start()
+
+    def start_kafka(self, security_protocol, interbroker_security_protocol):
+        self.kafka = KafkaService(
+            self.test_context, self.num_brokers,
+            self.zk, security_protocol=security_protocol,
+            interbroker_security_protocol=interbroker_security_protocol, topics=self.topics)
+        self.kafka.start()
+
+    @parametrize(acks=1, topic=TOPIC_REP_ONE)
+    @parametrize(acks=1, topic=TOPIC_REP_THREE)
+    @parametrize(acks=-1, topic=TOPIC_REP_THREE)
+    @parametrize(acks=1, topic=TOPIC_REP_THREE, num_producers=3)
+    @matrix(acks=[1], topic=[TOPIC_REP_THREE], message_size=[10, 100, 1000, 10000, 100000], security_protocol=['PLAINTEXT', 'SSL'])
+    def test_producer_throughput(self, acks, topic, num_producers=1, message_size=DEFAULT_RECORD_SIZE, security_protocol='PLAINTEXT'):
         """
         Setup: 1 node zk + 3 node kafka cluster
         Produce ~128MB worth of messages to a topic with 6 partitions. Required acks, topic replication factor,
-        and message size are varied depending on arguments injected into this test.
+        security protocol and message size are varied depending on arguments injected into this test.
 
         Collect and return aggregate throughput statistics after all messages have been acknowledged.
         (This runs ProducerPerformance.java under the hood)
         """
+        self.start_kafka(security_protocol, security_protocol)
         # Always generate the same total amount of data
         nrecords = int(self.target_data_size / message_size)
 
         self.producer = ProducerPerformanceService(
-            self.test_context, num_producers, self.kafka, topic=topic,
+            self.test_context, num_producers, self.kafka, security_protocol=security_protocol, topic=topic,
             num_records=nrecords, record_size=message_size,  throughput=-1,
             settings={
                 'acks': acks,
@@ -71,7 +89,9 @@ class Benchmark(KafkaTest):
         self.producer.run()
         return compute_aggregate_throughput(self.producer)
 
-    def test_long_term_producer_throughput(self):
+    @parametrize(security_protocol='PLAINTEXT', interbroker_security_protocol='PLAINTEXT')
+    @matrix(security_protocol=['SSL'], interbroker_security_protocol=['PLAINTEXT', 'SSL'])
+    def test_long_term_producer_throughput(self, security_protocol, interbroker_security_protocol):
         """
         Setup: 1 node zk + 3 node kafka cluster
         Produce 10e6 100 byte messages to a topic with 6 partitions, replication-factor 3, and acks=1.
@@ -80,8 +100,9 @@ class Benchmark(KafkaTest):
 
         (This runs ProducerPerformance.java under the hood)
         """
+        self.start_kafka(security_protocol, security_protocol)
         self.producer = ProducerPerformanceService(
-            self.test_context, 1, self.kafka,
+            self.test_context, 1, self.kafka, security_protocol=security_protocol,
             topic=TOPIC_REP_THREE, num_records=self.msgs_large, record_size=DEFAULT_RECORD_SIZE,
             throughput=-1, settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory},
             intermediate_stats=True
@@ -111,7 +132,10 @@ class Benchmark(KafkaTest):
         self.logger.info("\n".join(summary))
         return data
 
-    def test_end_to_end_latency(self):
+    
+    @parametrize(security_protocol='PLAINTEXT', interbroker_security_protocol='PLAINTEXT')
+    @matrix(security_protocol=['SSL'], interbroker_security_protocol=['PLAINTEXT', 'SSL'])
+    def test_end_to_end_latency(self, security_protocol, interbroker_security_protocol):
         """
         Setup: 1 node zk + 3 node kafka cluster
         Produce (acks = 1) and consume 10e3 messages to a topic with 6 partitions and replication-factor 3,
@@ -121,16 +145,19 @@ class Benchmark(KafkaTest):
 
         (Under the hood, this simply runs EndToEndLatency.scala)
         """
+        self.start_kafka(security_protocol, interbroker_security_protocol)
         self.logger.info("BENCHMARK: End to end latency")
         self.perf = EndToEndLatencyService(
             self.test_context, 1, self.kafka,
-            topic=TOPIC_REP_THREE, num_records=10000
+            topic=TOPIC_REP_THREE, security_protocol=security_protocol, num_records=10000
         )
         self.perf.run()
         return latency(self.perf.results[0]['latency_50th_ms'],  self.perf.results[0]['latency_99th_ms'], self.perf.results[0]['latency_999th_ms'])
 
-    @matrix(new_consumer=[True, False])
-    def test_producer_and_consumer(self, new_consumer=False):
+    @parametrize(new_consumer=True, security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
+    @parametrize(new_consumer=True, security_protocol='SSL', interbroker_security_protocol='SSL')
+    @matrix(new_consumer=[True, False], security_protocol=['PLAINTEXT'])
+    def test_producer_and_consumer(self, new_consumer, security_protocol, interbroker_security_protocol='PLAINTEXT'):
         """
         Setup: 1 node zk + 3 node kafka cluster
         Concurrently produce and consume 10e6 messages with a single producer and a single consumer,
@@ -140,15 +167,17 @@ class Benchmark(KafkaTest):
 
         (Under the hood, this runs ProducerPerformance.java, and ConsumerPerformance.scala)
         """
+        self.start_kafka(security_protocol, interbroker_security_protocol)
         num_records = 10 * 1000 * 1000  # 10e6
 
         self.producer = ProducerPerformanceService(
-            self.test_context, 1, self.kafka,
-            topic=TOPIC_REP_THREE, num_records=num_records, record_size=DEFAULT_RECORD_SIZE, throughput=-1,
+            self.test_context, 1, self.kafka, 
+            topic=TOPIC_REP_THREE, security_protocol=security_protocol,
+            num_records=num_records, record_size=DEFAULT_RECORD_SIZE, throughput=-1,
             settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory}
         )
         self.consumer = ConsumerPerformanceService(
-            self.test_context, 1, self.kafka, topic=TOPIC_REP_THREE, new_consumer=new_consumer, messages=num_records)
+            self.test_context, 1, self.kafka, security_protocol, topic=TOPIC_REP_THREE, new_consumer=new_consumer, messages=num_records)
         Service.run_parallel(self.producer, self.consumer)
 
         data = {
@@ -161,18 +190,22 @@ class Benchmark(KafkaTest):
         self.logger.info("\n".join(summary))
         return data
 
-    @matrix(new_consumer=[True, False], num_consumers=[1])
-    def test_consumer_throughput(self, new_consumer, num_consumers):
+    @parametrize(new_consumer=True, security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
+    @parametrize(new_consumer=True, security_protocol='SSL', interbroker_security_protocol='SSL')
+    @matrix(new_consumer=[True, False], security_protocol=['PLAINTEXT'])
+    def test_consumer_throughput(self, new_consumer, security_protocol, interbroker_security_protocol='PLAINTEXT', num_consumers=1):
         """
         Consume 10e6 100-byte messages with 1 or more consumers from a topic with 6 partitions
         (using new consumer iff new_consumer == True), and report throughput.
         """
+        self.start_kafka(security_protocol, interbroker_security_protocol)
         num_records = 10 * 1000 * 1000  # 10e6
 
         # seed kafka w/messages
         self.producer = ProducerPerformanceService(
             self.test_context, 1, self.kafka,
-            topic=TOPIC_REP_THREE, num_records=num_records, record_size=DEFAULT_RECORD_SIZE, throughput=-1,
+            topic=TOPIC_REP_THREE, security_protocol=security_protocol,
+            num_records=num_records, record_size=DEFAULT_RECORD_SIZE, throughput=-1,
             settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory}
         )
         self.producer.run()
@@ -180,7 +213,7 @@ class Benchmark(KafkaTest):
         # consume
         self.consumer = ConsumerPerformanceService(
             self.test_context, num_consumers, self.kafka,
-            topic=TOPIC_REP_THREE, new_consumer=new_consumer, messages=num_records)
+            topic=TOPIC_REP_THREE, security_protocol=security_protocol, new_consumer=new_consumer, messages=num_records)
         self.consumer.group = "test-consumer-group"
         self.consumer.run()
         return compute_aggregate_throughput(self.consumer)
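
Since @parametrize falls back to the method's default arguments for anything it does not name, the producer-throughput matrix stays manageable: four explicit points plus a 5x2 grid of message sizes and protocols. A sketch of the count:

    from itertools import product
    matrix_points = list(product([1],                             # acks
                                 [TOPIC_REP_THREE],               # topic
                                 [10, 100, 1000, 10000, 100000],  # message_size
                                 ['PLAINTEXT', 'SSL']))           # security_protocol
    assert len(matrix_points) == 10  # plus the four @parametrize points = 14 runs in all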

http://git-wip-us.apache.org/repos/asf/kafka/blob/dd514b2b/tests/kafkatest/tests/replication_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/replication_test.py b/tests/kafkatest/tests/replication_test.py
index a83769b..d20cc22 100644
--- a/tests/kafkatest/tests/replication_test.py
+++ b/tests/kafkatest/tests/replication_test.py
@@ -15,6 +15,8 @@
 
 from ducktape.tests.test import Test
 from ducktape.utils.util import wait_until
+from ducktape.mark import parametrize
+from ducktape.mark import matrix
 
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService
@@ -36,24 +38,18 @@ class ReplicationTest(Test):
 
         self.topic = "test_topic"
         self.zk = ZookeeperService(test_context, num_nodes=1)
-        self.kafka = KafkaService(test_context, num_nodes=3, zk=self.zk, topics={self.topic: {
-                                                                    "partitions": 3,
-                                                                    "replication-factor": 3,
-                                                                    "min.insync.replicas": 2}
-                                                                })
         self.producer_throughput = 10000
         self.num_producers = 1
         self.num_consumers = 1
 
     def setUp(self):
         self.zk.start()
-        self.kafka.start()
 
     def min_cluster_size(self):
         """Override this since we're adding services outside of the constructor"""
         return super(ReplicationTest, self).min_cluster_size() + self.num_producers + self.num_consumers
 
-    def run_with_failure(self, failure):
+    def run_with_failure(self, failure, interbroker_security_protocol):
         """This is the top-level test template.
 
         The steps are:
@@ -75,8 +71,18 @@ class ReplicationTest(Test):
         indicator that nothing is left to consume.
 
         """
-        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, throughput=self.producer_throughput)
-        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, consumer_timeout_ms=3000, message_validator=is_int)
+        security_protocol='PLAINTEXT'
+        self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, 
+                                  security_protocol=security_protocol,
+                                  interbroker_security_protocol=interbroker_security_protocol,
+                                  topics={self.topic: {
+                                               "partitions": 3,
+                                               "replication-factor": 3,
+                                               "min.insync.replicas": 2}
+                                         })
+        self.kafka.start()
+        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, security_protocol=security_protocol, throughput=self.producer_throughput)
+        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, security_protocol=security_protocol, new_consumer=False, consumer_timeout_ms=3000, message_validator=is_int)
 
         # Produce in a background thread while driving broker failures
         self.producer.start()
@@ -149,14 +155,19 @@ class ReplicationTest(Test):
 
         return success, msg
 
-    def test_clean_shutdown(self):
-        self.run_with_failure(self.clean_shutdown)
+    
+    @matrix(interbroker_security_protocol=['PLAINTEXT', 'SSL'])
+    def test_clean_shutdown(self, interbroker_security_protocol):
+        self.run_with_failure(self.clean_shutdown, interbroker_security_protocol)
 
-    def test_hard_shutdown(self):
-        self.run_with_failure(self.hard_shutdown)
+    @matrix(interbroker_security_protocol=['PLAINTEXT', 'SSL'])
+    def test_hard_shutdown(self, interbroker_security_protocol):
+        self.run_with_failure(self.hard_shutdown, interbroker_security_protocol)
 
-    def test_clean_bounce(self):
-        self.run_with_failure(self.clean_bounce)
+    @matrix(interbroker_security_protocol=['PLAINTEXT', 'SSL'])
+    def test_clean_bounce(self, interbroker_security_protocol):
+        self.run_with_failure(self.clean_bounce, interbroker_security_protocol)
 
-    def test_hard_bounce(self):
-        self.run_with_failure(self.hard_bounce)
+    @matrix(interbroker_security_protocol=['PLAINTEXT', 'SSL'])
+    def test_hard_bounce(self, interbroker_security_protocol):
+        self.run_with_failure(self.hard_bounce, interbroker_security_protocol)

http://git-wip-us.apache.org/repos/asf/kafka/blob/dd514b2b/tests/kafkatest/utils/security_config.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/utils/security_config.py b/tests/kafkatest/utils/security_config.py
new file mode 100644
index 0000000..965f209
--- /dev/null
+++ b/tests/kafkatest/utils/security_config.py
@@ -0,0 +1,133 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import subprocess
+
+
+class Keytool(object):
+
+    @staticmethod
+    def generate_keystore_truststore(ssl_dir='.'):
+        """
+        Generate JKS keystore and truststore and return
+        Kafka SSL properties with these stores.
+        """
+        ks_path = os.path.join(ssl_dir, 'test.keystore.jks')
+        ks_password = 'test-ks-passwd'
+        key_password = 'test-key-passwd'
+        ts_path = os.path.join(ssl_dir, 'test.truststore.jks')
+        ts_password = 'test-ts-passwd'
+        if os.path.exists(ks_path):
+            os.remove(ks_path)
+        if os.path.exists(ts_path):
+            os.remove(ts_path)
+        
+        Keytool.runcmd("keytool -genkeypair -alias test -keyalg RSA -keysize 2048 -keystore %s -storetype JKS -keypass %s -storepass %s -dname CN=systemtest" % (ks_path, key_password, ks_password))
+        Keytool.runcmd("keytool -export -alias test -keystore %s -storepass %s -storetype JKS -rfc -file test.crt" % (ks_path, ks_password))
+        Keytool.runcmd("keytool -import -alias test -file test.crt -keystore %s -storepass %s -storetype JKS -noprompt" % (ts_path, ts_password))
+        os.remove('test.crt')
+
+        return {
+            'ssl.keystore.location' : ks_path,
+            'ssl.keystore.password' : ks_password,
+            'ssl.key.password' : key_password,
+            'ssl.truststore.location' : ts_path,
+            'ssl.truststore.password' : ts_password
+        }
+
+    @staticmethod
+    def runcmd(cmd):
+        proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+        proc.communicate()
+        if proc.returncode != 0:
+            raise subprocess.CalledProcessError(proc.returncode, cmd)
+
+
+class SecurityConfig(object):
+
+    PLAINTEXT = 'PLAINTEXT'
+    SSL = 'SSL'
+    SSL_DIR = "/mnt/ssl"
+    KEYSTORE_PATH = "/mnt/ssl/test.keystore.jks"
+    TRUSTSTORE_PATH = "/mnt/ssl/test.truststore.jks"
+
+    ssl_stores = Keytool.generate_keystore_truststore('.')
+
+    def __init__(self, security_protocol, template_props=""):
+        """
+        Initialize the security properties for the node and copy
+        keystore and truststore to the remote node if the transport protocol 
+        is SSL. If security_protocol is None, the protocol specified in the
+        template properties file is used. If no protocol is specified in the
+        template properties either, PLAINTEXT is used as default.
+        """
+
+        if security_protocol is None:
+            security_protocol = self.get_property('security.protocol', template_props)
+        if security_protocol is None:
+            security_protocol = SecurityConfig.PLAINTEXT
+        elif security_protocol not in [SecurityConfig.PLAINTEXT, SecurityConfig.SSL]:
+            raise Exception("Invalid security.protocol in template properties: " + security_protocol)
+
+        self.properties = {
+            'security.protocol' : security_protocol,
+            'ssl.keystore.location' : SecurityConfig.KEYSTORE_PATH,
+            'ssl.keystore.password' : SecurityConfig.ssl_stores['ssl.keystore.password'],
+            'ssl.key.password' : SecurityConfig.ssl_stores['ssl.key.password'],
+            'ssl.truststore.location' : SecurityConfig.TRUSTSTORE_PATH,
+            'ssl.truststore.password' : SecurityConfig.ssl_stores['ssl.truststore.password']
+        }
+    
+    def setup_node(self, node):
+        if self.security_protocol == SecurityConfig.SSL:
+            node.account.ssh("mkdir -p %s" % SecurityConfig.SSL_DIR, allow_fail=False)
+            node.account.scp_to(SecurityConfig.ssl_stores['ssl.keystore.location'], SecurityConfig.KEYSTORE_PATH)
+            node.account.scp_to(SecurityConfig.ssl_stores['ssl.truststore.location'], SecurityConfig.TRUSTSTORE_PATH)
+
+    def clean_node(self, node):
+        if self.security_protocol == SecurityConfig.SSL:
+            node.account.ssh("rm -rf %s" % SecurityConfig.SSL_DIR, allow_fail=False)
+
+    def get_property(self, prop_name, template_props=""):
+        """
+        Get property value from the string representation of
+        a properties file.
+        """
+        value = None
+        for line in template_props.split("\n"):
+            items = line.split("=")
+            if len(items) == 2 and items[0].strip() == prop_name:
+                value = str(items[1].strip())
+        return value
+
+    @property
+    def security_protocol(self):
+        return self.properties['security.protocol']
+
+    def __str__(self):
+        """
+        Return properties as string with line separators.
+        This is used to append security config properties to
+        a properties file.
+        """
+
+        prop_str = ""
+        if self.security_protocol == SecurityConfig.SSL:
+            for key, value in self.properties.items():
+                prop_str += ("\n" + key + "=" + value)
+            prop_str += "\n"
+        return prop_str
+
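
One design note: ssl_stores is a class attribute, so the keystore/truststore pair is generated with keytool once, when the module is first imported, and shared by every service in the test run; setup_node just copies the same two files to each node. A usage sketch (node being a ducktape remote account holder):

    config = SecurityConfig('SSL')
    props = str(config)      # client SSL properties, ready to append to a props file
    config.setup_node(node)  # mkdir -p /mnt/ssl, scp the shared keystore and truststore
    config.clean_node(node)  # rm -rf /mnt/ssl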

http://git-wip-us.apache.org/repos/asf/kafka/blob/dd514b2b/tools/src/main/java/org/apache/kafka/clients/tools/VerifiableProducer.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/clients/tools/VerifiableProducer.java b/tools/src/main/java/org/apache/kafka/clients/tools/VerifiableProducer.java
index b0e19fc..a79f78e 100644
--- a/tools/src/main/java/org/apache/kafka/clients/tools/VerifiableProducer.java
+++ b/tools/src/main/java/org/apache/kafka/clients/tools/VerifiableProducer.java
@@ -23,6 +23,7 @@ import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.utils.Utils;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -128,6 +129,13 @@ public class VerifiableProducer {
                 .metavar("ACKS")
                 .help("Acks required on each produced message. See Kafka docs on request.required.acks for details.");
 
+        parser.addArgument("--producer.config")
+                .action(store())
+                .required(false)
+                .type(String.class)
+                .metavar("CONFIG_FILE")
+                .help("Producer config properties file.");
+
         return parser;
     }
   
@@ -143,6 +151,7 @@ public class VerifiableProducer {
             int maxMessages = res.getInt("maxMessages");
             String topic = res.getString("topic");
             int throughput = res.getInt("throughput");
+            String configFile = res.getString("producer.config");
 
             Properties producerProps = new Properties();
             producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, res.getString("brokerList"));
@@ -153,6 +162,13 @@ public class VerifiableProducer {
             producerProps.put(ProducerConfig.ACKS_CONFIG, Integer.toString(res.getInt("acks")));
             // No producer retries
             producerProps.put("retries", "0");
+            if (configFile != null) {
+                try {
+                    producerProps.putAll(Utils.loadProps(configFile));
+                } catch (IOException e) {
+                    throw new ArgumentParserException(e.getMessage(), parser);
+                }
+            }
 
             producer = new VerifiableProducer(producerProps, topic, throughput, maxMessages);
         } catch (ArgumentParserException e) {
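
Note the ordering: the config file is merged with putAll() after the hard-coded defaults above, so entries in the file win. In the system tests it carries the SecurityConfig SSL block, but a hypothetical file could equally override the built-in settings:

    # hypothetical producer.config: entries here win over the defaults
    # set above, because putAll() runs after them
    security.protocol=SSL
    # e.g. this would override the --acks command-line value:
    acks=-1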

