kafka-commits mailing list archives

From gwens...@apache.org
Subject [2/5] kafka git commit: KAFKA-2367; Add Copycat runtime data API.
Date Thu, 27 Aug 2015 18:58:52 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java
----------------------------------------------------------------------
diff --git a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java
index 7e4ca7e..5ef657f 100644
--- a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java
+++ b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java
@@ -64,7 +64,7 @@ public class FileStreamSinkTask extends SinkTask {
     @Override
     public void put(Collection<SinkRecord> sinkRecords) {
         for (SinkRecord record : sinkRecords) {
-            outputStream.println(record.getValue());
+            outputStream.println(record.value());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceConnector.java
----------------------------------------------------------------------
diff --git a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceConnector.java b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceConnector.java
index 4f9d8d0..716322f 100644
--- a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceConnector.java
+++ b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceConnector.java
@@ -47,12 +47,12 @@ public class FileStreamSourceConnector extends SourceConnector {
     }
 
     @Override
-    public Class<? extends Task> getTaskClass() {
+    public Class<? extends Task> taskClass() {
         return FileStreamSourceTask.class;
     }
 
     @Override
-    public List<Properties> getTaskConfigs(int maxTasks) {
+    public List<Properties> taskConfigs(int maxTasks) {
         ArrayList<Properties> configs = new ArrayList<>();
         // Only one input stream makes sense.
         Properties config = new Properties();

http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java
----------------------------------------------------------------------
diff --git a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java
index 572ae1f..e411cad 100644
--- a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java
+++ b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java
@@ -17,6 +17,8 @@
 
 package org.apache.kafka.copycat.file;
 
+import org.apache.kafka.copycat.data.Schema;
+import org.apache.kafka.copycat.data.SchemaAndValue;
 import org.apache.kafka.copycat.errors.CopycatException;
 import org.apache.kafka.copycat.source.SourceRecord;
 import org.apache.kafka.copycat.source.SourceTask;
@@ -33,7 +35,11 @@ import java.util.Properties;
  */
 public class FileStreamSourceTask extends SourceTask {
     private static final Logger log = LoggerFactory.getLogger(FileStreamSourceTask.class);
+    private static final Schema OFFSET_KEY_SCHEMA = Schema.STRING_SCHEMA;
+    private static final Schema OFFSET_VALUE_SCHEMA = Schema.OPTIONAL_INT64_SCHEMA;
+    private static final Schema VALUE_SCHEMA = Schema.STRING_SCHEMA;
 
+    private String filename;
     private InputStream stream;
     private BufferedReader reader = null;
     private char[] buffer = new char[1024];
@@ -44,7 +50,7 @@ public class FileStreamSourceTask extends SourceTask {
 
     @Override
     public void start(Properties props) {
-        String filename = props.getProperty(FileStreamSourceConnector.FILE_CONFIG);
+        filename = props.getProperty(FileStreamSourceConnector.FILE_CONFIG);
         if (filename == null) {
             stream = System.in;
             // Tracking offset for stdin doesn't make sense
@@ -52,22 +58,29 @@ public class FileStreamSourceTask extends SourceTask {
         } else {
             try {
                 stream = new FileInputStream(filename);
-                Long lastRecordedOffset = (Long) context.getOffsetStorageReader().getOffset(null);
-                if (lastRecordedOffset != null) {
-                    log.debug("Found previous offset, trying to skip to file offset {}", lastRecordedOffset);
-                    long skipLeft = lastRecordedOffset;
-                    while (skipLeft > 0) {
-                        try {
-                            long skipped = stream.skip(skipLeft);
-                            skipLeft -= skipped;
-                        } catch (IOException e) {
-                            log.error("Error while trying to seek to previous offset in file: ", e);
-                            throw new CopycatException(e);
+                SchemaAndValue offsetWithSchema = context.offsetStorageReader().offset(new SchemaAndValue(OFFSET_KEY_SCHEMA, filename));
+                if (offsetWithSchema != null) {
+                    if (!offsetWithSchema.schema().equals(OFFSET_VALUE_SCHEMA))
+                        throw new CopycatException("Unexpected offset schema.");
+                    Long lastRecordedOffset = (Long) offsetWithSchema.value();
+                    if (lastRecordedOffset != null) {
+                        log.debug("Found previous offset, trying to skip to file offset {}", lastRecordedOffset);
+                        long skipLeft = lastRecordedOffset;
+                        while (skipLeft > 0) {
+                            try {
+                                long skipped = stream.skip(skipLeft);
+                                skipLeft -= skipped;
+                            } catch (IOException e) {
+                                log.error("Error while trying to seek to previous offset in file: ", e);
+                                throw new CopycatException(e);
+                            }
                         }
+                        log.debug("Skipped to offset {}", lastRecordedOffset);
                     }
-                    log.debug("Skipped to offset {}", lastRecordedOffset);
+                    streamOffset = (lastRecordedOffset != null) ? lastRecordedOffset : 0L;
+                } else {
+                    streamOffset = 0L;
                 }
-                streamOffset = (lastRecordedOffset != null) ? lastRecordedOffset : 0L;
             } catch (FileNotFoundException e) {
                 throw new CopycatException("Couldn't find file for FileStreamSourceTask: {}", e);
             }
@@ -111,7 +124,7 @@ public class FileStreamSourceTask extends SourceTask {
                         if (line != null) {
                             if (records == null)
                                 records = new ArrayList<>();
-                            records.add(new SourceRecord(null, streamOffset, topic, line));
+                            records.add(new SourceRecord(OFFSET_KEY_SCHEMA, filename, OFFSET_VALUE_SCHEMA, streamOffset, topic, VALUE_SCHEMA, line));
                         }
                         new ArrayList<SourceRecord>();
                     } while (line != null);
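
Taken together, the hunks above show the shape of the new schema-aware source API: the last committed offset is read back as a SchemaAndValue keyed by the source partition (here, the filename), and each SourceRecord carries explicit schemas for its offset key, offset value, and payload. A minimal sketch of the pattern as it would run inside a SourceTask, where context is the task's SourceTaskContext; the filename, topic, and offset value are illustrative:

    Schema keySchema = Schema.STRING_SCHEMA;
    Schema offsetSchema = Schema.OPTIONAL_INT64_SCHEMA;

    // Read back the last committed offset for this source partition (the file).
    SchemaAndValue offset = context.offsetStorageReader()
            .offset(new SchemaAndValue(keySchema, "/path/to/input.txt"));
    long streamOffset = (offset != null && offset.value() != null)
            ? (Long) offset.value() : 0L;

    // Emit a record that tags offset key, offset value, and payload with schemas.
    SourceRecord record = new SourceRecord(
            keySchema, "/path/to/input.txt",   // offset key: which file is being read
            offsetSchema, streamOffset,        // offset value: position within the file
            "lines-topic",                     // destination topic (illustrative)
            Schema.STRING_SCHEMA, "a line of text");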

http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSinkConnectorTest.java
----------------------------------------------------------------------
diff --git a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSinkConnectorTest.java b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSinkConnectorTest.java
index 643fb43..ab5fd3b 100644
--- a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSinkConnectorTest.java
+++ b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSinkConnectorTest.java
@@ -60,11 +60,11 @@ public class FileStreamSinkConnectorTest {
         PowerMock.replayAll();
 
         connector.start(sinkProperties);
-        List<Properties> taskConfigs = connector.getTaskConfigs(1);
+        List<Properties> taskConfigs = connector.taskConfigs(1);
         assertEquals(1, taskConfigs.size());
         assertEquals(FILENAME, taskConfigs.get(0).getProperty(FileStreamSinkConnector.FILE_CONFIG));
 
-        taskConfigs = connector.getTaskConfigs(2);
+        taskConfigs = connector.taskConfigs(2);
         assertEquals(2, taskConfigs.size());
         for (int i = 0; i < 2; i++) {
             assertEquals(FILENAME, taskConfigs.get(i).getProperty(FileStreamSinkConnector.FILE_CONFIG));
@@ -78,7 +78,7 @@ public class FileStreamSinkConnectorTest {
         PowerMock.replayAll();
 
         connector.start(sinkProperties);
-        assertEquals(FileStreamSinkTask.class, connector.getTaskClass());
+        assertEquals(FileStreamSinkTask.class, connector.taskClass());
 
         PowerMock.verifyAll();
     }
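
The connector-side renames drop the get- prefix throughout. A minimal usage sketch under the renamed API, with an illustrative file path:

    Properties props = new Properties();
    props.setProperty(FileStreamSinkConnector.FILE_CONFIG, "/tmp/out.txt");

    FileStreamSinkConnector connector = new FileStreamSinkConnector();
    connector.start(props);
    Class<? extends Task> taskClass = connector.taskClass();   // was getTaskClass()
    List<Properties> taskConfigs = connector.taskConfigs(2);   // was getTaskConfigs(2)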

http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSinkTaskTest.java b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSinkTaskTest.java
index b4e1b0c..1dfb5d8 100644
--- a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSinkTaskTest.java
+++ b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSinkTaskTest.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.copycat.file;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.copycat.data.Schema;
 import org.apache.kafka.copycat.sink.SinkRecord;
 import org.junit.Before;
 import org.junit.Test;
@@ -49,15 +50,15 @@ public class FileStreamSinkTaskTest {
         // We do not call task.start() since it would override the output stream
 
         task.put(Arrays.asList(
-                new SinkRecord("topic1", 0, null, "line1", 1)
+                new SinkRecord("topic1", 0, null, null, Schema.STRING_SCHEMA, "line1", 1)
         ));
         offsets.put(new TopicPartition("topic1", 0), 1L);
         task.flush(offsets);
         assertEquals("line1\n", os.toString());
 
         task.put(Arrays.asList(
-                new SinkRecord("topic1", 0, null, "line2", 2),
-                new SinkRecord("topic2", 0, null, "line3", 1)
+                new SinkRecord("topic1", 0, null, null, Schema.STRING_SCHEMA, "line2", 2),
+                new SinkRecord("topic2", 0, null, null, Schema.STRING_SCHEMA, "line3", 1)
         ));
         offsets.put(new TopicPartition("topic1", 0), 2L);
         offsets.put(new TopicPartition("topic2", 0), 1L);

http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceConnectorTest.java
----------------------------------------------------------------------
diff --git a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceConnectorTest.java b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceConnectorTest.java
index e23055c..41e15a0 100644
--- a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceConnectorTest.java
+++ b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceConnectorTest.java
@@ -55,7 +55,7 @@ public class FileStreamSourceConnectorTest {
         PowerMock.replayAll();
 
         connector.start(sourceProperties);
-        List<Properties> taskConfigs = connector.getTaskConfigs(1);
+        List<Properties> taskConfigs = connector.taskConfigs(1);
         assertEquals(1, taskConfigs.size());
         assertEquals(FILENAME,
                 taskConfigs.get(0).getProperty(FileStreamSourceConnector.FILE_CONFIG));
@@ -63,7 +63,7 @@ public class FileStreamSourceConnectorTest {
                 taskConfigs.get(0).getProperty(FileStreamSourceConnector.TOPIC_CONFIG));
 
         // Should be able to return fewer than requested #
-        taskConfigs = connector.getTaskConfigs(2);
+        taskConfigs = connector.taskConfigs(2);
         assertEquals(1, taskConfigs.size());
         assertEquals(FILENAME,
                 taskConfigs.get(0).getProperty(FileStreamSourceConnector.FILE_CONFIG));
@@ -79,7 +79,7 @@ public class FileStreamSourceConnectorTest {
 
         sourceProperties.remove(FileStreamSourceConnector.FILE_CONFIG);
         connector.start(sourceProperties);
-        List<Properties> taskConfigs = connector.getTaskConfigs(1);
+        List<Properties> taskConfigs = connector.taskConfigs(1);
         assertEquals(1, taskConfigs.size());
         assertNull(taskConfigs.get(0).getProperty(FileStreamSourceConnector.FILE_CONFIG));
 
@@ -97,7 +97,7 @@ public class FileStreamSourceConnectorTest {
         PowerMock.replayAll();
 
         connector.start(sourceProperties);
-        assertEquals(FileStreamSourceTask.class, connector.getTaskClass());
+        assertEquals(FileStreamSourceTask.class, connector.taskClass());
 
         PowerMock.verifyAll();
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java
index 0ec71d3..ee45493 100644
--- a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java
+++ b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.copycat.file;
 
+import org.apache.kafka.copycat.data.SchemaAndValue;
 import org.apache.kafka.copycat.errors.CopycatException;
 import org.apache.kafka.copycat.source.SourceRecord;
 import org.apache.kafka.copycat.source.SourceTaskContext;
@@ -86,9 +87,9 @@ public class FileStreamSourceTaskTest {
         os.flush();
         List<SourceRecord> records = task.poll();
         assertEquals(1, records.size());
-        assertEquals(TOPIC, records.get(0).getTopic());
-        assertEquals("partial line finished", records.get(0).getValue());
-        assertEquals(22L, records.get(0).getSourceOffset());
+        assertEquals(TOPIC, records.get(0).topic());
+        assertEquals("partial line finished", records.get(0).value());
+        assertEquals(22L, records.get(0).sourceOffset());
         assertEquals(null, task.poll());
 
         // Different line endings, and make sure the final \r doesn't result in a line until we can
@@ -97,21 +98,21 @@ public class FileStreamSourceTaskTest {
         os.flush();
         records = task.poll();
         assertEquals(4, records.size());
-        assertEquals("line1", records.get(0).getValue());
-        assertEquals(28L, records.get(0).getSourceOffset());
-        assertEquals("line2", records.get(1).getValue());
-        assertEquals(35L, records.get(1).getSourceOffset());
-        assertEquals("line3", records.get(2).getValue());
-        assertEquals(41L, records.get(2).getSourceOffset());
-        assertEquals("line4", records.get(3).getValue());
-        assertEquals(47L, records.get(3).getSourceOffset());
+        assertEquals("line1", records.get(0).value());
+        assertEquals(28L, records.get(0).sourceOffset());
+        assertEquals("line2", records.get(1).value());
+        assertEquals(35L, records.get(1).sourceOffset());
+        assertEquals("line3", records.get(2).value());
+        assertEquals(41L, records.get(2).sourceOffset());
+        assertEquals("line4", records.get(3).value());
+        assertEquals(47L, records.get(3).sourceOffset());
 
         os.write("subsequent text".getBytes());
         os.flush();
         records = task.poll();
         assertEquals(1, records.size());
-        assertEquals("", records.get(0).getValue());
-        assertEquals(48L, records.get(0).getSourceOffset());
+        assertEquals("", records.get(0).value());
+        assertEquals(48L, records.get(0).sourceOffset());
 
         task.stop();
     }
@@ -133,8 +134,6 @@ public class FileStreamSourceTaskTest {
 
 
     private void expectOffsetLookupReturnNone() {
-        EasyMock.expect(
-                offsetStorageReader.getOffset(EasyMock.anyObject(Object.class)))
-                .andReturn(null);
+        EasyMock.expect(offsetStorageReader.offset(EasyMock.anyObject(SchemaAndValue.class))).andReturn(null);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonConverter.java
----------------------------------------------------------------------
diff --git a/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonConverter.java b/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonConverter.java
index 36a6ca8..67df11d 100644
--- a/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonConverter.java
+++ b/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonConverter.java
@@ -22,7 +22,7 @@ import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.kafka.copycat.data.*;
-import org.apache.kafka.copycat.errors.CopycatException;
+import org.apache.kafka.copycat.errors.DataException;
 import org.apache.kafka.copycat.storage.Converter;
 
 import java.io.IOException;
@@ -34,62 +34,92 @@ import java.util.*;
  */
 public class JsonConverter implements Converter<JsonNode> {
 
-    private static final HashMap<String, JsonToCopycatTypeConverter> TO_COPYCAT_CONVERTERS
-            = new HashMap<>();
+    private static final HashMap<Schema.Type, JsonToCopycatTypeConverter> TO_COPYCAT_CONVERTERS = new HashMap<>();
+
+    private static Object checkOptionalAndDefault(Schema schema) {
+        if (schema.defaultValue() != null)
+            return schema.defaultValue();
+        if (schema.isOptional())
+            return null;
+        throw new DataException("Invalid null value for required field");
+    }
 
     static {
-        TO_COPYCAT_CONVERTERS.put(JsonSchema.BOOLEAN_TYPE_NAME, new JsonToCopycatTypeConverter() {
+        TO_COPYCAT_CONVERTERS.put(Schema.Type.BOOLEAN, new JsonToCopycatTypeConverter() {
             @Override
-            public Object convert(JsonNode jsonSchema, JsonNode value) {
+            public Object convert(Schema schema, JsonNode value) {
+                if (value.isNull()) return checkOptionalAndDefault(schema);
                 return value.booleanValue();
             }
         });
-        TO_COPYCAT_CONVERTERS.put(JsonSchema.INT_TYPE_NAME, new JsonToCopycatTypeConverter() {
+        TO_COPYCAT_CONVERTERS.put(Schema.Type.INT8, new JsonToCopycatTypeConverter() {
+            @Override
+            public Object convert(Schema schema, JsonNode value) {
+                if (value.isNull()) return checkOptionalAndDefault(schema);
+                return (byte) value.intValue();
+            }
+        });
+        TO_COPYCAT_CONVERTERS.put(Schema.Type.INT16, new JsonToCopycatTypeConverter() {
+            @Override
+            public Object convert(Schema schema, JsonNode value) {
+                if (value.isNull()) return checkOptionalAndDefault(schema);
+                return (short) value.intValue();
+            }
+        });
+        TO_COPYCAT_CONVERTERS.put(Schema.Type.INT32, new JsonToCopycatTypeConverter() {
             @Override
-            public Object convert(JsonNode jsonSchema, JsonNode value) {
+            public Object convert(Schema schema, JsonNode value) {
+                if (value.isNull()) return checkOptionalAndDefault(schema);
                 return value.intValue();
             }
         });
-        TO_COPYCAT_CONVERTERS.put(JsonSchema.LONG_TYPE_NAME, new JsonToCopycatTypeConverter() {
+        TO_COPYCAT_CONVERTERS.put(Schema.Type.INT64, new JsonToCopycatTypeConverter() {
             @Override
-            public Object convert(JsonNode jsonSchema, JsonNode value) {
+            public Object convert(Schema schema, JsonNode value) {
+                if (value.isNull()) return checkOptionalAndDefault(schema);
                 return value.longValue();
             }
         });
-        TO_COPYCAT_CONVERTERS.put(JsonSchema.FLOAT_TYPE_NAME, new JsonToCopycatTypeConverter() {
+        TO_COPYCAT_CONVERTERS.put(Schema.Type.FLOAT32, new JsonToCopycatTypeConverter() {
             @Override
-            public Object convert(JsonNode jsonSchema, JsonNode value) {
+            public Object convert(Schema schema, JsonNode value) {
+                if (value.isNull()) return checkOptionalAndDefault(schema);
                 return value.floatValue();
             }
         });
-        TO_COPYCAT_CONVERTERS.put(JsonSchema.DOUBLE_TYPE_NAME, new JsonToCopycatTypeConverter() {
+        TO_COPYCAT_CONVERTERS.put(Schema.Type.FLOAT64, new JsonToCopycatTypeConverter() {
             @Override
-            public Object convert(JsonNode jsonSchema, JsonNode value) {
+            public Object convert(Schema schema, JsonNode value) {
+                if (value.isNull()) return checkOptionalAndDefault(schema);
                 return value.doubleValue();
             }
         });
-        TO_COPYCAT_CONVERTERS.put(JsonSchema.BYTES_TYPE_NAME, new JsonToCopycatTypeConverter() {
+        TO_COPYCAT_CONVERTERS.put(Schema.Type.BYTES, new JsonToCopycatTypeConverter() {
             @Override
-            public Object convert(JsonNode jsonSchema, JsonNode value) {
+            public Object convert(Schema schema, JsonNode value) {
                 try {
+                    if (value.isNull()) return checkOptionalAndDefault(schema);
                     return value.binaryValue();
                 } catch (IOException e) {
-                    throw new CopycatException("Invalid bytes field", e);
+                    throw new DataException("Invalid bytes field", e);
                 }
             }
         });
-        TO_COPYCAT_CONVERTERS.put(JsonSchema.STRING_TYPE_NAME, new JsonToCopycatTypeConverter() {
+        TO_COPYCAT_CONVERTERS.put(Schema.Type.STRING, new JsonToCopycatTypeConverter() {
             @Override
-            public Object convert(JsonNode jsonSchema, JsonNode value) {
+            public Object convert(Schema schema, JsonNode value) {
+                if (value.isNull()) return checkOptionalAndDefault(schema);
                 return value.textValue();
             }
         });
-        TO_COPYCAT_CONVERTERS.put(JsonSchema.ARRAY_TYPE_NAME, new JsonToCopycatTypeConverter() {
+        TO_COPYCAT_CONVERTERS.put(Schema.Type.ARRAY, new JsonToCopycatTypeConverter() {
             @Override
-            public Object convert(JsonNode jsonSchema, JsonNode value) {
-                JsonNode elemSchema = jsonSchema.get(JsonSchema.ARRAY_ITEMS_FIELD_NAME);
+            public Object convert(Schema schema, JsonNode value) {
+                if (value.isNull()) return checkOptionalAndDefault(schema);
+
+                Schema elemSchema = schema.valueSchema();
                 if (elemSchema == null)
-                    throw new CopycatException("Array schema did not specify the element type");
+                    throw new DataException("Array schema did not specify the element type");
                 ArrayList<Object> result = new ArrayList<>();
                 for (JsonNode elem : value) {
                     result.add(convertToCopycat(elemSchema, elem));
@@ -97,54 +127,142 @@ public class JsonConverter implements Converter<JsonNode> {
                 return result;
             }
         });
+        TO_COPYCAT_CONVERTERS.put(Schema.Type.MAP, new JsonToCopycatTypeConverter() {
+            @Override
+            public Object convert(Schema schema, JsonNode value) {
+                if (value.isNull()) return checkOptionalAndDefault(schema);
+
+                Schema keySchema = schema.keySchema();
+                Schema valueSchema = schema.valueSchema();
+
+                // If the map uses strings for keys, it should be encoded in the natural JSON format. If it uses other
+                // primitive types or a complex type as a key, it will be encoded as a list of pairs
+                Map<Object, Object> result = new HashMap<>();
+                if (keySchema.type() == Schema.Type.STRING) {
+                    if (!value.isObject())
+                        throw new DataException("Map's with string fields should be encoded as JSON objects, but found " + value.getNodeType());
+                    Iterator<Map.Entry<String, JsonNode>> fieldIt = value.fields();
+                    while (fieldIt.hasNext()) {
+                        Map.Entry<String, JsonNode> entry = fieldIt.next();
+                        result.put(entry.getKey(), convertToCopycat(valueSchema, entry.getValue()));
+                    }
+                } else {
+                    if (!value.isArray())
+                        throw new DataException("Map's with non-string fields should be encoded as JSON array of tuples, but found " + value.getNodeType());
+                    for (JsonNode entry : value) {
+                        if (!entry.isArray())
+                            throw new DataException("Found invalid map entry instead of array tuple: " + entry.getNodeType());
+                        if (entry.size() != 2)
+                            throw new DataException("Found invalid map entry, expected length 2 but found :" + entry.size());
+                        result.put(convertToCopycat(keySchema, entry.get(0)),
+                                convertToCopycat(valueSchema, entry.get(1)));
+                    }
+                }
+                return result;
+            }
+        });
+        TO_COPYCAT_CONVERTERS.put(Schema.Type.STRUCT, new JsonToCopycatTypeConverter() {
+            @Override
+            public Object convert(Schema schema, JsonNode value) {
+                if (value.isNull()) return checkOptionalAndDefault(schema);
+
+                if (!value.isObject())
+                    throw new DataException("Structs should be encoded as JSON objects, but found " + value.getNodeType());
+
+                // We only have ISchema here but need Schema, so we need to materialize the actual schema. Using ISchema
+                // avoids having to materialize the schema for non-Struct types but it cannot be avoided for Structs since
+                // they require a schema to be provided at construction. However, the schema is only a SchemaBuilder during
+                // translation of schemas to JSON; during the more common translation of data to JSON, the call to schema.schema()
+                // just returns the schema Object and has no overhead.
+                Struct result = new Struct(schema.schema());
+                for (Field field : schema.fields())
+                    result.put(field, convertToCopycat(field.schema(), value.get(field.name())));
+
+                return result;
+            }
+        });
 
     }
 
     @Override
-    public JsonNode fromCopycatData(Object value) {
-        return convertToJsonWithSchemaEnvelope(value);
+    public JsonNode fromCopycatData(Schema schema, Object value) {
+        return convertToJsonWithSchemaEnvelope(schema, value);
     }
 
     @Override
-    public Object toCopycatData(JsonNode value) {
+    public SchemaAndValue toCopycatData(JsonNode value) {
         if (!value.isObject() || value.size() != 2 || !value.has(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME) || !value.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME))
-            throw new CopycatException("JSON value converted to Copycat must be in envelope containing schema");
+            throw new DataException("JSON value converted to Copycat must be in envelope containing schema");
 
-        return convertToCopycat(value.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME), value.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
+        Schema schema = asCopycatSchema(value.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
+        return new SchemaAndValue(schema, convertToCopycat(schema, value.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME)));
     }
 
 
-    private static JsonNode asJsonSchema(Schema schema) {
-        switch (schema.getType()) {
+    private static ObjectNode asJsonSchema(Schema schema) {
+        final ObjectNode jsonSchema;
+        switch (schema.type()) {
             case BOOLEAN:
-                return JsonSchema.BOOLEAN_SCHEMA;
+                jsonSchema = JsonSchema.BOOLEAN_SCHEMA.deepCopy();
+                break;
             case BYTES:
-                return JsonSchema.BYTES_SCHEMA;
-            case DOUBLE:
-                return JsonSchema.DOUBLE_SCHEMA;
-            case FLOAT:
-                return JsonSchema.FLOAT_SCHEMA;
-            case INT:
-                return JsonSchema.INT_SCHEMA;
-            case LONG:
-                return JsonSchema.LONG_SCHEMA;
-            case NULL:
-                throw new UnsupportedOperationException("null schema not supported");
+                jsonSchema = JsonSchema.BYTES_SCHEMA.deepCopy();
+                break;
+            case FLOAT64:
+                jsonSchema = JsonSchema.DOUBLE_SCHEMA.deepCopy();
+                break;
+            case FLOAT32:
+                jsonSchema = JsonSchema.FLOAT_SCHEMA.deepCopy();
+                break;
+            case INT8:
+                jsonSchema = JsonSchema.INT8_SCHEMA.deepCopy();
+                break;
+            case INT16:
+                jsonSchema = JsonSchema.INT16_SCHEMA.deepCopy();
+                break;
+            case INT32:
+                jsonSchema = JsonSchema.INT32_SCHEMA.deepCopy();
+                break;
+            case INT64:
+                jsonSchema = JsonSchema.INT64_SCHEMA.deepCopy();
+                break;
             case STRING:
-                return JsonSchema.STRING_SCHEMA;
-            case UNION: {
-                throw new UnsupportedOperationException("union schema not supported");
-            }
+                jsonSchema = JsonSchema.STRING_SCHEMA.deepCopy();
+                break;
             case ARRAY:
-                return JsonNodeFactory.instance.objectNode().put(JsonSchema.SCHEMA_TYPE_FIELD_NAME, JsonSchema.ARRAY_TYPE_NAME)
-                        .set(JsonSchema.ARRAY_ITEMS_FIELD_NAME, asJsonSchema(schema.getElementType()));
-            case ENUM:
-                throw new UnsupportedOperationException("enum schema not supported");
+                jsonSchema = JsonNodeFactory.instance.objectNode().put(JsonSchema.SCHEMA_TYPE_FIELD_NAME, JsonSchema.ARRAY_TYPE_NAME);
+                jsonSchema.set(JsonSchema.ARRAY_ITEMS_FIELD_NAME, asJsonSchema(schema.valueSchema()));
+                break;
             case MAP:
-                throw new UnsupportedOperationException("map schema not supported");
+                jsonSchema = JsonNodeFactory.instance.objectNode().put(JsonSchema.SCHEMA_TYPE_FIELD_NAME, JsonSchema.MAP_TYPE_NAME);
+                jsonSchema.set(JsonSchema.MAP_KEY_FIELD_NAME, asJsonSchema(schema.keySchema()));
+                jsonSchema.set(JsonSchema.MAP_VALUE_FIELD_NAME, asJsonSchema(schema.valueSchema()));
+                break;
+            case STRUCT:
+                jsonSchema = JsonNodeFactory.instance.objectNode().put(JsonSchema.SCHEMA_TYPE_FIELD_NAME, JsonSchema.STRUCT_TYPE_NAME);
+                ArrayNode fields = JsonNodeFactory.instance.arrayNode();
+                for (Field field : schema.fields()) {
+                    ObjectNode fieldJsonSchema = asJsonSchema(field.schema());
+                    fieldJsonSchema.put(JsonSchema.STRUCT_FIELD_NAME_FIELD_NAME, field.name());
+                    fields.add(fieldJsonSchema);
+                }
+                jsonSchema.set(JsonSchema.STRUCT_FIELDS_FIELD_NAME, fields);
+                break;
             default:
-                throw new CopycatException("Couldn't translate unsupported schema type " + schema.getType().getName() + ".");
+                throw new DataException("Couldn't translate unsupported schema type " + schema + ".");
         }
+
+        jsonSchema.put(JsonSchema.SCHEMA_OPTIONAL_FIELD_NAME, schema.isOptional());
+        if (schema.name() != null)
+            jsonSchema.put(JsonSchema.SCHEMA_NAME_FIELD_NAME, schema.name());
+        if (schema.version() != null)
+            jsonSchema.put(JsonSchema.SCHEMA_VERSION_FIELD_NAME, schema.version());
+        if (schema.doc() != null)
+            jsonSchema.put(JsonSchema.SCHEMA_DOC_FIELD_NAME, schema.doc());
+        if (schema.defaultValue() != null)
+            jsonSchema.set(JsonSchema.SCHEMA_DEFAULT_FIELD_NAME, convertToJson(schema, schema.defaultValue()));
+
+        return jsonSchema;
     }
 
 
@@ -154,112 +272,207 @@ public class JsonConverter implements Converter<JsonNode> {
 
         JsonNode schemaTypeNode = jsonSchema.get(JsonSchema.SCHEMA_TYPE_FIELD_NAME);
         if (schemaTypeNode == null || !schemaTypeNode.isTextual())
-            throw new CopycatException("Schema must contain 'type' field");
+            throw new DataException("Schema must contain 'type' field");
 
+        final SchemaBuilder builder;
         switch (schemaTypeNode.textValue()) {
             case JsonSchema.BOOLEAN_TYPE_NAME:
-                return SchemaBuilder.builder().booleanType();
-            case JsonSchema.INT_TYPE_NAME:
-                return SchemaBuilder.builder().intType();
-            case JsonSchema.LONG_TYPE_NAME:
-                return SchemaBuilder.builder().longType();
+                builder = SchemaBuilder.bool();
+                break;
+            case JsonSchema.INT8_TYPE_NAME:
+                builder = SchemaBuilder.int8();
+                break;
+            case JsonSchema.INT16_TYPE_NAME:
+                builder = SchemaBuilder.int16();
+                break;
+            case JsonSchema.INT32_TYPE_NAME:
+                builder = SchemaBuilder.int32();
+                break;
+            case JsonSchema.INT64_TYPE_NAME:
+                builder = SchemaBuilder.int64();
+                break;
             case JsonSchema.FLOAT_TYPE_NAME:
-                return SchemaBuilder.builder().floatType();
+                builder = SchemaBuilder.float32();
+                break;
             case JsonSchema.DOUBLE_TYPE_NAME:
-                return SchemaBuilder.builder().doubleType();
+                builder = SchemaBuilder.float64();
+                break;
             case JsonSchema.BYTES_TYPE_NAME:
-                return SchemaBuilder.builder().bytesType();
+                builder = SchemaBuilder.bytes();
+                break;
             case JsonSchema.STRING_TYPE_NAME:
-                return SchemaBuilder.builder().stringType();
+                builder = SchemaBuilder.string();
+                break;
             case JsonSchema.ARRAY_TYPE_NAME:
                 JsonNode elemSchema = jsonSchema.get(JsonSchema.ARRAY_ITEMS_FIELD_NAME);
                 if (elemSchema == null)
-                    throw new CopycatException("Array schema did not specify the element type");
-                return Schema.createArray(asCopycatSchema(elemSchema));
+                    throw new DataException("Array schema did not specify the element type");
+                builder = SchemaBuilder.array(asCopycatSchema(elemSchema));
+                break;
+            case JsonSchema.MAP_TYPE_NAME:
+                JsonNode keySchema = jsonSchema.get(JsonSchema.MAP_KEY_FIELD_NAME);
+                if (keySchema == null)
+                    throw new DataException("Map schema did not specify the key type");
+                JsonNode valueSchema = jsonSchema.get(JsonSchema.MAP_VALUE_FIELD_NAME);
+                if (valueSchema == null)
+                    throw new DataException("Map schema did not specify the value type");
+                builder = SchemaBuilder.map(asCopycatSchema(keySchema), asCopycatSchema(valueSchema));
+                break;
+            case JsonSchema.STRUCT_TYPE_NAME:
+                builder = SchemaBuilder.struct();
+                JsonNode fields = jsonSchema.get(JsonSchema.STRUCT_FIELDS_FIELD_NAME);
+                if (fields == null || !fields.isArray())
+                    throw new DataException("Struct schema's \"fields\" argument is not an array.");
+                for (JsonNode field : fields) {
+                    JsonNode jsonFieldName = field.get(JsonSchema.STRUCT_FIELD_NAME_FIELD_NAME);
+                    if (jsonFieldName == null || !jsonFieldName.isTextual())
+                        throw new DataException("Struct schema's field name not specified properly");
+                    builder.field(jsonFieldName.asText(), asCopycatSchema(field));
+                }
+                break;
             default:
-                throw new CopycatException("Unknown schema type: " + schemaTypeNode.textValue());
+                throw new DataException("Unknown schema type: " + schemaTypeNode.textValue());
+        }
+
+
+        JsonNode schemaOptionalNode = jsonSchema.get(JsonSchema.SCHEMA_OPTIONAL_FIELD_NAME);
+        if (schemaOptionalNode != null && schemaOptionalNode.isBoolean() && schemaOptionalNode.booleanValue())
+            builder.optional();
+        else
+            builder.required();
+
+        JsonNode schemaNameNode = jsonSchema.get(JsonSchema.SCHEMA_NAME_FIELD_NAME);
+        if (schemaNameNode != null && schemaNameNode.isTextual())
+            builder.name(schemaNameNode.textValue());
+
+        JsonNode schemaVersionNode = jsonSchema.get(JsonSchema.SCHEMA_VERSION_FIELD_NAME);
+        if (schemaVersionNode != null && schemaVersionNode.isIntegralNumber()) {
+            builder.version(schemaVersionNode.intValue());
         }
+
+        JsonNode schemaDocNode = jsonSchema.get(JsonSchema.SCHEMA_DOC_FIELD_NAME);
+        if (schemaDocNode != null && schemaDocNode.isTextual())
+            builder.doc(schemaDocNode.textValue());
+
+        JsonNode schemaDefaultNode = jsonSchema.get(JsonSchema.SCHEMA_DEFAULT_FIELD_NAME);
+        if (schemaDefaultNode != null)
+            builder.defaultValue(convertToCopycat(builder, schemaDefaultNode));
+
+        return builder.build();
     }
 
 
     /**
      * Convert this object, in org.apache.kafka.copycat.data format, into a JSON object with an envelope object
      * containing schema and payload fields.
-     * @param value
-     * @return
+     * @param schema the schema for the data
+     * @param value the value
+     * @return JsonNode-encoded version
      */
-    private static JsonNode convertToJsonWithSchemaEnvelope(Object value) {
-        return convertToJson(value).toJsonNode();
+    private static JsonNode convertToJsonWithSchemaEnvelope(Schema schema, Object value) {
+        return new JsonSchema.Envelope(asJsonSchema(schema), convertToJson(schema, value)).toJsonNode();
     }
 
     /**
      * Convert this object, in the org.apache.kafka.copycat.data format, into a JSON object, returning both the schema
      * and the converted object.
      */
-    private static JsonSchema.Envelope convertToJson(Object value) {
+    private static JsonNode convertToJson(Schema schema, Object value) {
         if (value == null) {
-            return JsonSchema.nullEnvelope();
-        } else if (value instanceof Boolean) {
-            return JsonSchema.booleanEnvelope((Boolean) value);
-        } else if (value instanceof Byte) {
-            return JsonSchema.intEnvelope((Byte) value);
-        } else if (value instanceof Short) {
-            return JsonSchema.intEnvelope((Short) value);
-        } else if (value instanceof Integer) {
-            return JsonSchema.intEnvelope((Integer) value);
-        } else if (value instanceof Long) {
-            return JsonSchema.longEnvelope((Long) value);
-        } else if (value instanceof Float) {
-            return JsonSchema.floatEnvelope((Float) value);
-        } else if (value instanceof Double) {
-            return JsonSchema.doubleEnvelope((Double) value);
-        } else if (value instanceof byte[]) {
-            return JsonSchema.bytesEnvelope((byte[]) value);
-        } else if (value instanceof ByteBuffer) {
-            return JsonSchema.bytesEnvelope(((ByteBuffer) value).array());
-        } else if (value instanceof CharSequence) {
-            return JsonSchema.stringEnvelope(value.toString());
-        } else if (value instanceof Collection) {
-            Collection collection = (Collection) value;
-            ObjectNode schema = JsonNodeFactory.instance.objectNode().put(JsonSchema.SCHEMA_TYPE_FIELD_NAME, JsonSchema.ARRAY_TYPE_NAME);
-            ArrayNode list = JsonNodeFactory.instance.arrayNode();
-            JsonNode itemSchema = null;
-            for (Object elem : collection) {
-                JsonSchema.Envelope fieldSchemaAndValue = convertToJson(elem);
-                if (itemSchema == null) {
-                    itemSchema = fieldSchemaAndValue.schema;
-                    schema.set(JsonSchema.ARRAY_ITEMS_FIELD_NAME, itemSchema);
-                } else {
-                    if (!itemSchema.equals(fieldSchemaAndValue.schema))
-                        throw new CopycatException("Mismatching schemas found in a list.");
+            if (schema.defaultValue() != null)
+                return convertToJson(schema, schema.defaultValue());
+            if (schema.isOptional())
+                return JsonNodeFactory.instance.nullNode();
+            throw new DataException("Conversion error: null value for field that is required and has no default value");
+        }
+
+        switch (schema.type()) {
+            case INT8:
+                return JsonNodeFactory.instance.numberNode((Byte) value);
+            case INT16:
+                return JsonNodeFactory.instance.numberNode((Short) value);
+            case INT32:
+                return JsonNodeFactory.instance.numberNode((Integer) value);
+            case INT64:
+                return JsonNodeFactory.instance.numberNode((Long) value);
+            case FLOAT32:
+                return JsonNodeFactory.instance.numberNode((Float) value);
+            case FLOAT64:
+                return JsonNodeFactory.instance.numberNode((Double) value);
+            case BOOLEAN:
+                return JsonNodeFactory.instance.booleanNode((Boolean) value);
+            case STRING:
+                CharSequence charSeq = (CharSequence) value;
+                return JsonNodeFactory.instance.textNode(charSeq.toString());
+            case BYTES:
+                if (value instanceof byte[])
+                    return JsonNodeFactory.instance.binaryNode((byte[]) value);
+                else if (value instanceof ByteBuffer)
+                    return JsonNodeFactory.instance.binaryNode(((ByteBuffer) value).array());
+                else
+                    throw new DataException("Invalid type for bytes type: " + value.getClass());
+            case ARRAY: {
+                if (!(value instanceof Collection))
+                    throw new DataException("Invalid type for array type: " + value.getClass());
+                Collection collection = (Collection) value;
+                ArrayNode list = JsonNodeFactory.instance.arrayNode();
+                for (Object elem : collection) {
+                    JsonNode fieldValue = convertToJson(schema.valueSchema(), elem);
+                    list.add(fieldValue);
                 }
+                return list;
+            }
+            case MAP: {
+                if (!(value instanceof Map))
+                    throw new DataException("Invalid type for array type: " + value.getClass());
+                Map<?, ?> map = (Map<?, ?>) value;
+                // If true, using string keys and JSON object; if false, using non-string keys and Array-encoding
+                boolean objectMode = schema.keySchema().type() == Schema.Type.STRING;
+                ObjectNode obj = null;
+                ArrayNode list = null;
+                if (objectMode)
+                    obj = JsonNodeFactory.instance.objectNode();
+                else
+                    list = JsonNodeFactory.instance.arrayNode();
+                for (Map.Entry<?, ?> entry : map.entrySet()) {
+                    JsonNode mapKey = convertToJson(schema.keySchema(), entry.getKey());
+                    JsonNode mapValue = convertToJson(schema.valueSchema(), entry.getValue());
 
-                list.add(fieldSchemaAndValue.payload);
+                    if (objectMode)
+                        obj.set(mapKey.asText(), mapValue);
+                    else
+                        list.add(JsonNodeFactory.instance.arrayNode().add(mapKey).add(mapValue));
+                }
+                return objectMode ? obj : list;
+            }
+            case STRUCT: {
+                if (!(value instanceof Struct))
+                    throw new DataException("Invalid type for struct type: " + value.getClass());
+                Struct struct = (Struct) value;
+                if (struct.schema() != schema)
+                    throw new DataException("Mismatching schema.");
+                ObjectNode obj = JsonNodeFactory.instance.objectNode();
+                for (Field field : schema.fields()) {
+                    obj.set(field.name(), convertToJson(field.schema(), struct.get(field)));
+                }
+                return obj;
             }
-            return new JsonSchema.Envelope(schema, list);
         }
 
-        throw new CopycatException("Couldn't convert " + value + " to Avro.");
+        throw new DataException("Couldn't convert " + value + " to JSON.");
     }
 
 
-    private static Object convertToCopycat(JsonNode jsonSchema, JsonNode jsonValue) {
-        if (jsonSchema.isNull())
-            return null;
-
-        JsonNode schemaTypeNode = jsonSchema.get(JsonSchema.SCHEMA_TYPE_FIELD_NAME);
-        if (schemaTypeNode == null || !schemaTypeNode.isTextual())
-            throw new CopycatException("Schema must contain 'type' field. Schema: " + jsonSchema.toString());
-
-        JsonToCopycatTypeConverter typeConverter = TO_COPYCAT_CONVERTERS.get(schemaTypeNode.textValue());
-        if (typeConverter != null)
-            return typeConverter.convert(jsonSchema, jsonValue);
+    private static Object convertToCopycat(Schema schema, JsonNode jsonValue) {
+        JsonToCopycatTypeConverter typeConverter = TO_COPYCAT_CONVERTERS.get(schema.type());
+        if (typeConverter == null)
+            throw new DataException("Unknown schema type: " + schema.type());
 
-        throw new CopycatException("Unknown schema type: " + schemaTypeNode);
+        return typeConverter.convert(schema, jsonValue);
     }
 
 
     private interface JsonToCopycatTypeConverter {
-        Object convert(JsonNode schema, JsonNode value);
+        Object convert(Schema schema, JsonNode value);
     }
 }
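
The map handling deserves a concrete illustration. As the comment in the diff explains, string-keyed maps use the natural JSON object encoding, while any other key type falls back to an array of two-element [key, value] tuples. Hand-written envelopes consistent with the constants above (not actual converter output):

    {"schema": {"type": "map", "optional": false,
                "keys":   {"type": "string", "optional": false},
                "values": {"type": "int32",  "optional": false}},
     "payload": {"a": 1, "b": 2}}

    {"schema": {"type": "map", "optional": false,
                "keys":   {"type": "int32",  "optional": false},
                "values": {"type": "string", "optional": false}},
     "payload": [[1, "a"], [2, "b"]]}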

http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonSchema.java
----------------------------------------------------------------------
diff --git a/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonSchema.java b/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonSchema.java
index a807e0f..b7657be 100644
--- a/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonSchema.java
+++ b/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonSchema.java
@@ -21,30 +21,42 @@ import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 
-import java.nio.ByteBuffer;
-
 public class JsonSchema {
 
     static final String ENVELOPE_SCHEMA_FIELD_NAME = "schema";
     static final String ENVELOPE_PAYLOAD_FIELD_NAME = "payload";
     static final String SCHEMA_TYPE_FIELD_NAME = "type";
+    static final String SCHEMA_OPTIONAL_FIELD_NAME = "optional";
     static final String SCHEMA_NAME_FIELD_NAME = "name";
+    static final String SCHEMA_VERSION_FIELD_NAME = "version";
+    static final String SCHEMA_DOC_FIELD_NAME = "doc";
+    static final String SCHEMA_DEFAULT_FIELD_NAME = "default";
     static final String ARRAY_ITEMS_FIELD_NAME = "items";
+    static final String MAP_KEY_FIELD_NAME = "keys";
+    static final String MAP_VALUE_FIELD_NAME = "values";
+    static final String STRUCT_FIELDS_FIELD_NAME = "fields";
+    static final String STRUCT_FIELD_NAME_FIELD_NAME = "field";
     static final String BOOLEAN_TYPE_NAME = "boolean";
-    static final JsonNode BOOLEAN_SCHEMA = JsonNodeFactory.instance.objectNode().put(SCHEMA_TYPE_FIELD_NAME, BOOLEAN_TYPE_NAME);
-    static final String INT_TYPE_NAME = "int";
-    static final JsonNode INT_SCHEMA = JsonNodeFactory.instance.objectNode().put(SCHEMA_TYPE_FIELD_NAME, INT_TYPE_NAME);
-    static final String LONG_TYPE_NAME = "long";
-    static final JsonNode LONG_SCHEMA = JsonNodeFactory.instance.objectNode().put(SCHEMA_TYPE_FIELD_NAME, LONG_TYPE_NAME);
+    static final ObjectNode BOOLEAN_SCHEMA = JsonNodeFactory.instance.objectNode().put(SCHEMA_TYPE_FIELD_NAME, BOOLEAN_TYPE_NAME);
+    static final String INT8_TYPE_NAME = "int8";
+    static final ObjectNode INT8_SCHEMA = JsonNodeFactory.instance.objectNode().put(SCHEMA_TYPE_FIELD_NAME, INT8_TYPE_NAME);
+    static final String INT16_TYPE_NAME = "int16";
+    static final ObjectNode INT16_SCHEMA = JsonNodeFactory.instance.objectNode().put(SCHEMA_TYPE_FIELD_NAME, INT16_TYPE_NAME);
+    static final String INT32_TYPE_NAME = "int32";
+    static final ObjectNode INT32_SCHEMA = JsonNodeFactory.instance.objectNode().put(SCHEMA_TYPE_FIELD_NAME, INT32_TYPE_NAME);
+    static final String INT64_TYPE_NAME = "int64";
+    static final ObjectNode INT64_SCHEMA = JsonNodeFactory.instance.objectNode().put(SCHEMA_TYPE_FIELD_NAME, INT64_TYPE_NAME);
     static final String FLOAT_TYPE_NAME = "float";
-    static final JsonNode FLOAT_SCHEMA = JsonNodeFactory.instance.objectNode().put(SCHEMA_TYPE_FIELD_NAME, FLOAT_TYPE_NAME);
+    static final ObjectNode FLOAT_SCHEMA = JsonNodeFactory.instance.objectNode().put(SCHEMA_TYPE_FIELD_NAME, FLOAT_TYPE_NAME);
     static final String DOUBLE_TYPE_NAME = "double";
-    static final JsonNode DOUBLE_SCHEMA = JsonNodeFactory.instance.objectNode().put(SCHEMA_TYPE_FIELD_NAME, DOUBLE_TYPE_NAME);
+    static final ObjectNode DOUBLE_SCHEMA = JsonNodeFactory.instance.objectNode().put(SCHEMA_TYPE_FIELD_NAME, DOUBLE_TYPE_NAME);
     static final String BYTES_TYPE_NAME = "bytes";
-    static final JsonNode BYTES_SCHEMA = JsonNodeFactory.instance.objectNode().put(SCHEMA_TYPE_FIELD_NAME, BYTES_TYPE_NAME);
+    static final ObjectNode BYTES_SCHEMA = JsonNodeFactory.instance.objectNode().put(SCHEMA_TYPE_FIELD_NAME, BYTES_TYPE_NAME);
     static final String STRING_TYPE_NAME = "string";
-    static final JsonNode STRING_SCHEMA = JsonNodeFactory.instance.objectNode().put(SCHEMA_TYPE_FIELD_NAME, STRING_TYPE_NAME);
+    static final ObjectNode STRING_SCHEMA = JsonNodeFactory.instance.objectNode().put(SCHEMA_TYPE_FIELD_NAME, STRING_TYPE_NAME);
     static final String ARRAY_TYPE_NAME = "array";
+    static final String MAP_TYPE_NAME = "map";
+    static final String STRUCT_TYPE_NAME = "struct";
 
     public static ObjectNode envelope(JsonNode schema, JsonNode payload) {
         ObjectNode result = JsonNodeFactory.instance.objectNode();
@@ -66,49 +78,4 @@ public class JsonSchema {
             return envelope(schema, payload);
         }
     }
-
-
-    public static Envelope nullEnvelope() {
-        return new Envelope(null, null);
-    }
-
-    public static Envelope booleanEnvelope(boolean value) {
-        return new Envelope(JsonSchema.BOOLEAN_SCHEMA, JsonNodeFactory.instance.booleanNode(value));
-    }
-
-    public static Envelope intEnvelope(byte value) {
-        return new Envelope(JsonSchema.INT_SCHEMA, JsonNodeFactory.instance.numberNode(value));
-    }
-
-    public static Envelope intEnvelope(short value) {
-        return new Envelope(JsonSchema.INT_SCHEMA, JsonNodeFactory.instance.numberNode(value));
-    }
-
-    public static Envelope intEnvelope(int value) {
-        return new Envelope(JsonSchema.INT_SCHEMA, JsonNodeFactory.instance.numberNode(value));
-    }
-
-    public static Envelope longEnvelope(long value) {
-        return new Envelope(JsonSchema.LONG_SCHEMA, JsonNodeFactory.instance.numberNode(value));
-    }
-
-    public static Envelope floatEnvelope(float value) {
-        return new Envelope(JsonSchema.FLOAT_SCHEMA, JsonNodeFactory.instance.numberNode(value));
-    }
-
-    public static Envelope doubleEnvelope(double value) {
-        return new Envelope(JsonSchema.DOUBLE_SCHEMA, JsonNodeFactory.instance.numberNode(value));
-    }
-
-    public static Envelope bytesEnvelope(byte[] value) {
-        return new Envelope(JsonSchema.BYTES_SCHEMA, JsonNodeFactory.instance.binaryNode(value));
-    }
-
-    public static Envelope bytesEnvelope(ByteBuffer value) {
-        return new Envelope(JsonSchema.BYTES_SCHEMA, JsonNodeFactory.instance.binaryNode(value.array()));
-    }
-
-    public static Envelope stringEnvelope(CharSequence value) {
-        return new Envelope(JsonSchema.STRING_SCHEMA, JsonNodeFactory.instance.textNode(value.toString()));
-    }
 }
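
For reference, the struct-related constants imply a schema JSON of roughly this shape, where each entry under "fields" is itself a schema object with an added "field" name (a hand-written example, not taken from the patch):

    {"type": "struct", "optional": false,
     "fields": [{"field": "id",   "type": "int64",  "optional": false},
                {"field": "name", "type": "string", "optional": true}]}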

http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonSerializer.java
----------------------------------------------------------------------
diff --git a/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonSerializer.java b/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonSerializer.java
index dcac270..80df6be 100644
--- a/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonSerializer.java
+++ b/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonSerializer.java
@@ -51,6 +51,9 @@ public class JsonSerializer implements Serializer<JsonNode> {
 
     @Override
     public byte[] serialize(String topic, JsonNode data) {
+        if (data == null)
+            return null;
+
         // This serializer works for Copycat data that requires a schema to be included, so we expect it to have a
         // specific format: { "schema": {...}, "payload": ... }.
         if (!data.isObject() || data.size() != 2 || !data.has("schema") || !data.has("payload"))
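
The new null guard means a null JsonNode now serializes to a null byte array; previously the method would have fallen through and dereferenced data, throwing a NullPointerException. A one-line illustration:

    JsonSerializer serializer = new JsonSerializer();
    byte[] bytes = serializer.serialize("any-topic", null);  // returns null rather than throwing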

http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/json/src/test/java/org/apache/kafka/copycat/json/JsonConverterTest.java
----------------------------------------------------------------------
diff --git a/copycat/json/src/test/java/org/apache/kafka/copycat/json/JsonConverterTest.java b/copycat/json/src/test/java/org/apache/kafka/copycat/json/JsonConverterTest.java
index 1a725c9..ab4a86e 100644
--- a/copycat/json/src/test/java/org/apache/kafka/copycat/json/JsonConverterTest.java
+++ b/copycat/json/src/test/java/org/apache/kafka/copycat/json/JsonConverterTest.java
@@ -20,12 +20,18 @@ package org.apache.kafka.copycat.json;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import org.apache.kafka.copycat.data.Schema;
+import org.apache.kafka.copycat.data.SchemaAndValue;
+import org.apache.kafka.copycat.data.SchemaBuilder;
+import org.apache.kafka.copycat.data.Struct;
 import org.junit.Test;
 
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -37,31 +43,56 @@ public class JsonConverterTest {
     ObjectMapper objectMapper = new ObjectMapper();
     JsonConverter converter = new JsonConverter();
 
+    // Schema metadata
+
+    @Test
+    public void testCopycatSchemaMetadataTranslation() {
+        // this validates the non-type fields are translated and handled properly
+        assertEquals(new SchemaAndValue(Schema.BOOLEAN_SCHEMA, true), converter.toCopycatData(parse("{ \"schema\": { \"type\": \"boolean\" }, \"payload\": true }")));
+        assertEquals(new SchemaAndValue(Schema.OPTIONAL_BOOLEAN_SCHEMA, null), converter.toCopycatData(parse("{ \"schema\": { \"type\": \"boolean\", \"optional\": true }, \"payload\": null }")));
+        assertEquals(new SchemaAndValue(SchemaBuilder.bool().defaultValue(true).build(), true),
+                converter.toCopycatData(parse("{ \"schema\": { \"type\": \"boolean\", \"default\": true }, \"payload\": null }")));
+        assertEquals(new SchemaAndValue(SchemaBuilder.bool().required().name("bool").version(2).doc("the documentation").build(), true),
+                converter.toCopycatData(parse("{ \"schema\": { \"type\": \"boolean\", \"optional\": false, \"name\": \"bool\", \"version\": 2, \"doc\": \"the documentation\"}, \"payload\": true }")));
+    }
+
+    // Schema types
+
     @Test
     public void booleanToCopycat() {
-        assertEquals(true, converter.toCopycatData(parse("{ \"schema\": { \"type\": \"boolean\" }, \"payload\": true }")));
-        assertEquals(false, converter.toCopycatData(parse("{ \"schema\": { \"type\": \"boolean\" }, \"payload\": false }")));
+        assertEquals(new SchemaAndValue(Schema.BOOLEAN_SCHEMA, true), converter.toCopycatData(parse("{ \"schema\": { \"type\": \"boolean\" }, \"payload\": true }")));
+        assertEquals(new SchemaAndValue(Schema.BOOLEAN_SCHEMA, false), converter.toCopycatData(parse("{ \"schema\": { \"type\": \"boolean\" }, \"payload\": false }")));
+    }
+
+    @Test
+    public void byteToCopycat() {
+        assertEquals(new SchemaAndValue(Schema.INT8_SCHEMA, (byte) 12), converter.toCopycatData(parse("{ \"schema\": { \"type\": \"int8\" }, \"payload\": 12 }")));
+    }
+
+    @Test
+    public void shortToCopycat() {
+        assertEquals(new SchemaAndValue(Schema.INT16_SCHEMA, (short) 12), converter.toCopycatData(parse("{ \"schema\": { \"type\": \"int16\" }, \"payload\": 12 }")));
     }
 
     @Test
     public void intToCopycat() {
-        assertEquals(12, converter.toCopycatData(parse("{ \"schema\": { \"type\": \"int\" }, \"payload\": 12 }")));
+        assertEquals(new SchemaAndValue(Schema.INT32_SCHEMA, 12), converter.toCopycatData(parse("{ \"schema\": { \"type\": \"int32\" }, \"payload\": 12 }")));
     }
 
     @Test
     public void longToCopycat() {
-        assertEquals(12L, converter.toCopycatData(parse("{ \"schema\": { \"type\": \"long\" }, \"payload\": 12 }")));
-        assertEquals(4398046511104L, converter.toCopycatData(parse("{ \"schema\": { \"type\": \"long\" }, \"payload\": 4398046511104 }")));
+        assertEquals(new SchemaAndValue(Schema.INT64_SCHEMA, 12L), converter.toCopycatData(parse("{ \"schema\": { \"type\": \"int64\" }, \"payload\": 12 }")));
+        assertEquals(new SchemaAndValue(Schema.INT64_SCHEMA, 4398046511104L), converter.toCopycatData(parse("{ \"schema\": { \"type\": \"int64\" }, \"payload\": 4398046511104 }")));
     }
 
     @Test
     public void floatToCopycat() {
-        assertEquals(12.34f, converter.toCopycatData(parse("{ \"schema\": { \"type\": \"float\" }, \"payload\": 12.34 }")));
+        assertEquals(new SchemaAndValue(Schema.FLOAT32_SCHEMA, 12.34f), converter.toCopycatData(parse("{ \"schema\": { \"type\": \"float\" }, \"payload\": 12.34 }")));
     }
 
     @Test
     public void doubleToCopycat() {
-        assertEquals(12.34, converter.toCopycatData(parse("{ \"schema\": { \"type\": \"double\" }, \"payload\": 12.34 }")));
+        assertEquals(new SchemaAndValue(Schema.FLOAT64_SCHEMA, 12.34), converter.toCopycatData(parse("{ \"schema\": { \"type\": \"double\" }, \"payload\": 12.34 }")));
     }
 
 
@@ -69,89 +100,204 @@ public class JsonConverterTest {
     public void bytesToCopycat() throws UnsupportedEncodingException {
         ByteBuffer reference = ByteBuffer.wrap("test-string".getBytes("UTF-8"));
         String msg = "{ \"schema\": { \"type\": \"bytes\" }, \"payload\": \"dGVzdC1zdHJpbmc=\" }";
-        ByteBuffer converted = ByteBuffer.wrap((byte[]) converter.toCopycatData(parse(msg)));
+        SchemaAndValue schemaAndValue = converter.toCopycatData(parse(msg));
+        ByteBuffer converted = ByteBuffer.wrap((byte[]) schemaAndValue.value());
         assertEquals(reference, converted);
     }
 
     @Test
     public void stringToCopycat() {
-        assertEquals("foo-bar-baz", converter.toCopycatData(parse("{ \"schema\": { \"type\": \"string\" }, \"payload\": \"foo-bar-baz\" }")));
+        assertEquals(new SchemaAndValue(Schema.STRING_SCHEMA, "foo-bar-baz"), converter.toCopycatData(parse("{ \"schema\": { \"type\": \"string\" }, \"payload\": \"foo-bar-baz\" }")));
     }
 
     @Test
     public void arrayToCopycat() {
-        JsonNode arrayJson = parse("{ \"schema\": { \"type\": \"array\", \"items\": { \"type\" : \"int\" } }, \"payload\": [1, 2, 3] }");
-        assertEquals(Arrays.asList(1, 2, 3), converter.toCopycatData(arrayJson));
+        JsonNode arrayJson = parse("{ \"schema\": { \"type\": \"array\", \"items\": { \"type\" : \"int32\" } }, \"payload\": [1, 2, 3] }");
+        assertEquals(new SchemaAndValue(SchemaBuilder.array(Schema.INT32_SCHEMA).build(), Arrays.asList(1, 2, 3)), converter.toCopycatData(arrayJson));
+    }
+
+    @Test
+    public void mapToCopycatStringKeys() {
+        JsonNode mapJson = parse("{ \"schema\": { \"type\": \"map\", \"keys\": { \"type\" : \"string\" }, \"values\": { \"type\" : \"int32\" } }, \"payload\": { \"key1\": 12, \"key2\": 15} }");
+        Map<String, Integer> expected = new HashMap<>();
+        expected.put("key1", 12);
+        expected.put("key2", 15);
+        assertEquals(new SchemaAndValue(SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA).build(), expected), converter.toCopycatData(mapJson));
+    }
+
+    @Test
+    public void mapToCopycatNonStringKeys() {
+        JsonNode mapJson = parse("{ \"schema\": { \"type\": \"map\", \"keys\": { \"type\" : \"int32\" }, \"values\": { \"type\" : \"int32\" } }, \"payload\": [ [1, 12], [2, 15] ] }");
+        Map<Integer, Integer> expected = new HashMap<>();
+        expected.put(1, 12);
+        expected.put(2, 15);
+        assertEquals(new SchemaAndValue(SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.INT32_SCHEMA).build(), expected), converter.toCopycatData(mapJson));
+    }
+
+    @Test
+    public void structToCopycat() {
+        JsonNode structJson = parse("{ \"schema\": { \"type\": \"struct\", \"fields\": [{ \"field\": \"field1\", \"type\": \"boolean\" }, { \"field\": \"field2\", \"type\": \"string\" }] }, \"payload\": { \"field1\": true, \"field2\": \"string\" } }");
+        Schema expectedSchema = SchemaBuilder.struct().field("field1", Schema.BOOLEAN_SCHEMA).field("field2", Schema.STRING_SCHEMA).build();
+        Struct expected = new Struct(expectedSchema).put("field1", true).put("field2", "string");
+        SchemaAndValue converted = converter.toCopycatData(structJson);
+        assertEquals(new SchemaAndValue(expectedSchema, expected), converted);
+    }
+
+    // Schema metadata
+
+    @Test
+    public void testJsonSchemaMetadataTranslation() {
+        JsonNode converted = converter.fromCopycatData(Schema.BOOLEAN_SCHEMA, true);
+        validateEnvelope(converted);
+        assertEquals(parse("{ \"type\": \"boolean\", \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
+        assertEquals(true, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).booleanValue());
+
+        converted = converter.fromCopycatData(Schema.OPTIONAL_BOOLEAN_SCHEMA, null);
+        validateEnvelope(converted);
+        assertEquals(parse("{ \"type\": \"boolean\", \"optional\": true }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
+        assertTrue(converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).isNull());
+
+        converted = converter.fromCopycatData(SchemaBuilder.bool().defaultValue(true).build(), true);
+        validateEnvelope(converted);
+        assertEquals(parse("{ \"type\": \"boolean\", \"optional\": false, \"default\": true }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
+        assertEquals(true, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).booleanValue());
+
+        converted = converter.fromCopycatData(SchemaBuilder.bool().required().name("bool").version(3).doc("the documentation").build(), true);
+        validateEnvelope(converted);
+        assertEquals(parse("{ \"type\": \"boolean\", \"optional\": false, \"name\": \"bool\", \"version\": 3, \"doc\": \"the documentation\"}"),
+                converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
+        assertEquals(true, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).booleanValue());
     }
 
+    // Schema types
 
     @Test
     public void booleanToJson() {
-        JsonNode converted = converter.fromCopycatData(true);
+        JsonNode converted = converter.fromCopycatData(Schema.BOOLEAN_SCHEMA, true);
         validateEnvelope(converted);
-        assertEquals(parse("{ \"type\": \"boolean\" }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
+        assertEquals(parse("{ \"type\": \"boolean\", \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
         assertEquals(true, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).booleanValue());
     }
 
     @Test
+    public void byteToJson() {
+        JsonNode converted = converter.fromCopycatData(Schema.INT8_SCHEMA, (byte) 12);
+        validateEnvelope(converted);
+        assertEquals(parse("{ \"type\": \"int8\", \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
+        assertEquals(12, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).intValue());
+    }
+
+    @Test
+    public void shortToJson() {
+        JsonNode converted = converter.fromCopycatData(Schema.INT16_SCHEMA, (short) 12);
+        validateEnvelope(converted);
+        assertEquals(parse("{ \"type\": \"int16\", \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
+        assertEquals(12, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).intValue());
+    }
+
+    @Test
     public void intToJson() {
-        JsonNode converted = converter.fromCopycatData(12);
+        JsonNode converted = converter.fromCopycatData(Schema.INT32_SCHEMA, 12);
         validateEnvelope(converted);
-        assertEquals(parse("{ \"type\": \"int\" }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
+        assertEquals(parse("{ \"type\": \"int32\", \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
         assertEquals(12, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).intValue());
     }
 
     @Test
     public void longToJson() {
-        JsonNode converted = converter.fromCopycatData(4398046511104L);
+        JsonNode converted = converter.fromCopycatData(Schema.INT64_SCHEMA, 4398046511104L);
         validateEnvelope(converted);
-        assertEquals(parse("{ \"type\": \"long\" }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
+        assertEquals(parse("{ \"type\": \"int64\", \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
         assertEquals(4398046511104L, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).longValue());
     }
 
     @Test
     public void floatToJson() {
-        JsonNode converted = converter.fromCopycatData(12.34f);
+        JsonNode converted = converter.fromCopycatData(Schema.FLOAT32_SCHEMA, 12.34f);
         validateEnvelope(converted);
-        assertEquals(parse("{ \"type\": \"float\" }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
+        assertEquals(parse("{ \"type\": \"float\", \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
         assertEquals(12.34f, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).floatValue(), 0.001);
     }
 
     @Test
     public void doubleToJson() {
-        JsonNode converted = converter.fromCopycatData(12.34);
+        JsonNode converted = converter.fromCopycatData(Schema.FLOAT64_SCHEMA, 12.34);
         validateEnvelope(converted);
-        assertEquals(parse("{ \"type\": \"double\" }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
+        assertEquals(parse("{ \"type\": \"double\", \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
         assertEquals(12.34, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).doubleValue(), 0.001);
     }
 
     @Test
     public void bytesToJson() throws IOException {
-        JsonNode converted = converter.fromCopycatData("test-string".getBytes());
+        JsonNode converted = converter.fromCopycatData(Schema.BYTES_SCHEMA, "test-string".getBytes());
         validateEnvelope(converted);
-        assertEquals(parse("{ \"type\": \"bytes\" }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
+        assertEquals(parse("{ \"type\": \"bytes\", \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
         assertEquals(ByteBuffer.wrap("test-string".getBytes()),
                 ByteBuffer.wrap(converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).binaryValue()));
     }
 
     @Test
     public void stringToJson() {
-        JsonNode converted = converter.fromCopycatData("test-string");
+        JsonNode converted = converter.fromCopycatData(Schema.STRING_SCHEMA, "test-string");
         validateEnvelope(converted);
-        assertEquals(parse("{ \"type\": \"string\" }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
+        assertEquals(parse("{ \"type\": \"string\", \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
         assertEquals("test-string", converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).textValue());
     }
 
     @Test
     public void arrayToJson() {
-        JsonNode converted = converter.fromCopycatData(Arrays.asList(1, 2, 3));
+        Schema int32Array = SchemaBuilder.array(Schema.INT32_SCHEMA).build();
+        JsonNode converted = converter.fromCopycatData(int32Array, Arrays.asList(1, 2, 3));
         validateEnvelope(converted);
-        assertEquals(parse("{ \"type\": \"array\", \"items\": { \"type\": \"int\" } }"),
+        assertEquals(parse("{ \"type\": \"array\", \"items\": { \"type\": \"int32\", \"optional\": false }, \"optional\": false }"),
                 converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
         assertEquals(JsonNodeFactory.instance.arrayNode().add(1).add(2).add(3),
                 converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
     }
 
+    @Test
+    public void mapToJsonStringKeys() {
+        Schema stringIntMap = SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA).build();
+        Map<String, Integer> input = new HashMap<>();
+        input.put("key1", 12);
+        input.put("key2", 15);
+        JsonNode converted = converter.fromCopycatData(stringIntMap, input);
+        validateEnvelope(converted);
+        assertEquals(parse("{ \"type\": \"map\", \"keys\": { \"type\" : \"string\", \"optional\": false }, \"values\": { \"type\" : \"int32\", \"optional\": false }, \"optional\": false }"),
+                converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
+        assertEquals(JsonNodeFactory.instance.objectNode().put("key1", 12).put("key2", 15),
+                converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
+    }
+
+    @Test
+    public void mapToJsonNonStringKeys() {
+        Schema intIntMap = SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.INT32_SCHEMA).build();
+        Map<Integer, Integer> input = new HashMap<>();
+        input.put(1, 12);
+        input.put(2, 15);
+        JsonNode converted = converter.fromCopycatData(intIntMap, input);
+        validateEnvelope(converted);
+        assertEquals(parse("{ \"type\": \"map\", \"keys\": { \"type\" : \"int32\", \"optional\": false }, \"values\": { \"type\" : \"int32\", \"optional\": false }, \"optional\": false }"),
+                converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
+        assertEquals(JsonNodeFactory.instance.arrayNode()
+                        .add(JsonNodeFactory.instance.arrayNode().add(1).add(12))
+                        .add(JsonNodeFactory.instance.arrayNode().add(2).add(15)),
+                converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
+    }
+
+    @Test
+    public void structToJson() {
+        Schema schema = SchemaBuilder.struct().field("field1", Schema.BOOLEAN_SCHEMA).field("field2", Schema.STRING_SCHEMA).build();
+        Struct input = new Struct(schema).put("field1", true).put("field2", "string");
+        JsonNode converted = converter.fromCopycatData(schema, input);
+        validateEnvelope(converted);
+        assertEquals(parse("{ \"type\": \"struct\", \"optional\": false, \"fields\": [{ \"field\": \"field1\", \"type\": \"boolean\", \"optional\": false }, { \"field\": \"field2\", \"type\": \"string\", \"optional\": false }] }"),
+                converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
+        assertEquals(JsonNodeFactory.instance.objectNode()
+                        .put("field1", true)
+                        .put("field2", "string"),
+                converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
+    }
 
     private JsonNode parse(String json) {
         try {
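
Taken together, the new tests pin down both halves of the schema'd JSON format: fromCopycatData(schema, value) wraps a payload in the envelope, and toCopycatData(json) recovers a SchemaAndValue. Note the map encoding: JSON object keys must be strings, so maps with non-string keys are serialized as an array of [key, value] pairs, which is exactly what mapToJsonNonStringKeys and mapToCopycatNonStringKeys verify. A condensed round trip, using only calls that appear in the tests above:

    Schema schema = SchemaBuilder.struct()
            .field("field1", Schema.BOOLEAN_SCHEMA)
            .field("field2", Schema.STRING_SCHEMA)
            .build();
    Struct value = new Struct(schema).put("field1", true).put("field2", "string");

    JsonConverter converter = new JsonConverter();
    JsonNode json = converter.fromCopycatData(schema, value);    // { "schema": {...}, "payload": {...} }
    SchemaAndValue roundTripped = converter.toCopycatData(json); // equals new SchemaAndValue(schema, value)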

http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/WorkerConfig.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/WorkerConfig.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/WorkerConfig.java
index 46229db..23cdf4d 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/WorkerConfig.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/WorkerConfig.java
@@ -24,7 +24,6 @@ import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
 
 import java.util.Properties;
-import java.util.Set;
 
 /**
  * Configuration for standalone workers.
@@ -129,13 +128,4 @@ public class WorkerConfig extends AbstractConfig {
     public WorkerConfig(Properties props) {
         super(config, props);
     }
-
-    public Properties getUnusedProperties() {
-        Set<String> unusedKeys = this.unused();
-        Properties unusedProps = new Properties();
-        for (String key : unusedKeys) {
-            unusedProps.put(key, this.originals().get(key));
-        }
-        return unusedProps;
-    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/ConnectorConfig.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/ConnectorConfig.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/ConnectorConfig.java
index e3fcc1c..336597e 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/ConnectorConfig.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/ConnectorConfig.java
@@ -23,7 +23,6 @@ import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
 
 import java.util.Properties;
-import java.util.Set;
 
 /**
  * <p>
@@ -64,24 +63,11 @@ public class ConnectorConfig extends AbstractConfig {
                 .define(TOPICS_CONFIG, Type.LIST, TOPICS_DEFAULT, Importance.HIGH, TOPICS_DOC);
     }
 
-    private Properties originalProperties;
-
     public ConnectorConfig() {
         this(new Properties());
     }
 
     public ConnectorConfig(Properties props) {
         super(config, props);
-        this.originalProperties = props;
-    }
-
-    public Properties getUnusedProperties() {
-        Set<String> unusedKeys = this.unused();
-        Properties unusedProps = new Properties();
-        for (String key : unusedKeys) {
-            unusedProps.setProperty(key, originalProperties.getProperty(key));
-        }
-        return unusedProps;
     }
-
 }
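
Both WorkerConfig and ConnectorConfig lose their private copies of getUnusedProperties(); the call sites below switch to unusedProperties(), which presumably now lives in shared configuration code. A sketch of that helper as it could be written against the unused() and originals() accessors the deleted bodies already relied on (its actual new location is an assumption):

    public Properties unusedProperties() {
        // Collect every original config entry the ConfigDef did not consume, so
        // callers can forward unrecognized settings to embedded Kafka clients.
        Properties unusedProps = new Properties();
        for (String key : this.unused())
            unusedProps.put(key, this.originals().get(key));
        return unusedProps;
    }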

http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java
index 55d0784..704470a 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java
@@ -115,7 +115,7 @@ public class Worker<K, V> {
     public void start() {
         log.info("Worker starting");
 
-        Properties unusedConfigs = config.getUnusedProperties();
+        Properties unusedConfigs = config.unusedProperties();
 
         Map<String, Object> producerProps = new HashMap<>();
         producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Utils.join(config.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
@@ -188,9 +188,9 @@ public class Worker<K, V> {
         final WorkerTask workerTask;
         if (task instanceof SourceTask) {
             SourceTask sourceTask = (SourceTask) task;
-            OffsetStorageReader offsetReader = new OffsetStorageReaderImpl<>(offsetBackingStore, id.getConnector(),
+            OffsetStorageReader offsetReader = new OffsetStorageReaderImpl<>(offsetBackingStore, id.connector(),
                     keyConverter, valueConverter, offsetKeySerializer, offsetValueDeserializer);
-            OffsetStorageWriter<K, V> offsetWriter = new OffsetStorageWriter<>(offsetBackingStore, id.getConnector(),
+            OffsetStorageWriter<K, V> offsetWriter = new OffsetStorageWriter<>(offsetBackingStore, id.connector(),
                     keyConverter, valueConverter, offsetKeySerializer, offsetValueSerializer);
             workerTask = new WorkerSourceTask<>(id, sourceTask, keyConverter, valueConverter, producer,
                     offsetReader, offsetWriter, config, time);

http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
index 272f62a..a1d7fab 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.clients.consumer.*;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.copycat.cli.WorkerConfig;
+import org.apache.kafka.copycat.data.SchemaAndValue;
 import org.apache.kafka.copycat.errors.CopycatException;
 import org.apache.kafka.copycat.sink.SinkRecord;
 import org.apache.kafka.copycat.sink.SinkTask;
@@ -144,11 +145,11 @@ class WorkerSinkTask<K, V> implements WorkerTask {
         consumer.commit(offsets, sync ? CommitType.SYNC : CommitType.ASYNC, cb);
     }
 
-    public Time getTime() {
+    public Time time() {
         return time;
     }
 
-    public WorkerConfig getWorkerConfig() {
+    public WorkerConfig workerConfig() {
         return workerConfig;
     }
 
@@ -160,7 +161,7 @@ class WorkerSinkTask<K, V> implements WorkerTask {
 
         // Include any unknown worker configs so consumer configs can be set globally on the worker
         // and passed through to the task
-        Properties props = workerConfig.getUnusedProperties();
+        Properties props = workerConfig.unusedProperties();
         props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "copycat-" + id.toString());
         props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                 Utils.join(workerConfig.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
@@ -187,7 +188,7 @@ class WorkerSinkTask<K, V> implements WorkerTask {
         // To do this correctly, we need to first make sure we have been assigned partitions, which poll() will guarantee.
         // We ask for offsets after this poll to make sure any offsets committed before the rebalance are picked up correctly.
         newConsumer.poll(0);
-        Map<TopicPartition, Long> offsets = context.getOffsets();
+        Map<TopicPartition, Long> offsets = context.offsets();
         for (TopicPartition tp : newConsumer.assignment()) {
             Long offset = offsets.get(tp);
             if (offset != null)
@@ -206,10 +207,12 @@ class WorkerSinkTask<K, V> implements WorkerTask {
             List<SinkRecord> records = new ArrayList<>();
             for (ConsumerRecord<K, V> msg : msgs) {
                 log.trace("Consuming message with key {}, value {}", msg.key(), msg.value());
+                SchemaAndValue keyAndSchema = msg.key() != null ? keyConverter.toCopycatData(msg.key()) : SchemaAndValue.NULL;
+                SchemaAndValue valueAndSchema = msg.value() != null ? valueConverter.toCopycatData(msg.value()) : SchemaAndValue.NULL;
                 records.add(
                         new SinkRecord(msg.topic(), msg.partition(),
-                                keyConverter.toCopycatData(msg.key()),
-                                valueConverter.toCopycatData(msg.value()),
+                                keyAndSchema.schema(), keyAndSchema.value(),
+                                valueAndSchema.schema(), valueAndSchema.value(),
                                 msg.offset())
                 );
             }
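
The conversion step above is the heart of the sink-side change: raw consumer keys and values are lifted into SchemaAndValue pairs, with SchemaAndValue.NULL standing in when a key or value is absent, and the schema then rides along inside each SinkRecord. Restated as a hypothetical helper (a straightforward extraction of the loop body, not part of this patch):

    private SinkRecord toSinkRecord(ConsumerRecord<K, V> msg) {
        // Null keys and values (e.g. tombstones) map to SchemaAndValue.NULL
        // rather than being handed to the converters.
        SchemaAndValue key = msg.key() != null ? keyConverter.toCopycatData(msg.key()) : SchemaAndValue.NULL;
        SchemaAndValue value = msg.value() != null ? valueConverter.toCopycatData(msg.value()) : SchemaAndValue.NULL;
        return new SinkRecord(msg.topic(), msg.partition(),
                key.schema(), key.value(),
                value.schema(), value.value(),
                msg.offset());
    }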

http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskThread.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskThread.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskThread.java
index b946343..41e38f0 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskThread.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskThread.java
@@ -55,11 +55,11 @@ class WorkerSinkTaskThread extends ShutdownableThread {
             iteration();
         }
         // Make sure any uncommitted data has been committed
-        task.commitOffsets(task.getTime().milliseconds(), true, -1, false);
+        task.commitOffsets(task.time().milliseconds(), true, -1, false);
     }
 
     public void iteration() {
-        long now = task.getTime().milliseconds();
+        long now = task.time().milliseconds();
 
         // Maybe commit
         if (!committing && now >= nextCommit) {
@@ -69,11 +69,11 @@ class WorkerSinkTaskThread extends ShutdownableThread {
                 commitStarted = now;
             }
             task.commitOffsets(now, false, commitSeqno, true);
-            nextCommit += task.getWorkerConfig().getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG);
+            nextCommit += task.workerConfig().getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG);
         }
 
         // Check for timed out commits
-        long commitTimeout = commitStarted + task.getWorkerConfig().getLong(
+        long commitTimeout = commitStarted + task.workerConfig().getLong(
                 WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG);
         if (committing && now >= commitTimeout) {
             log.warn("Commit of {} offsets timed out", this);
@@ -98,7 +98,7 @@ class WorkerSinkTaskThread extends ShutdownableThread {
                     commitFailures++;
                 } else {
                     log.debug("Finished {} offset commit successfully in {} ms",
-                            this, task.getTime().milliseconds() - commitStarted);
+                            this, task.time().milliseconds() - commitStarted);
                     commitFailures = 0;
                 }
                 committing = false;
@@ -106,7 +106,7 @@ class WorkerSinkTaskThread extends ShutdownableThread {
         }
     }
 
-    public int getCommitFailures() {
+    public int commitFailures() {
         return commitFailures;
     }
 }
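
The renamed accessors leave the commit schedule itself unchanged: nextCommit advances by OFFSET_COMMIT_INTERVAL_MS_CONFIG from the previous deadline rather than from the current time, so the schedule does not drift, and a commit is declared timed out once OFFSET_COMMIT_TIMEOUT_MS_CONFIG has elapsed since commitStarted. Worked through with hypothetical values:

    long intervalMs = 60_000;   // hypothetical OFFSET_COMMIT_INTERVAL_MS_CONFIG
    long timeoutMs = 5_000;     // hypothetical OFFSET_COMMIT_TIMEOUT_MS_CONFIG
    long nextCommit = 120_000;  // current commit deadline

    long now = 120_250;                              // now >= nextCommit, so a commit starts
    long commitStarted = now;
    nextCommit += intervalMs;                        // 180_000: anchored to the old deadline, not to now
    long commitTimeout = commitStarted + timeoutMs;  // 125_250: past this point the commit is abandoned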

http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java
index c80adb4..14b9c3a 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java
@@ -23,6 +23,7 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.copycat.cli.WorkerConfig;
+import org.apache.kafka.copycat.data.SchemaAndValue;
 import org.apache.kafka.copycat.source.SourceRecord;
 import org.apache.kafka.copycat.source.SourceTask;
 import org.apache.kafka.copycat.source.SourceTaskContext;
@@ -131,11 +132,11 @@ class WorkerSourceTask<K, V> implements WorkerTask {
      */
     private synchronized void sendRecords(List<SourceRecord> records) {
         for (SourceRecord record : records) {
-            final ProducerRecord<K, V> producerRecord
-                    = new ProducerRecord<>(record.getTopic(), record.getKafkaPartition(),
-                    keyConverter.fromCopycatData(record.getKey()),
-                    valueConverter.fromCopycatData(record.getValue()));
-            log.trace("Appending record with key {}, value {}", record.getKey(), record.getValue());
+            K key = (record.keySchema() != null) ? keyConverter.fromCopycatData(record.keySchema(), record.key()) : null;
+            V value = (record.valueSchema() != null) ? valueConverter.fromCopycatData(record.valueSchema(), record.value()) : null;
+            final ProducerRecord<K, V> producerRecord = new ProducerRecord<>(
+                    record.topic(), record.kafkaPartition(), key, value);
+            log.trace("Appending record with key {}, value {}", record.key(), record.value());
             if (!flushing) {
                 outstandingMessages.put(producerRecord, producerRecord);
             } else {
@@ -157,7 +158,8 @@ class WorkerSourceTask<K, V> implements WorkerTask {
                         }
                     });
             // Offsets are converted & serialized in the OffsetWriter
-            offsetWriter.setOffset(record.getSourcePartition(), record.getSourceOffset());
+            offsetWriter.offset(new SchemaAndValue(record.sourcePartitionSchema(), record.sourcePartition()),
+                    new SchemaAndValue(record.sourceOffsetSchema(), record.sourceOffset()));
         }
     }
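
On the source side the same pattern runs in reverse: the converters take an explicit schema when turning Copycat data back into the producer's key and value types, and source offsets enter the OffsetStorageWriter as schema-tagged pairs. For illustration, a record shaped to satisfy the accessors used above might look like this (the SourceRecord constructor order shown is an assumption, not taken from this patch):

    SourceRecord record = new SourceRecord(
            Schema.STRING_SCHEMA, "stream-42",         // source partition: which stream this came from
            Schema.OPTIONAL_INT64_SCHEMA, 4096L,       // source offset: position within that stream
            "copycat-topic", null,                     // topic and Kafka partition (null = unassigned)
            Schema.STRING_SCHEMA, "a line of input");  // value schema and value
    // sendRecords() would then serialize via
    // valueConverter.fromCopycatData(record.valueSchema(), record.value()) and book the
    // offset as new SchemaAndValue(record.sourceOffsetSchema(), record.sourceOffset()).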
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerder.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerder.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerder.java
index 2ed9183..45d428d 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerder.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerder.java
@@ -99,7 +99,7 @@ public class StandaloneHerder implements Herder {
         log.info("Creating connector {} of type {}", connName, className);
         int maxTasks = connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG);
         List<String> topics = connConfig.getList(ConnectorConfig.TOPICS_CONFIG); // Sinks only
-        Properties configs = connConfig.getUnusedProperties();
+        Properties configs = connConfig.unusedProperties();
 
         if (connectors.containsKey(connName)) {
             log.error("Ignoring request to create connector due to conflicting connector name");
@@ -161,11 +161,11 @@ public class StandaloneHerder implements Herder {
     }
 
     private void createConnectorTasks(ConnectorState state) {
-        String taskClassName = state.connector.getTaskClass().getName();
+        String taskClassName = state.connector.taskClass().getName();
 
         log.info("Creating tasks for connector {} of type {}", state.name, taskClassName);
 
-        List<Properties> taskConfigs = state.connector.getTaskConfigs(state.maxTasks);
+        List<Properties> taskConfigs = state.connector.taskConfigs(state.maxTasks);
 
         // Generate the final configs, including framework provided settings
         Map<ConnectorTaskId, Properties> taskProps = new HashMap<>();
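
The herder's contract with a connector is visible here: taskClass() names the Task implementation to instantiate, and taskConfigs(maxTasks) returns one Properties object per task, which the herder then augments with framework-provided settings. A fragment of a hypothetical connector honoring that contract (class and property names are invented; other Connector lifecycle methods omitted):

    @Override
    public Class<? extends Task> taskClass() {
        return RandomLineSourceTask.class;
    }

    @Override
    public List<Properties> taskConfigs(int maxTasks) {
        // Fan out to at most maxTasks tasks, each tagged with its own index.
        List<Properties> configs = new ArrayList<>();
        for (int i = 0; i < maxTasks; i++) {
            Properties config = new Properties();
            config.setProperty("task.index", Integer.toString(i));
            configs.add(config);
        }
        return configs;
    }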

