kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject [1/2] kafka git commit: KAFKA-3209: KIP-66: more single message transforms
Date Sat, 21 Jan 2017 00:16:42 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.2 05237367d -> c3f923cbf


http://git-wip-us.apache.org/repos/asf/kafka/blob/c3f923cb/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HoistToStructTest.java
----------------------------------------------------------------------
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HoistToStructTest.java
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HoistToStructTest.java
deleted file mode 100644
index 99a6e99..0000000
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HoistToStructTest.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.connect.transforms;
-
-import org.apache.kafka.connect.data.Schema;
-import org.apache.kafka.connect.data.Struct;
-import org.apache.kafka.connect.sink.SinkRecord;
-import org.junit.Test;
-
-import java.util.Collections;
-
-import static org.junit.Assert.assertEquals;
-
-public class HoistToStructTest {
-
-    @Test
-    public void sanityCheck() {
-        final HoistToStruct<SinkRecord> xform = new HoistToStruct.Key<>();
-        xform.configure(Collections.singletonMap("field", "magic"));
-
-        final SinkRecord record = new SinkRecord("test", 0, Schema.INT32_SCHEMA, 42, null,
null, 0);
-        final SinkRecord transformedRecord = xform.apply(record);
-
-        assertEquals(Schema.Type.STRUCT, transformedRecord.keySchema().type());
-        assertEquals(record.keySchema(),  transformedRecord.keySchema().field("magic").schema());
-        assertEquals(42, ((Struct) transformedRecord.key()).get("magic"));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c3f923cb/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/MaskFieldTest.java
----------------------------------------------------------------------
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/MaskFieldTest.java
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/MaskFieldTest.java
new file mode 100644
index 0000000..c96058a
--- /dev/null
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/MaskFieldTest.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.connect.data.Decimal;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.data.Time;
+import org.apache.kafka.connect.data.Timestamp;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.junit.Test;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+// Unit tests for the MaskField transformation (value mode): each field listed
+// in the "fields" config is replaced by a type-appropriate "masked" zero/empty
+// value, while fields not listed pass through unchanged.
+public class MaskFieldTest {
+
+    // Helper: builds a value-mode MaskField configured to mask the given field names.
+    private static MaskField<SinkRecord> transform(List<String> fields) {
+        final MaskField<SinkRecord> xform = new MaskField.Value<>();
+        xform.configure(Collections.singletonMap("fields", fields));
+        return xform;
+    }
+
+    // Helper: wraps a value (and optional value schema) in a minimal SinkRecord.
+    private static SinkRecord record(Schema schema, Object value) {
+        return new SinkRecord("", 0, null, null, schema, value, 0);
+    }
+
+    // Schemaless data: mask every field except "magic" and verify each masked
+    // entry comes back as the zero/empty value for its runtime type.
+    @Test
+    public void schemaless() {
+        final Map<String, Object> value = new HashMap<>();
+        value.put("magic", 42);
+        value.put("bool", true);
+        value.put("byte", (byte) 42);
+        value.put("short", (short) 42);
+        value.put("int", 42);
+        value.put("long", 42L);
+        value.put("float", 42f);
+        value.put("double", 42d);
+        value.put("string", "blabla");
+        value.put("date", new Date());
+        value.put("bigint", new BigInteger("42"));
+        value.put("bigdec", new BigDecimal("42.0"));
+        value.put("list", Collections.singletonList(42));
+        value.put("map", Collections.singletonMap("key", "value"));
+
+        // Mask everything except the "magic" control field.
+        final List<String> maskFields = new ArrayList<>(value.keySet());
+        maskFields.remove("magic");
+
+        // Raw Map cast: schemaless values carry no generic type information.
+        final Map<String, Object> updatedValue = (Map) transform(maskFields).apply(record(null,
value)).value();
+
+        // "magic" is untouched; every masked field is its type's zero/empty value.
+        assertEquals(42, updatedValue.get("magic"));
+        assertEquals(false, updatedValue.get("bool"));
+        assertEquals((byte) 0, updatedValue.get("byte"));
+        assertEquals((short) 0, updatedValue.get("short"));
+        assertEquals(0, updatedValue.get("int"));
+        assertEquals(0L, updatedValue.get("long"));
+        assertEquals(0f, updatedValue.get("float"));
+        assertEquals(0d, updatedValue.get("double"));
+        assertEquals("", updatedValue.get("string"));
+        assertEquals(new Date(0), updatedValue.get("date"));
+        assertEquals(BigInteger.ZERO, updatedValue.get("bigint"));
+        assertEquals(BigDecimal.ZERO, updatedValue.get("bigdec"));
+        assertEquals(Collections.emptyList(), updatedValue.get("list"));
+        assertEquals(Collections.emptyMap(), updatedValue.get("map"));
+    }
+
+    // Schema-backed data: same masking expectations, exercised across all the
+    // primitive schema types plus the logical types (Date, Time, Timestamp,
+    // Decimal) and container types (array, map).
+    @Test
+    public void withSchema() {
+        Schema schema = SchemaBuilder.struct()
+                .field("magic", Schema.INT32_SCHEMA)
+                .field("bool", Schema.BOOLEAN_SCHEMA)
+                .field("byte", Schema.INT8_SCHEMA)
+                .field("short", Schema.INT16_SCHEMA)
+                .field("int", Schema.INT32_SCHEMA)
+                .field("long", Schema.INT64_SCHEMA)
+                .field("float", Schema.FLOAT32_SCHEMA)
+                .field("double", Schema.FLOAT64_SCHEMA)
+                .field("string", Schema.STRING_SCHEMA)
+                .field("date", org.apache.kafka.connect.data.Date.SCHEMA)
+                .field("time", Time.SCHEMA)
+                .field("timestamp", Timestamp.SCHEMA)
+                .field("decimal", Decimal.schema(0))
+                .field("array", SchemaBuilder.array(Schema.INT32_SCHEMA))
+                .field("map", SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA))
+                .build();
+
+        final Struct value = new Struct(schema);
+        value.put("magic", 42);
+        value.put("bool", true);
+        value.put("byte", (byte) 42);
+        value.put("short", (short) 42);
+        value.put("int", 42);
+        value.put("long", 42L);
+        value.put("float", 42f);
+        value.put("double", 42d);
+        value.put("string", "hmm");
+        value.put("date", new Date());
+        value.put("time", new Date());
+        value.put("timestamp", new Date());
+        value.put("decimal", new BigDecimal(42));
+        value.put("array", Arrays.asList(1, 2, 3));
+        value.put("map", Collections.singletonMap("what", "what"));
+
+        // Mask every schema field except "magic".
+        final List<String> maskFields = new ArrayList<>(schema.fields().size());
+        for (Field field: schema.fields()) {
+            if (!field.name().equals("magic")) {
+                maskFields.add(field.name());
+            }
+        }
+
+        final Struct updatedValue = (Struct) transform(maskFields).apply(record(schema, value)).value();
+
+        // "magic" is untouched; logical date/time types mask to epoch (new Date(0)).
+        assertEquals(42, updatedValue.get("magic"));
+        assertEquals(false, updatedValue.get("bool"));
+        assertEquals((byte) 0, updatedValue.get("byte"));
+        assertEquals((short) 0, updatedValue.get("short"));
+        assertEquals(0, updatedValue.get("int"));
+        assertEquals(0L, updatedValue.get("long"));
+        assertEquals(0f, updatedValue.get("float"));
+        assertEquals(0d, updatedValue.get("double"));
+        assertEquals("", updatedValue.get("string"));
+        assertEquals(new Date(0), updatedValue.get("date"));
+        assertEquals(new Date(0), updatedValue.get("time"));
+        assertEquals(new Date(0), updatedValue.get("timestamp"));
+        assertEquals(BigDecimal.ZERO, updatedValue.get("decimal"));
+        assertEquals(Collections.emptyList(), updatedValue.get("array"));
+        assertEquals(Collections.emptyMap(), updatedValue.get("map"));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c3f923cb/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/RegexRouterTest.java
----------------------------------------------------------------------
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/RegexRouterTest.java
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/RegexRouterTest.java
new file mode 100644
index 0000000..c599265
--- /dev/null
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/RegexRouterTest.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+// Unit tests for the RegexRouter transformation: the record's topic is matched
+// against the configured "regex" and, on a match, rewritten using the
+// "replacement" pattern (capture groups referenced as $1, $2, ...).
+public class RegexRouterTest {
+
+    // Helper: routes a record with the given topic through a RegexRouter
+    // configured with regex/replacement, and returns the resulting topic.
+    private static String apply(String regex, String replacement, String topic) {
+        final Map<String, String> props = new HashMap<>();
+        props.put("regex", regex);
+        props.put("replacement", replacement);
+        final RegexRouter<SinkRecord> router = new RegexRouter<>();
+        router.configure(props);
+        return router.apply(new SinkRecord(topic, 0, null, null, null, null, 0))
+                .topic();
+    }
+
+    // A matching topic is replaced by a static replacement string.
+    @Test
+    public void staticReplacement() {
+        assertEquals("bar", apply("foo", "bar", "foo"));
+    }
+
+    // A non-matching regex leaves the topic unchanged.
+    @Test
+    public void doesntMatch() {
+        assertEquals("orig", apply("foo", "bar", "orig"));
+    }
+
+    // Echoing the whole match via $1 yields the original topic.
+    @Test
+    public void identity() {
+        assertEquals("orig", apply("(.*)", "$1", "orig"));
+    }
+
+    // Capture-group replacement can prepend a prefix...
+    @Test
+    public void addPrefix() {
+        assertEquals("prefix-orig", apply("(.*)", "prefix-$1", "orig"));
+    }
+
+    // ...or append a suffix.
+    @Test
+    public void addSuffix() {
+        assertEquals("orig-suffix", apply("(.*)", "$1-suffix", "orig"));
+    }
+
+    // Keeping only the first group drops the trailing date segment
+    // (e.g. Elasticsearch-style "index-YYYYMMDD" -> "index").
+    @Test
+    public void slice() {
+        assertEquals("index", apply("(.*)-(\\d\\d\\d\\d\\d\\d\\d\\d)", "$1", "index-20160117"));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c3f923cb/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java
----------------------------------------------------------------------
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java
new file mode 100644
index 0000000..9f9d4b7
--- /dev/null
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+// Unit tests for the ReplaceField transformation (value mode): fields can be
+// dropped via "blacklist", restricted via "whitelist", and renamed via
+// "renames" (comma-separated "old:new" pairs).
+public class ReplaceFieldTest {
+
+    // Schemaless: "dont" is blacklisted (removed), "abc"->"xyz" and
+    // "foo"->"bar" are renamed, and the unlisted "etc" passes through.
+    @Test
+    public void schemaless() {
+        final ReplaceField<SinkRecord> xform = new ReplaceField.Value<>();
+
+        final Map<String, String> props = new HashMap<>();
+        props.put("blacklist", "dont");
+        props.put("renames", "abc:xyz,foo:bar");
+
+        xform.configure(props);
+
+        final Map<String, Object> value = new HashMap<>();
+        value.put("dont", "whatever");
+        value.put("abc", 42);
+        value.put("foo", true);
+        value.put("etc", "etc");
+
+        final SinkRecord record = new SinkRecord("test", 0, null, null, null, value, 0);
+        final SinkRecord transformedRecord = xform.apply(record);
+
+        // 3 fields remain: renamed "xyz"/"bar" plus the untouched "etc".
+        final Map updatedValue = (Map) transformedRecord.value();
+        assertEquals(3, updatedValue.size());
+        assertEquals(42, updatedValue.get("xyz"));
+        assertEquals(true, updatedValue.get("bar"));
+        assertEquals("etc", updatedValue.get("etc"));
+    }
+
+    // With schema: only whitelisted fields survive, and the renames are
+    // reflected in both the output schema and the output struct.
+    @Test
+    public void withSchema() {
+        final ReplaceField<SinkRecord> xform = new ReplaceField.Value<>();
+
+        final Map<String, String> props = new HashMap<>();
+        props.put("whitelist", "abc,foo");
+        props.put("renames", "abc:xyz,foo:bar");
+
+        xform.configure(props);
+
+        final Schema schema = SchemaBuilder.struct()
+                .field("dont", Schema.STRING_SCHEMA)
+                .field("abc", Schema.INT32_SCHEMA)
+                .field("foo", Schema.BOOLEAN_SCHEMA)
+                .field("etc", Schema.STRING_SCHEMA)
+                .build();
+
+        final Struct value = new Struct(schema);
+        value.put("dont", "whatever");
+        value.put("abc", 42);
+        value.put("foo", true);
+        value.put("etc", "etc");
+
+        final SinkRecord record = new SinkRecord("test", 0, null, null, schema, value, 0);
+        final SinkRecord transformedRecord = xform.apply(record);
+
+        final Struct updatedValue = (Struct) transformedRecord.value();
+
+        // Only the 2 whitelisted fields remain, under their new names.
+        assertEquals(2, updatedValue.schema().fields().size());
+        assertEquals(new Integer(42), updatedValue.getInt32("xyz"));
+        assertEquals(true, updatedValue.getBoolean("bar"));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c3f923cb/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/SetSchemaMetadataTest.java
----------------------------------------------------------------------
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/SetSchemaMetadataTest.java
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/SetSchemaMetadataTest.java
new file mode 100644
index 0000000..2aa790f
--- /dev/null
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/SetSchemaMetadataTest.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+// Unit tests for the SetSchemaMetadata transformation (value mode): overrides
+// the name and/or version of a record's value schema.
+public class SetSchemaMetadataTest {
+
+    // Setting only "schema.name" updates the schema name.
+    @Test
+    public void schemaNameUpdate() {
+        final SetSchemaMetadata<SinkRecord> xform = new SetSchemaMetadata.Value<>();
+        xform.configure(Collections.singletonMap("schema.name", "foo"));
+        final SinkRecord record = new SinkRecord("", 0, null, null, SchemaBuilder.struct().build(),
null, 0);
+        final SinkRecord updatedRecord = xform.apply(record);
+        assertEquals("foo", updatedRecord.valueSchema().name());
+    }
+
+    // Setting only "schema.version" updates the schema version.
+    @Test
+    public void schemaVersionUpdate() {
+        final SetSchemaMetadata<SinkRecord> xform = new SetSchemaMetadata.Value<>();
+        xform.configure(Collections.singletonMap("schema.version", 42));
+        final SinkRecord record = new SinkRecord("", 0, null, null, SchemaBuilder.struct().build(),
null, 0);
+        final SinkRecord updatedRecord = xform.apply(record);
+        assertEquals(new Integer(42), updatedRecord.valueSchema().version());
+    }
+
+    // Both properties may be set together; note the version is passed as a
+    // String here, exercising the config's type coercion.
+    @Test
+    public void schemaNameAndVersionUpdate() {
+        final Map<String, String> props = new HashMap<>();
+        props.put("schema.name", "foo");
+        props.put("schema.version", "42");
+
+        final SetSchemaMetadata<SinkRecord> xform = new SetSchemaMetadata.Value<>();
+        xform.configure(props);
+
+        final SinkRecord record = new SinkRecord("", 0, null, null, SchemaBuilder.struct().build(),
null, 0);
+
+        final SinkRecord updatedRecord = xform.apply(record);
+
+        assertEquals("foo", updatedRecord.valueSchema().name());
+        assertEquals(new Integer(42), updatedRecord.valueSchema().version());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c3f923cb/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java
----------------------------------------------------------------------
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java
new file mode 100644
index 0000000..e5328d3
--- /dev/null
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+// Unit tests for the ValueToKey transformation: the configured value fields
+// are copied into a new record key — a Map for schemaless records, or a
+// Struct (with a derived key schema) for schema-backed records.
+public class ValueToKeyTest {
+
+    // Schemaless: key becomes a Map holding only the configured fields
+    // ("a", "b"); the key schema stays null.
+    @Test
+    public void schemaless() {
+        final ValueToKey<SinkRecord> xform = new ValueToKey<>();
+        xform.configure(Collections.singletonMap("fields", "a,b"));
+
+        final HashMap<String, Integer> value = new HashMap<>();
+        value.put("a", 1);
+        value.put("b", 2);
+        value.put("c", 3);
+
+        final SinkRecord record = new SinkRecord("", 0, null, null, null, value, 0);
+        final SinkRecord transformedRecord = xform.apply(record);
+
+        final HashMap<String, Integer> expectedKey = new HashMap<>();
+        expectedKey.put("a", 1);
+        expectedKey.put("b", 2);
+
+        assertNull(transformedRecord.keySchema());
+        assertEquals(expectedKey, transformedRecord.key());
+    }
+
+    // With schema: key becomes a Struct whose schema contains only the
+    // configured fields, with their types carried over from the value schema.
+    @Test
+    public void withSchema() {
+        final ValueToKey<SinkRecord> xform = new ValueToKey<>();
+        xform.configure(Collections.singletonMap("fields", "a,b"));
+
+        final Schema valueSchema = SchemaBuilder.struct()
+                .field("a", Schema.INT32_SCHEMA)
+                .field("b", Schema.INT32_SCHEMA)
+                .field("c", Schema.INT32_SCHEMA)
+                .build();
+
+        final Struct value = new Struct(valueSchema);
+        value.put("a", 1);
+        value.put("b", 2);
+        value.put("c", 3);
+
+        final SinkRecord record = new SinkRecord("", 0, null, null, valueSchema, value, 0);
+        final SinkRecord transformedRecord = xform.apply(record);
+
+        final Schema expectedKeySchema = SchemaBuilder.struct()
+                .field("a", Schema.INT32_SCHEMA)
+                .field("b", Schema.INT32_SCHEMA)
+                .build();
+
+        final Struct expectedKey = new Struct(expectedKeySchema)
+                .put("a", 1)
+                .put("b", 2);
+
+        assertEquals(expectedKeySchema, transformedRecord.keySchema());
+        assertEquals(expectedKey, transformedRecord.key());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c3f923cb/docs/connect.html
----------------------------------------------------------------------
diff --git a/docs/connect.html b/docs/connect.html
index 23e168c..1af5ed9 100644
--- a/docs/connect.html
+++ b/docs/connect.html
@@ -100,6 +100,22 @@
 
     For any other options, you should consult the documentation for the connector.
 
+    <h4><a id="connect_transforms" href="#connect_transforms">Transformations</a></h4>
+
+    Connectors can be configured with transformations to make lightweight message-at-a-time
modifications. They can be convenient for minor data massaging and routing changes.
+
+    A transformation chain can be specified in the connector configuration.
+
+    <ul>
+        <li><code>transforms</code> - List of aliases for the transformation,
specifying the order in which the transformations will be applied.</li>
+        <li><code>transforms.$alias.type</code> - Fully qualified class
name for the transformation.</li>
+        <li><code>transforms.$alias.$transformationSpecificConfig</code> -
Configuration properties for the transformation.</li>
+    </ul>
+
+    Several widely-applicable data and routing transformations are included with Kafka Connect:
+
+    <!--#include virtual="generated/connect_transforms.html" -->
+
     <h4><a id="connect_rest" href="#connect_rest">REST API</a></h4>
 
     Since Kafka Connect is intended to be run as a service, it also provides a REST API for
managing connectors. By default, this service runs on port 8083. The following are the currently
supported endpoints:

http://git-wip-us.apache.org/repos/asf/kafka/blob/c3f923cb/tests/kafkatest/tests/connect/connect_distributed_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect/connect_distributed_test.py b/tests/kafkatest/tests/connect/connect_distributed_test.py
index 198e945..c298fb1 100644
--- a/tests/kafkatest/tests/connect/connect_distributed_test.py
+++ b/tests/kafkatest/tests/connect/connect_distributed_test.py
@@ -460,7 +460,7 @@ class ConnectDistributedTest(Test):
             'file': self.INPUT_FILE,
             'topic': self.TOPIC,
             'transforms': 'hoistToStruct,insertTimestampField',
-            'transforms.hoistToStruct.type': 'org.apache.kafka.connect.transforms.HoistToStruct$Value',
+            'transforms.hoistToStruct.type': 'org.apache.kafka.connect.transforms.HoistField$Value',
             'transforms.hoistToStruct.field': 'content',
             'transforms.insertTimestampField.type': 'org.apache.kafka.connect.transforms.InsertField$Value',
             'transforms.insertTimestampField.timestamp.field': ts_fieldname,


Mime
View raw message