kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject [2/5] kafka git commit: KAFKA-4507; Clients should support older brokers (KIP-97)
Date Wed, 11 Jan 2017 19:31:02 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 5f4463f..f02133d 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -43,110 +43,112 @@ import java.util.Set;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 public class RequestResponseTest {
+    private static final Logger log = LoggerFactory.getLogger(RequestResponseTest.class);
 
     @Test
     public void testSerialization() throws Exception {
-        List<AbstractRequestResponse> requestResponseList = Arrays.asList(
-                createRequestHeader(),
-                createResponseHeader(),
-                createGroupCoordinatorRequest(),
-                createGroupCoordinatorRequest().getErrorResponse(0, new UnknownServerException()),
-                createGroupCoordinatorResponse(),
-                createControlledShutdownRequest(),
-                createControlledShutdownResponse(),
-                createControlledShutdownRequest().getErrorResponse(1, new UnknownServerException()),
-                createFetchRequest(3),
-                createFetchRequest(3).getErrorResponse(3, new UnknownServerException()),
-                createFetchResponse(),
-                createHeartBeatRequest(),
-                createHeartBeatRequest().getErrorResponse(0, new UnknownServerException()),
-                createHeartBeatResponse(),
-                createJoinGroupRequest(1),
-                createJoinGroupRequest(0).getErrorResponse(0, new UnknownServerException()),
-                createJoinGroupRequest(1).getErrorResponse(1, new UnknownServerException()),
-                createJoinGroupResponse(),
-                createLeaveGroupRequest(),
-                createLeaveGroupRequest().getErrorResponse(0, new UnknownServerException()),
-                createLeaveGroupResponse(),
-                createListGroupsRequest(),
-                createListGroupsRequest().getErrorResponse(0, new UnknownServerException()),
-                createListGroupsResponse(),
-                createDescribeGroupRequest(),
-                createDescribeGroupRequest().getErrorResponse(0, new UnknownServerException()),
-                createDescribeGroupResponse(),
-                createListOffsetRequest(1),
-                createListOffsetRequest(1).getErrorResponse(1, new UnknownServerException()),
-                createListOffsetResponse(1),
-                MetadataRequest.allTopics(),
-                createMetadataRequest(Arrays.asList("topic1")),
-                createMetadataRequest(Arrays.asList("topic1")).getErrorResponse(2, new UnknownServerException()),
-                createMetadataResponse(2),
-                createOffsetCommitRequest(2),
-                createOffsetCommitRequest(2).getErrorResponse(2, new UnknownServerException()),
-                createOffsetCommitResponse(),
-                createOffsetFetchRequest(),
-                createOffsetFetchRequest().getErrorResponse(0, new UnknownServerException()),
-                createOffsetFetchResponse(),
-                createProduceRequest(),
-                createProduceRequest().getErrorResponse(2, new UnknownServerException()),
-                createProduceResponse(),
-                createStopReplicaRequest(true),
-                createStopReplicaRequest(false),
-                createStopReplicaRequest(true).getErrorResponse(0, new UnknownServerException()),
-                createStopReplicaResponse(),
-                createUpdateMetadataRequest(2, "rack1"),
-                createUpdateMetadataRequest(2, null),
-                createUpdateMetadataRequest(2, "rack1").getErrorResponse(2, new UnknownServerException()),
-                createUpdateMetadataResponse(),
-                createLeaderAndIsrRequest(),
-                createLeaderAndIsrRequest().getErrorResponse(0, new UnknownServerException()),
-                createLeaderAndIsrResponse(),
-                createSaslHandshakeRequest(),
-                createSaslHandshakeRequest().getErrorResponse(0, new UnknownServerException()),
-                createSaslHandshakeResponse(),
-                createApiVersionRequest(),
-                createApiVersionRequest().getErrorResponse(0, new UnknownServerException()),
-                createApiVersionResponse(),
-                createCreateTopicRequest(),
-                createCreateTopicRequest().getErrorResponse(0, new UnknownServerException()),
-                createCreateTopicResponse(),
-                createDeleteTopicsRequest(),
-                createDeleteTopicsRequest().getErrorResponse(0, new UnknownServerException()),
-                createDeleteTopicsResponse()
-        );
-
-        for (AbstractRequestResponse req : requestResponseList)
-            checkSerialization(req, null);
-
+        checkSerialization(createRequestHeader(), null);
+        checkSerialization(createResponseHeader(), null);
+        checkSerialization(createGroupCoordinatorRequest());
+        checkSerialization(createGroupCoordinatorRequest().getErrorResponse(new UnknownServerException()), null);
+        checkSerialization(createGroupCoordinatorResponse(), null);
+        checkSerialization(createControlledShutdownRequest());
+        checkSerialization(createControlledShutdownResponse(), null);
+        checkSerialization(createControlledShutdownRequest().getErrorResponse(new UnknownServerException()), null);
+        checkSerialization(createFetchRequest(3));
+        checkSerialization(createFetchRequest(3).getErrorResponse(new UnknownServerException()), null);
+        checkSerialization(createFetchResponse(), null);
+        checkSerialization(createHeartBeatRequest());
+        checkSerialization(createHeartBeatRequest().getErrorResponse(new UnknownServerException()), null);
+        checkSerialization(createHeartBeatResponse(), null);
+        checkSerialization(createJoinGroupRequest(1));
+        checkSerialization(createJoinGroupRequest(0).getErrorResponse(new UnknownServerException()), null);
+        checkSerialization(createJoinGroupRequest(1).getErrorResponse(new UnknownServerException()), null);
+        checkSerialization(createJoinGroupResponse(), null);
+        checkSerialization(createLeaveGroupRequest());
+        checkSerialization(createLeaveGroupRequest().getErrorResponse(new UnknownServerException()), null);
+        checkSerialization(createLeaveGroupResponse(), null);
+        checkSerialization(createListGroupsRequest());
+        checkSerialization(createListGroupsRequest().getErrorResponse(new UnknownServerException()), null);
+        checkSerialization(createListGroupsResponse(), null);
+        checkSerialization(createDescribeGroupRequest());
+        checkSerialization(createDescribeGroupRequest().getErrorResponse(new UnknownServerException()), null);
+        checkSerialization(createDescribeGroupResponse(), null);
+        checkSerialization(createListOffsetRequest(1));
+        checkSerialization(createListOffsetRequest(1).getErrorResponse(new UnknownServerException()), null);
+        checkSerialization(createListOffsetResponse(1), null);
+        checkSerialization(MetadataRequest.allTopics((short) 2));
+        checkSerialization(createMetadataRequest(1, Arrays.asList("topic1")));
+        checkSerialization(createMetadataRequest(1, Arrays.asList("topic1")).getErrorResponse(new UnknownServerException()), 1);
+        checkSerialization(createMetadataResponse(2), null);
+        checkSerialization(createMetadataRequest(2, Arrays.asList("topic1")).getErrorResponse(new UnknownServerException()), 2);
+        checkSerialization(createOffsetCommitRequest(2));
+        checkSerialization(createOffsetCommitRequest(2).getErrorResponse(new UnknownServerException()), null);
+        checkSerialization(createOffsetCommitResponse(), null);
+        checkSerialization(createOffsetFetchRequest());
+        checkSerialization(createOffsetFetchRequest().getErrorResponse(new UnknownServerException()), null);
+        checkSerialization(createOffsetFetchResponse(), null);
+        checkSerialization(createProduceRequest());
+        checkSerialization(createProduceRequest().getErrorResponse(new UnknownServerException()), null);
+        checkSerialization(createProduceResponse(), null);
+        checkSerialization(createStopReplicaRequest(true));
+        checkSerialization(createStopReplicaRequest(false));
+        checkSerialization(createStopReplicaRequest(true).getErrorResponse(new UnknownServerException()), null);
+        checkSerialization(createStopReplicaResponse(), null);
+        checkSerialization(createUpdateMetadataRequest(2, "rack1"));
+        checkSerialization(createUpdateMetadataRequest(2, null));
+        checkSerialization(createUpdateMetadataRequest(2, "rack1").getErrorResponse(new UnknownServerException()), null);
+        checkSerialization(createUpdateMetadataResponse(), null);
+        checkSerialization(createLeaderAndIsrRequest());
+        checkSerialization(createLeaderAndIsrRequest().getErrorResponse(new UnknownServerException()), null);
+        checkSerialization(createLeaderAndIsrResponse(), null);
+        checkSerialization(createSaslHandshakeRequest());
+        checkSerialization(createSaslHandshakeRequest().getErrorResponse(new UnknownServerException()), null);
+        checkSerialization(createSaslHandshakeResponse(), null);
+        checkSerialization(createApiVersionRequest());
+        checkSerialization(createApiVersionRequest().getErrorResponse(new UnknownServerException()), null);
+        checkSerialization(createApiVersionResponse(), null);
+        checkSerialization(createCreateTopicRequest());
+        checkSerialization(createCreateTopicRequest().getErrorResponse(new UnknownServerException()), null);
+        checkSerialization(createCreateTopicResponse(), null);
+        checkSerialization(createDeleteTopicsRequest());
+        checkSerialization(createDeleteTopicsRequest().getErrorResponse(new UnknownServerException()), null);
+        checkSerialization(createDeleteTopicsResponse(), null);
         checkOlderFetchVersions();
         checkSerialization(createMetadataResponse(0), 0);
         checkSerialization(createMetadataResponse(1), 1);
-        checkSerialization(createMetadataRequest(Arrays.asList("topic1")).getErrorResponse(0, new UnknownServerException()), 0);
-        checkSerialization(createMetadataRequest(Arrays.asList("topic1")).getErrorResponse(1, new UnknownServerException()), 1);
+        checkSerialization(createMetadataRequest(1, Arrays.asList("topic1")).getErrorResponse(new UnknownServerException()), 1);
         checkSerialization(createOffsetCommitRequest(0), 0);
-        checkSerialization(createOffsetCommitRequest(0).getErrorResponse(0, new UnknownServerException()), 0);
+        checkSerialization(createOffsetCommitRequest(0).getErrorResponse(new UnknownServerException()), 0);
         checkSerialization(createOffsetCommitRequest(1), 1);
-        checkSerialization(createOffsetCommitRequest(1).getErrorResponse(1, new UnknownServerException()), 1);
+        checkSerialization(createOffsetCommitRequest(1).getErrorResponse(new UnknownServerException()), 1);
         checkSerialization(createJoinGroupRequest(0), 0);
         checkSerialization(createUpdateMetadataRequest(0, null), 0);
-        checkSerialization(createUpdateMetadataRequest(0, null).getErrorResponse(0, new UnknownServerException()), 0);
+        checkSerialization(createUpdateMetadataRequest(0, null).getErrorResponse(new UnknownServerException()), 0);
         checkSerialization(createUpdateMetadataRequest(1, null), 1);
         checkSerialization(createUpdateMetadataRequest(1, "rack1"), 1);
-        checkSerialization(createUpdateMetadataRequest(1, null).getErrorResponse(1, new UnknownServerException()), 1);
+        checkSerialization(createUpdateMetadataRequest(1, null).getErrorResponse(new UnknownServerException()), 1);
         checkSerialization(createListOffsetRequest(0), 0);
-        checkSerialization(createListOffsetRequest(0).getErrorResponse(0, new UnknownServerException()), 0);
+        checkSerialization(createListOffsetRequest(0).getErrorResponse(new UnknownServerException()), 0);
         checkSerialization(createListOffsetResponse(0), 0);
     }
 
     private void checkOlderFetchVersions() throws Exception {
         int latestVersion = ProtoUtils.latestVersion(ApiKeys.FETCH.id);
         for (int i = 0; i < latestVersion; ++i) {
-            checkSerialization(createFetchRequest(i).getErrorResponse(i, new UnknownServerException()), i);
+            checkSerialization(createFetchRequest(i).getErrorResponse(new UnknownServerException()), i);
             checkSerialization(createFetchRequest(i), i);
         }
     }
 
+    private void checkSerialization(AbstractRequest req) throws Exception {
+        checkSerialization(req, Integer.valueOf(req.version()));
+    }
+
     private void checkSerialization(AbstractRequestResponse req, Integer version) throws Exception {
         ByteBuffer buffer = ByteBuffer.allocate(req.sizeOf());
         req.writeTo(buffer);
@@ -202,7 +204,8 @@ public class RequestResponseTest {
     @Test
     public void verifyFetchResponseFullWrite() throws Exception {
         FetchResponse fetchResponse = createFetchResponse();
-        RequestHeader header = new RequestHeader(ApiKeys.FETCH.id, "client", 15);
+        RequestHeader header = new RequestHeader(ApiKeys.FETCH.id, ProtoUtils.latestVersion(ApiKeys.FETCH.id),
+                "client", 15);
 
         Send send = fetchResponse.toSend("1", header);
         ByteBufferChannel channel = new ByteBufferChannel(send.size());
@@ -260,22 +263,19 @@ public class RequestResponseTest {
     }
 
     private GroupCoordinatorRequest createGroupCoordinatorRequest() {
-        return new GroupCoordinatorRequest("test-group");
+        return new GroupCoordinatorRequest.Builder("test-group").build();
     }
 
     private GroupCoordinatorResponse createGroupCoordinatorResponse() {
         return new GroupCoordinatorResponse(Errors.NONE.code(), new Node(10, "host1", 2014));
     }
 
-    @SuppressWarnings("deprecation")
     private FetchRequest createFetchRequest(int version) {
         LinkedHashMap<TopicPartition, FetchRequest.PartitionData> fetchData = new LinkedHashMap<>();
         fetchData.put(new TopicPartition("test1", 0), new FetchRequest.PartitionData(100, 1000000));
         fetchData.put(new TopicPartition("test2", 0), new FetchRequest.PartitionData(200, 1000000));
-        if (version < 3)
-            return new FetchRequest(100, 100000, fetchData);
-        else
-            return new FetchRequest(100, 1000, 1000000, fetchData);
+        return new FetchRequest.Builder(100, 100000, fetchData).setMaxBytes(1000).
+                setVersion((short) version).build();
     }
 
     private FetchResponse createFetchResponse() {
@@ -286,7 +286,7 @@ public class RequestResponseTest {
     }
 
     private HeartbeatRequest createHeartBeatRequest() {
-        return new HeartbeatRequest("group1", 1, "consumer1");
+        return new HeartbeatRequest.Builder("group1", 1, "consumer1").build();
     }
 
     private HeartbeatResponse createHeartBeatResponse() {
@@ -299,9 +299,11 @@ public class RequestResponseTest {
         List<JoinGroupRequest.ProtocolMetadata> protocols = new ArrayList<>();
         protocols.add(new JoinGroupRequest.ProtocolMetadata("consumer-range", metadata));
         if (version == 0) {
-            return new JoinGroupRequest("group1", 30000, "consumer1", "consumer", protocols);
+            return new JoinGroupRequest.Builder("group1", 30000, "consumer1", "consumer", protocols).
+                    setVersion((short) version).build();
         } else {
-            return new JoinGroupRequest("group1", 10000, 60000, "consumer1", "consumer", protocols);
+            return new JoinGroupRequest.Builder("group1", 10000, "consumer1", "consumer", protocols).
+                    setRebalanceTimeout(60000).build();
         }
     }
 
@@ -313,7 +315,7 @@ public class RequestResponseTest {
     }
 
     private ListGroupsRequest createListGroupsRequest() {
-        return new ListGroupsRequest();
+        return new ListGroupsRequest.Builder().build();
     }
 
     private ListGroupsResponse createListGroupsResponse() {
@@ -322,7 +324,7 @@ public class RequestResponseTest {
     }
 
     private DescribeGroupsRequest createDescribeGroupRequest() {
-        return new DescribeGroupsRequest(Collections.singletonList("test-group"));
+        return new DescribeGroupsRequest.Builder(Collections.singletonList("test-group")).build();
     }
 
     private DescribeGroupsResponse createDescribeGroupResponse() {
@@ -337,7 +339,7 @@ public class RequestResponseTest {
     }
 
     private LeaveGroupRequest createLeaveGroupRequest() {
-        return new LeaveGroupRequest("group1", "consumer1");
+        return new LeaveGroupRequest.Builder("group1", "consumer1").build();
     }
 
     private LeaveGroupResponse createLeaveGroupResponse() {
@@ -347,13 +349,14 @@ public class RequestResponseTest {
     @SuppressWarnings("deprecation")
     private ListOffsetRequest createListOffsetRequest(int version) {
         if (version == 0) {
-            Map<TopicPartition, ListOffsetRequest.PartitionData> offsetData = new HashMap<>();
-            offsetData.put(new TopicPartition("test", 0), new ListOffsetRequest.PartitionData(1000000L, 10));
-            return new ListOffsetRequest(offsetData);
+            Map<TopicPartition, ListOffsetRequest.PartitionData> offsetData = Collections.singletonMap(
+                    new TopicPartition("test", 0),
+                    new ListOffsetRequest.PartitionData(1000000L, 10));
+            return new ListOffsetRequest.Builder().setOffsetData(offsetData).setVersion((short) version).build();
         } else if (version == 1) {
-            Map<TopicPartition, Long> offsetData = new HashMap<>();
-            offsetData.put(new TopicPartition("test", 0), 1000000L);
-            return new ListOffsetRequest(offsetData, ListOffsetRequest.CONSUMER_REPLICA_ID);
+            Map<TopicPartition, Long> offsetData = Collections.singletonMap(
+                    new TopicPartition("test", 0), 1000000L);
+            return new ListOffsetRequest.Builder().setTargetTimes(offsetData).setVersion((short) version).build();
         } else {
             throw new IllegalArgumentException("Illegal ListOffsetRequest version " + version);
         }
@@ -374,8 +377,10 @@ public class RequestResponseTest {
         }
     }
 
-    private MetadataRequest createMetadataRequest(List<String> topics) {
-        return new MetadataRequest(topics);
+    private MetadataRequest createMetadataRequest(int version, List<String> topics) {
+        return new MetadataRequest.Builder(topics).
+                setVersion((short) version).
+                build();
     }
 
     private MetadataResponse createMetadataResponse(int version) {
@@ -392,19 +397,16 @@ public class RequestResponseTest {
         return new MetadataResponse(Arrays.asList(node), null, MetadataResponse.NO_CONTROLLER_ID, allTopicMetadata, version);
     }
 
-    @SuppressWarnings("deprecation")
     private OffsetCommitRequest createOffsetCommitRequest(int version) {
         Map<TopicPartition, OffsetCommitRequest.PartitionData> commitData = new HashMap<>();
         commitData.put(new TopicPartition("test", 0), new OffsetCommitRequest.PartitionData(100, ""));
         commitData.put(new TopicPartition("test", 1), new OffsetCommitRequest.PartitionData(200, null));
-        if (version == 0) {
-            return new OffsetCommitRequest("group1", commitData);
-        } else if (version == 1) {
-            return new OffsetCommitRequest("group1", 100, "consumer1", commitData);
-        } else if (version == 2) {
-            return new OffsetCommitRequest("group1", 100, "consumer1", 1000000, commitData);
-        }
-        throw new IllegalArgumentException("Unknown offset commit request version " + version);
+        return new OffsetCommitRequest.Builder("group1", commitData)
+                .setGenerationId(100)
+                .setMemberId("consumer1")
+                .setRetentionTime(1000000)
+                .setVersion((short) version)
+                .build();
     }
 
     private OffsetCommitResponse createOffsetCommitResponse() {
@@ -414,7 +416,8 @@ public class RequestResponseTest {
     }
 
     private OffsetFetchRequest createOffsetFetchRequest() {
-        return new OffsetFetchRequest("group1", Arrays.asList(new TopicPartition("test11", 1)));
+        return new OffsetFetchRequest.Builder("group1",
+                Arrays.asList(new TopicPartition("test11", 1))).build();
     }
 
     private OffsetFetchResponse createOffsetFetchResponse() {
@@ -427,7 +430,7 @@ public class RequestResponseTest {
     private ProduceRequest createProduceRequest() {
         Map<TopicPartition, MemoryRecords> produceData = new HashMap<>();
         produceData.put(new TopicPartition("test", 0), MemoryRecords.readableRecords(ByteBuffer.allocate(10)));
-        return new ProduceRequest((short) 1, 5000, produceData);
+        return new ProduceRequest.Builder((short) 1, 5000, produceData).build();
     }
 
     private ProduceResponse createProduceResponse() {
@@ -438,7 +441,7 @@ public class RequestResponseTest {
 
     private StopReplicaRequest createStopReplicaRequest(boolean deletePartitions) {
         Set<TopicPartition> partitions = new HashSet<>(Arrays.asList(new TopicPartition("test", 0)));
-        return new StopReplicaRequest(0, 1, deletePartitions, partitions);
+        return new StopReplicaRequest.Builder(0, 1, deletePartitions, partitions).build();
     }
 
     private StopReplicaResponse createStopReplicaResponse() {
@@ -448,7 +451,7 @@ public class RequestResponseTest {
     }
 
     private ControlledShutdownRequest createControlledShutdownRequest() {
-        return new ControlledShutdownRequest(10);
+        return new ControlledShutdownRequest.Builder(10).build();
     }
 
     private ControlledShutdownResponse createControlledShutdownResponse() {
@@ -475,7 +478,7 @@ public class RequestResponseTest {
                 new Node(1, "test1", 1223)
         ));
 
-        return new LeaderAndIsrRequest(1, 10, partitionStates, leaders);
+        return new LeaderAndIsrRequest.Builder(1, 10, partitionStates, leaders).build();
     }
 
     private LeaderAndIsrResponse createLeaderAndIsrResponse() {
@@ -484,7 +487,6 @@ public class RequestResponseTest {
         return new LeaderAndIsrResponse(Errors.NONE.code(), responses);
     }
 
-    @SuppressWarnings("deprecation")
     private UpdateMetadataRequest createUpdateMetadataRequest(int version, String rack) {
         Map<TopicPartition, PartitionState> partitionStates = new HashMap<>();
         List<Integer> isr = Arrays.asList(1, 2);
@@ -496,26 +498,20 @@ public class RequestResponseTest {
         partitionStates.put(new TopicPartition("topic20", 1),
                 new PartitionState(1, 0, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas)));
 
-        if (version == 0) {
-            Set<Node> liveBrokers = new HashSet<>(Arrays.asList(
-                    new Node(0, "host1", 1223),
-                    new Node(1, "host2", 1234)
-            ));
-
-            return new UpdateMetadataRequest(1, 10, liveBrokers, partitionStates);
-        } else {
-            Map<SecurityProtocol, UpdateMetadataRequest.EndPoint> endPoints1 = new HashMap<>();
-            endPoints1.put(SecurityProtocol.PLAINTEXT, new UpdateMetadataRequest.EndPoint("host1", 1223));
+        Map<SecurityProtocol, UpdateMetadataRequest.EndPoint> endPoints1 = new HashMap<>();
+        endPoints1.put(SecurityProtocol.PLAINTEXT, new UpdateMetadataRequest.EndPoint("host1", 1223));
 
-            Map<SecurityProtocol, UpdateMetadataRequest.EndPoint> endPoints2 = new HashMap<>();
-            endPoints2.put(SecurityProtocol.PLAINTEXT, new UpdateMetadataRequest.EndPoint("host1", 1244));
+        Map<SecurityProtocol, UpdateMetadataRequest.EndPoint> endPoints2 = new HashMap<>();
+        endPoints2.put(SecurityProtocol.PLAINTEXT, new UpdateMetadataRequest.EndPoint("host1", 1244));
+        if (version > 0) {
             endPoints2.put(SecurityProtocol.SSL, new UpdateMetadataRequest.EndPoint("host2", 1234));
-
-            Set<UpdateMetadataRequest.Broker> liveBrokers = new HashSet<>(Arrays.asList(new UpdateMetadataRequest.Broker(0, endPoints1, rack),
-                    new UpdateMetadataRequest.Broker(1, endPoints2, rack)
-            ));
-            return new UpdateMetadataRequest(version, 1, 10, partitionStates, liveBrokers);
         }
+
+        Set<UpdateMetadataRequest.Broker> liveBrokers = new HashSet<>(Arrays.asList(new UpdateMetadataRequest.Broker(0, endPoints1, rack),
+                new UpdateMetadataRequest.Broker(1, endPoints2, rack)
+        ));
+        return new UpdateMetadataRequest.Builder(1, 10, partitionStates, liveBrokers).
+                setVersion((short) version).build();
     }
 
     private UpdateMetadataResponse createUpdateMetadataResponse() {
@@ -531,7 +527,7 @@ public class RequestResponseTest {
     }
 
     private ApiVersionsRequest createApiVersionRequest() {
-        return new ApiVersionsRequest();
+        return new ApiVersionsRequest.Builder().build();
     }
 
     private ApiVersionsResponse createApiVersionResponse() {
@@ -554,7 +550,7 @@ public class RequestResponseTest {
         Map<String, CreateTopicsRequest.TopicDetails> request = new HashMap<>();
         request.put("my_t1", request1);
         request.put("my_t2", request2);
-        return new CreateTopicsRequest(request, 0);
+        return new CreateTopicsRequest.Builder(request, 0).build();
     }
 
     private CreateTopicsResponse createCreateTopicResponse() {
@@ -565,7 +561,8 @@ public class RequestResponseTest {
     }
 
     private DeleteTopicsRequest createDeleteTopicsRequest() {
-        return new DeleteTopicsRequest(new HashSet<>(Arrays.asList("my_t1", "my_t2")), 10000);
+        return new DeleteTopicsRequest.Builder(new HashSet<>(Arrays.asList("my_t1", "my_t2")), 10000).
+                build();
     }
 
     private DeleteTopicsResponse createDeleteTopicsResponse() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
index f8b57f4..b515ef4 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
@@ -26,7 +26,7 @@ import org.apache.kafka.common.network.Selector;
 import org.apache.kafka.common.network.Send;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.Protocol;
+import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.AbstractResponse;
@@ -390,7 +390,7 @@ public class SaslAuthenticatorTest {
         String node = "1";
         createClientConnection(SecurityProtocol.PLAINTEXT, node);
         RequestHeader header = new RequestHeader(ApiKeys.API_VERSIONS.id, Short.MAX_VALUE, "someclient", 1);
-        ApiVersionsRequest request = new ApiVersionsRequest();
+        ApiVersionsRequest request = new ApiVersionsRequest.Builder().build();
         selector.send(request.toSend(node, header));
         ByteBuffer responseBuffer = waitForResponse();
         ResponseHeader.parse(responseBuffer);
@@ -486,8 +486,9 @@ public class SaslAuthenticatorTest {
         createClientConnection(SecurityProtocol.PLAINTEXT, node1);
         sendHandshakeRequestReceiveResponse(node1);
 
-        ApiVersionsRequest request = new ApiVersionsRequest();
-        RequestHeader versionsHeader = new RequestHeader(ApiKeys.API_VERSIONS.id, "someclient", 2);
+        ApiVersionsRequest request = new ApiVersionsRequest.Builder().build();
+        RequestHeader versionsHeader = new RequestHeader(ApiKeys.API_VERSIONS.id,
+                request.version(), "someclient", 2);
         selector.send(request.toSend(node1, versionsHeader));
         NetworkTestUtils.waitForChannelClose(selector, node1);
         selector.close();
@@ -550,8 +551,10 @@ public class SaslAuthenticatorTest {
         // Send metadata request before Kafka SASL handshake request
         String node1 = "invalid1";
         createClientConnection(SecurityProtocol.PLAINTEXT, node1);
-        RequestHeader metadataRequestHeader1 = new RequestHeader(ApiKeys.METADATA.id, "someclient", 1);
-        MetadataRequest metadataRequest1 = new MetadataRequest(Collections.singletonList("sometopic"));
+        MetadataRequest metadataRequest1 =
+                new MetadataRequest.Builder(Collections.singletonList("sometopic")).build();
+        RequestHeader metadataRequestHeader1 = new RequestHeader(ApiKeys.METADATA.id,
+                metadataRequest1.version(), "someclient", 1);
         selector.send(metadataRequest1.toSend(node1, metadataRequestHeader1));
         NetworkTestUtils.waitForChannelClose(selector, node1);
         selector.close();
@@ -563,8 +566,10 @@ public class SaslAuthenticatorTest {
         String node2 = "invalid2";
         createClientConnection(SecurityProtocol.PLAINTEXT, node2);
         sendHandshakeRequestReceiveResponse(node2);
-        RequestHeader metadataRequestHeader2 = new RequestHeader(ApiKeys.METADATA.id, "someclient", 2);
-        MetadataRequest metadataRequest2 = new MetadataRequest(Collections.singletonList("sometopic"));
+        MetadataRequest metadataRequest2 =
+                new MetadataRequest.Builder(Collections.singletonList("sometopic")).build();
+        RequestHeader metadataRequestHeader2 = new RequestHeader(ApiKeys.METADATA.id,
+                metadataRequest2.version(), "someclient", 2);
         selector.send(metadataRequest2.toSend(node2, metadataRequestHeader2));
         NetworkTestUtils.waitForChannelClose(selector, node2);
         selector.close();
@@ -702,8 +707,8 @@ public class SaslAuthenticatorTest {
 
         // Send ApiVersionsRequest and check response
         ApiVersionsResponse versionsResponse = sendVersionRequestReceiveResponse(node);
-        assertEquals(Protocol.MIN_VERSIONS[ApiKeys.SASL_HANDSHAKE.id], versionsResponse.apiVersion(ApiKeys.SASL_HANDSHAKE.id).minVersion);
-        assertEquals(Protocol.CURR_VERSION[ApiKeys.SASL_HANDSHAKE.id], versionsResponse.apiVersion(ApiKeys.SASL_HANDSHAKE.id).maxVersion);
+        assertEquals(ProtoUtils.oldestVersion(ApiKeys.SASL_HANDSHAKE.id), versionsResponse.apiVersion(ApiKeys.SASL_HANDSHAKE.id).minVersion);
+        assertEquals(ProtoUtils.latestVersion(ApiKeys.SASL_HANDSHAKE.id), versionsResponse.apiVersion(ApiKeys.SASL_HANDSHAKE.id).maxVersion);
 
         // Send SaslHandshakeRequest and check response
         SaslHandshakeResponse handshakeResponse = sendHandshakeRequestReceiveResponse(node);
@@ -756,7 +761,8 @@ public class SaslAuthenticatorTest {
     }
 
     private AbstractResponse sendKafkaRequestReceiveResponse(String node, ApiKeys apiKey, AbstractRequest request) throws IOException {
-        RequestHeader header = new RequestHeader(apiKey.id, "someclient", 1);
+        RequestHeader header =
+                new RequestHeader(apiKey.id, request.version(), "someclient", 1);
         Send send = request.toSend(node, header);
         selector.send(send);
         ByteBuffer responseBuffer = waitForResponse();
@@ -771,7 +777,7 @@ public class SaslAuthenticatorTest {
     }
 
     private ApiVersionsResponse sendVersionRequestReceiveResponse(String node) throws Exception {
-        ApiVersionsRequest handshakeRequest = new ApiVersionsRequest();
+        ApiVersionsRequest handshakeRequest = new ApiVersionsRequest.Builder().build();
         ApiVersionsResponse response =  (ApiVersionsResponse) sendKafkaRequestReceiveResponse(node, ApiKeys.API_VERSIONS, handshakeRequest);
         assertEquals(Errors.NONE.code(), response.errorCode());
         return response;

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index d6a134f..5ee3362 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -31,7 +31,6 @@ import org.apache.kafka.common.metrics.MetricsReporter;
 import org.apache.kafka.common.network.ChannelBuilder;
 import org.apache.kafka.common.network.Selector;
 import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.util.ConnectorTaskId;
@@ -41,7 +40,6 @@ import org.slf4j.LoggerFactory;
 
 import java.net.InetSocketAddress;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -67,7 +65,6 @@ public class WorkerGroupMember {
             ApiKeys.JOIN_GROUP,
             ApiKeys.LEAVE_GROUP,
             ApiKeys.SYNC_GROUP);
-    private static final Collection<ApiVersionsResponse.ApiVersion> EXPECTED_API_VERSIONS = ClientUtils.buildExpectedApiVersions(WORKER_GROUP_MEMBER_APIS);
 
     private final Time time;
     private final String clientId;
@@ -113,7 +110,7 @@ public class WorkerGroupMember {
                     config.getInt(CommonClientConfigs.RECEIVE_BUFFER_CONFIG),
                     config.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG),
                     time,
-                    EXPECTED_API_VERSIONS);
+                    true);
             this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs,
                     config.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG));
             this.coordinator = new WorkerCoordinator(this.client,

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/core/src/main/scala/kafka/admin/AdminClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index 3845d8d..ab29c88 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -39,10 +39,10 @@ class AdminClient(val time: Time,
 
   private def send(target: Node,
                    api: ApiKeys,
-                   request: AbstractRequest): AbstractResponse = {
+                   request: AbstractRequest.Builder[_ <: AbstractRequest]): AbstractResponse = {
     var future: RequestFuture[ClientResponse] = null
 
-    future = client.send(target, api, request)
+    future = client.send(target, request)
     client.poll(future)
 
     if (future.succeeded())
@@ -51,7 +51,7 @@ class AdminClient(val time: Time,
       throw future.exception()
   }
 
-  private def sendAnyNode(api: ApiKeys, request: AbstractRequest): AbstractResponse = {
+  private def sendAnyNode(api: ApiKeys, request: AbstractRequest.Builder[_ <: AbstractRequest]): AbstractResponse = {
     bootstrapBrokers.foreach { broker =>
       try {
         return send(broker, api, request)
@@ -64,20 +64,20 @@ class AdminClient(val time: Time,
   }
 
   private def findCoordinator(groupId: String): Node = {
-    val request = new GroupCoordinatorRequest(groupId)
-    val response = sendAnyNode(ApiKeys.GROUP_COORDINATOR, request).asInstanceOf[GroupCoordinatorResponse]
+    val requestBuilder = new GroupCoordinatorRequest.Builder(groupId)
+    val response = sendAnyNode(ApiKeys.GROUP_COORDINATOR, requestBuilder).asInstanceOf[GroupCoordinatorResponse]
     Errors.forCode(response.errorCode()).maybeThrow()
     response.node()
   }
 
   def listGroups(node: Node): List[GroupOverview] = {
-    val response = send(node, ApiKeys.LIST_GROUPS, new ListGroupsRequest()).asInstanceOf[ListGroupsResponse]
+    val response = send(node, ApiKeys.LIST_GROUPS, new ListGroupsRequest.Builder()).asInstanceOf[ListGroupsResponse]
     Errors.forCode(response.errorCode()).maybeThrow()
     response.groups().asScala.map(group => GroupOverview(group.groupId(), group.protocolType())).toList
   }
 
   private def findAllBrokers(): List[Node] = {
-    val request = new MetadataRequest(Collections.emptyList[String])
+    val request = MetadataRequest.Builder.allTopics()
     val response = sendAnyNode(ApiKeys.METADATA, request).asInstanceOf[MetadataResponse]
     val errors = response.errors()
     if (!errors.isEmpty)
@@ -132,7 +132,8 @@ class AdminClient(val time: Time,
 
   def describeConsumerGroup(groupId: String): ConsumerGroupSummary = {
     val coordinator = findCoordinator(groupId)
-    val responseBody = send(coordinator, ApiKeys.DESCRIBE_GROUPS, new DescribeGroupsRequest(Collections.singletonList(groupId)))
+    val responseBody = send(coordinator, ApiKeys.DESCRIBE_GROUPS,
+        new DescribeGroupsRequest.Builder(Collections.singletonList(groupId)))
     val response = responseBody.asInstanceOf[DescribeGroupsResponse]
     val metadata = response.groups.get(groupId)
     if (metadata == null)
@@ -225,7 +226,8 @@ object AdminClient {
       DefaultSendBufferBytes,
       DefaultReceiveBufferBytes,
       DefaultRequestTimeoutMs,
-      time)
+      time,
+      true)
 
     val highLevelClient = new ConsumerNetworkClient(
       networkClient,

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 96b0e46..e9ccf64 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -26,10 +26,11 @@ import kafka.server.KafkaConfig
 import kafka.utils._
 import org.apache.kafka.clients.{ClientRequest, ClientResponse, ManualMetadataUpdater, NetworkClient}
 import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.network.{ChannelBuilders, LoginType, NetworkReceive, Selectable, Selector}
-import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
+import org.apache.kafka.common.network.{ChannelBuilders, LoginType, Mode, NetworkReceive, Selectable, Selector}
+import org.apache.kafka.common.protocol.{ApiKeys, ProtoUtils, SecurityProtocol}
 import org.apache.kafka.common.requests
 import org.apache.kafka.common.requests.{UpdateMetadataRequest, _}
+import org.apache.kafka.common.requests.UpdateMetadataRequest.EndPoint
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.{Node, TopicPartition}
 
@@ -56,12 +57,13 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
     }
   }
 
-  def sendRequest(brokerId: Int, apiKey: ApiKeys, apiVersion: Option[Short], request: AbstractRequest, callback: AbstractResponse => Unit = null) {
+  def sendRequest(brokerId: Int, apiKey: ApiKeys, request: AbstractRequest.Builder[_ <: AbstractRequest],
+                  callback: AbstractResponse => Unit = null) {
     brokerLock synchronized {
       val stateInfoOpt = brokerStateInfo.get(brokerId)
       stateInfoOpt match {
         case Some(stateInfo) =>
-          stateInfo.messageQueue.put(QueueItem(apiKey, apiVersion, request, callback))
+          stateInfo.messageQueue.put(QueueItem(apiKey, request, callback))
         case None =>
           warn("Not sending request %s to broker %d, since it is offline.".format(request, brokerId))
       }
@@ -116,7 +118,8 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
         Selectable.USE_DEFAULT_BUFFER_SIZE,
         Selectable.USE_DEFAULT_BUFFER_SIZE,
         config.requestTimeoutMs,
-        time
+        time,
+        false
       )
     }
     val threadName = threadNamePrefix match {
@@ -148,7 +151,8 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
   }
 }
 
-case class QueueItem(apiKey: ApiKeys, apiVersion: Option[Short], request: AbstractRequest, callback: AbstractResponse => Unit)
+case class QueueItem(apiKey: ApiKeys, request: AbstractRequest.Builder[_ <: AbstractRequest],
+                     callback: AbstractResponse => Unit)
 
 class RequestSendThread(val controllerId: Int,
                         val controllerContext: ControllerContext,
@@ -168,7 +172,7 @@ class RequestSendThread(val controllerId: Int,
 
     def backoff(): Unit = CoreUtils.swallowTrace(Thread.sleep(100))
 
-    val QueueItem(apiKey, apiVersion, request, callback) = queue.take()
+    val QueueItem(apiKey, requestBuilder, callback) = queue.take()
     import NetworkClientBlockingOps._
     var clientResponse: ClientResponse = null
     try {
@@ -183,8 +187,8 @@ class RequestSendThread(val controllerId: Int,
               backoff()
             }
             else {
-              val requestHeader = apiVersion.fold(networkClient.nextRequestHeader(apiKey))(networkClient.nextRequestHeader(apiKey, _))
-              val clientRequest = new ClientRequest(brokerNode.idString, time.milliseconds(), true, requestHeader, request, null)
+              val clientRequest = networkClient.newClientRequest(brokerNode.idString, requestBuilder,
+                time.milliseconds(), true)
               clientResponse = networkClient.blockingSendAndReceive(clientRequest)(time)
               isSendSuccessful = true
             }
@@ -192,7 +196,7 @@ class RequestSendThread(val controllerId: Int,
             case e: Throwable => // if the send was not successful, reconnect to broker and resend the message
               warn(("Controller %d epoch %d fails to send request %s to broker %s. " +
                 "Reconnecting to broker.").format(controllerId, controllerContext.epoch,
-                  request.toString, brokerNode.toString), e)
+                  requestBuilder.toString, brokerNode.toString), e)
               networkClient.close(brokerNode.idString)
               isSendSuccessful = false
               backoff()
@@ -358,8 +362,9 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends  Logging
             partitionStateInfo.allReplicas.map(Integer.valueOf).asJava)
           topicPartition -> partitionState
         }
-        val leaderAndIsrRequest = new LeaderAndIsrRequest(controllerId, controllerEpoch, partitionStates.asJava, leaders.asJava)
-        controller.sendRequest(broker, ApiKeys.LEADER_AND_ISR, None, leaderAndIsrRequest, null)
+        val leaderAndIsrRequest = new LeaderAndIsrRequest.
+            Builder(controllerId, controllerEpoch, partitionStates.asJava, leaders.asJava)
+        controller.sendRequest(broker, ApiKeys.LEADER_AND_ISR, leaderAndIsrRequest, null)
       }
       leaderAndIsrRequestMap.clear()
 
@@ -374,27 +379,33 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends  Logging
         topicPartition -> partitionState
       }
 
-      val version = if (controller.config.interBrokerProtocolVersion >= KAFKA_0_10_0_IV1) 2: Short
+      val version: Short = if (controller.config.interBrokerProtocolVersion >= KAFKA_0_10_0_IV1) 2: Short
                     else if (controller.config.interBrokerProtocolVersion >= KAFKA_0_9_0) 1: Short
                     else 0: Short
 
-      val updateMetadataRequest =
-        if (version == 0) {
-          val liveBrokers = controllerContext.liveOrShuttingDownBrokers.map(_.getNode(SecurityProtocol.PLAINTEXT))
-          new UpdateMetadataRequest(controllerId, controllerEpoch, liveBrokers.asJava, partitionStates.asJava)
-        }
-        else {
-          val liveBrokers = controllerContext.liveOrShuttingDownBrokers.map { broker =>
+      val updateMetadataRequest = {
+        val liveBrokers = if (version == 0) {
+          // Version 0 of UpdateMetadataRequest only supports PLAINTEXT.
+          controllerContext.liveOrShuttingDownBrokers.map { broker =>
+            val node = broker.getNode(SecurityProtocol.PLAINTEXT)
+            val endPoints = Map(SecurityProtocol.PLAINTEXT -> new EndPoint(node.host(), node.port()))
+            new UpdateMetadataRequest.Broker(broker.id, endPoints.asJava, broker.rack.orNull)
+          }
+        } else {
+          controllerContext.liveOrShuttingDownBrokers.map { broker =>
             val endPoints = broker.endPoints.map { case (securityProtocol, endPoint) =>
               securityProtocol -> new UpdateMetadataRequest.EndPoint(endPoint.host, endPoint.port)
             }
             new UpdateMetadataRequest.Broker(broker.id, endPoints.asJava, broker.rack.orNull)
           }
-          new UpdateMetadataRequest(version, controllerId, controllerEpoch, partitionStates.asJava, liveBrokers.asJava)
         }
+        new UpdateMetadataRequest.Builder(
+            controllerId, controllerEpoch, partitionStates.asJava, liveBrokers.asJava).
+            setVersion(version)
+      }
 
-      updateMetadataRequestBrokerSet.foreach {broker =>
-        controller.sendRequest(broker, ApiKeys.UPDATE_METADATA_KEY, Some(version), updateMetadataRequest, null)
+      updateMetadataRequestBrokerSet.foreach { broker =>
+        controller.sendRequest(broker, ApiKeys.UPDATE_METADATA_KEY, updateMetadataRequest, null)
       }
       updateMetadataRequestBrokerSet.clear()
       updateMetadataRequestPartitionInfoMap.clear()
@@ -411,19 +422,20 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends  Logging
 
         // Send one StopReplicaRequest for all partitions that require neither delete nor callback. This potentially
         // changes the order in which the requests are sent for the same partitions, but that's OK.
-        val stopReplicaRequest = new StopReplicaRequest(controllerId, controllerEpoch, false,
+        val stopReplicaRequest = new StopReplicaRequest.Builder(controllerId, controllerEpoch, false,
           replicasToGroup.map(r => new TopicPartition(r.replica.topic, r.replica.partition)).toSet.asJava)
-        controller.sendRequest(broker, ApiKeys.STOP_REPLICA, None, stopReplicaRequest)
+        controller.sendRequest(broker, ApiKeys.STOP_REPLICA, stopReplicaRequest)
 
         replicasToNotGroup.foreach { r =>
-          val stopReplicaRequest = new StopReplicaRequest(controllerId, controllerEpoch, r.deletePartition,
-            Set(new TopicPartition(r.replica.topic, r.replica.partition)).asJava)
-          controller.sendRequest(broker, ApiKeys.STOP_REPLICA, None, stopReplicaRequest, r.callback)
+          val stopReplicaRequest = new StopReplicaRequest.Builder(
+              controllerId, controllerEpoch, r.deletePartition,
+              Set(new TopicPartition(r.replica.topic, r.replica.partition)).asJava)
+          controller.sendRequest(broker, ApiKeys.STOP_REPLICA, stopReplicaRequest, r.callback)
         }
       }
       stopReplicaRequestMap.clear()
     } catch {
-      case e : Throwable => {
+      case e: Throwable =>
         if (leaderAndIsrRequestMap.nonEmpty) {
           error("Haven't been able to send leader and isr requests, current state of " +
               s"the map is $leaderAndIsrRequestMap. Exception message: $e")
@@ -437,7 +449,6 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends  Logging
               s"the map is $stopReplicaRequestMap. Exception message: $e")
         }
         throw new IllegalStateException(e)
-      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 0aec81d..83c6d01 100755
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -689,8 +689,9 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
     onControllerResignation()
   }
 
-  def sendRequest(brokerId: Int, apiKey: ApiKeys, apiVersion: Option[Short], request: AbstractRequest, callback: AbstractResponse => Unit = null) = {
-    controllerContext.controllerChannelManager.sendRequest(brokerId, apiKey, apiVersion, request, callback)
+  def sendRequest(brokerId: Int, apiKey: ApiKeys, request: AbstractRequest.Builder[_ <: AbstractRequest],
+                  callback: AbstractResponse => Unit = null) = {
+    controllerContext.controllerChannelManager.sendRequest(brokerId, apiKey, request, callback)
   }
 
   def incrementControllerEpoch(zkClient: ZkClient) = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index a19ad22..a511440 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -42,8 +42,8 @@ object RequestChannel extends Logging {
   private val requestLogger = Logger.getLogger("kafka.request.logger")
 
   def getShutdownReceive() = {
-    val emptyRequestHeader = new RequestHeader(ApiKeys.PRODUCE.id, "", 0)
-    val emptyProduceRequest = new ProduceRequest(0, 0, new HashMap[TopicPartition, MemoryRecords]())
+    val emptyProduceRequest = new ProduceRequest.Builder(0, 0, new HashMap[TopicPartition, MemoryRecords]()).build()
+    val emptyRequestHeader = new RequestHeader(ApiKeys.PRODUCE.id, emptyProduceRequest.version(), "", 0)
     AbstractRequestResponse.serialize(emptyRequestHeader, emptyProduceRequest)
   }
 
@@ -86,7 +86,7 @@ object RequestChannel extends Logging {
         try {
           // For unsupported version of ApiVersionsRequest, create a dummy request to enable an error response to be returned later
           if (header.apiKey == ApiKeys.API_VERSIONS.id && !Protocol.apiVersionSupported(header.apiKey, header.apiVersion))
-            new ApiVersionsRequest
+            new ApiVersionsRequest.Builder().build()
           else
             AbstractRequest.getRequest(header.apiKey, header.apiVersion, buffer)
         } catch {

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 7fd8c2b..794c23d 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -105,7 +105,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           request.requestObj.handleError(e, requestChannel, request)
           error("Error when handling request %s".format(request.requestObj), e)
         } else {
-          val response = request.body.getErrorResponse(request.header.apiVersion, e)
+          val response = request.body.getErrorResponse(e)
 
           /* If request doesn't have a default error response, we just close the connection.
              For example, when produce request has acks set to 0 */

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index df40c64..7cf3940 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -379,7 +379,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
           Selectable.USE_DEFAULT_BUFFER_SIZE,
           Selectable.USE_DEFAULT_BUFFER_SIZE,
           config.requestTimeoutMs,
-          time)
+          time,
+          false)
       }
 
       var shutdownSucceeded: Boolean = false
@@ -425,10 +426,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
                 throw new SocketTimeoutException(s"Failed to connect within $socketTimeoutMs ms")
 
               // send the controlled shutdown request
-              val requestHeader = networkClient.nextRequestHeader(ApiKeys.CONTROLLED_SHUTDOWN_KEY)
-              val controlledShutdownRequest = new ControlledShutdownRequest(config.brokerId)
-              val request = new ClientRequest(node(prevController).idString, time.milliseconds(), true,
-                requestHeader, controlledShutdownRequest, null)
+              val controlledShutdownRequest = new ControlledShutdownRequest.Builder(config.brokerId)
+              val request = networkClient.newClientRequest(node(prevController).idString, controlledShutdownRequest,
+                time.milliseconds(), true)
               val clientResponse = networkClient.blockingSendAndReceive(request)(time)
 
               val shutdownResponse = clientResponse.responseBody.asInstanceOf[ControlledShutdownResponse]

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 485a25e..d6663fa 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -32,7 +32,7 @@ import org.apache.kafka.common.requests.{AbstractRequest, FetchResponse, ListOff
 import org.apache.kafka.common.requests.{FetchRequest => JFetchRequest}
 import org.apache.kafka.common.{Node, TopicPartition}
 import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.protocol.{ApiKeys, Errors, ProtoUtils}
 import org.apache.kafka.common.record.MemoryRecords
 import org.apache.kafka.common.utils.Time
 
@@ -102,7 +102,8 @@ class ReplicaFetcherThread(name: String,
       Selectable.USE_DEFAULT_BUFFER_SIZE,
       brokerConfig.replicaSocketReceiveBufferBytes,
       brokerConfig.requestTimeoutMs,
-      time
+      time,
+      false
     )
   }
 
@@ -230,21 +231,21 @@ class ReplicaFetcherThread(name: String,
   }
 
   protected def fetch(fetchRequest: FetchRequest): Seq[(TopicPartition, PartitionData)] = {
-    val clientResponse = sendRequest(ApiKeys.FETCH, Some(fetchRequestVersion), fetchRequest.underlying)
+    val clientResponse = sendRequest(fetchRequest.underlying)
     val fetchResponse = clientResponse.responseBody.asInstanceOf[FetchResponse]
     fetchResponse.responseData.asScala.toSeq.map { case (key, value) =>
       key -> new PartitionData(value)
     }
   }
 
-  private def sendRequest(apiKey: ApiKeys, apiVersion: Option[Short], request: AbstractRequest): ClientResponse = {
+  private def sendRequest(requestBuilder: AbstractRequest.Builder[_ <: AbstractRequest]): ClientResponse = {
     import kafka.utils.NetworkClientBlockingOps._
-    val header = apiVersion.fold(networkClient.nextRequestHeader(apiKey))(networkClient.nextRequestHeader(apiKey, _))
     try {
       if (!networkClient.blockingReady(sourceNode, socketTimeout)(time))
         throw new SocketTimeoutException(s"Failed to connect within $socketTimeout ms")
       else {
-        val clientRequest = new ClientRequest(sourceBroker.id.toString, time.milliseconds(), true, header, request, null)
+        val clientRequest = networkClient.newClientRequest(sourceBroker.id.toString, requestBuilder,
+          time.milliseconds(), true)
         networkClient.blockingSendAndReceive(clientRequest)(time)
       }
     }
@@ -257,15 +258,18 @@ class ReplicaFetcherThread(name: String,
   }
 
   private def earliestOrLatestOffset(topicPartition: TopicPartition, earliestOrLatest: Long, consumerId: Int): Long = {
-    val (request, apiVersion) =
-      if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_10_1_IV2) {
+    val requestBuilder = if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_10_1_IV2) {
         val partitions = Map(topicPartition -> (earliestOrLatest: java.lang.Long))
-        (new ListOffsetRequest(partitions.asJava, consumerId), 1)
+        new ListOffsetRequest.Builder(consumerId).
+            setTargetTimes(partitions.asJava).
+            setVersion(1)
       } else {
         val partitions = Map(topicPartition -> new ListOffsetRequest.PartitionData(earliestOrLatest, 1))
-        (new ListOffsetRequest(consumerId, partitions.asJava), 0)
+        new ListOffsetRequest.Builder(consumerId).
+            setOffsetData(partitions.asJava).
+            setVersion(0)
       }
-    val clientResponse = sendRequest(ApiKeys.LIST_OFFSETS, Some(apiVersion.toShort), request)
+    val clientResponse = sendRequest(requestBuilder)
     val response = clientResponse.responseBody.asInstanceOf[ListOffsetResponse]
     val partitionData = response.responseData.get(topicPartition)
     Errors.forCode(partitionData.errorCode) match {
@@ -287,11 +291,10 @@ class ReplicaFetcherThread(name: String,
         requestMap.put(topicPartition, new JFetchRequest.PartitionData(partitionFetchState.offset, fetchSize))
     }
 
-    val request =
-      if (fetchRequestVersion >= 3) JFetchRequest.fromReplica(replicaId, maxWait, minBytes, maxBytes, requestMap)
-      else JFetchRequest.fromReplica(replicaId, maxWait, minBytes, requestMap)
-
-    new FetchRequest(request)
+    val requestBuilder = new JFetchRequest.Builder(maxWait, minBytes, requestMap).
+        setReplicaId(replicaId).setMaxBytes(maxBytes)
+    requestBuilder.setVersion(fetchRequestVersion)
+    new FetchRequest(requestBuilder)
   }
 
   /**
@@ -306,10 +309,10 @@ class ReplicaFetcherThread(name: String,
 
 object ReplicaFetcherThread {
 
-  private[server] class FetchRequest(val underlying: JFetchRequest) extends AbstractFetcherThread.FetchRequest {
-    def isEmpty: Boolean = underlying.fetchData.isEmpty
+  private[server] class FetchRequest(val underlying: JFetchRequest.Builder) extends AbstractFetcherThread.FetchRequest {
+    def isEmpty: Boolean = underlying.fetchData().isEmpty
     def offset(topicPartition: TopicPartition): Long =
-      underlying.fetchData.asScala(topicPartition).offset
+      underlying.fetchData().asScala(topicPartition).offset
   }
 
   private[server] class PartitionData(val underlying: FetchResponse.PartitionData) extends AbstractFetcherThread.PartitionData {

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala b/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala
index 62e7d94..0370564 100644
--- a/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala
+++ b/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala
@@ -105,14 +105,15 @@ class NetworkClientBlockingOps(val client: NetworkClient) extends AnyVal {
    */
   def blockingSendAndReceive(request: ClientRequest)(implicit time: Time): ClientResponse = {
     client.send(request, time.milliseconds())
-
     pollContinuously { responses =>
       val response = responses.find { response =>
-        response.requestHeader.correlationId == request.header.correlationId
+        response.requestHeader.correlationId == request.correlationId
       }
       response.foreach { r =>
         if (r.wasDisconnected)
           throw new IOException(s"Connection to ${request.destination} was disconnected before the response was read")
+        else if (r.versionMismatch() != null)
+          throw r.versionMismatch();
       }
       response
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 6eebbbf..a2fc2d5 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -177,79 +177,87 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   }
 
   private def createMetadataRequest = {
-    new requests.MetadataRequest(List(topic).asJava)
+    new requests.MetadataRequest.Builder(List(topic).asJava).build()
   }
 
   private def createProduceRequest = {
-    new requests.ProduceRequest(1, 5000, collection.mutable.Map(tp -> MemoryRecords.readableRecords(ByteBuffer.wrap("test".getBytes))).asJava)
+    new requests.ProduceRequest.Builder(1, 5000,
+      collection.mutable.Map(tp -> MemoryRecords.readableRecords(ByteBuffer.wrap("test".getBytes))).asJava).
+      build()
   }
 
   private def createFetchRequest = {
     val partitionMap = new util.LinkedHashMap[TopicPartition, requests.FetchRequest.PartitionData]
     partitionMap.put(tp, new requests.FetchRequest.PartitionData(0, 100))
-    new requests.FetchRequest(5000, 100, Int.MaxValue, partitionMap)
+    new requests.FetchRequest.Builder(100, Int.MaxValue, partitionMap).setReplicaId(5000).build()
   }
 
   private def createListOffsetsRequest = {
-    new requests.ListOffsetRequest(Map(tp -> new ListOffsetRequest.PartitionData(0, 100)).asJava)
+    new requests.ListOffsetRequest.Builder().setTargetTimes(
+      Map(tp -> (0L: java.lang.Long)).asJava).
+      build()
   }
 
   private def createOffsetFetchRequest = {
-    new requests.OffsetFetchRequest(group, List(tp).asJava)
+    new requests.OffsetFetchRequest.Builder(group, List(tp).asJava).build()
   }
 
   private def createGroupCoordinatorRequest = {
-    new requests.GroupCoordinatorRequest(group)
+    new requests.GroupCoordinatorRequest.Builder(group).build()
   }
 
   private def createUpdateMetadataRequest = {
     val partitionState = Map(tp -> new PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Set(brokerId).asJava)).asJava
     val brokers = Set(new requests.UpdateMetadataRequest.Broker(brokerId,
       Map(SecurityProtocol.PLAINTEXT -> new requests.UpdateMetadataRequest.EndPoint("localhost", 0)).asJava, null)).asJava
-    new requests.UpdateMetadataRequest(brokerId, Int.MaxValue, partitionState, brokers)
+    new requests.UpdateMetadataRequest.Builder(brokerId, Int.MaxValue, partitionState, brokers).build()
   }
 
   private def createJoinGroupRequest = {
-    new JoinGroupRequest(group, 10000, 60000, "", "consumer",
+    new JoinGroupRequest.Builder(group, 10000, "", "consumer",
       List( new JoinGroupRequest.ProtocolMetadata("consumer-range",ByteBuffer.wrap("test".getBytes()))).asJava)
+      .setRebalanceTimeout(60000).build();
   }
 
   private def createSyncGroupRequest = {
-    new SyncGroupRequest(group, 1, "", Map[String, ByteBuffer]().asJava)
+    new SyncGroupRequest.Builder(group, 1, "", Map[String, ByteBuffer]().asJava).build()
   }
 
   private def createOffsetCommitRequest = {
-    new requests.OffsetCommitRequest(group, 1, "", 1000, Map(tp -> new requests.OffsetCommitRequest.PartitionData(0, "metadata")).asJava)
+    new requests.OffsetCommitRequest.Builder(
+      group, Map(tp -> new requests.OffsetCommitRequest.PartitionData(0, "metadata")).asJava).
+      setMemberId("").setGenerationId(1).setRetentionTime(1000).
+      build()
   }
 
   private def createHeartbeatRequest = {
-    new HeartbeatRequest(group, 1, "")
+    new HeartbeatRequest.Builder(group, 1, "").build()
   }
 
   private def createLeaveGroupRequest = {
-    new LeaveGroupRequest(group, "")
+    new LeaveGroupRequest.Builder(group, "").build()
   }
 
   private def createLeaderAndIsrRequest = {
-    new requests.LeaderAndIsrRequest(brokerId, Int.MaxValue,
+    new requests.LeaderAndIsrRequest.Builder(brokerId, Int.MaxValue,
       Map(tp -> new PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Set(brokerId).asJava)).asJava,
-      Set(new Node(brokerId, "localhost", 0)).asJava)
+      Set(new Node(brokerId, "localhost", 0)).asJava).build()
   }
 
   private def createStopReplicaRequest = {
-    new requests.StopReplicaRequest(brokerId, Int.MaxValue, true, Set(tp).asJava)
+    new requests.StopReplicaRequest.Builder(brokerId, Int.MaxValue, true, Set(tp).asJava).build()
   }
 
   private def createControlledShutdownRequest = {
-    new requests.ControlledShutdownRequest(brokerId)
+    new requests.ControlledShutdownRequest.Builder(brokerId).build()
   }
 
   private def createTopicsRequest = {
-    new CreateTopicsRequest(Map(createTopic -> new TopicDetails(1, 1.toShort)).asJava, 0)
+    new CreateTopicsRequest.Builder(Map(createTopic -> new TopicDetails(1, 1.toShort)).asJava, 0).build()
   }
 
   private def deleteTopicsRequest = {
-    new DeleteTopicsRequest(Set(deleteTopic).asJava, 5000)
+    new DeleteTopicsRequest.Builder(Set(deleteTopic).asJava, 5000).build()
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 8547818..8a0ae1a 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -119,8 +119,9 @@ class SocketServerTest extends JUnitSuite {
     val ackTimeoutMs = 10000
     val ack = 0: Short
 
-    val emptyHeader = new RequestHeader(apiKey, clientId, correlationId)
-    val emptyRequest = new ProduceRequest(ack, ackTimeoutMs, new HashMap[TopicPartition, MemoryRecords]())
+    val emptyRequest = new ProduceRequest.Builder(
+        ack, ackTimeoutMs, new HashMap[TopicPartition, MemoryRecords]()).build()
+    val emptyHeader = new RequestHeader(apiKey, emptyRequest.version, clientId, correlationId)
 
     val byteBuffer = ByteBuffer.allocate(emptyHeader.sizeOf + emptyRequest.sizeOf)
     emptyHeader.writeTo(byteBuffer)
@@ -287,8 +288,9 @@ class SocketServerTest extends JUnitSuite {
       val clientId = ""
       val ackTimeoutMs = 10000
       val ack = 0: Short
-      val emptyHeader = new RequestHeader(apiKey, clientId, correlationId)
-      val emptyRequest = new ProduceRequest(ack, ackTimeoutMs, new HashMap[TopicPartition, MemoryRecords]())
+      val emptyRequest = new ProduceRequest.Builder(
+          ack, ackTimeoutMs, new HashMap[TopicPartition, MemoryRecords]()).build()
+      val emptyHeader = new RequestHeader(apiKey, emptyRequest.version, clientId, correlationId)
 
       val byteBuffer = ByteBuffer.allocate(emptyHeader.sizeOf() + emptyRequest.sizeOf())
       emptyHeader.writeTo(byteBuffer)

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
index 68e017f..8f6faf9 100644
--- a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
@@ -17,7 +17,8 @@
 
 package kafka.server
 
-import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.protocol.types.Struct
+import org.apache.kafka.common.protocol.{ApiKeys, Errors, ProtoUtils}
 import org.apache.kafka.common.requests.ApiVersionsResponse.ApiVersion
 import org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse}
 import org.junit.Assert._
@@ -44,18 +45,20 @@ class ApiVersionsRequestTest extends BaseRequestTest {
 
   @Test
   def testApiVersionsRequest() {
-    val apiVersionsResponse = sendApiVersionsRequest(new ApiVersionsRequest, 0)
+    val apiVersionsResponse = sendApiVersionsRequest(new ApiVersionsRequest.Builder().build())
     ApiVersionsRequestTest.validateApiVersionsResponse(apiVersionsResponse)
   }
 
   @Test
   def testApiVersionsRequestWithUnsupportedVersion() {
-    val apiVersionsResponse = sendApiVersionsRequest(new ApiVersionsRequest, Short.MaxValue)
+    val apiVersionsRequest = new ApiVersionsRequest(
+      new Struct(ProtoUtils.currentRequestSchema(ApiKeys.API_VERSIONS.id)), Short.MaxValue)
+    val apiVersionsResponse = sendApiVersionsRequest(apiVersionsRequest)
     assertEquals(Errors.UNSUPPORTED_VERSION.code(), apiVersionsResponse.errorCode)
   }
 
-  private def sendApiVersionsRequest(request: ApiVersionsRequest, version: Short): ApiVersionsResponse = {
-    val response = send(request, ApiKeys.API_VERSIONS, Some(version))
+  private def sendApiVersionsRequest(request: ApiVersionsRequest): ApiVersionsResponse = {
+    val response = send(request, ApiKeys.API_VERSIONS)
     ApiVersionsResponse.parse(response)
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/core/src/test/scala/unit/kafka/server/ApiVersionsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionsTest.scala b/core/src/test/scala/unit/kafka/server/ApiVersionsTest.scala
index b26f621..e9adca8 100644
--- a/core/src/test/scala/unit/kafka/server/ApiVersionsTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ApiVersionsTest.scala
@@ -18,7 +18,7 @@
 package unit.kafka.server
 
 import org.apache.kafka.common.requests.ApiVersionsResponse
-import org.apache.kafka.common.protocol.{Protocol, ApiKeys}
+import org.apache.kafka.common.protocol.{ApiKeys, ProtoUtils, Protocol}
 import org.junit.Assert._
 import org.junit.Test
 
@@ -32,8 +32,8 @@ class ApiVersionsTest {
     for (key <- ApiKeys.values) {
       val version = ApiVersionsResponse.API_VERSIONS_RESPONSE.apiVersion(key.id)
       assertNotNull(s"Could not find ApiVersion for API ${key.name}", version)
-      assertEquals(s"Incorrect min version for Api ${key.name}.", version.minVersion, Protocol.MIN_VERSIONS(key.id))
-      assertEquals(s"Incorrect max version for Api ${key.name}.", version.maxVersion, Protocol.CURR_VERSION(key.id))
+      assertEquals(s"Incorrect min version for Api ${key.name}.", version.minVersion, ProtoUtils.oldestVersion(key.id))
+      assertEquals(s"Incorrect max version for Api ${key.name}.", version.maxVersion, ProtoUtils.latestVersion(key.id))
 
       // Check if versions less than min version are indeed set as null, i.e., deprecated.
       for (i <- 0 until version.minVersion) {
@@ -48,4 +48,4 @@ class ApiVersionsTest {
       }
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
index a166495..5c29935 100644
--- a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
@@ -100,17 +100,15 @@ abstract class BaseRequestTest extends KafkaServerTestHarness {
     *
     * @param request
     * @param apiKey
-    * @param version An optional version to use when sending the request. If not set, the latest known version is used
    * @param destination An optional SocketServer to send the request to. If not set, any available server is used.
     * @param protocol An optional SecurityProtocol to use. If not set, PLAINTEXT is used.
     * @return
     */
-  def send(request: AbstractRequest, apiKey: ApiKeys, version: Option[Short] = None,
+  def send(request: AbstractRequest, apiKey: ApiKeys,
            destination: SocketServer = anySocketServer, protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT): ByteBuffer = {
-    val requestVersion = version.getOrElse(ProtoUtils.latestVersion(apiKey.id))
     val socket = connect(destination, protocol)
     try {
-      send(request, apiKey, requestVersion, socket)
+      send(request, apiKey, socket)
     } finally {
       socket.close()
     }
@@ -120,10 +118,10 @@ abstract class BaseRequestTest extends KafkaServerTestHarness {
     * Serializes and send the request to the given api.
     * A ByteBuffer containing the response is returned.
     */
-  def send(request: AbstractRequest, apiKey: ApiKeys, version: Short, socket: Socket): ByteBuffer = {
+  def send(request: AbstractRequest, apiKey: ApiKeys, socket: Socket): ByteBuffer = {
     correlationId += 1
     val serializedBytes = {
-      val header = new RequestHeader(apiKey.id, version, "", correlationId)
+      val header = new RequestHeader(apiKey.id, request.version, "", correlationId)
       val byteBuffer = ByteBuffer.allocate(header.sizeOf() + request.sizeOf)
       header.writeTo(byteBuffer)
       request.writeTo(byteBuffer)

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala
index 4205ccb..27f4a32 100644
--- a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala
@@ -34,27 +34,27 @@ class CreateTopicsRequestTest extends BaseRequestTest {
   def testValidCreateTopicsRequests() {
     val timeout = 10000
     // Generated assignments
-    validateValidCreateTopicsRequests(new CreateTopicsRequest(Map("topic1" -> new CreateTopicsRequest.TopicDetails(1, 1.toShort)).asJava, timeout))
-    validateValidCreateTopicsRequests(new CreateTopicsRequest(Map("topic2" -> new CreateTopicsRequest.TopicDetails(1, 3.toShort)).asJava, timeout))
+    validateValidCreateTopicsRequests(new CreateTopicsRequest.Builder(Map("topic1" -> new CreateTopicsRequest.TopicDetails(1, 1.toShort)).asJava, timeout).build())
+    validateValidCreateTopicsRequests(new CreateTopicsRequest.Builder(Map("topic2" -> new CreateTopicsRequest.TopicDetails(1, 3.toShort)).asJava, timeout).build())
     val config3 = Map("min.insync.replicas" -> "2").asJava
-    validateValidCreateTopicsRequests(new CreateTopicsRequest(Map("topic3" -> new CreateTopicsRequest.TopicDetails(5, 2.toShort, config3)).asJava, timeout))
+    validateValidCreateTopicsRequests(new CreateTopicsRequest.Builder(Map("topic3" -> new CreateTopicsRequest.TopicDetails(5, 2.toShort, config3)).asJava, timeout).build())
     // Manual assignments
     val assignments4 = replicaAssignmentToJava(Map(0 -> List(0)))
-    validateValidCreateTopicsRequests(new CreateTopicsRequest(Map("topic4" -> new CreateTopicsRequest.TopicDetails(assignments4)).asJava, timeout))
+    validateValidCreateTopicsRequests(new CreateTopicsRequest.Builder(Map("topic4" -> new CreateTopicsRequest.TopicDetails(assignments4)).asJava, timeout).build())
     val assignments5 = replicaAssignmentToJava(Map(0 -> List(0, 1), 1 -> List(1, 0), 2 -> List(1, 2)))
     val config5 = Map("min.insync.replicas" -> "2").asJava
-    validateValidCreateTopicsRequests(new CreateTopicsRequest(Map("topic5" -> new CreateTopicsRequest.TopicDetails(assignments5, config5)).asJava, timeout))
+    validateValidCreateTopicsRequests(new CreateTopicsRequest.Builder(Map("topic5" -> new CreateTopicsRequest.TopicDetails(assignments5, config5)).asJava, timeout).build())
     // Mixed
     val assignments8 = replicaAssignmentToJava(Map(0 -> List(0, 1), 1 -> List(1, 0), 2 -> List(1, 2)))
-    validateValidCreateTopicsRequests(new CreateTopicsRequest(Map(
+    validateValidCreateTopicsRequests(new CreateTopicsRequest.Builder(Map(
       "topic6" -> new CreateTopicsRequest.TopicDetails(1, 1.toShort),
       "topic7" -> new CreateTopicsRequest.TopicDetails(5, 2.toShort),
-      "topic8" -> new CreateTopicsRequest.TopicDetails(assignments8)).asJava, timeout)
+      "topic8" -> new CreateTopicsRequest.TopicDetails(assignments8)).asJava, timeout).build()
     )
   }
 
   private def validateValidCreateTopicsRequests(request: CreateTopicsRequest): Unit = {
-    val response = sendCreateTopicRequest(request, 0)
+    val response = sendCreateTopicRequest(request)
 
     val error = response.errors.values.asScala.find(_ != Errors.NONE)
     assertTrue(s"There should be no errors, found ${response.errors.asScala}", error.isEmpty)
@@ -62,7 +62,8 @@ class CreateTopicsRequestTest extends BaseRequestTest {
     request.topics.asScala.foreach { case (topic, details) =>
 
       def verifyMetadata(socketServer: SocketServer) = {
-        val metadata = sendMetadataRequest(new MetadataRequest(List(topic).asJava)).topicMetadata.asScala
+        val metadata = sendMetadataRequest(
+          new MetadataRequest.Builder(List(topic).asJava).build()).topicMetadata.asScala
         val metadataForTopic = metadata.filter(p => p.topic.equals(topic)).head
 
         val partitions = if (!details.replicasAssignments.isEmpty)
@@ -95,27 +96,27 @@ class CreateTopicsRequestTest extends BaseRequestTest {
     TestUtils.createTopic(zkUtils, existingTopic, 1, 1, servers)
 
     // Basic
-    validateErrorCreateTopicsRequests(new CreateTopicsRequest(Map(existingTopic -> new CreateTopicsRequest.TopicDetails(1, 1.toShort)).asJava, timeout),
+    validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(Map(existingTopic -> new CreateTopicsRequest.TopicDetails(1, 1.toShort)).asJava, timeout).build(),
       Map(existingTopic -> Errors.TOPIC_ALREADY_EXISTS))
-    validateErrorCreateTopicsRequests(new CreateTopicsRequest(Map("error-partitions" -> new CreateTopicsRequest.TopicDetails(-1, 1.toShort)).asJava, timeout),
+    validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(Map("error-partitions" -> new CreateTopicsRequest.TopicDetails(-1, 1.toShort)).asJava, timeout).build(),
       Map("error-partitions" -> Errors.INVALID_PARTITIONS))
-    validateErrorCreateTopicsRequests(new CreateTopicsRequest(Map("error-replication" -> new CreateTopicsRequest.TopicDetails(1, (numBrokers + 1).toShort)).asJava, timeout),
+    validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(Map("error-replication" -> new CreateTopicsRequest.TopicDetails(1, (numBrokers + 1).toShort)).asJava, timeout).build(),
       Map("error-replication" -> Errors.INVALID_REPLICATION_FACTOR))
     val invalidConfig = Map("not.a.property" -> "error").asJava
-    validateErrorCreateTopicsRequests(new CreateTopicsRequest(Map("error-config" -> new CreateTopicsRequest.TopicDetails(1, 1.toShort, invalidConfig)).asJava, timeout),
+    validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(Map("error-config" -> new CreateTopicsRequest.TopicDetails(1, 1.toShort, invalidConfig)).asJava, timeout).build(),
       Map("error-config" -> Errors.INVALID_CONFIG))
     val invalidAssignments = replicaAssignmentToJava(Map(0 -> List(0, 1), 1 -> List(0)))
-    validateErrorCreateTopicsRequests(new CreateTopicsRequest(Map("error-assignment" -> new CreateTopicsRequest.TopicDetails(invalidAssignments)).asJava, timeout),
+    validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(Map("error-assignment" -> new CreateTopicsRequest.TopicDetails(invalidAssignments)).asJava, timeout).build(),
       Map("error-assignment" -> Errors.INVALID_REPLICA_ASSIGNMENT))
 
     // Partial
     validateErrorCreateTopicsRequests(
-      new CreateTopicsRequest(Map(
+      new CreateTopicsRequest.Builder(Map(
         existingTopic -> new CreateTopicsRequest.TopicDetails(1, 1.toShort),
         "partial-partitions" -> new CreateTopicsRequest.TopicDetails(-1, 1.toShort),
         "partial-replication" -> new CreateTopicsRequest.TopicDetails(1, (numBrokers + 1).toShort),
         "partial-assignment" -> new CreateTopicsRequest.TopicDetails(invalidAssignments),
-        "partial-none" -> new CreateTopicsRequest.TopicDetails(1, 1.toShort)).asJava, timeout),
+        "partial-none" -> new CreateTopicsRequest.TopicDetails(1, 1.toShort)).asJava, timeout).build(),
       Map(
         existingTopic -> Errors.TOPIC_ALREADY_EXISTS,
         "partial-partitions" -> Errors.INVALID_PARTITIONS,
@@ -128,12 +129,12 @@ class CreateTopicsRequestTest extends BaseRequestTest {
 
     // Timeout
     // We don't expect a request to ever complete within 1ms. A timeout of 1 ms allows us to test the purgatory timeout logic.
-    validateErrorCreateTopicsRequests(new CreateTopicsRequest(Map("error-timeout" -> new CreateTopicsRequest.TopicDetails(10, 3.toShort)).asJava, 1),
+    validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(Map("error-timeout" -> new CreateTopicsRequest.TopicDetails(10, 3.toShort)).asJava, 1).build(),
       Map("error-timeout" -> Errors.REQUEST_TIMED_OUT))
-    validateErrorCreateTopicsRequests(new CreateTopicsRequest(Map("error-timeout-zero" -> new CreateTopicsRequest.TopicDetails(10, 3.toShort)).asJava, 0),
+    validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(Map("error-timeout-zero" -> new CreateTopicsRequest.TopicDetails(10, 3.toShort)).asJava, 0).build(),
       Map("error-timeout-zero" -> Errors.REQUEST_TIMED_OUT))
     // Negative timeouts are treated the same as 0
-    validateErrorCreateTopicsRequests(new CreateTopicsRequest(Map("error-timeout-negative" -> new CreateTopicsRequest.TopicDetails(10, 3.toShort)).asJava, -1),
+    validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(Map("error-timeout-negative" -> new CreateTopicsRequest.TopicDetails(10, 3.toShort)).asJava, -1).build(),
       Map("error-timeout-negative" -> Errors.REQUEST_TIMED_OUT))
     // The topics should still get created eventually
     TestUtils.waitUntilMetadataIsPropagated(servers, "error-timeout", 0)
@@ -147,15 +148,16 @@ class CreateTopicsRequestTest extends BaseRequestTest {
   @Test
   def testInvalidCreateTopicsRequests() {
     // Duplicate
-    val singleRequest = new CreateTopicsRequest(Map("duplicate-topic" -> new CreateTopicsRequest.TopicDetails(1, 1.toShort)).asJava, 1000)
+    val singleRequest = new CreateTopicsRequest.Builder(Map("duplicate-topic" ->
+        new CreateTopicsRequest.TopicDetails(1, 1.toShort)).asJava, 1000).build()
     val duplicateRequest = duplicateFirstTopic(singleRequest)
     assertFalse("Request doesn't have duplicate topics", duplicateRequest.duplicateTopics().isEmpty)
     validateErrorCreateTopicsRequests(duplicateRequest, Map("duplicate-topic" -> Errors.INVALID_REQUEST))
 
     // Duplicate Partial
-    val doubleRequest = new CreateTopicsRequest(Map(
+    val doubleRequest = new CreateTopicsRequest.Builder(Map(
       "duplicate-topic" -> new CreateTopicsRequest.TopicDetails(1, 1.toShort),
-      "other-topic" -> new CreateTopicsRequest.TopicDetails(1, 1.toShort)).asJava, 1000)
+      "other-topic" -> new CreateTopicsRequest.TopicDetails(1, 1.toShort)).asJava, 1000).build()
     val duplicateDoubleRequest = duplicateFirstTopic(doubleRequest)
     assertFalse("Request doesn't have duplicate topics", duplicateDoubleRequest.duplicateTopics().isEmpty)
     validateErrorCreateTopicsRequests(duplicateDoubleRequest, Map(
@@ -164,7 +166,8 @@ class CreateTopicsRequestTest extends BaseRequestTest {
 
     // Partitions/ReplicationFactor and ReplicaAssignment
     val assignments = replicaAssignmentToJava(Map(0 -> List(0)))
-    val assignmentRequest = new CreateTopicsRequest(Map("bad-args-topic" -> new CreateTopicsRequest.TopicDetails(assignments)).asJava, 1000)
+    val assignmentRequest = new CreateTopicsRequest.Builder(Map("bad-args-topic" ->
+        new CreateTopicsRequest.TopicDetails(assignments)).asJava, 1000).build()
     val badArgumentsRequest = addPartitionsAndReplicationFactorToFirstTopic(assignmentRequest)
     validateErrorCreateTopicsRequests(badArgumentsRequest, Map("bad-args-topic" -> Errors.INVALID_REQUEST))
   }
@@ -175,7 +178,7 @@ class CreateTopicsRequestTest extends BaseRequestTest {
     val firstTopic = topics(0).asInstanceOf[Struct]
     val newTopics = firstTopic :: topics.toList
     struct.set("create_topic_requests", newTopics.toArray)
-    new CreateTopicsRequest(struct)
+    new CreateTopicsRequest(struct, request.version)
   }
 
   private def addPartitionsAndReplicationFactorToFirstTopic(request: CreateTopicsRequest) = {
@@ -184,11 +187,11 @@ class CreateTopicsRequestTest extends BaseRequestTest {
     val firstTopic = topics(0).asInstanceOf[Struct]
     firstTopic.set("num_partitions", 1)
     firstTopic.set("replication_factor", 1.toShort)
-    new CreateTopicsRequest(struct)
+    new CreateTopicsRequest(struct, request.version)
   }
 
   private def validateErrorCreateTopicsRequests(request: CreateTopicsRequest, expectedResponse: Map[String, Errors]): Unit = {
-    val response = sendCreateTopicRequest(request, 0)
+    val response = sendCreateTopicRequest(request)
     val errors = response.errors.asScala
     assertEquals("The response size should match", expectedResponse.size, response.errors.size)
 
@@ -203,8 +206,8 @@ class CreateTopicsRequestTest extends BaseRequestTest {
 
   @Test
   def testNotController() {
-    val request = new CreateTopicsRequest(Map("topic1" -> new CreateTopicsRequest.TopicDetails(1, 1.toShort)).asJava, 1000)
-    val response = sendCreateTopicRequest(request, 0, notControllerSocketServer)
+    val request = new CreateTopicsRequest.Builder(Map("topic1" -> new CreateTopicsRequest.TopicDetails(1, 1.toShort)).asJava, 1000).build()
+    val response = sendCreateTopicRequest(request, notControllerSocketServer)
 
     val error = response.errors.asScala.head._2
     assertEquals("Expected controller error when routed incorrectly",  Errors.NOT_CONTROLLER, error)
@@ -212,7 +215,8 @@ class CreateTopicsRequestTest extends BaseRequestTest {
 
   private def validateTopicExists(topic: String): Unit = {
     TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0)
-    val metadata = sendMetadataRequest(new MetadataRequest(List(topic).asJava)).topicMetadata.asScala
+    val metadata = sendMetadataRequest(
+      new MetadataRequest.Builder(List(topic).asJava).build()).topicMetadata.asScala
     assertTrue("The topic should be created", metadata.exists(p => p.topic.equals(topic) && p.error() == Errors.NONE))
   }
 
@@ -220,9 +224,9 @@ class CreateTopicsRequestTest extends BaseRequestTest {
     assignments.map { case (k, v) => (k:Integer, v.map { i => i:Integer }.asJava) }.asJava
   }
 
-  private def sendCreateTopicRequest(request: CreateTopicsRequest, version: Short, socketServer: SocketServer = controllerSocketServer): CreateTopicsResponse = {
-    val response = send(request, ApiKeys.CREATE_TOPICS, Some(version), socketServer)
-    CreateTopicsResponse.parse(response, version)
+  private def sendCreateTopicRequest(request: CreateTopicsRequest, socketServer: SocketServer = controllerSocketServer): CreateTopicsResponse = {
+    val response = send(request, ApiKeys.CREATE_TOPICS, socketServer)
+    CreateTopicsResponse.parse(response, request.version)
   }
 
   private def sendMetadataRequest(request: MetadataRequest, destination: SocketServer = anySocketServer): MetadataResponse = {


Mime
View raw message