kafka-commits mailing list archives

From ij...@apache.org
Subject [3/8] kafka git commit: MINOR: Move request/response schemas to the corresponding object representation
Date Tue, 19 Sep 2017 04:13:24 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
index 8fe8ba7..732fb49 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
@@ -19,6 +19,9 @@ package org.apache.kafka.common.requests;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.utils.CollectionUtils;
 import org.apache.kafka.common.utils.Utils;
@@ -29,6 +32,12 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
+import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
+import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
+import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
+import static org.apache.kafka.common.protocol.types.Type.INT64;
+
 public class ListOffsetResponse extends AbstractResponse {
     public static final long UNKNOWN_TIMESTAMP = -1L;
     public static final long UNKNOWN_OFFSET = -1L;
@@ -36,13 +45,8 @@ public class ListOffsetResponse extends AbstractResponse {
     private static final String RESPONSES_KEY_NAME = "responses";
 
     // topic level field names
-    private static final String TOPIC_KEY_NAME = "topic";
     private static final String PARTITIONS_KEY_NAME = "partition_responses";
 
-    // partition level field names
-    private static final String PARTITION_KEY_NAME = "partition";
-    private static final String ERROR_CODE_KEY_NAME = "error_code";
-
     /**
      * Possible error code:
      *
@@ -58,6 +62,38 @@ public class ListOffsetResponse extends AbstractResponse {
     private static final String TIMESTAMP_KEY_NAME = "timestamp";
     private static final String OFFSET_KEY_NAME = "offset";
 
+    private static final Schema LIST_OFFSET_RESPONSE_PARTITION_V0 = new Schema(
+            PARTITION_ID,
+            ERROR_CODE,
+            new Field(OFFSETS_KEY_NAME, new ArrayOf(INT64), "A list of offsets."));
+
+    private static final Schema LIST_OFFSET_RESPONSE_PARTITION_V1 = new Schema(
+            PARTITION_ID,
+            ERROR_CODE,
+            new Field(TIMESTAMP_KEY_NAME, INT64, "The timestamp associated with the returned offset"),
+            new Field(OFFSET_KEY_NAME, INT64, "offset found"));
+
+    private static final Schema LIST_OFFSET_RESPONSE_TOPIC_V0 = new Schema(
+            TOPIC_NAME,
+            new Field(PARTITIONS_KEY_NAME, new ArrayOf(LIST_OFFSET_RESPONSE_PARTITION_V0)));
+
+    private static final Schema LIST_OFFSET_RESPONSE_TOPIC_V1 = new Schema(
+            TOPIC_NAME,
+            new Field(PARTITIONS_KEY_NAME, new ArrayOf(LIST_OFFSET_RESPONSE_PARTITION_V1)));
+
+    private static final Schema LIST_OFFSET_RESPONSE_V0 = new Schema(
+            new Field(RESPONSES_KEY_NAME, new ArrayOf(LIST_OFFSET_RESPONSE_TOPIC_V0)));
+
+    private static final Schema LIST_OFFSET_RESPONSE_V1 = new Schema(
+            new Field(RESPONSES_KEY_NAME, new ArrayOf(LIST_OFFSET_RESPONSE_TOPIC_V1)));
+    private static final Schema LIST_OFFSET_RESPONSE_V2 = new Schema(
+            THROTTLE_TIME_MS,
+            new Field(RESPONSES_KEY_NAME, new ArrayOf(LIST_OFFSET_RESPONSE_TOPIC_V1)));
+
+    public static Schema[] schemaVersions() {
+        return new Schema[] {LIST_OFFSET_RESPONSE_V0, LIST_OFFSET_RESPONSE_V1, LIST_OFFSET_RESPONSE_V2};
+    }
+
     public static final class PartitionData {
         public final Errors error;
         // The offsets list is only used in ListOffsetResponse v0.
@@ -121,19 +157,19 @@ public class ListOffsetResponse extends AbstractResponse {
     }
 
     public ListOffsetResponse(Struct struct) {
-        this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
+        this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
         responseData = new HashMap<>();
         for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
             Struct topicResponse = (Struct) topicResponseObj;
-            String topic = topicResponse.getString(TOPIC_KEY_NAME);
+            String topic = topicResponse.get(TOPIC_NAME);
             for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
                 Struct partitionResponse = (Struct) partitionResponseObj;
-                int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
-                Errors error = Errors.forCode(partitionResponse.getShort(ERROR_CODE_KEY_NAME));
+                int partition = partitionResponse.get(PARTITION_ID);
+                Errors error = Errors.forCode(partitionResponse.get(ERROR_CODE));
                 PartitionData partitionData;
                 if (partitionResponse.hasField(OFFSETS_KEY_NAME)) {
                     Object[] offsets = partitionResponse.getArray(OFFSETS_KEY_NAME);
-                    List<Long> offsetsList = new ArrayList<Long>();
+                    List<Long> offsetsList = new ArrayList<>();
                     for (Object offset : offsets)
                         offsetsList.add((Long) offset);
                     partitionData = new PartitionData(error, offsetsList);
@@ -162,20 +198,19 @@ public class ListOffsetResponse extends AbstractResponse {
     @Override
     protected Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.LIST_OFFSETS.responseSchema(version));
-        if (struct.hasField(THROTTLE_TIME_KEY_NAME))
-            struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
+        struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);
         Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(responseData);
 
         List<Struct> topicArray = new ArrayList<>();
         for (Map.Entry<String, Map<Integer, PartitionData>> topicEntry: topicsData.entrySet()) {
             Struct topicData = struct.instance(RESPONSES_KEY_NAME);
-            topicData.set(TOPIC_KEY_NAME, topicEntry.getKey());
+            topicData.set(TOPIC_NAME, topicEntry.getKey());
             List<Struct> partitionArray = new ArrayList<>();
             for (Map.Entry<Integer, PartitionData> partitionEntry : topicEntry.getValue().entrySet()) {
                 PartitionData offsetPartitionData = partitionEntry.getValue();
                 Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
-                partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
-                partitionData.set(ERROR_CODE_KEY_NAME, offsetPartitionData.error.code());
+                partitionData.set(PARTITION_ID, partitionEntry.getKey());
+                partitionData.set(ERROR_CODE, offsetPartitionData.error.code());
                 if (version == 0)
                     partitionData.set(OFFSETS_KEY_NAME, offsetPartitionData.offsets.toArray());
                 else {

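A note on the new Struct helpers: the hunks above replace hasField/getInt pairs with struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME), struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs), and typed reads such as struct.get(PARTITION_ID) assigned straight to an int. Their implementations are not part of this diff; judging only from the call sites, they presumably behave like the following sketch (Field.name being public, and the unchecked generic shape, are assumptions):

    // Sketch only -- the real methods live in
    // org.apache.kafka.common.protocol.types.Struct and are not shown here.
    @SuppressWarnings("unchecked")
    public <T> T get(Field def) {
        return (T) get(def.name);          // typed read, no cast at call sites
    }

    public boolean hasField(Field def) {
        return hasField(def.name);         // delegate to the String overload
    }

    @SuppressWarnings("unchecked")
    public <T> T getOrElse(Field def, T alternative) {
        return hasField(def.name) ? (T) get(def.name) : alternative;
    }

    public Struct setIfExists(Field def, Object value) {
        if (hasField(def.name))
            set(def.name, value);          // silently skipped on versions without the field
        return this;
    }

This is what lets a single constructor and toStruct(version) body serve every schema version without per-version branching on optional fields such as throttle_time_ms.
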
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
index 8aa2fc3..934b0ed 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
@@ -20,6 +20,9 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.utils.Utils;
 
@@ -28,11 +31,42 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
+import static org.apache.kafka.common.protocol.types.Type.BOOLEAN;
+import static org.apache.kafka.common.protocol.types.Type.STRING;
+
 public class MetadataRequest extends AbstractRequest {
 
     private static final String TOPICS_KEY_NAME = "topics";
     private static final String ALLOW_AUTO_TOPIC_CREATION_KEY_NAME = "allow_auto_topic_creation";
 
+    private static final Schema METADATA_REQUEST_V0 = new Schema(
+            new Field(TOPICS_KEY_NAME, new ArrayOf(STRING), "An array of topics to fetch metadata for. If no topics are specified fetch metadata for all topics."));
+
+    private static final Schema METADATA_REQUEST_V1 = new Schema(
+            new Field(TOPICS_KEY_NAME, ArrayOf.nullable(STRING), "An array of topics to fetch metadata for. If the topics array is null fetch metadata for all topics."));
+
+    /* The v2 metadata request is the same as v1. An additional field for cluster id has been added to the v2 metadata response */
+    private static final Schema METADATA_REQUEST_V2 = METADATA_REQUEST_V1;
+
+    /* The v3 metadata request is the same as v1 and v2. An additional field for throttle time has been added to the v3 metadata response */
+    private static final Schema METADATA_REQUEST_V3 = METADATA_REQUEST_V2;
+
+    /* The v4 metadata request has an additional field for allowing auto topic creation. The response is the same as v3. */
+    private static final Schema METADATA_REQUEST_V4 = new Schema(
+            new Field(TOPICS_KEY_NAME, ArrayOf.nullable(STRING), "An array of topics to fetch metadata for. " +
+                    "If the topics array is null fetch metadata for all topics."),
+            new Field(ALLOW_AUTO_TOPIC_CREATION_KEY_NAME, BOOLEAN, "If this and the broker config " +
+                    "'auto.create.topics.enable' are true, topics that don't exist will be created by the broker. " +
+                    "Otherwise, no topics will be created by the broker."));
+
+    /* The v5 metadata request is the same as v4. An additional field for offline_replicas has been added to the v5 metadata response */
+    private static final Schema METADATA_REQUEST_V5 = METADATA_REQUEST_V4;
+
+    public static Schema[] schemaVersions() {
+        return new Schema[] {METADATA_REQUEST_V0, METADATA_REQUEST_V1, METADATA_REQUEST_V2, METADATA_REQUEST_V3,
+            METADATA_REQUEST_V4, METADATA_REQUEST_V5};
+    }
+
     public static class Builder extends AbstractRequest.Builder<MetadataRequest> {
         private static final List<String> ALL_TOPICS = null;
 

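Each schemaVersions() array introduced by this commit is ordered by protocol version, so index i holds the schema for version i. The ApiKeys side is not included in this excerpt, but the lookups the request/response classes rely on (ApiKeys.LIST_OFFSETS.responseSchema(version) above, ApiKeys.METADATA.responseSchema(version) below) presumably reduce to an array index, roughly as in this sketch (field and message wording assumed):

    // Hypothetical wiring inside ApiKeys (not part of this diff): each key's
    // requestSchemas/responseSchemas array would be populated from the
    // per-class schemaVersions() methods, e.g. MetadataRequest.schemaVersions().
    public Schema requestSchema(short version) {
        if (version < 0 || version >= requestSchemas.length)
            throw new IllegalArgumentException("Unknown version " + version + " for api key " + this);
        return requestSchemas[version];
    }

Aliasing identical versions (METADATA_REQUEST_V2 = METADATA_REQUEST_V1, and so on) keeps that array dense: a version bump that only changes the response side still needs a request entry at the same index.
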
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index 10f5c13..fb69cef 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -22,6 +22,9 @@ import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.errors.InvalidMetadataException;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.utils.Utils;
 
@@ -34,8 +37,16 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-public class MetadataResponse extends AbstractResponse {
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
+import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
+import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
+import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
+import static org.apache.kafka.common.protocol.types.Type.BOOLEAN;
+import static org.apache.kafka.common.protocol.types.Type.INT32;
+import static org.apache.kafka.common.protocol.types.Type.NULLABLE_STRING;
+import static org.apache.kafka.common.protocol.types.Type.STRING;
 
+public class MetadataResponse extends AbstractResponse {
     private static final String BROKERS_KEY_NAME = "brokers";
     private static final String TOPIC_METADATA_KEY_NAME = "topic_metadata";
 
@@ -50,9 +61,6 @@ public class MetadataResponse extends AbstractResponse {
 
     private static final String CLUSTER_ID_KEY_NAME = "cluster_id";
 
-    // topic level field names
-    private static final String TOPIC_ERROR_CODE_KEY_NAME = "topic_error_code";
-
     /**
      * Possible error codes:
      *
@@ -62,13 +70,9 @@ public class MetadataResponse extends AbstractResponse {
      * TopicAuthorizationFailed (29)
      */
 
-    private static final String TOPIC_KEY_NAME = "topic";
     private static final String IS_INTERNAL_KEY_NAME = "is_internal";
     private static final String PARTITION_METADATA_KEY_NAME = "partition_metadata";
 
-    // partition level field names
-    private static final String PARTITION_ERROR_CODE_KEY_NAME = "partition_error_code";
-
     /**
      * Possible error codes:
      *
@@ -76,12 +80,95 @@ public class MetadataResponse extends AbstractResponse {
      * ReplicaNotAvailable (9)
      */
 
-    private static final String PARTITION_KEY_NAME = "partition_id";
     private static final String LEADER_KEY_NAME = "leader";
     private static final String REPLICAS_KEY_NAME = "replicas";
     private static final String ISR_KEY_NAME = "isr";
     private static final String OFFLINE_REPLICAS_KEY_NAME = "offline_replicas";
 
+    private static final Schema METADATA_BROKER_V0 = new Schema(
+            new Field(NODE_ID_KEY_NAME, INT32, "The broker id."),
+            new Field(HOST_KEY_NAME, STRING, "The hostname of the broker."),
+            new Field(PORT_KEY_NAME, INT32, "The port on which the broker accepts requests."));
+
+    private static final Schema PARTITION_METADATA_V0 = new Schema(
+            ERROR_CODE,
+            PARTITION_ID,
+            new Field(LEADER_KEY_NAME, INT32, "The id of the broker acting as leader for this partition."),
+            new Field(REPLICAS_KEY_NAME, new ArrayOf(INT32), "The set of all nodes that host this partition."),
+            new Field(ISR_KEY_NAME, new ArrayOf(INT32), "The set of nodes that are in sync with the leader for this partition."));
+
+    private static final Schema TOPIC_METADATA_V0 = new Schema(
+            ERROR_CODE,
+            TOPIC_NAME,
+            new Field(PARTITION_METADATA_KEY_NAME, new ArrayOf(PARTITION_METADATA_V0), "Metadata for each partition of the topic."));
+
+    private static final Schema METADATA_RESPONSE_V0 = new Schema(
+            new Field(BROKERS_KEY_NAME, new ArrayOf(METADATA_BROKER_V0), "Host and port information for all brokers."),
+            new Field(TOPIC_METADATA_KEY_NAME, new ArrayOf(TOPIC_METADATA_V0)));
+
+    private static final Schema METADATA_BROKER_V1 = new Schema(
+            new Field(NODE_ID_KEY_NAME, INT32, "The broker id."),
+            new Field(HOST_KEY_NAME, STRING, "The hostname of the broker."),
+            new Field(PORT_KEY_NAME, INT32, "The port on which the broker accepts requests."),
+            new Field(RACK_KEY_NAME, NULLABLE_STRING, "The rack of the broker."));
+
+    private static final Schema PARTITION_METADATA_V1 = PARTITION_METADATA_V0;
+
+    // PARTITION_METADATA_V2 added a per-partition offline_replicas field. This field specifies the list of replicas that are offline.
+    private static final Schema PARTITION_METADATA_V2 = new Schema(
+            ERROR_CODE,
+            PARTITION_ID,
+            new Field(LEADER_KEY_NAME, INT32, "The id of the broker acting as leader for this partition."),
+            new Field(REPLICAS_KEY_NAME, new ArrayOf(INT32), "The set of all nodes that host this partition."),
+            new Field(ISR_KEY_NAME, new ArrayOf(INT32), "The set of nodes that are in sync with the leader for this partition."),
+            new Field(OFFLINE_REPLICAS_KEY_NAME, new ArrayOf(INT32), "The set of offline replicas of this partition."));
+
+    private static final Schema TOPIC_METADATA_V1 = new Schema(
+            ERROR_CODE,
+            TOPIC_NAME,
+            new Field(IS_INTERNAL_KEY_NAME, BOOLEAN, "Indicates if the topic is considered a Kafka internal topic"),
+            new Field(PARTITION_METADATA_KEY_NAME, new ArrayOf(PARTITION_METADATA_V1), "Metadata for each partition of the topic."));
+
+    // TOPIC_METADATA_V2 added a per-partition offline_replicas field. This field specifies the list of replicas that are offline.
+    private static final Schema TOPIC_METADATA_V2 = new Schema(
+            ERROR_CODE,
+            TOPIC_NAME,
+            new Field(IS_INTERNAL_KEY_NAME, BOOLEAN, "Indicates if the topic is considered a Kafka internal topic"),
+            new Field(PARTITION_METADATA_KEY_NAME, new ArrayOf(PARTITION_METADATA_V2), "Metadata for each partition of the topic."));
+
+    private static final Schema METADATA_RESPONSE_V1 = new Schema(
+            new Field(BROKERS_KEY_NAME, new ArrayOf(METADATA_BROKER_V1), "Host and port information for all brokers."),
+            new Field(CONTROLLER_ID_KEY_NAME, INT32, "The broker id of the controller broker."),
+            new Field(TOPIC_METADATA_KEY_NAME, new ArrayOf(TOPIC_METADATA_V1)));
+
+    private static final Schema METADATA_RESPONSE_V2 = new Schema(
+            new Field(BROKERS_KEY_NAME, new ArrayOf(METADATA_BROKER_V1), "Host and port information for all brokers."),
+            new Field(CLUSTER_ID_KEY_NAME, NULLABLE_STRING, "The cluster id that this broker belongs to."),
+            new Field(CONTROLLER_ID_KEY_NAME, INT32, "The broker id of the controller broker."),
+            new Field(TOPIC_METADATA_KEY_NAME, new ArrayOf(TOPIC_METADATA_V1)));
+
+    private static final Schema METADATA_RESPONSE_V3 = new Schema(
+            THROTTLE_TIME_MS,
+            new Field(BROKERS_KEY_NAME, new ArrayOf(METADATA_BROKER_V1), "Host and port information for all brokers."),
+            new Field(CLUSTER_ID_KEY_NAME, NULLABLE_STRING, "The cluster id that this broker belongs to."),
+            new Field(CONTROLLER_ID_KEY_NAME, INT32, "The broker id of the controller broker."),
+            new Field(TOPIC_METADATA_KEY_NAME, new ArrayOf(TOPIC_METADATA_V1)));
+
+    private static final Schema METADATA_RESPONSE_V4 = METADATA_RESPONSE_V3;
+
+    // METADATA_RESPONSE_V5 added a per-partition offline_replicas field. This field specifies the list of replicas that are offline.
+    private static final Schema METADATA_RESPONSE_V5 = new Schema(
+            THROTTLE_TIME_MS,
+            new Field(BROKERS_KEY_NAME, new ArrayOf(METADATA_BROKER_V1), "Host and port information for all brokers."),
+            new Field(CLUSTER_ID_KEY_NAME, NULLABLE_STRING, "The cluster id that this broker belongs to."),
+            new Field(CONTROLLER_ID_KEY_NAME, INT32, "The broker id of the controller broker."),
+            new Field(TOPIC_METADATA_KEY_NAME, new ArrayOf(TOPIC_METADATA_V2)));
+
+    public static Schema[] schemaVersions() {
+        return new Schema[] {METADATA_RESPONSE_V0, METADATA_RESPONSE_V1, METADATA_RESPONSE_V2, METADATA_RESPONSE_V3,
+            METADATA_RESPONSE_V4, METADATA_RESPONSE_V5};
+    }
+
     private final int throttleTimeMs;
     private final Collection<Node> brokers;
     private final Node controller;
@@ -104,7 +191,7 @@ public class MetadataResponse extends AbstractResponse {
     }
 
     public MetadataResponse(Struct struct) {
-        this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
+        this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
         Map<Integer, Node> brokers = new HashMap<>();
         Object[] brokerStructs = (Object[]) struct.get(BROKERS_KEY_NAME);
         for (Object brokerStruct : brokerStructs) {
@@ -135,8 +222,8 @@ public class MetadataResponse extends AbstractResponse {
         Object[] topicInfos = (Object[]) struct.get(TOPIC_METADATA_KEY_NAME);
         for (Object topicInfoObj : topicInfos) {
             Struct topicInfo = (Struct) topicInfoObj;
-            Errors topicError = Errors.forCode(topicInfo.getShort(TOPIC_ERROR_CODE_KEY_NAME));
-            String topic = topicInfo.getString(TOPIC_KEY_NAME);
+            Errors topicError = Errors.forCode(topicInfo.get(ERROR_CODE));
+            String topic = topicInfo.get(TOPIC_NAME);
             // This field only exists in v1+
             // When we can't know if a topic is internal or not in a v0 response we default to false
             boolean isInternal = topicInfo.hasField(IS_INTERNAL_KEY_NAME) ? topicInfo.getBoolean(IS_INTERNAL_KEY_NAME) : false;
@@ -146,8 +233,8 @@ public class MetadataResponse extends AbstractResponse {
             Object[] partitionInfos = (Object[]) topicInfo.get(PARTITION_METADATA_KEY_NAME);
             for (Object partitionInfoObj : partitionInfos) {
                 Struct partitionInfo = (Struct) partitionInfoObj;
-                Errors partitionError = Errors.forCode(partitionInfo.getShort(PARTITION_ERROR_CODE_KEY_NAME));
-                int partition = partitionInfo.getInt(PARTITION_KEY_NAME);
+                Errors partitionError = Errors.forCode(partitionInfo.get(ERROR_CODE));
+                int partition = partitionInfo.get(PARTITION_ID);
                 int leader = partitionInfo.getInt(LEADER_KEY_NAME);
                 Node leaderNode = leader == -1 ? null : brokers.get(leader);
 
@@ -400,8 +487,7 @@ public class MetadataResponse extends AbstractResponse {
     @Override
     protected Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.METADATA.responseSchema(version));
-        if (struct.hasField(THROTTLE_TIME_KEY_NAME))
-            struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
+        struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);
         List<Struct> brokerArray = new ArrayList<>();
         for (Node node : brokers) {
             Struct broker = struct.instance(BROKERS_KEY_NAME);
@@ -426,8 +512,8 @@ public class MetadataResponse extends AbstractResponse {
         List<Struct> topicMetadataArray = new ArrayList<>(topicMetadata.size());
         for (TopicMetadata metadata : topicMetadata) {
             Struct topicData = struct.instance(TOPIC_METADATA_KEY_NAME);
-            topicData.set(TOPIC_KEY_NAME, metadata.topic);
-            topicData.set(TOPIC_ERROR_CODE_KEY_NAME, metadata.error.code());
+            topicData.set(TOPIC_NAME, metadata.topic);
+            topicData.set(ERROR_CODE, metadata.error.code());
             // This field only exists in v1+
             if (topicData.hasField(IS_INTERNAL_KEY_NAME))
                 topicData.set(IS_INTERNAL_KEY_NAME, metadata.isInternal());
@@ -435,8 +521,8 @@ public class MetadataResponse extends AbstractResponse {
             List<Struct> partitionMetadataArray = new ArrayList<>(metadata.partitionMetadata.size());
             for (PartitionMetadata partitionMetadata : metadata.partitionMetadata()) {
                 Struct partitionData = topicData.instance(PARTITION_METADATA_KEY_NAME);
-                partitionData.set(PARTITION_ERROR_CODE_KEY_NAME, partitionMetadata.error.code());
-                partitionData.set(PARTITION_KEY_NAME, partitionMetadata.partition);
+                partitionData.set(ERROR_CODE, partitionMetadata.error.code());
+                partitionData.set(PARTITION_ID, partitionMetadata.partition);
                 partitionData.set(LEADER_KEY_NAME, partitionMetadata.leader.id());
                 ArrayList<Integer> replicas = new ArrayList<>(partitionMetadata.replicas.size());
                 for (Node node : partitionMetadata.replicas)

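The statically imported ERROR_CODE, PARTITION_ID, THROTTLE_TIME_MS and TOPIC_NAME constants come from the new CommonFields class, which this excerpt imports but does not show. Their wire names should line up with the per-class string constants being deleted above ("topic", "partition", "error_code"), so the entries are presumably shared Field definitions of roughly this shape (doc strings and the INT16/INT32 choices are assumptions):

    // Sketch of CommonFields (assumed; not part of this excerpt). Sharing one
    // Field instance per recurring field keeps names, types and doc strings
    // consistent across all request/response schemas.
    public class CommonFields {
        public static final Field THROTTLE_TIME_MS = new Field("throttle_time_ms", INT32,
                "Duration in milliseconds for which the request was throttled due to quota violation");
        public static final Field ERROR_CODE = new Field("error_code", INT16, "Response error code");
        public static final Field TOPIC_NAME = new Field("topic", STRING, "Name of topic");
        public static final Field PARTITION_ID = new Field("partition", INT32, "Topic partition id");
    }

Note that MetadataResponse previously used API-specific names ("partition_id", "topic_error_code", "partition_error_code"); reusing the common fields renames them, which is safe because this serialization is positional -- field names are documentation and lookup keys, not wire data.
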
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
index 4402c4d..696d967 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
@@ -20,6 +20,8 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.utils.CollectionUtils;
@@ -30,6 +32,13 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
+import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
+import static org.apache.kafka.common.protocol.types.Type.INT32;
+import static org.apache.kafka.common.protocol.types.Type.INT64;
+import static org.apache.kafka.common.protocol.types.Type.NULLABLE_STRING;
+import static org.apache.kafka.common.protocol.types.Type.STRING;
+
 /**
  * This wrapper supports both v0 and v1 of OffsetCommitRequest.
  */
@@ -41,17 +50,69 @@ public class OffsetCommitRequest extends AbstractRequest {
     private static final String RETENTION_TIME_KEY_NAME = "retention_time";
 
     // topic level field names
-    private static final String TOPIC_KEY_NAME = "topic";
     private static final String PARTITIONS_KEY_NAME = "partitions";
 
     // partition level field names
-    private static final String PARTITION_KEY_NAME = "partition";
     private static final String COMMIT_OFFSET_KEY_NAME = "offset";
     private static final String METADATA_KEY_NAME = "metadata";
 
     @Deprecated
     private static final String TIMESTAMP_KEY_NAME = "timestamp";         // for v0, v1
 
+    /* Offset commit api */
+    private static final Schema OFFSET_COMMIT_REQUEST_PARTITION_V0 = new Schema(
+            PARTITION_ID,
+            new Field(COMMIT_OFFSET_KEY_NAME, INT64, "Message offset to be committed."),
+            new Field(METADATA_KEY_NAME, NULLABLE_STRING, "Any associated metadata the client wants to keep."));
+
+    private static final Schema OFFSET_COMMIT_REQUEST_PARTITION_V1 = new Schema(
+            PARTITION_ID,
+            new Field(COMMIT_OFFSET_KEY_NAME, INT64, "Message offset to be committed."),
+            new Field(TIMESTAMP_KEY_NAME, INT64, "Timestamp of the commit"),
+            new Field(METADATA_KEY_NAME, NULLABLE_STRING, "Any associated metadata the client wants to keep."));
+
+    private static final Schema OFFSET_COMMIT_REQUEST_PARTITION_V2 = new Schema(
+            PARTITION_ID,
+            new Field(COMMIT_OFFSET_KEY_NAME, INT64, "Message offset to be committed."),
+            new Field(METADATA_KEY_NAME, NULLABLE_STRING, "Any associated metadata the client wants to keep."));
+
+    private static final Schema OFFSET_COMMIT_REQUEST_TOPIC_V0 = new Schema(
+            TOPIC_NAME,
+            new Field(PARTITIONS_KEY_NAME, new ArrayOf(OFFSET_COMMIT_REQUEST_PARTITION_V0), "Partitions to commit offsets."));
+
+    private static final Schema OFFSET_COMMIT_REQUEST_TOPIC_V1 = new Schema(
+            TOPIC_NAME,
+            new Field(PARTITIONS_KEY_NAME, new ArrayOf(OFFSET_COMMIT_REQUEST_PARTITION_V1), "Partitions to commit offsets."));
+
+    private static final Schema OFFSET_COMMIT_REQUEST_TOPIC_V2 = new Schema(
+            TOPIC_NAME,
+            new Field(PARTITIONS_KEY_NAME, new ArrayOf(OFFSET_COMMIT_REQUEST_PARTITION_V2), "Partitions to commit offsets."));
+
+    private static final Schema OFFSET_COMMIT_REQUEST_V0 = new Schema(
+            new Field(GROUP_ID_KEY_NAME, STRING, "The group id."),
+            new Field(TOPICS_KEY_NAME, new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V0), "Topics to commit offsets."));
+
+    private static final Schema OFFSET_COMMIT_REQUEST_V1 = new Schema(
+            new Field(GROUP_ID_KEY_NAME, STRING, "The group id."),
+            new Field(GENERATION_ID_KEY_NAME, INT32, "The generation of the group."),
+            new Field(MEMBER_ID_KEY_NAME, STRING, "The member id assigned by the group coordinator."),
+            new Field(TOPICS_KEY_NAME, new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V1), "Topics to commit offsets."));
+
+    private static final Schema OFFSET_COMMIT_REQUEST_V2 = new Schema(
+            new Field(GROUP_ID_KEY_NAME, STRING, "The group id."),
+            new Field(GENERATION_ID_KEY_NAME, INT32, "The generation of the consumer group."),
+            new Field(MEMBER_ID_KEY_NAME, STRING, "The consumer id assigned by the group coordinator."),
+            new Field(RETENTION_TIME_KEY_NAME, INT64, "Time period in ms to retain the offset."),
+            new Field(TOPICS_KEY_NAME, new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V2), "Topics to commit offsets."));
+
+    /* v3 request is same as v2. Throttle time has been added to response */
+    private static final Schema OFFSET_COMMIT_REQUEST_V3 = OFFSET_COMMIT_REQUEST_V2;
+
+    public static Schema[] schemaVersions() {
+        return new Schema[] {OFFSET_COMMIT_REQUEST_V0, OFFSET_COMMIT_REQUEST_V1, OFFSET_COMMIT_REQUEST_V2,
+            OFFSET_COMMIT_REQUEST_V3};
+    }
+
     // default values for the current version
     public static final int DEFAULT_GENERATION_ID = -1;
     public static final String DEFAULT_MEMBER_ID = "";
@@ -190,10 +251,10 @@ public class OffsetCommitRequest extends AbstractRequest {
         offsetData = new HashMap<>();
         for (Object topicDataObj : struct.getArray(TOPICS_KEY_NAME)) {
             Struct topicData = (Struct) topicDataObj;
-            String topic = topicData.getString(TOPIC_KEY_NAME);
+            String topic = topicData.get(TOPIC_NAME);
             for (Object partitionDataObj : topicData.getArray(PARTITIONS_KEY_NAME)) {
                 Struct partitionDataStruct = (Struct) partitionDataObj;
-                int partition = partitionDataStruct.getInt(PARTITION_KEY_NAME);
+                int partition = partitionDataStruct.get(PARTITION_ID);
                 long offset = partitionDataStruct.getLong(COMMIT_OFFSET_KEY_NAME);
                 String metadata = partitionDataStruct.getString(METADATA_KEY_NAME);
                 PartitionData partitionOffset;
@@ -219,12 +280,12 @@ public class OffsetCommitRequest extends AbstractRequest {
         List<Struct> topicArray = new ArrayList<>();
         for (Map.Entry<String, Map<Integer, PartitionData>> topicEntry: topicsData.entrySet()) {
             Struct topicData = struct.instance(TOPICS_KEY_NAME);
-            topicData.set(TOPIC_KEY_NAME, topicEntry.getKey());
+            topicData.set(TOPIC_NAME, topicEntry.getKey());
             List<Struct> partitionArray = new ArrayList<>();
             for (Map.Entry<Integer, PartitionData> partitionEntry : topicEntry.getValue().entrySet()) {
                 PartitionData fetchPartitionData = partitionEntry.getValue();
                 Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
-                partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
+                partitionData.set(PARTITION_ID, partitionEntry.getKey());
                 partitionData.set(COMMIT_OFFSET_KEY_NAME, fetchPartitionData.offset);
                 // Only for v1
                 if (partitionData.hasField(TIMESTAMP_KEY_NAME))

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
index 782ffa5..0181eef 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
@@ -19,6 +19,9 @@ package org.apache.kafka.common.requests;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.utils.CollectionUtils;
 
@@ -28,18 +31,18 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
+import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
+import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
+import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
+
 public class OffsetCommitResponse extends AbstractResponse {
 
     private static final String RESPONSES_KEY_NAME = "responses";
 
     // topic level fields
-    private static final String TOPIC_KEY_NAME = "topic";
     private static final String PARTITIONS_KEY_NAME = "partition_responses";
 
-    // partition level fields
-    private static final String PARTITION_KEY_NAME = "partition";
-    private static final String ERROR_CODE_KEY_NAME = "error_code";
-
     /**
      * Possible error codes:
      *
@@ -57,6 +60,32 @@ public class OffsetCommitResponse extends AbstractResponse {
      * GROUP_AUTHORIZATION_FAILED (30)
      */
 
+    private static final Schema OFFSET_COMMIT_RESPONSE_PARTITION_V0 = new Schema(
+            PARTITION_ID,
+            ERROR_CODE);
+
+    private static final Schema OFFSET_COMMIT_RESPONSE_TOPIC_V0 = new Schema(
+            TOPIC_NAME,
+            new Field(PARTITIONS_KEY_NAME, new ArrayOf(OFFSET_COMMIT_RESPONSE_PARTITION_V0)));
+
+    private static final Schema OFFSET_COMMIT_RESPONSE_V0 = new Schema(
+            new Field(RESPONSES_KEY_NAME, new ArrayOf(OFFSET_COMMIT_RESPONSE_TOPIC_V0)));
+
+
+    /* The response types for V0, V1 and V2 of OFFSET_COMMIT_REQUEST are the same. */
+    private static final Schema OFFSET_COMMIT_RESPONSE_V1 = OFFSET_COMMIT_RESPONSE_V0;
+    private static final Schema OFFSET_COMMIT_RESPONSE_V2 = OFFSET_COMMIT_RESPONSE_V0;
+
+    private static final Schema OFFSET_COMMIT_RESPONSE_V3 = new Schema(
+            THROTTLE_TIME_MS,
+            new Field(RESPONSES_KEY_NAME, new ArrayOf(OFFSET_COMMIT_RESPONSE_TOPIC_V0)));
+
+    public static Schema[] schemaVersions() {
+        return new Schema[] {OFFSET_COMMIT_RESPONSE_V0, OFFSET_COMMIT_RESPONSE_V1, OFFSET_COMMIT_RESPONSE_V2,
+            OFFSET_COMMIT_RESPONSE_V3};
+    }
+
+
     private final Map<TopicPartition, Errors> responseData;
     private final int throttleTimeMs;
 
@@ -70,15 +99,15 @@ public class OffsetCommitResponse extends AbstractResponse {
     }
 
     public OffsetCommitResponse(Struct struct) {
-        this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
+        this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
         responseData = new HashMap<>();
         for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
             Struct topicResponse = (Struct) topicResponseObj;
-            String topic = topicResponse.getString(TOPIC_KEY_NAME);
+            String topic = topicResponse.get(TOPIC_NAME);
             for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
                 Struct partitionResponse = (Struct) partitionResponseObj;
-                int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
-                Errors error = Errors.forCode(partitionResponse.getShort(ERROR_CODE_KEY_NAME));
+                int partition = partitionResponse.get(PARTITION_ID);
+                Errors error = Errors.forCode(partitionResponse.get(ERROR_CODE));
                 responseData.put(new TopicPartition(topic, partition), error);
             }
         }
@@ -87,19 +116,18 @@ public class OffsetCommitResponse extends AbstractResponse {
     @Override
     public Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.OFFSET_COMMIT.responseSchema(version));
-        if (struct.hasField(THROTTLE_TIME_KEY_NAME))
-            struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
+        struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);
 
         Map<String, Map<Integer, Errors>> topicsData = CollectionUtils.groupDataByTopic(responseData);
         List<Struct> topicArray = new ArrayList<>();
         for (Map.Entry<String, Map<Integer, Errors>> entries: topicsData.entrySet()) {
             Struct topicData = struct.instance(RESPONSES_KEY_NAME);
-            topicData.set(TOPIC_KEY_NAME, entries.getKey());
+            topicData.set(TOPIC_NAME, entries.getKey());
             List<Struct> partitionArray = new ArrayList<>();
             for (Map.Entry<Integer, Errors> partitionEntry : entries.getValue().entrySet()) {
                 Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
-                partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
-                partitionData.set(ERROR_CODE_KEY_NAME, partitionEntry.getValue().code());
+                partitionData.set(PARTITION_ID, partitionEntry.getKey());
+                partitionData.set(ERROR_CODE, partitionEntry.getValue().code());
                 partitionArray.add(partitionData);
             }
             topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());

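As a usage sketch, the schemas above serialize and re-parse structs directly. The write/read calls below are the long-standing Struct/Schema API rather than anything added by this commit, so treat the exact method names as assumptions:

    // Hypothetical round trip over the v3 offset commit response schema.
    Struct struct = new Struct(OFFSET_COMMIT_RESPONSE_V3);
    struct.set(THROTTLE_TIME_MS, 100);
    struct.set(RESPONSES_KEY_NAME, new Object[0]);   // no topics, for brevity

    ByteBuffer buffer = ByteBuffer.allocate(struct.sizeOf());
    struct.writeTo(buffer);
    buffer.flip();

    Struct parsed = OFFSET_COMMIT_RESPONSE_V3.read(buffer);
    int throttleTimeMs = parsed.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);   // 100

Parsing the same bytes with OFFSET_COMMIT_RESPONSE_V0 would misread the leading throttle_time_ms bytes as the responses array, which is why the version handed to toStruct/responseSchema must come from the request header.
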
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
index 15fdf57..6d8b959 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
@@ -20,6 +20,9 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.utils.CollectionUtils;
 import org.apache.kafka.common.utils.Utils;
@@ -30,16 +33,53 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
+import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
+import static org.apache.kafka.common.protocol.types.Type.STRING;
+
 public class OffsetFetchRequest extends AbstractRequest {
     private static final String GROUP_ID_KEY_NAME = "group_id";
     private static final String TOPICS_KEY_NAME = "topics";
 
     // topic level field names
-    private static final String TOPIC_KEY_NAME = "topic";
     private static final String PARTITIONS_KEY_NAME = "partitions";
 
-    // partition level field names
-    private static final String PARTITION_KEY_NAME = "partition";
+    /*
+     * Wire formats of version 0 and 1 are the same, but with different functionality.
+     * Wire format of version 2 is similar to version 1, with the exception of
+     * - accepting 'null' as list of topics
+     * - returning a top level error code
+     * Version 0 will read the offsets from ZK.
+     * Version 1 will read the offsets from Kafka.
+     * Version 2 will read the offsets from Kafka, and returns all associated topic partition offsets if
+     * a 'null' is passed instead of a list of specific topic partitions. It also returns a top level error code
+     * for group or coordinator level errors.
+     */
+    private static final Schema OFFSET_FETCH_REQUEST_PARTITION_V0 = new Schema(
+            PARTITION_ID);
+
+    private static final Schema OFFSET_FETCH_REQUEST_TOPIC_V0 = new Schema(
+            TOPIC_NAME,
+            new Field(PARTITIONS_KEY_NAME, new ArrayOf(OFFSET_FETCH_REQUEST_PARTITION_V0), "Partitions to fetch offsets."));
+
+    private static final Schema OFFSET_FETCH_REQUEST_V0 = new Schema(
+            new Field(GROUP_ID_KEY_NAME, STRING, "The consumer group id."),
+            new Field(TOPICS_KEY_NAME, new ArrayOf(OFFSET_FETCH_REQUEST_TOPIC_V0), "Topics to fetch offsets."));
+
+    private static final Schema OFFSET_FETCH_REQUEST_V1 = OFFSET_FETCH_REQUEST_V0;
+
+    private static final Schema OFFSET_FETCH_REQUEST_V2 = new Schema(
+            new Field(GROUP_ID_KEY_NAME, STRING, "The consumer group id."),
+            new Field(TOPICS_KEY_NAME, ArrayOf.nullable(OFFSET_FETCH_REQUEST_TOPIC_V0), "Topics to fetch offsets. If the " +
+                    "topic array is null fetch offsets for all topics."));
+
+    /* v3 request is the same as v2. Throttle time has been added to v3 response */
+    private static final Schema OFFSET_FETCH_REQUEST_V3 = OFFSET_FETCH_REQUEST_V2;
+
+    public static Schema[] schemaVersions() {
+        return new Schema[] {OFFSET_FETCH_REQUEST_V0, OFFSET_FETCH_REQUEST_V1, OFFSET_FETCH_REQUEST_V2,
+            OFFSET_FETCH_REQUEST_V3};
+    }
 
     public static class Builder extends AbstractRequest.Builder<OffsetFetchRequest> {
         private static final List<TopicPartition> ALL_TOPIC_PARTITIONS = null;
@@ -102,10 +142,10 @@ public class OffsetFetchRequest extends AbstractRequest {
             partitions = new ArrayList<>();
             for (Object topicResponseObj : struct.getArray(TOPICS_KEY_NAME)) {
                 Struct topicResponse = (Struct) topicResponseObj;
-                String topic = topicResponse.getString(TOPIC_KEY_NAME);
+                String topic = topicResponse.get(TOPIC_NAME);
                 for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
                     Struct partitionResponse = (Struct) partitionResponseObj;
-                    int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
+                    int partition = partitionResponse.get(PARTITION_ID);
                     partitions.add(new TopicPartition(topic, partition));
                 }
             }
@@ -177,11 +217,11 @@ public class OffsetFetchRequest extends AbstractRequest {
             List<Struct> topicArray = new ArrayList<>();
             for (Map.Entry<String, List<Integer>> entries : topicsData.entrySet()) {
                 Struct topicData = struct.instance(TOPICS_KEY_NAME);
-                topicData.set(TOPIC_KEY_NAME, entries.getKey());
+                topicData.set(TOPIC_NAME, entries.getKey());
                 List<Struct> partitionArray = new ArrayList<>();
                 for (Integer partitionId : entries.getValue()) {
                     Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
-                    partitionData.set(PARTITION_KEY_NAME, partitionId);
+                    partitionData.set(PARTITION_ID, partitionId);
                     partitionArray.add(partitionData);
                 }
                 topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());

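Version 2 switches the topics field to ArrayOf.nullable, matching the comment block above: a null array means fetch offsets for every partition the group has committed. Assuming Struct accepts null for nullable array fields (its null handling is not shown in this excerpt), an all-topics v2 request would be built roughly like:

    // Hypothetical: a v2 offset fetch request for all topics of a group.
    Struct struct = new Struct(OFFSET_FETCH_REQUEST_V2);
    struct.set(GROUP_ID_KEY_NAME, "my-group");
    struct.set(TOPICS_KEY_NAME, null);   // null == all topics (v2+ only)

The Builder's ALL_TOPIC_PARTITIONS = null constant above is the object-level counterpart of the same convention.
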
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
index 6315535..c3341e0 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
@@ -16,34 +16,71 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.CollectionUtils;
+
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.utils.CollectionUtils;
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
+import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
+import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
+import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
+import static org.apache.kafka.common.protocol.types.Type.INT64;
+import static org.apache.kafka.common.protocol.types.Type.NULLABLE_STRING;
 
 public class OffsetFetchResponse extends AbstractResponse {
 
     private static final String RESPONSES_KEY_NAME = "responses";
-    private static final String ERROR_CODE_KEY_NAME = "error_code";
 
     // topic level fields
-    private static final String TOPIC_KEY_NAME = "topic";
     private static final String PARTITIONS_KEY_NAME = "partition_responses";
 
     // partition level fields
-    private static final String PARTITION_KEY_NAME = "partition";
     private static final String COMMIT_OFFSET_KEY_NAME = "offset";
     private static final String METADATA_KEY_NAME = "metadata";
 
+    private static final Schema OFFSET_FETCH_RESPONSE_PARTITION_V0 = new Schema(
+            PARTITION_ID,
+            new Field(COMMIT_OFFSET_KEY_NAME, INT64, "Last committed message offset."),
+            new Field(METADATA_KEY_NAME, NULLABLE_STRING, "Any associated metadata the client wants to keep."),
+            ERROR_CODE);
+
+    private static final Schema OFFSET_FETCH_RESPONSE_TOPIC_V0 = new Schema(
+            TOPIC_NAME,
+            new Field(PARTITIONS_KEY_NAME, new ArrayOf(OFFSET_FETCH_RESPONSE_PARTITION_V0)));
+
+    private static final Schema OFFSET_FETCH_RESPONSE_V0 = new Schema(
+            new Field(RESPONSES_KEY_NAME, new ArrayOf(OFFSET_FETCH_RESPONSE_TOPIC_V0)));
+
+    private static final Schema OFFSET_FETCH_RESPONSE_V1 = OFFSET_FETCH_RESPONSE_V0;
+
+    private static final Schema OFFSET_FETCH_RESPONSE_V2 = new Schema(
+            new Field(RESPONSES_KEY_NAME, new ArrayOf(OFFSET_FETCH_RESPONSE_TOPIC_V0)),
+            ERROR_CODE);
+
+    /* v3 request is the same as v2. Throttle time has been added to v3 response */
+    private static final Schema OFFSET_FETCH_RESPONSE_V3 = new Schema(
+            THROTTLE_TIME_MS,
+            new Field(RESPONSES_KEY_NAME, new ArrayOf(OFFSET_FETCH_RESPONSE_TOPIC_V0)),
+            ERROR_CODE);
+
+    public static Schema[] schemaVersions() {
+        return new Schema[] {OFFSET_FETCH_RESPONSE_V0, OFFSET_FETCH_RESPONSE_V1, OFFSET_FETCH_RESPONSE_V2,
+            OFFSET_FETCH_RESPONSE_V3};
+    }
+
     public static final long INVALID_OFFSET = -1L;
     public static final String NO_METADATA = "";
     public static final PartitionData UNKNOWN_PARTITION = new PartitionData(INVALID_OFFSET, NO_METADATA,
@@ -62,8 +99,7 @@ public class OffsetFetchResponse extends AbstractResponse {
      *   - GROUP_AUTHORIZATION_FAILED (30)
      */
 
-    private static final List<Errors> PARTITION_ERRORS = Arrays.asList(
-            Errors.UNKNOWN_TOPIC_OR_PARTITION);
+    private static final List<Errors> PARTITION_ERRORS = Collections.singletonList(Errors.UNKNOWN_TOPIC_OR_PARTITION);
 
     private final Map<TopicPartition, PartitionData> responseData;
     private final Errors error;
@@ -107,18 +143,18 @@ public class OffsetFetchResponse extends AbstractResponse {
     }
 
     public OffsetFetchResponse(Struct struct) {
-        this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
+        this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
         Errors topLevelError = Errors.NONE;
         this.responseData = new HashMap<>();
         for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
             Struct topicResponse = (Struct) topicResponseObj;
-            String topic = topicResponse.getString(TOPIC_KEY_NAME);
+            String topic = topicResponse.get(TOPIC_NAME);
             for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
                 Struct partitionResponse = (Struct) partitionResponseObj;
-                int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
+                int partition = partitionResponse.get(PARTITION_ID);
                 long offset = partitionResponse.getLong(COMMIT_OFFSET_KEY_NAME);
                 String metadata = partitionResponse.getString(METADATA_KEY_NAME);
-                Errors error = Errors.forCode(partitionResponse.getShort(ERROR_CODE_KEY_NAME));
+                Errors error = Errors.forCode(partitionResponse.get(ERROR_CODE));
                 if (error != Errors.NONE && !PARTITION_ERRORS.contains(error))
                     topLevelError = error;
                 PartitionData partitionData = new PartitionData(offset, metadata, error);
@@ -130,7 +166,7 @@ public class OffsetFetchResponse extends AbstractResponse {
         // for older versions there is no top-level error in the response and all errors are partition errors,
         // so if there is a group or coordinator error at the partition level use that as the top-level error.
         // this way clients can depend on the top-level error regardless of the offset fetch version.
-        this.error = struct.hasField(ERROR_CODE_KEY_NAME) ? Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME)) : topLevelError;
+        this.error = struct.hasField(ERROR_CODE) ? Errors.forCode(struct.get(ERROR_CODE)) : topLevelError;
     }
 
     public void maybeThrowFirstPartitionError() {
@@ -164,22 +200,21 @@ public class OffsetFetchResponse extends AbstractResponse {
     @Override
     protected Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.OFFSET_FETCH.responseSchema(version));
-        if (struct.hasField(THROTTLE_TIME_KEY_NAME))
-            struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
+        struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);
 
         Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(responseData);
         List<Struct> topicArray = new ArrayList<>();
         for (Map.Entry<String, Map<Integer, PartitionData>> entries : topicsData.entrySet()) {
             Struct topicData = struct.instance(RESPONSES_KEY_NAME);
-            topicData.set(TOPIC_KEY_NAME, entries.getKey());
+            topicData.set(TOPIC_NAME, entries.getKey());
             List<Struct> partitionArray = new ArrayList<>();
             for (Map.Entry<Integer, PartitionData> partitionEntry : entries.getValue().entrySet()) {
                 PartitionData fetchPartitionData = partitionEntry.getValue();
                 Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
-                partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
+                partitionData.set(PARTITION_ID, partitionEntry.getKey());
                 partitionData.set(COMMIT_OFFSET_KEY_NAME, fetchPartitionData.offset);
                 partitionData.set(METADATA_KEY_NAME, fetchPartitionData.metadata);
-                partitionData.set(ERROR_CODE_KEY_NAME, fetchPartitionData.error.code());
+                partitionData.set(ERROR_CODE, fetchPartitionData.error.code());
                 partitionArray.add(partitionData);
             }
             topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
@@ -188,7 +223,7 @@ public class OffsetFetchResponse extends AbstractResponse {
         struct.set(RESPONSES_KEY_NAME, topicArray.toArray());
 
         if (version > 1)
-            struct.set(ERROR_CODE_KEY_NAME, this.error.code());
+            struct.set(ERROR_CODE, this.error.code());
 
         return struct;
     }

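One behavioral subtlety above: v0/v1 responses carry no top-level error_code field, so the constructor promotes any group- or coordinator-level error found on a partition to this.error, while genuinely partition-scoped errors (PARTITION_ERRORS, i.e. UNKNOWN_TOPIC_OR_PARTITION) stay where they are. The effect, sketched against an assumed v0 struct:

    // Illustration (assumed scenario; PartitionData constructor as used above).
    // A v0 response whose only partition reports NOT_COORDINATOR...
    PartitionData pd = new PartitionData(INVALID_OFFSET, NO_METADATA, Errors.NOT_COORDINATOR);
    // ...parses back with error() == Errors.NOT_COORDINATOR, even though v0
    // never serialized a top-level error code. With UNKNOWN_TOPIC_OR_PARTITION
    // instead, error() would remain Errors.NONE.

This lets callers check a single top-level error regardless of the negotiated version.
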
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
index fc31d75..b5fce78 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
@@ -19,6 +19,9 @@ package org.apache.kafka.common.requests;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.utils.CollectionUtils;
 
@@ -28,12 +31,28 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
+import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
+import static org.apache.kafka.common.protocol.types.Type.INT32;
+
 public class OffsetsForLeaderEpochRequest extends AbstractRequest {
-    public static final String TOPICS = "topics";
-    public static final String TOPIC = "topic";
-    public static final String PARTITIONS = "partitions";
-    public static final String PARTITION_ID = "partition_id";
-    public static final String LEADER_EPOCH = "leader_epoch";
+    private static final String TOPICS_KEY_NAME = "topics";
+    private static final String PARTITIONS_KEY_NAME = "partitions";
+    private static final String LEADER_EPOCH = "leader_epoch";
+
+    /* Offsets for Leader Epoch api */
+    private static final Schema OFFSET_FOR_LEADER_EPOCH_REQUEST_PARTITION_V0 = new Schema(
+            PARTITION_ID,
+            new Field("leader_epoch", INT32, "The epoch"));
+    private static final Schema OFFSET_FOR_LEADER_EPOCH_REQUEST_TOPIC_V0 = new Schema(
+            TOPIC_NAME,
+            new Field(PARTITIONS_KEY_NAME, new ArrayOf(OFFSET_FOR_LEADER_EPOCH_REQUEST_PARTITION_V0)));
+    private static final Schema OFFSET_FOR_LEADER_EPOCH_REQUEST_V0 = new Schema(
+            new Field(TOPICS_KEY_NAME, new ArrayOf(OFFSET_FOR_LEADER_EPOCH_REQUEST_TOPIC_V0), "An array of topics to get epochs for"));
+
+    public static Schema[] schemaVersions() {
+        return new Schema[]{OFFSET_FOR_LEADER_EPOCH_REQUEST_V0};
+    }
 
     private Map<TopicPartition, Integer> epochsByPartition;
 
@@ -85,12 +104,12 @@ public class OffsetsForLeaderEpochRequest extends AbstractRequest {
     public OffsetsForLeaderEpochRequest(Struct struct, short version) {
         super(version);
         epochsByPartition = new HashMap<>();
-        for (Object topicAndEpochsObj : struct.getArray(TOPICS)) {
+        for (Object topicAndEpochsObj : struct.getArray(TOPICS_KEY_NAME)) {
             Struct topicAndEpochs = (Struct) topicAndEpochsObj;
-            String topic = topicAndEpochs.getString(TOPIC);
-            for (Object partitionAndEpochObj : topicAndEpochs.getArray(PARTITIONS)) {
+            String topic = topicAndEpochs.get(TOPIC_NAME);
+            for (Object partitionAndEpochObj : topicAndEpochs.getArray(PARTITIONS_KEY_NAME)) {
                 Struct partitionAndEpoch = (Struct) partitionAndEpochObj;
-                int partitionId = partitionAndEpoch.getInt(PARTITION_ID);
+                int partitionId = partitionAndEpoch.get(PARTITION_ID);
                 int epoch = partitionAndEpoch.getInt(LEADER_EPOCH);
                 TopicPartition tp = new TopicPartition(topic, partitionId);
                 epochsByPartition.put(tp, epoch);
@@ -110,19 +129,19 @@ public class OffsetsForLeaderEpochRequest extends AbstractRequest {
 
         List<Struct> topics = new ArrayList<>();
         for (Map.Entry<String, Map<Integer, Integer>> topicToEpochs : topicsToPartitionEpochs.entrySet()) {
-            Struct topicsStruct = requestStruct.instance(TOPICS);
-            topicsStruct.set(TOPIC, topicToEpochs.getKey());
+            Struct topicsStruct = requestStruct.instance(TOPICS_KEY_NAME);
+            topicsStruct.set(TOPIC_NAME, topicToEpochs.getKey());
             List<Struct> partitions = new ArrayList<>();
             for (Map.Entry<Integer, Integer> partitionEpoch : topicToEpochs.getValue().entrySet()) {
-                Struct partitionStruct = topicsStruct.instance(PARTITIONS);
+                Struct partitionStruct = topicsStruct.instance(PARTITIONS_KEY_NAME);
                 partitionStruct.set(PARTITION_ID, partitionEpoch.getKey());
                 partitionStruct.set(LEADER_EPOCH, partitionEpoch.getValue());
                 partitions.add(partitionStruct);
             }
-            topicsStruct.set(PARTITIONS, partitions.toArray());
+            topicsStruct.set(PARTITIONS_KEY_NAME, partitions.toArray());
             topics.add(topicsStruct);
         }
-        requestStruct.set(TOPICS, topics.toArray());
+        requestStruct.set(TOPICS_KEY_NAME, topics.toArray());
         return requestStruct;
     }
 

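Taken together, the new schema constants and the struct calls in this file give a complete write/read round trip. The sketch below re-declares the v0 layout locally (the real constants are private) and assumes Struct exposes sizeOf() and writeTo(ByteBuffer) as it does elsewhere in the protocol types package:

    import java.nio.ByteBuffer;

    import org.apache.kafka.common.protocol.types.ArrayOf;
    import org.apache.kafka.common.protocol.types.Field;
    import org.apache.kafka.common.protocol.types.Schema;
    import org.apache.kafka.common.protocol.types.Struct;

    import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
    import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
    import static org.apache.kafka.common.protocol.types.Type.INT32;

    public class LeaderEpochRoundTripSketch {
        // Local copy of the v0 layout above: topics -> (topic, partitions -> (partition, leader_epoch))
        private static final Schema PARTITION_V0 = new Schema(
                PARTITION_ID,
                new Field("leader_epoch", INT32, "The epoch"));
        private static final Schema TOPIC_V0 = new Schema(
                TOPIC_NAME,
                new Field("partitions", new ArrayOf(PARTITION_V0)));
        private static final Schema REQUEST_V0 = new Schema(
                new Field("topics", new ArrayOf(TOPIC_V0)));

        public static void main(String[] args) {
            // Write path, mirroring toStruct() above.
            Struct request = new Struct(REQUEST_V0);
            Struct topic = request.instance("topics");
            topic.set(TOPIC_NAME, "my-topic");
            Struct partition = topic.instance("partitions");
            partition.set(PARTITION_ID, 0);
            partition.set("leader_epoch", 5);
            topic.set("partitions", new Object[]{partition});
            request.set("topics", new Object[]{topic});

            // Read path, mirroring the Struct-based constructor above.
            ByteBuffer buffer = ByteBuffer.allocate(request.sizeOf());
            request.writeTo(buffer);
            buffer.flip();
            Struct parsed = REQUEST_V0.read(buffer);
            for (Object topicObj : parsed.getArray("topics")) {
                Struct t = (Struct) topicObj;
                System.out.println(t.get(TOPIC_NAME));
            }
        }
    }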
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
index 4195b77..13d70b7 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
@@ -19,6 +19,9 @@ package org.apache.kafka.common.requests;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.utils.CollectionUtils;
 
@@ -28,27 +31,44 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
+import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
+import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
+import static org.apache.kafka.common.protocol.types.Type.INT64;
+
 public class OffsetsForLeaderEpochResponse extends AbstractResponse {
-    public static final String TOPICS = "topics";
-    public static final String TOPIC = "topic";
-    public static final String PARTITIONS = "partitions";
-    public static final String ERROR_CODE = "error_code";
-    public static final String PARTITION_ID = "partition_id";
-    public static final String END_OFFSET = "end_offset";
+    private static final String TOPICS_KEY_NAME = "topics";
+    private static final String PARTITIONS_KEY_NAME = "partitions";
+    private static final String END_OFFSET_KEY_NAME = "end_offset";
+
+    private static final Schema OFFSET_FOR_LEADER_EPOCH_RESPONSE_PARTITION_V0 = new Schema(
+            ERROR_CODE,
+            PARTITION_ID,
+            new Field(END_OFFSET_KEY_NAME, INT64, "The end offset"));
+    private static final Schema OFFSET_FOR_LEADER_EPOCH_RESPONSE_TOPIC_V0 = new Schema(
+            TOPIC_NAME,
+            new Field(PARTITIONS_KEY_NAME, new ArrayOf(OFFSET_FOR_LEADER_EPOCH_RESPONSE_PARTITION_V0)));
+    private static final Schema OFFSET_FOR_LEADER_EPOCH_RESPONSE_V0 = new Schema(
+            new Field(TOPICS_KEY_NAME, new ArrayOf(OFFSET_FOR_LEADER_EPOCH_RESPONSE_TOPIC_V0),
+                    "An array of topics for which we have leader offsets for some requested Partition Leader Epoch"));
+
+    public static Schema[] schemaVersions() {
+        return new Schema[]{OFFSET_FOR_LEADER_EPOCH_RESPONSE_V0};
+    }
 
     private Map<TopicPartition, EpochEndOffset> epochEndOffsetsByPartition;
 
     public OffsetsForLeaderEpochResponse(Struct struct) {
         epochEndOffsetsByPartition = new HashMap<>();
-        for (Object topicAndEpochsObj : struct.getArray(TOPICS)) {
+        for (Object topicAndEpochsObj : struct.getArray(TOPICS_KEY_NAME)) {
             Struct topicAndEpochs = (Struct) topicAndEpochsObj;
-            String topic = topicAndEpochs.getString(TOPIC);
-            for (Object partitionAndEpochObj : topicAndEpochs.getArray(PARTITIONS)) {
+            String topic = topicAndEpochs.get(TOPIC_NAME);
+            for (Object partitionAndEpochObj : topicAndEpochs.getArray(PARTITIONS_KEY_NAME)) {
                 Struct partitionAndEpoch = (Struct) partitionAndEpochObj;
-                Errors error = Errors.forCode(partitionAndEpoch.getShort(ERROR_CODE));
-                int partitionId = partitionAndEpoch.getInt(PARTITION_ID);
+                Errors error = Errors.forCode(partitionAndEpoch.get(ERROR_CODE));
+                int partitionId = partitionAndEpoch.get(PARTITION_ID);
                 TopicPartition tp = new TopicPartition(topic, partitionId);
-                long endOffset = partitionAndEpoch.getLong(END_OFFSET);
+                long endOffset = partitionAndEpoch.getLong(END_OFFSET_KEY_NAME);
                 epochEndOffsetsByPartition.put(tp, new EpochEndOffset(error, endOffset));
             }
         }
@@ -74,21 +94,21 @@ public class OffsetsForLeaderEpochResponse extends AbstractResponse {
 
         List<Struct> topics = new ArrayList<>(endOffsetsByTopic.size());
         for (Map.Entry<String, Map<Integer, EpochEndOffset>> topicToPartitionEpochs : endOffsetsByTopic.entrySet()) {
-            Struct topicStruct = responseStruct.instance(TOPICS);
-            topicStruct.set(TOPIC, topicToPartitionEpochs.getKey());
+            Struct topicStruct = responseStruct.instance(TOPICS_KEY_NAME);
+            topicStruct.set(TOPIC_NAME, topicToPartitionEpochs.getKey());
             Map<Integer, EpochEndOffset> partitionEpochs = topicToPartitionEpochs.getValue();
             List<Struct> partitions = new ArrayList<>();
             for (Map.Entry<Integer, EpochEndOffset> partitionEndOffset : partitionEpochs.entrySet()) {
-                Struct partitionStruct = topicStruct.instance(PARTITIONS);
+                Struct partitionStruct = topicStruct.instance(PARTITIONS_KEY_NAME);
                 partitionStruct.set(ERROR_CODE, partitionEndOffset.getValue().error().code());
                 partitionStruct.set(PARTITION_ID, partitionEndOffset.getKey());
-                partitionStruct.set(END_OFFSET, partitionEndOffset.getValue().endOffset());
+                partitionStruct.set(END_OFFSET_KEY_NAME, partitionEndOffset.getValue().endOffset());
                 partitions.add(partitionStruct);
             }
-            topicStruct.set(PARTITIONS, partitions.toArray());
+            topicStruct.set(PARTITIONS_KEY_NAME, partitions.toArray());
             topics.add(topicStruct);
         }
-        responseStruct.set(TOPICS, topics.toArray());
+        responseStruct.set(TOPICS_KEY_NAME, topics.toArray());
         return responseStruct;
     }
 }

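The response read path is the same shape in the other direction: two nested getArray loops flatten topics and partitions into a map keyed by TopicPartition. A condensed, hypothetical helper showing just that shape, using the v0 field names declared above:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.protocol.types.Struct;

    import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
    import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;

    final class EpochEndOffsetReader {
        // Flatten topics -> partitions into partition -> end_offset.
        static Map<TopicPartition, Long> endOffsets(Struct struct) {
            Map<TopicPartition, Long> offsets = new HashMap<>();
            for (Object topicObj : struct.getArray("topics")) {
                Struct topic = (Struct) topicObj;
                String topicName = topic.get(TOPIC_NAME);
                for (Object partitionObj : topic.getArray("partitions")) {
                    Struct partition = (Struct) partitionObj;
                    int partitionId = partition.get(PARTITION_ID);
                    offsets.put(new TopicPartition(topicName, partitionId),
                            partition.getLong("end_offset"));
                }
            }
            return offsets;
        }
    }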
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
index 089d199..eac7661 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
@@ -20,11 +20,14 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.record.InvalidRecordException;
+import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.MutableRecordBatch;
 import org.apache.kafka.common.record.RecordBatch;
-import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.utils.CollectionUtils;
 import org.apache.kafka.common.utils.Utils;
 
@@ -36,6 +39,13 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
+import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
+import static org.apache.kafka.common.protocol.types.Type.INT16;
+import static org.apache.kafka.common.protocol.types.Type.INT32;
+import static org.apache.kafka.common.protocol.types.Type.NULLABLE_STRING;
+import static org.apache.kafka.common.protocol.types.Type.RECORDS;
+
 public class ProduceRequest extends AbstractRequest {
     private static final String TRANSACTIONAL_ID_KEY_NAME = "transactional_id";
     private static final String ACKS_KEY_NAME = "acks";
@@ -43,13 +53,60 @@ public class ProduceRequest extends AbstractRequest {
     private static final String TOPIC_DATA_KEY_NAME = "topic_data";
 
     // topic level field names
-    private static final String TOPIC_KEY_NAME = "topic";
     private static final String PARTITION_DATA_KEY_NAME = "data";
 
     // partition level field names
-    private static final String PARTITION_KEY_NAME = "partition";
     private static final String RECORD_SET_KEY_NAME = "record_set";
 
+    private static final Schema TOPIC_PRODUCE_DATA_V0 = new Schema(
+            TOPIC_NAME,
+            new Field(PARTITION_DATA_KEY_NAME, new ArrayOf(new Schema(
+                    PARTITION_ID,
+                    new Field(RECORD_SET_KEY_NAME, RECORDS)))));
+
+    private static final Schema PRODUCE_REQUEST_V0 = new Schema(
+            new Field(ACKS_KEY_NAME, INT16, "The number of acknowledgments the producer requires the leader to have " +
+                    "received before considering a request complete. Allowed values: 0 for no acknowledgments, 1 for " +
+                    "only the leader and -1 for the full ISR."),
+            new Field(TIMEOUT_KEY_NAME, INT32, "The time to await a response in ms."),
+            new Field(TOPIC_DATA_KEY_NAME, new ArrayOf(TOPIC_PRODUCE_DATA_V0)));
+
+    /**
+     * The body of PRODUCE_REQUEST_V1 is the same as PRODUCE_REQUEST_V0.
+     * The version number is bumped to indicate that the client supports the quota throttle time field in the response.
+     */
+    private static final Schema PRODUCE_REQUEST_V1 = PRODUCE_REQUEST_V0;
+    /**
+     * The body of PRODUCE_REQUEST_V2 is the same as PRODUCE_REQUEST_V1.
+     * The version number is bumped to indicate that message format V1 is used, which has relative offsets and
+     * timestamps.
+     */
+    private static final Schema PRODUCE_REQUEST_V2 = PRODUCE_REQUEST_V1;
+
+    // Produce request V3 adds the transactional id which is used for authorization when attempting to write
+    // transactional data. This version also adds support for message format V2.
+    private static final Schema PRODUCE_REQUEST_V3 = new Schema(
+            new Field(TRANSACTIONAL_ID_KEY_NAME, NULLABLE_STRING, "The transactional ID of the producer. This is used to " +
+                    "authorize transaction produce requests. This can be null for non-transactional producers."),
+            new Field(ACKS_KEY_NAME, INT16, "The number of acknowledgments the producer requires the leader to have " +
+                    "received before considering a request complete. Allowed values: 0 for no acknowledgments, 1 " +
+                    "for only the leader and -1 for the full ISR."),
+            new Field(TIMEOUT_KEY_NAME, INT32, "The time to await a response in ms."),
+            new Field(TOPIC_DATA_KEY_NAME, new ArrayOf(TOPIC_PRODUCE_DATA_V0)));
+
+    /**
+     * The body of PRODUCE_REQUEST_V4 is the same as PRODUCE_REQUEST_V3.
+     * The version number is bumped to indicate that the client supports KafkaStorageException.
+     * The KafkaStorageException will be translated to NotLeaderForPartitionException in the response if version <= 3.
+     */
+    private static final Schema PRODUCE_REQUEST_V4 = PRODUCE_REQUEST_V3;
+
+    public static Schema[] schemaVersions() {
+        return new Schema[] {PRODUCE_REQUEST_V0, PRODUCE_REQUEST_V1, PRODUCE_REQUEST_V2, PRODUCE_REQUEST_V3,
+            PRODUCE_REQUEST_V4};
+    }
+
     public static class Builder extends AbstractRequest.Builder<ProduceRequest> {
         private final byte magic;
         private final short acks;
@@ -137,10 +194,10 @@ public class ProduceRequest extends AbstractRequest {
         partitionRecords = new HashMap<>();
         for (Object topicDataObj : struct.getArray(TOPIC_DATA_KEY_NAME)) {
             Struct topicData = (Struct) topicDataObj;
-            String topic = topicData.getString(TOPIC_KEY_NAME);
+            String topic = topicData.get(TOPIC_NAME);
             for (Object partitionResponseObj : topicData.getArray(PARTITION_DATA_KEY_NAME)) {
                 Struct partitionResponse = (Struct) partitionResponseObj;
-                int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
+                int partition = partitionResponse.get(PARTITION_ID);
                 MemoryRecords records = (MemoryRecords) partitionResponse.getRecords(RECORD_SET_KEY_NAME);
                 validateRecords(version, records);
                 partitionRecords.put(new TopicPartition(topic, partition), records);
@@ -195,12 +252,12 @@ public class ProduceRequest extends AbstractRequest {
         List<Struct> topicDatas = new ArrayList<>(recordsByTopic.size());
         for (Map.Entry<String, Map<Integer, MemoryRecords>> topicEntry : recordsByTopic.entrySet()) {
             Struct topicData = struct.instance(TOPIC_DATA_KEY_NAME);
-            topicData.set(TOPIC_KEY_NAME, topicEntry.getKey());
+            topicData.set(TOPIC_NAME, topicEntry.getKey());
             List<Struct> partitionArray = new ArrayList<>();
             for (Map.Entry<Integer, MemoryRecords> partitionEntry : topicEntry.getValue().entrySet()) {
                 MemoryRecords records = partitionEntry.getValue();
                 Struct part = topicData.instance(PARTITION_DATA_KEY_NAME)
-                        .set(PARTITION_KEY_NAME, partitionEntry.getKey())
+                        .set(PARTITION_ID, partitionEntry.getKey())
                         .set(RECORD_SET_KEY_NAME, records);
                 partitionArray.add(part);
             }

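Note how many of the request versions alias the same Schema object: the version bump is a capability signal on the wire, not a change to the serialized layout. That aliasing is observable through the public schemaVersions() method, where the array index presumably corresponds to the api version; a quick check of the identities declared above:

    import org.apache.kafka.common.protocol.types.Schema;
    import org.apache.kafka.common.requests.ProduceRequest;

    public class ProduceSchemaAliasSketch {
        public static void main(String[] args) {
            Schema[] versions = ProduceRequest.schemaVersions();
            // V1 and V2 only bump the version number (throttle time in the response,
            // message format V1), so they share V0's Schema object.
            System.out.println(versions[1] == versions[0]); // true
            System.out.println(versions[2] == versions[1]); // true
            // V3 adds transactional_id, so it is a genuinely new schema...
            System.out.println(versions[3] == versions[2]); // false
            // ...and V4 aliases V3 again (KafkaStorageException support only).
            System.out.println(versions[4] == versions[3]); // true
        }
    }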
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
index fdfba8b..4786307 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
@@ -19,6 +19,9 @@ package org.apache.kafka.common.requests;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.utils.CollectionUtils;
@@ -29,6 +32,12 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
+import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
+import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
+import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
+import static org.apache.kafka.common.protocol.types.Type.INT64;
+
 /**
  * This wrapper supports both v0 and v1 of ProduceResponse.
  */
@@ -37,13 +46,8 @@ public class ProduceResponse extends AbstractResponse {
     private static final String RESPONSES_KEY_NAME = "responses";
 
     // topic level field names
-    private static final String TOPIC_KEY_NAME = "topic";
     private static final String PARTITION_RESPONSES_KEY_NAME = "partition_responses";
 
-    // partition level field names
-    private static final String PARTITION_KEY_NAME = "partition";
-    private static final String ERROR_CODE_KEY_NAME = "error_code";
-
     public static final long INVALID_OFFSET = -1L;
 
     /**
@@ -68,6 +72,55 @@ public class ProduceResponse extends AbstractResponse {
     private static final String BASE_OFFSET_KEY_NAME = "base_offset";
     private static final String LOG_APPEND_TIME_KEY_NAME = "log_append_time";
 
+    private static final Schema PRODUCE_RESPONSE_V0 = new Schema(
+            new Field(RESPONSES_KEY_NAME, new ArrayOf(new Schema(
+                    TOPIC_NAME,
+                    new Field(PARTITION_RESPONSES_KEY_NAME, new ArrayOf(new Schema(
+                            PARTITION_ID,
+                            ERROR_CODE,
+                            new Field(BASE_OFFSET_KEY_NAME, INT64))))))));
+
+    private static final Schema PRODUCE_RESPONSE_V1 = new Schema(
+            new Field(RESPONSES_KEY_NAME, new ArrayOf(new Schema(
+                    TOPIC_NAME,
+                    new Field(PARTITION_RESPONSES_KEY_NAME, new ArrayOf(new Schema(
+                            PARTITION_ID,
+                            ERROR_CODE,
+                            new Field(BASE_OFFSET_KEY_NAME, INT64))))))),
+            THROTTLE_TIME_MS);
+
+    /**
+     * PRODUCE_RESPONSE_V2 added a timestamp field to the per-partition response status.
+     * The timestamp is the log append time if the topic is configured to use log append time, or NoTimestamp when
+     * create time is used for the topic.
+     */
+    private static final Schema PRODUCE_RESPONSE_V2 = new Schema(
+            new Field(RESPONSES_KEY_NAME, new ArrayOf(new Schema(
+                    TOPIC_NAME,
+                    new Field(PARTITION_RESPONSES_KEY_NAME, new ArrayOf(new Schema(
+                            PARTITION_ID,
+                            ERROR_CODE,
+                            new Field(BASE_OFFSET_KEY_NAME, INT64),
+                            new Field(LOG_APPEND_TIME_KEY_NAME, INT64, "The timestamp returned by the broker after appending " +
+                                    "the messages. If CreateTime is used for the topic, the timestamp will be -1. " +
+                                    "If LogAppendTime is used for the topic, the timestamp will be " +
+                                    "the broker local time when the messages are appended."))))))),
+            THROTTLE_TIME_MS);
+
+    private static final Schema PRODUCE_RESPONSE_V3 = PRODUCE_RESPONSE_V2;
+
+    /**
+     * The body of PRODUCE_RESPONSE_V4 is the same as PRODUCE_RESPONSE_V3.
+     * The version number is bumped to indicate that the client supports KafkaStorageException.
+     * The KafkaStorageException will be translated to NotLeaderForPartitionException in the response if version <= 3.
+     */
+    private static final Schema PRODUCE_RESPONSE_V4 = PRODUCE_RESPONSE_V3;
+
+    public static Schema[] schemaVersions() {
+        return new Schema[]{PRODUCE_RESPONSE_V0, PRODUCE_RESPONSE_V1, PRODUCE_RESPONSE_V2, PRODUCE_RESPONSE_V3,
+            PRODUCE_RESPONSE_V4};
+    }
+
     private final Map<TopicPartition, PartitionResponse> responses;
     private final int throttleTime;
 
@@ -96,18 +149,18 @@ public class ProduceResponse extends AbstractResponse {
         responses = new HashMap<>();
         for (Object topicResponse : struct.getArray(RESPONSES_KEY_NAME)) {
             Struct topicRespStruct = (Struct) topicResponse;
-            String topic = topicRespStruct.getString(TOPIC_KEY_NAME);
+            String topic = topicRespStruct.get(TOPIC_NAME);
             for (Object partResponse : topicRespStruct.getArray(PARTITION_RESPONSES_KEY_NAME)) {
                 Struct partRespStruct = (Struct) partResponse;
-                int partition = partRespStruct.getInt(PARTITION_KEY_NAME);
-                Errors error = Errors.forCode(partRespStruct.getShort(ERROR_CODE_KEY_NAME));
+                int partition = partRespStruct.get(PARTITION_ID);
+                Errors error = Errors.forCode(partRespStruct.get(ERROR_CODE));
                 long offset = partRespStruct.getLong(BASE_OFFSET_KEY_NAME);
                 long logAppendTime = partRespStruct.getLong(LOG_APPEND_TIME_KEY_NAME);
                 TopicPartition tp = new TopicPartition(topic, partition);
                 responses.put(tp, new PartitionResponse(error, offset, logAppendTime));
             }
         }
-        this.throttleTime = struct.getInt(THROTTLE_TIME_KEY_NAME);
+        this.throttleTime = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
     }
 
     @Override
@@ -118,7 +171,7 @@ public class ProduceResponse extends AbstractResponse {
         List<Struct> topicDatas = new ArrayList<>(responseByTopic.size());
         for (Map.Entry<String, Map<Integer, PartitionResponse>> entry : responseByTopic.entrySet()) {
             Struct topicData = struct.instance(RESPONSES_KEY_NAME);
-            topicData.set(TOPIC_KEY_NAME, entry.getKey());
+            topicData.set(TOPIC_NAME, entry.getKey());
             List<Struct> partitionArray = new ArrayList<>();
             for (Map.Entry<Integer, PartitionResponse> partitionEntry : entry.getValue().entrySet()) {
                 PartitionResponse part = partitionEntry.getValue();
@@ -130,8 +183,8 @@ public class ProduceResponse extends AbstractResponse {
                 if (errorCode == Errors.KAFKA_STORAGE_ERROR.code() && version <= 3)
                     errorCode = Errors.NOT_LEADER_FOR_PARTITION.code();
                 Struct partStruct = topicData.instance(PARTITION_RESPONSES_KEY_NAME)
-                        .set(PARTITION_KEY_NAME, partitionEntry.getKey())
-                        .set(ERROR_CODE_KEY_NAME, errorCode)
+                        .set(PARTITION_ID, partitionEntry.getKey())
+                        .set(ERROR_CODE, errorCode)
                         .set(BASE_OFFSET_KEY_NAME, part.baseOffset);
                 if (partStruct.hasField(LOG_APPEND_TIME_KEY_NAME))
                     partStruct.set(LOG_APPEND_TIME_KEY_NAME, part.logAppendTime);
@@ -141,9 +194,8 @@ public class ProduceResponse extends AbstractResponse {
             topicDatas.add(topicData);
         }
         struct.set(RESPONSES_KEY_NAME, topicDatas.toArray());
+        struct.setIfExists(THROTTLE_TIME_MS, throttleTime);
 
-        if (struct.hasField(THROTTLE_TIME_KEY_NAME))
-            struct.set(THROTTLE_TIME_KEY_NAME, throttleTime);
         return struct;
     }
 

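The last hunk is a good illustration of the new Field-based helpers: setIfExists() replaces the hasField()/set() pair, and on the read side getOrElse() (used in the constructor above) supplies DEFAULT_THROTTLE_TIME when the schema defines no throttle field at all. A minimal sketch of how one code path can serve both a v0-style and a v1-style schema:

    import org.apache.kafka.common.protocol.types.Schema;
    import org.apache.kafka.common.protocol.types.Struct;

    import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
    import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;

    public class ThrottleTimeSketch {
        private static final Schema V0 = new Schema(ERROR_CODE);                   // no throttle field
        private static final Schema V1 = new Schema(ERROR_CODE, THROTTLE_TIME_MS); // throttle field added

        public static void main(String[] args) {
            for (Schema schema : new Schema[]{V0, V1}) {
                Struct struct = new Struct(schema);
                struct.set(ERROR_CODE, (short) 0);
                struct.setIfExists(THROTTLE_TIME_MS, 100);              // silently skipped for V0
                int throttleMs = struct.getOrElse(THROTTLE_TIME_MS, 0); // falls back to 0 for V0
                System.out.println(throttleMs);                         // prints 0, then 100
            }
        }
    }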
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/RequestContext.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RequestContext.java b/clients/src/main/java/org/apache/kafka/common/requests/RequestContext.java
index 34bb3f5..5132202 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/RequestContext.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/RequestContext.java
@@ -20,7 +20,6 @@ import org.apache.kafka.common.errors.InvalidRequestException;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.network.Send;
 import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.Protocol;
 import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
@@ -86,7 +85,7 @@ public class RequestContext {
     }
 
     private boolean isUnsupportedApiVersionsRequest() {
-        return header.apiKey() == API_VERSIONS && !Protocol.apiVersionSupported(API_VERSIONS.id, header.apiVersion());
+        return header.apiKey() == API_VERSIONS && !API_VERSIONS.isVersionSupported(header.apiVersion());
     }
 
 }

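The RequestContext change folds the old Protocol.apiVersionSupported(id, version) table lookup into the ApiKeys enum itself. The exact bounds live on the enum, but conceptually the check is presumably just a range test against the latest known version, which the new schemaVersions() arrays make easy to derive; a hypothetical sketch:

    import org.apache.kafka.common.protocol.types.Schema;

    final class VersionSupportSketch {
        private final short latestVersion;

        // Hypothetical: the latest supported version is one less than the length of
        // the schema array a request/response class exposes via schemaVersions().
        VersionSupportSketch(Schema[] schemaVersions) {
            this.latestVersion = (short) (schemaVersions.length - 1);
        }

        boolean isVersionSupported(short apiVersion) {
            return apiVersion >= 0 && apiVersion <= latestVersion;
        }
    }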
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java b/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
index 43b7baf..1284e7e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
@@ -18,13 +18,16 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.errors.InvalidRequestException;
 import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.Protocol;
+import org.apache.kafka.common.protocol.types.Field;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
 
 import static java.util.Objects.requireNonNull;
+import static org.apache.kafka.common.protocol.types.Type.INT16;
+import static org.apache.kafka.common.protocol.types.Type.INT32;
+import static org.apache.kafka.common.protocol.types.Type.NULLABLE_STRING;
 
 /**
  * The header for a request in the Kafka protocol
@@ -35,6 +38,19 @@ public class RequestHeader extends AbstractRequestResponse {
     private static final String CLIENT_ID_FIELD_NAME = "client_id";
     private static final String CORRELATION_ID_FIELD_NAME = "correlation_id";
 
+    public static final Schema SCHEMA = new Schema(
+            new Field(API_KEY_FIELD_NAME, INT16, "The id of the request type."),
+            new Field(API_VERSION_FIELD_NAME, INT16, "The version of the API."),
+            new Field(CORRELATION_ID_FIELD_NAME, INT32, "A user-supplied integer value that will be passed back with the response."),
+            new Field(CLIENT_ID_FIELD_NAME, NULLABLE_STRING, "A user specified identifier for the client making the request.", ""));
+
+    // Version 0 of the controlled shutdown API used a non-standard request header (the clientId is missing).
+    // This can be removed once we drop support for that version.
+    private static final Schema CONTROLLED_SHUTDOWN_V0_SCHEMA = new Schema(
+            new Field(API_KEY_FIELD_NAME, INT16, "The id of the request type."),
+            new Field(API_VERSION_FIELD_NAME, INT16, "The version of the API."),
+            new Field(CORRELATION_ID_FIELD_NAME, INT32, "A user-supplied integer value that will be passed back with the response."));
+
     private final ApiKeys apiKey;
     private final short apiVersion;
     private final String clientId;
@@ -64,7 +80,7 @@ public class RequestHeader extends AbstractRequestResponse {
     }
 
     public Struct toStruct() {
-        Schema schema = Protocol.requestHeaderSchema(apiKey.id, apiVersion);
+        Schema schema = schema(apiKey.id, apiVersion);
         Struct struct = new Struct(schema);
         struct.set(API_KEY_FIELD_NAME, apiKey.id);
         struct.set(API_VERSION_FIELD_NAME, apiVersion);
@@ -100,7 +116,7 @@ public class RequestHeader extends AbstractRequestResponse {
         try {
             short apiKey = buffer.getShort();
             short apiVersion = buffer.getShort();
-            Schema schema = Protocol.requestHeaderSchema(apiKey, apiVersion);
+            Schema schema = schema(apiKey, apiVersion);
             buffer.rewind();
             return new RequestHeader(schema.read(buffer));
         } catch (InvalidRequestException e) {
@@ -140,4 +156,13 @@ public class RequestHeader extends AbstractRequestResponse {
         result = 31 * result + correlationId;
         return result;
     }
+
+    private static Schema schema(short apiKey, short version) {
+        if (apiKey == ApiKeys.CONTROLLED_SHUTDOWN.id && version == 0)
+            // This will be removed once we remove support for v0 of ControlledShutdownRequest, which
+            // depends on a non-standard request header (it does not have a clientId)
+            return CONTROLLED_SHUTDOWN_V0_SCHEMA;
+        return SCHEMA;
+    }
 }

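Finally, the header dispatch above works because every header starts with the same two INT16 fields: the parsing code can peek api_key and api_version, rewind, and only then choose between SCHEMA and the non-standard ControlledShutdown v0 variant. A round-trip sketch against the public SCHEMA, again assuming Struct's sizeOf()/writeTo(ByteBuffer):

    import java.nio.ByteBuffer;

    import org.apache.kafka.common.protocol.types.Struct;
    import org.apache.kafka.common.requests.RequestHeader;

    public class HeaderRoundTripSketch {
        public static void main(String[] args) {
            Struct header = new Struct(RequestHeader.SCHEMA);
            header.set("api_key", (short) 0);      // Produce
            header.set("api_version", (short) 3);
            header.set("correlation_id", 42);
            header.set("client_id", "sketch-client");

            ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf());
            header.writeTo(buffer);
            buffer.flip();

            // The first four bytes select the schema; for anything other than
            // ControlledShutdown v0 that is the standard SCHEMA used here.
            Struct parsed = RequestHeader.SCHEMA.read(buffer);
            System.out.println(parsed.getInt("correlation_id"));
        }
    }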
