kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-2695: limited support for nullable byte arrays
Date Mon, 18 Jan 2016 17:54:07 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 0c32bc992 -> 747dc930f


KAFKA-2695: limited support for nullable byte arrays

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Guozhang Wang

Closes #780 from hachikuji/KAFKA-2695


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/747dc930
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/747dc930
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/747dc930

Branch: refs/heads/trunk
Commit: 747dc930ff710ec890b5f054b4ec4bf7bbb7c960
Parents: 0c32bc9
Author: Jason Gustafson <jason@confluent.io>
Authored: Mon Jan 18 09:54:02 2016 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Mon Jan 18 09:54:02 2016 -0800

----------------------------------------------------------------------
 .../consumer/internals/ConsumerProtocol.java    |  4 +-
 .../kafka/common/protocol/types/Struct.java     |  2 +
 .../kafka/common/protocol/types/Type.java       | 67 ++++++++++++++++++++
 .../internals/ConsumerProtocolTest.java         | 18 ++++++
 .../types/ProtocolSerializationTest.java        | 25 ++++++--
 5 files changed, 110 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/747dc930/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
index 3f87995..361865d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
@@ -71,13 +71,13 @@ public class ConsumerProtocol {
 
     public static final Schema SUBSCRIPTION_V0 = new Schema(
             new Field(TOPICS_KEY_NAME, new ArrayOf(Type.STRING)),
-            new Field(USER_DATA_KEY_NAME, Type.BYTES));
+            new Field(USER_DATA_KEY_NAME, Type.NULLABLE_BYTES));
     public static final Schema TOPIC_ASSIGNMENT_V0 = new Schema(
             new Field(TOPIC_KEY_NAME, Type.STRING),
             new Field(PARTITIONS_KEY_NAME, new ArrayOf(Type.INT32)));
     public static final Schema ASSIGNMENT_V0 = new Schema(
             new Field(TOPIC_PARTITIONS_KEY_NAME, new ArrayOf(TOPIC_ASSIGNMENT_V0)),
-            new Field(USER_DATA_KEY_NAME, Type.BYTES));
+            new Field(USER_DATA_KEY_NAME, Type.NULLABLE_BYTES));
 
     public static ByteBuffer serializeSubscription(PartitionAssignor.Subscription subscription)
{
         Struct struct = new Struct(SUBSCRIPTION_V0);

http://git-wip-us.apache.org/repos/asf/kafka/blob/747dc930/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
index 54c3deb..4902f25 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
@@ -51,6 +51,8 @@ public class Struct {
             return value;
         else if (field.defaultValue != Field.NO_DEFAULT)
             return field.defaultValue;
+        else if (field.type.isNullable())
+            return null;
         else
             throw new SchemaException("Missing value for field '" + field.name + "' which
has no default value.");
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/747dc930/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
index 9ea28b2..0483387 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
@@ -51,6 +51,14 @@ public abstract class Type {
      */
     public abstract int sizeOf(Object o);
 
+    /**
+     * Check if the type supports null values
+     * @return whether or not null is a valid value for the type implementation
+     */
+    public boolean isNullable() {
+        return false;
+    }
+
     public static final Type INT8 = new Type() {
         @Override
         public void write(ByteBuffer buffer, Object o) {
@@ -247,4 +255,63 @@ public abstract class Type {
         }
     };
 
+    public static final Type NULLABLE_BYTES = new Type() {
+        @Override
+        public boolean isNullable() {
+            return true;
+        }
+
+        @Override
+        public void write(ByteBuffer buffer, Object o) {
+            if (o == null) {
+                buffer.putInt(-1);
+                return;
+            }
+
+            ByteBuffer arg = (ByteBuffer) o;
+            int pos = arg.position();
+            buffer.putInt(arg.remaining());
+            buffer.put(arg);
+            arg.position(pos);
+        }
+
+        @Override
+        public Object read(ByteBuffer buffer) {
+            int size = buffer.getInt();
+            if (size < 0)
+                return null;
+
+            ByteBuffer val = buffer.slice();
+            val.limit(size);
+            buffer.position(buffer.position() + size);
+            return val;
+        }
+
+        @Override
+        public int sizeOf(Object o) {
+            if (o == null)
+                return 4;
+
+            ByteBuffer buffer = (ByteBuffer) o;
+            return 4 + buffer.remaining();
+        }
+
+        @Override
+        public String toString() {
+            return "NULLABLE_BYTES";
+        }
+
+        @Override
+        public ByteBuffer validate(Object item) {
+            if (item == null)
+                return null;
+
+            if (item instanceof ByteBuffer)
+                return (ByteBuffer) item;
+
+            throw new SchemaException(item + " is not a java.nio.ByteBuffer.");
+        }
+    };
+
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/747dc930/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
index 8113770..be98ce7 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
@@ -33,16 +33,25 @@ import java.util.List;
 import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 
 public class ConsumerProtocolTest {
 
     @Test
     public void serializeDeserializeMetadata() {
         Subscription subscription = new Subscription(Arrays.asList("foo", "bar"));
+        ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription);
+        Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer);
+        assertEquals(subscription.topics(), parsedSubscription.topics());
+    }
 
+    @Test
+    public void serializeDeserializeNullSubscriptionUserData() {
+        Subscription subscription = new Subscription(Arrays.asList("foo", "bar"), null);
         ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription);
         Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer);
         assertEquals(subscription.topics(), parsedSubscription.topics());
+        assertNull(subscription.userData());
     }
 
     @Test
@@ -82,6 +91,15 @@ public class ConsumerProtocolTest {
     }
 
     @Test
+    public void deserializeNullAssignmentUserData() {
+        List<TopicPartition> partitions = Arrays.asList(new TopicPartition("foo", 0),
new TopicPartition("bar", 2));
+        ByteBuffer buffer = ConsumerProtocol.serializeAssignment(new PartitionAssignor.Assignment(partitions,
null));
+        PartitionAssignor.Assignment parsedAssignment = ConsumerProtocol.deserializeAssignment(buffer);
+        assertEquals(toSet(partitions), toSet(parsedAssignment.partitions()));
+        assertNull(parsedAssignment.userData());
+    }
+
+    @Test
     public void deserializeNewAssignmentVersion() {
         // verify that a new version which adds a field is still parseable
         short version = 100;

http://git-wip-us.apache.org/repos/asf/kafka/blob/747dc930/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java
b/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java
index d2e2782..9fe20c1 100644
--- a/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java
@@ -39,14 +39,16 @@ public class ProtocolSerializationTest {
                                  new Field("int64", Type.INT64),
                                  new Field("string", Type.STRING),
                                  new Field("bytes", Type.BYTES),
+                                 new Field("nullable_bytes", Type.NULLABLE_BYTES),
                                  new Field("array", new ArrayOf(Type.INT32)),
-                                 new Field("struct", new Schema(new Field("field", Type.INT32))));
+                                 new Field("struct", new Schema(new Field("field", new ArrayOf(Type.INT32)))));
         this.struct = new Struct(this.schema).set("int8", (byte) 1)
                                              .set("int16", (short) 1)
                                              .set("int32", 1)
                                              .set("int64", 1L)
                                              .set("string", "1")
-                                             .set("bytes", "1".getBytes())
+                                             .set("bytes", ByteBuffer.wrap("1".getBytes()))
+                                             .set("nullable_bytes", null)
                                              .set("array", new Object[] {1});
         this.struct.set("struct", this.struct.instance("struct").set("field", new Object[]
{1, 2, 3}));
     }
@@ -62,6 +64,9 @@ public class ProtocolSerializationTest {
         check(Type.STRING, "A\u00ea\u00f1\u00fcC");
         check(Type.BYTES, ByteBuffer.allocate(0));
         check(Type.BYTES, ByteBuffer.wrap("abcd".getBytes()));
+        check(Type.NULLABLE_BYTES, null);
+        check(Type.NULLABLE_BYTES, ByteBuffer.allocate(0));
+        check(Type.NULLABLE_BYTES, ByteBuffer.wrap("abcd".getBytes()));
         check(new ArrayOf(Type.INT32), new Object[] {1, 2, 3, 4});
         check(new ArrayOf(Type.STRING), new Object[] {});
         check(new ArrayOf(Type.STRING), new Object[] {"hello", "there", "beautiful"});
@@ -74,9 +79,11 @@ public class ProtocolSerializationTest {
             try {
                 this.struct.set(f, null);
                 this.struct.validate();
-                fail("Should not allow serialization of null value.");
+                if (!f.type.isNullable())
+                    fail("Should not allow serialization of null value.");
             } catch (SchemaException e) {
-                // this is good
+                assertFalse(f.type.isNullable());
+            } finally {
                 this.struct.set(f, o);
             }
         }
@@ -91,6 +98,16 @@ public class ProtocolSerializationTest {
     }
 
     @Test
+    public void testNullableDefault() {
+        // Should use default even if the field allows null values
+        ByteBuffer empty = ByteBuffer.allocate(0);
+        Schema schema = new Schema(new Field("field", Type.NULLABLE_BYTES, "doc", empty));
+        Struct struct = new Struct(schema);
+        assertEquals("Should get the default value", empty, struct.get("field"));
+        struct.validate(); // should be valid even with missing value
+    }
+
+    @Test
     public void testArray() {
         Type type = new ArrayOf(Type.INT8);
         int size = 10;


Mime
View raw message