kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gwens...@apache.org
Subject kafka git commit: KAFKA-2484: Add schema projection utilities
Date Fri, 16 Oct 2015 22:44:52 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk c553249b4 -> ef65d0a36


KAFKA-2484: Add schema projection utilities

This PR adds schema projection utilities to copycat.

Author: Liquan Pei <liquanpei@gmail.com>

Reviewers: Ewen Cheslack-Postava

Closes #307 from Ishiihara/schema-projection


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ef65d0a3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ef65d0a3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ef65d0a3

Branch: refs/heads/trunk
Commit: ef65d0a360ed65180ddd84d17679f24671c66e14
Parents: c553249
Author: Liquan Pei <liquanpei@gmail.com>
Authored: Fri Oct 16 15:44:31 2015 -0700
Committer: Gwen Shapira <cshapi@gmail.com>
Committed: Fri Oct 16 15:44:31 2015 -0700

----------------------------------------------------------------------
 .../kafka/copycat/data/SchemaProjector.java     | 197 ++++++++
 .../errors/SchemaProjectorException.java        |  29 ++
 .../kafka/copycat/data/SchemaProjectorTest.java | 495 +++++++++++++++++++
 3 files changed, 721 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ef65d0a3/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaProjector.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaProjector.java b/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaProjector.java
