kafka-commits mailing list archives

From: rha...@apache.org
Subject: [kafka] 02/02: KAFKA-6290: Support casting from logical types in cast transform (#7371)
Date: Thu, 03 Oct 2019 20:53:48 GMT
This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 81db9de2d8c6bde8598e942c456f3950ddde2278
Author: Nigel Liang <nigel@nigelliang.com>
AuthorDate: Thu Oct 3 12:55:52 2019 -0700

    KAFKA-6290: Support casting from logical types in cast transform (#7371)
    
    Adds support for the Connect Cast transform to cast from the Connect logical
    types Date, Time, Timestamp, and Decimal. Casting to a numeric type produces
    the underlying numeric value, represented in the desired type. For logical
    types backed by the Java Date class, this means the milliseconds since the
    epoch; for Decimal, it means the underlying value. If the value does not fit
    in the desired target type, it may overflow.
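
    For illustration, a minimal sketch of the numeric path (not from the commit),
    modeled on the new CastTest cases below; the field name "ts" and the topic
    name are arbitrary, and the statements are jshell-style with the Connect
    transforms on the classpath:

        import java.util.Collections;
        import java.util.Date;
        import org.apache.kafka.connect.data.Schema;
        import org.apache.kafka.connect.data.SchemaBuilder;
        import org.apache.kafka.connect.data.Struct;
        import org.apache.kafka.connect.data.Timestamp;
        import org.apache.kafka.connect.source.SourceRecord;
        import org.apache.kafka.connect.transforms.Cast;

        // Cast a Timestamp logical-type field to its underlying epoch-millis long.
        Cast<SourceRecord> xform = new Cast.Value<>();
        xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "ts:int64"));

        Schema schema = SchemaBuilder.struct().field("ts", Timestamp.SCHEMA).build();
        Struct value = new Struct(schema).put("ts", new Date(42L));

        SourceRecord out = xform.apply(
            new SourceRecord(null, null, "topic", 0, schema, value));
        // ((Struct) out.value()).get("ts") is 42L, and the output field's
        // schema type is INT64.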
    
    Casting Date, Time, and Timestamp values to String produces their ISO 8601
    representation. Casting Decimal to String produces the value rendered as a
    string, e.g. 1234 -> "1234".
    
    Author: Nigel Liang <nigel@nigelliang.com>
    Reviewer: Randall Hauch <rhauch@gmail.com>
---
 .../org/apache/kafka/connect/transforms/Cast.java  |  23 +++++
 .../apache/kafka/connect/transforms/CastTest.java  | 101 ++++++++++++++++++++-
 2 files changed, 122 insertions(+), 2 deletions(-)

diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java
index 1463a68..9dcec15 100644
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java
@@ -24,14 +24,20 @@ import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.data.ConnectSchema;
+import org.apache.kafka.connect.data.Date;
 import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Schema.Type;
 import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.data.Time;
+import org.apache.kafka.connect.data.Timestamp;
 import org.apache.kafka.connect.data.Values;
 import org.apache.kafka.connect.errors.DataException;
 import org.apache.kafka.connect.transforms.util.SchemaUtil;
 import org.apache.kafka.connect.transforms.util.SimpleConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.EnumSet;
 import java.util.HashMap;
@@ -44,6 +50,7 @@ import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;
 import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
 
 public abstract class Cast<R extends ConnectRecord<R>> implements Transformation<R> {
+    private static final Logger log = LoggerFactory.getLogger(Cast.class);
 
     // TODO: Currently we only support top-level field casting. Ideally we could use a dotted notation in the spec to
     // allow casting nested fields.
@@ -220,7 +227,18 @@ public abstract class Cast<R extends ConnectRecord<R>> implements Transformation
             default:
                 throw new DataException("Unexpected type in Cast transformation: " + type);
         }
+    }
 
+    private static Object encodeLogicalType(Schema schema, Object value) {
+        switch (schema.name()) {
+            case Date.LOGICAL_NAME:
+                return Date.fromLogical(schema, (java.util.Date) value);
+            case Time.LOGICAL_NAME:
+                return Time.fromLogical(schema, (java.util.Date) value);
+            case Timestamp.LOGICAL_NAME:
+                return Timestamp.fromLogical(schema, (java.util.Date) value);
+        }
+        return value;
     }
 
     private static Object castValueToType(Schema schema, Object value, Schema.Type targetType) {
@@ -236,6 +254,11 @@ public abstract class Cast<R extends ConnectRecord<R>> implements Transformation
             // Ensure the type we are trying to cast from is supported
             validCastType(inferredType, FieldType.INPUT);
 
+            // Encode logical types to their internal representation.
+            if (schema != null && schema.name() != null && targetType != Type.STRING) {
+                value = encodeLogicalType(schema, value);
+            }
+
             switch (targetType) {
                 case INT8:
                     return castToInt8(value);
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java
index c568afb..a28aa28 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java
@@ -17,11 +17,16 @@
 
 package org.apache.kafka.connect.transforms;
 
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.connect.data.Decimal;
 import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Schema.Type;
 import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.data.Time;
 import org.apache.kafka.connect.data.Timestamp;
 import org.apache.kafka.connect.data.Values;
 import org.apache.kafka.connect.errors.DataException;
@@ -42,7 +47,8 @@ import static org.junit.Assert.assertTrue;
 public class CastTest {
     private final Cast<SourceRecord> xformKey = new Cast.Key<>();
     private final Cast<SourceRecord> xformValue = new Cast.Value<>();
-    private static final long MILLIS_PER_DAY = 24 * 60 * 60 * 1000;
+    private static final long MILLIS_PER_HOUR = TimeUnit.HOURS.toMillis(1);
+    private static final long MILLIS_PER_DAY = TimeUnit.DAYS.toMillis(1);
 
     @After
     public void teardown() {
@@ -320,6 +326,97 @@ public class CastTest {
         xformValue.apply(new SourceRecord(null, null, "topic", 0, null, Collections.singletonList("foo")));
     }
 
+    @Test
+    public void castLogicalToPrimitive() {
+        List<String> specParts = Arrays.asList(
+            "date_to_int32:int32",  // Cast to underlying representation
+            "timestamp_to_int64:int64",  // Cast to underlying representation
+            "time_to_int64:int64",  // Cast to wider datatype than underlying representation
+            "decimal_to_int32:int32",  // Cast to narrower datatype with data loss
+            "timestamp_to_float64:float64",  // loss of precision casting to double
+            "null_timestamp_to_int32:int32"
+        );
+
+        Date day = new Date(MILLIS_PER_DAY);
+        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG,
+            String.join(",", specParts)));
+
+        SchemaBuilder builder = SchemaBuilder.struct();
+        builder.field("date_to_int32", org.apache.kafka.connect.data.Date.SCHEMA);
+        builder.field("timestamp_to_int64", Timestamp.SCHEMA);
+        builder.field("time_to_int64", Time.SCHEMA);
+        builder.field("decimal_to_int32", Decimal.schema(new BigDecimal((long) Integer.MAX_VALUE
+ 1).scale()));
+        builder.field("timestamp_to_float64", Timestamp.SCHEMA);
+        builder.field("null_timestamp_to_int32", Timestamp.builder().optional().build());
+
+        Schema supportedTypesSchema = builder.build();
+
+        Struct recordValue = new Struct(supportedTypesSchema);
+        recordValue.put("date_to_int32", day);
+        recordValue.put("timestamp_to_int64", new Date(0));
+        recordValue.put("time_to_int64", new Date(1));
+        recordValue.put("decimal_to_int32", new BigDecimal((long) Integer.MAX_VALUE + 1));
+        recordValue.put("timestamp_to_float64", new Date(Long.MAX_VALUE));
+        recordValue.put("null_timestamp_to_int32", null);
+
+        SourceRecord transformed = xformValue.apply(
+            new SourceRecord(null, null, "topic", 0,
+                supportedTypesSchema, recordValue));
+
+        assertEquals(1, ((Struct) transformed.value()).get("date_to_int32"));
+        assertEquals(0L, ((Struct) transformed.value()).get("timestamp_to_int64"));
+        assertEquals(1L, ((Struct) transformed.value()).get("time_to_int64"));
+        assertEquals(Integer.MIN_VALUE, ((Struct) transformed.value()).get("decimal_to_int32"));
+        assertEquals(9.223372036854776E18, ((Struct) transformed.value()).get("timestamp_to_float64"));
+        assertNull(((Struct) transformed.value()).get("null_timestamp_to_int32"));
+
+        Schema transformedSchema = ((Struct) transformed.value()).schema();
+        assertEquals(Type.INT32, transformedSchema.field("date_to_int32").schema().type());
+        assertEquals(Type.INT64, transformedSchema.field("timestamp_to_int64").schema().type());
+        assertEquals(Type.INT64, transformedSchema.field("time_to_int64").schema().type());
+        assertEquals(Type.INT32, transformedSchema.field("decimal_to_int32").schema().type());
+        assertEquals(Type.FLOAT64, transformedSchema.field("timestamp_to_float64").schema().type());
+        assertEquals(Type.INT32, transformedSchema.field("null_timestamp_to_int32").schema().type());
+    }
+
+    @Test
+    public void castLogicalToString() {
+        Date date = new Date(MILLIS_PER_DAY);
+        Date time = new Date(MILLIS_PER_HOUR);
+        Date timestamp = new Date();
+
+        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG,
+            "date:string,decimal:string,time:string,timestamp:string"));
+
+        SchemaBuilder builder = SchemaBuilder.struct();
+        builder.field("date", org.apache.kafka.connect.data.Date.SCHEMA);
+        builder.field("decimal", Decimal.schema(new BigDecimal(1982).scale()));
+        builder.field("time", Time.SCHEMA);
+        builder.field("timestamp", Timestamp.SCHEMA);
+
+        Schema supportedTypesSchema = builder.build();
+
+        Struct recordValue = new Struct(supportedTypesSchema);
+        recordValue.put("date", date);
+        recordValue.put("decimal", new BigDecimal(1982));
+        recordValue.put("time", time);
+        recordValue.put("timestamp", timestamp);
+
+        SourceRecord transformed = xformValue.apply(
+            new SourceRecord(null, null, "topic", 0,
+                supportedTypesSchema, recordValue));
+
+        assertEquals(Values.dateFormatFor(date).format(date), ((Struct) transformed.value()).get("date"));
+        assertEquals("1982", ((Struct) transformed.value()).get("decimal"));
+        assertEquals(Values.dateFormatFor(time).format(time), ((Struct) transformed.value()).get("time"));
+        assertEquals(Values.dateFormatFor(timestamp).format(timestamp), ((Struct) transformed.value()).get("timestamp"));
+
+        Schema transformedSchema = ((Struct) transformed.value()).schema();
+        assertEquals(Type.STRING, transformedSchema.field("date").schema().type());
+        assertEquals(Type.STRING, transformedSchema.field("decimal").schema().type());
+        assertEquals(Type.STRING, transformedSchema.field("time").schema().type());
+        assertEquals(Type.STRING, transformedSchema.field("timestamp").schema().type());
+    }
 
     @Test
     public void castFieldsWithSchema() {
@@ -338,7 +435,7 @@ public class CastTest {
         builder.field("boolean", Schema.BOOLEAN_SCHEMA);
         builder.field("string", Schema.STRING_SCHEMA);
         builder.field("bigdecimal", Decimal.schema(new BigDecimal(42).scale()));
-        builder.field("date", Timestamp.SCHEMA);
+        builder.field("date", org.apache.kafka.connect.data.Date.SCHEMA);
         builder.field("optional", Schema.OPTIONAL_FLOAT32_SCHEMA);
         builder.field("timestamp", Timestamp.SCHEMA);
         Schema supportedTypesSchema = builder.build();

