kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject kafka git commit: KAFKA-3673: Connect tests don't handle concurrent config changes
Date Mon, 09 May 2016 06:51:13 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.0 120baf978 -> 76fa376e2


KAFKA-3673: Connect tests don't handle concurrent config changes

Author: Liquan Pei <liquanpei@gmail.com>

Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>

Closes #1340 from Ishiihara/connect-test-failure

(cherry picked from commit dbafc631ad78c96f85361a3d5e1c4d203cedb26f)
Signed-off-by: Ewen Cheslack-Postava <me@ewencp.org>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/76fa376e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/76fa376e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/76fa376e

Branch: refs/heads/0.10.0
Commit: 76fa376e23e3442d4a57d239d696632d521279a6
Parents: 120baf9
Author: Liquan Pei <liquanpei@gmail.com>
Authored: Sun May 8 23:50:43 2016 -0700
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Sun May 8 23:51:06 2016 -0700

----------------------------------------------------------------------
 tests/kafkatest/services/connect.py             | 38 ++++++++++----------
 .../tests/connect/connect_rest_test.py          | 25 ++++++-------
 tests/kafkatest/utils/util.py                   | 12 +++++++
 3 files changed, 45 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/76fa376e/tests/kafkatest/services/connect.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/connect.py b/tests/kafkatest/services/connect.py
index 1eb2dd5..cf67c30 100644
--- a/tests/kafkatest/services/connect.py
+++ b/tests/kafkatest/services/connect.py
@@ -22,6 +22,7 @@ import requests
 from ducktape.errors import DucktapeError
 from ducktape.services.service import Service
 from ducktape.utils.util import wait_until
+from kafkatest.utils.util import retry_on_exception
 
 from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
 
@@ -102,31 +103,30 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
     def config_filenames(self):
         return [os.path.join(self.PERSISTENT_ROOT, "connect-connector-" + str(idx) + ".properties")
for idx, template in enumerate(self.connector_config_templates or [])]
 
+    def list_connectors(self, node=None, retries=0, retry_backoff=.01):
+        return self._rest_with_retry('/connectors', node=node, retries=retries, retry_backoff=retry_backoff)
 
-    def list_connectors(self, node=None):
-        return self._rest('/connectors', node=node)
-
-    def create_connector(self, config, node=None):
+    def create_connector(self, config, node=None, retries=0, retry_backoff=.01):
         create_request = {
             'name': config['name'],
             'config': config
         }
-        return self._rest('/connectors', create_request, node=node, method="POST")
+        return self._rest_with_retry('/connectors', create_request, node=node, method="POST",
retries=retries, retry_backoff=retry_backoff)
 
-    def get_connector(self, name, node=None):
-        return self._rest('/connectors/' + name, node=node)
+    def get_connector(self, name, node=None, retries=0, retry_backoff=.01):
+        return self._rest_with_retry('/connectors/' + name, node=node, retries=retries, retry_backoff=retry_backoff)
 
-    def get_connector_config(self, name, node=None):
-        return self._rest('/connectors/' + name + '/config', node=node)
+    def get_connector_config(self, name, node=None, retries=0, retry_backoff=.01):
+        return self._rest_with_retry('/connectors/' + name + '/config', node=node, retries=retries,
retry_backoff=retry_backoff)
 
-    def set_connector_config(self, name, config, node=None):
-        return self._rest('/connectors/' + name + '/config', config, node=node, method="PUT")
+    def set_connector_config(self, name, config, node=None, retries=0, retry_backoff=.01):
+        return self._rest_with_retry('/connectors/' + name + '/config', config, node=node,
method="PUT", retries=retries, retry_backoff=retry_backoff)
 
-    def get_connector_tasks(self, name, node=None):
-        return self._rest('/connectors/' + name + '/tasks', node=node)
+    def get_connector_tasks(self, name, node=None, retries=0, retry_backoff=.01):
+        return self._rest_with_retry('/connectors/' + name + '/tasks', node=node, retries=retries,
retry_backoff=retry_backoff)
 
-    def delete_connector(self, name, node=None):
-        return self._rest('/connectors/' + name, node=node, method="DELETE")
+    def delete_connector(self, name, node=None, retries=0, retry_backoff=.01):
+        return self._rest_with_retry('/connectors/' + name, node=node, method="DELETE", retries=retries,
retry_backoff=retry_backoff)
 
     def _rest(self, path, body=None, node=None, method="GET"):
         if node is None:
