kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-2476: Add Decimal, Date, and Timestamp logical types.
Date Wed, 07 Oct 2015 04:58:33 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 174a43cd0 -> 02e103b75


KAFKA-2476: Add Decimal, Date, and Timestamp logical types.

To support Decimal, this also adds support for schema parameters, which are an
extra set of String key-value pairs that provide extra information about the
schema. For Decimal, this is used to encode the scale parameter, which is part
of the schema instead of being passed with every value.

Author: Ewen Cheslack-Postava <me@ewencp.org>

Reviewers: Guozhang Wang

Closes #281 from ewencp/kafka-2476-copycat-logical-types


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/02e103b7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/02e103b7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/02e103b7

Branch: refs/heads/trunk
Commit: 02e103b75aa6161c27358e83ab2828515c876701
Parents: 174a43c
Author: Ewen Cheslack-Postava <me@ewencp.org>
Authored: Tue Oct 6 22:02:46 2015 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Tue Oct 6 22:02:46 2015 -0700

----------------------------------------------------------------------
 .../kafka/copycat/data/CopycatSchema.java       |  31 +++++-
 .../org/apache/kafka/copycat/data/Date.java     |  76 +++++++++++++
 .../org/apache/kafka/copycat/data/Decimal.java  |  87 +++++++++++++++
 .../org/apache/kafka/copycat/data/Schema.java   |   7 ++
 .../kafka/copycat/data/SchemaBuilder.java       |  44 +++++++-
 .../apache/kafka/copycat/data/Timestamp.java    |  64 +++++++++++
 .../kafka/copycat/data/CopycatSchemaTest.java   |  38 ++++---
 .../org/apache/kafka/copycat/data/DateTest.java |  78 +++++++++++++
 .../apache/kafka/copycat/data/DecimalTest.java  |  63 +++++++++++
 .../kafka/copycat/data/SchemaBuilderTest.java   |  40 +++++--
 .../kafka/copycat/data/TimestampTest.java       |  75 +++++++++++++
 .../kafka/copycat/json/JsonConverter.java       | 103 ++++++++++++++++-
 .../apache/kafka/copycat/json/JsonSchema.java   |   1 +
 .../kafka/copycat/json/JsonConverterTest.java   | 110 ++++++++++++++++++-
 14 files changed, 772 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/02e103b7/copycat/api/src/main/java/org/apache/kafka/copycat/data/CopycatSchema.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/data/CopycatSchema.java b/copycat/api/src/main/java/org/apache/kafka/copycat/data/CopycatSchema.java
