kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject kafka git commit: KAFKA-4714; TimestampConverter transformation (KIP-66)
Date Fri, 19 May 2017 18:28:57 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.11.0 b31df613f -> 64e12d3c6


KAFKA-4714; TimestampConverter transformation (KIP-66)

Author: Ewen Cheslack-Postava <me@ewencp.org>

Reviewers: Konstantine Karantasis <konstantine@confluent.io>, Jason Gustafson <jason@confluent.io>

Closes #3065 from ewencp/kafka-3209-timestamp-converter

(cherry picked from commit 61bab2d875ab5e03d0df4f62217346549a4c64c3)
Signed-off-by: Jason Gustafson <jason@confluent.io>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/64e12d3c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/64e12d3c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/64e12d3c

Branch: refs/heads/0.11.0
Commit: 64e12d3c641baeb6a4a624b971ba33721d656054
Parents: b31df61
Author: Ewen Cheslack-Postava <me@ewencp.org>
Authored: Fri May 19 11:26:59 2017 -0700
Committer: Jason Gustafson <jason@confluent.io>
Committed: Fri May 19 11:28:20 2017 -0700

----------------------------------------------------------------------
 .../kafka/connect/tools/TransformationDoc.java  |   4 +-
 .../connect/transforms/TimestampConverter.java  | 452 +++++++++++++++++++
 .../connect/transforms/util/Requirements.java   |   2 +-
 .../transforms/TimestampConverterTest.java      | 370 +++++++++++++++
 4 files changed, 826 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/64e12d3c/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java
index 1a8f0a8..b76e7d4 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java
@@ -26,6 +26,7 @@ import org.apache.kafka.connect.transforms.MaskField;
 import org.apache.kafka.connect.transforms.RegexRouter;
 import org.apache.kafka.connect.transforms.ReplaceField;
 import org.apache.kafka.connect.transforms.SetSchemaMetadata;
+import org.apache.kafka.connect.transforms.TimestampConverter;
 import org.apache.kafka.connect.transforms.TimestampRouter;
 import org.apache.kafka.connect.transforms.ValueToKey;
 
@@ -58,7 +59,8 @@ public class TransformationDoc {
             new DocInfo(TimestampRouter.class.getName(), TimestampRouter.OVERVIEW_DOC, TimestampRouter.CONFIG_DEF),
             new DocInfo(RegexRouter.class.getName(), RegexRouter.OVERVIEW_DOC, RegexRouter.CONFIG_DEF),
             new DocInfo(Flatten.class.getName(), Flatten.OVERVIEW_DOC, Flatten.CONFIG_DEF),
-            new DocInfo(Cast.class.getName(), Cast.OVERVIEW_DOC, Cast.CONFIG_DEF)
+            new DocInfo(Cast.class.getName(), Cast.OVERVIEW_DOC, Cast.CONFIG_DEF),
+            new DocInfo(TimestampConverter.class.getName(), TimestampConverter.OVERVIEW_DOC,
TimestampConverter.CONFIG_DEF)
     );
 
     private static void printTransformationHtml(PrintStream out, DocInfo docInfo) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/64e12d3c/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java
