kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject [4/4] kafka git commit: KAFKA-4954; Request handler utilization quotas
Date Mon, 01 May 2017 16:13:41 GMT
KAFKA-4954; Request handler utilization quotas

See KIP-124 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-124+-+Request+rate+quotas) for details

Author: Rajini Sivaram <rajinisivaram@googlemail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Jun Rao <junrao@gmail.com>

Closes #2744 from rajinisivaram/KAFKA-4954


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0104b657
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0104b657
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0104b657

Branch: refs/heads/trunk
Commit: 0104b657a154fb15e716d872a0e6084f9da650bf
Parents: 6185bc0
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
Authored: Mon May 1 09:13:31 2017 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Mon May 1 09:13:31 2017 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/clients/NetworkClient.java |  30 +-
 .../org/apache/kafka/common/metrics/Sensor.java |   9 +-
 .../kafka/common/network/KafkaChannel.java      |  22 +-
 .../apache/kafka/common/network/Selector.java   |  23 +
 .../apache/kafka/common/protocol/ApiKeys.java   |  86 +--
 .../apache/kafka/common/protocol/Protocol.java  | 212 ++++++--
 .../kafka/common/requests/AbstractRequest.java  |   9 +-
 .../kafka/common/requests/AbstractResponse.java |   1 +
 .../common/requests/AddOffsetsToTxnRequest.java |   4 +-
 .../requests/AddOffsetsToTxnResponse.java       |  11 +-
 .../requests/AddPartitionsToTxnRequest.java     |   4 +-
 .../requests/AddPartitionsToTxnResponse.java    |  11 +-
 .../common/requests/ApiVersionsRequest.java     |   8 +-
 .../common/requests/ApiVersionsResponse.java    |  31 +-
 .../requests/ControlledShutdownRequest.java     |   2 +-
 .../common/requests/CreateTopicsRequest.java    |   4 +-
 .../common/requests/CreateTopicsResponse.java   |  14 +
 .../common/requests/DeleteRecordsRequest.java   |   4 +-
 .../common/requests/DeleteRecordsResponse.java  |  12 +-
 .../common/requests/DeleteTopicsRequest.java    |   4 +-
 .../common/requests/DeleteTopicsResponse.java   |  14 +
 .../common/requests/DescribeGroupsRequest.java  |   4 +-
 .../common/requests/DescribeGroupsResponse.java |  20 +-
 .../kafka/common/requests/EndTxnRequest.java    |   4 +-
 .../kafka/common/requests/EndTxnResponse.java   |  11 +-
 .../kafka/common/requests/FetchRequest.java     |   4 +-
 .../common/requests/FindCoordinatorRequest.java |   5 +-
 .../requests/FindCoordinatorResponse.java       |  14 +
 .../kafka/common/requests/HeartbeatRequest.java |   4 +-
 .../common/requests/HeartbeatResponse.java      |  14 +
 .../kafka/common/requests/InitPidRequest.java   |   4 +-
 .../kafka/common/requests/InitPidResponse.java  |  15 +-
 .../kafka/common/requests/JoinGroupRequest.java |  11 +-
 .../common/requests/JoinGroupResponse.java      |  20 +
 .../common/requests/LeaderAndIsrRequest.java    |   2 +-
 .../common/requests/LeaveGroupRequest.java      |   4 +-
 .../common/requests/LeaveGroupResponse.java     |  14 +
 .../common/requests/ListGroupsRequest.java      |   4 +-
 .../common/requests/ListGroupsResponse.java     |  20 +-
 .../common/requests/ListOffsetRequest.java      |   6 +-
 .../common/requests/ListOffsetResponse.java     |  16 +-
 .../kafka/common/requests/MetadataRequest.java  |   4 +-
 .../kafka/common/requests/MetadataResponse.java |  14 +
 .../common/requests/OffsetCommitRequest.java    |   5 +-
 .../common/requests/OffsetCommitResponse.java   |  14 +
 .../common/requests/OffsetFetchRequest.java     |  10 +-
 .../common/requests/OffsetFetchResponse.java    |  22 +-
 .../requests/OffsetsForLeaderEpochRequest.java  |   2 +-
 .../kafka/common/requests/ProduceRequest.java   |   4 +-
 .../kafka/common/requests/ProduceResponse.java  |   1 -
 .../common/requests/SaslHandshakeRequest.java   |  25 +-
 .../common/requests/StopReplicaRequest.java     |   2 +-
 .../kafka/common/requests/SyncGroupRequest.java |   7 +-
 .../common/requests/SyncGroupResponse.java      |  14 +
 .../common/requests/TxnOffsetCommitRequest.java |   4 +-
 .../requests/TxnOffsetCommitResponse.java       |  11 +-
 .../common/requests/UpdateMetadataRequest.java  |   2 +-
 .../common/requests/WriteTxnMarkersRequest.java |   2 +-
 .../apache/kafka/clients/NetworkClientTest.java |   3 +-
 .../clients/producer/internals/SenderTest.java  |   2 +-
 .../internals/TransactionManagerTest.java       |  10 +-
 .../common/network/SslTransportLayerTest.java   |  39 ++
 .../kafka/common/protocol/ApiKeysTest.java      |  31 ++
 .../common/requests/RequestResponseTest.java    |  10 +-
 .../scala/kafka/network/RequestChannel.scala    |  82 ++-
 .../main/scala/kafka/network/SocketServer.scala |  31 +-
 .../scala/kafka/server/ClientQuotaManager.scala | 101 ++--
 .../server/ClientRequestQuotaManager.scala      |  54 ++
 .../main/scala/kafka/server/ConfigHandler.scala |   6 +
 .../main/scala/kafka/server/DynamicConfig.scala |   5 +
 .../src/main/scala/kafka/server/KafkaApis.scala | 539 +++++++++++--------
 .../kafka/server/KafkaRequestHandler.scala      |   6 +-
 .../main/scala/kafka/server/QuotaFactory.scala  |  12 +-
 .../integration/kafka/api/AdminClientTest.scala |   2 +-
 .../integration/kafka/api/BaseQuotaTest.scala   |  65 ++-
 .../kafka/api/ClientIdQuotaTest.scala           |   5 +-
 .../kafka/api/UserClientIdQuotaTest.scala       |   8 +-
 .../integration/kafka/api/UserQuotaTest.scala   |   8 +-
 .../kafka/server/ApiVersionsRequestTest.scala   |   6 +-
 .../unit/kafka/server/BaseRequestTest.scala     |   2 +-
 .../kafka/server/ClientQuotaManagerTest.scala   |  62 +++
 .../unit/kafka/server/RequestQuotaTest.scala    | 420 +++++++++++++++
 82 files changed, 1879 insertions(+), 484 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 7bd0311..df9e2fa 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -42,13 +42,12 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.HashSet;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
