kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rha...@apache.org
Subject [kafka] branch 1.1 updated: KAFKA-6605: Fix NPE in Flatten when optional Struct is null (#5705)
Date Fri, 12 Jul 2019 17:02:58 GMT
This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new d78fe37  KAFKA-6605: Fix NPE in Flatten when optional Struct is null (#5705)
d78fe37 is described below

commit d78fe37ae0bfefeb2ad3105e6da6760d446b8737
Author: MichaƂ Borowiecki <mbor81@gmail.com>
AuthorDate: Fri Jul 12 16:27:33 2019 +0100

    KAFKA-6605: Fix NPE in Flatten when optional Struct is null (#5705)
    
    Correct the Flatten SMT to properly handle null key or value `Struct` instances.
    
    Author: Michal Borowiecki <michal.borowiecki@openbet.com>
    Reviewers: Arjun Satish <arjun@confluent.io>, Robert Yokota <rayokota@gmail.com>,
Randall Hauch <rhauch@gmail.com>
---
 .../apache/kafka/connect/transforms/Flatten.java   | 29 ++++++++++------
 .../kafka/connect/transforms/FlattenTest.java      | 40 ++++++++++++++++++++++
 2 files changed, 58 insertions(+), 11 deletions(-)

diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java
index c5e4000..d7d2144 100644
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java
@@ -35,7 +35,7 @@ import java.util.LinkedHashMap;
 import java.util.Map;
 
 import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;
-import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
+import static org.apache.kafka.connect.transforms.util.Requirements.requireStructOrNull;
 
 public abstract class Flatten<R extends ConnectRecord<R>> implements Transformation<R>
{
 
@@ -136,20 +136,24 @@ public abstract class Flatten<R extends ConnectRecord<R>>
implements Transformat
     }
 
     private R applyWithSchema(R record) {
-        final Struct value = requireStruct(operatingValue(record), PURPOSE);
+        final Struct value = requireStructOrNull(operatingValue(record), PURPOSE);
 
-        Schema updatedSchema = schemaUpdateCache.get(value.schema());
+        Schema schema = operatingSchema(record);
+        Schema updatedSchema = schemaUpdateCache.get(schema);
         if (updatedSchema == null) {
-            final SchemaBuilder builder = SchemaUtil.copySchemaBasics(value.schema(), SchemaBuilder.struct());
-            Struct defaultValue = (Struct) value.schema().defaultValue();
-            buildUpdatedSchema(value.schema(), "", builder, value.schema().isOptional(),
defaultValue);
+            final SchemaBuilder builder = SchemaUtil.copySchemaBasics(schema, SchemaBuilder.struct());
+            Struct defaultValue = (Struct) schema.defaultValue();
+            buildUpdatedSchema(schema, "", builder, schema.isOptional(), defaultValue);
             updatedSchema = builder.build();
-            schemaUpdateCache.put(value.schema(), updatedSchema);
+            schemaUpdateCache.put(schema, updatedSchema);
+        }
+        if (value == null) {
+            return newRecord(record, updatedSchema, null);
+        } else {
+            final Struct updatedValue = new Struct(updatedSchema);
+            buildWithSchema(value, "", updatedValue);
+            return newRecord(record, updatedSchema, updatedValue);
         }
-
-        final Struct updatedValue = new Struct(updatedSchema);
-        buildWithSchema(value, "", updatedValue);
-        return newRecord(record, updatedSchema, updatedValue);
     }
 
     /**
@@ -216,6 +220,9 @@ public abstract class Flatten<R extends ConnectRecord<R>>
implements Transformat
     }
 
     private void buildWithSchema(Struct record, String fieldNamePrefix, Struct newRecord)
{
+        if (record == null) {
+            return;
+        }
         for (Field field : record.schema().fields()) {
             final String fieldName = fieldName(fieldNamePrefix, field.name());
             switch (field.schema().type()) {
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java
index d709054..430bba6 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java
@@ -182,6 +182,46 @@ public class FlattenTest {
     }
 
     @Test
+    public void testOptionalStruct() {
+        xformValue.configure(Collections.<String, String>emptyMap());
+
+        SchemaBuilder builder = SchemaBuilder.struct().optional();
+        builder.field("opt_int32", Schema.OPTIONAL_INT32_SCHEMA);
+        Schema schema = builder.build();
+
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null,
+            "topic", 0,
+            schema, null));
+
+        assertEquals(Schema.Type.STRUCT, transformed.valueSchema().type());
+        assertNull(transformed.value());
+    }
+
+    @Test
+    public void testOptionalNestedStruct() {
+        xformValue.configure(Collections.<String, String>emptyMap());
+
+        SchemaBuilder builder = SchemaBuilder.struct().optional();
+        builder.field("opt_int32", Schema.OPTIONAL_INT32_SCHEMA);
+        Schema supportedTypesSchema = builder.build();
+
+        builder = SchemaBuilder.struct();
+        builder.field("B", supportedTypesSchema);
+        Schema oneLevelNestedSchema = builder.build();
+
+        Struct oneLevelNestedStruct = new Struct(oneLevelNestedSchema);
+        oneLevelNestedStruct.put("B", null);
+
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null,
+            "topic", 0,
+            oneLevelNestedSchema, oneLevelNestedStruct));
+
+        assertEquals(Schema.Type.STRUCT, transformed.valueSchema().type());
+        Struct transformedStruct = (Struct) transformed.value();
+        assertNull(transformedStruct.get("B.opt_int32"));
+    }
+
+    @Test
     public void testOptionalFieldMap() {
         xformValue.configure(Collections.<String, String>emptyMap());
 


Mime
View raw message