kafka-commits mailing list archives

From: gwens...@apache.org
Subject: [2/3] kafka git commit: KAFKA-2475: Make Copycat only have a Converter class instead of Serializer, Deserializer, and Converter.
Date: Mon, 31 Aug 2015 19:26:26 GMT
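Editor's note: the call sites throughout this patch imply a single, topic-aware Converter contract roughly like the sketch below. This is reconstructed from usage only (configure/fromCopycatData/toCopycatData as they appear in the tests and runtime code in this message); the real interface lives in the copycat-api module, which is not included in this part of the commit, so the exact declaration and package may differ.

    // Sketch of the unified Converter contract implied by the call sites in this patch.
    import java.util.Map;

    import org.apache.kafka.copycat.data.Schema;
    import org.apache.kafka.copycat.data.SchemaAndValue;

    public interface Converter {
        // isKey distinguishes key converters from value converters, mirroring the
        // boolean Worker passes when it calls configure() on each instance.
        void configure(Map<String, ?> configs, boolean isKey);

        // Serialize a Copycat schema/value pair to the raw bytes written to Kafka.
        byte[] fromCopycatData(String topic, Schema schema, Object value);

        // Deserialize raw bytes read from Kafka back into a schema/value pair.
        SchemaAndValue toCopycatData(String topic, byte[] value);
    }
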
http://git-wip-us.apache.org/repos/asf/kafka/blob/3803e5cb/copycat/json/src/test/java/org/apache/kafka/copycat/json/JsonConverterTest.java
----------------------------------------------------------------------
diff --git a/copycat/json/src/test/java/org/apache/kafka/copycat/json/JsonConverterTest.java b/copycat/json/src/test/java/org/apache/kafka/copycat/json/JsonConverterTest.java
index ab4a86e..214f9ce 100644
--- a/copycat/json/src/test/java/org/apache/kafka/copycat/json/JsonConverterTest.java
+++ b/copycat/json/src/test/java/org/apache/kafka/copycat/json/JsonConverterTest.java
@@ -19,19 +19,19 @@ package org.apache.kafka.copycat.json;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import org.apache.kafka.copycat.data.Schema;
 import org.apache.kafka.copycat.data.SchemaAndValue;
 import org.apache.kafka.copycat.data.SchemaBuilder;
 import org.apache.kafka.copycat.data.Struct;
+import org.apache.kafka.copycat.errors.DataException;
 import org.junit.Test;
 
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.*;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -39,6 +39,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 public class JsonConverterTest {
+    private static final String TOPIC = "topic";
 
     ObjectMapper objectMapper = new ObjectMapper();
     JsonConverter converter = new JsonConverter();
@@ -48,51 +49,51 @@ public class JsonConverterTest {
     @Test
     public void testCopycatSchemaMetadataTranslation() {
         // this validates the non-type fields are translated and handled properly
-        assertEquals(new SchemaAndValue(Schema.BOOLEAN_SCHEMA, true), converter.toCopycatData(parse("{ \"schema\": { \"type\": \"boolean\" }, \"payload\": true }")));
-        assertEquals(new SchemaAndValue(Schema.OPTIONAL_BOOLEAN_SCHEMA, null), converter.toCopycatData(parse("{ \"schema\": { \"type\": \"boolean\", \"optional\": true }, \"payload\": null }")));
+        assertEquals(new SchemaAndValue(Schema.BOOLEAN_SCHEMA, true), converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"boolean\" }, \"payload\": true }".getBytes()));
+        assertEquals(new SchemaAndValue(Schema.OPTIONAL_BOOLEAN_SCHEMA, null), converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"boolean\", \"optional\": true }, \"payload\": null }".getBytes()));
         assertEquals(new SchemaAndValue(SchemaBuilder.bool().defaultValue(true).build(), true),
-                converter.toCopycatData(parse("{ \"schema\": { \"type\": \"boolean\", \"default\": true }, \"payload\": null }")));
+                converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"boolean\", \"default\": true }, \"payload\": null }".getBytes()));
         assertEquals(new SchemaAndValue(SchemaBuilder.bool().required().name("bool").version(2).doc("the documentation").build(), true),
-                converter.toCopycatData(parse("{ \"schema\": { \"type\": \"boolean\", \"optional\": false, \"name\": \"bool\", \"version\": 2, \"doc\": \"the documentation\"}, \"payload\": true }")));
+                converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"boolean\", \"optional\": false, \"name\": \"bool\", \"version\": 2, \"doc\": \"the documentation\"}, \"payload\": true }".getBytes()));
     }
 
     // Schema types
 
     @Test
     public void booleanToCopycat() {
-        assertEquals(new SchemaAndValue(Schema.BOOLEAN_SCHEMA, true), converter.toCopycatData(parse("{ \"schema\": { \"type\": \"boolean\" }, \"payload\": true }")));
-        assertEquals(new SchemaAndValue(Schema.BOOLEAN_SCHEMA, false), converter.toCopycatData(parse("{ \"schema\": { \"type\": \"boolean\" }, \"payload\": false }")));
+        assertEquals(new SchemaAndValue(Schema.BOOLEAN_SCHEMA, true), converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"boolean\" }, \"payload\": true }".getBytes()));
+        assertEquals(new SchemaAndValue(Schema.BOOLEAN_SCHEMA, false), converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"boolean\" }, \"payload\": false }".getBytes()));
     }
 
     @Test
     public void byteToCopycat() {
-        assertEquals(new SchemaAndValue(Schema.INT8_SCHEMA, (byte) 12), converter.toCopycatData(parse("{ \"schema\": { \"type\": \"int8\" }, \"payload\": 12 }")));
+        assertEquals(new SchemaAndValue(Schema.INT8_SCHEMA, (byte) 12), converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"int8\" }, \"payload\": 12 }".getBytes()));
     }
 
     @Test
     public void shortToCopycat() {
-        assertEquals(new SchemaAndValue(Schema.INT16_SCHEMA, (short) 12), converter.toCopycatData(parse("{ \"schema\": { \"type\": \"int16\" }, \"payload\": 12 }")));
+        assertEquals(new SchemaAndValue(Schema.INT16_SCHEMA, (short) 12), converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"int16\" }, \"payload\": 12 }".getBytes()));
     }
 
     @Test
     public void intToCopycat() {
-        assertEquals(new SchemaAndValue(Schema.INT32_SCHEMA, 12), converter.toCopycatData(parse("{ \"schema\": { \"type\": \"int32\" }, \"payload\": 12 }")));
+        assertEquals(new SchemaAndValue(Schema.INT32_SCHEMA, 12), converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"int32\" }, \"payload\": 12 }".getBytes()));
     }
 
     @Test
     public void longToCopycat() {
-        assertEquals(new SchemaAndValue(Schema.INT64_SCHEMA, 12L), converter.toCopycatData(parse("{ \"schema\": { \"type\": \"int64\" }, \"payload\": 12 }")));
-        assertEquals(new SchemaAndValue(Schema.INT64_SCHEMA, 4398046511104L), converter.toCopycatData(parse("{ \"schema\": { \"type\": \"int64\" }, \"payload\": 4398046511104 }")));
+        assertEquals(new SchemaAndValue(Schema.INT64_SCHEMA, 12L), converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"int64\" }, \"payload\": 12 }".getBytes()));
+        assertEquals(new SchemaAndValue(Schema.INT64_SCHEMA, 4398046511104L), converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"int64\" }, \"payload\": 4398046511104 }".getBytes()));
     }
 
     @Test
     public void floatToCopycat() {
-        assertEquals(new SchemaAndValue(Schema.FLOAT32_SCHEMA, 12.34f), converter.toCopycatData(parse("{ \"schema\": { \"type\": \"float\" }, \"payload\": 12.34 }")));
+        assertEquals(new SchemaAndValue(Schema.FLOAT32_SCHEMA, 12.34f), converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"float\" }, \"payload\": 12.34 }".getBytes()));
     }
 
     @Test
     public void doubleToCopycat() {
-        assertEquals(new SchemaAndValue(Schema.FLOAT64_SCHEMA, 12.34), converter.toCopycatData(parse("{ \"schema\": { \"type\": \"double\" }, \"payload\": 12.34 }")));
+        assertEquals(new SchemaAndValue(Schema.FLOAT64_SCHEMA, 12.34), converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"double\" }, \"payload\": 12.34 }".getBytes()));
     }
 
 
@@ -100,69 +101,105 @@ public class JsonConverterTest {
     public void bytesToCopycat() throws UnsupportedEncodingException {
         ByteBuffer reference = ByteBuffer.wrap("test-string".getBytes("UTF-8"));
         String msg = "{ \"schema\": { \"type\": \"bytes\" }, \"payload\": \"dGVzdC1zdHJpbmc=\" }";
-        SchemaAndValue schemaAndValue = converter.toCopycatData(parse(msg));
+        SchemaAndValue schemaAndValue = converter.toCopycatData(TOPIC, msg.getBytes());
         ByteBuffer converted = ByteBuffer.wrap((byte[]) schemaAndValue.value());
         assertEquals(reference, converted);
     }
 
     @Test
     public void stringToCopycat() {
-        assertEquals(new SchemaAndValue(Schema.STRING_SCHEMA, "foo-bar-baz"), converter.toCopycatData(parse("{ \"schema\": { \"type\": \"string\" }, \"payload\": \"foo-bar-baz\" }")));
+        assertEquals(new SchemaAndValue(Schema.STRING_SCHEMA, "foo-bar-baz"), converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"string\" }, \"payload\": \"foo-bar-baz\" }".getBytes()));
     }
 
     @Test
     public void arrayToCopycat() {
-        JsonNode arrayJson = parse("{ \"schema\": { \"type\": \"array\", \"items\": { \"type\" : \"int32\" } }, \"payload\": [1, 2, 3] }");
-        assertEquals(new SchemaAndValue(SchemaBuilder.array(Schema.INT32_SCHEMA).build(), Arrays.asList(1, 2, 3)), converter.toCopycatData(arrayJson));
+        byte[] arrayJson = "{ \"schema\": { \"type\": \"array\", \"items\": { \"type\" : \"int32\" } }, \"payload\": [1, 2, 3] }".getBytes();
+        assertEquals(new SchemaAndValue(SchemaBuilder.array(Schema.INT32_SCHEMA).build(), Arrays.asList(1, 2, 3)), converter.toCopycatData(TOPIC, arrayJson));
     }
 
     @Test
     public void mapToCopycatStringKeys() {
-        JsonNode mapJson = parse("{ \"schema\": { \"type\": \"map\", \"keys\": { \"type\" : \"string\" }, \"values\": { \"type\" : \"int32\" } }, \"payload\": { \"key1\": 12, \"key2\": 15} }");
+        byte[] mapJson = "{ \"schema\": { \"type\": \"map\", \"keys\": { \"type\" : \"string\" }, \"values\": { \"type\" : \"int32\" } }, \"payload\": { \"key1\": 12, \"key2\": 15} }".getBytes();
         Map<String, Integer> expected = new HashMap<>();
         expected.put("key1", 12);
         expected.put("key2", 15);
-        assertEquals(new SchemaAndValue(SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA).build(), expected), converter.toCopycatData(mapJson));
+        assertEquals(new SchemaAndValue(SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA).build(), expected), converter.toCopycatData(TOPIC, mapJson));
     }
 
     @Test
     public void mapToCopycatNonStringKeys() {
-        JsonNode mapJson = parse("{ \"schema\": { \"type\": \"map\", \"keys\": { \"type\" : \"int32\" }, \"values\": { \"type\" : \"int32\" } }, \"payload\": [ [1, 12], [2, 15] ] }");
+        byte[] mapJson = "{ \"schema\": { \"type\": \"map\", \"keys\": { \"type\" : \"int32\" }, \"values\": { \"type\" : \"int32\" } }, \"payload\": [ [1, 12], [2, 15] ] }".getBytes();
         Map<Integer, Integer> expected = new HashMap<>();
         expected.put(1, 12);
         expected.put(2, 15);
-        assertEquals(new SchemaAndValue(SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.INT32_SCHEMA).build(), expected), converter.toCopycatData(mapJson));
+        assertEquals(new SchemaAndValue(SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.INT32_SCHEMA).build(), expected), converter.toCopycatData(TOPIC, mapJson));
     }
 
     @Test
     public void structToCopycat() {
-        JsonNode structJson = parse("{ \"schema\": { \"type\": \"struct\", \"fields\": [{ \"field\": \"field1\", \"type\": \"boolean\" }, { \"field\": \"field2\", \"type\": \"string\" }] }, \"payload\": { \"field1\": true, \"field2\": \"string\" } }");
+        byte[] structJson = "{ \"schema\": { \"type\": \"struct\", \"fields\": [{ \"field\": \"field1\", \"type\": \"boolean\" }, { \"field\": \"field2\", \"type\": \"string\" }] }, \"payload\": { \"field1\": true, \"field2\": \"string\" } }".getBytes();
         Schema expectedSchema = SchemaBuilder.struct().field("field1", Schema.BOOLEAN_SCHEMA).field("field2", Schema.STRING_SCHEMA).build();
         Struct expected = new Struct(expectedSchema).put("field1", true).put("field2", "string");
-        SchemaAndValue converted = converter.toCopycatData(structJson);
+        SchemaAndValue converted = converter.toCopycatData(TOPIC, structJson);
         assertEquals(new SchemaAndValue(expectedSchema, expected), converted);
     }
 
+    @Test(expected = DataException.class)
+    public void nullToCopycat() {
+        // When schemas are enabled, trying to decode a null should be an error -- we should *always* have the envelope
+        assertEquals(SchemaAndValue.NULL, converter.toCopycatData(TOPIC, null));
+    }
+
+    @Test
+    public void nullSchemaPrimitiveToCopycat() {
+        SchemaAndValue converted = converter.toCopycatData(TOPIC, "{ \"schema\": null, \"payload\": null }".getBytes());
+        assertEquals(SchemaAndValue.NULL, converted);
+
+        converted = converter.toCopycatData(TOPIC, "{ \"schema\": null, \"payload\": true }".getBytes());
+        assertEquals(new SchemaAndValue(null, true), converted);
+
+        // Integers: Copycat has more data types, and JSON unfortunately mixes all number types. We try to preserve
+        // info as best we can, so we always use the largest integer and floating point numbers we can and have Jackson
+        // determine if it's an integer or not
+        converted = converter.toCopycatData(TOPIC, "{ \"schema\": null, \"payload\": 12 }".getBytes());
+        assertEquals(new SchemaAndValue(null, 12L), converted);
+
+        converted = converter.toCopycatData(TOPIC, "{ \"schema\": null, \"payload\": 12.24 }".getBytes());
+        assertEquals(new SchemaAndValue(null, 12.24), converted);
+
+        converted = converter.toCopycatData(TOPIC, "{ \"schema\": null, \"payload\": \"a string\" }".getBytes());
+        assertEquals(new SchemaAndValue(null, "a string"), converted);
+
+        converted = converter.toCopycatData(TOPIC, "{ \"schema\": null, \"payload\": [1, \"2\", 3] }".getBytes());
+        assertEquals(new SchemaAndValue(null, Arrays.asList(1L, "2", 3L)), converted);
+
+        converted = converter.toCopycatData(TOPIC, "{ \"schema\": null, \"payload\": { \"field1\": 1, \"field2\": 2} }".getBytes());
+        Map<String, Long> obj = new HashMap<>();
+        obj.put("field1", 1L);
+        obj.put("field2", 2L);
+        assertEquals(new SchemaAndValue(null, obj), converted);
+    }
+
     // Schema metadata
 
     @Test
     public void testJsonSchemaMetadataTranslation() {
-        JsonNode converted = converter.fromCopycatData(Schema.BOOLEAN_SCHEMA, true);
+        JsonNode converted = parse(converter.fromCopycatData(TOPIC, Schema.BOOLEAN_SCHEMA, true));
         validateEnvelope(converted);
         assertEquals(parse("{ \"type\": \"boolean\", \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
         assertEquals(true, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).booleanValue());
 
-        converted = converter.fromCopycatData(Schema.OPTIONAL_BOOLEAN_SCHEMA, null);
+        converted = parse(converter.fromCopycatData(TOPIC, Schema.OPTIONAL_BOOLEAN_SCHEMA, null));
         validateEnvelope(converted);
         assertEquals(parse("{ \"type\": \"boolean\", \"optional\": true }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
         assertTrue(converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).isNull());
 
-        converted = converter.fromCopycatData(SchemaBuilder.bool().defaultValue(true).build(), true);
+        converted = parse(converter.fromCopycatData(TOPIC, SchemaBuilder.bool().defaultValue(true).build(), true));
         validateEnvelope(converted);
         assertEquals(parse("{ \"type\": \"boolean\", \"optional\": false, \"default\": true }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
         assertEquals(true, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).booleanValue());
 
-        converted = converter.fromCopycatData(SchemaBuilder.bool().required().name("bool").version(3).doc("the documentation").build(), true);
+        converted = parse(converter.fromCopycatData(TOPIC, SchemaBuilder.bool().required().name("bool").version(3).doc("the documentation").build(), true));
         validateEnvelope(converted);
         assertEquals(parse("{ \"type\": \"boolean\", \"optional\": false, \"name\": \"bool\", \"version\": 3, \"doc\": \"the documentation\"}"),
                 converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
@@ -173,7 +210,7 @@ public class JsonConverterTest {
 
     @Test
     public void booleanToJson() {
-        JsonNode converted = converter.fromCopycatData(Schema.BOOLEAN_SCHEMA, true);
+        JsonNode converted = parse(converter.fromCopycatData(TOPIC, Schema.BOOLEAN_SCHEMA, true));
         validateEnvelope(converted);
         assertEquals(parse("{ \"type\": \"boolean\", \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
         assertEquals(true, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).booleanValue());
@@ -181,7 +218,7 @@ public class JsonConverterTest {
 
     @Test
     public void byteToJson() {
-        JsonNode converted = converter.fromCopycatData(Schema.INT8_SCHEMA, (byte) 12);
+        JsonNode converted = parse(converter.fromCopycatData(TOPIC, Schema.INT8_SCHEMA, (byte) 12));
         validateEnvelope(converted);
         assertEquals(parse("{ \"type\": \"int8\", \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
         assertEquals(12, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).intValue());
@@ -189,7 +226,7 @@ public class JsonConverterTest {
 
     @Test
     public void shortToJson() {
-        JsonNode converted = converter.fromCopycatData(Schema.INT16_SCHEMA, (short) 12);
+        JsonNode converted = parse(converter.fromCopycatData(TOPIC, Schema.INT16_SCHEMA, (short) 12));
         validateEnvelope(converted);
         assertEquals(parse("{ \"type\": \"int16\", \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
         assertEquals(12, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).intValue());
@@ -197,7 +234,7 @@ public class JsonConverterTest {
 
     @Test
     public void intToJson() {
-        JsonNode converted = converter.fromCopycatData(Schema.INT32_SCHEMA, 12);
+        JsonNode converted = parse(converter.fromCopycatData(TOPIC, Schema.INT32_SCHEMA, 12));
         validateEnvelope(converted);
         assertEquals(parse("{ \"type\": \"int32\", \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
         assertEquals(12, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).intValue());
@@ -205,7 +242,7 @@ public class JsonConverterTest {
 
     @Test
     public void longToJson() {
-        JsonNode converted = converter.fromCopycatData(Schema.INT64_SCHEMA, 4398046511104L);
+        JsonNode converted = parse(converter.fromCopycatData(TOPIC, Schema.INT64_SCHEMA, 4398046511104L));
         validateEnvelope(converted);
         assertEquals(parse("{ \"type\": \"int64\", \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
         assertEquals(4398046511104L, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).longValue());
@@ -213,7 +250,7 @@ public class JsonConverterTest {
 
     @Test
     public void floatToJson() {
-        JsonNode converted = converter.fromCopycatData(Schema.FLOAT32_SCHEMA, 12.34f);
+        JsonNode converted = parse(converter.fromCopycatData(TOPIC, Schema.FLOAT32_SCHEMA, 12.34f));
         validateEnvelope(converted);
         assertEquals(parse("{ \"type\": \"float\", \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
         assertEquals(12.34f, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).floatValue(), 0.001);
@@ -221,7 +258,7 @@ public class JsonConverterTest {
 
     @Test
     public void doubleToJson() {
-        JsonNode converted = converter.fromCopycatData(Schema.FLOAT64_SCHEMA, 12.34);
+        JsonNode converted = parse(converter.fromCopycatData(TOPIC, Schema.FLOAT64_SCHEMA, 12.34));
         validateEnvelope(converted);
         assertEquals(parse("{ \"type\": \"double\", \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
         assertEquals(12.34, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).doubleValue(), 0.001);
@@ -229,7 +266,7 @@ public class JsonConverterTest {
 
     @Test
     public void bytesToJson() throws IOException {
-        JsonNode converted = converter.fromCopycatData(Schema.BYTES_SCHEMA, "test-string".getBytes());
+        JsonNode converted = parse(converter.fromCopycatData(TOPIC, Schema.BYTES_SCHEMA, "test-string".getBytes()));
         validateEnvelope(converted);
         assertEquals(parse("{ \"type\": \"bytes\", \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
         assertEquals(ByteBuffer.wrap("test-string".getBytes()),
@@ -238,7 +275,7 @@ public class JsonConverterTest {
 
     @Test
     public void stringToJson() {
-        JsonNode converted = converter.fromCopycatData(Schema.STRING_SCHEMA, "test-string");
+        JsonNode converted = parse(converter.fromCopycatData(TOPIC, Schema.STRING_SCHEMA, "test-string"));
         validateEnvelope(converted);
         assertEquals(parse("{ \"type\": \"string\", \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
         assertEquals("test-string", converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).textValue());
@@ -247,7 +284,7 @@ public class JsonConverterTest {
     @Test
     public void arrayToJson() {
         Schema int32Array = SchemaBuilder.array(Schema.INT32_SCHEMA).build();
-        JsonNode converted = converter.fromCopycatData(int32Array, Arrays.asList(1, 2, 3));
+        JsonNode converted = parse(converter.fromCopycatData(TOPIC, int32Array, Arrays.asList(1, 2, 3)));
         validateEnvelope(converted);
         assertEquals(parse("{ \"type\": \"array\", \"items\": { \"type\": \"int32\", \"optional\": false }, \"optional\": false }"),
                 converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
@@ -261,7 +298,7 @@ public class JsonConverterTest {
         Map<String, Integer> input = new HashMap<>();
         input.put("key1", 12);
         input.put("key2", 15);
-        JsonNode converted = converter.fromCopycatData(stringIntMap, input);
+        JsonNode converted = parse(converter.fromCopycatData(TOPIC, stringIntMap, input));
         validateEnvelope(converted);
         assertEquals(parse("{ \"type\": \"map\", \"keys\": { \"type\" : \"string\", \"optional\": false }, \"values\": { \"type\" : \"int32\", \"optional\": false }, \"optional\": false }"),
                 converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
@@ -275,21 +312,28 @@ public class JsonConverterTest {
         Map<Integer, Integer> input = new HashMap<>();
         input.put(1, 12);
         input.put(2, 15);
-        JsonNode converted = converter.fromCopycatData(intIntMap, input);
+        JsonNode converted = parse(converter.fromCopycatData(TOPIC, intIntMap, input));
         validateEnvelope(converted);
         assertEquals(parse("{ \"type\": \"map\", \"keys\": { \"type\" : \"int32\", \"optional\": false }, \"values\": { \"type\" : \"int32\", \"optional\": false }, \"optional\": false }"),
                 converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
-        assertEquals(JsonNodeFactory.instance.arrayNode()
-                        .add(JsonNodeFactory.instance.arrayNode().add(1).add(12))
-                        .add(JsonNodeFactory.instance.arrayNode().add(2).add(15)),
-                converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
+
+        assertTrue(converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).isArray());
+        ArrayNode payload = (ArrayNode) converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME);
+        assertEquals(2, payload.size());
+        Set<JsonNode> payloadEntries = new HashSet<>();
+        for (JsonNode elem : payload)
+            payloadEntries.add(elem);
+        assertEquals(new HashSet<>(Arrays.asList(JsonNodeFactory.instance.arrayNode().add(1).add(12),
+                        JsonNodeFactory.instance.arrayNode().add(2).add(15))),
+                payloadEntries
+        );
     }
 
     @Test
     public void structToJson() {
         Schema schema = SchemaBuilder.struct().field("field1", Schema.BOOLEAN_SCHEMA).field("field2", Schema.STRING_SCHEMA).build();
         Struct input = new Struct(schema).put("field1", true).put("field2", "string");
-        JsonNode converted = converter.fromCopycatData(schema, input);
+        JsonNode converted = parse(converter.fromCopycatData(TOPIC, schema, input));
         validateEnvelope(converted);
         assertEquals(parse("{ \"type\": \"struct\", \"optional\": false, \"fields\": [{ \"field\": \"field1\", \"type\": \"boolean\", \"optional\": false }, { \"field\": \"field2\", \"type\": \"string\", \"optional\": false }] }"),
                 converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
@@ -299,6 +343,102 @@ public class JsonConverterTest {
                 converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
     }
 
+
+    @Test
+    public void nullSchemaAndPrimitiveToJson() {
+        // This still needs to do conversion of data, null schema means "anything goes"
+        JsonNode converted = parse(converter.fromCopycatData(TOPIC, null, true));
+        validateEnvelopeNullSchema(converted);
+        assertTrue(converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME).isNull());
+        assertEquals(true, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).booleanValue());
+    }
+
+    @Test
+    public void nullSchemaAndArrayToJson() {
+        // This still needs to do conversion of data, null schema means "anything goes". Make sure we mix and match
+        // types to verify conversion still works.
+        JsonNode converted = parse(converter.fromCopycatData(TOPIC, null, Arrays.asList(1, "string", true)));
+        validateEnvelopeNullSchema(converted);
+        assertTrue(converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME).isNull());
+        assertEquals(JsonNodeFactory.instance.arrayNode().add(1).add("string").add(true),
+                converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
+    }
+
+    @Test
+    public void nullSchemaAndMapToJson() {
+        // This still needs to do conversion of data, null schema means "anything goes". Make sure we mix and match
+        // types to verify conversion still works.
+        Map<String, Object> input = new HashMap<>();
+        input.put("key1", 12);
+        input.put("key2", "string");
+        input.put("key3", true);
+        JsonNode converted = parse(converter.fromCopycatData(TOPIC, null, input));
+        validateEnvelopeNullSchema(converted);
+        assertTrue(converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME).isNull());
+        assertEquals(JsonNodeFactory.instance.objectNode().put("key1", 12).put("key2", "string").put("key3", true),
+                converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
+    }
+
+    @Test
+    public void nullSchemaAndMapNonStringKeysToJson() {
+        // This still needs to do conversion of data, null schema means "anything goes". Make sure we mix and match
+        // types to verify conversion still works.
+        Map<Object, Object> input = new HashMap<>();
+        input.put("string", 12);
+        input.put(52, "string");
+        input.put(false, true);
+        JsonNode converted = parse(converter.fromCopycatData(TOPIC, null, input));
+        validateEnvelopeNullSchema(converted);
+        assertTrue(converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME).isNull());
+        assertTrue(converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).isArray());
+        ArrayNode payload = (ArrayNode) converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME);
+        assertEquals(3, payload.size());
+        Set<JsonNode> payloadEntries = new HashSet<>();
+        for (JsonNode elem : payload)
+            payloadEntries.add(elem);
+        assertEquals(new HashSet<>(Arrays.asList(JsonNodeFactory.instance.arrayNode().add("string").add(12),
+                        JsonNodeFactory.instance.arrayNode().add(52).add("string"),
+                        JsonNodeFactory.instance.arrayNode().add(false).add(true))),
+                payloadEntries
+        );
+    }
+
+
+    @Test(expected = DataException.class)
+    public void mismatchSchemaJson() {
+        // If we have mismatching schema info, we should properly convert to a DataException
+        converter.fromCopycatData(TOPIC, Schema.FLOAT64_SCHEMA, true);
+    }
+
+
+
+    @Test
+    public void noSchemaToCopycat() {
+        Map<String, Boolean> props = Collections.singletonMap("schemas.enable", false);
+        converter.configure(props, true);
+        assertEquals(new SchemaAndValue(null, true), converter.toCopycatData(TOPIC, "true".getBytes()));
+    }
+
+    @Test
+    public void noSchemaToJson() {
+        Map<String, Boolean> props = Collections.singletonMap("schemas.enable", false);
+        converter.configure(props, true);
+        JsonNode converted = parse(converter.fromCopycatData(TOPIC, null, true));
+        assertTrue(converted.isBoolean());
+        assertEquals(true, converted.booleanValue());
+    }
+
+
+
+    private JsonNode parse(byte[] json) {
+        try {
+            return objectMapper.readTree(json);
+        } catch (IOException e) {
+            fail("IOException during JSON parse: " + e.getMessage());
+            throw new RuntimeException("failed");
+        }
+    }
+
     private JsonNode parse(String json) {
         try {
             return objectMapper.readTree(json);
@@ -316,4 +456,13 @@ public class JsonConverterTest {
         assertTrue(env.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME).isObject());
         assertTrue(env.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
     }
+
+    private void validateEnvelopeNullSchema(JsonNode env) {
+        assertNotNull(env);
+        assertTrue(env.isObject());
+        assertEquals(2, env.size());
+        assertTrue(env.has(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
+        assertTrue(env.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME).isNull());
+        assertTrue(env.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
+    }
 }

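Editor's note: as orientation for the tests above, a schemaless round trip through the new topic-aware API looks roughly like the following. The "schemas.enable" setting and class names are taken from the noSchemaToCopycat()/noSchemaToJson() tests; the rest is an illustrative sketch, not part of the patch.

    import java.util.Collections;

    import org.apache.kafka.copycat.data.SchemaAndValue;
    import org.apache.kafka.copycat.json.JsonConverter;

    public class SchemalessRoundTrip {
        public static void main(String[] args) {
            JsonConverter converter = new JsonConverter();
            // Same setting the tests use: disable the schema envelope so
            // payloads are written as bare JSON values.
            converter.configure(Collections.singletonMap("schemas.enable", false), true);

            byte[] serialized = converter.fromCopycatData("topic", null, true);   // -> "true"
            SchemaAndValue restored = converter.toCopycatData("topic", serialized);
            System.out.println(restored.value());                                 // true, with a null schema
        }
    }
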
http://git-wip-us.apache.org/repos/asf/kafka/blob/3803e5cb/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/WorkerConfig.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/WorkerConfig.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/WorkerConfig.java
index 23cdf4d..a976d90 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/WorkerConfig.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/WorkerConfig.java
@@ -32,8 +32,7 @@ import java.util.Properties;
 public class WorkerConfig extends AbstractConfig {
 
     public static final String CLUSTER_CONFIG = "cluster";
-    private static final String
-            CLUSTER_CONFIG_DOC =
+    private static final String CLUSTER_CONFIG_DOC =
             "ID for this cluster, which is used to provide a namespace so multiple Copycat clusters "
                     + "or instances may co-exist while sharing a single Kafka cluster.";
     public static final String CLUSTER_DEFAULT = "copycat";
@@ -58,21 +57,13 @@ public class WorkerConfig extends AbstractConfig {
     public static final String VALUE_CONVERTER_CLASS_DOC =
             "Converter class for value Copycat data that implements the <code>Converter</code> interface.";
 
-    public static final String KEY_SERIALIZER_CLASS_CONFIG = "key.serializer";
-    public static final String KEY_SERIALIZER_CLASS_DOC =
-            "Serializer class for key that implements the <code>Serializer</code> interface.";
+    public static final String OFFSET_KEY_CONVERTER_CLASS_CONFIG = "offset.key.converter";
+    public static final String OFFSET_KEY_CONVERTER_CLASS_DOC =
+            "Converter class for offset key Copycat data that implements the <code>Converter</code> interface.";
 
-    public static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer";
-    public static final String VALUE_SERIALIZER_CLASS_DOC =
-            "Serializer class for value that implements the <code>Serializer</code> interface.";
-
-    public static final String KEY_DESERIALIZER_CLASS_CONFIG = "key.deserializer";
-    public static final String KEY_DESERIALIZER_CLASS_DOC =
-            "Serializer class for key that implements the <code>Deserializer</code> interface.";
-
-    public static final String VALUE_DESERIALIZER_CLASS_CONFIG = "value.deserializer";
-    public static final String VALUE_DESERIALIZER_CLASS_DOC =
-            "Deserializer class for value that implements the <code>Deserializer</code> interface.";
+    public static final String OFFSET_VALUE_CONVERTER_CLASS_CONFIG = "offset.value.converter";
+    public static final String OFFSET_VALUE_CONVERTER_CLASS_DOC =
+            "Converter class for offset value Copycat data that implements the <code>Converter</code> interface.";
 
     public static final String TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG
             = "task.shutdown.graceful.timeout.ms";
@@ -104,14 +95,10 @@ public class WorkerConfig extends AbstractConfig {
                         Importance.HIGH, KEY_CONVERTER_CLASS_DOC)
                 .define(VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS,
                         Importance.HIGH, VALUE_CONVERTER_CLASS_DOC)
-                .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS,
-                        Importance.HIGH, KEY_SERIALIZER_CLASS_DOC)
-                .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS,
-                        Importance.HIGH, VALUE_SERIALIZER_CLASS_DOC)
-                .define(KEY_DESERIALIZER_CLASS_CONFIG, Type.CLASS,
-                        Importance.HIGH, KEY_DESERIALIZER_CLASS_DOC)
-                .define(VALUE_DESERIALIZER_CLASS_CONFIG, Type.CLASS,
-                        Importance.HIGH, VALUE_DESERIALIZER_CLASS_DOC)
+                .define(OFFSET_KEY_CONVERTER_CLASS_CONFIG, Type.CLASS,
+                        Importance.HIGH, OFFSET_KEY_CONVERTER_CLASS_DOC)
+                .define(OFFSET_VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS,
+                        Importance.HIGH, OFFSET_VALUE_CONVERTER_CLASS_DOC)
                 .define(TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG, Type.LONG,
                         TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DEFAULT, Importance.LOW,
                         TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DOC)

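Editor's note: putting the new WorkerConfig keys together, a worker setup under this patch would look roughly like the sketch below. The "offset.key.converter"/"offset.value.converter" keys are defined above; the "key.converter."/"value.converter." prefixes match the originalsWithPrefix() calls in Worker.java later in this patch; the JsonConverter class and broker address are illustrative, not prescribed by the commit.

    import java.util.Properties;

    public class WorkerPropsExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");  // illustrative broker address
            props.put("key.converter", "org.apache.kafka.copycat.json.JsonConverter");
            props.put("value.converter", "org.apache.kafka.copycat.json.JsonConverter");
            // The two offset.* converter settings replace the removed
            // key/value (de)serializer settings.
            props.put("offset.key.converter", "org.apache.kafka.copycat.json.JsonConverter");
            props.put("offset.value.converter", "org.apache.kafka.copycat.json.JsonConverter");
            // Converter-specific options are passed through by prefix; Worker strips
            // the prefix before calling configure() on each converter instance.
            props.put("offset.key.converter.schemas.enable", "false");
            props.put("offset.value.converter.schemas.enable", "false");
        }
    }
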
http://git-wip-us.apache.org/repos/asf/kafka/blob/3803e5cb/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java
index 704470a..6cbce0b 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java
@@ -17,8 +17,6 @@
 
 package org.apache.kafka.copycat.runtime;
 
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.clients.producer.KafkaProducer;
@@ -47,34 +45,36 @@ import java.util.Properties;
  * Since each task has a dedicated thread, this is mainly just a container for them.
  * </p>
  */
-public class Worker<K, V> {
+public class Worker {
     private static final Logger log = LoggerFactory.getLogger(Worker.class);
 
     private Time time;
     private WorkerConfig config;
-    private Converter<K> keyConverter;
-    private Converter<V> valueConverter;
+    private Converter keyConverter;
+    private Converter valueConverter;
+    private Converter offsetKeyConverter;
+    private Converter offsetValueConverter;
     private OffsetBackingStore offsetBackingStore;
-    private Serializer<K> offsetKeySerializer;
-    private Serializer<V> offsetValueSerializer;
-    private Deserializer<K> offsetKeyDeserializer;
-    private Deserializer<V> offsetValueDeserializer;
     private HashMap<ConnectorTaskId, WorkerTask> tasks = new HashMap<>();
-    private KafkaProducer<K, V> producer;
+    private KafkaProducer<byte[], byte[]> producer;
     private SourceTaskOffsetCommitter sourceTaskOffsetCommitter;
 
     public Worker(WorkerConfig config) {
-        this(new SystemTime(), config, null, null, null, null, null);
+        this(new SystemTime(), config, null);
     }
 
     @SuppressWarnings("unchecked")
-    public Worker(Time time, WorkerConfig config, OffsetBackingStore offsetBackingStore,
-                  Serializer offsetKeySerializer, Serializer offsetValueSerializer,
-                  Deserializer offsetKeyDeserializer, Deserializer offsetValueDeserializer) {
+    public Worker(Time time, WorkerConfig config, OffsetBackingStore offsetBackingStore) {
         this.time = time;
         this.config = config;
         this.keyConverter = config.getConfiguredInstance(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, Converter.class);
+        this.keyConverter.configure(config.originalsWithPrefix("key.converter."), true);
         this.valueConverter = config.getConfiguredInstance(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, Converter.class);
+        this.valueConverter.configure(config.originalsWithPrefix("value.converter."), false);
+        this.offsetKeyConverter = config.getConfiguredInstance(WorkerConfig.OFFSET_KEY_CONVERTER_CLASS_CONFIG, Converter.class);
+        this.offsetKeyConverter.configure(config.originalsWithPrefix("offset.key.converter."), true);
+        this.offsetValueConverter = config.getConfiguredInstance(WorkerConfig.OFFSET_VALUE_CONVERTER_CLASS_CONFIG, Converter.class);
+        this.offsetValueConverter.configure(config.originalsWithPrefix("offset.value.converter."), false);
 
         if (offsetBackingStore != null) {
             this.offsetBackingStore = offsetBackingStore;
@@ -82,34 +82,6 @@ public class Worker<K, V> {
             this.offsetBackingStore = new FileOffsetBackingStore();
             this.offsetBackingStore.configure(config.originals());
         }
-
-        if (offsetKeySerializer != null) {
-            this.offsetKeySerializer = offsetKeySerializer;
-        } else {
-            this.offsetKeySerializer = config.getConfiguredInstance(WorkerConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class);
-            this.offsetKeySerializer.configure(config.originals(), true);
-        }
-
-        if (offsetValueSerializer != null) {
-            this.offsetValueSerializer = offsetValueSerializer;
-        } else {
-            this.offsetValueSerializer = config.getConfiguredInstance(WorkerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class);
-            this.offsetValueSerializer.configure(config.originals(), false);
-        }
-
-        if (offsetKeyDeserializer != null) {
-            this.offsetKeyDeserializer = offsetKeyDeserializer;
-        } else {
-            this.offsetKeyDeserializer = config.getConfiguredInstance(WorkerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
-            this.offsetKeyDeserializer.configure(config.originals(), true);
-        }
-
-        if (offsetValueDeserializer != null) {
-            this.offsetValueDeserializer = offsetValueDeserializer;
-        } else {
-            this.offsetValueDeserializer = config.getConfiguredInstance(WorkerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
-            this.offsetValueDeserializer.configure(config.originals(), false);
-        }
     }
 
     public void start() {
@@ -119,8 +91,8 @@ public class Worker<K, V> {
 
         Map<String, Object> producerProps = new HashMap<>();
         producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Utils.join(config.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
-        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, config.getClass(WorkerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName());
-        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, config.getClass(WorkerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName());
+        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
         for (String propName : unusedConfigs.stringPropertyNames()) {
             producerProps.put(propName, unusedConfigs.getProperty(propName));
         }
@@ -188,14 +160,14 @@ public class Worker<K, V> {
         final WorkerTask workerTask;
         if (task instanceof SourceTask) {
             SourceTask sourceTask = (SourceTask) task;
-            OffsetStorageReader offsetReader = new OffsetStorageReaderImpl<>(offsetBackingStore, id.connector(),
-                    keyConverter, valueConverter, offsetKeySerializer, offsetValueDeserializer);
-            OffsetStorageWriter<K, V> offsetWriter = new OffsetStorageWriter<>(offsetBackingStore, id.connector(),
-                    keyConverter, valueConverter, offsetKeySerializer, offsetValueSerializer);
-            workerTask = new WorkerSourceTask<>(id, sourceTask, keyConverter, valueConverter, producer,
+            OffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetBackingStore, id.connector(),
+                    offsetKeyConverter, offsetValueConverter);
+            OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetBackingStore, id.connector(),
+                    offsetKeyConverter, offsetValueConverter);
+            workerTask = new WorkerSourceTask(id, sourceTask, keyConverter, valueConverter, producer,
                     offsetReader, offsetWriter, config, time);
         } else if (task instanceof SinkTask) {
-            workerTask = new WorkerSinkTask<>(id, (SinkTask) task, config, keyConverter, valueConverter, time);
+            workerTask = new WorkerSinkTask(id, (SinkTask) task, config, keyConverter, valueConverter, time);
         } else {
             log.error("Tasks must be a subclass of either SourceTask or SinkTask", task);
             throw new CopycatException("Tasks must be a subclass of either SourceTask or SinkTask");

http://git-wip-us.apache.org/repos/asf/kafka/blob/3803e5cb/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
index dfb1f96..7e71fb8 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
@@ -38,21 +38,21 @@ import java.util.concurrent.TimeUnit;
 /**
  * WorkerTask that uses a SinkTask to export data from Kafka.
  */
-class WorkerSinkTask<K, V> implements WorkerTask {
+class WorkerSinkTask implements WorkerTask {
     private static final Logger log = LoggerFactory.getLogger(WorkerSinkTask.class);
 
     private final ConnectorTaskId id;
     private final SinkTask task;
     private final WorkerConfig workerConfig;
     private final Time time;
-    private final Converter<K> keyConverter;
-    private final Converter<V> valueConverter;
+    private final Converter keyConverter;
+    private final Converter valueConverter;
     private WorkerSinkTaskThread workThread;
-    private KafkaConsumer<K, V> consumer;
+    private KafkaConsumer<byte[], byte[]> consumer;
     private final SinkTaskContext context;
 
     public WorkerSinkTask(ConnectorTaskId id, SinkTask task, WorkerConfig workerConfig,
-                          Converter<K> keyConverter, Converter<V> valueConverter, Time time) {
+                          Converter keyConverter, Converter valueConverter, Time time) {
         this.id = id;
         this.task = task;
         this.workerConfig = workerConfig;
@@ -107,7 +107,7 @@ class WorkerSinkTask<K, V> implements WorkerTask {
     public void poll(long timeoutMs) {
         try {
             log.trace("{} polling consumer with timeout {} ms", id, timeoutMs);
-            ConsumerRecords<K, V> msgs = consumer.poll(timeoutMs);
+            ConsumerRecords<byte[], byte[]> msgs = consumer.poll(timeoutMs);
             log.trace("{} polling returned {} messages", id, msgs.count());
             deliverMessages(msgs);
         } catch (ConsumerWakeupException we) {
@@ -154,7 +154,7 @@ class WorkerSinkTask<K, V> implements WorkerTask {
         return workerConfig;
     }
 
-    private KafkaConsumer<K, V> createConsumer(Properties taskProps) {
+    private KafkaConsumer<byte[], byte[]> createConsumer(Properties taskProps) {
         String topicsStr = taskProps.getProperty(SinkTask.TOPICS_CONFIG);
         if (topicsStr == null || topicsStr.isEmpty())
             throw new CopycatException("Sink tasks require a list of topics.");
@@ -168,12 +168,10 @@ class WorkerSinkTask<K, V> implements WorkerTask {
                 Utils.join(workerConfig.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
         props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
         props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
-                workerConfig.getClass(WorkerConfig.KEY_DESERIALIZER_CLASS_CONFIG).getName());
-        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
-                workerConfig.getClass(WorkerConfig.VALUE_DESERIALIZER_CLASS_CONFIG).getName());
+        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
 
-        KafkaConsumer<K, V> newConsumer;
+        KafkaConsumer<byte[], byte[]> newConsumer;
         try {
             newConsumer = new KafkaConsumer<>(props);
         } catch (Throwable t) {
@@ -202,14 +200,14 @@ class WorkerSinkTask<K, V> implements WorkerTask {
         return new WorkerSinkTaskThread(this, "WorkerSinkTask-" + id, time, workerConfig);
     }
 
-    private void deliverMessages(ConsumerRecords<K, V> msgs) {
+    private void deliverMessages(ConsumerRecords<byte[], byte[]> msgs) {
         // Finally, deliver this batch to the sink
         if (msgs.count() > 0) {
             List<SinkRecord> records = new ArrayList<>();
-            for (ConsumerRecord<K, V> msg : msgs) {
+            for (ConsumerRecord<byte[], byte[]> msg : msgs) {
                 log.trace("Consuming message with key {}, value {}", msg.key(), msg.value());
-                SchemaAndValue keyAndSchema = msg.key() != null ? keyConverter.toCopycatData(msg.key()) : SchemaAndValue.NULL;
-                SchemaAndValue valueAndSchema = msg.value() != null ? valueConverter.toCopycatData(msg.value()) : SchemaAndValue.NULL;
+                SchemaAndValue keyAndSchema = keyConverter.toCopycatData(msg.topic(), msg.key());
+                SchemaAndValue valueAndSchema = valueConverter.toCopycatData(msg.topic(), msg.value());
                 records.add(
                         new SinkRecord(msg.topic(), msg.partition(),
                                 keyAndSchema.schema(), keyAndSchema.value(),

http://git-wip-us.apache.org/repos/asf/kafka/blob/3803e5cb/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java
index 14b9c3a..ee0a532 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java
@@ -23,7 +23,6 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.copycat.cli.WorkerConfig;
-import org.apache.kafka.copycat.data.SchemaAndValue;
 import org.apache.kafka.copycat.source.SourceRecord;
 import org.apache.kafka.copycat.source.SourceTask;
 import org.apache.kafka.copycat.source.SourceTaskContext;
@@ -46,33 +45,31 @@ import java.util.concurrent.TimeoutException;
 /**
  * WorkerTask that uses a SourceTask to ingest data into Kafka.
  */
-class WorkerSourceTask<K, V> implements WorkerTask {
+class WorkerSourceTask implements WorkerTask {
     private static final Logger log = LoggerFactory.getLogger(WorkerSourceTask.class);
 
     private ConnectorTaskId id;
     private SourceTask task;
-    private final Converter<K> keyConverter;
-    private final Converter<V> valueConverter;
-    private KafkaProducer<K, V> producer;
+    private final Converter keyConverter;
+    private final Converter valueConverter;
+    private KafkaProducer<byte[], byte[]> producer;
     private WorkerSourceTaskThread workThread;
     private OffsetStorageReader offsetReader;
-    private OffsetStorageWriter<K, V> offsetWriter;
+    private OffsetStorageWriter offsetWriter;
     private final WorkerConfig workerConfig;
     private final Time time;
 
     // Use IdentityHashMap to ensure correctness with duplicate records. This is a HashMap because
     // there is no IdentityHashSet.
-    private IdentityHashMap<ProducerRecord<K, V>, ProducerRecord<K, V>>
-            outstandingMessages;
+    private IdentityHashMap<ProducerRecord<byte[], byte[]>, ProducerRecord<byte[], byte[]>> outstandingMessages;
     // A second buffer is used while an offset flush is running
-    private IdentityHashMap<ProducerRecord<K, V>, ProducerRecord<K, V>>
-            outstandingMessagesBacklog;
+    private IdentityHashMap<ProducerRecord<byte[], byte[]>, ProducerRecord<byte[], byte[]>> outstandingMessagesBacklog;
     private boolean flushing;
 
     public WorkerSourceTask(ConnectorTaskId id, SourceTask task,
-                            Converter<K> keyConverter, Converter<V> valueConverter,
-                            KafkaProducer<K, V> producer,
-                            OffsetStorageReader offsetReader, OffsetStorageWriter<K, V> offsetWriter,
+                            Converter keyConverter, Converter valueConverter,
+                            KafkaProducer<byte[], byte[]> producer,
+                            OffsetStorageReader offsetReader, OffsetStorageWriter offsetWriter,
                             WorkerConfig workerConfig, Time time) {
         this.id = id;
         this.task = task;
@@ -132,10 +129,9 @@ class WorkerSourceTask<K, V> implements WorkerTask {
      */
     private synchronized void sendRecords(List<SourceRecord> records) {
         for (SourceRecord record : records) {
-            K key = (record.keySchema() != null) ? keyConverter.fromCopycatData(record.keySchema(), record.key()) : null;
-            V value = (record.valueSchema() != null) ? valueConverter.fromCopycatData(record.valueSchema(), record.value()) : null;
-            final ProducerRecord<K, V> producerRecord = new ProducerRecord<>(
-                    record.topic(), record.kafkaPartition(), key, value);
+            byte[] key = keyConverter.fromCopycatData(record.topic(), record.keySchema(), record.key());
+            byte[] value = valueConverter.fromCopycatData(record.topic(), record.valueSchema(), record.value());
+            final ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(record.topic(), record.kafkaPartition(), key, value);
             log.trace("Appending record with key {}, value {}", record.key(), record.value());
             if (!flushing) {
                 outstandingMessages.put(producerRecord, producerRecord);
@@ -158,13 +154,12 @@ class WorkerSourceTask<K, V> implements WorkerTask {
                         }
                     });
             // Offsets are converted & serialized in the OffsetWriter
-            offsetWriter.offset(new SchemaAndValue(record.sourcePartitionSchema(), record.sourcePartition()),
-                    new SchemaAndValue(record.sourceOffsetSchema(), record.sourceOffset()));
+            offsetWriter.offset(record.sourcePartition(), record.sourceOffset());
         }
     }
 
-    private synchronized void recordSent(final ProducerRecord<K, V> record) {
-        ProducerRecord<K, V> removed = outstandingMessages.remove(record);
+    private synchronized void recordSent(final ProducerRecord<byte[], byte[]> record) {
+        ProducerRecord<byte[], byte[]> removed = outstandingMessages.remove(record);
         // While flushing, we may also see callbacks for items in the backlog
         if (removed == null && flushing)
             removed = outstandingMessagesBacklog.remove(record);
@@ -276,7 +271,7 @@ class WorkerSourceTask<K, V> implements WorkerTask {
 
     private void finishSuccessfulFlush() {
         // If we were successful, we can just swap instead of replacing items back into the original map
-        IdentityHashMap<ProducerRecord<K, V>, ProducerRecord<K, V>> temp = outstandingMessages;
+        IdentityHashMap<ProducerRecord<byte[], byte[]>, ProducerRecord<byte[], byte[]>> temp = outstandingMessages;
         outstandingMessages = outstandingMessagesBacklog;
         outstandingMessagesBacklog = temp;
         flushing = false;

http://git-wip-us.apache.org/repos/asf/kafka/blob/3803e5cb/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReaderImpl.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReaderImpl.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReaderImpl.java
index 237eda6..7521955 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReaderImpl.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReaderImpl.java
@@ -17,8 +17,6 @@
 
 package org.apache.kafka.copycat.storage;
 
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.copycat.data.SchemaAndValue;
 import org.apache.kafka.copycat.errors.CopycatException;
 import org.slf4j.Logger;
@@ -35,39 +33,36 @@ import java.util.Map;
  * directly, the interface is only separate from this implementation because it needs to be
  * included in the public API package.
  */
-public class OffsetStorageReaderImpl<K, V> implements OffsetStorageReader {
+public class OffsetStorageReaderImpl implements OffsetStorageReader {
     private static final Logger log = LoggerFactory.getLogger(OffsetStorageReaderImpl.class);
 
     private final OffsetBackingStore backingStore;
     private final String namespace;
-    private final Converter<K> keyConverter;
-    private final Converter<V> valueConverter;
-    private final Serializer<K> keySerializer;
-    private final Deserializer<V> valueDeserializer;
+    private final Converter keyConverter;
+    private final Converter valueConverter;
 
     public OffsetStorageReaderImpl(OffsetBackingStore backingStore, String namespace,
-                                   Converter<K> keyConverter, Converter<V> valueConverter,
-                                   Serializer<K> keySerializer, Deserializer<V> valueDeserializer) {
+                                   Converter keyConverter, Converter valueConverter) {
         this.backingStore = backingStore;
         this.namespace = namespace;
         this.keyConverter = keyConverter;
         this.valueConverter = valueConverter;
-        this.keySerializer = keySerializer;
-        this.valueDeserializer = valueDeserializer;
     }
 
     @Override
-    public SchemaAndValue offset(SchemaAndValue partition) {
+    public <T> Map<String, Object> offset(Map<String, T> partition) {
         return offsets(Arrays.asList(partition)).get(partition);
     }
 
     @Override
-    public Map<SchemaAndValue, SchemaAndValue> offsets(Collection<SchemaAndValue> partitions) {
+    public <T> Map<Map<String, T>, Map<String, Object>> offsets(Collection<Map<String, T>> partitions) {
         // Serialize keys so backing store can work with them
-        Map<ByteBuffer, SchemaAndValue> serializedToOriginal = new HashMap<>(partitions.size());
-        for (SchemaAndValue key : partitions) {
+        Map<ByteBuffer, Map<String, T>> serializedToOriginal = new HashMap<>(partitions.size());
+        for (Map<String, T> key : partitions) {
             try {
-                byte[] keySerialized = keySerializer.serialize(namespace, keyConverter.fromCopycatData(key.schema(), key.value()));
+                // Offsets are treated as schemaless, their format is only validated here (and the returned value below)
+                OffsetUtils.validateFormat(key);
+                byte[] keySerialized = keyConverter.fromCopycatData(namespace, null, key);
                 ByteBuffer keyBuffer = (keySerialized != null) ? ByteBuffer.wrap(keySerialized) : null;
                 serializedToOriginal.put(keyBuffer, key);
             } catch (Throwable t) {
@@ -87,7 +82,7 @@ public class OffsetStorageReaderImpl<K, V> implements OffsetStorageReader {
         }
 
         // Deserialize all the values and map back to the original keys
-        Map<SchemaAndValue, SchemaAndValue> result = new HashMap<>(partitions.size());
+        Map<Map<String, T>, Map<String, Object>> result = new HashMap<>(partitions.size());
         for (Map.Entry<ByteBuffer, ByteBuffer> rawEntry : raw.entrySet()) {
             try {
                 // Since null could be a valid key, explicitly check whether map contains the key
@@ -96,12 +91,12 @@ public class OffsetStorageReaderImpl<K, V> implements OffsetStorageReader {
                             + "store may have returned invalid data", rawEntry.getKey());
                     continue;
                 }
-                SchemaAndValue origKey = serializedToOriginal.get(rawEntry.getKey());
-                SchemaAndValue deserializedValue = valueConverter.toCopycatData(
-                        valueDeserializer.deserialize(namespace, rawEntry.getValue().array())
-                );
+                Map<String, T> origKey = serializedToOriginal.get(rawEntry.getKey());
+                SchemaAndValue deserializedSchemaAndValue = valueConverter.toCopycatData(namespace, rawEntry.getValue().array());
+                Object deserializedValue = deserializedSchemaAndValue.value();
+                OffsetUtils.validateFormat(deserializedValue);
 
-                result.put(origKey, deserializedValue);
+                result.put(origKey, (Map<String, Object>) deserializedValue);
             } catch (Throwable t) {
                 log.error("CRITICAL: Failed to deserialize offset data when getting offsets for task with"
                         + " namespace {}. No value for this data will be returned, which may break the "

http://git-wip-us.apache.org/repos/asf/kafka/blob/3803e5cb/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageWriter.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageWriter.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageWriter.java
index 4fb75e7..be8c718 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageWriter.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageWriter.java
@@ -17,16 +17,13 @@
 
 package org.apache.kafka.copycat.storage;
 
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.copycat.data.SchemaAndValue;
 import org.apache.kafka.copycat.errors.CopycatException;
 import org.apache.kafka.copycat.util.Callback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.Future;
 
 /**
@@ -64,32 +61,27 @@ import java.util.concurrent.Future;
  * This class is not thread-safe. It should only be accessed from a Task's processing thread.
  * </p>
  */
-public class OffsetStorageWriter<K, V> {
+public class OffsetStorageWriter {
     private static final Logger log = LoggerFactory.getLogger(OffsetStorageWriter.class);
 
     private final OffsetBackingStore backingStore;
-    private final Converter<K> keyConverter;
-    private final Converter<V> valueConverter;
-    private final Serializer<K> keySerializer;
-    private final Serializer<V> valueSerializer;
+    private final Converter keyConverter;
+    private final Converter valueConverter;
     private final String namespace;
     // Offset data in Copycat format
-    private Map<SchemaAndValue, SchemaAndValue> data = new HashMap<>();
+    private Map<Map<String, Object>, Map<String, Object>> data = new HashMap<>();
 
     // Not synchronized, should only be accessed by flush thread
-    private Map<SchemaAndValue, SchemaAndValue> toFlush = null;
+    private Map<Map<String, Object>, Map<String, Object>> toFlush = null;
     // Unique ID for each flush request to handle callbacks after timeouts
     private long currentFlushId = 0;
 
     public OffsetStorageWriter(OffsetBackingStore backingStore,
-                               String namespace, Converter<K> keyConverter, Converter<V> valueConverter,
-                               Serializer<K> keySerializer, Serializer<V> valueSerializer) {
+                               String namespace, Converter keyConverter, Converter valueConverter) {
         this.backingStore = backingStore;
         this.namespace = namespace;
         this.keyConverter = keyConverter;
         this.valueConverter = valueConverter;
-        this.keySerializer = keySerializer;
-        this.valueSerializer = valueSerializer;
     }
 
     /**
@@ -97,8 +89,8 @@ public class OffsetStorageWriter<K, V> {
      * @param partition the partition to store an offset for
      * @param offset the offset
      */
-    public synchronized void offset(SchemaAndValue partition, SchemaAndValue offset) {
-        data.put(partition, offset);
+    public synchronized void offset(Map<String, ?> partition, Map<String, ?> offset) {
+        data.put((Map<String, Object>) partition, (Map<String, Object>) offset);
     }
 
     private boolean flushing() {
@@ -142,10 +134,14 @@ public class OffsetStorageWriter<K, V> {
         Map<ByteBuffer, ByteBuffer> offsetsSerialized;
         try {
             offsetsSerialized = new HashMap<>();
-            for (Map.Entry<SchemaAndValue, SchemaAndValue> entry : toFlush.entrySet()) {
-                byte[] key = keySerializer.serialize(namespace, keyConverter.fromCopycatData(entry.getKey().schema(), entry.getKey().value()));
+            for (Map.Entry<Map<String, Object>, Map<String, Object>> entry : toFlush.entrySet()) {
+                // Offsets are specified as schemaless to the converter, using whatever internal schema is appropriate
+                // for that data. The only enforcement of the format is here.
+                OffsetUtils.validateFormat(entry.getKey());
+                OffsetUtils.validateFormat(entry.getValue());
+                byte[] key = keyConverter.fromCopycatData(namespace, null, entry.getKey());
                 ByteBuffer keyBuffer = (key != null) ? ByteBuffer.wrap(key) : null;
-                byte[] value = valueSerializer.serialize(namespace, valueConverter.fromCopycatData(entry.getValue().schema(), entry.getValue().value()));
+                byte[] value = valueConverter.fromCopycatData(namespace, null, entry.getValue());
                 ByteBuffer valueBuffer = (value != null) ? ByteBuffer.wrap(value) : null;
                 offsetsSerialized.put(keyBuffer, valueBuffer);
             }
@@ -155,6 +151,7 @@ public class OffsetStorageWriter<K, V> {
             log.error("CRITICAL: Failed to serialize offset data, making it impossible to commit "
                     + "offsets under namespace {}. This likely won't recover unless the "
                     + "unserializable partition or offset information is overwritten.", namespace);
+            log.error("Cause of serialization failure:", t);
             callback.onCompletion(t, null);
             return null;
         }
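
To make the new flow concrete, a minimal sketch of the schemaless write/flush cycle against this API follows. The namespace, key names, and the assumption that the supplied JsonConverter is already configured for schemaless offsets are illustrative, not taken from the patch.

    import org.apache.kafka.copycat.json.JsonConverter;
    import org.apache.kafka.copycat.storage.OffsetBackingStore;
    import org.apache.kafka.copycat.storage.OffsetStorageWriter;
    import org.apache.kafka.copycat.util.Callback;

    import java.util.Collections;
    import java.util.concurrent.TimeUnit;

    public class OffsetFlushSketch {
        public static void commitOffsets(OffsetBackingStore store, JsonConverter converter) throws Exception {
            OffsetStorageWriter writer = new OffsetStorageWriter(store, "my-connector", converter, converter);

            // Offsets are plain Maps with String keys and primitive values; the writer
            // validates the format and serializes them through the Converter on flush.
            writer.offset(Collections.singletonMap("filename", "input.txt"),
                          Collections.singletonMap("position", 42));

            if (writer.beginFlush()) {
                writer.doFlush(new Callback<Void>() {
                    @Override
                    public void onCompletion(Throwable error, Void result) {
                        // react to success or failure of the flush here
                    }
                }).get(1000, TimeUnit.MILLISECONDS);
            }
        }
    }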

http://git-wip-us.apache.org/repos/asf/kafka/blob/3803e5cb/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetUtils.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetUtils.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetUtils.java
new file mode 100644
index 0000000..bd3a87b
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetUtils.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.storage;
+
+import org.apache.kafka.copycat.data.CopycatSchema;
+import org.apache.kafka.copycat.data.Schema;
+import org.apache.kafka.copycat.errors.DataException;
+
+import java.util.Map;
+
+public class OffsetUtils {
+    public static void validateFormat(Object offsetData) {
+        if (!(offsetData instanceof Map))
+            throw new DataException("Offsets must be specified as a Map");
+        validateFormat((Map<Object, Object>) offsetData);
+    }
+
+    public static <K, V> void validateFormat(Map<K, V> offsetData) {
+        for (Map.Entry<K, V> entry : offsetData.entrySet()) {
+            if (!(entry.getKey() instanceof String))
+                throw new DataException("Offsets may only use String keys");
+
+            Object value = entry.getValue();
+            if (value == null)
+                continue;
+            Schema.Type schemaType = CopycatSchema.schemaType(value.getClass());
+            if (!schemaType.isPrimitive())
+                throw new DataException("Offsets may only contain primitive types as values, but field " + entry.getKey() + " contains " + schemaType);
+        }
+    }
+}
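
A small illustration of what this validation accepts and rejects, based only on the checks visible above; the key names are made up for the example.

    import org.apache.kafka.copycat.errors.DataException;
    import org.apache.kafka.copycat.storage.OffsetUtils;

    import java.util.Collections;
    import java.util.Map;

    public class OffsetValidationSketch {
        public static void main(String[] args) {
            // Accepted: String key with a primitive (Integer) value
            OffsetUtils.validateFormat(Collections.singletonMap("position", 12));

            // Rejected: offset keys must be Strings
            Map<Object, Object> badKeys = Collections.<Object, Object>singletonMap(42, "value");
            try {
                OffsetUtils.validateFormat(badKeys);
            } catch (DataException e) {
                System.out.println("Rejected as expected: " + e.getMessage());
            }
        }
    }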

http://git-wip-us.apache.org/repos/asf/kafka/blob/3803e5cb/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
index 54e9bc6..542ed76 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
@@ -67,10 +67,10 @@ public class WorkerSinkTaskTest extends ThreadedTest {
     private Time time;
     @Mock private SinkTask sinkTask;
     private WorkerConfig workerConfig;
-    @Mock private Converter<byte[]> keyConverter;
+    @Mock private Converter keyConverter;
     @Mock
-    private Converter<byte[]> valueConverter;
-    private WorkerSinkTask<Integer, String> workerTask;
+    private Converter valueConverter;
+    private WorkerSinkTask workerTask;
     @Mock private KafkaConsumer<byte[], byte[]> consumer;
     private WorkerSinkTaskThread workerThread;
 
@@ -84,10 +84,10 @@ public class WorkerSinkTaskTest extends ThreadedTest {
         Properties workerProps = new Properties();
         workerProps.setProperty("key.converter", "org.apache.kafka.copycat.json.JsonConverter");
         workerProps.setProperty("value.converter", "org.apache.kafka.copycat.json.JsonConverter");
-        workerProps.setProperty("key.serializer", "org.apache.kafka.copycat.json.JsonSerializer");
-        workerProps.setProperty("value.serializer", "org.apache.kafka.copycat.json.JsonSerializer");
-        workerProps.setProperty("key.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer");
-        workerProps.setProperty("value.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer");
+        workerProps.setProperty("offset.key.converter", "org.apache.kafka.copycat.json.JsonConverter");
+        workerProps.setProperty("offset.value.converter", "org.apache.kafka.copycat.json.JsonConverter");
+        workerProps.setProperty("offset.key.converter.schemas.enable", "false");
+        workerProps.setProperty("offset.value.converter.schemas.enable", "false");
         workerConfig = new WorkerConfig(workerProps);
         workerTask = PowerMock.createPartialMock(
                 WorkerSinkTask.class, new String[]{"createConsumer", "createWorkerThread"},
@@ -138,12 +138,12 @@ public class WorkerSinkTaskTest extends ThreadedTest {
 
         ConsumerRecords<byte[], byte[]> records = new ConsumerRecords<>(
                 Collections.singletonMap(
-                        new TopicPartition("topic", 0),
-                        Collections.singletonList(new ConsumerRecord<>("topic", 0, 0, RAW_KEY, RAW_VALUE))));
+                        new TopicPartition(TOPIC, 0),
+                        Collections.singletonList(new ConsumerRecord<>(TOPIC, 0, 0, RAW_KEY, RAW_VALUE))));
 
         // Exact data doesn't matter, but should be passed directly to sink task
-        EasyMock.expect(keyConverter.toCopycatData(RAW_KEY)).andReturn(record);
-        EasyMock.expect(valueConverter.toCopycatData(RAW_VALUE)).andReturn(record);
+        EasyMock.expect(keyConverter.toCopycatData(EasyMock.eq(TOPIC), EasyMock.aryEq(RAW_KEY))).andReturn(record);
+        EasyMock.expect(valueConverter.toCopycatData(EasyMock.eq(TOPIC), EasyMock.aryEq(RAW_VALUE))).andReturn(record);
         Capture<Collection<SinkRecord>> capturedRecords
                 = EasyMock.newCapture(CaptureType.ALL);
         sinkTask.put(EasyMock.capture(capturedRecords));
@@ -320,8 +320,8 @@ public class WorkerSinkTaskTest extends ThreadedTest {
                         return records;
                     }
                 });
-        EasyMock.expect(keyConverter.toCopycatData(RAW_KEY)).andReturn(new SchemaAndValue(KEY_SCHEMA, KEY)).anyTimes();
-        EasyMock.expect(valueConverter.toCopycatData(RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE)).anyTimes();
+        EasyMock.expect(keyConverter.toCopycatData(TOPIC, RAW_KEY)).andReturn(new SchemaAndValue(KEY_SCHEMA, KEY)).anyTimes();
+        EasyMock.expect(valueConverter.toCopycatData(TOPIC, RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE)).anyTimes();
         Capture<Collection<SinkRecord>> capturedRecords = EasyMock.newCapture(CaptureType.ALL);
         sinkTask.put(EasyMock.capture(capturedRecords));
         EasyMock.expectLastCall().anyTimes();
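
For reference, the Converter contract these mocks stand in for amounts to a simple round trip between Copycat data and bytes. This sketch uses JsonConverter with its defaults; whether additional configuration is required is not shown in this excerpt, so treat that as an assumption.

    import org.apache.kafka.copycat.data.Schema;
    import org.apache.kafka.copycat.data.SchemaAndValue;
    import org.apache.kafka.copycat.json.JsonConverter;

    public class ConverterRoundTripSketch {
        public static void main(String[] args) {
            JsonConverter converter = new JsonConverter();

            // Serialize a Copycat value for a topic, then convert the bytes back.
            byte[] serialized = converter.fromCopycatData("topic", Schema.BOOLEAN_SCHEMA, true);
            SchemaAndValue restored = converter.toCopycatData("topic", serialized);

            System.out.println(restored.schema() + " -> " + restored.value());
        }
    }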

http://git-wip-us.apache.org/repos/asf/kafka/blob/3803e5cb/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java
index 018aa94..3ff3a62 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java
@@ -24,7 +24,6 @@ import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.copycat.cli.WorkerConfig;
 import org.apache.kafka.copycat.data.Schema;
-import org.apache.kafka.copycat.data.SchemaAndValue;
 import org.apache.kafka.copycat.source.SourceRecord;
 import org.apache.kafka.copycat.source.SourceTask;
 import org.apache.kafka.copycat.source.SourceTaskContext;
@@ -45,11 +44,7 @@ import org.powermock.api.easymock.annotation.Mock;
 import org.powermock.modules.junit4.PowerMockRunner;
 import org.powermock.reflect.Whitebox;
 
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Properties;
+import java.util.*;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -59,37 +54,36 @@ import static org.junit.Assert.*;
 
 @RunWith(PowerMockRunner.class)
 public class WorkerSourceTaskTest extends ThreadedTest {
-    private static final Schema PARTITION_SCHEMA = Schema.BYTES_SCHEMA;
-    private static final byte[] PARTITION_BYTES = "partition".getBytes();
-    private static final Schema OFFSET_SCHEMA = Schema.BYTES_SCHEMA;
-    private static final byte[] OFFSET_BYTES = "offset-1".getBytes();
+    private static final String TOPIC = "topic";
+    private static final Map<String, byte[]> PARTITION = Collections.singletonMap("key", "partition".getBytes());
+    private static final Map<String, Integer> OFFSET = Collections.singletonMap("key", 12);
 
     // Copycat-format data
     private static final Schema KEY_SCHEMA = Schema.INT32_SCHEMA;
     private static final Integer KEY = -1;
     private static final Schema RECORD_SCHEMA = Schema.INT64_SCHEMA;
     private static final Long RECORD = 12L;
-    // Native-formatted data. The actual format of this data doesn't matter -- we just want to see that the right version
+    // Serialized data. The actual format of this data doesn't matter -- we just want to see that the right version
     // is used in the right place.
-    private static final ByteBuffer CONVERTED_KEY = ByteBuffer.wrap("converted-key".getBytes());
-    private static final String CONVERTED_RECORD = "converted-record";
+    private static final byte[] SERIALIZED_KEY = "converted-key".getBytes();
+    private static final byte[] SERIALIZED_RECORD = "converted-record".getBytes();
 
     private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
     private WorkerConfig config;
     @Mock private SourceTask sourceTask;
-    @Mock private Converter<ByteBuffer> keyConverter;
-    @Mock private Converter<String> valueConverter;
-    @Mock private KafkaProducer<ByteBuffer, String> producer;
+    @Mock private Converter keyConverter;
+    @Mock private Converter valueConverter;
+    @Mock private KafkaProducer<byte[], byte[]> producer;
     @Mock private OffsetStorageReader offsetReader;
-    @Mock private OffsetStorageWriter<ByteBuffer, String> offsetWriter;
-    private WorkerSourceTask<ByteBuffer, String> workerTask;
+    @Mock private OffsetStorageWriter offsetWriter;
+    private WorkerSourceTask workerTask;
     @Mock private Future<RecordMetadata> sendFuture;
 
     private Capture<org.apache.kafka.clients.producer.Callback> producerCallbacks;
 
     private static final Properties EMPTY_TASK_PROPS = new Properties();
     private static final List<SourceRecord> RECORDS = Arrays.asList(
-            new SourceRecord(PARTITION_SCHEMA, PARTITION_BYTES, OFFSET_SCHEMA, OFFSET_BYTES, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD)
+            new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD)
     );
 
     @Override
@@ -98,16 +92,16 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         Properties workerProps = new Properties();
         workerProps.setProperty("key.converter", "org.apache.kafka.copycat.json.JsonConverter");
         workerProps.setProperty("value.converter", "org.apache.kafka.copycat.json.JsonConverter");
-        workerProps.setProperty("key.serializer", "org.apache.kafka.copycat.json.JsonSerializer");
-        workerProps.setProperty("value.serializer", "org.apache.kafka.copycat.json.JsonSerializer");
-        workerProps.setProperty("key.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer");
-        workerProps.setProperty("value.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer");
+        workerProps.setProperty("offset.key.converter", "org.apache.kafka.copycat.json.JsonConverter");
+        workerProps.setProperty("offset.value.converter", "org.apache.kafka.copycat.json.JsonConverter");
+        workerProps.setProperty("offset.key.converter.schemas.enable", "false");
+        workerProps.setProperty("offset.value.converter.schemas.enable", "false");
         config = new WorkerConfig(workerProps);
         producerCallbacks = EasyMock.newCapture();
     }
 
     private void createWorkerTask() {
-        workerTask = new WorkerSourceTask<>(taskId, sourceTask, keyConverter, valueConverter, producer,
+        workerTask = new WorkerSourceTask(taskId, sourceTask, keyConverter, valueConverter, producer,
                 offsetReader, offsetWriter, config, new SystemTime());
     }
 
@@ -201,15 +195,15 @@ public class WorkerSourceTaskTest extends ThreadedTest {
 
         List<SourceRecord> records = new ArrayList<>();
         // Can just use the same record for key and value
-        records.add(new SourceRecord(PARTITION_SCHEMA, PARTITION_BYTES, OFFSET_SCHEMA, OFFSET_BYTES, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD));
+        records.add(new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD));
 
-        Capture<ProducerRecord<ByteBuffer, String>> sent = expectSendRecord();
+        Capture<ProducerRecord<byte[], byte[]>> sent = expectSendRecord();
 
         PowerMock.replayAll();
 
         Whitebox.invokeMethod(workerTask, "sendRecords", records);
-        assertEquals(CONVERTED_KEY, sent.getValue().key());
-        assertEquals(CONVERTED_RECORD, sent.getValue().value());
+        assertEquals(SERIALIZED_KEY, sent.getValue().key());
+        assertEquals(SERIALIZED_RECORD, sent.getValue().value());
 
         PowerMock.verifyAll();
     }
@@ -233,11 +227,11 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         return latch;
     }
 
-    private Capture<ProducerRecord<ByteBuffer, String>> expectSendRecord() throws InterruptedException {
-        EasyMock.expect(keyConverter.fromCopycatData(KEY_SCHEMA, KEY)).andStubReturn(CONVERTED_KEY);
-        EasyMock.expect(valueConverter.fromCopycatData(RECORD_SCHEMA, RECORD)).andStubReturn(CONVERTED_RECORD);
+    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord() throws InterruptedException {
+        EasyMock.expect(keyConverter.fromCopycatData(TOPIC, KEY_SCHEMA, KEY)).andStubReturn(SERIALIZED_KEY);
+        EasyMock.expect(valueConverter.fromCopycatData(TOPIC, RECORD_SCHEMA, RECORD)).andStubReturn(SERIALIZED_RECORD);
 
-        Capture<ProducerRecord<ByteBuffer, String>> sent = EasyMock.newCapture();
+        Capture<ProducerRecord<byte[], byte[]>> sent = EasyMock.newCapture();
         // 1. Converted data passed to the producer, which will need callbacks invoked for flush to work
         EasyMock.expect(
                 producer.send(EasyMock.capture(sent),
@@ -255,7 +249,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
                     }
                 });
         // 2. Offset data is passed to the offset storage.
-        offsetWriter.offset(new SchemaAndValue(PARTITION_SCHEMA, PARTITION_BYTES), new SchemaAndValue(OFFSET_SCHEMA, OFFSET_BYTES));
+        offsetWriter.offset(PARTITION, OFFSET);
         PowerMock.expectLastCall().anyTimes();
 
         return sent;
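
A sketch of how a source task now hands partitions and offsets to the framework as plain Maps when building records, mirroring the SourceRecord constructor used in this test. The topic name, key names, and parameter meanings beyond what the test shows are assumptions.

    import org.apache.kafka.copycat.data.Schema;
    import org.apache.kafka.copycat.source.SourceRecord;

    import java.util.Collections;
    import java.util.Map;

    public class SourceRecordSketch {
        public static SourceRecord lineRecord(String filename, long position, String line) {
            Map<String, String> sourcePartition = Collections.singletonMap("filename", filename);
            Map<String, Long> sourceOffset = Collections.singletonMap("position", position);
            // (source partition, source offset, topic, kafka partition, key schema, key, value schema, value)
            return new SourceRecord(sourcePartition, sourceOffset, "lines", null,
                    Schema.STRING_SCHEMA, filename, Schema.STRING_SCHEMA, line);
        }
    }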

http://git-wip-us.apache.org/repos/asf/kafka/blob/3803e5cb/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
index 32e7ff9..701e230 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
@@ -17,8 +17,6 @@
 
 package org.apache.kafka.copycat.runtime;
 
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.copycat.cli.WorkerConfig;
@@ -49,10 +47,6 @@ public class WorkerTest extends ThreadedTest {
     private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
     private Worker worker;
     private OffsetBackingStore offsetBackingStore = PowerMock.createMock(OffsetBackingStore.class);
-    private Serializer offsetKeySerializer = PowerMock.createMock(Serializer.class);
-    private Serializer offsetValueSerializer = PowerMock.createMock(Serializer.class);
-    private Deserializer offsetKeyDeserializer = PowerMock.createMock(Deserializer.class);
-    private Deserializer offsetValueDeserializer = PowerMock.createMock(Deserializer.class);
 
     @Before
     public void setup() {
@@ -61,14 +55,12 @@ public class WorkerTest extends ThreadedTest {
         Properties workerProps = new Properties();
         workerProps.setProperty("key.converter", "org.apache.kafka.copycat.json.JsonConverter");
         workerProps.setProperty("value.converter", "org.apache.kafka.copycat.json.JsonConverter");
-        workerProps.setProperty("key.serializer", "org.apache.kafka.copycat.json.JsonSerializer");
-        workerProps.setProperty("value.serializer", "org.apache.kafka.copycat.json.JsonSerializer");
-        workerProps.setProperty("key.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer");
-        workerProps.setProperty("value.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer");
+        workerProps.setProperty("offset.key.converter", "org.apache.kafka.copycat.json.JsonConverter");
+        workerProps.setProperty("offset.value.converter", "org.apache.kafka.copycat.json.JsonConverter");
+        workerProps.setProperty("offset.key.converter.schemas.enable", "false");
+        workerProps.setProperty("offset.value.converter.schemas.enable", "false");
         WorkerConfig config = new WorkerConfig(workerProps);
-        worker = new Worker(new MockTime(), config, offsetBackingStore,
-                offsetKeySerializer, offsetValueSerializer,
-                offsetKeyDeserializer, offsetValueDeserializer);
+        worker = new Worker(new MockTime(), config, offsetBackingStore);
         worker.start();
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/3803e5cb/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/OffsetStorageWriterTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/OffsetStorageWriterTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/OffsetStorageWriterTest.java
index 9c0c52d..956d064 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/OffsetStorageWriterTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/OffsetStorageWriterTest.java
@@ -17,10 +17,6 @@
 
 package org.apache.kafka.copycat.storage;
 
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.copycat.data.Schema;
-import org.apache.kafka.copycat.data.SchemaAndValue;
-import org.apache.kafka.copycat.data.SchemaBuilder;
 import org.apache.kafka.copycat.errors.CopycatException;
 import org.apache.kafka.copycat.util.Callback;
 import org.easymock.Capture;
@@ -35,9 +31,7 @@ import org.powermock.api.easymock.annotation.Mock;
 import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.nio.ByteBuffer;
-import java.util.Arrays;
 import java.util.Collections;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.*;
 
@@ -48,13 +42,9 @@ import static org.junit.Assert.assertTrue;
 public class OffsetStorageWriterTest {
     private static final String NAMESPACE = "namespace";
     // Copycat format - any types should be accepted here
-    private static final Schema OFFSET_KEY_SCHEMA = SchemaBuilder.array(Schema.STRING_SCHEMA).build();
-    private static final List<String> OFFSET_KEY = Arrays.asList("key", "key");
-    private static final Schema OFFSET_VALUE_SCHEMA = Schema.STRING_SCHEMA;
-    private static final String OFFSET_VALUE = "value";
-    // Native objects - must match serializer types
-    private static final int OFFSET_KEY_CONVERTED = 12;
-    private static final String OFFSET_VALUE_CONVERTED = "value-converted";
+    private static final Map<String, String> OFFSET_KEY = Collections.singletonMap("key", "key");
+    private static final Map<String, Integer> OFFSET_VALUE = Collections.singletonMap("key", 12);
+
     // Serialized
     private static final byte[] OFFSET_KEY_SERIALIZED = "key-serialized".getBytes();
     private static final byte[] OFFSET_VALUE_SERIALIZED = "value-serialized".getBytes();
@@ -63,11 +53,9 @@ public class OffsetStorageWriterTest {
             ByteBuffer.wrap(OFFSET_VALUE_SERIALIZED));
 
     @Mock private OffsetBackingStore store;
-    @Mock private Converter<Integer> keyConverter;
-    @Mock private Converter<String> valueConverter;
-    @Mock private Serializer<Integer> keySerializer;
-    @Mock private Serializer<String> valueSerializer;
-    private OffsetStorageWriter<Integer, String> writer;
+    @Mock private Converter keyConverter;
+    @Mock private Converter valueConverter;
+    private OffsetStorageWriter writer;
 
     private static Exception exception = new RuntimeException("error");
 
@@ -75,7 +63,7 @@ public class OffsetStorageWriterTest {
 
     @Before
     public void setup() {
-        writer = new OffsetStorageWriter<>(store, NAMESPACE, keyConverter, valueConverter, keySerializer, valueSerializer);
+        writer = new OffsetStorageWriter(store, NAMESPACE, keyConverter, valueConverter);
         service = Executors.newFixedThreadPool(1);
     }
 
@@ -92,7 +80,7 @@ public class OffsetStorageWriterTest {
 
         PowerMock.replayAll();
 
-        writer.offset(new SchemaAndValue(OFFSET_KEY_SCHEMA, OFFSET_KEY), new SchemaAndValue(OFFSET_VALUE_SCHEMA, OFFSET_VALUE));
+        writer.offset(OFFSET_KEY, OFFSET_VALUE);
 
         assertTrue(writer.beginFlush());
         writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
@@ -128,7 +116,7 @@ public class OffsetStorageWriterTest {
 
         PowerMock.replayAll();
 
-        writer.offset(new SchemaAndValue(OFFSET_KEY_SCHEMA, OFFSET_KEY), new SchemaAndValue(OFFSET_VALUE_SCHEMA, OFFSET_VALUE));
+        writer.offset(OFFSET_KEY, OFFSET_VALUE);
         assertTrue(writer.beginFlush());
         writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
         assertTrue(writer.beginFlush());
@@ -148,7 +136,7 @@ public class OffsetStorageWriterTest {
 
         PowerMock.replayAll();
 
-        writer.offset(new SchemaAndValue(OFFSET_KEY_SCHEMA, OFFSET_KEY), new SchemaAndValue(OFFSET_VALUE_SCHEMA, OFFSET_VALUE));
+        writer.offset(OFFSET_KEY, OFFSET_VALUE);
         assertTrue(writer.beginFlush());
         writer.doFlush(callback);
         assertTrue(writer.beginFlush()); // should throw
@@ -160,7 +148,7 @@ public class OffsetStorageWriterTest {
     public void testCancelBeforeAwaitFlush() {
         PowerMock.replayAll();
 
-        writer.offset(new SchemaAndValue(OFFSET_KEY_SCHEMA, OFFSET_KEY), new SchemaAndValue(OFFSET_VALUE_SCHEMA, OFFSET_VALUE));
+        writer.offset(OFFSET_KEY, OFFSET_VALUE);
         assertTrue(writer.beginFlush());
         writer.cancelFlush();
 
@@ -178,7 +166,7 @@ public class OffsetStorageWriterTest {
 
         PowerMock.replayAll();
 
-        writer.offset(new SchemaAndValue(OFFSET_KEY_SCHEMA, OFFSET_KEY), new SchemaAndValue(OFFSET_VALUE_SCHEMA, OFFSET_VALUE));
+        writer.offset(OFFSET_KEY, OFFSET_VALUE);
         assertTrue(writer.beginFlush());
         // Start the flush, then immediately cancel before allowing the mocked store request to finish
         Future<Void> flushFuture = writer.doFlush(callback);
@@ -207,10 +195,8 @@ public class OffsetStorageWriterTest {
     private void expectStore(final Callback<Void> callback,
                              final boolean fail,
                              final CountDownLatch waitForCompletion) {
-        EasyMock.expect(keyConverter.fromCopycatData(OFFSET_KEY_SCHEMA, OFFSET_KEY)).andReturn(OFFSET_KEY_CONVERTED);
-        EasyMock.expect(keySerializer.serialize(NAMESPACE, OFFSET_KEY_CONVERTED)).andReturn(OFFSET_KEY_SERIALIZED);
-        EasyMock.expect(valueConverter.fromCopycatData(OFFSET_VALUE_SCHEMA, OFFSET_VALUE)).andReturn(OFFSET_VALUE_CONVERTED);
-        EasyMock.expect(valueSerializer.serialize(NAMESPACE, OFFSET_VALUE_CONVERTED)).andReturn(OFFSET_VALUE_SERIALIZED);
+        EasyMock.expect(keyConverter.fromCopycatData(NAMESPACE, null, OFFSET_KEY)).andReturn(OFFSET_KEY_SERIALIZED);
+        EasyMock.expect(valueConverter.fromCopycatData(NAMESPACE, null, OFFSET_VALUE)).andReturn(OFFSET_VALUE_SERIALIZED);
 
         final Capture<Callback<Void>> storeCallback = Capture.newInstance();
         EasyMock.expect(store.set(EasyMock.eq(NAMESPACE), EasyMock.eq(OFFSETS_SERIALIZED),

