kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gwens...@apache.org
Subject [4/5] kafka git commit: KAFKA-2367; Add Copycat runtime data API.
Date Thu, 27 Aug 2015 18:58:54 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/api/src/test/java/org/apache/kafka/copycat/data/FieldTest.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/test/java/org/apache/kafka/copycat/data/FieldTest.java b/copycat/api/src/test/java/org/apache/kafka/copycat/data/FieldTest.java
new file mode 100644
index 0000000..d5458bc
--- /dev/null
+++ b/copycat/api/src/test/java/org/apache/kafka/copycat/data/FieldTest.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.data;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+public class FieldTest {
+
+    @Test
+    public void testEquality() {
+        Field field1 = new Field("name", 0, Schema.INT8_SCHEMA);
+        Field field2 = new Field("name", 0, Schema.INT8_SCHEMA);
+        Field differentName = new Field("name2", 0, Schema.INT8_SCHEMA);
+        Field differentIndex = new Field("name", 1, Schema.INT8_SCHEMA);
+        Field differentSchema = new Field("name", 0, Schema.INT16_SCHEMA);
+
+        assertEquals(field1, field2);
+        assertNotEquals(field1, differentName);
+        assertNotEquals(field1, differentIndex);
+        assertNotEquals(field1, differentSchema);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/api/src/test/java/org/apache/kafka/copycat/data/SchemaBuilderTest.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/test/java/org/apache/kafka/copycat/data/SchemaBuilderTest.java b/copycat/api/src/test/java/org/apache/kafka/copycat/data/SchemaBuilderTest.java
new file mode 100644
index 0000000..fca1c10
--- /dev/null
+++ b/copycat/api/src/test/java/org/apache/kafka/copycat/data/SchemaBuilderTest.java
@@ -0,0 +1,287 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.data;
+
+import org.apache.kafka.copycat.errors.SchemaBuilderException;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class SchemaBuilderTest {
+    private static final String NAME = "name";
+    private static final Integer VERSION = 2;
+    private static final String DOC = "doc";
+
+    @Test
+    public void testInt8Builder() {
+        Schema schema = SchemaBuilder.int8().build();
+        assertTypeAndDefault(schema, Schema.Type.INT8, false, null);
+        assertNoMetadata(schema);
+
+        schema = SchemaBuilder.int8().name(NAME).optional().defaultValue((byte) 12)
+                .version(VERSION).doc(DOC).build();
+        assertTypeAndDefault(schema, Schema.Type.INT8, true, (byte) 12);
+        assertMetadata(schema, NAME, VERSION, DOC);
+    }
+
+    @Test(expected = SchemaBuilderException.class)
+    public void testInt8BuilderInvalidDefault() {
+        SchemaBuilder.int8().defaultValue("invalid");
+    }
+
+    @Test
+    public void testInt16Builder() {
+        Schema schema = SchemaBuilder.int16().build();
+        assertTypeAndDefault(schema, Schema.Type.INT16, false, null);
+        assertNoMetadata(schema);
+
+        schema = SchemaBuilder.int16().name(NAME).optional().defaultValue((short) 12)
+                .version(VERSION).doc(DOC).build();
+        assertTypeAndDefault(schema, Schema.Type.INT16, true, (short) 12);
+        assertMetadata(schema, NAME, VERSION, DOC);
+    }
+
+    @Test(expected = SchemaBuilderException.class)
+    public void testInt16BuilderInvalidDefault() {
+        SchemaBuilder.int16().defaultValue("invalid");
+    }
+
+    @Test
+    public void testInt32Builder() {
+        Schema schema = SchemaBuilder.int32().build();
+        assertTypeAndDefault(schema, Schema.Type.INT32, false, null);
+        assertNoMetadata(schema);
+
+        schema = SchemaBuilder.int32().name(NAME).optional().defaultValue(12)
+                .version(VERSION).doc(DOC).build();
+        assertTypeAndDefault(schema, Schema.Type.INT32, true, 12);
+        assertMetadata(schema, NAME, VERSION, DOC);
+    }
+
+    @Test(expected = SchemaBuilderException.class)
+    public void testInt32BuilderInvalidDefault() {
+        SchemaBuilder.int32().defaultValue("invalid");
+    }
+
+    @Test
+    public void testInt64Builder() {
+        Schema schema = SchemaBuilder.int64().build();
+        assertTypeAndDefault(schema, Schema.Type.INT64, false, null);
+        assertNoMetadata(schema);
+
+        schema = SchemaBuilder.int64().name(NAME).optional().defaultValue((long) 12)
+                .version(VERSION).doc(DOC).build();
+        assertTypeAndDefault(schema, Schema.Type.INT64, true, (long) 12);
+        assertMetadata(schema, NAME, VERSION, DOC);
+    }
+
+    @Test(expected = SchemaBuilderException.class)
+    public void testInt64BuilderInvalidDefault() {
+        SchemaBuilder.int64().defaultValue("invalid");
+    }
+
+    @Test
+    public void testFloatBuilder() {
+        Schema schema = SchemaBuilder.float32().build();
+        assertTypeAndDefault(schema, Schema.Type.FLOAT32, false, null);
+        assertNoMetadata(schema);
+
+        schema = SchemaBuilder.float32().name(NAME).optional().defaultValue(12.f)
+                .version(VERSION).doc(DOC).build();
+        assertTypeAndDefault(schema, Schema.Type.FLOAT32, true, 12.f);
+        assertMetadata(schema, NAME, VERSION, DOC);
+    }
+
+    @Test(expected = SchemaBuilderException.class)
+    public void testFloatBuilderInvalidDefault() {
+        SchemaBuilder.float32().defaultValue("invalid");
+    }
+
+    @Test
+    public void testDoubleBuilder() {
+        Schema schema = SchemaBuilder.float64().build();
+        assertTypeAndDefault(schema, Schema.Type.FLOAT64, false, null);
+        assertNoMetadata(schema);
+
+        schema = SchemaBuilder.float64().name(NAME).optional().defaultValue(12.0)
+                .version(VERSION).doc(DOC).build();
+        assertTypeAndDefault(schema, Schema.Type.FLOAT64, true, 12.0);
+        assertMetadata(schema, NAME, VERSION, DOC);
+    }
+
+    @Test(expected = SchemaBuilderException.class)
+    public void testDoubleBuilderInvalidDefault() {
+        SchemaBuilder.float64().defaultValue("invalid");
+    }
+
+    @Test
+    public void testBooleanBuilder() {
+        Schema schema = SchemaBuilder.bool().build();
+        assertTypeAndDefault(schema, Schema.Type.BOOLEAN, false, null);
+        assertNoMetadata(schema);
+
+        schema = SchemaBuilder.bool().name(NAME).optional().defaultValue(true)
+                .version(VERSION).doc(DOC).build();
+        assertTypeAndDefault(schema, Schema.Type.BOOLEAN, true, true);
+        assertMetadata(schema, NAME, VERSION, DOC);
+    }
+
+    @Test(expected = SchemaBuilderException.class)
+    public void testBooleanBuilderInvalidDefault() {
+        SchemaBuilder.bool().defaultValue("invalid");
+    }
+
+    @Test
+    public void testStringBuilder() {
+        Schema schema = SchemaBuilder.string().build();
+        assertTypeAndDefault(schema, Schema.Type.STRING, false, null);
+        assertNoMetadata(schema);
+
+        schema = SchemaBuilder.string().name(NAME).optional().defaultValue("a default string")
+                .version(VERSION).doc(DOC).build();
+        assertTypeAndDefault(schema, Schema.Type.STRING, true, "a default string");
+        assertMetadata(schema, NAME, VERSION, DOC);
+    }
+
+    @Test(expected = SchemaBuilderException.class)
+    public void testStringBuilderInvalidDefault() {
+        SchemaBuilder.string().defaultValue(true);
+    }
+
+    @Test
+    public void testBytesBuilder() {
+        Schema schema = SchemaBuilder.bytes().build();
+        assertTypeAndDefault(schema, Schema.Type.BYTES, false, null);
+        assertNoMetadata(schema);
+
+        schema = SchemaBuilder.bytes().name(NAME).optional().defaultValue("a default byte array".getBytes())
+                .version(VERSION).doc(DOC).build();
+        assertTypeAndDefault(schema, Schema.Type.BYTES, true, "a default byte array".getBytes());
+        assertMetadata(schema, NAME, VERSION, DOC);
+    }
+
+    @Test(expected = SchemaBuilderException.class)
+    public void testBytesBuilderInvalidDefault() {
+        SchemaBuilder.bytes().defaultValue("a string, not bytes");
+    }
+
+
+
+    @Test
+    public void testStructBuilder() {
+        Schema schema = SchemaBuilder.struct()
+                .field("field1", Schema.INT8_SCHEMA)
+                .field("field2", Schema.INT8_SCHEMA)
+                .build();
+        assertTypeAndDefault(schema, Schema.Type.STRUCT, false, null);
+        assertEquals(2, schema.fields().size());
+        assertEquals("field1", schema.fields().get(0).name());
+        assertEquals(0, schema.fields().get(0).index());
+        assertEquals(Schema.INT8_SCHEMA, schema.fields().get(0).schema());
+        assertEquals("field2", schema.fields().get(1).name());
+        assertEquals(1, schema.fields().get(1).index());
+        assertEquals(Schema.INT8_SCHEMA, schema.fields().get(1).schema());
+        assertNoMetadata(schema);
+    }
+
+    @Test(expected = SchemaBuilderException.class)
+    public void testNonStructCantHaveFields() {
+        SchemaBuilder.int8().field("field", SchemaBuilder.int8().build());
+    }
+
+
+    @Test
+    public void testArrayBuilder() {
+        Schema schema = SchemaBuilder.array(Schema.INT8_SCHEMA).build();
+        assertTypeAndDefault(schema, Schema.Type.ARRAY, false, null);
+        assertEquals(Schema.INT8_SCHEMA, schema.valueSchema());
+        assertNoMetadata(schema);
+
+        // Same element schema, now with a default value; expected value goes first in assertEquals
+        List<Byte> defArray = Arrays.asList((byte) 1, (byte) 2);
+        schema = SchemaBuilder.array(Schema.INT8_SCHEMA).defaultValue(defArray).build();
+        assertTypeAndDefault(schema, Schema.Type.ARRAY, false, defArray);
+        assertEquals(Schema.INT8_SCHEMA, schema.valueSchema());
+        assertNoMetadata(schema);
+    }
+
+    @Test(expected = SchemaBuilderException.class)
+    public void testArrayBuilderInvalidDefault() {
+        // Array, but wrong embedded type
+        SchemaBuilder.array(Schema.INT8_SCHEMA).defaultValue(Arrays.asList("string")).build();
+    }
+
+    @Test
+    public void testMapBuilder() {
+        Schema schema = SchemaBuilder.map(Schema.INT8_SCHEMA, Schema.INT8_SCHEMA).build();
+        assertTypeAndDefault(schema, Schema.Type.MAP, false, null);
+        assertEquals(Schema.INT8_SCHEMA, schema.keySchema());
+        assertEquals(Schema.INT8_SCHEMA, schema.valueSchema());
+        assertNoMetadata(schema);
+
+        // Same key/value schemas, now with a default value; expected value goes first in assertEquals
+        Map<Byte, Byte> defMap = Collections.singletonMap((byte) 5, (byte) 10);
+        schema = SchemaBuilder.map(Schema.INT8_SCHEMA, Schema.INT8_SCHEMA)
+                .defaultValue(defMap).build();
+        assertTypeAndDefault(schema, Schema.Type.MAP, false, defMap);
+        assertEquals(Schema.INT8_SCHEMA, schema.keySchema());
+        assertEquals(Schema.INT8_SCHEMA, schema.valueSchema());
+        assertNoMetadata(schema);
+    }
+
+    @Test(expected = SchemaBuilderException.class)
+    public void testMapBuilderInvalidDefault() {
+        // Map, but wrong embedded type
+        Map<Byte, String> defMap = Collections.singletonMap((byte) 5, "foo");
+        SchemaBuilder.map(Schema.INT8_SCHEMA, Schema.INT8_SCHEMA)
+                .defaultValue(defMap).build();
+    }
+
+
+
+    private void assertTypeAndDefault(Schema schema, Schema.Type type, boolean optional, Object defaultValue) {
+        assertEquals(type, schema.type());
+        assertEquals(optional, schema.isOptional());
+        if (type == Schema.Type.BYTES) {
+            // byte[] equality is by reference, so wrap both sides in ByteBuffer to compare contents
+            if (defaultValue == null)
+                assertNull(schema.defaultValue());
+            else
+                assertEquals(ByteBuffer.wrap((byte[]) defaultValue), ByteBuffer.wrap((byte[]) schema.defaultValue()));
+        } else {
+            assertEquals(defaultValue, schema.defaultValue());
+        }
+    }
+
+    private void assertMetadata(Schema schema, String name, Integer version, String doc) {
+        assertEquals(name, schema.name());
+        assertEquals(version, schema.version());
+        assertEquals(doc, schema.doc());
+    }
+
+    private void assertNoMetadata(Schema schema) {
+        assertMetadata(schema, null, null, null);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/api/src/test/java/org/apache/kafka/copycat/data/StructTest.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/test/java/org/apache/kafka/copycat/data/StructTest.java b/copycat/api/src/test/java/org/apache/kafka/copycat/data/StructTest.java
new file mode 100644
index 0000000..162396b
--- /dev/null
+++ b/copycat/api/src/test/java/org/apache/kafka/copycat/data/StructTest.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.data;
+
+import org.apache.kafka.copycat.errors.DataException;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+public class StructTest {
+
+    private static final Schema FLAT_STRUCT_SCHEMA = SchemaBuilder.struct()
+            .field("int8", Schema.INT8_SCHEMA)
+            .field("int16", Schema.INT16_SCHEMA)
+            .field("int32", Schema.INT32_SCHEMA)
+            .field("int64", Schema.INT64_SCHEMA)
+            .field("float32", Schema.FLOAT32_SCHEMA)
+            .field("float64", Schema.FLOAT64_SCHEMA)
+            .field("boolean", Schema.BOOLEAN_SCHEMA)
+            .field("string", Schema.STRING_SCHEMA)
+            .field("bytes", Schema.BYTES_SCHEMA)
+            .build();
+
+    private static final Schema ARRAY_SCHEMA = SchemaBuilder.array(Schema.INT8_SCHEMA).build();
+    private static final Schema MAP_SCHEMA = SchemaBuilder.map(
+            Schema.INT32_SCHEMA,
+            Schema.STRING_SCHEMA
+    ).build();
+    private static final Schema NESTED_CHILD_SCHEMA = SchemaBuilder.struct()
+            .field("int8", Schema.INT8_SCHEMA)
+            .build();
+    private static final Schema NESTED_SCHEMA = SchemaBuilder.struct()
+            .field("array", ARRAY_SCHEMA)
+            .field("map", MAP_SCHEMA)
+            .field("nested", NESTED_CHILD_SCHEMA)
+            .build();
+
+    private static final Schema REQUIRED_FIELD_SCHEMA = Schema.INT8_SCHEMA;
+    private static final Schema OPTIONAL_FIELD_SCHEMA = SchemaBuilder.int8().optional().build();
+    private static final Schema DEFAULT_FIELD_SCHEMA = SchemaBuilder.int8().defaultValue((byte) 0).build();
+
+    @Test
+    public void testFlatStruct() {
+        Struct struct = new Struct(FLAT_STRUCT_SCHEMA)
+                .put("int8", (byte) 12)
+                .put("int16", (short) 12)
+                .put("int32", 12)
+                .put("int64", (long) 12)
+                .put("float32", 12.f)
+                .put("float64", 12.)
+                .put("boolean", true)
+                .put("string", "foobar")
+                .put("bytes", "foobar".getBytes());
+
+        // Test equality, and also the type-specific getters
+        assertEquals((byte) 12, (byte) struct.getInt8("int8"));
+        assertEquals((short) 12, (short) struct.getInt16("int16"));
+        assertEquals(12, (int) struct.getInt32("int32"));
+        assertEquals((long) 12, (long) struct.getInt64("int64"));
+        assertEquals((Float) 12.f, struct.getFloat32("float32"));
+        assertEquals((Double) 12., struct.getFloat64("float64"));
+        assertEquals(true, struct.getBoolean("boolean"));
+        assertEquals("foobar", struct.getString("string"));
+        assertEquals(ByteBuffer.wrap("foobar".getBytes()), ByteBuffer.wrap(struct.getBytes("bytes")));
+
+        struct.validate();
+    }
+
+    @Test
+    public void testComplexStruct() {
+        List<Byte> array = Arrays.asList((byte) 1, (byte) 2);
+        Map<Integer, String> map = Collections.singletonMap(1, "string");
+        Struct struct = new Struct(NESTED_SCHEMA)
+                .put("array", array)
+                .put("map", map)
+                .put("nested", new Struct(NESTED_CHILD_SCHEMA).put("int8", (byte) 12));
+
+        // Assign the typed getters to locals whose types match ARRAY_SCHEMA (int8) and MAP_SCHEMA (int32 -> string)
+        List<Byte> arrayExtracted = struct.getArray("array");
+        assertEquals(array, arrayExtracted);
+        Map<Integer, String> mapExtracted = struct.getMap("map");
+        assertEquals(map, mapExtracted);
+        assertEquals((byte) 12, struct.getStruct("nested").get("int8"));
+
+        struct.validate();
+    }
+
+
+    // These don't test all the ways validation can fail, just one for each element. See more extensive validation
+    // tests in SchemaTest. These are meant to ensure that we are invoking the same code path and that we do deeper
+    // inspection than just checking the class of the object
+
+    @Test(expected = DataException.class)
+    public void testInvalidFieldType() {
+        new Struct(FLAT_STRUCT_SCHEMA).put("int8", "should fail because this is a string, not int8");
+    }
+
+    @Test(expected = DataException.class)
+    public void testInvalidArrayFieldElements() {
+        new Struct(NESTED_SCHEMA).put("array", Arrays.asList("should fail since elements should be int8s"));
+    }
+
+    @Test(expected = DataException.class)
+    public void testInvalidMapKeyElements() {
+        new Struct(NESTED_SCHEMA).put("map", Collections.singletonMap("should fail because keys should be int32s", "valid string value"));
+    }
+
+    @Test(expected = DataException.class)
+    public void testInvalidStructFieldSchema() {
+        new Struct(NESTED_SCHEMA).put("nested", new Struct(MAP_SCHEMA));
+    }
+
+    @Test(expected = DataException.class)
+    public void testInvalidStructFieldValue() {
+        new Struct(NESTED_SCHEMA).put("nested", new Struct(NESTED_CHILD_SCHEMA));
+    }
+
+
+    @Test(expected = DataException.class)
+    public void testMissingFieldValidation() {
+        // Required int8 field
+        Schema schema = SchemaBuilder.struct().field("field", REQUIRED_FIELD_SCHEMA).build();
+        Struct struct = new Struct(schema);
+        struct.validate();
+    }
+
+    @Test
+    public void testMissingOptionalFieldValidation() {
+        Schema schema = SchemaBuilder.struct().field("field", OPTIONAL_FIELD_SCHEMA).build();
+        Struct struct = new Struct(schema);
+        struct.validate();
+    }
+
+    @Test
+    public void testMissingFieldWithDefaultValidation() {
+        Schema schema = SchemaBuilder.struct().field("field", DEFAULT_FIELD_SCHEMA).build();
+        Struct struct = new Struct(schema);
+        struct.validate();
+    }
+
+
+    @Test
+    public void testEquals() {
+        Struct struct1 = new Struct(FLAT_STRUCT_SCHEMA)
+                .put("int8", (byte) 12)
+                .put("int16", (short) 12)
+                .put("int32", 12)
+                .put("int64", (long) 12)
+                .put("float32", 12.f)
+                .put("float64", 12.)
+                .put("boolean", true)
+                .put("string", "foobar")
+                .put("bytes", ByteBuffer.wrap("foobar".getBytes()));
+        Struct struct2 = new Struct(FLAT_STRUCT_SCHEMA)
+                .put("int8", (byte) 12)
+                .put("int16", (short) 12)
+                .put("int32", 12)
+                .put("int64", (long) 12)
+                .put("float32", 12.f)
+                .put("float64", 12.)
+                .put("boolean", true)
+                .put("string", "foobar")
+                .put("bytes", ByteBuffer.wrap("foobar".getBytes()));
+        Struct struct3 = new Struct(FLAT_STRUCT_SCHEMA)
+                .put("int8", (byte) 12)
+                .put("int16", (short) 12)
+                .put("int32", 12)
+                .put("int64", (long) 12)
+                .put("float32", 12.f)
+                .put("float64", 12.)
+                .put("boolean", true)
+                .put("string", "mismatching string")
+                .put("bytes", ByteBuffer.wrap("foobar".getBytes()));
+
+        assertEquals(struct1, struct2);
+        assertNotEquals(struct1, struct3);
+
+        List<Byte> array = Arrays.asList((byte) 1, (byte) 2);
+        Map<Integer, String> map = Collections.singletonMap(1, "string");
+        struct1 = new Struct(NESTED_SCHEMA)
+                .put("array", array)
+                .put("map", map)
+                .put("nested", new Struct(NESTED_CHILD_SCHEMA).put("int8", (byte) 12));
+        List<Byte> array2 = Arrays.asList((byte) 1, (byte) 2);
+        Map<Integer, String> map2 = Collections.singletonMap(1, "string");
+        struct2 = new Struct(NESTED_SCHEMA)
+                .put("array", array2)
+                .put("map", map2)
+                .put("nested", new Struct(NESTED_CHILD_SCHEMA).put("int8", (byte) 12));
+        List<Byte> array3 = Arrays.asList((byte) 1, (byte) 2, (byte) 3);
+        Map<Integer, String> map3 = Collections.singletonMap(2, "string");
+        struct3 = new Struct(NESTED_SCHEMA)
+                .put("array", array3)
+                .put("map", map3)
+                .put("nested", new Struct(NESTED_CHILD_SCHEMA).put("int8", (byte) 13));
+
+        assertEquals(struct1, struct2);
+        assertNotEquals(struct1, struct3);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/data/src/main/java/org/apache/kafka/copycat/data/DataRuntimeException.java
----------------------------------------------------------------------
diff --git a/copycat/data/src/main/java/org/apache/kafka/copycat/data/DataRuntimeException.java b/copycat/data/src/main/java/org/apache/kafka/copycat/data/DataRuntimeException.java
deleted file mode 100644
index 855c0fd..0000000
--- a/copycat/data/src/main/java/org/apache/kafka/copycat/data/DataRuntimeException.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-
-
-package org.apache.kafka.copycat.data;
-
-/** Base Avro exception. */
-public class DataRuntimeException extends RuntimeException {
-    public DataRuntimeException(Throwable cause) {
-        super(cause);
-    }
-
-    public DataRuntimeException(String message) {
-        super(message);
-    }
-
-    public DataRuntimeException(String message, Throwable cause) {
-        super(message, cause);
-    }
-}
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/data/src/main/java/org/apache/kafka/copycat/data/DataTypeException.java
----------------------------------------------------------------------
diff --git a/copycat/data/src/main/java/org/apache/kafka/copycat/data/DataTypeException.java b/copycat/data/src/main/java/org/apache/kafka/copycat/data/DataTypeException.java
deleted file mode 100644
index 6a74d88..0000000
--- a/copycat/data/src/main/java/org/apache/kafka/copycat/data/DataTypeException.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-
-
-package org.apache.kafka.copycat.data;
-
-
-/** Thrown when an illegal type is used. */
-public class DataTypeException extends DataRuntimeException {
-    public DataTypeException(String message) {
-        super(message);
-    }
-
-    public DataTypeException(String message, Throwable cause) {
-        super(message, cause);
-    }
-}
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/data/src/main/java/org/apache/kafka/copycat/data/ObjectProperties.java
----------------------------------------------------------------------
diff --git a/copycat/data/src/main/java/org/apache/kafka/copycat/data/ObjectProperties.java b/copycat/data/src/main/java/org/apache/kafka/copycat/data/ObjectProperties.java
deleted file mode 100644
index e995b7f..0000000
--- a/copycat/data/src/main/java/org/apache/kafka/copycat/data/ObjectProperties.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-
-
-package org.apache.kafka.copycat.data;
-
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * Base class for objects that have Object-valued properties.
- */
-public abstract class ObjectProperties {
-    public static class Null {
-        private Null() {
-        }
-    }
-
-    /** A value representing a JSON <code>null</code>. */
-    public static final Null NULL_VALUE = new Null();
-
-    Map<String, Object> props = new LinkedHashMap<String, Object>(1);
-
-    private Set<String> reserved;
-
-    ObjectProperties(Set<String> reserved) {
-        this.reserved = reserved;
-    }
-
-    /**
-     * Returns the value of the named, string-valued property in this schema.
-     * Returns <tt>null</tt> if there is no string-valued property with that name.
-     */
-    public String getProp(String name) {
-        Object value = getObjectProp(name);
-        return (value instanceof String) ? (String) value : null;
-    }
-
-    /**
-     * Returns the value of the named property in this schema.
-     * Returns <tt>null</tt> if there is no property with that name.
-     */
-    public synchronized Object getObjectProp(String name) {
-        return props.get(name);
-    }
-
-    /**
-     * Adds a property with the given name <tt>name</tt> and
-     * value <tt>value</tt>. Neither <tt>name</tt> nor <tt>value</tt> can be
-     * <tt>null</tt>. It is illegal to add a property if another with
-     * the same name but different value already exists in this schema.
-     *
-     * @param name The name of the property to add
-     * @param value The value for the property to add
-     */
-    public synchronized void addProp(String name, Object value) {
-        if (reserved.contains(name))
-            throw new DataRuntimeException("Can't set reserved property: " + name);
-
-        if (value == null)
-            throw new DataRuntimeException("Can't set a property to null: " + name);
-
-        Object old = props.get(name);
-        if (old == null)
-            props.put(name, value);
-        else if (!old.equals(value))
-            throw new DataRuntimeException("Can't overwrite property: " + name);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/data/src/main/java/org/apache/kafka/copycat/data/Schema.java
----------------------------------------------------------------------
diff --git a/copycat/data/src/main/java/org/apache/kafka/copycat/data/Schema.java b/copycat/data/src/main/java/org/apache/kafka/copycat/data/Schema.java
deleted file mode 100644
index 04906c3..0000000
--- a/copycat/data/src/main/java/org/apache/kafka/copycat/data/Schema.java
+++ /dev/null
@@ -1,1054 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-
-
-package org.apache.kafka.copycat.data;
-
-import java.nio.ByteBuffer;
-import java.util.*;
-
-/** An abstract data type.
- * <p>A schema may be one of:
- * <ul>
- * <li>A <i>record</i>, mapping field names to field value data;
- * <li>An <i>enum</i>, containing one of a small set of symbols;
- * <li>An <i>array</i> of values, all of the same schema;
- * <li>A <i>map</i>, containing string/value pairs, of a declared schema;
- * <li>A <i>union</i> of other schemas;
- * <li>A <i>fixed</i> sized binary object;
- * <li>A unicode <i>string</i>;
- * <li>A sequence of <i>bytes</i>;
- * <li>A 32-bit signed <i>int</i>;
- * <li>A 64-bit signed <i>long</i>;
- * <li>A 32-bit IEEE single-<i>float</i>; or
- * <li>A 64-bit IEEE <i>double</i>-float; or
- * <li>A <i>boolean</i>; or
- * <li><i>null</i>.
- * </ul>
- *
- * A schema can be constructed using one of its static <tt>createXXX</tt>
- * methods, or more conveniently using {@link SchemaBuilder}. The schema objects are
- * <i>logically</i> immutable.
- * There are only two mutating methods - {@link #setFields(List)} and
- * {@link #addProp(String, Object)}. The following restrictions apply on these
- * two methods.
- * <ul>
- * <li> {@link #setFields(List)}, can be called at most once. This method exists
- * in order to enable clients to build recursive schemas.
- * <li> {@link #addProp(String, Object)} can be called with property names
- * that are not present already. It is not possible to change or delete an
- * existing property.
- * </ul>
- */
-public abstract class Schema extends ObjectProperties {
-    private static final int NO_HASHCODE = Integer.MIN_VALUE;
-
-    /** The type of a schema. */
-    public enum Type {
-        ENUM {
-            @Override
-            public Object defaultValue(Schema schema) {
-                return null;
-            }
-        },
-        ARRAY {
-            @Override
-            public Object defaultValue(Schema schema) {
-                return new ArrayList<>();
-            }
-        },
-        MAP {
-            @Override
-            public Object defaultValue(Schema schema) {
-                return new HashMap<Object, Object>();
-            }
-        },
-        UNION {
-            @Override
-            public Object defaultValue(Schema schema) {
-                Schema firstSchema = schema.getTypes().get(0);
-                return firstSchema.getType().defaultValue(firstSchema);
-            }
-        },
-        STRING {
-            @Override
-            public Object defaultValue(Schema schema) {
-                return "";
-            }
-        },
-        BYTES {
-            @Override
-            public Object defaultValue(Schema schema) {
-                return new byte[0];
-            }
-        },
-        INT {
-            @Override
-            public Object defaultValue(Schema schema) {
-                return 0;
-            }
-        },
-        LONG {
-            @Override
-            public Object defaultValue(Schema schema) {
-                return 0;
-            }
-        },
-        FLOAT {
-            @Override
-            public Object defaultValue(Schema schema) {
-                return 0;
-            }
-        },
-        DOUBLE {
-            @Override
-            public Object defaultValue(Schema schema) {
-                return 0;
-            }
-        },
-        BOOLEAN {
-            @Override
-            public Object defaultValue(Schema schema) {
-                return false;
-            }
-        },
-        NULL {
-            @Override
-            public Object defaultValue(Schema schema) {
-                return null;
-            }
-        };
-        private String name;
-
-        private Type() {
-            this.name = this.name().toLowerCase();
-        }
-
-        public String getName() {
-            return name;
-        }
-
-        public abstract Object defaultValue(Schema schema);
-    }
-
-    private final Type type;
-
-    Schema(Type type) {
-        super(SCHEMA_RESERVED);
-        this.type = type;
-    }
-
-    /** Create a schema for a primitive type. */
-    public static Schema create(Type type) {
-        switch (type) {
-            case STRING:
-                return new StringSchema();
-            case BYTES:
-                return new BytesSchema();
-            case INT:
-                return new IntSchema();
-            case LONG:
-                return new LongSchema();
-            case FLOAT:
-                return new FloatSchema();
-            case DOUBLE:
-                return new DoubleSchema();
-            case BOOLEAN:
-                return new BooleanSchema();
-            case NULL:
-                return new NullSchema();
-            default:
-                throw new DataRuntimeException("Can't create a: " + type);
-        }
-    }
-
-    private static final Set<String> SCHEMA_RESERVED = new HashSet<String>();
-
-    static {
-        Collections.addAll(SCHEMA_RESERVED,
-                "doc", "fields", "items", "name", "namespace",
-                "size", "symbols", "values", "type", "aliases");
-    }
-
-    int hashCode = NO_HASHCODE;
-
-    @Override
-    public void addProp(String name, Object value) {
-        super.addProp(name, value);
-        hashCode = NO_HASHCODE;
-    }
-
-    /** Create an enum schema. */
-    public static Schema createEnum(String name, String doc, String namespace,
-                                    List<String> values) {
-        return new EnumSchema(new Name(name, namespace), doc,
-                new LockableArrayList<String>(values));
-    }
-
-    /** Create an array schema. */
-    public static Schema createArray(Schema elementType) {
-        return new ArraySchema(elementType);
-    }
-
-    /** Create a map schema. */
-    public static Schema createMap(Schema valueType) {
-        return new MapSchema(valueType);
-    }
-
-    /** Create a union schema. */
-    public static Schema createUnion(List<Schema> types) {
-        return new UnionSchema(new LockableArrayList<Schema>(types));
-    }
-
-    /** Create a union schema. */
-    public static Schema createUnion(Schema... types) {
-        return createUnion(new LockableArrayList<Schema>(types));
-    }
-
-    /** Return the type of this schema. */
-    public Type getType() {
-        return type;
-    }
-
-    /**
-     * If this is a record, returns the Field with the
-     * given name <tt>fieldName</tt>. If there is no field by that name, a
-     * <tt>null</tt> is returned.
-     */
-    public Field getField(String fieldname) {
-        throw new DataRuntimeException("Not a record: " + this);
-    }
-
-    /**
-     * If this is a record, returns the fields in it. The returned
-     * list is in the order of their positions.
-     */
-    public List<Field> getFields() {
-        throw new DataRuntimeException("Not a record: " + this);
-    }
-
-    /**
-     * If this is a record, set its fields. The fields can be set
-     * only once in a schema.
-     */
-    public void setFields(List<Field> fields) {
-        throw new DataRuntimeException("Not a record: " + this);
-    }
-
-    /** If this is an enum, return its symbols. */
-    public List<String> getEnumSymbols() {
-        throw new DataRuntimeException("Not an enum: " + this);
-    }
-
-    /** If this is an enum, return a symbol's ordinal value. */
-    public int getEnumOrdinal(String symbol) {
-        throw new DataRuntimeException("Not an enum: " + this);
-    }
-
-    /** If this is an enum, returns true if it contains given symbol. */
-    public boolean hasEnumSymbol(String symbol) {
-        throw new DataRuntimeException("Not an enum: " + this);
-    }
-
-    /** If this is a record, enum or fixed, returns its name, otherwise the name
-     * of the primitive type. */
-    public String getName() {
-        return type.name;
-    }
-
-    /** If this is a record, enum, or fixed, returns its docstring,
-     * if available.  Otherwise, returns null. */
-    public String getDoc() {
-        return null;
-    }
-
-    /** If this is a record, enum or fixed, returns its namespace, if any. */
-    public String getNamespace() {
-        throw new DataRuntimeException("Not a named type: " + this);
-    }
-
-    /** If this is a record, enum or fixed, returns its namespace-qualified name,
-     * otherwise returns the name of the primitive type. */
-    public String getFullName() {
-        return getName();
-    }
-
-    /** If this is a record, enum or fixed, add an alias. */
-    public void addAlias(String alias) {
-        throw new DataRuntimeException("Not a named type: " + this);
-    }
-
-    /** If this is a record, enum or fixed, add an alias. */
-    public void addAlias(String alias, String space) {
-        throw new DataRuntimeException("Not a named type: " + this);
-    }
-
-    /** If this is a record, enum or fixed, return its aliases, if any. */
-    public Set<String> getAliases() {
-        throw new DataRuntimeException("Not a named type: " + this);
-    }
-
-    /** Returns true if this record is an error type. */
-    public boolean isError() {
-        throw new DataRuntimeException("Not a record: " + this);
-    }
-
-    /** If this is an array, returns its element type. */
-    public Schema getElementType() {
-        throw new DataRuntimeException("Not an array: " + this);
-    }
-
-    /** If this is a map, returns its value type. */
-    public Schema getValueType() {
-        throw new DataRuntimeException("Not a map: " + this);
-    }
-
-    /** If this is a union, returns its types. */
-    public List<Schema> getTypes() {
-        throw new DataRuntimeException("Not a union: " + this);
-    }
-
-    /** If this is a union, return the branch with the provided full name. */
-    public Integer getIndexNamed(String name) {
-        throw new DataRuntimeException("Not a union: " + this);
-    }
-
-    /** If this is fixed, returns its size. */
-    public int getFixedSize() {
-        throw new DataRuntimeException("Not fixed: " + this);
-    }
-
-    @Override
-    public String toString() {
-        // FIXME A more JSON-like output showing the details would be nice
-        return "Schema:" + this.getType() + ":" + getFullName();
-    }
-
-    public boolean equals(Object o) {
-        if (o == this) return true;
-        if (!(o instanceof Schema)) return false;
-        Schema that = (Schema) o;
-        if (!(this.type == that.type)) return false;
-        return equalCachedHash(that) && props.equals(that.props);
-    }
-
-    public final int hashCode() {
-        if (hashCode == NO_HASHCODE)
-            hashCode = computeHash();
-        return hashCode;
-    }
-
-    int computeHash() {
-        return getType().hashCode() + props.hashCode();
-    }
-
-    final boolean equalCachedHash(Schema other) {
-        return (hashCode == other.hashCode)
-                || (hashCode == NO_HASHCODE)
-                || (other.hashCode == NO_HASHCODE);
-    }
-
-    private static final Set<String> FIELD_RESERVED = new HashSet<String>();
-
-    static {
-        Collections.addAll(FIELD_RESERVED,
-                "default", "doc", "name", "order", "type", "aliases");
-    }
-
-    /** A field within a record. */
-    public static class Field extends ObjectProperties {
-
-        /** How values of this field should be ordered when sorting records. */
-        public enum Order {
-            ASCENDING, DESCENDING, IGNORE;
-            private String name;
-
-            private Order() {
-                this.name = this.name().toLowerCase();
-            }
-        }
-
-
-        private final String name;    // name of the field.
-        private int position = -1;
-        private final Schema schema;
-        private final String doc;
-        private final Object defaultValue;
-        private final Order order;
-        private Set<String> aliases;
-
-        public Field(String name, Schema schema, String doc,
-                     Object defaultValue) {
-            this(name, schema, doc, defaultValue, Order.ASCENDING);
-        }
-
-        public Field(String name, Schema schema, String doc,
-                     Object defaultValue, Order order) {
-            super(FIELD_RESERVED);
-            this.name = validateName(name);
-            this.schema = schema;
-            this.doc = doc;
-            this.defaultValue = validateDefault(name, schema, defaultValue);
-            this.order = order;
-        }
-
-        public String name() {
-            return name;
-        }
-
-
-        /** The position of this field within the record. */
-        public int pos() {
-            return position;
-        }
-
-        /** This field's {@link Schema}. */
-        public Schema schema() {
-            return schema;
-        }
-
-        /** Field's documentation within the record, if set. May return null. */
-        public String doc() {
-            return doc;
-        }
-
-        public Object defaultValue() {
-            return defaultValue;
-        }
-
-        public Order order() {
-            return order;
-        }
-
-        public void addAlias(String alias) {
-            if (aliases == null)
-                this.aliases = new LinkedHashSet<String>();
-            aliases.add(alias);
-        }
-
-        /** Return the defined aliases as an unmodifieable Set. */
-        public Set<String> aliases() {
-            if (aliases == null)
-                return Collections.emptySet();
-            return Collections.unmodifiableSet(aliases);
-        }
-
-        public boolean equals(Object other) {
-            if (other == this) return true;
-            if (!(other instanceof Field)) return false;
-            Field that = (Field) other;
-            return (name.equals(that.name)) &&
-                    (schema.equals(that.schema)) &&
-                    defaultValueEquals(that.defaultValue) &&
-                    (order == that.order) &&
-                    props.equals(that.props);
-        }
-
-        public int hashCode() {
-            return name.hashCode() + schema.computeHash();
-        }
-
-        /** Do any possible implicit conversions to double, or return 0 if there isn't a
-         * valid conversion */
-        private double doubleValue(Object v) {
-            if (v instanceof Integer)
-                return (double) (Integer) v;
-            else if (v instanceof Long)
-                return (double) (Long) v;
-            else if (v instanceof Float)
-                return (double) (Float) v;
-            else if (v instanceof Double)
-                return (double) (Double) v;
-            else
-                return 0;
-        }
-
-        private boolean defaultValueEquals(Object thatDefaultValue) {
-            if (defaultValue == null)
-                return thatDefaultValue == null;
-            if (Double.isNaN(doubleValue(defaultValue)))
-                return Double.isNaN(doubleValue(thatDefaultValue));
-            return defaultValue.equals(thatDefaultValue);
-        }
-
-        @Override
-        public String toString() {
-            return name + " type:" + schema.type + " pos:" + position;
-        }
-    }
-
-    static class Name {
-        private final String name;
-        private final String space;
-        private final String full;
-
-        public Name(String name, String space) {
-            if (name == null) {                         // anonymous
-                this.name = this.space = this.full = null;
-                return;
-            }
-            int lastDot = name.lastIndexOf('.');
-            if (lastDot < 0) {                          // unqualified name
-                this.name = validateName(name);
-            } else {                                    // qualified name
-                space = name.substring(0, lastDot);       // get space from name
-                this.name = validateName(name.substring(lastDot + 1, name.length()));
-            }
-            if ("".equals(space))
-                space = null;
-            this.space = space;
-            this.full = (this.space == null) ? this.name : this.space + "." + this.name;
-        }
-
-        public boolean equals(Object o) {
-            if (o == this) return true;
-            if (!(o instanceof Name)) return false;
-            Name that = (Name) o;
-            return full == null ? that.full == null : full.equals(that.full);
-        }
-
-        public int hashCode() {
-            return full == null ? 0 : full.hashCode();
-        }
-
-        public String toString() {
-            return full;
-        }
-
-        public String getQualified(String defaultSpace) {
-            return (space == null || space.equals(defaultSpace)) ? name : full;
-        }
-    }
-
-    private static abstract class NamedSchema extends Schema {
-        final Name name;
-        final String doc;
-        Set<Name> aliases;
-
-        public NamedSchema(Type type, Name name, String doc) {
-            super(type);
-            this.name = name;
-            this.doc = doc;
-            if (PRIMITIVES.containsKey(name.full)) {
-                throw new DataTypeException("Schemas may not be named after primitives: " + name.full);
-            }
-        }
-
-        public String getName() {
-            return name.name;
-        }
-
-        public String getDoc() {
-            return doc;
-        }
-
-        public String getNamespace() {
-            return name.space;
-        }
-
-        public String getFullName() {
-            return name.full;
-        }
-
-        public void addAlias(String alias) {
-            addAlias(alias, null);
-        }
-
-        public void addAlias(String name, String space) {
-            if (aliases == null)
-                this.aliases = new LinkedHashSet<Name>();
-            if (space == null)
-                space = this.name.space;
-            aliases.add(new Name(name, space));
-        }
-
-        public Set<String> getAliases() {
-            Set<String> result = new LinkedHashSet<String>();
-            if (aliases != null)
-                for (Name alias : aliases)
-                    result.add(alias.full);
-            return result;
-        }
-
-        public boolean equalNames(NamedSchema that) {
-            return this.name.equals(that.name);
-        }
-
-        @Override
-        int computeHash() {
-            return super.computeHash() + name.hashCode();
-        }
-    }
-
-    private static class SeenPair {
-        private Object s1;
-        private Object s2;
-
-        private SeenPair(Object s1, Object s2) {
-            this.s1 = s1;
-            this.s2 = s2;
-        }
-
-        public boolean equals(Object o) {
-            return this.s1 == ((SeenPair) o).s1 && this.s2 == ((SeenPair) o).s2;
-        }
-
-        public int hashCode() {
-            return System.identityHashCode(s1) + System.identityHashCode(s2);
-        }
-    }
-
-    private static final ThreadLocal<Set> SEEN_EQUALS = new ThreadLocal<Set>() {
-        protected Set initialValue() {
-            return new HashSet();
-        }
-    };
-    private static final ThreadLocal<Map> SEEN_HASHCODE = new ThreadLocal<Map>() {
-        protected Map initialValue() {
-            return new IdentityHashMap();
-        }
-    };
-
-    private static class EnumSchema extends NamedSchema {
-        private final List<String> symbols;
-        private final Map<String, Integer> ordinals;
-
-        public EnumSchema(Name name, String doc,
-                          LockableArrayList<String> symbols) {
-            super(Type.ENUM, name, doc);
-            this.symbols = symbols.lock();
-            this.ordinals = new HashMap<String, Integer>();
-            int i = 0;
-            for (String symbol : symbols)
-                if (ordinals.put(validateName(symbol), i++) != null)
-                    throw new SchemaParseException("Duplicate enum symbol: " + symbol);
-        }
-
-        public List<String> getEnumSymbols() {
-            return symbols;
-        }
-
-        public boolean hasEnumSymbol(String symbol) {
-            return ordinals.containsKey(symbol);
-        }
-
-        public int getEnumOrdinal(String symbol) {
-            return ordinals.get(symbol);
-        }
-
-        public boolean equals(Object o) {
-            if (o == this) return true;
-            if (!(o instanceof EnumSchema)) return false;
-            EnumSchema that = (EnumSchema) o;
-            return equalCachedHash(that)
-                    && equalNames(that)
-                    && symbols.equals(that.symbols)
-                    && props.equals(that.props);
-        }
-
-        @Override
-        int computeHash() {
-            return super.computeHash() + symbols.hashCode();
-        }
-    }
-
-    private static class ArraySchema extends Schema {
-        private final Schema elementType;
-
-        public ArraySchema(Schema elementType) {
-            super(Type.ARRAY);
-            this.elementType = elementType;
-        }
-
-        public Schema getElementType() {
-            return elementType;
-        }
-
-        public boolean equals(Object o) {
-            if (o == this) return true;
-            if (!(o instanceof ArraySchema)) return false;
-            ArraySchema that = (ArraySchema) o;
-            return equalCachedHash(that)
-                    && elementType.equals(that.elementType)
-                    && props.equals(that.props);
-        }
-
-        @Override
-        int computeHash() {
-            return super.computeHash() + elementType.computeHash();
-        }
-    }
-
-    private static class MapSchema extends Schema {
-        private final Schema valueType;
-
-        public MapSchema(Schema valueType) {
-            super(Type.MAP);
-            this.valueType = valueType;
-        }
-
-        public Schema getValueType() {
-            return valueType;
-        }
-
-        public boolean equals(Object o) {
-            if (o == this) return true;
-            if (!(o instanceof MapSchema)) return false;
-            MapSchema that = (MapSchema) o;
-            return equalCachedHash(that)
-                    && valueType.equals(that.valueType)
-                    && props.equals(that.props);
-        }
-
-        @Override
-        int computeHash() {
-            return super.computeHash() + valueType.computeHash();
-        }
-    }
-
-    private static class UnionSchema extends Schema {
-        private final List<Schema> types;
-        private final Map<String, Integer> indexByName
-                = new HashMap<String, Integer>();
-
-        public UnionSchema(LockableArrayList<Schema> types) {
-            super(Type.UNION);
-            this.types = types.lock();
-            int index = 0;
-            for (Schema type : types) {
-                if (type.getType() == Type.UNION)
-                    throw new DataRuntimeException("Nested union: " + this);
-                String name = type.getFullName();
-                if (name == null)
-                    throw new DataRuntimeException("Nameless in union:" + this);
-                if (indexByName.put(name, index++) != null)
-                    throw new DataRuntimeException("Duplicate in union:" + name);
-            }
-        }
-
-        public List<Schema> getTypes() {
-            return types;
-        }
-
-        public Integer getIndexNamed(String name) {
-            return indexByName.get(name);
-        }
-
-        public boolean equals(Object o) {
-            if (o == this) return true;
-            if (!(o instanceof UnionSchema)) return false;
-            UnionSchema that = (UnionSchema) o;
-            return equalCachedHash(that)
-                    && types.equals(that.types)
-                    && props.equals(that.props);
-        }
-
-        @Override
-        int computeHash() {
-            int hash = super.computeHash();
-            for (Schema type : types)
-                hash += type.computeHash();
-            return hash;
-        }
-    }
-
-    private static class StringSchema extends Schema {
-        public StringSchema() {
-            super(Type.STRING);
-        }
-    }
-
-    private static class BytesSchema extends Schema {
-        public BytesSchema() {
-            super(Type.BYTES);
-        }
-    }
-
-    private static class IntSchema extends Schema {
-        public IntSchema() {
-            super(Type.INT);
-        }
-    }
-
-    private static class LongSchema extends Schema {
-        public LongSchema() {
-            super(Type.LONG);
-        }
-    }
-
-    private static class FloatSchema extends Schema {
-        public FloatSchema() {
-            super(Type.FLOAT);
-        }
-    }
-
-    private static class DoubleSchema extends Schema {
-        public DoubleSchema() {
-            super(Type.DOUBLE);
-        }
-    }
-
-    private static class BooleanSchema extends Schema {
-        public BooleanSchema() {
-            super(Type.BOOLEAN);
-        }
-    }
-
-    private static class NullSchema extends Schema {
-        public NullSchema() {
-            super(Type.NULL);
-        }
-    }
-
-    static final Map<String, Type> PRIMITIVES = new HashMap<String, Type>();
-
-    static {
-        PRIMITIVES.put("string", Type.STRING);
-        PRIMITIVES.put("bytes", Type.BYTES);
-        PRIMITIVES.put("int", Type.INT);
-        PRIMITIVES.put("long", Type.LONG);
-        PRIMITIVES.put("float", Type.FLOAT);
-        PRIMITIVES.put("double", Type.DOUBLE);
-        PRIMITIVES.put("boolean", Type.BOOLEAN);
-        PRIMITIVES.put("null", Type.NULL);
-    }
-
-    static class Names extends LinkedHashMap<Name, Schema> {
-        private String space;                         // default namespace
-
-        public Names() {
-        }
-
-        public Names(String space) {
-            this.space = space;
-        }
-
-        public String space() {
-            return space;
-        }
-
-        public void space(String space) {
-            this.space = space;
-        }
-
-        @Override
-        public Schema get(Object o) {
-            Name name;
-            if (o instanceof String) {
-                Type primitive = PRIMITIVES.get((String) o);
-                if (primitive != null) return Schema.create(primitive);
-                name = new Name((String) o, space);
-                if (!containsKey(name))                   // if not in default
-                    name = new Name((String) o, "");         // try anonymous
-            } else {
-                name = (Name) o;
-            }
-            return super.get(name);
-        }
-
-        public boolean contains(Schema schema) {
-            return get(((NamedSchema) schema).name) != null;
-        }
-
-        public void add(Schema schema) {
-            put(((NamedSchema) schema).name, schema);
-        }
-
-        @Override
-        public Schema put(Name name, Schema schema) {
-            if (containsKey(name))
-                throw new SchemaParseException("Can't redefine: " + name);
-            return super.put(name, schema);
-        }
-    }
-
-    private static ThreadLocal<Boolean> validateNames
-            = new ThreadLocal<Boolean>() {
-        @Override
-        protected Boolean initialValue() {
-            return true;
-        }
-    };
-
-    private static String validateName(String name) {
-        if (!validateNames.get()) return name;        // not validating names
-        int length = name.length();
-        if (length == 0)
-            throw new SchemaParseException("Empty name");
-        char first = name.charAt(0);
-        if (!(Character.isLetter(first) || first == '_'))
-            throw new SchemaParseException("Illegal initial character: " + name);
-        for (int i = 1; i < length; i++) {
-            char c = name.charAt(i);
-            if (!(Character.isLetterOrDigit(c) || c == '_'))
-                throw new SchemaParseException("Illegal character in: " + name);
-        }
-        return name;
-    }
-
-    private static final ThreadLocal<Boolean> VALIDATE_DEFAULTS
-            = new ThreadLocal<Boolean>() {
-        @Override
-        protected Boolean initialValue() {
-            return false;
-        }
-    };
-
-    private static Object validateDefault(String fieldName, Schema schema,
-                                          Object defaultValue) {
-        if ((defaultValue != null)
-                && !isValidDefault(schema, defaultValue)) { // invalid default
-            String message = "Invalid default for field " + fieldName
-                    + ": " + defaultValue + " not a " + schema;
-            if (VALIDATE_DEFAULTS.get())
-                throw new DataTypeException(message);     // throw exception
-            System.err.println("[WARNING] Avro: " + message); // or log warning
-        }
-        return defaultValue;
-    }
-
-    private static boolean isValidDefault(Schema schema, Object defaultValue) {
-        switch (schema.getType()) {
-            case STRING:
-            case ENUM:
-                return (defaultValue instanceof String);
-            case BYTES:
-            case INT:
-                return (defaultValue instanceof Integer);
-            case LONG:
-                return (defaultValue instanceof Long);
-            case FLOAT:
-                return (defaultValue instanceof Float);
-            case DOUBLE:
-                return (defaultValue instanceof Double);
-            case BOOLEAN:
-                return (defaultValue instanceof Boolean);
-            case NULL:
-                return defaultValue == null;
-            case ARRAY:
-                if (!(defaultValue instanceof Collection))
-                    return false;
-                for (Object element : (Collection<Object>) defaultValue)
-                    if (!isValidDefault(schema.getElementType(), element))
-                        return false;
-                return true;
-            case MAP:
-                if (!(defaultValue instanceof Map))
-                    return false;
-                for (Object value : ((Map<Object, Object>) defaultValue).values())
-                    if (!isValidDefault(schema.getValueType(), value))
-                        return false;
-                return true;
-            case UNION:                                   // union default: first branch
-                return isValidDefault(schema.getTypes().get(0), defaultValue);
-            default:
-                return false;
-        }
-    }
-
-    /**
-     * No change is permitted on LockableArrayList once lock() has been
-     * called on it.
-     * @param <E>
-     */
-  
-  /*
-   * This class keeps a boolean variable <tt>locked</tt> which is set
-   * to <tt>true</tt> in the lock() method. It's legal to call
-   * lock() any number of times. Any lock() other than the first one
-   * is a no-op.
-   * 
-   * This class throws <tt>IllegalStateException</tt> if a mutating
-   * operation is performed after being locked. Since modifications through
-   * iterator also use the list's mutating operations, this effectively
-   * blocks all modifications.
-   */
-    static class LockableArrayList<E> extends ArrayList<E> {
-        private static final long serialVersionUID = 1L;
-        private boolean locked = false;
-
-        public LockableArrayList() {
-        }
-
-        public LockableArrayList(int size) {
-            super(size);
-        }
-
-        public LockableArrayList(List<E> types) {
-            super(types);
-        }
-
-        public LockableArrayList(E... types) {
-            super(types.length);
-            Collections.addAll(this, types);
-        }
-
-        public List<E> lock() {
-            locked = true;
-            return this;
-        }
-
-        private void ensureUnlocked() {
-            if (locked) {
-                throw new IllegalStateException();
-            }
-        }
-
-        public boolean add(E e) {
-            ensureUnlocked();
-            return super.add(e);
-        }
-
-        public boolean remove(Object o) {
-            ensureUnlocked();
-            return super.remove(o);
-        }
-
-        public E remove(int index) {
-            ensureUnlocked();
-            return super.remove(index);
-        }
-
-        public boolean addAll(Collection<? extends E> c) {
-            ensureUnlocked();
-            return super.addAll(c);
-        }
-
-        public boolean addAll(int index, Collection<? extends E> c) {
-            ensureUnlocked();
-            return super.addAll(index, c);
-        }
-
-        public boolean removeAll(Collection<?> c) {
-            ensureUnlocked();
-            return super.removeAll(c);
-        }
-
-        public boolean retainAll(Collection<?> c) {
-            ensureUnlocked();
-            return super.retainAll(c);
-        }
-
-        public void clear() {
-            ensureUnlocked();
-            super.clear();
-        }
-
-    }
-
-}


Mime
View raw message