kafka-commits mailing list archives

From guozh...@apache.org
Subject [kafka] branch trunk updated: MINOR: improve resilience of Streams test producers (#6028)
Date Fri, 04 Jan 2019 21:44:24 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ef9204d  MINOR: improve resilience of Streams test producers (#6028)
ef9204d is described below

commit ef9204dc586038b97440485c81c218d438f22b9c
Author: John Roesler <vvcephei@users.noreply.github.com>
AuthorDate: Fri Jan 4 15:44:15 2019 -0600

    MINOR: improve resilience of Streams test producers (#6028)
    
    Some Streams system tests have failed during the setup phase
    due to the producer having retries disabled and getting some
    transient error from the broker.
    
    This patch adds a retries parameter to the VerifiableProducer
    (default unchanged), and sets retries to 10 for Streams tests.
    
    It also sets acks equal to the number of brokers for Streams tests.
    
    Reviewers: Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>, Guozhang Wang <guozhang@confluent.io>
---
 tests/kafkatest/services/verifiable_producer.py    | 9 +++++++--
 tests/kafkatest/tests/streams/base_streams_test.py | 5 +++--
 2 files changed, 10 insertions(+), 4 deletions(-)

diff --git a/tests/kafkatest/services/verifiable_producer.py b/tests/kafkatest/services/verifiable_producer.py
index f339a62..744524e 100644
--- a/tests/kafkatest/services/verifiable_producer.py
+++ b/tests/kafkatest/services/verifiable_producer.py
@@ -58,7 +58,8 @@ class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
                  message_validator=is_int, compression_types=None, version=DEV_BRANCH, acks=None,
                  stop_timeout_sec=150, request_timeout_sec=30, log_level="INFO",
                  enable_idempotence=False, offline_nodes=[], create_time=-1, repeating_keys=None,
-                 jaas_override_variables=None, kafka_opts_override="", client_prop_file_override=""):
+                 jaas_override_variables=None, kafka_opts_override="", client_prop_file_override="",
+                 retries=None):
         """
         Args:
             :param max_messages                number of messages to be produced per producer
@@ -102,7 +103,7 @@ class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
         self.jaas_override_variables = jaas_override_variables or {}
         self.kafka_opts_override = kafka_opts_override
         self.client_prop_file_override = client_prop_file_override
-
+        self.retries = retries
 
     def java_class_name(self):
         return "VerifiableProducer"
@@ -145,6 +146,10 @@ class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
             producer_prop_file += "\nmax.in.flight.requests.per.connection=5\n"
             producer_prop_file += "\nretries=1000000\n"
             producer_prop_file += "\nenable.idempotence=true\n"
+        elif self.retries is not None:
+            self.logger.info("VerifiableProducer (index = %d) will use retries = %s", idx, self.retries)
+            producer_prop_file += "\nretries=%s\n" % self.retries
+            producer_prop_file += "\ndelivery.timeout.ms=%s\n" % (self.request_timeout_sec
* 1000 * self.retries)
 
         self.logger.info("verifiable_producer.properties:")
         self.logger.info(producer_prop_file)
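For reference, the new elif branch above also sizes delivery.timeout.ms to cover the full retry budget, so the producer does not give up on a record while retries are still pending. Below is a minimal standalone sketch of that arithmetic, assuming the default request_timeout_sec=30 from the constructor and the retries=10 that the Streams tests pass; the helper name is illustrative and not part of the patch:

def retry_properties(retries, request_timeout_sec=30):
    # Mirrors the property lines the new branch appends when a finite
    # retry count is configured and idempotence is disabled.
    props = "\nretries=%s\n" % retries
    # Give every retry its own request timeout so the delivery timeout
    # cannot expire before the retries are exhausted.
    props += "\ndelivery.timeout.ms=%s\n" % (request_timeout_sec * 1000 * retries)
    return props

# Streams tests (retries=10, request_timeout_sec=30) end up with:
#   retries=10
#   delivery.timeout.ms=300000   (10 * 30 s * 1000 ms/s = 5 minutes)
print(retry_properties(10))
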
diff --git a/tests/kafkatest/tests/streams/base_streams_test.py b/tests/kafkatest/tests/streams/base_streams_test.py
index 9a9704e..53e4231 100644
--- a/tests/kafkatest/tests/streams/base_streams_test.py
+++ b/tests/kafkatest/tests/streams/base_streams_test.py
@@ -44,9 +44,10 @@ class BaseStreamsTest(KafkaTest):
                                   self.kafka,
                                   topic,
                                   max_messages=num_messages,
-                                  acks=1,
+                                  acks=-1,
                                   throughput=throughput,
-                                  repeating_keys=repeating_keys)
+                                  repeating_keys=repeating_keys,
+                                  retries=10)
 
     def assert_produce_consume(self,
                                streams_source_topic,
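
The net effect on the Streams tests is that every producer created through BaseStreamsTest.get_producer now waits for acknowledgement from all in-sync replicas and retries transient errors up to ten times during setup. A rough sketch of the resulting call follows; the keyword arguments come from the hunk above, while the positional arguments (test context, node count, Kafka service, topic) follow the usual VerifiableProducer convention and the concrete values are illustrative:

from kafkatest.services.verifiable_producer import VerifiableProducer

producer = VerifiableProducer(test_context,              # ducktape test context
                              1,                         # one producer node
                              kafka,                     # Kafka service under test
                              topic,
                              max_messages=num_messages,
                              acks=-1,                   # wait for all in-sync replicas
                              throughput=throughput,
                              repeating_keys=repeating_keys,
                              retries=10)                # new: retry transient errors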