@@ -144,10 +144,13 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
         else:
             return resp.json()
 
+    def _rest_with_retry(self, path, body=None, node=None, method="GET", retries=0, retry_backoff=.01):
+        return retry_on_exception(lambda: self._rest(path, body, node, method), ConnectRestError,
retries, retry_backoff)
 
     def _base_url(self, node):
         return 'http://' + node.account.externally_routable_ip + ':' + '8083'
 
+
 class ConnectStandaloneService(ConnectServiceBase):
     """Runs Kafka Connect in standalone mode."""
 
@@ -223,8 +226,6 @@ class ConnectDistributedService(ConnectServiceBase):
             raise RuntimeError("No process ids recorded")
 
 
-
-
 class ConnectRestError(RuntimeError):
     def __init__(self, status, msg, url):
         self.status = status
@@ -235,7 +236,6 @@ class ConnectRestError(RuntimeError):
         return "Kafka Connect REST call failed: returned " + self.status + " for " + self.url
+ ". Response: " + self.message
 
 
-
 class VerifiableConnector(object):
     def messages(self):
         """
@@ -261,6 +261,7 @@ class VerifiableConnector(object):
         self.logger.info("Destroying connector %s %s", type(self).__name__, self.name)
         self.cc.delete_connector(self.name)
 
+
 class VerifiableSource(VerifiableConnector):
     """
     Helper class for running a verifiable source connector on a Kafka Connect cluster and
analyzing the output.
@@ -284,6 +285,7 @@ class VerifiableSource(VerifiableConnector):
             'throughput': self.throughput
         })
 
+
 class VerifiableSink(VerifiableConnector):
     """
     Helper class for running a verifiable sink connector on a Kafka Connect cluster and analyzing
the output.

http://git-wip-us.apache.org/repos/asf/kafka/blob/76fa376e/tests/kafkatest/tests/connect/connect_rest_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect/connect_rest_test.py b/tests/kafkatest/tests/connect/connect_rest_test.py
index 69a8cb7..63b9bb1 100644
--- a/tests/kafkatest/tests/connect/connect_rest_test.py
+++ b/tests/kafkatest/tests/connect/connect_rest_test.py
@@ -15,8 +15,12 @@
 
 from kafkatest.tests.kafka_test import KafkaTest
 from kafkatest.services.connect import ConnectDistributedService, ConnectRestError
+from kafkatest.utils.util import retry_on_exception
 from ducktape.utils.util import wait_until
-import hashlib, subprocess, json, itertools
+import subprocess
+import json
+import itertools
+
 
 class ConnectRestApiTest(KafkaTest):
     """
@@ -65,10 +69,10 @@ class ConnectRestApiTest(KafkaTest):
         sink_connector_props = self.render("connect-file-sink.properties")
         for connector_props in [source_connector_props, sink_connector_props]:
             connector_config = self._config_dict_from_props(connector_props)
-            self.cc.create_connector(connector_config)
+            self.cc.create_connector(connector_config, retries=120, retry_backoff=1)
 
         # We should see the connectors appear
