kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject kafka git commit: KAFKA-4037: Make Connect REST API retries aware of 409 CONFLICT errors
Date Thu, 18 Aug 2016 22:28:48 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.0 801a70612 -> a707f573d


KAFKA-4037: Make Connect REST API retries aware of 409 CONFLICT errors

Author: Ewen Cheslack-Postava <me@ewencp.org>

Reviewers: Jason Gustafson <jason@confluent.io>, Ismael Juma <ismael@juma.me.uk>

Closes #1733 from ewencp/rest-api-retries

(cherry picked from commit 59cfa84801c67de9729385a8f9b536721e0c37b9)
Signed-off-by: Ewen Cheslack-Postava <me@ewencp.org>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a707f573
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a707f573
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a707f573

Branch: refs/heads/0.10.0
Commit: a707f573de55a97782397f6cb5be90876e20cc3e
Parents: 801a706
Author: Ewen Cheslack-Postava <me@ewencp.org>
Authored: Thu Aug 18 15:29:56 2016 -0700
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Thu Aug 18 15:30:20 2016 -0700

----------------------------------------------------------------------
 tests/kafkatest/services/connect.py             | 62 +++++++++++++-------
 .../tests/connect/connect_rest_test.py          | 12 ++--
 tests/kafkatest/utils/util.py                   | 10 ----
 3 files changed, 46 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a707f573/tests/kafkatest/services/connect.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/connect.py b/tests/kafkatest/services/connect.py
index 7f36854..ebc19b0 100644
--- a/tests/kafkatest/services/connect.py
+++ b/tests/kafkatest/services/connect.py
@@ -17,12 +17,12 @@ import json
 import os.path
 import random
 import signal
+import time
 
 import requests
 from ducktape.errors import DucktapeError
 from ducktape.services.service import Service
 from ducktape.utils.util import wait_until
-from kafkatest.utils.util import retry_on_exception
 
 from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
 
@@ -107,45 +107,49 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
     def config_filenames(self):
         return [os.path.join(self.PERSISTENT_ROOT, "connect-connector-" + str(idx) + ".properties") for idx, template in enumerate(self.connector_config_templates or [])]
 
-    def list_connectors(self, node=None, retries=0, retry_backoff=.01):
-        return self._rest_with_retry('/connectors', node=node, retries=retries, retry_backoff=retry_backoff)
+    def list_connectors(self, node=None, **kwargs):
+        return self._rest_with_retry('/connectors', node=node, **kwargs)
 
-    def create_connector(self, config, node=None, retries=0, retry_backoff=.01):
+    def create_connector(self, config, node=None, **kwargs):
         create_request = {
             'name': config['name'],
             'config': config
         }
-        return self._rest_with_retry('/connectors', create_request, node=node, method="POST", retries=retries, retry_backoff=retry_backoff)
+        return self._rest_with_retry('/connectors', create_request, node=node, method="POST", **kwargs)
 
-    def get_connector(self, name, node=None, retries=0, retry_backoff=.01):
-        return self._rest_with_retry('/connectors/' + name, node=node, retries=retries, retry_backoff=retry_backoff)
+    def get_connector(self, name, node=None, **kwargs):
+        return self._rest_with_retry('/connectors/' + name, node=node, **kwargs)
 
-    def get_connector_config(self, name, node=None, retries=0, retry_backoff=.01):
-        return self._rest_with_retry('/connectors/' + name + '/config', node=node, retries=retries, retry_backoff=retry_backoff)
+    def get_connector_config(self, name, node=None, **kwargs):
+        return self._rest_with_retry('/connectors/' + name + '/config', node=node, **kwargs)
 
-    def set_connector_config(self, name, config, node=None, retries=0, retry_backoff=.01):
-        return self._rest_with_retry('/connectors/' + name + '/config', config, node=node, method="PUT", retries=retries, retry_backoff=retry_backoff)
+    def set_connector_config(self, name, config, node=None, **kwargs):
+        # Unlike many other calls, a 409 when setting a connector config is expected if the connector already exists.
+        # However, we also might see 409s for other reasons (e.g. rebalancing). So we still perform retries at the cost
+        # of tests possibly taking longer to ultimately fail. Tests that care about this can explicitly override the
+        # number of retries.
+        return self._rest_with_retry('/connectors/' + name + '/config', config, node=node, method="PUT", **kwargs)
 
-    def get_connector_tasks(self, name, node=None, retries=0, retry_backoff=.01):
-        return self._rest_with_retry('/connectors/' + name + '/tasks', node=node, retries=retries, retry_backoff=retry_backoff)
+    def get_connector_tasks(self, name, node=None, **kwargs):
+        return self._rest_with_retry('/connectors/' + name + '/tasks', node=node, **kwargs)
 
-    def delete_connector(self, name, node=None, retries=0, retry_backoff=.01):
-        return self._rest_with_retry('/connectors/' + name, node=node, method="DELETE", retries=retries, retry_backoff=retry_backoff)
+    def delete_connector(self, name, node=None, **kwargs):
+        return self._rest_with_retry('/connectors/' + name, node=node, method="DELETE", **kwargs)
 
     def get_connector_status(self, name, node=None):
         return self._rest('/connectors/' + name + '/status', node=node)
 
-    def restart_connector(self, name, node=None):
-        return self._rest('/connectors/' + name + '/restart', method="POST")        
+    def restart_connector(self, name, node=None, **kwargs):
+        return self._rest_with_retry('/connectors/' + name + '/restart', node=node, method="POST", **kwargs)
 
     def restart_task(self, connector_name, task_id, node=None):
