kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject kafka git commit: KAFKA-4055; System tests for secure quotas
Date Mon, 26 Sep 2016 00:01:49 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 1d055f755 -> c0a62b70a


KAFKA-4055; System tests for secure quotas

Fix the existing client-id quota test, which currently doesn't configure quota overrides correctly.
Add new tests for user and (user, client-id) quota overrides and default quotas.

Author: Rajini Sivaram <rajinisivaram@googlemail.com>

Reviewers: Jun Rao <junrao@gmail.com>

Closes #1860 from rajinisivaram/KAFKA-4055


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c0a62b70
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c0a62b70
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c0a62b70

Branch: refs/heads/trunk
Commit: c0a62b70a8eadc550c937bb18e0203ab691618f5
Parents: 1d055f7
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
Authored: Sun Sep 25 17:01:45 2016 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Sun Sep 25 17:01:45 2016 -0700

----------------------------------------------------------------------
 .../kafkatest/services/kafka/config_property.py |   2 -
 tests/kafkatest/services/kafka/kafka.py         |   3 +-
 .../services/kafka/templates/kafka.properties   |  16 ---
 tests/kafkatest/tests/client/quota_test.py      | 118 +++++++++++++------
 4 files changed, 85 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c0a62b70/tests/kafkatest/services/kafka/config_property.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/config_property.py b/tests/kafkatest/services/kafka/config_property.py
index 217e970..e329151 100644
--- a/tests/kafkatest/services/kafka/config_property.py
+++ b/tests/kafkatest/services/kafka/config_property.py
@@ -144,8 +144,6 @@ From KafkaConfig.scala
   /** ********* Quota Configuration ***********/
   val ProducerQuotaBytesPerSecondDefaultProp = "quota.producer.default"
   val ConsumerQuotaBytesPerSecondDefaultProp = "quota.consumer.default"
-  val ProducerQuotaBytesPerSecondOverridesProp = "quota.producer.bytes.per.second.overrides"
-  val ConsumerQuotaBytesPerSecondOverridesProp = "quota.consumer.bytes.per.second.overrides"
   val NumQuotaSamplesProp = "quota.window.num"
   val QuotaWindowSizeSecondsProp = "quota.window.size.seconds"
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c0a62b70/tests/kafkatest/services/kafka/kafka.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index 734eb5c..4ce86b8 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -67,7 +67,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
 
     def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAINTEXT, interbroker_security_protocol=SecurityConfig.PLAINTEXT,
                  client_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI, interbroker_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI,
