kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject kafka git commit: KAFKA-3029: Mark TopicPartition and OffsetAndMetadata as Serializable
Date Fri, 29 Jan 2016 18:04:45 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.9.0 56c7842cb -> 111927c39


KAFKA-3029: Mark TopicPartition and OffsetAndMetadata as Serializable

Patch for issue KAFKA-3029

Given that the fix is trivial no new test case is needed. I have run the test suite using
gradle (as mentioned  https://github.com/apache/kafka/blob/trunk/README.md) and suite runs
clean.

Author: Praveen Devarao <praveendrl@in.ibm.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Gwen Shapira <cshapi@gmail.com>, Ewen
Cheslack-Postava <ewen@confluent.io>

Closes #711 from praveend/tp_serializable_branch


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/111927c3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/111927c3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/111927c3

Branch: refs/heads/0.9.0
Commit: 111927c398a36eb11d253178e766e76d897f529f
Parents: 56c7842
Author: Praveen Devarao <praveendrl@in.ibm.com>
Authored: Fri Jan 29 09:23:55 2016 -0800
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Fri Jan 29 09:51:21 2016 -0800

----------------------------------------------------------------------
 .../clients/consumer/OffsetAndMetadata.java     |   4 +-
 .../org/apache/kafka/common/TopicPartition.java |   4 +-
 ...alizeCompatibilityOffsetAndMetadataTest.java |  61 +++++++++++++++++++
 ...erializeCompatibilityTopicPartitionTest.java |  61 +++++++++++++++++++
 .../apache/kafka/common/utils/Serializer.java   |  49 +++++++++++++++
 .../offsetAndMetadataSerializedfile             | Bin 0 -> 144 bytes
 .../serializedData/topicPartitionSerializedfile | Bin 0 -> 125 bytes
 7 files changed, 177 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/111927c3/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java
index 1a93047..66b257d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java
@@ -12,12 +12,14 @@
  */
 package org.apache.kafka.clients.consumer;
 
+import java.io.Serializable;
+
 /**
  * The Kafka offset commit API allows users to provide additional metadata (in the form of
a string)
  * when an offset is committed. This can be useful (for example) to store information about
which
  * node made the commit, what time the commit was made, etc.
  */
-public class OffsetAndMetadata {
+public class OffsetAndMetadata implements Serializable {
     private final long offset;
     private final String metadata;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/111927c3/clients/src/main/java/org/apache/kafka/common/TopicPartition.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/TopicPartition.java b/clients/src/main/java/org/apache/kafka/common/TopicPartition.java
index 3348684..383c00d 100644
--- a/clients/src/main/java/org/apache/kafka/common/TopicPartition.java
+++ b/clients/src/main/java/org/apache/kafka/common/TopicPartition.java
@@ -16,10 +16,12 @@
  */
 package org.apache.kafka.common;
 
+import java.io.Serializable;
+
 /**
  * A topic name and partition number
  */
-public final class TopicPartition {
+public final class TopicPartition implements Serializable {
 
     private int hash = 0;
     private final int partition;

http://git-wip-us.apache.org/repos/asf/kafka/blob/111927c3/clients/src/test/java/org/apache/kafka/clients/consumer/SerializeCompatibilityOffsetAndMetadataTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/SerializeCompatibilityOffsetAndMetadataTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/SerializeCompatibilityOffsetAndMetadataTest.java
new file mode 100644
index 0000000..ce1d4cd
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/SerializeCompatibilityOffsetAndMetadataTest.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership.
The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file
except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under
the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.common.utils.Serializer;
+import org.junit.Test;
+
+import java.io.IOException;
+
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * This test case ensures OffsetAndMetadata class is serializable and is serialization compatible.
+ * Note: this ensures that the current code can deserialize data serialized with older versions
of the code, but not the reverse.
+ * That is, older code won't necessarily be able to deserialize data serialized with newer
code.
+ */
+public class SerializeCompatibilityOffsetAndMetadataTest {
+    private String metadata = "test commit metadata";
+    private String fileName = "serializedData/offsetAndMetadataSerializedfile";
+    private long offset = 10;
+
+    private void checkValues(OffsetAndMetadata deSerOAM) {
+        //assert deserialized values are same as original
+        assertEquals("Offset should be " + offset + " but got " + deSerOAM.offset(), offset,
deSerOAM.offset());
+        assertEquals("metadata should be " + metadata + " but got " + deSerOAM.metadata(),
metadata, deSerOAM.metadata());
+    }
+
+    @Test
+    public void testSerializationRoundtrip() throws IOException, ClassNotFoundException {
+        //assert OffsetAndMetadata is serializable
+        OffsetAndMetadata origOAM = new OffsetAndMetadata(offset, metadata);
+        byte[] byteArray =  Serializer.serialize(origOAM);
+
+        //deserialize the byteArray and check if the values are same as original
+        Object deserializedObject = Serializer.deserialize(byteArray);
+        assertTrue(deserializedObject instanceof OffsetAndMetadata);
+        checkValues((OffsetAndMetadata) deserializedObject);
+    }
+
+    @Test
+    public void testOffsetMetadataSerializationCompatibility() throws IOException, ClassNotFoundException
{
+        // assert serialized OffsetAndMetadata object in file (oamserializedfile under resources
folder) is
+        // deserializable into OffsetAndMetadata and is compatible
+        Object deserializedObject = Serializer.deserialize(fileName);
+        assertTrue(deserializedObject instanceof OffsetAndMetadata);
+        checkValues((OffsetAndMetadata) deserializedObject);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/111927c3/clients/src/test/java/org/apache/kafka/common/SerializeCompatibilityTopicPartitionTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/SerializeCompatibilityTopicPartitionTest.java
b/clients/src/test/java/org/apache/kafka/common/SerializeCompatibilityTopicPartitionTest.java
new file mode 100644
index 0000000..7786a73
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/SerializeCompatibilityTopicPartitionTest.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership.
The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file
except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under
the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.common;
+
+import org.apache.kafka.common.utils.Serializer;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * This test ensures TopicPartition class is serializable and is serialization compatible.
+ * Note: this ensures that the current code can deserialize data serialized with older versions
of the code, but not the reverse.
+ * That is, older code won't necessarily be able to deserialize data serialized with newer
code.
+ */
+public class SerializeCompatibilityTopicPartitionTest {
+
+    private String topicName = "mytopic";
+    private String fileName = "serializedData/topicPartitionSerializedfile";
+    private int partNum = 5;
+
+    private void checkValues(TopicPartition deSerTP) {
+        //assert deserialized values are same as original
+        assertEquals("partition number should be " + partNum + " but got " + deSerTP.partition(),
partNum, deSerTP.partition());
+        assertEquals("topic should be " + topicName + " but got " + deSerTP.topic(), topicName,
deSerTP.topic());
+    }
+
+    @Test
+    public void testSerializationRoundtrip() throws IOException, ClassNotFoundException {
+        //assert TopicPartition is serializable and deserialization renders the clone of
original properly
+        TopicPartition origTp = new TopicPartition(topicName, partNum);
+        byte[] byteArray = Serializer.serialize(origTp);
+
+        //deserialize the byteArray and check if the values are same as original
+        Object deserializedObject = Serializer.deserialize(byteArray);
+        assertTrue(deserializedObject instanceof TopicPartition);
+        checkValues((TopicPartition) deserializedObject);
+    }
+
+    @Test
+    public void testTopiPartitionSerializationCompatibility() throws IOException, ClassNotFoundException
{
+        // assert serialized TopicPartition object in file (serializedData/topicPartitionSerializedfile)
is
+        // deserializable into TopicPartition and is compatible
+        Object deserializedObject = Serializer.deserialize(fileName);
+        assertTrue(deserializedObject instanceof TopicPartition);
+        checkValues((TopicPartition) deserializedObject);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/111927c3/clients/src/test/java/org/apache/kafka/common/utils/Serializer.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/Serializer.java b/clients/src/test/java/org/apache/kafka/common/utils/Serializer.java
new file mode 100644
index 0000000..f30c0e1
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/utils/Serializer.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership.
The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file
except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under
the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+
+public class Serializer {
+
+    public static byte[] serialize(Object toSerialize) throws IOException {
+        ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();
+        try (ObjectOutputStream ooStream = new ObjectOutputStream(arrayOutputStream)) {
+            ooStream.writeObject(toSerialize);
+            return arrayOutputStream.toByteArray();
+        }
+    }
+
+    public static Object deserialize(InputStream inputStream) throws IOException, ClassNotFoundException
{
+        try (ObjectInputStream objectInputStream = new ObjectInputStream(inputStream)) {
+            return objectInputStream.readObject();
+        }
+    }
+
+    public static Object deserialize(byte[] byteArray) throws IOException, ClassNotFoundException
{
+        ByteArrayInputStream arrayInputStream = new ByteArrayInputStream(byteArray);
+        return deserialize(arrayInputStream);
+    }
+
+    public static Object deserialize(String fileName) throws IOException, ClassNotFoundException
{
+        ClassLoader classLoader = Serializer.class.getClassLoader();
+        InputStream fileStream = classLoader.getResourceAsStream(fileName);
+        return deserialize(fileStream);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/111927c3/clients/src/test/resources/serializedData/offsetAndMetadataSerializedfile
----------------------------------------------------------------------
diff --git a/clients/src/test/resources/serializedData/offsetAndMetadataSerializedfile b/clients/src/test/resources/serializedData/offsetAndMetadataSerializedfile
new file mode 100644
index 0000000..95319cb
Binary files /dev/null and b/clients/src/test/resources/serializedData/offsetAndMetadataSerializedfile
differ

http://git-wip-us.apache.org/repos/asf/kafka/blob/111927c3/clients/src/test/resources/serializedData/topicPartitionSerializedfile
----------------------------------------------------------------------
diff --git a/clients/src/test/resources/serializedData/topicPartitionSerializedfile b/clients/src/test/resources/serializedData/topicPartitionSerializedfile
new file mode 100644
index 0000000..2c1c501
Binary files /dev/null and b/clients/src/test/resources/serializedData/topicPartitionSerializedfile
differ


Mime
View raw message