kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cmcc...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7903: automatically generate OffsetCommitRequest (#6583)
Date Thu, 25 Apr 2019 23:43:09 GMT
This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0c2d829  KAFKA-7903: automatically generate OffsetCommitRequest (#6583)
0c2d829 is described below

commit 0c2d829249567c6a87140ae25f631a2abad11d00
Author: Boyang Chen <boyang@confluent.io>
AuthorDate: Thu Apr 25 16:42:54 2019 -0700

    KAFKA-7903: automatically generate OffsetCommitRequest (#6583)
    
    Reviewers: Colin P. McCabe <cmccabe@apache.org>
---
 .../consumer/internals/ConsumerCoordinator.java    | 121 +++++---
 .../org/apache/kafka/common/protocol/ApiKeys.java  |   6 +-
 .../kafka/common/requests/OffsetCommitRequest.java | 339 ++++-----------------
 .../common/requests/OffsetCommitResponse.java      | 143 ++++-----
 .../kafka/clients/consumer/KafkaConsumerTest.java  |   5 +-
 .../internals/ConsumerCoordinatorTest.java         |  48 ++-
 .../kafka/common/requests/RequestResponseTest.java |  44 ++-
 core/src/main/scala/kafka/server/KafkaApis.scala   |  66 ++--
 .../kafka/api/AuthorizerIntegrationTest.scala      |  25 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala    |  25 +-
 .../scala/unit/kafka/server/RequestQuotaTest.scala |  34 ++-
 11 files changed, 366 insertions(+), 490 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 68e98ba..4d40070 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -35,6 +35,8 @@ import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.message.JoinGroupRequestData;
 import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
 import org.apache.kafka.common.metrics.Measurable;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
@@ -42,6 +44,7 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.requests.OffsetCommitRequest;
 import org.apache.kafka.common.requests.OffsetCommitResponse;
 import org.apache.kafka.common.requests.OffsetFetchRequest;
@@ -770,14 +773,27 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
             return RequestFuture.coordinatorNotAvailable();
 
         // create the offset commit request
-        Map<TopicPartition, OffsetCommitRequest.PartitionData> offsetData = new HashMap<>(offsets.size());
+        Map<String, OffsetCommitRequestData.OffsetCommitRequestTopic> requestTopicDataMap = new HashMap<>();
         for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
+            TopicPartition topicPartition = entry.getKey();
             OffsetAndMetadata offsetAndMetadata = entry.getValue();
             if (offsetAndMetadata.offset() < 0) {
                 return RequestFuture.failure(new IllegalArgumentException("Invalid offset: " + offsetAndMetadata.offset()));
             }
-            offsetData.put(entry.getKey(), new OffsetCommitRequest.PartitionData(offsetAndMetadata.offset(),
-                    offsetAndMetadata.leaderEpoch(), offsetAndMetadata.metadata()));
+
+            OffsetCommitRequestData.OffsetCommitRequestTopic topic = requestTopicDataMap
+                    .getOrDefault(topicPartition.topic(),
+                            new OffsetCommitRequestData.OffsetCommitRequestTopic()
+                                    .setName(topicPartition.topic())
+                    );
+
+            topic.partitions().add(new OffsetCommitRequestData.OffsetCommitRequestPartition()
+                    .setPartitionIndex(topicPartition.partition())
+                    .setCommittedOffset(offsetAndMetadata.offset())
+                    .setCommittedLeaderEpoch(offsetAndMetadata.leaderEpoch().orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
+                    .setCommittedMetadata(offsetAndMetadata.metadata())
+            );
+            requestTopicDataMap.put(topicPartition.topic(), topic);
         }
 
         final Generation generation;
@@ -791,9 +807,13 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         if (generation == null)
             return RequestFuture.failure(new CommitFailedException());
 
-        OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(this.groupId, offsetData).
-                setGenerationId(generation.generationId).
-                setMemberId(generation.memberId);
+        OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(
+                new OffsetCommitRequestData()
+                        .setGroupId(this.groupId)
+                        .setGenerationId(generation.generationId)
+                        .setMemberId(generation.memberId)
+                        .setTopics(new ArrayList<>(requestTopicDataMap.values()))
+        );
 
         log.trace("Sending OffsetCommit request with {} to coordinator {}", offsets, coordinator);
 
@@ -814,52 +834,55 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
             sensors.commitLatency.record(response.requestLatencyMs());
             Set<String> unauthorizedTopics = new HashSet<>();
 
-            for (Map.Entry<TopicPartition, Errors> entry : commitResponse.responseData().entrySet()) {
-                TopicPartition tp = entry.getKey();
-                OffsetAndMetadata offsetAndMetadata = this.offsets.get(tp);
-                long offset = offsetAndMetadata.offset();
+            for (OffsetCommitResponseData.OffsetCommitResponseTopic topic : commitResponse.data().topics()) {
+                for (OffsetCommitResponseData.OffsetCommitResponsePartition partition : topic.partitions()) {
+                    TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex());
+                    OffsetAndMetadata offsetAndMetadata = this.offsets.get(tp);
 
-                Errors error = entry.getValue();
-                if (error == Errors.NONE) {
-                    log.debug("Committed offset {} for partition {}", offset, tp);
-                } else {
-                    if (error.exception() instanceof RetriableException) {
-                        log.warn("Offset commit failed on partition {} at offset {}: {}", tp, offset, error.message());
-                    } else {
-                        log.error("Offset commit failed on partition {} at offset {}: {}", tp, offset, error.message());
-                    }
+                    long offset = offsetAndMetadata.offset();
 
-                    if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
-                        future.raise(new GroupAuthorizationException(groupId));
-                        return;
-                    } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
-                        unauthorizedTopics.add(tp.topic());
-                    } else if (error == Errors.OFFSET_METADATA_TOO_LARGE
-                            || error == Errors.INVALID_COMMIT_OFFSET_SIZE) {
-                        // raise the error to the user
-                        future.raise(error);
-                        return;
-                    } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS
-                            || error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
-                        // just retry
-                        future.raise(error);
-                        return;
-                    } else if (error == Errors.COORDINATOR_NOT_AVAILABLE
-                            || error == Errors.NOT_COORDINATOR
-                            || error == Errors.REQUEST_TIMED_OUT) {
-                        markCoordinatorUnknown();
-                        future.raise(error);
-                        return;
-                    } else if (error == Errors.UNKNOWN_MEMBER_ID
-                            || error == Errors.ILLEGAL_GENERATION
-                            || error == Errors.REBALANCE_IN_PROGRESS) {
-                        // need to re-join group
-                        resetGeneration();
-                        future.raise(new CommitFailedException());
-                        return;
+                    Errors error = Errors.forCode(partition.errorCode());
+                    if (error == Errors.NONE) {
+                        log.debug("Committed offset {} for partition {}", offset, tp);
                     } else {
-                        future.raise(new KafkaException("Unexpected error in commit: " + error.message()));
-                        return;
+                        if (error.exception() instanceof RetriableException) {
+                            log.warn("Offset commit failed on partition {} at offset {}: {}", tp, offset, error.message());
+                        } else {
+                            log.error("Offset commit failed on partition {} at offset {}: {}", tp, offset, error.message());
+                        }
+
+                        if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
+                            future.raise(new GroupAuthorizationException(groupId));
+                            return;
+                        } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
+                            unauthorizedTopics.add(tp.topic());
+                        } else if (error == Errors.OFFSET_METADATA_TOO_LARGE
+                                || error == Errors.INVALID_COMMIT_OFFSET_SIZE) {
+                            // raise the error to the user
+                            future.raise(error);
+                            return;
+                        } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS
+                                || error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
+                            // just retry
+                            future.raise(error);
+                            return;
+                        } else if (error == Errors.COORDINATOR_NOT_AVAILABLE
+                                || error == Errors.NOT_COORDINATOR
+                                || error == Errors.REQUEST_TIMED_OUT) {
+                            markCoordinatorUnknown();
+                            future.raise(error);
+                            return;
+                        } else if (error == Errors.UNKNOWN_MEMBER_ID
+                                || error == Errors.ILLEGAL_GENERATION
+                                || error == Errors.REBALANCE_IN_PROGRESS) {
+                            // need to re-join group
+                            resetGeneration();
+                            future.raise(new CommitFailedException());
+                            return;
+                        } else {
+                            future.raise(new KafkaException("Unexpected error in commit: " + error.message()));
+                            return;
+                        }
                     }
                 }
             }
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 5391135..8109182 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -34,6 +34,8 @@ import org.apache.kafka.common.message.LeaveGroupRequestData;
 import org.apache.kafka.common.message.LeaveGroupResponseData;
 import org.apache.kafka.common.message.MetadataRequestData;
 import org.apache.kafka.common.message.MetadataResponseData;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
 import org.apache.kafka.common.message.SaslAuthenticateRequestData;
 import org.apache.kafka.common.message.SaslAuthenticateResponseData;
 import org.apache.kafka.common.message.SaslHandshakeRequestData;