-                 authorizer_class_name=None, topics=None, version=TRUNK, quota_config=None, jmx_object_names=None,
+                 authorizer_class_name=None, topics=None, version=TRUNK, jmx_object_names=None,
                  jmx_attributes=[], zk_connect_timeout=5000, zk_session_timeout=6000):
         """
         :type context
@@ -78,7 +78,6 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         JmxMixin.__init__(self, num_nodes, jmx_object_names, jmx_attributes)
 
         self.zk = zk
-        self.quota_config = quota_config
 
         self.security_protocol = security_protocol
         self.interbroker_security_protocol = interbroker_security_protocol

http://git-wip-us.apache.org/repos/asf/kafka/blob/c0a62b70/tests/kafkatest/services/kafka/templates/kafka.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/templates/kafka.properties b/tests/kafkatest/services/kafka/templates/kafka.properties
index c02c64f..06ec603 100644
--- a/tests/kafkatest/services/kafka/templates/kafka.properties
+++ b/tests/kafkatest/services/kafka/templates/kafka.properties
@@ -33,22 +33,6 @@ log.segment.bytes=1073741824
 log.retention.check.interval.ms=300000
 log.cleaner.enable=false
 
-{% if quota_config.quota_producer_default is defined and quota_config.quota_producer_default is not none %}
-quota.producer.default={{ quota_config.quota_producer_default }}
-{% endif %}
-
-{% if quota_config.quota_consumer_default is defined and quota_config.quota_consumer_default
is not none %}
-quota.consumer.default={{ quota_config.quota_consumer_default }}
-{% endif %}
-
-{% if quota_config.quota_producer_bytes_per_second_overrides is defined and quota_config.quota_producer_bytes_per_second_overrides is not none %}
-quota.producer.bytes.per.second.overrides={{ quota_config.quota_producer_bytes_per_second_overrides
}}
-{% endif %}
-
-{% if quota_config.quota_consumer_bytes_per_second_overrides is defined and quota_config.quota_consumer_bytes_per_second_overrides
is not none %}
-quota.consumer.bytes.per.second.overrides={{ quota_config.quota_consumer_bytes_per_second_overrides
}}
-{% endif %}
-
 security.inter.broker.protocol={{ security_config.interbroker_security_protocol }}
 
 ssl.keystore.location=/mnt/security/test.keystore.jks

http://git-wip-us.apache.org/repos/asf/kafka/blob/c0a62b70/tests/kafkatest/tests/client/quota_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client/quota_test.py b/tests/kafkatest/tests/client/quota_test.py
index 7c2ec59..2abd089 100644
--- a/tests/kafkatest/tests/client/quota_test.py
+++ b/tests/kafkatest/tests/client/quota_test.py
@@ -14,13 +14,77 @@
 # limitations under the License.
 
 from ducktape.tests.test import Test
-from ducktape.mark import parametrize
+from ducktape.mark import matrix, parametrize
 
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService
 from kafkatest.services.performance import ProducerPerformanceService
 from kafkatest.services.console_consumer import ConsoleConsumer
 
+class QuotaConfig(object):
+    CLIENT_ID = 'client-id'
+    USER = 'user'
+    USER_CLIENT = '(user, client-id)'
+
+    LARGE_QUOTA = 1000 * 1000 * 1000
+    USER_PRINCIPAL = 'CN=systemtest'
+
+    def __init__(self, quota_type, override_quota, kafka):
+        if quota_type == QuotaConfig.CLIENT_ID:
+            if override_quota:
+                self.client_id = 'overridden_id'
+                self.producer_quota = 3750000
+                self.consumer_quota = 3000000
+                self.configure_quota(kafka, self.producer_quota, self.consumer_quota, ['clients',
self.client_id])
+                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA,
['clients', None])
+            else:
+                self.client_id = 'default_id'
+                self.producer_quota = 2500000
+                self.consumer_quota = 2000000
+                self.configure_quota(kafka, self.producer_quota, self.consumer_quota, ['clients',
None])
+                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA,
['clients', 'overridden_id'])
+        elif quota_type == QuotaConfig.USER:
+            if override_quota:
+                self.client_id = 'some_id'
+                self.producer_quota = 3750000
+                self.consumer_quota = 3000000
+                self.configure_quota(kafka, self.producer_quota, self.consumer_quota, ['users',
QuotaConfig.USER_PRINCIPAL])
+                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA,
['users', None])
+                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA,
['clients', self.client_id])
+            else:
+                self.client_id = 'some_id'
+                self.producer_quota = 2500000
+                self.consumer_quota = 2000000
+                self.configure_quota(kafka, self.producer_quota, self.consumer_quota, ['users',
None])
+                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA,
['clients', None])
+        elif quota_type == QuotaConfig.USER_CLIENT:
+            if override_quota:
+                self.client_id = 'overridden_id'
+                self.producer_quota = 3750000
+                self.consumer_quota = 3000000
+                self.configure_quota(kafka, self.producer_quota, self.consumer_quota, ['users',
QuotaConfig.USER_PRINCIPAL, 'clients', self.client_id])
+                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA,
['users', QuotaConfig.USER_PRINCIPAL, 'clients', None])
+                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA,
['users', None])
+                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA,
['clients', self.client_id])
+            else:
+                self.client_id = 'default_id'
+                self.producer_quota = 2500000
+                self.consumer_quota = 2000000
+                self.configure_quota(kafka, self.producer_quota, self.consumer_quota, ['users',
None, 'clients', None])
+                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA,
['users', None])
+                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA,
['clients', None])
+
+    def configure_quota(self, kafka, producer_byte_rate, consumer_byte_rate, entity_args):
+        node = kafka.nodes[0]
+        cmd = "%s --zookeeper %s --alter --add-config producer_byte_rate=%d,consumer_byte_rate=%d" % \
+              (kafka.path.script("kafka-configs.sh", node), kafka.zk.connect_setting(), producer_byte_rate, consumer_byte_rate)
+        cmd += " --entity-type " + entity_args[0] + self.entity_name_opt(entity_args[1])
+        if len(entity_args) > 2:
+            cmd += " --entity-type " + entity_args[2] + self.entity_name_opt(entity_args[3])
+        node.account.ssh(cmd)
+
+    def entity_name_opt(self, name):
+        return " --entity-default" if name is None else " --entity-name " + name
 
 class QuotaTest(Test):
     """