new file mode 100644
index 0000000..3ab9e7f
--- /dev/null
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaProjector.java
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.  You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ **/
+
+package org.apache.kafka.copycat.data;
+
+import org.apache.kafka.copycat.data.Schema.Type;
+import org.apache.kafka.copycat.errors.SchemaProjectorException;
+
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * <p>
+ *     SchemaProjector is utility to project a value between compatible schemas and throw
exceptions
+ *     when non compatible schemas are provided.
+ * </p>
+ */
+
+public class SchemaProjector {
+
+    private static Set<AbstractMap.SimpleImmutableEntry<Type, Type>> promotable
= new HashSet<>();
+
+    static {
+        Type[] promotableTypes = {Type.INT8, Type.INT16, Type.INT32, Type.INT64, Type.FLOAT32,
Type.FLOAT64};
+        for (int i = 0; i < promotableTypes.length; ++i) {
+            for (int j = i; j < promotableTypes.length; ++j) {
+                promotable.add(new AbstractMap.SimpleImmutableEntry<>(promotableTypes[i],
promotableTypes[j]));
+            }
+        }
+    }
+
+    /**
+     * This method project a value between compatible schemas and throw exceptions when non
compatible schemas are provided
+     * @param source the schema used to construct the record
+     * @param record the value to project from source schema to target schema
+     * @param target the schema to project the record to
+     * @return the projected value with target schema
+     * @throws SchemaProjectorException
+     */
+    public static Object project(Schema source, Object record, Schema target) throws SchemaProjectorException
{
+        checkMaybeCompatible(source, target);
+        if (source.isOptional() && !target.isOptional()) {
+            if (target.defaultValue() != null) {
+                if (record != null) {
+                    return projectRequiredSchema(source, record, target);
+                } else {
+                    return target.defaultValue();
+                }
+            } else {
+                throw new SchemaProjectorException("Writer schema is optional, however, target
schema does not provide a default value.");
+            }
+        } else {
+            if (record != null) {
+                return projectRequiredSchema(source, record, target);
+            } else {
+                return null;
+            }
+        }
+    }
+
+    private static Object projectRequiredSchema(Schema source, Object record, Schema target)
throws SchemaProjectorException {
+        switch (target.type()) {
+            case INT8:
+            case INT16:
+            case INT32:
+            case INT64:
+            case FLOAT32:
+            case FLOAT64:
+            case BOOLEAN:
+            case BYTES:
+            case STRING:
+                return projectPrimitive(source, record, target);
+            case STRUCT:
+                return projectStruct(source, (Struct) record, target);
+            case ARRAY:
+                return projectArray(source, record, target);
+            case MAP:
+                return projectMap(source, record, target);
+        }
+        return null;
+    }
+
+    private static Object projectStruct(Schema source, Struct sourceStruct, Schema target)
throws SchemaProjectorException {
+        Struct targetStruct = new Struct(target);
+        for (Field targetField : target.fields()) {
+            String fieldName = targetField.name();
+            Field sourceField = source.field(fieldName);
+            if (sourceField != null) {
+                Object sourceFieldValue = sourceStruct.get(fieldName);
+                try {
+                    Object targetFieldValue = project(sourceField.schema(), sourceFieldValue,
targetField.schema());
+                    targetStruct.put(fieldName, targetFieldValue);
+                } catch (SchemaProjectorException e) {
+                    throw new SchemaProjectorException("Error projecting " + sourceField.name(),
e);
+                }
+            } else {
+                Object targetDefault;
+                if (targetField.schema().defaultValue() != null) {
+                    targetDefault = targetField.schema().defaultValue();
+                } else {
+                    throw new SchemaProjectorException("Cannot project " + source.schema()
+ " to " + target.schema());
+                }
+                targetStruct.put(fieldName, targetDefault);
+            }
+        }
+        return targetStruct;
+    }
+
+
+    private static void checkMaybeCompatible(Schema source, Schema target) {
+        if (source.type() != target.type() && !isPromotable(source.type(), target.type()))
{
+            throw new SchemaProjectorException("Schema type mismatch. source type: " + source.type()
+ " and target type: " + target.type());
+        } else if (!Objects.equals(source.name(), target.name())) {
+            throw new SchemaProjectorException("Schema name mismatch. source name: " + source.name()
+ " and target name: " + target.name());
+        } else if (!Objects.equals(source.parameters(), target.parameters())) {
+            throw new SchemaProjectorException("Schema parameters not equal. source parameters:
" + source.parameters() + " and target parameters: " + target.parameters());
+        }
+    }
+
+    private static Object projectArray(Schema source, Object record, Schema target) throws
SchemaProjectorException {
+        List<?> array = (List<?>) record;
+        List<Object> retArray = new ArrayList<>();
+        for (Object entry : array) {
+            retArray.add(project(source.valueSchema(), entry, target.valueSchema()));
+        }
+        return retArray;
+    }
+
+    private static Object projectMap(Schema source, Object record, Schema target) throws
SchemaProjectorException {
+        Map<?, ?> map = (Map<?, ?>) record;
+        Map<Object, Object> retMap = new HashMap<>();
+        for (Map.Entry<?, ?> entry : map.entrySet()) {
+            Object key = entry.getKey();
+            Object value = entry.getValue();
+            Object retKey = project(source.keySchema(), key, target.keySchema());
+            Object retValue = project(source.valueSchema(), value, target.valueSchema());
+            retMap.put(retKey, retValue);
+        }
+        return retMap;
+    }
+
+    private static Object projectPrimitive(Schema source, Object record, Schema target) throws
SchemaProjectorException {
+        assert source.type().isPrimitive();
+        assert target.type().isPrimitive();
+        Object result;
+        if (isPromotable(source.type(), target.type())) {
+            Number numberRecord = (Number) record;
+            switch (target.type()) {
+                case INT8:
+                    result = numberRecord.byteValue();
+                    break;
+                case INT16:
+                    result = numberRecord.shortValue();
+                    break;
+                case INT32:
+                    result = numberRecord.intValue();
+                    break;
+                case INT64:
+                    result = numberRecord.longValue();
+                    break;
+                case FLOAT32:
+                    result = numberRecord.floatValue();
+                    break;
+                case FLOAT64:
+                    result = numberRecord.doubleValue();
+                    break;
+                default:
+                    throw new SchemaProjectorException("Not promotable type.");
+            }
+        } else {
+            result = record;
+        }
+        return result;
+    }
+
+    private static boolean isPromotable(Type sourceType, Type targetType) {
+        return promotable.contains(new AbstractMap.SimpleImmutableEntry<>(sourceType,
targetType));
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/ef65d0a3/copycat/api/src/main/java/org/apache/kafka/copycat/errors/SchemaProjectorException.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/errors/SchemaProjectorException.java b/copycat/api/src/main/java/org/apache/kafka/copycat/errors/SchemaProjectorException.java
new file mode 100644
index 0000000..be21418
--- /dev/null
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/errors/SchemaProjectorException.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.  You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ **/
+package org.apache.kafka.copycat.errors;
+
+/**
+ * Signals that a value could not be projected from a source schema onto a target schema,
+ * for example because the schemas are incompatible.
+ */
+public class SchemaProjectorException extends DataException {
+
+    public SchemaProjectorException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public SchemaProjectorException(String message) {
+        super(message);
+    }
+
+    public SchemaProjectorException(Throwable cause) {
+        super(cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/ef65d0a3/copycat/api/src/test/java/org/apache/kafka/copycat/data/SchemaProjectorTest.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/test/java/org/apache/kafka/copycat/data/SchemaProjectorTest.java b/copycat/api/src/test/java/org/apache/kafka/copycat/data/SchemaProjectorTest.java
new file mode 100644
index 0000000..31a6f79
--- /dev/null
+++ b/copycat/api/src/test/java/org/apache/kafka/copycat/data/SchemaProjectorTest.java
@@ -0,0 +1,495 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.  You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ **/
+
+package org.apache.kafka.copycat.data;
+
+import org.apache.kafka.copycat.data.Schema.Type;
+import org.apache.kafka.copycat.errors.DataException;
+import org.apache.kafka.copycat.errors.SchemaProjectorException;
+import org.junit.Test;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class SchemaProjectorTest {
+
+    @Test
+    public void testPrimitiveTypeProjection() throws Exception {
+        Object projected;
+        projected = SchemaProjector.project(Schema.BOOLEAN_SCHEMA, false, Schema.BOOLEAN_SCHEMA);
+        assertEquals(false, projected);
+
+        byte[] bytes = {(byte) 1, (byte) 2};
+        projected  = SchemaProjector.project(Schema.BYTES_SCHEMA, bytes, Schema.BYTES_SCHEMA);
+        assertEquals(bytes, projected);
+
+        projected = SchemaProjector.project(Schema.STRING_SCHEMA, "abc", Schema.STRING_SCHEMA);
+        assertEquals("abc", projected);
+
+        projected = SchemaProjector.project(Schema.BOOLEAN_SCHEMA, false, Schema.OPTIONAL_BOOLEAN_SCHEMA);
+        assertEquals(false, projected);
+
+        projected  = SchemaProjector.project(Schema.BYTES_SCHEMA, bytes, Schema.OPTIONAL_BYTES_SCHEMA);
+        assertEquals(bytes, projected);
+
+        projected = SchemaProjector.project(Schema.STRING_SCHEMA, "abc", Schema.OPTIONAL_STRING_SCHEMA);
+        assertEquals("abc", projected);
+
+        try {
+            SchemaProjector.project(Schema.OPTIONAL_BOOLEAN_SCHEMA, false, Schema.BOOLEAN_SCHEMA);
+            fail("Cannot project optional schema to schema with no default value.");
+        } catch (DataException e) {
+            // expected
+        }
+
+        try {
+            SchemaProjector.project(Schema.OPTIONAL_BYTES_SCHEMA, bytes, Schema.BYTES_SCHEMA);
+            fail("Cannot project optional schema to schema with no default value.");
+        } catch (DataException e) {
+            // expected
+        }
+
+        try {
+            SchemaProjector.project(Schema.OPTIONAL_STRING_SCHEMA, "abc", Schema.STRING_SCHEMA);
+            fail("Cannot project optional schema to schema with no default value.");
+        } catch (DataException e) {
+            // expected
+        }
+    }
+
+    @Test
+    public void testNumericTypeProjection() throws Exception {
+        Schema[] promotableSchemas = {Schema.INT8_SCHEMA, Schema.INT16_SCHEMA, Schema.INT32_SCHEMA, Schema.INT64_SCHEMA, Schema.FLOAT32_SCHEMA, Schema.FLOAT64_SCHEMA};
+        Schema[] promotableOptionalSchemas = {Schema.OPTIONAL_INT8_SCHEMA, Schema.OPTIONAL_INT16_SCHEMA, Schema.OPTIONAL_INT32_SCHEMA, Schema.OPTIONAL_INT64_SCHEMA,
+                                              Schema.OPTIONAL_FLOAT32_SCHEMA, Schema.OPTIONAL_FLOAT64_SCHEMA};
+
+        // values[i] is a sample value for schema index i; expectedProjected lists its projections onto
+        // schema indices i, i + 1, ..., in that order.
+        Object[] values = {(byte) 127, (short) 255, 32767, 327890L, 1.2F, 1.2345};
+        Map<Object, List<?>> expectedProjected = new HashMap<>();
+        expectedProjected.put(values[0], Arrays.asList((byte) 127, (short) 127, 127, 127L, 127.F, 127.));
+        expectedProjected.put(values[1], Arrays.asList((short) 255, 255, 255L, 255.F, 255.));
+        expectedProjected.put(values[2], Arrays.asList(32767, 32767L, 32767.F, 32767.));
+        expectedProjected.put(values[3], Arrays.asList(327890L, 327890.F, 327890.));
+        expectedProjected.put(values[4], Arrays.asList(1.2F, 1.2));
+        expectedProjected.put(values[5], Arrays.asList(1.2345));
+
+        for (int i = 0; i < promotableSchemas.length; ++i) {
+            List<?> expected = expectedProjected.get(values[i]);
+            for (int j = i; j < promotableSchemas.length; ++j) {
+                Object expectedValue = expected.get(j - i);
+                // required -> required
+                assertNumericProjection(promotableSchemas[i], values[i], promotableSchemas[j], expectedValue);
+                // required -> optional
+                assertNumericProjection(promotableSchemas[i], values[i], promotableOptionalSchemas[j], expectedValue);
+                // optional -> optional
+                assertNumericProjection(promotableOptionalSchemas[i], values[i], promotableOptionalSchemas[j], expectedValue);
+            }
+        }
+
+        Schema[] nonPromotableSchemas = {Schema.BOOLEAN_SCHEMA, Schema.BYTES_SCHEMA, Schema.STRING_SCHEMA};
+        for (Schema promotableSchema: promotableSchemas) {
+            for (Schema nonPromotableSchema: nonPromotableSchemas) {
+                Object dummy = new Object();
+                try {
+                    SchemaProjector.project(promotableSchema, dummy, nonPromotableSchema);
+                    fail("Cannot promote " +  promotableSchema.type() + " to " + nonPromotableSchema.type());
+                } catch (DataException e) {
+                    // expected
+                }
+            }
+        }
+    }
+
+    /** Projects {@code value} from {@code source} to {@code target} and checks the result. */
+    private static void assertNumericProjection(Schema source, Object value, Schema target, Object expected) throws Exception {
+        Object promoted = SchemaProjector.project(source, value, target);
+        if (target.type() == Type.FLOAT64) {
+            // e.g. 1.2F promoted to double is not exactly 1.2, so compare with a tolerance.
+            assertEquals((Double) expected, (double) promoted, 1e-6);
+        } else {
+            assertEquals(expected, promoted);
+        }
+    }
+
+    @Test
+    public void testPrimitiveOptionalProjection() throws Exception {
+        // Each case is checked twice, once for each value of the final boolean argument.
+        verifyOptionalProjection(Schema.OPTIONAL_BOOLEAN_SCHEMA, Type.BOOLEAN, false, true, false, true);
+        verifyOptionalProjection(Schema.OPTIONAL_BOOLEAN_SCHEMA, Type.BOOLEAN, false, true, false, false);
+
+        byte[] bytes = {(byte) 1, (byte) 2};
+        byte[] defaultBytes = {(byte) 3, (byte) 4};
+        verifyOptionalProjection(Schema.OPTIONAL_BYTES_SCHEMA, Type.BYTES, bytes, defaultBytes, bytes, true);
+        verifyOptionalProjection(Schema.OPTIONAL_BYTES_SCHEMA, Type.BYTES, bytes, defaultBytes, bytes, false);
+
+        verifyOptionalProjection(Schema.OPTIONAL_STRING_SCHEMA, Type.STRING, "abc", "def", "abc", true);
+        verifyOptionalProjection(Schema.OPTIONAL_STRING_SCHEMA, Type.STRING, "abc", "def", "abc", false);
+
+        verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.INT8, (byte) 12, (byte) 127, (byte) 12, true);
+        verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.INT8, (byte) 12, (byte) 127, (byte) 12, false);
+        verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.INT16, (byte) 12, (short) 127, (short) 12, true);
+        verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.INT16, (byte) 12, (short) 127, (short) 12, false);
+        verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.INT32, (byte) 12, 12789, 12, true);
+        verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.INT32, (byte) 12, 12789, 12, false);
+        verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.INT64, (byte) 12, 127890L, 12L, true);
+        verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.INT64, (byte) 12, 127890L, 12L, false);
+        verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.FLOAT32, (byte) 12, 3.45F, 12.F, true);
+        verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.FLOAT32, (byte) 12, 3.45F, 12.F, false);
+        verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.FLOAT64, (byte) 12, 3.4567, 12., true);
+        verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.FLOAT64, (byte) 12, 3.4567, 12., false);
+
+        verifyOptionalProjection(Schema.OPTIONAL_INT16_SCHEMA, Type.INT16, (short) 12, (short) 127, (short) 12, true);
+        verifyOptionalProjection(Schema.OPTIONAL_INT16_SCHEMA, Type.INT16, (short) 12, (short) 127, (short) 12, false);
+        verifyOptionalProjection(Schema.OPTIONAL_INT16_SCHEMA, Type.INT32, (short) 12, 12789, 12, true);
+        verifyOptionalProjection(Schema.OPTIONAL_INT16_SCHEMA, Type.INT32, (short) 12, 12789, 12, false);
+        verifyOptionalProjection(Schema.OPTIONAL_INT16_SCHEMA, Type.INT64, (short) 12, 127890L, 12L, true);
+        verifyOptionalProjection(Schema.OPTIONAL_INT16_SCHEMA, Type.INT64, (short) 12, 127890L, 12L, false);
+        verifyOptionalProjection(Schema.OPTIONAL_INT16_SCHEMA, Type.FLOAT32, (short) 12, 3.45F, 12.F, true);
+        verifyOptionalProjection(Schema.OPTIONAL_INT16_SCHEMA, Type.FLOAT32, (short) 12, 3.45F, 12.F, false);
+        verifyOptionalProjection(Schema.OPTIONAL_INT16_SCHEMA, Type.FLOAT64, (short) 12, 3.4567, 12., true);
+        verifyOptionalProjection(Schema.OPTIONAL_INT16_SCHEMA, Type.FLOAT64, (short) 12, 3.4567, 12., false);
+
+        verifyOptionalProjection(Schema.OPTIONAL_INT32_SCHEMA, Type.INT32, 12, 12789, 12, true);
+        verifyOptionalProjection(Schema.OPTIONAL_INT32_SCHEMA, Type.INT32, 12, 12789, 12, false);
+        verifyOptionalProjection(Schema.OPTIONAL_INT32_SCHEMA, Type.INT64, 12, 127890L, 12L, true);
+        verifyOptionalProjection(Schema.OPTIONAL_INT32_SCHEMA, Type.INT64, 12, 127890L, 12L, false);
+        verifyOptionalProjection(Schema.OPTIONAL_INT32_SCHEMA, Type.FLOAT32, 12, 3.45F, 12.F, true);
+        verifyOptionalProjection(Schema.OPTIONAL_INT32_SCHEMA, Type.FLOAT32, 12, 3.45F, 12.F, false);
+        verifyOptionalProjection(Schema.OPTIONAL_INT32_SCHEMA, Type.FLOAT64, 12, 3.4567, 12., true);
+        verifyOptionalProjection(Schema.OPTIONAL_INT32_SCHEMA, Type.FLOAT64, 12, 3.4567, 12., false);
+
+        verifyOptionalProjection(Schema.OPTIONAL_INT64_SCHEMA, Type.INT64, 12L, 127890L, 12L, true);
+        verifyOptionalProjection(Schema.OPTIONAL_INT64_SCHEMA, Type.INT64, 12L, 127890L, 12L, false);
+        verifyOptionalProjection(Schema.OPTIONAL_INT64_SCHEMA, Type.FLOAT32, 12L, 3.45F, 12.F, true);
+        verifyOptionalProjection(Schema.OPTIONAL_INT64_SCHEMA, Type.FLOAT32, 12L, 3.45F, 12.F, false);
+        verifyOptionalProjection(Schema.OPTIONAL_INT64_SCHEMA, Type.FLOAT64, 12L, 3.4567, 12., true);
+        verifyOptionalProjection(Schema.OPTIONAL_INT64_SCHEMA, Type.FLOAT64, 12L, 3.4567, 12., false);
+
+        verifyOptionalProjection(Schema.OPTIONAL_FLOAT32_SCHEMA, Type.FLOAT32, 12.345F, 3.45F, 12.345F, true);
+        verifyOptionalProjection(Schema.OPTIONAL_FLOAT32_SCHEMA, Type.FLOAT32, 12.345F, 3.45F, 12.345F, false);
+        verifyOptionalProjection(Schema.OPTIONAL_FLOAT32_SCHEMA, Type.FLOAT64, 12.345F, 3.4567, 12.345, true);
+        verifyOptionalProjection(Schema.OPTIONAL_FLOAT32_SCHEMA, Type.FLOAT64, 12.345F, 3.4567, 12.345, false);
+
+        // FLOAT64 source: the value is a Double, so the source schema must be FLOAT64 as well.
+        verifyOptionalProjection(Schema.OPTIONAL_FLOAT64_SCHEMA, Type.FLOAT64, 12.345, 3.4567, 12.345, true);
+        verifyOptionalProjection(Schema.OPTIONAL_FLOAT64_SCHEMA, Type.FLOAT64, 12.345, 3.4567, 12.345, false);
+    }
+
+    @Test
+    public void testStructAddField() throws Exception {
+        Schema source = SchemaBuilder.struct().field("field", Schema.INT32_SCHEMA).build();
+        Struct sourceStruct = new Struct(source).put("field", 1);
+
+        // The target adds "field2", which declares a default value.
+        Schema field2WithDefault = SchemaBuilder.int32().defaultValue(123).build();
+        Schema target = SchemaBuilder.struct()
+                .field("field", Schema.INT32_SCHEMA)
+                .field("field2", field2WithDefault)
+                .build();
+
+        Struct projected = (Struct) SchemaProjector.project(source, sourceStruct, target);
+        assertEquals(1, (int) projected.getInt32("field"));
+        assertEquals(123, (int) projected.getInt32("field2"));
+
+        // Adding a required field without a default value cannot be projected.
+        Schema requiredWithoutDefault = SchemaBuilder.struct()
+                .field("field", Schema.INT32_SCHEMA)
+                .field("field2", Schema.INT32_SCHEMA)
+                .build();
+        try {
+            SchemaProjector.project(source, sourceStruct, requiredWithoutDefault);
+            fail("Incompatible schema.");
+        } catch (DataException e) {
+            // expected
+        }
+    }
+
+    @Test
+    public void testStructRemoveField() throws Exception {
+        Schema source = SchemaBuilder.struct()
+                .field("field", Schema.INT32_SCHEMA)
+                .field("field2", Schema.INT32_SCHEMA)
+                .build();
+        Struct sourceStruct = new Struct(source).put("field", 1).put("field2", 234);
+
+        // The target schema drops "field2".
+        Schema target = SchemaBuilder.struct().field("field", Schema.INT32_SCHEMA).build();
+        Struct projected = (Struct) SchemaProjector.project(source, sourceStruct, target);
+
+        assertEquals(1, projected.get("field"));
+        try {
+            projected.get("field2");
+            fail("field2 is not part of the projected struct");
+        } catch (DataException e) {
+            // expected: the dropped field cannot be read from the projected struct
+        }
+    }
+
+    @Test
+    public void testStructDefaultValue() throws Exception {
+        Schema source = SchemaBuilder.struct().optional()
+                .field("field", Schema.INT32_SCHEMA)
+                .field("field2", Schema.INT32_SCHEMA)
+                .build();
+
+        // Required target struct whose default value is a struct of the target schema itself.
+        SchemaBuilder targetBuilder = SchemaBuilder.struct()
+                .field("field", Schema.INT32_SCHEMA)
+                .field("field2", Schema.INT32_SCHEMA);
+        Struct defaultStruct = new Struct(targetBuilder).put("field", 12).put("field2", 345);
+        targetBuilder.defaultValue(defaultStruct);
+        Schema target = targetBuilder.build();
+
+        // A null optional source value is replaced by the target's default.
+        assertEquals(defaultStruct, SchemaProjector.project(source, null, target));
+
+        // A present source value is projected field by field.
+        Struct sourceStruct = new Struct(source).put("field", 45).put("field2", 678);
+        Struct projected = (Struct) SchemaProjector.project(source, sourceStruct, target);
+        for (String name : Arrays.asList("field", "field2")) {
+            assertEquals(sourceStruct.get(name), projected.get(name));
+        }
+    }
+
+    @Test
+    public void testNestedSchemaProjection() throws Exception {
+        Schema sourceFlatSchema = SchemaBuilder.struct()
+                .field("field", Schema.INT32_SCHEMA)
+                .build();
+        // The nested target struct adds "field2" with a default value.
+        Schema targetFlatSchema = SchemaBuilder.struct()
+                .field("field", Schema.INT32_SCHEMA)
+                .field("field2", SchemaBuilder.int32().defaultValue(123).build())
+                .build();
+        Schema sourceNestedSchema = buildNestedTestSchema(sourceFlatSchema);
+        Schema targetNestedSchema = buildNestedTestSchema(targetFlatSchema);
+
+        Struct sourceNestedStruct = new Struct(sourceNestedSchema)
+                .put("first", 1)
+                .put("second", "abc")
+                .put("array", Arrays.asList(1, 2))
+                .put("map", Collections.singletonMap(5, "def"))
+                .put("nested", new Struct(sourceFlatSchema).put("field", 113));
+
+        Struct projected = (Struct) SchemaProjector.project(sourceNestedSchema, sourceNestedStruct, targetNestedSchema);
+        assertEquals(1, projected.get("first"));
+        assertEquals("abc", projected.get("second"));
+        assertEquals(Arrays.asList(1, 2), projected.get("array"));
+        assertEquals(Collections.singletonMap(5, "def"), projected.get("map"));
+
+        Struct projectedNested = (Struct) projected.get("nested");
+        assertEquals(113, projectedNested.get("field"));
+        assertEquals(123, projectedNested.get("field2"));
+    }
+
+    /** Builds a struct schema with primitive, array and map fields plus the given struct as "nested". */
+    private static Schema buildNestedTestSchema(Schema nested) {
+        return SchemaBuilder.struct()
+                .field("first", Schema.INT32_SCHEMA)
+                .field("second", Schema.STRING_SCHEMA)
+                .field("array", SchemaBuilder.array(Schema.INT32_SCHEMA).build())
+                .field("map", SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.STRING_SCHEMA).build())
+                .field("nested", nested)
+                .build();
+    }
+
+    @Test
+    public void testLogicalTypeProjection() throws Exception {
+        // Projecting a logical type onto the same logical type returns the value unchanged.
+        BigDecimal testDecimal = new BigDecimal(new BigInteger("156"), 2);
+        assertEquals(testDecimal, SchemaProjector.project(Decimal.schema(2), testDecimal, Decimal.schema(2)));
+        assertEquals(1000, SchemaProjector.project(Date.SCHEMA, 1000, Date.SCHEMA));
+        assertEquals(231, SchemaProjector.project(Time.SCHEMA, 231, Time.SCHEMA));
+        assertEquals(34567L, SchemaProjector.project(Timestamp.SCHEMA, 34567L, Timestamp.SCHEMA));
+
+        // Logical types cannot be mixed with plain or differently named schemas in either direction.
+        Schema[] logicalTypeSchemas = {Decimal.schema(2), Date.SCHEMA, Time.SCHEMA, Timestamp.SCHEMA};
+        Schema namedSchema = SchemaBuilder.int32().name("invalidLogicalTypeName").build();
+        for (Schema logicalTypeSchema : logicalTypeSchemas) {
+            assertProjectionFails(logicalTypeSchema, Schema.BOOLEAN_SCHEMA, "Cannot project logical types to non-logical types.");
+            assertProjectionFails(logicalTypeSchema, namedSchema, "Reader name is not a valid logical type name.");
+            assertProjectionFails(Schema.BOOLEAN_SCHEMA, logicalTypeSchema, "Cannot project non-logical types to logical types.");
+        }
+    }
+
+    /** Asserts that projecting a null value from {@code source} to {@code target} is rejected. */
+    private static void assertProjectionFails(Schema source, Schema target, String message) {
+        try {
+            SchemaProjector.project(source, null, target);
+            fail(message);
+        } catch (SchemaProjectorException e) {
+            // expected
+        }
+    }
+
+    /**
+     * Verifies array projection. Covers identity projection, substitution of the target's default
+     * for a null optional value, and element promotion (INT32 -> INT64). Also covers rejection of a
+     * missing default and of a non-promotable element type.
+     */
+    @Test
+    public void testArrayProjection() throws Exception {
+        Schema source = SchemaBuilder.array(Schema.INT32_SCHEMA).build();
+
+        Object projected = SchemaProjector.project(source, Arrays.asList(1, 2, 3), source);
+        assertEquals(Arrays.asList(1, 2, 3), (List<Integer>) projected);
+
+        Schema optionalSource = SchemaBuilder.array(Schema.INT32_SCHEMA).optional().build();
+        Schema target = SchemaBuilder.array(Schema.INT32_SCHEMA).defaultValue(Arrays.asList(1, 2, 3)).build();
+        projected = SchemaProjector.project(optionalSource, Arrays.asList(4, 5), target);
+        assertEquals(Arrays.asList(4, 5), (List<Integer>) projected);
+        projected = SchemaProjector.project(optionalSource, null, target);
+        assertEquals(Arrays.asList(1, 2, 3), (List<Integer>) projected);
+
+        Schema promotedTarget = SchemaBuilder.array(Schema.INT64_SCHEMA).defaultValue(Arrays.asList(1L, 2L, 3L)).build();
+        projected = SchemaProjector.project(optionalSource, Arrays.asList(4, 5), promotedTarget);
+        List<Long> expectedProjected = Arrays.asList(4L, 5L);
+        assertEquals(expectedProjected, (List<Long>) projected);
+        projected = SchemaProjector.project(optionalSource, null, promotedTarget);
+        assertEquals(Arrays.asList(1L, 2L, 3L), (List<Long>) projected);
+
+        Schema noDefaultValueTarget = SchemaBuilder.array(Schema.INT32_SCHEMA).build();
+        try {
+            SchemaProjector.project(optionalSource, null, noDefaultValueTarget);
+            fail("Target schema does not provide a default value.");
+        } catch (SchemaProjectorException e) {
+            // expected
+        }
+
+        // A null value from the optional source onto a target without a default is already
+        // rejected for the missing default (checked just above), so it cannot exercise element-type
+        // checking. Project a real value from the required source so the INT32 -> BOOLEAN element
+        // mismatch is what has to fail.
+        Schema nonPromotableTarget = SchemaBuilder.array(Schema.BOOLEAN_SCHEMA).build();
+        try {
+            SchemaProjector.project(source, Arrays.asList(1, 2, 3), nonPromotableTarget);
+            fail("Neither source type matches target type nor source type can be promoted to target type");
+        } catch (SchemaProjectorException e) {
+            // expected
+        }
+    }
+
+    /**
+     * Verifies map projection. Covers substitution of the target's default for a null optional
+     * value and key/value promotion (INT32 -> INT64 keys, INT32 -> FLOAT32 values). Also covers
+     * rejection of a missing default and of a non-promotable key/value type.
+     */
+    @Test
+    public void testMapProjection() throws Exception {
+        Schema source = SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.INT32_SCHEMA).optional().build();
+
+        Schema target = SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.INT32_SCHEMA).defaultValue(Collections.singletonMap(1, 2)).build();
+        Object projected = SchemaProjector.project(source, Collections.singletonMap(3, 4), target);
+        assertEquals(Collections.singletonMap(3, 4), (Map<Integer, Integer>) projected);
+        projected = SchemaProjector.project(source, null, target);
+        assertEquals(Collections.singletonMap(1, 2), (Map<Integer, Integer>) projected);
+
+        Schema promotedTarget = SchemaBuilder.map(Schema.INT64_SCHEMA, Schema.FLOAT32_SCHEMA).defaultValue(
+                Collections.singletonMap(3L, 4.5F)).build();
+        projected = SchemaProjector.project(source, Collections.singletonMap(3, 4), promotedTarget);
+        assertEquals(Collections.singletonMap(3L, 4.F), (Map<Long, Float>) projected);
+        projected = SchemaProjector.project(source, null, promotedTarget);
+        assertEquals(Collections.singletonMap(3L, 4.5F), (Map<Long, Float>) projected);
+
+        Schema noDefaultValueTarget = SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.INT32_SCHEMA).build();
+        try {
+            SchemaProjector.project(source, null, noDefaultValueTarget);
+            fail("Reader does not provide a default value.");
+        } catch (SchemaProjectorException e) {
+            // expected
+        }
+
+        // `source` is optional, so a null value onto a target without a default fails for the
+        // missing default (checked just above) and never reaches key/value type checking. Use a
+        // required source with a real entry so the INT32 -> BOOLEAN key mismatch is what has to fail.
+        Schema requiredSource = SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.INT32_SCHEMA).build();
+        Schema nonPromotableTarget = SchemaBuilder.map(Schema.BOOLEAN_SCHEMA, Schema.STRING_SCHEMA).build();
+        try {
+            SchemaProjector.project(requiredSource, Collections.singletonMap(3, 4), nonPromotableTarget);
+            fail("Neither source type matches target type nor source type can be promoted to target type");
+        } catch (SchemaProjectorException e) {
+            // expected
+        }
+    }
+
+    /**
+     * Verifies that two INT32 schemas are still rejected as incompatible when their names differ,
+     * and separately when only their parameters differ.
+     */
+    @Test
+    public void testMaybeCompatible() throws Exception {
+        Schema source = SchemaBuilder.int32().name("source").build();
+        Schema target = SchemaBuilder.int32().name("target").build();
+
+        try {
+            SchemaProjector.project(source, 12, target);
+            fail("Source name and target name mismatch.");
+        } catch (SchemaProjectorException e) {
+            // expected
+        }
+
+        // Both schemas are unnamed here so that only the parameters differ. Reusing the named
+        // `source` against an unnamed target would let the name mismatch alone satisfy this check.
+        Schema unnamedSource = SchemaBuilder.int32().build();
+        Schema targetWithParameters = SchemaBuilder.int32().parameters(Collections.singletonMap("key", "value")).build();
+        try {
+            SchemaProjector.project(unnamedSource, 34, targetWithParameters);
+            fail("Source parameters and target parameters mismatch.");
+        } catch (SchemaProjectorException e) {
+            // expected
+        }
+    }
+
+    /**
+     * Projects {@code value} and then {@code null} from the optional {@code source} onto a newly built
+     * target of {@code targetType}. The target carries {@code defaultValue} and is optional when
+     * {@code optional} is true, required otherwise. The non-null projection must equal
+     * {@code expectedProjected}; FLOAT64 targets are compared within 1e-6. The null projection must be
+     * {@code null} for an optional target and {@code defaultValue} for a required one.
+     */
+    private void verifyOptionalProjection(Schema source, Type targetType, Object value, Object defaultValue, Object expectedProjected, boolean optional) {
+        // Caller preconditions; these Java asserts are only checked when the JVM runs with -ea.
+        assert source.isOptional();
+        assert value != null;
+
+        SchemaBuilder targetBuilder = SchemaBuilder.type(targetType);
+        if (optional) {
+            targetBuilder = targetBuilder.optional();
+        }
+        Schema target = targetBuilder.defaultValue(defaultValue).build();
+
+        Object actual = SchemaProjector.project(source, value, target);
+        if (targetType != Type.FLOAT64) {
+            assertEquals(expectedProjected, actual);
+        } else {
+            // Floating-point results are compared with a tolerance rather than exact equality.
+            assertEquals((double) expectedProjected, (double) actual, 1e-6);
+        }
+
+        Object actualForNull = SchemaProjector.project(source, null, target);
+        Object expectedForNull = optional ? null : defaultValue;
+        assertEquals(expectedForNull, actualForNull);
+    }
+}


Mime
View raw message