kafka-commits mailing list archives

From rha...@apache.org
Subject [kafka] branch 2.6 updated: KAFKA-10153: Error Reporting in Connect Documentation (#8858)
Date Wed, 01 Jul 2020 15:19:40 GMT
This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new a677522  KAFKA-10153: Error Reporting in Connect Documentation (#8858)
a677522 is described below

commit a677522d0fc3fda37e410b0622e72bfe63ce25b9
Author: Aakash Shah <ashah@confluent.io>
AuthorDate: Wed Jul 1 08:18:16 2020 -0700

    KAFKA-10153: Error Reporting in Connect Documentation (#8858)
    
    Added a section about error reporting in Connect documentation, and another about how
to safely use the new errant record reporter in SinkTask implementations.
    
    Author: Aakash Shah <ashah@confluent.io>
    Reviewer: Randall Hauch <rhauch@gmail.com>
---
 docs/connect.html | 78 +++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 78 insertions(+)

diff --git a/docs/connect.html b/docs/connect.html
index 18ab5fb..797c1fe 100644
--- a/docs/connect.html
+++ b/docs/connect.html
@@ -327,6 +327,48 @@
        <li><code>GET /</code> - return basic information about the Kafka Connect cluster, such as the version of the Connect worker that serves the REST request (including the git commit ID of the source code) and the ID of the Kafka cluster it is connected to; an example response is shown after this list.
     </ul>
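+    <p>For example, a <code>GET /</code> response has the following shape (the field values here are illustrative):</p>
+
+    <pre class="brush: text;">
+        {
+          "version": "2.6.0",
+          "commit": "0123456789abcdef",
+          "kafka_cluster_id": "I4ZmrWqfT2e-upky_4fdPA"
+        }
+    </pre>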
 
+    <h4><a id="connect_errorreporting" href="#connect_errorreporting">Error Reporting
in Connect</a></h4>
+
+    <p>Kafka Connect provides error reporting to handle errors encountered along various stages of processing. By default, any error encountered during conversion or within transformations will cause the connector to fail. Each connector configuration can also enable tolerating such errors by skipping them, optionally writing each error and the details of the failed operation and problematic record (with various levels of detail) to the Connect application log. These mechanisms also capture errors when a sink connector is processing the messages consumed from its Kafka topics, and all of the errors can be written to a configurable "dead letter queue" (DLQ) Kafka topic.</p>
+
+    <p>To report errors within a connector's converter, transforms, or within the sink connector itself to the log, set <code>errors.log.enable=true</code> in the connector configuration to log details of each error along with the topic, partition, and offset of the problem record. For additional debugging purposes, set <code>errors.log.include.messages=true</code> to also log the problem record's key, value, and headers (note that this may log sensitive information).</p>
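+
+    <p>For example, a minimal sketch of a connector configuration fragment that enables both of these logging properties:</p>
+
+    <pre class="brush: text;">
+        # log the context of each error to the Connect application log
+        errors.log.enable=true
+
+        # also log each problem record's key, value, and headers (may expose sensitive data)
+        errors.log.include.messages=true
+    </pre>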
+
+    <p>To report errors within a connector's converter, transforms, or within the sink connector itself to a dead letter queue topic, set <code>errors.deadletterqueue.topic.name</code> to the name of the DLQ topic, and optionally set <code>errors.deadletterqueue.context.headers.enable=true</code> to include context about the error in the headers of each DLQ record.</p>
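+
+    <p>For example, a sketch of a configuration fragment that routes failed records to a dead letter queue, using a hypothetical topic name:</p>
+
+    <pre class="brush: text;">
+        # write failed records to this DLQ topic (topic name is illustrative)
+        errors.deadletterqueue.topic.name=my-connector-errors
+
+        # add headers describing the error context to each record written to the DLQ
+        errors.deadletterqueue.context.headers.enable=true
+    </pre>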
+
+    <p>By default, connectors exhibit "fail fast" behavior immediately upon an error or exception. This is equivalent to adding the following configuration properties, with their defaults, to a connector configuration:</p>
+
+    <pre class="brush: text;">
+        # disable retries on failure
+        errors.retry.timeout=0
+
+        # do not log errors and their contexts
+        errors.log.enable=false
+
+        # do not record errors in a dead letter queue topic
+        errors.deadletterqueue.topic.name=
+
+        # fail on the first error
+        errors.tolerance=none
+    </pre>
+
+    <p>These and other related connector configuration properties can be changed to provide different behavior. For example, the following configuration properties can be added to a connector configuration to set up error handling with multiple retries, logging both to the application logs and to the <code>my-connector-errors</code> Kafka topic, and tolerating all errors by reporting them rather than failing the connector task:</p>
+
+    <pre class="brush: text;">
+        # retry for at most 10 minutes, waiting up to 30 seconds between consecutive failures
+        errors.retry.timeout=600000
+        errors.retry.delay.max.ms=30000
+
+        # log error context along with application logs, but do not include configs and messages
+        errors.log.enable=true
+        errors.log.include.messages=false
+
+        # produce error context into the Kafka topic
+        errors.deadletterqueue.topic.name=my-connector-errors
+
+        # tolerate all errors
+        errors.tolerance=all
+    </pre>
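+
+    <p>These properties are part of the connector configuration, so they can be applied to a running connector through the REST API's <code>PUT /connectors/{name}/config</code> endpoint. A sketch, assuming a worker at <code>localhost:8083</code> and a connector named <code>my-connector</code> (a real request must include the connector's complete configuration, not just the error-handling properties):</p>
+
+    <pre class="brush: bash;">
+        curl -X PUT -H "Content-Type: application/json" \
+             --data '{"connector.class": "...", "errors.retry.timeout": "600000", "errors.tolerance": "all"}' \
+             http://localhost:8083/connectors/my-connector/config
+    </pre>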
+
     <h3><a id="connect_development" href="#connect_development">8.3 Connector
Development Guide</a></h3>
 
     <p>This guide describes how developers can write new connectors for Kafka Connect
to move data between Kafka and other systems. It briefly reviews a few key concepts and then
describes how to create a simple connector.</p>
@@ -498,6 +540,42 @@
     <p>The <code>flush()</code> method is used during the offset commit process, which allows tasks to recover from failures and resume from a safe point such that no events will be missed. The method should push any outstanding data to the destination system and then block until the write has been acknowledged. The <code>offsets</code> parameter can often be ignored, but is useful in some cases where implementations want to store offset information in the destination store to provide exactly-once delivery. For example, an HDFS connector could do this and use atomic move operations to make sure the <code>flush()</code> operation atomically commits the data and offsets to a final location in HDFS.</p>
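+
+    <p>A minimal sketch of such a <code>flush()</code> implementation (the <code>writer</code> field and its <code>flushAndWait()</code> method are hypothetical stand-ins for the destination system's client):</p>
+
+    <pre class="brush: java;">
+        @Override
+        public void flush(Map&lt;TopicPartition, OffsetAndMetadata&gt; currentOffsets) {
+            // push all outstanding records to the destination system and
+            // block until the writes have been acknowledged
+            writer.flushAndWait();
+        }
+    </pre>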
 
+    <h5><a id="connect_errantrecordreporter" href="#connect_errantrecordreporter">Errant Record Reporter</a></h5>
+
+    <p>When <a href="#connect_errorreporting">error reporting</a> is enabled for a connector, the connector can use an <code>ErrantRecordReporter</code> to report problems with individual records sent to a sink connector. The following example shows how a connector's <code>SinkTask</code> subclass might obtain and use the <code>ErrantRecordReporter</code>, safely handling a null reporter when the DLQ is not enabled or when the connector is installed in an older Connect runtime that doesn't have this reporter feature:</p>
+
+    <pre class="brush: java;">
+        private ErrantRecordReporter reporter;
+
+        @Override
+        public void start(Map&lt;String, String&gt; props) {
+            ...
+            try {
+                reporter = context.errantRecordReporter(); // may be null if DLQ not enabled
+            } catch (NoSuchMethodError | NoClassDefFoundError e) {
+                // Will occur in Connect runtimes earlier than 2.6
+                reporter = null;
+            }
+        }
+
+        @Override
+        public void put(Collection&lt;SinkRecord&gt; records) {
+            for (SinkRecord record : records) {
+                try {
+                    // attempt to process and send record to data sink
+                    process(record);
+                } catch (Exception e) {
+                    if (reporter != null) {
+                        // Send errant record to error reporter
+                        reporter.report(record, e);
+                    } else {
+                        // There's no error reporter, so fail
+                        throw new ConnectException("Failed on record", e);
+                    }
+                }
+            }
+        }
+    </pre>
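+
+    <p>Note that <code>ErrantRecordReporter.report</code> is asynchronous and returns a <code>Future&lt;Void&gt;</code>. A task that must guarantee the errant record has been recorded before moving on can wait on that future; a minimal sketch of a synchronous variant of the handler above:</p>
+
+    <pre class="brush: java;">
+        try {
+            // attempt to process and send record to data sink
+            process(record);
+        } catch (Exception e) {
+            if (reporter != null) {
+                try {
+                    // block until the errant record has been durably recorded
+                    reporter.report(record, e).get();
+                } catch (InterruptedException | ExecutionException fatal) {
+                    throw new ConnectException("Failed to report errant record", fatal);
+                }
+            } else {
+                throw new ConnectException("Failed on record", e);
+            }
+        }
+    </pre>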
 
     <h5><a id="connect_resuming" href="#connect_resuming">Resuming from Previous
Offsets</a></h5>
 

