kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rha...@apache.org
Subject [kafka] branch trunk updated: KAFKA-9018: Throw clearer exceptions on serialisation errors (#7496)
Date Wed, 01 Jul 2020 19:03:49 GMT
This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new efec315  KAFKA-9018: Throw clearer exceptions on serialisation errors (#7496)
efec315 is described below

commit efec315d0a79ef281f337d328cb51667814fa5b3
Author: Mario Molina <mmolimar@gmail.com>
AuthorDate: Wed Jul 1 14:03:11 2020 -0500

    KAFKA-9018: Throw clearer exceptions on serialisation errors (#7496)
    
    Improved the exception messages that are thrown to indicate whether it was a key or value conversion problem.
    
    Author: Mario Molina <mmolimar@gmail.com>
    Reviewer: Randall Hauch <rhauch@gmail.com>
---
 .../kafka/connect/runtime/WorkerSinkTask.java      | 24 ++++++++++++++++++++--
 1 file changed, 22 insertions(+), 2 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 11318fd..53dc254 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -489,10 +489,10 @@ class WorkerSinkTask extends WorkerTask {
     }
 
     private SinkRecord convertAndTransformRecord(final ConsumerRecord<byte[], byte[]> msg) {
-        SchemaAndValue keyAndSchema = retryWithToleranceOperator.execute(() -> keyConverter.toConnectData(msg.topic(), msg.headers(), msg.key()),
+        SchemaAndValue keyAndSchema = retryWithToleranceOperator.execute(() -> convertKey(msg),
                 Stage.KEY_CONVERTER, keyConverter.getClass());
 
-        SchemaAndValue valueAndSchema = retryWithToleranceOperator.execute(() -> valueConverter.toConnectData(msg.topic(), msg.headers(), msg.value()),
+        SchemaAndValue valueAndSchema = retryWithToleranceOperator.execute(() -> convertValue(msg),
                 Stage.VALUE_CONVERTER, valueConverter.getClass());
 
         Headers headers = retryWithToleranceOperator.execute(() -> convertHeadersFor(msg), Stage.HEADER_CONVERTER, headerConverter.getClass());
@@ -524,6 +524,26 @@ class WorkerSinkTask extends WorkerTask {
         return new InternalSinkRecord(msg, transformedRecord);
     }
 
+    private SchemaAndValue convertKey(ConsumerRecord<byte[], byte[]> msg) {
+        try {
+            return keyConverter.toConnectData(msg.topic(), msg.headers(), msg.key());
+        } catch (Exception e) {
+            log.error("{} Error converting message key in topic '{}' partition {} at offset {} and timestamp {}: {}",
+                    this, msg.topic(), msg.partition(), msg.offset(), msg.timestamp(), e.getMessage(), e);
+            throw e;
+        }
+    }
+
+    private SchemaAndValue convertValue(ConsumerRecord<byte[], byte[]> msg) {
+        try {
+            return valueConverter.toConnectData(msg.topic(), msg.headers(), msg.value());
+        } catch (Exception e) {
+            log.error("{} Error converting message value in topic '{}' partition {} at offset {} and timestamp {}: {}",
+                    this, msg.topic(), msg.partition(), msg.offset(), msg.timestamp(), e.getMessage(), e);
+            throw e;
+        }
+    }
+
     private Headers convertHeadersFor(ConsumerRecord<byte[], byte[]> record) {
         Headers result = new ConnectHeaders();
         org.apache.kafka.common.header.Headers recordHeaders = record.headers();


Mime
View raw message