index 8c9de17..6b77717 100644
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/data/CopycatSchema.java
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/data/CopycatSchema.java
@@ -75,17 +75,19 @@ public class CopycatSchema implements Schema {
     private final Integer version;
     // Optional human readable documentation describing this schema.
     private final String doc;
+    private final Map<String, String> parameters;
 
     /**
      * Construct a Schema. Most users should not construct schemas manually, preferring {@link SchemaBuilder} instead.
      */
-    public CopycatSchema(Type type, boolean optional, Object defaultValue, String name, Integer version, String doc, List<Field> fields, Schema keySchema, Schema valueSchema) {
+    public CopycatSchema(Type type, boolean optional, Object defaultValue, String name, Integer version, String doc, Map<String, String> parameters, List<Field> fields, Schema keySchema, Schema valueSchema) {
         this.type = type;
         this.optional = optional;
         this.defaultValue = defaultValue;
         this.name = name;
         this.version = version;
         this.doc = doc;
+        this.parameters = parameters;
 
         this.fields = fields;
         if (this.fields != null && this.type == Type.STRUCT) {
@@ -100,6 +102,21 @@ public class CopycatSchema implements Schema {
         this.valueSchema = valueSchema;
     }
 
+    /**
+     * Construct a Schema for a primitive type, setting schema parameters, struct fields, and key and value schemas to null.
+     */
+    public CopycatSchema(Type type, boolean optional, Object defaultValue, String name, Integer version, String doc) {
+        this(type, optional, defaultValue, name, version, doc, null, null, null, null);
+    }
+
+    /**
+     * Construct a default schema for a primitive type. The schema is required, has no default value, name, version,
+     * or documentation.
+     */
+    public CopycatSchema(Type type) {
+        this(type, false, null, null, null, null);
+    }
+
     @Override
     public Type type() {
         return type;
@@ -130,7 +147,10 @@ public class CopycatSchema implements Schema {
         return doc;
     }
 
-
+    @Override
+    public Map<String, String> parameters() {
+        return parameters;
+    }
 
     @Override
     public List<Field> fields() {
@@ -163,7 +183,7 @@ public class CopycatSchema implements Schema {
 
     /**
      * Validate that the value can be used with the schema, i.e. that its type matches the schema type and nullability
-     * requirements. Throws a DataException if the value is invalid. Returns
+     * requirements. Throws a DataException if the value is invalid.
      * @param schema Schema to test
      * @param value value to test
      */
@@ -239,12 +259,13 @@ public class CopycatSchema implements Schema {
                 Objects.equals(valueSchema, schema.valueSchema) &&
                 Objects.equals(name, schema.name) &&
                 Objects.equals(version, schema.version) &&
-                Objects.equals(doc, schema.doc);
+                Objects.equals(doc, schema.doc) &&
+                Objects.equals(parameters, schema.parameters);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(type, optional, defaultValue, fields, keySchema, valueSchema, name, version, doc);
+        return Objects.hash(type, optional, defaultValue, fields, keySchema, valueSchema, name, version, doc, parameters);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/02e103b7/copycat/api/src/main/java/org/apache/kafka/copycat/data/Date.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/data/Date.java b/copycat/api/src/main/java/org/apache/kafka/copycat/data/Date.java
new file mode 100644
index 0000000..4e14659
--- /dev/null
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/data/Date.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.data;
+
+import org.apache.kafka.copycat.errors.DataException;
+
+import java.util.Calendar;
+import java.util.TimeZone;
+
+/**
+ * <p>
+ *     A date representing a calendar day with no time of day or timezone. The corresponding Java type is a java.util.Date
+ *     with hours, minutes, seconds, milliseconds set to 0. The underlying representation is an integer representing the
+ *     number of standardized days (based on a number of milliseconds with 24 hours/day, 60 minutes/hour, 60 seconds/minute,
+ *     1000 milliseconds/second) since Unix epoch.
+ * </p>
+ */
+public class Date {
+    public static final String LOGICAL_NAME = "org.apache.kafka.copycat.data.Date";
+
+    private static final long MILLIS_PER_DAY = 24 * 60 * 60 * 1000;
+
+    private static final TimeZone UTC = TimeZone.getTimeZone("UTC");
+
+    /**
+     * Returns a SchemaBuilder for a Date. By returning a SchemaBuilder you can override additional schema settings such
+     * as required/optional, default value, and documentation.
+     * @return a SchemaBuilder
+     */
+    public static SchemaBuilder builder() {
+        return SchemaBuilder.int32()
+                .name(LOGICAL_NAME)
+                .version(1);
+    }
+
+    public static final Schema SCHEMA = builder().schema();
+
+    /**
+     * Convert a value from its logical format (Date) to its encoded format.
+     * @param value the logical value
+     * @return the encoded value
+     */
+    public static int fromLogical(Schema schema, java.util.Date value) {
+        if (schema.name() == null || !(schema.name().equals(LOGICAL_NAME)))
+            throw new DataException("Requested conversion of Date object but the schema does not match.");
+        Calendar calendar = Calendar.getInstance(UTC);
+        calendar.setTime(value);
+        if (calendar.get(Calendar.HOUR_OF_DAY) != 0 || calendar.get(Calendar.MINUTE) != 0 ||
+                calendar.get(Calendar.SECOND) != 0 || calendar.get(Calendar.MILLISECOND) != 0) {
+            throw new DataException("Copycat Date type should not have any time fields set to non-zero values.");
+        }
+        long unixMillis = calendar.getTimeInMillis();
+        return (int) (unixMillis / MILLIS_PER_DAY);
+    }
+
+    public static java.util.Date toLogical(Schema schema, int value) {
+        if (schema.name() == null || !(schema.name().equals(LOGICAL_NAME)))
+            throw new DataException("Requested conversion of Date object but the schema does not match.");
+        return new java.util.Date(value * MILLIS_PER_DAY);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/02e103b7/copycat/api/src/main/java/org/apache/kafka/copycat/data/Decimal.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/data/Decimal.java b/copycat/api/src/main/java/org/apache/kafka/copycat/data/Decimal.java
new file mode 100644
index 0000000..f23e13e
--- /dev/null
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/data/Decimal.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.data;
+
+import org.apache.kafka.copycat.errors.DataException;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+
+/**
+ * <p>
+ *     An arbitrary-precision signed decimal number. The value is unscaled * 10 ^ -scale where:
+ *     <ul>
+ *         <li>unscaled is an integer </li>
+ *         <li>scale is an integer representing how many digits the decimal point should be shifted on the unscaled value</li>
+ *     </ul>
+ * </p>
+ * <p>
+ *     Decimal does not provide a fixed schema because it is parameterized by the scale, which is fixed on the schema
+ *     rather than being part of the value.
+ * </p>
+ * <p>
+ *     The underlying representation of this type is bytes containing a two's complement integer (the unscaled value).
+ * </p>
+ */
+public class Decimal {
+    public static final String LOGICAL_NAME = "org.apache.kafka.copycat.data.Decimal";
+    public static final String SCALE_FIELD = "scale";
+
+    /**
+     * Returns a SchemaBuilder for a Decimal with the given scale factor. By returning a SchemaBuilder you can override
+     * additional schema settings such as required/optional, default value, and documentation.
+     * @param scale the scale factor to apply to unscaled values
+     * @return a SchemaBuilder
+     */
+    public static SchemaBuilder builder(int scale) {
+        return SchemaBuilder.bytes()
+                .name(LOGICAL_NAME)
+                .parameter(SCALE_FIELD, ((Integer) scale).toString())
+                .version(1);
+    }
+
+    public static Schema schema(int scale) {
+        return builder(scale).build();
+    }
+
+    /**
+     * Convert a value from its logical format (BigDecimal) to its encoded format.
+     * @param value the logical value
+     * @return the encoded value
+     */
+    public static byte[] fromLogical(Schema schema, BigDecimal value) {
+        if (value.scale() != scale(schema))
+            throw new DataException("BigDecimal has mismatching scale value for given Decimal schema");
+        return value.unscaledValue().toByteArray();
+    }
+
+    public static BigDecimal toLogical(Schema schema, byte[] value) {
+        return new BigDecimal(new BigInteger(value), scale(schema));
+    }
+
+    private static int scale(Schema schema) {
+        String scaleString = schema.parameters().get(SCALE_FIELD);
+        if (scaleString == null)
+            throw new DataException("Invalid Decimal schema: scale parameter not found.");
+        try {
+            return Integer.parseInt(scaleString);
+        } catch (NumberFormatException e) {
+            throw new DataException("Invalid scale parameter found in Decimal schema: ", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/02e103b7/copycat/api/src/main/java/org/apache/kafka/copycat/data/Schema.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/data/Schema.java b/copycat/api/src/main/java/org/apache/kafka/copycat/data/Schema.java
index 4ece21d..3db01ae 100644
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/data/Schema.java
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/data/Schema.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.copycat.data;
 
 import java.util.List;
+import java.util.Map;
 
 /**
  * <p>
@@ -124,6 +125,12 @@ public interface Schema {
     String doc();
 
     /**
+     * Get a map of schema parameters.
+     * @return Map containing parameters for this schema, or null if there are no parameters
+     */
+    Map<String, String> parameters();
+
+    /**
      * Get the key schema for this map schema. Throws a DataException if this schema is not a map.
      * @return the key schema
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/02e103b7/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaBuilder.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaBuilder.java b/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaBuilder.java
index d9c149d..21ae54c 100644
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaBuilder.java
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaBuilder.java
@@ -22,7 +22,9 @@ import org.apache.kafka.copycat.errors.SchemaBuilderException;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 
 /**
  * <p>
@@ -71,7 +73,8 @@ public class SchemaBuilder implements Schema {
     private Integer version;
     // Optional human readable documentation describing this schema.
     private String doc;
-
+    // Additional parameters for logical types.
+    private Map<String, String> parameters;
 
     private SchemaBuilder(Type type) {
         this.type = type;
@@ -176,6 +179,41 @@ public class SchemaBuilder implements Schema {
         return this;
     }
 
+    @Override
+    public Map<String, String> parameters() {
+        return Collections.unmodifiableMap(parameters);
+    }
+
+    /**
+     * Set a schema parameter.
+     * @param propertyName name of the schema property to define
+     * @param propertyValue value of the schema property to define, as a String
+     * @return the SchemaBuilder
+     */
+    public SchemaBuilder parameter(String propertyName, String propertyValue) {
+        // Preserve order of insertion with a LinkedHashMap. This isn't strictly necessary, but is nice if logical types
+        // can print their properties in a consistent order.
+        if (parameters == null)
+            parameters = new LinkedHashMap<>();
+        parameters.put(propertyName, propertyValue);
+        return this;
+    }
+
+    /**
+     * Set schema parameters. This operation is additive; it does not remove existing parameters that do not appear in
+     * the set of properties passed to this method.
+     * @param props Map of properties to set
+     * @return the SchemaBuilder
+     */
+    public SchemaBuilder parameters(Map<String, String> props) {
+        // Avoid creating an empty set of properties so we never have an empty map
+        if (props.isEmpty())
+            return this;
+        if (parameters == null)
+            parameters = new LinkedHashMap<>();
+        parameters.putAll(props);
+        return this;
+    }
 
     @Override
     public Type type() {
@@ -347,7 +385,9 @@ public class SchemaBuilder implements Schema {
      * @return the {@link Schema}
      */
     public Schema build() {
-        return new CopycatSchema(type, isOptional(), defaultValue, name, version, doc, fields == null ? null : Collections.unmodifiableList(fields), keySchema, valueSchema);
+        return new CopycatSchema(type, isOptional(), defaultValue, name, version, doc,
+                parameters == null ? null : Collections.unmodifiableMap(parameters),
+                fields == null ? null : Collections.unmodifiableList(fields), keySchema, valueSchema);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/02e103b7/copycat/api/src/main/java/org/apache/kafka/copycat/data/Timestamp.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/data/Timestamp.java b/copycat/api/src/main/java/org/apache/kafka/copycat/data/Timestamp.java
new file mode 100644
index 0000000..62d371c
--- /dev/null
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/data/Timestamp.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.data;
+
+import org.apache.kafka.copycat.errors.DataException;
+
+import java.util.TimeZone;
+
+/**
+ * <p>
+ *     A timestamp representing an absolute time, without timezone information. The corresponding Java type is a
+ *     java.util.Date. The underlying representation is a long representing the number of milliseconds since Unix epoch.
+ * </p>
+ */
+public class Timestamp {
+    public static final String LOGICAL_NAME = "org.apache.kafka.copycat.data.Timestamp";
+
+    private static final TimeZone UTC = TimeZone.getTimeZone("UTC");
+
+    /**
+     * Returns a SchemaBuilder for a Timestamp. By returning a SchemaBuilder you can override additional schema settings such
+     * as required/optional, default value, and documentation.
+     * @return a SchemaBuilder
+     */
+    public static SchemaBuilder builder() {
+        return SchemaBuilder.int64()
+                .name(LOGICAL_NAME)
+                .version(1);
+    }
+
+    public static final Schema SCHEMA = builder().schema();
+
+    /**
+     * Convert a value from its logical format (Date) to its encoded format.
+     * @param value the logical value
+     * @return the encoded value
+     */
+    public static long fromLogical(Schema schema, java.util.Date value) {
+        if (schema.name() == null || !(schema.name().equals(LOGICAL_NAME)))
+            throw new DataException("Requested conversion of Timestamp object but the schema does not match.");
+        return value.getTime();
+    }
+
+    public static java.util.Date toLogical(Schema schema, long value) {
+        if (schema.name() == null || !(schema.name().equals(LOGICAL_NAME)))
+            throw new DataException("Requested conversion of Timestamp object but the schema does not match.");
+        return new java.util.Date(value);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/02e103b7/copycat/api/src/test/java/org/apache/kafka/copycat/data/CopycatSchemaTest.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/test/java/org/apache/kafka/copycat/data/CopycatSchemaTest.java b/copycat/api/src/test/java/org/apache/kafka/copycat/data/CopycatSchemaTest.java
index f400863..9e97cad 100644
--- a/copycat/api/src/test/java/org/apache/kafka/copycat/data/CopycatSchemaTest.java
+++ b/copycat/api/src/test/java/org/apache/kafka/copycat/data/CopycatSchemaTest.java
@@ -208,14 +208,15 @@ public class CopycatSchemaTest {
     @Test
     public void testPrimitiveEquality() {
         // Test that primitive types, which only need to consider all the type & metadata fields, handle equality correctly
-        CopycatSchema s1 = new CopycatSchema(Schema.Type.INT8, false, null, "name", 2, "doc", null, null, null);
-        CopycatSchema s2 = new CopycatSchema(Schema.Type.INT8, false, null, "name", 2, "doc", null, null, null);
-        CopycatSchema differentType = new CopycatSchema(Schema.Type.INT16, false, null, "name", 2, "doc", null, null, null);
-        CopycatSchema differentOptional = new CopycatSchema(Schema.Type.INT8, true, null, "name", 2, "doc", null, null, null);
-        CopycatSchema differentDefault = new CopycatSchema(Schema.Type.INT8, false, true, "name", 2, "doc", null, null, null);
-        CopycatSchema differentName = new CopycatSchema(Schema.Type.INT8, false, null, "otherName", 2, "doc", null, null, null);
-        CopycatSchema differentVersion = new CopycatSchema(Schema.Type.INT8, false, null, "name", 4, "doc", null, null, null);
-        CopycatSchema differentDoc = new CopycatSchema(Schema.Type.INT8, false, null, "name", 2, "other doc", null, null, null);
+        CopycatSchema s1 = new CopycatSchema(Schema.Type.INT8, false, null, "name", 2, "doc");
+        CopycatSchema s2 = new CopycatSchema(Schema.Type.INT8, false, null, "name", 2, "doc");
+        CopycatSchema differentType = new CopycatSchema(Schema.Type.INT16, false, null, "name", 2, "doc");
+        CopycatSchema differentOptional = new CopycatSchema(Schema.Type.INT8, true, null, "name", 2, "doc");
+        CopycatSchema differentDefault = new CopycatSchema(Schema.Type.INT8, false, true, "name", 2, "doc");
+        CopycatSchema differentName = new CopycatSchema(Schema.Type.INT8, false, null, "otherName", 2, "doc");
+        CopycatSchema differentVersion = new CopycatSchema(Schema.Type.INT8, false, null, "name", 4, "doc");
+        CopycatSchema differentDoc = new CopycatSchema(Schema.Type.INT8, false, null, "name", 2, "other doc");
+        CopycatSchema differentParameters = new CopycatSchema(Schema.Type.INT8, false, null, "name", 2, "doc", Collections.singletonMap("param", "value"), null, null, null);
 
         assertEquals(s1, s2);
         assertNotEquals(s1, differentType);
@@ -224,15 +225,16 @@ public class CopycatSchemaTest {
         assertNotEquals(s1, differentName);
         assertNotEquals(s1, differentVersion);
         assertNotEquals(s1, differentDoc);
+        assertNotEquals(s1, differentParameters);
     }
 
     @Test
     public void testArrayEquality() {
         // Validate that the value type for the array is tested for equality. This test makes sure the same schema object is
         // never reused to ensure we're actually checking equality
-        CopycatSchema s1 = new CopycatSchema(Schema.Type.ARRAY, false, null, null, null, null, null, null, SchemaBuilder.int8().build());
-        CopycatSchema s2 = new CopycatSchema(Schema.Type.ARRAY, false, null, null, null, null, null, null, SchemaBuilder.int8().build());
-        CopycatSchema differentValueSchema = new CopycatSchema(Schema.Type.ARRAY, false, null, null, null, null, null, null, SchemaBuilder.int16().build());
+        CopycatSchema s1 = new CopycatSchema(Schema.Type.ARRAY, false, null, null, null, null, null, null, null, SchemaBuilder.int8().build());
+        CopycatSchema s2 = new CopycatSchema(Schema.Type.ARRAY, false, null, null, null, null, null, null, null, SchemaBuilder.int8().build());
+        CopycatSchema differentValueSchema = new CopycatSchema(Schema.Type.ARRAY, false, null, null, null, null, null, null, null, SchemaBuilder.int16().build());
 
         assertEquals(s1, s2);
         assertNotEquals(s1, differentValueSchema);
@@ -241,10 +243,10 @@ public class CopycatSchemaTest {
     @Test
     public void testMapEquality() {
         // Same as testArrayEquality, but for both key and value schemas
-        CopycatSchema s1 = new CopycatSchema(Schema.Type.MAP, false, null, null, null, null, null, SchemaBuilder.int8().build(), SchemaBuilder.int16().build());
-        CopycatSchema s2 = new CopycatSchema(Schema.Type.MAP, false, null, null, null, null, null, SchemaBuilder.int8().build(), SchemaBuilder.int16().build());
-        CopycatSchema differentKeySchema = new CopycatSchema(Schema.Type.MAP, false, null, null, null, null, null, SchemaBuilder.string().build(), SchemaBuilder.int16().build());
-        CopycatSchema differentValueSchema = new CopycatSchema(Schema.Type.MAP, false, null, null, null, null, null, SchemaBuilder.int8().build(), SchemaBuilder.string().build());
+        CopycatSchema s1 = new CopycatSchema(Schema.Type.MAP, false, null, null, null, null, null, null, SchemaBuilder.int8().build(), SchemaBuilder.int16().build());
+        CopycatSchema s2 = new CopycatSchema(Schema.Type.MAP, false, null, null, null, null, null, null, SchemaBuilder.int8().build(), SchemaBuilder.int16().build());
+        CopycatSchema differentKeySchema = new CopycatSchema(Schema.Type.MAP, false, null, null, null, null, null, null, SchemaBuilder.string().build(), SchemaBuilder.int16().build());
+        CopycatSchema differentValueSchema = new CopycatSchema(Schema.Type.MAP, false, null, null, null, null, null, null, SchemaBuilder.int8().build(), SchemaBuilder.string().build());
 
         assertEquals(s1, s2);
         assertNotEquals(s1, differentKeySchema);
@@ -255,13 +257,13 @@ public class CopycatSchemaTest {
     public void testStructEquality() {
         // Same as testArrayEquality, but checks differences in fields. Only does a simple check, relying on tests of
         // Field's equals() method to validate all variations in the list of fields will be checked
-        CopycatSchema s1 = new CopycatSchema(Schema.Type.STRUCT, false, null, null, null, null,
+        CopycatSchema s1 = new CopycatSchema(Schema.Type.STRUCT, false, null, null, null, null, null,
                 Arrays.asList(new Field("field", 0, SchemaBuilder.int8().build()),
                         new Field("field2", 1, SchemaBuilder.int16().build())), null, null);
-        CopycatSchema s2 = new CopycatSchema(Schema.Type.STRUCT, false, null, null, null, null,
+        CopycatSchema s2 = new CopycatSchema(Schema.Type.STRUCT, false, null, null, null, null, null,
                 Arrays.asList(new Field("field", 0, SchemaBuilder.int8().build()),
                         new Field("field2", 1, SchemaBuilder.int16().build())), null, null);
-        CopycatSchema differentField = new CopycatSchema(Schema.Type.STRUCT, false, null, null, null, null,
+        CopycatSchema differentField = new CopycatSchema(Schema.Type.STRUCT, false, null, null, null, null, null,
                 Arrays.asList(new Field("field", 0, SchemaBuilder.int8().build()),
                         new Field("different field name", 1, SchemaBuilder.int16().build())), null, null);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/02e103b7/copycat/api/src/test/java/org/apache/kafka/copycat/data/DateTest.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/test/java/org/apache/kafka/copycat/data/DateTest.java b/copycat/api/src/test/java/org/apache/kafka/copycat/data/DateTest.java
new file mode 100644
index 0000000..e7885ab
--- /dev/null
+++ b/copycat/api/src/test/java/org/apache/kafka/copycat/data/DateTest.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.data;
+
+import org.apache.kafka.copycat.errors.DataException;
+import org.junit.Test;
+
+import java.util.Calendar;
+import java.util.GregorianCalendar;
+import java.util.TimeZone;
+
+import static org.junit.Assert.assertEquals;
+
+public class DateTest {
+    private static final GregorianCalendar EPOCH;
+    private static final GregorianCalendar EPOCH_PLUS_TEN_THOUSAND_DAYS;
+    private static final GregorianCalendar EPOCH_PLUS_TIME_COMPONENT;
+    static {
+        EPOCH = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0);
+        EPOCH.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+        EPOCH_PLUS_TIME_COMPONENT = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 1);
+        EPOCH_PLUS_TIME_COMPONENT.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+        EPOCH_PLUS_TEN_THOUSAND_DAYS = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0);
+        EPOCH_PLUS_TEN_THOUSAND_DAYS.setTimeZone(TimeZone.getTimeZone("UTC"));
+        EPOCH_PLUS_TEN_THOUSAND_DAYS.add(Calendar.DATE, 10000);
+    }
+
+    @Test
+    public void testBuilder() {
+        Schema plain = Date.SCHEMA;
+        assertEquals(Date.LOGICAL_NAME, plain.name());
+        assertEquals(1, (Object) plain.version());
+    }
+
+    @Test
+    public void testFromLogical() {
+        assertEquals(0, Date.fromLogical(Date.SCHEMA, EPOCH.getTime()));
+        assertEquals(10000, Date.fromLogical(Date.SCHEMA, EPOCH_PLUS_TEN_THOUSAND_DAYS.getTime()));
+    }
+
+    @Test(expected = DataException.class)
+    public void testFromLogicalInvalidSchema() {
+        Date.fromLogical(Date.builder().name("invalid").build(), EPOCH.getTime());
+    }
+
+    @Test(expected = DataException.class)
+    public void testFromLogicalInvalidHasTimeComponents() {
+        Date.fromLogical(Date.SCHEMA, EPOCH_PLUS_TIME_COMPONENT.getTime());
+    }
+
+    @Test
+    public void testToLogical() {
+        assertEquals(EPOCH.getTime(), Date.toLogical(Date.SCHEMA, 0));
+        assertEquals(EPOCH_PLUS_TEN_THOUSAND_DAYS.getTime(), Date.toLogical(Date.SCHEMA, 10000));
+    }
+
+    @Test(expected = DataException.class)
+    public void testToLogicalInvalidSchema() {
+        Date.toLogical(Date.builder().name("invalid").build(), 0);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/02e103b7/copycat/api/src/test/java/org/apache/kafka/copycat/data/DecimalTest.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/test/java/org/apache/kafka/copycat/data/DecimalTest.java b/copycat/api/src/test/java/org/apache/kafka/copycat/data/DecimalTest.java
new file mode 100644
index 0000000..ce71161
--- /dev/null
+++ b/copycat/api/src/test/java/org/apache/kafka/copycat/data/DecimalTest.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.data;
+
+import org.junit.Test;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.Collections;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+public class DecimalTest {
+    private static final int TEST_SCALE = 2;
+    private static final BigDecimal TEST_DECIMAL = new BigDecimal(new BigInteger("156"), TEST_SCALE);
+    private static final BigDecimal TEST_DECIMAL_NEGATIVE = new BigDecimal(new BigInteger("-156"), TEST_SCALE);
+    private static final byte[] TEST_BYTES = new byte[]{0, -100};
+    private static final byte[] TEST_BYTES_NEGATIVE = new byte[]{-1, 100};
+
+    @Test
+    public void testBuilder() {
+        Schema plain = Decimal.builder(2).build();
+        assertEquals(Decimal.LOGICAL_NAME, plain.name());
+        assertEquals(Collections.singletonMap(Decimal.SCALE_FIELD, "2"), plain.parameters());
+        assertEquals(1, (Object) plain.version());
+    }
+
+    @Test
+    public void testFromLogical() {
+        Schema schema = Decimal.schema(TEST_SCALE);
+        byte[] encoded = Decimal.fromLogical(schema, TEST_DECIMAL);
+        assertArrayEquals(TEST_BYTES, encoded);
+
+        encoded = Decimal.fromLogical(schema, TEST_DECIMAL_NEGATIVE);
+        assertArrayEquals(TEST_BYTES_NEGATIVE, encoded);
+    }
+
+    @Test
+    public void testToLogical() {
+        Schema schema = Decimal.schema(2);
+        BigDecimal converted = Decimal.toLogical(schema, TEST_BYTES);
+        assertEquals(TEST_DECIMAL, converted);
+
+        converted = Decimal.toLogical(schema, TEST_BYTES_NEGATIVE);
+        assertEquals(TEST_DECIMAL_NEGATIVE, converted);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/02e103b7/copycat/api/src/test/java/org/apache/kafka/copycat/data/SchemaBuilderTest.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/test/java/org/apache/kafka/copycat/data/SchemaBuilderTest.java b/copycat/api/src/test/java/org/apache/kafka/copycat/data/SchemaBuilderTest.java
index fca1c10..183f5fc 100644
--- a/copycat/api/src/test/java/org/apache/kafka/copycat/data/SchemaBuilderTest.java
+++ b/copycat/api/src/test/java/org/apache/kafka/copycat/data/SchemaBuilderTest.java
@@ -23,6 +23,7 @@ import org.junit.Test;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -33,6 +34,7 @@ public class SchemaBuilderTest {
     private static final String NAME = "name";
     private static final Integer VERSION = 2;
     private static final String DOC = "doc";
+    private static final Map<String, String> NO_PARAMS = null;
 
     @Test
     public void testInt8Builder() {
@@ -43,7 +45,7 @@ public class SchemaBuilderTest {
         schema = SchemaBuilder.int8().name(NAME).optional().defaultValue((byte) 12)
                 .version(VERSION).doc(DOC).build();
         assertTypeAndDefault(schema, Schema.Type.INT8, true, (byte) 12);
-        assertMetadata(schema, NAME, VERSION, DOC);
+        assertMetadata(schema, NAME, VERSION, DOC, NO_PARAMS);
     }
 
     @Test(expected = SchemaBuilderException.class)
@@ -60,7 +62,7 @@ public class SchemaBuilderTest {
         schema = SchemaBuilder.int16().name(NAME).optional().defaultValue((short) 12)
                 .version(VERSION).doc(DOC).build();
         assertTypeAndDefault(schema, Schema.Type.INT16, true, (short) 12);
-        assertMetadata(schema, NAME, VERSION, DOC);
+        assertMetadata(schema, NAME, VERSION, DOC, NO_PARAMS);
     }
 
     @Test(expected = SchemaBuilderException.class)
@@ -77,7 +79,7 @@ public class SchemaBuilderTest {
         schema = SchemaBuilder.int32().name(NAME).optional().defaultValue(12)
                 .version(VERSION).doc(DOC).build();
         assertTypeAndDefault(schema, Schema.Type.INT32, true, 12);
-        assertMetadata(schema, NAME, VERSION, DOC);
+        assertMetadata(schema, NAME, VERSION, DOC, NO_PARAMS);
     }
 
     @Test(expected = SchemaBuilderException.class)
@@ -94,7 +96,7 @@ public class SchemaBuilderTest {
         schema = SchemaBuilder.int64().name(NAME).optional().defaultValue((long) 12)
                 .version(VERSION).doc(DOC).build();
         assertTypeAndDefault(schema, Schema.Type.INT64, true, (long) 12);
-        assertMetadata(schema, NAME, VERSION, DOC);
+        assertMetadata(schema, NAME, VERSION, DOC, NO_PARAMS);
     }
 
     @Test(expected = SchemaBuilderException.class)
@@ -111,7 +113,7 @@ public class SchemaBuilderTest {
         schema = SchemaBuilder.float32().name(NAME).optional().defaultValue(12.f)
                 .version(VERSION).doc(DOC).build();
         assertTypeAndDefault(schema, Schema.Type.FLOAT32, true, 12.f);
-        assertMetadata(schema, NAME, VERSION, DOC);
+        assertMetadata(schema, NAME, VERSION, DOC, NO_PARAMS);
     }
 
     @Test(expected = SchemaBuilderException.class)
@@ -128,7 +130,7 @@ public class SchemaBuilderTest {
         schema = SchemaBuilder.float64().name(NAME).optional().defaultValue(12.0)
                 .version(VERSION).doc(DOC).build();
         assertTypeAndDefault(schema, Schema.Type.FLOAT64, true, 12.0);
-        assertMetadata(schema, NAME, VERSION, DOC);
+        assertMetadata(schema, NAME, VERSION, DOC, NO_PARAMS);
     }
 
     @Test(expected = SchemaBuilderException.class)
@@ -145,7 +147,7 @@ public class SchemaBuilderTest {
         schema = SchemaBuilder.bool().name(NAME).optional().defaultValue(true)
                 .version(VERSION).doc(DOC).build();
         assertTypeAndDefault(schema, Schema.Type.BOOLEAN, true, true);
-        assertMetadata(schema, NAME, VERSION, DOC);
+        assertMetadata(schema, NAME, VERSION, DOC, NO_PARAMS);
     }
 
     @Test(expected = SchemaBuilderException.class)
@@ -162,7 +164,7 @@ public class SchemaBuilderTest {
         schema = SchemaBuilder.string().name(NAME).optional().defaultValue("a default string")
                 .version(VERSION).doc(DOC).build();
         assertTypeAndDefault(schema, Schema.Type.STRING, true, "a default string");
-        assertMetadata(schema, NAME, VERSION, DOC);
+        assertMetadata(schema, NAME, VERSION, DOC, NO_PARAMS);
     }
 
     @Test(expected = SchemaBuilderException.class)
@@ -179,7 +181,7 @@ public class SchemaBuilderTest {
         schema = SchemaBuilder.bytes().name(NAME).optional().defaultValue("a default byte array".getBytes())
                 .version(VERSION).doc(DOC).build();
         assertTypeAndDefault(schema, Schema.Type.BYTES, true, "a default byte array".getBytes());
-        assertMetadata(schema, NAME, VERSION, DOC);
+        assertMetadata(schema, NAME, VERSION, DOC, NO_PARAMS);
     }
 
     @Test(expected = SchemaBuilderException.class)
@@ -188,6 +190,21 @@ public class SchemaBuilderTest {
     }
 
 
+    @Test
+    public void testParameters() {
+        Map<String, String> expectedParameters = new HashMap<>();
+        expectedParameters.put("foo", "val");
+        expectedParameters.put("bar", "baz");
+
+        Schema schema = SchemaBuilder.string().parameter("foo", "val").parameter("bar", "baz").build();
+        assertTypeAndDefault(schema, Schema.Type.STRING, false, null);
+        assertMetadata(schema, null, null, null, expectedParameters);
+
+        schema = SchemaBuilder.string().parameters(expectedParameters).build();
+        assertTypeAndDefault(schema, Schema.Type.STRING, false, null);
+        assertMetadata(schema, null, null, null, expectedParameters);
+    }
+
 
     @Test
     public void testStructBuilder() {
@@ -275,13 +292,14 @@ public class SchemaBuilderTest {
         }
     }
 
-    private void assertMetadata(Schema schema, String name, Integer version, String doc) {
+    private void assertMetadata(Schema schema, String name, Integer version, String doc, Map<String, String> parameters) {
         assertEquals(name, schema.name());
         assertEquals(version, schema.version());
         assertEquals(doc, schema.doc());
+        assertEquals(parameters, schema.parameters());
     }
 
     private void assertNoMetadata(Schema schema) {
-        assertMetadata(schema, null, null, null);
+        assertMetadata(schema, null, null, null, null);
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/02e103b7/copycat/api/src/test/java/org/apache/kafka/copycat/data/TimestampTest.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/test/java/org/apache/kafka/copycat/data/TimestampTest.java b/copycat/api/src/test/java/org/apache/kafka/copycat/data/TimestampTest.java
new file mode 100644
index 0000000..cb5454c
--- /dev/null
+++ b/copycat/api/src/test/java/org/apache/kafka/copycat/data/TimestampTest.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.data;
+
+import org.apache.kafka.copycat.errors.DataException;
+import org.junit.Test;
+
+import java.util.Calendar;
+import java.util.GregorianCalendar;
+import java.util.TimeZone;
+
+import static org.junit.Assert.assertEquals;
+
+public class TimestampTest {
+    private static final GregorianCalendar EPOCH;
+    private static final GregorianCalendar EPOCH_PLUS_MILLIS;
+
+    private static final int NUM_MILLIS = 2000000000;
+    private static final long TOTAL_MILLIS = ((long) NUM_MILLIS) * 2;
+
+    static {
+        EPOCH = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0);
+        EPOCH.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+
+        EPOCH_PLUS_MILLIS = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0);
+        EPOCH_PLUS_MILLIS.setTimeZone(TimeZone.getTimeZone("UTC"));
+        EPOCH_PLUS_MILLIS.add(Calendar.MILLISECOND, NUM_MILLIS);
+        EPOCH_PLUS_MILLIS.add(Calendar.MILLISECOND, NUM_MILLIS);
+    }
+
+    @Test
+    public void testBuilder() {
+        Schema plain = Timestamp.SCHEMA; // exercise the Timestamp logical type, not Date
+        assertEquals(Timestamp.LOGICAL_NAME, plain.name());
+        assertEquals(1, (Object) plain.version());
+    }
+
+    @Test
+    public void testFromLogical() {
+        assertEquals(0L, Timestamp.fromLogical(Timestamp.SCHEMA, EPOCH.getTime()));
+        assertEquals(TOTAL_MILLIS, Timestamp.fromLogical(Timestamp.SCHEMA, EPOCH_PLUS_MILLIS.getTime()));
+    }
+
+    @Test(expected = DataException.class)
+    public void testFromLogicalInvalidSchema() {
+        Timestamp.fromLogical(Timestamp.builder().name("invalid").build(), EPOCH.getTime());
+    }
+
+    @Test
+    public void testToLogical() {
+        assertEquals(EPOCH.getTime(), Timestamp.toLogical(Timestamp.SCHEMA, 0L));
+        assertEquals(EPOCH_PLUS_MILLIS.getTime(), Timestamp.toLogical(Timestamp.SCHEMA, TOTAL_MILLIS));
+    }
+
+    @Test(expected = DataException.class)
+    public void testToLogicalInvalidSchema() {
+        Timestamp.toLogical(Timestamp.builder().name("invalid").build(), 0L); // wrong schema name must be rejected
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/02e103b7/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonConverter.java
----------------------------------------------------------------------
diff --git a/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonConverter.java b/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonConverter.java
index 5b37f27..8910b27 100644
--- a/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonConverter.java
+++ b/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonConverter.java
@@ -30,6 +30,7 @@ import org.apache.kafka.copycat.errors.DataException;
 import org.apache.kafka.copycat.storage.Converter;
 
 import java.io.IOException;
+import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -192,10 +193,71 @@ public class JsonConverter implements Converter {
                 return result;
             }
         });
+    }
+
+    // Convert values in Copycat form into their logical types. These logical converters are looked up by the logical
+    // type name stored in the schema's name field.
+    private static final HashMap<String, LogicalTypeConverter> TO_COPYCAT_LOGICAL_CONVERTERS = new HashMap<>();
+    static {
+        TO_COPYCAT_LOGICAL_CONVERTERS.put(Decimal.LOGICAL_NAME, new LogicalTypeConverter() {
+            @Override
+            public Object convert(Schema schema, Object value) {
+                if (!(value instanceof byte[]))
+                    throw new DataException("Invalid type for Decimal, underlying representation should be bytes but was " + value.getClass());
+                return Decimal.toLogical(schema, (byte[]) value);
+            }
+        });
+
+        TO_COPYCAT_LOGICAL_CONVERTERS.put(Date.LOGICAL_NAME, new LogicalTypeConverter() {
+            @Override
+            public Object convert(Schema schema, Object value) {
+                if (!(value instanceof Integer))
+                    throw new DataException("Invalid type for Date, underlying representation should be int32 but was " + value.getClass());
+                return Date.toLogical(schema, (int) value);
+            }
+        });
+
+        TO_COPYCAT_LOGICAL_CONVERTERS.put(Timestamp.LOGICAL_NAME, new LogicalTypeConverter() {
+            @Override
+            public Object convert(Schema schema, Object value) {
+                if (!(value instanceof Long))
+                    throw new DataException("Invalid type for Timestamp, underlying representation should be int64 but was " + value.getClass());
+                return Timestamp.toLogical(schema, (long) value);
+            }
+        });
+    }
+
+    private static final HashMap<String, LogicalTypeConverter> TO_JSON_LOGICAL_CONVERTERS = new HashMap<>();
+    static {
+        TO_JSON_LOGICAL_CONVERTERS.put(Decimal.LOGICAL_NAME, new LogicalTypeConverter() {
+            @Override
+            public Object convert(Schema schema, Object value) {
+                if (!(value instanceof BigDecimal))
+                    throw new DataException("Invalid type for Decimal, expected BigDecimal but was " + value.getClass());
+                return Decimal.fromLogical(schema, (BigDecimal) value);
+            }
+        });
 
+        TO_JSON_LOGICAL_CONVERTERS.put(Date.LOGICAL_NAME, new LogicalTypeConverter() {
+            @Override
+            public Object convert(Schema schema, Object value) {
+                if (!(value instanceof java.util.Date))
+                    throw new DataException("Invalid type for Date, expected Date but was " + value.getClass());
+                return Date.fromLogical(schema, (java.util.Date) value);
+            }
+        });
 
+        TO_JSON_LOGICAL_CONVERTERS.put(Timestamp.LOGICAL_NAME, new LogicalTypeConverter() {
+            @Override
+            public Object convert(Schema schema, Object value) {
+                if (!(value instanceof java.util.Date))
+                    throw new DataException("Invalid type for Timestamp, expected Date but was " + value.getClass());
+                return Timestamp.fromLogical(schema, (java.util.Date) value);
+            }
+        });
     }
 
+
     private boolean enableSchemas = SCHEMAS_ENABLE_DEFAULT;
     private int cacheSize = SCHEMAS_CACHE_SIZE_DEFAULT;
     private Cache<Schema, ObjectNode> fromCopycatSchemaCache;
@@ -332,6 +394,12 @@ public class JsonConverter implements Converter {
             jsonSchema.put(JsonSchema.SCHEMA_VERSION_FIELD_NAME, schema.version());
         if (schema.doc() != null)
             jsonSchema.put(JsonSchema.SCHEMA_DOC_FIELD_NAME, schema.doc());
+        if (schema.parameters() != null) {
+            ObjectNode jsonSchemaParams = JsonNodeFactory.instance.objectNode();
+            for (Map.Entry<String, String> prop : schema.parameters().entrySet())
+                jsonSchemaParams.put(prop.getKey(), prop.getValue());
+            jsonSchema.put(JsonSchema.SCHEMA_PARAMETERS_FIELD_NAME, jsonSchemaParams);
+        }
         if (schema.defaultValue() != null)
             jsonSchema.set(JsonSchema.SCHEMA_DEFAULT_FIELD_NAME, convertToJson(schema, schema.defaultValue()));
 
@@ -432,6 +500,18 @@ public class JsonConverter implements Converter {
         if (schemaDocNode != null && schemaDocNode.isTextual())
             builder.doc(schemaDocNode.textValue());
 
+        JsonNode schemaParamsNode = jsonSchema.get(JsonSchema.SCHEMA_PARAMETERS_FIELD_NAME);
+        if (schemaParamsNode != null && schemaParamsNode.isObject()) {
+            Iterator<Map.Entry<String, JsonNode>> paramsIt = schemaParamsNode.fields();
+            while (paramsIt.hasNext()) {
+                Map.Entry<String, JsonNode> entry = paramsIt.next();
+                JsonNode paramValue = entry.getValue();
+                if (!paramValue.isTextual())
+                    throw new DataException("Schema parameters must have string values.");
+                builder.parameter(entry.getKey(), paramValue.textValue());
+            }
+        }
+
         JsonNode schemaDefaultNode = jsonSchema.get(JsonSchema.SCHEMA_DEFAULT_FIELD_NAME);
         if (schemaDefaultNode != null)
             builder.defaultValue(convertToCopycat(builder, schemaDefaultNode));
@@ -461,8 +541,8 @@ public class JsonConverter implements Converter {
      * Convert this object, in the org.apache.kafka.copycat.data format, into a JSON object, returning both the schema
      * and the converted object.
      */
-    private static JsonNode convertToJson(Schema schema, Object value) {
-        if (value == null) {
+    private static JsonNode convertToJson(Schema schema, Object logicalValue) {
+        if (logicalValue == null) {
             if (schema == null) // Any schema is valid and we don't have a default, so treat this as an optional schema
                 return null;
             if (schema.defaultValue() != null)
@@ -472,6 +552,13 @@ public class JsonConverter implements Converter {
             throw new DataException("Conversion error: null value for field that is required and has no default value");
         }
 
+        Object value = logicalValue;
+        if (schema != null && schema.name() != null) {
+            LogicalTypeConverter logicalConverter = TO_JSON_LOGICAL_CONVERTERS.get(schema.name());
+            if (logicalConverter != null)
+                value = logicalConverter.convert(schema, logicalValue);
+        }
+
         try {
             final Schema.Type schemaType;
             if (schema == null) {
@@ -610,11 +697,21 @@ public class JsonConverter implements Converter {
         if (typeConverter == null)
             throw new DataException("Unknown schema type: " + schema.type());
 
-        return typeConverter.convert(schema, jsonValue);
+        Object converted = typeConverter.convert(schema, jsonValue);
+        if (schema != null && schema.name() != null) {
+            LogicalTypeConverter logicalConverter = TO_COPYCAT_LOGICAL_CONVERTERS.get(schema.name());
+            if (logicalConverter != null)
+                converted = logicalConverter.convert(schema, converted);
+        }
+        return converted;
     }
 
 
     private interface JsonToCopycatTypeConverter {
         Object convert(Schema schema, JsonNode value);
     }
+
+    private interface LogicalTypeConverter {
+        Object convert(Schema schema, Object value);
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/02e103b7/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonSchema.java
----------------------------------------------------------------------
diff --git a/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonSchema.java b/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonSchema.java
index b7657be..78712f3 100644
--- a/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonSchema.java
+++ b/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonSchema.java
@@ -30,6 +30,7 @@ public class JsonSchema {
     static final String SCHEMA_NAME_FIELD_NAME = "name";
     static final String SCHEMA_VERSION_FIELD_NAME = "version";
     static final String SCHEMA_DOC_FIELD_NAME = "doc";
+    static final String SCHEMA_PARAMETERS_FIELD_NAME = "parameters";
     static final String SCHEMA_DEFAULT_FIELD_NAME = "default";
     static final String ARRAY_ITEMS_FIELD_NAME = "items";
     static final String MAP_KEY_FIELD_NAME = "keys";

http://git-wip-us.apache.org/repos/asf/kafka/blob/02e103b7/copycat/json/src/test/java/org/apache/kafka/copycat/json/JsonConverterTest.java
----------------------------------------------------------------------
diff --git a/copycat/json/src/test/java/org/apache/kafka/copycat/json/JsonConverterTest.java b/copycat/json/src/test/java/org/apache/kafka/copycat/json/JsonConverterTest.java
index 96f8544..8a8e243 100644
--- a/copycat/json/src/test/java/org/apache/kafka/copycat/json/JsonConverterTest.java
+++ b/copycat/json/src/test/java/org/apache/kafka/copycat/json/JsonConverterTest.java
@@ -21,12 +21,15 @@ import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import org.apache.kafka.copycat.data.Date;
+import org.apache.kafka.copycat.data.Decimal;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.kafka.common.cache.Cache;
 import org.apache.kafka.copycat.data.Schema;
 import org.apache.kafka.copycat.data.SchemaAndValue;
 import org.apache.kafka.copycat.data.SchemaBuilder;
 import org.apache.kafka.copycat.data.Struct;
+import org.apache.kafka.copycat.data.Timestamp;
 import org.apache.kafka.copycat.errors.DataException;
 import org.junit.Before;
 import org.junit.Test;
@@ -34,9 +37,20 @@ import org.powermock.reflect.Whitebox;
 
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.nio.ByteBuffer;
-import java.util.*;
-
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.GregorianCalendar;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TimeZone;
+
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -62,8 +76,8 @@ public class JsonConverterTest {
         assertEquals(new SchemaAndValue(Schema.OPTIONAL_BOOLEAN_SCHEMA, null), converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"boolean\", \"optional\": true }, \"payload\": null }".getBytes()));
         assertEquals(new SchemaAndValue(SchemaBuilder.bool().defaultValue(true).build(), true),
                 converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"boolean\", \"default\": true }, \"payload\": null }".getBytes()));
-        assertEquals(new SchemaAndValue(SchemaBuilder.bool().required().name("bool").version(2).doc("the documentation").build(), true),
-                converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"boolean\", \"optional\": false, \"name\": \"bool\", \"version\": 2, \"doc\": \"the documentation\"}, \"payload\": true }".getBytes()));
+        assertEquals(new SchemaAndValue(SchemaBuilder.bool().required().name("bool").version(2).doc("the documentation").parameter("foo", "bar").build(), true),
+                converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"boolean\", \"optional\": false, \"name\": \"bool\", \"version\": 2, \"doc\": \"the documentation\", \"parameters\": { \"foo\": \"bar\" }}, \"payload\": true }".getBytes()));
     }
 
     // Schema types
@@ -189,6 +203,47 @@ public class JsonConverterTest {
         assertEquals(new SchemaAndValue(null, obj), converted);
     }
 
+    @Test
+    public void decimalToCopycat() {
+        Schema schema = Decimal.schema(2);
+        BigDecimal reference = new BigDecimal(new BigInteger("156"), 2);
+        // Payload is base64 encoded byte[]{0, -100}, which is the two's complement encoding of 156.
+        String msg = "{ \"schema\": { \"type\": \"bytes\", \"name\": \"org.apache.kafka.copycat.data.Decimal\", \"version\": 1, \"parameters\": { \"scale\": \"2\" } }, \"payload\": \"AJw=\" }";
+        SchemaAndValue schemaAndValue = converter.toCopycatData(TOPIC, msg.getBytes());
+        BigDecimal converted = (BigDecimal) schemaAndValue.value();
+        assertEquals(schema, schemaAndValue.schema());
+        assertEquals(reference, converted);
+    }
+
+    @Test
+    public void dateToCopycat() {
+        Schema schema = Date.SCHEMA;
+        GregorianCalendar calendar = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0);
+        calendar.setTimeZone(TimeZone.getTimeZone("UTC"));
+        calendar.add(Calendar.DATE, 10000);
+        java.util.Date reference = calendar.getTime();
+        String msg = "{ \"schema\": { \"type\": \"int32\", \"name\": \"org.apache.kafka.copycat.data.Date\", \"version\": 1 }, \"payload\": 10000 }";
+        SchemaAndValue schemaAndValue = converter.toCopycatData(TOPIC, msg.getBytes());
+        java.util.Date converted = (java.util.Date) schemaAndValue.value();
+        assertEquals(schema, schemaAndValue.schema());
+        assertEquals(reference, converted);
+    }
+
+    @Test
+    public void timestampToCopycat() {
+        Schema schema = Timestamp.SCHEMA;
+        GregorianCalendar calendar = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0);
+        calendar.setTimeZone(TimeZone.getTimeZone("UTC"));
+        calendar.add(Calendar.MILLISECOND, 2000000000);
+        calendar.add(Calendar.MILLISECOND, 2000000000);
+        java.util.Date reference = calendar.getTime();
+        String msg = "{ \"schema\": { \"type\": \"int64\", \"name\": \"org.apache.kafka.copycat.data.Timestamp\", \"version\": 1 }, \"payload\": 4000000000 }";
+        SchemaAndValue schemaAndValue = converter.toCopycatData(TOPIC, msg.getBytes());
+        java.util.Date converted = (java.util.Date) schemaAndValue.value();
+        assertEquals(schema, schemaAndValue.schema());
+        assertEquals(reference, converted);
+    }
+
     // Schema metadata
 
     @Test
@@ -208,9 +263,9 @@ public class JsonConverterTest {
         assertEquals(parse("{ \"type\": \"boolean\", \"optional\": false, \"default\": true }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
         assertEquals(true, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).booleanValue());
 
-        converted = parse(converter.fromCopycatData(TOPIC, SchemaBuilder.bool().required().name("bool").version(3).doc("the documentation").build(), true));
+        converted = parse(converter.fromCopycatData(TOPIC, SchemaBuilder.bool().required().name("bool").version(3).doc("the documentation").parameter("foo", "bar").build(), true));
         validateEnvelope(converted);
-        assertEquals(parse("{ \"type\": \"boolean\", \"optional\": false, \"name\": \"bool\", \"version\": 3, \"doc\": \"the documentation\"}"),
+        assertEquals(parse("{ \"type\": \"boolean\", \"optional\": false, \"name\": \"bool\", \"version\": 3, \"doc\": \"the documentation\", \"parameters\": { \"foo\": \"bar\" }}"),
                 converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
         assertEquals(true, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).booleanValue());
     }
@@ -375,6 +430,49 @@ public class JsonConverterTest {
 
 
     @Test
+    public void decimalToJson() throws IOException {
+        JsonNode converted = parse(converter.fromCopycatData(TOPIC, Decimal.schema(2), new BigDecimal(new BigInteger("156"), 2)));
+        validateEnvelope(converted);
+        assertEquals(parse("{ \"type\": \"bytes\", \"optional\": false, \"name\": \"org.apache.kafka.copycat.data.Decimal\", \"version\": 1, \"parameters\": { \"scale\": \"2\" } }"),
+                converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
+        assertArrayEquals(new byte[]{0, -100}, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).binaryValue());
+    }
+
+    @Test
+    public void dateToJson() throws IOException {
+        GregorianCalendar calendar = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0);
+        calendar.setTimeZone(TimeZone.getTimeZone("UTC"));
+        calendar.add(Calendar.DATE, 10000);
+        java.util.Date date = calendar.getTime();
+
+        JsonNode converted = parse(converter.fromCopycatData(TOPIC, Date.SCHEMA, date));
+        validateEnvelope(converted);
+        assertEquals(parse("{ \"type\": \"int32\", \"optional\": false, \"name\": \"org.apache.kafka.copycat.data.Date\", \"version\": 1 }"),
+                converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
+        JsonNode payload = converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME);
+        assertTrue(payload.isInt());
+        assertEquals(10000, payload.intValue());
+    }
+
+    @Test
+    public void timestampToJson() throws IOException {
+        GregorianCalendar calendar = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0);
+        calendar.setTimeZone(TimeZone.getTimeZone("UTC"));
+        calendar.add(Calendar.MILLISECOND, 2000000000);
+        calendar.add(Calendar.MILLISECOND, 2000000000);
+        java.util.Date date = calendar.getTime();
+
+        JsonNode converted = parse(converter.fromCopycatData(TOPIC, Timestamp.SCHEMA, date));
+        validateEnvelope(converted);
+        assertEquals(parse("{ \"type\": \"int64\", \"optional\": false, \"name\": \"org.apache.kafka.copycat.data.Timestamp\", \"version\": 1 }"),
+                converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
+        JsonNode payload = converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME);
+        assertTrue(payload.isLong());
+        assertEquals(4000000000L, payload.longValue());
+    }
+
+
+    @Test
     public void nullSchemaAndPrimitiveToJson() {
         // This still needs to do conversion of data, null schema means "anything goes"
         JsonNode converted = parse(converter.fromCopycatData(TOPIC, null, true));


Mime
View raw message