kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject kafka git commit: KAFKA-5164: Ensure SetSchemaMetadata updates key or value when Schema changes
Date Fri, 02 Jun 2017 17:02:45 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk f85c18032 -> af85e05b9


KAFKA-5164: Ensure SetSchemaMetadata updates key or value when Schema changes

When the `SetSchemaMetadata` SMT is used to change the name and/or version of the key or value’s
schema, any references to the old schema in the key or value must be changed to reference
the new schema. Only keys or values that are `Struct` have such references, and so currently
only these are adjusted.

This is based on `trunk` since the fix is expected to be targeted to the 0.11.1 release.

Author: Randall Hauch <rhauch@gmail.com>

Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>

Closes #3198 from rhauch/kafka-5164


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/af85e05b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/af85e05b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/af85e05b

Branch: refs/heads/trunk
Commit: af85e05b98a41cd5f8ac45a853b2ddd28463c084
Parents: f85c180
Author: Randall Hauch <rhauch@gmail.com>
Authored: Fri Jun 2 10:02:40 2017 -0700
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Fri Jun 2 10:02:40 2017 -0700

----------------------------------------------------------------------
 .../connect/transforms/SetSchemaMetadata.java   | 33 +++++++-
 .../transforms/SetSchemaMetadataTest.java       | 80 ++++++++++++++++++++
 2 files changed, 111 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/af85e05b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java
----------------------------------------------------------------------
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java
index bb581de..901ac9f 100644
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java
@@ -20,7 +20,9 @@ import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.data.ConnectSchema;
+import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.transforms.util.SimpleConfig;
 
 import java.util.Map;
@@ -101,7 +103,8 @@ public abstract class SetSchemaMetadata<R extends ConnectRecord<R>>
implements T
 
         @Override
         protected R newRecord(R record, Schema updatedSchema) {
-            return record.newRecord(record.topic(), record.kafkaPartition(), updatedSchema,
record.key(), record.valueSchema(), record.value(), record.timestamp());
+            Object updatedKey = updateSchemaIn(record.key(), updatedSchema);
+            return record.newRecord(record.topic(), record.kafkaPartition(), updatedSchema,
updatedKey, record.valueSchema(), record.value(), record.timestamp());
         }
     }
 
@@ -116,8 +119,34 @@ public abstract class SetSchemaMetadata<R extends ConnectRecord<R>>
implements T
 
         @Override
         protected R newRecord(R record, Schema updatedSchema) {
-            return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(),
record.key(), updatedSchema, record.value(), record.timestamp());
+            Object updatedValue = updateSchemaIn(record.value(), updatedSchema);
+            return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(),
record.key(), updatedSchema, updatedValue, record.timestamp());
         }
     }
 
