kafka-commits mailing list archives

From ewe...@apache.org
Subject [1/2] kafka git commit: KAFKA-3209: KIP-66: single message transforms
Date Fri, 13 Jan 2017 00:15:00 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk a1e8b507d -> 2f9048832


http://git-wip-us.apache.org/repos/asf/kafka/blob/2f904883/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HoistToStruct.java
----------------------------------------------------------------------
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HoistToStruct.java
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HoistToStruct.java
new file mode 100644
index 0000000..c2726ca
--- /dev/null
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HoistToStruct.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.common.cache.Cache;
+import org.apache.kafka.common.cache.LRUCache;
+import org.apache.kafka.common.cache.SynchronizedCache;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+import java.util.Map;
+
+public abstract class HoistToStruct<R extends ConnectRecord<R>> implements Transformation<R> {
+
+    public static final String FIELD_CONFIG = "field";
+
+    private static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(FIELD_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.MEDIUM,
+                    "Field name for the single field that will be created in the resulting Struct.");
+
+    private Cache<Schema, Schema> schemaUpdateCache;
+
+    private String fieldName;
+
+    @Override
+    public void configure(Map<String, ?> props) {
+        final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
+        fieldName = config.getString("field");
+        schemaUpdateCache = new SynchronizedCache<>(new LRUCache<Schema, Schema>(16));
+    }
+
+    @Override
+    public R apply(R record) {
+        final Schema schema = operatingSchema(record);
+        final Object value = operatingValue(record);
+
+        Schema updatedSchema = schemaUpdateCache.get(schema);
+        if (updatedSchema == null) {
+            updatedSchema = SchemaBuilder.struct().field(fieldName, schema).build();
+            schemaUpdateCache.put(schema, updatedSchema);
+        }
+
+        final Struct updatedValue = new Struct(updatedSchema).put(fieldName, value);
+
+        return newRecord(record, updatedSchema, updatedValue);
+    }
+
+    @Override
+    public void close() {
+        schemaUpdateCache = null;
+    }
+
+    @Override
+    public ConfigDef config() {
+        return CONFIG_DEF;
+    }
+
+    protected abstract Schema operatingSchema(R record);
+
+    protected abstract Object operatingValue(R record);
+
+    protected abstract R newRecord(R record, Schema updatedSchema, Object updatedValue);
+
+    /**
+     * Wraps the record key in a {@link org.apache.kafka.connect.data.Struct} with specified field name.
+     */
+    public static class Key<R extends ConnectRecord<R>> extends HoistToStruct<R> {
+
+        @Override
+        protected Schema operatingSchema(R record) {
+            return record.keySchema();
+        }
+
+        @Override
+        protected Object operatingValue(R record) {
+            return record.key();
+        }
+
+        @Override
+        protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
+            return record.newRecord(record.topic(), record.kafkaPartition(), updatedSchema, updatedValue, record.valueSchema(), record.value(), record.timestamp());
+        }
+
+    }
+
+    /**
+     * Wraps the record value in a {@link org.apache.kafka.connect.data.Struct} with specified field name.
+     */
+    public static class Value<R extends ConnectRecord<R>> extends HoistToStruct<R> {
+
+        @Override
+        protected Schema operatingSchema(R record) {
+            return record.valueSchema();
+        }
+
+        @Override
+        protected Object operatingValue(R record) {
+            return record.value();
+        }
+
+        @Override
+        protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
+            return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), updatedSchema, updatedValue, record.timestamp());
+        }
+
+    }
+
+}
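
The nested Key and Value classes above are the concrete variants referenced from connector configs (e.g. org.apache.kafka.connect.transforms.HoistToStruct$Value in the system test further down), but the transform can also be driven directly. A minimal usage sketch, mirroring the sanityCheck in HoistToStructTest included in this commit; the topic, field name, and key value are illustrative only:

    // Fragment; assumes the usual Connect imports (SinkRecord, Schema, Struct, Collections).
    final HoistToStruct<SinkRecord> xform = new HoistToStruct.Key<>();
    xform.configure(Collections.singletonMap("field", "magic"));

    final SinkRecord record = new SinkRecord("test", 0, Schema.INT32_SCHEMA, 42, null, null, 0);
    final SinkRecord hoisted = xform.apply(record);
    // hoisted.keySchema() is a STRUCT schema whose "magic" field carries the original key schema,
    // and ((Struct) hoisted.key()).get("magic") returns 42.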

http://git-wip-us.apache.org/repos/asf/kafka/blob/2f904883/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java
----------------------------------------------------------------------
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java
new file mode 100644
index 0000000..d67fea0
--- /dev/null
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java
@@ -0,0 +1,296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.common.cache.Cache;
+import org.apache.kafka.common.cache.LRUCache;
+import org.apache.kafka.common.cache.SynchronizedCache;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.data.Timestamp;
+import org.apache.kafka.connect.errors.DataException;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+public abstract class InsertField<R extends ConnectRecord<R>> implements Transformation<R> {
+
+    public interface Keys {
+        String TOPIC_FIELD = "topic.field";
+        String PARTITION_FIELD = "partition.field";
+        String OFFSET_FIELD = "offset.field";
+        String TIMESTAMP_FIELD = "timestamp.field";
+        String STATIC_FIELD = "static.field";
+        String STATIC_VALUE = "static.value";
+    }
+
+    private static final String OPTIONALITY_DOC = "Suffix with '!' to make this a required field, or '?' to keep it optional (the default).";
+
+    private static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(Keys.TOPIC_FIELD, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
+                    "Field name for Kafka topic.\n" + OPTIONALITY_DOC)
+            .define(Keys.PARTITION_FIELD, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
+                    "Field name for Kafka partition.\n" + OPTIONALITY_DOC)
+            .define(Keys.OFFSET_FIELD, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
+                    "Field name for Kafka offset - only applicable to sink connectors.\n" + OPTIONALITY_DOC)
+            .define(Keys.TIMESTAMP_FIELD, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
+                    "Field name for record timestamp.\n" + OPTIONALITY_DOC)
+            .define(Keys.STATIC_FIELD, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
+                    "Field name for static data field.\n" + OPTIONALITY_DOC)
+            .define(Keys.STATIC_VALUE, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
+                    "Static field value, if field name configured.");
+
+    private static final Schema OPTIONAL_TIMESTAMP_SCHEMA = Timestamp.builder().optional().build();
+
+    private static final class InsertionSpec {
+        final String name;
+        final boolean optional;
+
+        private InsertionSpec(String name, boolean optional) {
+            this.name = name;
+            this.optional = optional;
+        }
+
+        public static InsertionSpec parse(String spec) {
+            if (spec == null) return null;
+            if (spec.endsWith("?")) {
+                return new InsertionSpec(spec.substring(0, spec.length() - 1), true);
+            }
+            if (spec.endsWith("!")) {
+                return new InsertionSpec(spec.substring(0, spec.length() - 1), false);
+            }
+            return new InsertionSpec(spec, true);
+        }
+    }
+
+    private InsertionSpec topicField;
+    private InsertionSpec partitionField;
+    private InsertionSpec offsetField;
+    private InsertionSpec timestampField;
+    private InsertionSpec staticField;
+    private String staticValue;
+    private boolean applicable;
+
+    private Cache<Schema, Schema> schemaUpdateCache;
+
+    @Override
+    public void configure(Map<String, ?> props) {
+        final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
+        topicField = InsertionSpec.parse(config.getString(Keys.TOPIC_FIELD));
+        partitionField = InsertionSpec.parse(config.getString(Keys.PARTITION_FIELD));
+        offsetField = InsertionSpec.parse(config.getString(Keys.OFFSET_FIELD));
+        timestampField = InsertionSpec.parse(config.getString(Keys.TIMESTAMP_FIELD));
+        staticField = InsertionSpec.parse(config.getString(Keys.STATIC_FIELD));
+        staticValue = config.getString(Keys.STATIC_VALUE);
+        applicable = topicField != null || partitionField != null || offsetField != null || timestampField != null;
+
+        schemaUpdateCache = new SynchronizedCache<>(new LRUCache<Schema, Schema>(16));
+    }
+
+    @Override
+    public R apply(R record) {
+        if (!applicable) return record;
+
+        final Schema schema = operatingSchema(record);
+        final Object value = operatingValue(record);
+
+        if (value == null)
+            throw new DataException("null value");
+
+        if (schema == null) {
+            if (!(value instanceof Map))
+                throw new DataException("Can only operate on Map value in schemaless mode: " + value.getClass().getName());
+            return applySchemaless(record, (Map<String, Object>) value);
+        } else {
+            if (schema.type() != Schema.Type.STRUCT)
+                throw new DataException("Can only operate on Struct types: " + value.getClass().getName());
+            return applyWithSchema(record, schema, (Struct) value);
+        }
+    }
+
+    private R applySchemaless(R record, Map<String, Object> value) {
+        final Map<String, Object> updatedValue = new HashMap<>(value);
+
+        if (topicField != null) {
+            updatedValue.put(topicField.name, record.topic());
+        }
+        if (partitionField != null && record.kafkaPartition() != null) {
+            updatedValue.put(partitionField.name, record.kafkaPartition());
+        }
+        if (offsetField != null) {
+            if (!(record instanceof SinkRecord))
+                throw new DataException("Offset insertion is only supported for sink connectors, record is of type: " + record.getClass());
+            updatedValue.put(offsetField.name, ((SinkRecord) record).kafkaOffset());
+        }
+        if (timestampField != null && record.timestamp() != null) {
+            updatedValue.put(timestampField.name, record.timestamp());
+        }
+        if (staticField != null && staticValue != null) {
+            updatedValue.put(staticField.name, staticValue);
+        }
+        return newRecord(record, null, updatedValue);
+    }
+
+    private R applyWithSchema(R record, Schema schema, Struct value) {
+        Schema updatedSchema = schemaUpdateCache.get(schema);
+        if (updatedSchema == null) {
+            updatedSchema = makeUpdatedSchema(schema);
+            schemaUpdateCache.put(schema, updatedSchema);
+        }
+
+        final Struct updatedValue = new Struct(updatedSchema);
+
+        copyFields(value, updatedValue);
+
+        insertFields(record, updatedValue);
+
+        return newRecord(record, updatedSchema, updatedValue);
+    }
+
+    private Schema makeUpdatedSchema(Schema schema) {
+        final SchemaBuilder builder = SchemaBuilder.struct();
+
+        builder.name(schema.name());
+        builder.version(schema.version());
+        builder.doc(schema.doc());
+
+        final Map<String, String> params = schema.parameters();
+        if (params != null) {
+            builder.parameters(params);
+        }
+
+        for (Field field : schema.fields()) {
+            builder.field(field.name(), field.schema());
+        }
+
+        if (topicField != null) {
+            builder.field(topicField.name, topicField.optional ? Schema.OPTIONAL_STRING_SCHEMA : Schema.STRING_SCHEMA);
+        }
+        if (partitionField != null) {
+            builder.field(partitionField.name, partitionField.optional ? Schema.OPTIONAL_INT32_SCHEMA : Schema.INT32_SCHEMA);
+        }
+        if (offsetField != null) {
+            builder.field(offsetField.name, offsetField.optional ? Schema.OPTIONAL_INT64_SCHEMA : Schema.INT64_SCHEMA);
+        }
+        if (timestampField != null) {
+            builder.field(timestampField.name, timestampField.optional ? OPTIONAL_TIMESTAMP_SCHEMA : Timestamp.SCHEMA);
+        }
+        if (staticField != null) {
+            builder.field(staticField.name, staticField.optional ? Schema.OPTIONAL_STRING_SCHEMA : Schema.STRING_SCHEMA);
+        }
+
+        return builder.build();
+    }
+
+    private void copyFields(Struct value, Struct updatedValue) {
+        for (Field field : value.schema().fields()) {
+            updatedValue.put(field.name(), value.get(field));
+        }
+    }
+
+    private void insertFields(R record, Struct value) {
+        if (topicField != null) {
+            value.put(topicField.name, record.topic());
+        }
+        if (partitionField != null && record.kafkaPartition() != null) {
+            value.put(partitionField.name, record.kafkaPartition());
+        }
+        if (offsetField != null) {
+            if (!(record instanceof SinkRecord)) {
+                throw new DataException("Offset insertion is only supported for sink connectors, record is of type: " + record.getClass());
+            }
+            value.put(offsetField.name, ((SinkRecord) record).kafkaOffset());
+        }
+        if (timestampField != null && record.timestamp() != null) {
+            value.put(timestampField.name, new Date(record.timestamp()));
+        }
+        if (staticField != null && staticValue != null) {
+            value.put(staticField.name, staticValue);
+        }
+    }
+
+    @Override
+    public void close() {
+        schemaUpdateCache = null;
+    }
+
+    @Override
+    public ConfigDef config() {
+        return CONFIG_DEF;
+    }
+
+    protected abstract Schema operatingSchema(R record);
+
+    protected abstract Object operatingValue(R record);
+
+    protected abstract R newRecord(R record, Schema updatedSchema, Object updatedValue);
+
+    /**
+     * This transformation allows inserting configured attributes of the record metadata as fields in the record key.
+     * It also allows adding a static data field.
+     */
+    public static class Key<R extends ConnectRecord<R>> extends InsertField<R> {
+
+        @Override
+        protected Schema operatingSchema(R record) {
+            return record.keySchema();
+        }
+
+        @Override
+        protected Object operatingValue(R record) {
+            return record.key();
+        }
+
+        @Override
+        protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
+            return record.newRecord(record.topic(), record.kafkaPartition(), updatedSchema, updatedValue, record.valueSchema(), record.value(), record.timestamp());
+        }
+
+    }
+
+    /**
+     * This transformation allows inserting configured attributes of the record metadata as fields in the record value.
+     * It also allows adding a static data field.
+     */
+    public static class Value<R extends ConnectRecord<R>> extends InsertField<R> {
+
+        @Override
+        protected Schema operatingSchema(R record) {
+            return record.valueSchema();
+        }
+
+        @Override
+        protected Object operatingValue(R record) {
+            return record.value();
+        }
+
+        @Override
+        protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
+            return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), updatedSchema, updatedValue, record.timestamp());
+        }
+
+    }
+
+}
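
The '?'/'!' suffix handling in InsertionSpec.parse above is easiest to see with a concrete configuration. A minimal sketch, mirroring copySchemaAndInsertConfiguredFields in InsertFieldTest below; the field names and static value are illustrative only:

    // Fragment; assumes a Struct- or Map-valued SourceRecord named `record` is in scope.
    final Map<String, Object> props = new HashMap<>();
    props.put("topic.field", "topic_field!");         // '!' -> required field in the updated schema
    props.put("partition.field", "partition_field");  // no suffix -> optional (the default)
    props.put("timestamp.field", "timestamp_field?"); // '?' -> explicitly optional
    props.put("static.field", "instance_id");
    props.put("static.value", "my-instance-id");

    final InsertField<SourceRecord> xform = new InsertField.Value<>();
    xform.configure(props);
    // With a schema, apply() copies the original fields and appends the configured metadata fields;
    // in schemaless mode it simply puts the extra keys into a copy of the Map value.
    final SourceRecord transformed = xform.apply(record);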

http://git-wip-us.apache.org/repos/asf/kafka/blob/2f904883/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampRouter.java
----------------------------------------------------------------------
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampRouter.java
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampRouter.java
new file mode 100644
index 0000000..1dd5345
--- /dev/null
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampRouter.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.errors.DataException;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Map;
+import java.util.TimeZone;
+
+/**
+ * This transformation facilitates updating the record's topic field as a function of the original topic value and the record timestamp.
+ * <p/>
+ * It is mainly useful for sink connectors, since the topic field is often used to determine the equivalent entity name in the destination system
+ * (e.g. database table or search index name).
+ */
+public class TimestampRouter<R extends ConnectRecord<R>> implements Transformation<R> {
+
+    public interface Keys {
+        String TOPIC_FORMAT = "topic.format";
+        String TIMESTAMP_FORMAT = "timestamp.format";
+    }
+
+    private static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(Keys.TOPIC_FORMAT, ConfigDef.Type.STRING, "${topic}-${timestamp}", ConfigDef.Importance.HIGH,
+                    "Format string which can contain ``${topic}`` and ``${timestamp}`` as placeholders for the topic and timestamp, respectively.")
+            .define(Keys.TIMESTAMP_FORMAT, ConfigDef.Type.STRING, "yyyyMMdd", ConfigDef.Importance.HIGH,
+                    "Format string for the timestamp that is compatible with java.text.SimpleDateFormat.");
+
+    private String topicFormat;
+    private ThreadLocal<SimpleDateFormat> timestampFormat;
+
+    @Override
+    public void configure(Map<String, ?> props) {
+        final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
+
+        topicFormat = config.getString(Keys.TOPIC_FORMAT);
+
+        final String timestampFormatStr = config.getString(Keys.TIMESTAMP_FORMAT);
+        timestampFormat = new ThreadLocal<SimpleDateFormat>() {
+            @Override
+            protected SimpleDateFormat initialValue() {
+                final SimpleDateFormat fmt = new SimpleDateFormat(timestampFormatStr);
+                fmt.setTimeZone(TimeZone.getTimeZone("UTC"));
+                return fmt;
+            }
+        };
+    }
+
+    @Override
+    public R apply(R record) {
+        final Long timestamp = record.timestamp();
+        if (timestamp == null) {
+            throw new DataException("Timestamp missing on record: " + record);
+        }
+        final String formattedTimestamp = timestampFormat.get().format(new Date(timestamp));
+        final String updatedTopic = topicFormat.replace("${topic}", record.topic()).replace("${timestamp}", formattedTimestamp);
+        return record.newRecord(
+                updatedTopic, record.kafkaPartition(),
+                record.keySchema(), record.key(),
+                record.valueSchema(), record.value(),
+                record.timestamp()
+        );
+    }
+
+    @Override
+    public void close() {
+        timestampFormat = null;
+    }
+
+    @Override
+    public ConfigDef config() {
+        return CONFIG_DEF;
+    }
+
+}
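
As a worked example of the routing logic: with the default topic.format of ${topic}-${timestamp} and timestamp.format of yyyyMMdd (UTC), a record on topic "test" with timestamp 1483425001864 is routed to "test-20170103", which is exactly what TimestampRouterTest below asserts. A minimal sketch of the same call sequence; the record contents are illustrative only:

    // Fragment; mirrors TimestampRouterTest.defaultConfiguration.
    final TimestampRouter<SourceRecord> router = new TimestampRouter<>();
    router.configure(Collections.<String, Object>emptyMap()); // use the default formats

    final SourceRecord record = new SourceRecord(null, null, "test", 0, null, null, null, null, 1483425001864L);
    // Only the topic changes; key, value, schemas and timestamp are passed through untouched.
    assert "test-20170103".equals(router.apply(record).topic());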

http://git-wip-us.apache.org/repos/asf/kafka/blob/2f904883/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/SimpleConfig.java
----------------------------------------------------------------------
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/SimpleConfig.java
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/SimpleConfig.java
new file mode 100644
index 0000000..6ed1e14
--- /dev/null
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/SimpleConfig.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.transforms.util;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+
+import java.util.Map;
+
+/**
+ * A barebones concrete implementation of {@link AbstractConfig}.
+ */
+public class SimpleConfig extends AbstractConfig {
+
+    public SimpleConfig(ConfigDef configDef, Map<?, ?> originals) {
+        super(configDef, originals, false);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2f904883/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HoistToStructTest.java
----------------------------------------------------------------------
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HoistToStructTest.java
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HoistToStructTest.java
new file mode 100644
index 0000000..99a6e99
--- /dev/null
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HoistToStructTest.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.junit.Test;
+
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+
+public class HoistToStructTest {
+
+    @Test
+    public void sanityCheck() {
+        final HoistToStruct<SinkRecord> xform = new HoistToStruct.Key<>();
+        xform.configure(Collections.singletonMap("field", "magic"));
+
+        final SinkRecord record = new SinkRecord("test", 0, Schema.INT32_SCHEMA, 42, null, null, 0);
+        final SinkRecord transformedRecord = xform.apply(record);
+
+        assertEquals(Schema.Type.STRUCT, transformedRecord.keySchema().type());
+        assertEquals(record.keySchema(),  transformedRecord.keySchema().field("magic").schema());
+        assertEquals(42, ((Struct) transformedRecord.key()).get("magic"));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2f904883/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertFieldTest.java
----------------------------------------------------------------------
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertFieldTest.java
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertFieldTest.java
new file mode 100644
index 0000000..07eb003
--- /dev/null
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertFieldTest.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.data.Timestamp;
+import org.apache.kafka.connect.errors.DataException;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+
+public class InsertFieldTest {
+
+    @Test(expected = DataException.class)
+    public void topLevelStructRequired() {
+        final InsertField<SourceRecord> xform = new InsertField.Value<>();
+        xform.configure(Collections.singletonMap("topic.field", "topic_field"));
+        xform.apply(new SourceRecord(null, null,
+                "", 0,
+                Schema.INT32_SCHEMA, 42));
+    }
+
+    @Test
+    public void copySchemaAndInsertConfiguredFields() {
+        final Map<String, Object> props = new HashMap<>();
+        props.put("topic.field", "topic_field!");
+        props.put("partition.field", "partition_field");
+        props.put("timestamp.field", "timestamp_field?");
+        props.put("static.field", "instance_id");
+        props.put("static.value", "my-instance-id");
+
+        final InsertField<SourceRecord> xform = new InsertField.Value<>();
+        xform.configure(props);
+
+        final Schema simpleStructSchema = SchemaBuilder.struct().name("name").version(1).doc("doc").field("magic", Schema.OPTIONAL_INT64_SCHEMA).build();
+        final Struct simpleStruct = new Struct(simpleStructSchema).put("magic", 42L);
+
+        final SourceRecord record = new SourceRecord(null, null, "test", 0, simpleStructSchema, simpleStruct);
+        final SourceRecord transformedRecord = xform.apply(record);
+
+        assertEquals(simpleStructSchema.name(), transformedRecord.valueSchema().name());
+        assertEquals(simpleStructSchema.version(), transformedRecord.valueSchema().version());
+        assertEquals(simpleStructSchema.doc(), transformedRecord.valueSchema().doc());
+
+        assertEquals(Schema.OPTIONAL_INT64_SCHEMA, transformedRecord.valueSchema().field("magic").schema());
+        assertEquals(42L, ((Struct) transformedRecord.value()).getInt64("magic").longValue());
+
+        assertEquals(Schema.STRING_SCHEMA, transformedRecord.valueSchema().field("topic_field").schema());
+        assertEquals("test", ((Struct) transformedRecord.value()).getString("topic_field"));
+
+        assertEquals(Schema.OPTIONAL_INT32_SCHEMA, transformedRecord.valueSchema().field("partition_field").schema());
+        assertEquals(0, ((Struct) transformedRecord.value()).getInt32("partition_field").intValue());
+
+        assertEquals(Timestamp.builder().optional().build(), transformedRecord.valueSchema().field("timestamp_field").schema());
+        assertEquals(null, ((Struct) transformedRecord.value()).getInt64("timestamp_field"));
+
+        assertEquals(Schema.OPTIONAL_STRING_SCHEMA, transformedRecord.valueSchema().field("instance_id").schema());
+        assertEquals("my-instance-id", ((Struct) transformedRecord.value()).getString("instance_id"));
+
+        // Exercise caching
+        final SourceRecord transformedRecord2 = xform.apply(
+                new SourceRecord(null, null, "test", 1, simpleStructSchema, new Struct(simpleStructSchema)));
+        assertSame(transformedRecord.valueSchema(), transformedRecord2.valueSchema());
+    }
+
+    @Test
+    public void schemalessInsertConfiguredFields() {
+        final Map<String, Object> props = new HashMap<>();
+        props.put("topic.field", "topic_field!");
+        props.put("partition.field", "partition_field");
+        props.put("timestamp.field", "timestamp_field?");
+        props.put("static.field", "instance_id");
+        props.put("static.value", "my-instance-id");
+
+        final InsertField<SourceRecord> xform = new InsertField.Value<>();
+        xform.configure(props);
+
+        final SourceRecord record = new SourceRecord(null, null, "test", 0,
+                null, Collections.singletonMap("magic", 42L));
+
+        final SourceRecord transformedRecord = xform.apply(record);
+
+        assertEquals(42L, ((Map) transformedRecord.value()).get("magic"));
+        assertEquals("test", ((Map) transformedRecord.value()).get("topic_field"));
+        assertEquals(0, ((Map) transformedRecord.value()).get("partition_field"));
+        assertEquals(null, ((Map) transformedRecord.value()).get("timestamp_field"));
+        assertEquals("my-instance-id", ((Map) transformedRecord.value()).get("instance_id"));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2f904883/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampRouterTest.java
----------------------------------------------------------------------
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampRouterTest.java
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampRouterTest.java
new file mode 100644
index 0000000..cbd95a6
--- /dev/null
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampRouterTest.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.connect.source.SourceRecord;
+import org.junit.Test;
+
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+
+public class TimestampRouterTest {
+
+    @Test
+    public void defaultConfiguration() {
+        final TimestampRouter<SourceRecord> xform = new TimestampRouter<>();
+        xform.configure(Collections.<String, Object>emptyMap()); // defaults
+        final SourceRecord record = new SourceRecord(
+                null, null,
+                "test", 0,
+                null, null,
+                null, null,
+                1483425001864L
+        );
+        assertEquals("test-20170103", xform.apply(record).topic());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2f904883/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 4c133c3..71a83b3 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -464,6 +464,8 @@ class LoggingMessageFormatter extends MessageFormatter   {
   private val defaultWriter: DefaultMessageFormatter = new DefaultMessageFormatter
   val logger = Logger.getLogger(getClass().getName)
 
+  override def init(props: Properties): Unit = defaultWriter.init(props)
+
   def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit = {
     import consumerRecord._
     defaultWriter.writeTo(consumerRecord, output)

http://git-wip-us.apache.org/repos/asf/kafka/blob/2f904883/settings.gradle
----------------------------------------------------------------------
diff --git a/settings.gradle b/settings.gradle
index d430c2f..29d3895 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -14,4 +14,4 @@
 // limitations under the License.
 
 include 'core', 'examples', 'clients', 'tools', 'streams', 'streams:examples', 'log4j-appender',
-        'connect:api', 'connect:runtime', 'connect:json', 'connect:file'
+        'connect:api', 'connect:transforms', 'connect:runtime', 'connect:json', 'connect:file'

http://git-wip-us.apache.org/repos/asf/kafka/blob/2f904883/tests/kafkatest/services/console_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py
index 6984fc9..17ddb6b 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -89,7 +89,7 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
     def __init__(self, context, num_nodes, kafka, topic, group_id="test-consumer-group", new_consumer=True,
                  message_validator=None, from_beginning=True, consumer_timeout_ms=None, version=TRUNK,
                  client_id="console-consumer", print_key=False, jmx_object_names=None, jmx_attributes=None,
-                 enable_systest_events=False, stop_timeout_sec=15):
+                 enable_systest_events=False, stop_timeout_sec=15, print_timestamp=False):
         """
         Args:
             context:                    standard context
@@ -103,11 +103,12 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
                                         successively consumed messages exceeds this timeout. Setting this and
                                         waiting for the consumer to stop is a pretty good way to consume all messages
                                         in a topic.
-            print_key                   if True, print each message's key in addition to its value
+            print_key                   if True, print each message's key as well
             enable_systest_events       if True, console consumer will print additional lifecycle-related
information
                                         only available in 0.10.0 and later.
             stop_timeout_sec            After stopping a node, wait up to stop_timeout_sec for the node to stop,
                                         and the corresponding background thread to finish successfully.
+            print_timestamp             if True, print each message's timestamp as well
         """
         JmxMixin.__init__(self, num_nodes, jmx_object_names, jmx_attributes or [])
         BackgroundThreadService.__init__(self, context, num_nodes)
@@ -136,6 +137,8 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
             # Only available in 0.10.0 and up
             assert version >= V_0_10_0_0
 
+        self.print_timestamp = print_timestamp
+
     def prop_file(self, node):
         """Return a string which can be used to create a configuration file appropriate for the given node."""
         # Process client configuration
@@ -192,6 +195,9 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
         if self.print_key:
             cmd += " --property print.key=true"
 
+        if self.print_timestamp:
+            cmd += " --property print.timestamp=true"
+
         # LoggingMessageFormatter was introduced after 0.9
         if node.version > LATEST_0_9:
             cmd += " --formatter kafka.tools.LoggingMessageFormatter"

http://git-wip-us.apache.org/repos/asf/kafka/blob/2f904883/tests/kafkatest/tests/connect/connect_distributed_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect/connect_distributed_test.py b/tests/kafkatest/tests/connect/connect_distributed_test.py
index 3c12cd9..198e945 100644
--- a/tests/kafkatest/tests/connect/connect_distributed_test.py
+++ b/tests/kafkatest/tests/connect/connect_distributed_test.py
@@ -20,15 +20,16 @@ from ducktape.mark import matrix, parametrize
 from ducktape.cluster.remoteaccount import RemoteCommandError
 
 from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import KafkaService, config_property
 from kafkatest.services.connect import ConnectDistributedService, VerifiableSource, VerifiableSink, ConnectRestError, MockSink, MockSource
 from kafkatest.services.console_consumer import ConsoleConsumer
 from kafkatest.services.security.security_config import SecurityConfig
 
-import itertools, time
 from collections import Counter, namedtuple
+import itertools
+import json
 import operator
-
+import time
 
 class ConnectDistributedTest(Test):
     """
@@ -70,10 +71,13 @@ class ConnectDistributedTest(Test):
         self.value_converter = "org.apache.kafka.connect.json.JsonConverter"
         self.schemas = True
 
-    def setup_services(self, security_protocol=SecurityConfig.PLAINTEXT):
+    def setup_services(self, security_protocol=SecurityConfig.PLAINTEXT, timestamp_type=None):
         self.kafka = KafkaService(self.test_context, self.num_brokers, self.zk,
                                   security_protocol=security_protocol, interbroker_security_protocol=security_protocol,
                                   topics=self.topics)
+        if timestamp_type is not None:
+            for node in self.kafka.nodes:
+                node.config[config_property.MESSAGE_TIMESTAMP_TYPE] = timestamp_type
 
         self.cc = ConnectDistributedService(self.test_context, 3, self.kafka, [self.INPUT_FILE, self.OUTPUT_FILE])
         self.cc.log_level = "DEBUG"
@@ -437,6 +441,61 @@ class ConnectDistributedTest(Test):
 
         assert success, "Found validation errors:\n" + "\n  ".join(errors)
 
+    @cluster(num_nodes=6)
+    def test_transformations(self):
+        self.setup_services(timestamp_type='CreateTime')
+        self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
+        self.cc.start()
+
+        ts_fieldname = 'the_timestamp'
+
+        NamedConnector = namedtuple('Connector', ['name'])
+
+        source_connector = NamedConnector(name='file-src')
+
+        self.cc.create_connector({
+            'name': source_connector.name,
+            'connector.class': 'org.apache.kafka.connect.file.FileStreamSourceConnector',
+            'tasks.max': 1,
+            'file': self.INPUT_FILE,
+            'topic': self.TOPIC,
+            'transforms': 'hoistToStruct,insertTimestampField',
+            'transforms.hoistToStruct.type': 'org.apache.kafka.connect.transforms.HoistToStruct$Value',
+            'transforms.hoistToStruct.field': 'content',
+            'transforms.insertTimestampField.type': 'org.apache.kafka.connect.transforms.InsertField$Value',
+            'transforms.insertTimestampField.timestamp.field': ts_fieldname,
+        })
+
+        wait_until(lambda: self.connector_is_running(source_connector), timeout_sec=30, err_msg='Failed to see connector transition to the RUNNING state')
+
+        for node in self.cc.nodes:
+            node.account.ssh("echo -e -n " + repr(self.FIRST_INPUTS) + " >> " + self.INPUT_FILE)
+
+        consumer = ConsoleConsumer(self.test_context, 1, self.kafka, self.TOPIC, consumer_timeout_ms=15000, print_timestamp=True)
+        consumer.run()
+
+        assert len(consumer.messages_consumed[1]) == len(self.FIRST_INPUT_LIST)
+
+        expected_schema = {
+            'type': 'struct',
+            'fields': [
+                {'field': 'content', 'type': 'string', 'optional': False},
+                {'field': ts_fieldname, 'name': 'org.apache.kafka.connect.data.Timestamp', 'type': 'int64', 'version': 1, 'optional': True},
+            ],
+            'optional': False
+        }
+
+        for msg in consumer.messages_consumed[1]:
+            (ts_info, value) = msg.split('\t')
+
+            assert ts_info.startswith('CreateTime:')
+            ts = int(ts_info[len('CreateTime:'):])
+
+            obj = json.loads(value)
+            assert obj['schema'] == expected_schema
+            assert obj['payload']['content'] in self.FIRST_INPUT_LIST
+            assert obj['payload'][ts_fieldname] == ts
+
     def _validate_file_output(self, input):
         input_set = set(input)
         # Output needs to be collected from all nodes because we can't be sure where the tasks will be scheduled.

http://git-wip-us.apache.org/repos/asf/kafka/blob/2f904883/tests/kafkatest/tests/connect/connect_rest_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect/connect_rest_test.py b/tests/kafkatest/tests/connect/connect_rest_test.py
index 098790b..999d773 100644
--- a/tests/kafkatest/tests/connect/connect_rest_test.py
+++ b/tests/kafkatest/tests/connect/connect_rest_test.py
@@ -31,8 +31,8 @@ class ConnectRestApiTest(KafkaTest):
     FILE_SOURCE_CONNECTOR = 'org.apache.kafka.connect.file.FileStreamSourceConnector'
     FILE_SINK_CONNECTOR = 'org.apache.kafka.connect.file.FileStreamSinkConnector'
 
-    FILE_SOURCE_CONFIGS = {'name', 'connector.class', 'tasks.max', 'key.converter', 'value.converter', 'topic', 'file'}
-    FILE_SINK_CONFIGS = {'name', 'connector.class', 'tasks.max', 'key.converter', 'value.converter', 'topics', 'file'}
+    FILE_SOURCE_CONFIGS = {'name', 'connector.class', 'tasks.max', 'key.converter', 'value.converter', 'topic', 'file', 'transforms'}
+    FILE_SINK_CONFIGS = {'name', 'connector.class', 'tasks.max', 'key.converter', 'value.converter', 'topics', 'file', 'transforms'}
 
     INPUT_FILE = "/mnt/connect.input"
     INPUT_FILE2 = "/mnt/connect.input2"

