From commits-return-10922-apmail-kafka-commits-archive=kafka.apache.org@kafka.apache.org Fri Dec 28 17:42:38 2018 Return-Path: X-Original-To: apmail-kafka-commits-archive@www.apache.org Delivered-To: apmail-kafka-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 02D3218E8C for ; Fri, 28 Dec 2018 17:42:38 +0000 (UTC) Received: (qmail 34056 invoked by uid 500); 28 Dec 2018 17:42:37 -0000 Delivered-To: apmail-kafka-commits-archive@kafka.apache.org Received: (qmail 34016 invoked by uid 500); 28 Dec 2018 17:42:37 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 34007 invoked by uid 99); 28 Dec 2018 17:42:37 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 28 Dec 2018 17:42:37 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 19B84821AE; Fri, 28 Dec 2018 17:42:37 +0000 (UTC) Date: Fri, 28 Dec 2018 17:42:36 +0000 To: "commits@kafka.apache.org" Subject: [kafka] branch 2.1 updated: KAFKA-3832; Kafka Connect's JSON Converter never outputs a null value (#6027) MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <154601895549.18534.4686389219771553825@gitbox.apache.org> From: jgus@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: kafka X-Git-Refname: refs/heads/2.1 X-Git-Reftype: branch X-Git-Oldrev: b1ea6e07f75b9b19e99f92c47f1e4bbf16210ff5 X-Git-Newrev: 062a81beb7d5a5ea44ba82e2880848a51d0f5854 X-Git-Rev: 062a81beb7d5a5ea44ba82e2880848a51d0f5854 X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted 
git repository. jgus pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.1 by this push: new 062a81b KAFKA-3832; Kafka Connect's JSON Converter never outputs a null value (#6027) 062a81b is described below commit 062a81beb7d5a5ea44ba82e2880848a51d0f5854 Author: Renato Mefi AuthorDate: Fri Dec 28 18:39:52 2018 +0100 KAFKA-3832; Kafka Connect's JSON Converter never outputs a null value (#6027) When using the Connect `JsonConverter`, it's impossible to produce tombstone messages, thus impacting the compaction of the topic. This patch allows the converter with and without schemas to output a NULL byte value in order to have a proper tombstone message. When converting this data into a Connect record, the approach is the same as when the payload looks like `"{ "schema": null, "payload": null }"`, so that sink connectors can maintain their functionality and reduc [...] Reviewers: Gunnar Morling , Randall Hauch , Jason Gustafson --- .../apache/kafka/connect/json/JsonConverter.java | 31 ++++++++++++---------- .../kafka/connect/json/JsonConverterTest.java | 28 +++++++++++++++---- 2 files changed, 40 insertions(+), 19 deletions(-) diff --git a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java index c1322b1..546fcf0 100644 --- a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java +++ b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java @@ -317,6 +317,10 @@ public class JsonConverter implements Converter, HeaderConverter { @Override public byte[] fromConnectData(String topic, Schema schema, Object value) { + if (schema == null && value == null) { + return null; + } + JsonNode jsonValue = enableSchemas ? 
convertToJsonWithEnvelope(schema, value) : convertToJsonWithoutEnvelope(schema, value); try { return serializer.serialize(topic, jsonValue); @@ -328,13 +332,19 @@ public class JsonConverter implements Converter, HeaderConverter { @Override public SchemaAndValue toConnectData(String topic, byte[] value) { JsonNode jsonValue; + + // This handles a tombstone message + if (value == null) { + return SchemaAndValue.NULL; + } + try { jsonValue = deserializer.deserialize(topic, value); } catch (SerializationException e) { throw new DataException("Converting byte[] to Kafka Connect data failed due to serialization error: ", e); } - if (enableSchemas && (jsonValue == null || !jsonValue.isObject() || jsonValue.size() != 2 || !jsonValue.has("schema") || !jsonValue.has("payload"))) + if (enableSchemas && (!jsonValue.isObject() || jsonValue.size() != 2 || !jsonValue.has(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME) || !jsonValue.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME))) throw new DataException("JsonConverter with schemas.enable requires \"schema\" and \"payload\" fields and may not contain additional fields." + " If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration."); @@ -342,23 +352,16 @@ public class JsonConverter implements Converter, HeaderConverter { // was stripped during serialization and we need to fill in an all-encompassing schema. 
if (!enableSchemas) { ObjectNode envelope = JsonNodeFactory.instance.objectNode(); - envelope.set("schema", null); - envelope.set("payload", jsonValue); + envelope.set(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME, null); + envelope.set(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME, jsonValue); jsonValue = envelope; } - return jsonToConnect(jsonValue); - } - - private SchemaAndValue jsonToConnect(JsonNode jsonValue) { - if (jsonValue == null) - return SchemaAndValue.NULL; - - if (!jsonValue.isObject() || jsonValue.size() != 2 || !jsonValue.has(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME) || !jsonValue.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME)) - throw new DataException("JSON value converted to Kafka Connect must be in envelope containing schema"); - Schema schema = asConnectSchema(jsonValue.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME)); - return new SchemaAndValue(schema, convertToConnect(schema, jsonValue.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME))); + return new SchemaAndValue( + schema, + convertToConnect(schema, jsonValue.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME)) + ); } public ObjectNode asJsonSchema(Schema schema) { diff --git a/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java b/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java index 7686fdb..d5bb24c 100644 --- a/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java +++ b/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java @@ -172,10 +172,13 @@ public class JsonConverterTest { assertEquals(new SchemaAndValue(expectedSchema, expected), converted); } - @Test(expected = DataException.class) + @Test public void nullToConnect() { - // When schemas are enabled, trying to decode a null should be an error -- we should *always* have the envelope - assertEquals(SchemaAndValue.NULL, converter.toConnectData(TOPIC, null)); + // When schemas are enabled, trying to decode a tombstone should be an empty envelope + // the behavior 
is the same as when the json is "{ "schema": null, "payload": null }" + // to keep compatibility with the record + SchemaAndValue converted = converter.toConnectData(TOPIC, null); + assertEquals(SchemaAndValue.NULL, converted); } @Test @@ -696,6 +699,23 @@ public class JsonConverterTest { ); } + @Test + public void nullSchemaAndNullValueToJson() { + // This characterizes the production of tombstone messages when Json schemas is enabled + Map props = Collections.singletonMap("schemas.enable", true); + converter.configure(props, true); + byte[] converted = converter.fromConnectData(TOPIC, null, null); + assertNull(converted); + } + + @Test + public void nullValueToJson() { + // This characterizes the production of tombstone messages when Json schemas is not enabled + Map props = Collections.singletonMap("schemas.enable", false); + converter.configure(props, true); + byte[] converted = converter.fromConnectData(TOPIC, null, null); + assertNull(converted); + } @Test(expected = DataException.class) public void mismatchSchemaJson() { @@ -703,8 +723,6 @@ public class JsonConverterTest { converter.fromConnectData(TOPIC, Schema.FLOAT64_SCHEMA, true); } - - @Test public void noSchemaToConnect() { Map props = Collections.singletonMap("schemas.enable", false);