@@ -36,22 +100,16 @@ class QuotaTest(Test):
         self.topic = 'test_topic'
         self.logger.info('use topic ' + self.topic)
 
-        # quota related parameters
-        self.quota_config = {'quota_producer_default': 2500000,
-                             'quota_consumer_default': 2000000,
-                             'quota_producer_bytes_per_second_overrides': 'overridden_id=3750000',
-                             'quota_consumer_bytes_per_second_overrides': 'overridden_id=3000000'}
         self.maximum_client_deviation_percentage = 100.0
         self.maximum_broker_deviation_percentage = 5.0
-        self.num_records = 100000
+        self.num_records = 50000
         self.record_size = 3000
 
         self.zk = ZookeeperService(test_context, num_nodes=1)
         self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk,
-                                  security_protocol='PLAINTEXT',
-                                  interbroker_security_protocol='PLAINTEXT',
+                                  security_protocol='SSL', authorizer_class_name='',
+                                  interbroker_security_protocol='SSL',
                                   topics={self.topic: {'partitions': 6, 'replication-factor': 1, 'configs': {'min.insync.replicas': 1}}},
-                                  quota_config=self.quota_config,
                                   jmx_object_names=['kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec',
                                                     'kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec'],
                                   jmx_attributes=['OneMinuteRate'])
@@ -66,24 +124,27 @@ class QuotaTest(Test):
         """Override this since we're adding services outside of the constructor"""
         return super(QuotaTest, self).min_cluster_size() + self.num_producers + self.num_consumers
 
-    @parametrize(producer_id='default_id', producer_num=1, consumer_id='default_id', consumer_num=1)
-    @parametrize(producer_id='overridden_id', producer_num=1, consumer_id='overridden_id',
consumer_num=1)
-    @parametrize(producer_id='overridden_id', producer_num=1, consumer_id='overridden_id',
consumer_num=2)
-    def test_quota(self, producer_id='default_id', producer_num=1, consumer_id='default_id',
consumer_num=1):
+    @matrix(quota_type=[QuotaConfig.CLIENT_ID, QuotaConfig.USER, QuotaConfig.USER_CLIENT], override_quota=[True, False])
+    @parametrize(quota_type=QuotaConfig.CLIENT_ID, consumer_num=2)
+    def test_quota(self, quota_type, override_quota=True, producer_num=1, consumer_num=1):
+        self.quota_config = QuotaConfig(quota_type, override_quota, self.kafka)
+        producer_client_id = self.quota_config.client_id
+        consumer_client_id = self.quota_config.client_id
+
         # Produce all messages
         producer = ProducerPerformanceService(
             self.test_context, producer_num, self.kafka,
-            topic=self.topic, num_records=self.num_records, record_size=self.record_size,
throughput=-1, client_id=producer_id,
-            jmx_object_names=['kafka.producer:type=producer-metrics,client-id=%s' % producer_id],
jmx_attributes=['outgoing-byte-rate'])
+            topic=self.topic, num_records=self.num_records, record_size=self.record_size,
throughput=-1, client_id=producer_client_id,
+            jmx_object_names=['kafka.producer:type=producer-metrics,client-id=%s' % producer_client_id],
jmx_attributes=['outgoing-byte-rate'])
 
         producer.run()
 
         # Consume all messages
         consumer = ConsoleConsumer(self.test_context, consumer_num, self.kafka, self.topic,
-            new_consumer=False,
-            consumer_timeout_ms=60000, client_id=consumer_id,
-            jmx_object_names=['kafka.consumer:type=ConsumerTopicMetrics,name=BytesPerSec,clientId=%s'
% consumer_id],
-            jmx_attributes=['OneMinuteRate'])
+            new_consumer=True,
+            consumer_timeout_ms=60000, client_id=consumer_client_id,
+            jmx_object_names=['kafka.consumer:type=consumer-fetch-manager-metrics,client-id=%s'
% consumer_client_id],
+            jmx_attributes=['bytes-consumed-rate'])
         consumer.run()
 
         for idx, messages in consumer.messages_consumed.iteritems():
