kafka-commits mailing list archives

From gwens...@apache.org
Subject [22/26] kafka git commit: KAFKA-2774: Rename Copycat to Kafka Connect
Date Mon, 09 Nov 2015 06:11:41 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
----------------------------------------------------------------------
diff --git a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
new file mode 100644
index 0000000..5e47ad2
--- /dev/null
+++ b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
@@ -0,0 +1,735 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.json;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.kafka.common.cache.Cache;
+import org.apache.kafka.common.cache.LRUCache;
+import org.apache.kafka.common.cache.SynchronizedCache;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.connect.data.*;
+import org.apache.kafka.connect.errors.DataException;
+import org.apache.kafka.connect.storage.Converter;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Implementation of Converter that uses JSON to store schemas and objects.
+ */
+public class JsonConverter implements Converter {
+    private static final String SCHEMAS_ENABLE_CONFIG = "schemas.enable";
+    private static final boolean SCHEMAS_ENABLE_DEFAULT = true;
+    private static final String SCHEMAS_CACHE_CONFIG = "schemas.cache.size";
+    private static final int SCHEMAS_CACHE_SIZE_DEFAULT = 1000;
+
+    private static final HashMap<Schema.Type, JsonToConnectTypeConverter> TO_CONNECT_CONVERTERS = new HashMap<>();
+
+    private static Object checkOptionalAndDefault(Schema schema) {
+        if (schema.defaultValue() != null)
+            return schema.defaultValue();
+        if (schema.isOptional())
+            return null;
+        throw new DataException("Invalid null value for required field");
+    }
+
+    static {
+        TO_CONNECT_CONVERTERS.put(Schema.Type.BOOLEAN, new JsonToConnectTypeConverter() {
+            @Override
+            public Object convert(Schema schema, JsonNode value) {
+                if (value.isNull()) return checkOptionalAndDefault(schema);
+                return value.booleanValue();
+            }
+        });
+        TO_CONNECT_CONVERTERS.put(Schema.Type.INT8, new JsonToConnectTypeConverter() {
+            @Override
+            public Object convert(Schema schema, JsonNode value) {
+                if (value.isNull()) return checkOptionalAndDefault(schema);
+                return (byte) value.intValue();
+            }
+        });
+        TO_CONNECT_CONVERTERS.put(Schema.Type.INT16, new JsonToConnectTypeConverter() {
+            @Override
+            public Object convert(Schema schema, JsonNode value) {
+                if (value.isNull()) return checkOptionalAndDefault(schema);
+                return (short) value.intValue();
+            }
+        });
+        TO_CONNECT_CONVERTERS.put(Schema.Type.INT32, new JsonToConnectTypeConverter() {
+            @Override
+            public Object convert(Schema schema, JsonNode value) {
+                if (value.isNull()) return checkOptionalAndDefault(schema);
+                return value.intValue();
+            }
+        });
+        TO_CONNECT_CONVERTERS.put(Schema.Type.INT64, new JsonToConnectTypeConverter() {
+            @Override
+            public Object convert(Schema schema, JsonNode value) {
+                if (value.isNull()) return checkOptionalAndDefault(schema);
+                return value.longValue();
+            }
+        });
+        TO_CONNECT_CONVERTERS.put(Schema.Type.FLOAT32, new JsonToConnectTypeConverter() {
+            @Override
+            public Object convert(Schema schema, JsonNode value) {
+                if (value.isNull()) return checkOptionalAndDefault(schema);
+                return value.floatValue();
+            }
+        });
+        TO_CONNECT_CONVERTERS.put(Schema.Type.FLOAT64, new JsonToConnectTypeConverter() {
+            @Override
+            public Object convert(Schema schema, JsonNode value) {
+                if (value.isNull()) return checkOptionalAndDefault(schema);
+                return value.doubleValue();
+            }
+        });
+        TO_CONNECT_CONVERTERS.put(Schema.Type.BYTES, new JsonToConnectTypeConverter() {
+            @Override
+            public Object convert(Schema schema, JsonNode value) {
+                try {
+                    if (value.isNull()) return checkOptionalAndDefault(schema);
+                    return value.binaryValue();
+                } catch (IOException e) {
+                    throw new DataException("Invalid bytes field", e);
+                }
+            }
+        });
+        TO_CONNECT_CONVERTERS.put(Schema.Type.STRING, new JsonToConnectTypeConverter() {
+            @Override
+            public Object convert(Schema schema, JsonNode value) {
+                if (value.isNull()) return checkOptionalAndDefault(schema);
+                return value.textValue();
+            }
+        });
+        TO_CONNECT_CONVERTERS.put(Schema.Type.ARRAY, new JsonToConnectTypeConverter() {
+            @Override
+            public Object convert(Schema schema, JsonNode value) {
+                if (value.isNull()) return checkOptionalAndDefault(schema);
+
+                Schema elemSchema = schema == null ? null : schema.valueSchema();
+                ArrayList<Object> result = new ArrayList<>();
+                for (JsonNode elem : value) {
+                    result.add(convertToConnect(elemSchema, elem));
+                }
+                return result;
+            }
+        });
+        TO_CONNECT_CONVERTERS.put(Schema.Type.MAP, new JsonToConnectTypeConverter() {
+            @Override
+            public Object convert(Schema schema, JsonNode value) {
+                if (value.isNull()) return checkOptionalAndDefault(schema);
+
+                Schema keySchema = schema == null ? null : schema.keySchema();
+                Schema valueSchema = schema == null ? null : schema.valueSchema();
+
+                // If the map uses strings for keys, it should be encoded in the natural JSON object format. If it
+                // uses other primitive types or a complex type as a key, it will be encoded as a list of key/value
+                // pairs. If we don't have a schema, we default to the string-keyed JSON object encoding.
+                Map<Object, Object> result = new HashMap<>();
+                if (schema == null || keySchema.type() == Schema.Type.STRING) {
+                    if (!value.isObject())
+                        throw new DataException("Map's with string fields should be encoded as JSON objects, but found " + value.getNodeType());
+                    Iterator<Map.Entry<String, JsonNode>> fieldIt = value.fields();
+                    while (fieldIt.hasNext()) {
+                        Map.Entry<String, JsonNode> entry = fieldIt.next();
+                        result.put(entry.getKey(), convertToConnect(valueSchema, entry.getValue()));
+                    }
+                } else {
+                    if (!value.isArray())
+                        throw new DataException("Map's with non-string fields should be encoded as JSON array of tuples, but found " + value.getNodeType());
+                    for (JsonNode entry : value) {
+                        if (!entry.isArray())
+                            throw new DataException("Found invalid map entry instead of array tuple: " + entry.getNodeType());
+                        if (entry.size() != 2)
+                            throw new DataException("Found invalid map entry, expected length 2 but found :" + entry.size());
+                        result.put(convertToConnect(keySchema, entry.get(0)),
+                                convertToConnect(valueSchema, entry.get(1)));
+                    }
+                }
+                return result;
+            }
+        });
+        TO_CONNECT_CONVERTERS.put(Schema.Type.STRUCT, new JsonToConnectTypeConverter() {
+            @Override
+            public Object convert(Schema schema, JsonNode value) {
+                if (value.isNull()) return checkOptionalAndDefault(schema);
+
+                if (!value.isObject())
+                    throw new DataException("Structs should be encoded as JSON objects, but found " + value.getNodeType());
+
+                // The schema we are handed here may only be a SchemaBuilder, but Struct requires a fully
+                // materialized Schema at construction, so we call schema.schema() to materialize it. The schema is
+                // only a SchemaBuilder while converting default values during schema translation; during the more
+                // common conversion of data, the call to schema.schema() just returns the Schema object itself and
+                // has no overhead.
+                Struct result = new Struct(schema.schema());
+                for (Field field : schema.fields())
+                    result.put(field, convertToConnect(field.schema(), value.get(field.name())));
+
+                return result;
+            }
+        });
+    }
+
+    // Convert values in Kafka Connect form into their logical types. These logical converters are discovered by
+    // the logical type names specified in the field schemas.
+    private static final HashMap<String, LogicalTypeConverter> TO_CONNECT_LOGICAL_CONVERTERS = new HashMap<>();
+    static {
+        TO_CONNECT_LOGICAL_CONVERTERS.put(Decimal.LOGICAL_NAME, new LogicalTypeConverter() {
+            @Override
+            public Object convert(Schema schema, Object value) {
+                if (!(value instanceof byte[]))
+                    throw new DataException("Invalid type for Decimal, underlying representation should be bytes but was " + value.getClass());
+                return Decimal.toLogical(schema, (byte[]) value);
+            }
+        });
+
+        TO_CONNECT_LOGICAL_CONVERTERS.put(Date.LOGICAL_NAME, new LogicalTypeConverter() {
+            @Override
+            public Object convert(Schema schema, Object value) {
+                if (!(value instanceof Integer))
+                    throw new DataException("Invalid type for Date, underlying representation should be int32 but was " + value.getClass());
+                return Date.toLogical(schema, (int) value);
+            }
+        });
+
+        TO_CONNECT_LOGICAL_CONVERTERS.put(Time.LOGICAL_NAME, new LogicalTypeConverter() {
+            @Override
+            public Object convert(Schema schema, Object value) {
+                if (!(value instanceof Integer))
+                    throw new DataException("Invalid type for Time, underlying representation should be int32 but was " + value.getClass());
+                return Time.toLogical(schema, (int) value);
+            }
+        });
+
+        TO_CONNECT_LOGICAL_CONVERTERS.put(Timestamp.LOGICAL_NAME, new LogicalTypeConverter() {
+            @Override
+            public Object convert(Schema schema, Object value) {
+                if (!(value instanceof Long))
+                    throw new DataException("Invalid type for Timestamp, underlying representation should be int64 but was " + value.getClass());
+                return Timestamp.toLogical(schema, (long) value);
+            }
+        });
+    }
+
+    private static final HashMap<String, LogicalTypeConverter> TO_JSON_LOGICAL_CONVERTERS = new HashMap<>();
+    static {
+        TO_JSON_LOGICAL_CONVERTERS.put(Decimal.LOGICAL_NAME, new LogicalTypeConverter() {
+            @Override
+            public Object convert(Schema schema, Object value) {
+                if (!(value instanceof BigDecimal))
+                    throw new DataException("Invalid type for Decimal, expected BigDecimal but was " + value.getClass());
+                return Decimal.fromLogical(schema, (BigDecimal) value);
+            }
+        });
+
+        TO_JSON_LOGICAL_CONVERTERS.put(Date.LOGICAL_NAME, new LogicalTypeConverter() {
+            @Override
+            public Object convert(Schema schema, Object value) {
+                if (!(value instanceof java.util.Date))
+                    throw new DataException("Invalid type for Date, expected Date but was " + value.getClass());
+                return Date.fromLogical(schema, (java.util.Date) value);
+            }
+        });
+
+        TO_JSON_LOGICAL_CONVERTERS.put(Time.LOGICAL_NAME, new LogicalTypeConverter() {
+            @Override
+            public Object convert(Schema schema, Object value) {
+                if (!(value instanceof java.util.Date))
+                    throw new DataException("Invalid type for Time, expected Date but was " + value.getClass());
+                return Time.fromLogical(schema, (java.util.Date) value);
+            }
+        });
+
+        TO_JSON_LOGICAL_CONVERTERS.put(Timestamp.LOGICAL_NAME, new LogicalTypeConverter() {
+            @Override
+            public Object convert(Schema schema, Object value) {
+                if (!(value instanceof java.util.Date))
+                    throw new DataException("Invalid type for Timestamp, expected Date but was " + value.getClass());
+                return Timestamp.fromLogical(schema, (java.util.Date) value);
+            }
+        });
+    }
+
+
+    private boolean enableSchemas = SCHEMAS_ENABLE_DEFAULT;
+    private int cacheSize = SCHEMAS_CACHE_SIZE_DEFAULT;
+    private Cache<Schema, ObjectNode> fromConnectSchemaCache;
+    private Cache<JsonNode, Schema> toConnectSchemaCache;
+
+    private final JsonSerializer serializer = new JsonSerializer();
+    private final JsonDeserializer deserializer = new JsonDeserializer();
+
+    @Override
+    public void configure(Map<String, ?> configs, boolean isKey) {
+        Object enableConfigsVal = configs.get(SCHEMAS_ENABLE_CONFIG);
+        if (enableConfigsVal != null)
+            enableSchemas = enableConfigsVal.toString().equals("true");
+
+        serializer.configure(configs, isKey);
+        deserializer.configure(configs, isKey);
+
+        Object cacheSizeVal = configs.get(SCHEMAS_CACHE_CONFIG);
+        if (cacheSizeVal != null)
+            cacheSize = (int) cacheSizeVal;
+        fromConnectSchemaCache = new SynchronizedCache<>(new LRUCache<Schema, ObjectNode>(cacheSize));
+        toConnectSchemaCache = new SynchronizedCache<>(new LRUCache<JsonNode, Schema>(cacheSize));
+    }
+
+    @Override
+    public byte[] fromConnectData(String topic, Schema schema, Object value) {
+        JsonNode jsonValue = enableSchemas ? convertToJsonWithEnvelope(schema, value) : convertToJsonWithoutEnvelope(schema, value);
+        try {
+            return serializer.serialize(topic, jsonValue);
+        } catch (SerializationException e) {
+            throw new DataException("Converting Kafka Connect data to byte[] failed due to serialization error: ", e);
+        }
+    }
+
+    @Override
+    public SchemaAndValue toConnectData(String topic, byte[] value) {
+        JsonNode jsonValue;
+        try {
+            jsonValue = deserializer.deserialize(topic, value);
+        } catch (SerializationException e) {
+            throw new DataException("Converting byte[] to Kafka Connect data failed due to serialization error: ", e);
+        }
+
+        if (enableSchemas && (jsonValue == null || !jsonValue.isObject() || jsonValue.size() != 2 || !jsonValue.has("schema") || !jsonValue.has("payload")))
+            throw new DataException("JsonDeserializer with schemas.enable requires \"schema\" and \"payload\" fields and may not contain additional fields");
+
+        // The deserialized data should either be an envelope object containing the schema and the payload, or the
+        // schema may have been stripped during serialization, in which case we need to fill in an all-encompassing schema.
+        if (!enableSchemas) {
+            ObjectNode envelope = JsonNodeFactory.instance.objectNode();
+            envelope.set("schema", null);
+            envelope.set("payload", jsonValue);
+            jsonValue = envelope;
+        }
+
+        return jsonToConnect(jsonValue);
+    }
+
+    private SchemaAndValue jsonToConnect(JsonNode jsonValue) {
+        if (jsonValue == null)
+            return SchemaAndValue.NULL;
+
+        if (!jsonValue.isObject() || jsonValue.size() != 2 || !jsonValue.has(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME) || !jsonValue.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME))
+            throw new DataException("JSON value converted to Kafka Connect must be in envelope containing schema");
+
+        Schema schema = asConnectSchema(jsonValue.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
+        return new SchemaAndValue(schema, convertToConnect(schema, jsonValue.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME)));
+    }
+
+    private ObjectNode asJsonSchema(Schema schema) {
+        if (schema == null)
+            return null;
+
+        ObjectNode cached = fromConnectSchemaCache.get(schema);
+        if (cached != null)
+            return cached;
+
+        final ObjectNode jsonSchema;
+        switch (schema.type()) {
+            case BOOLEAN:
+                jsonSchema = JsonSchema.BOOLEAN_SCHEMA.deepCopy();
+                break;
+            case BYTES:
+                jsonSchema = JsonSchema.BYTES_SCHEMA.deepCopy();
+                break;
+            case FLOAT64:
+                jsonSchema = JsonSchema.DOUBLE_SCHEMA.deepCopy();
+                break;
+            case FLOAT32:
+                jsonSchema = JsonSchema.FLOAT_SCHEMA.deepCopy();
+                break;
+            case INT8:
+                jsonSchema = JsonSchema.INT8_SCHEMA.deepCopy();
+                break;
+            case INT16:
+                jsonSchema = JsonSchema.INT16_SCHEMA.deepCopy();
+                break;
+            case INT32:
+                jsonSchema = JsonSchema.INT32_SCHEMA.deepCopy();
+                break;
+            case INT64:
+                jsonSchema = JsonSchema.INT64_SCHEMA.deepCopy();
+                break;
+            case STRING:
+                jsonSchema = JsonSchema.STRING_SCHEMA.deepCopy();
+                break;
+            case ARRAY:
+                jsonSchema = JsonNodeFactory.instance.objectNode().put(JsonSchema.SCHEMA_TYPE_FIELD_NAME, JsonSchema.ARRAY_TYPE_NAME);
+                jsonSchema.set(JsonSchema.ARRAY_ITEMS_FIELD_NAME, asJsonSchema(schema.valueSchema()));
+                break;
+            case MAP:
+                jsonSchema = JsonNodeFactory.instance.objectNode().put(JsonSchema.SCHEMA_TYPE_FIELD_NAME, JsonSchema.MAP_TYPE_NAME);
+                jsonSchema.set(JsonSchema.MAP_KEY_FIELD_NAME, asJsonSchema(schema.keySchema()));
+                jsonSchema.set(JsonSchema.MAP_VALUE_FIELD_NAME, asJsonSchema(schema.valueSchema()));
+                break;
+            case STRUCT:
+                jsonSchema = JsonNodeFactory.instance.objectNode().put(JsonSchema.SCHEMA_TYPE_FIELD_NAME, JsonSchema.STRUCT_TYPE_NAME);
+                ArrayNode fields = JsonNodeFactory.instance.arrayNode();
+                for (Field field : schema.fields()) {
+                    ObjectNode fieldJsonSchema = asJsonSchema(field.schema());
+                    fieldJsonSchema.put(JsonSchema.STRUCT_FIELD_NAME_FIELD_NAME, field.name());
+                    fields.add(fieldJsonSchema);
+                }
+                jsonSchema.set(JsonSchema.STRUCT_FIELDS_FIELD_NAME, fields);
+                break;
+            default:
+                throw new DataException("Couldn't translate unsupported schema type " + schema + ".");
+        }
+
+        jsonSchema.put(JsonSchema.SCHEMA_OPTIONAL_FIELD_NAME, schema.isOptional());
+        if (schema.name() != null)
+            jsonSchema.put(JsonSchema.SCHEMA_NAME_FIELD_NAME, schema.name());
+        if (schema.version() != null)
+            jsonSchema.put(JsonSchema.SCHEMA_VERSION_FIELD_NAME, schema.version());
+        if (schema.doc() != null)
+            jsonSchema.put(JsonSchema.SCHEMA_DOC_FIELD_NAME, schema.doc());
+        if (schema.parameters() != null) {
+            ObjectNode jsonSchemaParams = JsonNodeFactory.instance.objectNode();
+            for (Map.Entry<String, String> prop : schema.parameters().entrySet())
+                jsonSchemaParams.put(prop.getKey(), prop.getValue());
+            jsonSchema.put(JsonSchema.SCHEMA_PARAMETERS_FIELD_NAME, jsonSchemaParams);
+        }
+        if (schema.defaultValue() != null)
+            jsonSchema.set(JsonSchema.SCHEMA_DEFAULT_FIELD_NAME, convertToJson(schema, schema.defaultValue()));
+
+        fromConnectSchemaCache.put(schema, jsonSchema);
+        return jsonSchema;
+    }
+
+
+    private Schema asConnectSchema(JsonNode jsonSchema) {
+        if (jsonSchema.isNull())
+            return null;
+
+        Schema cached = toConnectSchemaCache.get(jsonSchema);
+        if (cached != null)
+            return cached;
+
+        JsonNode schemaTypeNode = jsonSchema.get(JsonSchema.SCHEMA_TYPE_FIELD_NAME);
+        if (schemaTypeNode == null || !schemaTypeNode.isTextual())
+            throw new DataException("Schema must contain 'type' field");
+
+        final SchemaBuilder builder;
+        switch (schemaTypeNode.textValue()) {
+            case JsonSchema.BOOLEAN_TYPE_NAME:
+                builder = SchemaBuilder.bool();
+                break;
+            case JsonSchema.INT8_TYPE_NAME:
+                builder = SchemaBuilder.int8();
+                break;
+            case JsonSchema.INT16_TYPE_NAME:
+                builder = SchemaBuilder.int16();
+                break;
+            case JsonSchema.INT32_TYPE_NAME:
+                builder = SchemaBuilder.int32();
+                break;
+            case JsonSchema.INT64_TYPE_NAME:
+                builder = SchemaBuilder.int64();
+                break;
+            case JsonSchema.FLOAT_TYPE_NAME:
+                builder = SchemaBuilder.float32();
+                break;
+            case JsonSchema.DOUBLE_TYPE_NAME:
+                builder = SchemaBuilder.float64();
+                break;
+            case JsonSchema.BYTES_TYPE_NAME:
+                builder = SchemaBuilder.bytes();
+                break;
+            case JsonSchema.STRING_TYPE_NAME:
+                builder = SchemaBuilder.string();
+                break;
+            case JsonSchema.ARRAY_TYPE_NAME:
+                JsonNode elemSchema = jsonSchema.get(JsonSchema.ARRAY_ITEMS_FIELD_NAME);
+                if (elemSchema == null)
+                    throw new DataException("Array schema did not specify the element type");
+                builder = SchemaBuilder.array(asConnectSchema(elemSchema));
+                break;
+            case JsonSchema.MAP_TYPE_NAME:
+                JsonNode keySchema = jsonSchema.get(JsonSchema.MAP_KEY_FIELD_NAME);
+                if (keySchema == null)
+                    throw new DataException("Map schema did not specify the key type");
+                JsonNode valueSchema = jsonSchema.get(JsonSchema.MAP_VALUE_FIELD_NAME);
+                if (valueSchema == null)
+                    throw new DataException("Map schema did not specify the value type");
+                builder = SchemaBuilder.map(asConnectSchema(keySchema), asConnectSchema(valueSchema));
+                break;
+            case JsonSchema.STRUCT_TYPE_NAME:
+                builder = SchemaBuilder.struct();
+                JsonNode fields = jsonSchema.get(JsonSchema.STRUCT_FIELDS_FIELD_NAME);
+                if (fields == null || !fields.isArray())
+                    throw new DataException("Struct schema's \"fields\" argument is not an array.");
+                for (JsonNode field : fields) {
+                    JsonNode jsonFieldName = field.get(JsonSchema.STRUCT_FIELD_NAME_FIELD_NAME);
+                    if (jsonFieldName == null || !jsonFieldName.isTextual())
+                        throw new DataException("Struct schema's field name not specified properly");
+                    builder.field(jsonFieldName.asText(), asConnectSchema(field));
+                }
+                break;
+            default:
+                throw new DataException("Unknown schema type: " + schemaTypeNode.textValue());
+        }
+
+
+        JsonNode schemaOptionalNode = jsonSchema.get(JsonSchema.SCHEMA_OPTIONAL_FIELD_NAME);
+        if (schemaOptionalNode != null && schemaOptionalNode.isBoolean() && schemaOptionalNode.booleanValue())
+            builder.optional();
+        else
+            builder.required();
+
+        JsonNode schemaNameNode = jsonSchema.get(JsonSchema.SCHEMA_NAME_FIELD_NAME);
+        if (schemaNameNode != null && schemaNameNode.isTextual())
+            builder.name(schemaNameNode.textValue());
+
+        JsonNode schemaVersionNode = jsonSchema.get(JsonSchema.SCHEMA_VERSION_FIELD_NAME);
+        if (schemaVersionNode != null && schemaVersionNode.isIntegralNumber()) {
+            builder.version(schemaVersionNode.intValue());
+        }
+
+        JsonNode schemaDocNode = jsonSchema.get(JsonSchema.SCHEMA_DOC_FIELD_NAME);
+        if (schemaDocNode != null && schemaDocNode.isTextual())
+            builder.doc(schemaDocNode.textValue());
+
+        JsonNode schemaParamsNode = jsonSchema.get(JsonSchema.SCHEMA_PARAMETERS_FIELD_NAME);
+        if (schemaParamsNode != null && schemaParamsNode.isObject()) {
+            Iterator<Map.Entry<String, JsonNode>> paramsIt = schemaParamsNode.fields();
+            while (paramsIt.hasNext()) {
+                Map.Entry<String, JsonNode> entry = paramsIt.next();
+                JsonNode paramValue = entry.getValue();
+                if (!paramValue.isTextual())
+                    throw new DataException("Schema parameters must have string values.");
+                builder.parameter(entry.getKey(), paramValue.textValue());
+            }
+        }
+
+        JsonNode schemaDefaultNode = jsonSchema.get(JsonSchema.SCHEMA_DEFAULT_FIELD_NAME);
+        if (schemaDefaultNode != null)
+            builder.defaultValue(convertToConnect(builder, schemaDefaultNode));
+
+        Schema result = builder.build();
+        toConnectSchemaCache.put(jsonSchema, result);
+        return result;
+    }
+
+
+    /**
+     * Convert this object, in org.apache.kafka.connect.data format, into a JSON object with an envelope object
+     * containing schema and payload fields.
+     * @param schema the schema for the data
+     * @param value the value
+     * @return JsonNode-encoded version
+     */
+    private JsonNode convertToJsonWithEnvelope(Schema schema, Object value) {
+        return new JsonSchema.Envelope(asJsonSchema(schema), convertToJson(schema, value)).toJsonNode();
+    }
+
+    private JsonNode convertToJsonWithoutEnvelope(Schema schema, Object value) {
+        return convertToJson(schema, value);
+    }
+
+    /**
+     * Convert this object, in the org.apache.kafka.connect.data format, into its JSON representation. Only the
+     * converted value is returned; no schema envelope is added.
+     */
+    private static JsonNode convertToJson(Schema schema, Object logicalValue) {
+        if (logicalValue == null) {
+            if (schema == null) // Any schema is valid and we don't have a default, so treat this as an optional schema
+                return null;
+            if (schema.defaultValue() != null)
+                return convertToJson(schema, schema.defaultValue());
+            if (schema.isOptional())
+                return JsonNodeFactory.instance.nullNode();
+            throw new DataException("Conversion error: null value for field that is required and has no default value");
+        }
+
+        Object value = logicalValue;
+        if (schema != null && schema.name() != null) {
+            LogicalTypeConverter logicalConverter = TO_JSON_LOGICAL_CONVERTERS.get(schema.name());
+            if (logicalConverter != null)
+                value = logicalConverter.convert(schema, logicalValue);
+        }
+
+        try {
+            final Schema.Type schemaType;
+            if (schema == null) {
+                schemaType = ConnectSchema.schemaType(value.getClass());
+                if (schemaType == null)
+                    throw new DataException("Java class " + value.getClass() + " does not have corresponding schema type.");
+            } else {
+                schemaType = schema.type();
+            }
+            switch (schemaType) {
+                case INT8:
+                    return JsonNodeFactory.instance.numberNode((Byte) value);
+                case INT16:
+                    return JsonNodeFactory.instance.numberNode((Short) value);
+                case INT32:
+                    return JsonNodeFactory.instance.numberNode((Integer) value);
+                case INT64:
+                    return JsonNodeFactory.instance.numberNode((Long) value);
+                case FLOAT32:
+                    return JsonNodeFactory.instance.numberNode((Float) value);
+                case FLOAT64:
+                    return JsonNodeFactory.instance.numberNode((Double) value);
+                case BOOLEAN:
+                    return JsonNodeFactory.instance.booleanNode((Boolean) value);
+                case STRING:
+                    CharSequence charSeq = (CharSequence) value;
+                    return JsonNodeFactory.instance.textNode(charSeq.toString());
+                case BYTES:
+                    if (value instanceof byte[])
+                        return JsonNodeFactory.instance.binaryNode((byte[]) value);
+                    else if (value instanceof ByteBuffer)
+                        return JsonNodeFactory.instance.binaryNode(((ByteBuffer) value).array());
+                    else
+                        throw new DataException("Invalid type for bytes type: " + value.getClass());
+                case ARRAY: {
+                    Collection collection = (Collection) value;
+                    ArrayNode list = JsonNodeFactory.instance.arrayNode();
+                    for (Object elem : collection) {
+                        Schema valueSchema = schema == null ? null : schema.valueSchema();
+                        JsonNode fieldValue = convertToJson(valueSchema, elem);
+                        list.add(fieldValue);
+                    }
+                    return list;
+                }
+                case MAP: {
+                    Map<?, ?> map = (Map<?, ?>) value;
+                    // If true, the map has string keys and is encoded as a JSON object; if false, it has non-string keys and is encoded as an array of pairs
+                    boolean objectMode;
+                    if (schema == null) {
+                        objectMode = true;
+                        for (Map.Entry<?, ?> entry : map.entrySet()) {
+                            if (!(entry.getKey() instanceof String)) {
+                                objectMode = false;
+                                break;
+                            }
+                        }
+                    } else {
+                        objectMode = schema.keySchema().type() == Schema.Type.STRING;
+                    }
+                    ObjectNode obj = null;
+                    ArrayNode list = null;
+                    if (objectMode)
+                        obj = JsonNodeFactory.instance.objectNode();
+                    else
+                        list = JsonNodeFactory.instance.arrayNode();
+                    for (Map.Entry<?, ?> entry : map.entrySet()) {
+                        Schema keySchema = schema == null ? null : schema.keySchema();
+                        Schema valueSchema = schema == null ? null : schema.valueSchema();
+                        JsonNode mapKey = convertToJson(keySchema, entry.getKey());
+                        JsonNode mapValue = convertToJson(valueSchema, entry.getValue());
+
+                        if (objectMode)
+                            obj.set(mapKey.asText(), mapValue);
+                        else
+                            list.add(JsonNodeFactory.instance.arrayNode().add(mapKey).add(mapValue));
+                    }
+                    return objectMode ? obj : list;
+                }
+                case STRUCT: {
+                    Struct struct = (Struct) value;
+                    if (struct.schema() != schema)
+                        throw new DataException("Mismatching schema.");
+                    ObjectNode obj = JsonNodeFactory.instance.objectNode();
+                    for (Field field : schema.fields()) {
+                        obj.set(field.name(), convertToJson(field.schema(), struct.get(field)));
+                    }
+                    return obj;
+                }
+            }
+
+            throw new DataException("Couldn't convert " + value + " to JSON.");
+        } catch (ClassCastException e) {
+            throw new DataException("Invalid type for " + schema.type() + ": " + value.getClass());
+        }
+    }
+
+
+    private static Object convertToConnect(Schema schema, JsonNode jsonValue) {
+        JsonToConnectTypeConverter typeConverter;
+        final Schema.Type schemaType;
+        if (schema != null) {
+            schemaType = schema.type();
+        } else {
+            switch (jsonValue.getNodeType()) {
+                case NULL:
+                    // Special case: with no schema, a JSON null maps directly to a null value
+                    return null;
+                case BOOLEAN:
+                    schemaType = Schema.Type.BOOLEAN;
+                    break;
+                case NUMBER:
+                    if (jsonValue.isIntegralNumber())
+                        schemaType = Schema.Type.INT64;
+                    else
+                        schemaType = Schema.Type.FLOAT64;
+                    break;
+                case ARRAY:
+                    schemaType = Schema.Type.ARRAY;
+                    break;
+                case OBJECT:
+                    schemaType = Schema.Type.MAP;
+                    break;
+                case STRING:
+                    schemaType = Schema.Type.STRING;
+                    break;
+
+                case BINARY:
+                case MISSING:
+                case POJO:
+                default:
+                    schemaType = null;
+                    break;
+            }
+        }
+        typeConverter = TO_CONNECT_CONVERTERS.get(schemaType);
+        if (typeConverter == null)
+            throw new DataException("Unknown schema type: " + schema.type());
+
+        Object converted = typeConverter.convert(schema, jsonValue);
+        if (schema != null && schema.name() != null) {
+            LogicalTypeConverter logicalConverter = TO_CONNECT_LOGICAL_CONVERTERS.get(schema.name());
+            if (logicalConverter != null)
+                converted = logicalConverter.convert(schema, converted);
+        }
+        return converted;
+    }
+
+
+    private interface JsonToConnectTypeConverter {
+        Object convert(Schema schema, JsonNode value);
+    }
+
+    private interface LogicalTypeConverter {
+        Object convert(Schema schema, Object value);
+    }
+}

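For orientation, here is a minimal usage sketch of the converter above (illustrative only, not part of
this commit; the topic name, schema, and field values are invented). With the default
schemas.enable=true, fromConnectData wraps each record in the { "schema": ..., "payload": ... }
envelope and toConnectData unwraps it again:

    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaAndValue;
    import org.apache.kafka.connect.data.SchemaBuilder;
    import org.apache.kafka.connect.data.Struct;
    import org.apache.kafka.connect.json.JsonConverter;

    import java.util.Collections;

    public class JsonConverterExample {
        public static void main(String[] args) {
            JsonConverter converter = new JsonConverter();
            // schemas.enable defaults to true, so serialized records carry the schema envelope
            converter.configure(Collections.<String, Object>emptyMap(), false);

            Schema schema = SchemaBuilder.struct()
                    .field("name", Schema.STRING_SCHEMA)
                    .field("age", Schema.INT32_SCHEMA)
                    .build();
            Struct value = new Struct(schema).put("name", "alice").put("age", 30);

            byte[] serialized = converter.fromConnectData("example-topic", schema, value);
            SchemaAndValue roundTripped = converter.toConnectData("example-topic", serialized);
            System.out.println(roundTripped.value()); // e.g. Struct{name=alice,age=30}
        }
    }

Setting "schemas.enable" to "false" in the configure() map drops the envelope; toConnectData then has
no schema to work with and infers types directly from the JSON, as exercised by
nullSchemaPrimitiveToConnect in the tests below.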
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/json/src/main/java/org/apache/kafka/connect/json/JsonDeserializer.java
----------------------------------------------------------------------
diff --git a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonDeserializer.java b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonDeserializer.java
new file mode 100644
index 0000000..a918eb5
--- /dev/null
+++ b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonDeserializer.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.json;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.serialization.Deserializer;
+
+import java.util.Map;
+
+/**
+ * JSON deserializer for Jackson's JsonNode tree model. Using the tree model allows it to work with arbitrarily
+ * structured data without having associated Java classes. This deserializer also supports Connect schemas.
+ */
+public class JsonDeserializer implements Deserializer<JsonNode> {
+    private ObjectMapper objectMapper = new ObjectMapper();
+
+    /**
+     * Default constructor needed by Kafka
+     */
+    public JsonDeserializer() {
+    }
+
+    @Override
+    public void configure(Map<String, ?> props, boolean isKey) {
+    }
+
+    @Override
+    public JsonNode deserialize(String topic, byte[] bytes) {
+        if (bytes == null)
+            return null;
+
+        JsonNode data;
+        try {
+            data = objectMapper.readTree(bytes);
+        } catch (Exception e) {
+            throw new SerializationException(e);
+        }
+
+        return data;
+    }
+
+    @Override
+    public void close() {
+
+    }
+}

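A small standalone sketch of the deserializer (illustrative only; the topic argument is unused by this
implementation): it parses the bytes into Jackson's tree model and leaves all interpretation to the
caller.

    import com.fasterxml.jackson.databind.JsonNode;
    import org.apache.kafka.connect.json.JsonDeserializer;

    import java.nio.charset.StandardCharsets;
    import java.util.Collections;

    public class JsonDeserializerExample {
        public static void main(String[] args) {
            JsonDeserializer deserializer = new JsonDeserializer();
            deserializer.configure(Collections.<String, Object>emptyMap(), false);

            byte[] bytes = "{\"a\": 1, \"b\": [true, null]}".getBytes(StandardCharsets.UTF_8);
            JsonNode node = deserializer.deserialize("example-topic", bytes);
            System.out.println(node.get("a").intValue());         // 1
            System.out.println(node.get("b").get(0).asBoolean()); // true
            deserializer.close();
        }
    }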
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/json/src/main/java/org/apache/kafka/connect/json/JsonSchema.java
----------------------------------------------------------------------
diff --git a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonSchema.java b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonSchema.java
new file mode 100644
index 0000000..9005fda
--- /dev/null
+++ b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonSchema.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.json;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+public class JsonSchema {
+
+    static final String ENVELOPE_SCHEMA_FIELD_NAME = "schema";
+    static final String ENVELOPE_PAYLOAD_FIELD_NAME = "payload";
+    static final String SCHEMA_TYPE_FIELD_NAME = "type";
+    static final String SCHEMA_OPTIONAL_FIELD_NAME = "optional";
+    static final String SCHEMA_NAME_FIELD_NAME = "name";
+    static final String SCHEMA_VERSION_FIELD_NAME = "version";
+    static final String SCHEMA_DOC_FIELD_NAME = "doc";
+    static final String SCHEMA_PARAMETERS_FIELD_NAME = "parameters";
+    static final String SCHEMA_DEFAULT_FIELD_NAME = "default";
+    static final String ARRAY_ITEMS_FIELD_NAME = "items";
+    static final String MAP_KEY_FIELD_NAME = "keys";
+    static final String MAP_VALUE_FIELD_NAME = "values";
+    static final String STRUCT_FIELDS_FIELD_NAME = "fields";
+    static final String STRUCT_FIELD_NAME_FIELD_NAME = "field";
+    static final String BOOLEAN_TYPE_NAME = "boolean";
+    static final ObjectNode BOOLEAN_SCHEMA = JsonNodeFactory.instance.objectNode().put(SCHEMA_TYPE_FIELD_NAME, BOOLEAN_TYPE_NAME);
+    static final String INT8_TYPE_NAME = "int8";
+    static final ObjectNode INT8_SCHEMA = JsonNodeFactory.instance.objectNode().put(SCHEMA_TYPE_FIELD_NAME, INT8_TYPE_NAME);
+    static final String INT16_TYPE_NAME = "int16";
+    static final ObjectNode INT16_SCHEMA = JsonNodeFactory.instance.objectNode().put(SCHEMA_TYPE_FIELD_NAME, INT16_TYPE_NAME);
+    static final String INT32_TYPE_NAME = "int32";
+    static final ObjectNode INT32_SCHEMA = JsonNodeFactory.instance.objectNode().put(SCHEMA_TYPE_FIELD_NAME, INT32_TYPE_NAME);
+    static final String INT64_TYPE_NAME = "int64";
+    static final ObjectNode INT64_SCHEMA = JsonNodeFactory.instance.objectNode().put(SCHEMA_TYPE_FIELD_NAME, INT64_TYPE_NAME);
+    static final String FLOAT_TYPE_NAME = "float";
+    static final ObjectNode FLOAT_SCHEMA = JsonNodeFactory.instance.objectNode().put(SCHEMA_TYPE_FIELD_NAME, FLOAT_TYPE_NAME);
+    static final String DOUBLE_TYPE_NAME = "double";
+    static final ObjectNode DOUBLE_SCHEMA = JsonNodeFactory.instance.objectNode().put(SCHEMA_TYPE_FIELD_NAME, DOUBLE_TYPE_NAME);
+    static final String BYTES_TYPE_NAME = "bytes";
+    static final ObjectNode BYTES_SCHEMA = JsonNodeFactory.instance.objectNode().put(SCHEMA_TYPE_FIELD_NAME, BYTES_TYPE_NAME);
+    static final String STRING_TYPE_NAME = "string";
+    static final ObjectNode STRING_SCHEMA = JsonNodeFactory.instance.objectNode().put(SCHEMA_TYPE_FIELD_NAME, STRING_TYPE_NAME);
+    static final String ARRAY_TYPE_NAME = "array";
+    static final String MAP_TYPE_NAME = "map";
+    static final String STRUCT_TYPE_NAME = "struct";
+
+    public static ObjectNode envelope(JsonNode schema, JsonNode payload) {
+        ObjectNode result = JsonNodeFactory.instance.objectNode();
+        result.set(ENVELOPE_SCHEMA_FIELD_NAME, schema);
+        result.set(ENVELOPE_PAYLOAD_FIELD_NAME, payload);
+        return result;
+    }
+
+    static class Envelope {
+        public JsonNode schema;
+        public JsonNode payload;
+
+        public Envelope(JsonNode schema, JsonNode payload) {
+            this.schema = schema;
+            this.payload = payload;
+        }
+
+        public ObjectNode toJsonNode() {
+            return envelope(schema, payload);
+        }
+    }
+}

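The envelope helper above defines the wire shape used throughout the converter. A hypothetical snippet
showing the JSON it produces (the schema node is built by hand here, since the *_SCHEMA constants are
package-private):

    import com.fasterxml.jackson.databind.node.JsonNodeFactory;
    import com.fasterxml.jackson.databind.node.ObjectNode;
    import org.apache.kafka.connect.json.JsonSchema;

    public class EnvelopeExample {
        public static void main(String[] args) {
            ObjectNode schema = JsonNodeFactory.instance.objectNode().put("type", "boolean");
            ObjectNode envelope = JsonSchema.envelope(
                    schema, JsonNodeFactory.instance.booleanNode(true));
            System.out.println(envelope); // {"schema":{"type":"boolean"},"payload":true}
        }
    }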
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/json/src/main/java/org/apache/kafka/connect/json/JsonSerializer.java
----------------------------------------------------------------------
diff --git a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonSerializer.java b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonSerializer.java
new file mode 100644
index 0000000..77bcfca
--- /dev/null
+++ b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonSerializer.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.json;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.serialization.Serializer;
+
+import java.util.Map;
+
+/**
+ * Serialize Jackson JsonNode tree model objects to UTF-8 JSON. Using the tree model allows handling arbitrarily
+ * structured data without corresponding Java classes. This serializer also supports Connect schemas.
+ */
+public class JsonSerializer implements Serializer<JsonNode> {
+    private final ObjectMapper objectMapper = new ObjectMapper();
+
+    /**
+     * Default constructor needed by Kafka
+     */
+    public JsonSerializer() {
+
+    }
+
+    @Override
+    public void configure(Map<String, ?> config, boolean isKey) {
+    }
+
+    @Override
+    public byte[] serialize(String topic, JsonNode data) {
+        if (data == null)
+            return null;
+
+        try {
+            return objectMapper.writeValueAsBytes(data);
+        } catch (Exception e) {
+            throw new SerializationException("Error serializing JSON message", e);
+        }
+    }
+
+    @Override
+    public void close() {
+    }
+
+}

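Since the serializer and deserializer are symmetric wrappers around an ObjectMapper, a tree passed
through both comes back structurally equal. A short round-trip sketch (illustrative only; the topic
name is arbitrary and ignored by both classes):

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.node.JsonNodeFactory;
    import com.fasterxml.jackson.databind.node.ObjectNode;
    import org.apache.kafka.connect.json.JsonDeserializer;
    import org.apache.kafka.connect.json.JsonSerializer;

    public class JsonSerdeRoundTrip {
        public static void main(String[] args) {
            JsonSerializer serializer = new JsonSerializer();
            JsonDeserializer deserializer = new JsonDeserializer();

            ObjectNode original = JsonNodeFactory.instance.objectNode();
            original.put("field", 42);

            byte[] bytes = serializer.serialize("example-topic", original);
            JsonNode restored = deserializer.deserialize("example-topic", bytes);
            System.out.println(original.equals(restored)); // true
        }
    }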
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
----------------------------------------------------------------------
diff --git a/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java b/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
new file mode 100644
index 0000000..e56b009
--- /dev/null
+++ b/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
@@ -0,0 +1,644 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.json;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import org.apache.kafka.connect.data.Date;
+import org.apache.kafka.connect.data.Decimal;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.kafka.common.cache.Cache;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.data.Time;
+import org.apache.kafka.connect.data.Timestamp;
+import org.apache.kafka.connect.errors.DataException;
+import org.junit.Before;
+import org.junit.Test;
+import org.powermock.reflect.Whitebox;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.GregorianCalendar;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TimeZone;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class JsonConverterTest {
+    private static final String TOPIC = "topic";
+
+    ObjectMapper objectMapper = new ObjectMapper();
+    JsonConverter converter = new JsonConverter();
+
+    @Before
+    public void setUp() {
+        converter.configure(Collections.EMPTY_MAP, false);
+    }
+
+    // Schema metadata
+
+    @Test
+    public void testConnectSchemaMetadataTranslation() {
+        // this validates the non-type fields are translated and handled properly
+        assertEquals(new SchemaAndValue(Schema.BOOLEAN_SCHEMA, true), converter.toConnectData(TOPIC, "{ \"schema\": { \"type\": \"boolean\" }, \"payload\": true }".getBytes()));
+        assertEquals(new SchemaAndValue(Schema.OPTIONAL_BOOLEAN_SCHEMA, null), converter.toConnectData(TOPIC, "{ \"schema\": { \"type\": \"boolean\", \"optional\": true }, \"payload\": null }".getBytes()));
+        assertEquals(new SchemaAndValue(SchemaBuilder.bool().defaultValue(true).build(), true),
+                converter.toConnectData(TOPIC, "{ \"schema\": { \"type\": \"boolean\", \"default\": true }, \"payload\": null }".getBytes()));
+        assertEquals(new SchemaAndValue(SchemaBuilder.bool().required().name("bool").version(2).doc("the documentation").parameter("foo", "bar").build(), true),
+                converter.toConnectData(TOPIC, "{ \"schema\": { \"type\": \"boolean\", \"optional\": false, \"name\": \"bool\", \"version\": 2, \"doc\": \"the documentation\", \"parameters\": { \"foo\": \"bar\" }}, \"payload\": true }".getBytes()));
+    }
+
+    // Schema types
+
+    @Test
+    public void booleanToConnect() {
+        assertEquals(new SchemaAndValue(Schema.BOOLEAN_SCHEMA, true), converter.toConnectData(TOPIC, "{ \"schema\": { \"type\": \"boolean\" }, \"payload\": true }".getBytes()));
+        assertEquals(new SchemaAndValue(Schema.BOOLEAN_SCHEMA, false), converter.toConnectData(TOPIC, "{ \"schema\": { \"type\": \"boolean\" }, \"payload\": false }".getBytes()));
+    }
+
+    @Test
+    public void byteToConnect() {
+        assertEquals(new SchemaAndValue(Schema.INT8_SCHEMA, (byte) 12), converter.toConnectData(TOPIC, "{ \"schema\": { \"type\": \"int8\" }, \"payload\": 12 }".getBytes()));
+    }
+
+    @Test
+    public void shortToConnect() {
+        assertEquals(new SchemaAndValue(Schema.INT16_SCHEMA, (short) 12), converter.toConnectData(TOPIC, "{ \"schema\": { \"type\": \"int16\" }, \"payload\": 12 }".getBytes()));
+    }
+
+    @Test
+    public void intToConnect() {
+        assertEquals(new SchemaAndValue(Schema.INT32_SCHEMA, 12), converter.toConnectData(TOPIC, "{ \"schema\": { \"type\": \"int32\" }, \"payload\": 12 }".getBytes()));
+    }
+
+    @Test
+    public void longToConnect() {
+        assertEquals(new SchemaAndValue(Schema.INT64_SCHEMA, 12L), converter.toConnectData(TOPIC, "{ \"schema\": { \"type\": \"int64\" }, \"payload\": 12 }".getBytes()));
+        assertEquals(new SchemaAndValue(Schema.INT64_SCHEMA, 4398046511104L), converter.toConnectData(TOPIC, "{ \"schema\": { \"type\": \"int64\" }, \"payload\": 4398046511104 }".getBytes()));
+    }
+
+    @Test
+    public void floatToConnect() {
+        assertEquals(new SchemaAndValue(Schema.FLOAT32_SCHEMA, 12.34f), converter.toConnectData(TOPIC, "{ \"schema\": { \"type\": \"float\" }, \"payload\": 12.34 }".getBytes()));
+    }
+
+    @Test
+    public void doubleToConnect() {
+        assertEquals(new SchemaAndValue(Schema.FLOAT64_SCHEMA, 12.34), converter.toConnectData(TOPIC, "{ \"schema\": { \"type\": \"double\" }, \"payload\": 12.34 }".getBytes()));
+    }
+
+
+    @Test
+    public void bytesToConnect() throws UnsupportedEncodingException {
+        ByteBuffer reference = ByteBuffer.wrap("test-string".getBytes("UTF-8"));
+        String msg = "{ \"schema\": { \"type\": \"bytes\" }, \"payload\": \"dGVzdC1zdHJpbmc=\" }";
+        SchemaAndValue schemaAndValue = converter.toConnectData(TOPIC, msg.getBytes());
+        ByteBuffer converted = ByteBuffer.wrap((byte[]) schemaAndValue.value());
+        assertEquals(reference, converted);
+    }
+
+    @Test
+    public void stringToConnect() {
+        assertEquals(new SchemaAndValue(Schema.STRING_SCHEMA, "foo-bar-baz"), converter.toConnectData(TOPIC, "{ \"schema\": { \"type\": \"string\" }, \"payload\": \"foo-bar-baz\" }".getBytes()));
+    }
+
+    @Test
+    public void arrayToConnect() {
+        byte[] arrayJson = "{ \"schema\": { \"type\": \"array\", \"items\": { \"type\" : \"int32\" } }, \"payload\": [1, 2, 3] }".getBytes();
+        assertEquals(new SchemaAndValue(SchemaBuilder.array(Schema.INT32_SCHEMA).build(), Arrays.asList(1, 2, 3)), converter.toConnectData(TOPIC, arrayJson));
+    }
+
+    @Test
+    public void mapToConnectStringKeys() {
+        byte[] mapJson = "{ \"schema\": { \"type\": \"map\", \"keys\": { \"type\" : \"string\" }, \"values\": { \"type\" : \"int32\" } }, \"payload\": { \"key1\": 12, \"key2\": 15} }".getBytes();
+        Map<String, Integer> expected = new HashMap<>();
+        expected.put("key1", 12);
+        expected.put("key2", 15);
+        assertEquals(new SchemaAndValue(SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA).build(), expected), converter.toConnectData(TOPIC, mapJson));
+    }
+
+    @Test
+    public void mapToConnectNonStringKeys() {
+        byte[] mapJson = "{ \"schema\": { \"type\": \"map\", \"keys\": { \"type\" : \"int32\" }, \"values\": { \"type\" : \"int32\" } }, \"payload\": [ [1, 12], [2, 15] ] }".getBytes();
+        Map<Integer, Integer> expected = new HashMap<>();
+        expected.put(1, 12);
+        expected.put(2, 15);
+        assertEquals(new SchemaAndValue(SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.INT32_SCHEMA).build(), expected), converter.toConnectData(TOPIC, mapJson));
+    }
+
+    @Test
+    public void structToConnect() {
+        byte[] structJson = "{ \"schema\": { \"type\": \"struct\", \"fields\": [{ \"field\": \"field1\", \"type\": \"boolean\" }, { \"field\": \"field2\", \"type\": \"string\" }] }, \"payload\": { \"field1\": true, \"field2\": \"string\" } }".getBytes();
+        Schema expectedSchema = SchemaBuilder.struct().field("field1", Schema.BOOLEAN_SCHEMA).field("field2", Schema.STRING_SCHEMA).build();
+        Struct expected = new Struct(expectedSchema).put("field1", true).put("field2", "string");
+        SchemaAndValue converted = converter.toConnectData(TOPIC, structJson);
+        assertEquals(new SchemaAndValue(expectedSchema, expected), converted);
+    }
+
+    @Test(expected = DataException.class)
+    public void nullToConnect() {
+        // When schemas are enabled, trying to decode a null should be an error -- we should *always* have the envelope
+        assertEquals(SchemaAndValue.NULL, converter.toConnectData(TOPIC, null));
+    }
+
+    @Test
+    public void nullSchemaPrimitiveToConnect() {
+        SchemaAndValue converted = converter.toConnectData(TOPIC, "{ \"schema\": null, \"payload\": null }".getBytes());
+        assertEquals(SchemaAndValue.NULL, converted);
+
+        converted = converter.toConnectData(TOPIC, "{ \"schema\": null, \"payload\": true }".getBytes());
+        assertEquals(new SchemaAndValue(null, true), converted);
+
+        // Integers: Connect has several integer types, but JSON lumps all numbers together. We try to preserve
+        // as much information as we can, so we always use the widest integer and floating point types (int64 and
+        // float64) and let Jackson determine whether a value is integral or not
+        converted = converter.toConnectData(TOPIC, "{ \"schema\": null, \"payload\": 12 }".getBytes());
+        assertEquals(new SchemaAndValue(null, 12L), converted);
+
+        converted = converter.toConnectData(TOPIC, "{ \"schema\": null, \"payload\": 12.24 }".getBytes());
+        assertEquals(new SchemaAndValue(null, 12.24), converted);
+
+        converted = converter.toConnectData(TOPIC, "{ \"schema\": null, \"payload\": \"a string\" }".getBytes());
+        assertEquals(new SchemaAndValue(null, "a string"), converted);
+
+        converted = converter.toConnectData(TOPIC, "{ \"schema\": null, \"payload\": [1, \"2\", 3] }".getBytes());
+        assertEquals(new SchemaAndValue(null, Arrays.asList(1L, "2", 3L)), converted);
+
+        converted = converter.toConnectData(TOPIC, "{ \"schema\": null, \"payload\": { \"field1\": 1, \"field2\": 2} }".getBytes());
+        Map<String, Long> obj = new HashMap<>();
+        obj.put("field1", 1L);
+        obj.put("field2", 2L);
+        assertEquals(new SchemaAndValue(null, obj), converted);
+    }
+
+    @Test
+    public void decimalToConnect() {
+        Schema schema = Decimal.schema(2);
+        BigDecimal reference = new BigDecimal(new BigInteger("156"), 2);
+        // Payload is base64 encoded byte[]{0, -100}, which is the two's complement encoding of 156.
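+        // The encoding can be reproduced as follows (illustrative only; java.util.Base64 is just one
+        // way to do the base64 step):
+        //   new BigInteger("156").toByteArray()                      => {0, -100}
+        //   Base64.getEncoder().encodeToString(new byte[]{0, -100})  => "AJw="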
+        String msg = "{ \"schema\": { \"type\": \"bytes\", \"name\": \"org.apache.kafka.connect.data.Decimal\", \"version\": 1, \"parameters\": { \"scale\": \"2\" } }, \"payload\": \"AJw=\" }";
+        SchemaAndValue schemaAndValue = converter.toConnectData(TOPIC, msg.getBytes());
+        BigDecimal converted = (BigDecimal) schemaAndValue.value();
+        assertEquals(schema, schemaAndValue.schema());
+        assertEquals(reference, converted);
+    }
+
+    @Test
+    public void dateToConnect() {
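+        // The Date logical type encodes the number of days since the Unix epoch as an int32.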
+        Schema schema = Date.SCHEMA;
+        GregorianCalendar calendar = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0);
+        calendar.setTimeZone(TimeZone.getTimeZone("UTC"));
+        calendar.add(Calendar.DATE, 10000);
+        java.util.Date reference = calendar.getTime();
+        String msg = "{ \"schema\": { \"type\": \"int32\", \"name\": \"org.apache.kafka.connect.data.Date\", \"version\": 1 }, \"payload\": 10000 }";
+        SchemaAndValue schemaAndValue = converter.toConnectData(TOPIC, msg.getBytes());
+        java.util.Date converted = (java.util.Date) schemaAndValue.value();
+        assertEquals(schema, schemaAndValue.schema());
+        assertEquals(reference, converted);
+    }
+
+    @Test
+    public void timeToConnect() {
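+        // The Time logical type encodes milliseconds since midnight as an int32;
+        // 14400000 ms is exactly 4 hours.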
+        Schema schema = Time.SCHEMA;
+        GregorianCalendar calendar = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0);
+        calendar.setTimeZone(TimeZone.getTimeZone("UTC"));
+        calendar.add(Calendar.MILLISECOND, 14400000);
+        java.util.Date reference = calendar.getTime();
+        String msg = "{ \"schema\": { \"type\": \"int32\", \"name\": \"org.apache.kafka.connect.data.Time\", \"version\": 1 }, \"payload\": 14400000 }";
+        SchemaAndValue schemaAndValue = converter.toConnectData(TOPIC, msg.getBytes());
+        java.util.Date converted = (java.util.Date) schemaAndValue.value();
+        assertEquals(schema, schemaAndValue.schema());
+        assertEquals(reference, converted);
+    }
+
+    @Test
+    public void timestampToConnect() {
+        Schema schema = Timestamp.SCHEMA;
+        GregorianCalendar calendar = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0);
+        calendar.setTimeZone(TimeZone.getTimeZone("UTC"));
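+        // The Timestamp logical type encodes milliseconds since the Unix epoch as an int64. The
+        // 4000000000 ms offset is added in two steps because Calendar.add takes an int and
+        // 4000000000 exceeds Integer.MAX_VALUE.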
+        calendar.add(Calendar.MILLISECOND, 2000000000);
+        calendar.add(Calendar.MILLISECOND, 2000000000);
+        java.util.Date reference = calendar.getTime();
+        String msg = "{ \"schema\": { \"type\": \"int64\", \"name\": \"org.apache.kafka.connect.data.Timestamp\", \"version\": 1 }, \"payload\": 4000000000 }";
+        SchemaAndValue schemaAndValue = converter.toConnectData(TOPIC, msg.getBytes());
+        java.util.Date converted = (java.util.Date) schemaAndValue.value();
+        assertEquals(schema, schemaAndValue.schema());
+        assertEquals(reference, converted);
+    }
+
+    // Schema metadata
+
+    @Test
+    public void testJsonSchemaMetadataTranslation() {
+        JsonNode converted = parse(converter.fromConnectData(TOPIC, Schema.BOOLEAN_SCHEMA, true));
+        validateEnvelope(converted);
+        assertEquals(parse("{ \"type\": \"boolean\", \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
+        assertEquals(true, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).booleanValue());
+
+        converted = parse(converter.fromConnectData(TOPIC, Schema.OPTIONAL_BOOLEAN_SCHEMA, null));
+        validateEnvelope(converted);
+        assertEquals(parse("{ \"type\": \"boolean\", \"optional\": true }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
+        assertTrue(converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).isNull());
+
+        converted = parse(converter.fromConnectData(TOPIC, SchemaBuilder.bool().defaultValue(true).build(), true));
+        validateEnvelope(converted);
+        assertEquals(parse("{ \"type\": \"boolean\", \"optional\": false, \"default\": true }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
+        assertEquals(true, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).booleanValue());
+
+        converted = parse(converter.fromConnectData(TOPIC, SchemaBuilder.bool().required().name("bool").version(3).doc("the documentation").parameter("foo", "bar").build(), true));
+        validateEnvelope(converted);
+        assertEquals(parse("{ \"type\": \"boolean\", \"optional\": false, \"name\": \"bool\", \"version\": 3, \"doc\": \"the documentation\", \"parameters\": { \"foo\": \"bar\" }}"),
+                converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
+        assertEquals(true, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).booleanValue());
+    }
+
+
+    @Test
+    public void testCacheSchemaToConnectConversion() {
+        Cache<JsonNode, Schema> cache = Whitebox.getInternalState(converter, "toConnectSchemaCache");
+        assertEquals(0, cache.size());
+
+        converter.toConnectData(TOPIC, "{ \"schema\": { \"type\": \"boolean\" }, \"payload\": true }".getBytes());
+        assertEquals(1, cache.size());
+
+        converter.toConnectData(TOPIC, "{ \"schema\": { \"type\": \"boolean\" }, \"payload\": true }".getBytes());
+        assertEquals(1, cache.size());
+
+        // Different schema should also get cached
+        converter.toConnectData(TOPIC, "{ \"schema\": { \"type\": \"boolean\", \"optional\": true }, \"payload\": true }".getBytes());
+        assertEquals(2, cache.size());
+
+        // Even an equivalent schema with a different JSON encoding gets its own cache entry, since the
+        // raw JsonNode serves as the cache key
+        converter.toConnectData(TOPIC, "{ \"schema\": { \"type\": \"boolean\", \"optional\": false }, \"payload\": true }".getBytes());
+        assertEquals(3, cache.size());
+    }
+
+    // Schema types
+
+    @Test
+    public void booleanToJson() {
+        JsonNode converted = parse(converter.fromConnectData(TOPIC, Schema.BOOLEAN_SCHEMA, true));
+        validateEnvelope(converted);
+        assertEquals(parse("{ \"type\": \"boolean\", \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
+        assertEquals(true, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).booleanValue());
+    }
+
+    @Test
+    public void byteToJson() {
+        JsonNode converted = parse(converter.fromConnectData(TOPIC, Schema.INT8_SCHEMA, (byte) 12));
+        validateEnvelope(converted);
+        assertEquals(parse("{ \"type\": \"int8\", \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
+        assertEquals(12, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).intValue());
+    }
+
+    @Test
+    public void shortToJson() {
+        JsonNode converted = parse(converter.fromConnectData(TOPIC, Schema.INT16_SCHEMA, (short) 12));
+        validateEnvelope(converted);
+        assertEquals(parse("{ \"type\": \"int16\", \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
+        assertEquals(12, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).intValue());
+    }
+
+    @Test
+    public void intToJson() {
+        JsonNode converted = parse(converter.fromConnectData(TOPIC, Schema.INT32_SCHEMA, 12));
+        validateEnvelope(converted);
+        assertEquals(parse("{ \"type\": \"int32\", \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
+        assertEquals(12, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).intValue());
+    }
+
+    @Test
+    public void longToJson() {
+        JsonNode converted = parse(converter.fromConnectData(TOPIC, Schema.INT64_SCHEMA, 4398046511104L));
+        validateEnvelope(converted);
+        assertEquals(parse("{ \"type\": \"int64\", \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
+        assertEquals(4398046511104L, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).longValue());
+    }
+
+    @Test
+    public void floatToJson() {
+        JsonNode converted = parse(converter.fromConnectData(TOPIC, Schema.FLOAT32_SCHEMA, 12.34f));
+        validateEnvelope(converted);
+        assertEquals(parse("{ \"type\": \"float\", \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
+        assertEquals(12.34f, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).floatValue(), 0.001);
+    }
+
+    @Test
+    public void doubleToJson() {
+        JsonNode converted = parse(converter.fromConnectData(TOPIC, Schema.FLOAT64_SCHEMA, 12.34));
+        validateEnvelope(converted);
+        assertEquals(parse("{ \"type\": \"double\", \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
+        assertEquals(12.34, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).doubleValue(), 0.001);
+    }
+
+    @Test
+    public void bytesToJson() throws IOException {
+        JsonNode converted = parse(converter.fromConnectData(TOPIC, Schema.BYTES_SCHEMA, "test-string".getBytes()));
+        validateEnvelope(converted);
+        assertEquals(parse("{ \"type\": \"bytes\", \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
+        assertEquals(ByteBuffer.wrap("test-string".getBytes()),
+                ByteBuffer.wrap(converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).binaryValue()));
+    }
+
+    @Test
+    public void stringToJson() {
+        JsonNode converted = parse(converter.fromConnectData(TOPIC, Schema.STRING_SCHEMA, "test-string"));
+        validateEnvelope(converted);
+        assertEquals(parse("{ \"type\": \"string\", \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
+        assertEquals("test-string", converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).textValue());
+    }
+
+    @Test
+    public void arrayToJson() {
+        Schema int32Array = SchemaBuilder.array(Schema.INT32_SCHEMA).build();
+        JsonNode converted = parse(converter.fromConnectData(TOPIC, int32Array, Arrays.asList(1, 2, 3)));
+        validateEnvelope(converted);
+        assertEquals(parse("{ \"type\": \"array\", \"items\": { \"type\": \"int32\", \"optional\": false }, \"optional\": false }"),
+                converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
+        assertEquals(JsonNodeFactory.instance.arrayNode().add(1).add(2).add(3),
+                converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
+    }
+
+    @Test
+    public void mapToJsonStringKeys() {
+        Schema stringIntMap = SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA).build();
+        Map<String, Integer> input = new HashMap<>();
+        input.put("key1", 12);
+        input.put("key2", 15);
+        JsonNode converted = parse(converter.fromConnectData(TOPIC, stringIntMap, input));
+        validateEnvelope(converted);
+        assertEquals(parse("{ \"type\": \"map\", \"keys\": { \"type\" : \"string\", \"optional\": false }, \"values\": { \"type\" : \"int32\", \"optional\": false }, \"optional\": false }"),
+                converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
+        assertEquals(JsonNodeFactory.instance.objectNode().put("key1", 12).put("key2", 15),
+                converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
+    }
+
+    @Test
+    public void mapToJsonNonStringKeys() {
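+        // As on the deserialization side, non-string keys force the map to be encoded as an array
+        // of [key, value] pairs instead of a JSON object.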
+        Schema intIntMap = SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.INT32_SCHEMA).build();
+        Map<Integer, Integer> input = new HashMap<>();
+        input.put(1, 12);
+        input.put(2, 15);
+        JsonNode converted = parse(converter.fromConnectData(TOPIC, intIntMap, input));
+        validateEnvelope(converted);
+        assertEquals(parse("{ \"type\": \"map\", \"keys\": { \"type\" : \"int32\", \"optional\": false }, \"values\": { \"type\" : \"int32\", \"optional\": false }, \"optional\": false }"),
+                converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
+
+        assertTrue(converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).isArray());
+        ArrayNode payload = (ArrayNode) converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME);
+        assertEquals(2, payload.size());
+        Set<JsonNode> payloadEntries = new HashSet<>();
+        for (JsonNode elem : payload)
+            payloadEntries.add(elem);
+        assertEquals(new HashSet<>(Arrays.asList(JsonNodeFactory.instance.arrayNode().add(1).add(12),
+                        JsonNodeFactory.instance.arrayNode().add(2).add(15))),
+                payloadEntries
+        );
+    }
+
+    @Test
+    public void structToJson() {
+        Schema schema = SchemaBuilder.struct().field("field1", Schema.BOOLEAN_SCHEMA).field("field2", Schema.STRING_SCHEMA).build();
+        Struct input = new Struct(schema).put("field1", true).put("field2", "string");
+        JsonNode converted = parse(converter.fromConnectData(TOPIC, schema, input));
+        validateEnvelope(converted);
+        assertEquals(parse("{ \"type\": \"struct\", \"optional\": false, \"fields\": [{ \"field\": \"field1\", \"type\": \"boolean\", \"optional\": false }, { \"field\": \"field2\", \"type\": \"string\", \"optional\": false }] }"),
+                converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
+        assertEquals(JsonNodeFactory.instance.objectNode()
+                        .put("field1", true)
+                        .put("field2", "string"),
+                converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
+    }
+
+
+    @Test
+    public void decimalToJson() throws IOException {
+        JsonNode converted = parse(converter.fromConnectData(TOPIC, Decimal.schema(2), new BigDecimal(new BigInteger("156"), 2)));
+        validateEnvelope(converted);
+        assertEquals(parse("{ \"type\": \"bytes\", \"optional\": false, \"name\": \"org.apache.kafka.connect.data.Decimal\", \"version\": 1, \"parameters\": { \"scale\": \"2\" } }"),
+                converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
+        assertArrayEquals(new byte[]{0, -100}, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).binaryValue());
+    }
+
+    @Test
+    public void dateToJson() throws IOException {
+        GregorianCalendar calendar = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0);
+        calendar.setTimeZone(TimeZone.getTimeZone("UTC"));
+        calendar.add(Calendar.DATE, 10000);
+        java.util.Date date = calendar.getTime();
+
+        JsonNode converted = parse(converter.fromConnectData(TOPIC, Date.SCHEMA, date));
+        validateEnvelope(converted);
+        assertEquals(parse("{ \"type\": \"int32\", \"optional\": false, \"name\": \"org.apache.kafka.connect.data.Date\", \"version\": 1 }"),
+                converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
+        JsonNode payload = converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME);
+        assertTrue(payload.isInt());
+        assertEquals(10000, payload.intValue());
+    }
+
+    @Test
+    public void timeToJson() throws IOException {
+        GregorianCalendar calendar = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0);
+        calendar.setTimeZone(TimeZone.getTimeZone("UTC"));
+        calendar.add(Calendar.MILLISECOND, 14400000);
+        java.util.Date date = calendar.getTime();
+
+        JsonNode converted = parse(converter.fromConnectData(TOPIC, Time.SCHEMA, date));
+        validateEnvelope(converted);
+        assertEquals(parse("{ \"type\": \"int32\", \"optional\": false, \"name\": \"org.apache.kafka.connect.data.Time\", \"version\": 1 }"),
+                converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
+        JsonNode payload = converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME);
+        assertTrue(payload.isInt());
+        assertEquals(14400000, payload.longValue());
+    }
+
+    @Test
+    public void timestampToJson() throws IOException {
+        GregorianCalendar calendar = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0);
+        calendar.setTimeZone(TimeZone.getTimeZone("UTC"));
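+        // As in timestampToConnect, the 4000000000 ms offset is split in two because Calendar.add
+        // takes an int.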
+        calendar.add(Calendar.MILLISECOND, 2000000000);
+        calendar.add(Calendar.MILLISECOND, 2000000000);
+        java.util.Date date = calendar.getTime();
+
+        JsonNode converted = parse(converter.fromConnectData(TOPIC, Timestamp.SCHEMA, date));
+        validateEnvelope(converted);
+        assertEquals(parse("{ \"type\": \"int64\", \"optional\": false, \"name\": \"org.apache.kafka.connect.data.Timestamp\", \"version\": 1 }"),
+                converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
+        JsonNode payload = converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME);
+        assertTrue(payload.isLong());
+        assertEquals(4000000000L, payload.longValue());
+    }
+
+
+    @Test
+    public void nullSchemaAndPrimitiveToJson() {
+        // This still needs to convert the data; a null schema means "anything goes"
+        JsonNode converted = parse(converter.fromConnectData(TOPIC, null, true));
+        validateEnvelopeNullSchema(converted);
+        assertTrue(converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME).isNull());
+        assertEquals(true, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).booleanValue());
+    }
+
+    @Test
+    public void nullSchemaAndArrayToJson() {
+        // This still needs to convert the data; a null schema means "anything goes". Mix and match
+        // types to verify conversion still works.
+        JsonNode converted = parse(converter.fromConnectData(TOPIC, null, Arrays.asList(1, "string", true)));
+        validateEnvelopeNullSchema(converted);
+        assertTrue(converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME).isNull());
+        assertEquals(JsonNodeFactory.instance.arrayNode().add(1).add("string").add(true),
+                converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
+    }
+
+    @Test
+    public void nullSchemaAndMapToJson() {
+        // This still needs to convert the data; a null schema means "anything goes". Mix and match
+        // types to verify conversion still works.
+        Map<String, Object> input = new HashMap<>();
+        input.put("key1", 12);
+        input.put("key2", "string");
+        input.put("key3", true);
+        JsonNode converted = parse(converter.fromConnectData(TOPIC, null, input));
+        validateEnvelopeNullSchema(converted);
+        assertTrue(converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME).isNull());
+        assertEquals(JsonNodeFactory.instance.objectNode().put("key1", 12).put("key2", "string").put("key3", true),
+                converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
+    }
+
+    @Test
+    public void nullSchemaAndMapNonStringKeysToJson() {
+        // This still needs to convert the data; a null schema means "anything goes". Mix and match
+        // types to verify conversion still works.
+        Map<Object, Object> input = new HashMap<>();
+        input.put("string", 12);
+        input.put(52, "string");
+        input.put(false, true);
+        JsonNode converted = parse(converter.fromConnectData(TOPIC, null, input));
+        validateEnvelopeNullSchema(converted);
+        assertTrue(converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME).isNull());
+        assertTrue(converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).isArray());
+        ArrayNode payload = (ArrayNode) converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME);
+        assertEquals(3, payload.size());
+        Set<JsonNode> payloadEntries = new HashSet<>();
+        for (JsonNode elem : payload)
+            payloadEntries.add(elem);
+        assertEquals(new HashSet<>(Arrays.asList(JsonNodeFactory.instance.arrayNode().add("string").add(12),
+                        JsonNodeFactory.instance.arrayNode().add(52).add("string"),
+                        JsonNodeFactory.instance.arrayNode().add(false).add(true))),
+                payloadEntries
+        );
+    }
+
+
+    @Test(expected = DataException.class)
+    public void mismatchSchemaJson() {
+        // If the value does not match the schema's type, conversion should fail with a DataException
+        converter.fromConnectData(TOPIC, Schema.FLOAT64_SCHEMA, true);
+    }
+
+
+
+    @Test
+    public void noSchemaToConnect() {
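+        // With schemas.enable=false the converter reads bare JSON values instead of the
+        // { "schema": ..., "payload": ... } envelope, so the resulting schema is null.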
+        Map<String, Boolean> props = Collections.singletonMap("schemas.enable", false);
+        converter.configure(props, true);
+        assertEquals(new SchemaAndValue(null, true), converter.toConnectData(TOPIC, "true".getBytes()));
+    }
+
+    @Test
+    public void noSchemaToJson() {
+        Map<String, Boolean> props = Collections.singletonMap("schemas.enable", false);
+        converter.configure(props, true);
+        JsonNode converted = parse(converter.fromConnectData(TOPIC, null, true));
+        assertTrue(converted.isBoolean());
+        assertEquals(true, converted.booleanValue());
+    }
+
+    @Test
+    public void testCacheSchemaToJsonConversion() {
+        Cache<Schema, ObjectNode> cache = Whitebox.getInternalState(converter, "fromConnectSchemaCache");
+        assertEquals(0, cache.size());
+
+        // Repeated conversion of the same schema, even via a distinct but equal Schema instance, should
+        // reuse the same cached JSON schema object
+        converter.fromConnectData(TOPIC, SchemaBuilder.bool().build(), true);
+        assertEquals(1, cache.size());
+
+        converter.fromConnectData(TOPIC, SchemaBuilder.bool().build(), true);
+        assertEquals(1, cache.size());
+
+        // Validate that a similar but different schema correctly gets its own cache entry.
+        converter.fromConnectData(TOPIC, SchemaBuilder.bool().optional().build(), true);
+        assertEquals(2, cache.size());
+    }
+
+
+    private JsonNode parse(byte[] json) {
+        try {
+            return objectMapper.readTree(json);
+        } catch (IOException e) {
+            fail("IOException during JSON parse: " + e.getMessage());
+            throw new RuntimeException("failed");
+        }
+    }
+
+    private JsonNode parse(String json) {
+        try {
+            return objectMapper.readTree(json);
+        } catch (IOException e) {
+            fail("IOException during JSON parse: " + e.getMessage());
+            throw new RuntimeException("failed");
+        }
+    }
+
+    private void validateEnvelope(JsonNode env) {
+        assertNotNull(env);
+        assertTrue(env.isObject());
+        assertEquals(2, env.size());
+        assertTrue(env.has(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
+        assertTrue(env.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME).isObject());
+        assertTrue(env.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
+    }
+
+    private void validateEnvelopeNullSchema(JsonNode env) {
+        assertNotNull(env);
+        assertTrue(env.isObject());
+        assertEquals(2, env.size());
+        assertTrue(env.has(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
+        assertTrue(env.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME).isNull());
+        assertTrue(env.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
new file mode 100644
index 0000000..5ad032e
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.cli;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.runtime.Connect;
+import org.apache.kafka.connect.runtime.Worker;
+import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
+import org.apache.kafka.connect.runtime.distributed.DistributedHerder;
+import org.apache.kafka.connect.runtime.rest.RestServer;
+import org.apache.kafka.connect.storage.KafkaOffsetBackingStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * <p>
+ * Command line utility that runs Kafka Connect in distributed mode. In this mode, the process joins a group of other workers
+ * and work is distributed among them. This is useful for running Connect as a service, where connectors can be
+ * submitted to the cluster to be automatically executed in a scalable, distributed fashion. This also allows you to
+ * easily scale out horizontally, elastically adding or removing capacity simply by starting or stopping worker
+ * instances.
+ * </p>
+ */
+@InterfaceStability.Unstable
+public class ConnectDistributed {
+    private static final Logger log = LoggerFactory.getLogger(ConnectDistributed.class);
+
+    public static void main(String[] args) throws Exception {
+        if (args.length < 1) {
+            log.info("Usage: ConnectDistributed worker.properties");
+            System.exit(1);
+        }
+
+        String workerPropsFile = args[0];
+        Map<String, String> workerProps = !workerPropsFile.isEmpty() ?
+                Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.<String, String>emptyMap();
+
+        DistributedConfig config = new DistributedConfig(workerProps);
+        Worker worker = new Worker(config, new KafkaOffsetBackingStore());
+        RestServer rest = new RestServer(config);
+        DistributedHerder herder = new DistributedHerder(config, worker, rest.advertisedUrl());
+        final Connect connect = new Connect(worker, herder, rest);
+        connect.start();
+
+        // Shutdown will be triggered by Ctrl-C or via HTTP shutdown request
+        connect.awaitStop();
+    }
+}
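
A usage sketch (not part of the patch): in the released distribution this main method is normally
launched through the connect-distributed.sh wrapper with a single worker properties file, e.g.

    bin/connect-distributed.sh config/connect-distributed.properties

Connector configs are then submitted to the running cluster over the REST API rather than on the
command line.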

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
new file mode 100644
index 0000000..f89a72a
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.cli;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.Connect;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.Worker;
+import org.apache.kafka.connect.runtime.rest.RestServer;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
+import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
+import org.apache.kafka.connect.runtime.standalone.StandaloneHerder;
+import org.apache.kafka.connect.storage.FileOffsetBackingStore;
+import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.FutureCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * <p>
+ * Command line utility that runs Kafka Connect as a standalone process. In this mode, work is not
+ * distributed. Instead, all the normal Connect machinery works within a single process. This is
+ * useful for ad hoc, small, or experimental jobs.
+ * </p>
+ * <p>
+ * By default, neither job configs nor offset data are persisted. You can make jobs persistent and
+ * fault tolerant by overriding the settings to use file storage for both.
+ * </p>
+ */
+@InterfaceStability.Unstable
+public class ConnectStandalone {
+    private static final Logger log = LoggerFactory.getLogger(ConnectStandalone.class);
+
+    public static void main(String[] args) throws Exception {
+
+        if (args.length < 2) {
+            log.info("Usage: ConnectStandalone worker.properties connector1.properties [connector2.properties ...]");
+            System.exit(1);
+        }
+
+        String workerPropsFile = args[0];
+        Map<String, String> workerProps = !workerPropsFile.isEmpty() ?
+                Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.<String, String>emptyMap();
+
+        StandaloneConfig config = new StandaloneConfig(workerProps);
+        Worker worker = new Worker(config, new FileOffsetBackingStore());
+        RestServer rest = new RestServer(config);
+        Herder herder = new StandaloneHerder(worker);
+        final Connect connect = new Connect(worker, herder, rest);
+        connect.start();
+
+        try {
+            for (final String connectorPropsFile : Arrays.copyOfRange(args, 1, args.length)) {
+                Map<String, String> connectorProps = Utils.propsToStringMap(Utils.loadProps(connectorPropsFile));
+                FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>(new Callback<Herder.Created<ConnectorInfo>>() {
+                    @Override
+                    public void onCompletion(Throwable error, Herder.Created<ConnectorInfo> info) {
+                        if (error != null)
+                            log.error("Failed to create job for {}", connectorPropsFile);
+                        else
+                            log.info("Created connector {}", info.result().name());
+                    }
+                });
+                herder.putConnectorConfig(
+                        connectorProps.get(ConnectorConfig.NAME_CONFIG),
+                        connectorProps, false, cb);
+                cb.get();
+            }
+        } catch (Throwable t) {
+            log.error("Stopping after connector error", t);
+            connect.stop();
+        }
+
+        // Shutdown will be triggered by Ctrl-C or via HTTP shutdown request
+        connect.awaitStop();
+    }
+}
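
A usage sketch (not part of the patch): the standalone wrapper script takes the worker properties
followed by one or more connector property files, e.g.

    bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties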

