kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject kafka git commit: KAFKA-4183; Centralize checking for optional and default values in JsonConverter
Date Mon, 19 Sep 2016 19:49:57 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 896ad63f1 -> 431c3b093


KAFKA-4183; Centralize checking for optional and default values in JsonConverter

It is cleaner to check for optional and default values just once, in the `convertToConnect()`
function.

It also helps address an issue with conversions for logical type schemas that have default
values and null as the included value. That test case is _probably_ not an issue in practice,
since when the `JsonConverter` is used to serialize a missing field with a default value, it
will serialize the default value for the field. But in the face of JSON data streaming in
from a topic, being [generous on input, strict on output](http://tedwise.com/2009/05/27/generous-on-input-strict-on-output)
seems best.

Author: Shikhar Bhushan <shikhar@confluent.io>

Reviewers: Randall Hauch <rhauch@gmail.com>, Jason Gustafson <jason@confluent.io>

Closes #1872 from shikhar/kafka-4183


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/431c3b09
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/431c3b09
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/431c3b09

Branch: refs/heads/trunk
Commit: 431c3b0937da103b6e79e9f970c0862898205909
Parents: 896ad63
Author: Shikhar Bhushan <shikhar@confluent.io>
Authored: Mon Sep 19 12:49:38 2016 -0700
Committer: Jason Gustafson <jason@confluent.io>
Committed: Mon Sep 19 12:49:38 2016 -0700

----------------------------------------------------------------------
 .../kafka/connect/json/JsonConverter.java       | 41 +++-------
 .../kafka/connect/json/JsonConverterTest.java   | 79 +++++++++++++++++++-
 2 files changed, 88 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/431c3b09/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
----------------------------------------------------------------------
diff --git a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
index a8df107..a4ce32a 100644
--- a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
+++ b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
@@ -58,61 +58,46 @@ public class JsonConverter implements Converter {
 
     private static final HashMap<Schema.Type, JsonToConnectTypeConverter> TO_CONNECT_CONVERTERS
= new HashMap<>();
 
-    private static Object checkOptionalAndDefault(Schema schema) {
-        if (schema.defaultValue() != null)
-            return schema.defaultValue();
-        if (schema.isOptional())
-            return null;
-        throw new DataException("Invalid null value for required field");
-    }
-
     static {
         TO_CONNECT_CONVERTERS.put(Schema.Type.BOOLEAN, new JsonToConnectTypeConverter() {
             @Override
             public Object convert(Schema schema, JsonNode value) {
-                if (value.isNull()) return checkOptionalAndDefault(schema);
                 return value.booleanValue();
             }
         });
         TO_CONNECT_CONVERTERS.put(Schema.Type.INT8, new JsonToConnectTypeConverter() {
             @Override
             public Object convert(Schema schema, JsonNode value) {
-                if (value.isNull()) return checkOptionalAndDefault(schema);
                 return (byte) value.intValue();
             }
         });
         TO_CONNECT_CONVERTERS.put(Schema.Type.INT16, new JsonToConnectTypeConverter() {
             @Override
             public Object convert(Schema schema, JsonNode value) {
-                if (value.isNull()) return checkOptionalAndDefault(schema);
                 return (short) value.intValue();
             }
         });
         TO_CONNECT_CONVERTERS.put(Schema.Type.INT32, new JsonToConnectTypeConverter() {
             @Override
             public Object convert(Schema schema, JsonNode value) {
-                if (value.isNull()) return checkOptionalAndDefault(schema);
                 return value.intValue();
             }
         });
         TO_CONNECT_CONVERTERS.put(Schema.Type.INT64, new JsonToConnectTypeConverter() {
             @Override
             public Object convert(Schema schema, JsonNode value) {
-                if (value.isNull()) return checkOptionalAndDefault(schema);
                 return value.longValue();
             }
         });
         TO_CONNECT_CONVERTERS.put(Schema.Type.FLOAT32, new JsonToConnectTypeConverter() {
             @Override
             public Object convert(Schema schema, JsonNode value) {
-                if (value.isNull()) return checkOptionalAndDefault(schema);
                 return value.floatValue();
             }
         });
         TO_CONNECT_CONVERTERS.put(Schema.Type.FLOAT64, new JsonToConnectTypeConverter() {
             @Override
             public Object convert(Schema schema, JsonNode value) {
-                if (value.isNull()) return checkOptionalAndDefault(schema);
                 return value.doubleValue();
             }
         });
@@ -120,7 +105,6 @@ public class JsonConverter implements Converter {
             @Override
             public Object convert(Schema schema, JsonNode value) {
                 try {
-                    if (value.isNull()) return checkOptionalAndDefault(schema);
                     return value.binaryValue();
                 } catch (IOException e) {
                     throw new DataException("Invalid bytes field", e);
@@ -130,15 +114,12 @@ public class JsonConverter implements Converter {
         TO_CONNECT_CONVERTERS.put(Schema.Type.STRING, new JsonToConnectTypeConverter() {
             @Override
             public Object convert(Schema schema, JsonNode value) {
-                if (value.isNull()) return checkOptionalAndDefault(schema);
                 return value.textValue();
             }
         });
         TO_CONNECT_CONVERTERS.put(Schema.Type.ARRAY, new JsonToConnectTypeConverter() {
             @Override
             public Object convert(Schema schema, JsonNode value) {
-                if (value.isNull()) return checkOptionalAndDefault(schema);
-
                 Schema elemSchema = schema == null ? null : schema.valueSchema();
                 ArrayList<Object> result = new ArrayList<>();
                 for (JsonNode elem : value) {
@@ -150,8 +131,6 @@ public class JsonConverter implements Converter {
         TO_CONNECT_CONVERTERS.put(Schema.Type.MAP, new JsonToConnectTypeConverter() {
             @Override
             public Object convert(Schema schema, JsonNode value) {
-                if (value.isNull()) return checkOptionalAndDefault(schema);
-
                 Schema keySchema = schema == null ? null : schema.keySchema();
                 Schema valueSchema = schema == null ? null : schema.valueSchema();
 
@@ -185,8 +164,6 @@ public class JsonConverter implements Converter {
         TO_CONNECT_CONVERTERS.put(Schema.Type.STRUCT, new JsonToConnectTypeConverter() {
             @Override
             public Object convert(Schema schema, JsonNode value) {
-                if (value.isNull()) return checkOptionalAndDefault(schema);
-
                 if (!value.isObject())
                     throw new DataException("Structs should be encoded as JSON objects, but
found " + value.getNodeType());
 
@@ -211,7 +188,6 @@ public class JsonConverter implements Converter {
         TO_CONNECT_LOGICAL_CONVERTERS.put(Decimal.LOGICAL_NAME, new LogicalTypeConverter()
{
             @Override
             public Object convert(Schema schema, Object value) {
-                if (value == null) return checkOptionalAndDefault(schema);
                 if (!(value instanceof byte[]))
                     throw new DataException("Invalid type for Decimal, underlying representation
should be bytes but was " + value.getClass());
                 return Decimal.toLogical(schema, (byte[]) value);
@@ -221,7 +197,6 @@ public class JsonConverter implements Converter {
         TO_CONNECT_LOGICAL_CONVERTERS.put(Date.LOGICAL_NAME, new LogicalTypeConverter() {
             @Override
             public Object convert(Schema schema, Object value) {
-                if (value == null) return checkOptionalAndDefault(schema);
                 if (!(value instanceof Integer))
                     throw new DataException("Invalid type for Date, underlying representation
should be int32 but was " + value.getClass());
                 return Date.toLogical(schema, (int) value);
@@ -231,7 +206,6 @@ public class JsonConverter implements Converter {
         TO_CONNECT_LOGICAL_CONVERTERS.put(Time.LOGICAL_NAME, new LogicalTypeConverter() {
             @Override
             public Object convert(Schema schema, Object value) {
-                if (value == null) return checkOptionalAndDefault(schema);
                 if (!(value instanceof Integer))
                     throw new DataException("Invalid type for Time, underlying representation
should be int32 but was " + value.getClass());
                 return Time.toLogical(schema, (int) value);
@@ -241,7 +215,6 @@ public class JsonConverter implements Converter {
         TO_CONNECT_LOGICAL_CONVERTERS.put(Timestamp.LOGICAL_NAME, new LogicalTypeConverter()
{
             @Override
             public Object convert(Schema schema, Object value) {
-                if (value == null) return checkOptionalAndDefault(schema);
                 if (!(value instanceof Long))
                     throw new DataException("Invalid type for Timestamp, underlying representation
should be int64 but was " + value.getClass());
                 return Timestamp.toLogical(schema, (long) value);
@@ -688,10 +661,16 @@ public class JsonConverter implements Converter {
 
 
     private static Object convertToConnect(Schema schema, JsonNode jsonValue) {
-        JsonToConnectTypeConverter typeConverter;
         final Schema.Type schemaType;
         if (schema != null) {
             schemaType = schema.type();
+            if (jsonValue.isNull()) {
+                if (schema.defaultValue() != null)
+                    return schema.defaultValue(); // any logical type conversions should
already have been applied
+                if (schema.isOptional())
+                    return null;
+                throw new DataException("Invalid null value for required " + schemaType +
 " field");
+            }
         } else {
             switch (jsonValue.getNodeType()) {
                 case NULL:
@@ -724,9 +703,10 @@ public class JsonConverter implements Converter {
                     break;
             }
         }
-        typeConverter = TO_CONNECT_CONVERTERS.get(schemaType);
+
+        final JsonToConnectTypeConverter typeConverter = TO_CONNECT_CONVERTERS.get(schemaType);
         if (typeConverter == null)
-            throw new DataException("Unknown schema type: " + schema.type());
+            throw new DataException("Unknown schema type: " + String.valueOf(schemaType));
 
         Object converted = typeConverter.convert(schema, jsonValue);
         if (schema != null && schema.name() != null) {
@@ -737,7 +717,6 @@ public class JsonConverter implements Converter {
         return converted;
     }
 
-
     private interface JsonToConnectTypeConverter {
         Object convert(Schema schema, JsonNode value);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/431c3b09/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
----------------------------------------------------------------------
diff --git a/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
b/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
index d7c1ceb..74aad33 100644
--- a/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
+++ b/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
@@ -224,7 +224,6 @@ public class JsonConverterTest {
     @Test
     public void decimalToConnectOptional() {
         Schema schema = Decimal.builder(2).optional().schema();
-        // Payload is base64 encoded byte[]{0, -100}, which is the two's complement encoding
of 156.
         String msg = "{ \"schema\": { \"type\": \"bytes\", \"name\": \"org.apache.kafka.connect.data.Decimal\",
\"version\": 1, \"optional\": true, \"parameters\": { \"scale\": \"2\" } }, \"payload\": null
}";
         SchemaAndValue schemaAndValue = converter.toConnectData(TOPIC, msg.getBytes());
         assertEquals(schema, schemaAndValue.schema());
@@ -232,6 +231,26 @@ public class JsonConverterTest {
     }
 
     @Test
+    public void decimalToConnectWithDefaultValue() {
+        BigDecimal reference = new BigDecimal(new BigInteger("156"), 2);
+        Schema schema = Decimal.builder(2).defaultValue(reference).build();
+        String msg = "{ \"schema\": { \"type\": \"bytes\", \"name\": \"org.apache.kafka.connect.data.Decimal\",
\"version\": 1, \"default\": \"AJw=\", \"parameters\": { \"scale\": \"2\" } }, \"payload\":
null }";
+        SchemaAndValue schemaAndValue = converter.toConnectData(TOPIC, msg.getBytes());
+        assertEquals(schema, schemaAndValue.schema());
+        assertEquals(reference, schemaAndValue.value());
+    }
+
+    @Test
+    public void decimalToConnectOptionalWithDefaultValue() {
+        BigDecimal reference = new BigDecimal(new BigInteger("156"), 2);
+        Schema schema = Decimal.builder(2).optional().defaultValue(reference).build();
+        String msg = "{ \"schema\": { \"type\": \"bytes\", \"name\": \"org.apache.kafka.connect.data.Decimal\",
\"version\": 1, \"optional\": true, \"default\": \"AJw=\", \"parameters\": { \"scale\": \"2\"
} }, \"payload\": null }";
+        SchemaAndValue schemaAndValue = converter.toConnectData(TOPIC, msg.getBytes());
+        assertEquals(schema, schemaAndValue.schema());
+        assertEquals(reference, schemaAndValue.value());
+    }
+
+    @Test
     public void dateToConnect() {
         Schema schema = Date.SCHEMA;
         GregorianCalendar calendar = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0,
0, 0);
@@ -255,6 +274,26 @@ public class JsonConverterTest {
     }
 
     @Test
+    public void dateToConnectWithDefaultValue() {
+        java.util.Date reference = new java.util.Date(0);
+        Schema schema = Date.builder().defaultValue(reference).schema();
+        String msg = "{ \"schema\": { \"type\": \"int32\", \"name\": \"org.apache.kafka.connect.data.Date\",
\"version\": 1, \"default\": 0 }, \"payload\": null }";
+        SchemaAndValue schemaAndValue = converter.toConnectData(TOPIC, msg.getBytes());
+        assertEquals(schema, schemaAndValue.schema());
+        assertEquals(reference, schemaAndValue.value());
+    }
+
+    @Test
+    public void dateToConnectOptionalWithDefaultValue() {
+        java.util.Date reference = new java.util.Date(0);
+        Schema schema = Date.builder().optional().defaultValue(reference).schema();
+        String msg = "{ \"schema\": { \"type\": \"int32\", \"name\": \"org.apache.kafka.connect.data.Date\",
\"version\": 1, \"optional\": true, \"default\": 0 }, \"payload\": null }";
+        SchemaAndValue schemaAndValue = converter.toConnectData(TOPIC, msg.getBytes());
+        assertEquals(schema, schemaAndValue.schema());
+        assertEquals(reference, schemaAndValue.value());
+    }
+
+    @Test
     public void timeToConnect() {
         Schema schema = Time.SCHEMA;
         GregorianCalendar calendar = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0,
0, 0);
@@ -278,6 +317,26 @@ public class JsonConverterTest {
     }
 
     @Test
+    public void timeToConnectWithDefaultValue() {
+        java.util.Date reference = new java.util.Date(0);
+        Schema schema = Time.builder().defaultValue(reference).schema();
+        String msg = "{ \"schema\": { \"type\": \"int32\", \"name\": \"org.apache.kafka.connect.data.Time\",
\"version\": 1, \"default\": 0 }, \"payload\": null }";
+        SchemaAndValue schemaAndValue = converter.toConnectData(TOPIC, msg.getBytes());
+        assertEquals(schema, schemaAndValue.schema());
+        assertEquals(reference, schemaAndValue.value());
+    }
+
+    @Test
+    public void timeToConnectOptionalWithDefaultValue() {
+        java.util.Date reference = new java.util.Date(0);
+        Schema schema = Time.builder().optional().defaultValue(reference).schema();
+        String msg = "{ \"schema\": { \"type\": \"int32\", \"name\": \"org.apache.kafka.connect.data.Time\",
\"version\": 1, \"optional\": true, \"default\": 0 }, \"payload\": null }";
+        SchemaAndValue schemaAndValue = converter.toConnectData(TOPIC, msg.getBytes());
+        assertEquals(schema, schemaAndValue.schema());
+        assertEquals(reference, schemaAndValue.value());
+    }
+
+    @Test
     public void timestampToConnect() {
         Schema schema = Timestamp.SCHEMA;
         GregorianCalendar calendar = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0,
0, 0);
@@ -301,6 +360,24 @@ public class JsonConverterTest {
         assertNull(schemaAndValue.value());
     }
 
+    @Test
+    public void timestampToConnectWithDefaultValue() {
+        Schema schema = Timestamp.builder().defaultValue(new java.util.Date(42)).schema();
+        String msg = "{ \"schema\": { \"type\": \"int64\", \"name\": \"org.apache.kafka.connect.data.Timestamp\",
\"version\": 1, \"default\": 42 }, \"payload\": null }";
+        SchemaAndValue schemaAndValue = converter.toConnectData(TOPIC, msg.getBytes());
+        assertEquals(schema, schemaAndValue.schema());
+        assertEquals(new java.util.Date(42), schemaAndValue.value());
+    }
+
+    @Test
+    public void timestampToConnectOptionalWithDefaultValue() {
+        Schema schema = Timestamp.builder().optional().defaultValue(new java.util.Date(42)).schema();
+        String msg = "{ \"schema\": { \"type\": \"int64\", \"name\": \"org.apache.kafka.connect.data.Timestamp\",
\"version\": 1,  \"optional\": true, \"default\": 42 }, \"payload\": null }";
+        SchemaAndValue schemaAndValue = converter.toConnectData(TOPIC, msg.getBytes());
+        assertEquals(schema, schemaAndValue.schema());
+        assertEquals(new java.util.Date(42), schemaAndValue.value());
+    }
+
     // Schema metadata
 
     @Test


Mime
View raw message