kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-5218; New Short serializer, deserializer, serde
Date Wed, 31 May 2017 22:11:41 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.11.0 f4c29613f -> 3bc173436


KAFKA-5218; New Short serializer, deserializer, serde

Author: Mario Molina <mmolimar@gmail.com>

Reviewers: Matthias J. Sax <matthias@confluent.io>, Damian Guy <damian.guy@gmail.com>,
Michael G. Noll <michael@confluent.io>, Guozhang Wang <wangguoz@gmail.com>

Closes #3017 from mmolimar/KAFKA-5218

(cherry picked from commit dc5bf4bd453495af050ea7c0ec7a66b8d2b2e8d4)
Signed-off-by: Guozhang Wang <wangguoz@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3bc17343
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3bc17343
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3bc17343

Branch: refs/heads/0.11.0
Commit: 3bc1734362c6e28a3f35fbe04df087e0eb3f39ed
Parents: f4c2961
Author: Mario Molina <mmolimar@gmail.com>
Authored: Wed May 31 15:09:58 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed May 31 15:11:37 2017 -0700

----------------------------------------------------------------------
 checkstyle/suppressions.xml                     |   2 +-
 .../serialization/IntegerDeserializer.java      |   3 +-
 .../common/serialization/LongDeserializer.java  |   3 +-
 .../kafka/common/serialization/Serdes.java      |  17 ++
 .../common/serialization/ShortDeserializer.java |  47 ++++
 .../common/serialization/ShortSerializer.java   |  40 +++
 .../common/serialization/SerializationTest.java | 249 +++++--------------
 7 files changed, 176 insertions(+), 185 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3bc17343/checkstyle/suppressions.xml
----------------------------------------------------------------------
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index a8d3033..f2fb3d9 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -52,7 +52,7 @@
               files="AbstractRequest.java"/>
 
     <suppress checks="NPathComplexity"
-              files="(BufferPool|MetricName|Node|ConfigDef|SslTransportLayer|MetadataResponse|KerberosLogin|SslTransportLayer|Sender).java"/>
+              files="(BufferPool|MetricName|Node|ConfigDef|SslTransportLayer|MetadataResponse|KerberosLogin|SslTransportLayer|Sender|Serdes).java"/>
 
     <!-- clients tests -->
     <suppress checks="ClassDataAbstractionCoupling"