-import java.util.Set;
 
 /**
  * A network client for asynchronous request/response network i/o. This is an internal class used to implement the
@@ -100,7 +99,7 @@ public class NetworkClient implements KafkaClient {
 
     private final ApiVersions apiVersions;
 
-    private final Set<String> nodesNeedingApiVersionsFetch = new HashSet<>();
+    private final Map<String, ApiVersionsRequest.Builder> nodesNeedingApiVersionsFetch = new HashMap<>();
 
     private final List<ClientResponse> abortedSends = new LinkedList<>();
 
@@ -471,7 +470,7 @@ public class NetworkClient implements KafkaClient {
         ResponseHeader responseHeader = ResponseHeader.parse(responseBuffer);
         // Always expect the response version id to be the same as the request version id
         ApiKeys apiKey = ApiKeys.forId(requestHeader.apiKey());
-        Struct responseBody = apiKey.responseSchema(requestHeader.apiVersion()).read(responseBuffer);
+        Struct responseBody = apiKey.parseResponse(requestHeader.apiVersion(), responseBuffer);
         correlate(requestHeader, responseHeader);
         return AbstractResponse.getResponse(apiKey, responseBody);
     }
@@ -564,10 +563,14 @@ public class NetworkClient implements KafkaClient {
                                            InFlightRequest req, long now, ApiVersionsResponse apiVersionsResponse) {
         final String node = req.destination;
         if (apiVersionsResponse.error() != Errors.NONE) {
-            log.warn("Node {} got error {} when making an ApiVersionsRequest.  Disconnecting.",
-                    node, apiVersionsResponse.error());
-            this.selector.close(node);
-            processDisconnection(responses, node, now);
+            if (req.request.version() == 0 || apiVersionsResponse.error() != Errors.UNSUPPORTED_VERSION) {
+                log.warn("Node {} got error {} when making an ApiVersionsRequest.  Disconnecting.",
+                        node, apiVersionsResponse.error());
+                this.selector.close(node);
+                processDisconnection(responses, node, now);
+            } else {
+                nodesNeedingApiVersionsFetch.put(node, new ApiVersionsRequest.Builder((short) 0));
+            }
             return;
         }
         NodeApiVersions nodeVersionInfo = new NodeApiVersions(apiVersionsResponse.apiVersions());
@@ -605,7 +608,7 @@ public class NetworkClient implements KafkaClient {
             // connection.
             if (discoverBrokerVersions) {
                 this.connectionStates.checkingApiVersions(node);
-                nodesNeedingApiVersionsFetch.add(node);
+                nodesNeedingApiVersionsFetch.put(node, new ApiVersionsRequest.Builder());
                 log.debug("Completed connection to node {}.  Fetching API versions.", node);
             } else {
                 this.connectionStates.ready(node);
@@ -615,13 +618,14 @@ public class NetworkClient implements KafkaClient {
     }
 
     private void handleInitiateApiVersionRequests(long now) {
-        Iterator<String> iter = nodesNeedingApiVersionsFetch.iterator();
+        Iterator<Map.Entry<String, ApiVersionsRequest.Builder>> iter = nodesNeedingApiVersionsFetch.entrySet().iterator();
         while (iter.hasNext()) {
-            String node = iter.next();
+            Map.Entry<String, ApiVersionsRequest.Builder> entry = iter.next();
+            String node = entry.getKey();
             if (selector.isChannelReady(node) && inFlightRequests.canSendMore(node)) {
                 log.debug("Initiating API versions fetch from node {}.", node);
-                ApiVersionsRequest.Builder apiVersionRequest = new ApiVersionsRequest.Builder();
-                ClientRequest clientRequest = newClientRequest(node, apiVersionRequest, now, true);
+                ApiVersionsRequest.Builder apiVersionRequestBuilder = entry.getValue();
+                ClientRequest clientRequest = newClientRequest(node, apiVersionRequestBuilder, now, true);
                 doSend(clientRequest, true, now);
                 iter.remove();
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
index 5ca9fce..ae331e7 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
@@ -168,16 +168,21 @@ public final class Sensor {
      *         bound
      */
     public void record(double value, long timeMs) {
+        record(value, timeMs, true);
+    }
+
+    public void record(double value, long timeMs, boolean checkQuotas) {
         if (shouldRecord()) {
             this.lastRecordTime = timeMs;
             synchronized (this) {
                 // increment all the stats
                 for (Stat stat : this.stats)
                     stat.record(config, value, timeMs);
-                checkQuotas(timeMs);
+                if (checkQuotas)
+                    checkQuotas(timeMs);
             }
             for (Sensor parent : parents)
-                parent.record(value, timeMs);
+                parent.record(value, timeMs, checkQuotas);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
index f1bf86c..ea03ff0 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
@@ -31,6 +31,9 @@ public class KafkaChannel {
     private final String id;
     private final TransportLayer transportLayer;
     private final Authenticator authenticator;
+    // Tracks accumulated network thread time. This is updated on the network thread.
+    // The values are read and reset after each response is sent.
+    private long networkThreadTimeNanos;
     private final int maxReceiveSize;
     private NetworkReceive receive;
     private Send send;
@@ -43,6 +46,7 @@ public class KafkaChannel {
         this.id = id;
         this.transportLayer = transportLayer;
         this.authenticator = authenticator;
+        this.networkThreadTimeNanos = 0L;
         this.maxReceiveSize = maxReceiveSize;
         this.disconnected = false;
         this.muted = false;
@@ -164,6 +168,23 @@ public class KafkaChannel {
         return result;
     }
 
+    /**
+     * Accumulates network thread time for this channel.
+     */
+    public void addNetworkThreadTimeNanos(long nanos) {
+        networkThreadTimeNanos += nanos;
+    }
+
+    /**
+     * Returns accumulated network thread time for this channel and resets
+     * the value to zero.
+     */
+    public long getAndResetNetworkThreadTimeNanos() {
+        long current = networkThreadTimeNanos;
+        networkThreadTimeNanos = 0;
+        return current;
+    }
+
     private long receive(NetworkReceive receive) throws IOException {
         return receive.readFrom(transportLayer);
     }
@@ -175,5 +196,4 @@ public class KafkaChannel {
 
         return send.completed();
     }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index fd3ab47..8dd3ad6 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -101,6 +101,7 @@ public class Selector implements Selectable {
     private final ChannelBuilder channelBuilder;
     private final int maxReceiveSize;
     private final boolean metricsPerConnection;
+    private final boolean recordTimePerConnection;
     private final IdleExpiryManager idleExpiryManager;
 
     /**
@@ -122,6 +123,7 @@ public class Selector implements Selectable {
                     String metricGrpPrefix,
                     Map<String, String> metricTags,
                     boolean metricsPerConnection,
+                    boolean recordTimePerConnection,
                     ChannelBuilder channelBuilder) {
         try {
             this.nioSelector = java.nio.channels.Selector.open();
@@ -144,9 +146,21 @@ public class Selector implements Selectable {
         this.sensors = new SelectorMetrics(metrics);
         this.channelBuilder = channelBuilder;
         this.metricsPerConnection = metricsPerConnection;
+        this.recordTimePerConnection = recordTimePerConnection;
         this.idleExpiryManager = connectionMaxIdleMs < 0 ? null : new IdleExpiryManager(time, connectionMaxIdleMs);
     }
 
+    public Selector(int maxReceiveSize,
+            long connectionMaxIdleMs,
+            Metrics metrics,
+            Time time,
+            String metricGrpPrefix,
+            Map<String, String> metricTags,
+            boolean metricsPerConnection,
+            ChannelBuilder channelBuilder) {
+        this(maxReceiveSize, connectionMaxIdleMs, metrics, time, metricGrpPrefix, metricTags, metricsPerConnection, false, channelBuilder);
+    }
+
     public Selector(long connectionMaxIdleMS, Metrics metrics, Time time, String metricGrpPrefix, ChannelBuilder channelBuilder) {
         this(NetworkReceive.UNLIMITED, connectionMaxIdleMS, metrics, time, metricGrpPrefix, new HashMap<String, String>(), true, channelBuilder);
     }
@@ -326,6 +340,7 @@ public class Selector implements Selectable {
             SelectionKey key = iterator.next();
             iterator.remove();
             KafkaChannel channel = channel(key);
+            long channelStartTimeNanos = recordTimePerConnection ? time.nanoseconds() : 0;
 
             // register all per-connection metrics at once
             sensors.maybeRegisterConnectionMetrics(channel.id());
@@ -380,10 +395,18 @@ public class Selector implements Selectable {
                 else
                     log.warn("Unexpected error from {}; closing connection", desc, e);
                 close(channel, true);
+            } finally {
+                maybeRecordTimePerConnection(channel, channelStartTimeNanos);
             }
         }
     }
 
+    // Record time spent in pollSelectionKeys for channel (moved into a method to keep checkstyle happy)
+    private void maybeRecordTimePerConnection(KafkaChannel channel, long startTimeNanos) {
+        if (recordTimePerConnection)
+            channel.addNetworkThreadTimeNanos(time.nanoseconds() - startTimeNanos);
+    }
+
     @Override
     public List<Send> completedSends() {
         return this.completedSends;

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 9e7ce1d..b98a33e 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.common.protocol;
 
 import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.SchemaException;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
@@ -25,35 +26,43 @@ import java.nio.ByteBuffer;
  * Identifiers for all the Kafka APIs
  */
 public enum ApiKeys {
-    PRODUCE(0, "Produce"),
-    FETCH(1, "Fetch"),
-    LIST_OFFSETS(2, "Offsets"),
-    METADATA(3, "Metadata"),
-    LEADER_AND_ISR(4, "LeaderAndIsr"),
-    STOP_REPLICA(5, "StopReplica"),
-    UPDATE_METADATA_KEY(6, "UpdateMetadata"),
-    CONTROLLED_SHUTDOWN_KEY(7, "ControlledShutdown"),
-    OFFSET_COMMIT(8, "OffsetCommit"),
-    OFFSET_FETCH(9, "OffsetFetch"),
-    FIND_COORDINATOR(10, "FindCoordinator"),
-    JOIN_GROUP(11, "JoinGroup"),
-    HEARTBEAT(12, "Heartbeat"),
-    LEAVE_GROUP(13, "LeaveGroup"),
-    SYNC_GROUP(14, "SyncGroup"),
-    DESCRIBE_GROUPS(15, "DescribeGroups"),
-    LIST_GROUPS(16, "ListGroups"),
-    SASL_HANDSHAKE(17, "SaslHandshake"),
-    API_VERSIONS(18, "ApiVersions"),
-    CREATE_TOPICS(19, "CreateTopics"),
-    DELETE_TOPICS(20, "DeleteTopics"),
-    DELETE_RECORDS(21, "DeleteRecords"),
-    INIT_PRODUCER_ID(22, "InitProducerId"),
-    OFFSET_FOR_LEADER_EPOCH(23, "OffsetForLeaderEpoch"),
-    ADD_PARTITIONS_TO_TXN(24, "AddPartitionsToTxn"),
-    ADD_OFFSETS_TO_TXN(25, "AddOffsetsToTxn"),
-    END_TXN(26, "EndTxn"),
-    WRITE_TXN_MARKERS(27, "WriteTxnMarkers"),
-    TXN_OFFSET_COMMIT(28, "TxnOffsetCommit");
+    PRODUCE(0, "Produce", false),
+    FETCH(1, "Fetch", false),
+    LIST_OFFSETS(2, "Offsets", false),
+    METADATA(3, "Metadata", false),
+    LEADER_AND_ISR(4, "LeaderAndIsr", true),
+    STOP_REPLICA(5, "StopReplica", true),
+    UPDATE_METADATA_KEY(6, "UpdateMetadata", true),
+    CONTROLLED_SHUTDOWN_KEY(7, "ControlledShutdown", true),
+    OFFSET_COMMIT(8, "OffsetCommit", false),
+    OFFSET_FETCH(9, "OffsetFetch", false),
+    FIND_COORDINATOR(10, "FindCoordinator", false),
+    JOIN_GROUP(11, "JoinGroup", false),
+    HEARTBEAT(12, "Heartbeat", false),
+    LEAVE_GROUP(13, "LeaveGroup", false),
+    SYNC_GROUP(14, "SyncGroup", false),
+    DESCRIBE_GROUPS(15, "DescribeGroups", false),
+    LIST_GROUPS(16, "ListGroups", false),
+    SASL_HANDSHAKE(17, "SaslHandshake", false),
+    API_VERSIONS(18, "ApiVersions", false) {
+        @Override
+        public Struct parseResponse(short version, ByteBuffer buffer) {
+            // Fallback to version 0 for ApiVersions response. If a client sends an ApiVersionsRequest
+            // using a version higher than that supported by the broker, a version 0 response is sent
+            // to the client indicating UNSUPPORTED_VERSION.
+            return parseResponse(version, buffer, (short) 0);
+        }
+    },
+    CREATE_TOPICS(19, "CreateTopics", false),
+    DELETE_TOPICS(20, "DeleteTopics", false),
+    DELETE_RECORDS(21, "DeleteRecords", false),
+    INIT_PRODUCER_ID(22, "InitProducerId", false),
+    OFFSET_FOR_LEADER_EPOCH(23, "OffsetForLeaderEpoch", true),
+    ADD_PARTITIONS_TO_TXN(24, "AddPartitionsToTxn", false),
+    ADD_OFFSETS_TO_TXN(25, "AddOffsetsToTxn", false),
+    END_TXN(26, "EndTxn", false),
+    WRITE_TXN_MARKERS(27, "WriteTxnMarkers", true),
+    TXN_OFFSET_COMMIT(28, "TxnOffsetCommit", false);
 
     private static final ApiKeys[] ID_TO_TYPE;
     private static final int MIN_API_KEY = 0;
@@ -76,11 +85,15 @@ public enum ApiKeys {
     /** an english description of the api--this is for debugging and can change */
     public final String name;
 
-    ApiKeys(int id, String name) {
+    /** indicates if this is a ClusterAction request used only by brokers */
+    public final boolean clusterAction;
+
+    ApiKeys(int id, String name, boolean clusterAction) {
         if (id < 0)
             throw new IllegalArgumentException("id must not be negative, id: " + id);
         this.id = (short) id;
         this.name = name;
+        this.clusterAction = clusterAction;
     }
 
     public static ApiKeys forId(int id) {
@@ -122,6 +135,19 @@ public enum ApiKeys {
         return responseSchema(version).read(buffer);
     }
 
+    protected Struct parseResponse(short version, ByteBuffer buffer, short fallbackVersion) {
+        int bufferPosition = buffer.position();
+        try {
+            return responseSchema(version).read(buffer);
+        } catch (SchemaException e) {
+            if (version != fallbackVersion) {
+                buffer.position(bufferPosition);
+                return responseSchema(fallbackVersion).read(buffer);
+            } else
+                throw e;
+        }
+    }
+
     private Schema schemaFor(Schema[][] schemas, short version) {
         if (id > schemas.length)
             throw new IllegalArgumentException("No schema available for API key " + this);

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 54f533e..3da2b3f 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -65,6 +65,9 @@ public class Protocol {
     /* The v2 metadata request is the same as v1. An additional field for cluster id has been added to the v2 metadata response */
     public static final Schema METADATA_REQUEST_V2 = METADATA_REQUEST_V1;
 
+    /* The v3 metadata request is the same as v1 and v2. An additional field for throttle time has been added to the v3 metadata response */
+    public static final Schema METADATA_REQUEST_V3 = METADATA_REQUEST_V2;
+
     public static final Schema METADATA_BROKER_V0 = new Schema(new Field("node_id", INT32, "The broker id."),
                                                    new Field("host", STRING, "The hostname of the broker."),
                                                    new Field("port", INT32,
@@ -129,9 +132,19 @@ public class Protocol {
                                                                      "The broker id of the controller broker."),
                                                                  new Field("topic_metadata", new ArrayOf(TOPIC_METADATA_V1)));
 
+    public static final Schema METADATA_RESPONSE_V3 = new Schema(
+         newThrottleTimeField(),
+         new Field("brokers", new ArrayOf(METADATA_BROKER_V1),
+            "Host and port information for all brokers."),
+         new Field("cluster_id", NULLABLE_STRING,
+             "The cluster id that this broker belongs to."),
+         new Field("controller_id", INT32,
+             "The broker id of the controller broker."),
+         new Field("topic_metadata", new ArrayOf(TOPIC_METADATA_V1)));
+
 
-    public static final Schema[] METADATA_REQUEST = new Schema[] {METADATA_REQUEST_V0, METADATA_REQUEST_V1, METADATA_REQUEST_V2};
-    public static final Schema[] METADATA_RESPONSE = new Schema[] {METADATA_RESPONSE_V0, METADATA_RESPONSE_V1, METADATA_RESPONSE_V2};
+    public static final Schema[] METADATA_REQUEST = new Schema[] {METADATA_REQUEST_V0, METADATA_REQUEST_V1, METADATA_REQUEST_V2, METADATA_REQUEST_V3};
+    public static final Schema[] METADATA_RESPONSE = new Schema[] {METADATA_RESPONSE_V0, METADATA_RESPONSE_V1, METADATA_RESPONSE_V2, METADATA_RESPONSE_V3};
 
     /* Produce api */
 
@@ -190,11 +203,7 @@ public class Protocol {
                                                                                                                                             INT16),
                                                                                                                                   new Field("base_offset",
                                                                                                                                             INT64))))))),
-                                                                new Field("throttle_time_ms",
-                                                                          INT32,
-                                                                          "Duration in milliseconds for which the request was throttled" +
-                                                                              " due to quota violation. (Zero if the request did not violate any quota.)",
-                                                                          0));
+                                                                newThrottleTimeField());
     /**
      * PRODUCE_RESPONSE_V2 added a timestamp field in the per partition response status.
      * The timestamp is log append time if the topic is configured to use log append time. Or it is NoTimestamp when create
@@ -215,11 +224,7 @@ public class Protocol {
                                                                                                                             "If CreateTime is used for the topic, the timestamp will be -1. " +
                                                                                                                             "If LogAppendTime is used for the topic, the timestamp will be " +
                                                                                                                             "the broker local time when the messages are appended."))))))),
-                                                                new Field("throttle_time_ms",
-                                                                          INT32,
-                                                                          "Duration in milliseconds for which the request was throttled" +
-                                                                              " due to quota violation. (Zero if the request did not violate any quota.)",
-                                                                          0));
+                                                                newThrottleTimeField());
     public static final Schema PRODUCE_RESPONSE_V3 = PRODUCE_RESPONSE_V2;
 
     public static final Schema[] PRODUCE_REQUEST = new Schema[] {PRODUCE_REQUEST_V0, PRODUCE_REQUEST_V1, PRODUCE_REQUEST_V2, PRODUCE_REQUEST_V3};
@@ -316,6 +321,9 @@ public class Protocol {
                                                                                new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V2),
                                                                                "Topics to commit offsets."));
 
+    /* v3 request is same as v2. Throttle time has been added to response */
+    public static final Schema OFFSET_COMMIT_REQUEST_V3 = OFFSET_COMMIT_REQUEST_V2;
+
     public static final Schema OFFSET_COMMIT_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
                                                                                           INT32,
                                                                                           "Topic partition id."),
@@ -329,13 +337,18 @@ public class Protocol {
     public static final Schema OFFSET_COMMIT_RESPONSE_V0 = new Schema(new Field("responses",
                                                                                 new ArrayOf(OFFSET_COMMIT_RESPONSE_TOPIC_V0)));
 
-    public static final Schema[] OFFSET_COMMIT_REQUEST = new Schema[] {OFFSET_COMMIT_REQUEST_V0, OFFSET_COMMIT_REQUEST_V1, OFFSET_COMMIT_REQUEST_V2};
+    public static final Schema[] OFFSET_COMMIT_REQUEST = new Schema[] {OFFSET_COMMIT_REQUEST_V0, OFFSET_COMMIT_REQUEST_V1, OFFSET_COMMIT_REQUEST_V2, OFFSET_COMMIT_REQUEST_V3};
 
     /* The response types for V0, V1 and V2 of OFFSET_COMMIT_REQUEST are the same. */
     public static final Schema OFFSET_COMMIT_RESPONSE_V1 = OFFSET_COMMIT_RESPONSE_V0;
     public static final Schema OFFSET_COMMIT_RESPONSE_V2 = OFFSET_COMMIT_RESPONSE_V0;
 
-    public static final Schema[] OFFSET_COMMIT_RESPONSE = new Schema[] {OFFSET_COMMIT_RESPONSE_V0, OFFSET_COMMIT_RESPONSE_V1, OFFSET_COMMIT_RESPONSE_V2};
+    public static final Schema OFFSET_COMMIT_RESPONSE_V3 = new Schema(
+            newThrottleTimeField(),
+            new Field("responses",
+                       new ArrayOf(OFFSET_COMMIT_RESPONSE_TOPIC_V0)));
+
+    public static final Schema[] OFFSET_COMMIT_RESPONSE = new Schema[] {OFFSET_COMMIT_RESPONSE_V0, OFFSET_COMMIT_RESPONSE_V1, OFFSET_COMMIT_RESPONSE_V2, OFFSET_COMMIT_RESPONSE_V3};
 
     /* Offset fetch api */
 
@@ -401,8 +414,17 @@ public class Protocol {
                                                                      new Field("error_code",
                                                                                INT16));
 
-    public static final Schema[] OFFSET_FETCH_REQUEST = new Schema[] {OFFSET_FETCH_REQUEST_V0, OFFSET_FETCH_REQUEST_V1, OFFSET_FETCH_REQUEST_V2};
-    public static final Schema[] OFFSET_FETCH_RESPONSE = new Schema[] {OFFSET_FETCH_RESPONSE_V0, OFFSET_FETCH_RESPONSE_V1, OFFSET_FETCH_RESPONSE_V2};
+    /* v3 request is the same as v2. Throttle time has been added to v3 response */
+    public static final Schema OFFSET_FETCH_REQUEST_V3 = OFFSET_FETCH_REQUEST_V2;
+    public static final Schema OFFSET_FETCH_RESPONSE_V3 = new Schema(
+            newThrottleTimeField(),
+            new Field("responses",
+                    new ArrayOf(OFFSET_FETCH_RESPONSE_TOPIC_V0)),
+            new Field("error_code",
+                    INT16));
+
+    public static final Schema[] OFFSET_FETCH_REQUEST = new Schema[] {OFFSET_FETCH_REQUEST_V0, OFFSET_FETCH_REQUEST_V1, OFFSET_FETCH_REQUEST_V2, OFFSET_FETCH_REQUEST_V3};
+    public static final Schema[] OFFSET_FETCH_RESPONSE = new Schema[] {OFFSET_FETCH_RESPONSE_V0, OFFSET_FETCH_RESPONSE_V1, OFFSET_FETCH_RESPONSE_V2, OFFSET_FETCH_RESPONSE_V3};
 
     /* List offset api */
     public static final Schema LIST_OFFSET_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
@@ -445,6 +467,9 @@ public class Protocol {
                                                                              new ArrayOf(LIST_OFFSET_REQUEST_TOPIC_V1),
                                                                              "Topics to list offsets."));
 
+    /* v2 request is the same as v1. Throttle time has been added to response */
+    public static final Schema LIST_OFFSET_REQUEST_V2 = LIST_OFFSET_REQUEST_V1;
+
     public static final Schema LIST_OFFSET_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
                                                                                         INT32,
                                                                                         "Topic partition id."),
@@ -477,9 +502,13 @@ public class Protocol {
 
     public static final Schema LIST_OFFSET_RESPONSE_V1 = new Schema(new Field("responses",
                                                                               new ArrayOf(LIST_OFFSET_RESPONSE_TOPIC_V1)));
+    public static final Schema LIST_OFFSET_RESPONSE_V2 = new Schema(
+            newThrottleTimeField(),
+            new Field("responses",
+                    new ArrayOf(LIST_OFFSET_RESPONSE_TOPIC_V1)));
 
-    public static final Schema[] LIST_OFFSET_REQUEST = new Schema[] {LIST_OFFSET_REQUEST_V0, LIST_OFFSET_REQUEST_V1};
-    public static final Schema[] LIST_OFFSET_RESPONSE = new Schema[] {LIST_OFFSET_RESPONSE_V0, LIST_OFFSET_RESPONSE_V1};
+    public static final Schema[] LIST_OFFSET_REQUEST = new Schema[] {LIST_OFFSET_REQUEST_V0, LIST_OFFSET_REQUEST_V1, LIST_OFFSET_REQUEST_V2};
+    public static final Schema[] LIST_OFFSET_RESPONSE = new Schema[] {LIST_OFFSET_RESPONSE_V0, LIST_OFFSET_RESPONSE_V1, LIST_OFFSET_RESPONSE_V2};
 
     /* Fetch api */
     public static final Schema FETCH_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
@@ -630,11 +659,7 @@ public class Protocol {
     public static final Schema FETCH_RESPONSE_V0 = new Schema(new Field("responses",
                                                                         new ArrayOf(FETCH_RESPONSE_TOPIC_V0)));
 
-    public static final Schema FETCH_RESPONSE_V1 = new Schema(new Field("throttle_time_ms",
-                                                                        INT32,
-                                                                        "Duration in milliseconds for which the request was throttled" +
-                                                                            " due to quota violation. (Zero if the request did not violate any quota.)",
-                                                                        0),
+    public static final Schema FETCH_RESPONSE_V1 = new Schema(newThrottleTimeField(),
                                                               new Field("responses",
                                                                       new ArrayOf(FETCH_RESPONSE_TOPIC_V0)));
     // Even though fetch response v2 has the same protocol as v1, the record set in the response is different. In v1,
@@ -703,19 +728,11 @@ public class Protocol {
             new Field("partition_responses", new ArrayOf(FETCH_RESPONSE_PARTITION_V5)));
 
     public static final Schema FETCH_RESPONSE_V4 = new Schema(
-            new Field("throttle_time_ms",
-                    INT32,
-                    "Duration in milliseconds for which the request was throttled " +
-                    "due to quota violation (zero if the request did not violate any quota).",
-                    0),
+            newThrottleTimeField(),
             new Field("responses", new ArrayOf(FETCH_RESPONSE_TOPIC_V4)));
 
     public static final Schema FETCH_RESPONSE_V5 = new Schema(
-            new Field("throttle_time_ms",
-                    INT32,
-                    "Duration in milliseconds for which the request was throttled " +
-                    "due to quota violation (zero if the request did not violate any quota).",
-                    0),
+            newThrottleTimeField(),
             new Field("responses", new ArrayOf(FETCH_RESPONSE_TOPIC_V5)));
 
     public static final Schema[] FETCH_REQUEST = new Schema[] {FETCH_REQUEST_V0, FETCH_REQUEST_V1, FETCH_REQUEST_V2, FETCH_REQUEST_V3, FETCH_REQUEST_V4, FETCH_REQUEST_V5};
@@ -724,19 +741,29 @@ public class Protocol {
     /* List groups api */
     public static final Schema LIST_GROUPS_REQUEST_V0 = new Schema();
 
+    /* v1 request is the same as v0. Throttle time has been added to response */
+    public static final Schema LIST_GROUPS_REQUEST_V1 = LIST_GROUPS_REQUEST_V0;
+
     public static final Schema LIST_GROUPS_RESPONSE_GROUP_V0 = new Schema(new Field("group_id", STRING),
                                                                           new Field("protocol_type", STRING));
     public static final Schema LIST_GROUPS_RESPONSE_V0 = new Schema(new Field("error_code", INT16),
                                                                     new Field("groups", new ArrayOf(LIST_GROUPS_RESPONSE_GROUP_V0)));
+    public static final Schema LIST_GROUPS_RESPONSE_V1 = new Schema(
+            newThrottleTimeField(),
+            new Field("error_code", INT16),
+            new Field("groups", new ArrayOf(LIST_GROUPS_RESPONSE_GROUP_V0)));
 
-    public static final Schema[] LIST_GROUPS_REQUEST = new Schema[] {LIST_GROUPS_REQUEST_V0};
-    public static final Schema[] LIST_GROUPS_RESPONSE = new Schema[] {LIST_GROUPS_RESPONSE_V0};
+    public static final Schema[] LIST_GROUPS_REQUEST = new Schema[] {LIST_GROUPS_REQUEST_V0, LIST_GROUPS_REQUEST_V1};
+    public static final Schema[] LIST_GROUPS_RESPONSE = new Schema[] {LIST_GROUPS_RESPONSE_V0, LIST_GROUPS_RESPONSE_V1};
 
     /* Describe group api */
     public static final Schema DESCRIBE_GROUPS_REQUEST_V0 = new Schema(new Field("group_ids",
                                                                                  new ArrayOf(STRING),
                                                                                  "List of groupIds to request metadata for (an empty groupId array will return empty group metadata)."));
 
+    /* v1 request is the same as v0. Throttle time has been added to response */
+    public static final Schema DESCRIBE_GROUPS_REQUEST_V1 = DESCRIBE_GROUPS_REQUEST_V0;
+
     public static final Schema DESCRIBE_GROUPS_RESPONSE_MEMBER_V0 = new Schema(new Field("member_id",
                                                                                          STRING,
                                                                                          "The memberId assigned by the coordinator"),
@@ -770,9 +797,12 @@ public class Protocol {
                                                                                                  "Current group members (only provided if the group is not Dead)"));
 
     public static final Schema DESCRIBE_GROUPS_RESPONSE_V0 = new Schema(new Field("groups", new ArrayOf(DESCRIBE_GROUPS_RESPONSE_GROUP_METADATA_V0)));
+    public static final Schema DESCRIBE_GROUPS_RESPONSE_V1 = new Schema(
+            newThrottleTimeField(),
+            new Field("groups", new ArrayOf(DESCRIBE_GROUPS_RESPONSE_GROUP_METADATA_V0)));
 
-    public static final Schema[] DESCRIBE_GROUPS_REQUEST = new Schema[] {DESCRIBE_GROUPS_REQUEST_V0};
-    public static final Schema[] DESCRIBE_GROUPS_RESPONSE = new Schema[] {DESCRIBE_GROUPS_RESPONSE_V0};
+    public static final Schema[] DESCRIBE_GROUPS_REQUEST = new Schema[] {DESCRIBE_GROUPS_REQUEST_V0, DESCRIBE_GROUPS_REQUEST_V1};
+    public static final Schema[] DESCRIBE_GROUPS_RESPONSE = new Schema[] {DESCRIBE_GROUPS_RESPONSE_V0, DESCRIBE_GROUPS_RESPONSE_V1};
 
     /* Find coordinator api */
     public static final Schema FIND_COORDINATOR_REQUEST_V0 = new Schema(
@@ -802,6 +832,7 @@ public class Protocol {
                     "Host and port information for the coordinator for a consumer group."));
 
     public static final Schema FIND_COORDINATOR_RESPONSE_V1 = new Schema(
+            newThrottleTimeField(),
             new Field("error_code", INT16),
             new Field("error_message", NULLABLE_STRING),
             new Field("coordinator",
@@ -870,6 +901,9 @@ public class Protocol {
                                                                             new ArrayOf(JOIN_GROUP_REQUEST_PROTOCOL_V0),
                                                                             "List of protocols that the member supports"));
 
+    /* v2 request is the same as v1. Throttle time has been added to response */
+    public static final Schema JOIN_GROUP_REQUEST_V2 = JOIN_GROUP_REQUEST_V1;
+
     public static final Schema JOIN_GROUP_RESPONSE_MEMBER_V0 = new Schema(new Field("member_id", STRING),
                                                                           new Field("member_metadata", BYTES));
 
@@ -890,9 +924,27 @@ public class Protocol {
                                                                              new ArrayOf(JOIN_GROUP_RESPONSE_MEMBER_V0)));
 
     public static final Schema JOIN_GROUP_RESPONSE_V1 = JOIN_GROUP_RESPONSE_V0;
-
-    public static final Schema[] JOIN_GROUP_REQUEST = new Schema[] {JOIN_GROUP_REQUEST_V0, JOIN_GROUP_REQUEST_V1};
-    public static final Schema[] JOIN_GROUP_RESPONSE = new Schema[] {JOIN_GROUP_RESPONSE_V0, JOIN_GROUP_RESPONSE_V1};
+    public static final Schema JOIN_GROUP_RESPONSE_V2 = new Schema(
+            newThrottleTimeField(),
+            new Field("error_code", INT16),
+            new Field("generation_id",
+                      INT32,
+                      "The generation of the consumer group."),
+            new Field("group_protocol",
+                      STRING,
+                      "The group protocol selected by the coordinator"),
+            new Field("leader_id",
+                      STRING,
+                      "The leader of the group"),
+            new Field("member_id",
+                      STRING,
+                      "The consumer id assigned by the group coordinator."),
+            new Field("members",
+                      new ArrayOf(JOIN_GROUP_RESPONSE_MEMBER_V0)));
+
+
+    public static final Schema[] JOIN_GROUP_REQUEST = new Schema[] {JOIN_GROUP_REQUEST_V0, JOIN_GROUP_REQUEST_V1, JOIN_GROUP_REQUEST_V2};
+    public static final Schema[] JOIN_GROUP_RESPONSE = new Schema[] {JOIN_GROUP_RESPONSE_V0, JOIN_GROUP_RESPONSE_V1, JOIN_GROUP_RESPONSE_V2};
 
     /* SyncGroup api */
     public static final Schema SYNC_GROUP_REQUEST_MEMBER_V0 = new Schema(new Field("member_id", STRING),
@@ -901,10 +953,18 @@ public class Protocol {
                                                                   new Field("generation_id", INT32),
                                                                   new Field("member_id", STRING),
                                                                   new Field("group_assignment", new ArrayOf(SYNC_GROUP_REQUEST_MEMBER_V0)));
+
+    /* v1 request is the same as v0. Throttle time has been added to response */
+    public static final Schema SYNC_GROUP_REQUEST_V1 = SYNC_GROUP_REQUEST_V0;
+
     public static final Schema SYNC_GROUP_RESPONSE_V0 = new Schema(new Field("error_code", INT16),
                                                                    new Field("member_assignment", BYTES));
-    public static final Schema[] SYNC_GROUP_REQUEST = new Schema[] {SYNC_GROUP_REQUEST_V0};
-    public static final Schema[] SYNC_GROUP_RESPONSE = new Schema[] {SYNC_GROUP_RESPONSE_V0};
+    public static final Schema SYNC_GROUP_RESPONSE_V1 = new Schema(
+            newThrottleTimeField(),
+            new Field("error_code", INT16),
+            new Field("member_assignment", BYTES));
+    public static final Schema[] SYNC_GROUP_REQUEST = new Schema[] {SYNC_GROUP_REQUEST_V0, SYNC_GROUP_REQUEST_V1};
+    public static final Schema[] SYNC_GROUP_RESPONSE = new Schema[] {SYNC_GROUP_RESPONSE_V0, SYNC_GROUP_RESPONSE_V1};
 
     /* Heartbeat api */
     public static final Schema HEARTBEAT_REQUEST_V0 = new Schema(new Field("group_id", STRING, "The group id."),
@@ -915,10 +975,16 @@ public class Protocol {
                                                                            STRING,
                                                                            "The member id assigned by the group coordinator."));
 
+    /* v1 request is the same as v0. Throttle time has been added to response */
+    public static final Schema HEARTBEAT_REQUEST_V1 = HEARTBEAT_REQUEST_V0;
+
     public static final Schema HEARTBEAT_RESPONSE_V0 = new Schema(new Field("error_code", INT16));
+    public static final Schema HEARTBEAT_RESPONSE_V1 = new Schema(
+            newThrottleTimeField(),
+            new Field("error_code", INT16));
 
-    public static final Schema[] HEARTBEAT_REQUEST = new Schema[] {HEARTBEAT_REQUEST_V0};
-    public static final Schema[] HEARTBEAT_RESPONSE = new Schema[] {HEARTBEAT_RESPONSE_V0};
+    public static final Schema[] HEARTBEAT_REQUEST = new Schema[] {HEARTBEAT_REQUEST_V0, HEARTBEAT_REQUEST_V1};
+    public static final Schema[] HEARTBEAT_RESPONSE = new Schema[] {HEARTBEAT_RESPONSE_V0, HEARTBEAT_RESPONSE_V1};
 
     /* Leave group api */
     public static final Schema LEAVE_GROUP_REQUEST_V0 = new Schema(new Field("group_id", STRING, "The group id."),
@@ -926,10 +992,16 @@ public class Protocol {
                                                                              STRING,
                                                                              "The member id assigned by the group coordinator."));
 
+    /* v1 request is the same as v0. Throttle time has been added to response */
+    public static final Schema LEAVE_GROUP_REQUEST_V1 = LEAVE_GROUP_REQUEST_V0;
+
     public static final Schema LEAVE_GROUP_RESPONSE_V0 = new Schema(new Field("error_code", INT16));
+    public static final Schema LEAVE_GROUP_RESPONSE_V1 = new Schema(
+            newThrottleTimeField(),
+            new Field("error_code", INT16));
 
-    public static final Schema[] LEAVE_GROUP_REQUEST = new Schema[] {LEAVE_GROUP_REQUEST_V0};
-    public static final Schema[] LEAVE_GROUP_RESPONSE = new Schema[] {LEAVE_GROUP_RESPONSE_V0};
+    public static final Schema[] LEAVE_GROUP_REQUEST = new Schema[] {LEAVE_GROUP_REQUEST_V0, LEAVE_GROUP_REQUEST_V1};
+    public static final Schema[] LEAVE_GROUP_RESPONSE = new Schema[] {LEAVE_GROUP_RESPONSE_V0, LEAVE_GROUP_RESPONSE_V1};
 
     /* Leader and ISR api */
     public static final Schema LEADER_AND_ISR_REQUEST_PARTITION_STATE_V0 =
@@ -1082,15 +1154,22 @@ public class Protocol {
     /* ApiVersion api */
     public static final Schema API_VERSIONS_REQUEST_V0 = new Schema();
 
+    /* v1 request is the same as v0. Throttle time has been added to response */
+    public static final Schema API_VERSIONS_REQUEST_V1 = API_VERSIONS_REQUEST_V0;
+
     public static final Schema API_VERSIONS_V0 = new Schema(new Field("api_key", INT16, "API key."),
                                                            new Field("min_version", INT16, "Minimum supported version."),
                                                            new Field("max_version", INT16, "Maximum supported version."));
 
     public static final Schema API_VERSIONS_RESPONSE_V0 = new Schema(new Field("error_code", INT16, "Error code."),
                                                                     new Field("api_versions", new ArrayOf(API_VERSIONS_V0), "API versions supported by the broker."));
+    public static final Schema API_VERSIONS_RESPONSE_V1 = new Schema(
+            new Field("error_code", INT16, "Error code."),
+            new Field("api_versions", new ArrayOf(API_VERSIONS_V0), "API versions supported by the broker."),
+            newThrottleTimeField());
 
-    public static final Schema[] API_VERSIONS_REQUEST = new Schema[]{API_VERSIONS_REQUEST_V0};
-    public static final Schema[] API_VERSIONS_RESPONSE = new Schema[]{API_VERSIONS_RESPONSE_V0};
+    public static final Schema[] API_VERSIONS_REQUEST = new Schema[]{API_VERSIONS_REQUEST_V0, API_VERSIONS_REQUEST_V1};
+    public static final Schema[] API_VERSIONS_RESPONSE = new Schema[]{API_VERSIONS_RESPONSE_V0, API_VERSIONS_RESPONSE_V1};
 
     /* Admin requests common */
     public static final Schema CONFIG_ENTRY = new Schema(new Field("config_key", STRING, "Configuration key name"),
@@ -1154,9 +1233,16 @@ public class Protocol {
             new Field("topic_errors",
                     new ArrayOf(TOPIC_ERROR),
                     "An array of per topic errors."));
+    /* v2 request is the same as v1. Throttle time has been added to the response */
+    public static final Schema CREATE_TOPICS_REQUEST_V2 = CREATE_TOPICS_REQUEST_V1;
+    public static final Schema CREATE_TOPICS_RESPONSE_V2 = new Schema(
+            newThrottleTimeField(),
+            new Field("topic_errors",
+                    new ArrayOf(TOPIC_ERROR),
+                    "An array of per topic errors."));
 
-    public static final Schema[] CREATE_TOPICS_REQUEST = new Schema[] {CREATE_TOPICS_REQUEST_V0, CREATE_TOPICS_REQUEST_V1};
-    public static final Schema[] CREATE_TOPICS_RESPONSE = new Schema[] {CREATE_TOPICS_RESPONSE_V0, CREATE_TOPICS_RESPONSE_V1};
+    public static final Schema[] CREATE_TOPICS_REQUEST = new Schema[] {CREATE_TOPICS_REQUEST_V0, CREATE_TOPICS_REQUEST_V1, CREATE_TOPICS_REQUEST_V2};
+    public static final Schema[] CREATE_TOPICS_RESPONSE = new Schema[] {CREATE_TOPICS_RESPONSE_V0, CREATE_TOPICS_RESPONSE_V1, CREATE_TOPICS_RESPONSE_V2};
 
     /* DeleteTopic api */
     public static final Schema DELETE_TOPICS_REQUEST_V0 = new Schema(
@@ -1171,9 +1257,16 @@ public class Protocol {
         new Field("topic_error_codes",
             new ArrayOf(TOPIC_ERROR_CODE),
             "An array of per topic error codes."));
+    /* v1 request is the same as v0. Throttle time has been added to the response */
+    public static final Schema DELETE_TOPICS_REQUEST_V1 = DELETE_TOPICS_REQUEST_V0;
+    public static final Schema DELETE_TOPICS_RESPONSE_V1 = new Schema(
+            newThrottleTimeField(),
+            new Field("topic_error_codes",
+                new ArrayOf(TOPIC_ERROR_CODE),
+                "An array of per topic error codes."));
 
-    public static final Schema[] DELETE_TOPICS_REQUEST = new Schema[] {DELETE_TOPICS_REQUEST_V0};
-    public static final Schema[] DELETE_TOPICS_RESPONSE = new Schema[] {DELETE_TOPICS_RESPONSE_V0};
+    public static final Schema[] DELETE_TOPICS_REQUEST = new Schema[] {DELETE_TOPICS_REQUEST_V0, DELETE_TOPICS_REQUEST_V1};
+    public static final Schema[] DELETE_TOPICS_RESPONSE = new Schema[] {DELETE_TOPICS_RESPONSE_V0, DELETE_TOPICS_RESPONSE_V1};
 
     public static final Schema DELETE_RECORDS_REQUEST_PARTITION_V0 = new Schema(new Field("partition", INT32, "Topic partition id."),
                                                                                 new Field("offset", INT64, "The offset before which the messages will be deleted."));
@@ -1191,7 +1284,9 @@ public class Protocol {
     public static final Schema DELETE_RECORDS_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING, "Topic name."),
                                                                              new Field("partitions", new ArrayOf(DELETE_RECORDS_RESPONSE_PARTITION_V0)));
 
-    public static final Schema DELETE_RECORDS_RESPONSE_V0 = new Schema(new Field("topics", new ArrayOf(DELETE_RECORDS_RESPONSE_TOPIC_V0)));
+    public static final Schema DELETE_RECORDS_RESPONSE_V0 = new Schema(
+            newThrottleTimeField(),
+            new Field("topics", new ArrayOf(DELETE_RECORDS_RESPONSE_TOPIC_V0)));
 
     public static final Schema[] DELETE_RECORDS_REQUEST = new Schema[] {DELETE_RECORDS_REQUEST_V0};
     public static final Schema[] DELETE_RECORDS_RESPONSE = new Schema[] {DELETE_RECORDS_RESPONSE_V0};
@@ -1207,6 +1302,7 @@ public class Protocol {
     );
 
     public static final Schema INIT_PRODUCER_ID_RESPONSE_V0 = new Schema(
+            newThrottleTimeField(),
             new Field("error_code",
                     INT16,
                     "An integer error code."),
@@ -1289,6 +1385,7 @@ public class Protocol {
                     "The partitions to add to the transaction.")
     );
     public static final Schema ADD_PARTITIONS_TO_TXN_RESPONSE_V0 = new Schema(
+            newThrottleTimeField(),
             new Field("error_code",
                     INT16,
                     "An integer error code.")
@@ -1312,6 +1409,7 @@ public class Protocol {
                     "Consumer group id whose offsets should be included in the transaction.")
     );
     public static final Schema ADD_OFFSETS_TO_TXN_RESPONSE_V0 = new Schema(
+            newThrottleTimeField(),
             new Field("error_code",
                     INT16,
                     "An integer error code.")
@@ -1336,6 +1434,7 @@ public class Protocol {
     );
 
     public static final Schema END_TXN_RESPONSE_V0 = new Schema(
+            newThrottleTimeField(),
             new Field("error_code",
                     INT16,
                     "An integer error code.")
@@ -1425,6 +1524,7 @@ public class Protocol {
     );
 
     public static final Schema TXN_OFFSET_COMMIT_RESPONSE_V0 = new Schema(
+            newThrottleTimeField(),
             new Field("topics",
                     new ArrayOf(new Schema(
                             new Field("topic", STRING),
@@ -1535,6 +1635,12 @@ public class Protocol {
         return apiKey < CURR_VERSION.length && apiVersion >= MIN_VERSIONS[apiKey] && apiVersion <= CURR_VERSION[apiKey];
     }
 
+    private static Field newThrottleTimeField() {
+        return new Field("throttle_time_ms", INT32,
+                "Duration in milliseconds for which the request was throttled due to quota violation. (Zero if the request did not violate any quota.)",
+                0);
+    }
+
     private static String indentString(int size) {
         StringBuilder b = new StringBuilder(size);
         for (int i = 0; i < size; i++)

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index 07bde63..04f2602 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -95,7 +95,14 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
     /**
      * Get an error response for a request
      */
-    public abstract AbstractResponse getErrorResponse(Throwable e);
+    public AbstractResponse getErrorResponse(Throwable e) {
+        return getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, e);
+    }
+
+    /**
+     * Get an error response for a request with specified throttle time in the response if applicable
+     */
+    public abstract AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e);
 
     /**
      * Factory method for getting a request object based on ApiKey ID and a buffer

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index 2286783..b76cb21 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.protocol.types.Struct;
 import java.nio.ByteBuffer;
 
 public abstract class AbstractResponse extends AbstractRequestResponse {
+    public static final int DEFAULT_THROTTLE_TIME = 0;
 
     public Send toSend(String destination, RequestHeader requestHeader) {
         return toSend(destination, requestHeader.apiVersion(), requestHeader.toResponseHeader());

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
index 4245e82..733e806 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
@@ -96,8 +96,8 @@ public class AddOffsetsToTxnRequest extends AbstractRequest {
     }
 
     @Override
-    public AddOffsetsToTxnResponse getErrorResponse(Throwable e) {
-        return new AddOffsetsToTxnResponse(Errors.forException(e));
+    public AddOffsetsToTxnResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+        return new AddOffsetsToTxnResponse(throttleTimeMs, Errors.forException(e));
     }
 
     public static AddOffsetsToTxnRequest parse(ByteBuffer buffer, short version) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java
index 2426514..8c41ae4 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.protocol.types.Struct;
 import java.nio.ByteBuffer;
 
 public class AddOffsetsToTxnResponse extends AbstractResponse {
+    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
     private static final String ERROR_CODE_KEY_NAME = "error_code";
 
     // Possible error codes:
@@ -35,15 +36,22 @@ public class AddOffsetsToTxnResponse extends AbstractResponse {
     //   InvalidProducerEpoch
 
     private final Errors error;
+    private final int throttleTimeMs;
 
-    public AddOffsetsToTxnResponse(Errors error) {
+    public AddOffsetsToTxnResponse(int throttleTimeMs, Errors error) {
+        this.throttleTimeMs = throttleTimeMs;
         this.error = error;
     }
 
     public AddOffsetsToTxnResponse(Struct struct) {
+        this.throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME);
         this.error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
     }
 
+    public int throttleTimeMs() {
+        return throttleTimeMs;
+    }
+
     public Errors error() {
         return error;
     }
@@ -51,6 +59,7 @@ public class AddOffsetsToTxnResponse extends AbstractResponse {
     @Override
     protected Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.ADD_OFFSETS_TO_TXN.responseSchema(version));
+        struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
         struct.set(ERROR_CODE_KEY_NAME, error.code());
         return struct;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java
index 9a983d0..5bbea61 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java
@@ -125,8 +125,8 @@ public class AddPartitionsToTxnRequest extends AbstractRequest {
     }
 
     @Override
-    public AddPartitionsToTxnResponse getErrorResponse(Throwable e) {
-        return new AddPartitionsToTxnResponse(Errors.forException(e));
+    public AddPartitionsToTxnResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+        return new AddPartitionsToTxnResponse(throttleTimeMs, Errors.forException(e));
     }
 
     public static AddPartitionsToTxnRequest parse(ByteBuffer buffer, short version) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java
index 0337044..893fcda 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.protocol.types.Struct;
 import java.nio.ByteBuffer;
 
 public class AddPartitionsToTxnResponse extends AbstractResponse {
+    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
     private static final String ERROR_CODE_KEY_NAME = "error_code";
 
     // Possible error codes:
@@ -35,15 +36,22 @@ public class AddPartitionsToTxnResponse extends AbstractResponse {
     //   InvalidProducerEpoch
 
     private final Errors error;
+    private final int throttleTimeMs;
 
-    public AddPartitionsToTxnResponse(Errors error) {
+    public AddPartitionsToTxnResponse(int throttleTimeMs, Errors error) {
+        this.throttleTimeMs = throttleTimeMs;
         this.error = error;
     }
 
     public AddPartitionsToTxnResponse(Struct struct) {
+        this.throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME);
         this.error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
     }
 
+    public int throttleTimeMs() {
+        return throttleTimeMs;
+    }
+
     public Errors error() {
         return error;
     }
@@ -51,6 +59,7 @@ public class AddPartitionsToTxnResponse extends AbstractResponse {
     @Override
     protected Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.ADD_PARTITIONS_TO_TXN.responseSchema(version));
+        struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
         struct.set(ERROR_CODE_KEY_NAME, error.code());
         return struct;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
index 07dd5f5..6f63040 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
@@ -30,6 +30,10 @@ public class ApiVersionsRequest extends AbstractRequest {
             super(ApiKeys.API_VERSIONS);
         }
 
+        public Builder(short version) {
+            super(ApiKeys.API_VERSIONS, version);
+        }
+
         @Override
         public ApiVersionsRequest build(short version) {
             return new ApiVersionsRequest(version);
@@ -55,11 +59,13 @@ public class ApiVersionsRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractResponse getErrorResponse(Throwable e) {
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         short versionId = version();
         switch (versionId) {
             case 0:
                 return new ApiVersionsResponse(Errors.forException(e), Collections.<ApiVersionsResponse.ApiVersion>emptyList());
+            case 1:
+                return new ApiVersionsResponse(throttleTimeMs, Errors.forException(e), Collections.<ApiVersionsResponse.ApiVersion>emptyList());
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
                         versionId, this.getClass().getSimpleName(), ApiKeys.API_VERSIONS.latestVersion()));

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
index 382da89..d434c75 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
@@ -31,7 +31,8 @@ import java.util.Map;
 
 public class ApiVersionsResponse extends AbstractResponse {
 
-    public static final ApiVersionsResponse API_VERSIONS_RESPONSE = createApiVersionsResponse();
+    public static final ApiVersionsResponse API_VERSIONS_RESPONSE = createApiVersionsResponse(DEFAULT_THROTTLE_TIME);
+    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
     public static final String ERROR_CODE_KEY_NAME = "error_code";
     public static final String API_VERSIONS_KEY_NAME = "api_versions";
     public static final String API_KEY_NAME = "api_key";
@@ -44,6 +45,7 @@ public class ApiVersionsResponse extends AbstractResponse {
      * UNSUPPORTED_VERSION (33)
      */
     private final Errors error;
+    private final int throttleTimeMs;
     private final Map<Short, ApiVersion> apiKeyToApiVersion;
 
     public static final class ApiVersion {
@@ -72,11 +74,17 @@ public class ApiVersionsResponse extends AbstractResponse {
     }
 
     public ApiVersionsResponse(Errors error, List<ApiVersion> apiVersions) {
+        this(DEFAULT_THROTTLE_TIME, error, apiVersions);
+    }
+
+    public ApiVersionsResponse(int throttleTimeMs, Errors error, List<ApiVersion> apiVersions) {
+        this.throttleTimeMs = throttleTimeMs;
         this.error = error;
         this.apiKeyToApiVersion = buildApiKeyToApiVersion(apiVersions);
     }
 
     public ApiVersionsResponse(Struct struct) {
+        this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
         this.error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
         List<ApiVersion> tempApiVersions = new ArrayList<>();
         for (Object apiVersionsObj : struct.getArray(API_VERSIONS_KEY_NAME)) {
@@ -92,6 +100,8 @@ public class ApiVersionsResponse extends AbstractResponse {
     @Override
     protected Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.API_VERSIONS.responseSchema(version));
+        if (struct.hasField(THROTTLE_TIME_KEY_NAME))
+            struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
         struct.set(ERROR_CODE_KEY_NAME, error.code());
         List<Struct> apiVersionList = new ArrayList<>();
         for (ApiVersion apiVersion : apiKeyToApiVersion.values()) {
@@ -105,15 +115,26 @@ public class ApiVersionsResponse extends AbstractResponse {
         return struct;
     }
 
+    public static ApiVersionsResponse apiVersionsResponse(short version, int throttleTimeMs) {
+        if (throttleTimeMs == 0 || version == 0)
+            return API_VERSIONS_RESPONSE;
+        else
+            return createApiVersionsResponse(throttleTimeMs);
+    }
+
     /**
      * Returns Errors.UNSUPPORTED_VERSION response with version 0 since we don't support the requested version.
      */
     public static Send unsupportedVersionSend(String destination, RequestHeader requestHeader) {
-        ApiVersionsResponse response = new ApiVersionsResponse(Errors.UNSUPPORTED_VERSION,
+        ApiVersionsResponse response = new ApiVersionsResponse(DEFAULT_THROTTLE_TIME, Errors.UNSUPPORTED_VERSION,
                 Collections.<ApiVersion>emptyList());
         return response.toSend(destination, (short) 0, requestHeader.toResponseHeader());
     }
 
+    public int throttleTimeMs() {
+        return throttleTimeMs;
+    }
+
     public Collection<ApiVersion> apiVersions() {
         return apiKeyToApiVersion.values();
     }
@@ -127,15 +148,15 @@ public class ApiVersionsResponse extends AbstractResponse {
     }
 
     public static ApiVersionsResponse parse(ByteBuffer buffer, short version) {
-        return new ApiVersionsResponse(ApiKeys.API_VERSIONS.responseSchema(version).read(buffer));
+        return new ApiVersionsResponse(ApiKeys.API_VERSIONS.parseResponse(version, buffer));
     }
 
-    private static ApiVersionsResponse createApiVersionsResponse() {
+    public static ApiVersionsResponse createApiVersionsResponse(int throttleTimeMs) {
         List<ApiVersion> versionList = new ArrayList<>();
         for (ApiKeys apiKey : ApiKeys.values()) {
             versionList.add(new ApiVersion(apiKey));
         }
-        return new ApiVersionsResponse(Errors.NONE, versionList);
+        return new ApiVersionsResponse(throttleTimeMs, Errors.NONE, versionList);
     }
 
     private Map<Short, ApiVersion> buildApiKeyToApiVersion(List<ApiVersion> apiVersions) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
index 4b5ec13..ee41665 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
@@ -62,7 +62,7 @@ public class ControlledShutdownRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractResponse getErrorResponse(Throwable e) {
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         short versionId = version();
         switch (versionId) {
             case 0:

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
index 072dde8..a0626cc 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
@@ -209,7 +209,7 @@ public class CreateTopicsRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractResponse getErrorResponse(Throwable e) {
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         Map<String, CreateTopicsResponse.Error> topicErrors = new HashMap<>();
         for (String topic : topics.keySet()) {
             Errors error = Errors.forException(e);
@@ -223,6 +223,8 @@ public class CreateTopicsRequest extends AbstractRequest {
             case 0:
             case 1:
                 return new CreateTopicsResponse(topicErrors);
+            case 2:
+                return new CreateTopicsResponse(throttleTimeMs, topicErrors);
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
                     versionId, this.getClass().getSimpleName(), ApiKeys.CREATE_TOPICS.latestVersion()));

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
index 33a4b4a..54f9764 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
@@ -28,6 +28,7 @@ import java.util.List;
 import java.util.Map;
 
 public class CreateTopicsResponse extends AbstractResponse {
+    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
     private static final String TOPIC_ERRORS_KEY_NAME = "topic_errors";
     private static final String TOPIC_KEY_NAME = "topic";
     private static final String ERROR_CODE_KEY_NAME = "error_code";
@@ -82,8 +83,14 @@ public class CreateTopicsResponse extends AbstractResponse {
      */
 
     private final Map<String, Error> errors;
+    private final int throttleTimeMs;
 
     public CreateTopicsResponse(Map<String, Error> errors) {
+        this(DEFAULT_THROTTLE_TIME, errors);
+    }
+
+    public CreateTopicsResponse(int throttleTimeMs, Map<String, Error> errors) {
+        this.throttleTimeMs = throttleTimeMs;
         this.errors = errors;
     }
 
@@ -100,12 +107,15 @@ public class CreateTopicsResponse extends AbstractResponse {
             errors.put(topic, new Error(error, errorMessage));
         }
 
+        this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
         this.errors = errors;
     }
 
     @Override
     protected Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.CREATE_TOPICS.responseSchema(version));
+        if (struct.hasField(THROTTLE_TIME_KEY_NAME))
+            struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
 
         List<Struct> topicErrorsStructs = new ArrayList<>(errors.size());
         for (Map.Entry<String, Error> topicError : errors.entrySet()) {
@@ -121,6 +131,10 @@ public class CreateTopicsResponse extends AbstractResponse {
         return struct;
     }
 
+    public int throttleTimeMs() {
+        return throttleTimeMs;
+    }
+
     public Map<String, Error> errors() {
         return errors;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsRequest.java
index 96f064c..fcd9836 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsRequest.java
@@ -119,7 +119,7 @@ public class DeleteRecordsRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractResponse getErrorResponse(Throwable e) {
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         Map<TopicPartition, DeleteRecordsResponse.PartitionResponse> responseMap = new HashMap<>();
 
         for (Map.Entry<TopicPartition, Long> entry : partitionOffsets.entrySet()) {
@@ -129,7 +129,7 @@ public class DeleteRecordsRequest extends AbstractRequest {
         short versionId = version();
         switch (versionId) {
             case 0:
-                return new DeleteRecordsResponse(responseMap);
+                return new DeleteRecordsResponse(throttleTimeMs, responseMap);
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
                     versionId, this.getClass().getSimpleName(), ApiKeys.DELETE_RECORDS.latestVersion()));

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java
index 45b518b..64165eb 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java
@@ -33,6 +33,7 @@ public class DeleteRecordsResponse extends AbstractResponse {
     public static final long INVALID_LOW_WATERMARK = -1L;
 
     // request level key names
+    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
     private static final String TOPICS_KEY_NAME = "topics";
 
     // topic level key names
@@ -44,6 +45,7 @@ public class DeleteRecordsResponse extends AbstractResponse {
     private static final String LOW_WATERMARK_KEY_NAME = "low_watermark";
     private static final String ERROR_CODE_KEY_NAME = "error_code";
 
+    private final int throttleTimeMs;
     private final Map<TopicPartition, PartitionResponse> responses;
 
     /**
@@ -80,6 +82,7 @@ public class DeleteRecordsResponse extends AbstractResponse {
     }
 
     public DeleteRecordsResponse(Struct struct) {
+        this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
         responses = new HashMap<>();
         for (Object topicStructObj : struct.getArray(TOPICS_KEY_NAME)) {
             Struct topicStruct = (Struct) topicStructObj;
@@ -97,13 +100,16 @@ public class DeleteRecordsResponse extends AbstractResponse {
     /**
      * Constructor for version 0.
      */
-    public DeleteRecordsResponse(Map<TopicPartition, PartitionResponse> responses) {
+    public DeleteRecordsResponse(int throttleTimeMs, Map<TopicPartition, PartitionResponse> responses) {
+        this.throttleTimeMs = throttleTimeMs;
         this.responses = responses;
     }
 
     @Override
     protected Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.DELETE_RECORDS.responseSchema(version));
+        if (struct.hasField(THROTTLE_TIME_KEY_NAME))
+            struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
         Map<String, Map<Integer, PartitionResponse>> responsesByTopic = CollectionUtils.groupDataByTopic(responses);
         List<Struct> topicStructArray = new ArrayList<>();
         for (Map.Entry<String, Map<Integer, PartitionResponse>> responsesByTopicEntry : responsesByTopic.entrySet()) {
@@ -125,6 +131,10 @@ public class DeleteRecordsResponse extends AbstractResponse {
         return struct;
     }
 
+    public int throttleTimeMs() {
+        return throttleTimeMs;
+    }
+
     public Map<TopicPartition, PartitionResponse> responses() {
         return this.responses;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java
index ccbe211..2ea8c21 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java
@@ -86,7 +86,7 @@ public class DeleteTopicsRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractResponse getErrorResponse(Throwable e) {
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         Map<String, Errors> topicErrors = new HashMap<>();
         for (String topic : topics)
             topicErrors.put(topic, Errors.forException(e));
@@ -94,6 +94,8 @@ public class DeleteTopicsRequest extends AbstractRequest {
         switch (version()) {
             case 0:
                 return new DeleteTopicsResponse(topicErrors);
+            case 1:
+                return new DeleteTopicsResponse(throttleTimeMs, topicErrors);
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
                     version(), this.getClass().getSimpleName(), ApiKeys.DELETE_TOPICS.latestVersion()));

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java
index 9d0d0f3..1b80d1c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 
 public class DeleteTopicsResponse extends AbstractResponse {
+    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
     private static final String TOPIC_ERROR_CODES_KEY_NAME = "topic_error_codes";
     private static final String TOPIC_KEY_NAME = "topic";
     private static final String ERROR_CODE_KEY_NAME = "error_code";
@@ -40,12 +41,19 @@ public class DeleteTopicsResponse extends AbstractResponse {
      * NOT_CONTROLLER(41)
      */
     private final Map<String, Errors> errors;
+    private final int throttleTimeMs;
 
     public DeleteTopicsResponse(Map<String, Errors> errors) {
+        this(DEFAULT_THROTTLE_TIME, errors);
+    }
+
+    public DeleteTopicsResponse(int throttleTimeMs, Map<String, Errors> errors) {
+        this.throttleTimeMs = throttleTimeMs;
         this.errors = errors;
     }
 
     public DeleteTopicsResponse(Struct struct) {
+        this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
         Object[] topicErrorCodesStructs = struct.getArray(TOPIC_ERROR_CODES_KEY_NAME);
         Map<String, Errors> errors = new HashMap<>();
         for (Object topicErrorCodeStructObj : topicErrorCodesStructs) {
@@ -61,6 +69,8 @@ public class DeleteTopicsResponse extends AbstractResponse {
     @Override
     protected Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.DELETE_TOPICS.responseSchema(version));
+        if (struct.hasField(THROTTLE_TIME_KEY_NAME))
+            struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
         List<Struct> topicErrorCodeStructs = new ArrayList<>(errors.size());
         for (Map.Entry<String, Errors> topicError : errors.entrySet()) {
             Struct topicErrorCodeStruct = struct.instance(TOPIC_ERROR_CODES_KEY_NAME);
@@ -72,6 +82,10 @@ public class DeleteTopicsResponse extends AbstractResponse {
         return struct;
     }
 
+    public int throttleTimeMs() {
+        return throttleTimeMs;
+    }
+
     public Map<String, Errors> errors() {
         return errors;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
index 287eda9..b43e254 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
@@ -73,11 +73,13 @@ public class DescribeGroupsRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractResponse getErrorResponse(Throwable e) {
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         short version = version();
         switch (version) {
             case 0:
                 return DescribeGroupsResponse.fromError(Errors.forException(e), groupIds);
+            case 1:
+                return DescribeGroupsResponse.fromError(throttleTimeMs, Errors.forException(e), groupIds);
 
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
index 797ed58..bd7a087 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
@@ -29,6 +29,7 @@ import java.util.Map;
 
 public class DescribeGroupsResponse extends AbstractResponse {
 
+    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
     private static final String GROUPS_KEY_NAME = "groups";
 
     private static final String ERROR_CODE_KEY_NAME = "error_code";
@@ -58,12 +59,19 @@ public class DescribeGroupsResponse extends AbstractResponse {
      */
 
     private final Map<String, GroupMetadata> groups;
+    private final int throttleTimeMs;
 
     public DescribeGroupsResponse(Map<String, GroupMetadata> groups) {
+        this(DEFAULT_THROTTLE_TIME, groups);
+    }
+
+    public DescribeGroupsResponse(int throttleTimeMs, Map<String, GroupMetadata> groups) {
+        this.throttleTimeMs = throttleTimeMs;
         this.groups = groups;
     }
 
     public DescribeGroupsResponse(Struct struct) {
+        this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
         this.groups = new HashMap<>();
         for (Object groupObj : struct.getArray(GROUPS_KEY_NAME)) {
             Struct groupStruct = (Struct) groupObj;
@@ -89,6 +97,10 @@ public class DescribeGroupsResponse extends AbstractResponse {
         }
     }
 
+    public int throttleTimeMs() {
+        return throttleTimeMs;
+    }
+
     public Map<String, GroupMetadata> groups() {
         return groups;
     }
@@ -184,16 +196,22 @@ public class DescribeGroupsResponse extends AbstractResponse {
     }
 
     public static DescribeGroupsResponse fromError(Errors error, List<String> groupIds) {
+        return fromError(DEFAULT_THROTTLE_TIME, error, groupIds);
+    }
+
+    public static DescribeGroupsResponse fromError(int throttleTimeMs, Errors error, List<String> groupIds) {
         GroupMetadata errorMetadata = GroupMetadata.forError(error);
         Map<String, GroupMetadata> groups = new HashMap<>();
         for (String groupId : groupIds)
             groups.put(groupId, errorMetadata);
-        return new DescribeGroupsResponse(groups);
+        return new DescribeGroupsResponse(throttleTimeMs, groups);
     }
 
     @Override
     protected Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.DESCRIBE_GROUPS.responseSchema(version));
+        if (struct.hasField(THROTTLE_TIME_KEY_NAME))
+            struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
 
         List<Struct> groupStructs = new ArrayList<>();
         for (Map.Entry<String, GroupMetadata> groupEntry : groups.entrySet()) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java
index e6eb54e..9c215be 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java
@@ -96,8 +96,8 @@ public class EndTxnRequest extends AbstractRequest {
     }
 
     @Override
-    public EndTxnResponse getErrorResponse(Throwable e) {
-        return new EndTxnResponse(Errors.forException(e));
+    public EndTxnResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+        return new EndTxnResponse(throttleTimeMs, Errors.forException(e));
     }
 
     public static EndTxnRequest parse(ByteBuffer buffer, short version) {


Mime
View raw message