@@ -118,7 +179,7 @@ class QuotaTest(Test):
         # validate that maximum_producer_throughput <= producer_quota * (1 + maximum_client_deviation_percentage/100)
         producer_attribute_name = 'kafka.producer:type=producer-metrics,client-id=%s:outgoing-byte-rate'
% producer.client_id
         producer_maximum_bps = producer.maximum_jmx_value[producer_attribute_name]
-        producer_quota_bps = self.get_producer_quota(producer.client_id)
+        producer_quota_bps = self.quota_config.producer_quota
         self.logger.info('producer has maximum throughput %.2f bps with producer quota %.2f
bps' % (producer_maximum_bps, producer_quota_bps))
         if producer_maximum_bps > producer_quota_bps*(self.maximum_client_deviation_percentage/100+1):
             success = False
@@ -136,9 +197,9 @@ class QuotaTest(Test):
                    (broker_maximum_byte_in_bps, producer_quota_bps, self.maximum_broker_deviation_percentage)
 
         # validate that maximum_consumer_throughput <= consumer_quota * (1 + maximum_client_deviation_percentage/100)
-        consumer_attribute_name = 'kafka.consumer:type=ConsumerTopicMetrics,name=BytesPerSec,clientId=%s:OneMinuteRate'
% consumer.client_id
+        consumer_attribute_name = 'kafka.consumer:type=consumer-fetch-manager-metrics,client-id=%s:bytes-consumed-rate'
% consumer.client_id
         consumer_maximum_bps = consumer.maximum_jmx_value[consumer_attribute_name]
-        consumer_quota_bps = self.get_consumer_quota(consumer.client_id)
+        consumer_quota_bps = self.quota_config.consumer_quota
         self.logger.info('consumer has maximum throughput %.2f bps with consumer quota %.2f
bps' % (consumer_maximum_bps, consumer_quota_bps))
         if consumer_maximum_bps > consumer_quota_bps*(self.maximum_client_deviation_percentage/100+1):
             success = False
@@ -157,14 +218,3 @@ class QuotaTest(Test):
 
         return success, msg
 
-    def get_producer_quota(self, client_id):
-        overridden_quotas = {value.split('=')[0]:value.split('=')[1] for value in self.quota_config['quota_producer_bytes_per_second_overrides'].split(',')}
-        if client_id in overridden_quotas:
-            return float(overridden_quotas[client_id])
-        return self.quota_config['quota_producer_default']
-
-    def get_consumer_quota(self, client_id):
-        overridden_quotas = {value.split('=')[0]:value.split('=')[1] for value in self.quota_config['quota_consumer_bytes_per_second_overrides'].split(',')}
-        if client_id in overridden_quotas:
-            return float(overridden_quotas[client_id])
-        return self.quota_config['quota_consumer_default']


Mime
View raw message