kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject [kafka] branch 2.0 updated: MINOR: System test for error handling and writes to DeadLetterQueue
Date Tue, 07 Aug 2018 21:44:28 GMT
This is an automated email from the ASF dual-hosted git repository.

ewencp pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new ca2589c  MINOR: System test for error handling and writes to DeadLetterQueue
ca2589c is described below

commit ca2589cc7ff60c48ab7492e4e8cd22e78bda9acb
Author: Arjun Satish <arjun@confluent.io>
AuthorDate: Tue Aug 7 14:44:01 2018 -0700

    MINOR: System test for error handling and writes to DeadLetterQueue
    
    Added a system test which creates a file sink with json converter and attempts to feed
it bad records. The bad records should land in the DLQ if it is enabled, and the task should
be killed or bad records skipped based on test parameters.
    
    Signed-off-by: Arjun Satish <arjun@confluent.io>
    
    *More detailed description of your change,
    if necessary. The PR title and PR message become
    the squashed commit message, so use a separate
    comment to ping reviewers.*
    
    *Summary of testing strategy (including rationale)
    for the feature or bug fix. Unit and/or integration
    tests are expected for any behaviour change and
    system tests should be considered for larger changes.*
    
    Author: Arjun Satish <arjun@confluent.io>
    
    Reviewers: Konstantine Karantasis <konstantine@confluent.io>, Ewen Cheslack-Postava
<ewen@confluent.io>
    
    Closes #5456 from wicknicks/error-handling-sys-test
    
    (cherry picked from commit 28a1ae4183c707af363b69e2ec2b743bdf4f236c)
    Signed-off-by: Ewen Cheslack-Postava <me@ewencp.org>
---
 tests/kafkatest/services/connect.py                |  5 ++
 tests/kafkatest/tests/connect/connect_test.py      | 72 ++++++++++++++++++++++
 .../connect/templates/connect-file-sink.properties | 18 +++++-
 .../templates/connect-file-source.properties       |  7 +++
 4 files changed, 101 insertions(+), 1 deletion(-)

diff --git a/tests/kafkatest/services/connect.py b/tests/kafkatest/services/connect.py
index d7ef204..19beddd 100644
--- a/tests/kafkatest/services/connect.py
+++ b/tests/kafkatest/services/connect.py
@@ -326,6 +326,11 @@ class ConnectDistributedService(ConnectServiceBase):
             raise RuntimeError("No process ids recorded")
 
 
+class ErrorTolerance(object):
+    ALL = "all"
+    NONE = "none"
+
+
 class ConnectRestError(RuntimeError):
     def __init__(self, status, msg, url):
         self.status = status
diff --git a/tests/kafkatest/tests/connect/connect_test.py b/tests/kafkatest/tests/connect/connect_test.py
index 9d34c48..c961681 100644
--- a/tests/kafkatest/tests/connect/connect_test.py
+++ b/tests/kafkatest/tests/connect/connect_test.py
@@ -18,10 +18,12 @@ from ducktape.mark.resource import cluster
 from ducktape.utils.util import wait_until
 from ducktape.mark import parametrize, matrix
 from ducktape.cluster.remoteaccount import RemoteCommandError
+from ducktape.errors import TimeoutError
 
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService
 from kafkatest.services.connect import ConnectStandaloneService
+from kafkatest.services.connect import ErrorTolerance
 from kafkatest.services.console_consumer import ConsoleConsumer
 from kafkatest.services.security.security_config import SecurityConfig
 
@@ -134,3 +136,73 @@ class ConnectStandaloneFileTest(Test):
             return output_hash == hashlib.md5(value).hexdigest()
         except RemoteCommandError:
             return False
