kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject kafka git commit: KAFKA-3394; allow null offset metadata in commit API
Date Fri, 18 Mar 2016 20:37:38 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 5d0cd7667 -> d9bf55171


KAFKA-3394; allow null offset metadata in commit API

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Grant Henke <granthenke@gmail.com>,
Ewen Cheslack-Postava <me@ewencp.org>, Jun Rao <junrao@gmail.com>

Closes #1064 from hachikuji/KAFKA-3394


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d9bf5517
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d9bf5517
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d9bf5517

Branch: refs/heads/trunk
Commit: d9bf55171ba41962edc05cdf106f540cf94a2c3e
Parents: 5d0cd76
Author: Jason Gustafson <jason@confluent.io>
Authored: Fri Mar 18 13:37:33 2016 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Fri Mar 18 13:37:33 2016 -0700

----------------------------------------------------------------------
 .../clients/consumer/OffsetAndMetadata.java     |  3 ---
 .../apache/kafka/common/protocol/Protocol.java  |  8 ++++----
 .../common/requests/OffsetCommitResponse.java   |  5 +++++
 .../common/requests/RequestResponseTest.java    | 21 ++++++++++++++++----
 .../src/main/scala/kafka/server/KafkaApis.scala |  3 ++-
 .../kafka/api/PlaintextConsumerTest.scala       |  6 +++++-
 6 files changed, 33 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d9bf5517/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java