http://git-wip-us.apache.org/repos/asf/kafka/blob/3bc17343/clients/src/main/java/org/apache/kafka/common/serialization/IntegerDeserializer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/IntegerDeserializer.java
b/clients/src/main/java/org/apache/kafka/common/serialization/IntegerDeserializer.java
index 29c3acc..45f8cf1 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/IntegerDeserializer.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/IntegerDeserializer.java
@@ -30,8 +30,7 @@ public class IntegerDeserializer implements Deserializer<Integer>
{
         if (data == null)
             return null;
         if (data.length != 4) {
-            throw new SerializationException("Size of data received by IntegerDeserializer
is " +
-                    "not 4");
+            throw new SerializationException("Size of data received by IntegerDeserializer
is not 4");
         }
 
         int value = 0;

http://git-wip-us.apache.org/repos/asf/kafka/blob/3bc17343/clients/src/main/java/org/apache/kafka/common/serialization/LongDeserializer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/LongDeserializer.java
b/clients/src/main/java/org/apache/kafka/common/serialization/LongDeserializer.java
index e5bfe3c..a58b1d3 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/LongDeserializer.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/LongDeserializer.java
@@ -30,8 +30,7 @@ public class LongDeserializer implements Deserializer<Long> {
         if (data == null)
             return null;
         if (data.length != 8) {
-            throw new SerializationException("Size of data received by LongDeserializer is
" +
-                    "not 8");
+            throw new SerializationException("Size of data received by LongDeserializer is
not 8");
         }
 
         long value = 0;

http://git-wip-us.apache.org/repos/asf/kafka/blob/3bc17343/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java b/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java
index 0793321..4772ea5 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java
@@ -70,6 +70,12 @@ public class Serdes {
         }
     }
 
+    static public final class ShortSerde extends WrapperSerde<Short> {
+        public ShortSerde() {
+            super(new ShortSerializer(), new ShortDeserializer());
+        }
+    }
+
     static public final class FloatSerde extends WrapperSerde<Float> {
         public FloatSerde() {
             super(new FloatSerializer(), new FloatDeserializer());
@@ -112,6 +118,10 @@ public class Serdes {
             return (Serde<T>) String();
         }
 
+        if (Short.class.isAssignableFrom(type)) {
+            return (Serde<T>) Short();
+        }
+
         if (Integer.class.isAssignableFrom(type)) {
             return (Serde<T>) Integer();
         }
@@ -176,6 +186,13 @@ public class Serdes {
     }
 
     /*
+     * A serde for nullable {@code Short} type.
+     */
+    static public Serde<Short> Short() {
+        return new ShortSerde();
+    }
+
+    /*
      * A serde for nullable {@code Float} type.
      */
     static public Serde<Float> Float() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/3bc17343/clients/src/main/java/org/apache/kafka/common/serialization/ShortDeserializer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/ShortDeserializer.java
b/clients/src/main/java/org/apache/kafka/common/serialization/ShortDeserializer.java
new file mode 100644
index 0000000..45aa8ae
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/ShortDeserializer.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.serialization;
+
+import org.apache.kafka.common.errors.SerializationException;
+
+import java.util.Map;
+
+public class ShortDeserializer implements Deserializer<Short> {
+
+    public void configure(Map<String, ?> configs, boolean isKey) {
+        // nothing to do
+    }
+
+    public Short deserialize(String topic, byte[] data) {
+        if (data == null)
+            return null;
+        if (data.length != 2) {
+            throw new SerializationException("Size of data received by ShortDeserializer
is not 2");
+        }
+
+        short value = 0;
+        for (byte b : data) {
+            value <<= 8;
+            value |= b & 0xFF;
+        }
+        return value;
+    }
+
+    public void close() {
+        // nothing to do
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/3bc17343/clients/src/main/java/org/apache/kafka/common/serialization/ShortSerializer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/ShortSerializer.java
b/clients/src/main/java/org/apache/kafka/common/serialization/ShortSerializer.java
new file mode 100644
index 0000000..a66aaa0
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/ShortSerializer.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.serialization;
+
+import java.util.Map;
+
+public class ShortSerializer implements Serializer<Short> {
+
+    public void configure(Map<String, ?> configs, boolean isKey) {
+        // nothing to do
+    }
+
+    public byte[] serialize(String topic, Short data) {
+        if (data == null)
+            return null;
+
+        return new byte[] {
+            (byte) (data >>> 8),
+            data.byteValue()
+        };
+    }
+
+    public void close() {
+        // nothing to do
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/3bc17343/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
index 12ccbe4..134882f 100644
--- a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
@@ -17,10 +17,11 @@
 package org.apache.kafka.common.serialization;
 
 import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.utils.Bytes;
 import org.junit.Test;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -29,27 +30,50 @@ import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.IsNull.nullValue;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
 
 public class SerializationTest {
 
     final private String topic = "testTopic";
+    final private Map<Class<Object>, List<Object>> testData = new HashMap()
{
+        {
+            put(String.class, Arrays.asList("my string"));
+            put(Short.class, Arrays.asList((short) 32767, (short) -32768));
+            put(Integer.class, Arrays.asList((int) 423412424, (int) -41243432));
+            put(Long.class, Arrays.asList(922337203685477580L, -922337203685477581L));
+            put(Float.class, Arrays.asList(5678567.12312f, -5678567.12341f));
+            put(Double.class, Arrays.asList(5678567.12312d, -5678567.12341d));
+            put(byte[].class, Arrays.asList("my string".getBytes()));
+            put(ByteBuffer.class, Arrays.asList(ByteBuffer.allocate(10).put("my string".getBytes())));
+            put(Bytes.class, Arrays.asList(new Bytes("my string".getBytes())));
+        }
+    };
 
     private class DummyClass {
-
     }
 
     @Test
-    public void testSerdeFrom() {
-        Serde<Long> thisSerde = Serdes.serdeFrom(Long.class);
-        Serde<Long> otherSerde = Serdes.Long();
-
-        Long value = 423412424L;
+    public void allSerdesShouldRoundtripInput() {
+        for (Map.Entry<Class<Object>, List<Object>> test : testData.entrySet())
{
+            try (Serde<Object> serde = Serdes.serdeFrom(test.getKey())) {
+                for (Object value : test.getValue()) {
+                    assertEquals("Should get the original " + test.getKey().getSimpleName()
+
+                                    " after serialization and deserialization", value,
+                            serde.deserializer().deserialize(topic, serde.serializer().serialize(topic,
value)));
+                }
+            }
+        }
+    }
 
-        assertEquals("Should get the original long after serialization and deserialization",
-                value, thisSerde.deserializer().deserialize(topic, otherSerde.serializer().serialize(topic,
value)));
-        assertEquals("Should get the original long after serialization and deserialization",
-                value, otherSerde.deserializer().deserialize(topic, thisSerde.serializer().serialize(topic,
value)));
+    @Test
+    public void allSerdesShouldSupportNull() {
+        for (Class<?> cls : testData.keySet()) {
+            try (Serde<?> serde = Serdes.serdeFrom(cls)) {
+                assertThat("Should support null in " + cls.getSimpleName() + " serialization",
+                        serde.serializer().serialize(topic, null), nullValue());
+                assertThat("Should support null in " + cls.getSimpleName() + " deserialization",
+                        serde.deserializer().deserialize(topic, null), nullValue());
+            }
+        }
     }
 
     @Test(expected = IllegalArgumentException.class)
@@ -59,200 +83,65 @@ public class SerializationTest {
 
     @Test(expected = IllegalArgumentException.class)
     public void testSerdeFromNotNull() {
-        Serdes.serdeFrom(null, Serdes.Long().deserializer());
+        try (Serde<Long> serde = Serdes.Long()) {
+            Serdes.serdeFrom(null, serde.deserializer());
+        }
     }
 
     @Test
-    public void testStringSerializer() {
+    public void stringSerdeShouldSupportDifferentEncodings() {
         String str = "my string";
-
-        List<String> encodings = new ArrayList<String>();
-        encodings.add("UTF8");
-        encodings.add("UTF-16");
+        List<String> encodings = Arrays.asList("UTF8", "UTF-16");
 
         for (String encoding : encodings) {
-            Serde<String> serDeser = getStringSerde(encoding);
-            Serializer<String> serializer = serDeser.serializer();
-            Deserializer<String> deserializer = serDeser.deserializer();
-
-            assertEquals("Should get the original string after serialization and deserialization
with encoding " + encoding,
-                    str, deserializer.deserialize(topic, serializer.serialize(topic, str)));
+            try (Serde<String> serDeser = getStringSerde(encoding)) {
 
-            assertEquals("Should support null in serialization and deserialization with encoding
" + encoding,
-                    null, deserializer.deserialize(topic, serializer.serialize(topic, null)));
+                Serializer<String> serializer = serDeser.serializer();
+                Deserializer<String> deserializer = serDeser.deserializer();
+                assertEquals("Should get the original string after serialization and deserialization
with encoding " + encoding,
+                        str, deserializer.deserialize(topic, serializer.serialize(topic,
str)));
+            }
         }
     }
 
-    @Test
-    public void testIntegerSerializer() {
-        Integer[] integers = new Integer[]{
-            423412424,
-            -41243432
-        };
-
-        Serializer<Integer> serializer = Serdes.Integer().serializer();
-        Deserializer<Integer> deserializer = Serdes.Integer().deserializer();
-
-        for (Integer integer : integers) {
-            assertEquals("Should get the original integer after serialization and deserialization",
-                    integer, deserializer.deserialize(topic, serializer.serialize(topic,
integer)));
-        }
-
-        assertEquals("Should support null in serialization and deserialization",
-                null, deserializer.deserialize(topic, serializer.serialize(topic, null)));
-
-        serializer.close();
-        deserializer.close();
-    }
-
-    @Test
-    public void testLongSerializer() {
-        Long[] longs = new Long[]{
-            922337203685477580L,
-            -922337203685477581L
-        };
-
-        Serializer<Long> serializer = Serdes.Long().serializer();
-        Deserializer<Long> deserializer = Serdes.Long().deserializer();
-
-        for (Long value : longs) {
-            assertEquals("Should get the original long after serialization and deserialization",
-                    value, deserializer.deserialize(topic, serializer.serialize(topic, value)));
-        }
-
-        assertEquals("Should support null in serialization and deserialization",
-                null, deserializer.deserialize(topic, serializer.serialize(topic, null)));
-
-        serializer.close();
-        deserializer.close();
-    }
-
-    @Test
-    public void shouldSerializeDeserializeFloat() {
-        final Float[] floats = new Float[]{
-            5678567.12312f,
-            -5678567.12341f
-        };
-        final Serializer<Float> serializer = Serdes.Float().serializer();
-        final Deserializer<Float> deserializer = Serdes.Float().deserializer();
-
-        for (final Float value : floats) {
-            assertThat("Should round-trip a float",
-                value, equalTo(deserializer.deserialize(topic, serializer.serialize(topic,
value))));
-        }
-
-        serializer.close();
-        deserializer.close();
-    }
-
-    @Test
-    public void floatSerializerShouldReturnNullForNull() {
-        final Serializer<Float> serializer = Serdes.Float().serializer();
-        assertThat(serializer.serialize(topic, null), nullValue());
-        serializer.close();
-    }
-
-    @Test
-    public void floatDeserializerShouldReturnNullForNull() {
-        final Deserializer<Float> deserializer = Serdes.Float().deserializer();
-        assertThat(deserializer.deserialize(topic, null), nullValue());
-        deserializer.close();
-    }
-
-    @Test
+    @Test(expected = SerializationException.class)
     public void floatDeserializerShouldThrowSerializationExceptionOnZeroBytes() {
-        final Deserializer<Float> deserializer = Serdes.Float().deserializer();
-        try {
-            deserializer.deserialize(topic, new byte[0]);
-            fail("Should have thrown a SerializationException because of zero input bytes");
-        } catch (SerializationException e) {
-            // Ignore (there's no contract on the details of the exception)
+        try (Serde<Float> serde = Serdes.Float()) {
+            serde.deserializer().deserialize(topic, new byte[0]);
         }
-        deserializer.close();
     }
 
-    @Test
+    @Test(expected = SerializationException.class)
     public void floatDeserializerShouldThrowSerializationExceptionOnTooFewBytes() {
-        final Deserializer<Float> deserializer = Serdes.Float().deserializer();
-        try {
-            deserializer.deserialize(topic, new byte[3]);
-            fail("Should have thrown a SerializationException because of too few input bytes");
-        } catch (SerializationException e) {
-            // Ignore (there's no contract on the details of the exception)
+        try (Serde<Float> serde = Serdes.Float()) {
+            serde.deserializer().deserialize(topic, new byte[3]);
         }
-        deserializer.close();
     }
 
 
-    @Test
+    @Test(expected = SerializationException.class)
     public void floatDeserializerShouldThrowSerializationExceptionOnTooManyBytes() {
-        final Deserializer<Float> deserializer = Serdes.Float().deserializer();
-        try {
-            deserializer.deserialize(topic, new byte[5]);
-            fail("Should have thrown a SerializationException because of too many input bytes");
-        } catch (SerializationException e) {
-            // Ignore (there's no contract on the details of the exception)
+        try (Serde<Float> serde = Serdes.Float()) {
+            serde.deserializer().deserialize(topic, new byte[5]);
         }
-        deserializer.close();
     }
 
     @Test
     public void floatSerdeShouldPreserveNaNValues() {
-        final int someNaNAsIntBits = 0x7f800001;
-        final float someNaN = Float.intBitsToFloat(someNaNAsIntBits);
-        final int anotherNaNAsIntBits = 0x7f800002;
-        final float anotherNaN = Float.intBitsToFloat(anotherNaNAsIntBits);
-
-        final Serde<Float> serde = Serdes.Float();
-        // Because of NaN semantics we must assert based on the raw int bits.
-        final Float roundtrip = serde.deserializer().deserialize(topic,
-            serde.serializer().serialize(topic, someNaN));
-        assertThat(Float.floatToRawIntBits(roundtrip), equalTo(someNaNAsIntBits));
-        final Float otherRoundtrip = serde.deserializer().deserialize(topic,
-            serde.serializer().serialize(topic, anotherNaN));
-        assertThat(Float.floatToRawIntBits(otherRoundtrip), equalTo(anotherNaNAsIntBits));
-
-        serde.close();
-    }
-
-    @Test
-    public void testDoubleSerializer() {
-        Double[] doubles = new Double[]{
-            5678567.12312d,
-            -5678567.12341d
-        };
-
-        Serializer<Double> serializer = Serdes.Double().serializer();
-        Deserializer<Double> deserializer = Serdes.Double().deserializer();
-
-        for (Double value : doubles) {
-            assertEquals("Should get the original double after serialization and deserialization",
-                    value, deserializer.deserialize(topic, serializer.serialize(topic, value)));
+        int someNaNAsIntBits = 0x7f800001;
+        float someNaN = Float.intBitsToFloat(someNaNAsIntBits);
+        int anotherNaNAsIntBits = 0x7f800002;
+        float anotherNaN = Float.intBitsToFloat(anotherNaNAsIntBits);
+
+        try (Serde<Float> serde = Serdes.Float()) {
+            // Because of NaN semantics we must assert based on the raw int bits.
+            Float roundtrip = serde.deserializer().deserialize(topic,
+                    serde.serializer().serialize(topic, someNaN));
+            assertThat(Float.floatToRawIntBits(roundtrip), equalTo(someNaNAsIntBits));
+            Float otherRoundtrip = serde.deserializer().deserialize(topic,
+                    serde.serializer().serialize(topic, anotherNaN));
+            assertThat(Float.floatToRawIntBits(otherRoundtrip), equalTo(anotherNaNAsIntBits));
         }
-
-        assertEquals("Should support null in serialization and deserialization",
-                null, deserializer.deserialize(topic, serializer.serialize(topic, null)));
-
-        serializer.close();
-        deserializer.close();
-    }
-
-    @Test
-    public void testByteBufferSerializer() {
-        ByteBuffer buf = ByteBuffer.allocate(10);
-        buf.put("my string".getBytes());
-
-        Serializer<ByteBuffer> serializer = Serdes.ByteBuffer().serializer();
-        Deserializer<ByteBuffer> deserializer = Serdes.ByteBuffer().deserializer();
-
-        assertEquals("Should get the original ByteBuffer after serialization and deserialization",
-              buf, deserializer.deserialize(topic, serializer.serialize(topic, buf)));
-
-        assertEquals("Should support null in serialization and deserialization",
-                null, deserializer.deserialize(topic, serializer.serialize(topic, null)));
-
-        serializer.close();
-        deserializer.close();
     }
 
     private Serde<String> getStringSerde(String encoder) {


Mime
View raw message