kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rha...@apache.org
Subject [kafka] branch 1.0 updated: KAFKA-8523 Enabling InsertField transform to be used with tombstone events (#6914)
Date Thu, 03 Oct 2019 19:24:29 GMT
This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.0 by this push:
     new 87736fe  KAFKA-8523 Enabling InsertField transform to be used with tombstone events (#6914)
87736fe is described below

commit 87736feef7c9f7b38528609550488393408999a6
Author: Gunnar Morling <gunnar.morling@googlemail.com>
AuthorDate: Thu Oct 3 20:51:00 2019 +0200

    KAFKA-8523 Enabling InsertField transform to be used with tombstone events (#6914)
    
    * KAFKA-8523 Avoiding raw type usage
    
    * KAFKA-8523 Gracefully handling tombstone events in InsertField SMT
---
 .../kafka/connect/transforms/InsertField.java      |  8 +++-
 .../kafka/connect/transforms/InsertFieldTest.java  | 52 +++++++++++++++++++---
 2 files changed, 54 insertions(+), 6 deletions(-)

diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java
index 5e472a9..93ba79c 100644
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java
@@ -127,13 +127,19 @@ public abstract class InsertField<R extends ConnectRecord<R>> implements Transfo
 
     @Override
     public R apply(R record) {
-        if (operatingSchema(record) == null) {
+        if (isTombstoneRecord(record)) {
+            return record;
+        } else if (operatingSchema(record) == null) {
             return applySchemaless(record);
         } else {
             return applyWithSchema(record);
         }
     }
 
+    private boolean isTombstoneRecord(R record) {
+        return record.value() == null;
+    }
+
     private R applySchemaless(R record) {
         final Map<String, Object> value = requireMap(operatingValue(record), PURPOSE);
 
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertFieldTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertFieldTest.java
index a0a0975..b22872c 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertFieldTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertFieldTest.java
@@ -104,11 +104,53 @@ public class InsertFieldTest {
 
         final SourceRecord transformedRecord = xform.apply(record);
 
-        assertEquals(42L, ((Map) transformedRecord.value()).get("magic"));
-        assertEquals("test", ((Map) transformedRecord.value()).get("topic_field"));
-        assertEquals(0, ((Map) transformedRecord.value()).get("partition_field"));
-        assertEquals(null, ((Map) transformedRecord.value()).get("timestamp_field"));
-        assertEquals("my-instance-id", ((Map) transformedRecord.value()).get("instance_id"));
+        assertEquals(42L, ((Map<?, ?>) transformedRecord.value()).get("magic"));
+        assertEquals("test", ((Map<?, ?>) transformedRecord.value()).get("topic_field"));
+        assertEquals(0, ((Map<?, ?>) transformedRecord.value()).get("partition_field"));
+        assertEquals(null, ((Map<?, ?>) transformedRecord.value()).get("timestamp_field"));
+        assertEquals("my-instance-id", ((Map<?, ?>) transformedRecord.value()).get("instance_id"));
     }
 
+
+    @Test
+    public void insertConfiguredFieldsIntoTombstoneEventWithoutSchemaLeavesValueUnchanged() {
+        final Map<String, Object> props = new HashMap<>();
+        props.put("topic.field", "topic_field!");
+        props.put("partition.field", "partition_field");
+        props.put("timestamp.field", "timestamp_field?");
+        props.put("static.field", "instance_id");
+        props.put("static.value", "my-instance-id");
+
+        xform.configure(props);
+
+        final SourceRecord record = new SourceRecord(null, null, "test", 0,
+                null, null);
+
+        final SourceRecord transformedRecord = xform.apply(record);
+
+        assertEquals(null, transformedRecord.value());
+        assertEquals(null, transformedRecord.valueSchema());
+    }
+
+    @Test
+    public void insertConfiguredFieldsIntoTombstoneEventWithSchemaLeavesValueUnchanged() {
+        final Map<String, Object> props = new HashMap<>();
+        props.put("topic.field", "topic_field!");
+        props.put("partition.field", "partition_field");
+        props.put("timestamp.field", "timestamp_field?");
+        props.put("static.field", "instance_id");
+        props.put("static.value", "my-instance-id");
+
+        xform.configure(props);
+
+        final Schema simpleStructSchema = SchemaBuilder.struct().name("name").version(1).doc("doc").field("magic", Schema.OPTIONAL_INT64_SCHEMA).build();
+
+        final SourceRecord record = new SourceRecord(null, null, "test", 0,
+                simpleStructSchema, null);
+
+        final SourceRecord transformedRecord = xform.apply(record);
+
+        assertEquals(null, transformedRecord.value());
+        assertEquals(simpleStructSchema, transformedRecord.valueSchema());
+    }
 }


Mime
View raw message