----------------------------------------------------------------------
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java
new file mode 100644
index 0000000..ce7d002
--- /dev/null
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.common.cache.Cache;
+import org.apache.kafka.common.cache.LRUCache;
+import org.apache.kafka.common.cache.SynchronizedCache;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.data.Time;
+import org.apache.kafka.connect.data.Timestamp;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.errors.DataException;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TimeZone;
+
+import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;
+import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
+
+public abstract class TimestampConverter<R extends ConnectRecord<R>> implements
Transformation<R> {
+
+    /** Human-readable description of this transformation, rendered into the generated transformation docs. */
+    public static final String OVERVIEW_DOC =
+            "Convert timestamps between different formats such as Unix epoch, strings, and Connect Date/Timestamp types. "
+                    + "Applies to individual fields or to the entire value."
+                    + "<p/>Use the concrete transformation type designed for the record key (<code>" + TimestampConverter.Key.class.getName() + "</code>) "
+                    + "or value (<code>" + TimestampConverter.Value.class.getName() + "</code>).";
+
+    /** Field holding the timestamp; the empty default means the entire key/value is the timestamp. */
+    public static final String FIELD_CONFIG = "field";
+    private static final String FIELD_DEFAULT = "";
+
+    /** Target representation, one of the TYPE_* values below. It has no default, so it must be supplied. */
+    public static final String TYPE_CONFIG = "type";
+
+    /** SimpleDateFormat pattern used to format string output and to parse string input. */
+    public static final String FORMAT_CONFIG = "format";
+    private static final String FORMAT_DEFAULT = "";
+
+    public static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(FIELD_CONFIG, ConfigDef.Type.STRING, FIELD_DEFAULT, ConfigDef.Importance.HIGH,
+                    "The field containing the timestamp, or empty if the entire value is a timestamp")
+            .define(TYPE_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
+                    "The desired timestamp representation: string, unix, Date, Time, or Timestamp")
+            .define(FORMAT_CONFIG, ConfigDef.Type.STRING, FORMAT_DEFAULT, ConfigDef.Importance.MEDIUM,
+                    "A SimpleDateFormat-compatible format for the timestamp. Used to generate the output when type=string "
+                            + "or used to parse the input if the input is a string.");
+
+    private static final String PURPOSE = "converting timestamp formats";
+
+    // Accepted values for the "type" option. They are matched exactly, so the mixed casing matters.
+    private static final String TYPE_STRING = "string";
+    private static final String TYPE_UNIX = "unix";
+    private static final String TYPE_DATE = "Date";
+    private static final String TYPE_TIME = "Time";
+    private static final String TYPE_TIMESTAMP = "Timestamp";
+    private static final Set<String> VALID_TYPES = new HashSet<>(Arrays.asList(TYPE_STRING, TYPE_UNIX, TYPE_DATE, TYPE_TIME, TYPE_TIMESTAMP));
+
+    // String formatting/parsing and Date/Time truncation all happen in UTC
+    private static final TimeZone UTC = TimeZone.getTimeZone("UTC");
+
+    /**
+     * One timestamp representation. Each implementation converts between its own representation and
+     * java.util.Date, which acts as the common intermediate form for every conversion.
+     */
+    private interface TimestampTranslator {
+        /**
+         * @return the Connect schema that describes values produced by {@link #toType}
+         */
+        Schema typeSchema();
+
+        /**
+         * Translate a value held in this representation into a java.util.Date.
+         */
+        Date toRaw(Config cfg, Object value);
+
+        /**
+         * Translate a java.util.Date into this representation.
+         */
+        Object toType(Config cfg, Date raw);
+    }
+
+    private static final Map<String, TimestampTranslator> TRANSLATORS = new HashMap<>();
+    static {
+        TRANSLATORS.put(TYPE_STRING, new TimestampTranslator() {
+            @Override
+            public Date toRaw(Config config, Object orig) {
+                if (!(orig instanceof String))
+                    throw new DataException("Expected string timestamp to be a String, but
found " + orig.getClass());
+                try {
+                    return config.format.parse((String) orig);
+                } catch (ParseException e) {
+                    throw new DataException("Could not parse timestamp: value (" + orig +
") does not match pattern ("
+                            + config.format.toPattern() + ")", e);
+                }
+            }
+
+            @Override
+            public Schema typeSchema() {
+                return Schema.STRING_SCHEMA;
+            }
+
+            @Override
+            public String toType(Config config, Date orig) {
+                synchronized (config.format) {
+                    return config.format.format(orig);
+                }
+            }
+        });
+
+        TRANSLATORS.put(TYPE_UNIX, new TimestampTranslator() {
+            @Override
+            public Date toRaw(Config config, Object orig) {
+                if (!(orig instanceof Long))
+                    throw new DataException("Expected Unix timestamp to be a Long, but found
" + orig.getClass());
+                return Timestamp.toLogical(Timestamp.SCHEMA, (Long) orig);
+            }
+
+            @Override
+            public Schema typeSchema() {
+                return Schema.INT64_SCHEMA;
+            }
+
+            @Override
+            public Long toType(Config config, Date orig) {
+                return Timestamp.fromLogical(Timestamp.SCHEMA, orig);
+            }
+        });
+
+        TRANSLATORS.put(TYPE_DATE, new TimestampTranslator() {
+            @Override
+            public Date toRaw(Config config, Object orig) {
+                if (!(orig instanceof Date))
+                    throw new DataException("Expected Date to be a java.util.Date, but found
" + orig.getClass());
+                // Already represented as a java.util.Date and Connect Dates are a subset
of valid java.util.Date values
+                return (Date) orig;
+            }
+
+            @Override
+            public Schema typeSchema() {
+                return org.apache.kafka.connect.data.Date.SCHEMA;
+            }
+
+            @Override
+            public Date toType(Config config, Date orig) {
+                Calendar result = Calendar.getInstance(UTC);
+                result.setTime(orig);
+                result.set(Calendar.HOUR_OF_DAY, 0);
+                result.set(Calendar.MINUTE, 0);
+                result.set(Calendar.SECOND, 0);
+                result.set(Calendar.MILLISECOND, 0);
+                return result.getTime();
+            }
+        });
+
+        TRANSLATORS.put(TYPE_TIME, new TimestampTranslator() {
+            @Override
+            public Date toRaw(Config config, Object orig) {
+                if (!(orig instanceof Date))
+                    throw new DataException("Expected Time to be a java.util.Date, but found
" + orig.getClass());
+                // Already represented as a java.util.Date and Connect Times are a subset
of valid java.util.Date values
+                return (Date) orig;
+            }
+
+            @Override
+            public Schema typeSchema() {
+                return Time.SCHEMA;
+            }
+
+            @Override
+            public Date toType(Config config, Date orig) {
+                Calendar origCalendar = Calendar.getInstance(UTC);
+                origCalendar.setTime(orig);
+                Calendar result = Calendar.getInstance(UTC);
+                result.setTimeInMillis(0L);
+                result.set(Calendar.HOUR_OF_DAY, origCalendar.get(Calendar.HOUR_OF_DAY));
+                result.set(Calendar.MINUTE, origCalendar.get(Calendar.MINUTE));
+                result.set(Calendar.SECOND, origCalendar.get(Calendar.SECOND));
+                result.set(Calendar.MILLISECOND, origCalendar.get(Calendar.MILLISECOND));
+                return result.getTime();
+            }
+        });
+
+        TRANSLATORS.put(TYPE_TIMESTAMP, new TimestampTranslator() {
+            @Override
+            public Date toRaw(Config config, Object orig) {
+                if (!(orig instanceof Date))
+                    throw new DataException("Expected Timestamp to be a java.util.Date, but
found " + orig.getClass());
+                return (Date) orig;
+            }
+
+            @Override
+            public Schema typeSchema() {
+                return Timestamp.SCHEMA;
+            }
+
+            @Override
+            public Date toType(Config config, Date orig) {
+                return orig;
+            }
+        });
+    }
+
+    /**
+     * Parsed transformation settings. The translators are static, so the active configuration is passed
+     * to them as an argument rather than read from instance state.
+     */
+    private static class Config {
+        final String field;
+        final String type;
+        final SimpleDateFormat format;
+
+        Config(String fieldName, String targetType, SimpleDateFormat dateFormat) {
+            field = fieldName;
+            type = targetType;
+            format = dateFormat;
+        }
+    }
+
+    // Settings from the most recent configure() call
+    private Config config;
+    // Rewritten struct schemas, keyed by input schema
+    private Cache<Schema, Schema> schemaUpdateCache;
+
+
+    /**
+     * Parses and validates the transformation settings.
+     *
+     * @throws ConfigException if the target type is unknown, if type=string is requested without a format,
+     *                         or if the format is not a valid SimpleDateFormat pattern
+     */
+    @Override
+    public void configure(Map<String, ?> configs) {
+        final SimpleConfig simpleConfig = new SimpleConfig(CONFIG_DEF, configs);
+        final String field = simpleConfig.getString(FIELD_CONFIG);
+        final String type = simpleConfig.getString(TYPE_CONFIG);
+        String formatPattern = simpleConfig.getString(FORMAT_CONFIG);
+        schemaUpdateCache = new SynchronizedCache<>(new LRUCache<Schema, Schema>(16));
+
+        if (!VALID_TYPES.contains(type)) {
+            throw new ConfigException("Unknown timestamp type in TimestampConverter: " + type + ". Valid values are "
+                    + Utils.join(VALID_TYPES, ", ") + ".");
+        }
+        // An explicitly null format is treated like the empty default rather than failing with an NPE
+        final boolean hasFormat = formatPattern != null && !formatPattern.trim().isEmpty();
+        if (type.equals(TYPE_STRING) && !hasFormat) {
+            throw new ConfigException("TimestampConverter requires format option to be specified when using string timestamps");
+        }
+        SimpleDateFormat format = null;
+        if (hasFormat) {
+            try {
+                format = new SimpleDateFormat(formatPattern);
+            } catch (IllegalArgumentException e) {
+                // ConfigException's two-argument constructor is (name, value), not (message, cause);
+                // report the offending option/value explicitly and chain the parser error as the cause
+                ConfigException configException = new ConfigException(FORMAT_CONFIG, formatPattern,
+                        "TimestampConverter requires a SimpleDateFormat-compatible pattern for string timestamps");
+                configException.initCause(e);
+                throw configException;
+            }
+            format.setTimeZone(UTC);
+        }
+        config = new Config(field, type, format);
+    }
+
+    /**
+     * Converts the configured timestamp. Dispatches on whether the part of the record this variant
+     * operates on carries a schema.
+     */
+    @Override
+    public R apply(R record) {
+        final Schema schema = operatingSchema(record);
+        return schema != null ? applyWithSchema(record) : applySchemaless(record);
+    }
+
+    /** @return the definition of the options accepted by {@link #configure} */
+    @Override
+    public ConfigDef config() {
+        return CONFIG_DEF;
+    }
+
+    /** No resources are held, so there is nothing to release. */
+    @Override
+    public void close() {
+    }
+
+    /** Variant that converts the record key and key schema, leaving the value untouched. */
+    public static class Key<R extends ConnectRecord<R>> extends TimestampConverter<R> {
+        @Override
+        protected Schema operatingSchema(R rec) {
+            return rec.keySchema();
+        }
+
+        @Override
+        protected Object operatingValue(R rec) {
+            return rec.key();
+        }
+
+        @Override
+        protected R newRecord(R rec, Schema updatedSchema, Object updatedValue) {
+            return rec.newRecord(rec.topic(), rec.kafkaPartition(),
+                    updatedSchema, updatedValue,
+                    rec.valueSchema(), rec.value(),
+                    rec.timestamp());
+        }
+    }
+
+    /** Variant that converts the record value and value schema, leaving the key untouched. */
+    public static class Value<R extends ConnectRecord<R>> extends TimestampConverter<R> {
+        @Override
+        protected Schema operatingSchema(R rec) {
+            return rec.valueSchema();
+        }
+
+        @Override
+        protected Object operatingValue(R rec) {
+            return rec.value();
+        }
+
+        @Override
+        protected R newRecord(R rec, Schema updatedSchema, Object updatedValue) {
+            return rec.newRecord(rec.topic(), rec.kafkaPartition(),
+                    rec.keySchema(), rec.key(),
+                    updatedSchema, updatedValue,
+                    rec.timestamp());
+        }
+    }
+
+    /** @return the schema of the part (key or value) this variant operates on; may be null */
+    protected abstract Schema operatingSchema(R record);
+
+    /** @return the key or value object this variant operates on */
+    protected abstract Object operatingValue(R record);
+
+    /** @return a copy of {@code record} with the operated-on part replaced by the given schema and value */
+    protected abstract R newRecord(R record, Schema updatedSchema, Object updatedValue);
+
+    /**
+     * Converts the operated-on part of a record that has a schema.
+     * <p>
+     * With no field configured, the whole value is converted and its schema is replaced by the target type's
+     * schema. Otherwise the value must be a Struct: the configured field's schema is replaced and its value
+     * converted, while other fields are copied through. Rewritten struct schemas are cached per input schema.
+     * In both cases the optionality of the replaced schema is preserved, so optional inputs stay optional.
+     */
+    private R applyWithSchema(R record) {
+        final Schema schema = operatingSchema(record);
+        final TimestampTranslator targetTranslator = TRANSLATORS.get(config.type);
+        if (config.field.isEmpty()) {
+            Object value = operatingValue(record);
+            // New schema is determined by the requested target timestamp type
+            Schema updatedSchema = schema.isOptional()
+                    ? optionalVariant(targetTranslator.typeSchema())
+                    : targetTranslator.typeSchema();
+            return newRecord(record, updatedSchema, convertTimestamp(value, timestampTypeFromSchema(schema)));
+        } else {
+            final Struct value = requireStruct(operatingValue(record), PURPOSE);
+            // Read and write the cache with the same key: the rewritten schema is derived from `schema`
+            Schema updatedSchema = schemaUpdateCache.get(schema);
+            if (updatedSchema == null) {
+                SchemaBuilder builder = SchemaUtil.copySchemaBasics(schema, SchemaBuilder.struct());
+                for (Field field : schema.fields()) {
+                    if (field.name().equals(config.field)) {
+                        Schema convertedSchema = targetTranslator.typeSchema();
+                        builder.field(field.name(),
+                                field.schema().isOptional() ? optionalVariant(convertedSchema) : convertedSchema);
+                    } else {
+                        builder.field(field.name(), field.schema());
+                    }
+                }
+                if (schema.isOptional())
+                    builder.optional();
+                if (schema.defaultValue() != null) {
+                    Struct updatedDefaultValue = applyValueWithSchema((Struct) schema.defaultValue(), builder);
+                    builder.defaultValue(updatedDefaultValue);
+                }
+
+                updatedSchema = builder.build();
+                schemaUpdateCache.put(schema, updatedSchema);
+            }
+
+            Struct updatedValue = applyValueWithSchema(value, updatedSchema);
+            return newRecord(record, updatedSchema, updatedValue);
+        }
+    }
+
+    /**
+     * Returns an optional copy of a target schema. Name, version, doc and parameters are copied, so
+     * logical types such as Timestamp/Date/Time keep their identity.
+     */
+    private static Schema optionalVariant(Schema schema) {
+        return SchemaUtil.copySchemaBasics(schema, new SchemaBuilder(schema.type())).optional().build();
+    }
+
+    /**
+     * Copies every field of {@code value} into a new Struct of {@code updatedSchema}, converting only the
+     * configured timestamp field. The source format of that field is taken from its schema.
+     */
+    private Struct applyValueWithSchema(Struct value, Schema updatedSchema) {
+        final Struct result = new Struct(updatedSchema);
+        for (Field f : value.schema().fields()) {
+            Object fieldValue = value.get(f);
+            if (f.name().equals(config.field))
+                fieldValue = convertTimestamp(fieldValue, timestampTypeFromSchema(f.schema()));
+            result.put(f.name(), fieldValue);
+        }
+        return result;
+    }
+
+    /**
+     * Converts the operated-on part of a schemaless record. With a field configured, the value must be a Map
+     * and a converted copy of it is produced. Otherwise the whole value is converted. The source format is
+     * inferred from the Java type.
+     */
+    private R applySchemaless(R record) {
+        final Object original = operatingValue(record);
+        if (!config.field.isEmpty()) {
+            final Map<String, Object> fields = requireMap(original, PURPOSE);
+            final HashMap<String, Object> converted = new HashMap<>(fields);
+            converted.put(config.field, convertTimestamp(fields.get(config.field)));
+            return newRecord(record, null, converted);
+        }
+        return newRecord(record, null, convertTimestamp(original));
+    }
+
+    /**
+     * Determine the type/format of the timestamp based on the schema
+     */
+    private String timestampTypeFromSchema(Schema schema) {
+        if (Timestamp.LOGICAL_NAME.equals(schema.name())) {
+            return TYPE_TIMESTAMP;
+        } else if (org.apache.kafka.connect.data.Date.LOGICAL_NAME.equals(schema.name()))
{
+            return TYPE_DATE;
+        } else if (Time.LOGICAL_NAME.equals(schema.name())) {
+            return TYPE_TIME;
+        } else if (schema.type().equals(Schema.Type.STRING)) {
+            // If not otherwise specified, string == user-specified string format for timestamps
+            return TYPE_STRING;
+        } else if (schema.type().equals(Schema.Type.INT64)) {
+            // If not otherwise specified, long == unix time
+            return TYPE_UNIX;
+        }
+        throw new ConnectException("Schema " + schema + " does not correspond to a known
timestamp type format");
+    }
+
+    /**
+     * Infer the type/format of the timestamp based on the raw Java type
+     */
+    private String inferTimestampType(Object timestamp) {
+        // Note that we can't infer all types, e.g. Date/Time/Timestamp all have the same
runtime representation as a
+        // java.util.Date
+        if (timestamp instanceof Date) {
+            return TYPE_TIMESTAMP;
+        } else if (timestamp instanceof Long) {
+            return TYPE_UNIX;
+        } else if (timestamp instanceof String) {
+            return TYPE_STRING;
+        }
+        throw new DataException("TimestampConverter does not support " + timestamp.getClass()
+ " objects as timestamps");
+    }
+
+    /**
+     * Convert the given timestamp to the target timestamp format.
+     * @param timestamp the input timestamp
+     * @param timestampFormat the format of the timestamp, or null if the format should be
inferred
+     * @return the converted timestamp
+     */
+    private Object convertTimestamp(Object timestamp, String timestampFormat) {
+        if (timestampFormat == null) {
+            timestampFormat = inferTimestampType(timestamp);
+        }
+
+        TimestampTranslator sourceTranslator = TRANSLATORS.get(timestampFormat);
+        if (sourceTranslator == null) {
+            throw new ConnectException("Unsupported timestamp type: " + timestampFormat);
+        }
+        Date rawTimestamp = sourceTranslator.toRaw(config, timestamp);
+
+        TimestampTranslator targetTranslator = TRANSLATORS.get(config.type);
+        if (targetTranslator == null) {
+            throw new ConnectException("Unsupported timestamp type: " + config.type);
+        }
+        return targetTranslator.toType(config, rawTimestamp);
+    }
+
+    private Object convertTimestamp(Object timestamp) {
+        return convertTimestamp(timestamp, null);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/64e12d3c/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/Requirements.java
----------------------------------------------------------------------
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/Requirements.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/Requirements.java
index 0220907..6f1be19 100644
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/Requirements.java
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/Requirements.java
@@ -55,7 +55,7 @@ public class Requirements {
     }
 
     private static String nullSafeClassName(Object x) {
-        return x == null ? "null" : x.getClass().getCanonicalName();
+        return x == null ? "null" : x.getClass().getName();
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/64e12d3c/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampConverterTest.java
----------------------------------------------------------------------
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampConverterTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampConverterTest.java
new file mode 100644
index 0000000..1b93874
--- /dev/null
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampConverterTest.java
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.data.Date;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.data.Time;
+import org.apache.kafka.connect.data.Timestamp;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.junit.Test;
+
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.GregorianCalendar;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TimeZone;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class TimestampConverterTest {
+
+    private static final TimeZone UTC = TimeZone.getTimeZone("UTC");
+    private static final String STRING_DATE_FMT = "yyyy MM dd HH mm ss SSS z";
+
+    // 1970-01-01T00:00:00.000Z
+    private static final Calendar EPOCH = newUtcEpochCalendar();
+    // 00:00:01.234 on the epoch date: a pure time-of-day value
+    private static final Calendar TIME = newUtcEpochCalendar();
+    // 1970-01-02T00:00:00.000Z: a pure date value
+    private static final Calendar DATE = newUtcEpochCalendar();
+    // 1970-01-02T00:00:01.234Z: DATE combined with TIME
+    private static final Calendar DATE_PLUS_TIME = newUtcEpochCalendar();
+    private static final long DATE_PLUS_TIME_UNIX;
+    // DATE_PLUS_TIME rendered with STRING_DATE_FMT
+    private static final String DATE_PLUS_TIME_STRING = "1970 01 02 00 00 01 234 UTC";
+
+    static {
+        TIME.add(Calendar.MILLISECOND, 1234);
+
+        DATE.add(Calendar.DATE, 1);
+
+        DATE_PLUS_TIME.add(Calendar.DATE, 1);
+        DATE_PLUS_TIME.add(Calendar.MILLISECOND, 1234);
+
+        DATE_PLUS_TIME_UNIX = DATE_PLUS_TIME.getTime().getTime();
+    }
+
+    /** A fresh UTC calendar positioned at the Unix epoch. */
+    private static Calendar newUtcEpochCalendar() {
+        Calendar calendar = GregorianCalendar.getInstance(UTC);
+        calendar.setTimeInMillis(0L);
+        return calendar;
+    }
+
+
+    // Configuration
+
+    @Test(expected = ConfigException.class)
+    public void testConfigNoTargetType() {
+        configureValueConverter(Collections.<String, String>emptyMap());
+    }
+
+    @Test(expected = ConfigException.class)
+    public void testConfigInvalidTargetType() {
+        configureValueConverter(Collections.singletonMap(TimestampConverter.TYPE_CONFIG, "invalid"));
+    }
+
+    @Test(expected = ConfigException.class)
+    public void testConfigMissingFormat() {
+        // type=string has no pattern to format with
+        configureValueConverter(Collections.singletonMap(TimestampConverter.TYPE_CONFIG, "string"));
+    }
+
+    @Test(expected = ConfigException.class)
+    public void testConfigInvalidFormat() {
+        Map<String, String> props = new HashMap<>();
+        props.put(TimestampConverter.TYPE_CONFIG, "string");
+        props.put(TimestampConverter.FORMAT_CONFIG, "bad-format");
+        configureValueConverter(props);
+    }
+
+    /** Configures a fresh value-side converter with the given properties. */
+    private static void configureValueConverter(Map<String, String> props) {
+        TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
+        xform.configure(props);
+    }
+
+
+    // Conversions without schemas (most flexible Timestamp -> other types)
+
+    @Test
+    public void testSchemalessIdentity() {
+        TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
+        xform.configure(Collections.singletonMap(TimestampConverter.TYPE_CONFIG, "Timestamp"));
+        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, null,
DATE_PLUS_TIME.getTime()));
+
+        assertNull(transformed.valueSchema());
+        assertEquals(DATE_PLUS_TIME.getTime(), transformed.value());
+    }
+
+    @Test
+    public void testSchemalessTimestampToDate() {
+        TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
+        xform.configure(Collections.singletonMap(TimestampConverter.TYPE_CONFIG, "Date"));
+        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, null,
DATE_PLUS_TIME.getTime()));
+
+        assertNull(transformed.valueSchema());
+        assertEquals(DATE.getTime(), transformed.value());
+    }
+
+    @Test
+    public void testSchemalessTimestampToTime() {
+        TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
+        xform.configure(Collections.singletonMap(TimestampConverter.TYPE_CONFIG, "Time"));
+        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, null,
DATE_PLUS_TIME.getTime()));
+
+        assertNull(transformed.valueSchema());
+        assertEquals(TIME.getTime(), transformed.value());
+    }
+
+    @Test
+    public void testSchemalessTimestampToUnix() {
+        TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
+        xform.configure(Collections.singletonMap(TimestampConverter.TYPE_CONFIG, "unix"));
+        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, null,
DATE_PLUS_TIME.getTime()));
+
+        assertNull(transformed.valueSchema());
+        assertEquals(DATE_PLUS_TIME_UNIX, transformed.value());
+    }
+
+    @Test
+    public void testSchemalessTimestampToString() {
+        TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
+        Map<String, String> config = new HashMap<>();
+        config.put(TimestampConverter.TYPE_CONFIG, "string");
+        config.put(TimestampConverter.FORMAT_CONFIG, STRING_DATE_FMT);
+        xform.configure(config);
+        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, null,
DATE_PLUS_TIME.getTime()));
+
+        assertNull(transformed.valueSchema());
+        assertEquals(DATE_PLUS_TIME_STRING, transformed.value());
+    }
+
+
+    // Conversions without schemas (core types -> most flexible Timestamp format)
+
+    @Test
+    public void testSchemalessDateToTimestamp() {
+        TimestampConverter<SourceRecord> converter = new TimestampConverter.Value<>();
+        converter.configure(Collections.singletonMap(TimestampConverter.TYPE_CONFIG, "Timestamp"));
+
+        SourceRecord input = new SourceRecord(null, null, "topic", 0, null, DATE.getTime());
+        SourceRecord transformed = converter.apply(input);
+
+        assertNull(transformed.valueSchema());
+        // Date is coarser-grained than Timestamp, so the input value should come back unchanged
+        assertEquals(DATE.getTime(), transformed.value());
+    }
+
+    @Test
+    public void testSchemalessTimeToTimestamp() {
+        TimestampConverter<SourceRecord> converter = new TimestampConverter.Value<>();
+        converter.configure(Collections.singletonMap(TimestampConverter.TYPE_CONFIG, "Timestamp"));
+
+        SourceRecord input = new SourceRecord(null, null, "topic", 0, null, TIME.getTime());
+        SourceRecord transformed = converter.apply(input);
+
+        assertNull(transformed.valueSchema());
+        // Time is coarser-grained than Timestamp, so the input value should come back unchanged
+        assertEquals(TIME.getTime(), transformed.value());
+    }
+
+    @Test
+    public void testSchemalessUnixToTimestamp() {
+        // Schemaless unix value -> Timestamp: expect the DATE_PLUS_TIME instant back
+        TimestampConverter<SourceRecord> converter = new TimestampConverter.Value<>();
+        converter.configure(Collections.singletonMap(TimestampConverter.TYPE_CONFIG, "Timestamp"));
+
+        SourceRecord input = new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME_UNIX);
+        SourceRecord transformed = converter.apply(input);
+
+        assertNull(transformed.valueSchema());
+        assertEquals(DATE_PLUS_TIME.getTime(), transformed.value());
+    }
+
+    @Test
+    public void testSchemalessStringToTimestamp() {
+        // Schemaless string parsed with STRING_DATE_FMT -> Timestamp
+        Map<String, String> props = new HashMap<>();
+        props.put(TimestampConverter.TYPE_CONFIG, "Timestamp");
+        props.put(TimestampConverter.FORMAT_CONFIG, STRING_DATE_FMT);
+        TimestampConverter<SourceRecord> converter = new TimestampConverter.Value<>();
+        converter.configure(props);
+
+        SourceRecord input = new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME_STRING);
+        SourceRecord transformed = converter.apply(input);
+
+        assertNull(transformed.valueSchema());
+        assertEquals(DATE_PLUS_TIME.getTime(), transformed.value());
+    }
+
+
+    // Conversions with schemas (most flexible Timestamp -> other types)
+
+    @Test
+    public void testWithSchemaIdentity() {
+        // Timestamp -> Timestamp with a schema: both schema and value must be preserved
+        TimestampConverter<SourceRecord> converter = new TimestampConverter.Value<>();
+        converter.configure(Collections.singletonMap(TimestampConverter.TYPE_CONFIG, "Timestamp"));
+
+        SourceRecord input = new SourceRecord(null, null, "topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime());
+        SourceRecord transformed = converter.apply(input);
+
+        assertEquals(Timestamp.SCHEMA, transformed.valueSchema());
+        assertEquals(DATE_PLUS_TIME.getTime(), transformed.value());
+    }
+
+    @Test
+    public void testWithSchemaTimestampToDate() {
+        // Timestamp -> Date with a schema: expect Date.SCHEMA and the DATE fixture
+        TimestampConverter<SourceRecord> converter = new TimestampConverter.Value<>();
+        converter.configure(Collections.singletonMap(TimestampConverter.TYPE_CONFIG, "Date"));
+
+        SourceRecord input = new SourceRecord(null, null, "topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime());
+        SourceRecord transformed = converter.apply(input);
+
+        assertEquals(Date.SCHEMA, transformed.valueSchema());
+        assertEquals(DATE.getTime(), transformed.value());
+    }
+
+    @Test
+    public void testWithSchemaTimestampToTime() {
+        // Timestamp -> Time with a schema: expect Time.SCHEMA and the TIME fixture
+        TimestampConverter<SourceRecord> converter = new TimestampConverter.Value<>();
+        converter.configure(Collections.singletonMap(TimestampConverter.TYPE_CONFIG, "Time"));
+
+        SourceRecord input = new SourceRecord(null, null, "topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime());
+        SourceRecord transformed = converter.apply(input);
+
+        assertEquals(Time.SCHEMA, transformed.valueSchema());
+        assertEquals(TIME.getTime(), transformed.value());
+    }
+
+    @Test
+    public void testWithSchemaTimestampToUnix() {
+        // Timestamp -> unix with a schema: expect an INT64 schema and DATE_PLUS_TIME_UNIX
+        TimestampConverter<SourceRecord> converter = new TimestampConverter.Value<>();
+        converter.configure(Collections.singletonMap(TimestampConverter.TYPE_CONFIG, "unix"));
+
+        SourceRecord input = new SourceRecord(null, null, "topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime());
+        SourceRecord transformed = converter.apply(input);
+
+        assertEquals(Schema.INT64_SCHEMA, transformed.valueSchema());
+        assertEquals(DATE_PLUS_TIME_UNIX, transformed.value());
+    }
+
+    @Test
+    public void testWithSchemaTimestampToString() {
+        // Timestamp -> string with a schema: expect a STRING schema and the STRING_DATE_FMT rendering
+        Map<String, String> props = new HashMap<>();
+        props.put(TimestampConverter.TYPE_CONFIG, "string");
+        props.put(TimestampConverter.FORMAT_CONFIG, STRING_DATE_FMT);
+        TimestampConverter<SourceRecord> converter = new TimestampConverter.Value<>();
+        converter.configure(props);
+
+        SourceRecord input = new SourceRecord(null, null, "topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime());
+        SourceRecord transformed = converter.apply(input);
+
+        assertEquals(Schema.STRING_SCHEMA, transformed.valueSchema());
+        assertEquals(DATE_PLUS_TIME_STRING, transformed.value());
+    }
+
+
+    // Conversions with schemas (core types -> most flexible Timestamp format)
+
+    @Test
+    public void testWithSchemaDateToTimestamp() {
+        TimestampConverter<SourceRecord> converter = new TimestampConverter.Value<>();
+        converter.configure(Collections.singletonMap(TimestampConverter.TYPE_CONFIG, "Timestamp"));
+
+        SourceRecord input = new SourceRecord(null, null, "topic", 0, Date.SCHEMA, DATE.getTime());
+        SourceRecord transformed = converter.apply(input);
+
+        assertEquals(Timestamp.SCHEMA, transformed.valueSchema());
+        // Date is coarser-grained than Timestamp: only the schema changes, the value is carried over as-is
+        assertEquals(DATE.getTime(), transformed.value());
+    }
+
+    @Test
+    public void testWithSchemaTimeToTimestamp() {
+        TimestampConverter<SourceRecord> converter = new TimestampConverter.Value<>();
+        converter.configure(Collections.singletonMap(TimestampConverter.TYPE_CONFIG, "Timestamp"));
+
+        SourceRecord input = new SourceRecord(null, null, "topic", 0, Time.SCHEMA, TIME.getTime());
+        SourceRecord transformed = converter.apply(input);
+
+        assertEquals(Timestamp.SCHEMA, transformed.valueSchema());
+        // Time is coarser-grained than Timestamp: only the schema changes, the value is carried over as-is
+        assertEquals(TIME.getTime(), transformed.value());
+    }
+
+    @Test
+    public void testWithSchemaUnixToTimestamp() {
+        // INT64 unix value -> Timestamp: expect Timestamp.SCHEMA and the DATE_PLUS_TIME instant
+        TimestampConverter<SourceRecord> converter = new TimestampConverter.Value<>();
+        converter.configure(Collections.singletonMap(TimestampConverter.TYPE_CONFIG, "Timestamp"));
+
+        SourceRecord input = new SourceRecord(null, null, "topic", 0, Schema.INT64_SCHEMA, DATE_PLUS_TIME_UNIX);
+        SourceRecord transformed = converter.apply(input);
+
+        assertEquals(Timestamp.SCHEMA, transformed.valueSchema());
+        assertEquals(DATE_PLUS_TIME.getTime(), transformed.value());
+    }
+
+    @Test
+    public void testWithSchemaStringToTimestamp() {
+        // STRING value parsed with STRING_DATE_FMT -> Timestamp: expect Timestamp.SCHEMA and DATE_PLUS_TIME
+        Map<String, String> props = new HashMap<>();
+        props.put(TimestampConverter.TYPE_CONFIG, "Timestamp");
+        props.put(TimestampConverter.FORMAT_CONFIG, STRING_DATE_FMT);
+        TimestampConverter<SourceRecord> converter = new TimestampConverter.Value<>();
+        converter.configure(props);
+
+        SourceRecord input = new SourceRecord(null, null, "topic", 0, Schema.STRING_SCHEMA, DATE_PLUS_TIME_STRING);
+        SourceRecord transformed = converter.apply(input);
+
+        assertEquals(Timestamp.SCHEMA, transformed.valueSchema());
+        assertEquals(DATE_PLUS_TIME.getTime(), transformed.value());
+    }
+
+
+    // Convert field instead of entire key/value
+
+    @Test
+    public void testSchemalessFieldConversion() {
+        // With FIELD_CONFIG set, only the "ts" entry of a schemaless map value is converted (Timestamp -> Date)
+        Map<String, String> props = new HashMap<>();
+        props.put(TimestampConverter.TYPE_CONFIG, "Date");
+        props.put(TimestampConverter.FIELD_CONFIG, "ts");
+        TimestampConverter<SourceRecord> converter = new TimestampConverter.Value<>();
+        converter.configure(props);
+
+        Object original = Collections.singletonMap("ts", DATE_PLUS_TIME.getTime());
+        SourceRecord transformed = converter.apply(new SourceRecord(null, null, "topic", 0, null, original));
+
+        assertNull(transformed.valueSchema());
+        assertEquals(Collections.singletonMap("ts", DATE.getTime()), transformed.value());
+    }
+
+    @Test
+    public void testWithSchemaFieldConversion() {
+        // With FIELD_CONFIG set, only the INT64 "ts" field of a Struct becomes a Timestamp; "other" is carried over
+        Map<String, String> props = new HashMap<>();
+        props.put(TimestampConverter.TYPE_CONFIG, "Timestamp");
+        props.put(TimestampConverter.FIELD_CONFIG, "ts");
+        TimestampConverter<SourceRecord> converter = new TimestampConverter.Value<>();
+        converter.configure(props);
+
+        Schema inputSchema = SchemaBuilder.struct()
+                .field("ts", Schema.INT64_SCHEMA)
+                .field("other", Schema.STRING_SCHEMA)
+                .build();
+        Struct original = new Struct(inputSchema);
+        original.put("ts", DATE_PLUS_TIME_UNIX);
+        original.put("other", "test");
+
+        SourceRecord transformed = converter.apply(new SourceRecord(null, null, "topic", 0, inputSchema, original));
+
+        Schema expectedSchema = SchemaBuilder.struct()
+                .field("ts", Timestamp.SCHEMA)
+                .field("other", Schema.STRING_SCHEMA)
+                .build();
+        assertEquals(expectedSchema, transformed.valueSchema());
+
+        Struct result = (Struct) transformed.value();
+        assertEquals(DATE_PLUS_TIME.getTime(), result.get("ts"));
+        assertEquals("test", result.get("other"));
+    }
+
+
+    // Validate Key implementation in addition to Value
+
+    @Test
+    public void testKey() {
+        // Convert to a type different from the input (Timestamp -> unix). The assertions can then only pass
+        // if the Key variant really rewrote the record key. An identity Timestamp -> Timestamp conversion
+        // would also pass for a transform that left the key untouched.
+        TimestampConverter<SourceRecord> converter = new TimestampConverter.Key<>();
+        converter.configure(Collections.singletonMap(TimestampConverter.TYPE_CONFIG, "unix"));
+
+        SourceRecord input = new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME.getTime(), null, null);
+        SourceRecord transformed = converter.apply(input);
+
+        assertNull(transformed.keySchema());
+        assertEquals(DATE_PLUS_TIME_UNIX, transformed.key());
+        // The Key variant must not touch the value side of the record
+        assertNull(transformed.valueSchema());
+        assertNull(transformed.value());
+    }
+
+}


Mime
View raw message