Repository: kafka
Updated Branches:
refs/heads/0.10.1 e4d8059d7 -> 4104f014e
KAFKA-4055; System tests for secure quotas
Fix the existing client-id quota test, which currently doesn't configure quota overrides correctly.
Add new tests for user and (user, client-id) quota overrides and default quotas.
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
Reviewers: Jun Rao <junrao@gmail.com>
Closes #1860 from rajinisivaram/KAFKA-4055
(cherry picked from commit c0a62b70a8eadc550c937bb18e0203ab691618f5)
Signed-off-by: Jun Rao <junrao@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4104f014
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4104f014
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4104f014
Branch: refs/heads/0.10.1
Commit: 4104f014e92bbf6878e575a93a850ff3a569ccf0
Parents: e4d8059
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
Authored: Sun Sep 25 17:01:45 2016 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Sun Sep 25 17:03:51 2016 -0700
----------------------------------------------------------------------
.../kafkatest/services/kafka/config_property.py | 2 -
tests/kafkatest/services/kafka/kafka.py | 3 +-
.../services/kafka/templates/kafka.properties | 16 ---
tests/kafkatest/tests/client/quota_test.py | 118 +++++++++++++------
4 files changed, 85 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/4104f014/tests/kafkatest/services/kafka/config_property.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/config_property.py b/tests/kafkatest/services/kafka/config_property.py
index 217e970..e329151 100644
--- a/tests/kafkatest/services/kafka/config_property.py
+++ b/tests/kafkatest/services/kafka/config_property.py
@@ -144,8 +144,6 @@ From KafkaConfig.scala
/** ********* Quota Configuration ***********/
val ProducerQuotaBytesPerSecondDefaultProp = "quota.producer.default"
val ConsumerQuotaBytesPerSecondDefaultProp = "quota.consumer.default"
- val ProducerQuotaBytesPerSecondOverridesProp = "quota.producer.bytes.per.second.overrides"
- val ConsumerQuotaBytesPerSecondOverridesProp = "quota.consumer.bytes.per.second.overrides"
val NumQuotaSamplesProp = "quota.window.num"
val QuotaWindowSizeSecondsProp = "quota.window.size.seconds"
http://git-wip-us.apache.org/repos/asf/kafka/blob/4104f014/tests/kafkatest/services/kafka/kafka.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index 734eb5c..4ce86b8 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -67,7 +67,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAINTEXT,
interbroker_security_protocol=SecurityConfig.PLAINTEXT,
client_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI, interbroker_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI,
- authorizer_class_name=None, topics=None, version=TRUNK, quota_config=None,
jmx_object_names=None,
+ authorizer_class_name=None, topics=None, version=TRUNK, jmx_object_names=None,
jmx_attributes=[], zk_connect_timeout=5000, zk_session_timeout=6000):
"""
:type context
@@ -78,7 +78,6 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
JmxMixin.__init__(self, num_nodes, jmx_object_names, jmx_attributes)
self.zk = zk
- self.quota_config = quota_config
self.security_protocol = security_protocol
self.interbroker_security_protocol = interbroker_security_protocol
http://git-wip-us.apache.org/repos/asf/kafka/blob/4104f014/tests/kafkatest/services/kafka/templates/kafka.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/templates/kafka.properties b/tests/kafkatest/services/kafka/templates/kafka.properties
index c02c64f..06ec603 100644
--- a/tests/kafkatest/services/kafka/templates/kafka.properties
+++ b/tests/kafkatest/services/kafka/templates/kafka.properties
@@ -33,22 +33,6 @@ log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
-{% if quota_config.quota_producer_default is defined and quota_config.quota_producer_default
is not none %}
-quota.producer.default={{ quota_config.quota_producer_default }}
-{% endif %}
-
-{% if quota_config.quota_consumer_default is defined and quota_config.quota_consumer_default
is not none %}
-quota.consumer.default={{ quota_config.quota_consumer_default }}
-{% endif %}
-
-{% if quota_config.quota_producer_bytes_per_second_overrides is defined and quota_config.quota_producer_bytes_per_second_overrides
is not none %}
-quota.producer.bytes.per.second.overrides={{ quota_config.quota_producer_bytes_per_second_overrides
}}
-{% endif %}
-
-{% if quota_config.quota_consumer_bytes_per_second_overrides is defined and quota_config.quota_consumer_bytes_per_second_overrides
is not none %}
-quota.consumer.bytes.per.second.overrides={{ quota_config.quota_consumer_bytes_per_second_overrides
}}
-{% endif %}
-
security.inter.broker.protocol={{ security_config.interbroker_security_protocol }}
ssl.keystore.location=/mnt/security/test.keystore.jks
http://git-wip-us.apache.org/repos/asf/kafka/blob/4104f014/tests/kafkatest/tests/client/quota_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client/quota_test.py b/tests/kafkatest/tests/client/quota_test.py
index 7c2ec59..2abd089 100644
--- a/tests/kafkatest/tests/client/quota_test.py
+++ b/tests/kafkatest/tests/client/quota_test.py
@@ -14,13 +14,77 @@
# limitations under the License.
from ducktape.tests.test import Test
-from ducktape.mark import parametrize
+from ducktape.mark import matrix, parametrize
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService
from kafkatest.services.performance import ProducerPerformanceService
from kafkatest.services.console_consumer import ConsoleConsumer
+class QuotaConfig(object):
+ CLIENT_ID = 'client-id'
+ USER = 'user'
+ USER_CLIENT = '(user, client-id)'
+
+ LARGE_QUOTA = 1000 * 1000 * 1000
+ USER_PRINCIPAL = 'CN=systemtest'
+
+ def __init__(self, quota_type, override_quota, kafka):
+ if quota_type == QuotaConfig.CLIENT_ID:
+ if override_quota:
+ self.client_id = 'overridden_id'
+ self.producer_quota = 3750000
+ self.consumer_quota = 3000000
+ self.configure_quota(kafka, self.producer_quota, self.consumer_quota, ['clients',
self.client_id])
+ self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA,
['clients', None])
+ else:
+ self.client_id = 'default_id'
+ self.producer_quota = 2500000
+ self.consumer_quota = 2000000
+ self.configure_quota(kafka, self.producer_quota, self.consumer_quota, ['clients',
None])
+ self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA,
['clients', 'overridden_id'])
+ elif quota_type == QuotaConfig.USER:
+ if override_quota:
+ self.client_id = 'some_id'
+ self.producer_quota = 3750000
+ self.consumer_quota = 3000000
+ self.configure_quota(kafka, self.producer_quota, self.consumer_quota, ['users',
QuotaConfig.USER_PRINCIPAL])
+ self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA,
['users', None])
+ self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA,
['clients', self.client_id])
+ else:
+ self.client_id = 'some_id'
+ self.producer_quota = 2500000
+ self.consumer_quota = 2000000
+ self.configure_quota(kafka, self.producer_quota, self.consumer_quota, ['users',
None])
+ self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA,
['clients', None])
+ elif quota_type == QuotaConfig.USER_CLIENT:
+ if override_quota:
+ self.client_id = 'overridden_id'
+ self.producer_quota = 3750000
+ self.consumer_quota = 3000000
+ self.configure_quota(kafka, self.producer_quota, self.consumer_quota, ['users',
QuotaConfig.USER_PRINCIPAL, 'clients', self.client_id])
+ self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA,
['users', QuotaConfig.USER_PRINCIPAL, 'clients', None])
+ self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA,
['users', None])
+ self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA,
['clients', self.client_id])
+ else:
+ self.client_id = 'default_id'
+ self.producer_quota = 2500000
+ self.consumer_quota = 2000000
+ self.configure_quota(kafka, self.producer_quota, self.consumer_quota, ['users',
None, 'clients', None])
+ self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA,
['users', None])
+ self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA,
['clients', None])
+
+ def configure_quota(self, kafka, producer_byte_rate, consumer_byte_rate, entity_args):
+ node = kafka.nodes[0]
+ cmd = "%s --zookeeper %s --alter --add-config producer_byte_rate=%d,consumer_byte_rate=%d"
% \
+ (kafka.path.script("kafka-configs.sh", node), kafka.zk.connect_setting(), producer_byte_rate,
consumer_byte_rate)
+ cmd += " --entity-type " + entity_args[0] + self.entity_name_opt(entity_args[1])
+ if len(entity_args) > 2:
+ cmd += " --entity-type " + entity_args[2] + self.entity_name_opt(entity_args[3])
+ node.account.ssh(cmd)
+
+ def entity_name_opt(self, name):
+ return " --entity-default" if name is None else " --entity-name " + name
class QuotaTest(Test):
"""
@@ -36,22 +100,16 @@ class QuotaTest(Test):
self.topic = 'test_topic'
self.logger.info('use topic ' + self.topic)
- # quota related parameters
- self.quota_config = {'quota_producer_default': 2500000,
- 'quota_consumer_default': 2000000,
- 'quota_producer_bytes_per_second_overrides': 'overridden_id=3750000',
- 'quota_consumer_bytes_per_second_overrides': 'overridden_id=3000000'}
self.maximum_client_deviation_percentage = 100.0
self.maximum_broker_deviation_percentage = 5.0
- self.num_records = 100000
+ self.num_records = 50000
self.record_size = 3000
self.zk = ZookeeperService(test_context, num_nodes=1)
self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk,
- security_protocol='PLAINTEXT',
- interbroker_security_protocol='PLAINTEXT',
+ security_protocol='SSL', authorizer_class_name='',
+ interbroker_security_protocol='SSL',
topics={self.topic: {'partitions': 6, 'replication-factor':
1, 'configs': {'min.insync.replicas': 1}}},
- quota_config=self.quota_config,
jmx_object_names=['kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec',
'kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec'],
jmx_attributes=['OneMinuteRate'])
@@ -66,24 +124,27 @@ class QuotaTest(Test):
"""Override this since we're adding services outside of the constructor"""
return super(QuotaTest, self).min_cluster_size() + self.num_producers + self.num_consumers
- @parametrize(producer_id='default_id', producer_num=1, consumer_id='default_id', consumer_num=1)
- @parametrize(producer_id='overridden_id', producer_num=1, consumer_id='overridden_id',
consumer_num=1)
- @parametrize(producer_id='overridden_id', producer_num=1, consumer_id='overridden_id',
consumer_num=2)
- def test_quota(self, producer_id='default_id', producer_num=1, consumer_id='default_id',
consumer_num=1):
+ @matrix(quota_type=[QuotaConfig.CLIENT_ID, QuotaConfig.USER, QuotaConfig.USER_CLIENT],
override_quota=[True, False])
+ @parametrize(quota_type=QuotaConfig.CLIENT_ID, consumer_num=2)
+ def test_quota(self, quota_type, override_quota=True, producer_num=1, consumer_num=1):
+ self.quota_config = QuotaConfig(quota_type, override_quota, self.kafka)
+ producer_client_id = self.quota_config.client_id
+ consumer_client_id = self.quota_config.client_id
+
# Produce all messages
producer = ProducerPerformanceService(
self.test_context, producer_num, self.kafka,
- topic=self.topic, num_records=self.num_records, record_size=self.record_size,
throughput=-1, client_id=producer_id,
- jmx_object_names=['kafka.producer:type=producer-metrics,client-id=%s' % producer_id],
jmx_attributes=['outgoing-byte-rate'])
+ topic=self.topic, num_records=self.num_records, record_size=self.record_size,
throughput=-1, client_id=producer_client_id,
+ jmx_object_names=['kafka.producer:type=producer-metrics,client-id=%s' % producer_client_id],
jmx_attributes=['outgoing-byte-rate'])
producer.run()
# Consume all messages
consumer = ConsoleConsumer(self.test_context, consumer_num, self.kafka, self.topic,
- new_consumer=False,
- consumer_timeout_ms=60000, client_id=consumer_id,
- jmx_object_names=['kafka.consumer:type=ConsumerTopicMetrics,name=BytesPerSec,clientId=%s'
% consumer_id],
- jmx_attributes=['OneMinuteRate'])
+ new_consumer=True,
+ consumer_timeout_ms=60000, client_id=consumer_client_id,
+ jmx_object_names=['kafka.consumer:type=consumer-fetch-manager-metrics,client-id=%s'
% consumer_client_id],
+ jmx_attributes=['bytes-consumed-rate'])
consumer.run()
for idx, messages in consumer.messages_consumed.iteritems():
@@ -118,7 +179,7 @@ class QuotaTest(Test):
# validate that maximum_producer_throughput <= producer_quota * (1 + maximum_client_deviation_percentage/100)
producer_attribute_name = 'kafka.producer:type=producer-metrics,client-id=%s:outgoing-byte-rate'
% producer.client_id
producer_maximum_bps = producer.maximum_jmx_value[producer_attribute_name]
- producer_quota_bps = self.get_producer_quota(producer.client_id)
+ producer_quota_bps = self.quota_config.producer_quota
self.logger.info('producer has maximum throughput %.2f bps with producer quota %.2f
bps' % (producer_maximum_bps, producer_quota_bps))
if producer_maximum_bps > producer_quota_bps*(self.maximum_client_deviation_percentage/100+1):
success = False
@@ -136,9 +197,9 @@ class QuotaTest(Test):
(broker_maximum_byte_in_bps, producer_quota_bps, self.maximum_broker_deviation_percentage)
# validate that maximum_consumer_throughput <= consumer_quota * (1 + maximum_client_deviation_percentage/100)
- consumer_attribute_name = 'kafka.consumer:type=ConsumerTopicMetrics,name=BytesPerSec,clientId=%s:OneMinuteRate'
% consumer.client_id
+ consumer_attribute_name = 'kafka.consumer:type=consumer-fetch-manager-metrics,client-id=%s:bytes-consumed-rate'
% consumer.client_id
consumer_maximum_bps = consumer.maximum_jmx_value[consumer_attribute_name]
- consumer_quota_bps = self.get_consumer_quota(consumer.client_id)
+ consumer_quota_bps = self.quota_config.consumer_quota
self.logger.info('consumer has maximum throughput %.2f bps with consumer quota %.2f
bps' % (consumer_maximum_bps, consumer_quota_bps))
if consumer_maximum_bps > consumer_quota_bps*(self.maximum_client_deviation_percentage/100+1):
success = False
@@ -157,14 +218,3 @@ class QuotaTest(Test):
return success, msg
- def get_producer_quota(self, client_id):
- overridden_quotas = {value.split('=')[0]:value.split('=')[1] for value in self.quota_config['quota_producer_bytes_per_second_overrides'].split(',')}
- if client_id in overridden_quotas:
- return float(overridden_quotas[client_id])
- return self.quota_config['quota_producer_default']
-
- def get_consumer_quota(self, client_id):
- overridden_quotas = {value.split('=')[0]:value.split('=')[1] for value in self.quota_config['quota_consumer_bytes_per_second_overrides'].split(',')}
- if client_id in overridden_quotas:
- return float(overridden_quotas[client_id])
- return self.quota_config['quota_consumer_default']
|