kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject [3/5] kafka git commit: KAFKA-4507; Clients should support older brokers (KIP-97)
Date Wed, 11 Jan 2017 19:31:03 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
index 1dfad1e..361fd15 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
@@ -13,12 +13,14 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.utils.CollectionUtils;
+import org.apache.kafka.common.utils.Utils;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -30,8 +32,6 @@ import java.util.Map;
  * This wrapper supports both v0 and v1 of OffsetCommitRequest.
  */
 public class OffsetCommitRequest extends AbstractRequest {
-    
-    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.OFFSET_COMMIT.id);
     private static final String GROUP_ID_KEY_NAME = "group_id";
     private static final String GENERATION_ID_KEY_NAME = "group_generation_id";
     private static final String MEMBER_ID_KEY_NAME = "member_id";
@@ -83,6 +83,75 @@ public class OffsetCommitRequest extends AbstractRequest {
         public PartitionData(long offset, String metadata) {
             this(offset, DEFAULT_TIMESTAMP, metadata);
         }
+
+        @Override
+        public String toString() {
+            StringBuilder bld = new StringBuilder();
+            bld.append("(timestamp=").append(timestamp).
+                append(", offset=").append(offset).
+                append(", metadata=").append(metadata).
+                append(")");
+            return bld.toString();
+        }
+    }
+
+    public static class Builder extends AbstractRequest.Builder<OffsetCommitRequest> {
+        private final String groupId;
+        private final Map<TopicPartition, PartitionData> offsetData;
+        private String memberId = DEFAULT_MEMBER_ID;
+        private int generationId = DEFAULT_GENERATION_ID;
+        private long retentionTime = DEFAULT_RETENTION_TIME;
+
+        public Builder(String groupId, Map<TopicPartition, PartitionData> offsetData) {
+            super(ApiKeys.OFFSET_COMMIT);
+            this.groupId = groupId;
+            this.offsetData = offsetData;
+        }
+
+        public Builder setMemberId(String memberId) {
+            this.memberId = memberId;
+            return this;
+        }
+
+        public Builder setGenerationId(int generationId) {
+            this.generationId = generationId;
+            return this;
+        }
+
+        public Builder setRetentionTime(long retentionTime) {
+            this.retentionTime = retentionTime;
+            return this;
+        }
+
+        @Override
+        public OffsetCommitRequest build() {
+            short version = version();
+            switch (version) {
+                case 0:
+                    return new OffsetCommitRequest(groupId, offsetData);
+                case 1:
+                    return new OffsetCommitRequest(groupId, generationId, memberId,
+                            offsetData);
+                case 2:
+                    return new OffsetCommitRequest(groupId, generationId, memberId,
+                            retentionTime, offsetData, version);
+                default:
+                    throw new UnsupportedVersionException("Unsupported version " + version);
+            }
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder bld = new StringBuilder();
+            bld.append("(type=OffsetCommitRequest").
+                append(", groupId=").append(groupId).
+                append(", memberId=").append(memberId).
+                append(", generationId=").append(generationId).
+                append(", retentionTime=").append(retentionTime).
+                append(", offsetData=").append(Utils.mkString(offsetData)).
+                append(")");
+            return bld.toString();
+        }
     }
 
     /**
@@ -90,9 +159,8 @@ public class OffsetCommitRequest extends AbstractRequest {
      * @param groupId
      * @param offsetData
      */