+
+    @cluster(num_nodes=5)
+    @parametrize(error_tolerance=ErrorTolerance.ALL)
+    @parametrize(error_tolerance=ErrorTolerance.NONE)
+    def test_skip_and_log_to_dlq(self, error_tolerance):
+        self.kafka = KafkaService(self.test_context, self.num_brokers, self.zk, topics=self.topics)
+
+        # set config props
+        self.override_error_tolerance_props = error_tolerance
+        self.enable_deadletterqueue = True
+
+        successful_records = []
+        faulty_records = []
+        records = []
+        for i in range(0, 1000):
+            if i % 2 == 0:
+                records.append('{"some_key":' + str(i) + '}')
+                successful_records.append('{some_key=' + str(i) + '}')
+            else:
+                # badly formatted json records (missing a quote after the key)
+                records.append('{"some_key:' + str(i) + '}')
+                faulty_records.append('{"some_key:' + str(i) + '}')
+
+        records = "\n".join(records) + "\n"
+        successful_records = "\n".join(successful_records) + "\n"
+        if error_tolerance == ErrorTolerance.ALL:
+            faulty_records = ",".join(faulty_records)
+        else:
+            faulty_records = faulty_records[0]
+
+        self.source = ConnectStandaloneService(self.test_context, self.kafka, [self.INPUT_FILE,
self.OFFSETS_FILE])
+        self.sink = ConnectStandaloneService(self.test_context, self.kafka, [self.OUTPUT_FILE,
self.OFFSETS_FILE])
+
+        self.zk.start()
+        self.kafka.start()
+
+        self.override_key_converter = "org.apache.kafka.connect.storage.StringConverter"
+        self.override_value_converter = "org.apache.kafka.connect.storage.StringConverter"
+        self.source.set_configs(lambda node: self.render("connect-standalone.properties",
node=node), [self.render("connect-file-source.properties")])
+
+        self.override_key_converter = "org.apache.kafka.connect.json.JsonConverter"
+        self.override_value_converter = "org.apache.kafka.connect.json.JsonConverter"
+        self.override_key_converter_schemas_enable = False
+        self.override_value_converter_schemas_enable = False
+        self.sink.set_configs(lambda node: self.render("connect-standalone.properties", node=node),
[self.render("connect-file-sink.properties")])
+
+        self.source.start()
+        self.sink.start()
+
+        # Generating data on the source node should generate new records and create new output
on the sink node
+        self.source.node.account.ssh("echo -e -n " + repr(records) + " >> " + self.INPUT_FILE)
+
+        if error_tolerance == ErrorTolerance.NONE:
+            try:
+                wait_until(lambda: self.validate_output(successful_records), timeout_sec=15,
+                           err_msg="Clean records added to input file were not seen in the
output file in a reasonable amount of time.")
+                raise Exception("Expected to not find any results in this file.")
+            except TimeoutError:
+                self.logger.info("Caught expected exception")
+        else:
+            wait_until(lambda: self.validate_output(successful_records), timeout_sec=15,
+                       err_msg="Clean records added to input file were not seen in the output
file in a reasonable amount of time.")
+
+        if self.enable_deadletterqueue:
+            self.logger.info("Reading records from deadletterqueue")
+            consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, "my-connector-errors",
+                                                 consumer_timeout_ms=10000)
+            consumer_validator.run()
+            actual = ",".join(consumer_validator.messages_consumed[1])
+            assert faulty_records == actual, "Expected %s but saw %s in dead letter queue"
% (faulty_records, actual)
diff --git a/tests/kafkatest/tests/connect/templates/connect-file-sink.properties b/tests/kafkatest/tests/connect/templates/connect-file-sink.properties
index bff002b..a58cc6b 100644
--- a/tests/kafkatest/tests/connect/templates/connect-file-sink.properties
+++ b/tests/kafkatest/tests/connect/templates/connect-file-sink.properties
@@ -25,4 +25,20 @@ key.converter={{ override_key_converter }}
 {% endif %}
 {% if override_key_converter is defined %}
 value.converter={{ override_value_converter }}
-{% endif %}
\ No newline at end of file
+{% endif %}
+
+key.converter.schemas.enable={{ override_key_converter_schemas_enable|default(True) }}
+value.converter.schemas.enable={{ override_value_converter_schemas_enable|default(True) }}
+
+# log error context along with application logs
+errors.log.enable=true
+errors.log.include.messages=true
+
+{% if enable_deadletterqueue is defined %}
+# produce error context into the Kafka topic
+errors.deadletterqueue.topic.name={{ override_deadletterqueue_topic_name|default("my-connector-errors")
}}
+errors.deadletterqueue.topic.replication.factor={{ override_deadletterqueue_replication_factor|default(1)
}}
+{% endif %}
+
+# Tolerate all errors.
+errors.tolerance={{ override_error_tolerance_props|default("none") }}
diff --git a/tests/kafkatest/tests/connect/templates/connect-file-source.properties b/tests/kafkatest/tests/connect/templates/connect-file-source.properties
index 800d6a0..147e85a 100644
--- a/tests/kafkatest/tests/connect/templates/connect-file-source.properties
+++ b/tests/kafkatest/tests/connect/templates/connect-file-source.properties
@@ -26,3 +26,10 @@ key.converter={{ override_key_converter }}
 {% if override_key_converter is defined %}
 value.converter={{ override_value_converter }}
 {% endif %}
+
+# log error context along with application logs
+errors.log.enable=true
+errors.log.include.messages=true
+
+# Tolerate all errors.
+errors.tolerance={{ override_error_tolerance_props|default("none") }}


Mime
View raw message