kafka-commits mailing list archives

From gwens...@apache.org
Subject [3/3] kafka git commit: KAFKA-2475: Make Copycat only have a Converter class instead of Serializer, Deserializer, and Converter.
Date Mon, 31 Aug 2015 19:26:27 GMT
KAFKA-2475: Make Copycat only have a Converter class instead of Serializer, Deserializer, and Converter.

The Converter class now translates directly between byte[] and Copycat's data
API instead of requiring an intermediate runtime type like Avro's GenericRecord
or Jackson's JsonNode.
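
As a quick illustration of the unified API (a sketch only: the Converter, StringConverter, Schema, and SchemaAndValue names come from the patch below, while the wrapping class and topic name are hypothetical scaffolding):

    import org.apache.kafka.copycat.data.Schema;
    import org.apache.kafka.copycat.data.SchemaAndValue;
    import org.apache.kafka.copycat.storage.Converter;
    import org.apache.kafka.copycat.storage.StringConverter;

    import java.util.Collections;

    public class ConverterRoundTripSketch {
        public static void main(String[] args) {
            Converter converter = new StringConverter();
            converter.configure(Collections.<String, Object>emptyMap(), false); // configure as a value converter

            // Copycat data -> byte[]: serialization now happens inside the converter
            byte[] serialized = converter.fromCopycatData("my-topic", Schema.STRING_SCHEMA, "hello");

            // byte[] -> Copycat data: no intermediate JsonNode/GenericRecord is exposed to the caller
            SchemaAndValue deserialized = converter.toCopycatData("my-topic", serialized);
            System.out.println(deserialized.schema() + ": " + deserialized.value());
        }
    }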

Author: Ewen Cheslack-Postava <me@ewencp.org>

Reviewers: Gwen Shapira

Closes #172 from ewencp/kafka-2475-unified-serializer-converter and squashes the following commits:

566c52f [Ewen Cheslack-Postava] Checkstyle fixes
320d0df [Ewen Cheslack-Postava] Restrict offset format.
85797e7 [Ewen Cheslack-Postava] Add StringConverter for using Copycat with raw strings.
698d65c [Ewen Cheslack-Postava] Move and update outdated comment about handling of types for BYTES type in Copycat.
4bed051 [Ewen Cheslack-Postava] KAFKA-2475: Make Copycat only have a Converter class instead of Serializer, Deserializer, and Converter.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3803e5cb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3803e5cb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3803e5cb

Branch: refs/heads/trunk
Commit: 3803e5cb37cb602ff9eab5562ff8db3a2dd79b45
Parents: 9c936b1
Author: Ewen Cheslack-Postava <me@ewencp.org>
Authored: Mon Aug 31 12:26:16 2015 -0700
Committer: Gwen Shapira <cshapi@gmail.com>
Committed: Mon Aug 31 12:26:16 2015 -0700

----------------------------------------------------------------------
 .../kafka/common/config/AbstractConfig.java     |  15 +
 .../kafka/common/config/AbstractConfigTest.java |  13 +
 config/copycat-standalone.properties            |  17 +-
 .../kafka/copycat/data/CopycatSchema.java       |  92 ++++--
 .../org/apache/kafka/copycat/data/Schema.java   |   1 -
 .../kafka/copycat/data/SchemaBuilder.java       |   3 +-
 .../kafka/copycat/source/SourceRecord.java      |  41 +--
 .../apache/kafka/copycat/storage/Converter.java |  23 +-
 .../copycat/storage/OffsetStorageReader.java    |  13 +-
 .../kafka/copycat/storage/StringConverter.java  |  81 ++++++
 .../copycat/storage/StringConverterTest.java    |  83 ++++++
 .../copycat/file/FileStreamSourceTask.java      |  33 ++-
 .../copycat/file/FileStreamSourceTaskTest.java  |  23 +-
 .../kafka/copycat/json/JsonConverter.java       | 281 +++++++++++++------
 .../kafka/copycat/json/JsonDeserializer.java    |  31 +-
 .../kafka/copycat/json/JsonSerializer.java      |  15 -
 .../kafka/copycat/json/JsonConverterTest.java   | 243 ++++++++++++----
 .../apache/kafka/copycat/cli/WorkerConfig.java  |  35 +--
 .../apache/kafka/copycat/runtime/Worker.java    |  72 ++---
 .../kafka/copycat/runtime/WorkerSinkTask.java   |  30 +-
 .../kafka/copycat/runtime/WorkerSourceTask.java |  39 ++-
 .../storage/OffsetStorageReaderImpl.java        |  39 ++-
 .../copycat/storage/OffsetStorageWriter.java    |  37 ++-
 .../kafka/copycat/storage/OffsetUtils.java      |  46 +++
 .../copycat/runtime/WorkerSinkTaskTest.java     |  26 +-
 .../copycat/runtime/WorkerSourceTaskTest.java   |  60 ++--
 .../kafka/copycat/runtime/WorkerTest.java       |  18 +-
 .../storage/OffsetStorageWriterTest.java        |  42 +--
 tests/kafkatest/tests/copycat_test.py           |  33 ++-
 .../templates/copycat-file-sink.properties      |   2 +-
 .../templates/copycat-file-source.properties    |   2 +-
 .../templates/copycat-standalone.properties     |  19 +-
 32 files changed, 991 insertions(+), 517 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3803e5cb/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index 774701a..12a1927 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -115,6 +115,21 @@ public class AbstractConfig {
         return copy;
     }
 
+    /**
+     * Gets all original settings with the given prefix, stripping the prefix before adding it to the output.
+     *
+     * @param prefix the prefix to use as a filter
+     * @return a Map containing the settings with the prefix
+     */
+    public Map<String, Object> originalsWithPrefix(String prefix) {
+        Map<String, Object> result = new HashMap<String, Object>();
+        for (Map.Entry<String, ?> entry : originals.entrySet()) {
+            if (entry.getKey().startsWith(prefix) && entry.getKey().length() > prefix.length())
+                result.put(entry.getKey().substring(prefix.length()), entry.getValue());
+        }
+        return result;
+    }
+
     public Map<String, ?> values() {
         return new HashMap<String, Object>(values);
     }

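As a brief sketch of the behavior (the "workerConfig" instance is hypothetical; the property names come from the copycat-standalone.properties change further down in this patch):

    // workerConfig is a hypothetical AbstractConfig subclass instance built from the
    // standalone worker properties shown later in this patch.
    Map<String, Object> keyConverterConfigs = workerConfig.originalsWithPrefix("key.converter.");
    // With key.converter.schemas.enable=true among the originals, the result is
    // {"schemas.enable": "true"}: the prefix is stripped, and keys that are not strictly
    // longer than the prefix are excluded.
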
http://git-wip-us.apache.org/repos/asf/kafka/blob/3803e5cb/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
index db1b0ee..28064ec 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
@@ -17,10 +17,12 @@ import org.apache.kafka.common.config.ConfigDef.Type;
 import org.apache.kafka.common.metrics.MetricsReporter;
 import org.junit.Test;
 
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 
 import static org.junit.Assert.fail;
+import static org.junit.Assert.assertEquals;
 
 public class AbstractConfigTest {
 
@@ -35,6 +37,17 @@ public class AbstractConfigTest {
         testInvalidInputs("org.apache.kafka.common.metrics.FakeMetricsReporter,");
     }
 
+    @Test
+    public void testOriginalsWithPrefix() {
+        Properties props = new Properties();
+        props.put("foo.bar", "abc");
+        props.put("setting", "def");
+        TestConfig config = new TestConfig(props);
+        Map<String, Object> expected = new HashMap<>();
+        expected.put("bar", "abc");
+        assertEquals(expected, config.originalsWithPrefix("foo."));
+    }
+
     private void testValidInputs(String configValue) {
         Properties props = new Properties();
         props.put(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, configValue);

http://git-wip-us.apache.org/repos/asf/kafka/blob/3803e5cb/config/copycat-standalone.properties
----------------------------------------------------------------------
diff --git a/config/copycat-standalone.properties b/config/copycat-standalone.properties
index cf3b268..fd264b5 100644
--- a/config/copycat-standalone.properties
+++ b/config/copycat-standalone.properties
@@ -16,12 +16,21 @@
 # These are defaults. This file just demonstrates how to override some settings.
 bootstrap.servers=localhost:9092
 
+# The converters specify the format of data in Kafka and how to translate it into Copycat data. Every Copycat user will
+# need to configure these based on the format they want their data in when loaded from or stored into Kafka
 key.converter=org.apache.kafka.copycat.json.JsonConverter
 value.converter=org.apache.kafka.copycat.json.JsonConverter
-key.serializer=org.apache.kafka.copycat.json.JsonSerializer
-value.serializer=org.apache.kafka.copycat.json.JsonSerializer
-key.deserializer=org.apache.kafka.copycat.json.JsonDeserializer
-value.deserializer=org.apache.kafka.copycat.json.JsonDeserializer
+# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
+# it to
+key.converter.schemas.enable=true
+value.converter.schemas.enable=true
+
+# The offset converter is configurable and must be specified, but most users will always want to use the built-in default.
+# Offset data is never visible outside of Copycat.
+offset.key.converter=org.apache.kafka.copycat.json.JsonConverter
+offset.value.converter=org.apache.kafka.copycat.json.JsonConverter
+offset.key.converter.schemas.enable=false
+offset.value.converter.schemas.enable=false
 
 offset.storage.file.filename=/tmp/copycat.offsets
 # Flush much faster than normal, which is useful for testing/debugging

http://git-wip-us.apache.org/repos/asf/kafka/blob/3803e5cb/copycat/api/src/main/java/org/apache/kafka/copycat/data/CopycatSchema.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/data/CopycatSchema.java b/copycat/api/src/main/java/org/apache/kafka/copycat/data/CopycatSchema.java
index c823f28..809496a 100644
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/data/CopycatSchema.java
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/data/CopycatSchema.java
@@ -23,20 +23,37 @@ import java.nio.ByteBuffer;
 import java.util.*;
 
 public class CopycatSchema implements Schema {
-    private static final Map<Type, Class<?>> SCHEMA_TYPE_CLASSES = new HashMap<>();
+    /**
+     * Maps Schema.Types to a list of Java classes that can be used to represent them.
+     */
+    private static final Map<Type, List<Class>> SCHEMA_TYPE_CLASSES = new HashMap<>();
+
+    /**
+     * Maps the Java classes to the corresponding Schema.Type.
+     */
+    private static final Map<Class<?>, Type> JAVA_CLASS_SCHEMA_TYPES = new HashMap<>();
+
     static {
-        SCHEMA_TYPE_CLASSES.put(Type.INT8, Byte.class);
-        SCHEMA_TYPE_CLASSES.put(Type.INT16, Short.class);
-        SCHEMA_TYPE_CLASSES.put(Type.INT32, Integer.class);
-        SCHEMA_TYPE_CLASSES.put(Type.INT64, Long.class);
-        SCHEMA_TYPE_CLASSES.put(Type.FLOAT32, Float.class);
-        SCHEMA_TYPE_CLASSES.put(Type.FLOAT64, Double.class);
-        SCHEMA_TYPE_CLASSES.put(Type.BOOLEAN, Boolean.class);
-        SCHEMA_TYPE_CLASSES.put(Type.STRING, String.class);
-        SCHEMA_TYPE_CLASSES.put(Type.ARRAY, List.class);
-        SCHEMA_TYPE_CLASSES.put(Type.MAP, Map.class);
-        SCHEMA_TYPE_CLASSES.put(Type.STRUCT, Struct.class);
-        // Bytes are handled as a special case
+        SCHEMA_TYPE_CLASSES.put(Type.INT8, Arrays.asList((Class) Byte.class));
+        SCHEMA_TYPE_CLASSES.put(Type.INT16, Arrays.asList((Class) Short.class));
+        SCHEMA_TYPE_CLASSES.put(Type.INT32, Arrays.asList((Class) Integer.class));
+        SCHEMA_TYPE_CLASSES.put(Type.INT64, Arrays.asList((Class) Long.class));
+        SCHEMA_TYPE_CLASSES.put(Type.FLOAT32, Arrays.asList((Class) Float.class));
+        SCHEMA_TYPE_CLASSES.put(Type.FLOAT64, Arrays.asList((Class) Double.class));
+        SCHEMA_TYPE_CLASSES.put(Type.BOOLEAN, Arrays.asList((Class) Boolean.class));
+        SCHEMA_TYPE_CLASSES.put(Type.STRING, Arrays.asList((Class) String.class));
+        // Bytes are special and have 2 representations. byte[] causes problems because it doesn't handle equals() and
+        // hashCode() like we want objects to, so we support both byte[] and ByteBuffer. Using plain byte[] can cause
+        // those methods to fail, so ByteBuffers are recommended
+        SCHEMA_TYPE_CLASSES.put(Type.BYTES, Arrays.asList((Class) byte[].class, (Class) ByteBuffer.class));
+        SCHEMA_TYPE_CLASSES.put(Type.ARRAY, Arrays.asList((Class) List.class));
+        SCHEMA_TYPE_CLASSES.put(Type.MAP, Arrays.asList((Class) Map.class));
+        SCHEMA_TYPE_CLASSES.put(Type.STRUCT, Arrays.asList((Class) Struct.class));
+
+        for (Map.Entry<Type, List<Class>> schemaClasses : SCHEMA_TYPE_CLASSES.entrySet()) {
+            for (Class<?> schemaClass : schemaClasses.getValue())
+                JAVA_CLASS_SCHEMA_TYPES.put(schemaClass, schemaClasses.getKey());
+        }
     }
 
     // The type of the field
@@ -158,14 +175,19 @@ public class CopycatSchema implements Schema {
                 return;
         }
 
-        // Special case for bytes. byte[] causes problems because it doesn't handle equals()/hashCode() like we want
-        // objects to, so we support both byte[] and ByteBuffer. Using plain byte[] can cause those methods to fail, so
-        // ByteBuffers are recommended
-        if (schema.type() == Type.BYTES && (value instanceof byte[] || value instanceof ByteBuffer))
-            return;
-        Class<?> expectedClass = SCHEMA_TYPE_CLASSES.get(schema.type());
-        if (expectedClass == null || !expectedClass.isInstance(value))
-                throw new DataException("Invalid value: expected " + expectedClass + " for type " + schema.type() + " but tried to use " + value.getClass());
+        final List<Class> expectedClasses = SCHEMA_TYPE_CLASSES.get(schema.type());
+        if (expectedClasses == null)
+            throw new DataException("Invalid Java object for schema type " + schema.type() + ": " + value.getClass());
+
+        boolean foundMatch = false;
+        for (Class<?> expectedClass : expectedClasses) {
+            if (expectedClass.isInstance(value)) {
+                foundMatch = true;
+                break;
+            }
+        }
+        if (!foundMatch)
+            throw new DataException("Invalid Java object for schema type " + schema.type() + ": " + value.getClass());
 
         switch (schema.type()) {
             case STRUCT:
@@ -232,4 +254,32 @@ public class CopycatSchema implements Schema {
         else
             return "Schema{" + type + "}";
     }
+
+
+    /**
+     * Get the {@link Type} associated with the given class.
+     *
+     * @param klass the Class to look up
+     * @return the corresponding type, or null if there is no matching type
+     */
+    public static Type schemaType(Class<?> klass) {
+        synchronized (JAVA_CLASS_SCHEMA_TYPES) {
+            Type schemaType = JAVA_CLASS_SCHEMA_TYPES.get(klass);
+            if (schemaType != null)
+                return schemaType;
+
+            // Since the lookup only checks the exact class, we also need to check whether the class is a subclass of one of the registered classes
+            for (Map.Entry<Class<?>, Type> entry : JAVA_CLASS_SCHEMA_TYPES.entrySet()) {
+                try {
+                    klass.asSubclass(entry.getKey());
+                    // Cache this for subsequent lookups
+                    JAVA_CLASS_SCHEMA_TYPES.put(klass, entry.getValue());
+                    return entry.getValue();
+                } catch (ClassCastException e) {
+                    // Expected, ignore
+                }
+            }
+        }
+        return null;
+    }
 }

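A small sketch of the new class-to-type lookup added above (assuming the obvious imports; results follow directly from the mapping and fallback logic in schemaType()):

    Schema.Type a = CopycatSchema.schemaType(Integer.class);             // Type.INT32, a direct hit in the map
    Schema.Type b = CopycatSchema.schemaType(java.util.ArrayList.class); // Type.ARRAY via the subclass fallback, then cached
    Schema.Type c = CopycatSchema.schemaType(Object.class);              // null, no matching schema type
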
http://git-wip-us.apache.org/repos/asf/kafka/blob/3803e5cb/copycat/api/src/main/java/org/apache/kafka/copycat/data/Schema.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/data/Schema.java b/copycat/api/src/main/java/org/apache/kafka/copycat/data/Schema.java
index 5ceb57d..4ece21d 100644
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/data/Schema.java
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/data/Schema.java
@@ -92,7 +92,6 @@ public interface Schema {
     Schema OPTIONAL_STRING_SCHEMA = SchemaBuilder.string().optional().build();
     Schema OPTIONAL_BYTES_SCHEMA = SchemaBuilder.bytes().optional().build();
 
-
     /**
      * @return the type of this schema
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/3803e5cb/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaBuilder.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaBuilder.java b/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaBuilder.java
index fe9d474..d9c149d 100644
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaBuilder.java
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaBuilder.java
@@ -21,6 +21,7 @@ import org.apache.kafka.copycat.errors.DataException;
 import org.apache.kafka.copycat.errors.SchemaBuilderException;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 /**
@@ -346,7 +347,7 @@ public class SchemaBuilder implements Schema {
      * @return the {@link Schema}
      */
     public Schema build() {
-        return new CopycatSchema(type, isOptional(), defaultValue, name, version, doc, fields, keySchema, valueSchema);
+        return new CopycatSchema(type, isOptional(), defaultValue, name, version, doc, fields == null ? null : Collections.unmodifiableList(fields), keySchema, valueSchema);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/3803e5cb/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceRecord.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceRecord.java b/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceRecord.java
index 05286a1..7f54c10 100644
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceRecord.java
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceRecord.java
@@ -21,6 +21,8 @@ import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.copycat.connector.CopycatRecord;
 import org.apache.kafka.copycat.data.Schema;
 
+import java.util.Map;
+
 /**
  * <p>
  * SourceRecords are generated by SourceTasks and passed to Copycat for storage in
@@ -41,47 +43,32 @@ import org.apache.kafka.copycat.data.Schema;
  */
 @InterfaceStability.Unstable
 public class SourceRecord extends CopycatRecord {
-    private final Schema sourcePartitionSchema;
-    private final Object sourcePartition;
-    private final Schema sourceOffsetSchema;
-    private final Object sourceOffset;
+    private final Map<String, ?> sourcePartition;
+    private final Map<String, ?> sourceOffset;
 
-    public SourceRecord(Schema sourcePartitionSchema, Object sourcePartition,
-                        Schema sourceOffsetSchema, Object sourceOffset,
+    public SourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset,
                         String topic, Integer partition, Schema valueSchema, Object value) {
-        this(sourcePartitionSchema, sourcePartition, sourceOffsetSchema, sourceOffset, topic, partition, null, null, valueSchema, value);
+        this(sourcePartition, sourceOffset, topic, partition, null, null, valueSchema, value);
     }
 
-    public SourceRecord(Schema sourcePartitionSchema, Object sourcePartition,
-                        Schema sourceOffsetSchema, Object sourceOffset,
+    public SourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset,
                         String topic, Schema valueSchema, Object value) {
-        this(sourcePartitionSchema, sourcePartition, sourceOffsetSchema, sourceOffset, topic, null, null, null, valueSchema, value);
+        this(sourcePartition, sourceOffset, topic, null, null, null, valueSchema, value);
     }
 
-    public SourceRecord(Schema sourcePartitionSchema, Object sourcePartition,
-                        Schema sourceOffsetSchema, Object sourceOffset,
+    public SourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset,
                         String topic, Integer partition,
                         Schema keySchema, Object key, Schema valueSchema, Object value) {
         super(topic, partition, keySchema, key, valueSchema, value);
-        this.sourcePartitionSchema = sourcePartitionSchema;
         this.sourcePartition = sourcePartition;
-        this.sourceOffsetSchema = sourceOffsetSchema;
         this.sourceOffset = sourceOffset;
     }
 
-    public Schema sourcePartitionSchema() {
-        return sourcePartitionSchema;
-    }
-
-    public Object sourcePartition() {
+    public Map<String, ?> sourcePartition() {
         return sourcePartition;
     }
 
-    public Schema sourceOffsetSchema() {
-        return sourceOffsetSchema;
-    }
-
-    public Object sourceOffset() {
+    public Map<String, ?> sourceOffset() {
         return sourceOffset;
     }
 
@@ -96,10 +83,6 @@ public class SourceRecord extends CopycatRecord {
 
         SourceRecord that = (SourceRecord) o;
 
-        if (sourcePartitionSchema != null ? !sourcePartitionSchema.equals(that.sourcePartitionSchema) : that.sourcePartitionSchema != null)
-            return false;
-        if (sourceOffsetSchema != null ? !sourceOffsetSchema.equals(that.sourceOffsetSchema) : that.sourceOffsetSchema != null)
-            return false;
         if (sourcePartition != null ? !sourcePartition.equals(that.sourcePartition) : that.sourcePartition != null)
             return false;
         if (sourceOffset != null ? !sourceOffset.equals(that.sourceOffset) : that.sourceOffset != null)
@@ -111,8 +94,6 @@ public class SourceRecord extends CopycatRecord {
     @Override
     public int hashCode() {
         int result = super.hashCode();
-        result = 31 * result + (sourcePartitionSchema != null ? sourcePartitionSchema.hashCode() : 0);
-        result = 31 * result + (sourceOffsetSchema != null ? sourceOffsetSchema.hashCode() : 0);
         result = 31 * result + (sourcePartition != null ? sourcePartition.hashCode() : 0);
         result = 31 * result + (sourceOffset != null ? sourceOffset.hashCode() : 0);
         return result;

http://git-wip-us.apache.org/repos/asf/kafka/blob/3803e5cb/copycat/api/src/main/java/org/apache/kafka/copycat/storage/Converter.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/storage/Converter.java b/copycat/api/src/main/java/org/apache/kafka/copycat/storage/Converter.java
index dd2068d..d51b789 100644
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/storage/Converter.java
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/storage/Converter.java
@@ -21,28 +21,37 @@ import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.copycat.data.Schema;
 import org.apache.kafka.copycat.data.SchemaAndValue;
 
+import java.util.Map;
+
 /**
  * The Converter interface provides support for translating between Copycat's runtime data format
- * and the "native" runtime format used by the serialization layer. This is used to translate
- * two types of data: records and offsets. The (de)serialization is performed by a separate
- * component -- the producer or consumer serializer or deserializer for records or a Copycat
- * serializer or deserializer for offsets.
+ * and byte[]. Internally, this likely includes an intermediate step to the format used by the serialization
+ * layer (e.g. JsonNode, GenericRecord, Message).
  */
 @InterfaceStability.Unstable
-public interface Converter<T> {
+public interface Converter {
+
+    /**
+     * Configure this class.
+     * @param configs configs in key/value pairs
+     * @param isKey whether is for key or value
+     */
+    void configure(Map<String, ?> configs, boolean isKey);
 
     /**
      * Convert a Copycat data object to a native object for serialization.
+     * @param topic the topic associated with the data
      * @param schema the schema for the value
      * @param value the value to convert
      * @return
      */
-    T fromCopycatData(Schema schema, Object value);
+    byte[] fromCopycatData(String topic, Schema schema, Object value);
 
     /**
      * Convert a native object to a Copycat data object.
+     * @param topic the topic associated with the data
      * @param value the value to convert
      * @return an object containing the {@link Schema} and the converted value
      */
-    SchemaAndValue toCopycatData(T value);
+    SchemaAndValue toCopycatData(String topic, byte[] value);
 }

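To show the shape implementers now provide, a hypothetical pass-through converter (a sketch, not part of this patch):

    package org.apache.kafka.copycat.storage;

    import org.apache.kafka.copycat.data.Schema;
    import org.apache.kafka.copycat.data.SchemaAndValue;

    import java.util.Map;

    // Hypothetical example: passes raw bytes through untouched.
    public class RawBytesConverter implements Converter {
        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            // no settings needed for this sketch
        }

        @Override
        public byte[] fromCopycatData(String topic, Schema schema, Object value) {
            // a real converter would serialize the value here; bytes pass straight through
            return (byte[]) value;
        }

        @Override
        public SchemaAndValue toCopycatData(String topic, byte[] value) {
            return new SchemaAndValue(Schema.OPTIONAL_BYTES_SCHEMA, value);
        }
    }
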
http://git-wip-us.apache.org/repos/asf/kafka/blob/3803e5cb/copycat/api/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReader.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReader.java b/copycat/api/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReader.java
index b51fbde..95d2c04 100644
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReader.java
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReader.java
@@ -18,15 +18,20 @@
 package org.apache.kafka.copycat.storage;
 
 import org.apache.kafka.common.annotation.InterfaceStability;
-import org.apache.kafka.copycat.data.SchemaAndValue;
 
 import java.util.Collection;
 import java.util.Map;
 
 /**
+ * <p>
  * OffsetStorageReader provides access to the offset storage used by sources. This can be used by
  * connectors to determine offsets to start consuming data from. This is most commonly used during
  * initialization of a task, but can also be used during runtime, e.g. when reconfiguring a task.
+ * </p>
+ * <p>
+ * Offsets are always defined as Maps of Strings to primitive types, i.e. all types supported by
+ * {@link org.apache.kafka.copycat.data.Schema} other than Array, Map, and Struct.
+ * </p>
  */
 @InterfaceStability.Unstable
 public interface OffsetStorageReader {
@@ -37,12 +42,12 @@ public interface OffsetStorageReader {
      * @param partition object uniquely identifying the partition of data
      * @return object uniquely identifying the offset in the partition of data
      */
-    SchemaAndValue offset(SchemaAndValue partition);
+    <T> Map<String, Object> offset(Map<String, T> partition);
 
     /**
      * <p>
      * Get a set of offsets for the specified partition identifiers. This may be more efficient
-     * than calling {@link #offset(SchemaAndValue)} repeatedly.
+     * than calling {@link #offset(Map)} repeatedly.
      * </p>
      * <p>
      * Note that when errors occur, this method omits the associated data and tries to return as
@@ -56,5 +61,5 @@ public interface OffsetStorageReader {
      * @param partitions set of identifiers for partitions of data
      * @return a map of partition identifiers to decoded offsets
      */
-    Map<SchemaAndValue, SchemaAndValue> offsets(Collection<SchemaAndValue> partitions);
+    <T> Map<Map<String, T>, Map<String, Object>> offsets(Collection<Map<String, T>> partitions);
 }

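A minimal sketch of the map-based offset lookup from inside a SourceTask (the "filename"/"position" keys and path are hypothetical here; the FileStreamSourceTask changes below show the same pattern with its own field names):

    Map<String, String> partition = Collections.singletonMap("filename", "/var/log/app.log");
    Map<String, Object> offset = context.offsetStorageReader().offset(partition);
    Long position = offset == null ? null : (Long) offset.get("position");
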
http://git-wip-us.apache.org/repos/asf/kafka/blob/3803e5cb/copycat/api/src/main/java/org/apache/kafka/copycat/storage/StringConverter.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/storage/StringConverter.java b/copycat/api/src/main/java/org/apache/kafka/copycat/storage/StringConverter.java
new file mode 100644
index 0000000..8d708f8
--- /dev/null
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/storage/StringConverter.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.storage;
+
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.copycat.data.Schema;
+import org.apache.kafka.copycat.data.SchemaAndValue;
+import org.apache.kafka.copycat.errors.DataException;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * {@link Converter} implementation that only supports serializing to strings. When converting Copycat data to bytes,
+ * the schema will be ignored and {@link Object#toString()} will always be invoked to convert the data to a String.
+ * When converting from bytes to Copycat format, the converter will only ever return an optional string schema and
+ * a string or null.
+ *
+ * Encoding configuration is identical to {@link StringSerializer} and {@link StringDeserializer}, but for convenience
+ * this class can also be configured to use the same encoding for both encoding and decoding with the converter.encoding
+ * setting.
+ */
+public class StringConverter implements Converter {
+    private final StringSerializer serializer = new StringSerializer();
+    private final StringDeserializer deserializer = new StringDeserializer();
+
+    public StringConverter() {
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs, boolean isKey) {
+        Map<String, Object> serializerConfigs = new HashMap<>();
+        serializerConfigs.putAll(configs);
+        Map<String, Object> deserializerConfigs = new HashMap<>();
+        deserializerConfigs.putAll(configs);
+
+        Object encodingValue = configs.get("converter.encoding");
+        if (encodingValue != null) {
+            serializerConfigs.put("serializer.encoding", encodingValue);
+            deserializerConfigs.put("deserializer.encoding", encodingValue);
+        }
+
+        serializer.configure(serializerConfigs, isKey);
+        deserializer.configure(deserializerConfigs, isKey);
+    }
+
+    @Override
+    public byte[] fromCopycatData(String topic, Schema schema, Object value) {
+        try {
+            return serializer.serialize(topic, value == null ? null : value.toString());
+        } catch (SerializationException e) {
+            throw new DataException("Failed to serialize to a string: ", e);
+        }
+    }
+
+    @Override
+    public SchemaAndValue toCopycatData(String topic, byte[] value) {
+        try {
+            return new SchemaAndValue(Schema.OPTIONAL_STRING_SCHEMA, deserializer.deserialize(topic, value));
+        } catch (SerializationException e) {
+            throw new DataException("Failed to deserialize string: ", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/3803e5cb/copycat/api/src/test/java/org/apache/kafka/copycat/storage/StringConverterTest.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/test/java/org/apache/kafka/copycat/storage/StringConverterTest.java b/copycat/api/src/test/java/org/apache/kafka/copycat/storage/StringConverterTest.java
new file mode 100644
index 0000000..3ea69c1
--- /dev/null
+++ b/copycat/api/src/test/java/org/apache/kafka/copycat/storage/StringConverterTest.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.storage;
+
+import org.apache.kafka.copycat.data.Schema;
+import org.apache.kafka.copycat.data.SchemaAndValue;
+import org.junit.Test;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertArrayEquals;
+
+public class StringConverterTest {
+    private static final String TOPIC = "topic";
+    private static final String SAMPLE_STRING = "a string";
+
+    private StringConverter converter = new StringConverter();
+
+    @Test
+    public void testStringToBytes() throws UnsupportedEncodingException {
+        assertArrayEquals(SAMPLE_STRING.getBytes("UTF8"), converter.fromCopycatData(TOPIC, Schema.STRING_SCHEMA, SAMPLE_STRING));
+    }
+
+    @Test
+    public void testNonStringToBytes() throws UnsupportedEncodingException {
+        assertArrayEquals("true".getBytes("UTF8"), converter.fromCopycatData(TOPIC, Schema.BOOLEAN_SCHEMA, true));
+    }
+
+    @Test
+    public void testNullToBytes() {
+        assertEquals(null, converter.fromCopycatData(TOPIC, Schema.OPTIONAL_STRING_SCHEMA, null));
+    }
+
+    @Test
+    public void testToBytesIgnoresSchema() throws UnsupportedEncodingException {
+        assertArrayEquals("true".getBytes("UTF8"), converter.fromCopycatData(TOPIC, null, true));
+    }
+
+    @Test
+    public void testToBytesNonUtf8Encoding() throws UnsupportedEncodingException {
+        converter.configure(Collections.singletonMap("converter.encoding", "UTF-16"), true);
+        assertArrayEquals(SAMPLE_STRING.getBytes("UTF-16"), converter.fromCopycatData(TOPIC, Schema.STRING_SCHEMA, SAMPLE_STRING));
+    }
+
+    @Test
+    public void testBytesToString() {
+        SchemaAndValue data = converter.toCopycatData(TOPIC, SAMPLE_STRING.getBytes());
+        assertEquals(Schema.OPTIONAL_STRING_SCHEMA, data.schema());
+        assertEquals(SAMPLE_STRING, data.value());
+    }
+
+    @Test
+    public void testBytesNullToString() {
+        SchemaAndValue data = converter.toCopycatData(TOPIC, null);
+        assertEquals(Schema.OPTIONAL_STRING_SCHEMA, data.schema());
+        assertEquals(null, data.value());
+    }
+
+    @Test
+    public void testBytesToStringNonUtf8Encoding() throws UnsupportedEncodingException {
+        converter.configure(Collections.singletonMap("converter.encoding", "UTF-16"), true);
+        SchemaAndValue data = converter.toCopycatData(TOPIC, SAMPLE_STRING.getBytes("UTF-16"));
+        assertEquals(Schema.OPTIONAL_STRING_SCHEMA, data.schema());
+        assertEquals(SAMPLE_STRING, data.value());
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/3803e5cb/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java
----------------------------------------------------------------------
diff --git a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java
index a841386..91292e9 100644
--- a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java
+++ b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java
@@ -18,7 +18,6 @@
 package org.apache.kafka.copycat.file;
 
 import org.apache.kafka.copycat.data.Schema;
-import org.apache.kafka.copycat.data.SchemaAndValue;
 import org.apache.kafka.copycat.errors.CopycatException;
 import org.apache.kafka.copycat.source.SourceRecord;
 import org.apache.kafka.copycat.source.SourceTask;
@@ -26,17 +25,15 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.*;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
+import java.util.*;
 
 /**
  * FileStreamSourceTask reads from stdin or a file.
  */
 public class FileStreamSourceTask extends SourceTask {
     private static final Logger log = LoggerFactory.getLogger(FileStreamSourceTask.class);
-    private static final Schema OFFSET_KEY_SCHEMA = Schema.STRING_SCHEMA;
-    private static final Schema OFFSET_VALUE_SCHEMA = Schema.OPTIONAL_INT64_SCHEMA;
+    public static final String FILENAME_FIELD = "filename";
+    public static final String POSITION_FIELD = "position";
     private static final Schema VALUE_SCHEMA = Schema.STRING_SCHEMA;
 
     private String filename;
@@ -66,14 +63,14 @@ public class FileStreamSourceTask extends SourceTask {
         if (stream == null) {
             try {
                 stream = new FileInputStream(filename);
-                SchemaAndValue offsetWithSchema = context.offsetStorageReader().offset(new SchemaAndValue(OFFSET_KEY_SCHEMA, filename));
-                if (offsetWithSchema != null) {
-                    if (!offsetWithSchema.schema().equals(OFFSET_VALUE_SCHEMA))
-                        throw new CopycatException("Unexpected offset schema.");
-                    Long lastRecordedOffset = (Long) offsetWithSchema.value();
+                Map<String, Object> offset = context.offsetStorageReader().offset(Collections.singletonMap(FILENAME_FIELD, filename));
+                if (offset != null) {
+                    Object lastRecordedOffset = offset.get(POSITION_FIELD);
+                    if (lastRecordedOffset != null && !(lastRecordedOffset instanceof Long))
+                        throw new CopycatException("Offset position is the incorrect type");
                     if (lastRecordedOffset != null) {
                         log.debug("Found previous offset, trying to skip to file offset {}", lastRecordedOffset);
-                        long skipLeft = lastRecordedOffset;
+                        long skipLeft = (Long) lastRecordedOffset;
                         while (skipLeft > 0) {
                             try {
                                 long skipped = stream.skip(skipLeft);
@@ -85,7 +82,7 @@ public class FileStreamSourceTask extends SourceTask {
                         }
                         log.debug("Skipped to offset {}", lastRecordedOffset);
                     }
-                    streamOffset = (lastRecordedOffset != null) ? lastRecordedOffset : 0L;
+                    streamOffset = (lastRecordedOffset != null) ? (Long) lastRecordedOffset : 0L;
                 } else {
                     streamOffset = 0L;
                 }
@@ -130,7 +127,7 @@ public class FileStreamSourceTask extends SourceTask {
                         if (line != null) {
                             if (records == null)
                                 records = new ArrayList<>();
-                            records.add(new SourceRecord(OFFSET_KEY_SCHEMA, filename, OFFSET_VALUE_SCHEMA, streamOffset, topic, VALUE_SCHEMA, line));
+                            records.add(new SourceRecord(offsetKey(filename), offsetValue(streamOffset), topic, VALUE_SCHEMA, line));
                         }
                         new ArrayList<SourceRecord>();
                     } while (line != null);
@@ -193,4 +190,12 @@ public class FileStreamSourceTask extends SourceTask {
             this.notify();
         }
     }
+
+    private Map<String, String> offsetKey(String filename) {
+        return Collections.singletonMap(FILENAME_FIELD, filename);
+    }
+
+    private Map<String, Long> offsetValue(Long pos) {
+        return Collections.singletonMap(POSITION_FIELD, pos);
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3803e5cb/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java
index ab89b6a..d2781c9 100644
--- a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java
+++ b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.kafka.copycat.file;
 
-import org.apache.kafka.copycat.data.SchemaAndValue;
 import org.apache.kafka.copycat.errors.CopycatException;
 import org.apache.kafka.copycat.source.SourceRecord;
 import org.apache.kafka.copycat.source.SourceTaskContext;
@@ -31,7 +30,9 @@ import org.powermock.api.easymock.PowerMock;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 import static org.junit.Assert.assertEquals;
@@ -89,7 +90,8 @@ public class FileStreamSourceTaskTest {
         assertEquals(1, records.size());
         assertEquals(TOPIC, records.get(0).topic());
         assertEquals("partial line finished", records.get(0).value());
-        assertEquals(22L, records.get(0).sourceOffset());
+        assertEquals(Collections.singletonMap(FileStreamSourceTask.FILENAME_FIELD, tempFile.getAbsolutePath()), records.get(0).sourcePartition());
+        assertEquals(Collections.singletonMap(FileStreamSourceTask.POSITION_FIELD, 22L), records.get(0).sourceOffset());
         assertEquals(null, task.poll());
 
         // Different line endings, and make sure the final \r doesn't result in a line until we can
@@ -99,20 +101,25 @@ public class FileStreamSourceTaskTest {
         records = task.poll();
         assertEquals(4, records.size());
         assertEquals("line1", records.get(0).value());
-        assertEquals(28L, records.get(0).sourceOffset());
+        assertEquals(Collections.singletonMap(FileStreamSourceTask.FILENAME_FIELD, tempFile.getAbsolutePath()), records.get(0).sourcePartition());
+        assertEquals(Collections.singletonMap(FileStreamSourceTask.POSITION_FIELD, 28L), records.get(0).sourceOffset());
         assertEquals("line2", records.get(1).value());
-        assertEquals(35L, records.get(1).sourceOffset());
+        assertEquals(Collections.singletonMap(FileStreamSourceTask.FILENAME_FIELD, tempFile.getAbsolutePath()), records.get(1).sourcePartition());
+        assertEquals(Collections.singletonMap(FileStreamSourceTask.POSITION_FIELD, 35L), records.get(1).sourceOffset());
         assertEquals("line3", records.get(2).value());
-        assertEquals(41L, records.get(2).sourceOffset());
+        assertEquals(Collections.singletonMap(FileStreamSourceTask.FILENAME_FIELD, tempFile.getAbsolutePath()), records.get(2).sourcePartition());
+        assertEquals(Collections.singletonMap(FileStreamSourceTask.POSITION_FIELD, 41L), records.get(2).sourceOffset());
         assertEquals("line4", records.get(3).value());
-        assertEquals(47L, records.get(3).sourceOffset());
+        assertEquals(Collections.singletonMap(FileStreamSourceTask.FILENAME_FIELD, tempFile.getAbsolutePath()), records.get(3).sourcePartition());
+        assertEquals(Collections.singletonMap(FileStreamSourceTask.POSITION_FIELD, 47L), records.get(3).sourceOffset());
 
         os.write("subsequent text".getBytes());
         os.flush();
         records = task.poll();
         assertEquals(1, records.size());
         assertEquals("", records.get(0).value());
-        assertEquals(48L, records.get(0).sourceOffset());
+        assertEquals(Collections.singletonMap(FileStreamSourceTask.FILENAME_FIELD, tempFile.getAbsolutePath()), records.get(0).sourcePartition());
+        assertEquals(Collections.singletonMap(FileStreamSourceTask.POSITION_FIELD, 48L), records.get(0).sourceOffset());
 
         task.stop();
     }
@@ -135,6 +142,6 @@ public class FileStreamSourceTaskTest {
 
 
     private void expectOffsetLookupReturnNone() {
-        EasyMock.expect(offsetStorageReader.offset(EasyMock.anyObject(SchemaAndValue.class))).andReturn(null);
+        EasyMock.expect(offsetStorageReader.offset(EasyMock.anyObject(Map.class))).andReturn(null);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/3803e5cb/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonConverter.java
----------------------------------------------------------------------
diff --git a/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonConverter.java b/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonConverter.java
index 67df11d..1841640 100644
--- a/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonConverter.java
+++ b/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonConverter.java
@@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.copycat.data.*;
 import org.apache.kafka.copycat.errors.DataException;
 import org.apache.kafka.copycat.storage.Converter;
@@ -32,7 +33,9 @@ import java.util.*;
 /**
  * Implementation of Converter that uses JSON to store schemas and objects.
  */
-public class JsonConverter implements Converter<JsonNode> {
+public class JsonConverter implements Converter {
+    private static final String SCHEMAS_ENABLE_CONFIG = "schemas.enable";
+    private static final boolean SCHEMAS_ENABLE_DEFAULT = true;
 
     private static final HashMap<Schema.Type, JsonToCopycatTypeConverter> TO_COPYCAT_CONVERTERS = new HashMap<>();
 
@@ -117,9 +120,7 @@ public class JsonConverter implements Converter<JsonNode> {
             public Object convert(Schema schema, JsonNode value) {
                 if (value.isNull()) return checkOptionalAndDefault(schema);
 
-                Schema elemSchema = schema.valueSchema();
-                if (elemSchema == null)
-                    throw new DataException("Array schema did not specify the element type");
+                Schema elemSchema = schema == null ? null : schema.valueSchema();
                 ArrayList<Object> result = new ArrayList<>();
                 for (JsonNode elem : value) {
                     result.add(convertToCopycat(elemSchema, elem));
@@ -132,13 +133,14 @@ public class JsonConverter implements Converter<JsonNode> {
             public Object convert(Schema schema, JsonNode value) {
                 if (value.isNull()) return checkOptionalAndDefault(schema);
 
-                Schema keySchema = schema.keySchema();
-                Schema valueSchema = schema.valueSchema();
+                Schema keySchema = schema == null ? null : schema.keySchema();
+                Schema valueSchema = schema == null ? null : schema.valueSchema();
 
                 // If the map uses strings for keys, it should be encoded in the natural JSON format. If it uses other
-                // primitive types or a complex type as a key, it will be encoded as a list of pairs
+                // primitive types or a complex type as a key, it will be encoded as a list of pairs. If we don't have a
+                // schema, we default to encoding in a Map.
                 Map<Object, Object> result = new HashMap<>();
-                if (keySchema.type() == Schema.Type.STRING) {
+                if (schema == null || keySchema.type() == Schema.Type.STRING) {
                     if (!value.isObject())
                         throw new DataException("Map's with string fields should be encoded as JSON objects, but found " + value.getNodeType());
                     Iterator<Map.Entry<String, JsonNode>> fieldIt = value.fields();
@@ -182,24 +184,73 @@ public class JsonConverter implements Converter<JsonNode> {
             }
         });
 
+
+    }
+
+    private boolean enableSchemas = SCHEMAS_ENABLE_DEFAULT;
+
+    private final JsonSerializer serializer = new JsonSerializer();
+    private final JsonDeserializer deserializer = new JsonDeserializer();
+
+    @Override
+    public void configure(Map<String, ?> configs, boolean isKey) {
+        Object enableConfigsVal = configs.get(SCHEMAS_ENABLE_CONFIG);
+        if (enableConfigsVal != null)
+            enableSchemas = enableConfigsVal.toString().equals("true");
+
+        serializer.configure(configs, isKey);
+        deserializer.configure(configs, isKey);
     }
 
     @Override
-    public JsonNode fromCopycatData(Schema schema, Object value) {
-        return convertToJsonWithSchemaEnvelope(schema, value);
+    public byte[] fromCopycatData(String topic, Schema schema, Object value) {
+        JsonNode jsonValue = enableSchemas ? convertToJsonWithEnvelope(schema, value) : convertToJsonWithoutEnvelope(schema, value);
+        try {
+            return serializer.serialize(topic, jsonValue);
+        } catch (SerializationException e) {
+            throw new DataException("Converting Copycat data to byte[] failed due to serialization error: ", e);
+        }
     }
 
     @Override
-    public SchemaAndValue toCopycatData(JsonNode value) {
-        if (!value.isObject() || value.size() != 2 || !value.has(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME) || !value.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME))
-            throw new DataException("JSON value converted to Copycat must be in envelope containing schema");
+    public SchemaAndValue toCopycatData(String topic, byte[] value) {
+        JsonNode jsonValue;
+        try {
+            jsonValue = deserializer.deserialize(topic, value);
+        } catch (SerializationException e) {
+            throw new DataException("Converting byte[] to Copycat data failed due to serialization error: ", e);
+        }
 
-        Schema schema = asCopycatSchema(value.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
-        return new SchemaAndValue(schema, convertToCopycat(schema, value.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME)));
+        if (enableSchemas && (jsonValue == null || !jsonValue.isObject() || jsonValue.size() != 2 || !jsonValue.has("schema") || !jsonValue.has("payload")))
+            throw new DataException("JsonDeserializer with schemas.enable requires \"schema\" and \"payload\" fields and may not contain additional fields");
+
+        // The deserialized data should either be an envelope object containing the schema and the payload or the schema
+        // was stripped during serialization and we need to fill in an all-encompassing schema.
+        if (!enableSchemas) {
+            ObjectNode envelope = JsonNodeFactory.instance.objectNode();
+            envelope.set("schema", null);
+            envelope.set("payload", jsonValue);
+            jsonValue = envelope;
+        }
+
+        return jsonToCopycat(jsonValue);
     }
 
+    private SchemaAndValue jsonToCopycat(JsonNode jsonValue) {
+        if (jsonValue == null)
+            return SchemaAndValue.NULL;
+
+        if (!jsonValue.isObject() || jsonValue.size() != 2 || !jsonValue.has(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME) || !jsonValue.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME))
+            throw new DataException("JSON value converted to Copycat must be in envelope containing schema");
+
+        Schema schema = asCopycatSchema(jsonValue.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
+        return new SchemaAndValue(schema, convertToCopycat(schema, jsonValue.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME)));
+    }
 
     private static ObjectNode asJsonSchema(Schema schema) {
+        if (schema == null)
+            return null;
+
         final ObjectNode jsonSchema;
         switch (schema.type()) {
             case BOOLEAN:
@@ -369,16 +420,22 @@ public class JsonConverter implements Converter<JsonNode> {
      * @param value the value
      * @return JsonNode-encoded version
      */
-    private static JsonNode convertToJsonWithSchemaEnvelope(Schema schema, Object value) {
+    private static JsonNode convertToJsonWithEnvelope(Schema schema, Object value) {
         return new JsonSchema.Envelope(asJsonSchema(schema), convertToJson(schema, value)).toJsonNode();
     }
 
+    private static JsonNode convertToJsonWithoutEnvelope(Schema schema, Object value) {
+        return convertToJson(schema, value);
+    }
+
     /**
      * Convert this object, in the org.apache.kafka.copycat.data format, into a JSON object, returning both the schema
      * and the converted object.
      */
     private static JsonNode convertToJson(Schema schema, Object value) {
         if (value == null) {
+            if (schema == null) // Any schema is valid and we don't have a default, so treat this as an optional schema
+                return null;
             if (schema.defaultValue() != null)
                 return convertToJson(schema, schema.defaultValue());
             if (schema.isOptional())
@@ -386,85 +443,141 @@ public class JsonConverter implements Converter<JsonNode> {
             throw new DataException("Conversion error: null value for field that is required and has no default value");
         }
 
-        switch (schema.type()) {
-            case INT8:
-                return JsonNodeFactory.instance.numberNode((Byte) value);
-            case INT16:
-                return JsonNodeFactory.instance.numberNode((Short) value);
-            case INT32:
-                return JsonNodeFactory.instance.numberNode((Integer) value);
-            case INT64:
-                return JsonNodeFactory.instance.numberNode((Long) value);
-            case FLOAT32:
-                return JsonNodeFactory.instance.numberNode((Float) value);
-            case FLOAT64:
-                return JsonNodeFactory.instance.numberNode((Double) value);
-            case BOOLEAN:
-                return JsonNodeFactory.instance.booleanNode((Boolean) value);
-            case STRING:
-                CharSequence charSeq = (CharSequence) value;
-                return JsonNodeFactory.instance.textNode(charSeq.toString());
-            case BYTES:
-                if (value instanceof byte[])
-                    return JsonNodeFactory.instance.binaryNode((byte[]) value);
-                else if (value instanceof ByteBuffer)
-                    return JsonNodeFactory.instance.binaryNode(((ByteBuffer) value).array());
-                else
-                    throw new DataException("Invalid type for bytes type: " + value.getClass());
-            case ARRAY: {
-                if (!(value instanceof Collection))
-                    throw new DataException("Invalid type for array type: " + value.getClass());
-                Collection collection = (Collection) value;
-                ArrayNode list = JsonNodeFactory.instance.arrayNode();
-                for (Object elem : collection) {
-                    JsonNode fieldValue = convertToJson(schema.valueSchema(), elem);
-                    list.add(fieldValue);
-                }
-                return list;
+        try {
+            final Schema.Type schemaType;
+            if (schema == null) {
+                schemaType = CopycatSchema.schemaType(value.getClass());
+                if (schemaType == null)
+                    throw new DataException("Java class " + value.getClass() + " does not have corresponding schema type.");
+            } else {
+                schemaType = schema.type();
             }
-            case MAP: {
-                if (!(value instanceof Map))
-                    throw new DataException("Invalid type for array type: " + value.getClass());
-                Map<?, ?> map = (Map<?, ?>) value;
-                // If true, using string keys and JSON object; if false, using non-string keys and Array-encoding
-                boolean objectMode = schema.keySchema().type() == Schema.Type.STRING;
-                ObjectNode obj = null;
-                ArrayNode list = null;
-                if (objectMode)
-                    obj = JsonNodeFactory.instance.objectNode();
-                else
-                    list = JsonNodeFactory.instance.arrayNode();
-                for (Map.Entry<?, ?> entry : map.entrySet()) {
-                    JsonNode mapKey = convertToJson(schema.keySchema(), entry.getKey());
-                    JsonNode mapValue = convertToJson(schema.valueSchema(), entry.getValue());
-
+            switch (schemaType) {
+                case INT8:
+                    return JsonNodeFactory.instance.numberNode((Byte) value);
+                case INT16:
+                    return JsonNodeFactory.instance.numberNode((Short) value);
+                case INT32:
+                    return JsonNodeFactory.instance.numberNode((Integer) value);
+                case INT64:
+                    return JsonNodeFactory.instance.numberNode((Long) value);
+                case FLOAT32:
+                    return JsonNodeFactory.instance.numberNode((Float) value);
+                case FLOAT64:
+                    return JsonNodeFactory.instance.numberNode((Double) value);
+                case BOOLEAN:
+                    return JsonNodeFactory.instance.booleanNode((Boolean) value);
+                case STRING:
+                    CharSequence charSeq = (CharSequence) value;
+                    return JsonNodeFactory.instance.textNode(charSeq.toString());
+                case BYTES:
+                    if (value instanceof byte[])
+                        return JsonNodeFactory.instance.binaryNode((byte[]) value);
+                    else if (value instanceof ByteBuffer)
+                        return JsonNodeFactory.instance.binaryNode(((ByteBuffer) value).array());
+                    else
+                        throw new DataException("Invalid type for bytes type: " + value.getClass());
+                case ARRAY: {
+                    Collection collection = (Collection) value;
+                    ArrayNode list = JsonNodeFactory.instance.arrayNode();
+                    for (Object elem : collection) {
+                        Schema valueSchema = schema == null ? null : schema.valueSchema();
+                        JsonNode fieldValue = convertToJson(valueSchema, elem);
+                        list.add(fieldValue);
+                    }
+                    return list;
+                }
+                case MAP: {
+                    Map<?, ?> map = (Map<?, ?>) value;
+                    // If true, using string keys and JSON object; if false, using non-string keys and Array-encoding
+                    boolean objectMode;
+                    if (schema == null) {
+                        objectMode = true;
+                        for (Map.Entry<?, ?> entry : map.entrySet()) {
+                            if (!(entry.getKey() instanceof String)) {
+                                objectMode = false;
+                                break;
+                            }
+                        }
+                    } else {
+                        objectMode = schema.keySchema().type() == Schema.Type.STRING;
+                    }
+                    ObjectNode obj = null;
+                    ArrayNode list = null;
                     if (objectMode)
-                        obj.set(mapKey.asText(), mapValue);
+                        obj = JsonNodeFactory.instance.objectNode();
                     else
-                        list.add(JsonNodeFactory.instance.arrayNode().add(mapKey).add(mapValue));
+                        list = JsonNodeFactory.instance.arrayNode();
+                    for (Map.Entry<?, ?> entry : map.entrySet()) {
+                        Schema keySchema = schema == null ? null : schema.keySchema();
+                        Schema valueSchema = schema == null ? null : schema.valueSchema();
+                        JsonNode mapKey = convertToJson(keySchema, entry.getKey());
+                        JsonNode mapValue = convertToJson(valueSchema, entry.getValue());
+
+                        if (objectMode)
+                            obj.set(mapKey.asText(), mapValue);
+                        else
+                            list.add(JsonNodeFactory.instance.arrayNode().add(mapKey).add(mapValue));
+                    }
+                    return objectMode ? obj : list;
                 }
-                return objectMode ? obj : list;
-            }
-            case STRUCT: {
-                if (!(value instanceof Struct))
-                    throw new DataException("Invalid type for struct type: " + value.getClass());
-                Struct struct = (Struct) value;
-                if (struct.schema() != schema)
-                    throw new DataException("Mismatching schema.");
-                ObjectNode obj = JsonNodeFactory.instance.objectNode();
-                for (Field field : schema.fields()) {
-                    obj.set(field.name(), convertToJson(field.schema(), struct.get(field)));
+                case STRUCT: {
+                    Struct struct = (Struct) value;
+                    if (struct.schema() != schema)
+                        throw new DataException("Mismatching schema.");
+                    ObjectNode obj = JsonNodeFactory.instance.objectNode();
+                    for (Field field : schema.fields()) {
+                        obj.set(field.name(), convertToJson(field.schema(), struct.get(field)));
+                    }
+                    return obj;
                 }
-                return obj;
             }
-        }
 
-        throw new DataException("Couldn't convert " + value + " to JSON.");
+            throw new DataException("Couldn't convert " + value + " to JSON.");
+        } catch (ClassCastException e) {
+            throw new DataException("Invalid type for " + schema.type() + ": " + value.getClass());
+        }
     }
 
 
     private static Object convertToCopycat(Schema schema, JsonNode jsonValue) {
-        JsonToCopycatTypeConverter typeConverter = TO_COPYCAT_CONVERTERS.get(schema.type());
+        JsonToCopycatTypeConverter typeConverter;
+        final Schema.Type schemaType;
+        if (schema != null) {
+            schemaType = schema.type();
+        } else {
+            switch (jsonValue.getNodeType()) {
+                case NULL:
+                    // Special case. With no schema, a JSON null maps straight to a null value.
+                    return null;
+                case BOOLEAN:
+                    schemaType = Schema.Type.BOOLEAN;
+                    break;
+                case NUMBER:
+                    if (jsonValue.isIntegralNumber())
+                        schemaType = Schema.Type.INT64;
+                    else
+                        schemaType = Schema.Type.FLOAT64;
+                    break;
+                case ARRAY:
+                    schemaType = Schema.Type.ARRAY;
+                    break;
+                case OBJECT:
+                    schemaType = Schema.Type.MAP;
+                    break;
+                case STRING:
+                    schemaType = Schema.Type.STRING;
+                    break;
+
+                case BINARY:
+                case MISSING:
+                case POJO:
+                default:
+                    schemaType = null;
+                    break;
+            }
+        }
+        typeConverter = TO_COPYCAT_CONVERTERS.get(schemaType);
         if (typeConverter == null)
             throw new DataException("Unknown schema type: " + schema.type());
 

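As a quick orientation for the JsonConverter hunks above: with a null schema, convertToJson now infers the schema type from the Java class, and a Map is written as a JSON object only when every key is a String, otherwise as an array of [key, value] pairs; on the read side, convertToCopycat infers BOOLEAN, INT64 (integral numbers), FLOAT64, STRING, ARRAY or MAP from the JSON node type. The following is a minimal, self-contained sketch of the two map encodings in plain Java (no Copycat classes are used; the JSON shown in the comments simply follows the logic in the diff):

import java.util.LinkedHashMap;
import java.util.Map;

public class SchemalessMapEncodingSketch {
    public static void main(String[] args) {
        // Every key is a String, so with no schema the converter emits a JSON
        // object:  {"a":1,"b":2}
        Map<String, Integer> stringKeys = new LinkedHashMap<>();
        stringKeys.put("a", 1);
        stringKeys.put("b", 2);

        // A non-String key forces the array-of-pairs encoding instead:
        // [[1,"one"],[2,"two"]]
        Map<Integer, String> intKeys = new LinkedHashMap<>();
        intKeys.put(1, "one");
        intKeys.put(2, "two");

        System.out.println(stringKeys + " / " + intKeys);
    }
}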
http://git-wip-us.apache.org/repos/asf/kafka/blob/3803e5cb/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonDeserializer.java
----------------------------------------------------------------------
diff --git a/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonDeserializer.java b/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonDeserializer.java
index 29c7bac..1661754 100644
--- a/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonDeserializer.java
+++ b/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonDeserializer.java
@@ -18,9 +18,6 @@ package org.apache.kafka.copycat.json;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.JsonNodeFactory;
-import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.serialization.Deserializer;
 
@@ -31,22 +28,6 @@ import java.util.Map;
  * structured data without having associated Java classes. This deserializer also supports Copycat schemas.
  */
 public class JsonDeserializer implements Deserializer<JsonNode> {
-    private static final ObjectNode CATCH_ALL_OBJECT_SCHEMA = JsonNodeFactory.instance.objectNode();
-    private static final ObjectNode CATCH_ALL_ARRAY_SCHEMA = JsonNodeFactory.instance.objectNode();
-    private static final ArrayNode ALL_SCHEMAS_LIST = JsonNodeFactory.instance.arrayNode();
-    private static final ObjectNode CATCH_ALL_SCHEMA = JsonNodeFactory.instance.objectNode();
-    static {
-        CATCH_ALL_OBJECT_SCHEMA.put("type", "object")
-                .putArray("field").add(JsonNodeFactory.instance.objectNode().put("*", "all"));
-
-        CATCH_ALL_ARRAY_SCHEMA.put("type", "array").put("items", "all");
-
-        ALL_SCHEMAS_LIST.add("boolean").add("int").add("long").add("float").add("double").add("bytes").add("string")
-                .add(CATCH_ALL_ARRAY_SCHEMA).add(CATCH_ALL_OBJECT_SCHEMA);
-
-        CATCH_ALL_SCHEMA.put("name", "all").set("type", ALL_SCHEMAS_LIST);
-    }
-
     private ObjectMapper objectMapper = new ObjectMapper();
 
     /**
@@ -61,6 +42,9 @@ public class JsonDeserializer implements Deserializer<JsonNode> {
 
     @Override
     public JsonNode deserialize(String topic, byte[] bytes) {
+        if (bytes == null)
+            return null;
+
         JsonNode data;
         try {
             data = objectMapper.readTree(bytes);
@@ -68,15 +52,6 @@ public class JsonDeserializer implements Deserializer<JsonNode> {
             throw new SerializationException(e);
         }
 
-        // The deserialized data should either be an envelope object containing the schema and the payload or the schema
-        // was stripped during serialization and we need to fill in an all-encompassing schema.
-        if (!data.isObject() || data.size() != 2 || !data.has("schema") || !data.has("payload")) {
-            ObjectNode envelope = JsonNodeFactory.instance.objectNode();
-            envelope.set("schema", CATCH_ALL_SCHEMA);
-            envelope.set("payload", data);
-            data = envelope;
-        }
-
         return data;
     }
 

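The JsonDeserializer change above removes the catch-all schema that used to be wrapped around payloads lacking a {"schema": ..., "payload": ...} envelope; the class now just parses bytes into a JsonNode and returns null for null input, leaving any envelope interpretation to JsonConverter. A sketch of the resulting behaviour, using plain Jackson rather than the copycat-json class itself:

import java.nio.charset.StandardCharsets;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class DeserializeSketch {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        byte[] bytes = "{\"a\": 1}".getBytes(StandardCharsets.UTF_8);

        // Null input short-circuits to null, mirroring the new guard in deserialize().
        JsonNode node = (bytes == null) ? null : mapper.readTree(bytes);

        // The parsed tree is returned as-is; no synthetic "all types" schema is added.
        System.out.println(node); // {"a":1}
    }
}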
http://git-wip-us.apache.org/repos/asf/kafka/blob/3803e5cb/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonSerializer.java
----------------------------------------------------------------------
diff --git a/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonSerializer.java b/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonSerializer.java
index 80df6be..129d14b 100644
--- a/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonSerializer.java
+++ b/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonSerializer.java
@@ -28,12 +28,7 @@ import java.util.Map;
  * structured data without corresponding Java classes. This serializer also supports Copycat schemas.
  */
 public class JsonSerializer implements Serializer<JsonNode> {
-
-    private static final String SCHEMAS_ENABLE_CONFIG = "schemas.enable";
-    private static final boolean SCHEMAS_ENABLE_DEFAULT = true;
-
     private final ObjectMapper objectMapper = new ObjectMapper();
-    private boolean enableSchemas = SCHEMAS_ENABLE_DEFAULT;
 
     /**
      * Default constructor needed by Kafka
@@ -44,9 +39,6 @@ public class JsonSerializer implements Serializer<JsonNode> {
 
     @Override
     public void configure(Map<String, ?> config, boolean isKey) {
-        Object enableConfigsVal = config.get(SCHEMAS_ENABLE_CONFIG);
-        if (enableConfigsVal != null)
-            enableSchemas = enableConfigsVal.toString().equals("true");
     }
 
     @Override
@@ -54,14 +46,7 @@ public class JsonSerializer implements Serializer<JsonNode> {
         if (data == null)
             return null;
 
-        // This serializer works for Copycat data that requires a schema to be included, so we expect it to have a
-        // specific format: { "schema": {...}, "payload": ... }.
-        if (!data.isObject() || data.size() != 2 || !data.has("schema") || !data.has("payload"))
-            throw new SerializationException("JsonSerializer requires \"schema\" and \"payload\" fields and may not contain additional fields");
-
         try {
-            if (!enableSchemas)
-                data = data.get("payload");
             return objectMapper.writeValueAsBytes(data);
         } catch (Exception e) {
             throw new SerializationException("Error serializing JSON message", e);


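Finally, the JsonSerializer above loses both the envelope validation and the schemas.enable switch: serialize() now simply writes whatever JsonNode it is handed (still returning null for null input), with any schema/payload structuring handled upstream by JsonConverter. A small sketch of that reduced role, again with plain Jackson rather than the copycat-json class:

import java.nio.charset.StandardCharsets;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class SerializeSketch {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();

        // Any JsonNode is accepted now; there is no check for exactly
        // {"schema": ..., "payload": ...} and no payload-stripping path.
        ObjectNode data = JsonNodeFactory.instance.objectNode();
        data.put("payload", 42);

        byte[] bytes = mapper.writeValueAsBytes(data);
        System.out.println(new String(bytes, StandardCharsets.UTF_8)); // {"payload":42}
    }
}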