-        return self._rest('/connectors/' + connector_name + '/tasks/' + str(task_id) + '/restart', method="POST")        
+        return self._rest('/connectors/' + connector_name + '/tasks/' + str(task_id) + '/restart', node=node, method="POST")
 
     def pause_connector(self, name, node=None):
-        return self._rest('/connectors/' + name + '/pause', method="PUT")
+        return self._rest('/connectors/' + name + '/pause', node=node, method="PUT")
 
     def resume_connector(self, name, node=None):
-        return self._rest('/connectors/' + name + '/resume', method="PUT")
+        return self._rest('/connectors/' + name + '/resume', node=node, method="PUT")
 
     def list_connector_plugins(self, node=None):
         return self._rest('/connector-plugins/', node=node)
@@ -163,14 +167,28 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
         resp = meth(url, json=body)
         self.logger.debug("%s %s response: %d", url, method, resp.status_code)
         if resp.status_code > 400:
+            self.logger.debug("Connect REST API error for %s: %d %s", resp.url, resp.status_code, resp.text)
             raise ConnectRestError(resp.status_code, resp.text, resp.url)
         if resp.status_code == 204 or resp.status_code == 202:
             return None
         else:
             return resp.json()
 
-    def _rest_with_retry(self, path, body=None, node=None, method="GET", retries=0, retry_backoff=.01):
-        return retry_on_exception(lambda: self._rest(path, body, node, method), ConnectRestError, retries, retry_backoff)
+    def _rest_with_retry(self, path, body=None, node=None, method="GET", retries=40, retry_backoff=.25):
+        """
+        Invokes a REST API with retries for errors that may occur during normal operation (notably 409 CONFLICT
+        responses that can occur due to rebalancing).
+        """
+        exception_to_throw = None
+        for i in range(0, retries + 1):
+            try:
+                return self._rest(path, body, node, method)
+            except ConnectRestError as e:
+                exception_to_throw = e
+                if e.status != 409:
+                    break
+                time.sleep(retry_backoff)
+        raise exception_to_throw
 
     def _base_url(self, node):
         return 'http://' + node.account.externally_routable_ip + ':' + '8083'

http://git-wip-us.apache.org/repos/asf/kafka/blob/a707f573/tests/kafkatest/tests/connect/connect_rest_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect/connect_rest_test.py b/tests/kafkatest/tests/connect/connect_rest_test.py
index c32b8e1..0b00499 100644
--- a/tests/kafkatest/tests/connect/connect_rest_test.py
+++ b/tests/kafkatest/tests/connect/connect_rest_test.py
@@ -84,11 +84,11 @@ class ConnectRestApiTest(KafkaTest):
         self.verify_config(self.FILE_SINK_CONNECTOR, self.FILE_SINK_CONFIGS, configs)
 
         self.logger.info("Creating connectors")
-        self.cc.create_connector(source_connector_config, retries=120, retry_backoff=1)
-        self.cc.create_connector(sink_connector_config, retries=120, retry_backoff=1)
+        self.cc.create_connector(source_connector_config)
+        self.cc.create_connector(sink_connector_config)
 
         # We should see the connectors appear
-        wait_until(lambda: set(self.cc.list_connectors(retries=5, retry_backoff=1)) == set(["local-file-source", "local-file-sink"]),
+        wait_until(lambda: set(self.cc.list_connectors()) == set(["local-file-source", "local-file-sink"]),
                    timeout_sec=10, err_msg="Connectors that were just created did not appear in connector listing")
 
         # We'll only do very simple validation that the connectors and tasks really ran.
@@ -157,9 +157,9 @@ class ConnectRestApiTest(KafkaTest):
             node.account.ssh("echo -e -n " + repr(self.LONER_INPUTS) + " >> " + self.INPUT_FILE2)
         wait_until(lambda: self.validate_output(self.LONGER_INPUT_LIST), timeout_sec=120, err_msg="Data added to input file was not seen in the output file in a reasonable amount of time.")
 
-        self.cc.delete_connector("local-file-source", retries=120, retry_backoff=1)
-        self.cc.delete_connector("local-file-sink", retries=120, retry_backoff=1)
-        wait_until(lambda: len(self.cc.list_connectors(retries=5, retry_backoff=1)) == 0, timeout_sec=10, err_msg="Deleted connectors did not disappear from REST listing")
+        self.cc.delete_connector("local-file-source")
+        self.cc.delete_connector("local-file-sink")
+        wait_until(lambda: len(self.cc.list_connectors()) == 0, timeout_sec=10, err_msg="Deleted connectors did not disappear from REST listing")
 
     def validate_output(self, input):
         input_set = set(input)

http://git-wip-us.apache.org/repos/asf/kafka/blob/a707f573/tests/kafkatest/utils/util.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/utils/util.py b/tests/kafkatest/utils/util.py
index c043bec..f004ece 100644
--- a/tests/kafkatest/utils/util.py
+++ b/tests/kafkatest/utils/util.py
@@ -73,13 +73,3 @@ def is_int_with_prefix(msg):
                         "prefix dot integer value, but one of the two parts (before or after dot) "
                         "are not integers. Message: %s" % (msg))
 
-
-def retry_on_exception(fun, exception, retries, retry_backoff=.01):
-    exception_to_throw = None
-    for i in range(0, retries + 1):
-        try:
-            return fun()
-        except exception as e:
-            exception_to_throw = e
-            time.sleep(retry_backoff)
-    raise exception_to_throw


Mime
View raw message