kafka-commits mailing list archives

From guozh...@apache.org
Subject [kafka] branch 2.0 updated: MINOR: provide an example for deserialization exception handler (#5231)
Date Mon, 18 Jun 2018 00:32:09 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new bc2badb  MINOR: provide an example for deserialization exception handler (#5231)
bc2badb is described below

commit bc2badb2483e69a3f3c24999da8275d3bd3dfa70
Author: Guozhang Wang <wangguoz@gmail.com>
AuthorDate: Sun Jun 17 17:31:30 2018 -0700

    MINOR: provide an example for deserialization exception handler (#5231)
    
    Also added a paragraph from data types to link to the example code.
    
    Reviewers: Matthias J. Sax <mjsax@apache.org>
---
 docs/streams/developer-guide/config-streams.html | 44 ++++++++++++++++++++++--
 docs/streams/developer-guide/datatypes.html      |  5 +++
 2 files changed, 46 insertions(+), 3 deletions(-)

diff --git a/docs/streams/developer-guide/config-streams.html b/docs/streams/developer-guide/config-streams.html
index 6bba10d..2c6bce1 100644
--- a/docs/streams/developer-guide/config-streams.html
+++ b/docs/streams/developer-guide/config-streams.html
@@ -300,8 +300,10 @@
           <span id="streams-developer-guide-deh"></span><h4><a class="toc-backref" href="#id7">default.deserialization.exception.handler</a><a class="headerlink" href="#default-deserialization-exception-handler" title="Permalink to this headline"></a></h4>
           <blockquote>
             <div><p>The default deserialization exception handler allows you to manage records that fail to deserialize. This
-              can be caused by corrupt data, incorrect serialization logic, or unhandled record types. These exception handlers
-              are available:</p>
+              can be caused by corrupt data, incorrect serialization logic, or unhandled record types. The implemented exception
+              handler needs to return <code>FAIL</code> or <code>CONTINUE</code> depending on the record and the exception thrown. Returning
+              <code>FAIL</code> will signal that Streams should shut down, and <code>CONTINUE</code> will signal that Streams should ignore the issue
+              and continue processing. The following library built-in exception handlers are available:</p>
               <ul class="simple">
                 <li><a class="reference external" href="../../../javadoc/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.html">LogAndContinueExceptionHandler</a>:
                   This handler logs the deserialization exception and then signals the processing pipeline to continue processing more records.
@@ -310,6 +312,42 @@
                 <li><a class="reference external" href="../../../javadoc/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.html">LogAndFailExceptionHandler</a>:
                   This handler logs the deserialization exception and then signals the processing pipeline to stop processing more records.</li>
               </ul>
+
+              <p>You can also provide your own customized exception handler, in addition to the library-provided ones, to meet your needs. For example, you can choose to forward corrupt
+                records into a quarantine topic (think: a "dead letter queue") for further processing. To do this, use the Producer API to write a corrupted record directly to
+                the quarantine topic. To be more concrete, you can create a separate <code>KafkaProducer</code> object outside the Streams client, and pass this object
+                as well as the dead letter queue topic name into the <code>Properties</code> map, which can then be retrieved in the <code>configure</code> function call.
+                The drawback of this approach is that "manual" writes are side effects that are invisible to the Kafka Streams runtime library,
+                so they do not benefit from the end-to-end processing guarantees of the Streams API:</p>
+
+              <pre class="brush: java;">
+              public class SendToDeadLetterQueueExceptionHandler implements DeserializationExceptionHandler {
+                  KafkaProducer&lt;byte[], byte[]&gt; dlqProducer;
+                  String dlqTopic;
+
+                  @Override
+                  public DeserializationHandlerResponse handle(final ProcessorContext context,
+                                                               final ConsumerRecord&lt;byte[], byte[]&gt; record,
+                                                               final Exception exception) {
+
+                      log.warn("Exception caught during deserialization, sending to the dead letter queue topic; " +
+                          "taskId: {}, topic: {}, partition: {}, offset: {}",
+                          context.taskId(), record.topic(), record.partition(), record.offset(),
+                          exception);
+
+                      dlqProducer.send(new ProducerRecord&lt;&gt;(dlqTopic, null, record.timestamp(), record.key(), record.value(), record.headers())).get();
+
+                      return DeserializationHandlerResponse.CONTINUE;
+                  }
+
+                  @Override
+                  public void configure(final Map&lt;String, ?&gt; configs) {
+                      dlqProducer = .. // get a producer from the configs map
+                      dlqTopic = .. // get the topic name from the configs map
+                  }
+              }
+              </pre>
+
             </div></blockquote>
         </div>
         <div class="section" id="default-production-exception-handler">
@@ -329,7 +367,7 @@
             import org.apache.kafka.streams.errors.ProductionExceptionHandler;
             import org.apache.kafka.streams.errors.ProductionExceptionHandler.ProductionExceptionHandlerResponse;
 
-            class IgnoreRecordTooLargeHandler implements ProductionExceptionHandler {
+            public class IgnoreRecordTooLargeHandler implements ProductionExceptionHandler {
                 public void configure(Map&lt;String, Object&gt; config) {}
 
                public ProductionExceptionHandlerResponse handle(final ProducerRecord&lt;byte[], byte[]&gt; record,
diff --git a/docs/streams/developer-guide/datatypes.html b/docs/streams/developer-guide/datatypes.html
index 1120815..a24dc4c 100644
--- a/docs/streams/developer-guide/datatypes.html
+++ b/docs/streams/developer-guide/datatypes.html
@@ -93,6 +93,11 @@
 <span class="n">userCountByRegion</span><span class="o">.</span><span class="na">to</span><span class="o">(</span><span class="s">&quot;RegionCountsTopic&quot;</span><span class="o">,</span> <span class="n">Produced</span><span class="o">.</span><span class="na">valueSerde</span><span class="o">(</span><span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">()));</span>
 </pre></div>
       </div>
+      <p>If some of your incoming records are corrupted or ill-formatted, they will cause the deserializer class to report an error.
+         Since 1.0.x we have introduced a <code>DeserializationExceptionHandler</code> interface which allows
+         you to customize how to handle such records. The customized implementation of the interface can be specified via the <code>StreamsConfig</code>.
+         For more details, please read the <a href="config-streams.html#default-deserialization-exception-handler">Configuring a Streams Application</a> section.
+      </p>
     </div>
     <div class="section" id="available-serdes">
      <span id="streams-developer-guide-serdes-available"></span><h2>Available SerDes<a class="headerlink" href="#available-serdes" title="Permalink to this headline"></a></h2>

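The snippet added by this commit leaves the configure() wiring elided (the ".." placeholders) and
omits imports and error handling around the blocking send. Below is a minimal, self-contained sketch
of how the handler could be completed and registered. The config keys "dlq.producer.instance" and
"dlq.topic.name" are hypothetical names chosen for this sketch; only
StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG is part of the actual Streams API.

    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class SendToDeadLetterQueueExceptionHandler implements DeserializationExceptionHandler {

        private static final Logger log = LoggerFactory.getLogger(SendToDeadLetterQueueExceptionHandler.class);

        private KafkaProducer<byte[], byte[]> dlqProducer;
        private String dlqTopic;

        @Override
        public DeserializationHandlerResponse handle(final ProcessorContext context,
                                                     final ConsumerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
            log.warn("Exception caught during deserialization, sending to the dead letter queue topic; " +
                    "taskId: {}, topic: {}, partition: {}, offset: {}",
                    context.taskId(), record.topic(), record.partition(), record.offset(), exception);

            try {
                // Block on the send so the corrupt record is durably written before
                // processing continues; the null partition lets the producer choose one.
                dlqProducer.send(new ProducerRecord<>(dlqTopic, null, record.timestamp(),
                        record.key(), record.value(), record.headers())).get();
            } catch (final InterruptedException | ExecutionException e) {
                // If the dead letter queue write itself fails, stop processing
                // rather than silently dropping the record.
                log.error("Failed to write record to the dead letter queue topic {}", dlqTopic, e);
                return DeserializationHandlerResponse.FAIL;
            }
            return DeserializationHandlerResponse.CONTINUE;
        }

        @SuppressWarnings("unchecked")
        @Override
        public void configure(final Map<String, ?> configs) {
            // Read back the producer and topic name placed into the config map
            // below; the key names are hypothetical, not Kafka-defined configs.
            dlqProducer = (KafkaProducer<byte[], byte[]>) configs.get("dlq.producer.instance");
            dlqTopic = (String) configs.get("dlq.topic.name");
        }
    }

Registering the handler could then look like the following. As the commit's prose describes, the
Streams config map is handed to the handler's configure() call, so the extra entries placed there
can be read back (Streams may log them as unknown configs, which is harmless):

    final Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");     // placeholder app id
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder brokers
    props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
            SendToDeadLetterQueueExceptionHandler.class);
    props.put("dlq.producer.instance", dlqProducer);  // the KafkaProducer created outside Streams
    props.put("dlq.topic.name", "streams-dlq");       // hypothetical topic name
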
-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.
