kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lind...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6944; Add system tests testing the new throttling behavior using older clients/brokers
Date Wed, 27 Jun 2018 23:49:22 GMT
This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/trunk by this push:
     new a3db23c  KAFKA-6944; Add system tests testing the new throttling behavior using older clients/brokers
a3db23c is described below

commit a3db23cb76436786ebe0e5f388b4ca97dec51686
Author: Jon Lee <jonlee@linkedin.com>
AuthorDate: Wed Jun 27 16:49:12 2018 -0700

    KAFKA-6944; Add system tests testing the new throttling behavior using older clients/brokers
    Added two additional test cases to quota_test.py, which run between brokers and clients with different throttling behaviors. More specifically:
    1. clients with new throttling behavior (i.e., post-KIP-219) and brokers with old throttling behavior (i.e., pre-KIP-219)
    2. clients with old throttling behavior and brokers with new throttling behavior
    Author: Jon Lee <jonlee@linkedin.com>
    Author: Dong Lin <lindong28@gmail.com>
    Reviewers: Dong Lin <lindong28@gmail.com>
    Closes #5294 from jonlee2/kafka-6944
 tests/kafkatest/tests/client/quota_test.py | 23 +++++++++++++++++++----
 1 file changed, 19 insertions(+), 4 deletions(-)

diff --git a/tests/kafkatest/tests/client/quota_test.py b/tests/kafkatest/tests/client/quota_test.py
index 47a6a96..c084e08 100644
--- a/tests/kafkatest/tests/client/quota_test.py
+++ b/tests/kafkatest/tests/client/quota_test.py
@@ -21,6 +21,7 @@ from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService
 from kafkatest.services.performance import ProducerPerformanceService
 from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.version import DEV_BRANCH, V_1_1_0
 class QuotaConfig(object):
     CLIENT_ID = 'client-id'
@@ -119,7 +120,6 @@ class QuotaTest(Test):
     def setUp(self):
-        self.kafka.start()
     def min_cluster_size(self):
         """Override this since we're adding services outside of the constructor"""
@@ -128,15 +128,30 @@ class QuotaTest(Test):
     @matrix(quota_type=[QuotaConfig.CLIENT_ID, QuotaConfig.USER, QuotaConfig.USER_CLIENT],
override_quota=[True, False])
     @parametrize(quota_type=QuotaConfig.CLIENT_ID, consumer_num=2)
-    def test_quota(self, quota_type, override_quota=True, producer_num=1, consumer_num=1):
+    @parametrize(quota_type=QuotaConfig.CLIENT_ID, old_broker_throttling_behavior=True)
+    @parametrize(quota_type=QuotaConfig.CLIENT_ID, old_client_throttling_behavior=True)
+    def test_quota(self, quota_type, override_quota=True, producer_num=1, consumer_num=1,
+                   old_broker_throttling_behavior=False, old_client_throttling_behavior=False):
+        # Old (pre-2.0) throttling behavior for broker throttles before sending a response to the client.
+        if old_broker_throttling_behavior:
+            self.kafka.set_version(V_1_1_0)
+        self.kafka.start()
         self.quota_config = QuotaConfig(quota_type, override_quota, self.kafka)
         producer_client_id = self.quota_config.client_id
         consumer_client_id = self.quota_config.client_id
+        # Old (pre-2.0) throttling behavior for client does not throttle upon receiving a response with a non-zero throttle time.
+        if old_client_throttling_behavior:
+            client_version = V_1_1_0
+        else:
+            client_version = DEV_BRANCH
         # Produce all messages
         producer = ProducerPerformanceService(
             self.test_context, producer_num, self.kafka,
-            topic=self.topic, num_records=self.num_records, record_size=self.record_size,
throughput=-1, client_id=producer_client_id)
+            topic=self.topic, num_records=self.num_records, record_size=self.record_size,
+            client_id=producer_client_id, version=client_version)
@@ -144,7 +159,7 @@ class QuotaTest(Test):
         consumer = ConsoleConsumer(self.test_context, consumer_num, self.kafka, self.topic,
             consumer_timeout_ms=60000, client_id=consumer_client_id,
% consumer_client_id],
-            jmx_attributes=['bytes-consumed-rate'])
+            jmx_attributes=['bytes-consumed-rate'], version=client_version)
         for idx, messages in consumer.messages_consumed.iteritems():

View raw message