kafka-commits mailing list archives

From ewe...@apache.org
Subject [2/2] kafka git commit: KAFKA-3209: KIP-66: more single message transforms
Date Sat, 21 Jan 2017 00:16:43 GMT
KAFKA-3209: KIP-66: more single message transforms

Renames `HoistToStruct` SMT to `HoistField`.

Adds the following SMTs:
`ExtractField`
`MaskField`
`RegexRouter`
`ReplaceField`
`SetSchemaMetadata`
`ValueToKey`

Adds HTML doc generation and updates to `connect.html`.
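For illustration, a minimal sketch of exercising one of these SMTs directly through the
Transformation API; the topic, field name, and record contents are hypothetical and this
code is not part of the patch:

    import java.util.Collections;
    import org.apache.kafka.connect.sink.SinkRecord;
    import org.apache.kafka.connect.transforms.ExtractField;

    public class ExtractFieldExample {
        public static void main(String[] args) {
            // Configure the value-oriented variant to pull out the "id" field.
            ExtractField.Value<SinkRecord> smt = new ExtractField.Value<>();
            smt.configure(Collections.singletonMap("field", "id"));

            // Schemaless record: the value is a plain Map, so the field is looked up directly.
            SinkRecord record = new SinkRecord("orders", 0, null, null, null,
                    Collections.<String, Object>singletonMap("id", 42), 0L);

            SinkRecord transformed = smt.apply(record);
            System.out.println(transformed.value()); // prints: 42

            smt.close();
        }
    }

In a running Connect worker the same behavior is reached through the connector's
`transforms` configuration rather than code.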

Author: Shikhar Bhushan <shikhar@confluent.io>

Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>

Closes #2374 from shikhar/more-smt


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c3f923cb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c3f923cb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c3f923cb

Branch: refs/heads/0.10.2
Commit: c3f923cbf9f686bf42adda44d34a975167a13159
Parents: 0523736
Author: Shikhar Bhushan <shikhar@confluent.io>
Authored: Fri Jan 20 16:15:15 2017 -0800
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Fri Jan 20 16:16:28 2017 -0800

----------------------------------------------------------------------
 build.gradle                                    |   9 +-
 .../kafka/connect/tools/TransformationDoc.java  |  87 +++++++
 .../kafka/connect/transforms/ExtractField.java  | 114 +++++++++
 .../kafka/connect/transforms/HoistField.java    | 127 ++++++++++
 .../kafka/connect/transforms/HoistToStruct.java | 127 ----------
 .../kafka/connect/transforms/InsertField.java   | 170 ++++++--------
 .../kafka/connect/transforms/MaskField.java     | 172 ++++++++++++++
 .../kafka/connect/transforms/RegexRouter.java   |  75 ++++++
 .../kafka/connect/transforms/ReplaceField.java  | 230 +++++++++++++++++++
 .../connect/transforms/SetSchemaMetadata.java   | 124 ++++++++++
 .../connect/transforms/TimestampRouter.java     |  30 +--
 .../kafka/connect/transforms/ValueToKey.java    | 111 +++++++++
 .../transforms/util/NonEmptyListValidator.java  |  39 ++++
 .../connect/transforms/util/RegexValidator.java |  41 ++++
 .../connect/transforms/util/Requirements.java   |  61 +++++
 .../connect/transforms/util/SchemaUtil.java     |  40 ++++
 .../connect/transforms/ExtractFieldTest.java    |  59 +++++
 .../connect/transforms/HoistFieldTest.java      |  57 +++++
 .../connect/transforms/HoistToStructTest.java   |  44 ----
 .../kafka/connect/transforms/MaskFieldTest.java | 156 +++++++++++++
 .../connect/transforms/RegexRouterTest.java     |  70 ++++++
 .../connect/transforms/ReplaceFieldTest.java    |  92 ++++++++
 .../transforms/SetSchemaMetadataTest.java       |  67 ++++++
 .../connect/transforms/ValueToKeyTest.java      |  87 +++++++
 docs/connect.html                               |  16 ++
 .../tests/connect/connect_distributed_test.py   |   2 +-
 26 files changed, 1924 insertions(+), 283 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c3f923cb/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 6f4d250..e191a51 100644
--- a/build.gradle
+++ b/build.gradle
@@ -508,7 +508,7 @@ project(':core') {
 
   task siteDocsTar(dependsOn: ['genProtocolErrorDocs', 'genProtocolApiKeyDocs', 'genProtocolMessageDocs',
                                'genProducerConfigDocs', 'genConsumerConfigDocs', 'genKafkaConfigDocs',
-                               'genTopicConfigDocs', ':connect:runtime:genConnectConfigDocs',
+                               'genTopicConfigDocs', ':connect:runtime:genConnectConfigDocs', ':connect:runtime:genConnectTransformationDocs',
                                ':streams:genStreamsConfigDocs'], type: Tar) {
     classifier = 'site-docs'
     compression = Compression.GZIP
@@ -948,6 +948,13 @@ project(':connect:runtime') {
     if( !generatedDocsDir.exists() ) { generatedDocsDir.mkdirs() }
     standardOutput = new File(generatedDocsDir, "connect_config.html").newOutputStream()
   }
+
+  task genConnectTransformationDocs(type: JavaExec) {
+    classpath = sourceSets.main.runtimeClasspath
+    main = 'org.apache.kafka.connect.tools.TransformationDoc'
+    if( !generatedDocsDir.exists() ) { generatedDocsDir.mkdirs() }
+    standardOutput = new File(generatedDocsDir, "connect_transforms.html").newOutputStream()
+  }
 }
 
 project(':connect:file') {

http://git-wip-us.apache.org/repos/asf/kafka/blob/c3f923cb/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java
new file mode 100644
index 0000000..6746042
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.tools;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.transforms.ExtractField;
+import org.apache.kafka.connect.transforms.HoistField;
+import org.apache.kafka.connect.transforms.InsertField;
+import org.apache.kafka.connect.transforms.MaskField;
+import org.apache.kafka.connect.transforms.RegexRouter;
+import org.apache.kafka.connect.transforms.ReplaceField;
+import org.apache.kafka.connect.transforms.SetSchemaMetadata;
+import org.apache.kafka.connect.transforms.TimestampRouter;
+import org.apache.kafka.connect.transforms.ValueToKey;
+
+import java.io.PrintStream;
+import java.util.Arrays;
+import java.util.List;
+
+public class TransformationDoc {
+
+    private static final class DocInfo {
+        final String transformationName;
+        final String overview;
+        final ConfigDef configDef;
+
+        private DocInfo(String transformationName, String overview, ConfigDef configDef) {
+            this.transformationName = transformationName;
+            this.overview = overview;
+            this.configDef = configDef;
+        }
+    }
+
+    private static final List<DocInfo> TRANSFORMATIONS = Arrays.asList(
+            new DocInfo(InsertField.class.getName(), InsertField.OVERVIEW_DOC, InsertField.CONFIG_DEF),
+            new DocInfo(ReplaceField.class.getName(), ReplaceField.OVERVIEW_DOC, ReplaceField.CONFIG_DEF),
+            new DocInfo(MaskField.class.getName(), MaskField.OVERVIEW_DOC, MaskField.CONFIG_DEF),
+            new DocInfo(ValueToKey.class.getName(), ValueToKey.OVERVIEW_DOC, ValueToKey.CONFIG_DEF),
+            new DocInfo(HoistField.class.getName(), HoistField.OVERVIEW_DOC, HoistField.CONFIG_DEF),
+            new DocInfo(ExtractField.class.getName(), ExtractField.OVERVIEW_DOC, ExtractField.CONFIG_DEF),
+            new DocInfo(SetSchemaMetadata.class.getName(), SetSchemaMetadata.OVERVIEW_DOC, SetSchemaMetadata.CONFIG_DEF),
+            new DocInfo(TimestampRouter.class.getName(), TimestampRouter.OVERVIEW_DOC, TimestampRouter.CONFIG_DEF),
+            new DocInfo(RegexRouter.class.getName(), RegexRouter.OVERVIEW_DOC, RegexRouter.CONFIG_DEF)
+    );
+
+    private static void printTransformationHtml(PrintStream out, DocInfo docInfo) {
+        out.println("<div id=\"" + docInfo.transformationName + "\">");
+
+        out.print("<h5>");
+        out.print(docInfo.transformationName);
+        out.println("</h5>");
+
+        out.println(docInfo.overview);
+
+        out.println("<p/>");
+
+        out.println(docInfo.configDef.toHtmlTable());
+
+        out.println("</div>");
+    }
+
+    private static void printHtml(PrintStream out) throws NoSuchFieldException, IllegalAccessException, InstantiationException {
+        for (final DocInfo docInfo : TRANSFORMATIONS) {
+            printTransformationHtml(out, docInfo);
+        }
+    }
+
+    public static void main(String... args) throws Exception {
+        printHtml(System.out);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c3f923cb/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ExtractField.java
----------------------------------------------------------------------
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ExtractField.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ExtractField.java
new file mode 100644
index 0000000..b706313
--- /dev/null
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ExtractField.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+import java.util.Map;
+
+import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;
+import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
+
+public abstract class ExtractField<R extends ConnectRecord<R>> implements Transformation<R> {
+
+    public static final String OVERVIEW_DOC =
+            "Extract the specified field from a Struct when schema present, or a Map in the case of schemaless data."
+                    + "<p/>Use the concrete transformation type designed for the record key (<code>" + Key.class.getCanonicalName() + "</code>) "
+                    + "or value (<code>" + Value.class.getCanonicalName() + "</code>).";
+
+    private static final String FIELD_CONFIG = "field";
+
+    public static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(FIELD_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.MEDIUM, "Field name to extract.");
+
+    private static final String PURPOSE = "field extraction";
+
+    private String fieldName;
+
+    @Override
+    public void configure(Map<String, ?> props) {
+        final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
+        fieldName = config.getString(FIELD_CONFIG);
+    }
+
+    @Override
+    public R apply(R record) {
+        final Schema schema = operatingSchema(record);
+        if (schema == null) {
+            final Map<String, Object> value = requireMap(operatingValue(record), PURPOSE);
+            return newRecord(record, null, value.get(fieldName));
+        } else {
+            final Struct value = requireStruct(operatingValue(record), PURPOSE);
+            return newRecord(record, schema.field(fieldName).schema(), value.get(fieldName));
+        }
+    }
+
+    @Override
+    public void close() {
+    }
+
+    @Override
+    public ConfigDef config() {
+        return CONFIG_DEF;
+    }
+
+    protected abstract Schema operatingSchema(R record);
+
+    protected abstract Object operatingValue(R record);
+
+    protected abstract R newRecord(R record, Schema updatedSchema, Object updatedValue);
+
+    public static class Key<R extends ConnectRecord<R>> extends ExtractField<R> {
+        @Override
+        protected Schema operatingSchema(R record) {
+            return record.keySchema();
+        }
+
+        @Override
+        protected Object operatingValue(R record) {
+            return record.key();
+        }
+
+        @Override
+        protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
+            return record.newRecord(record.topic(), record.kafkaPartition(), updatedSchema, updatedValue, record.valueSchema(), record.value(), record.timestamp());
+        }
+    }
+
+    public static class Value<R extends ConnectRecord<R>> extends ExtractField<R> {
+        @Override
+        protected Schema operatingSchema(R record) {
+            return record.valueSchema();
+        }
+
+        @Override
+        protected Object operatingValue(R record) {
+            return record.value();
+        }
+
+        @Override
+        protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
+            return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), updatedSchema, updatedValue, record.timestamp());
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c3f923cb/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HoistField.java
----------------------------------------------------------------------
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HoistField.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HoistField.java
new file mode 100644
index 0000000..1f2ed7c
--- /dev/null
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HoistField.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.common.cache.Cache;
+import org.apache.kafka.common.cache.LRUCache;
+import org.apache.kafka.common.cache.SynchronizedCache;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+import java.util.Collections;
+import java.util.Map;
+
+public abstract class HoistField<R extends ConnectRecord<R>> implements Transformation<R> {
+
+    public static final String OVERVIEW_DOC =
+            "Wrap data using the specified field name in a Struct when schema present, or a Map in the case of schemaless data."
+                    + "<p/>Use the concrete transformation type designed for the record key (<code>" + Key.class.getCanonicalName() + "</code>) "
+                    + "or value (<code>" + Value.class.getCanonicalName() + "</code>).";
+
+    private static final String FIELD_CONFIG = "field";
+
+    public static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(FIELD_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.MEDIUM,
+                    "Field name for the single field that will be created in the resulting Struct or Map.");
+
+    private Cache<Schema, Schema> schemaUpdateCache;
+
+    private String fieldName;
+
+    @Override
+    public void configure(Map<String, ?> props) {
+        final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
+        fieldName = config.getString("field");
+        schemaUpdateCache = new SynchronizedCache<>(new LRUCache<Schema, Schema>(16));
+    }
+
+    @Override
+    public R apply(R record) {
+        final Schema schema = operatingSchema(record);
+        final Object value = operatingValue(record);
+
+        if (schema == null) {
+            return newRecord(record, null, Collections.singletonMap(fieldName, value));
+        } else {
+            Schema updatedSchema = schemaUpdateCache.get(schema);
+            if (updatedSchema == null) {
+                updatedSchema = SchemaBuilder.struct().field(fieldName, schema).build();
+                schemaUpdateCache.put(schema, updatedSchema);
+            }
+
+            final Struct updatedValue = new Struct(updatedSchema).put(fieldName, value);
+
+            return newRecord(record, updatedSchema, updatedValue);
+        }
+    }
+
+    @Override
+    public void close() {
+        schemaUpdateCache = null;
+    }
+
+    @Override
+    public ConfigDef config() {
+        return CONFIG_DEF;
+    }
+
+    protected abstract Schema operatingSchema(R record);
+
+    protected abstract Object operatingValue(R record);
+
+    protected abstract R newRecord(R record, Schema updatedSchema, Object updatedValue);
+
+    public static class Key<R extends ConnectRecord<R>> extends HoistField<R> {
+        @Override
+        protected Schema operatingSchema(R record) {
+            return record.keySchema();
+        }
+
+        @Override
+        protected Object operatingValue(R record) {
+            return record.key();
+        }
+
+        @Override
+        protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
+            return record.newRecord(record.topic(), record.kafkaPartition(), updatedSchema, updatedValue, record.valueSchema(), record.value(), record.timestamp());
+        }
+    }
+
+    public static class Value<R extends ConnectRecord<R>> extends HoistField<R> {
+        @Override
+        protected Schema operatingSchema(R record) {
+            return record.valueSchema();
+        }
+
+        @Override
+        protected Object operatingValue(R record) {
+            return record.value();
+        }
+
+        @Override
+        protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
+            return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), updatedSchema, updatedValue, record.timestamp());
+        }
+    }
+
+}
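A similar hypothetical sketch (not part of the patch) of the renamed transform's schemaless
path, which wraps a bare value in a single-entry Map under the configured field name:

    import java.util.Collections;
    import org.apache.kafka.connect.sink.SinkRecord;
    import org.apache.kafka.connect.transforms.HoistField;

    public class HoistFieldExample {
        public static void main(String[] args) {
            // Wrap the (schemaless) record value under the field name "line".
            HoistField.Value<SinkRecord> smt = new HoistField.Value<>();
            smt.configure(Collections.singletonMap("field", "line"));

            SinkRecord record = new SinkRecord("raw-lines", 0, null, null, null, "hello", 0L);
            // The bare value is wrapped in a single-entry Map: {line=hello}
            System.out.println(smt.apply(record).value());
        }
    }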

http://git-wip-us.apache.org/repos/asf/kafka/blob/c3f923cb/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HoistToStruct.java
----------------------------------------------------------------------
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HoistToStruct.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HoistToStruct.java
deleted file mode 100644
index c2726ca..0000000
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HoistToStruct.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.connect.transforms;
-
-import org.apache.kafka.common.cache.Cache;
-import org.apache.kafka.common.cache.LRUCache;
-import org.apache.kafka.common.cache.SynchronizedCache;
-import org.apache.kafka.common.config.ConfigDef;
-import org.apache.kafka.connect.connector.ConnectRecord;
-import org.apache.kafka.connect.data.Schema;
-import org.apache.kafka.connect.data.SchemaBuilder;
-import org.apache.kafka.connect.data.Struct;
-import org.apache.kafka.connect.transforms.util.SimpleConfig;
-
-import java.util.Map;
-
-public abstract class HoistToStruct<R extends ConnectRecord<R>> implements Transformation<R> {
-
-    public static final String FIELD_CONFIG = "field";
-
-    private static final ConfigDef CONFIG_DEF = new ConfigDef()
-            .define(FIELD_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.MEDIUM,
-                    "Field name for the single field that will be created in the resulting Struct.");
-
-    private Cache<Schema, Schema> schemaUpdateCache;
-
-    private String fieldName;
-
-    @Override
-    public void configure(Map<String, ?> props) {
-        final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
-        fieldName = config.getString("field");
-        schemaUpdateCache = new SynchronizedCache<>(new LRUCache<Schema, Schema>(16));
-    }
-
-    @Override
-    public R apply(R record) {
-        final Schema schema = operatingSchema(record);
-        final Object value = operatingValue(record);
-
-        Schema updatedSchema = schemaUpdateCache.get(schema);
-        if (updatedSchema == null) {
-            updatedSchema = SchemaBuilder.struct().field(fieldName, schema).build();
-            schemaUpdateCache.put(schema, updatedSchema);
-        }
-
-        final Struct updatedValue = new Struct(updatedSchema).put(fieldName, value);
-
-        return newRecord(record, updatedSchema, updatedValue);
-    }
-
-    @Override
-    public void close() {
-        schemaUpdateCache = null;
-    }
-
-    @Override
-    public ConfigDef config() {
-        return CONFIG_DEF;
-    }
-
-    protected abstract Schema operatingSchema(R record);
-
-    protected abstract Object operatingValue(R record);
-
-    protected abstract R newRecord(R record, Schema updatedSchema, Object updatedValue);
-
-    /**
-     * Wraps the record key in a {@link org.apache.kafka.connect.data.Struct} with specified field name.
-     */
-    public static class Key<R extends ConnectRecord<R>> extends HoistToStruct<R> {
-
-        @Override
-        protected Schema operatingSchema(R record) {
-            return record.keySchema();
-        }
-
-        @Override
-        protected Object operatingValue(R record) {
-            return record.key();
-        }
-
-        @Override
-        protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
-            return record.newRecord(record.topic(), record.kafkaPartition(), updatedSchema, updatedValue, record.valueSchema(), record.value(), record.timestamp());
-        }
-
-    }
-
-    /**
-     * Wraps the record value in a {@link org.apache.kafka.connect.data.Struct} with specified field name.
-     */
-    public static class Value<R extends ConnectRecord<R>> extends HoistToStruct<R> {
-
-        @Override
-        protected Schema operatingSchema(R record) {
-            return record.valueSchema();
-        }
-
-        @Override
-        protected Object operatingValue(R record) {
-            return record.value();
-        }
-
-        @Override
-        protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
-            return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), updatedSchema, updatedValue, record.timestamp());
-        }
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c3f923cb/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java
----------------------------------------------------------------------
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java
index d67fea0..f32d6ed 100644
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java
@@ -21,23 +21,32 @@ import org.apache.kafka.common.cache.Cache;
 import org.apache.kafka.common.cache.LRUCache;
 import org.apache.kafka.common.cache.SynchronizedCache;
 import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.data.Timestamp;
-import org.apache.kafka.connect.errors.DataException;
-import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.transforms.util.SimpleConfig;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
 
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
 
+import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;
+import static org.apache.kafka.connect.transforms.util.Requirements.requireSinkRecord;
+import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
+
 public abstract class InsertField<R extends ConnectRecord<R>> implements Transformation<R> {
 
-    public interface Keys {
+    public static final String OVERVIEW_DOC =
+            "Insert field(s) using attributes from the record metadata or a configured static value."
+                    + "<p/>Use the concrete transformation type designed for the record key (<code>" + Key.class.getCanonicalName() + "</code>) "
+                    + "or value (<code>" + Value.class.getCanonicalName() + "</code>).";
+
+    private interface ConfigName {
         String TOPIC_FIELD = "topic.field";
         String PARTITION_FIELD = "partition.field";
         String OFFSET_FIELD = "offset.field";
@@ -46,22 +55,24 @@ public abstract class InsertField<R extends ConnectRecord<R>> implements Transfo
         String STATIC_VALUE = "static.value";
     }
 
-    private static final String OPTIONALITY_DOC = "Suffix with '!' to make this a required field, or '?' to keep it optional (the default).";
-
-    private static final ConfigDef CONFIG_DEF = new ConfigDef()
-            .define(Keys.TOPIC_FIELD, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
-                    "Field name for Kafka topic.\n" + OPTIONALITY_DOC)
-            .define(Keys.PARTITION_FIELD, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
-                    "Field name for Kafka partition.\n" + OPTIONALITY_DOC)
-            .define(Keys.OFFSET_FIELD, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
-                    "Field name for Kafka offset - only applicable to sink connectors.\n" + OPTIONALITY_DOC)
-            .define(Keys.TIMESTAMP_FIELD, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
-                    "Field name for record timestamp.\n" + OPTIONALITY_DOC)
-            .define(Keys.STATIC_FIELD, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
-                    "Field name for static data field.\n" + OPTIONALITY_DOC)
-            .define(Keys.STATIC_VALUE, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
+    private static final String OPTIONALITY_DOC = "Suffix with <code>!</code> to make this a required field, or <code>?</code> to keep it optional (the default).";
+
+    public static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(ConfigName.TOPIC_FIELD, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
+                    "Field name for Kafka topic. " + OPTIONALITY_DOC)
+            .define(ConfigName.PARTITION_FIELD, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
+                    "Field name for Kafka partition. " + OPTIONALITY_DOC)
+            .define(ConfigName.OFFSET_FIELD, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
+                    "Field name for Kafka offset - only applicable to sink connectors.<br/>" + OPTIONALITY_DOC)
+            .define(ConfigName.TIMESTAMP_FIELD, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
+                    "Field name for record timestamp. " + OPTIONALITY_DOC)
+            .define(ConfigName.STATIC_FIELD, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
+                    "Field name for static data field. " + OPTIONALITY_DOC)
+            .define(ConfigName.STATIC_VALUE, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
                     "Static field value, if field name configured.");
 
+    private static final String PURPOSE = "field insertion";
+
     private static final Schema OPTIONAL_TIMESTAMP_SCHEMA = Timestamp.builder().optional().build();
 
     private static final class InsertionSpec {
@@ -91,46 +102,42 @@ public abstract class InsertField<R extends ConnectRecord<R>> implements Transfo
     private InsertionSpec timestampField;
     private InsertionSpec staticField;
     private String staticValue;
-    private boolean applicable;
 
     private Cache<Schema, Schema> schemaUpdateCache;
 
     @Override
     public void configure(Map<String, ?> props) {
         final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
-        topicField = InsertionSpec.parse(config.getString(Keys.TOPIC_FIELD));
-        partitionField = InsertionSpec.parse(config.getString(Keys.PARTITION_FIELD));
-        offsetField = InsertionSpec.parse(config.getString(Keys.OFFSET_FIELD));
-        timestampField = InsertionSpec.parse(config.getString(Keys.TIMESTAMP_FIELD));
-        staticField = InsertionSpec.parse(config.getString(Keys.STATIC_FIELD));
-        staticValue = config.getString(Keys.STATIC_VALUE);
-        applicable = topicField != null || partitionField != null || offsetField != null || timestampField != null;
+        topicField = InsertionSpec.parse(config.getString(ConfigName.TOPIC_FIELD));
+        partitionField = InsertionSpec.parse(config.getString(ConfigName.PARTITION_FIELD));
+        offsetField = InsertionSpec.parse(config.getString(ConfigName.OFFSET_FIELD));
+        timestampField = InsertionSpec.parse(config.getString(ConfigName.TIMESTAMP_FIELD));
+        staticField = InsertionSpec.parse(config.getString(ConfigName.STATIC_FIELD));
+        staticValue = config.getString(ConfigName.STATIC_VALUE);
+
+        if (topicField == null && partitionField == null && offsetField == null && timestampField == null && staticField == null) {
+            throw new ConfigException("No field insertion configured");
+        }
+
+        if (staticField != null && staticValue == null) {
+            throw new ConfigException(ConfigName.STATIC_VALUE, null, "No value specified for static field: " + staticField);
+        }
 
         schemaUpdateCache = new SynchronizedCache<>(new LRUCache<Schema, Schema>(16));
     }
 
     @Override
     public R apply(R record) {
-        if (!applicable) return record;
-
-        final Schema schema = operatingSchema(record);
-        final Object value = operatingValue(record);
-
-        if (value == null)
-            throw new DataException("null value");
-
-        if (schema == null) {
-            if (!(value instanceof Map))
-                throw new DataException("Can only operate on Map value in schemaless mode: " + value.getClass().getName());
-            return applySchemaless(record, (Map<String, Object>) value);
+        if (operatingSchema(record) == null) {
+            return applySchemaless(record);
         } else {
-            if (schema.type() != Schema.Type.STRUCT)
-                throw new DataException("Can only operate on Struct types: " + value.getClass().getName());
-            return applyWithSchema(record, schema, (Struct) value);
+            return applyWithSchema(record);
         }
     }
 
-    private R applySchemaless(R record, Map<String, Object> value) {
+    private R applySchemaless(R record) {
+        final Map<String, Object> value = requireMap(operatingValue(record), PURPOSE);
+
         final Map<String, Object> updatedValue = new HashMap<>(value);
 
         if (topicField != null) {
@@ -140,9 +147,7 @@ public abstract class InsertField<R extends ConnectRecord<R>> implements Transfo
             updatedValue.put(partitionField.name, record.kafkaPartition());
         }
         if (offsetField != null) {
-            if (!(record instanceof SinkRecord))
-                throw new DataException("Offset insertion is only supported for sink connectors, record is of type: " + record.getClass());
-            updatedValue.put(offsetField.name, ((SinkRecord) record).kafkaOffset());
+            updatedValue.put(offsetField.name, requireSinkRecord(record, PURPOSE).kafkaOffset());
         }
         if (timestampField != null && record.timestamp() != null) {
             updatedValue.put(timestampField.name, record.timestamp());
@@ -150,36 +155,46 @@ public abstract class InsertField<R extends ConnectRecord<R>> implements Transfo
         if (staticField != null && staticValue != null) {
             updatedValue.put(staticField.name, staticValue);
         }
+
         return newRecord(record, null, updatedValue);
     }
 
-    private R applyWithSchema(R record, Schema schema, Struct value) {
-        Schema updatedSchema = schemaUpdateCache.get(schema);
+    private R applyWithSchema(R record) {
+        final Struct value = requireStruct(operatingValue(record), PURPOSE);
+
+        Schema updatedSchema = schemaUpdateCache.get(value.schema());
         if (updatedSchema == null) {
-            updatedSchema = makeUpdatedSchema(schema);
-            schemaUpdateCache.put(schema, updatedSchema);
+            updatedSchema = makeUpdatedSchema(value.schema());
+            schemaUpdateCache.put(value.schema(), updatedSchema);
         }
 
         final Struct updatedValue = new Struct(updatedSchema);
 
-        copyFields(value, updatedValue);
+        for (Field field : value.schema().fields()) {
+            updatedValue.put(field.name(), value.get(field));
+        }
 
-        insertFields(record, updatedValue);
+        if (topicField != null) {
+            updatedValue.put(topicField.name, record.topic());
+        }
+        if (partitionField != null && record.kafkaPartition() != null) {
+            updatedValue.put(partitionField.name, record.kafkaPartition());
+        }
+        if (offsetField != null) {
+            updatedValue.put(offsetField.name, requireSinkRecord(record, PURPOSE).kafkaOffset());
+        }
+        if (timestampField != null && record.timestamp() != null) {
+            updatedValue.put(timestampField.name, new Date(record.timestamp()));
+        }
+        if (staticField != null && staticValue != null) {
+            updatedValue.put(staticField.name, staticValue);
+        }
 
         return newRecord(record, updatedSchema, updatedValue);
     }
 
     private Schema makeUpdatedSchema(Schema schema) {
-        final SchemaBuilder builder = SchemaBuilder.struct();
-
-        builder.name(schema.name());
-        builder.version(schema.version());
-        builder.doc(schema.doc());
-
-        final Map<String, String> params = schema.parameters();
-        if (params != null) {
-            builder.parameters(params);
-        }
+        final SchemaBuilder builder = SchemaUtil.copySchemaBasics(schema, SchemaBuilder.struct());
 
         for (Field field : schema.fields()) {
             builder.field(field.name(), field.schema());
@@ -204,33 +219,6 @@ public abstract class InsertField<R extends ConnectRecord<R>> implements Transfo
         return builder.build();
     }
 
-    private void copyFields(Struct value, Struct updatedValue) {
-        for (Field field : value.schema().fields()) {
-            updatedValue.put(field.name(), value.get(field));
-        }
-    }
-
-    private void insertFields(R record, Struct value) {
-        if (topicField != null) {
-            value.put(topicField.name, record.topic());
-        }
-        if (partitionField != null && record.kafkaPartition() != null) {
-            value.put(partitionField.name, record.kafkaPartition());
-        }
-        if (offsetField != null) {
-            if (!(record instanceof SinkRecord)) {
-                throw new DataException("Offset insertion is only supported for sink connectors, record is of type: " + record.getClass());
-            }
-            value.put(offsetField.name, ((SinkRecord) record).kafkaOffset());
-        }
-        if (timestampField != null && record.timestamp() != null) {
-            value.put(timestampField.name, new Date(record.timestamp()));
-        }
-        if (staticField != null && staticValue != null) {
-            value.put(staticField.name, staticValue);
-        }
-    }
-
     @Override
     public void close() {
         schemaUpdateCache = null;
@@ -247,10 +235,6 @@ public abstract class InsertField<R extends ConnectRecord<R>> implements Transfo
 
     protected abstract R newRecord(R record, Schema updatedSchema, Object updatedValue);
 
-    /**
-     * This transformation allows inserting configured attributes of the record metadata as fields in the record key.
-     * It also allows adding a static data field.
-     */
     public static class Key<R extends ConnectRecord<R>> extends InsertField<R> {
 
         @Override
@@ -270,10 +254,6 @@ public abstract class InsertField<R extends ConnectRecord<R>> implements Transfo
 
     }
 
-    /**
-     * This transformation allows inserting configured attributes of the record metadata as fields in the record value.
-     * It also allows adding a static data field.
-     */
     public static class Value<R extends ConnectRecord<R>> extends InsertField<R> {
 
         @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/c3f923cb/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/MaskField.java
----------------------------------------------------------------------
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/MaskField.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/MaskField.java
new file mode 100644
index 0000000..d7ef2aa
--- /dev/null
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/MaskField.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.errors.DataException;
+import org.apache.kafka.connect.transforms.util.NonEmptyListValidator;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;
+import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
+
+public abstract class MaskField<R extends ConnectRecord<R>> implements Transformation<R> {
+
+    public static final String OVERVIEW_DOC =
+            "Mask specified fields with a valid null value for the field type (i.e. 0, false, empty string, and so on)."
+                    + "<p/>Use the concrete transformation type designed for the record key (<code>" + Key.class.getCanonicalName() + "</code>) "
+                    + "or value (<code>" + Value.class.getCanonicalName() + "</code>).";
+
+    private static final String FIELDS_CONFIG = "fields";
+
+    public static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(FIELDS_CONFIG, ConfigDef.Type.LIST, ConfigDef.NO_DEFAULT_VALUE, new NonEmptyListValidator(), ConfigDef.Importance.HIGH, "Names of fields to mask.");
+
+    private static final String PURPOSE = "mask fields";
+
+    private static final Map<Class<?>, Object> PRIMITIVE_VALUE_MAPPING = new HashMap<>();
+
+    static {
+        PRIMITIVE_VALUE_MAPPING.put(Boolean.class, Boolean.FALSE);
+        PRIMITIVE_VALUE_MAPPING.put(Byte.class, (byte) 0);
+        PRIMITIVE_VALUE_MAPPING.put(Short.class, (short) 0);
+        PRIMITIVE_VALUE_MAPPING.put(Integer.class, 0);
+        PRIMITIVE_VALUE_MAPPING.put(Long.class, 0L);
+        PRIMITIVE_VALUE_MAPPING.put(Float.class, 0f);
+        PRIMITIVE_VALUE_MAPPING.put(Double.class, 0d);
+        PRIMITIVE_VALUE_MAPPING.put(BigInteger.class, BigInteger.ZERO);
+        PRIMITIVE_VALUE_MAPPING.put(BigDecimal.class, BigDecimal.ZERO);
+        PRIMITIVE_VALUE_MAPPING.put(Date.class, new Date(0));
+        PRIMITIVE_VALUE_MAPPING.put(String.class, "");
+    }
+
+    private Set<String> maskedFields;
+
+    @Override
+    public void configure(Map<String, ?> props) {
+        final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
+        maskedFields = new HashSet<>(config.getList(FIELDS_CONFIG));
+    }
+
+    @Override
+    public R apply(R record) {
+        if (operatingSchema(record) == null) {
+            return applySchemaless(record);
+        } else {
+            return applyWithSchema(record);
+        }
+    }
+
+    private R applySchemaless(R record) {
+        final Map<String, Object> value = requireMap(operatingValue(record), PURPOSE);
+        final HashMap<String, Object> updatedValue = new HashMap<>(value);
+        for (String field : maskedFields) {
+            updatedValue.put(field, masked(value.get(field)));
+        }
+        return newRecord(record, updatedValue);
+    }
+
+    private R applyWithSchema(R record) {
+        final Struct value = requireStruct(operatingValue(record), PURPOSE);
+        final Struct updatedValue = new Struct(value.schema());
+        for (Field field : value.schema().fields()) {
+            final Object origFieldValue = value.get(field);
+            updatedValue.put(field, maskedFields.contains(field.name()) ? masked(origFieldValue) : origFieldValue);
+        }
+        return newRecord(record, updatedValue);
+    }
+
+    private static Object masked(Object value) {
+        if (value == null)
+            return null;
+        Object maskedValue = PRIMITIVE_VALUE_MAPPING.get(value.getClass());
+        if (maskedValue == null) {
+            if (value instanceof List)
+                maskedValue = Collections.emptyList();
+            else if (value instanceof Map)
+                maskedValue = Collections.emptyMap();
+            else
+                throw new DataException("Cannot mask value of type: " + value.getClass());
+        }
+        return maskedValue;
+    }
+
+    @Override
+    public ConfigDef config() {
+        return CONFIG_DEF;
+    }
+
+    @Override
+    public void close() {
+    }
+
+    protected abstract Schema operatingSchema(R record);
+
+    protected abstract Object operatingValue(R record);
+
+    protected abstract R newRecord(R base, Object value);
+
+    public static final class Key<R extends ConnectRecord<R>> extends MaskField<R> {
+        @Override
+        protected Schema operatingSchema(R record) {
+            return record.keySchema();
+        }
+
+        @Override
+        protected Object operatingValue(R record) {
+            return record.key();
+        }
+
+        @Override
+        protected R newRecord(R record, Object updatedValue) {
+            return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), updatedValue, record.valueSchema(), record.value(), record.timestamp());
+        }
+    }
+
+    public static final class Value<R extends ConnectRecord<R>> extends MaskField<R> {
+        @Override
+        protected Schema operatingSchema(R record) {
+            return record.valueSchema();
+        }
+
+        @Override
+        protected Object operatingValue(R record) {
+            return record.value();
+        }
+
+        @Override
+        protected R newRecord(R record, Object updatedValue) {
+            return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), record.valueSchema(), updatedValue, record.timestamp());
+        }
+    }
+
+}
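A hypothetical sketch (not part of the patch) showing the masking behavior on a schemaless
value; the field names and data are invented:

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.connect.sink.SinkRecord;
    import org.apache.kafka.connect.transforms.MaskField;

    public class MaskFieldExample {
        public static void main(String[] args) {
            // Mask the "ssn" field of the record value.
            MaskField.Value<SinkRecord> smt = new MaskField.Value<>();
            smt.configure(Collections.singletonMap("fields", "ssn"));

            Map<String, Object> value = new HashMap<>();
            value.put("name", "alice");
            value.put("ssn", "123-45-6789");

            SinkRecord record = new SinkRecord("users", 0, null, null, null, value, 0L);
            // Strings are masked with the empty string, so "ssn" becomes "".
            System.out.println(smt.apply(record).value());
        }
    }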

http://git-wip-us.apache.org/repos/asf/kafka/blob/c3f923cb/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/RegexRouter.java
----------------------------------------------------------------------
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/RegexRouter.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/RegexRouter.java
new file mode 100644
index 0000000..f16560e
--- /dev/null
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/RegexRouter.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.transforms.util.RegexValidator;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class RegexRouter<R extends ConnectRecord<R>> implements Transformation<R> {
+
+    public static final String OVERVIEW_DOC = "Update the record topic using the configured regular expression and replacement string."
+            + "<p/>Under the hood, the regex is compiled to a <code>java.util.regex.Pattern</code>. "
+            + "If the pattern matches the input topic, <code>java.util.regex.Matcher#replaceFirst()</code> is used with the replacement string to obtain the new topic.";
+
+    public static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(ConfigName.REGEX, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, new RegexValidator(), ConfigDef.Importance.HIGH,
+                    "Regular expression to use for matching.")
+            .define(ConfigName.REPLACEMENT, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.HIGH,
+                    "Replacement string.");
+
+    private interface ConfigName {
+        String REGEX = "regex";
+        String REPLACEMENT = "replacement";
+    }
+
+    private Pattern regex;
+    private String replacement;
+
+    @Override
+    public void configure(Map<String, ?> props) {
+        final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
+        regex = Pattern.compile(config.getString(ConfigName.REGEX));
+        replacement = config.getString(ConfigName.REPLACEMENT);
+    }
+
+    @Override
+    public R apply(R record) {
+        final Matcher matcher = regex.matcher(record.topic());
+        if (matcher.matches()) {
+            final String topic = matcher.replaceFirst(replacement);
+            return record.newRecord(topic, record.kafkaPartition(), record.keySchema(), record.key(), record.valueSchema(), record.value(), record.timestamp());
+        }
+        return record;
+    }
+
+    @Override
+    public void close() {
+    }
+
+    @Override
+    public ConfigDef config() {
+        return CONFIG_DEF;
+    }
+
+}
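A hypothetical sketch (not part of the patch) of the topic rewriting; the regex, replacement,
and topic name are invented:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.connect.source.SourceRecord;
    import org.apache.kafka.connect.transforms.RegexRouter;

    public class RegexRouterExample {
        public static void main(String[] args) {
            RegexRouter<SourceRecord> smt = new RegexRouter<>();
            Map<String, String> config = new HashMap<>();
            config.put("regex", "(.*)-raw");
            config.put("replacement", "$1-normalized");
            smt.configure(config);

            SourceRecord record = new SourceRecord(null, null, "orders-raw", null, null, "payload");
            // The topic matches "(.*)-raw", so it is rewritten to "orders-normalized".
            System.out.println(smt.apply(record).topic());
        }
    }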

http://git-wip-us.apache.org/repos/asf/kafka/blob/c3f923cb/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java
----------------------------------------------------------------------
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java
new file mode 100644
index 0000000..6faf842
--- /dev/null
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java
@@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.common.cache.Cache;
+import org.apache.kafka.common.cache.LRUCache;
+import org.apache.kafka.common.cache.SynchronizedCache;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;
+import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
+
+public abstract class ReplaceField<R extends ConnectRecord<R>> implements Transformation<R> {
+
+    public static final String OVERVIEW_DOC = "Filter or rename fields.";
+
+    interface ConfigName {
+        String BLACKLIST = "blacklist";
+        String WHITELIST = "whitelist";
+        String RENAME = "renames";
+    }
+
+    public static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(ConfigName.BLACKLIST, ConfigDef.Type.LIST, Collections.emptyList(), ConfigDef.Importance.MEDIUM,
+                    "Fields to exclude. This takes precedence over the whitelist.")
+            .define(ConfigName.WHITELIST, ConfigDef.Type.LIST, Collections.emptyList(), ConfigDef.Importance.MEDIUM,
+                    "Fields to include. If specified, only these fields will be used.")
+            .define(ConfigName.RENAME, ConfigDef.Type.LIST, Collections.emptyList(), new ConfigDef.Validator() {
+                @Override
+                public void ensureValid(String name, Object value) {
+                    parseRenameMappings((List<String>) value);
+                }
+
+                @Override
+                public String toString() {
+                    return "list of colon-delimited pairs, e.g. <code>foo:bar,abc:xyz</code>";
+                }
+            }, ConfigDef.Importance.MEDIUM, "Field rename mappings.");
+
+    private static final String PURPOSE = "field replacement";
+
+    private List<String> blacklist;
+    private List<String> whitelist;
+    private Map<String, String> renames;
+    private Map<String, String> reverseRenames;
+
+    private Cache<Schema, Schema> schemaUpdateCache;
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+        final SimpleConfig config = new SimpleConfig(CONFIG_DEF, configs);
+        blacklist = config.getList(ConfigName.BLACKLIST);
+        whitelist = config.getList(ConfigName.WHITELIST);
+        renames = parseRenameMappings(config.getList(ConfigName.RENAME));
+        reverseRenames = invert(renames);
+
+        schemaUpdateCache = new SynchronizedCache<>(new LRUCache<Schema, Schema>(16));
+    }
+
+    static Map<String, String> parseRenameMappings(List<String> mappings) {
+        final Map<String, String> m = new HashMap<>();
+        for (String mapping : mappings) {
+            final String[] parts = mapping.split(":");
+            if (parts.length != 2) {
+                throw new ConfigException(ConfigName.RENAME, mappings, "Invalid rename mapping: " + mapping);
+            }
+            m.put(parts[0], parts[1]);
+        }
+        return m;
+    }
+
+    static Map<String, String> invert(Map<String, String> source) {
+        final Map<String, String> m = new HashMap<>();
+        for (Map.Entry<String, String> e : source.entrySet()) {
+            m.put(e.getValue(), e.getKey());
+        }
+        return m;
+    }
+
+    boolean filter(String fieldName) {
+        return !blacklist.contains(fieldName) && (whitelist.isEmpty() || whitelist.contains(fieldName));
+    }
+
+    String renamed(String fieldName) {
+        final String mapping = renames.get(fieldName);
+        return mapping == null ? fieldName : mapping;
+    }
+
+    String reverseRenamed(String fieldName) {
+        final String mapping = reverseRenames.get(fieldName);
+        return mapping == null ? fieldName : mapping;
+    }
+
+    @Override
+    public R apply(R record) {
+        if (operatingSchema(record) == null) {
+            return applySchemaless(record);
+        } else {
+            return applyWithSchema(record);
+        }
+    }
+
+    private R applySchemaless(R record) {
+        final Map<String, Object> value = requireMap(operatingValue(record), PURPOSE);
+
+        final Map<String, Object> updatedValue = new HashMap<>(value.size());
+
+        for (Map.Entry<String, Object> e : value.entrySet()) {
+            final String fieldName = e.getKey();
+            if (filter(fieldName)) {
+                final Object fieldValue = e.getValue();
+                updatedValue.put(renamed(fieldName), fieldValue);
+            }
+        }
+
+        return newRecord(record, null, updatedValue);
+    }
+
+    private R applyWithSchema(R record) {
+        final Struct value = requireStruct(operatingValue(record), PURPOSE);
+
+        Schema updatedSchema = schemaUpdateCache.get(value.schema());
+        if (updatedSchema == null) {
+            updatedSchema = makeUpdatedSchema(value.schema());
+            schemaUpdateCache.put(value.schema(), updatedSchema);
+        }
+
+        final Struct updatedValue = new Struct(updatedSchema);
+
+        for (Field field : updatedSchema.fields()) {
+            final Object fieldValue = value.get(reverseRenamed(field.name()));
+            updatedValue.put(field.name(), fieldValue);
+        }
+
+        return newRecord(record, updatedSchema, updatedValue);
+    }
+
+    private Schema makeUpdatedSchema(Schema schema) {
+        final SchemaBuilder builder = SchemaUtil.copySchemaBasics(schema, SchemaBuilder.struct());
+        for (Field field : schema.fields()) {
+            if (filter(field.name())) {
+                builder.field(renamed(field.name()), field.schema());
+            }
+        }
+        return builder.build();
+    }
+
+    @Override
+    public ConfigDef config() {
+        return CONFIG_DEF;
+    }
+
+    @Override
+    public void close() {
+        schemaUpdateCache = null;
+    }
+
+    protected abstract Schema operatingSchema(R record);
+
+    protected abstract Object operatingValue(R record);
+
+    protected abstract R newRecord(R record, Schema updatedSchema, Object updatedValue);
+
+    public static class Key<R extends ConnectRecord<R>> extends ReplaceField<R> {
+
+        @Override
+        protected Schema operatingSchema(R record) {
+            return record.keySchema();
+        }
+
+        @Override
+        protected Object operatingValue(R record) {
+            return record.key();
+        }
+
+        @Override
+        protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
+            return record.newRecord(record.topic(), record.kafkaPartition(), updatedSchema, updatedValue, record.valueSchema(), record.value(), record.timestamp());
+        }
+
+    }
+
+    public static class Value<R extends ConnectRecord<R>> extends ReplaceField<R> {
+
+        @Override
+        protected Schema operatingSchema(R record) {
+            return record.valueSchema();
+        }
+
+        @Override
+        protected Object operatingValue(R record) {
+            return record.value();
+        }
+
+        @Override
+        protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
+            return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), updatedSchema, updatedValue, record.timestamp());
+        }
+
+    }
+
+}
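
For reference, a minimal usage sketch of the new ReplaceField transform, written in the same style as the unit tests added in this commit. The config key names "whitelist" and "renames" are assumed here (they come from the ConfigName constants defined earlier in the class and are not visible in this hunk), and the expected output is only illustrative:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.connect.sink.SinkRecord;
    import org.apache.kafka.connect.transforms.ReplaceField;

    public class ReplaceFieldExample {
        public static void main(String[] args) {
            final ReplaceField<SinkRecord> xform = new ReplaceField.Value<>();

            // Assumed config key names; verify against ConfigName in ReplaceField.
            final Map<String, Object> config = new HashMap<>();
            config.put("whitelist", "id,name");       // keep only these fields
            config.put("renames", "name:full_name");  // colon-delimited rename pairs
            xform.configure(config);

            // Schemaless case: the value is a plain Map, handled by applySchemaless().
            final Map<String, Object> value = new HashMap<>();
            value.put("id", 1);
            value.put("name", "kafka");
            value.put("internal", "dropped by the whitelist");

            final SinkRecord record = new SinkRecord("test", 0, null, null, null, value, 0);
            final SinkRecord transformed = xform.apply(record);

            // The transformed value contains id=1 and full_name=kafka; "internal" is gone.
            System.out.println(transformed.value());
        }
    }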

http://git-wip-us.apache.org/repos/asf/kafka/blob/c3f923cb/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java
----------------------------------------------------------------------
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java
new file mode 100644
index 0000000..f3076b4
--- /dev/null
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.ConnectSchema;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+import java.util.Map;
+
+import static org.apache.kafka.connect.transforms.util.Requirements.requireSchema;
+
+public abstract class SetSchemaMetadata<R extends ConnectRecord<R>> implements Transformation<R> {
+
+    public static final String OVERVIEW_DOC =
+            "Set the schema name, version or both on the record's key (<code>" + Key.class.getCanonicalName() + "</code>)"
+                    + " or value (<code>" + Value.class.getCanonicalName() + "</code>) schema.";
+
+    private interface ConfigName {
+        String SCHEMA_NAME = "schema.name";
+        String SCHEMA_VERSION = "schema.version";
+    }
+
+    public static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(ConfigName.SCHEMA_NAME, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, "Schema name to set.")
+            .define(ConfigName.SCHEMA_VERSION, ConfigDef.Type.INT, null, ConfigDef.Importance.HIGH, "Schema version to set.");
+
+    private String schemaName;
+    private Integer schemaVersion;
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+        final SimpleConfig config = new SimpleConfig(CONFIG_DEF, configs);
+        schemaName = config.getString(ConfigName.SCHEMA_NAME);
+        schemaVersion = config.getInt(ConfigName.SCHEMA_VERSION);
+
+        if (schemaName == null && schemaVersion == null) {
+            throw new ConfigException("Neither schema name nor version configured");
+        }
+    }
+
+    @Override
+    public R apply(R record) {
+        final Schema schema = operatingSchema(record);
+        requireSchema(schema, "updating schema metadata");
+        final boolean isArray = schema.type() == Schema.Type.ARRAY;
+        final boolean isMap = schema.type() == Schema.Type.MAP;
+        final Schema updatedSchema = new ConnectSchema(
+                schema.type(),
+                schema.isOptional(),
+                schema.defaultValue(),
+                schemaName != null ? schemaName : schema.name(),
+                schemaVersion != null ? schemaVersion : schema.version(),
+                schema.doc(),
+                schema.parameters(),
+                schema.fields(),
+                isMap ? schema.keySchema() : null,
+                isMap || isArray ? schema.valueSchema() : null
+        );
+        return newRecord(record, updatedSchema);
+    }
+
+    @Override
+    public ConfigDef config() {
+        return CONFIG_DEF;
+    }
+
+    @Override
+    public void close() {
+    }
+
+    protected abstract Schema operatingSchema(R record);
+
+    protected abstract R newRecord(R record, Schema updatedSchema);
+
+    /**
+     * Set the schema name, version or both on the record's key schema.
+     */
+    public static class Key<R extends ConnectRecord<R>> extends SetSchemaMetadata<R> {
+        @Override
+        protected Schema operatingSchema(R record) {
+            return record.keySchema();
+        }
+
+        @Override
+        protected R newRecord(R record, Schema updatedSchema) {
+            return record.newRecord(record.topic(), record.kafkaPartition(), updatedSchema, record.key(), record.valueSchema(), record.value(), record.timestamp());
+        }
+    }
+
+    /**
+     * Set the schema name, version or both on the record's value schema.
+     */
+    public static class Value<R extends ConnectRecord<R>> extends SetSchemaMetadata<R> {
+        @Override
+        protected Schema operatingSchema(R record) {
+            return record.valueSchema();
+        }
+
+        @Override
+        protected R newRecord(R record, Schema updatedSchema) {
+            return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), updatedSchema, record.value(), record.timestamp());
+        }
+    }
+
+}
\ No newline at end of file
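
A minimal usage sketch for SetSchemaMetadata, again following the pattern of the unit tests in this commit; the "schema.name" and "schema.version" keys are taken from the ConfigDef above, while the example schema name and record contents are illustrative only:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaBuilder;
    import org.apache.kafka.connect.data.Struct;
    import org.apache.kafka.connect.sink.SinkRecord;
    import org.apache.kafka.connect.transforms.SetSchemaMetadata;

    public class SetSchemaMetadataExample {
        public static void main(String[] args) {
            final SetSchemaMetadata<SinkRecord> xform = new SetSchemaMetadata.Value<>();

            final Map<String, Object> config = new HashMap<>();
            config.put("schema.name", "com.example.Order");  // example name, not from the commit
            config.put("schema.version", 2);
            xform.configure(config);

            final Schema valueSchema = SchemaBuilder.struct().field("id", Schema.INT32_SCHEMA).build();
            final Struct value = new Struct(valueSchema).put("id", 42);

            final SinkRecord record = new SinkRecord("orders", 0, null, null, valueSchema, value, 0);
            final SinkRecord transformed = xform.apply(record);

            // Only the schema metadata changes; the value itself passes through untouched.
            System.out.println(transformed.valueSchema().name());     // com.example.Order
            System.out.println(transformed.valueSchema().version());  // 2
        }
    }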

http://git-wip-us.apache.org/repos/asf/kafka/blob/c3f923cb/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampRouter.java
----------------------------------------------------------------------
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampRouter.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampRouter.java
index 1dd5345..f917a8d 100644
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampRouter.java
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampRouter.java
@@ -27,25 +27,25 @@ import java.util.Date;
 import java.util.Map;
 import java.util.TimeZone;
 
-/**
- * This transformation facilitates updating the record's topic field as a function of the original topic value and the record timestamp.
- * <p/>
- * It is mainly useful for sink connectors, since the topic field is often used to determine the equivalent entity name in the destination system
- * (e.g. database table or search index name).
- */
 public class TimestampRouter<R extends ConnectRecord<R>> implements Transformation<R> {
 
-    public interface Keys {
+    public static final String OVERVIEW_DOC =
+            "Update the record's topic field as a function of the original topic value and the record timestamp."
+                    + "<p/>"
+                    + "This is mainly useful for sink connectors, since the topic field is often used to determine the equivalent entity name in the destination system"
+                    + " (e.g. database table or search index name).";
+
+    public static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(ConfigName.TOPIC_FORMAT, ConfigDef.Type.STRING, "${topic}-${timestamp}", ConfigDef.Importance.HIGH,
+                    "Format string which can contain <code>${topic}</code> and <code>${timestamp}</code> as placeholders for the topic and timestamp, respectively.")
+            .define(ConfigName.TIMESTAMP_FORMAT, ConfigDef.Type.STRING, "yyyyMMdd", ConfigDef.Importance.HIGH,
+                    "Format string for the timestamp that is compatible with <code>java.text.SimpleDateFormat</code>.");
+
+    private interface ConfigName {
         String TOPIC_FORMAT = "topic.format";
         String TIMESTAMP_FORMAT = "timestamp.format";
     }
 
-    private static final ConfigDef CONFIG_DEF = new ConfigDef()
-            .define(Keys.TOPIC_FORMAT, ConfigDef.Type.STRING, "${topic}-${timestamp}", ConfigDef.Importance.HIGH,
-                    "Format string which can contain ``${topic}`` and ``${timestamp}`` as placeholders for the topic and timestamp, respectively.")
-            .define(Keys.TIMESTAMP_FORMAT, ConfigDef.Type.STRING, "yyyyMMdd", ConfigDef.Importance.HIGH,
-                    "Format string for the timestamp that is compatible with java.text.SimpleDateFormat.");
-
     private String topicFormat;
     private ThreadLocal<SimpleDateFormat> timestampFormat;
 
@@ -53,9 +53,9 @@ public class TimestampRouter<R extends ConnectRecord<R>> implements Transformati
     public void configure(Map<String, ?> props) {
         final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
 
-        topicFormat = config.getString(Keys.TOPIC_FORMAT);
+        topicFormat = config.getString(ConfigName.TOPIC_FORMAT);
 
-        final String timestampFormatStr = config.getString(Keys.TIMESTAMP_FORMAT);
+        final String timestampFormatStr = config.getString(ConfigName.TIMESTAMP_FORMAT);
         timestampFormat = new ThreadLocal<SimpleDateFormat>() {
             @Override
             protected SimpleDateFormat initialValue() {

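The TimestampRouter change above is a rename of the config holder (Keys to ConfigName) plus moving the class javadoc into OVERVIEW_DOC. A minimal usage sketch, assuming the record carries a timestamp and that the formatter renders it in UTC (neither of which is visible in this hunk):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.common.record.TimestampType;
    import org.apache.kafka.connect.sink.SinkRecord;
    import org.apache.kafka.connect.transforms.TimestampRouter;

    public class TimestampRouterExample {
        public static void main(String[] args) {
            final TimestampRouter<SinkRecord> xform = new TimestampRouter<>();

            final Map<String, Object> config = new HashMap<>();
            config.put("topic.format", "${topic}-${timestamp}");
            config.put("timestamp.format", "yyyyMMdd");
            xform.configure(config);

            // The transform reads the record timestamp, so supply one explicitly.
            final SinkRecord record = new SinkRecord("events", 0, null, null, null, null, 0,
                    0L, TimestampType.CREATE_TIME);
            final SinkRecord transformed = xform.apply(record);

            // The topic becomes "events-<formatted timestamp>", e.g. "events-19700101" for epoch 0 in UTC.
            System.out.println(transformed.topic());
        }
    }
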
http://git-wip-us.apache.org/repos/asf/kafka/blob/c3f923cb/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ValueToKey.java
----------------------------------------------------------------------
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ValueToKey.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ValueToKey.java
new file mode 100644
index 0000000..504da54
--- /dev/null
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ValueToKey.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.common.cache.Cache;
+import org.apache.kafka.common.cache.LRUCache;
+import org.apache.kafka.common.cache.SynchronizedCache;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.transforms.util.NonEmptyListValidator;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;
+import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
+
+public class ValueToKey<R extends ConnectRecord<R>> implements Transformation<R> {
+
+    public static final String OVERVIEW_DOC = "Replace the record key with a new key formed from a subset of fields in the record value.";
+
+    public static final String FIELDS_CONFIG = "fields";
+
+    public static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(FIELDS_CONFIG, ConfigDef.Type.LIST, ConfigDef.NO_DEFAULT_VALUE, new NonEmptyListValidator(), ConfigDef.Importance.HIGH,
+                    "Field names on the record value to extract as the record key.");
+
+    private static final String PURPOSE = "copying fields from value to key";
+
+    private List<String> fields;
+
+    private Cache<Schema, Schema> valueToKeySchemaCache;
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+        final SimpleConfig config = new SimpleConfig(CONFIG_DEF, configs);
+        fields = config.getList(FIELDS_CONFIG);
+        valueToKeySchemaCache = new SynchronizedCache<>(new LRUCache<Schema, Schema>(16));
+    }
+
+    @Override
+    public R apply(R record) {
+        if (record.valueSchema() == null) {
+            return applySchemaless(record);
+        } else {
+            return applyWithSchema(record);
+        }
+    }
+
+    private R applySchemaless(R record) {
+        final Map<String, Object> value = requireMap(record.value(), PURPOSE);
+        final Map<String, Object> key = new HashMap<>(fields.size());
+        for (String field : fields) {
+            key.put(field, value.get(field));
+        }
+        return record.newRecord(record.topic(), record.kafkaPartition(), null, key, record.valueSchema(), record.value(), record.timestamp());
+    }
+
+    private R applyWithSchema(R record) {
+        final Struct value = requireStruct(record.value(), PURPOSE);
+
+        Schema keySchema = valueToKeySchemaCache.get(value.schema());
+        if (keySchema == null) {
+            final SchemaBuilder keySchemaBuilder = SchemaBuilder.struct();
+            for (String field : fields) {
+                final Schema fieldSchema = value.schema().field(field).schema();
+                keySchemaBuilder.field(field, fieldSchema);
+            }
+            keySchema = keySchemaBuilder.build();
+            valueToKeySchemaCache.put(value.schema(), keySchema);
+        }
+
+        final Struct key = new Struct(keySchema);
+        for (String field : fields) {
+            key.put(field, value.get(field));
+        }
+
+        return record.newRecord(record.topic(), record.kafkaPartition(), keySchema, key, value.schema(), value, record.timestamp());
+    }
+
+    @Override
+    public ConfigDef config() {
+        return CONFIG_DEF;
+    }
+
+    @Override
+    public void close() {
+        valueToKeySchemaCache = null;
+    }
+
+}
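
A minimal usage sketch for ValueToKey in the schemaless case; the "fields" config key is taken from FIELDS_CONFIG above, and the record contents are illustrative only:

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.connect.sink.SinkRecord;
    import org.apache.kafka.connect.transforms.ValueToKey;

    public class ValueToKeyExample {
        public static void main(String[] args) {
            final ValueToKey<SinkRecord> xform = new ValueToKey<>();
            xform.configure(Collections.singletonMap("fields", "id"));

            // Schemaless case: the new key is a Map holding just the configured fields.
            final Map<String, Object> value = new HashMap<>();
            value.put("id", 42);
            value.put("name", "kafka");

            final SinkRecord record = new SinkRecord("test", 0, null, null, null, value, 0);
            final SinkRecord transformed = xform.apply(record);

            System.out.println(transformed.key());    // {id=42}
            System.out.println(transformed.value());  // value is unchanged
        }
    }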

http://git-wip-us.apache.org/repos/asf/kafka/blob/c3f923cb/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/NonEmptyListValidator.java
----------------------------------------------------------------------
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/NonEmptyListValidator.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/NonEmptyListValidator.java
new file mode 100644
index 0000000..1abbbc8
--- /dev/null
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/NonEmptyListValidator.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.transforms.util;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+
+import java.util.List;
+
+public class NonEmptyListValidator implements ConfigDef.Validator {
+
+    @Override
+    public void ensureValid(String name, Object value) {
+        if (((List) value).isEmpty()) {
+            throw new ConfigException(name, value, "Empty list");
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "non-empty list";
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c3f923cb/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/RegexValidator.java
----------------------------------------------------------------------
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/RegexValidator.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/RegexValidator.java
new file mode 100644
index 0000000..9713b27
--- /dev/null
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/RegexValidator.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.transforms.util;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+
+import java.util.regex.Pattern;
+
+public class RegexValidator implements ConfigDef.Validator {
+
+    @Override
+    public void ensureValid(String name, Object value) {
+        try {
+            Pattern.compile((String) value);
+        } catch (Exception e) {
+            throw new ConfigException(name, value, "Invalid regex: " + e.getMessage());
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "valid regex";
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c3f923cb/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/Requirements.java
----------------------------------------------------------------------
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/Requirements.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/Requirements.java
new file mode 100644
index 0000000..b004f8a
--- /dev/null
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/Requirements.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.transforms.util;
+
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.errors.DataException;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import java.util.Map;
+
+public class Requirements {
+
+    public static void requireSchema(Schema schema, String purpose) {
+        if (schema == null) {
+            throw new DataException("Schema required for [" + purpose + "]");
+        }
+    }
+
+    public static Map<String, Object> requireMap(Object value, String purpose) {
+        if (!(value instanceof Map)) {
+            throw new DataException("Only Map objects supported in absence of schema for [" + purpose + "], found: " + nullSafeClassName(value));
+        }
+        return (Map<String, Object>) value;
+    }
+
+    public static Struct requireStruct(Object value, String purpose) {
+        if (!(value instanceof Struct)) {
+            throw new DataException("Only Struct objects supported for [" + purpose + "], found: " + nullSafeClassName(value));
+        }
+        return (Struct) value;
+    }
+
+    public static SinkRecord requireSinkRecord(ConnectRecord<?> record, String purpose) {
+        if (!(record instanceof SinkRecord)) {
+            throw new DataException("Only SinkRecord supported for [" + purpose + "], found: " + nullSafeClassName(record));
+        }
+        return (SinkRecord) record;
+    }
+
+    private static String nullSafeClassName(Object x) {
+        return x == null ? "null" : x.getClass().getCanonicalName();
+    }
+
+}
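
The Requirements helpers are what the transforms above use to fail fast with a DataException when a record does not have the expected shape. A small hypothetical sketch of that pattern (GuardExample and fieldValue are illustrative names, not part of the commit):

    import java.util.Map;

    import org.apache.kafka.connect.connector.ConnectRecord;
    import org.apache.kafka.connect.data.Struct;

    import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;
    import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct;

    class GuardExample {
        private static final String PURPOSE = "example field lookup";

        // Branch on the presence of a value schema and let Requirements raise a
        // DataException that names the transform's purpose if the type is wrong.
        static Object fieldValue(ConnectRecord<?> record, String fieldName) {
            if (record.valueSchema() == null) {
                final Map<String, Object> value = requireMap(record.value(), PURPOSE);
                return value.get(fieldName);
            } else {
                final Struct value = requireStruct(record.value(), PURPOSE);
                return value.get(fieldName);
            }
        }
    }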

http://git-wip-us.apache.org/repos/asf/kafka/blob/c3f923cb/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/SchemaUtil.java
----------------------------------------------------------------------
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/SchemaUtil.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/SchemaUtil.java
new file mode 100644
index 0000000..da261e7
--- /dev/null
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/SchemaUtil.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.transforms.util;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+
+import java.util.Map;
+
+public class SchemaUtil {
+
+    public static SchemaBuilder copySchemaBasics(Schema source, SchemaBuilder builder) {
+        builder.name(source.name());
+        builder.version(source.version());
+        builder.doc(source.doc());
+
+        final Map<String, String> params = source.parameters();
+        if (params != null) {
+            builder.parameters(params);
+        }
+
+        return builder;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c3f923cb/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractFieldTest.java
----------------------------------------------------------------------
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractFieldTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractFieldTest.java
new file mode 100644
index 0000000..d721795
--- /dev/null
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractFieldTest.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.junit.Test;
+
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class ExtractFieldTest {
+
+    @Test
+    public void schemaless() {
+        final ExtractField<SinkRecord> xform = new ExtractField.Key<>();
+        xform.configure(Collections.singletonMap("field", "magic"));
+
+        final SinkRecord record = new SinkRecord("test", 0, null, Collections.singletonMap("magic", 42), null, null, 0);
+        final SinkRecord transformedRecord = xform.apply(record);
+
+        assertNull(transformedRecord.keySchema());
+        assertEquals(42, transformedRecord.key());
+    }
+
+    @Test
+    public void withSchema() {
+        final ExtractField<SinkRecord> xform = new ExtractField.Key<>();
+        xform.configure(Collections.singletonMap("field", "magic"));
+
+        final Schema keySchema = SchemaBuilder.struct().field("magic", Schema.INT32_SCHEMA).build();
+        final Struct key = new Struct(keySchema).put("magic", 42);
+        final SinkRecord record = new SinkRecord("test", 0, keySchema, key, null, null, 0);
+        final SinkRecord transformedRecord = xform.apply(record);
+
+        assertEquals(Schema.INT32_SCHEMA, transformedRecord.keySchema());
+        assertEquals(42, transformedRecord.key());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c3f923cb/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HoistFieldTest.java
----------------------------------------------------------------------
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HoistFieldTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HoistFieldTest.java
new file mode 100644
index 0000000..b5f9d93
--- /dev/null
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HoistFieldTest.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.junit.Test;
+
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class HoistFieldTest {
+
+    @Test
+    public void schemaless() {
+        final HoistField<SinkRecord> xform = new HoistField.Key<>();
+        xform.configure(Collections.singletonMap("field", "magic"));
+
+        final SinkRecord record = new SinkRecord("test", 0, null, 42, null, null, 0);
+        final SinkRecord transformedRecord = xform.apply(record);
+
+        assertNull(transformedRecord.keySchema());
+        assertEquals(Collections.singletonMap("magic", 42), transformedRecord.key());
+    }
+
+    @Test
+    public void withSchema() {
+        final HoistField<SinkRecord> xform = new HoistField.Key<>();
+        xform.configure(Collections.singletonMap("field", "magic"));
+
+        final SinkRecord record = new SinkRecord("test", 0, Schema.INT32_SCHEMA, 42, null, null, 0);
+        final SinkRecord transformedRecord = xform.apply(record);
+
+        assertEquals(Schema.Type.STRUCT, transformedRecord.keySchema().type());
+        assertEquals(record.keySchema(),  transformedRecord.keySchema().field("magic").schema());
+        assertEquals(42, ((Struct) transformedRecord.key()).get("magic"));
+    }
+
+}