index 66b257d..df8bf37 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java
@@ -29,9 +29,6 @@ public class OffsetAndMetadata implements Serializable {
      * @param metadata Non-null metadata
      */
     public OffsetAndMetadata(long offset, String metadata) {
-        if (metadata == null)
-            throw new IllegalArgumentException("Metadata cannot be null");
-
         this.offset = offset;
         this.metadata = metadata;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d9bf5517/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index e32d0b6..43110b5 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -178,7 +178,7 @@ public class Protocol {
                                                                                         
INT64,
                                                                                         
"Message offset to be committed."),
                                                                                new Field("metadata",
-                                                                                        
STRING,
+                                                                                        
NULLABLE_STRING,
                                                                                         
"Any associated metadata the client wants to keep."));
 
     public static final Schema OFFSET_COMMIT_REQUEST_PARTITION_V1 = new Schema(new Field("partition",
@@ -191,7 +191,7 @@ public class Protocol {
                                                                                         
INT64,
                                                                                         
"Timestamp of the commit"),
                                                                                new Field("metadata",
-                                                                                        
STRING,
+                                                                                        
NULLABLE_STRING,
                                                                                         
"Any associated metadata the client wants to keep."));
 
     public static final Schema OFFSET_COMMIT_REQUEST_PARTITION_V2 = new Schema(new Field("partition",
@@ -201,7 +201,7 @@ public class Protocol {
                                                                                         
INT64,
                                                                                         
"Message offset to be committed."),
                                                                                new Field("metadata",
-                                                                                        
STRING,
+                                                                                        
NULLABLE_STRING,
                                                                                         
"Any associated metadata the client wants to keep."));
 
     public static final Schema OFFSET_COMMIT_REQUEST_TOPIC_V0 = new Schema(new Field("topic",
@@ -314,7 +314,7 @@ public class Protocol {
                                                                                         
INT64,
                                                                                         
"Last committed message offset."),
                                                                                new Field("metadata",
-                                                                                        
STRING,
+                                                                                        
NULLABLE_STRING,
                                                                                         
"Any associated metadata the client wants to keep."),
                                                                                new Field("error_code",
INT16));
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d9bf5517/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
index 9b53fb4..71dd490 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
@@ -97,6 +97,11 @@ public class OffsetCommitResponse extends AbstractRequestResponse {
         return responseData;
     }
 
+    public static OffsetCommitResponse parse(ByteBuffer buffer, int version) {
+        Schema schema = ProtoUtils.responseSchema(ApiKeys.OFFSET_COMMIT.id, version);
+        return new OffsetCommitResponse(schema.read(buffer));
+    }
+
     public static OffsetCommitResponse parse(ByteBuffer buffer) {
         return new OffsetCommitResponse(CURRENT_SCHEMA.read(buffer));
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d9bf5517/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index b556b46..b7f0caf 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -74,8 +74,8 @@ public class RequestResponseTest {
                 createMetadataRequest(),
                 createMetadataRequest().getErrorResponse(0, new UnknownServerException()),
                 createMetadataResponse(),
-                createOffsetCommitRequest(),
-                createOffsetCommitRequest().getErrorResponse(0, new UnknownServerException()),
+                createOffsetCommitRequest(2),
+                createOffsetCommitRequest(2).getErrorResponse(2, new UnknownServerException()),
                 createOffsetCommitResponse(),
                 createOffsetFetchRequest(),
                 createOffsetFetchRequest().getErrorResponse(0, new UnknownServerException()),
@@ -98,6 +98,10 @@ public class RequestResponseTest {
         for (AbstractRequestResponse req : requestResponseList)
             checkSerialization(req, null);
 
+        checkSerialization(createOffsetCommitRequest(0), 0);
+        checkSerialization(createOffsetCommitRequest(0).getErrorResponse(0, new UnknownServerException()),
0);
+        checkSerialization(createOffsetCommitRequest(1), 1);
+        checkSerialization(createOffsetCommitRequest(1).getErrorResponse(1, new UnknownServerException()),
1);
         checkSerialization(createUpdateMetadataRequest(0, null), 0);
         checkSerialization(createUpdateMetadataRequest(0, null).getErrorResponse(0, new UnknownServerException()),
0);
         checkSerialization(createUpdateMetadataRequest(1, null), 1);
@@ -292,10 +296,18 @@ public class RequestResponseTest {
         return new MetadataResponse(Arrays.asList(node), allTopicMetadata);
     }
 
-    private AbstractRequest createOffsetCommitRequest() {
+    private AbstractRequest createOffsetCommitRequest(int version) {
         Map<TopicPartition, OffsetCommitRequest.PartitionData> commitData = new HashMap<>();
         commitData.put(new TopicPartition("test", 0), new OffsetCommitRequest.PartitionData(100,
""));
-        return new OffsetCommitRequest("group1", 100, "consumer1", 1000000, commitData);
+        commitData.put(new TopicPartition("test", 1), new OffsetCommitRequest.PartitionData(200,
null));
+        if (version == 0) {
+            return new OffsetCommitRequest("group1", commitData);
+        } else if (version == 1) {
+            return new OffsetCommitRequest("group1", 100, "consumer1", commitData);
+        } else if (version == 2) {
+            return new OffsetCommitRequest("group1", 100, "consumer1", 1000000, commitData);
+        }
+        throw new IllegalArgumentException("Unknown offset commit request version " + version);
     }
 
     private AbstractRequestResponse createOffsetCommitResponse() {
@@ -311,6 +323,7 @@ public class RequestResponseTest {
     private AbstractRequestResponse createOffsetFetchResponse() {
         Map<TopicPartition, OffsetFetchResponse.PartitionData> responseData = new HashMap<>();
         responseData.put(new TopicPartition("test", 0), new OffsetFetchResponse.PartitionData(100L,
"", Errors.NONE.code()));
+        responseData.put(new TopicPartition("test", 1), new OffsetFetchResponse.PartitionData(100L,
null, Errors.NONE.code()));
         return new OffsetFetchResponse(responseData);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d9bf5517/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 0fb4d74..4f77d30 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -294,8 +294,9 @@ class KafkaApis(val requestChannel: RequestChannel,
         val currentTimestamp = SystemTime.milliseconds
         val defaultExpireTimestamp = offsetRetention + currentTimestamp
         val partitionData = authorizedRequestInfo.mapValues { partitionData =>
+          val metadata = if (partitionData.metadata == null) OffsetMetadata.NoMetadata else
partitionData.metadata;
           new OffsetAndMetadata(
-            offsetMetadata = OffsetMetadata(partitionData.offset, partitionData.metadata),
+            offsetMetadata = OffsetMetadata(partitionData.offset, metadata),
             commitTimestamp = currentTimestamp,
             expireTimestamp = {
               if (partitionData.timestamp == OffsetCommitRequest.DEFAULT_TIMESTAMP)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d9bf5517/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 8014479..ca0497b 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -224,8 +224,12 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     val callback = new CountConsumerCommitCallback
     this.consumers(0).commitAsync(Map((tp, asyncMetadata)).asJava, callback)
     awaitCommitCallback(this.consumers(0), callback)
-
     assertEquals(asyncMetadata, this.consumers(0).committed(tp))
+
+    // handle null metadata
+    val nullMetadata = new OffsetAndMetadata(5, null)
+    this.consumers(0).commitSync(Map((tp, nullMetadata)).asJava)
+    assertEquals(nullMetadata, this.consumers(0).committed(tp))
   }
 
   @Test


Mime
View raw message