kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rha...@apache.org
Subject [kafka] branch 2.4 updated: KAFKA-9024: Better error message when field specified does not exist (#7819)
Date Tue, 21 Jan 2020 20:37:04 GMT
This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new 2f52d63  KAFKA-9024: Better error message when field specified does not exist (#7819)
2f52d63 is described below

commit 2f52d6330aeb47921b0dc1a582e36597f88b71aa
Author: Nigel Liang <nigel@nigelliang.com>
AuthorDate: Tue Jan 21 12:25:35 2020 -0800

    KAFKA-9024: Better error message when field specified does not exist (#7819)
    
    Author: Nigel Liang <nigel@nigelliang.com>
    Reviewer: Randall Hauch <rhauch@gmail.com>
---
 .../apache/kafka/connect/transforms/ValueToKey.java    |  9 +++++++--
 .../kafka/connect/transforms/ValueToKeyTest.java       | 18 ++++++++++++++++++
 2 files changed, 25 insertions(+), 2 deletions(-)

diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ValueToKey.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ValueToKey.java
index a9d9601..b2226d3 100644
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ValueToKey.java
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ValueToKey.java
@@ -21,9 +21,11 @@ import org.apache.kafka.common.cache.LRUCache;
 import org.apache.kafka.common.cache.SynchronizedCache;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.errors.DataException;
 import org.apache.kafka.connect.transforms.util.NonEmptyListValidator;
 import org.apache.kafka.connect.transforms.util.SimpleConfig;
 
@@ -82,8 +84,11 @@ public class ValueToKey<R extends ConnectRecord<R>> implements Transformation<R>
         if (keySchema == null) {
             final SchemaBuilder keySchemaBuilder = SchemaBuilder.struct();
             for (String field : fields) {
-                final Schema fieldSchema = value.schema().field(field).schema();
-                keySchemaBuilder.field(field, fieldSchema);
+                final Field fieldFromValue = value.schema().field(field);
+                if (fieldFromValue == null) {
+                    throw new DataException("Field does not exist: " + field);
+                }
+                keySchemaBuilder.field(field, fieldFromValue.schema());
             }
             keySchema = keySchemaBuilder.build();
             valueToKeySchemaCache.put(value.schema(), keySchema);
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java
index e2dfa17..5854658 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.connect.transforms;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.errors.DataException;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.junit.After;
 import org.junit.Test;
@@ -28,6 +29,7 @@ import java.util.HashMap;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
 
 public class ValueToKeyTest {
     private final ValueToKey<SinkRecord> xform = new ValueToKey<>();
@@ -88,4 +90,20 @@ public class ValueToKeyTest {
         assertEquals(expectedKey, transformedRecord.key());
     }
 
+    @Test
+    public void nonExistingField() {
+        xform.configure(Collections.singletonMap("fields", "not_exist"));
+
+        final Schema valueSchema = SchemaBuilder.struct()
+            .field("a", Schema.INT32_SCHEMA)
+            .build();
+
+        final Struct value = new Struct(valueSchema);
+        value.put("a", 1);
+
+        final SinkRecord record = new SinkRecord("", 0, null, null, valueSchema, value, 0);
+
+        DataException actual = assertThrows(DataException.class, () -> xform.apply(record));
+        assertEquals("Field does not exist: not_exist", actual.getMessage());
+    }
 }


Mime
View raw message