kafka-commits mailing list archives

From: j...@apache.org
Subject: [kafka] branch trunk updated: KAFKA-8013; Avoid underflow when reading a Struct from a partially correct buffer (#6340)
Date: Mon, 08 Apr 2019 21:24:24 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1bf1e46  KAFKA-8013; Avoid underflow when reading a Struct from a partially correct buffer (#6340)
1bf1e46 is described below

commit 1bf1e46e7e7ea192019576a5ee613bcedd843e96
Author: Konstantine Karantasis <konstantine@confluent.io>
AuthorDate: Mon Apr 8 14:24:00 2019 -0700

    KAFKA-8013; Avoid underflow when reading a Struct from a partially correct buffer (#6340)
    
    Protocol compatibility can be facilitated if a Struct that has been defined as an extension
    of a previous Struct, by adding fields at the end of the older version, can read a message
    of the older version by ignoring the absence of the missing new fields. Reading the missing
    fields is allowed only when the definition of these fields permits it (they have to be
    nullable) and the schema supports it.

    Reviewers: David Arthur <mumrah@gmail.com>, Randall Hauch <rhauch@gmail.com>,
    Jason Gustafson <jason@confluent.io>
---
 .../apache/kafka/common/protocol/types/Schema.java | 37 +++++++++++-
 .../protocol/types/ProtocolSerializationTest.java  | 68 ++++++++++++++++++++++
 2 files changed, 103 insertions(+), 2 deletions(-)
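[Editor's note] For context, a minimal sketch of the scenario this patch targets, pieced together from the Schema, Field, and Struct usage in the tests below (it mirrors testReadWhenOptionalDataMissingAtTheEndIsTolerated). The message and field names (client_id, rack) are purely illustrative, not part of any real Kafka protocol schema: a reader whose schema appends a defaulted field, and which opts in via the new boolean constructor argument, can read a message written with the older schema.

    import java.nio.ByteBuffer;

    import org.apache.kafka.common.protocol.types.Field;
    import org.apache.kafka.common.protocol.types.Schema;
    import org.apache.kafka.common.protocol.types.Struct;
    import org.apache.kafka.common.protocol.types.Type;

    public class TolerantReadSketch {
        public static void main(String[] args) {
            // Version 1 of a hypothetical message: a single nullable string field.
            Schema v1 = new Schema(new Field("client_id", Type.NULLABLE_STRING));

            // Version 2 appends a nullable, defaulted field and opts in to tolerating
            // its absence via the new boolean constructor argument.
            Schema v2 = new Schema(true,
                    new Field("client_id", Type.NULLABLE_STRING),
                    new Field("rack", Type.NULLABLE_STRING, "", true, null));

            // Serialize a message in the old format.
            Struct oldMessage = new Struct(v1).set("client_id", "consumer-42");
            ByteBuffer buffer = ByteBuffer.allocate(v1.sizeOf(oldMessage));
            oldMessage.writeTo(buffer);
            buffer.flip();

            // Reading with the newer schema no longer underflows: the missing
            // trailing field is filled with its declared default instead.
            Struct asV2 = v2.read(buffer);
            System.out.println(asV2.get("client_id")); // consumer-42
            System.out.println(asV2.get("rack"));      // null (the declared default)
        }
    }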

diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
index cbcd449..5b12eee 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
@@ -28,15 +28,33 @@ public class Schema extends Type {
 
     private final BoundField[] fields;
     private final Map<String, BoundField> fieldsByName;
+    private final boolean tolerateMissingFieldsWithDefaults;
 
     /**
      * Construct the schema with a given list of its field values
      *
+     * @param fs the fields of this schema
+     *
      * @throws SchemaException If the given list has duplicate fields
      */
     public Schema(Field... fs) {
+        this(false, fs);
+    }
+
+    /**
+     * Construct the schema with a given list of its field values and the ability to tolerate
+     * missing optional fields with defaults at the end of the schema definition.
+     *
+     * @param tolerateMissingFieldsWithDefaults whether to accept records with missing optional
+     * fields at the end of the schema
+     * @param fs the fields of this schema
+     *
+     * @throws SchemaException If the given list has duplicate fields
+     */
+    public Schema(boolean tolerateMissingFieldsWithDefaults, Field... fs) {
         this.fields = new BoundField[fs.length];
         this.fieldsByName = new HashMap<>();
+        this.tolerateMissingFieldsWithDefaults = tolerateMissingFieldsWithDefaults;
         for (int i = 0; i < this.fields.length; i++) {
             Field def = fs[i];
             if (fieldsByName.containsKey(def.name))
@@ -64,14 +82,29 @@ public class Schema extends Type {
     }
 
     /**
-     * Read a struct from the buffer
+     * Read a struct from the buffer. If this schema is configured to tolerate missing
+     * optional fields at the end of the buffer, these fields are replaced with their default
+     * values; otherwise, if the schema does not tolerate missing fields, or if missing fields
+     * don't have a default value, a {@code SchemaException} is thrown to signify that mandatory
+     * fields are missing.
      */
     @Override
     public Struct read(ByteBuffer buffer) {
         Object[] objects = new Object[fields.length];
         for (int i = 0; i < fields.length; i++) {
             try {
-                objects[i] = fields[i].def.type.read(buffer);
+                if (tolerateMissingFieldsWithDefaults) {
+                    if (buffer.hasRemaining()) {
+                        objects[i] = fields[i].def.type.read(buffer);
+                    } else if (fields[i].def.hasDefaultValue) {
+                        objects[i] = fields[i].def.defaultValue;
+                    } else {
+                        throw new SchemaException("Missing value for field '" + fields[i].def.name
+
+                                "' which has no default value.");
+                    }
+                } else {
+                    objects[i] = fields[i].def.type.read(buffer);
+                }
             } catch (Exception e) {
                 throw new SchemaException("Error reading field '" + fields[i].def.name +
"': " +
                                           (e.getMessage() == null ? e.getClass().getName()
: e.getMessage()));
diff --git a/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java b/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java
index 6e9341a..8a3b8ce 100644
--- a/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java
@@ -26,6 +26,8 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 public class ProtocolSerializationTest {
@@ -278,4 +279,71 @@ public class ProtocolSerializationTest {
         assertNotEquals(emptyStruct1, mostlyEmptyStruct);
         assertNotEquals(mostlyEmptyStruct, emptyStruct1);
     }
+
+    @Test
+    public void testReadIgnoringExtraDataAtTheEnd() {
+        Schema oldSchema = new Schema(new Field("field1", Type.NULLABLE_STRING), new Field("field2",
Type.NULLABLE_STRING));
+        Schema newSchema = new Schema(new Field("field1", Type.NULLABLE_STRING));
+        String value = "foo bar baz";
+        Struct oldFormat = new Struct(oldSchema).set("field1", value).set("field2", "fine
to ignore");
+        ByteBuffer buffer = ByteBuffer.allocate(oldSchema.sizeOf(oldFormat));
+        oldFormat.writeTo(buffer);
+        buffer.flip();
+        Struct newFormat = newSchema.read(buffer);
+        assertEquals(value, newFormat.get("field1"));
+    }
+
+    @Test
+    public void testReadWhenOptionalDataMissingAtTheEndIsTolerated() {
+        Schema oldSchema = new Schema(new Field("field1", Type.NULLABLE_STRING));
+        Schema newSchema = new Schema(
+                true,
+                new Field("field1", Type.NULLABLE_STRING),
+                new Field("field2", Type.NULLABLE_STRING, "", true, "default"),
+                new Field("field3", Type.NULLABLE_STRING, "", true, null),
+                new Field("field4", Type.NULLABLE_BYTES, "", true, ByteBuffer.allocate(0)),
+                new Field("field5", Type.INT64, "doc", true, Long.MAX_VALUE));
+        String value = "foo bar baz";
+        Struct oldFormat = new Struct(oldSchema).set("field1", value);
+        ByteBuffer buffer = ByteBuffer.allocate(oldSchema.sizeOf(oldFormat));
+        oldFormat.writeTo(buffer);
+        buffer.flip();
+        Struct newFormat = newSchema.read(buffer);
+        assertEquals(value, newFormat.get("field1"));
+        assertEquals("default", newFormat.get("field2"));
+        assertEquals(null, newFormat.get("field3"));
+        assertEquals(ByteBuffer.allocate(0), newFormat.get("field4"));
+        assertEquals(Long.MAX_VALUE, newFormat.get("field5"));
+    }
+
+    @Test
+    public void testReadWhenOptionalDataMissingAtTheEndIsNotTolerated() {
+        Schema oldSchema = new Schema(new Field("field1", Type.NULLABLE_STRING));
+        Schema newSchema = new Schema(
+                new Field("field1", Type.NULLABLE_STRING),
+                new Field("field2", Type.NULLABLE_STRING, "", true, "default"));
+        String value = "foo bar baz";
+        Struct oldFormat = new Struct(oldSchema).set("field1", value);
+        ByteBuffer buffer = ByteBuffer.allocate(oldSchema.sizeOf(oldFormat));
+        oldFormat.writeTo(buffer);
+        buffer.flip();
+        SchemaException e = assertThrows(SchemaException.class, () -> newSchema.read(buffer));
+        e.getMessage().contains("Error reading field 'field2': java.nio.BufferUnderflowException");
+    }
+
+    @Test
+    public void testReadWithMissingNonOptionalExtraDataAtTheEnd() {
+        Schema oldSchema = new Schema(new Field("field1", Type.NULLABLE_STRING));
+        Schema newSchema = new Schema(
+                true,
+                new Field("field1", Type.NULLABLE_STRING),
+                new Field("field2", Type.NULLABLE_STRING));
+        String value = "foo bar baz";
+        Struct oldFormat = new Struct(oldSchema).set("field1", value);
+        ByteBuffer buffer = ByteBuffer.allocate(oldSchema.sizeOf(oldFormat));
+        oldFormat.writeTo(buffer);
+        buffer.flip();
+        SchemaException e = assertThrows(SchemaException.class, () -> newSchema.read(buffer));
+        e.getMessage().contains("Missing value for field 'field2' which has no default value");
+    }
 }
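[Editor's note] To round out the behaviour documented on Schema#read above, a condensed sketch of the two failure modes, mirroring the last two tests (field names are again illustrative): a tolerant schema still fails when the missing trailing field has no default, and a schema that does not opt in fails with the underlying buffer underflow as before this change.

    import java.nio.ByteBuffer;

    import org.apache.kafka.common.protocol.types.Field;
    import org.apache.kafka.common.protocol.types.Schema;
    import org.apache.kafka.common.protocol.types.SchemaException;
    import org.apache.kafka.common.protocol.types.Struct;
    import org.apache.kafka.common.protocol.types.Type;

    public class MissingFieldFailureSketch {
        public static void main(String[] args) {
            Schema v1 = new Schema(new Field("client_id", Type.NULLABLE_STRING));
            Struct oldMessage = new Struct(v1).set("client_id", "consumer-42");

            // Tolerant schema, but the trailing field has no default: read() still
            // throws, because there is nothing to fall back to.
            Schema tolerantNoDefault = new Schema(true,
                    new Field("client_id", Type.NULLABLE_STRING),
                    new Field("rack", Type.NULLABLE_STRING));
            try {
                tolerantNoDefault.read(serialize(v1, oldMessage));
            } catch (SchemaException e) {
                // message contains: Missing value for field 'rack' which has no default value.
                System.out.println(e.getMessage());
            }

            // Non-tolerant schema (the default): the read underflows and the
            // exception is rethrown as a SchemaException, as before this change.
            Schema strict = new Schema(
                    new Field("client_id", Type.NULLABLE_STRING),
                    new Field("rack", Type.NULLABLE_STRING, "", true, null));
            try {
                strict.read(serialize(v1, oldMessage));
            } catch (SchemaException e) {
                // message: Error reading field 'rack': java.nio.BufferUnderflowException
                System.out.println(e.getMessage());
            }
        }

        private static ByteBuffer serialize(Schema schema, Struct struct) {
            ByteBuffer buffer = ByteBuffer.allocate(schema.sizeOf(struct));
            struct.writeTo(buffer);
            buffer.flip();
            return buffer;
        }
    }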

