kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch 2.1 updated: MINOR: Modify Connect service's startup timeout to be passed via the init (#5882)
Date Tue, 06 Nov 2018 21:56:44 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 4074255  MINOR: Modify Connect service's startup timeout to be passed via the init
(#5882)
4074255 is described below

commit 4074255da24a54923f0e2c3c783f5e8fedfdaa66
Author: Magesh Nandakumar <magesh.n.kumar@gmail.com>
AuthorDate: Tue Nov 6 13:41:19 2018 -0800

    MINOR: Modify Connect service's startup timeout to be passed via the init (#5882)
    
    Currently, the startup timeout is hardcoded to be 60 seconds in Connect's test service.
Modifying it to be passable via init. This can safely be backported as well.
    
    Reviewers: Randall Hauch <rhauch@gmail.com>, Jason Gustafson <jason@confluent.io>
---
 tests/kafkatest/services/connect.py | 18 ++++++++++--------
 1 file changed, 10 insertions(+), 8 deletions(-)

diff --git a/tests/kafkatest/services/connect.py b/tests/kafkatest/services/connect.py
index d8c8d5a..bf38e50 100644
--- a/tests/kafkatest/services/connect.py
+++ b/tests/kafkatest/services/connect.py
@@ -63,12 +63,13 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
             "collect_default": True},
     }
 
-    def __init__(self, context, num_nodes, kafka, files):
+    def __init__(self, context, num_nodes, kafka, files, startup_timeout_sec = 60):
         super(ConnectServiceBase, self).__init__(context, num_nodes)
         self.kafka = kafka
         self.security_config = kafka.security_config.client_config()
         self.files = files
         self.startup_mode = self.STARTUP_MODE_LISTEN
+        self.startup_timeout_sec = startup_timeout_sec
         self.environment = {}
         self.external_config_template_func = None
 
@@ -122,13 +123,13 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
     def start_and_wait_to_load_plugins(self, node, worker_type, remote_connector_configs):
         with node.account.monitor_log(self.LOG_FILE) as monitor:
             self.start_and_return_immediately(node, worker_type, remote_connector_configs)
-            monitor.wait_until('Kafka version', timeout_sec=60,
+            monitor.wait_until('Kafka version', timeout_sec=self.startup_timeout_sec,
                                err_msg="Never saw message indicating Kafka Connect finished
startup on node: " +
                                        "%s in condition mode: %s" % (str(node.account), self.startup_mode))
 
     def start_and_wait_to_start_listening(self, node, worker_type, remote_connector_configs):
         self.start_and_return_immediately(node, worker_type, remote_connector_configs)
-        wait_until(lambda: self.listening(node), timeout_sec=60,
+        wait_until(lambda: self.listening(node), timeout_sec=self.startup_timeout_sec,
                    err_msg="Kafka Connect failed to start on node: %s in condition mode:
%s" %
                    (str(node.account), self.startup_mode))
 
@@ -141,7 +142,8 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
             node.account.signal(pid, sig, allow_fail=True)
         if clean_shutdown:
             for pid in pids:
-                wait_until(lambda: not node.account.alive(pid), timeout_sec=60, err_msg="Kafka
Connect process on " + str(node.account) + " took too long to exit")
+                wait_until(lambda: not node.account.alive(pid), timeout_sec=self.startup_timeout_sec,
err_msg="Kafka Connect process on " + str(
+                    node.account) + " took too long to exit")
 
         node.account.ssh("rm -f " + self.PID_FILE, allow_fail=False)
 
@@ -254,8 +256,8 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
 class ConnectStandaloneService(ConnectServiceBase):
     """Runs Kafka Connect in standalone mode."""
 
-    def __init__(self, context, kafka, files):
-        super(ConnectStandaloneService, self).__init__(context, 1, kafka, files)
+    def __init__(self, context, kafka, files, startup_timeout_sec = 60):
+        super(ConnectStandaloneService, self).__init__(context, 1, kafka, files, startup_timeout_sec)
 
     # For convenience since this service only makes sense with a single node
     @property
@@ -303,8 +305,8 @@ class ConnectDistributedService(ConnectServiceBase):
     """Runs Kafka Connect in distributed mode."""
 
     def __init__(self, context, num_nodes, kafka, files, offsets_topic="connect-offsets",
-                 configs_topic="connect-configs", status_topic="connect-status"):
-        super(ConnectDistributedService, self).__init__(context, num_nodes, kafka, files)
+                 configs_topic="connect-configs", status_topic="connect-status", startup_timeout_sec
= 60):
+        super(ConnectDistributedService, self).__init__(context, num_nodes, kafka, files,
startup_timeout_sec)
         self.offsets_topic = offsets_topic
         self.configs_topic = configs_topic
         self.status_topic = status_topic


Mime
View raw message