-        wait_until(lambda: set(self.cc.list_connectors()) == set(["local-file-source", "local-file-sink"]),
+        wait_until(lambda: set(self.cc.list_connectors(retries=5, retry_backoff=1)) == set(["local-file-source",
"local-file-sink"]),
                    timeout_sec=10, err_msg="Connectors that were just created did not appear
in connector listing")
 
         # We'll only do very simple validation that the connectors and tasks really ran.
@@ -76,7 +80,6 @@ class ConnectRestApiTest(KafkaTest):
             node.account.ssh("echo -e -n " + repr(self.INPUTS) + " >> " + self.INPUT_FILE)
         wait_until(lambda: self.validate_output(self.INPUT_LIST), timeout_sec=120, err_msg="Data
added to input file was not seen in the output file in a reasonable amount of time.")
 
-
         # Trying to create the same connector again should cause an error
         try:
             self.cc.create_connector(self._config_dict_from_props(source_connector_props))
@@ -97,19 +100,18 @@ class ConnectRestApiTest(KafkaTest):
         expected_sink_info = {
             'name': 'local-file-sink',
             'config': self._config_dict_from_props(sink_connector_props),
-            'tasks': [{ 'connector': 'local-file-sink', 'task': 0 }]
+            'tasks': [{'connector': 'local-file-sink', 'task': 0 }]
         }
         sink_info = self.cc.get_connector("local-file-sink")
         assert expected_sink_info == sink_info, "Incorrect info:" + json.dumps(sink_info)
         sink_config = self.cc.get_connector_config("local-file-sink")
         assert expected_sink_info['config'] == sink_config, "Incorrect config: " + json.dumps(sink_config)
 
-
         # Validate that we can get info about tasks. This info should definitely be available
now without waiting since
         # we've already seen data appear in files.
         # TODO: It would be nice to validate a complete listing, but that doesn't make sense
for the file connectors
         expected_source_task_info = [{
-            'id': { 'connector': 'local-file-source', 'task': 0 },
+            'id': {'connector': 'local-file-source', 'task': 0},
             'config': {
                 'task.class': 'org.apache.kafka.connect.file.FileStreamSourceTask',
                 'file': self.INPUT_FILE,
@@ -119,7 +121,7 @@ class ConnectRestApiTest(KafkaTest):
         source_task_info = self.cc.get_connector_tasks("local-file-source")
         assert expected_source_task_info == source_task_info, "Incorrect info:" + json.dumps(source_task_info)
         expected_sink_task_info = [{
-            'id': { 'connector': 'local-file-sink', 'task': 0 },
+            'id': {'connector': 'local-file-sink', 'task': 0},
             'config': {
                 'task.class': 'org.apache.kafka.connect.file.FileStreamSinkTask',
                 'file': self.OUTPUT_FILE,
@@ -139,9 +141,9 @@ class ConnectRestApiTest(KafkaTest):
             node.account.ssh("echo -e -n " + repr(self.LONER_INPUTS) + " >> " + self.INPUT_FILE2)
         wait_until(lambda: self.validate_output(self.LONGER_INPUT_LIST), timeout_sec=120,
err_msg="Data added to input file was not seen in the output file in a reasonable amount of
time.")
 
-        self.cc.delete_connector("local-file-source")
-        self.cc.delete_connector("local-file-sink")
-        wait_until(lambda: len(self.cc.list_connectors()) == 0, timeout_sec=10, err_msg="Deleted
connectors did not disappear from REST listing")
+        self.cc.delete_connector("local-file-source", retries=120, retry_backoff=1)
+        self.cc.delete_connector("local-file-sink", retries=120, retry_backoff=1)
+        wait_until(lambda: len(self.cc.list_connectors(retries=5, retry_backoff=1)) == 0,
timeout_sec=10, err_msg="Deleted connectors did not disappear from REST listing")
 
     def validate_output(self, input):
         input_set = set(input)
@@ -151,7 +153,6 @@ class ConnectRestApiTest(KafkaTest):
             ]))
         return input_set == output_set
 
-
     def file_contents(self, node, file):
         try:
             # Convert to a list here or the CalledProcessError may be returned during a call
to the generator instead of

http://git-wip-us.apache.org/repos/asf/kafka/blob/76fa376e/tests/kafkatest/utils/util.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/utils/util.py b/tests/kafkatest/utils/util.py
index 0b10dbf..c043bec 100644
--- a/tests/kafkatest/utils/util.py
+++ b/tests/kafkatest/utils/util.py
@@ -15,6 +15,7 @@
 from kafkatest import __version__ as __kafkatest_version__
 
 import re
+import time
 
 
 def kafkatest_version():
@@ -71,3 +72,14 @@ def is_int_with_prefix(msg):
         raise Exception("Unexpected message format. Message should be of format: integer
"
                         "prefix dot integer value, but one of the two parts (before or after
dot) "
                         "are not integers. Message: %s" % (msg))
+
+
+def retry_on_exception(fun, exception, retries, retry_backoff=.01):
+    exception_to_throw = None
+    for i in range(0, retries + 1):
+        try:
+            return fun()
+        except exception as e:
+            exception_to_throw = e
+            time.sleep(retry_backoff)
+    raise exception_to_throw


Mime
View raw message