kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch trunk updated: KAFKA-8640; Use generated classes in OffsetFetch request and response (#7062)
Date Tue, 30 Jul 2019 15:30:30 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2047108  KAFKA-8640; Use generated classes in OffsetFetch request and response (#7062)
2047108 is described below

commit 204710832eb4787b185264a0c8e5fe89a3db9d44
Author: Boyang Chen <boyang@confluent.io>
AuthorDate: Tue Jul 30 08:29:45 2019 -0700

    KAFKA-8640; Use generated classes in OffsetFetch request and response (#7062)
    
    Reviewers: Jason Gustafson <jason@confluent.io>
---
 .../org/apache/kafka/common/protocol/ApiKeys.java  |   6 +-
 .../apache/kafka/common/protocol/CommonFields.java |   3 -
 .../kafka/common/requests/AbstractResponse.java    |   2 +-
 .../common/requests/OffsetCommitResponse.java      |   1 -
 .../kafka/common/requests/OffsetFetchRequest.java  | 194 +++++-------------
 .../kafka/common/requests/OffsetFetchResponse.java | 228 ++++++++-------------
 .../common/message/OffsetFetchRequest.json         |   5 +-
 .../common/message/OffsetFetchResponse.json        |   2 +-
 .../kafka/clients/admin/KafkaAdminClientTest.java  |   4 +-
 .../apache/kafka/common/message/MessageTest.java   |  66 ++++++
 .../common/requests/OffsetFetchRequestTest.java    | 115 +++++++++++
 .../common/requests/OffsetFetchResponseTest.java   | 223 ++++++++++++++++++++
 .../kafka/common/requests/RequestResponseTest.java |  12 +-
 .../kafka/api/AuthorizerIntegrationTest.scala      |   2 +-
 .../scala/unit/kafka/server/RequestQuotaTest.scala |   2 +-
 15 files changed, 569 insertions(+), 296 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 18d8fd6..3f0f5e8 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -52,6 +52,8 @@ import org.apache.kafka.common.message.MetadataRequestData;
 import org.apache.kafka.common.message.MetadataResponseData;
 import org.apache.kafka.common.message.OffsetCommitRequestData;
 import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.message.OffsetFetchRequestData;
+import org.apache.kafka.common.message.OffsetFetchResponseData;
 import org.apache.kafka.common.message.SaslAuthenticateRequestData;
 import org.apache.kafka.common.message.SaslAuthenticateResponseData;
 import org.apache.kafka.common.message.SaslHandshakeRequestData;
@@ -99,8 +101,6 @@ import org.apache.kafka.common.requests.LeaderAndIsrRequest;
 import org.apache.kafka.common.requests.LeaderAndIsrResponse;
 import org.apache.kafka.common.requests.ListOffsetRequest;
 import org.apache.kafka.common.requests.ListOffsetResponse;
-import org.apache.kafka.common.requests.OffsetFetchRequest;
-import org.apache.kafka.common.requests.OffsetFetchResponse;
 import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
 import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse;
 import org.apache.kafka.common.requests.ProduceRequest;
@@ -138,7 +138,7 @@ public enum ApiKeys {
     CONTROLLED_SHUTDOWN(7, "ControlledShutdown", true, ControlledShutdownRequestData.SCHEMAS,
             ControlledShutdownResponseData.SCHEMAS),
     OFFSET_COMMIT(8, "OffsetCommit", OffsetCommitRequestData.SCHEMAS, OffsetCommitResponseData.SCHEMAS),
-    OFFSET_FETCH(9, "OffsetFetch", OffsetFetchRequest.schemaVersions(), OffsetFetchResponse.schemaVersions()),
+    OFFSET_FETCH(9, "OffsetFetch", OffsetFetchRequestData.SCHEMAS, OffsetFetchResponseData.SCHEMAS),
     FIND_COORDINATOR(10, "FindCoordinator", FindCoordinatorRequestData.SCHEMAS,
         FindCoordinatorResponseData.SCHEMAS),
     JOIN_GROUP(11, "JoinGroup", JoinGroupRequestData.SCHEMAS, JoinGroupResponseData.SCHEMAS),
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java b/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java
index 5fdc37f..5da38aa 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java
@@ -36,9 +36,6 @@ public class CommonFields {
 
     // Group APIs
     public static final Field.Str GROUP_ID = new Field.Str("group_id", "The unique group identifier");
-    public static final Field.Int32 GENERATION_ID = new Field.Int32("generation_id", "The generation of the group.");
-    public static final Field.Str MEMBER_ID = new Field.Str("member_id", "The member id assigned by the group " +
-            "coordinator or null if joining for the first time.");
 
     // Transactional APIs
     public static final Field.Str TRANSACTIONAL_ID = new Field.Str("transactional_id", "The transactional id corresponding to the transaction.");
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index 2e433e8..eb52fb8 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -81,7 +81,7 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
             case OFFSET_COMMIT:
                 return new OffsetCommitResponse(struct, version);
             case OFFSET_FETCH:
-                return new OffsetFetchResponse(struct);
+                return new OffsetFetchResponse(struct, version);
             case FIND_COORDINATOR:
                 return new FindCoordinatorResponse(struct, version);
             case JOIN_GROUP:
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
index 7f2603e..fee2e47 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
@@ -102,7 +102,6 @@ public class OffsetCommitResponse extends AbstractResponse {
                 errorMap.put(new TopicPartition(topic.name(), partition.partitionIndex()),
                         Errors.forCode(partition.errorCode()));
             }
-
         }
         return errorCounts(errorMap);
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
index d2f5c88..f616ac1 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
@@ -18,13 +18,11 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.OffsetFetchRequestData;
+import org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestTopic;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.utils.CollectionUtils;
-import org.apache.kafka.common.utils.Utils;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -33,84 +31,45 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
-import static org.apache.kafka.common.protocol.CommonFields.GROUP_ID;
-import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
-import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
-
 public class OffsetFetchRequest extends AbstractRequest {
-    // top level fields
-    private static final Field.ComplexArray TOPICS = new Field.ComplexArray("topics",
-            "Topics to fetch offsets. If the topic array is null fetch offsets for all topics.");
-
-    // topic level fields
-    private static final Field.ComplexArray PARTITIONS = new Field.ComplexArray("partitions",
-            "Partitions to fetch offsets.");
-
-    /*
-     * Wire formats of version 0 and 1 are the same, but with different functionality.
-     * Wire format of version 2 is similar to version 1, with the exception of
-     * - accepting 'null' as list of topics
-     * - returning a top level error code
-     * Version 0 will read the offsets from ZK.
-     * Version 1 will read the offsets from Kafka.
-     * Version 2 will read the offsets from Kafka, and returns all associated topic partition offsets if
-     * a 'null' is passed instead of a list of specific topic partitions. It also returns a top level error code
-     * for group or coordinator level errors.
-     */
-    private static final Field PARTITIONS_V0 = PARTITIONS.withFields(
-            PARTITION_ID);
-
-    private static final Field TOPICS_V0 = TOPICS.withFields("Topics to fetch offsets.",
-            TOPIC_NAME,
-            PARTITIONS_V0);
-
-    private static final Schema OFFSET_FETCH_REQUEST_V0 = new Schema(
-            GROUP_ID,
-            TOPICS_V0);
-
-    // V1 begins support for fetching offsets from the internal __consumer_offsets topic
-    private static final Schema OFFSET_FETCH_REQUEST_V1 = OFFSET_FETCH_REQUEST_V0;
-
-    // V2 adds top-level error code to the response as well as allowing a null offset array to indicate fetch
-    // of all committed offsets for a group
-    private static final Field TOPICS_V2 = TOPICS.nullableWithFields(
-            TOPIC_NAME,
-            PARTITIONS_V0);
-    private static final Schema OFFSET_FETCH_REQUEST_V2 = new Schema(
-            GROUP_ID,
-            TOPICS_V2);
-
-    // V3 request is the same as v2. Throttle time has been added to v3 response
-    private static final Schema OFFSET_FETCH_REQUEST_V3 = OFFSET_FETCH_REQUEST_V2;
-
-    // V4 bump used to indicate that on quota violation brokers send out responses before throttling.
-    private static final Schema OFFSET_FETCH_REQUEST_V4 = OFFSET_FETCH_REQUEST_V3;
-
-    // V5 adds the leader epoch of the committed offset in the response
-    private static final Schema OFFSET_FETCH_REQUEST_V5 = OFFSET_FETCH_REQUEST_V4;
-
-    public static Schema[] schemaVersions() {
-        return new Schema[] {OFFSET_FETCH_REQUEST_V0, OFFSET_FETCH_REQUEST_V1, OFFSET_FETCH_REQUEST_V2,
-            OFFSET_FETCH_REQUEST_V3, OFFSET_FETCH_REQUEST_V4, OFFSET_FETCH_REQUEST_V5};
-    }
+
+    private static final List<OffsetFetchRequestTopic> ALL_TOPIC_PARTITIONS = null;
+    public final OffsetFetchRequestData data;
 
     public static class Builder extends AbstractRequest.Builder<OffsetFetchRequest> {
-        private static final List<TopicPartition> ALL_TOPIC_PARTITIONS = null;
-        private final String groupId;
-        private final List<TopicPartition> partitions;
+
+        public final OffsetFetchRequestData data;
 
         public Builder(String groupId, List<TopicPartition> partitions) {
             super(ApiKeys.OFFSET_FETCH);
-            this.groupId = groupId;
-            this.partitions = partitions;
+
+            final List<OffsetFetchRequestTopic> topics;
+            if (partitions != null) {
+                Map<String, OffsetFetchRequestTopic> offsetFetchRequestTopicMap = new HashMap<>();
+                for (TopicPartition topicPartition : partitions) {
+                    String topicName = topicPartition.topic();
+                    OffsetFetchRequestTopic topic = offsetFetchRequestTopicMap.getOrDefault(
+                        topicName, new OffsetFetchRequestTopic().setName(topicName));
+                    topic.partitionIndexes().add(topicPartition.partition());
+                    offsetFetchRequestTopicMap.put(topicName, topic);
+                }
+                topics = new ArrayList<>(offsetFetchRequestTopicMap.values());
+            } else {
+                // If passed in partition list is null, it is requesting offsets for all topic partitions.
+                topics = ALL_TOPIC_PARTITIONS;
+            }
+
+            this.data = new OffsetFetchRequestData()
+                            .setGroupId(groupId)
+                            .setTopics(topics);
         }
 
         public static Builder allTopicPartitions(String groupId) {
-            return new Builder(groupId, ALL_TOPIC_PARTITIONS);
+            return new Builder(groupId, null);
         }
 
         public boolean isAllTopicPartitions() {
-            return this.partitions == ALL_TOPIC_PARTITIONS;
+            return this.data.topics() == ALL_TOPIC_PARTITIONS;
         }
 
         @Override
@@ -118,54 +77,40 @@ public class OffsetFetchRequest extends AbstractRequest {
             if (isAllTopicPartitions() && version < 2)
                 throw new UnsupportedVersionException("The broker only supports OffsetFetchRequest " +
                         "v" + version + ", but we need v2 or newer to request all topic partitions.");
-            return new OffsetFetchRequest(groupId, partitions, version);
+            return new OffsetFetchRequest(data, version);
         }
 
         @Override
         public String toString() {
-            StringBuilder bld = new StringBuilder();
-            String partitionsString = partitions == null ? "<ALL>" : Utils.join(partitions, ",");
-            bld.append("(type=OffsetFetchRequest, ").
-                    append("groupId=").append(groupId).
-                    append(", partitions=").append(partitionsString).
-                    append(")");
-            return bld.toString();
+            return data.toString();
         }
     }
 
-    private final String groupId;
-    private final List<TopicPartition> partitions;
+    public List<TopicPartition> partitions() {
+        if (isAllPartitions()) {
+            return null;
+        }
+        List<TopicPartition> partitions = new ArrayList<>();
+        for (OffsetFetchRequestTopic topic : data.topics()) {
+            for (Integer partitionIndex : topic.partitionIndexes()) {
+                partitions.add(new TopicPartition(topic.name(), partitionIndex));
+            }
+        }
+        return partitions;
+    }
 
-    public static OffsetFetchRequest forAllPartitions(String groupId) {
-        return new OffsetFetchRequest.Builder(groupId, null).build((short) 2);
+    public String groupId() {
+        return data.groupId();
     }
 
-    private OffsetFetchRequest(String groupId, List<TopicPartition> partitions, short version) {
+    private OffsetFetchRequest(OffsetFetchRequestData data, short version) {
         super(ApiKeys.OFFSET_FETCH, version);
-        this.groupId = groupId;
-        this.partitions = partitions;
+        this.data = data;
     }
 
     public OffsetFetchRequest(Struct struct, short version) {
         super(ApiKeys.OFFSET_FETCH, version);
-
-        Object[] topicArray = struct.get(TOPICS);
-        if (topicArray != null) {
-            partitions = new ArrayList<>();
-            for (Object topicResponseObj : topicArray) {
-                Struct topicResponse = (Struct) topicResponseObj;
-                String topic = topicResponse.get(TOPIC_NAME);
-                for (Object partitionResponseObj : topicResponse.get(PARTITIONS)) {
-                    Struct partitionResponse = (Struct) partitionResponseObj;
-                    int partition = partitionResponse.get(PARTITION_ID);
-                    partitions.add(new TopicPartition(topic, partition));
-                }
-            }
-        } else
-            partitions = null;
-
-
-        groupId = struct.get(GROUP_ID);
+        this.data = new OffsetFetchRequestData(struct, version);
     }
 
     public OffsetFetchResponse getErrorResponse(Errors error) {
@@ -183,8 +128,12 @@ public class OffsetFetchRequest extends AbstractRequest {
                     OffsetFetchResponse.NO_METADATA,
                     error);
 
-            for (TopicPartition partition : this.partitions)
-                responsePartitions.put(partition, partitionError);
+            for (OffsetFetchRequestTopic topic : this.data.topics()) {
+                for (int partitionIndex : topic.partitionIndexes()) {
+                    responsePartitions.put(
+                        new TopicPartition(topic.name(), partitionIndex), partitionError);
+                }
+            }
         }
 
         switch (versionId) {
@@ -207,47 +156,16 @@ public class OffsetFetchRequest extends AbstractRequest {
         return getErrorResponse(throttleTimeMs, Errors.forException(e));
     }
 
-    public String groupId() {
-        return groupId;
-    }
-
-    public List<TopicPartition> partitions() {
-        return partitions;
-    }
-
     public static OffsetFetchRequest parse(ByteBuffer buffer, short version) {
         return new OffsetFetchRequest(ApiKeys.OFFSET_FETCH.parseRequest(version, buffer), version);
     }
 
     public boolean isAllPartitions() {
-        return partitions == null;
+        return data.topics() == ALL_TOPIC_PARTITIONS;
     }
 
     @Override
     protected Struct toStruct() {
-        Struct struct = new Struct(ApiKeys.OFFSET_FETCH.requestSchema(version()));
-        struct.set(GROUP_ID, groupId);
-        if (partitions != null) {
-            Map<String, List<Integer>> topicsData = CollectionUtils.groupPartitionsByTopic(partitions);
-
-            List<Struct> topicArray = new ArrayList<>();
-            for (Map.Entry<String, List<Integer>> entries : topicsData.entrySet()) {
-                Struct topicData = struct.instance(TOPICS);
-                topicData.set(TOPIC_NAME, entries.getKey());
-                List<Struct> partitionArray = new ArrayList<>();
-                for (Integer partitionId : entries.getValue()) {
-                    Struct partitionData = topicData.instance(PARTITIONS);
-                    partitionData.set(PARTITION_ID, partitionId);
-                    partitionArray.add(partitionData);
-                }
-                topicData.set(PARTITIONS, partitionArray.toArray());
-                topicArray.add(topicData);
-            }
-            struct.set(TOPICS, topicArray.toArray());
-        } else
-            struct.set(TOPICS, null);
-
-        return struct;
+        return data.toStruct(version());
     }
-
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
index ddf5c3b..722ef4e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
@@ -17,29 +17,23 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.OffsetFetchResponseData;
+import org.apache.kafka.common.message.OffsetFetchResponseData.OffsetFetchResponsePartition;
+import org.apache.kafka.common.message.OffsetFetchResponseData.OffsetFetchResponseTopic;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.utils.CollectionUtils;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 
-import static org.apache.kafka.common.protocol.CommonFields.COMMITTED_LEADER_EPOCH;
-import static org.apache.kafka.common.protocol.CommonFields.COMMITTED_METADATA;
-import static org.apache.kafka.common.protocol.CommonFields.COMMITTED_OFFSET;
-import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
-import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
-import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
-import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
+import static org.apache.kafka.common.record.RecordBatch.NO_PARTITION_LEADER_EPOCH;
 
 /**
  * Possible error codes:
@@ -55,77 +49,21 @@ import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
  *   - {@link Errors#GROUP_AUTHORIZATION_FAILED}
  */
 public class OffsetFetchResponse extends AbstractResponse {
-    private static final Field.ComplexArray TOPICS = new Field.ComplexArray("responses",
-            "Responses by topic for fetched offsets");
-
-    // topic level fields
-    private static final Field.ComplexArray PARTITIONS = new Field.ComplexArray("partition_responses",
-            "Responses by partition for fetched offsets");
-
-    private static final Field PARTITIONS_V0 = PARTITIONS.withFields(
-            PARTITION_ID,
-            COMMITTED_OFFSET,
-            COMMITTED_METADATA,
-            ERROR_CODE);
-
-    private static final Field TOPICS_V0 = TOPICS.withFields(
-            TOPIC_NAME,
-            PARTITIONS_V0);
-
-    private static final Schema OFFSET_FETCH_RESPONSE_V0 = new Schema(
-            TOPICS_V0);
-
-    // V1 begins support for fetching offsets from the internal __consumer_offsets topic
-    private static final Schema OFFSET_FETCH_RESPONSE_V1 = OFFSET_FETCH_RESPONSE_V0;
-
-    // V2 adds top-level error code
-    private static final Schema OFFSET_FETCH_RESPONSE_V2 = new Schema(
-            TOPICS_V0,
-            ERROR_CODE);
-
-    // V3 request includes throttle time
-    private static final Schema OFFSET_FETCH_RESPONSE_V3 = new Schema(
-            THROTTLE_TIME_MS,
-            TOPICS_V0,
-            ERROR_CODE);
-
-    // V4 bump used to indicate that on quota violation brokers send out responses before throttling.
-    private static final Schema OFFSET_FETCH_RESPONSE_V4 = OFFSET_FETCH_RESPONSE_V3;
-
-    // V5 adds the leader epoch to the committed offset
-    private static final Field PARTITIONS_V5 = PARTITIONS.withFields(
-            PARTITION_ID,
-            COMMITTED_OFFSET,
-            COMMITTED_LEADER_EPOCH,
-            COMMITTED_METADATA,
-            ERROR_CODE);
-
-    private static final Field TOPICS_V5 = TOPICS.withFields(
-            TOPIC_NAME,
-            PARTITIONS_V5);
-
-    private static final Schema OFFSET_FETCH_RESPONSE_V5 = new Schema(
-            THROTTLE_TIME_MS,
-            TOPICS_V5,
-            ERROR_CODE);
-
-    public static Schema[] schemaVersions() {
-        return new Schema[] {OFFSET_FETCH_RESPONSE_V0, OFFSET_FETCH_RESPONSE_V1, OFFSET_FETCH_RESPONSE_V2,
-            OFFSET_FETCH_RESPONSE_V3, OFFSET_FETCH_RESPONSE_V4, OFFSET_FETCH_RESPONSE_V5};
-    }
-
     public static final long INVALID_OFFSET = -1L;
     public static final String NO_METADATA = "";
     public static final PartitionData UNKNOWN_PARTITION = new PartitionData(INVALID_OFFSET,
-            Optional.empty(), NO_METADATA, Errors.UNKNOWN_TOPIC_OR_PARTITION);
+                                                                            Optional.empty(),
+                                                                            NO_METADATA,
+                                                                            Errors.UNKNOWN_TOPIC_OR_PARTITION);
     public static final PartitionData UNAUTHORIZED_PARTITION = new PartitionData(INVALID_OFFSET,
-            Optional.empty(), NO_METADATA, Errors.TOPIC_AUTHORIZATION_FAILED);
+                                                                                 Optional.empty(),
+                                                                                 NO_METADATA,
+                                                                                 Errors.TOPIC_AUTHORIZATION_FAILED);
+    private static final List<Errors> PARTITION_ERRORS = Arrays.asList(
+        Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.TOPIC_AUTHORIZATION_FAILED);
 
-    private static final List<Errors> PARTITION_ERRORS = Collections.singletonList(Errors.UNKNOWN_TOPIC_OR_PARTITION);
-
-    private final Map<TopicPartition, PartitionData> responseData;
+    public final OffsetFetchResponseData data;
     private final Errors error;
-    private final int throttleTimeMs;
 
     public static final class PartitionData {
         public final long offset;
@@ -146,6 +84,32 @@ public class OffsetFetchResponse extends AbstractResponse {
         public boolean hasError() {
             return this.error != Errors.NONE;
         }
+
+        @Override
+        public boolean equals(Object other) {
+            if (!(other instanceof PartitionData))
+                return false;
+            PartitionData otherPartition = (PartitionData) other;
+            return this.offset == otherPartition.offset
+                       && this.leaderEpoch.equals(otherPartition.leaderEpoch)
+                       && this.metadata.equals(otherPartition.metadata)
+                       && this.error.equals(otherPartition.error);
+        }
+
+        @Override
+        public String toString() {
+            return "PartitionData("
+                       + "offset=" + offset
+                       + ", leaderEpoch=" + leaderEpoch.orElse(NO_PARTITION_LEADER_EPOCH)
+                       + ", metadata=" + metadata
+                       + ", error='" + error.toString()
+                       + ")";
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(offset, leaderEpoch, metadata, error);
+        }
     }
 
     /**
@@ -164,31 +128,41 @@ public class OffsetFetchResponse extends AbstractResponse {
      * @param responseData Fetched offset information grouped by topic-partition
      */
     public OffsetFetchResponse(int throttleTimeMs, Errors error, Map<TopicPartition, PartitionData> responseData) {
-        this.throttleTimeMs = throttleTimeMs;
-        this.responseData = responseData;
+        Map<String, OffsetFetchResponseTopic> offsetFetchResponseTopicMap = new HashMap<>();
+        for (Map.Entry<TopicPartition, PartitionData> entry : responseData.entrySet()) {
+            String topicName = entry.getKey().topic();
+            OffsetFetchResponseTopic topic = offsetFetchResponseTopicMap.getOrDefault(
+                topicName, new OffsetFetchResponseTopic().setName(topicName));
+            PartitionData partitionData = entry.getValue();
+            topic.partitions().add(new OffsetFetchResponsePartition()
+                                       .setPartitionIndex(entry.getKey().partition())
+                                       .setErrorCode(partitionData.error.code())
+                                       .setCommittedOffset(partitionData.offset)
+                                       .setCommittedLeaderEpoch(
+                                           partitionData.leaderEpoch.orElse(NO_PARTITION_LEADER_EPOCH))
+                                       .setMetadata(partitionData.metadata)
+            );
+            offsetFetchResponseTopicMap.put(topicName, topic);
+        }
+
+        this.data = new OffsetFetchResponseData()
+            .setTopics(new ArrayList<>(offsetFetchResponseTopicMap.values()))
+            .setErrorCode(error.code())
+            .setThrottleTimeMs(throttleTimeMs);
         this.error = error;
     }
 
-    public OffsetFetchResponse(Struct struct) {
-        this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
-        Errors topLevelError = Errors.NONE;
-        this.responseData = new HashMap<>();
-        for (Object topicResponseObj : struct.get(TOPICS)) {
-            Struct topicResponse = (Struct) topicResponseObj;
-            String topic = topicResponse.get(TOPIC_NAME);
-            for (Object partitionResponseObj : topicResponse.get(PARTITIONS)) {
-                Struct partitionResponse = (Struct) partitionResponseObj;
-                int partition = partitionResponse.get(PARTITION_ID);
-                long offset = partitionResponse.get(COMMITTED_OFFSET);
-                String metadata = partitionResponse.get(COMMITTED_METADATA);
-                Optional<Integer> leaderEpochOpt = RequestUtils.getLeaderEpoch(partitionResponse, COMMITTED_LEADER_EPOCH);
-
-                Errors error = Errors.forCode(partitionResponse.get(ERROR_CODE));
-                if (error != Errors.NONE && !PARTITION_ERRORS.contains(error))
-                    topLevelError = error;
+    public OffsetFetchResponse(Struct struct, short version) {
+        this.data = new OffsetFetchResponseData(struct, version);
 
-                PartitionData partitionData = new PartitionData(offset, leaderEpochOpt, metadata, error);
-                this.responseData.put(new TopicPartition(topic, partition), partitionData);
+        Errors topLevelError = Errors.NONE;
+        for (OffsetFetchResponseTopic topic : data.topics()) {
+            for (OffsetFetchResponsePartition partition : topic.partitions()) {
+                Errors partitionError = Errors.forCode(partition.errorCode());
+                if (partitionError != Errors.NONE && !PARTITION_ERRORS.contains(partitionError)) {
+                    topLevelError = partitionError;
+                    break;
+                }
             }
         }
 
@@ -196,28 +170,20 @@ public class OffsetFetchResponse extends AbstractResponse {
         // for older versions there is no top-level error in the response and all errors are partition errors,
         // so if there is a group or coordinator error at the partition level use that as the top-level error.
         // this way clients can depend on the top-level error regardless of the offset fetch version.
-        this.error = struct.hasField(ERROR_CODE) ? Errors.forCode(struct.get(ERROR_CODE)) : topLevelError;
-    }
-
-    public void maybeThrowFirstPartitionError() {
-        Collection<PartitionData> partitionsData = this.responseData.values();
-        for (PartitionData data : partitionsData) {
-            if (data.hasError())
-                throw data.error.exception();
-        }
+        this.error = version >= 2 ? Errors.forCode(data.errorCode()) : topLevelError;
     }
 
     @Override
     public int throttleTimeMs() {
-        return throttleTimeMs;
+        return data.throttleTimeMs();
     }
 
     public boolean hasError() {
-        return this.error != Errors.NONE;
+        return error != Errors.NONE;
     }
 
     public Errors error() {
-        return this.error;
+        return error;
     }
 
     @Override
@@ -226,43 +192,27 @@ public class OffsetFetchResponse extends AbstractResponse {
     }
 
     public Map<TopicPartition, PartitionData> responseData() {
+        Map<TopicPartition, PartitionData> responseData = new HashMap<>();
+        for (OffsetFetchResponseTopic topic : data.topics()) {
+            for (OffsetFetchResponsePartition partition : topic.partitions()) {
+                responseData.put(new TopicPartition(topic.name(), partition.partitionIndex()),
+                                 new PartitionData(partition.committedOffset(),
+                                                   RequestUtils.getLeaderEpoch(partition.committedLeaderEpoch()),
+                                                   partition.metadata(),
+                                                   Errors.forCode(partition.errorCode()))
+                );
+            }
+        }
         return responseData;
     }
 
     public static OffsetFetchResponse parse(ByteBuffer buffer, short version) {
-        return new OffsetFetchResponse(ApiKeys.OFFSET_FETCH.parseResponse(version, buffer));
+        return new OffsetFetchResponse(ApiKeys.OFFSET_FETCH.parseResponse(version, buffer), version);
     }
 
     @Override
     protected Struct toStruct(short version) {
-        Struct struct = new Struct(ApiKeys.OFFSET_FETCH.responseSchema(version));
-        struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);
-
-        Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupPartitionDataByTopic(responseData);
-        List<Struct> topicArray = new ArrayList<>();
-        for (Map.Entry<String, Map<Integer, PartitionData>> entries : topicsData.entrySet()) {
-            Struct topicData = struct.instance(TOPICS);
-            topicData.set(TOPIC_NAME, entries.getKey());
-            List<Struct> partitionArray = new ArrayList<>();
-            for (Map.Entry<Integer, PartitionData> partitionEntry : entries.getValue().entrySet()) {
-                PartitionData fetchPartitionData = partitionEntry.getValue();
-                Struct partitionData = topicData.instance(PARTITIONS);
-                partitionData.set(PARTITION_ID, partitionEntry.getKey());
-                partitionData.set(COMMITTED_OFFSET, fetchPartitionData.offset);
-                RequestUtils.setLeaderEpochIfExists(partitionData, COMMITTED_LEADER_EPOCH, fetchPartitionData.leaderEpoch);
-                partitionData.set(COMMITTED_METADATA, fetchPartitionData.metadata);
-                partitionData.set(ERROR_CODE, fetchPartitionData.error.code());
-                partitionArray.add(partitionData);
-            }
-            topicData.set(PARTITIONS, partitionArray.toArray());
-            topicArray.add(topicData);
-        }
-        struct.set(TOPICS, topicArray.toArray());
-
-        if (version > 1)
-            struct.set(ERROR_CODE, this.error.code());
-
-        return struct;
+        return data.toStruct(version);
     }
 
     @Override
diff --git a/clients/src/main/resources/common/message/OffsetFetchRequest.json b/clients/src/main/resources/common/message/OffsetFetchRequest.json
index 4ff781b..0449fda 100644
--- a/clients/src/main/resources/common/message/OffsetFetchRequest.json
+++ b/clients/src/main/resources/common/message/OffsetFetchRequest.json
@@ -17,10 +17,13 @@
   "apiKey": 9,
   "type": "request",
   "name": "OffsetFetchRequest",
+  // In version 0, the request read offsets from ZK.
+  //
   // Starting in version 1, the broker supports fetching offsets from the internal __consumer_offsets topic.
   //
   // Starting in version 2, the request can contain a null topics array to indicate that offsets
-  // for all topics should be fetched.
+  // for all topics should be fetched. It also returns a top level error code
+  // for group or coordinator level errors.
   //
   // Version 3, 4, and 5 are the same as version 2.
   "validVersions": "0-5",
diff --git a/clients/src/main/resources/common/message/OffsetFetchResponse.json b/clients/src/main/resources/common/message/OffsetFetchResponse.json
index eb0bbbc..69756cc 100644
--- a/clients/src/main/resources/common/message/OffsetFetchResponse.json
+++ b/clients/src/main/resources/common/message/OffsetFetchResponse.json
@@ -40,7 +40,7 @@
           "about": "The partition index." },
         { "name": "CommittedOffset", "type": "int64", "versions": "0+",
           "about": "The committed message offset." },
-        { "name": "CommittedLeaderEpoch", "type": "int32", "versions": "5+",
+        { "name": "CommittedLeaderEpoch", "type": "int32", "versions": "5+", "default": "-1",
           "about": "The leader epoch." },
         { "name": "Metadata", "type": "string", "versions": "0+", "nullableVersions": "0+",
           "about": "The partition metadata." },
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 769f58c..7ec7c24 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -1377,12 +1377,12 @@ public class KafkaAdminClientTest {
         try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
 
-            //Retriable FindCoordinatorResponse errors should be retried
+            // Retriable FindCoordinatorResponse errors should be retried
             env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE,  Node.noNode()));
 
             env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
 
-            //Retriable  errors should be retried
+            // Retriable errors should be retried
             env.kafkaClient().prepareResponse(new OffsetFetchResponse(Errors.COORDINATOR_NOT_AVAILABLE, Collections.emptyMap()));
             env.kafkaClient().prepareResponse(new OffsetFetchResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Collections.emptyMap()));
 
diff --git a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
index bdfce3f..473912c 100644
--- a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
@@ -22,6 +22,9 @@ import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitio
 import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopicCollection;
 import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
 import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse;
+import org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestTopic;
+import org.apache.kafka.common.message.OffsetFetchResponseData.OffsetFetchResponsePartition;
+import org.apache.kafka.common.message.OffsetFetchResponseData.OffsetFetchResponseTopic;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
 import org.apache.kafka.common.protocol.Errors;
@@ -29,6 +32,7 @@ import org.apache.kafka.common.protocol.Message;
 import org.apache.kafka.common.protocol.types.ArrayOf;
 import org.apache.kafka.common.protocol.types.BoundField;
 import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.SchemaException;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.protocol.types.Type;
 import org.apache.kafka.common.utils.Utils;
@@ -46,6 +50,7 @@ import java.util.function.Supplier;
 
 import static java.util.Collections.singletonList;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -266,6 +271,67 @@ public final class MessageTest {
         testAllMessageRoundTripsFromVersion((short) 3, new LeaderAndIsrRequestData().setTopicStates(Collections.singletonList(partitionStateWithAddingRemovingReplicas)));
     }
 
+    @Test
+    public void testOffsetFetchVersions() throws Exception {
+        String groupId = "groupId";
+        String topicName = "topic";
+
+        testAllMessageRoundTrips(new OffsetFetchRequestData()
+                                     .setTopics(new ArrayList<>())
+                                     .setGroupId(groupId));
+
+        testAllMessageRoundTrips(new OffsetFetchRequestData()
+                                     .setGroupId(groupId)
+                                     .setTopics(Collections.singletonList(
+                                         new OffsetFetchRequestTopic()
+                                             .setName(topicName)
+                                             .setPartitionIndexes(Collections.singletonList(5))))
+        );
+
+        OffsetFetchRequestData allPartitionData = new OffsetFetchRequestData()
+                                                      .setGroupId(groupId)
+                                                      .setTopics(null);
+        for (short version = 0; version <= ApiKeys.OFFSET_FETCH.latestVersion(); version++) {
+            if (version < 2) {
+                final short finalVersion = version;
+                assertThrows(SchemaException.class, () -> testAllMessageRoundTripsFromVersion(finalVersion, allPartitionData));
+            } else {
+                testAllMessageRoundTripsFromVersion(version, allPartitionData);
+            }
+        }
+
+        Supplier<OffsetFetchResponseData> response =
+            () -> new OffsetFetchResponseData()
+                      .setTopics(Collections.singletonList(
+                          new OffsetFetchResponseTopic()
+                              .setName(topicName)
+                              .setPartitions(Collections.singletonList(
+                                  new OffsetFetchResponsePartition()
+                                      .setPartitionIndex(5)
+                                      .setMetadata(null)
+                                      .setCommittedOffset(100)
+                                      .setCommittedLeaderEpoch(3)
+                                      .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())))))
+                      .setErrorCode(Errors.NOT_COORDINATOR.code())
+                      .setThrottleTimeMs(10);
+        for (short version = 0; version <= ApiKeys.OFFSET_FETCH.latestVersion(); version++) {
+            OffsetFetchResponseData responseData = response.get();
+            if (version <= 1) {
+                responseData.setErrorCode(Errors.NONE.code());
+            }
+
+            if (version <= 2) {
+                responseData.setThrottleTimeMs(0);
+            }
+
+            if (version <= 4) {
+                responseData.topics().get(0).partitions().get(0).setCommittedLeaderEpoch(-1);
+            }
+
+            testAllMessageRoundTripsFromVersion(version, responseData);
+        }
+    }
+
     private void testAllMessageRoundTrips(Message message) throws Exception {
         testAllMessageRoundTripsFromVersion(message.lowestSupportedVersion(), message);
     }
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchRequestTest.java
new file mode 100644
index 0000000..3bafb76
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchRequestTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.kafka.common.requests.AbstractResponse.DEFAULT_THROTTLE_TIME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+public class OffsetFetchRequestTest {
+
+    private final String topicOne = "topic1";
+    private final int partitionOne = 1;
+    private final String topicTwo = "topic2";
+    private final int partitionTwo = 2;
+    private final String groupId = "groupId";
+
+    private OffsetFetchRequest.Builder builder;
+    private List<TopicPartition> partitions;
+
+    @Before
+    public void setUp() {
+        partitions = Arrays.asList(new TopicPartition(topicOne, partitionOne),
+                                   new TopicPartition(topicTwo, partitionTwo));
+        builder = new OffsetFetchRequest.Builder(
+            groupId,
+            partitions
+        );
+    }
+
+    @Test
+    public void testConstructor() {
+        assertFalse(builder.isAllTopicPartitions());
+        int throttleTimeMs = 10;
+
+        Map<TopicPartition, PartitionData> expectedData = new HashMap<>();
+        for (TopicPartition partition : partitions) {
+            expectedData.put(partition, new PartitionData(
+                OffsetFetchResponse.INVALID_OFFSET,
+                Optional.empty(),
+                OffsetFetchResponse.NO_METADATA,
+                Errors.NONE
+            ));
+        }
+
+        for (short version = 0; version <= ApiKeys.OFFSET_FETCH.latestVersion(); version++) {
+            OffsetFetchRequest request = builder.build(version);
+            assertFalse(request.isAllPartitions());
+            assertEquals(groupId, request.groupId());
+            assertEquals(partitions, request.partitions());
+
+            OffsetFetchResponse response = request.getErrorResponse(throttleTimeMs, Errors.NONE);
+            assertEquals(Errors.NONE, response.error());
+            assertFalse(response.hasError());
+            assertEquals(Collections.singletonMap(Errors.NONE, 1), response.errorCounts());
+
+            if (version <= 1) {
+                assertEquals(expectedData, response.responseData());
+            }
+
+            if (version >= 3) {
+                assertEquals(throttleTimeMs, response.throttleTimeMs());
+            } else {
+                assertEquals(DEFAULT_THROTTLE_TIME, response.throttleTimeMs());
+            }
+        }
+    }
+
+    @Test
+    public void testConstructorFailForUnsupportedAllPartition() {
+        builder = OffsetFetchRequest.Builder.allTopicPartitions(groupId);
+        for (short version = 0; version <= ApiKeys.OFFSET_FETCH.latestVersion(); version++) {
+            short finalVersion = version;
+            if (version <= 1) {
+                assertThrows(UnsupportedVersionException.class, () -> builder.build(finalVersion));
+            } else {
+                OffsetFetchRequest request = builder.build(finalVersion);
+                assertEquals(groupId, request.groupId());
+                assertNull(request.partitions());
+                assertTrue(request.isAllPartitions());
+            }
+        }
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchResponseTest.java
new file mode 100644
index 0000000..d3ff161
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchResponseTest.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.OffsetFetchResponseData;
+import org.apache.kafka.common.message.OffsetFetchResponseData.OffsetFetchResponsePartition;
+import org.apache.kafka.common.message.OffsetFetchResponseData.OffsetFetchResponseTopic;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
+import static org.apache.kafka.common.requests.AbstractResponse.DEFAULT_THROTTLE_TIME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class OffsetFetchResponseTest {
+
+    private final int throttleTimeMs = 10;
+    private final int offset = 100;
+    private final String metadata = "metadata";
+
+    private final String topicOne = "topic1";
+    private final int partitionOne = 1;
+    private final Optional<Integer> leaderEpochOne = Optional.of(1);
+    private final String topicTwo = "topic2";
+    private final int partitionTwo = 2;
+    private final Optional<Integer> leaderEpochTwo = Optional.of(2);
+
+    private Map<TopicPartition, PartitionData> partitionDataMap;
+
+    @Before
+    public void setUp() {
+        partitionDataMap = new HashMap<>();
+        partitionDataMap.put(new TopicPartition(topicOne, partitionOne), new PartitionData(
+            offset,
+            leaderEpochOne,
+            metadata,
+            Errors.TOPIC_AUTHORIZATION_FAILED
+        ));
+        partitionDataMap.put(new TopicPartition(topicTwo, partitionTwo), new PartitionData(
+            offset,
+            leaderEpochTwo,
+            metadata,
+            Errors.UNKNOWN_TOPIC_OR_PARTITION
+        ));
+    }
+
+    @Test
+    public void testConstructor() {
+        OffsetFetchResponse response = new OffsetFetchResponse(throttleTimeMs, Errors.NOT_COORDINATOR, partitionDataMap);
+        assertEquals(Errors.NOT_COORDINATOR, response.error());
+        assertEquals(Collections.singletonMap(Errors.NOT_COORDINATOR, 1), response.errorCounts());
+
+        assertEquals(throttleTimeMs, response.throttleTimeMs());
+
+        Map<TopicPartition, PartitionData> responseData = response.responseData();
+        assertEquals(partitionDataMap, responseData);
+        responseData.forEach(
+            (tp, data) -> assertTrue(data.hasError())
+        );
+    }
+
+    /**
+     * Test behavior changes over the versions. Refer to resources/common/message/OffsetFetchResponse.json
+     */
+    @Test
+    public void testStructBuild() {
+        partitionDataMap.put(new TopicPartition(topicTwo, partitionTwo), new PartitionData(
+            offset,
+            leaderEpochTwo,
+            metadata,
+            Errors.GROUP_AUTHORIZATION_FAILED
+        ));
+
+        OffsetFetchResponse latestResponse = new OffsetFetchResponse(throttleTimeMs, Errors.NONE, partitionDataMap);
+
+        for (short version = 0; version <= ApiKeys.OFFSET_FETCH.latestVersion(); version++) {
+            Struct struct = latestResponse.data.toStruct(version);
+
+            OffsetFetchResponse oldResponse =  new OffsetFetchResponse(struct, version);
+
+            if (version <= 1) {
+                assertFalse(struct.hasField(ERROR_CODE));
+
+                // Partition level error populated in older versions.
+                assertEquals(Errors.GROUP_AUTHORIZATION_FAILED, oldResponse.error());
+                assertEquals(Collections.singletonMap(Errors.GROUP_AUTHORIZATION_FAILED, 1), oldResponse.errorCounts());
+
+            } else {
+                assertTrue(struct.hasField(ERROR_CODE));
+
+                assertEquals(Errors.NONE, oldResponse.error());
+                assertEquals(Collections.singletonMap(Errors.NONE, 1), oldResponse.errorCounts());
+            }
+
+            if (version <= 2) {
+                assertEquals(DEFAULT_THROTTLE_TIME, oldResponse.throttleTimeMs());
+            } else {
+                assertEquals(throttleTimeMs, oldResponse.throttleTimeMs());
+            }
+
+            Map<TopicPartition, PartitionData> expectedDataMap = new HashMap<>();
+            for (Map.Entry<TopicPartition, PartitionData> entry : partitionDataMap.entrySet()) {
+                PartitionData partitionData = entry.getValue();
+                expectedDataMap.put(entry.getKey(), new PartitionData(
+                    partitionData.offset,
+                    version <= 4 ? Optional.empty() : partitionData.leaderEpoch,
+                    partitionData.metadata,
+                    partitionData.error
+                ));
+            }
+
+            Map<TopicPartition, PartitionData> responseData = oldResponse.responseData();
+            assertEquals(expectedDataMap, responseData);
+
+            responseData.forEach(
+                (tp, data) -> assertTrue(data.hasError())
+            );
+        }
+    }
+
+    @Test
+    public void testShouldThrottle() {
+        OffsetFetchResponse response = new OffsetFetchResponse(throttleTimeMs, Errors.NONE, partitionDataMap);
+        for (short version = 0; version <= ApiKeys.OFFSET_FETCH.latestVersion(); version++) {
+            if (version >= 4) {
+                assertTrue(response.shouldClientThrottle(version));
+            } else {
+                assertFalse(response.shouldClientThrottle(version));
+            }
+        }
+    }
+
+    @Test
+    public void testNullableMetadata() {
+        partitionDataMap.clear();
+        partitionDataMap.put(new TopicPartition(topicOne, partitionOne),
+                             new PartitionData(
+                                 offset,
+                                 leaderEpochOne,
+                                 null,
+                                 Errors.UNKNOWN_TOPIC_OR_PARTITION)
+        );
+
+        OffsetFetchResponse response = new OffsetFetchResponse(throttleTimeMs, Errors.GROUP_AUTHORIZATION_FAILED, partitionDataMap);
+        OffsetFetchResponseData expectedData =
+            new OffsetFetchResponseData()
+                .setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code())
+                .setThrottleTimeMs(throttleTimeMs)
+                .setTopics(Collections.singletonList(
+                    new OffsetFetchResponseTopic()
+                        .setName(topicOne)
+                        .setPartitions(Collections.singletonList(
+                            new OffsetFetchResponsePartition()
+                                .setPartitionIndex(partitionOne)
+                                .setCommittedOffset(offset)
+                                .setCommittedLeaderEpoch(leaderEpochOne.orElse(-1))
+                                .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
+                                .setMetadata(null))
+                        ))
+                );
+        assertEquals(expectedData, response.data);
+    }
+
+    @Test
+    public void testUseDefaultLeaderEpoch() {
+        final Optional<Integer> emptyLeaderEpoch = Optional.empty();
+        partitionDataMap.clear();
+
+        partitionDataMap.put(new TopicPartition(topicOne, partitionOne),
+                             new PartitionData(
+                                 offset,
+                                 emptyLeaderEpoch,
+                                 metadata,
+                                 Errors.UNKNOWN_TOPIC_OR_PARTITION)
+        );
+
+        OffsetFetchResponse response = new OffsetFetchResponse(throttleTimeMs, Errors.NOT_COORDINATOR, partitionDataMap);
+        OffsetFetchResponseData expectedData =
+            new OffsetFetchResponseData()
+                .setErrorCode(Errors.NOT_COORDINATOR.code())
+                .setThrottleTimeMs(throttleTimeMs)
+                .setTopics(Collections.singletonList(
+                new OffsetFetchResponseTopic()
+                    .setName(topicOne)
+                    .setPartitions(Collections.singletonList(
+                        new OffsetFetchResponsePartition()
+                            .setPartitionIndex(partitionOne)
+                            .setCommittedOffset(offset)
+                            .setCommittedLeaderEpoch(RecordBatch.NO_PARTITION_LEADER_EPOCH)
+                            .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
+                            .setMetadata(metadata))
+                    ))
+                );
+        assertEquals(expectedData, response.data);
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 70adcca..4218eff 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -196,12 +196,12 @@ public class RequestResponseTest {
         checkErrorResponse(createMetadataRequest(3, Collections.singletonList("topic1")), new UnknownServerException(), true);
         checkResponse(createMetadataResponse(), 4, true);
         checkErrorResponse(createMetadataRequest(4, Collections.singletonList("topic1")), new UnknownServerException(), true);
-        checkRequest(OffsetFetchRequest.forAllPartitions("group1"), true);
-        checkErrorResponse(OffsetFetchRequest.forAllPartitions("group1"), new NotCoordinatorException("Not Coordinator"), true);
+        checkRequest(OffsetFetchRequest.Builder.allTopicPartitions("group1").build(), true);
+        checkErrorResponse(OffsetFetchRequest.Builder.allTopicPartitions("group1").build(), new NotCoordinatorException("Not Coordinator"), true);
         checkRequest(createOffsetFetchRequest(0), true);
         checkRequest(createOffsetFetchRequest(1), true);
         checkRequest(createOffsetFetchRequest(2), true);
-        checkRequest(OffsetFetchRequest.forAllPartitions("group1"), true);
+        checkRequest(OffsetFetchRequest.Builder.allTopicPartitions("group1").build(), true);
         checkErrorResponse(createOffsetFetchRequest(0), new UnknownServerException(), true);
         checkErrorResponse(createOffsetFetchRequest(1), new UnknownServerException(), true);
         checkErrorResponse(createOffsetFetchRequest(2), new UnknownServerException(), true);
@@ -732,8 +732,10 @@ public class RequestResponseTest {
     @Test
     public void testOffsetFetchRequestBuilderToString() {
         String allTopicPartitionsString = OffsetFetchRequest.Builder.allTopicPartitions("someGroup").toString();
-        assertTrue(allTopicPartitionsString.contains("<ALL>"));
-        String string = new OffsetFetchRequest.Builder("group1", Collections.singletonList(new TopicPartition("test11", 1))).toString();
+
+        assertTrue(allTopicPartitionsString.contains("groupId='someGroup', topics=null"));
+        String string = new OffsetFetchRequest.Builder("group1",
+            Collections.singletonList(new TopicPartition("test11", 1))).toString();
         assertTrue(string.contains("test11"));
         assertTrue(string.contains("group1"));
     }
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index ef2e6fd..8f3f24f 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -1132,7 +1132,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     // note there's only one broker, so no need to lookup the group coordinator
 
     // without describe permission on the topic, we shouldn't be able to fetch offsets
-    val offsetFetchRequest = requests.OffsetFetchRequest.forAllPartitions(group)
+    val offsetFetchRequest = requests.OffsetFetchRequest.Builder.allTopicPartitions(group).build()
     var offsetFetchResponse = sendOffsetFetchRequest(offsetFetchRequest, anySocketServer)
     assertEquals(Errors.NONE, offsetFetchResponse.error)
     assertTrue(offsetFetchResponse.responseData.isEmpty)
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index b09fc02..a8d29fd 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -542,7 +542,7 @@ class RequestQuotaTest extends BaseRequestTest {
         new MetadataResponse(response, ApiKeys.DESCRIBE_GROUPS.latestVersion).throttleTimeMs
       case ApiKeys.OFFSET_COMMIT =>
         new OffsetCommitResponse(response, ApiKeys.OFFSET_COMMIT.latestVersion).throttleTimeMs
-      case ApiKeys.OFFSET_FETCH => new OffsetFetchResponse(response).throttleTimeMs
+      case ApiKeys.OFFSET_FETCH => new OffsetFetchResponse(response, ApiKeys.OFFSET_FETCH.latestVersion).throttleTimeMs
       case ApiKeys.FIND_COORDINATOR =>
         new FindCoordinatorResponse(response, ApiKeys.FIND_COORDINATOR.latestVersion).throttleTimeMs
       case ApiKeys.JOIN_GROUP => new JoinGroupResponse(response).throttleTimeMs


Mime
View raw message