kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject [4/5] kafka git commit: KAFKA-4507; Clients should support older brokers (KIP-97)
Date Wed, 11 Jan 2017 19:31:04 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 1f54c0b..dc29936 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -32,13 +32,11 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Rate;
-import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.requests.ProduceRequest;
 import org.apache.kafka.common.requests.ProduceResponse;
-import org.apache.kafka.common.requests.RequestHeader;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
@@ -256,6 +254,11 @@ public class Sender implements Runnable {
             log.trace("Cancelled request {} due to node {} being disconnected", response, response.destination());
             for (RecordBatch batch : batches.values())
                 completeBatch(batch, Errors.NETWORK_EXCEPTION, -1L, Record.NO_TIMESTAMP, correlationId, now);
+        } else if (response.versionMismatch() != null) {
+            log.warn("Cancelled request {} due to a version mismatch with node {}",
+                    response, response.destination(), response.versionMismatch());
+            for (RecordBatch batch : batches.values())
+                completeBatch(batch, Errors.INVALID_REQUEST, -1L, Record.NO_TIMESTAMP, correlationId, now);
         } else {
             log.trace("Received produce response from node {} with correlation id {}", response.destination(), correlationId);
             // if we have a response, parse it
@@ -349,8 +352,8 @@ public class Sender implements Runnable {
             recordsByPartition.put(tp, batch);
         }
 
-        ProduceRequest produceRequest = new ProduceRequest(acks, timeout, produceRecordsByPartition);
-        RequestHeader header = this.client.nextRequestHeader(ApiKeys.PRODUCE);
+        ProduceRequest.Builder requestBuilder =
+                new ProduceRequest.Builder(acks, timeout, produceRecordsByPartition);
         RequestCompletionHandler callback = new RequestCompletionHandler() {
             public void onComplete(ClientResponse response) {
                 handleProduceResponse(response, recordsByPartition, time.milliseconds());
@@ -358,10 +361,9 @@ public class Sender implements Runnable {
         };
 
         String nodeId = Integer.toString(destination);
-        ClientRequest clientRequest = new ClientRequest(nodeId, now, acks != 0, header, produceRequest, callback);
-
+        ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0, callback);
         client.send(clientRequest, now);
-        log.trace("Sent produce request to {}: {}", nodeId, produceRequest);
+        log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/clients/src/main/java/org/apache/kafka/common/errors/ObsoleteBrokerException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/ObsoleteBrokerException.java b/clients/src/main/java/org/apache/kafka/common/errors/ObsoleteBrokerException.java
new file mode 100644
index 0000000..18f822d
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/ObsoleteBrokerException.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * Indicates that a request cannot be completed because an obsolete broker
+ * does not support the required functionality.
+ */
+public class ObsoleteBrokerException extends ApiException {
+
+    private static final long serialVersionUID = 1L;
+
+    public ObsoleteBrokerException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public ObsoleteBrokerException(String message) {
+        super(message);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index e07c3c3..239780c 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -69,12 +69,16 @@ public enum ApiKeys {
     }
 
     public static ApiKeys forId(int id) {
-        if (id < MIN_API_KEY || id > MAX_API_KEY)
+        if (!hasId(id))
             throw new IllegalArgumentException(String.format("Unexpected ApiKeys id `%s`, it should be between `%s` " +
                     "and `%s` (inclusive)", id, MIN_API_KEY, MAX_API_KEY));
         return ID_TO_TYPE[id];
     }
 
+    public static boolean hasId(int id) {
+        return id >= MIN_API_KEY && id <= MAX_API_KEY;
+    }
+
     private static String toHtml() {
         final StringBuilder b = new StringBuilder();
         b.append("<table class=\"data-table\"><tbody>\n");

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java b/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java
index c4d2cc6..ab2ebb1 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java
@@ -66,10 +66,6 @@ public class ProtoUtils {
         return requestSchema(apiKey, version).read(buffer);
     }
 
-    public static Struct parseResponse(int apiKey, ByteBuffer buffer) {
-        return currentResponseSchema(apiKey).read(buffer);
-    }
-
     public static Struct parseResponse(int apiKey, int version, ByteBuffer buffer) {
         return responseSchema(apiKey, version).read(buffer);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index cd4e6e3..6c94f6f 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -928,10 +928,10 @@ public class Protocol {
      * particular version is not supported */
     public static final Schema[][] REQUESTS = new Schema[ApiKeys.MAX_API_KEY + 1][];
     public static final Schema[][] RESPONSES = new Schema[ApiKeys.MAX_API_KEY + 1][];
-    public static final short[] MIN_VERSIONS = new short[ApiKeys.MAX_API_KEY + 1];
+    static final short[] MIN_VERSIONS = new short[ApiKeys.MAX_API_KEY + 1];
 
     /* the latest version of each api */
-    public static final short[] CURR_VERSION = new short[ApiKeys.MAX_API_KEY + 1];
+    static final short[] CURR_VERSION = new short[ApiKeys.MAX_API_KEY + 1];
 
     static {
         REQUESTS[ApiKeys.PRODUCE.id] = PRODUCE_REQUEST;

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index 80182e6..f2ea420 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -19,14 +19,42 @@ package org.apache.kafka.common.requests;
 import org.apache.kafka.common.network.NetworkSend;
 import org.apache.kafka.common.network.Send;
 import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
 
 public abstract class AbstractRequest extends AbstractRequestResponse {
+    private final short version;
 
-    public AbstractRequest(Struct struct) {
+    public static abstract class Builder<T extends AbstractRequest> {
+        private final ApiKeys apiKey;
+        private short version;
+
+        public Builder(ApiKeys apiKey) {
+            this.apiKey = apiKey;
+            this.version = ProtoUtils.latestVersion(apiKey.id);
+        }
+
+        public ApiKeys apiKey() {
+            return apiKey;
+        }
+
+        public Builder<T> setVersion(short version) {
+            this.version = version;
+            return this;
+        }
+
+        public short version() {
+            return version;
+        }
+
+        public abstract T build();
+    }
+
+    public AbstractRequest(Struct struct, short version) {
         super(struct);
+        this.version = version;
     }
 
     public Send toSend(String destination, RequestHeader header) {
@@ -34,14 +62,21 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
     }
 
     /**
-     * Get an error response for a request for a given api version
+     * Get the version of this AbstractRequest object.
+     */
+    public short version() {
+        return version;
+    }
+
+    /**
+     * Get an error response for a request
      */
-    public abstract AbstractResponse getErrorResponse(int versionId, Throwable e);
+    public abstract AbstractResponse getErrorResponse(Throwable e);
 
     /**
      * Factory method for getting a request object based on ApiKey ID and a buffer
      */
-    public static AbstractRequest getRequest(int requestId, int versionId, ByteBuffer buffer) {
+    public static AbstractRequest getRequest(int requestId, short versionId, ByteBuffer buffer) {
         ApiKeys apiKey = ApiKeys.forId(requestId);
         switch (apiKey) {
             case PRODUCE:

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
index 5be5db1..17e6d5e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
@@ -15,26 +15,40 @@ package org.apache.kafka.common.requests;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
 import java.util.Collections;
 
 public class ApiVersionsRequest extends AbstractRequest {
-    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.API_VERSIONS.id);
-    public static final ApiVersionsRequest API_VERSIONS_REQUEST = new ApiVersionsRequest();
+    public static class Builder extends AbstractRequest.Builder<ApiVersionsRequest> {
+        public Builder() {
+            super(ApiKeys.API_VERSIONS);
+        }
+
+        @Override
+        public ApiVersionsRequest build() {
+            return new ApiVersionsRequest(version());
+        }
+
+        @Override
+        public String toString() {
+            return "(type=ApiVersionsRequest)";
+        }
+    }
 
-    public ApiVersionsRequest() {
-        super(new Struct(CURRENT_SCHEMA));
+    public ApiVersionsRequest(short version) {
+        this(new Struct(ProtoUtils.requestSchema(ApiKeys.API_VERSIONS.id, version)),
+                version);
     }
 
-    public ApiVersionsRequest(Struct struct) {
-        super(struct);
+    public ApiVersionsRequest(Struct struct, short versionId) {
+        super(struct, versionId);
     }
 
     @Override
-    public AbstractResponse getErrorResponse(int versionId, Throwable e) {
+    public AbstractResponse getErrorResponse(Throwable e) {
+        short versionId = version();
         switch (versionId) {
             case 0:
                 short errorCode = Errors.forException(e).code();
@@ -46,10 +60,12 @@ public class ApiVersionsRequest extends AbstractRequest {
     }
 
     public static ApiVersionsRequest parse(ByteBuffer buffer, int versionId) {
-        return new ApiVersionsRequest(ProtoUtils.parseRequest(ApiKeys.API_VERSIONS.id, versionId, buffer));
+        return new ApiVersionsRequest(
+                ProtoUtils.parseRequest(ApiKeys.API_VERSIONS.id, versionId, buffer),
+                (short) versionId);
     }
 
     public static ApiVersionsRequest parse(ByteBuffer buffer) {
-        return new ApiVersionsRequest(CURRENT_SCHEMA.read(buffer));
+        return parse(buffer, ProtoUtils.latestVersion(ApiKeys.API_VERSIONS.id));
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
index b9f453d..81be9c3 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
@@ -15,7 +15,6 @@ package org.apache.kafka.common.requests;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
-import org.apache.kafka.common.protocol.Protocol;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
@@ -120,7 +119,7 @@ public class ApiVersionsResponse extends AbstractResponse {
     private static ApiVersionsResponse createApiVersionsResponse() {
         List<ApiVersion> versionList = new ArrayList<>();
         for (ApiKeys apiKey : ApiKeys.values()) {
-            versionList.add(new ApiVersion(apiKey.id, Protocol.MIN_VERSIONS[apiKey.id], Protocol.CURR_VERSION[apiKey.id]));
+            versionList.add(new ApiVersion(apiKey.id, ProtoUtils.oldestVersion(apiKey.id), ProtoUtils.latestVersion(apiKey.id)));
         }
         return new ApiVersionsResponse(Errors.NONE.code(), versionList);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
index 24adb36..337ccfc 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
@@ -16,33 +16,53 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
 import java.util.Collections;
 
 public class ControlledShutdownRequest extends AbstractRequest {
+    private static final String BROKER_ID_KEY_NAME = "broker_id";
 
-    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.CONTROLLED_SHUTDOWN_KEY.id);
+    public static class Builder extends AbstractRequest.Builder<ControlledShutdownRequest> {
+        private final int brokerId;
 
-    private static final String BROKER_ID_KEY_NAME = "broker_id";
+        public Builder(int brokerId) {
+            super(ApiKeys.CONTROLLED_SHUTDOWN_KEY);
+            this.brokerId = brokerId;
+        }
 
+        @Override
+        public ControlledShutdownRequest build() {
+            return new ControlledShutdownRequest(brokerId, version());
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder bld = new StringBuilder();
+            bld.append("(type=ControlledShutdownRequest").
+                append(", brokerId=").append(brokerId).
+                append(")");
+            return bld.toString();
+        }
+    }
     private int brokerId;
 
-    public ControlledShutdownRequest(int brokerId) {
-        super(new Struct(CURRENT_SCHEMA));
+    private ControlledShutdownRequest(int brokerId, short version) {
+        super(new Struct(ProtoUtils.requestSchema(ApiKeys.CONTROLLED_SHUTDOWN_KEY.id, version)),
+                version);
         struct.set(BROKER_ID_KEY_NAME, brokerId);
         this.brokerId = brokerId;
     }
 
-    public ControlledShutdownRequest(Struct struct) {
-        super(struct);
+    public ControlledShutdownRequest(Struct struct, short versionId) {
+        super(struct, versionId);
         brokerId = struct.getInt(BROKER_ID_KEY_NAME);
     }
 
     @Override
-    public AbstractResponse getErrorResponse(int versionId, Throwable e) {
+    public AbstractResponse getErrorResponse(Throwable e) {
+        short versionId = version();
         switch (versionId) {
             case 0:
                 throw new IllegalArgumentException("Version 0 is not supported. It is only supported by " +
@@ -60,10 +80,11 @@ public class ControlledShutdownRequest extends AbstractRequest {
     }
 
     public static ControlledShutdownRequest parse(ByteBuffer buffer, int versionId) {
-        return new ControlledShutdownRequest(ProtoUtils.parseRequest(ApiKeys.CONTROLLED_SHUTDOWN_KEY.id, versionId, buffer));
+        return new ControlledShutdownRequest(
+                ProtoUtils.parseRequest(ApiKeys.CONTROLLED_SHUTDOWN_KEY.id, versionId, buffer), (short) versionId);
     }
 
     public static ControlledShutdownRequest parse(ByteBuffer buffer) {
-        return new ControlledShutdownRequest(CURRENT_SCHEMA.read(buffer));
+        return parse(buffer, ProtoUtils.latestVersion(ApiKeys.CONTROLLED_SHUTDOWN_KEY.id));
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
index 7c440dd..978d020 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
@@ -19,8 +19,8 @@ package org.apache.kafka.common.requests;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.Utils;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -32,8 +32,6 @@ import java.util.Map;
 import java.util.Set;
 
 public class CreateTopicsRequest extends AbstractRequest {
-    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.CREATE_TOPICS.id);
-
     private static final String REQUESTS_KEY_NAME = "create_topic_requests";
 
     private static final String TIMEOUT_KEY_NAME = "timeout";
@@ -83,6 +81,43 @@ public class CreateTopicsRequest extends AbstractRequest {
         public TopicDetails(Map<Integer, List<Integer>> replicasAssignments) {
             this(replicasAssignments, Collections.<String, String>emptyMap());
         }
+
+        @Override
+        public String toString() {
+            StringBuilder bld = new StringBuilder();
+            bld.append("(numPartitions=").append(numPartitions).
+                    append(", replicationFactor=").append(replicationFactor).
+                    append(", replicasAssignments=").append(Utils.mkString(replicasAssignments)).
+                    append(", configs=").append(Utils.mkString(configs)).
+                    append(")");
+            return bld.toString();
+        }
+    }
+
+    public static class Builder extends AbstractRequest.Builder<CreateTopicsRequest> {
+        private final Map<String, TopicDetails> topics;
+        private final Integer timeout;
+
+        public Builder(Map<String, TopicDetails> topics, Integer timeout) {
+            super(ApiKeys.CREATE_TOPICS);
+            this.topics = topics;
+            this.timeout = timeout;
+        }
+
+        @Override
+        public CreateTopicsRequest build() {
+            return new CreateTopicsRequest(topics, timeout, version());
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder bld = new StringBuilder();
+            bld.append("(type=CreateTopicsRequest").
+                append(", topics=").append(Utils.mkString(topics)).
+                append(", timeout=").append(timeout).
+                append(")");
+            return bld.toString();
+        }
     }
 
     private final Map<String, TopicDetails> topics;
@@ -95,8 +130,8 @@ public class CreateTopicsRequest extends AbstractRequest {
     public static final int NO_NUM_PARTITIONS = -1;
     public static final short NO_REPLICATION_FACTOR = -1;
 
-    public CreateTopicsRequest(Map<String, TopicDetails> topics, Integer timeout) {
-        super(new Struct(CURRENT_SCHEMA));
+    private CreateTopicsRequest(Map<String, TopicDetails> topics, Integer timeout, short version) {
+        super(new Struct(ProtoUtils.requestSchema(ApiKeys.CREATE_TOPICS.id, version)), version);
 
         List<Struct> createTopicRequestStructs = new ArrayList<>(topics.size());
         for (Map.Entry<String, TopicDetails> entry : topics.entrySet()) {
@@ -138,8 +173,8 @@ public class CreateTopicsRequest extends AbstractRequest {
         this.duplicateTopics = Collections.emptySet();
     }
 
-    public CreateTopicsRequest(Struct struct) {
-        super(struct);
+    public CreateTopicsRequest(Struct struct, short versionId) {
+        super(struct, versionId);
 
         Object[] requestStructs = struct.getArray(REQUESTS_KEY_NAME);
         Map<String, TopicDetails> topics = new HashMap<>();
@@ -194,12 +229,13 @@ public class CreateTopicsRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractResponse getErrorResponse(int versionId, Throwable e) {
+    public AbstractResponse getErrorResponse(Throwable e) {
         Map<String, Errors> topicErrors = new HashMap<>();
         for (String topic : topics.keySet()) {
             topicErrors.put(topic, Errors.forException(e));
         }
 
+        short versionId = version();
         switch (versionId) {
             case 0:
                 return new CreateTopicsResponse(topicErrors);
@@ -222,10 +258,12 @@ public class CreateTopicsRequest extends AbstractRequest {
     }
 
     public static CreateTopicsRequest parse(ByteBuffer buffer, int versionId) {
-        return new CreateTopicsRequest(ProtoUtils.parseRequest(ApiKeys.CREATE_TOPICS.id, versionId, buffer));
+        return new CreateTopicsRequest(
+                ProtoUtils.parseRequest(ApiKeys.CREATE_TOPICS.id, versionId, buffer),
+                (short) versionId);
     }
 
     public static CreateTopicsRequest parse(ByteBuffer buffer) {
-        return new CreateTopicsRequest(CURRENT_SCHEMA.read(buffer));
+        return parse(buffer, ProtoUtils.latestVersion(ApiKeys.CREATE_TOPICS.id));
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java
index 0632cc0..2874fad 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java
@@ -19,8 +19,8 @@ package org.apache.kafka.common.requests;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.Utils;
 
 import java.nio.ByteBuffer;
 import java.util.HashMap;
@@ -29,15 +29,40 @@ import java.util.Map;
 import java.util.Set;
 
 public class DeleteTopicsRequest extends AbstractRequest {
-    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.DELETE_TOPICS.id);
     private static final String TOPICS_KEY_NAME = "topics";
     private static final String TIMEOUT_KEY_NAME = "timeout";
 
     private final Set<String> topics;
     private final Integer timeout;
 
-    public DeleteTopicsRequest(Set<String> topics, Integer timeout) {
-        super(new Struct(CURRENT_SCHEMA));
+    public static class Builder extends AbstractRequest.Builder<DeleteTopicsRequest> {
+        private final Set<String> topics;
+        private final Integer timeout;
+
+        public Builder(Set<String> topics, Integer timeout) {
+            super(ApiKeys.DELETE_TOPICS);
+            this.topics = topics;
+            this.timeout = timeout;
+        }
+
+        @Override
+        public DeleteTopicsRequest build() {
+            return new DeleteTopicsRequest(topics, timeout, version());
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder bld = new StringBuilder();
+            bld.append("(type=DeleteTopicsRequest").
+                append(", topics=(").append(Utils.join(topics, ", ")).append(")").
+                append(", timeout=").append(timeout).
+                append(")");
+            return bld.toString();
+        }
+    }
+
+    private DeleteTopicsRequest(Set<String> topics, Integer timeout, short version) {
+        super(new Struct(ProtoUtils.requestSchema(ApiKeys.DELETE_TOPICS.id, version)), version);
 
         struct.set(TOPICS_KEY_NAME, topics.toArray());
         struct.set(TIMEOUT_KEY_NAME, timeout);
@@ -46,8 +71,8 @@ public class DeleteTopicsRequest extends AbstractRequest {
         this.timeout = timeout;
     }
 
-    public DeleteTopicsRequest(Struct struct) {
-        super(struct);
+    public DeleteTopicsRequest(Struct struct, short version) {
+        super(struct, version);
         Object[] topicsArray = struct.getArray(TOPICS_KEY_NAME);
         Set<String> topics = new HashSet<>(topicsArray.length);
         for (Object topic : topicsArray)
@@ -58,17 +83,17 @@ public class DeleteTopicsRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractResponse getErrorResponse(int versionId, Throwable e) {
+    public AbstractResponse getErrorResponse(Throwable e) {
         Map<String, Errors> topicErrors = new HashMap<>();
         for (String topic : topics)
             topicErrors.put(topic, Errors.forException(e));
 
-        switch (versionId) {
+        switch (version()) {
             case 0:
                 return new DeleteTopicsResponse(topicErrors);
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
-                    versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.DELETE_TOPICS.id)));
+                    version(), this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.DELETE_TOPICS.id)));
         }
     }
 
@@ -81,10 +106,11 @@ public class DeleteTopicsRequest extends AbstractRequest {
     }
 
     public static DeleteTopicsRequest parse(ByteBuffer buffer, int versionId) {
-        return new DeleteTopicsRequest(ProtoUtils.parseRequest(ApiKeys.DELETE_TOPICS.id, versionId, buffer));
+        return new DeleteTopicsRequest(ProtoUtils.parseRequest(ApiKeys.DELETE_TOPICS.id, versionId, buffer),
+                (short) versionId);
     }
 
     public static DeleteTopicsRequest parse(ByteBuffer buffer) {
-        return new DeleteTopicsRequest(CURRENT_SCHEMA.read(buffer));
+        return parse(buffer, ProtoUtils.latestVersion(ApiKeys.DELETE_TOPICS.id));
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
index b965c91..f17cdd9 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
@@ -15,27 +15,47 @@ package org.apache.kafka.common.requests;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.Utils;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
 public class DescribeGroupsRequest extends AbstractRequest {
-    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.DESCRIBE_GROUPS.id);
     private static final String GROUP_IDS_KEY_NAME = "group_ids";
 
+    public static class Builder extends AbstractRequest.Builder<DescribeGroupsRequest> {
+        private final List<String> groupIds;
+
+        public Builder(List<String> groupIds) {
+            super(ApiKeys.DESCRIBE_GROUPS);
+            this.groupIds = groupIds;
+        }
+
+        @Override
+        public DescribeGroupsRequest build() {
+            short version = version();
+            return new DescribeGroupsRequest(this.groupIds, version);
+        }
+
+        @Override
+        public String toString() {
+            return "(type=DescribeGroupsRequest, groupIds=(" + Utils.join(groupIds, ",") + "))";
+        }
+    }
+
     private final List<String> groupIds;
 
-    public DescribeGroupsRequest(List<String> groupIds) {
-        super(new Struct(CURRENT_SCHEMA));
+    private DescribeGroupsRequest(List<String> groupIds, short version) {
+        super(new Struct(ProtoUtils.requestSchema(ApiKeys.DESCRIBE_GROUPS.id, version)),
+                version);
         struct.set(GROUP_IDS_KEY_NAME, groupIds.toArray());
         this.groupIds = groupIds;
     }
 
-    public DescribeGroupsRequest(Struct struct) {
-        super(struct);
+    public DescribeGroupsRequest(Struct struct, short version) {
+        super(struct, version);
         this.groupIds = new ArrayList<>();
         for (Object groupId : struct.getArray(GROUP_IDS_KEY_NAME))
             this.groupIds.add((String) groupId);
@@ -46,7 +66,8 @@ public class DescribeGroupsRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractResponse getErrorResponse(int versionId, Throwable e) {
+    public AbstractResponse getErrorResponse(Throwable e) {
+        short versionId = version();
         switch (versionId) {
             case 0:
                 return DescribeGroupsResponse.fromError(Errors.forException(e), groupIds);
@@ -58,11 +79,11 @@ public class DescribeGroupsRequest extends AbstractRequest {
     }
 
     public static DescribeGroupsRequest parse(ByteBuffer buffer, int versionId) {
-        return new DescribeGroupsRequest(ProtoUtils.parseRequest(ApiKeys.DESCRIBE_GROUPS.id, versionId, buffer));
+        return new DescribeGroupsRequest(ProtoUtils.parseRequest(ApiKeys.DESCRIBE_GROUPS.id, versionId, buffer),
+                (short) versionId);
     }
 
     public static DescribeGroupsRequest parse(ByteBuffer buffer) {
-        return new DescribeGroupsRequest(CURRENT_SCHEMA.read(buffer));
+        return parse(buffer, ProtoUtils.latestVersion(ApiKeys.DESCRIBE_GROUPS.id));
     }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
index b5770d4..4768a20 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -22,14 +22,12 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.utils.Utils;
 
 public class FetchRequest extends AbstractRequest {
-
     public static final int CONSUMER_REPLICA_ID = -1;
-    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.FETCH.id);
     private static final String REPLICA_ID_KEY_NAME = "replica_id";
     private static final String MAX_WAIT_KEY_NAME = "max_wait_time";
     private static final String MIN_BYTES_KEY_NAME = "min_bytes";
@@ -63,6 +61,11 @@ public class FetchRequest extends AbstractRequest {
             this.offset = offset;
             this.maxBytes = maxBytes;
         }
+
+        @Override
+        public String toString() {
+            return "(offset=" + offset + ", maxBytes=" + maxBytes + ")";
+        }
     }
 
     static final class TopicAndPartitionData<T> {
@@ -88,43 +91,66 @@ public class FetchRequest extends AbstractRequest {
         }
     }
 
-    /**
-     * Create a non-replica fetch request for versions 0, 1 or 2 (the version is set via the RequestHeader).
-     */
-    @Deprecated
-    public FetchRequest(int maxWait, int minBytes, Map<TopicPartition, PartitionData> fetchData) {
-        // Any of 0, 1 or 2 would do here since the schemas for these versions are identical
-        this(2, CONSUMER_REPLICA_ID, maxWait, minBytes, DEFAULT_RESPONSE_MAX_BYTES, new LinkedHashMap<>(fetchData));
-    }
+    public static class Builder extends AbstractRequest.Builder<FetchRequest> {
+        private int replicaId = CONSUMER_REPLICA_ID;
+        private int maxWait;
+        private final int minBytes;
+        private int maxBytes = DEFAULT_RESPONSE_MAX_BYTES;
+        private LinkedHashMap<TopicPartition, PartitionData> fetchData;
 
-    /**
-     * Create a non-replica fetch request for the current version.
-     */
-    public FetchRequest(int maxWait, int minBytes, int maxBytes, LinkedHashMap<TopicPartition, PartitionData> fetchData) {
-        this(ProtoUtils.latestVersion(ApiKeys.FETCH.id), CONSUMER_REPLICA_ID, maxWait, minBytes, maxBytes, fetchData);
-    }
+        public Builder(int maxWait, int minBytes, LinkedHashMap<TopicPartition, PartitionData> fetchData) {
+            super(ApiKeys.FETCH);
+            this.maxWait = maxWait;
+            this.minBytes = minBytes;
+            this.fetchData = fetchData;
+        }
 
-    /**
-     * Create a replica fetch request for versions 0, 1 or 2 (the actual version is determined by the RequestHeader).
-     */
-    @Deprecated
-    public static FetchRequest fromReplica(int replicaId, int maxWait, int minBytes,
-                                           Map<TopicPartition, PartitionData> fetchData) {
-        // Any of 0, 1 or 2 would do here since the schemas for these versions are identical
-        return new FetchRequest(2, replicaId, maxWait, minBytes, DEFAULT_RESPONSE_MAX_BYTES, new LinkedHashMap<>(fetchData));
-    }
+        public Builder setReplicaId(int replicaId) {
+            this.replicaId = replicaId;
+            return this;
+        }
 
-    /**
-     * Create a replica fetch request for the current version.
-     */
-    public static FetchRequest fromReplica(int replicaId, int maxWait, int minBytes, int maxBytes,
-                                           LinkedHashMap<TopicPartition, PartitionData> fetchData) {
-        return new FetchRequest(ProtoUtils.latestVersion(ApiKeys.FETCH.id), replicaId, maxWait, minBytes, maxBytes, fetchData);
+        public Builder setMaxWait(int maxWait) {
+            this.maxWait = maxWait;
+            return this;
+        }
+
+        public Builder setMaxBytes(int maxBytes) {
+            this.maxBytes = maxBytes;
+            return this;
+        }
+
+        public LinkedHashMap<TopicPartition, PartitionData> fetchData() {
+            return this.fetchData;
+        }
+
+        @Override
+        public FetchRequest build() {
+            short version = version();
+            if (version < 3) {
+                maxBytes = -1;
+            }
+
+            return new FetchRequest(version, replicaId, maxWait, minBytes,
+                    maxBytes, fetchData);
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder bld = new StringBuilder();
+            bld.append("(type:FetchRequest").
+                    append(", replicaId=").append(replicaId).
+                    append(", maxWait=").append(maxWait).
+                    append(", minBytes=").append(minBytes).
+                    append(", fetchData=").append(Utils.mkString(fetchData)).
+                    append(")");
+            return bld.toString();
+        }
     }
 
-    private FetchRequest(int version, int replicaId, int maxWait, int minBytes, int maxBytes,
+    private FetchRequest(short version, int replicaId, int maxWait, int minBytes, int maxBytes,
                          LinkedHashMap<TopicPartition, PartitionData> fetchData) {
-        super(new Struct(ProtoUtils.requestSchema(ApiKeys.FETCH.id, version)));
+        super(new Struct(ProtoUtils.requestSchema(ApiKeys.FETCH.id, version)), version);
         List<TopicAndPartitionData<PartitionData>> topicsData = TopicAndPartitionData.batchByTopic(fetchData);
 
         struct.set(REPLICA_ID_KEY_NAME, replicaId);
@@ -156,8 +182,8 @@ public class FetchRequest extends AbstractRequest {
         this.fetchData = fetchData;
     }
 
-    public FetchRequest(Struct struct) {
-        super(struct);
+    public FetchRequest(Struct struct, short versionId) {
+        super(struct, versionId);
         replicaId = struct.getInt(REPLICA_ID_KEY_NAME);
         maxWait = struct.getInt(MAX_WAIT_KEY_NAME);
         minBytes = struct.getInt(MIN_BYTES_KEY_NAME);
@@ -181,7 +207,7 @@ public class FetchRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractResponse getErrorResponse(int versionId, Throwable e) {
+    public AbstractResponse getErrorResponse(Throwable e) {
         LinkedHashMap<TopicPartition, FetchResponse.PartitionData> responseData = new LinkedHashMap<>();
 
         for (Map.Entry<TopicPartition, PartitionData> entry: fetchData.entrySet()) {
@@ -190,7 +216,7 @@ public class FetchRequest extends AbstractRequest {
 
             responseData.put(entry.getKey(), partitionResponse);
         }
-
+        short versionId = version();
         return new FetchResponse(versionId, responseData, 0);
     }
 
@@ -219,10 +245,10 @@ public class FetchRequest extends AbstractRequest {
     }
 
     public static FetchRequest parse(ByteBuffer buffer, int versionId) {
-        return new FetchRequest(ProtoUtils.parseRequest(ApiKeys.FETCH.id, versionId, buffer));
+        return new FetchRequest(ProtoUtils.parseRequest(ApiKeys.FETCH.id, versionId, buffer), (short) versionId);
     }
 
     public static FetchRequest parse(ByteBuffer buffer) {
-        return new FetchRequest(CURRENT_SCHEMA.read(buffer));
+        return parse(buffer, ProtoUtils.latestVersion(ApiKeys.FETCH.id));
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorRequest.java
index 7fee476..d8ccdf6 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorRequest.java
@@ -16,32 +16,53 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
 
 public class GroupCoordinatorRequest extends AbstractRequest {
-    
-    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.GROUP_COORDINATOR.id);
     private static final String GROUP_ID_KEY_NAME = "group_id";
 
-    private final String groupId;
+    public static class Builder extends AbstractRequest.Builder<GroupCoordinatorRequest> {
+        private final String groupId;
+
+        public Builder(String groupId) {
+            super(ApiKeys.GROUP_COORDINATOR);
+            this.groupId = groupId;
+        }
+
+        @Override
+        public GroupCoordinatorRequest build() {
+            short version = version();
+            return new GroupCoordinatorRequest(this.groupId, version);
+        }
 
-    public GroupCoordinatorRequest(String groupId) {
-        super(new Struct(CURRENT_SCHEMA));
+        @Override
+        public String toString() {
+            StringBuilder bld = new StringBuilder();
+            bld.append("(type=GroupCoordinatorRequest, groupId=");
+            bld.append(groupId).append(")");
+            return bld.toString();
+        }
+    }
+
+    private final String groupId;
 
+    private GroupCoordinatorRequest(String groupId, short version) {
+        super(new Struct(ProtoUtils.requestSchema(ApiKeys.GROUP_COORDINATOR.id, version)),
+                version);
         struct.set(GROUP_ID_KEY_NAME, groupId);
         this.groupId = groupId;
     }
 
-    public GroupCoordinatorRequest(Struct struct) {
-        super(struct);
+    public GroupCoordinatorRequest(Struct struct, short versionId) {
+        super(struct, versionId);
         groupId = struct.getString(GROUP_ID_KEY_NAME);
     }
 
     @Override
-    public AbstractResponse getErrorResponse(int versionId, Throwable e) {
+    public AbstractResponse getErrorResponse(Throwable e) {
+        short versionId = version();
         switch (versionId) {
             case 0:
                 return new GroupCoordinatorResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code(), Node.noNode());
@@ -56,10 +77,11 @@ public class GroupCoordinatorRequest extends AbstractRequest {
     }
 
     public static GroupCoordinatorRequest parse(ByteBuffer buffer, int versionId) {
-        return new GroupCoordinatorRequest(ProtoUtils.parseRequest(ApiKeys.GROUP_COORDINATOR.id, versionId, buffer));
+        return new GroupCoordinatorRequest(ProtoUtils.parseRequest(ApiKeys.GROUP_COORDINATOR.id, versionId, buffer),
+                (short) versionId);
     }
 
     public static GroupCoordinatorRequest parse(ByteBuffer buffer) {
-        return new GroupCoordinatorRequest(CURRENT_SCHEMA.read(buffer));
+        return parse(buffer, ProtoUtils.latestVersion(ApiKeys.GROUP_COORDINATOR.id));
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
index 3e7401c..0e5c17a 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
@@ -15,24 +15,51 @@ package org.apache.kafka.common.requests;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
 
 public class HeartbeatRequest extends AbstractRequest {
-    
-    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.HEARTBEAT.id);
     private static final String GROUP_ID_KEY_NAME = "group_id";
     private static final String GROUP_GENERATION_ID_KEY_NAME = "group_generation_id";
     private static final String MEMBER_ID_KEY_NAME = "member_id";
 
+    public static class Builder extends AbstractRequest.Builder<HeartbeatRequest> {
+        private final String groupId;
+        private final int groupGenerationId;
+        private final String memberId;
+
+        public Builder(String groupId, int groupGenerationId, String memberId) {
+            super(ApiKeys.HEARTBEAT);
+            this.groupId = groupId;
+            this.groupGenerationId = groupGenerationId;
+            this.memberId = memberId;
+        }
+
+        @Override
+        public HeartbeatRequest build() {
+            return new HeartbeatRequest(groupId, groupGenerationId, memberId, version());
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder bld = new StringBuilder();
+            bld.append("(type=HeartbeatRequest").
+                append(", groupId=").append(groupId).
+                append(", groupGenerationId=").append(groupGenerationId).
+                append(", memberId=").append(memberId).
+                append(")");
+            return bld.toString();
+        }
+    }
+
     private final String groupId;
     private final int groupGenerationId;
     private final String memberId;
 
-    public HeartbeatRequest(String groupId, int groupGenerationId, String memberId) {
-        super(new Struct(CURRENT_SCHEMA));
+    private HeartbeatRequest(String groupId, int groupGenerationId, String memberId, short version) {
+        super(new Struct(ProtoUtils.requestSchema(ApiKeys.HEARTBEAT.id, version)),
+                version);
         struct.set(GROUP_ID_KEY_NAME, groupId);
         struct.set(GROUP_GENERATION_ID_KEY_NAME, groupGenerationId);
         struct.set(MEMBER_ID_KEY_NAME, memberId);
@@ -41,15 +68,16 @@ public class HeartbeatRequest extends AbstractRequest {
         this.memberId = memberId;
     }
 
-    public HeartbeatRequest(Struct struct) {
-        super(struct);
+    public HeartbeatRequest(Struct struct, short versionId) {
+        super(struct, versionId);
         groupId = struct.getString(GROUP_ID_KEY_NAME);
         groupGenerationId = struct.getInt(GROUP_GENERATION_ID_KEY_NAME);
         memberId = struct.getString(MEMBER_ID_KEY_NAME);
     }
 
     @Override
-    public AbstractResponse getErrorResponse(int versionId, Throwable e) {
+    public AbstractResponse getErrorResponse(Throwable e) {
+        short versionId = version();
         switch (versionId) {
             case 0:
                 return new HeartbeatResponse(Errors.forException(e).code());
@@ -72,10 +100,10 @@ public class HeartbeatRequest extends AbstractRequest {
     }
 
     public static HeartbeatRequest parse(ByteBuffer buffer, int versionId) {
-        return new HeartbeatRequest(ProtoUtils.parseRequest(ApiKeys.HEARTBEAT.id, versionId, buffer));
+        return new HeartbeatRequest(ProtoUtils.parseRequest(ApiKeys.HEARTBEAT.id, versionId, buffer), (short) versionId);
     }
 
     public static HeartbeatRequest parse(ByteBuffer buffer) {
-        return new HeartbeatRequest(CURRENT_SCHEMA.read(buffer));
+        return parse(buffer, ProtoUtils.latestVersion(ApiKeys.HEARTBEAT.id));
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
index 51855b6..3f00ed1 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
@@ -15,8 +15,8 @@ package org.apache.kafka.common.requests;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.Utils;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -24,8 +24,6 @@ import java.util.Collections;
 import java.util.List;
 
 public class JoinGroupRequest extends AbstractRequest {
-
-    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.JOIN_GROUP.id);
     private static final String GROUP_ID_KEY_NAME = "group_id";
     private static final String SESSION_TIMEOUT_KEY_NAME = "session_timeout";
     private static final String REBALANCE_TIMEOUT_KEY_NAME = "rebalance_timeout";
@@ -62,51 +60,74 @@ public class JoinGroupRequest extends AbstractRequest {
         }
     }
 
-    // v0 constructor
-    @Deprecated
-    public JoinGroupRequest(String groupId,
-                            int sessionTimeout,
-                            String memberId,
-                            String protocolType,
-                            List<ProtocolMetadata> groupProtocols) {
-        this(0, groupId, sessionTimeout, sessionTimeout, memberId, protocolType, groupProtocols);
-    }
+    public static class Builder extends AbstractRequest.Builder<JoinGroupRequest> {
+        private final String groupId;
+        private final int sessionTimeout;
+        private final String memberId;
+        private final String protocolType;
+        private final List<ProtocolMetadata> groupProtocols;
+        private int rebalanceTimeout = 0;
+
+        public Builder(String groupId, int sessionTimeout, String memberId,
+                       String protocolType, List<ProtocolMetadata> groupProtocols) {
+            super(ApiKeys.JOIN_GROUP);
+            this.groupId = groupId;
+            this.sessionTimeout = sessionTimeout;
+            this.rebalanceTimeout = sessionTimeout;
+            this.memberId = memberId;
+            this.protocolType = protocolType;
+            this.groupProtocols = groupProtocols;
+        }
 
-    public JoinGroupRequest(String groupId,
-                            int sessionTimeout,
-                            int rebalanceTimeout,
-                            String memberId,
-                            String protocolType,
-                            List<ProtocolMetadata> groupProtocols) {
-        this(1, groupId, sessionTimeout, rebalanceTimeout, memberId, protocolType, groupProtocols);
-    }
+        public Builder setRebalanceTimeout(int rebalanceTimeout) {
+            this.rebalanceTimeout = rebalanceTimeout;
+            return this;
+        }
+
+        @Override
+        public JoinGroupRequest build() {
+            short version = version();
+            if (version < 1) {
+                rebalanceTimeout = -1;
+            }
+            return new JoinGroupRequest(version, groupId, sessionTimeout,
+                    rebalanceTimeout, memberId, protocolType, groupProtocols);
+        }
 
-    private JoinGroupRequest(int version,
-                             String groupId,
-                             int sessionTimeout,
-                             int rebalanceTimeout,
-                             String memberId,
-                             String protocolType,
-                             List<ProtocolMetadata> groupProtocols) {
-        super(new Struct(ProtoUtils.requestSchema(ApiKeys.JOIN_GROUP.id, version)));
+        @Override
+        public String toString() {
+            StringBuilder bld = new StringBuilder();
+            bld.append("(type: JoinGroupRequest").
+                append(", groupId=").append(groupId).
+                append(", sessionTimeout=").append(sessionTimeout).
+                append(", rebalanceTimeout=").append(rebalanceTimeout).
+                append(", memberId=").append(memberId).
+                append(", protocolType=").append(protocolType).
+                append(", groupProtocols=").append(Utils.join(groupProtocols, ", ")).
+                append(")");
+            return bld.toString();
+        }
+    }
 
+    private JoinGroupRequest(short version, String groupId, int sessionTimeout,
+            int rebalanceTimeout, String memberId, String protocolType,
+            List<ProtocolMetadata> groupProtocols) {
+        super(new Struct(ProtoUtils.
+                requestSchema(ApiKeys.JOIN_GROUP.id, version)), version);
         struct.set(GROUP_ID_KEY_NAME, groupId);
         struct.set(SESSION_TIMEOUT_KEY_NAME, sessionTimeout);
-
-        if (version >= 1)
+        if (version >= 1) {
             struct.set(REBALANCE_TIMEOUT_KEY_NAME, rebalanceTimeout);
-
+        }
         struct.set(MEMBER_ID_KEY_NAME, memberId);
         struct.set(PROTOCOL_TYPE_KEY_NAME, protocolType);
-
-        List<Struct> groupProtocolsList = new ArrayList<>();
+        List<Struct> groupProtocolsList = new ArrayList<>(groupProtocols.size());
         for (ProtocolMetadata protocol : groupProtocols) {
             Struct protocolStruct = struct.instance(GROUP_PROTOCOLS_KEY_NAME);
             protocolStruct.set(PROTOCOL_NAME_KEY_NAME, protocol.name);
             protocolStruct.set(PROTOCOL_METADATA_KEY_NAME, protocol.metadata);
             groupProtocolsList.add(protocolStruct);
         }
-
         struct.set(GROUP_PROTOCOLS_KEY_NAME, groupProtocolsList.toArray());
         this.groupId = groupId;
         this.sessionTimeout = sessionTimeout;
@@ -116,8 +137,8 @@ public class JoinGroupRequest extends AbstractRequest {
         this.groupProtocols = groupProtocols;
     }
 
-    public JoinGroupRequest(Struct struct) {
-        super(struct);
+    public JoinGroupRequest(Struct struct, short versionId) {
+        super(struct, versionId);
 
         groupId = struct.getString(GROUP_ID_KEY_NAME);
         sessionTimeout = struct.getInt(SESSION_TIMEOUT_KEY_NAME);
@@ -142,7 +163,8 @@ public class JoinGroupRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractResponse getErrorResponse(int versionId, Throwable e) {
+    public AbstractResponse getErrorResponse(Throwable e) {
+        short versionId = version();
         switch (versionId) {
             case 0:
             case 1:
@@ -186,10 +208,11 @@ public class JoinGroupRequest extends AbstractRequest {
     }
 
     public static JoinGroupRequest parse(ByteBuffer buffer, int versionId) {
-        return new JoinGroupRequest(ProtoUtils.parseRequest(ApiKeys.JOIN_GROUP.id, versionId, buffer));
+        return new JoinGroupRequest(ProtoUtils.parseRequest(ApiKeys.JOIN_GROUP.id, versionId, buffer),
+                (short) versionId);
     }
 
     public static JoinGroupRequest parse(ByteBuffer buffer) {
-        return new JoinGroupRequest(CURRENT_SCHEMA.read(buffer));
+        return parse(buffer, ProtoUtils.latestVersion(ApiKeys.JOIN_GROUP.id));
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
index 79dcd4a..1f09a12 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
@@ -22,8 +22,8 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.Utils;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -34,9 +34,6 @@ import java.util.Map;
 import java.util.Set;
 
 public class LeaderAndIsrRequest extends AbstractRequest {
-
-    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.LEADER_AND_ISR.id);
-
     private static final String CONTROLLER_ID_KEY_NAME = "controller_id";
     private static final String CONTROLLER_EPOCH_KEY_NAME = "controller_epoch";
     private static final String PARTITION_STATES_KEY_NAME = "partition_states";
@@ -56,14 +53,49 @@ public class LeaderAndIsrRequest extends AbstractRequest {
     private static final String HOST_KEY_NAME = "host";
     private static final String PORT_KEY_NAME = "port";
 
+    public static class Builder extends AbstractRequest.Builder<LeaderAndIsrRequest> {
+        private final int controllerId;
+        private final int controllerEpoch;
+        private final Map<TopicPartition, PartitionState> partitionStates;
+        private final Set<Node> liveLeaders;
+
+        public Builder(int controllerId, int controllerEpoch,
+                       Map<TopicPartition, PartitionState> partitionStates, Set<Node> liveLeaders) {
+            super(ApiKeys.LEADER_AND_ISR);
+            this.controllerId = controllerId;
+            this.controllerEpoch = controllerEpoch;
+            this.partitionStates = partitionStates;
+            this.liveLeaders = liveLeaders;
+        }
+
+        @Override
+        public LeaderAndIsrRequest build() {
+            return new LeaderAndIsrRequest(controllerId, controllerEpoch, partitionStates,
+                    liveLeaders, version());
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder bld = new StringBuilder();
+            bld.append("(type=LeaderAndIsrRequest")
+                .append(", controllerId=").append(controllerId)
+                .append(", controllerEpoch=").append(controllerEpoch)
+                .append(", partitionStates=").append(Utils.mkString(partitionStates))
+                .append(", liveLeaders=(").append(Utils.join(liveLeaders, ", ")).append(")")
+                .append(")");
+            return bld.toString();
+        }
+    }
+
     private final int controllerId;
     private final int controllerEpoch;
     private final Map<TopicPartition, PartitionState> partitionStates;
     private final Set<Node> liveLeaders;
 
-    public LeaderAndIsrRequest(int controllerId, int controllerEpoch, Map<TopicPartition, PartitionState> partitionStates,
-                               Set<Node> liveLeaders) {
-        super(new Struct(CURRENT_SCHEMA));
+    private LeaderAndIsrRequest(int controllerId, int controllerEpoch, Map<TopicPartition, PartitionState> partitionStates,
+                               Set<Node> liveLeaders, short version) {
+        super(new Struct(ProtoUtils.requestSchema(ApiKeys.LEADER_AND_ISR.id, version)),
+                version);
         struct.set(CONTROLLER_ID_KEY_NAME, controllerId);
         struct.set(CONTROLLER_EPOCH_KEY_NAME, controllerEpoch);
 
@@ -100,8 +132,8 @@ public class LeaderAndIsrRequest extends AbstractRequest {
         this.liveLeaders = liveLeaders;
     }
 
-    public LeaderAndIsrRequest(Struct struct) {
-        super(struct);
+    public LeaderAndIsrRequest(Struct struct, short versionId) {
+        super(struct, versionId);
 
         Map<TopicPartition, PartitionState> partitionStates = new HashMap<>();
         for (Object partitionStateDataObj : struct.getArray(PARTITION_STATES_KEY_NAME)) {
@@ -145,12 +177,13 @@ public class LeaderAndIsrRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractResponse getErrorResponse(int versionId, Throwable e) {
+    public AbstractResponse getErrorResponse(Throwable e) {
         Map<TopicPartition, Short> responses = new HashMap<>(partitionStates.size());
         for (TopicPartition partition : partitionStates.keySet()) {
             responses.put(partition, Errors.forException(e).code());
         }
 
+        short versionId = version();
         switch (versionId) {
             case 0:
                 return new LeaderAndIsrResponse(Errors.NONE.code(), responses);
@@ -177,11 +210,11 @@ public class LeaderAndIsrRequest extends AbstractRequest {
     }
 
     public static LeaderAndIsrRequest parse(ByteBuffer buffer, int versionId) {
-        return new LeaderAndIsrRequest(ProtoUtils.parseRequest(ApiKeys.LEADER_AND_ISR.id, versionId, buffer));
+        return new LeaderAndIsrRequest(ProtoUtils.parseRequest(ApiKeys.LEADER_AND_ISR.id, versionId, buffer),
+                (short) versionId);
     }
 
     public static LeaderAndIsrRequest parse(ByteBuffer buffer) {
-        return new LeaderAndIsrRequest(CURRENT_SCHEMA.read(buffer));
+        return parse(buffer, ProtoUtils.latestVersion(ApiKeys.LEADER_AND_ISR.id));
     }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
index 6a3f8a6..573ebc8 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
@@ -16,34 +16,59 @@ import java.nio.ByteBuffer;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 public class LeaveGroupRequest extends AbstractRequest {
-
-    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.LEAVE_GROUP.id);
     private static final String GROUP_ID_KEY_NAME = "group_id";
     private static final String MEMBER_ID_KEY_NAME = "member_id";
 
+    public static class Builder extends AbstractRequest.Builder<LeaveGroupRequest> {
+        private final String groupId;
+        private final String memberId;
+
+        public Builder(String groupId, String memberId) {
+            super(ApiKeys.LEAVE_GROUP);
+            this.groupId = groupId;
+            this.memberId = memberId;
+        }
+
+        @Override
+        public LeaveGroupRequest build() {
+            return new LeaveGroupRequest(groupId, memberId, version());
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder bld = new StringBuilder();
+            bld.append("(type=LeaveGroupRequest").
+                append(", groupId=").append(groupId).
+                append(", memberId=").append(memberId).
+                append(")");
+            return bld.toString();
+        }
+    }
+
     private final String groupId;
     private final String memberId;
 
-    public LeaveGroupRequest(String groupId, String memberId) {
-        super(new Struct(CURRENT_SCHEMA));
+    private LeaveGroupRequest(String groupId, String memberId, short version) {
+        super(new Struct(ProtoUtils.requestSchema(ApiKeys.LEAVE_GROUP.id, version)),
+                version);
         struct.set(GROUP_ID_KEY_NAME, groupId);
         struct.set(MEMBER_ID_KEY_NAME, memberId);
         this.groupId = groupId;
         this.memberId = memberId;
     }
 
-    public LeaveGroupRequest(Struct struct) {
-        super(struct);
+    public LeaveGroupRequest(Struct struct, short version) {
+        super(struct, version);
         groupId = struct.getString(GROUP_ID_KEY_NAME);
         memberId = struct.getString(MEMBER_ID_KEY_NAME);
     }
 
     @Override
-    public AbstractResponse getErrorResponse(int versionId, Throwable e) {
+    public AbstractResponse getErrorResponse(Throwable e) {
+        short versionId = version();
         switch (versionId) {
             case 0:
                 return new LeaveGroupResponse(Errors.forException(e).code());
@@ -62,10 +87,11 @@ public class LeaveGroupRequest extends AbstractRequest {
     }
 
     public static LeaveGroupRequest parse(ByteBuffer buffer, int versionId) {
-        return new LeaveGroupRequest(ProtoUtils.parseRequest(ApiKeys.LEAVE_GROUP.id, versionId, buffer));
+        return new LeaveGroupRequest(ProtoUtils.parseRequest(ApiKeys.LEAVE_GROUP.id, versionId, buffer),
+                (short) versionId);
     }
 
     public static LeaveGroupRequest parse(ByteBuffer buffer) {
-        return new LeaveGroupRequest(CURRENT_SCHEMA.read(buffer));
+        return parse(buffer, ProtoUtils.latestVersion(ApiKeys.LEAVE_GROUP.id));
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
index 3fd3b81..8d0a1af 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
@@ -15,26 +15,40 @@ package org.apache.kafka.common.requests;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
 import java.util.Collections;
 
 public class ListGroupsRequest extends AbstractRequest {
+    public static class Builder extends AbstractRequest.Builder<ListGroupsRequest> {
+        public Builder() {
+            super(ApiKeys.LIST_GROUPS);
+        }
 
-    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.LIST_GROUPS.id);
+        @Override
+        public ListGroupsRequest build() {
+            return new ListGroupsRequest(version());
+        }
 
-    public ListGroupsRequest() {
-        super(new Struct(CURRENT_SCHEMA));
+        @Override
+        public String toString() {
+            return "(type=ListGroupsRequest)";
+        }
     }
 
-    public ListGroupsRequest(Struct struct) {
-        super(struct);
+    public ListGroupsRequest(short version) {
+        super(new Struct(ProtoUtils.requestSchema(ApiKeys.LIST_GROUPS.id, version)),
+                version);
+    }
+
+    public ListGroupsRequest(Struct struct, short versionId) {
+        super(struct, versionId);
     }
 
     @Override
-    public AbstractResponse getErrorResponse(int versionId, Throwable e) {
+    public AbstractResponse getErrorResponse(Throwable e) {
+        short versionId = version();
         switch (versionId) {
             case 0:
                 short errorCode = Errors.forException(e).code();
@@ -46,12 +60,11 @@ public class ListGroupsRequest extends AbstractRequest {
     }
 
     public static ListGroupsRequest parse(ByteBuffer buffer, int versionId) {
-        return new ListGroupsRequest(ProtoUtils.parseRequest(ApiKeys.LIST_GROUPS.id, versionId, buffer));
+        return new ListGroupsRequest(ProtoUtils.parseRequest(ApiKeys.LIST_GROUPS.id, versionId, buffer),
+                (short) versionId);
     }
 
     public static ListGroupsRequest parse(ByteBuffer buffer) {
-        return new ListGroupsRequest(CURRENT_SCHEMA.read(buffer));
+        return parse(buffer, ProtoUtils.latestVersion(ApiKeys.LIST_GROUPS.id));
     }
-
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
index c1db82d..3361383 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
@@ -17,12 +17,14 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ObsoleteBrokerException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.utils.CollectionUtils;
+import org.apache.kafka.common.utils.Utils;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -34,14 +36,12 @@ import java.util.Map;
 import java.util.Set;
 
 public class ListOffsetRequest extends AbstractRequest {
-
     public static final long EARLIEST_TIMESTAMP = -2L;
     public static final long LATEST_TIMESTAMP = -1L;
 
     public static final int CONSUMER_REPLICA_ID = -1;
     public static final int DEBUGGING_REPLICA_ID = -2;
 
-    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.LIST_OFFSETS.id);
     private static final String REPLICA_ID_KEY_NAME = "replica_id";
     private static final String TOPICS_KEY_NAME = "topics";
 
@@ -59,6 +59,89 @@ public class ListOffsetRequest extends AbstractRequest {
     private final Map<TopicPartition, Long> partitionTimestamps;
     private final Set<TopicPartition> duplicatePartitions;
 
+    public static class Builder extends AbstractRequest.Builder<ListOffsetRequest> {
+        private final int replicaId;
+        private Map<TopicPartition, PartitionData> offsetData = null;
+        private Map<TopicPartition, Long> partitionTimestamps = null;
+        private short minVersion = (short) 0;
+
+        public Builder() {
+            this(CONSUMER_REPLICA_ID);
+        }
+
+        public Builder(int replicaId) {
+            super(ApiKeys.LIST_OFFSETS);
+            this.replicaId = replicaId;
+        }
+
+        public Builder setOffsetData(Map<TopicPartition, PartitionData> offsetData) {
+            this.offsetData = offsetData;
+            return this;
+        }
+
+        public Builder setTargetTimes(Map<TopicPartition, Long> partitionTimestamps) {
+            this.partitionTimestamps = partitionTimestamps;
+            return this;
+        }
+
+        @Override
+        public ListOffsetRequest build() {
+            short version = version();
+            if (version < minVersion) {
+                throw new ObsoleteBrokerException("The broker is too old to send this request.");
+            }
+            if (version == 0) {
+                if (offsetData == null) {
+                    if (partitionTimestamps == null) {
+                        throw new RuntimeException("Must set partitionTimestamps or offsetData when creating a v0 " +
+                            "ListOffsetRequest");
+                    } else {
+                        offsetData = new HashMap<>();
+                        for (Map.Entry<TopicPartition, Long> entry: partitionTimestamps.entrySet()) {
+                            offsetData.put(entry.getKey(),
+                                    new PartitionData(entry.getValue(), 1));
+                        }
+                        this.partitionTimestamps = null;
+                    }
+                }
+            } else {
+                if (offsetData != null) {
+                    throw new UnsupportedVersionException("Cannot create a v" + version + " ListOffsetRequest with v0 " +
+                        "PartitionData.");
+                } else if (partitionTimestamps == null) {
+                    throw new RuntimeException("Must set partitionTimestamps when creating a v" +
+                            version + " ListOffsetRequest");
+                }
+            }
+            Map<TopicPartition, ?> m = (version == 0) ?  offsetData : partitionTimestamps;
+            return new ListOffsetRequest(replicaId, m, version);
+        }
+
+        /**
+         * Set the minimum version which we will produce for this request.
+         */
+        public Builder setMinVersion(short minVersion) {
+            this.minVersion = minVersion;
+            return this;
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder bld = new StringBuilder();
+            bld.append("(type=ListOffsetRequest")
+               .append(", replicaId=").append(replicaId);
+            if (offsetData != null) {
+                bld.append(", offsetData=").append(Utils.mkString(offsetData));
+            }
+            if (partitionTimestamps != null) {
+                bld.append(", partitionTimestamps=").append(Utils.mkString(partitionTimestamps));
+            }
+            bld.append(", minVersion=").append(minVersion);
+            bld.append(")");
+            return bld.toString();
+        }
+    }
+
     /**
      * This class is only used by ListOffsetRequest v0 which has been deprecated.
      */
@@ -71,37 +154,23 @@ public class ListOffsetRequest extends AbstractRequest {
             this.timestamp = timestamp;
             this.maxNumOffsets = maxNumOffsets;
         }
-    }
-
-    /**
-     * Constructor for ListOffsetRequest v0
-     */
-    @Deprecated
-    public ListOffsetRequest(Map<TopicPartition, PartitionData> offsetData) {
-        this(CONSUMER_REPLICA_ID, offsetData, 0);
-    }
 
-    /**
-     * Constructor for ListOffsetRequest v0
-     */
-    @Deprecated
-    public ListOffsetRequest(int replicaId, Map<TopicPartition, PartitionData> offsetData) {
-        this(replicaId, offsetData, 0);
-    }
-
-    /**
-     * Constructor for ListOffsetRequest v1.
-     */
-    public ListOffsetRequest(Map<TopicPartition, Long> targetTimes, int replicaId) {
-        this(replicaId, targetTimes, 1);
+        @Override
+        public String toString() {
+            StringBuilder bld = new StringBuilder();
+            bld.append("{timestamp: ").append(timestamp).
+                append(", maxNumOffsets: ").append(maxNumOffsets).
+                append("}");
+            return bld.toString();
+        }
     }
 
     /**
      * Private constructor with a specified version.
      */
     @SuppressWarnings("unchecked")
-    private ListOffsetRequest(int replicaId, Map<TopicPartition, ?> targetTimes, int version) {
-        super(new Struct(ProtoUtils.requestSchema(ApiKeys.LIST_OFFSETS.id, version)));
+    private ListOffsetRequest(int replicaId, Map<TopicPartition, ?> targetTimes, short version) {
+        super(new Struct(ProtoUtils.requestSchema(ApiKeys.LIST_OFFSETS.id, version)), version);
         Map<String, Map<Integer, Object>> topicsData =
                 CollectionUtils.groupDataByTopic((Map<TopicPartition, Object>) targetTimes);
 
@@ -110,7 +179,7 @@ public class ListOffsetRequest extends AbstractRequest {
         for (Map.Entry<String, Map<Integer, Object>> topicEntry: topicsData.entrySet()) {
             Struct topicData = struct.instance(TOPICS_KEY_NAME);
             topicData.set(TOPIC_KEY_NAME, topicEntry.getKey());
-            List<Struct> partitionArray = new ArrayList<Struct>();
+            List<Struct> partitionArray = new ArrayList<>();
             for (Map.Entry<Integer, Object> partitionEntry : topicEntry.getValue().entrySet()) {
                 if (version == 0) {
                     PartitionData offsetPartitionData = (PartitionData) partitionEntry.getValue();
@@ -137,9 +206,9 @@ public class ListOffsetRequest extends AbstractRequest {
         this.duplicatePartitions = Collections.emptySet();
     }
 
-    public ListOffsetRequest(Struct struct) {
-        super(struct);
-        Set<TopicPartition> duplicatePatitions = new HashSet<>();
+    public ListOffsetRequest(Struct struct, short versionId) {
+        super(struct, versionId);
+        Set<TopicPartition> duplicatePartitions = new HashSet<>();
         replicaId = struct.getInt(REPLICA_ID_KEY_NAME);
         offsetData = new HashMap<>();
         partitionTimestamps = new HashMap<>();
@@ -157,18 +226,19 @@ public class ListOffsetRequest extends AbstractRequest {
                     offsetData.put(tp, partitionData);
                 } else {
                     if (partitionTimestamps.put(tp, timestamp) != null)
-                        duplicatePatitions.add(tp);
+                        duplicatePartitions.add(tp);
                 }
             }
         }
-        this.duplicatePartitions = duplicatePatitions;
+        this.duplicatePartitions = duplicatePartitions;
     }
 
     @Override
     @SuppressWarnings("deprecation")
-    public AbstractResponse getErrorResponse(int versionId, Throwable e) {
+    public AbstractResponse getErrorResponse(Throwable e) {
         Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = new HashMap<TopicPartition, ListOffsetResponse.PartitionData>();
 
+        short versionId = version();
         if (versionId == 0) {
             for (Map.Entry<TopicPartition, PartitionData> entry : offsetData.entrySet()) {
                 ListOffsetResponse.PartitionData partitionResponse = new ListOffsetResponse.PartitionData(Errors.forException(e).code(), new ArrayList<Long>());
@@ -209,10 +279,11 @@ public class ListOffsetRequest extends AbstractRequest {
     }
 
     public static ListOffsetRequest parse(ByteBuffer buffer, int versionId) {
-        return new ListOffsetRequest(ProtoUtils.parseRequest(ApiKeys.LIST_OFFSETS.id, versionId, buffer));
+        return new ListOffsetRequest(ProtoUtils.parseRequest(ApiKeys.LIST_OFFSETS.id, versionId, buffer),
+                (short) versionId);
     }
 
     public static ListOffsetRequest parse(ByteBuffer buffer) {
-        return new ListOffsetRequest(CURRENT_SCHEMA.read(buffer));
+        return parse(buffer, ProtoUtils.latestVersion(ApiKeys.LIST_OFFSETS.id));
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
index 0b257bc..2eddf1e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.utils.CollectionUtils;
+import org.apache.kafka.common.utils.Utils;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -89,6 +90,23 @@ public class ListOffsetResponse extends AbstractResponse {
             this.offset = offset;
             this.offsets = null;
         }
+
+        @Override
+        public String toString() {
+            StringBuilder bld = new StringBuilder();
+            bld.append("PartitionData{").
+                append("errorCode: ").append((int) errorCode).
+                append(", timestamp: ").append(timestamp).
+                append(", offset: ").append(offset).
+                append(", offsets: ");
+            if (offsets == null) {
+                bld.append(offsets);
+            } else {
+                bld.append("[").append(Utils.join(this.offsets, ",")).append("]");
+            }
+            bld.append("}");
+            return bld.toString();
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
index f7d8f8b..16af1b7 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
@@ -13,11 +13,12 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.Utils;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -26,23 +27,70 @@ import java.util.List;
 
 public class MetadataRequest extends AbstractRequest {
 
-    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.METADATA.id);
-    private static final String TOPICS_KEY_NAME = "topics";
+    public static class Builder extends AbstractRequest.Builder<MetadataRequest> {
+        private static final List<String> ALL_TOPICS = null;
+
+        // The list of topics, or null if we want to request metadata about all topics.
+        private final List<String> topics;
+
+        public static Builder allTopics() {
+            return new Builder(ALL_TOPICS);
+        }
+
+        public Builder(List<String> topics) {
+            super(ApiKeys.METADATA);
+            this.topics = topics;
+        }
+
+        public List<String> topics() {
+            return this.topics;
+        }
+
+        public boolean isAllTopics() {
+            return this.topics == ALL_TOPICS;
+        }
 
-    private static final MetadataRequest ALL_TOPICS_REQUEST = new MetadataRequest((List<String>) null); // Unusual cast to work around constructor ambiguity
+        @Override
+        public MetadataRequest build() {
+            short version = version();
+            if (version < 1) {
+                throw new UnsupportedVersionException("MetadataRequest " +
+                        "versions older than 1 are not supported.");
+            }
+            return new MetadataRequest(this.topics, version);
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder bld = new StringBuilder();
+            bld.append("(type=MetadataRequest").
+                append(", topics=");
+            if (topics == null) {
+                bld.append("<ALL>");
+            } else {
+                bld.append(Utils.join(topics, ","));
+            }
+            bld.append(")");
+            return bld.toString();
+        }
+    }
+
+    private static final String TOPICS_KEY_NAME = "topics";
 
     private final List<String> topics;
 
-    public static MetadataRequest allTopics() {
-        return ALL_TOPICS_REQUEST;
+    public static MetadataRequest allTopics(short version) {
+        return new MetadataRequest.Builder(null).setVersion(version).build();
     }
 
     /**
     * In v0 null is not allowed and an empty list indicates requesting all topics.
+     * Note: modern clients do not support sending v0 requests.
      * In v1 null indicates requesting all topics, and an empty list indicates requesting no topics.
      */
-    public MetadataRequest(List<String> topics) {
-        super(new Struct(CURRENT_SCHEMA));
+    public MetadataRequest(List<String> topics, short version) {
+        super(new Struct(ProtoUtils.requestSchema(ApiKeys.METADATA.id, version)),
+                version);
         if (topics == null)
             struct.set(TOPICS_KEY_NAME, null);
         else
@@ -50,8 +98,8 @@ public class MetadataRequest extends AbstractRequest {
         this.topics = topics;
     }
 
-    public MetadataRequest(Struct struct) {
-        super(struct);
+    public MetadataRequest(Struct struct, short version) {
+        super(struct, version);
         Object[] topicArray = struct.getArray(TOPICS_KEY_NAME);
         if (topicArray != null) {
             topics = new ArrayList<>();
@@ -64,7 +112,7 @@ public class MetadataRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractResponse getErrorResponse(int versionId, Throwable e) {
+    public AbstractResponse getErrorResponse(Throwable e) {
         List<MetadataResponse.TopicMetadata> topicMetadatas = new ArrayList<>();
         Errors error = Errors.forException(e);
         List<MetadataResponse.PartitionMetadata> partitions = Collections.emptyList();
@@ -74,6 +122,7 @@ public class MetadataRequest extends AbstractRequest {
                 topicMetadatas.add(new MetadataResponse.TopicMetadata(error, topic, false, partitions));
         }
 
+        short versionId = version();
         switch (versionId) {
             case 0:
             case 1:
@@ -94,10 +143,11 @@ public class MetadataRequest extends AbstractRequest {
     }
 
     public static MetadataRequest parse(ByteBuffer buffer, int versionId) {
-        return new MetadataRequest(ProtoUtils.parseRequest(ApiKeys.METADATA.id, versionId, buffer));
+        return new MetadataRequest(ProtoUtils.parseRequest(ApiKeys.METADATA.id, versionId, buffer),
+                (short) versionId);
     }
 
     public static MetadataRequest parse(ByteBuffer buffer) {
-        return new MetadataRequest(CURRENT_SCHEMA.read(buffer));
+        return parse(buffer, ProtoUtils.latestVersion(ApiKeys.METADATA.id));
     }
 }


Mime
View raw message