kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject kafka git commit: KAFKA-4183; Corrected Kafka Connect's JSON Converter to properly convert from null to logical values
Date Fri, 16 Sep 2016 21:58:39 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 86aa0eb0f -> 567cc3d78


KAFKA-4183; Corrected Kafka Connect's JSON Converter to properly convert from null to logical
values

The `JsonConverter` class has `LogicalTypeConverter` implementations for Date, Time, Timestamp,
and Decimal, but these implementations fail when the input literal value (deserialized from
the message) is null.

Test cases were added to check for these cases, and these failed before the `LogicalTypeConverter`
implementations were fixed to consider whether the schema has a default value or is optional,
similarly to how the `JsonToConnectTypeConverter` implementations do this. Once the fixes
were made, the new tests pass.

Author: Randall Hauch <rhauch@gmail.com>

Reviewers: Shikhar Bhushan <shikhar@confluent.io>, Jason Gustafson <jason@confluent.io>

Closes #1867 from rhauch/kafka-4183


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/567cc3d7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/567cc3d7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/567cc3d7

Branch: refs/heads/trunk
Commit: 567cc3d78746391a3b050d4eea76571e4bf20e89
Parents: 86aa0eb
Author: Randall Hauch <rhauch@gmail.com>
Authored: Fri Sep 16 14:55:46 2016 -0700
Committer: Jason Gustafson <jason@confluent.io>
Committed: Fri Sep 16 14:55:46 2016 -0700

----------------------------------------------------------------------
 .../kafka/connect/json/JsonConverter.java       |  4 +++
 .../kafka/connect/json/JsonConverterTest.java   | 38 ++++++++++++++++++++
 2 files changed, 42 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/567cc3d7/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
----------------------------------------------------------------------
diff --git a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
index f6c778b..a8df107 100644
--- a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
+++ b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
@@ -211,6 +211,7 @@ public class JsonConverter implements Converter {
         TO_CONNECT_LOGICAL_CONVERTERS.put(Decimal.LOGICAL_NAME, new LogicalTypeConverter()
{
             @Override
             public Object convert(Schema schema, Object value) {
+                if (value == null) return checkOptionalAndDefault(schema);
                 if (!(value instanceof byte[]))
                     throw new DataException("Invalid type for Decimal, underlying representation
should be bytes but was " + value.getClass());
                 return Decimal.toLogical(schema, (byte[]) value);
@@ -220,6 +221,7 @@ public class JsonConverter implements Converter {
         TO_CONNECT_LOGICAL_CONVERTERS.put(Date.LOGICAL_NAME, new LogicalTypeConverter() {
             @Override
             public Object convert(Schema schema, Object value) {
+                if (value == null) return checkOptionalAndDefault(schema);
                 if (!(value instanceof Integer))
                     throw new DataException("Invalid type for Date, underlying representation
should be int32 but was " + value.getClass());
                 return Date.toLogical(schema, (int) value);
@@ -229,6 +231,7 @@ public class JsonConverter implements Converter {
         TO_CONNECT_LOGICAL_CONVERTERS.put(Time.LOGICAL_NAME, new LogicalTypeConverter() {
             @Override
             public Object convert(Schema schema, Object value) {
+                if (value == null) return checkOptionalAndDefault(schema);
                 if (!(value instanceof Integer))
                     throw new DataException("Invalid type for Time, underlying representation
should be int32 but was " + value.getClass());
                 return Time.toLogical(schema, (int) value);
@@ -238,6 +241,7 @@ public class JsonConverter implements Converter {
         TO_CONNECT_LOGICAL_CONVERTERS.put(Timestamp.LOGICAL_NAME, new LogicalTypeConverter()
{
             @Override
             public Object convert(Schema schema, Object value) {
+                if (value == null) return checkOptionalAndDefault(schema);
                 if (!(value instanceof Long))
                     throw new DataException("Invalid type for Timestamp, underlying representation
should be int64 but was " + value.getClass());
                 return Timestamp.toLogical(schema, (long) value);

http://git-wip-us.apache.org/repos/asf/kafka/blob/567cc3d7/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
----------------------------------------------------------------------
diff --git a/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
b/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
index 0e7c153..d7c1ceb 100644
--- a/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
+++ b/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
@@ -58,6 +58,7 @@ import java.util.TimeZone;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -221,6 +222,16 @@ public class JsonConverterTest {
     }
 
     @Test
+    public void decimalToConnectOptional() {
+        Schema schema = Decimal.builder(2).optional().schema();
+        // Payload is base64 encoded byte[]{0, -100}, which is the two's complement encoding
of 156.
+        String msg = "{ \"schema\": { \"type\": \"bytes\", \"name\": \"org.apache.kafka.connect.data.Decimal\",
\"version\": 1, \"optional\": true, \"parameters\": { \"scale\": \"2\" } }, \"payload\": null
}";
+        SchemaAndValue schemaAndValue = converter.toConnectData(TOPIC, msg.getBytes());
+        assertEquals(schema, schemaAndValue.schema());
+        assertNull(schemaAndValue.value());
+    }
+
+    @Test
     public void dateToConnect() {
         Schema schema = Date.SCHEMA;
         GregorianCalendar calendar = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0,
0, 0);
@@ -235,6 +246,15 @@ public class JsonConverterTest {
     }
 
     @Test
+    public void dateToConnectOptional() {
+        Schema schema = Date.builder().optional().schema();
+        String msg = "{ \"schema\": { \"type\": \"int32\", \"name\": \"org.apache.kafka.connect.data.Date\",
\"version\": 1, \"optional\": true }, \"payload\": null }";
+        SchemaAndValue schemaAndValue = converter.toConnectData(TOPIC, msg.getBytes());
+        assertEquals(schema, schemaAndValue.schema());
+        assertNull(schemaAndValue.value());
+    }
+
+    @Test
     public void timeToConnect() {
         Schema schema = Time.SCHEMA;
         GregorianCalendar calendar = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0,
0, 0);
@@ -249,6 +269,15 @@ public class JsonConverterTest {
     }
 
     @Test
+    public void timeToConnectOptional() {
+        Schema schema = Time.builder().optional().schema();
+        String msg = "{ \"schema\": { \"type\": \"int32\", \"name\": \"org.apache.kafka.connect.data.Time\",
\"version\": 1, \"optional\": true }, \"payload\": null }";
+        SchemaAndValue schemaAndValue = converter.toConnectData(TOPIC, msg.getBytes());
+        assertEquals(schema, schemaAndValue.schema());
+        assertNull(schemaAndValue.value());
+    }
+
+    @Test
     public void timestampToConnect() {
         Schema schema = Timestamp.SCHEMA;
         GregorianCalendar calendar = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0,
0, 0);
@@ -263,6 +292,15 @@ public class JsonConverterTest {
         assertEquals(reference, converted);
     }
 
+    @Test
+    public void timestampToConnectOptional() {
+        Schema schema = Timestamp.builder().optional().schema();
+        String msg = "{ \"schema\": { \"type\": \"int64\", \"name\": \"org.apache.kafka.connect.data.Timestamp\",
\"version\": 1, \"optional\": true }, \"payload\": null }";
+        SchemaAndValue schemaAndValue = converter.toConnectData(TOPIC, msg.getBytes());
+        assertEquals(schema, schemaAndValue.schema());
+        assertNull(schemaAndValue.value());
+    }
+
     // Schema metadata
 
     @Test


Mime
View raw message