-    @Deprecated
-    public OffsetCommitRequest(String groupId, Map<TopicPartition, PartitionData> offsetData) {
-        super(new Struct(ProtoUtils.requestSchema(ApiKeys.OFFSET_COMMIT.id, 0)));
+    private OffsetCommitRequest(String groupId, Map<TopicPartition, PartitionData> offsetData) {
+        super(new Struct(ProtoUtils.requestSchema(ApiKeys.OFFSET_COMMIT.id, 0)), (short) 0);
 
         initCommonFields(groupId, offsetData);
         this.groupId = groupId;
@@ -109,9 +177,8 @@ public class OffsetCommitRequest extends AbstractRequest {
      * @param memberId
      * @param offsetData
      */
-    @Deprecated
-    public OffsetCommitRequest(String groupId, int generationId, String memberId, Map<TopicPartition, PartitionData> offsetData) {
-        super(new Struct(ProtoUtils.requestSchema(ApiKeys.OFFSET_COMMIT.id, 1)));
+    private OffsetCommitRequest(String groupId, int generationId, String memberId, Map<TopicPartition, PartitionData> offsetData) {
+        super(new Struct(ProtoUtils.requestSchema(ApiKeys.OFFSET_COMMIT.id, 1)), (short) 1);
 
         initCommonFields(groupId, offsetData);
         struct.set(GENERATION_ID_KEY_NAME, generationId);
@@ -124,16 +191,16 @@ public class OffsetCommitRequest extends AbstractRequest {
     }
 
     /**
-     * Constructor for version 2.
+     * Constructor for version 2 and above.
      * @param groupId
      * @param generationId
      * @param memberId
      * @param retentionTime
      * @param offsetData
      */
-    public OffsetCommitRequest(String groupId, int generationId, String memberId, long retentionTime, Map<TopicPartition, PartitionData> offsetData) {
-        super(new Struct(CURRENT_SCHEMA));
-
+    private OffsetCommitRequest(String groupId, int generationId, String memberId, long retentionTime,
+                                Map<TopicPartition, PartitionData> offsetData, short version) {
+        super(new Struct(ProtoUtils.requestSchema(ApiKeys.OFFSET_COMMIT.id, version)), version);
         initCommonFields(groupId, offsetData);
         struct.set(GENERATION_ID_KEY_NAME, generationId);
         struct.set(MEMBER_ID_KEY_NAME, memberId);
@@ -172,8 +239,8 @@ public class OffsetCommitRequest extends AbstractRequest {
         struct.set(TOPICS_KEY_NAME, topicArray.toArray());
     }
 
-    public OffsetCommitRequest(Struct struct) {
-        super(struct);
+    public OffsetCommitRequest(Struct struct, short versionId) {
+        super(struct, versionId);
 
         groupId = struct.getString(GROUP_ID_KEY_NAME);
         // This field only exists in v1.
@@ -217,14 +284,14 @@ public class OffsetCommitRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractResponse getErrorResponse(int versionId, Throwable e) {
+    public AbstractResponse getErrorResponse(Throwable e) {
         Map<TopicPartition, Short> responseData = new HashMap<>();
         for (Map.Entry<TopicPartition, PartitionData> entry: offsetData.entrySet()) {
             responseData.put(entry.getKey(), Errors.forException(e).code());
         }
 
+        short versionId = version();
         switch (versionId) {
-            // OffsetCommitResponseV0 == OffsetCommitResponseV1 == OffsetCommitResponseV2
             case 0:
             case 1:
             case 2:
@@ -257,10 +324,10 @@ public class OffsetCommitRequest extends AbstractRequest {
 
     public static OffsetCommitRequest parse(ByteBuffer buffer, int versionId) {
         Schema schema = ProtoUtils.requestSchema(ApiKeys.OFFSET_COMMIT.id, versionId);
-        return new OffsetCommitRequest(schema.read(buffer));
+        return new OffsetCommitRequest(schema.read(buffer), (short) versionId);
     }
 
     public static OffsetCommitRequest parse(ByteBuffer buffer) {
-        return new OffsetCommitRequest(CURRENT_SCHEMA.read(buffer));
+        return parse(buffer, ProtoUtils.latestVersion(ApiKeys.OFFSET_COMMIT.id));
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
index ede0f27..4b08a60 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
@@ -16,9 +16,9 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.utils.CollectionUtils;
+import org.apache.kafka.common.utils.Utils;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -26,12 +26,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-/**
- * This wrapper supports both v0 and v1 of OffsetFetchRequest.
- */
 public class OffsetFetchRequest extends AbstractRequest {
-    
-    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.OFFSET_FETCH.id);
     private static final String GROUP_ID_KEY_NAME = "group_id";
     private static final String TOPICS_KEY_NAME = "topics";
 
@@ -42,12 +37,38 @@ public class OffsetFetchRequest extends AbstractRequest {
     // partition level field names
     private static final String PARTITION_KEY_NAME = "partition";
 
+    public static class Builder extends AbstractRequest.Builder<OffsetFetchRequest> {
+        private final String groupId;
+        private final List<TopicPartition> partitions;
+
+        public Builder(String groupId, List<TopicPartition> partitions) {
+            super(ApiKeys.OFFSET_FETCH);
+            this.groupId = groupId;
+            this.partitions = partitions;
+        }
+
+        @Override
+        public OffsetFetchRequest build() {
+            return new OffsetFetchRequest(groupId, partitions, version());
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder bld = new StringBuilder();
+            bld.append("(type=OffsetFetchRequest, ").
+                    append("groupId=").append(groupId).
+                    append(", partitions=").append(Utils.join(partitions, ",")).
+                    append(")");
+            return bld.toString();
+        }
+    }
+
     private final String groupId;
     private final List<TopicPartition> partitions;
 
-    public OffsetFetchRequest(String groupId, List<TopicPartition> partitions) {
-        super(new Struct(CURRENT_SCHEMA));
-
+    // v0 and v1 have the same fields.
+    private OffsetFetchRequest(String groupId, List<TopicPartition> partitions, short version) {
+        super(new Struct(ProtoUtils.requestSchema(ApiKeys.OFFSET_FETCH.id, version)), version);
         Map<String, List<Integer>> topicsData = CollectionUtils.groupDataByTopic(partitions);
 
         struct.set(GROUP_ID_KEY_NAME, groupId);
@@ -69,8 +90,8 @@ public class OffsetFetchRequest extends AbstractRequest {
         this.partitions = partitions;
     }
 
-    public OffsetFetchRequest(Struct struct) {
-        super(struct);
+    public OffsetFetchRequest(Struct struct, short versionId) {
+        super(struct, versionId);
         partitions = new ArrayList<>();
         for (Object topicResponseObj : struct.getArray(TOPICS_KEY_NAME)) {
             Struct topicResponse = (Struct) topicResponseObj;
@@ -85,7 +106,7 @@ public class OffsetFetchRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractResponse getErrorResponse(int versionId, Throwable e) {
+    public AbstractResponse getErrorResponse(Throwable e) {
         Map<TopicPartition, OffsetFetchResponse.PartitionData> responseData = new HashMap<>();
 
         for (TopicPartition partition: partitions) {
@@ -94,8 +115,8 @@ public class OffsetFetchRequest extends AbstractRequest {
                     Errors.forException(e).code()));
         }
 
+        short versionId = version();
         switch (versionId) {
-            // OffsetFetchResponseV0 == OffsetFetchResponseV1
             case 0:
             case 1:
                 return new OffsetFetchResponse(responseData);
@@ -114,10 +135,11 @@ public class OffsetFetchRequest extends AbstractRequest {
     }
 
     public static OffsetFetchRequest parse(ByteBuffer buffer, int versionId) {
-        return new OffsetFetchRequest(ProtoUtils.parseRequest(ApiKeys.OFFSET_FETCH.id, versionId, buffer));
+        return new OffsetFetchRequest(ProtoUtils.parseRequest(ApiKeys.OFFSET_FETCH.id, versionId, buffer),
+                (short) versionId);
     }
 
     public static OffsetFetchRequest parse(ByteBuffer buffer) {
-        return new OffsetFetchRequest(CURRENT_SCHEMA.read(buffer));
+        return parse(buffer, ProtoUtils.latestVersion(ApiKeys.OFFSET_FETCH.id));
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
index 25209ce..c05643d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
@@ -14,14 +14,15 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.utils.CollectionUtils;
+import org.apache.kafka.common.utils.Utils;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -30,8 +31,6 @@ import java.util.List;
 import java.util.Map;
 
 public class ProduceRequest extends AbstractRequest {
-    
-    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.PRODUCE.id);
     private static final String ACKS_KEY_NAME = "acks";
     private static final String TIMEOUT_KEY_NAME = "timeout";
     private static final String TOPIC_DATA_KEY_NAME = "topic_data";
@@ -44,12 +43,45 @@ public class ProduceRequest extends AbstractRequest {
     private static final String PARTITION_KEY_NAME = "partition";
     private static final String RECORD_SET_KEY_NAME = "record_set";
 
+    public static class Builder extends AbstractRequest.Builder<ProduceRequest> {
+        private final short acks;
+        private final int timeout;
+        private final Map<TopicPartition, MemoryRecords> partitionRecords;
+
+        public Builder(short acks, int timeout, Map<TopicPartition, MemoryRecords> partitionRecords) {
+            super(ApiKeys.PRODUCE);
+            this.acks = acks;
+            this.timeout = timeout;
+            this.partitionRecords = partitionRecords;
+        }
+
+        @Override
+        public ProduceRequest build() {
+            short version = version();
+            if (version < 2) {
+                throw new UnsupportedVersionException("ProduceRequest versions older than 2 are not supported.");
+            }
+            return new ProduceRequest(version, acks, timeout, partitionRecords);
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder bld = new StringBuilder();
+            bld.append("(type=ProduceRequest")
+                    .append(", acks=").append(acks)
+                    .append(", timeout=").append(timeout)
+                    .append(", partitionRecords=(").append(Utils.mkString(partitionRecords))
+                    .append("))");
+            return bld.toString();
+        }
+    }
+
     private final short acks;
     private final int timeout;
     private final Map<TopicPartition, MemoryRecords> partitionRecords;
 
-    public ProduceRequest(short acks, int timeout, Map<TopicPartition, MemoryRecords> partitionRecords) {
-        super(new Struct(CURRENT_SCHEMA));
+    private ProduceRequest(short version, short acks, int timeout, Map<TopicPartition, MemoryRecords> partitionRecords) {
+        super(new Struct(ProtoUtils.requestSchema(ApiKeys.PRODUCE.id, version)), version);
         Map<String, Map<Integer, MemoryRecords>> recordsByTopic = CollectionUtils.groupDataByTopic(partitionRecords);
         struct.set(ACKS_KEY_NAME, acks);
         struct.set(TIMEOUT_KEY_NAME, timeout);
@@ -74,8 +106,8 @@ public class ProduceRequest extends AbstractRequest {
         this.partitionRecords = partitionRecords;
     }
 
-    public ProduceRequest(Struct struct) {
-        super(struct);
+    public ProduceRequest(Struct struct, short version) {
+        super(struct, version);
         partitionRecords = new HashMap<>();
         for (Object topicDataObj : struct.getArray(TOPIC_DATA_KEY_NAME)) {
             Struct topicData = (Struct) topicDataObj;
@@ -92,7 +124,7 @@ public class ProduceRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractResponse getErrorResponse(int versionId, Throwable e) {
+    public AbstractResponse getErrorResponse(Throwable e) {
         /* In case the producer doesn't actually want any response */
         if (acks == 0)
             return null;
@@ -103,6 +135,7 @@ public class ProduceRequest extends AbstractRequest {
             responseMap.put(entry.getKey(), new ProduceResponse.PartitionResponse(Errors.forException(e).code(), ProduceResponse.INVALID_OFFSET, Record.NO_TIMESTAMP));
         }
 
+        short versionId = version();
         switch (versionId) {
             case 0:
                 return new ProduceResponse(responseMap);
@@ -132,10 +165,10 @@ public class ProduceRequest extends AbstractRequest {
     }
 
     public static ProduceRequest parse(ByteBuffer buffer, int versionId) {
-        return new ProduceRequest(ProtoUtils.parseRequest(ApiKeys.PRODUCE.id, versionId, buffer));
+        return new ProduceRequest(ProtoUtils.parseRequest(ApiKeys.PRODUCE.id, versionId, buffer), (short) versionId);
     }
 
     public static ProduceRequest parse(ByteBuffer buffer) {
-        return new ProduceRequest(CURRENT_SCHEMA.read(buffer));
+        return parse(buffer, ProtoUtils.latestVersion(ApiKeys.PRODUCE.id));
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java b/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
index f7b75b2..05b78cb 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
@@ -16,7 +16,6 @@ import static org.apache.kafka.common.protocol.Protocol.REQUEST_HEADER;
 
 import java.nio.ByteBuffer;
 
-import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.Protocol;
 import org.apache.kafka.common.protocol.types.Field;
 import org.apache.kafka.common.protocol.types.Struct;
@@ -44,10 +43,6 @@ public class RequestHeader extends AbstractRequestResponse {
         correlationId = struct.getInt(CORRELATION_ID_FIELD);
     }
 
-    public RequestHeader(short apiKey, String client, int correlation) {
-        this(apiKey, ProtoUtils.latestVersion(apiKey), client, correlation);
-    }
-
     public RequestHeader(short apiKey, short version, String client, int correlation) {
         super(new Struct(Protocol.REQUEST_HEADER));
         struct.set(API_KEY_FIELD, apiKey);

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java
index 97adcb3..81bc249 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java
@@ -46,13 +46,13 @@ public class SaslHandshakeRequest extends AbstractRequest {
     private final String mechanism;
 
     public SaslHandshakeRequest(String mechanism) {
-        super(new Struct(CURRENT_SCHEMA));
+        super(new Struct(CURRENT_SCHEMA), ProtoUtils.latestVersion(ApiKeys.SASL_HANDSHAKE.id));
         struct.set(MECHANISM_KEY_NAME, mechanism);
         this.mechanism = mechanism;
     }
 
-    public SaslHandshakeRequest(Struct struct) {
-        super(struct);
+    public SaslHandshakeRequest(Struct struct, short versionId) {
+        super(struct, versionId);
         mechanism = struct.getString(MECHANISM_KEY_NAME);
     }
 
@@ -61,7 +61,8 @@ public class SaslHandshakeRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractResponse getErrorResponse(int versionId, Throwable e) {
+    public AbstractResponse getErrorResponse(Throwable e) {
+        short versionId = version();
         switch (versionId) {
             case 0:
                 List<String> enabledMechanisms = Collections.emptyList();
@@ -73,11 +74,12 @@ public class SaslHandshakeRequest extends AbstractRequest {
     }
 
     public static SaslHandshakeRequest parse(ByteBuffer buffer, int versionId) {
-        return new SaslHandshakeRequest(ProtoUtils.parseRequest(ApiKeys.SASL_HANDSHAKE.id, versionId, buffer));
+        return new SaslHandshakeRequest(ProtoUtils.parseRequest(ApiKeys.SASL_HANDSHAKE.id, versionId, buffer),
+                (short) versionId);
     }
 
     public static SaslHandshakeRequest parse(ByteBuffer buffer) {
-        return new SaslHandshakeRequest(CURRENT_SCHEMA.read(buffer));
+        return parse(buffer, ProtoUtils.latestVersion(ApiKeys.SASL_HANDSHAKE.id));
     }
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java
index e3e2507..d687d99 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java
@@ -17,8 +17,8 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.Utils;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -29,23 +29,55 @@ import java.util.Map;
 import java.util.Set;
 
 public class StopReplicaRequest extends AbstractRequest {
-    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.STOP_REPLICA.id);
-
     private static final String CONTROLLER_ID_KEY_NAME = "controller_id";
     private static final String CONTROLLER_EPOCH_KEY_NAME = "controller_epoch";
     private static final String DELETE_PARTITIONS_KEY_NAME = "delete_partitions";
     private static final String PARTITIONS_KEY_NAME = "partitions";
-
     private static final String TOPIC_KEY_NAME = "topic";
     private static final String PARTITION_KEY_NAME = "partition";
 
+    public static class Builder extends AbstractRequest.Builder<StopReplicaRequest> {
+        private final int controllerId;
+        private final int controllerEpoch;
+        private final boolean deletePartitions;
+        private final Set<TopicPartition> partitions;
+
+        public Builder(int controllerId, int controllerEpoch, boolean deletePartitions,
+                       Set<TopicPartition> partitions) {
+            super(ApiKeys.STOP_REPLICA);
+            this.controllerId = controllerId;
+            this.controllerEpoch = controllerEpoch;
+            this.deletePartitions = deletePartitions;
+            this.partitions = partitions;
+        }
+
+        @Override
+        public StopReplicaRequest build() {
+            return new StopReplicaRequest(controllerId, controllerEpoch,
+                    deletePartitions, partitions, version());
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder bld = new StringBuilder();
+            bld.append("(type=StopReplicaRequest").
+                append(", controllerId=").append(controllerId).
+                append(", controllerEpoch=").append(controllerEpoch).
+                append(", deletePartitions=").append(deletePartitions).
+                append(", partitions=").append(Utils.join(partitions, ",")).
+                append(")");
+            return bld.toString();
+        }
+    }
+
     private final int controllerId;
     private final int controllerEpoch;
     private final boolean deletePartitions;
     private final Set<TopicPartition> partitions;
 
-    public StopReplicaRequest(int controllerId, int controllerEpoch, boolean deletePartitions, Set<TopicPartition> partitions) {
-        super(new Struct(CURRENT_SCHEMA));
+    private StopReplicaRequest(int controllerId, int controllerEpoch, boolean deletePartitions,
+                               Set<TopicPartition> partitions, short version) {
+        super(new Struct(ProtoUtils.requestSchema(ApiKeys.STOP_REPLICA.id, version)), version);
 
         struct.set(CONTROLLER_ID_KEY_NAME, controllerId);
         struct.set(CONTROLLER_EPOCH_KEY_NAME, controllerEpoch);
@@ -67,8 +99,8 @@ public class StopReplicaRequest extends AbstractRequest {
         this.partitions = partitions;
     }
 
-    public StopReplicaRequest(Struct struct) {
-        super(struct);
+    public StopReplicaRequest(Struct struct, short versionId) {
+        super(struct, versionId);
 
         partitions = new HashSet<>();
         for (Object partitionDataObj : struct.getArray(PARTITIONS_KEY_NAME)) {
@@ -84,12 +116,13 @@ public class StopReplicaRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractResponse getErrorResponse(int versionId, Throwable e) {
+    public AbstractResponse getErrorResponse(Throwable e) {
         Map<TopicPartition, Short> responses = new HashMap<>(partitions.size());
         for (TopicPartition partition : partitions) {
             responses.put(partition, Errors.forException(e).code());
         }
 
+        short versionId = version();
         switch (versionId) {
             case 0:
                 return new StopReplicaResponse(Errors.NONE.code(), responses);
@@ -116,10 +149,11 @@ public class StopReplicaRequest extends AbstractRequest {
     }
 
     public static StopReplicaRequest parse(ByteBuffer buffer, int versionId) {
-        return new StopReplicaRequest(ProtoUtils.parseRequest(ApiKeys.STOP_REPLICA.id, versionId, buffer));
+        return new StopReplicaRequest(ProtoUtils.parseRequest(ApiKeys.STOP_REPLICA.id, versionId, buffer),
+                (short) versionId);
     }
 
     public static StopReplicaRequest parse(ByteBuffer buffer) {
-        return new StopReplicaRequest(CURRENT_SCHEMA.read(buffer));
+        return parse(buffer, ProtoUtils.latestVersion(ApiKeys.STOP_REPLICA.id));
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java
index efb484c..55b7308 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java
@@ -19,8 +19,8 @@ package org.apache.kafka.common.requests;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.Utils;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -29,24 +29,53 @@ import java.util.List;
 import java.util.Map;
 
 public class SyncGroupRequest extends AbstractRequest {
-
-    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.SYNC_GROUP.id);
     public static final String GROUP_ID_KEY_NAME = "group_id";
     public static final String GENERATION_ID_KEY_NAME = "generation_id";
     public static final String MEMBER_ID_KEY_NAME = "member_id";
     public static final String MEMBER_ASSIGNMENT_KEY_NAME = "member_assignment";
     public static final String GROUP_ASSIGNMENT_KEY_NAME = "group_assignment";
 
+    public static class Builder extends AbstractRequest.Builder<SyncGroupRequest> {
+        private final String groupId;
+        private final int generationId;
+        private final String memberId;
+        private final Map<String, ByteBuffer> groupAssignment;
+
+        public Builder(String groupId, int generationId, String memberId,
+                       Map<String, ByteBuffer> groupAssignment) {
+            super(ApiKeys.SYNC_GROUP);
+            this.groupId = groupId;
+            this.generationId = generationId;
+            this.memberId = memberId;
+            this.groupAssignment = groupAssignment;
+        }
+
+        @Override
+        public SyncGroupRequest build() {
+            return new SyncGroupRequest(groupId, generationId, memberId, groupAssignment, version());
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder bld = new StringBuilder();
+            bld.append("(type=SyncGroupRequest").
+                    append(", groupId=").append(groupId).
+                    append(", generationId=").append(generationId).
+                    append(", memberId=").append(memberId).
+                    append(", groupAssignment=").
+                    append(Utils.join(groupAssignment.keySet(), ",")).
+                    append(")");
+            return bld.toString();
+        }
+    }
     private final String groupId;
     private final int generationId;
     private final String memberId;
     private final Map<String, ByteBuffer> groupAssignment;
 
-    public SyncGroupRequest(String groupId,
-                            int generationId,
-                            String memberId,
-                            Map<String, ByteBuffer> groupAssignment) {
-        super(new Struct(CURRENT_SCHEMA));
+    private SyncGroupRequest(String groupId, int generationId, String memberId,
+                             Map<String, ByteBuffer> groupAssignment, short version) {
+        super(new Struct(ProtoUtils.requestSchema(ApiKeys.SYNC_GROUP.id, version)), version);
         struct.set(GROUP_ID_KEY_NAME, groupId);
         struct.set(GENERATION_ID_KEY_NAME, generationId);
         struct.set(MEMBER_ID_KEY_NAME, memberId);
@@ -66,8 +95,8 @@ public class SyncGroupRequest extends AbstractRequest {
         this.groupAssignment = groupAssignment;
     }
 
-    public SyncGroupRequest(Struct struct) {
-        super(struct);
+    public SyncGroupRequest(Struct struct, short version) {
+        super(struct, version);
         this.groupId = struct.getString(GROUP_ID_KEY_NAME);
         this.generationId = struct.getInt(GENERATION_ID_KEY_NAME);
         this.memberId = struct.getString(MEMBER_ID_KEY_NAME);
@@ -83,7 +112,8 @@ public class SyncGroupRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractResponse getErrorResponse(int versionId, Throwable e) {
+    public AbstractResponse getErrorResponse(Throwable e) {
+        short versionId = version();
         switch (versionId) {
             case 0:
                 return new SyncGroupResponse(
@@ -91,7 +121,7 @@ public class SyncGroupRequest extends AbstractRequest {
                         ByteBuffer.wrap(new byte[]{}));
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
-                        versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.JOIN_GROUP.id)));
+                        versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.SYNC_GROUP.id)));
         }
     }
 
@@ -112,7 +142,11 @@ public class SyncGroupRequest extends AbstractRequest {
     }
 
     public static SyncGroupRequest parse(ByteBuffer buffer, int versionId) {
-        return new SyncGroupRequest(ProtoUtils.parseRequest(ApiKeys.SYNC_GROUP.id, versionId, buffer));
+        return new SyncGroupRequest(ProtoUtils.parseRequest(ApiKeys.SYNC_GROUP.id, versionId, buffer),
+                (short) versionId);
     }
 
+    public static SyncGroupRequest parse(ByteBuffer buffer) {
+        return parse(buffer, ProtoUtils.latestVersion(ApiKeys.SYNC_GROUP.id));
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
index 2e5ffb2..5fd682c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
@@ -13,18 +13,17 @@
 
 package org.apache.kafka.common.requests;
 
-import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.SecurityProtocol;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.Utils;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -32,6 +31,49 @@ import java.util.Map;
 import java.util.Set;
 
 public class UpdateMetadataRequest extends AbstractRequest {
+    public static class Builder extends AbstractRequest.Builder<UpdateMetadataRequest> {
+        private final int controllerId;
+        private final int controllerEpoch;
+        private final Map<TopicPartition, PartitionState> partitionStates;
+        private final Set<Broker> liveBrokers;
+
+        public Builder(int controllerId, int controllerEpoch,
+                       Map<TopicPartition, PartitionState> partitionStates,
+                       Set<Broker> liveBrokers) {
+            super(ApiKeys.UPDATE_METADATA_KEY);
+            this.controllerId = controllerId;
+            this.controllerEpoch = controllerEpoch;
+            this.partitionStates = partitionStates;
+            this.liveBrokers = liveBrokers;
+        }
+
+        @Override
+        public UpdateMetadataRequest build() {
+            short version = version();
+            if (version == 0) {
+                for (Broker broker : liveBrokers) {
+                    if ((broker.endPoints.get(SecurityProtocol.PLAINTEXT) == null)
+                            || (broker.endPoints.size() != 1)) {
+                        throw new UnsupportedVersionException("UpdateMetadataRequest v0 only " +
+                                "handles PLAINTEXT endpoints");
+                    }
+                }
+            }
+            return new UpdateMetadataRequest(version, controllerId, controllerEpoch, partitionStates, liveBrokers);
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder bld = new StringBuilder();
+            bld.append("(type=UpdateMetadataRequest").
+                append(", controllerId=").append(controllerId).
+                append(", controllerEpoch=").append(controllerEpoch).
+                append(", partitionStates=").append(Utils.mkString(partitionStates)).
+                append(", liveBrokers=").append(Utils.join(liveBrokers, ", ")).
+                append(")");
+            return bld.toString();
+        }
+    }
 
     public static final class Broker {
         public final int id;
@@ -48,6 +90,16 @@ public class UpdateMetadataRequest extends AbstractRequest {
         public Broker(int id, Map<SecurityProtocol, EndPoint> endPoints) {
             this(id, endPoints, null);
         }
+
+        @Override
+        public String toString() {
+            StringBuilder bld = new StringBuilder();
+            bld.append("(id=").append(id);
+            bld.append(", endPoints=").append(Utils.mkString(endPoints));
+            bld.append(", rack=").append(rack);
+            bld.append(")");
+            return bld.toString();
+        }
     }
 
     public static final class EndPoint {
@@ -58,9 +110,12 @@ public class UpdateMetadataRequest extends AbstractRequest {
             this.host = host;
             this.port = port;
         }
-    }
 
-    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.UPDATE_METADATA_KEY.id);
+        @Override
+        public String toString() {
+            return "(host=" + host + ", port=" + port + ")";
+        }
+    }
 
     private static final String CONTROLLER_ID_KEY_NAME = "controller_id";
     private static final String CONTROLLER_EPOCH_KEY_NAME = "controller_epoch";
@@ -91,37 +146,9 @@ public class UpdateMetadataRequest extends AbstractRequest {
     private final Map<TopicPartition, PartitionState> partitionStates;
     private final Set<Broker> liveBrokers;
 
-    /**
-     * Constructor for version 0.
-     */
-    @Deprecated
-    public UpdateMetadataRequest(int controllerId, int controllerEpoch, Set<Node> liveBrokers,
-                                 Map<TopicPartition, PartitionState> partitionStates) {
-        this(0, controllerId, controllerEpoch, partitionStates,
-             brokerEndPointsToBrokers(liveBrokers));
-    }
-
-    private static Set<Broker> brokerEndPointsToBrokers(Set<Node> brokerEndPoints) {
-        Set<Broker> brokers = new HashSet<>(brokerEndPoints.size());
-        for (Node brokerEndPoint : brokerEndPoints) {
-            Map<SecurityProtocol, EndPoint> endPoints = Collections.singletonMap(SecurityProtocol.PLAINTEXT,
-                    new EndPoint(brokerEndPoint.host(), brokerEndPoint.port()));
-            brokers.add(new Broker(brokerEndPoint.id(), endPoints, null));
-        }
-        return brokers;
-    }
-
-    /**
-     * Constructor for version 2.
-     */
-    public UpdateMetadataRequest(int controllerId, int controllerEpoch, Map<TopicPartition,
+    private UpdateMetadataRequest(short version, int controllerId, int controllerEpoch, Map<TopicPartition,
             PartitionState> partitionStates, Set<Broker> liveBrokers) {
-        this(2, controllerId, controllerEpoch, partitionStates, liveBrokers);
-    }
-
-    public UpdateMetadataRequest(int version, int controllerId, int controllerEpoch, Map<TopicPartition,
-            PartitionState> partitionStates, Set<Broker> liveBrokers) {
-        super(new Struct(ProtoUtils.requestSchema(ApiKeys.UPDATE_METADATA_KEY.id, version)));
+        super(new Struct(ProtoUtils.requestSchema(ApiKeys.UPDATE_METADATA_KEY.id, version)), version);
         struct.set(CONTROLLER_ID_KEY_NAME, controllerId);
         struct.set(CONTROLLER_EPOCH_KEY_NAME, controllerEpoch);
 
@@ -177,9 +204,8 @@ public class UpdateMetadataRequest extends AbstractRequest {
         this.liveBrokers = liveBrokers;
     }
 
-    public UpdateMetadataRequest(Struct struct) {
-        super(struct);
-
+    public UpdateMetadataRequest(Struct struct, short versionId) {
+        super(struct, versionId);
         Map<TopicPartition, PartitionState> partitionStates = new HashMap<>();
         for (Object partitionStateDataObj : struct.getArray(PARTITION_STATES_KEY_NAME)) {
             Struct partitionStateData = (Struct) partitionStateDataObj;
@@ -242,7 +268,8 @@ public class UpdateMetadataRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractResponse getErrorResponse(int versionId, Throwable e) {
+    public AbstractResponse getErrorResponse(Throwable e) {
+        short versionId = version();
         if (versionId <= 2)
             return new UpdateMetadataResponse(Errors.forException(e).code());
         else
@@ -267,10 +294,11 @@ public class UpdateMetadataRequest extends AbstractRequest {
     }
 
     public static UpdateMetadataRequest parse(ByteBuffer buffer, int versionId) {
-        return new UpdateMetadataRequest(ProtoUtils.parseRequest(ApiKeys.UPDATE_METADATA_KEY.id, versionId, buffer));
+        return new UpdateMetadataRequest(ProtoUtils.parseRequest(ApiKeys.UPDATE_METADATA_KEY.id, versionId, buffer),
+                (short) versionId);
     }
 
     public static UpdateMetadataRequest parse(ByteBuffer buffer) {
-        return new UpdateMetadataRequest(CURRENT_SCHEMA.read(buffer));
+        return parse(buffer, ProtoUtils.latestVersion(ApiKeys.UPDATE_METADATA_KEY.id));
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
index c6d9a2f..59eee83 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
@@ -156,8 +156,9 @@ public class SaslClientAuthenticator implements Authenticator {
                 // API_VERSIONS_REQUEST must be sent prior to sending SASL_HANDSHAKE_REQUEST to
                 // fetch supported versions.
                 String clientId = (String) configs.get(CommonClientConfigs.CLIENT_ID_CONFIG);
-                currentRequestHeader = new RequestHeader(ApiKeys.SASL_HANDSHAKE.id, clientId, correlationId++);
                 SaslHandshakeRequest handshakeRequest = new SaslHandshakeRequest(mechanism);
+                currentRequestHeader = new RequestHeader(ApiKeys.SASL_HANDSHAKE.id,
+                        handshakeRequest.version(), clientId, correlationId++);
                 send(handshakeRequest.toSend(node, currentRequestHeader));
                 setSaslState(SaslState.RECEIVE_HANDSHAKE_RESPONSE);
                 break;

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index a502b32..ac8d078 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -234,6 +234,10 @@ public class Utils {
         return min;
     }
 
+    public static short min(short first, short second) {
+        return (short) Math.min(first, second);
+    }
+
     /**
      * Get the length for UTF8-encoding a string without encoding it first
      *
@@ -456,6 +460,24 @@ public class Utils {
         return sb.toString();
     }
 
+    public static <K, V> String mkString(Map<K, V> map) {
+        return mkString(map, "{", "}", "=", ", ");
+    }
+
+    public static <K, V> String mkString(Map<K, V> map, String begin, String end,
+                                         String keyValueSeparator, String elementSeparator) {
+        StringBuilder bld = new StringBuilder();
+        bld.append(begin);
+        String prefix = "";
+        for (Map.Entry<K, V> entry : map.entrySet()) {
+            bld.append(prefix).append(entry.getKey()).
+                    append(keyValueSeparator).append(entry.getValue());
+            prefix = elementSeparator;
+        }
+        bld.append(end);
+        return bld.toString();
+    }
+
     /**
      * Read a properties file from the given path
      * @param filename The path of the file to read

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/clients/src/test/java/org/apache/kafka/clients/MockClient.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index ee4b7cc..f216b85 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -18,10 +18,8 @@ package org.apache.kafka.clients;
 
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
-import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.AbstractResponse;
-import org.apache.kafka.common.requests.RequestHeader;
 import org.apache.kafka.common.utils.Time;
 
 import java.util.ArrayDeque;
@@ -129,8 +127,8 @@ public class MockClient implements KafkaClient {
         while (iter.hasNext()) {
             ClientRequest request = iter.next();
             if (request.destination().equals(node)) {
-                responses.add(new ClientResponse(request.header(), request.callback(), request.destination(),
-                        request.createdTimeMs(), now, true, null));
+                responses.add(new ClientResponse(request.makeHeader(), request.callback(), request.destination(),
+                        request.createdTimeMs(), now, true, null, null));
                 iter.remove();
             }
         }
@@ -145,11 +143,12 @@ public class MockClient implements KafkaClient {
             if (futureResp.node != null && !request.destination().equals(futureResp.node.idString()))
                 continue;
 
-            if (!futureResp.requestMatcher.matches(request.body()))
+            AbstractRequest abstractRequest = request.requestBuilder().build();
+            if (!futureResp.requestMatcher.matches(abstractRequest))
                 throw new IllegalStateException("Next in line response did not match expected request");
 
-            ClientResponse resp = new ClientResponse(request.header(), request.callback(), request.destination(),
-                    request.createdTimeMs(), time.milliseconds(), futureResp.disconnected, futureResp.responseBody);
+            ClientResponse resp = new ClientResponse(request.makeHeader(), request.callback(), request.destination(),
+                    request.createdTimeMs(), time.milliseconds(), futureResp.disconnected, null, futureResp.responseBody);
             responses.add(resp);
             iterator.remove();
             return;
@@ -188,8 +187,8 @@ public class MockClient implements KafkaClient {
 
     public void respond(AbstractResponse response, boolean disconnected) {
         ClientRequest request = requests.remove();
-        responses.add(new ClientResponse(request.header(), request.callback(), request.destination(),
-                request.createdTimeMs(), time.milliseconds(), disconnected, response));
+        responses.add(new ClientResponse(request.makeHeader(), request.callback(), request.destination(),
+                request.createdTimeMs(), time.milliseconds(), disconnected, null, response));
     }
 
     public void respondFrom(AbstractResponse response, Node node) {
@@ -202,8 +201,8 @@ public class MockClient implements KafkaClient {
             ClientRequest request = iterator.next();
             if (request.destination().equals(node.idString())) {
                 iterator.remove();
-                responses.add(new ClientResponse(request.header(), request.callback(), request.destination(),
-                        request.createdTimeMs(), time.milliseconds(), disconnected, response));
+                responses.add(new ClientResponse(request.makeHeader(), request.callback(), request.destination(),
+                        request.createdTimeMs(), time.milliseconds(), disconnected, null, response));
                 return;
             }
         }
@@ -283,13 +282,16 @@ public class MockClient implements KafkaClient {
     }
 
     @Override
-    public RequestHeader nextRequestHeader(ApiKeys key) {
-        return new RequestHeader(key.id, "mock", correlation++);
+    public ClientRequest newClientRequest(String nodeId, AbstractRequest.Builder<?> requestBuilder, long createdTimeMs,
+                                          boolean expectResponse) {
+        return newClientRequest(nodeId, requestBuilder, createdTimeMs, expectResponse, null);
     }
 
     @Override
-    public RequestHeader nextRequestHeader(ApiKeys key, short version) {
-        return new RequestHeader(key.id, version, "mock", correlation++);
+    public ClientRequest newClientRequest(String nodeId, AbstractRequest.Builder<?> requestBuilder, long createdTimeMs,
+                                          boolean expectResponse, RequestCompletionHandler callback) {
+        return new ClientRequest(nodeId, requestBuilder, 0, "mockClientId", createdTimeMs,
+                expectResponse, callback);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/clients/src/test/java/org/apache/kafka/clients/NetworkClientApiVersionsCheckTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientApiVersionsCheckTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientApiVersionsCheckTest.java
deleted file mode 100644
index dbe8619..0000000
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientApiVersionsCheckTest.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.clients;
-
-
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.network.NetworkReceive;
-import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.ProtoUtils;
-import org.apache.kafka.common.protocol.Protocol;
-import org.apache.kafka.common.requests.AbstractRequestResponse;
-import org.apache.kafka.common.requests.ApiVersionsResponse;
-import org.apache.kafka.common.requests.ResponseHeader;
-import org.apache.kafka.test.DelayedReceive;
-import org.apache.kafka.test.TestUtils;
-import org.junit.Test;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-public class NetworkClientApiVersionsCheckTest extends NetworkClientTest {
-    
-    private static final List<ApiVersionsResponse.ApiVersion> EXPECTED_API_VERSIONS = Collections.singletonList(
-            new ApiVersionsResponse.ApiVersion(ApiKeys.METADATA.id, Protocol.MIN_VERSIONS[ApiKeys.METADATA.id],
-                    Protocol.CURR_VERSION[ApiKeys.METADATA.id]));
-
-    @Override
-    protected List<ApiVersionsResponse.ApiVersion> expectedApiVersions() {
-        return EXPECTED_API_VERSIONS;
-    }
-
-    @Test
-    public void testUnsupportedLesserApiVersions() {
-        unsupportedApiVersionsCheck(expectedApiVersions(), (short) (ProtoUtils.latestVersion(ApiKeys.METADATA.id) + 1), Short.MAX_VALUE, "Node 0 does not support required versions for Api " + ApiKeys.METADATA.id);
-    }
-
-    @Test
-    public void testUnsupportedGreaterApiVersions() {
-        unsupportedApiVersionsCheck(expectedApiVersions(), Short.MIN_VALUE, (short) (ProtoUtils.oldestVersion(ApiKeys.METADATA.id) - 1), "Node 0 does not support required versions for Api " + ApiKeys.METADATA.id);
-    }
-
-    @Test
-    public void testUnsupportedMissingApiVersions() {
-        unsupportedApiVersionsCheck(Collections.<ApiVersionsResponse.ApiVersion>emptyList(), Short.MIN_VALUE, Short.MAX_VALUE, "Node 0 does not support Api " + ApiKeys.METADATA.id);
-    }
-
-    private void unsupportedApiVersionsCheck(final List<ApiVersionsResponse.ApiVersion> expectedApiVersions,
-                                             short minVersion, short maxVersion, String errorMessage) {
-        ResponseHeader responseHeader = new ResponseHeader(0);
-        List<ApiVersionsResponse.ApiVersion> apiVersions = new ArrayList<>();
-        for (ApiVersionsResponse.ApiVersion apiVersion : expectedApiVersions)
-            apiVersions.add(new ApiVersionsResponse.ApiVersion(apiVersion.apiKey, minVersion, maxVersion));
-        ApiVersionsResponse response = new ApiVersionsResponse(Errors.NONE.code(), apiVersions);
-        ByteBuffer buffer = AbstractRequestResponse.serialize(responseHeader, response);
-
-        selector.delayedReceive(new DelayedReceive(node.idString(), new NetworkReceive(node.idString(), buffer)));
-        try {
-            long deadline = time.milliseconds() + TestUtils.DEFAULT_MAX_WAIT_MS;
-            while (time.milliseconds() < deadline && !client.ready(node, time.milliseconds()))
-                client.poll(1, time.milliseconds());
-
-            fail("KafkaException should have been thrown for " + expectedApiVersions + ", minVersion: " + minVersion +
-                    ", maxVersion: " + maxVersion);
-        } catch (KafkaException kex) {
-            assertTrue("Exception containing `" + errorMessage + "` should have been thrown due to ApiVersions " +
-                    "check, but exception message was: " + kex.getMessage(), kex.getMessage().contains(errorMessage));
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index 9e73901..deaf2cc 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -21,7 +21,6 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.network.NetworkReceive;
 import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.record.MemoryRecords;
@@ -29,7 +28,6 @@ import org.apache.kafka.common.requests.AbstractRequestResponse;
 import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.requests.ProduceRequest;
-import org.apache.kafka.common.requests.RequestHeader;
 import org.apache.kafka.common.requests.ResponseHeader;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.test.DelayedReceive;
@@ -40,7 +38,6 @@ import org.junit.Test;
 
 import java.nio.ByteBuffer;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 
@@ -62,29 +59,21 @@ public class NetworkClientTest {
 
     private final NetworkClient clientWithStaticNodes = createNetworkClientWithStaticNodes();
 
-    private NetworkClient createNetworkClient() {
-        final Collection<ApiVersionsResponse.ApiVersion> expectedApiVersions = expectedApiVersions();
-        if (expectedApiVersions == null)
-            return new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE, reconnectBackoffMsTest,
-                    64 * 1024, 64 * 1024, requestTimeoutMs, time);
-        else
-            return new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE, reconnectBackoffMsTest,
-                    64 * 1024, 64 * 1024, requestTimeoutMs, time, expectedApiVersions);
+    private final NetworkClient clientWithNoVersionDiscovery = createNetworkClientWithNoVersionDiscovery();
 
+    private NetworkClient createNetworkClient() {
+        return new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE, reconnectBackoffMsTest,
+                64 * 1024, 64 * 1024, requestTimeoutMs, time, true);
     }
 
     private NetworkClient createNetworkClientWithStaticNodes() {
-        final Collection<ApiVersionsResponse.ApiVersion> expectedApiVersions = expectedApiVersions();
-        if (expectedApiVersions == null)
-            return new NetworkClient(selector, new ManualMetadataUpdater(Arrays.asList(node)),
-                    "mock-static", Integer.MAX_VALUE, 0, 64 * 1024, 64 * 1024, requestTimeoutMs, time);
-        else
-            return new NetworkClient(selector, new ManualMetadataUpdater(Arrays.asList(node)),
-                    "mock-static", Integer.MAX_VALUE, 0, 64 * 1024, 64 * 1024, requestTimeoutMs, time, expectedApiVersions);
+        return new NetworkClient(selector, new ManualMetadataUpdater(Arrays.asList(node)),
+                "mock-static", Integer.MAX_VALUE, 0, 64 * 1024, 64 * 1024, requestTimeoutMs, time, true);
     }
 
-    protected List<ApiVersionsResponse.ApiVersion> expectedApiVersions() {
-        return null;
+    private NetworkClient createNetworkClientWithNoVersionDiscovery() {
+        return new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE, reconnectBackoffMsTest,
+                64 * 1024, 64 * 1024, requestTimeoutMs, time, false);
     }
 
     @Before
@@ -94,10 +83,11 @@ public class NetworkClientTest {
 
     @Test(expected = IllegalStateException.class)
     public void testSendToUnreadyNode() {
-        MetadataRequest metadataRequest = new MetadataRequest(Arrays.asList("test"));
-        RequestHeader header = client.nextRequestHeader(ApiKeys.METADATA);
-        ClientRequest request = new ClientRequest("5", time.milliseconds(), false, header, metadataRequest, null);
-        client.send(request, time.milliseconds());
+        MetadataRequest.Builder builder =
+                new MetadataRequest.Builder(Arrays.asList("test"));
+        long now = time.milliseconds();
+        ClientRequest request = client.newClientRequest("5", builder, now, false);
+        client.send(request, now);
         client.poll(1, time.milliseconds());
     }
 
@@ -112,17 +102,22 @@ public class NetworkClientTest {
     }
 
     @Test
+    public void testSimpleRequestResponseWithNoBrokerDiscovery() {
+        checkSimpleRequestResponse(clientWithNoVersionDiscovery);
+    }
+
+    @Test
     public void testClose() {
         client.ready(node, time.milliseconds());
         awaitReady(client, node);
         client.poll(1, time.milliseconds());
         assertTrue("The client should be ready", client.isReady(node, time.milliseconds()));
 
-        ProduceRequest produceRequest = new ProduceRequest((short) 1, 1000, Collections.<TopicPartition, MemoryRecords>emptyMap());
-        RequestHeader reqHeader = client.nextRequestHeader(ApiKeys.PRODUCE);
-        ClientRequest request = new ClientRequest(node.idString(), time.milliseconds(), true, reqHeader, produceRequest, null);
+        ProduceRequest.Builder builder = new ProduceRequest.Builder((short) 1, 1000, Collections.<TopicPartition, MemoryRecords>emptyMap());
+        ClientRequest request = client.newClientRequest(node.idString(), builder, time.milliseconds(), true);
         client.send(request, time.milliseconds());
-        assertEquals("There should be 1 in-flight request after send", 1, client.inFlightRequestCount(node.idString()));
+        assertEquals("There should be 1 in-flight request after send", 1,
+                client.inFlightRequestCount(node.idString()));
 
         client.close(node.idString());
         assertEquals("There should be no in-flight request after close", 0, client.inFlightRequestCount(node.idString()));
@@ -131,14 +126,15 @@ public class NetworkClientTest {
 
     private void checkSimpleRequestResponse(NetworkClient networkClient) {
         awaitReady(networkClient, node); // has to be before creating any request, as it may send ApiVersionsRequest and its response is mocked with correlation id 0
-        ProduceRequest produceRequest = new ProduceRequest((short) 1, 1000, Collections.<TopicPartition, MemoryRecords>emptyMap());
-        RequestHeader reqHeader = networkClient.nextRequestHeader(ApiKeys.PRODUCE);
+        ProduceRequest.Builder builder =
+                new ProduceRequest.Builder((short) 1, 1000, Collections.<TopicPartition, MemoryRecords>emptyMap());
         TestCallbackHandler handler = new TestCallbackHandler();
-        ClientRequest request = new ClientRequest(node.idString(), time.milliseconds(), true, reqHeader, produceRequest, handler);
+        ClientRequest request = networkClient.newClientRequest(
+                node.idString(), builder, time.milliseconds(), true, handler);
         networkClient.send(request, time.milliseconds());
         networkClient.poll(1, time.milliseconds());
         assertEquals(1, networkClient.inFlightRequestCount());
-        ResponseHeader respHeader = new ResponseHeader(reqHeader.correlationId());
+        ResponseHeader respHeader = new ResponseHeader(request.correlationId());
         Struct resp = new Struct(ProtoUtils.currentResponseSchema(ApiKeys.PRODUCE.id));
         resp.set("responses", new Object[0]);
         int size = respHeader.sizeOf() + resp.sizeOf();
@@ -151,22 +147,21 @@ public class NetworkClientTest {
         assertEquals(1, responses.size());
         assertTrue("The handler should have executed.", handler.executed);
         assertTrue("Should have a response body.", handler.response.hasResponse());
-        assertEquals("Should be correlated to the original request", request.header(), handler.response.requestHeader());
+        assertEquals("Should be correlated to the original request",
+                request.correlationId(), handler.response.requestHeader().correlationId());
     }
 
     private void maybeSetExpectedApiVersionsResponse() {
-        List<ApiVersionsResponse.ApiVersion> expectedApiVersions = expectedApiVersions();
-        if (expectedApiVersions == null)
-            return;
-
         ResponseHeader responseHeader = new ResponseHeader(0);
-        ByteBuffer buffer = AbstractRequestResponse.serialize(responseHeader, new ApiVersionsResponse(Errors.NONE.code(),
-                expectedApiVersions));
+        ByteBuffer buffer = AbstractRequestResponse.serialize(responseHeader,
+                ApiVersionsResponse.API_VERSIONS_RESPONSE);
         selector.delayedReceive(new DelayedReceive(node.idString(), new NetworkReceive(node.idString(), buffer)));
     }
 
     protected void awaitReady(NetworkClient client, Node node) {
-        maybeSetExpectedApiVersionsResponse();
+        if (client.discoverBrokerVersions()) {
+            maybeSetExpectedApiVersionsResponse();
+        }
         while (!client.ready(node, time.milliseconds()))
             client.poll(1, time.milliseconds());
         selector.clear();
@@ -175,11 +170,12 @@ public class NetworkClientTest {
     @Test
     public void testRequestTimeout() {
         awaitReady(client, node); // has to be before creating any request, as it may send ApiVersionsRequest and its response is mocked with correlation id 0
-        ProduceRequest produceRequest = new ProduceRequest((short) 1, 1000, Collections.<TopicPartition, MemoryRecords>emptyMap());
-        RequestHeader reqHeader = client.nextRequestHeader(ApiKeys.PRODUCE);
+        ProduceRequest.Builder builder =
+                new ProduceRequest.Builder((short) 1, 1000, Collections.<TopicPartition, MemoryRecords>emptyMap());
         TestCallbackHandler handler = new TestCallbackHandler();
-        ClientRequest request = new ClientRequest(node.idString(), time.milliseconds(), true, reqHeader, produceRequest, handler);
         long now = time.milliseconds();
+        ClientRequest request = client.newClientRequest(
+                node.idString(), builder, now, true, handler);
         client.send(request, now);
         // sleeping to make sure that the time since last send is greater than requestTimeOut
         time.sleep(3000);
@@ -246,12 +242,13 @@ public class NetworkClientTest {
         // this test ensures that the default metadata updater does not intercept a user-initiated
         // metadata request when the remote node disconnects with the request in-flight.
         awaitReady(client, node);
-        RequestHeader reqHeader = client.nextRequestHeader(ApiKeys.METADATA);
 
-        MetadataRequest metadataRequest = new MetadataRequest(Collections.<String>emptyList());
-        ClientRequest request = new ClientRequest(node.idString(), time.milliseconds(), true, reqHeader, metadataRequest, null);
-        client.send(request, time.milliseconds());
-        client.poll(requestTimeoutMs, time.milliseconds());
+        MetadataRequest.Builder builder =
+                new MetadataRequest.Builder(Collections.<String>emptyList());
+        long now = time.milliseconds();
+        ClientRequest request = client.newClientRequest(node.idString(), builder, now, true);
+        client.send(request, now);
+        client.poll(requestTimeoutMs, now);
         assertEquals(1, client.inFlightRequestCount(node.idString()));
 
         selector.close(node.idString());

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java b/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
new file mode 100644
index 0000000..b39a0aa
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.requests.ApiVersionsResponse.ApiVersion;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class NodeApiVersionsTest {
+
+    @Test
+    public void testUnsupportedVersionsToString() {
+        NodeApiVersions versions = new NodeApiVersions(
+                Collections.<ApiVersion>emptyList());
+        StringBuilder bld = new StringBuilder();
+        String prefix = "{";
+        for (ApiKeys apiKey : ApiKeys.values()) {
+            bld.append(prefix).append(apiKey.name).
+                    append("(").append(apiKey.id).append("): UNSUPPORTED");
+            prefix = ", ";
+        }
+        bld.append("}");
+        assertEquals(bld.toString(), versions.toString());
+    }
+
+    @Test
+    public void testVersionsToString() {
+        List<ApiVersion> versionList = new ArrayList<>();
+        for (ApiKeys apiKey : ApiKeys.values()) {
+            if (apiKey == ApiKeys.CONTROLLED_SHUTDOWN_KEY) {
+                versionList.add(new ApiVersion(apiKey.id, (short) 0, (short) 0));
+            } else {
+                versionList.add(new ApiVersion(apiKey.id,
+                        ProtoUtils.oldestVersion(apiKey.id), ProtoUtils.latestVersion(apiKey.id)));
+            }
+        }
+        NodeApiVersions versions = new NodeApiVersions(versionList);
+        StringBuilder bld = new StringBuilder();
+        String prefix = "{";
+        for (ApiKeys apiKey : ApiKeys.values()) {
+            bld.append(prefix);
+            if (apiKey == ApiKeys.CONTROLLED_SHUTDOWN_KEY) {
+                bld.append("ControlledShutdown(7): 0 [usable: NONE]");
+            } else {
+                bld.append(apiKey.name).append("(").
+                        append(apiKey.id).append("): ");
+                if (ProtoUtils.oldestVersion(apiKey.id) ==
+                        ProtoUtils.latestVersion(apiKey.id)) {
+                    bld.append(ProtoUtils.oldestVersion(apiKey.id));
+                } else {
+                    bld.append(ProtoUtils.oldestVersion(apiKey.id)).
+                            append(" to ").
+                            append(ProtoUtils.latestVersion(apiKey.id));
+                }
+                bld.append(" [usable: ").append(ProtoUtils.latestVersion(apiKey.id)).
+                        append("]");
+            }
+            prefix = ", ";
+        }
+        bld.append("}");
+        assertEquals(bld.toString(), versions.toString());
+    }
+
+    @Test
+    public void testUsableVersionCalculation() {
+        List<ApiVersion> versionList = new ArrayList<>();
+        versionList.add(new ApiVersion(ApiKeys.CONTROLLED_SHUTDOWN_KEY.id, (short) 0, (short) 0));
+        versionList.add(new ApiVersion(ApiKeys.FETCH.id, (short) 1, (short) 2));
+        NodeApiVersions versions =  new NodeApiVersions(versionList);
+        try {
+            versions.usableVersion(ApiKeys.CONTROLLED_SHUTDOWN_KEY);
+            Assert.fail("expected UnsupportedVersionException");
+        } catch (UnsupportedVersionException e) {
+            // pass
+        }
+        assertEquals(2, versions.usableVersion(ApiKeys.FETCH));
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 861dbf9..88f55d9 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -944,7 +944,7 @@ public class KafkaConsumerTest {
 
         // the auto commit is disabled, so no offset commit request should be sent
         for (ClientRequest req: client.requests())
-            assertTrue(req.header().apiKey() != ApiKeys.OFFSET_COMMIT.id);
+            assertTrue(req.requestBuilder().apiKey() != ApiKeys.OFFSET_COMMIT);
 
         // subscription change
         consumer.unsubscribe();
@@ -955,7 +955,7 @@ public class KafkaConsumerTest {
 
         // the auto commit is disabled, so no offset commit request should be sent
         for (ClientRequest req: client.requests())
-            assertTrue(req.header().apiKey() != ApiKeys.OFFSET_COMMIT.id);
+            assertTrue(req.requestBuilder().apiKey() != ApiKeys.OFFSET_COMMIT);
 
         client.requests().clear();
         consumer.close();
@@ -1084,7 +1084,7 @@ public class KafkaConsumerTest {
 
         // the auto commit is disabled, so no offset commit request should be sent
         for (ClientRequest req : client.requests())
-            assertTrue(req.header().apiKey() != ApiKeys.OFFSET_COMMIT.id);
+            assertTrue(req.requestBuilder().apiKey() != ApiKeys.OFFSET_COMMIT);
 
         client.requests().clear();
         consumer.close();

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
index 003d92d..2493ed8 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
@@ -19,7 +19,6 @@ import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.HeartbeatRequest;
 import org.apache.kafka.common.requests.HeartbeatResponse;
@@ -49,7 +48,7 @@ public class ConsumerNetworkClientTest {
     @Test
     public void send() {
         client.prepareResponse(heartbeatResponse(Errors.NONE.code()));
-        RequestFuture<ClientResponse> future = consumerClient.send(node, ApiKeys.METADATA, heartbeatRequest());
+        RequestFuture<ClientResponse> future = consumerClient.send(node, heartbeat());
         assertEquals(1, consumerClient.pendingRequestCount());
         assertEquals(1, consumerClient.pendingRequestCount(node));
         assertFalse(future.isDone());
@@ -67,8 +66,8 @@ public class ConsumerNetworkClientTest {
     public void multiSend() {
         client.prepareResponse(heartbeatResponse(Errors.NONE.code()));
         client.prepareResponse(heartbeatResponse(Errors.NONE.code()));
-        RequestFuture<ClientResponse> future1 = consumerClient.send(node, ApiKeys.METADATA, heartbeatRequest());
-        RequestFuture<ClientResponse> future2 = consumerClient.send(node, ApiKeys.METADATA, heartbeatRequest());
+        RequestFuture<ClientResponse> future1 = consumerClient.send(node, heartbeat());
+        RequestFuture<ClientResponse> future2 = consumerClient.send(node, heartbeat());
         assertEquals(2, consumerClient.pendingRequestCount());
         assertEquals(2, consumerClient.pendingRequestCount(node));
 
@@ -143,7 +142,7 @@ public class ConsumerNetworkClientTest {
 
     @Test
     public void wakeup() {
-        RequestFuture<ClientResponse> future = consumerClient.send(node, ApiKeys.METADATA, heartbeatRequest());
+        RequestFuture<ClientResponse> future = consumerClient.send(node, heartbeat());
         consumerClient.wakeup();
         try {
             consumerClient.poll(0);
@@ -181,13 +180,13 @@ public class ConsumerNetworkClientTest {
         };
         // Queue first send, sleep long enough for this to expire and then queue second send
         consumerClient = new ConsumerNetworkClient(client, metadata, time, 100, unsentExpiryMs);
-        RequestFuture<ClientResponse> future1 = consumerClient.send(node, ApiKeys.METADATA, heartbeatRequest());
+        RequestFuture<ClientResponse> future1 = consumerClient.send(node, heartbeat());
         assertEquals(1, consumerClient.pendingRequestCount());
         assertEquals(1, consumerClient.pendingRequestCount(node));
         assertFalse(future1.isDone());
 
         time.sleep(unsentExpiryMs + 1);
-        RequestFuture<ClientResponse> future2 = consumerClient.send(node, ApiKeys.METADATA, heartbeatRequest());
+        RequestFuture<ClientResponse> future2 = consumerClient.send(node, heartbeat());
         assertEquals(2, consumerClient.pendingRequestCount());
         assertEquals(2, consumerClient.pendingRequestCount(node));
         assertFalse(future2.isDone());
@@ -210,7 +209,7 @@ public class ConsumerNetworkClientTest {
 
         // Disable ready flag to delay send and queue another send. Disconnection should remove pending send
         isReady.set(false);
-        RequestFuture<ClientResponse> future3 = consumerClient.send(node, ApiKeys.METADATA, heartbeatRequest());
+        RequestFuture<ClientResponse> future3 = consumerClient.send(node, heartbeat());
         assertEquals(1, consumerClient.pendingRequestCount());
         assertEquals(1, consumerClient.pendingRequestCount(node));
         disconnected.set(true);
@@ -221,8 +220,8 @@ public class ConsumerNetworkClientTest {
         assertEquals(0, consumerClient.pendingRequestCount(node));
     }
 
-    private HeartbeatRequest heartbeatRequest() {
-        return new HeartbeatRequest("group", 1, "memberId");
+    private HeartbeatRequest.Builder heartbeat() {
+        return new HeartbeatRequest.Builder("group", 1, "memberId");
     }
 
     private HeartbeatResponse heartbeatResponse(short error) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 706caf7..7941e44 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -583,14 +583,17 @@ public class FetcherTest {
     @Test(expected = InvalidTopicException.class)
     public void testGetTopicMetadataInvalidTopic() {
         client.prepareResponse(newMetadataResponse(topicName, Errors.INVALID_TOPIC_EXCEPTION));
-        fetcher.getTopicMetadata(new MetadataRequest(Collections.singletonList(topicName)), 5000L);
+        fetcher.getTopicMetadata(
+            new MetadataRequest.Builder(Collections.singletonList(topicName)), 5000L);
     }
 
     @Test
     public void testGetTopicMetadataUnknownTopic() {
         client.prepareResponse(newMetadataResponse(topicName, Errors.UNKNOWN_TOPIC_OR_PARTITION));
 
-        Map<String, List<PartitionInfo>> topicMetadata = fetcher.getTopicMetadata(new MetadataRequest(Collections.singletonList(topicName)), 5000L);
+        Map<String, List<PartitionInfo>> topicMetadata =
+                fetcher.getTopicMetadata(
+                        new MetadataRequest.Builder(Collections.singletonList(topicName)), 5000L);
         assertNull(topicMetadata.get(topicName));
     }
 
@@ -599,7 +602,9 @@ public class FetcherTest {
         client.prepareResponse(newMetadataResponse(topicName, Errors.LEADER_NOT_AVAILABLE));
         client.prepareResponse(newMetadataResponse(topicName, Errors.NONE));
 
-        Map<String, List<PartitionInfo>> topicMetadata = fetcher.getTopicMetadata(new MetadataRequest(Collections.singletonList(topicName)), 5000L);
+        Map<String, List<PartitionInfo>> topicMetadata =
+                fetcher.getTopicMetadata(
+                        new MetadataRequest.Builder(Collections.singletonList(topicName)), 5000L);
         assertTrue(topicMetadata.containsKey(topicName));
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 097b4fc..00c536c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -205,7 +205,7 @@ public class SenderTest {
             sender.run(time.milliseconds()); // connect
             sender.run(time.milliseconds()); // send produce request
             String id = client.requests().peek().destination();
-            assertEquals(ApiKeys.PRODUCE.id, client.requests().peek().header().apiKey());
+            assertEquals(ApiKeys.PRODUCE, client.requests().peek().requestBuilder().apiKey());
             Node node = new Node(Integer.valueOf(id), "localhost", 0);
             assertEquals(1, client.inFlightRequestCount());
             assertTrue("Client ready status should be true", client.isReady(node, 0L));


Mime
View raw message