kafka-commits mailing list archives

From ij...@apache.org
Subject [2/8] kafka git commit: MINOR: Move request/response schemas to the corresponding object representation
Date Tue, 19 Sep 2017 04:13:23 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java b/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java
index fa23559..103246c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java
@@ -25,58 +25,69 @@ import org.apache.kafka.common.resource.Resource;
 import org.apache.kafka.common.resource.ResourceFilter;
 import org.apache.kafka.common.resource.ResourceType;
 
+import static org.apache.kafka.common.protocol.CommonFields.HOST;
+import static org.apache.kafka.common.protocol.CommonFields.HOST_FILTER;
+import static org.apache.kafka.common.protocol.CommonFields.OPERATION;
+import static org.apache.kafka.common.protocol.CommonFields.PERMISSION_TYPE;
+import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL;
+import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL_FILTER;
+import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_NAME;
+import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_NAME_FILTER;
+import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_TYPE;
+
 class RequestUtils {
+
     static Resource resourceFromStructFields(Struct struct) {
-        byte resourceType = struct.getByte("resource_type");
-        String name = struct.getString("resource_name");
+        byte resourceType = struct.get(RESOURCE_TYPE);
+        String name = struct.get(RESOURCE_NAME);
         return new Resource(ResourceType.fromCode(resourceType), name);
     }
 
     static void resourceSetStructFields(Resource resource, Struct struct) {
-        struct.set("resource_type", resource.resourceType().code());
-        struct.set("resource_name", resource.name());
+        struct.set(RESOURCE_TYPE, resource.resourceType().code());
+        struct.set(RESOURCE_NAME, resource.name());
     }
 
     static ResourceFilter resourceFilterFromStructFields(Struct struct) {
-        byte resourceType = struct.getByte("resource_type");
-        String name = struct.getString("resource_name");
+        byte resourceType = struct.get(RESOURCE_TYPE);
+        String name = struct.get(RESOURCE_NAME_FILTER);
         return new ResourceFilter(ResourceType.fromCode(resourceType), name);
     }
 
     static void resourceFilterSetStructFields(ResourceFilter resourceFilter, Struct struct) {
-        struct.set("resource_type", resourceFilter.resourceType().code());
-        struct.set("resource_name", resourceFilter.name());
+        struct.set(RESOURCE_TYPE, resourceFilter.resourceType().code());
+        struct.set(RESOURCE_NAME_FILTER, resourceFilter.name());
     }
 
     static AccessControlEntry aceFromStructFields(Struct struct) {
-        String principal = struct.getString("principal");
-        String host = struct.getString("host");
-        byte operation = struct.getByte("operation");
-        byte permissionType = struct.getByte("permission_type");
+        String principal = struct.get(PRINCIPAL);
+        String host = struct.get(HOST);
+        byte operation = struct.get(OPERATION);
+        byte permissionType = struct.get(PERMISSION_TYPE);
         return new AccessControlEntry(principal, host, AclOperation.fromCode(operation),
             AclPermissionType.fromCode(permissionType));
     }
 
     static void aceSetStructFields(AccessControlEntry data, Struct struct) {
-        struct.set("principal", data.principal());
-        struct.set("host", data.host());
-        struct.set("operation", data.operation().code());
-        struct.set("permission_type", data.permissionType().code());
+        struct.set(PRINCIPAL, data.principal());
+        struct.set(HOST, data.host());
+        struct.set(OPERATION, data.operation().code());
+        struct.set(PERMISSION_TYPE, data.permissionType().code());
     }
 
     static AccessControlEntryFilter aceFilterFromStructFields(Struct struct) {
-        String principal = struct.getString("principal");
-        String host = struct.getString("host");
-        byte operation = struct.getByte("operation");
-        byte permissionType = struct.getByte("permission_type");
+        String principal = struct.get(PRINCIPAL_FILTER);
+        String host = struct.get(HOST_FILTER);
+        byte operation = struct.get(OPERATION);
+        byte permissionType = struct.get(PERMISSION_TYPE);
         return new AccessControlEntryFilter(principal, host, AclOperation.fromCode(operation),
             AclPermissionType.fromCode(permissionType));
     }
 
     static void aceFilterSetStructFields(AccessControlEntryFilter filter, Struct struct) {
-        struct.set("principal", filter.principal());
-        struct.set("host", filter.host());
-        struct.set("operation", filter.operation().code());
-        struct.set("permission_type", filter.permissionType().code());
+        struct.set(PRINCIPAL_FILTER, filter.principal());
+        struct.set(HOST_FILTER, filter.host());
+        struct.set(OPERATION, filter.operation().code());
+        struct.set(PERMISSION_TYPE, filter.permissionType().code());
     }
 }

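The hunk above is the heart of the change: string-keyed accessors such as struct.getByte("resource_type") become look-ups through shared, typed field constants. A minimal sketch of why that helps, purely illustrative and not Kafka's actual Field/Struct implementation:

    import java.util.HashMap;
    import java.util.Map;

    // The field constant carries both the name and the Java type, so a
    // mismatched read or write fails at compile time and the field name
    // is spelled in exactly one place.
    final class TypedFieldSketch {
        static final class Int8Field { final String name; Int8Field(String n) { name = n; } }
        static final class StrField  { final String name; StrField(String n)  { name = n; } }

        static final Int8Field RESOURCE_TYPE = new Int8Field("resource_type");
        static final StrField  RESOURCE_NAME = new StrField("resource_name");

        private final Map<String, Object> values = new HashMap<>();

        byte get(Int8Field f)          { return (Byte) values.get(f.name); }
        String get(StrField f)         { return (String) values.get(f.name); }
        void set(Int8Field f, byte v)  { values.put(f.name, v); }
        void set(StrField f, String v) { values.put(f.name, v); }

        public static void main(String[] args) {
            TypedFieldSketch struct = new TypedFieldSketch();
            struct.set(RESOURCE_TYPE, (byte) 2);
            struct.set(RESOURCE_NAME, "my-topic");
            byte type = struct.get(RESOURCE_TYPE);   // no cast, no string literal at the call site
            System.out.println(type + " " + struct.get(RESOURCE_NAME));
        }
    }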
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/ResponseHeader.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ResponseHeader.java b/clients/src/main/java/org/apache/kafka/common/requests/ResponseHeader.java
index 895716b..fe452a2 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ResponseHeader.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ResponseHeader.java
@@ -16,21 +16,22 @@
  */
 package org.apache.kafka.common.requests;
 
-import static org.apache.kafka.common.protocol.Protocol.RESPONSE_HEADER;
-
-import java.nio.ByteBuffer;
-
-import org.apache.kafka.common.protocol.Protocol;
+import org.apache.kafka.common.protocol.types.BoundField;
 import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
+import java.nio.ByteBuffer;
+
+import static org.apache.kafka.common.protocol.types.Type.INT32;
 
 /**
  * A response header in the kafka protocol.
  */
 public class ResponseHeader extends AbstractRequestResponse {
-
-    private static final Field CORRELATION_KEY_FIELD = RESPONSE_HEADER.get("correlation_id");
+    public static final Schema SCHEMA = new Schema(
+            new Field("correlation_id", INT32, "The user-supplied value passed in with the request"));
+    private static final BoundField CORRELATION_KEY_FIELD = SCHEMA.get("correlation_id");
 
     private final int correlationId;
 
@@ -47,7 +48,7 @@ public class ResponseHeader extends AbstractRequestResponse {
     }
 
     public Struct toStruct() {
-        Struct struct = new Struct(Protocol.RESPONSE_HEADER);
+        Struct struct = new Struct(SCHEMA);
         struct.set(CORRELATION_KEY_FIELD, correlationId);
         return struct;
     }
@@ -57,7 +58,7 @@ public class ResponseHeader extends AbstractRequestResponse {
     }
 
     public static ResponseHeader parse(ByteBuffer buffer) {
-        return new ResponseHeader(Protocol.RESPONSE_HEADER.read(buffer));
+        return new ResponseHeader(SCHEMA.read(buffer));
     }
 
 }

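With the schema owned by ResponseHeader itself, the full serialize/parse round trip stays inside the class. A short usage sketch, assuming the int constructor and correlationId() accessor the class already has, plus Struct's sizeOf()/writeTo(...) as used elsewhere in the clients module:

    import java.nio.ByteBuffer;

    import org.apache.kafka.common.protocol.types.Struct;
    import org.apache.kafka.common.requests.ResponseHeader;

    public class ResponseHeaderRoundTrip {
        public static void main(String[] args) {
            ResponseHeader header = new ResponseHeader(42);
            Struct struct = header.toStruct();                      // uses the class-local SCHEMA
            ByteBuffer buffer = ByteBuffer.allocate(struct.sizeOf());
            struct.writeTo(buffer);
            buffer.flip();
            ResponseHeader parsed = ResponseHeader.parse(buffer);   // SCHEMA.read(buffer) under the hood
            System.out.println(parsed.correlationId());             // prints 42
        }
    }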
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateRequest.java
index f21c896..74d31a6 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateRequest.java
@@ -16,12 +16,15 @@
  */
 package org.apache.kafka.common.requests;
 
-import java.nio.ByteBuffer;
-
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
+import java.nio.ByteBuffer;
+
+import static org.apache.kafka.common.protocol.types.Type.BYTES;
 
 /**
  * Request from SASL client containing client SASL authentication token as defined by the
@@ -32,9 +35,15 @@ import org.apache.kafka.common.protocol.types.Struct;
  * brokers will send SaslHandshake request v0 followed by SASL tokens without the Kafka request headers.
  */
 public class SaslAuthenticateRequest extends AbstractRequest {
-
     private static final String SASL_AUTH_BYTES_KEY_NAME = "sasl_auth_bytes";
 
+    private static final Schema SASL_AUTHENTICATE_REQUEST_V0 = new Schema(
+            new Field(SASL_AUTH_BYTES_KEY_NAME, BYTES, "SASL authentication bytes from client as defined by the SASL mechanism."));
+
+    public static Schema[] schemaVersions() {
+        return new Schema[]{SASL_AUTHENTICATE_REQUEST_V0};
+    }
+
     private final ByteBuffer saslAuthBytes;
 
     public static class Builder extends AbstractRequest.Builder<SaslAuthenticateRequest> {

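The schemaVersions() methods introduced throughout this commit give each request class a single array in which the index is the API version. A hypothetical sketch of the registry side that could consume them (all names here invented for illustration; the real wiring lives in ApiKeys):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.common.protocol.types.Schema;
    import org.apache.kafka.common.requests.SaslAuthenticateRequest;

    // Hypothetical registry: the array index is the API version.
    final class SchemaRegistrySketch {
        private final Map<String, Schema[]> requestSchemas = new HashMap<>();

        void register(String apiName, Schema[] versions) {
            requestSchemas.put(apiName, versions);
        }

        Schema requestSchema(String apiName, short version) {
            Schema[] versions = requestSchemas.get(apiName);
            if (versions == null || version < 0 || version >= versions.length)
                throw new IllegalArgumentException("No schema for " + apiName + " v" + version);
            return versions[version];
        }

        public static void main(String[] args) {
            SchemaRegistrySketch registry = new SchemaRegistrySketch();
            registry.register("SaslAuthenticate", SaslAuthenticateRequest.schemaVersions());
            System.out.println(registry.requestSchema("SaslAuthenticate", (short) 0));
        }
    }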
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java
index 2119f21..1dd0e76 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java
@@ -16,23 +16,35 @@
  */
 package org.apache.kafka.common.requests;
 
-import java.nio.ByteBuffer;
-
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
+import java.nio.ByteBuffer;
+
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_MESSAGE;
+import static org.apache.kafka.common.protocol.types.Type.BYTES;
+
 
 /**
 * Response from SASL server containing a SASL challenge as defined by the SASL protocol
  * for the mechanism configured for the client.
  */
 public class SaslAuthenticateResponse extends AbstractResponse {
-
-    private static final String ERROR_CODE_KEY_NAME = "error_code";
-    private static final String ERROR_MESSAGE_KEY_NAME = "error_message";
     private static final String SASL_AUTH_BYTES_KEY_NAME = "sasl_auth_bytes";
 
+    private static final Schema SASL_AUTHENTICATE_RESPONSE_V0 = new Schema(
+            ERROR_CODE,
+            ERROR_MESSAGE,
+            new Field(SASL_AUTH_BYTES_KEY_NAME, BYTES, "SASL authentication bytes from server as defined by the SASL mechanism."));
+
+    public static Schema[] schemaVersions() {
+        return new Schema[]{SASL_AUTHENTICATE_RESPONSE_V0};
+    }
+
     private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
 
     private final ByteBuffer saslAuthBytes;
@@ -55,8 +67,8 @@ public class SaslAuthenticateResponse extends AbstractResponse {
     }
 
     public SaslAuthenticateResponse(Struct struct) {
-        error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
-        errorMessage = struct.getString(ERROR_MESSAGE_KEY_NAME);
+        error = Errors.forCode(struct.get(ERROR_CODE));
+        errorMessage = struct.get(ERROR_MESSAGE);
         saslAuthBytes = struct.getBytes(SASL_AUTH_BYTES_KEY_NAME);
     }
 
@@ -75,8 +87,8 @@ public class SaslAuthenticateResponse extends AbstractResponse {
     @Override
     public Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.SASL_AUTHENTICATE.responseSchema(version));
-        struct.set(ERROR_CODE_KEY_NAME, error.code());
-        struct.set(ERROR_MESSAGE_KEY_NAME, errorMessage);
+        struct.set(ERROR_CODE, error.code());
+        struct.set(ERROR_MESSAGE, errorMessage);
         struct.set(SASL_AUTH_BYTES_KEY_NAME, saslAuthBytes);
         return struct;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java
index 9906d13..a06a4db 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java
@@ -16,14 +16,17 @@
  */
 package org.apache.kafka.common.requests;
 
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.List;
-
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.kafka.common.protocol.types.Type.STRING;
 
 /**
  * Request from SASL client containing client SASL mechanism.
@@ -35,8 +38,17 @@ import org.apache.kafka.common.protocol.types.Struct;
  * making it easy to distinguish from a GSSAPI packet.
  */
 public class SaslHandshakeRequest extends AbstractRequest {
+    private static final String MECHANISM_KEY_NAME = "mechanism";
+
+    private static final Schema SASL_HANDSHAKE_REQUEST_V0 = new Schema(
+            new Field("mechanism", STRING, "SASL Mechanism chosen by the client."));
 
-    public static final String MECHANISM_KEY_NAME = "mechanism";
+    // SASL_HANDSHAKE_REQUEST_V1 added to support SASL_AUTHENTICATE request to improve diagnostics
+    private static final Schema SASL_HANDSHAKE_REQUEST_V1 = SASL_HANDSHAKE_REQUEST_V0;
+
+    public static Schema[] schemaVersions() {
+        return new Schema[]{SASL_HANDSHAKE_REQUEST_V0, SASL_HANDSHAKE_REQUEST_V1};
+    }
 
     private final String mechanism;
 

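Note that SASL_HANDSHAKE_REQUEST_V1 is simply an alias for the v0 schema: the wire format is unchanged, and the version bump only signals that the client will follow the handshake with SaslAuthenticate requests framed as ordinary Kafka requests. A version gate in broker code could then be as small as this sketch (method name hypothetical):

    // v0 clients send raw SASL tokens after the handshake; v1 clients wrap
    // them in SaslAuthenticate requests with normal Kafka request headers.
    static boolean usesKafkaFramedSaslTokens(short handshakeVersion) {
        return handshakeVersion >= 1;
    }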
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java
index e70bad3..c9f6369 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java
@@ -16,14 +16,20 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.protocol.types.Type;
+
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 
-import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Struct;
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
 
 
 /**
@@ -31,10 +37,18 @@ import org.apache.kafka.common.protocol.types.Struct;
  * For error responses, the list of enabled mechanisms is included in the response.
  */
 public class SaslHandshakeResponse extends AbstractResponse {
-
-    private static final String ERROR_CODE_KEY_NAME = "error_code";
     private static final String ENABLED_MECHANISMS_KEY_NAME = "enabled_mechanisms";
 
+    private static final Schema SASL_HANDSHAKE_RESPONSE_V0 = new Schema(
+            ERROR_CODE,
+            new Field(ENABLED_MECHANISMS_KEY_NAME, new ArrayOf(Type.STRING), "Array of mechanisms enabled in the server."));
+
+    private static final Schema SASL_HANDSHAKE_RESPONSE_V1 = SASL_HANDSHAKE_RESPONSE_V0;
+
+    public static Schema[] schemaVersions() {
+        return new Schema[]{SASL_HANDSHAKE_RESPONSE_V0, SASL_HANDSHAKE_RESPONSE_V1};
+    }
+
     /**
      * Possible error codes:
      *   UNSUPPORTED_SASL_MECHANISM(33): Client mechanism not enabled in server
@@ -49,7 +63,7 @@ public class SaslHandshakeResponse extends AbstractResponse {
     }
 
     public SaslHandshakeResponse(Struct struct) {
-        error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
+        error = Errors.forCode(struct.get(ERROR_CODE));
         Object[] mechanisms = struct.getArray(ENABLED_MECHANISMS_KEY_NAME);
         ArrayList<String> enabledMechanisms = new ArrayList<>();
         for (Object mechanism : mechanisms)
@@ -64,7 +78,7 @@ public class SaslHandshakeResponse extends AbstractResponse {
     @Override
     public Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.SASL_HANDSHAKE.responseSchema(version));
-        struct.set(ERROR_CODE_KEY_NAME, error.code());
+        struct.set(ERROR_CODE, error.code());
         struct.set(ENABLED_MECHANISMS_KEY_NAME, enabledMechanisms.toArray());
         return struct;
     }

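ERROR_CODE here comes from CommonFields, so the field's name, type, and documentation are defined once and reused by every response schema that carries it. A sketch of that holder pattern using the Field constructor seen in this diff (the real CommonFields likely uses typed Field variants, which is why struct.get(ERROR_CODE) needs no cast; the doc strings below are illustrative):

    import org.apache.kafka.common.protocol.types.Field;

    import static org.apache.kafka.common.protocol.types.Type.INT16;
    import static org.apache.kafka.common.protocol.types.Type.NULLABLE_STRING;

    // One definition of name, type, and doc string per common field,
    // shared by every schema that includes it.
    final class CommonFieldsSketch {
        static final Field ERROR_CODE = new Field("error_code", INT16, "Response error code");
        static final Field ERROR_MESSAGE = new Field("error_message", NULLABLE_STRING, "Response error message, or null");

        private CommonFieldsSketch() { }
    }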
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java
index 48aa16e..722a604 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java
@@ -19,6 +19,9 @@ package org.apache.kafka.common.requests;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.utils.Utils;
 
@@ -30,13 +33,29 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
+import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
+import static org.apache.kafka.common.protocol.types.Type.BOOLEAN;
+import static org.apache.kafka.common.protocol.types.Type.INT32;
+
 public class StopReplicaRequest extends AbstractRequest {
     private static final String CONTROLLER_ID_KEY_NAME = "controller_id";
     private static final String CONTROLLER_EPOCH_KEY_NAME = "controller_epoch";
     private static final String DELETE_PARTITIONS_KEY_NAME = "delete_partitions";
     private static final String PARTITIONS_KEY_NAME = "partitions";
-    private static final String TOPIC_KEY_NAME = "topic";
-    private static final String PARTITION_KEY_NAME = "partition";
+
+    private static final Schema STOP_REPLICA_REQUEST_PARTITION_V0 = new Schema(
+            TOPIC_NAME,
+            PARTITION_ID);
+    private static final Schema STOP_REPLICA_REQUEST_V0 = new Schema(
+            new Field(CONTROLLER_ID_KEY_NAME, INT32, "The controller id."),
+            new Field(CONTROLLER_EPOCH_KEY_NAME, INT32, "The controller epoch."),
+            new Field(DELETE_PARTITIONS_KEY_NAME, BOOLEAN, "Boolean which indicates if replica's partitions must be deleted."),
+            new Field(PARTITIONS_KEY_NAME, new ArrayOf(STOP_REPLICA_REQUEST_PARTITION_V0)));
+
+    public static Schema[] schemaVersions() {
+        return new Schema[] {STOP_REPLICA_REQUEST_V0};
+    }
 
     public static class Builder extends AbstractRequest.Builder<StopReplicaRequest> {
         private final int controllerId;
@@ -92,8 +111,8 @@ public class StopReplicaRequest extends AbstractRequest {
         partitions = new HashSet<>();
         for (Object partitionDataObj : struct.getArray(PARTITIONS_KEY_NAME)) {
             Struct partitionData = (Struct) partitionDataObj;
-            String topic = partitionData.getString(TOPIC_KEY_NAME);
-            int partition = partitionData.getInt(PARTITION_KEY_NAME);
+            String topic = partitionData.get(TOPIC_NAME);
+            int partition = partitionData.get(PARTITION_ID);
             partitions.add(new TopicPartition(topic, partition));
         }
 
@@ -150,8 +169,8 @@ public class StopReplicaRequest extends AbstractRequest {
         List<Struct> partitionDatas = new ArrayList<>(partitions.size());
         for (TopicPartition partition : partitions) {
             Struct partitionData = struct.instance(PARTITIONS_KEY_NAME);
-            partitionData.set(TOPIC_KEY_NAME, partition.topic());
-            partitionData.set(PARTITION_KEY_NAME, partition.partition());
+            partitionData.set(TOPIC_NAME, partition.topic());
+            partitionData.set(PARTITION_ID, partition.partition());
             partitionDatas.add(partitionData);
         }
 

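The partition list above is a nested schema wrapped in ArrayOf, and the surrounding code walks it with struct.instance(...), getArray(...), and the typed TOPIC_NAME/PARTITION_ID fields. A condensed, runnable sketch of that round trip using the same APIs as the diff (schema trimmed to just the array field):

    import org.apache.kafka.common.protocol.types.ArrayOf;
    import org.apache.kafka.common.protocol.types.Field;
    import org.apache.kafka.common.protocol.types.Schema;
    import org.apache.kafka.common.protocol.types.Struct;

    import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
    import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;

    public class NestedSchemaSketch {
        // Element schema nested into the containing schema via ArrayOf,
        // mirroring STOP_REPLICA_REQUEST_PARTITION_V0 above.
        private static final Schema PARTITION = new Schema(TOPIC_NAME, PARTITION_ID);
        private static final Schema REQUEST = new Schema(
                new Field("partitions", new ArrayOf(PARTITION)));

        public static void main(String[] args) {
            // Writing: instance(...) creates a sub-struct bound to the
            // array field's element schema.
            Struct struct = new Struct(REQUEST);
            Struct p0 = struct.instance("partitions");
            p0.set(TOPIC_NAME, "my-topic");
            p0.set(PARTITION_ID, 0);
            struct.set("partitions", new Object[]{p0});

            // Reading: getArray(...) yields Object[] whose elements are Structs.
            for (Object o : struct.getArray("partitions")) {
                Struct p = (Struct) o;
                System.out.println(p.get(TOPIC_NAME) + "-" + p.get(PARTITION_ID));
            }
        }
    }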
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java
index c859f7f..4196b83 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java
@@ -19,6 +19,9 @@ package org.apache.kafka.common.requests;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
@@ -27,14 +30,24 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public class StopReplicaResponse extends AbstractResponse {
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
+import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
+import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
 
-    private static final String ERROR_CODE_KEY_NAME = "error_code";
+public class StopReplicaResponse extends AbstractResponse {
     private static final String PARTITIONS_KEY_NAME = "partitions";
 
-    private static final String PARTITIONS_TOPIC_KEY_NAME = "topic";
-    private static final String PARTITIONS_PARTITION_KEY_NAME = "partition";
-    private static final String PARTITIONS_ERROR_CODE_KEY_NAME = "error_code";
+    private static final Schema STOP_REPLICA_RESPONSE_PARTITION_V0 = new Schema(
+            TOPIC_NAME,
+            PARTITION_ID,
+            ERROR_CODE);
+    private static final Schema STOP_REPLICA_RESPONSE_V0 = new Schema(
+            ERROR_CODE,
+            new Field(PARTITIONS_KEY_NAME, new ArrayOf(STOP_REPLICA_RESPONSE_PARTITION_V0)));
+
+    public static Schema[] schemaVersions() {
+        return new Schema[] {STOP_REPLICA_RESPONSE_V0};
+    }
 
     private final Map<TopicPartition, Errors> responses;
 
@@ -54,13 +67,13 @@ public class StopReplicaResponse extends AbstractResponse {
         responses = new HashMap<>();
         for (Object responseDataObj : struct.getArray(PARTITIONS_KEY_NAME)) {
             Struct responseData = (Struct) responseDataObj;
-            String topic = responseData.getString(PARTITIONS_TOPIC_KEY_NAME);
-            int partition = responseData.getInt(PARTITIONS_PARTITION_KEY_NAME);
-            Errors error = Errors.forCode(responseData.getShort(PARTITIONS_ERROR_CODE_KEY_NAME));
+            String topic = responseData.get(TOPIC_NAME);
+            int partition = responseData.get(PARTITION_ID);
+            Errors error = Errors.forCode(responseData.get(ERROR_CODE));
             responses.put(new TopicPartition(topic, partition), error);
         }
 
-        error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
+        error = Errors.forCode(struct.get(ERROR_CODE));
     }
 
     public Map<TopicPartition, Errors> responses() {
@@ -83,15 +96,14 @@ public class StopReplicaResponse extends AbstractResponse {
         for (Map.Entry<TopicPartition, Errors> response : responses.entrySet()) {
             Struct partitionData = struct.instance(PARTITIONS_KEY_NAME);
             TopicPartition partition = response.getKey();
-            partitionData.set(PARTITIONS_TOPIC_KEY_NAME, partition.topic());
-            partitionData.set(PARTITIONS_PARTITION_KEY_NAME, partition.partition());
-            partitionData.set(PARTITIONS_ERROR_CODE_KEY_NAME, response.getValue().code());
+            partitionData.set(TOPIC_NAME, partition.topic());
+            partitionData.set(PARTITION_ID, partition.partition());
+            partitionData.set(ERROR_CODE, response.getValue().code());
             responseDatas.add(partitionData);
         }
 
         struct.set(PARTITIONS_KEY_NAME, responseDatas.toArray());
-        struct.set(ERROR_CODE_KEY_NAME, error.code());
-
+        struct.set(ERROR_CODE, error.code());
         return struct;
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java
index 82df84a..4ff9fcd 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java
@@ -18,6 +18,9 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.utils.Utils;
 
@@ -27,12 +30,32 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.kafka.common.protocol.types.Type.BYTES;
+import static org.apache.kafka.common.protocol.types.Type.INT32;
+import static org.apache.kafka.common.protocol.types.Type.STRING;
+
 public class SyncGroupRequest extends AbstractRequest {
-    public static final String GROUP_ID_KEY_NAME = "group_id";
-    public static final String GENERATION_ID_KEY_NAME = "generation_id";
-    public static final String MEMBER_ID_KEY_NAME = "member_id";
-    public static final String MEMBER_ASSIGNMENT_KEY_NAME = "member_assignment";
-    public static final String GROUP_ASSIGNMENT_KEY_NAME = "group_assignment";
+    private static final String GROUP_ID_KEY_NAME = "group_id";
+    private static final String GENERATION_ID_KEY_NAME = "generation_id";
+    private static final String MEMBER_ID_KEY_NAME = "member_id";
+    private static final String MEMBER_ASSIGNMENT_KEY_NAME = "member_assignment";
+    private static final String GROUP_ASSIGNMENT_KEY_NAME = "group_assignment";
+
+    private static final Schema SYNC_GROUP_REQUEST_MEMBER_V0 = new Schema(
+            new Field(MEMBER_ID_KEY_NAME, STRING),
+            new Field(MEMBER_ASSIGNMENT_KEY_NAME, BYTES));
+    private static final Schema SYNC_GROUP_REQUEST_V0 = new Schema(
+            new Field(GROUP_ID_KEY_NAME, STRING),
+            new Field(GENERATION_ID_KEY_NAME, INT32),
+            new Field(MEMBER_ID_KEY_NAME, STRING),
+            new Field(GROUP_ASSIGNMENT_KEY_NAME, new ArrayOf(SYNC_GROUP_REQUEST_MEMBER_V0)));
+
+    /* v1 request is the same as v0. Throttle time has been added to response */
+    private static final Schema SYNC_GROUP_REQUEST_V1 = SYNC_GROUP_REQUEST_V0;
+
+    public static Schema[] schemaVersions() {
+        return new Schema[] {SYNC_GROUP_REQUEST_V0, SYNC_GROUP_REQUEST_V1};
+    }
 
     public static class Builder extends AbstractRequest.Builder<SyncGroupRequest> {
         private final String groupId;

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
index c96e21f..d68b2cd 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
@@ -18,14 +18,30 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
 
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
+import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
+import static org.apache.kafka.common.protocol.types.Type.BYTES;
+
 public class SyncGroupResponse extends AbstractResponse {
+    private static final String MEMBER_ASSIGNMENT_KEY_NAME = "member_assignment";
+
+    private static final Schema SYNC_GROUP_RESPONSE_V0 = new Schema(
+            ERROR_CODE,
+            new Field(MEMBER_ASSIGNMENT_KEY_NAME, BYTES));
+    private static final Schema SYNC_GROUP_RESPONSE_V1 = new Schema(
+            THROTTLE_TIME_MS,
+            ERROR_CODE,
+            new Field(MEMBER_ASSIGNMENT_KEY_NAME, BYTES));
 
-    public static final String ERROR_CODE_KEY_NAME = "error_code";
-    public static final String MEMBER_ASSIGNMENT_KEY_NAME = "member_assignment";
+    public static Schema[] schemaVersions() {
+        return new Schema[] {SYNC_GROUP_RESPONSE_V0, SYNC_GROUP_RESPONSE_V1};
+    }
 
     /**
      * Possible error codes:
@@ -53,8 +69,8 @@ public class SyncGroupResponse extends AbstractResponse {
     }
 
     public SyncGroupResponse(Struct struct) {
-        this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
-        this.error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
+        this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
+        this.error = Errors.forCode(struct.get(ERROR_CODE));
         this.memberState = struct.getBytes(MEMBER_ASSIGNMENT_KEY_NAME);
     }
 
@@ -73,9 +89,8 @@ public class SyncGroupResponse extends AbstractResponse {
     @Override
     protected Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.SYNC_GROUP.responseSchema(version));
-        if (struct.hasField(THROTTLE_TIME_KEY_NAME))
-            struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
-        struct.set(ERROR_CODE_KEY_NAME, error.code());
+        struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);
+        struct.set(ERROR_CODE, error.code());
         struct.set(MEMBER_ASSIGNMENT_KEY_NAME, memberState);
         return struct;
     }

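The getOrElse/setIfExists pair introduced here collapses the old hasField guards for fields that exist only in some versions (throttle_time_ms was added in v1). A small sketch of the equivalence, assuming DEFAULT_THROTTLE_TIME is the zero default defined on AbstractResponse:

    import org.apache.kafka.common.protocol.types.Struct;

    import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;

    final class ThrottleTimeSketch {
        static final int DEFAULT_THROTTLE_TIME = 0;   // assumed default, as in AbstractResponse

        // Old pattern: probe the schema with a string key at every call site.
        static int throttleTimeOld(Struct struct) {
            return struct.hasField("throttle_time_ms")
                    ? struct.getInt("throttle_time_ms")
                    : DEFAULT_THROTTLE_TIME;
        }

        // New pattern: one typed call with the same semantics.
        static int throttleTimeNew(Struct struct) {
            return struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
        }
    }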
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
index 20522af..d384192 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
@@ -19,6 +19,9 @@ package org.apache.kafka.common.requests;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.utils.CollectionUtils;
 
@@ -26,18 +29,42 @@ import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
 
+import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
+import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
+import static org.apache.kafka.common.protocol.types.Type.INT16;
+import static org.apache.kafka.common.protocol.types.Type.INT64;
+import static org.apache.kafka.common.protocol.types.Type.NULLABLE_STRING;
+import static org.apache.kafka.common.protocol.types.Type.STRING;
+
 public class TxnOffsetCommitRequest extends AbstractRequest {
     private static final String TRANSACTIONAL_ID_KEY_NAME = "transactional_id";
     private static final String CONSUMER_GROUP_ID_KEY_NAME = "consumer_group_id";
     private static final String PRODUCER_ID_KEY_NAME = "producer_id";
     private static final String PRODUCER_EPOCH_KEY_NAME = "producer_epoch";
-    private static final String TOPIC_PARTITIONS_KEY_NAME = "topics";
-    private static final String TOPIC_KEY_NAME = "topic";
+    private static final String TOPICS_KEY_NAME = "topics";
     private static final String PARTITIONS_KEY_NAME = "partitions";
-    private static final String PARTITION_KEY_NAME = "partition";
     private static final String OFFSET_KEY_NAME = "offset";
     private static final String METADATA_KEY_NAME = "metadata";
 
+    private static final Schema TXN_OFFSET_COMMIT_PARTITION_OFFSET_METADATA_REQUEST_V0 = new Schema(
+            PARTITION_ID,
+            new Field(OFFSET_KEY_NAME, INT64),
+            new Field(METADATA_KEY_NAME, NULLABLE_STRING));
+
+    private static final Schema TXN_OFFSET_COMMIT_REQUEST_V0 = new Schema(
+            new Field(TRANSACTIONAL_ID_KEY_NAME, STRING, "The transactional id corresponding to the transaction."),
+            new Field(CONSUMER_GROUP_ID_KEY_NAME, STRING, "Id of the associated consumer group to commit offsets for."),
+            new Field(PRODUCER_ID_KEY_NAME, INT64, "Current producer id in use by the transactional id."),
+            new Field(PRODUCER_EPOCH_KEY_NAME, INT16, "Current epoch associated with the producer id."),
+            new Field(TOPICS_KEY_NAME, new ArrayOf(new Schema(
+                    TOPIC_NAME,
+                    new Field(PARTITIONS_KEY_NAME, new ArrayOf(TXN_OFFSET_COMMIT_PARTITION_OFFSET_METADATA_REQUEST_V0)))),
+                    "The partitions to write markers for."));
+
+    public static Schema[] schemaVersions() {
+        return new Schema[]{TXN_OFFSET_COMMIT_REQUEST_V0};
+    }
+
     public static class Builder extends AbstractRequest.Builder<TxnOffsetCommitRequest> {
         private final String transactionalId;
         private final String consumerGroupId;
@@ -106,13 +133,13 @@ public class TxnOffsetCommitRequest extends AbstractRequest {
         this.producerEpoch = struct.getShort(PRODUCER_EPOCH_KEY_NAME);
 
         Map<TopicPartition, CommittedOffset> offsets = new HashMap<>();
-        Object[] topicPartitionsArray = struct.getArray(TOPIC_PARTITIONS_KEY_NAME);
+        Object[] topicPartitionsArray = struct.getArray(TOPICS_KEY_NAME);
         for (Object topicPartitionObj : topicPartitionsArray) {
             Struct topicPartitionStruct = (Struct) topicPartitionObj;
-            String topic = topicPartitionStruct.getString(TOPIC_KEY_NAME);
+            String topic = topicPartitionStruct.get(TOPIC_NAME);
             for (Object partitionObj : topicPartitionStruct.getArray(PARTITIONS_KEY_NAME)) {
                 Struct partitionStruct = (Struct) partitionObj;
-                TopicPartition partition = new TopicPartition(topic, partitionStruct.getInt(PARTITION_KEY_NAME));
+                TopicPartition partition = new TopicPartition(topic, partitionStruct.get(PARTITION_ID));
                 long offset = partitionStruct.getLong(OFFSET_KEY_NAME);
                 String metadata = partitionStruct.getString(METADATA_KEY_NAME);
                 offsets.put(partition, new CommittedOffset(offset, metadata));
@@ -153,15 +180,15 @@ public class TxnOffsetCommitRequest extends AbstractRequest {
         Object[] partitionsArray = new Object[mappedPartitionOffsets.size()];
         int i = 0;
         for (Map.Entry<String, Map<Integer, CommittedOffset>> topicAndPartitions : mappedPartitionOffsets.entrySet()) {
-            Struct topicPartitionsStruct = struct.instance(TOPIC_PARTITIONS_KEY_NAME);
-            topicPartitionsStruct.set(TOPIC_KEY_NAME, topicAndPartitions.getKey());
+            Struct topicPartitionsStruct = struct.instance(TOPICS_KEY_NAME);
+            topicPartitionsStruct.set(TOPIC_NAME, topicAndPartitions.getKey());
 
             Map<Integer, CommittedOffset> partitionOffsets = topicAndPartitions.getValue();
             Object[] partitionOffsetsArray = new Object[partitionOffsets.size()];
             int j = 0;
             for (Map.Entry<Integer, CommittedOffset> partitionOffset : partitionOffsets.entrySet()) {
                 Struct partitionOffsetStruct = topicPartitionsStruct.instance(PARTITIONS_KEY_NAME);
-                partitionOffsetStruct.set(PARTITION_KEY_NAME, partitionOffset.getKey());
+                partitionOffsetStruct.set(PARTITION_ID, partitionOffset.getKey());
                 CommittedOffset committedOffset = partitionOffset.getValue();
                 partitionOffsetStruct.set(OFFSET_KEY_NAME, committedOffset.offset);
                 partitionOffsetStruct.set(METADATA_KEY_NAME, committedOffset.metadata);
@@ -171,7 +198,7 @@ public class TxnOffsetCommitRequest extends AbstractRequest {
             partitionsArray[i++] = topicPartitionsStruct;
         }
 
-        struct.set(TOPIC_PARTITIONS_KEY_NAME, partitionsArray);
+        struct.set(TOPICS_KEY_NAME, partitionsArray);
         return struct;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
index 9a1cefa..53804d9 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
@@ -19,6 +19,9 @@ package org.apache.kafka.common.requests;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.utils.CollectionUtils;
 
@@ -26,12 +29,29 @@ import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
 
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
+import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
+import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
+import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
+
 public class TxnOffsetCommitResponse extends AbstractResponse {
-    private static final String TOPIC_PARTITIONS_KEY_NAME = "topics";
+    private static final String TOPICS_KEY_NAME = "topics";
     private static final String PARTITIONS_KEY_NAME = "partitions";
-    private static final String TOPIC_KEY_NAME = "topic";
-    private static final String PARTITION_KEY_NAME = "partition";
-    private static final String ERROR_CODE_KEY_NAME = "error_code";
+
+    private static final Schema TXN_OFFSET_COMMIT_PARTITION_ERROR_RESPONSE_V0 = new Schema(
+            PARTITION_ID,
+            ERROR_CODE);
+
+    private static final Schema TXN_OFFSET_COMMIT_RESPONSE_V0 = new Schema(
+            THROTTLE_TIME_MS,
+            new Field(TOPICS_KEY_NAME, new ArrayOf(new Schema(
+                    TOPIC_NAME,
+                    new Field(PARTITIONS_KEY_NAME, new ArrayOf(TXN_OFFSET_COMMIT_PARTITION_ERROR_RESPONSE_V0)))),
+                    "Errors per partition from writing markers."));
+
+    public static Schema[] schemaVersions() {
+        return new Schema[]{TXN_OFFSET_COMMIT_RESPONSE_V0};
+    }
 
     // Possible error codes:
     //   InvalidProducerEpoch
@@ -53,16 +73,16 @@ public class TxnOffsetCommitResponse extends AbstractResponse {
     }
 
     public TxnOffsetCommitResponse(Struct struct) {
-        this.throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME);
+        this.throttleTimeMs = struct.get(THROTTLE_TIME_MS);
         Map<TopicPartition, Errors> errors = new HashMap<>();
-        Object[] topicPartitionsArray = struct.getArray(TOPIC_PARTITIONS_KEY_NAME);
+        Object[] topicPartitionsArray = struct.getArray(TOPICS_KEY_NAME);
         for (Object topicPartitionObj : topicPartitionsArray) {
             Struct topicPartitionStruct = (Struct) topicPartitionObj;
-            String topic = topicPartitionStruct.getString(TOPIC_KEY_NAME);
+            String topic = topicPartitionStruct.get(TOPIC_NAME);
             for (Object partitionObj : topicPartitionStruct.getArray(PARTITIONS_KEY_NAME)) {
                 Struct partitionStruct = (Struct) partitionObj;
-                Integer partition = partitionStruct.getInt(PARTITION_KEY_NAME);
-                Errors error = Errors.forCode(partitionStruct.getShort(ERROR_CODE_KEY_NAME));
+                Integer partition = partitionStruct.get(PARTITION_ID);
+                Errors error = Errors.forCode(partitionStruct.get(ERROR_CODE));
                 errors.put(new TopicPartition(topic, partition), error);
             }
         }
@@ -72,28 +92,28 @@ public class TxnOffsetCommitResponse extends AbstractResponse {
     @Override
     protected Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.TXN_OFFSET_COMMIT.responseSchema(version));
-        struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
+        struct.set(THROTTLE_TIME_MS, throttleTimeMs);
         Map<String, Map<Integer, Errors>> mappedPartitions = CollectionUtils.groupDataByTopic(errors);
         Object[] partitionsArray = new Object[mappedPartitions.size()];
         int i = 0;
         for (Map.Entry<String, Map<Integer, Errors>> topicAndPartitions : mappedPartitions.entrySet()) {
-            Struct topicPartitionsStruct = struct.instance(TOPIC_PARTITIONS_KEY_NAME);
-            topicPartitionsStruct.set(TOPIC_KEY_NAME, topicAndPartitions.getKey());
+            Struct topicPartitionsStruct = struct.instance(TOPICS_KEY_NAME);
+            topicPartitionsStruct.set(TOPIC_NAME, topicAndPartitions.getKey());
             Map<Integer, Errors> partitionAndErrors = topicAndPartitions.getValue();
 
             Object[] partitionAndErrorsArray = new Object[partitionAndErrors.size()];
             int j = 0;
             for (Map.Entry<Integer, Errors> partitionAndError : partitionAndErrors.entrySet()) {
                 Struct partitionAndErrorStruct = topicPartitionsStruct.instance(PARTITIONS_KEY_NAME);
-                partitionAndErrorStruct.set(PARTITION_KEY_NAME, partitionAndError.getKey());
-                partitionAndErrorStruct.set(ERROR_CODE_KEY_NAME, partitionAndError.getValue().code());
+                partitionAndErrorStruct.set(PARTITION_ID, partitionAndError.getKey());
+                partitionAndErrorStruct.set(ERROR_CODE, partitionAndError.getValue().code());
                 partitionAndErrorsArray[j++] = partitionAndErrorStruct;
             }
             topicPartitionsStruct.set(PARTITIONS_KEY_NAME, partitionAndErrorsArray);
             partitionsArray[i++] = topicPartitionsStruct;
         }
 
-        struct.set(TOPIC_PARTITIONS_KEY_NAME, partitionsArray);
+        struct.set(TOPICS_KEY_NAME, partitionsArray);
         return struct;
     }
 

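Serialization in toStruct(...) first regroups the flat Map<TopicPartition, Errors> into per-topic maps via CollectionUtils.groupDataByTopic, then emits the nested topics/partitions arrays. A sketch of the regrouping step it relies on (equivalent logic, not the actual utility):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.common.TopicPartition;

    final class GroupByTopicSketch {
        // Regroup Map<TopicPartition, V> into Map<topic, Map<partition, V>>,
        // matching the shape toStruct(...) walks when building the arrays.
        static <V> Map<String, Map<Integer, V>> groupByTopic(Map<TopicPartition, V> flat) {
            Map<String, Map<Integer, V>> grouped = new HashMap<>();
            for (Map.Entry<TopicPartition, V> e : flat.entrySet()) {
                TopicPartition tp = e.getKey();
                grouped.computeIfAbsent(tp.topic(), t -> new HashMap<>())
                       .put(tp.partition(), e.getValue());
            }
            return grouped;
        }
    }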
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
index 5f17d0b..67ae8e1 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
@@ -22,6 +22,9 @@ import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.utils.Utils;
 
@@ -34,7 +37,136 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
+import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
+import static org.apache.kafka.common.protocol.types.Type.INT16;
+import static org.apache.kafka.common.protocol.types.Type.INT32;
+import static org.apache.kafka.common.protocol.types.Type.NULLABLE_STRING;
+import static org.apache.kafka.common.protocol.types.Type.STRING;
+
 public class UpdateMetadataRequest extends AbstractRequest {
+
+    private static final String CONTROLLER_ID_KEY_NAME = "controller_id";
+    private static final String CONTROLLER_EPOCH_KEY_NAME = "controller_epoch";
+    private static final String PARTITION_STATES_KEY_NAME = "partition_states";
+    private static final String LIVE_BROKERS_KEY_NAME = "live_brokers";
+
+    // PartitionState key names
+    private static final String LEADER_KEY_NAME = "leader";
+    private static final String LEADER_EPOCH_KEY_NAME = "leader_epoch";
+    private static final String ISR_KEY_NAME = "isr";
+    private static final String ZK_VERSION_KEY_NAME = "zk_version";
+    private static final String REPLICAS_KEY_NAME = "replicas";
+    private static final String OFFLINE_REPLICAS_KEY_NAME = "offline_replicas";
+
+    // Broker key names
+    private static final String BROKER_ID_KEY_NAME = "id";
+    private static final String ENDPOINTS_KEY_NAME = "end_points";
+    private static final String RACK_KEY_NAME = "rack";
+
+    // EndPoint key names
+    private static final String HOST_KEY_NAME = "host";
+    private static final String PORT_KEY_NAME = "port";
+    private static final String LISTENER_NAME_KEY_NAME = "listener_name";
+    private static final String SECURITY_PROTOCOL_TYPE_KEY_NAME = "security_protocol_type";
+
+    private static final Schema UPDATE_METADATA_REQUEST_PARTITION_STATE_V0 = new Schema(
+                    TOPIC_NAME,
+                    PARTITION_ID,
+                    new Field(CONTROLLER_EPOCH_KEY_NAME, INT32, "The controller epoch."),
+                    new Field(LEADER_KEY_NAME, INT32, "The broker id for the leader."),
+                    new Field(LEADER_EPOCH_KEY_NAME, INT32, "The leader epoch."),
+                    new Field(ISR_KEY_NAME, new ArrayOf(INT32), "The in sync replica ids."),
+                    new Field(ZK_VERSION_KEY_NAME, INT32, "The ZK version."),
+                    new Field(REPLICAS_KEY_NAME, new ArrayOf(INT32), "The replica ids."));
+
+    private static final Schema UPDATE_METADATA_REQUEST_BROKER_V0 = new Schema(
+            new Field(BROKER_ID_KEY_NAME, INT32, "The broker id."),
+            new Field(HOST_KEY_NAME, STRING, "The hostname of the broker."),
+            new Field(PORT_KEY_NAME, INT32, "The port on which the broker accepts requests."));
+
+    private static final Schema UPDATE_METADATA_REQUEST_V0 = new Schema(
+            new Field(CONTROLLER_ID_KEY_NAME, INT32, "The controller id."),
+            new Field(CONTROLLER_EPOCH_KEY_NAME, INT32, "The controller epoch."),
+            new Field(PARTITION_STATES_KEY_NAME, new ArrayOf(UPDATE_METADATA_REQUEST_PARTITION_STATE_V0)),
+            new Field(LIVE_BROKERS_KEY_NAME, new ArrayOf(UPDATE_METADATA_REQUEST_BROKER_V0)));
+
+    private static final Schema UPDATE_METADATA_REQUEST_PARTITION_STATE_V1 = UPDATE_METADATA_REQUEST_PARTITION_STATE_V0;
+
+    // for some reason, V1 sends `port` before `host` while V0 sends `host` before `port`
+    private static final Schema UPDATE_METADATA_REQUEST_END_POINT_V1 = new Schema(
+            new Field(PORT_KEY_NAME, INT32, "The port on which the broker accepts requests."),
+            new Field(HOST_KEY_NAME, STRING, "The hostname of the broker."),
+            new Field(SECURITY_PROTOCOL_TYPE_KEY_NAME, INT16, "The security protocol type."));
+
+    private static final Schema UPDATE_METADATA_REQUEST_BROKER_V1 = new Schema(
+            new Field(BROKER_ID_KEY_NAME, INT32, "The broker id."),
+            new Field(ENDPOINTS_KEY_NAME, new ArrayOf(UPDATE_METADATA_REQUEST_END_POINT_V1)));
+
+    private static final Schema UPDATE_METADATA_REQUEST_V1 = new Schema(
+            new Field(CONTROLLER_ID_KEY_NAME, INT32, "The controller id."),
+            new Field(CONTROLLER_EPOCH_KEY_NAME, INT32, "The controller epoch."),
+            new Field(PARTITION_STATES_KEY_NAME, new ArrayOf(UPDATE_METADATA_REQUEST_PARTITION_STATE_V1)),
+            new Field(LIVE_BROKERS_KEY_NAME, new ArrayOf(UPDATE_METADATA_REQUEST_BROKER_V1)));
+
+    private static final Schema UPDATE_METADATA_REQUEST_PARTITION_STATE_V2 = UPDATE_METADATA_REQUEST_PARTITION_STATE_V1;
+
+    private static final Schema UPDATE_METADATA_REQUEST_END_POINT_V2 = UPDATE_METADATA_REQUEST_END_POINT_V1;
+
+    private static final Schema UPDATE_METADATA_REQUEST_BROKER_V2 = new Schema(
+            new Field(BROKER_ID_KEY_NAME, INT32, "The broker id."),
+            new Field(ENDPOINTS_KEY_NAME, new ArrayOf(UPDATE_METADATA_REQUEST_END_POINT_V2)),
+            new Field(RACK_KEY_NAME, NULLABLE_STRING, "The rack"));
+
+    private static final Schema UPDATE_METADATA_REQUEST_V2 = new Schema(
+            new Field(CONTROLLER_ID_KEY_NAME, INT32, "The controller id."),
+            new Field(CONTROLLER_EPOCH_KEY_NAME, INT32, "The controller epoch."),
+            new Field(PARTITION_STATES_KEY_NAME, new ArrayOf(UPDATE_METADATA_REQUEST_PARTITION_STATE_V2)),
+            new Field(LIVE_BROKERS_KEY_NAME, new ArrayOf(UPDATE_METADATA_REQUEST_BROKER_V2)));
+
+    private static final Schema UPDATE_METADATA_REQUEST_PARTITION_STATE_V3 = UPDATE_METADATA_REQUEST_PARTITION_STATE_V2;
+
+    // UPDATE_METADATA_REQUEST_PARTITION_STATE_V4 added a per-partition offline_replicas field. This field specifies the list of replicas that are offline.
+    private static final Schema UPDATE_METADATA_REQUEST_PARTITION_STATE_V4 = new Schema(
+            TOPIC_NAME,
+            PARTITION_ID,
+            new Field(CONTROLLER_EPOCH_KEY_NAME, INT32, "The controller epoch."),
+            new Field(LEADER_KEY_NAME, INT32, "The broker id for the leader."),
+            new Field(LEADER_EPOCH_KEY_NAME, INT32, "The leader epoch."),
+            new Field(ISR_KEY_NAME, new ArrayOf(INT32), "The in sync replica ids."),
+            new Field(ZK_VERSION_KEY_NAME, INT32, "The ZK version."),
+            new Field(REPLICAS_KEY_NAME, new ArrayOf(INT32), "The replica ids."),
+            new Field(OFFLINE_REPLICAS_KEY_NAME, new ArrayOf(INT32), "The offline replica ids"));
+
+    private static final Schema UPDATE_METADATA_REQUEST_END_POINT_V3 = new Schema(
+            new Field(PORT_KEY_NAME, INT32, "The port on which the broker accepts requests."),
+            new Field(HOST_KEY_NAME, STRING, "The hostname of the broker."),
+            new Field(LISTENER_NAME_KEY_NAME, STRING, "The listener name."),
+            new Field(SECURITY_PROTOCOL_TYPE_KEY_NAME, INT16, "The security protocol type."));
+
+    private static final Schema UPDATE_METADATA_REQUEST_BROKER_V3 = new Schema(
+            new Field(BROKER_ID_KEY_NAME, INT32, "The broker id."),
+            new Field(ENDPOINTS_KEY_NAME, new ArrayOf(UPDATE_METADATA_REQUEST_END_POINT_V3)),
+            new Field(RACK_KEY_NAME, NULLABLE_STRING, "The rack"));
+
+    private static final Schema UPDATE_METADATA_REQUEST_V3 = new Schema(
+            new Field(CONTROLLER_ID_KEY_NAME, INT32, "The controller id."),
+            new Field(CONTROLLER_EPOCH_KEY_NAME, INT32, "The controller epoch."),
+            new Field(PARTITION_STATES_KEY_NAME, new ArrayOf(UPDATE_METADATA_REQUEST_PARTITION_STATE_V3)),
+            new Field(LIVE_BROKERS_KEY_NAME, new ArrayOf(UPDATE_METADATA_REQUEST_BROKER_V3)));
+
+    // UPDATE_METADATA_REQUEST_V4 added a per-partition offline_replicas field. This field specifies the list of replicas that are offline.
+    private static final Schema UPDATE_METADATA_REQUEST_V4 = new Schema(
+            new Field(CONTROLLER_ID_KEY_NAME, INT32, "The controller id."),
+            new Field(CONTROLLER_EPOCH_KEY_NAME, INT32, "The controller epoch."),
+            new Field(PARTITION_STATES_KEY_NAME, new ArrayOf(UPDATE_METADATA_REQUEST_PARTITION_STATE_V4)),
+            new Field(LIVE_BROKERS_KEY_NAME, new ArrayOf(UPDATE_METADATA_REQUEST_BROKER_V3)));
+
+    public static Schema[] schemaVersions() {
+        return new Schema[] {UPDATE_METADATA_REQUEST_V0, UPDATE_METADATA_REQUEST_V1, UPDATE_METADATA_REQUEST_V2,
+            UPDATE_METADATA_REQUEST_V3, UPDATE_METADATA_REQUEST_V4};
+    }
+
     public static class Builder extends AbstractRequest.Builder<UpdateMetadataRequest> {
         private final int controllerId;
         private final int controllerEpoch;
@@ -43,7 +175,7 @@ public class UpdateMetadataRequest extends AbstractRequest {
 
         public Builder(short version, int controllerId, int controllerEpoch,
                        Map<TopicPartition, PartitionState> partitionStates, Set<Broker> liveBrokers) {
-            super(ApiKeys.UPDATE_METADATA_KEY, version);
+            super(ApiKeys.UPDATE_METADATA, version);
             this.controllerId = controllerId;
             this.controllerEpoch = controllerEpoch;
             this.partitionStates = partitionStates;
@@ -144,32 +276,6 @@ public class UpdateMetadataRequest extends AbstractRequest {
         }
     }
 
-    private static final String CONTROLLER_ID_KEY_NAME = "controller_id";
-    private static final String CONTROLLER_EPOCH_KEY_NAME = "controller_epoch";
-    private static final String PARTITION_STATES_KEY_NAME = "partition_states";
-    private static final String LIVE_BROKERS_KEY_NAME = "live_brokers";
-
-    // PartitionState key names
-    private static final String TOPIC_KEY_NAME = "topic";
-    private static final String PARTITION_KEY_NAME = "partition";
-    private static final String LEADER_KEY_NAME = "leader";
-    private static final String LEADER_EPOCH_KEY_NAME = "leader_epoch";
-    private static final String ISR_KEY_NAME = "isr";
-    private static final String ZK_VERSION_KEY_NAME = "zk_version";
-    private static final String REPLICAS_KEY_NAME = "replicas";
-    private static final String OFFLINE_REPLICAS_KEY_NAME = "offline_replicas";
-
-    // Broker key names
-    private static final String BROKER_ID_KEY_NAME = "id";
-    private static final String ENDPOINTS_KEY_NAME = "end_points";
-    private static final String RACK_KEY_NAME = "rack";
-
-    // EndPoint key names
-    private static final String HOST_KEY_NAME = "host";
-    private static final String PORT_KEY_NAME = "port";
-    private static final String LISTENER_NAME_KEY_NAME = "listener_name";
-    private static final String SECURITY_PROTOCOL_TYPE_KEY_NAME = "security_protocol_type";
-
     private final int controllerId;
     private final int controllerEpoch;
     private final Map<TopicPartition, PartitionState> partitionStates;
@@ -189,8 +295,8 @@ public class UpdateMetadataRequest extends AbstractRequest {
         Map<TopicPartition, PartitionState> partitionStates = new HashMap<>();
         for (Object partitionStateDataObj : struct.getArray(PARTITION_STATES_KEY_NAME)) {
             Struct partitionStateData = (Struct) partitionStateDataObj;
-            String topic = partitionStateData.getString(TOPIC_KEY_NAME);
-            int partition = partitionStateData.getInt(PARTITION_KEY_NAME);
+            String topic = partitionStateData.get(TOPIC_NAME);
+            int partition = partitionStateData.get(PARTITION_ID);
             int controllerEpoch = partitionStateData.getInt(CONTROLLER_EPOCH_KEY_NAME);
             int leader = partitionStateData.getInt(LEADER_KEY_NAME);
             int leaderEpoch = partitionStateData.getInt(LEADER_EPOCH_KEY_NAME);
@@ -264,7 +370,7 @@ public class UpdateMetadataRequest extends AbstractRequest {
     @Override
     protected Struct toStruct() {
         short version = version();
-        Struct struct = new Struct(ApiKeys.UPDATE_METADATA_KEY.requestSchema(version));
+        Struct struct = new Struct(ApiKeys.UPDATE_METADATA.requestSchema(version));
         struct.set(CONTROLLER_ID_KEY_NAME, controllerId);
         struct.set(CONTROLLER_EPOCH_KEY_NAME, controllerEpoch);
 
@@ -272,8 +378,8 @@ public class UpdateMetadataRequest extends AbstractRequest {
         for (Map.Entry<TopicPartition, PartitionState> entry : partitionStates.entrySet()) {
             Struct partitionStateData = struct.instance(PARTITION_STATES_KEY_NAME);
             TopicPartition topicPartition = entry.getKey();
-            partitionStateData.set(TOPIC_KEY_NAME, topicPartition.topic());
-            partitionStateData.set(PARTITION_KEY_NAME, topicPartition.partition());
+            partitionStateData.set(TOPIC_NAME, topicPartition.topic());
+            partitionStateData.set(PARTITION_ID, topicPartition.partition());
             PartitionState partitionState = entry.getValue();
             partitionStateData.set(CONTROLLER_EPOCH_KEY_NAME, partitionState.basePartitionState.controllerEpoch);
             partitionStateData.set(LEADER_KEY_NAME, partitionState.basePartitionState.leader);
@@ -328,7 +434,7 @@ public class UpdateMetadataRequest extends AbstractRequest {
             return new UpdateMetadataResponse(Errors.forException(e));
         else
             throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
-                    versionId, this.getClass().getSimpleName(), ApiKeys.UPDATE_METADATA_KEY.latestVersion()));
+                    versionId, this.getClass().getSimpleName(), ApiKeys.UPDATE_METADATA.latestVersion()));
     }
 
     public int controllerId() {
@@ -348,7 +454,7 @@ public class UpdateMetadataRequest extends AbstractRequest {
     }
 
     public static UpdateMetadataRequest parse(ByteBuffer buffer, short version) {
-        return new UpdateMetadataRequest(ApiKeys.UPDATE_METADATA_KEY.parseRequest(version, buffer), version);
+        return new UpdateMetadataRequest(ApiKeys.UPDATE_METADATA.parseRequest(version, buffer), version);
     }
 
 }
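
The pattern above repeats throughout the commit: each request class now owns
its per-version Schema constants and exposes them through a static
schemaVersions() method, with the array index doubling as the wire version. A
minimal, self-contained sketch of how such a schema is used in practice (a
sketch only; the class name and field values are illustrative, though the
types API is the one imported in this diff):

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.protocol.types.ArrayOf;
    import org.apache.kafka.common.protocol.types.Field;
    import org.apache.kafka.common.protocol.types.Schema;
    import org.apache.kafka.common.protocol.types.Struct;
    import static org.apache.kafka.common.protocol.types.Type.INT32;

    public class SchemaSketch {
        // A one-version schema, mirroring the UPDATE_METADATA_REQUEST_V* style.
        private static final Schema EXAMPLE_V0 = new Schema(
                new Field("controller_id", INT32, "The controller id."),
                new Field("isr", new ArrayOf(INT32), "The in sync replica ids."));

        public static void main(String[] args) {
            // Populate a Struct against the schema, write it out, read it back.
            Struct struct = new Struct(EXAMPLE_V0);
            struct.set("controller_id", 5);
            struct.set("isr", new Object[]{1, 2, 3});

            ByteBuffer buffer = ByteBuffer.allocate(struct.sizeOf());
            struct.writeTo(buffer);
            buffer.flip();

            Struct parsed = EXAMPLE_V0.read(buffer);
            System.out.println(parsed.getInt("controller_id")); // prints 5
        }
    }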

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java
index 4eae39e..9ff8e27 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java
@@ -18,13 +18,24 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
 
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
+
 public class UpdateMetadataResponse extends AbstractResponse {
+    private static final Schema UPDATE_METADATA_RESPONSE_V0 = new Schema(ERROR_CODE);
+    private static final Schema UPDATE_METADATA_RESPONSE_V1 = UPDATE_METADATA_RESPONSE_V0;
+    private static final Schema UPDATE_METADATA_RESPONSE_V2 = UPDATE_METADATA_RESPONSE_V1;
+    private static final Schema UPDATE_METADATA_RESPONSE_V3 = UPDATE_METADATA_RESPONSE_V2;
+    private static final Schema UPDATE_METADATA_RESPONSE_V4 = UPDATE_METADATA_RESPONSE_V3;
 
-    private static final String ERROR_CODE_KEY_NAME = "error_code";
+    public static Schema[] schemaVersions() {
+        return new Schema[]{UPDATE_METADATA_RESPONSE_V0, UPDATE_METADATA_RESPONSE_V1, UPDATE_METADATA_RESPONSE_V2,
+            UPDATE_METADATA_RESPONSE_V3, UPDATE_METADATA_RESPONSE_V4};
+    }
 
     /**
      * Possible error code:
@@ -38,7 +49,7 @@ public class UpdateMetadataResponse extends AbstractResponse {
     }
 
     public UpdateMetadataResponse(Struct struct) {
-        error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
+        error = Errors.forCode(struct.get(ERROR_CODE));
     }
 
     public Errors error() {
@@ -46,13 +57,13 @@ public class UpdateMetadataResponse extends AbstractResponse {
     }
 
     public static UpdateMetadataResponse parse(ByteBuffer buffer, short version) {
-        return new UpdateMetadataResponse(ApiKeys.UPDATE_METADATA_KEY.parseResponse(version, buffer));
+        return new UpdateMetadataResponse(ApiKeys.UPDATE_METADATA.parseResponse(version, buffer));
     }
 
     @Override
     protected Struct toStruct(short version) {
-        Struct struct = new Struct(ApiKeys.UPDATE_METADATA_KEY.responseSchema(version));
-        struct.set(ERROR_CODE_KEY_NAME, error.code());
+        Struct struct = new Struct(ApiKeys.UPDATE_METADATA.responseSchema(version));
+        struct.set(ERROR_CODE, error.code());
         return struct;
     }
 }
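
Aliasing each unchanged response version to its predecessor (V1 = V0, and so
on) avoids duplicate definitions while keeping the version-to-schema mapping
explicit: the index into schemaVersions() is the wire version, and ApiKeys
consumes these arrays (see ApiKeysTest further down, which indexes
requestSchemas directly). A hedged sketch of the lookup this enables, with a
hypothetical helper name:

    // Hypothetical helper: resolve the schema for a wire version, or fail fast.
    static Schema schemaFor(Schema[] versions, short version) {
        if (version < 0 || version >= versions.length)
            throw new IllegalArgumentException("Invalid version: " + version);
        return versions[version];
    }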

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java
index cf2c9fc..96dfb2f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java
@@ -19,6 +19,9 @@ package org.apache.kafka.common.requests;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.utils.CollectionUtils;
 
@@ -29,6 +32,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
+import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
+import static org.apache.kafka.common.protocol.types.Type.BOOLEAN;
+import static org.apache.kafka.common.protocol.types.Type.INT16;
+import static org.apache.kafka.common.protocol.types.Type.INT32;
+import static org.apache.kafka.common.protocol.types.Type.INT64;
+
 public class WriteTxnMarkersRequest extends AbstractRequest {
     private static final String COORDINATOR_EPOCH_KEY_NAME = "coordinator_epoch";
     private static final String TXN_MARKER_ENTRY_KEY_NAME = "transaction_markers";
@@ -36,10 +45,27 @@ public class WriteTxnMarkersRequest extends AbstractRequest {
     private static final String PRODUCER_ID_KEY_NAME = "producer_id";
     private static final String PRODUCER_EPOCH_KEY_NAME = "producer_epoch";
     private static final String TRANSACTION_RESULT_KEY_NAME = "transaction_result";
-    private static final String TOPIC_PARTITIONS_KEY_NAME = "topics";
-    private static final String TOPIC_KEY_NAME = "topic";
+    private static final String TOPICS_KEY_NAME = "topics";
     private static final String PARTITIONS_KEY_NAME = "partitions";
 
+    private static final Schema WRITE_TXN_MARKERS_ENTRY_V0 = new Schema(
+            new Field(PRODUCER_ID_KEY_NAME, INT64, "Current producer id in use by the transactional id."),
+            new Field(PRODUCER_EPOCH_KEY_NAME, INT16, "Current epoch associated with the producer id."),
+            new Field(TRANSACTION_RESULT_KEY_NAME, BOOLEAN, "The result of the transaction to write to the " +
+                    "partitions (false = ABORT, true = COMMIT)."),
+            new Field(TOPICS_KEY_NAME, new ArrayOf(new Schema(
+                    TOPIC_NAME,
+                    new Field(PARTITIONS_KEY_NAME, new ArrayOf(INT32)))), "The partitions to write markers for."),
+            new Field(COORDINATOR_EPOCH_KEY_NAME, INT32, "Epoch associated with the transaction state partition " +
+                    "hosted by this transaction coordinator"));
+
+    private static final Schema WRITE_TXN_MARKERS_REQUEST_V0 = new Schema(
+            new Field(TXN_MARKER_ENTRY_KEY_NAME, new ArrayOf(WRITE_TXN_MARKERS_ENTRY_V0), "The transaction markers to be written."));
+
+    public static Schema[] schemaVersions() {
+        return new Schema[]{WRITE_TXN_MARKERS_REQUEST_V0};
+    }
+
     public static class TxnMarkerEntry {
         private final long producerId;
         private final short producerEpoch;
@@ -144,10 +170,10 @@ public class WriteTxnMarkersRequest extends AbstractRequest {
             TransactionResult result = TransactionResult.forId(markerStruct.getBoolean(TRANSACTION_RESULT_KEY_NAME));
 
             List<TopicPartition> partitions = new ArrayList<>();
-            Object[] topicPartitionsArray = markerStruct.getArray(TOPIC_PARTITIONS_KEY_NAME);
+            Object[] topicPartitionsArray = markerStruct.getArray(TOPICS_KEY_NAME);
             for (Object topicPartitionObj : topicPartitionsArray) {
                 Struct topicPartitionStruct = (Struct) topicPartitionObj;
-                String topic = topicPartitionStruct.getString(TOPIC_KEY_NAME);
+                String topic = topicPartitionStruct.get(TOPIC_NAME);
                 for (Object partitionObj : topicPartitionStruct.getArray(PARTITIONS_KEY_NAME)) {
                     partitions.add(new TopicPartition(topic, (Integer) partitionObj));
                 }
@@ -181,12 +207,12 @@ public class WriteTxnMarkersRequest extends AbstractRequest {
             Object[] partitionsArray = new Object[mappedPartitions.size()];
             int j = 0;
             for (Map.Entry<String, List<Integer>> topicAndPartitions : mappedPartitions.entrySet()) {
-                Struct topicPartitionsStruct = markerStruct.instance(TOPIC_PARTITIONS_KEY_NAME);
-                topicPartitionsStruct.set(TOPIC_KEY_NAME, topicAndPartitions.getKey());
+                Struct topicPartitionsStruct = markerStruct.instance(TOPICS_KEY_NAME);
+                topicPartitionsStruct.set(TOPIC_NAME, topicAndPartitions.getKey());
                 topicPartitionsStruct.set(PARTITIONS_KEY_NAME, topicAndPartitions.getValue().toArray());
                 partitionsArray[j++] = topicPartitionsStruct;
             }
-            markerStruct.set(TOPIC_PARTITIONS_KEY_NAME, partitionsArray);
+            markerStruct.set(TOPICS_KEY_NAME, partitionsArray);
             markersArray[i++] = markerStruct;
         }
         struct.set(TXN_MARKER_ENTRY_KEY_NAME, markersArray);
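
The nested topics array in WRITE_TXN_MARKERS_ENTRY_V0 is populated via
Struct.instance(...), which creates a sub-Struct bound to the element schema
of the named array field, as the hunk above shows. A minimal sketch of that
pattern in isolation (values are illustrative; all names are defined in this
file):

    // Build one marker entry against the schema defined above.
    Struct marker = new Struct(WRITE_TXN_MARKERS_ENTRY_V0);
    marker.set(PRODUCER_ID_KEY_NAME, 42L);
    marker.set(PRODUCER_EPOCH_KEY_NAME, (short) 0);
    marker.set(TRANSACTION_RESULT_KEY_NAME, true);
    marker.set(COORDINATOR_EPOCH_KEY_NAME, 7);

    // instance() yields a Struct for one element of the "topics" array.
    Struct topic = marker.instance(TOPICS_KEY_NAME);
    topic.set(TOPIC_NAME, "my-topic");
    topic.set(PARTITIONS_KEY_NAME, new Object[]{0, 1});
    marker.set(TOPICS_KEY_NAME, new Object[]{topic});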

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java
index ddddc42..3372670 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java
@@ -19,6 +19,9 @@ package org.apache.kafka.common.requests;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.utils.CollectionUtils;
 
@@ -26,15 +29,35 @@ import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
 
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
+import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
+import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
+import static org.apache.kafka.common.protocol.types.Type.INT64;
+
 public class WriteTxnMarkersResponse extends AbstractResponse {
     private static final String TXN_MARKER_ENTRY_KEY_NAME = "transaction_markers";
 
     private static final String PRODUCER_ID_KEY_NAME = "producer_id";
-    private static final String TOPIC_PARTITIONS_KEY_NAME = "topics";
+    private static final String TOPICS_KEY_NAME = "topics";
     private static final String PARTITIONS_KEY_NAME = "partitions";
-    private static final String TOPIC_KEY_NAME = "topic";
-    private static final String PARTITION_KEY_NAME = "partition";
-    private static final String ERROR_CODE_KEY_NAME = "error_code";
+
+    private static final Schema WRITE_TXN_MARKERS_PARTITION_ERROR_RESPONSE_V0 = new Schema(
+            PARTITION_ID,
+            ERROR_CODE);
+
+    private static final Schema WRITE_TXN_MARKERS_ENTRY_RESPONSE_V0 = new Schema(
+            new Field(PRODUCER_ID_KEY_NAME, INT64, "Current producer id in use by the transactional id."),
+            new Field(TOPICS_KEY_NAME, new ArrayOf(new Schema(
+                    TOPIC_NAME,
+                    new Field(PARTITIONS_KEY_NAME, new ArrayOf(WRITE_TXN_MARKERS_PARTITION_ERROR_RESPONSE_V0)))),
+                    "Errors per partition from writing markers."));
+
+    private static final Schema WRITE_TXN_MARKERS_RESPONSE_V0 = new Schema(
+            new Field("transaction_markers", new ArrayOf(WRITE_TXN_MARKERS_ENTRY_RESPONSE_V0), "Errors per partition from writing markers."));
+
+    public static Schema[] schemaVersions() {
+        return new Schema[]{WRITE_TXN_MARKERS_RESPONSE_V0};
+    }
 
     // Possible error codes:
     //   CorruptRecord
@@ -66,14 +89,14 @@ public class WriteTxnMarkersResponse extends AbstractResponse {
             long producerId = responseStruct.getLong(PRODUCER_ID_KEY_NAME);
 
             Map<TopicPartition, Errors> errorPerPartition = new HashMap<>();
-            Object[] topicPartitionsArray = responseStruct.getArray(TOPIC_PARTITIONS_KEY_NAME);
+            Object[] topicPartitionsArray = responseStruct.getArray(TOPICS_KEY_NAME);
             for (Object topicPartitionObj : topicPartitionsArray) {
                 Struct topicPartitionStruct = (Struct) topicPartitionObj;
-                String topic = topicPartitionStruct.getString(TOPIC_KEY_NAME);
+                String topic = topicPartitionStruct.get(TOPIC_NAME);
                 for (Object partitionObj : topicPartitionStruct.getArray(PARTITIONS_KEY_NAME)) {
                     Struct partitionStruct = (Struct) partitionObj;
-                    Integer partition = partitionStruct.getInt(PARTITION_KEY_NAME);
-                    Errors error = Errors.forCode(partitionStruct.getShort(ERROR_CODE_KEY_NAME));
+                    Integer partition = partitionStruct.get(PARTITION_ID);
+                    Errors error = Errors.forCode(partitionStruct.get(ERROR_CODE));
                     errorPerPartition.put(new TopicPartition(topic, partition), error);
                 }
             }
@@ -98,22 +121,22 @@ public class WriteTxnMarkersResponse extends AbstractResponse {
             Object[] partitionsArray = new Object[mappedPartitions.size()];
             int i = 0;
             for (Map.Entry<String, Map<Integer, Errors>> topicAndPartitions : mappedPartitions.entrySet()) {
-                Struct topicPartitionsStruct = responseStruct.instance(TOPIC_PARTITIONS_KEY_NAME);
-                topicPartitionsStruct.set(TOPIC_KEY_NAME, topicAndPartitions.getKey());
+                Struct topicPartitionsStruct = responseStruct.instance(TOPICS_KEY_NAME);
+                topicPartitionsStruct.set(TOPIC_NAME, topicAndPartitions.getKey());
                 Map<Integer, Errors> partitionIdAndErrors = topicAndPartitions.getValue();
 
                 Object[] partitionAndErrorsArray = new Object[partitionIdAndErrors.size()];
                 int j = 0;
                 for (Map.Entry<Integer, Errors> partitionAndError : partitionIdAndErrors.entrySet()) {
                     Struct partitionAndErrorStruct = topicPartitionsStruct.instance(PARTITIONS_KEY_NAME);
-                    partitionAndErrorStruct.set(PARTITION_KEY_NAME, partitionAndError.getKey());
-                    partitionAndErrorStruct.set(ERROR_CODE_KEY_NAME, partitionAndError.getValue().code());
+                    partitionAndErrorStruct.set(PARTITION_ID, partitionAndError.getKey());
+                    partitionAndErrorStruct.set(ERROR_CODE, partitionAndError.getValue().code());
                     partitionAndErrorsArray[j++] = partitionAndErrorStruct;
                 }
                 topicPartitionsStruct.set(PARTITIONS_KEY_NAME, partitionAndErrorsArray);
                 partitionsArray[i++] = topicPartitionsStruct;
             }
-            responseStruct.set(TOPIC_PARTITIONS_KEY_NAME, partitionsArray);
+            responseStruct.set(TOPICS_KEY_NAME, partitionsArray);
 
             responsesArray[k++] = responseStruct;
         }
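
Note the typing win in this hunk: the CommonFields constants are typed fields,
so struct.get(PARTITION_ID) yields an Integer and struct.get(ERROR_CODE) a
Short, with no string-keyed getInt/getShort lookups or casts. A compact sketch
built only from pieces shown in this commit:

    Schema schema = new Schema(PARTITION_ID, ERROR_CODE);
    Struct struct = new Struct(schema);
    struct.set(PARTITION_ID, 0);
    struct.set(ERROR_CODE, Errors.NONE.code());

    int partition = struct.get(PARTITION_ID);               // typed, no cast
    Errors error = Errors.forCode(struct.get(ERROR_CODE));  // Short in, Errors out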

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index ce6a6b0..46386cf 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -35,7 +35,6 @@ import org.apache.kafka.common.network.TransportLayer;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.SecurityProtocol;
-import org.apache.kafka.common.protocol.Protocol;
 import org.apache.kafka.common.requests.AbstractResponse;
 import org.apache.kafka.common.requests.ApiVersionsRequest;
 import org.apache.kafka.common.requests.ApiVersionsResponse;
@@ -372,7 +371,7 @@ public class SaslServerAuthenticator implements Authenticator {
                 sendKafkaResponse(requestContext, requestAndSize.request.getErrorResponse(e));
                 throw e;
             }
-            if (!Protocol.apiVersionSupported(apiKey.id, version)) {
+            if (!apiKey.isVersionSupported(version)) {
                 this.error = Errors.UNSUPPORTED_VERSION;
                 // We cannot create an error response if the request version of SaslAuthenticate is not supported
                 // This should not normally occur since clients typically check supported versions using ApiVersionsRequest
@@ -471,7 +470,7 @@ public class SaslServerAuthenticator implements Authenticator {
 
     // Visible to override for testing
     protected ApiVersionsResponse apiVersionsResponse() {
-        return ApiVersionsResponse.API_VERSIONS_RESPONSE;
+        return ApiVersionsResponse.defaultApiVersionsResponse();
     }
 
     // Visible to override for testing
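
With the central Protocol class being dismantled, call sites now ask the
ApiKeys enum directly whether a version is in range. A plausible shape for
that check, stated as an assumption since only the call site appears in this
hunk:

    // Hypothetical sketch of ApiKeys.isVersionSupported; latestVersion() is
    // derived from the schemaVersions() arrays the message classes now provide.
    public boolean isVersionSupported(short apiVersion) {
        return apiVersion >= 0 && apiVersion <= latestVersion();
    }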

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index 4c3019c..86ad2ff 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -161,8 +161,9 @@ public class NetworkClientTest {
     }
 
     private void maybeSetExpectedApiVersionsResponse() {
-        short apiVersionsResponseVersion = ApiVersionsResponse.API_VERSIONS_RESPONSE.apiVersion(ApiKeys.API_VERSIONS.id).maxVersion;
-        ByteBuffer buffer = ApiVersionsResponse.API_VERSIONS_RESPONSE.serialize(apiVersionsResponseVersion, new ResponseHeader(0));
+        ApiVersionsResponse response = ApiVersionsResponse.defaultApiVersionsResponse();
+        short apiVersionsResponseVersion = response.apiVersion(ApiKeys.API_VERSIONS.id).maxVersion;
+        ByteBuffer buffer = response.serialize(apiVersionsResponseVersion, new ResponseHeader(0));
         selector.delayedReceive(new DelayedReceive(node.idString(), new NetworkReceive(node.idString(), buffer)));
     }
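
The static ApiVersionsResponse.API_VERSIONS_RESPONSE constant becomes a
defaultApiVersionsResponse() factory method throughout the tests. A likely
motivation, offered as an assumption: now that ApiKeys consumes schemas
defined inside the response classes, an eagerly initialized constant that
itself enumerates every ApiKeys value risks class-initialization ordering
problems, which a method sidesteps. A hypothetical lazy-caching shape for such
a method (helper name and body assumed, not quoted from the commit):

    private static volatile ApiVersionsResponse defaultResponse;

    public static ApiVersionsResponse defaultApiVersionsResponse() {
        ApiVersionsResponse response = defaultResponse;
        if (response == null) {
            response = buildDefaultResponse(); // hypothetical helper
            defaultResponse = response;
        }
        return response;
    }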
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java b/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
index 41ac42a..6b1d92a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
@@ -138,7 +138,7 @@ public class NodeApiVersionsTest {
     @Test
     public void testUsableVersionLatestVersions() {
         List<ApiVersion> versionList = new LinkedList<>();
-        for (ApiVersion apiVersion: ApiVersionsResponse.API_VERSIONS_RESPONSE.apiVersions()) {
+        for (ApiVersion apiVersion: ApiVersionsResponse.defaultApiVersionsResponse().apiVersions()) {
             versionList.add(apiVersion);
         }
         // Add an API key that we don't know about.

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java b/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java
index d68e676..a1dd775 100644
--- a/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java
@@ -16,16 +16,16 @@
  */
 package org.apache.kafka.common.protocol;
 
+import org.apache.kafka.common.protocol.types.BoundField;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.junit.Test;
+
 import java.util.Arrays;
 import java.util.List;
 
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
-import org.junit.Test;
-
 public class ApiKeysTest {
 
     @Test(expected = IllegalArgumentException.class)
@@ -40,7 +40,7 @@ public class ApiKeysTest {
 
     @Test(expected = IllegalArgumentException.class)
     public void schemaVersionOutOfRange() {
-        ApiKeys.PRODUCE.requestSchema((short) Protocol.REQUESTS[ApiKeys.PRODUCE.id].length);
+        ApiKeys.PRODUCE.requestSchema((short) ApiKeys.PRODUCE.requestSchemas.length);
     }
 
     /**
@@ -56,10 +56,9 @@ public class ApiKeysTest {
     @Test
     public void testResponseThrottleTime() {
         List<ApiKeys> authenticationKeys = Arrays.asList(ApiKeys.SASL_HANDSHAKE, ApiKeys.SASL_AUTHENTICATE);
-
         for (ApiKeys apiKey: ApiKeys.values()) {
             Schema responseSchema = apiKey.responseSchema(apiKey.latestVersion());
-            Field throttleTimeField = responseSchema.get("throttle_time_ms");
+            BoundField throttleTimeField = responseSchema.get(CommonFields.THROTTLE_TIME_MS.name);
             if (apiKey.clusterAction || authenticationKeys.contains(apiKey))
                 assertNull("Unexpected throttle time field: " + apiKey, throttleTimeField);
             else

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/test/java/org/apache/kafka/common/protocol/ProtoUtilsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/protocol/ProtoUtilsTest.java b/clients/src/test/java/org/apache/kafka/common/protocol/ProtoUtilsTest.java
index 8cb6b80..c4b3fc4 100644
--- a/clients/src/test/java/org/apache/kafka/common/protocol/ProtoUtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/protocol/ProtoUtilsTest.java
@@ -16,20 +16,21 @@
  */
 package org.apache.kafka.common.protocol;
 
-import org.junit.Assert;
 import org.junit.Test;
 
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
 public class ProtoUtilsTest {
     @Test
     public void testDelayedAllocationSchemaDetection() throws Exception {
         //verifies that schemas known to retain a reference to the underlying byte buffer are correctly detected.
         for (ApiKeys key : ApiKeys.values()) {
             if (key == ApiKeys.PRODUCE || key == ApiKeys.JOIN_GROUP || key == ApiKeys.SYNC_GROUP || key == ApiKeys.SASL_AUTHENTICATE) {
-                Assert.assertTrue(Protocol.requiresDelayedDeallocation(key.id));
+                assertTrue(key + " should require delayed allocation", key.requiresDelayedAllocation);
             } else {
-                Assert.assertFalse(Protocol.requiresDelayedDeallocation(key.id));
+                assertFalse(key + " should not require delayed allocation", key.requiresDelayedAllocation);
             }
         }
-
     }
 }
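
The per-key requiresDelayedAllocation flag replaces
Protocol.requiresDelayedDeallocation(id): Produce, JoinGroup, SyncGroup and
SaslAuthenticate requests carry byte-buffer-backed fields, so their receive
buffers must not be reused until processing completes. A hypothetical sketch
of deriving such a flag from a schema (the recursive walk is an assumption,
not the commit's code):

    // Hypothetical: true if any field's type keeps a reference to the
    // underlying request ByteBuffer after parsing.
    static boolean retainsBuffer(Schema schema) {
        for (BoundField f : schema.fields()) {
            Type type = f.def.type;
            if (type == Type.BYTES || type == Type.NULLABLE_BYTES || type == Type.RECORDS)
                return true;
            if (type instanceof ArrayOf && ((ArrayOf) type).type() instanceof Schema
                    && retainsBuffer((Schema) ((ArrayOf) type).type()))
                return true;
            if (type instanceof Schema && retainsBuffer((Schema) type))
                return true;
        }
        return false;
    }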

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java b/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java
index 136b55a..6e9341a 100644
--- a/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java
@@ -16,17 +16,17 @@
  */
 package org.apache.kafka.common.protocol.types;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.fail;
+import org.junit.Before;
+import org.junit.Test;
 
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 
-import org.junit.Before;
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
 
 public class ProtocolSerializationTest {
 
@@ -96,15 +96,15 @@ public class ProtocolSerializationTest {
 
     @Test
     public void testNulls() {
-        for (Field f : this.schema.fields()) {
+        for (BoundField f : this.schema.fields()) {
             Object o = this.struct.get(f);
             try {
                 this.struct.set(f, null);
                 this.struct.validate();
-                if (!f.type.isNullable())
+                if (!f.def.type.isNullable())
                     fail("Should not allow serialization of null value.");
             } catch (SchemaException e) {
-                assertFalse(f.type.isNullable());
+                assertFalse(f.def.type.isNullable());
             } finally {
                 this.struct.set(f, o);
             }
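
Field has been split into a plain definition (name, type, doc string) and a
BoundField that binds a definition to its position in a particular Schema,
hence the f.def.type indirection above. A small sketch of iterating bound
fields (the field itself is illustrative):

    Schema schema = new Schema(
            new Field("rack", NULLABLE_STRING, "The rack."));
    for (BoundField f : schema.fields()) {
        // BoundField exposes the underlying definition via 'def'.
        System.out.println(f.def.name + " nullable=" + f.def.type.isNullable());
    }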

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java
index 1e8e3b4..6d488cd 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java
@@ -40,7 +40,7 @@ public class ApiVersionsResponseTest {
 
     @Test
     public void shouldCreateApiResponseThatHasAllApiKeysSupportedByBroker() throws Exception {
-        assertEquals(apiKeysInResponse(ApiVersionsResponse.API_VERSIONS_RESPONSE), Utils.mkSet(ApiKeys.values()));
+        assertEquals(apiKeysInResponse(ApiVersionsResponse.defaultApiVersionsResponse()), Utils.mkSet(ApiKeys.values()));
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/test/java/org/apache/kafka/common/requests/RequestHeaderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestHeaderTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestHeaderTest.java
index bc3bd37..f73ee2f 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestHeaderTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestHeaderTest.java
@@ -33,13 +33,13 @@ public class RequestHeaderTest {
 
         int correlationId = 2342;
         ByteBuffer rawBuffer = ByteBuffer.allocate(32);
-        rawBuffer.putShort(ApiKeys.CONTROLLED_SHUTDOWN_KEY.id);
+        rawBuffer.putShort(ApiKeys.CONTROLLED_SHUTDOWN.id);
         rawBuffer.putShort((short) 0);
         rawBuffer.putInt(correlationId);
         rawBuffer.flip();
 
         RequestHeader deserialized = RequestHeader.parse(rawBuffer);
-        assertEquals(ApiKeys.CONTROLLED_SHUTDOWN_KEY, deserialized.apiKey());
+        assertEquals(ApiKeys.CONTROLLED_SHUTDOWN, deserialized.apiKey());
         assertEquals(0, deserialized.apiVersion());
         assertEquals(correlationId, deserialized.correlationId());
         assertEquals("", deserialized.clientId());
@@ -47,7 +47,7 @@ public class RequestHeaderTest {
         Struct serialized = deserialized.toStruct();
         ByteBuffer serializedBuffer = toBuffer(serialized);
 
-        assertEquals(ApiKeys.CONTROLLED_SHUTDOWN_KEY.id, serializedBuffer.getShort(0));
+        assertEquals(ApiKeys.CONTROLLED_SHUTDOWN.id, serializedBuffer.getShort(0));
         assertEquals(0, serializedBuffer.getShort(2));
         assertEquals(correlationId, serializedBuffer.getInt(4));
         assertEquals(8, serializedBuffer.limit());