+    /**
+     * Utility to check the supplied key or value for references to the old Schema,
+     * and if so to return an updated key or value object that references the new Schema.
+     * Note that this method assumes that the new Schema may have a different name and/or version,
+     * but has fields that exactly match those of the old Schema.
+     * <p>
+     * Currently only {@link Struct} objects have references to the {@link Schema}.
+     *
+     * @param keyOrValue    the key or value object; may be null
+     * @param updatedSchema the updated schema that has been potentially renamed
+     * @return the original key or value object if it does not reference the old schema, or
+     * a copy of the key or value object with updated references to the new schema.
+     */
+    protected static Object updateSchemaIn(Object keyOrValue, Schema updatedSchema) {
+        if (keyOrValue instanceof Struct) {
+            Struct origStruct = (Struct) keyOrValue;
+            Struct newStruct = new Struct(updatedSchema);
+            for (Field field : updatedSchema.fields()) {
+                // assume both schemas have exact same fields with same names and schemas ...
+                newStruct.put(field, origStruct.get(field));
+            }
+            return newStruct;
+        }
+        return keyOrValue;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/af85e05b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/SetSchemaMetadataTest.java
----------------------------------------------------------------------
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/SetSchemaMetadataTest.java
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/SetSchemaMetadataTest.java
index b2b14db..206c51e 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/SetSchemaMetadataTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/SetSchemaMetadataTest.java
@@ -16,7 +16,10 @@
  */
 package org.apache.kafka.connect.transforms;
 
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.junit.Test;
 
@@ -25,6 +28,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
 
 public class SetSchemaMetadataTest {
 
@@ -63,4 +67,80 @@ public class SetSchemaMetadataTest {
         assertEquals(new Integer(42), updatedRecord.valueSchema().version());
     }
 
+    @Test
+    public void schemaNameAndVersionUpdateWithStruct() {
+        final String fieldName1 = "f1";
+        final String fieldName2 = "f2";
+        final String fieldValue1 = "value1";
+        final int fieldValue2 = 1;
+        final Schema schema = SchemaBuilder.struct()
+                                      .name("my.orig.SchemaDefn")
+                                      .field(fieldName1, Schema.STRING_SCHEMA)
+                                      .field(fieldName2, Schema.INT32_SCHEMA)
+                                      .build();
+        final Struct value = new Struct(schema).put(fieldName1, fieldValue1).put(fieldName2,
fieldValue2);
+
+        final Map<String, String> props = new HashMap<>();
+        props.put("schema.name", "foo");
+        props.put("schema.version", "42");
+        final SetSchemaMetadata<SinkRecord> xform = new SetSchemaMetadata.Value<>();
+        xform.configure(props);
+
+        final SinkRecord record = new SinkRecord("", 0, null, null, schema, value, 0);
+
+        final SinkRecord updatedRecord = xform.apply(record);
+
+        assertEquals("foo", updatedRecord.valueSchema().name());
+        assertEquals(new Integer(42), updatedRecord.valueSchema().version());
+
+        // Make sure the struct's schema and fields all point to the new schema
+        assertMatchingSchema((Struct) updatedRecord.value(), updatedRecord.valueSchema());
+    }
+
+    @Test
+    public void updateSchemaOfStruct() {
+        final String fieldName1 = "f1";
+        final String fieldName2 = "f2";
+        final String fieldValue1 = "value1";
+        final int fieldValue2 = 1;
+        final Schema schema = SchemaBuilder.struct()
+                                      .name("my.orig.SchemaDefn")
+                                      .field(fieldName1, Schema.STRING_SCHEMA)
+                                      .field(fieldName2, Schema.INT32_SCHEMA)
+                                      .build();
+        final Struct value = new Struct(schema).put(fieldName1, fieldValue1).put(fieldName2,
fieldValue2);
+
+        final Schema newSchema = SchemaBuilder.struct()
+                                      .name("my.updated.SchemaDefn")
+                                      .field(fieldName1, Schema.STRING_SCHEMA)
+                                      .field(fieldName2, Schema.INT32_SCHEMA)
+                                      .build();
+
+        Struct newValue = (Struct) SetSchemaMetadata.updateSchemaIn(value, newSchema);
+        assertMatchingSchema(newValue, newSchema);
+    }
+
+    @Test
+    public void updateSchemaOfNonStruct() {
+        Object value = new Integer(1);
+        Object updatedValue = SetSchemaMetadata.updateSchemaIn(value, Schema.INT32_SCHEMA);
+        assertSame(value, updatedValue);
+    }
+
+    @Test
+    public void updateSchemaOfNull() {
+        Object updatedValue = SetSchemaMetadata.updateSchemaIn(null, Schema.INT32_SCHEMA);
+        assertEquals(null, updatedValue);
+    }
+
+    protected void assertMatchingSchema(Struct value, Schema schema) {
+        assertSame(schema, value.schema());
+        assertEquals(schema.name(), value.schema().name());
+        for (Field field : schema.fields()) {
+            String fieldName = field.name();
+            assertEquals(schema.field(fieldName).name(), value.schema().field(fieldName).name());
+            assertEquals(schema.field(fieldName).index(), value.schema().field(fieldName).index());
+            assertSame(schema.field(fieldName).schema(), value.schema().field(fieldName).schema());
+        }
+    }
 }


Mime
View raw message