@@ -91,8 +93,6 @@ import org.apache.kafka.common.requests.ListGroupsRequest;
 import org.apache.kafka.common.requests.ListGroupsResponse;
 import org.apache.kafka.common.requests.ListOffsetRequest;
 import org.apache.kafka.common.requests.ListOffsetResponse;
-import org.apache.kafka.common.requests.OffsetCommitRequest;
-import org.apache.kafka.common.requests.OffsetCommitResponse;
 import org.apache.kafka.common.requests.OffsetFetchRequest;
 import org.apache.kafka.common.requests.OffsetFetchResponse;
 import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
@@ -133,7 +133,7 @@ public enum ApiKeys {
             UpdateMetadataResponse.schemaVersions()),
     CONTROLLED_SHUTDOWN(7, "ControlledShutdown", true, ControlledShutdownRequestData.SCHEMAS,
             ControlledShutdownResponseData.SCHEMAS),
-    OFFSET_COMMIT(8, "OffsetCommit", OffsetCommitRequest.schemaVersions(), OffsetCommitResponse.schemaVersions()),
+    OFFSET_COMMIT(8, "OffsetCommit", OffsetCommitRequestData.SCHEMAS, OffsetCommitResponseData.SCHEMAS),
     OFFSET_FETCH(9, "OffsetFetch", OffsetFetchRequest.schemaVersions(), OffsetFetchResponse.schemaVersions()),
     FIND_COORDINATOR(10, "FindCoordinator", FindCoordinatorRequest.schemaVersions(),
             FindCoordinatorResponse.schemaVersions()),
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
index fdb5b10..dd4cf78 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
@@ -17,126 +17,19 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.utils.CollectionUtils;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
-
-import static org.apache.kafka.common.protocol.CommonFields.COMMITTED_LEADER_EPOCH;
-import static org.apache.kafka.common.protocol.CommonFields.COMMITTED_METADATA;
-import static org.apache.kafka.common.protocol.CommonFields.COMMITTED_OFFSET;
-import static org.apache.kafka.common.protocol.CommonFields.GENERATION_ID;
-import static org.apache.kafka.common.protocol.CommonFields.GROUP_ID;
-import static org.apache.kafka.common.protocol.CommonFields.MEMBER_ID;
-import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
-import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
 
 public class OffsetCommitRequest extends AbstractRequest {
-    // top level fields
-    private static final Field.ComplexArray TOPICS = new Field.ComplexArray("topics",
-            "Topics to commit offsets");
-
-    // topic level fields
-    private static final Field.ComplexArray PARTITIONS = new Field.ComplexArray("partitions",
-            "Partitions to commit offsets");
-
-    // partition level fields
-    @Deprecated
-    private static final Field.Int64 COMMIT_TIMESTAMP = new Field.Int64("timestamp", "Timestamp of the commit");
-    private static final Field.Int64 RETENTION_TIME = new Field.Int64("retention_time",
-            "Time period in ms to retain the offset.");
-
-    private static final Field PARTITIONS_V0 = PARTITIONS.withFields(
-            PARTITION_ID,
-            COMMITTED_OFFSET,
-            COMMITTED_METADATA);
-
-    private static final Field TOPICS_V0 = TOPICS.withFields(
-            TOPIC_NAME,
-            PARTITIONS_V0);
-
-    private static final Schema OFFSET_COMMIT_REQUEST_V0 = new Schema(
-            GROUP_ID,
-            TOPICS_V0);
-
-    // V1 adds timestamp and group membership information (generation and memberId)
-    private static final Field PARTITIONS_V1 = PARTITIONS.withFields(
-            PARTITION_ID,
-            COMMITTED_OFFSET,
-            COMMIT_TIMESTAMP,
-            COMMITTED_METADATA);
-
-    private static final Field TOPICS_V1 = TOPICS.withFields(
-            TOPIC_NAME,
-            PARTITIONS_V1);
-
-    private static final Schema OFFSET_COMMIT_REQUEST_V1 = new Schema(
-            GROUP_ID,
-            GENERATION_ID,
-            MEMBER_ID,
-            TOPICS_V1);
-
-    // V2 adds retention time
-    private static final Field PARTITIONS_V2 = PARTITIONS.withFields(
-            PARTITION_ID,
-            COMMITTED_OFFSET,
-            COMMITTED_METADATA);
-
-    private static final Field TOPICS_V2 = TOPICS.withFields(
-            TOPIC_NAME,
-            PARTITIONS_V2);
-
-    private static final Schema OFFSET_COMMIT_REQUEST_V2 = new Schema(
-            GROUP_ID,
-            GENERATION_ID,
-            MEMBER_ID,
-            RETENTION_TIME,
-            TOPICS_V2);
-
-    // V3 adds throttle time
-    private static final Schema OFFSET_COMMIT_REQUEST_V3 = OFFSET_COMMIT_REQUEST_V2;
-
-    // V4 bump used to indicate that on quota violation brokers send out responses before throttling.
-    private static final Schema OFFSET_COMMIT_REQUEST_V4 = OFFSET_COMMIT_REQUEST_V3;
-
-    // V5 removes the retention time which is now controlled only by a broker configuration
-    private static final Schema OFFSET_COMMIT_REQUEST_V5 = new Schema(
-            GROUP_ID,
-            GENERATION_ID,
-            MEMBER_ID,
-            TOPICS_V2);
-
-    // V6 adds the leader epoch to the partition data
-    private static final Field PARTITIONS_V6 = PARTITIONS.withFields(
-            PARTITION_ID,
-            COMMITTED_OFFSET,
-            COMMITTED_LEADER_EPOCH,
-            COMMITTED_METADATA);
-
-    private static final Field TOPICS_V6 = TOPICS.withFields(
-            TOPIC_NAME,
-            PARTITIONS_V6);
-
-    private static final Schema OFFSET_COMMIT_REQUEST_V6 = new Schema(
-            GROUP_ID,
-            GENERATION_ID,
-            MEMBER_ID,
-            TOPICS_V6);
-
-    public static Schema[] schemaVersions() {
-        return new Schema[] {OFFSET_COMMIT_REQUEST_V0, OFFSET_COMMIT_REQUEST_V1, OFFSET_COMMIT_REQUEST_V2,
-            OFFSET_COMMIT_REQUEST_V3, OFFSET_COMMIT_REQUEST_V4, OFFSET_COMMIT_REQUEST_V5, OFFSET_COMMIT_REQUEST_V6};
-    }
-
     // default values for the current version
     public static final int DEFAULT_GENERATION_ID = -1;
     public static final String DEFAULT_MEMBER_ID = "";
@@ -147,221 +40,115 @@ public class OffsetCommitRequest extends AbstractRequest {
     @Deprecated
     public static final long DEFAULT_TIMESTAMP = -1L;            // for V0, V1
 
-    private final String groupId;
-    private final String memberId;
-    private final int generationId;
-    private final long retentionTime;
-    private final Map<TopicPartition, PartitionData> offsetData;
-
-    public static final class PartitionData {
-        @Deprecated
-        public final long timestamp;                // for V1
-
-        public final long offset;
-        public final String metadata;
-        public final Optional<Integer> leaderEpoch;
-
-        private PartitionData(long offset, Optional<Integer> leaderEpoch, long timestamp, String metadata) {
-            this.offset = offset;
-            this.leaderEpoch = leaderEpoch;
-            this.timestamp = timestamp;
-            this.metadata = metadata;
-        }
-
-        @Deprecated
-        public PartitionData(long offset, long timestamp, String metadata) {
-            this(offset, Optional.empty(), timestamp, metadata);
-        }
-
-        public PartitionData(long offset, Optional<Integer> leaderEpoch, String metadata) {
-            this(offset, leaderEpoch, DEFAULT_TIMESTAMP, metadata);
-        }
-
-        @Override
-        public String toString() {
-            StringBuilder bld = new StringBuilder();
-            bld.append("(timestamp=").append(timestamp).
-                    append(", offset=").append(offset).
-                    append(", leaderEpoch=").append(leaderEpoch).
-                    append(", metadata=").append(metadata).
-                    append(")");
-            return bld.toString();
-        }
-    }
+    private final OffsetCommitRequestData data;
 
     public static class Builder extends AbstractRequest.Builder<OffsetCommitRequest> {
-        private final String groupId;
-        private final Map<TopicPartition, PartitionData> offsetData;
-        private String memberId = DEFAULT_MEMBER_ID;
-        private int generationId = DEFAULT_GENERATION_ID;
 
-        public Builder(String groupId, Map<TopicPartition, PartitionData> offsetData) {
-            super(ApiKeys.OFFSET_COMMIT);
-            this.groupId = groupId;
-            this.offsetData = offsetData;
-        }
-
-        public Builder setMemberId(String memberId) {
-            this.memberId = memberId;
-            return this;
-        }
+        private final OffsetCommitRequestData data;
 
-        public Builder setGenerationId(int generationId) {
-            this.generationId = generationId;
-            return this;
+        public Builder(OffsetCommitRequestData data) {
+            super(ApiKeys.OFFSET_COMMIT);
+            this.data = data;
         }
 
         @Override
         public OffsetCommitRequest build(short version) {
-            if (version == 0) {
-                return new OffsetCommitRequest(groupId, DEFAULT_GENERATION_ID, DEFAULT_MEMBER_ID,
-                        DEFAULT_RETENTION_TIME, offsetData, version);
-            } else {
-                return new OffsetCommitRequest(groupId, generationId, memberId, DEFAULT_RETENTION_TIME,
-                        offsetData, version);
-            }
+            return new OffsetCommitRequest(data, version);
         }
 
         @Override
         public String toString() {
-            StringBuilder bld = new StringBuilder();
-            bld.append("(type=OffsetCommitRequest").
-                append(", groupId=").append(groupId).
-                append(", memberId=").append(memberId).
-                append(", generationId=").append(generationId).
-                append(", offsetData=").append(offsetData).
-                append(")");
-            return bld.toString();
+            return data.toString();
         }
     }
 
-    private OffsetCommitRequest(String groupId, int generationId, String memberId, long retentionTime,
-                                Map<TopicPartition, PartitionData> offsetData, short version) {
+    private final short version;
+
+    public OffsetCommitRequest(OffsetCommitRequestData data, short version) {
         super(ApiKeys.OFFSET_COMMIT, version);
-        this.groupId = groupId;
-        this.generationId = generationId;
-        this.memberId = memberId;
-        this.retentionTime = retentionTime;
-        this.offsetData = offsetData;
+        this.data = data;
+        this.version = version;
     }
 
-    public OffsetCommitRequest(Struct struct, short versionId) {
-        super(ApiKeys.OFFSET_COMMIT, versionId);
-
-        groupId = struct.get(GROUP_ID);
 
-        // These fields only exists in v1.
-        generationId = struct.getOrElse(GENERATION_ID, DEFAULT_GENERATION_ID);
-        memberId = struct.getOrElse(MEMBER_ID, DEFAULT_MEMBER_ID);
+    public OffsetCommitRequest(Struct struct, short version) {
+        super(ApiKeys.OFFSET_COMMIT, version);
+        this.data = new OffsetCommitRequestData(struct, version);
+        this.version = version;
+    }
 
-        // This field only exists in v2
-        retentionTime = struct.getOrElse(RETENTION_TIME, DEFAULT_RETENTION_TIME);
+    public OffsetCommitRequestData data() {
+        return data;
+    }
 
-        offsetData = new HashMap<>();
-        for (Object topicDataObj : struct.get(TOPICS)) {
-            Struct topicData = (Struct) topicDataObj;
-            String topic = topicData.get(TOPIC_NAME);
-            for (Object partitionDataObj : topicData.get(PARTITIONS)) {
-                Struct partitionDataStruct = (Struct) partitionDataObj;
-                int partition = partitionDataStruct.get(PARTITION_ID);
-                long offset = partitionDataStruct.get(COMMITTED_OFFSET);
-                String metadata = partitionDataStruct.get(COMMITTED_METADATA);
-                PartitionData partitionOffset;
-                // This field only exists in v1
-                if (partitionDataStruct.hasField(COMMIT_TIMESTAMP)) {
-                    long timestamp = partitionDataStruct.get(COMMIT_TIMESTAMP);
-                    partitionOffset = new PartitionData(offset, timestamp, metadata);
-                } else {
-                    Optional<Integer> leaderEpochOpt = RequestUtils.getLeaderEpoch(partitionDataStruct,
-                            COMMITTED_LEADER_EPOCH);
-                    partitionOffset = new PartitionData(offset, leaderEpochOpt, metadata);
-                }
-                offsetData.put(new TopicPartition(topic, partition), partitionOffset);
+    public Map<TopicPartition, Long> offsets() {
+        Map<TopicPartition, Long> offsets = new HashMap<>();
+        for (OffsetCommitRequestData.OffsetCommitRequestTopic topic : data.topics()) {
+            for (OffsetCommitRequestData.OffsetCommitRequestPartition partition : topic.partitions()) {
+                offsets.put(new TopicPartition(topic.name(), partition.partitionIndex()),
+                        partition.committedOffset());
             }
         }
+        return offsets;
     }
 
-    @Override
-    public Struct toStruct() {
-        short version = version();
-        Struct struct = new Struct(ApiKeys.OFFSET_COMMIT.requestSchema(version));
-        struct.set(GROUP_ID, groupId);
-
-        Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupPartitionDataByTopic(offsetData);
-        List<Struct> topicArray = new ArrayList<>();
-        for (Map.Entry<String, Map<Integer, PartitionData>> topicEntry: topicsData.entrySet()) {
-            Struct topicData = struct.instance(TOPICS);
-            topicData.set(TOPIC_NAME, topicEntry.getKey());
-            List<Struct> partitionArray = new ArrayList<>();
-            for (Map.Entry<Integer, PartitionData> partitionEntry : topicEntry.getValue().entrySet()) {
-                PartitionData fetchPartitionData = partitionEntry.getValue();
-                Struct partitionData = topicData.instance(PARTITIONS);
-                partitionData.set(PARTITION_ID, partitionEntry.getKey());
-                partitionData.set(COMMITTED_OFFSET, fetchPartitionData.offset);
-                // Only for v1
-                partitionData.setIfExists(COMMIT_TIMESTAMP, fetchPartitionData.timestamp);
-                // Only for v6
-                RequestUtils.setLeaderEpochIfExists(partitionData, COMMITTED_LEADER_EPOCH, fetchPartitionData.leaderEpoch);
-                partitionData.set(COMMITTED_METADATA, fetchPartitionData.metadata);
-                partitionArray.add(partitionData);
+    public static List<OffsetCommitResponseData.OffsetCommitResponseTopic> getErrorResponseTopics(
+            List<OffsetCommitRequestData.OffsetCommitRequestTopic> requestTopics,
+            Errors e) {
+        List<OffsetCommitResponseData.OffsetCommitResponseTopic>
+                responseTopicData = new ArrayList<>();
+        for (OffsetCommitRequestData.OffsetCommitRequestTopic entry : requestTopics) {
+            List<OffsetCommitResponseData.OffsetCommitResponsePartition> responsePartitions =
+                    new ArrayList<>();
+            for (OffsetCommitRequestData.OffsetCommitRequestPartition requestPartition : entry.partitions()) {
+                responsePartitions.add(new OffsetCommitResponseData.OffsetCommitResponsePartition()
+                        .setPartitionIndex(requestPartition.partitionIndex())
+                        .setErrorCode(e.code()));
             }
-            topicData.set(PARTITIONS, partitionArray.toArray());
-            topicArray.add(topicData);
+            responseTopicData.add(new OffsetCommitResponseData.OffsetCommitResponseTopic()
+                    .setName(entry.name())
+                    .setPartitions(responsePartitions)
+            );
         }
-        struct.set(TOPICS, topicArray.toArray());
-        struct.setIfExists(GENERATION_ID, generationId);
-        struct.setIfExists(MEMBER_ID, memberId);
-        struct.setIfExists(RETENTION_TIME, retentionTime);
-        return struct;
+        return responseTopicData;
     }
 
     @Override
     public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
-        Map<TopicPartition, Errors> responseData = new HashMap<>();
-        for (Map.Entry<TopicPartition, PartitionData> entry: offsetData.entrySet()) {
-            responseData.put(entry.getKey(), Errors.forException(e));
-        }
+        List<OffsetCommitResponseData.OffsetCommitResponseTopic>
+                responseTopicData = getErrorResponseTopics(data.topics(), Errors.forException(e));
 
         short versionId = version();
         switch (versionId) {
             case 0:
             case 1:
             case 2:
-                return new OffsetCommitResponse(responseData);
+                return new OffsetCommitResponse(
+                        new OffsetCommitResponseData()
+                                .setTopics(responseTopicData)
+                );
             case 3:
             case 4:
             case 5:
             case 6:
-                return new OffsetCommitResponse(throttleTimeMs, responseData);
+                return new OffsetCommitResponse(
+                        new OffsetCommitResponseData()
+                                .setTopics(responseTopicData)
+                                .setThrottleTimeMs(throttleTimeMs)
+
+                );
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
                         versionId, this.getClass().getSimpleName(), ApiKeys.OFFSET_COMMIT.latestVersion()));
         }
     }
 
-    public String groupId() {
-        return groupId;
-    }
-
-    public int generationId() {
-        return generationId;
-    }
-
-    public String memberId() {
-        return memberId;
-    }
-
-    @Deprecated
-    public long retentionTime() {
-        return retentionTime;
-    }
-
-    public Map<TopicPartition, PartitionData> offsetData() {
-        return offsetData;
+    public static OffsetCommitRequest parse(ByteBuffer buffer, short version) {
+        return new OffsetCommitRequest(ApiKeys.OFFSET_COMMIT.parseRequest(version, buffer), version);
     }
 
-    public static OffsetCommitRequest parse(ByteBuffer buffer, short version) {
-        Schema schema = ApiKeys.OFFSET_COMMIT.requestSchema(version);
-        return new OffsetCommitRequest(schema.read(buffer), version);
+    @Override
+    protected Struct toStruct() {
+        return data.toStruct(version);
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
index 4d724a3..7f2603e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
@@ -17,24 +17,16 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.utils.CollectionUtils;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
-import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
-import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
-import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
-import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
-
 /**
  * Possible error codes:
  *
@@ -52,117 +44,86 @@ import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
  * GROUP_AUTHORIZATION_FAILED (30)
  */
 public class OffsetCommitResponse extends AbstractResponse {
-    private static final Field.ComplexArray TOPICS = new Field.ComplexArray("responses",
-            "Responses by topic for committed partitions");
-
-    // topic level fields
-    private static final Field.ComplexArray PARTITIONS = new Field.ComplexArray("partition_responses",
-            "Responses for committed partitions");
-
-    private static final Field PARTITIONS_V0 = PARTITIONS.withFields(
-            PARTITION_ID,
-            ERROR_CODE);
-
-    private static final Field TOPICS_V0 = TOPICS.withFields(
-            TOPIC_NAME,
-            PARTITIONS_V0);
 
-    private static final Schema OFFSET_COMMIT_RESPONSE_V0 = new Schema(
-            TOPICS_V0);
+    private final OffsetCommitResponseData data;
 
-    // V1 adds timestamp and group membership information (generation and memberId) to the request
-    private static final Schema OFFSET_COMMIT_RESPONSE_V1 = OFFSET_COMMIT_RESPONSE_V0;
-
-    // V2 adds retention time to the request
-    private static final Schema OFFSET_COMMIT_RESPONSE_V2 = OFFSET_COMMIT_RESPONSE_V0;
+    public OffsetCommitResponse(OffsetCommitResponseData data) {
+        this.data = data;
+    }
 
-    // V3 adds throttle time
-    private static final Schema OFFSET_COMMIT_RESPONSE_V3 = new Schema(
-            THROTTLE_TIME_MS,
-            TOPICS_V0);
+    public OffsetCommitResponse(int requestThrottleMs, Map<TopicPartition, Errors> responseData) {
+        Map<String, OffsetCommitResponseData.OffsetCommitResponseTopic>
+                responseTopicDataMap = new HashMap<>();
 
-    // V4 bump used to indicate that on quota violation brokers send out responses before throttling.
-    private static final Schema OFFSET_COMMIT_RESPONSE_V4 = OFFSET_COMMIT_RESPONSE_V3;
+        for (Map.Entry<TopicPartition, Errors> entry : responseData.entrySet()) {
+            TopicPartition topicPartition = entry.getKey();
+            String topicName = topicPartition.topic();
 
-    // V5 removes retention time from the request
-    private static final Schema OFFSET_COMMIT_RESPONSE_V5 = OFFSET_COMMIT_RESPONSE_V4;
+            OffsetCommitResponseData.OffsetCommitResponseTopic topic = responseTopicDataMap
+                    .getOrDefault(topicName, new OffsetCommitResponseData.OffsetCommitResponseTopic());
 
-    // V6 adds leader epoch to the request
-    private static final Schema OFFSET_COMMIT_RESPONSE_V6 = OFFSET_COMMIT_RESPONSE_V5;
+            if (topic.name().equals("")) {
+                topic.setName(topicName);
+            }
+            topic.partitions().add(new OffsetCommitResponseData.OffsetCommitResponsePartition()
+                    .setErrorCode(entry.getValue().code())
+                    .setPartitionIndex(topicPartition.partition())
+            );
+            responseTopicDataMap.put(topicName, topic);
+        }
 
-    public static Schema[] schemaVersions() {
-        return new Schema[] {OFFSET_COMMIT_RESPONSE_V0, OFFSET_COMMIT_RESPONSE_V1, OFFSET_COMMIT_RESPONSE_V2,
-            OFFSET_COMMIT_RESPONSE_V3, OFFSET_COMMIT_RESPONSE_V4, OFFSET_COMMIT_RESPONSE_V5, OFFSET_COMMIT_RESPONSE_V6};
+        data = new OffsetCommitResponseData()
+                .setTopics(new ArrayList<>(responseTopicDataMap.values()))
+                .setThrottleTimeMs(requestThrottleMs);
     }
 
-    private final Map<TopicPartition, Errors> responseData;
-    private final int throttleTimeMs;
-
     public OffsetCommitResponse(Map<TopicPartition, Errors> responseData) {
         this(DEFAULT_THROTTLE_TIME, responseData);
     }
 
-    public OffsetCommitResponse(int throttleTimeMs, Map<TopicPartition, Errors> responseData) {
-        this.throttleTimeMs = throttleTimeMs;
-        this.responseData = responseData;
+    public OffsetCommitResponse(Struct struct) {
+        short latestVersion = (short) (OffsetCommitResponseData.SCHEMAS.length - 1);
+        this.data = new OffsetCommitResponseData(struct, latestVersion);
     }
 
-    public OffsetCommitResponse(Struct struct) {
-        this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
-        responseData = new HashMap<>();
-        for (Object topicResponseObj : struct.get(TOPICS)) {
-            Struct topicResponse = (Struct) topicResponseObj;
-            String topic = topicResponse.get(TOPIC_NAME);
-            for (Object partitionResponseObj : topicResponse.get(PARTITIONS)) {
-                Struct partitionResponse = (Struct) partitionResponseObj;
-                int partition = partitionResponse.get(PARTITION_ID);
-                Errors error = Errors.forCode(partitionResponse.get(ERROR_CODE));
-                responseData.put(new TopicPartition(topic, partition), error);
-            }
-        }
+    public OffsetCommitResponse(Struct struct, short version) {
+        this.data = new OffsetCommitResponseData(struct, version);
+    }
+
+    public OffsetCommitResponseData data() {
+        return data;
     }
 
     @Override
-    public Struct toStruct(short version) {
-        Struct struct = new Struct(ApiKeys.OFFSET_COMMIT.responseSchema(version));
-        struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);
-
-        Map<String, Map<Integer, Errors>> topicsData = CollectionUtils.groupPartitionDataByTopic(responseData);
-        List<Struct> topicArray = new ArrayList<>();
-        for (Map.Entry<String, Map<Integer, Errors>> entries: topicsData.entrySet()) {
-            Struct topicData = struct.instance(TOPICS);
-            topicData.set(TOPIC_NAME, entries.getKey());
-            List<Struct> partitionArray = new ArrayList<>();
-            for (Map.Entry<Integer, Errors> partitionEntry : entries.getValue().entrySet()) {
-                Struct partitionData = topicData.instance(PARTITIONS);
-                partitionData.set(PARTITION_ID, partitionEntry.getKey());
-                partitionData.set(ERROR_CODE, partitionEntry.getValue().code());
-                partitionArray.add(partitionData);
+    public Map<Errors, Integer> errorCounts() {
+        Map<TopicPartition, Errors> errorMap = new HashMap<>();
+        for (OffsetCommitResponseData.OffsetCommitResponseTopic topic : data.topics()) {
+            for (OffsetCommitResponseData.OffsetCommitResponsePartition partition : topic.partitions()) {
+                errorMap.put(new TopicPartition(topic.name(), partition.partitionIndex()),
+                        Errors.forCode(partition.errorCode()));
             }
-            topicData.set(PARTITIONS, partitionArray.toArray());
-            topicArray.add(topicData);
-        }
-        struct.set(TOPICS, topicArray.toArray());
 
-        return struct;
+        }
+        return errorCounts(errorMap);
     }
 
-    @Override
-    public int throttleTimeMs() {
-        return throttleTimeMs;
+    public static OffsetCommitResponse parse(ByteBuffer buffer, short version) {
+        return new OffsetCommitResponse(ApiKeys.OFFSET_COMMIT.parseResponse(version, buffer), version);
     }
 
-    public Map<TopicPartition, Errors> responseData() {
-        return responseData;
+    @Override
+    public Struct toStruct(short version) {
+        return data.toStruct(version);
     }
 
     @Override
-    public Map<Errors, Integer> errorCounts() {
-        return errorCounts(responseData);
+    public String toString() {
+        return data.toString();
     }
 
-    public static OffsetCommitResponse parse(ByteBuffer buffer, short version) {
-        return new OffsetCommitResponse(ApiKeys.OFFSET_COMMIT.parseResponse(version, buffer));
+    @Override
+    public int throttleTimeMs() {
+        return data.throttleTimeMs();
     }
 
     @Override
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index f31ca52..25c72c6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -1724,10 +1724,11 @@ public class KafkaConsumerTest {
             @Override
             public boolean matches(AbstractRequest body) {
                 OffsetCommitRequest commitRequest = (OffsetCommitRequest) body;
+                Map<TopicPartition, Long> commitErrors = commitRequest.offsets();
+
                 for (Map.Entry<TopicPartition, Long> partitionOffset : partitionOffsets.entrySet()) {
-                    OffsetCommitRequest.PartitionData partitionData = commitRequest.offsetData().get(partitionOffset.getKey());
                     // verify that the expected offset has been committed
-                    if (partitionData.offset != partitionOffset.getValue()) {
+                    if (!commitErrors.get(partitionOffset.getKey()).equals(partitionOffset.getValue())) {
                         commitReceived.set(false);
                         return false;
                     }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 3cdbbfa..645e6ed 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -40,8 +40,11 @@ import org.apache.kafka.common.internals.Topic;
 import org.apache.kafka.common.message.JoinGroupRequestData;
 import org.apache.kafka.common.message.JoinGroupResponseData;
 import org.apache.kafka.common.message.LeaveGroupResponseData;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.FindCoordinatorResponse;
 import org.apache.kafka.common.requests.HeartbeatResponse;
@@ -249,10 +252,24 @@ public class ConsumerCoordinatorTest {
         coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
 
         final AtomicBoolean asyncCallbackInvoked = new AtomicBoolean(false);
-        Map<TopicPartition, OffsetCommitRequest.PartitionData> offsets = singletonMap(
-                new TopicPartition("foo", 0), new OffsetCommitRequest.PartitionData(13L,
-                        Optional.empty(), ""));
-        consumerClient.send(coordinator.checkAndGetCoordinator(), new OffsetCommitRequest.Builder(groupId, offsets))
+
+        OffsetCommitRequestData offsetCommitRequestData = new OffsetCommitRequestData()
+                .setGroupId(groupId)
+                .setTopics(Collections.singletonList(new
+                        OffsetCommitRequestData.OffsetCommitRequestTopic()
+                                .setName("foo")
+                                .setPartitions(Collections.singletonList(
+                                        new OffsetCommitRequestData.OffsetCommitRequestPartition()
+                                                .setPartitionIndex(0)
+                                                .setCommittedLeaderEpoch(RecordBatch.NO_PARTITION_LEADER_EPOCH)
+                                                .setCommittedMetadata("")
+                                                .setCommittedOffset(13L)
+                                                .setCommitTimestamp(0)
+                                ))
+                        )
+                );
+
+        consumerClient.send(coordinator.checkAndGetCoordinator(), new OffsetCommitRequest.Builder(offsetCommitRequestData))
                 .compose(new RequestFutureAdapter<ClientResponse, Object>() {
                     @Override
                     public void onSuccess(ClientResponse value, RequestFuture<Object> future) {}
@@ -1495,8 +1512,8 @@ public class ConsumerCoordinatorTest {
             @Override
             public boolean matches(AbstractRequest body) {
                 OffsetCommitRequest commitRequest = (OffsetCommitRequest) body;
-                return commitRequest.memberId().equals(OffsetCommitRequest.DEFAULT_MEMBER_ID) &&
-                        commitRequest.generationId() == OffsetCommitRequest.DEFAULT_GENERATION_ID;
+                return commitRequest.data().memberId().equals(OffsetCommitRequest.DEFAULT_MEMBER_ID) &&
+                        commitRequest.data().generationId() == OffsetCommitRequest.DEFAULT_GENERATION_ID;
             }
         }, offsetCommitResponse(singletonMap(t1p, Errors.NONE)));
 
@@ -2127,9 +2144,9 @@ public class ConsumerCoordinatorTest {
             public boolean matches(AbstractRequest body) {
                 commitRequested.set(true);
                 OffsetCommitRequest commitRequest = (OffsetCommitRequest) body;
-                return commitRequest.groupId().equals(groupId);
+                return commitRequest.data().groupId().equals(groupId);
             }
-        }, new OffsetCommitResponse(new HashMap<TopicPartition, Errors>()));
+        }, new OffsetCommitResponse(new OffsetCommitResponseData()));
         client.prepareResponse(new MockClient.RequestMatcher() {
             @Override
             public boolean matches(AbstractRequest body) {
@@ -2291,19 +2308,20 @@ public class ConsumerCoordinatorTest {
             @Override
             public boolean matches(AbstractRequest body) {
                 OffsetCommitRequest req = (OffsetCommitRequest) body;
-                Map<TopicPartition, OffsetCommitRequest.PartitionData> offsets = req.offsetData();
+                Map<TopicPartition, Long> offsets = req.offsets();
                 if (offsets.size() != expectedOffsets.size())
                     return false;
 
                 for (Map.Entry<TopicPartition, Long> expectedOffset : expectedOffsets.entrySet()) {
-                    if (!offsets.containsKey(expectedOffset.getKey()))
-                        return false;
-
-                    OffsetCommitRequest.PartitionData offsetCommitData = offsets.get(expectedOffset.getKey());
-                    if (offsetCommitData.offset != expectedOffset.getValue())
+                    if (!offsets.containsKey(expectedOffset.getKey())) {
                         return false;
+                    } else {
+                        Long actualOffset = offsets.get(expectedOffset.getKey());
+                        if (!actualOffset.equals(expectedOffset.getValue())) {
+                            return false;
+                        }
+                    }
                 }
-
                 return true;
             }
         };
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index e595195..b6a2dad 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -58,6 +58,8 @@ import org.apache.kafka.common.message.JoinGroupRequestData;
 import org.apache.kafka.common.message.JoinGroupResponseData;
 import org.apache.kafka.common.message.LeaveGroupRequestData;
 import org.apache.kafka.common.message.LeaveGroupResponseData;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
 import org.apache.kafka.common.message.SaslAuthenticateRequestData;
 import org.apache.kafka.common.message.SaslAuthenticateResponseData;
 import org.apache.kafka.common.message.SaslHandshakeRequestData;
@@ -938,21 +940,41 @@ public class RequestResponseTest {
 
     @SuppressWarnings("deprecation")
     private OffsetCommitRequest createOffsetCommitRequest(int version) {
-        Map<TopicPartition, OffsetCommitRequest.PartitionData> commitData = new HashMap<>();
-        commitData.put(new TopicPartition("test", 0), new OffsetCommitRequest.PartitionData(100,
-                RecordBatch.NO_PARTITION_LEADER_EPOCH, ""));
-        commitData.put(new TopicPartition("test", 1), new OffsetCommitRequest.PartitionData(200,
-                RecordBatch.NO_PARTITION_LEADER_EPOCH, null));
-        return new OffsetCommitRequest.Builder("group1", commitData)
-                .setGenerationId(100)
+        return new OffsetCommitRequest.Builder(new OffsetCommitRequestData()
+                .setGroupId("group1")
                 .setMemberId("consumer1")
-                .build((short) version);
+                .setGenerationId(100)
+                .setTopics(Collections.singletonList(
+                        new OffsetCommitRequestData.OffsetCommitRequestTopic()
+                                .setName("test")
+                                .setPartitions(Arrays.asList(
+                                        new OffsetCommitRequestData.OffsetCommitRequestPartition()
+                                                .setPartitionIndex(0)
+                                                .setCommittedOffset(100)
+                                                .setCommittedLeaderEpoch(RecordBatch.NO_PARTITION_LEADER_EPOCH)
+                                                .setCommittedMetadata(""),
+                                        new OffsetCommitRequestData.OffsetCommitRequestPartition()
+                                                .setPartitionIndex(1)
+                                                .setCommittedOffset(200)
+                                                .setCommittedLeaderEpoch(RecordBatch.NO_PARTITION_LEADER_EPOCH)
+                                                .setCommittedMetadata(null)
+                                ))
+                ))
+        ).build((short) version);
     }
 
     private OffsetCommitResponse createOffsetCommitResponse() {
-        Map<TopicPartition, Errors> responseData = new HashMap<>();
-        responseData.put(new TopicPartition("test", 0), Errors.NONE);
-        return new OffsetCommitResponse(responseData);
+        return new OffsetCommitResponse(new OffsetCommitResponseData()
+                .setTopics(Collections.singletonList(
+                        new OffsetCommitResponseData.OffsetCommitResponseTopic()
+                                .setName("test")
+                                .setPartitions(Collections.singletonList(
+                                        new OffsetCommitResponseData.OffsetCommitResponsePartition()
+                                                .setPartitionIndex(0)
+                                                .setErrorCode(Errors.NONE.code())
+                                ))
+                ))
+        );
     }
 
     private OffsetFetchRequest createOffsetFetchRequest(int version) {
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 56adc9b..aa368f8 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -48,7 +48,7 @@ import org.apache.kafka.common.internals.FatalExitError
 import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME, isInternal}
 import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
 import org.apache.kafka.common.message.CreateTopicsResponseData.{CreatableTopicResult, CreatableTopicResultSet}
-import org.apache.kafka.common.message.{CreateTopicsResponseData, DeleteTopicsResponseData, DescribeGroupsResponseData, ElectPreferredLeadersResponseData, InitProducerIdResponseData, JoinGroupResponseData, LeaveGroupResponseData, SaslAuthenticateResponseData, SaslHandshakeResponseData}
+import org.apache.kafka.common.message._
 import org.apache.kafka.common.message.DeleteTopicsResponseData.{DeletableTopicResult, DeletableTopicResultSet}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.{ListenerName, Send}
@@ -298,25 +298,33 @@ class KafkaApis(val requestChannel: RequestChannel,
     val offsetCommitRequest = request.body[OffsetCommitRequest]
 
     // reject the request if not authorized to the group
-    if (!authorize(request.session, Read, Resource(Group, offsetCommitRequest.groupId, LITERAL))) {
+    if (!authorize(request.session, Read, Resource(Group, offsetCommitRequest.data().groupId, LITERAL))) {
       val error = Errors.GROUP_AUTHORIZATION_FAILED
-      val results = offsetCommitRequest.offsetData.keySet.asScala.map { topicPartition =>
-        (topicPartition, error)
-      }.toMap
-      sendResponseMaybeThrottle(request, requestThrottleMs => new OffsetCommitResponse(requestThrottleMs, results.asJava))
+      val responseTopicList = OffsetCommitRequest.getErrorResponseTopics(
+        offsetCommitRequest.data().topics(),
+        error)
+
+      sendResponseMaybeThrottle(request, requestThrottleMs => new OffsetCommitResponse(
+        new OffsetCommitResponseData()
+            .setTopics(responseTopicList)
+            .setThrottleTimeMs(requestThrottleMs)
+      ))
     } else {
 
       val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]()
       val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]()
-      val authorizedTopicRequestInfoBldr = immutable.Map.newBuilder[TopicPartition, OffsetCommitRequest.PartitionData]
-
-      for ((topicPartition, partitionData) <- offsetCommitRequest.offsetData.asScala) {
-        if (!authorize(request.session, Read, Resource(Topic, topicPartition.topic, LITERAL)))
-          unauthorizedTopicErrors += (topicPartition -> Errors.TOPIC_AUTHORIZATION_FAILED)
-        else if (!metadataCache.contains(topicPartition))
-          nonExistingTopicErrors += (topicPartition -> Errors.UNKNOWN_TOPIC_OR_PARTITION)
-        else
-          authorizedTopicRequestInfoBldr += (topicPartition -> partitionData)
+      val authorizedTopicRequestInfoBldr = immutable.Map.newBuilder[TopicPartition, OffsetCommitRequestData.OffsetCommitRequestPartition]
+
+      for (topicData <- offsetCommitRequest.data().topics().asScala) {
+        for (partitionData <- topicData.partitions().asScala) {
+          val topicPartition = new TopicPartition(topicData.name(), partitionData.partitionIndex())
+          if (!authorize(request.session, Read, Resource(Topic, topicData.name(), LITERAL)))
+            unauthorizedTopicErrors += (topicPartition -> Errors.TOPIC_AUTHORIZATION_FAILED)
+          else if (!metadataCache.contains(topicPartition))
+            nonExistingTopicErrors += (topicPartition -> Errors.UNKNOWN_TOPIC_OR_PARTITION)
+          else
+            authorizedTopicRequestInfoBldr += (topicPartition -> partitionData)
+        }
       }
 
       val authorizedTopicRequestInfo = authorizedTopicRequestInfoBldr.result()
@@ -342,10 +350,14 @@ class KafkaApis(val requestChannel: RequestChannel,
         val responseInfo = authorizedTopicRequestInfo.map {
           case (topicPartition, partitionData) =>
             try {
-              if (partitionData.metadata != null && partitionData.metadata.length > config.offsetMetadataMaxSize)
+              if (partitionData.committedMetadata() != null
+                && partitionData.committedMetadata().length > config.offsetMetadataMaxSize)
                 (topicPartition, Errors.OFFSET_METADATA_TOO_LARGE)
               else {
-                zkClient.setOrCreateConsumerOffset(offsetCommitRequest.groupId, topicPartition, partitionData.offset)
+                zkClient.setOrCreateConsumerOffset(
+                  offsetCommitRequest.data().groupId(),
+                  topicPartition,
+                  partitionData.committedOffset())
                 (topicPartition, Errors.NONE)
               }
             } catch {
@@ -364,16 +376,20 @@ class KafkaApis(val requestChannel: RequestChannel,
         //   - For v5 and beyond there is no per partition expiration timestamp, so this field is no longer in effect
         val currentTimestamp = time.milliseconds
         val partitionData = authorizedTopicRequestInfo.mapValues { partitionData =>
-          val metadata = if (partitionData.metadata == null) OffsetAndMetadata.NoMetadata else partitionData.metadata
+          val metadata = if (partitionData.committedMetadata() == null)
+            OffsetAndMetadata.NoMetadata
+          else
+            partitionData.committedMetadata()
+
           new OffsetAndMetadata(
-            offset = partitionData.offset,
-            leaderEpoch = partitionData.leaderEpoch,
+            offset = partitionData.committedOffset(),
+            leaderEpoch = Optional.ofNullable(new Integer(partitionData.committedLeaderEpoch())),
             metadata = metadata,
-            commitTimestamp = partitionData.timestamp match {
+            commitTimestamp = partitionData.commitTimestamp() match {
               case OffsetCommitRequest.DEFAULT_TIMESTAMP => currentTimestamp
               case customTimestamp => customTimestamp
             },
-            expireTimestamp = offsetCommitRequest.retentionTime match {
+            expireTimestamp = offsetCommitRequest.data().retentionTimeMs() match {
               case OffsetCommitRequest.DEFAULT_RETENTION_TIME => None
               case retentionTime => Some(currentTimestamp + retentionTime)
             }
@@ -382,9 +398,9 @@ class KafkaApis(val requestChannel: RequestChannel,
 
         // call coordinator to handle commit offset
         groupCoordinator.handleCommitOffsets(
-          offsetCommitRequest.groupId,
-          offsetCommitRequest.memberId,
-          offsetCommitRequest.generationId,
+          offsetCommitRequest.data().groupId(),
+          offsetCommitRequest.data().memberId(),
+          offsetCommitRequest.data().generationId(),
           partitionData,
           sendResponseCallback)
       }
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 0bc1045..23a0ad0 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -38,7 +38,7 @@ import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic,
 import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResource, AlterableConfig, AlterableConfigSet}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Records, SimpleRecord}
+import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Records, RecordBatch, SimpleRecord}
 import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.resource.PatternType.LITERAL
@@ -158,7 +158,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     ApiKeys.PRODUCE -> ((resp: requests.ProduceResponse) => resp.responses.asScala.find(_._1 == tp).get._2.error),
     ApiKeys.FETCH -> ((resp: requests.FetchResponse[Records]) => resp.responseData.asScala.find(_._1 == tp).get._2.error),
     ApiKeys.LIST_OFFSETS -> ((resp: requests.ListOffsetResponse) => resp.responseData.asScala.find(_._1 == tp).get._2.error),
-    ApiKeys.OFFSET_COMMIT -> ((resp: requests.OffsetCommitResponse) => resp.responseData.asScala.find(_._1 == tp).get._2),
+    ApiKeys.OFFSET_COMMIT -> ((resp: requests.OffsetCommitResponse) => Errors.forCode(
+      resp.data().topics().get(0).partitions().get(0).errorCode())),
     ApiKeys.OFFSET_FETCH -> ((resp: requests.OffsetFetchResponse) => resp.error),
     ApiKeys.FIND_COORDINATOR -> ((resp: FindCoordinatorResponse) => resp.error),
     ApiKeys.UPDATE_METADATA -> ((resp: requests.UpdateMetadataResponse) => resp.error),
@@ -342,9 +343,23 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   private def createOffsetCommitRequest = {
     new requests.OffsetCommitRequest.Builder(
-      group, Map(tp -> new requests.OffsetCommitRequest.PartitionData(0L, Optional.empty[Integer](), "metadata")).asJava).
-      setMemberId("").setGenerationId(1).
-      build()
+        new OffsetCommitRequestData()
+          .setGroupId(group)
+          .setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID)
+          .setGenerationId(1)
+          .setTopics(Collections.singletonList(
+            new OffsetCommitRequestData.OffsetCommitRequestTopic()
+              .setName(topic)
+              .setPartitions(Collections.singletonList(
+                new OffsetCommitRequestData.OffsetCommitRequestPartition()
+                  .setPartitionIndex(part)
+                  .setCommittedOffset(0)
+                  .setCommittedLeaderEpoch(RecordBatch.NO_PARTITION_LEADER_EPOCH)
+                  .setCommitTimestamp(OffsetCommitRequest.DEFAULT_TIMESTAMP)
+                  .setCommittedMetadata("metadata")
+              )))
+          )
+    ).build()
   }
 
   private def createPartitionsRequest = {
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index f3eae1e..54316f3 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -48,6 +48,7 @@ import org.apache.kafka.common.requests.{FetchMetadata => JFetchMetadata, _}
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.easymock.{Capture, EasyMock, IAnswer}
 import EasyMock._
+import org.apache.kafka.common.message.OffsetCommitRequestData
 import org.junit.Assert.{assertEquals, assertNull, assertTrue}
 import org.junit.{After, Test}
 
@@ -117,10 +118,21 @@ class KafkaApisTest {
     def checkInvalidPartition(invalidPartitionId: Int): Unit = {
       EasyMock.reset(replicaManager, clientRequestQuotaManager, requestChannel)
 
-      val invalidTopicPartition = new TopicPartition(topic, invalidPartitionId)
-      val partitionOffsetCommitData = new OffsetCommitRequest.PartitionData(15L, Optional.empty[Integer](), "")
-      val (offsetCommitRequest, request) = buildRequest(new OffsetCommitRequest.Builder("groupId",
-        Map(invalidTopicPartition -> partitionOffsetCommitData).asJava))
+      val (offsetCommitRequest, request) = buildRequest(new OffsetCommitRequest.Builder(
+        new OffsetCommitRequestData()
+          .setGroupId("groupId")
+          .setTopics(Collections.singletonList(
+            new OffsetCommitRequestData.OffsetCommitRequestTopic()
+              .setName(topic)
+              .setPartitions(Collections.singletonList(
+                new OffsetCommitRequestData.OffsetCommitRequestPartition()
+                  .setPartitionIndex(invalidPartitionId)
+                  .setCommittedOffset(15)
+                  .setCommittedLeaderEpoch(RecordBatch.NO_PARTITION_LEADER_EPOCH)
+                  .setCommittedMetadata(""))
+              )
+          ))
+      ))
 
       val capturedResponse = expectNoThrottling()
       EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel)
@@ -128,7 +140,8 @@ class KafkaApisTest {
 
       val response = readResponse(ApiKeys.OFFSET_COMMIT, offsetCommitRequest, capturedResponse)
         .asInstanceOf[OffsetCommitResponse]
-      assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, response.responseData().get(invalidTopicPartition))
+      assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION,
+        Errors.forCode(response.data().topics().get(0).partitions().get(0).errorCode()))
     }
 
     checkInvalidPartition(-1)
@@ -571,7 +584,7 @@ class KafkaApisTest {
     val header = RequestHeader.parse(buffer)
     val context = new RequestContext(header, "1", InetAddress.getLocalHost, KafkaPrincipal.ANONYMOUS,
       listenerName, SecurityProtocol.PLAINTEXT)
-    (request, new RequestChannel.Request(processor = 1, context = context, startTimeNanos =  0, MemoryPool.NONE, buffer,
+    (request, new RequestChannel.Request(processor = 1, context = context, startTimeNanos = 0, MemoryPool.NONE, buffer,
       requestChannelMetrics))
   }
 
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 710ed57..ae74fa1 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -27,7 +27,6 @@ import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.message._
 import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, ResourceType => AdminResourceType}
 import org.apache.kafka.common.{Node, TopicPartition}
-import org.apache.kafka.common.message.ControlledShutdownRequestData
 import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicSet}
 import org.apache.kafka.common.metrics.{KafkaMetric, Quota, Sensor}
 import org.apache.kafka.common.network.ListenerName
@@ -250,10 +249,27 @@ class RequestQuotaTest extends BaseRequestTest {
               ApiKeys.CONTROLLED_SHUTDOWN.latestVersion)
 
         case ApiKeys.OFFSET_COMMIT =>
-          new OffsetCommitRequest.Builder("test-group",
-            Map(tp -> new OffsetCommitRequest.PartitionData(0, Optional.empty[Integer](), "metadata")).asJava).
-            setMemberId("").setGenerationId(1)
-
+          new OffsetCommitRequest.Builder(
+            new OffsetCommitRequestData()
+              .setGroupId("test-group")
+              .setGenerationId(1)
+              .setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID)
+              .setTopics(
+                Collections.singletonList(
+                  new OffsetCommitRequestData.OffsetCommitRequestTopic()
+                    .setName(topic)
+                    .setPartitions(
+                      Collections.singletonList(
+                        new OffsetCommitRequestData.OffsetCommitRequestPartition()
+                          .setPartitionIndex(0)
+                          .setCommittedLeaderEpoch(RecordBatch.NO_PARTITION_LEADER_EPOCH)
+                          .setCommittedOffset(0)
+                          .setCommittedMetadata("metadata")
+                      )
+                    )
+                )
+              )
+          )
         case ApiKeys.OFFSET_FETCH =>
           new OffsetFetchRequest.Builder("test-group", List(tp).asJava)
 
@@ -469,7 +485,8 @@ class RequestQuotaTest extends BaseRequestTest {
       case ApiKeys.LIST_OFFSETS => new ListOffsetResponse(response).throttleTimeMs
       case ApiKeys.METADATA =>
         new MetadataResponse(response, ApiKeys.DESCRIBE_GROUPS.latestVersion).throttleTimeMs
-      case ApiKeys.OFFSET_COMMIT => new OffsetCommitResponse(response).throttleTimeMs
+      case ApiKeys.OFFSET_COMMIT =>
+        new OffsetCommitResponse(response, ApiKeys.OFFSET_COMMIT.latestVersion).throttleTimeMs
       case ApiKeys.OFFSET_FETCH => new OffsetFetchResponse(response).throttleTimeMs
       case ApiKeys.FIND_COORDINATOR => new FindCoordinatorResponse(response).throttleTimeMs
       case ApiKeys.JOIN_GROUP => new JoinGroupResponse(response).throttleTimeMs
@@ -516,7 +533,10 @@ class RequestQuotaTest extends BaseRequestTest {
     // Request until throttled using client-id with default small quota
     val clientId = apiKey.toString
     val client = Client(clientId, apiKey)
-    val throttled = client.runUntil(response => responseThrottleTime(apiKey, response) > 0)
+
+    val throttled = client.runUntil(response =>
+      responseThrottleTime(apiKey, response) > 0
+    )
 
     assertTrue(s"Response not throttled: $client", throttled)
     assertTrue(s"Throttle time metrics not updated: $client" , throttleTimeMetricValue(clientId) > 0)


Mime
View raw message