kafka-commits mailing list archives

From gwens...@apache.org
Subject kafka git commit: KAFKA-2527; System Test for Quotas in Ducktape
Date Tue, 13 Oct 2015 20:54:51 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 36d446932 -> 123d27e4d


KAFKA-2527; System Test for Quotas in Ducktape

@granders Can you take a look at this quota system test?

Author: Dong Lin <lindong28@gmail.com>

Reviewers: Geoff Anderson, Ewen Cheslack-Postava

Closes #275 from lindong28/KAFKA-2527


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/123d27e4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/123d27e4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/123d27e4

Branch: refs/heads/trunk
Commit: 123d27e4d005a384611671133fbecde7e390d24f
Parents: 36d4469
Author: Dong Lin <lindong28@gmail.com>
Authored: Tue Oct 13 13:54:40 2015 -0700
Committer: Gwen Shapira <cshapi@gmail.com>
Committed: Tue Oct 13 13:54:40 2015 -0700

----------------------------------------------------------------------
 tests/kafkatest/services/console_consumer.py    |  35 ++--
 tests/kafkatest/services/kafka.py               |  23 ++-
 .../kafkatest/services/performance/jmx_mixin.py |  81 +++++++++
 .../performance/producer_performance.py         |  24 ++-
 .../templates/console_consumer.properties       |   7 +
 .../services/templates/kafka.properties         |  16 ++
 tests/kafkatest/tests/quota_test.py             | 180 +++++++++++++++++++
 7 files changed, 340 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
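The recurring pattern in this patch: a service gains JMX metric collection by listing the new JmxMixin ahead of its base service class and calling both initializers explicitly, since the two classes do not share a cooperative super() chain. A minimal sketch of the pattern (Python 2, names as in the patch; MyService is a placeholder and the constructor is trimmed to the JMX-related arguments):

    from kafkatest.services.performance import PerformanceService
    from kafkatest.services.performance.jmx_mixin import JmxMixin

    class MyService(JmxMixin, PerformanceService):
        def __init__(self, context, num_nodes, jmx_object_names=None, jmx_attributes=[]):
            # JmxMixin records which nodes have started JmxTool and holds the
            # stats maps; PerformanceService provides the node lifecycle.
            # Both must be initialized.
            JmxMixin.__init__(self, num_nodes, jmx_object_names, jmx_attributes)
            PerformanceService.__init__(self, context, num_nodes)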


http://git-wip-us.apache.org/repos/asf/kafka/blob/123d27e4/tests/kafkatest/services/console_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py
index 2f1e70e..9b216fe 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -15,11 +15,13 @@
 
 from ducktape.services.background_thread import BackgroundThreadService
 from ducktape.utils.util import wait_until
+from kafkatest.services.performance.jmx_mixin import JmxMixin
+from kafkatest.services.performance import PerformanceService
 from kafkatest.utils.security_config import SecurityConfig
 
 import os
 import subprocess
-
+import itertools
 
 def is_int(msg):
     """Default method used to check whether text pulled from console consumer is a message.
@@ -72,7 +74,7 @@ Option                                  Description
 """
 
 
-class ConsoleConsumer(BackgroundThreadService):
+class ConsoleConsumer(JmxMixin, PerformanceService):
     # Root directory for persistent output
     PERSISTENT_ROOT = "/mnt/console_consumer"
     STDOUT_CAPTURE = os.path.join(PERSISTENT_ROOT, "console_consumer.stdout")
@@ -94,7 +96,8 @@ class ConsoleConsumer(BackgroundThreadService):
             "collect_default": True}
         }
 
-    def __init__(self, context, num_nodes, kafka, topic, security_protocol=None, new_consumer=None, message_validator=None, from_beginning=True, consumer_timeout_ms=None):
+    def __init__(self, context, num_nodes, kafka, topic, security_protocol=None, new_consumer=None, message_validator=None,
+                 from_beginning=True, consumer_timeout_ms=None, client_id="console-consumer", jmx_object_names=None, jmx_attributes=[]):
         """
         Args:
             context:                    standard context
@@ -110,7 +113,8 @@ class ConsoleConsumer(BackgroundThreadService):
                                        waiting for the consumer to stop is a pretty good way to consume all messages
                                         in a topic.
         """
-        super(ConsoleConsumer, self).__init__(context, num_nodes)
+        JmxMixin.__init__(self, num_nodes, jmx_object_names, jmx_attributes)
+        PerformanceService.__init__(self, context, num_nodes)
         self.kafka = kafka
         self.new_consumer = new_consumer
         self.args = {
@@ -122,9 +126,10 @@ class ConsoleConsumer(BackgroundThreadService):
         self.from_beginning = from_beginning
         self.message_validator = message_validator
         self.messages_consumed = {idx: [] for idx in range(1, num_nodes + 1)}
+        self.client_id = client_id
 
         # Process client configuration
-        self.prop_file = self.render('console_consumer.properties', consumer_timeout_ms=self.consumer_timeout_ms)
+        self.prop_file = self.render('console_consumer.properties', consumer_timeout_ms=self.consumer_timeout_ms, client_id=self.client_id)
 
         # Add security properties to the config. If security protocol is not specified,
         # use the default in the template properties.
@@ -143,10 +148,11 @@ class ConsoleConsumer(BackgroundThreadService):
         args['stdout'] = ConsoleConsumer.STDOUT_CAPTURE
         args['stderr'] = ConsoleConsumer.STDERR_CAPTURE
         args['config_file'] = ConsoleConsumer.CONFIG_FILE
+        args['jmx_port'] = self.jmx_port
 
         cmd = "export LOG_DIR=%s;" % ConsoleConsumer.LOG_DIR
         cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\";" % ConsoleConsumer.LOG4J_CONFIG
-        cmd += " /opt/kafka/bin/kafka-console-consumer.sh --topic %(topic)s" \
+        cmd += " JMX_PORT=%(jmx_port)d /opt/kafka/bin/kafka-console-consumer.sh --topic %(topic)s"
\
             " --consumer.config %(config_file)s" % args
 
         if self.new_consumer:
@@ -173,6 +179,7 @@ class ConsoleConsumer(BackgroundThreadService):
     def _worker(self, idx, node):
         node.account.ssh("mkdir -p %s" % ConsoleConsumer.PERSISTENT_ROOT, allow_fail=False)
 
+        # Create and upload config file
         self.logger.info("console_consumer.properties:")
         self.logger.info(self.prop_file)
         node.account.create_file(ConsoleConsumer.CONFIG_FILE, self.prop_file)
@@ -185,18 +192,24 @@ class ConsoleConsumer(BackgroundThreadService):
         # Run and capture output
         cmd = self.start_cmd
         self.logger.debug("Console consumer %d command: %s", idx, cmd)
-        for line in node.account.ssh_capture(cmd, allow_fail=False):
+
+        consumer_output = node.account.ssh_capture(cmd, allow_fail=False)
+        first_line = consumer_output.next()
+        self.start_jmx_tool(idx, node)
+        for line in itertools.chain([first_line], consumer_output):
             msg = line.strip()
             if self.message_validator is not None:
                 msg = self.message_validator(msg)
             if msg is not None:
                 self.messages_consumed[idx].append(msg)
 
+        self.read_jmx_output(idx, node)
+
     def start_node(self, node):
-        super(ConsoleConsumer, self).start_node(node)
+        PerformanceService.start_node(self, node)
 
     def stop_node(self, node):
-        node.account.kill_process("java", allow_fail=True)
+        node.account.kill_process("console_consumer", allow_fail=True)
         wait_until(lambda: not self.alive(node), timeout_sec=10, backoff_sec=.2,
                    err_msg="Timed out waiting for consumer to stop.")
 
@@ -204,7 +217,7 @@ class ConsoleConsumer(BackgroundThreadService):
         if self.alive(node):
             self.logger.warn("%s %s was still alive at cleanup time. Killing forcefully..." %
                              (self.__class__.__name__, node.account))
-        node.account.kill_process("java", clean_shutdown=False, allow_fail=True)
+        JmxMixin.clean_node(self, node)
+        PerformanceService.clean_node(self, node)
         node.account.ssh("rm -rf %s" % ConsoleConsumer.PERSISTENT_ROOT, allow_fail=False)
         self.security_config.clean_node(node)
-

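A note on the _worker change above: JmxTool can only attach once the consumer JVM is up and its JMX port is bound, so the worker pulls the first line from the output stream (which arrives only after the process is running), starts JmxTool, and then chains the consumed line back in front of the iterator so no message is lost. The same idiom in isolation (a sketch; lines() is a hypothetical stand-in for node.account.ssh_capture):

    import itertools

    def lines():  # hypothetical stand-in for node.account.ssh_capture(cmd)
        yield "first"
        yield "second"

    output = lines()
    first_line = output.next()  # Python 2 generator API; blocks until output appears
    # the process is now demonstrably running, so it is safe to start JmxTool here
    for line in itertools.chain([first_line], output):
        print line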
http://git-wip-us.apache.org/repos/asf/kafka/blob/123d27e4/tests/kafkatest/services/kafka.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka.py b/tests/kafkatest/services/kafka.py
index 2c200f3..5c4b22f 100644
--- a/tests/kafkatest/services/kafka.py
+++ b/tests/kafkatest/services/kafka.py
@@ -15,15 +15,15 @@
 
 from ducktape.services.service import Service
 from ducktape.utils.util import wait_until
+from kafkatest.services.performance.jmx_mixin import JmxMixin
 from kafkatest.utils.security_config import SecurityConfig
-
 import json
 import re
 import signal
 import time
 
 
-class KafkaService(Service):
+class KafkaService(JmxMixin, Service):
 
     logs = {
         "kafka_log": {
@@ -34,13 +34,15 @@ class KafkaService(Service):
             "collect_default": False}
     }
 
-    def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAINTEXT, interbroker_security_protocol=SecurityConfig.PLAINTEXT, topics=None):
+    def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAINTEXT, interbroker_security_protocol=SecurityConfig.PLAINTEXT,
+                 topics=None, quota_config=None, jmx_object_names=None, jmx_attributes=[]):
+                 topics=None, quota_config=None, jmx_object_names=None, jmx_attributes=[]):
         """
         :type context
         :type zk: ZookeeperService
         :type topics: dict
         """
-        super(KafkaService, self).__init__(context, num_nodes)
+        Service.__init__(self, context, num_nodes)
+        JmxMixin.__init__(self, num_nodes, jmx_object_names, jmx_attributes)
         self.zk = zk
         if security_protocol == SecurityConfig.SSL or interbroker_security_protocol == SecurityConfig.SSL:
             self.security_config = SecurityConfig(SecurityConfig.SSL)
@@ -50,9 +52,10 @@ class KafkaService(Service):
         self.interbroker_security_protocol = interbroker_security_protocol
         self.port = 9092 if security_protocol == SecurityConfig.PLAINTEXT else 9093
         self.topics = topics
+        self.quota_config = quota_config
 
     def start(self):
-        super(KafkaService, self).start()
+        Service.start(self)
 
         # Create topics if necessary
         if self.topics is not None:
@@ -65,18 +68,19 @@ class KafkaService(Service):
 
     def start_node(self, node):
         props_file = self.render('kafka.properties', node=node, broker_id=self.idx(node),
-            port = self.port, security_protocol = self.security_protocol, 
+            port = self.port, security_protocol = self.security_protocol, quota_config=self.quota_config,
             interbroker_security_protocol=self.interbroker_security_protocol)
         self.logger.info("kafka.properties:")
         self.logger.info(props_file)
         node.account.create_file("/mnt/kafka.properties", props_file)
         self.security_config.setup_node(node)
 
-        cmd = "/opt/kafka/bin/kafka-server-start.sh /mnt/kafka.properties 1>> /mnt/kafka.log
2>> /mnt/kafka.log & echo $! > /mnt/kafka.pid"
+        cmd = "JMX_PORT=%d /opt/kafka/bin/kafka-server-start.sh /mnt/kafka.properties 1>>
/mnt/kafka.log 2>> /mnt/kafka.log & echo $! > /mnt/kafka.pid" % self.jmx_port
         self.logger.debug("Attempting to start KafkaService on %s with command: %s" % (str(node.account), cmd))
         with node.account.monitor_log("/mnt/kafka.log") as monitor:
             node.account.ssh(cmd)
             monitor.wait_until("Kafka Server.*started", timeout_sec=30, err_msg="Kafka server didn't finish startup")
+        self.start_jmx_tool(self.idx(node), node)
         if len(self.pids(node)) == 0:
             raise Exception("No process ids recorded on node %s" % str(node))
 
@@ -106,6 +110,7 @@ class KafkaService(Service):
         node.account.ssh("rm -f /mnt/kafka.pid", allow_fail=False)
 
     def clean_node(self, node):
+        JmxMixin.clean_node(self, node)
         node.account.kill_process("kafka", clean_shutdown=False, allow_fail=True)
         node.account.ssh("rm -rf /mnt/kafka-logs /mnt/kafka.properties /mnt/kafka.log /mnt/kafka.pid",
allow_fail=False)
         self.security_config.clean_node(node)
@@ -242,3 +247,7 @@ class KafkaService(Service):
         """Get the broker list to connect to Kafka using the specified security protocol
         """
         return ','.join([node.account.hostname + ":" + `self.port` for node in self.nodes])
+
+    def read_jmx_output_all_nodes(self):
+        for node in self.nodes:
+            self.read_jmx_output(self.idx(node), node)
\ No newline at end of file

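For reference, how the new KafkaService arguments are exercised by quota_test.py further down; the quota_config keys must match the names the kafka.properties template checks (a sketch; values copied from the test, not defaults):

    quota_config = {'quota_producer_default': 2500000,
                    'quota_consumer_default': 2000000,
                    'quota_producer_bytes_per_second_overrides': 'overridden_id=3750000',
                    'quota_consumer_bytes_per_second_overrides': 'overridden_id=3000000'}
    kafka = KafkaService(test_context, num_nodes=1, zk=zk,
                         quota_config=quota_config,
                         jmx_object_names=['kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec'],
                         jmx_attributes=['OneMinuteRate'])
    # after the clients finish, collect broker-side metrics before reading
    # maximum_jmx_value / average_jmx_value:
    kafka.read_jmx_output_all_nodes()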
http://git-wip-us.apache.org/repos/asf/kafka/blob/123d27e4/tests/kafkatest/services/performance/jmx_mixin.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/jmx_mixin.py b/tests/kafkatest/services/performance/jmx_mixin.py
new file mode 100644
index 0000000..7e19839
--- /dev/null
+++ b/tests/kafkatest/services/performance/jmx_mixin.py
@@ -0,0 +1,81 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+class JmxMixin(object):
+
+    def __init__(self, num_nodes, jmx_object_names=None, jmx_attributes=[]):
+        self.jmx_object_names = jmx_object_names
+        self.jmx_attributes = jmx_attributes
+        self.jmx_port = 9192
+
+        self.started = [False] * num_nodes
+        self.jmx_stats = [{} for x in range(num_nodes)]
+        self.maximum_jmx_value = {}  # map from object_attribute_name to maximum value observed over time
+        self.average_jmx_value = {}  # map from object_attribute_name to average value observed over time
+
+    def clean_node(self, node):
+        node.account.kill_process("jmx", clean_shutdown=False, allow_fail=True)
+        node.account.ssh("rm -rf /mnt/jmx_tool.log", allow_fail=False)
+
+    def start_jmx_tool(self, idx, node):
+        if self.started[idx-1] == True or self.jmx_object_names == None:
+            return
+        self.started[idx-1] = True
+
+        cmd = "/opt/kafka/bin/kafka-run-class.sh kafka.tools.JmxTool " \
+              "--reporting-interval 1000 --jmx-url service:jmx:rmi:///jndi/rmi://127.0.0.1:%d/jmxrmi"
% self.jmx_port
+        for jmx_object_name in self.jmx_object_names:
+            cmd += " --object-name %s" % jmx_object_name
+        for jmx_attribute in self.jmx_attributes:
+            cmd += " --attributes %s" % jmx_attribute
+        cmd += " | tee -a /mnt/jmx_tool.log"
+
+        self.logger.debug("Start JmxTool %d command: %s", idx, cmd)
+        jmx_output = node.account.ssh_capture(cmd, allow_fail=False)
+        jmx_output.next()
+
+    def read_jmx_output(self, idx, node):
+        if self.started[idx-1] == False:
+            return
+        self.maximum_jmx_value = {}
+        self.average_jmx_value = {}
+        object_attribute_names = []
+
+        cmd = "cat /mnt/jmx_tool.log"
+        self.logger.debug("Read jmx output %d command: %s", idx, cmd)
+        for line in node.account.ssh_capture(cmd, allow_fail=False):
+            if "time" in line:
+                object_attribute_names = line.strip()[1:-1].split("\",\"")[1:]
+                continue
+            stats = [float(field) for field in line.split(',')]
+            time_sec = int(stats[0]/1000)
+            self.jmx_stats[idx-1][time_sec] = {name : stats[i+1] for i, name in enumerate(object_attribute_names)}
+
+        # do not calculate average and maximum of jmx stats until we have read output from all nodes
+        if any(len(time_to_stats)==0 for time_to_stats in self.jmx_stats):
+            return
+
+        start_time_sec = min([min(time_to_stats.keys()) for time_to_stats in self.jmx_stats])
+        end_time_sec = max([max(time_to_stats.keys()) for time_to_stats in self.jmx_stats])
+
+        for name in object_attribute_names:
+            aggregates_per_time = []
+            for time_sec in xrange(start_time_sec, end_time_sec+1):
+                # assume that value is 0 if it is not read by jmx tool at the given time. This is appropriate for metrics such as bandwidth
+                values_per_node = [time_to_stats.get(time_sec, {}).get(name, 0) for time_to_stats in self.jmx_stats]
+                # assume that value is aggregated across nodes by sum. This is appropriate for metrics such as bandwidth
+                aggregates_per_time.append(sum(values_per_node))
+            self.average_jmx_value[name] = sum(aggregates_per_time)/len(aggregates_per_time)
+            self.maximum_jmx_value[name] = max(aggregates_per_time)
\ No newline at end of file

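To make the aggregation above concrete, here is a toy recomputation of what read_jmx_output produces once every node has reported (the numbers are invented): a missing sample counts as 0 and nodes are combined by sum, per the assumptions stated in the comments, and the average and maximum are then taken over time.

    jmx_stats = [  # one dict per node: time_sec -> {attribute_name: value}
        {10: {'BytesInPerSec:OneMinuteRate': 100.0}, 11: {'BytesInPerSec:OneMinuteRate': 300.0}},
        {10: {'BytesInPerSec:OneMinuteRate': 50.0}}]   # node 2 missed second 11
    name = 'BytesInPerSec:OneMinuteRate'
    aggregates_per_time = []
    for time_sec in xrange(10, 12):  # start_time_sec .. end_time_sec inclusive
        aggregates_per_time.append(
            sum(time_to_stats.get(time_sec, {}).get(name, 0) for time_to_stats in jmx_stats))
    assert aggregates_per_time == [150.0, 300.0]
    assert max(aggregates_per_time) == 300.0                             # maximum_jmx_value[name]
    assert sum(aggregates_per_time) / len(aggregates_per_time) == 225.0  # average_jmx_value[name]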
http://git-wip-us.apache.org/repos/asf/kafka/blob/123d27e4/tests/kafkatest/services/performance/producer_performance.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/producer_performance.py b/tests/kafkatest/services/performance/producer_performance.py
index 7a026fc..f842026 100644
--- a/tests/kafkatest/services/performance/producer_performance.py
+++ b/tests/kafkatest/services/performance/producer_performance.py
@@ -13,11 +13,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from kafkatest.services.performance.jmx_mixin import JmxMixin
 from kafkatest.services.performance import PerformanceService
+import itertools
 from kafkatest.utils.security_config import SecurityConfig
 
-
-class ProducerPerformanceService(PerformanceService):
+class ProducerPerformanceService(JmxMixin, PerformanceService):
 
     logs = {
         "producer_performance_log": {
@@ -25,8 +26,10 @@ class ProducerPerformanceService(PerformanceService):
             "collect_default": True},
     }
 
-    def __init__(self, context, num_nodes, kafka, security_protocol, topic, num_records, record_size, throughput, settings={}, intermediate_stats=False):
-        super(ProducerPerformanceService, self).__init__(context, num_nodes)
+    def __init__(self, context, num_nodes, kafka, security_protocol, topic, num_records, record_size, throughput, settings={},
+                 intermediate_stats=False, client_id="producer-performance", jmx_object_names=None, jmx_attributes=[]):
+        JmxMixin.__init__(self, num_nodes, jmx_object_names, jmx_attributes)
+        PerformanceService.__init__(self, context, num_nodes)
         self.kafka = kafka
         self.security_config = SecurityConfig(security_protocol)
         self.security_protocol = security_protocol
@@ -38,12 +41,13 @@ class ProducerPerformanceService(PerformanceService):
         }
         self.settings = settings
         self.intermediate_stats = intermediate_stats
+        self.client_id = client_id
 
     def _worker(self, idx, node):
         args = self.args.copy()
-        args.update({'bootstrap_servers': self.kafka.bootstrap_servers()})
-        cmd = "/opt/kafka/bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance
"\
-              "%(topic)s %(num_records)d %(record_size)d %(throughput)d bootstrap.servers=%(bootstrap_servers)s"
% args
+        args.update({'bootstrap_servers': self.kafka.bootstrap_servers(), 'jmx_port': self.jmx_port,
'client_id': self.client_id})
+        cmd = "JMX_PORT=%(jmx_port)d /opt/kafka/bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance
" \
+              "%(topic)s %(num_records)d %(record_size)d %(throughput)d bootstrap.servers=%(bootstrap_servers)s
client.id=%(client_id)s" % args
 
         self.security_config.setup_node(node)
         if self.security_protocol == SecurityConfig.SSL:
@@ -68,7 +72,10 @@ class ProducerPerformanceService(PerformanceService):
                 'latency_999th_ms': float(parts[7].split()[0]),
             }
         last = None
-        for line in node.account.ssh_capture(cmd):
+        producer_output = node.account.ssh_capture(cmd)
+        first_line = producer_output.next()
+        self.start_jmx_tool(idx, node)
+        for line in itertools.chain([first_line], producer_output):
             if self.intermediate_stats:
                 try:
                     self.stats[idx-1].append(parse_stats(line))
@@ -81,3 +88,4 @@ class ProducerPerformanceService(PerformanceService):
             self.results[idx-1] = parse_stats(last)
         except:
             raise Exception("Unable to parse aggregate performance statistics on node %d: %s" % (idx, last))
+        self.read_jmx_output(idx, node)

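Usage from the quota test below, for reference; note that keys in maximum_jmx_value take the form '<object-name>:<attribute>' (a sketch; values copied from quota_test.py, test_context and kafka assumed in scope):

    producer = ProducerPerformanceService(
        test_context, 1, kafka, security_protocol='PLAINTEXT',
        topic='test_topic', num_records=100000, record_size=3000, throughput=-1,
        client_id='overridden_id',
        jmx_object_names=['kafka.producer:type=producer-metrics,client-id=overridden_id'],
        jmx_attributes=['outgoing-byte-rate'])
    producer.run()
    peak_bps = producer.maximum_jmx_value[
        'kafka.producer:type=producer-metrics,client-id=overridden_id:outgoing-byte-rate']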
http://git-wip-us.apache.org/repos/asf/kafka/blob/123d27e4/tests/kafkatest/services/templates/console_consumer.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/templates/console_consumer.properties b/tests/kafkatest/services/templates/console_consumer.properties
index 7143179..bab4932 100644
--- a/tests/kafkatest/services/templates/console_consumer.properties
+++ b/tests/kafkatest/services/templates/console_consumer.properties
@@ -17,3 +17,10 @@
 {% if consumer_timeout_ms is defined and consumer_timeout_ms is not none %}
 consumer.timeout.ms={{ consumer_timeout_ms }}
 {% endif %}
+
+group.id={{ group_id|default('test-consumer-group') }}
+
+{% if client_id is defined and client_id is not none %}
+client.id={{ client_id }}
+{% endif %}
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/123d27e4/tests/kafkatest/services/templates/kafka.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/templates/kafka.properties b/tests/kafkatest/services/templates/kafka.properties
index 036a8db..a7f6604 100644
--- a/tests/kafkatest/services/templates/kafka.properties
+++ b/tests/kafkatest/services/templates/kafka.properties
@@ -47,6 +47,22 @@ log.cleaner.enable=false
 zookeeper.connect={{ zk.connect_setting() }}
 zookeeper.connection.timeout.ms=2000
 
+{% if quota_config.quota_producer_default is defined and quota_config.quota_producer_default is not none %}
+quota.producer.default={{ quota_config.quota_producer_default }}
+{% endif %}
+
+{% if quota_config.quota_consumer_default is defined and quota_config.quota_consumer_default is not none %}
+quota.consumer.default={{ quota_config.quota_consumer_default }}
+{% endif %}
+
+{% if quota_config.quota_producer_bytes_per_second_overrides is defined and quota_config.quota_producer_bytes_per_second_overrides is not none %}
+quota.producer.bytes.per.second.overrides={{ quota_config.quota_producer_bytes_per_second_overrides }}
+{% endif %}
+
+{% if quota_config.quota_consumer_bytes_per_second_overrides is defined and quota_config.quota_consumer_bytes_per_second_overrides is not none %}
+quota.consumer.bytes.per.second.overrides={{ quota_config.quota_consumer_bytes_per_second_overrides }}
+{% endif %}
+
+
 security.inter.broker.protocol={{ interbroker_security_protocol }}
 ssl.keystore.location=/mnt/ssl/test.keystore.jks
 ssl.keystore.password=test-ks-passwd

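For example, with the quota_config dict that quota_test.py passes in below, the new block renders (blank lines aside) to:

    quota.producer.default=2500000
    quota.consumer.default=2000000
    quota.producer.bytes.per.second.overrides=overridden_id=3750000
    quota.consumer.bytes.per.second.overrides=overridden_id=3000000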
http://git-wip-us.apache.org/repos/asf/kafka/blob/123d27e4/tests/kafkatest/tests/quota_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/quota_test.py b/tests/kafkatest/tests/quota_test.py
new file mode 100644
index 0000000..4ae2e08
--- /dev/null
+++ b/tests/kafkatest/tests/quota_test.py
@@ -0,0 +1,180 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.tests.test import Test
+from ducktape.utils.util import wait_until
+from ducktape.mark import parametrize
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.performance import ProducerPerformanceService
+from kafkatest.services.console_consumer import ConsoleConsumer, is_int
+
+import random
+import signal
+import time
+
+class QuotaTest(Test):
+    """
+    These tests verify that quota provides expected functionality -- they run
+    producer, broker, and consumer with different clientId and quota configuration and
+    check that the observed throughput is close to the value we expect.
+    """
+
+    def __init__(self, test_context):
+        """:type test_context: ducktape.tests.test.TestContext"""
+        super(QuotaTest, self).__init__(test_context=test_context)
+
+        self.topic = 'test_topic'
+        self.logger.info('use topic ' + self.topic)
+
+        # quota related parameters
+        self.quota_config = {'quota_producer_default': 2500000,
+                             'quota_consumer_default': 2000000,
+                             'quota_producer_bytes_per_second_overrides': 'overridden_id=3750000',
+                             'quota_consumer_bytes_per_second_overrides': 'overridden_id=3000000'}
+        self.maximum_client_deviation_percentage = 100.0
+        self.maximum_broker_deviation_percentage = 5.0
+        self.num_records = 100000
+        self.record_size = 3000
+        self.security_protocol = 'PLAINTEXT'
+        self.interbroker_security_protocol = 'PLAINTEXT'
+
+        self.zk = ZookeeperService(test_context, num_nodes=1)
+        self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk,
+                                  security_protocol=self.security_protocol,
+                                  interbroker_security_protocol=self.interbroker_security_protocol,
+                                  topics={self.topic: {'partitions': 6, 'replication-factor': 1, 'min.insync.replicas': 1}},
+                                  quota_config=self.quota_config,
+                                  jmx_object_names=['kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec',
+                                                    'kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec'],
+                                  jmx_attributes=['OneMinuteRate'])
+        self.num_producers = 1
+        self.num_consumers = 2
+
+    def setUp(self):
+        self.zk.start()
+        self.kafka.start()
+
+    def min_cluster_size(self):
+        """Override this since we're adding services outside of the constructor"""
+        return super(QuotaTest, self).min_cluster_size() + self.num_producers + self.num_consumers
+
+    def run_clients(self, producer_id, producer_num, consumer_id, consumer_num):
+        # Produce all messages
+        producer = ProducerPerformanceService(
+            self.test_context, producer_num, self.kafka, security_protocol=self.security_protocol,
+            topic=self.topic, num_records=self.num_records, record_size=self.record_size, throughput=-1, client_id=producer_id,
+            jmx_object_names=['kafka.producer:type=producer-metrics,client-id=%s' % producer_id], jmx_attributes=['outgoing-byte-rate'])
+
+        producer.run()
+
+        # Consume all messages
+        consumer = ConsoleConsumer(self.test_context, consumer_num, self.kafka, self.topic,
+            security_protocol=self.security_protocol, new_consumer=False,
+            consumer_timeout_ms=60000, client_id=consumer_id,
+            jmx_object_names=['kafka.consumer:type=ConsumerTopicMetrics,name=BytesPerSec,clientId=%s' % consumer_id],
+            jmx_attributes=['OneMinuteRate'])
+        consumer.run()
+
+        for idx, messages in consumer.messages_consumed.iteritems():
+            assert len(messages)>0, "consumer %d didn't consume any message before timeout" % idx
+
+        success, msg = self.validate(self.kafka, producer, consumer)
+        assert success, msg
+
+    def validate(self, broker, producer, consumer):
+        """
+        For each client_id we validate that:
+        1) number of consumed messages equals number of produced messages
+        2) maximum_producer_throughput <= producer_quota * (1 + maximum_client_deviation_percentage/100)
+        3) maximum_broker_byte_in_rate <= producer_quota * (1 + maximum_broker_deviation_percentage/100)
+        4) maximum_consumer_throughput <= consumer_quota * (1 + maximum_client_deviation_percentage/100)
+        5) maximum_broker_byte_out_rate <= consumer_quota * (1 + maximum_broker_deviation_percentage/100)
+        """
+        success = True
+        msg = ''
+
+        self.kafka.read_jmx_output_all_nodes()
+
+        # validate that number of consumed messages equals number of produced messages
+        produced_num = sum([value['records'] for value in producer.results])
+        consumed_num = sum([len(value) for value in consumer.messages_consumed.values()])
+        self.logger.info('producer produced %d messages' % produced_num)
+        self.logger.info('consumer consumed %d messages' % consumed_num)
+        if produced_num != consumed_num:
+            success = False
+            msg += "number of produced messages %d doesn't equal number of consumed messages
%d" % (produced_num, consumed_num)
+
+        # validate that maximum_producer_throughput <= producer_quota * (1 + maximum_client_deviation_percentage/100)
+        producer_attribute_name = 'kafka.producer:type=producer-metrics,client-id=%s:outgoing-byte-rate' % producer.client_id
+        producer_maximum_bps = producer.maximum_jmx_value[producer_attribute_name]
+        producer_quota_bps = self.get_producer_quota(producer.client_id)
+        self.logger.info('producer has maximum throughput %.2f bps with producer quota %.2f bps' % (producer_maximum_bps, producer_quota_bps))
+        if producer_maximum_bps > producer_quota_bps*(self.maximum_client_deviation_percentage/100+1):
+            success = False
+            msg += 'maximum producer throughput %.2f bps exceeded producer quota %.2f bps by more than %.1f%%' % \
+                   (producer_maximum_bps, producer_quota_bps, self.maximum_client_deviation_percentage)
+
+        # validate that maximum_broker_byte_in_rate <= producer_quota * (1 + maximum_broker_deviation_percentage/100)
+        broker_byte_in_attribute_name = 'kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec:OneMinuteRate'
+        broker_maximum_byte_in_bps = broker.maximum_jmx_value[broker_byte_in_attribute_name]
+        self.logger.info('broker has maximum byte-in rate %.2f bps with producer quota %.2f bps' %
+                         (broker_maximum_byte_in_bps, producer_quota_bps))
+        if broker_maximum_byte_in_bps > producer_quota_bps*(self.maximum_broker_deviation_percentage/100+1):
+            success = False
+            msg += 'maximum broker byte-in rate %.2f bps exceeded producer quota %.2f bps by more than %.1f%%' % \
+                   (broker_maximum_byte_in_bps, producer_quota_bps, self.maximum_broker_deviation_percentage)
+
+        # validate that maximum_consumer_throughput <= consumer_quota * (1 + maximum_client_deviation_percentage/100)
+        consumer_attribute_name = 'kafka.consumer:type=ConsumerTopicMetrics,name=BytesPerSec,clientId=%s:OneMinuteRate' % consumer.client_id
+        consumer_maximum_bps = consumer.maximum_jmx_value[consumer_attribute_name]
+        consumer_quota_bps = self.get_consumer_quota(consumer.client_id)
+        self.logger.info('consumer has maximum throughput %.2f bps with consumer quota %.2f bps' % (consumer_maximum_bps, consumer_quota_bps))
+        if consumer_maximum_bps > consumer_quota_bps*(self.maximum_client_deviation_percentage/100+1):
+            success = False
+            msg += 'maximum consumer throughput %.2f bps exceeded consumer quota %.2f bps by more than %.1f%%' % \
+                   (consumer_maximum_bps, consumer_quota_bps, self.maximum_client_deviation_percentage)
+
+        # validate that maximum_broker_byte_out_rate <= consumer_quota * (1 + maximum_broker_deviation_percentage/100)
+        broker_byte_out_attribute_name = 'kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec:OneMinuteRate'
+        broker_maximum_byte_out_bps = broker.maximum_jmx_value[broker_byte_out_attribute_name]
+        self.logger.info('broker has maximum byte-out rate %.2f bps with consumer quota %.2f bps' %
+                         (broker_maximum_byte_out_bps, consumer_quota_bps))
+        if broker_maximum_byte_out_bps > consumer_quota_bps*(self.maximum_broker_deviation_percentage/100+1):
+            success = False
+            msg += 'maximum broker byte-out rate %.2f bps exceeded consumer quota %.2f bps by more than %.1f%%' % \
+                   (broker_maximum_byte_out_bps, consumer_quota_bps, self.maximum_broker_deviation_percentage)
+
+        return success, msg
+
+    def get_producer_quota(self, client_id):
+        overridden_quotas = {value.split('=')[0]:value.split('=')[1] for value in self.quota_config['quota_producer_bytes_per_second_overrides'].split(',')}
+        if client_id in overridden_quotas:
+            return float(overridden_quotas[client_id])
+        return self.quota_config['quota_producer_default']
+
+    def get_consumer_quota(self, client_id):
+        overridden_quotas = {value.split('=')[0]:value.split('=')[1] for value in self.quota_config['quota_consumer_bytes_per_second_overrides'].split(',')}
+        if client_id in overridden_quotas:
+            return float(overridden_quotas[client_id])
+        return self.quota_config['quota_consumer_default']
+
+    @parametrize(producer_id='default_id', producer_num=1, consumer_id='default_id', consumer_num=1)
+    @parametrize(producer_id='overridden_id', producer_num=1, consumer_id='overridden_id', consumer_num=1)
+    @parametrize(producer_id='overridden_id', producer_num=1, consumer_id='overridden_id', consumer_num=2)
+    def test_quota(self, producer_id='default_id', producer_num=1, consumer_id='default_id', consumer_num=1):
+        self.run_clients(producer_id, producer_num, consumer_id, consumer_num)


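A worked example of the bounds validate() enforces, using the values configured above: with quota_producer_default at 2,500,000 bps, the 100% client deviation allowance permits a peak producer rate of up to 5,000,000 bps, while the tighter 5% broker deviation allowance caps the broker's byte-in OneMinuteRate at 2,625,000 bps.

    producer_quota_bps = 2500000.0
    client_bound = producer_quota_bps * (100.0/100 + 1)  # == 5000000.0 bps
    broker_bound = producer_quota_bps * (5.0/100 + 1)    # == 2625000.0 bps (up to float rounding)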