kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject [3/4] kafka git commit: KAFKA-4954; Request handler utilization quotas
Date Mon, 01 May 2017 16:13:40 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java
index 3ff6aca..99e4e8c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.protocol.types.Struct;
 import java.nio.ByteBuffer;
 
 public class EndTxnResponse extends AbstractResponse {
+    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
     private static final String ERROR_CODE_KEY_NAME = "error_code";
 
     // Possible error codes:
@@ -34,15 +35,22 @@ public class EndTxnResponse extends AbstractResponse {
     //   InvalidProducerEpoch
 
     private final Errors error;
+    private final int throttleTimeMs;
 
-    public EndTxnResponse(Errors error) {
+    public EndTxnResponse(int throttleTimeMs, Errors error) {
+        this.throttleTimeMs = throttleTimeMs;
         this.error = error;
     }
 
     public EndTxnResponse(Struct struct) {
+        this.throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME);
         this.error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
     }
 
+    public int throttleTimeMs() {
+        return throttleTimeMs;
+    }
+
     public Errors error() {
         return error;
     }
@@ -50,6 +58,7 @@ public class EndTxnResponse extends AbstractResponse {
     @Override
     protected Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.END_TXN.responseSchema(version));
+        struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
         struct.set(ERROR_CODE_KEY_NAME, error.code());
         return struct;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
index 78715be..4c6998b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -211,7 +211,7 @@ public class FetchRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractResponse getErrorResponse(Throwable e) {
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         LinkedHashMap<TopicPartition, FetchResponse.PartitionData> responseData = new LinkedHashMap<>();
 
         for (Map.Entry<TopicPartition, PartitionData> entry: fetchData.entrySet()) {
@@ -220,7 +220,7 @@ public class FetchRequest extends AbstractRequest {
                 null, MemoryRecords.EMPTY);
             responseData.put(entry.getKey(), partitionResponse);
         }
-        return new FetchResponse(responseData, 0);
+        return new FetchResponse(responseData, throttleTimeMs);
     }
 
     public int replicaId() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java
index 46f3426..b2eaf63 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java
@@ -84,12 +84,13 @@ public class FindCoordinatorRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractResponse getErrorResponse(Throwable e) {
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         short versionId = version();
         switch (versionId) {
             case 0:
-            case 1:
                 return new FindCoordinatorResponse(Errors.forException(e), Node.noNode());
+            case 1:
+                return new FindCoordinatorResponse(throttleTimeMs, Errors.forException(e), Node.noNode());
 
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
index f96f123..b558b62 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
@@ -25,6 +25,7 @@ import java.nio.ByteBuffer;
 
 public class FindCoordinatorResponse extends AbstractResponse {
 
+    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
     private static final String ERROR_CODE_KEY_NAME = "error_code";
     private static final String ERROR_MESSAGE_KEY_NAME = "error_message";
     private static final String COORDINATOR_KEY_NAME = "coordinator";
@@ -42,17 +43,24 @@ public class FindCoordinatorResponse extends AbstractResponse {
     private static final String HOST_KEY_NAME = "host";
     private static final String PORT_KEY_NAME = "port";
 
+    private final int throttleTimeMs;
     private final String errorMessage;
     private final Errors error;
     private final Node node;
 
     public FindCoordinatorResponse(Errors error, Node node) {
+        this(DEFAULT_THROTTLE_TIME, error, node);
+    }
+
+    public FindCoordinatorResponse(int throttleTimeMs, Errors error, Node node) {
+        this.throttleTimeMs = throttleTimeMs;
         this.error = error;
         this.node = node;
         this.errorMessage = null;
     }
 
     public FindCoordinatorResponse(Struct struct) {
+        this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
         error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
         if (struct.hasField(ERROR_MESSAGE_KEY_NAME))
             errorMessage = struct.getString(ERROR_MESSAGE_KEY_NAME);
@@ -66,6 +74,10 @@ public class FindCoordinatorResponse extends AbstractResponse {
         node = new Node(nodeId, host, port);
     }
 
+    public int throttleTimeMs() {
+        return throttleTimeMs;
+    }
+
     public Errors error() {
         return error;
     }
@@ -77,6 +89,8 @@ public class FindCoordinatorResponse extends AbstractResponse {
     @Override
     protected Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.FIND_COORDINATOR.responseSchema(version));
+        if (struct.hasField(THROTTLE_TIME_KEY_NAME))
+            struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
         struct.set(ERROR_CODE_KEY_NAME, error.code());
         if (struct.hasField(ERROR_MESSAGE_KEY_NAME))
             struct.set(ERROR_MESSAGE_KEY_NAME, errorMessage);

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
index 44591a0..7e08a55 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
@@ -75,11 +75,13 @@ public class HeartbeatRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractResponse getErrorResponse(Throwable e) {
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         short versionId = version();
         switch (versionId) {
             case 0:
                 return new HeartbeatResponse(Errors.forException(e));
+            case 1:
+                return new HeartbeatResponse(throttleTimeMs, Errors.forException(e));
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
                         versionId, this.getClass().getSimpleName(), ApiKeys.HEARTBEAT.latestVersion()));

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
index 9bc400c..a90212b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
 
 public class HeartbeatResponse extends AbstractResponse {
 
+    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
     private static final String ERROR_CODE_KEY_NAME = "error_code";
 
     /**
@@ -37,15 +38,26 @@ public class HeartbeatResponse extends AbstractResponse {
      * GROUP_AUTHORIZATION_FAILED (30)
      */
     private final Errors error;
+    private final int throttleTimeMs;
 
     public HeartbeatResponse(Errors error) {
+        this(DEFAULT_THROTTLE_TIME, error);
+    }
+
+    public HeartbeatResponse(int throttleTimeMs, Errors error) {
+        this.throttleTimeMs = throttleTimeMs;
         this.error = error;
     }
 
     public HeartbeatResponse(Struct struct) {
+        this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
         error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
     }
 
+    public int throttleTimeMs() {
+        return throttleTimeMs;
+    }
+
     public Errors error() {
         return error;
     }
@@ -53,6 +65,8 @@ public class HeartbeatResponse extends AbstractResponse {
     @Override
     protected Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.HEARTBEAT.responseSchema(version));
+        if (struct.hasField(THROTTLE_TIME_KEY_NAME))
+            struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
         struct.set(ERROR_CODE_KEY_NAME, error.code());
         return struct;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/InitPidRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/InitPidRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/InitPidRequest.java
index eff05e2..57d32e2 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/InitPidRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/InitPidRequest.java
@@ -77,8 +77,8 @@ public class InitPidRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractResponse getErrorResponse(Throwable e) {
-        return new InitPidResponse(Errors.forException(e));
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+        return new InitPidResponse(throttleTimeMs, Errors.forException(e));
     }
 
     public static InitPidRequest parse(ByteBuffer buffer, short version) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/InitPidResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/InitPidResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/InitPidResponse.java
index 4b65aea..3c858af 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/InitPidResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/InitPidResponse.java
@@ -29,27 +29,35 @@ public class InitPidResponse extends AbstractResponse {
      * OK
      *
      */
+    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
     private static final String PRODUCER_ID_KEY_NAME = "producer_id";
     private static final String EPOCH_KEY_NAME = "producer_epoch";
     private static final String ERROR_CODE_KEY_NAME = "error_code";
+    private final int throttleTimeMs;
     private final Errors error;
     private final long producerId;
     private final short epoch;
 
-    public InitPidResponse(Errors error, long producerId, short epoch) {
+    public InitPidResponse(int throttleTimeMs, Errors error, long producerId, short epoch) {
+        this.throttleTimeMs = throttleTimeMs;
         this.error = error;
         this.producerId = producerId;
         this.epoch = epoch;
     }
 
     public InitPidResponse(Struct struct) {
+        this.throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME);
         this.error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
         this.producerId = struct.getLong(PRODUCER_ID_KEY_NAME);
         this.epoch = struct.getShort(EPOCH_KEY_NAME);
     }
 
-    public InitPidResponse(Errors errors) {
-        this(errors, RecordBatch.NO_PRODUCER_ID, (short) 0);
+    public InitPidResponse(int throttleTimeMs, Errors errors) {
+        this(throttleTimeMs, errors, RecordBatch.NO_PRODUCER_ID, (short) 0);
+    }
+
+    public int throttleTimeMs() {
+        return throttleTimeMs;
     }
 
     public long producerId() {
@@ -67,6 +75,7 @@ public class InitPidResponse extends AbstractResponse {
     @Override
     protected Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.INIT_PRODUCER_ID.responseSchema(version));
+        struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
         struct.set(PRODUCER_ID_KEY_NAME, producerId);
         struct.set(EPOCH_KEY_NAME, epoch);
         struct.set(ERROR_CODE_KEY_NAME, error.code());

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
index 994d9a2..1080fe7 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
@@ -149,7 +149,7 @@ public class JoinGroupRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractResponse getErrorResponse(Throwable e) {
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         short versionId = version();
         switch (versionId) {
             case 0:
@@ -161,6 +161,15 @@ public class JoinGroupRequest extends AbstractRequest {
                         JoinGroupResponse.UNKNOWN_MEMBER_ID, // memberId
                         JoinGroupResponse.UNKNOWN_MEMBER_ID, // leaderId
                         Collections.<String, ByteBuffer>emptyMap());
+            case 2:
+                return new JoinGroupResponse(
+                        throttleTimeMs,
+                        Errors.forException(e),
+                        JoinGroupResponse.UNKNOWN_GENERATION_ID,
+                        JoinGroupResponse.UNKNOWN_PROTOCOL,
+                        JoinGroupResponse.UNKNOWN_MEMBER_ID, // memberId
+                        JoinGroupResponse.UNKNOWN_MEMBER_ID, // leaderId
+                        Collections.<String, ByteBuffer>emptyMap());
 
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
index 1f702c7..a1c9e2b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
@@ -28,6 +28,7 @@ import java.util.Map;
 
 public class JoinGroupResponse extends AbstractResponse {
 
+    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
     private static final String ERROR_CODE_KEY_NAME = "error_code";
 
     /**
@@ -54,6 +55,7 @@ public class JoinGroupResponse extends AbstractResponse {
     public static final int UNKNOWN_GENERATION_ID = -1;
     public static final String UNKNOWN_MEMBER_ID = "";
 
+    private final int throttleTimeMs;
     private final Errors error;
     private final int generationId;
     private final String groupProtocol;
@@ -67,6 +69,17 @@ public class JoinGroupResponse extends AbstractResponse {
                              String memberId,
                              String leaderId,
                              Map<String, ByteBuffer> groupMembers) {
+        this(DEFAULT_THROTTLE_TIME, error, generationId, groupProtocol, memberId, leaderId, groupMembers);
+    }
+
+    public JoinGroupResponse(int throttleTimeMs,
+            Errors error,
+            int generationId,
+            String groupProtocol,
+            String memberId,
+            String leaderId,
+            Map<String, ByteBuffer> groupMembers) {
+        this.throttleTimeMs = throttleTimeMs;
         this.error = error;
         this.generationId = generationId;
         this.groupProtocol = groupProtocol;
@@ -76,6 +89,7 @@ public class JoinGroupResponse extends AbstractResponse {
     }
 
     public JoinGroupResponse(Struct struct) {
+        this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
         members = new HashMap<>();
 
         for (Object memberDataObj : struct.getArray(MEMBERS_KEY_NAME)) {
@@ -91,6 +105,10 @@ public class JoinGroupResponse extends AbstractResponse {
         leaderId = struct.getString(LEADER_ID_KEY_NAME);
     }
 
+    public int throttleTimeMs() {
+        return throttleTimeMs;
+    }
+
     public Errors error() {
         return error;
     }
@@ -126,6 +144,8 @@ public class JoinGroupResponse extends AbstractResponse {
     @Override
     protected Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.JOIN_GROUP.responseSchema(version));
+        if (struct.hasField(THROTTLE_TIME_KEY_NAME))
+            struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
 
         struct.set(ERROR_CODE_KEY_NAME, error.code());
         struct.set(GENERATION_ID_KEY_NAME, generationId);

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
index 8843755..36426c2 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
@@ -179,7 +179,7 @@ public class LeaderAndIsrRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractResponse getErrorResponse(Throwable e) {
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         Map<TopicPartition, Errors> responses = new HashMap<>(partitionStates.size());
         for (TopicPartition partition : partitionStates.keySet()) {
             responses.put(partition, Errors.forException(e));

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
index 4b5820b..76e076e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
@@ -67,11 +67,13 @@ public class LeaveGroupRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractResponse getErrorResponse(Throwable e) {
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         short versionId = version();
         switch (versionId) {
             case 0:
                 return new LeaveGroupResponse(Errors.forException(e));
+            case 1:
+                return new LeaveGroupResponse(throttleTimeMs, Errors.forException(e));
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
                         versionId, this.getClass().getSimpleName(), ApiKeys.LEAVE_GROUP.latestVersion()));

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
index 49b704b..ccfc8a7 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
 
 public class LeaveGroupResponse extends AbstractResponse {
 
+    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
     private static final String ERROR_CODE_KEY_NAME = "error_code";
 
     /**
@@ -36,15 +37,26 @@ public class LeaveGroupResponse extends AbstractResponse {
      * GROUP_AUTHORIZATION_FAILED (30)
      */
     private final Errors error;
+    private final int throttleTimeMs;
 
     public LeaveGroupResponse(Errors error) {
+        this(DEFAULT_THROTTLE_TIME, error);
+    }
+
+    public LeaveGroupResponse(int throttleTimeMs, Errors error) {
+        this.throttleTimeMs = throttleTimeMs;
         this.error = error;
     }
 
     public LeaveGroupResponse(Struct struct) {
+        this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
         error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
     }
 
+    public int throttleTimeMs() {
+        return throttleTimeMs;
+    }
+
     public Errors error() {
         return error;
     }
@@ -52,6 +64,8 @@ public class LeaveGroupResponse extends AbstractResponse {
     @Override
     public Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.LEAVE_GROUP.responseSchema(version));
+        if (struct.hasField(THROTTLE_TIME_KEY_NAME))
+            struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
         struct.set(ERROR_CODE_KEY_NAME, error.code());
         return struct;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
index cceff92..3d4f2b8 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
@@ -49,11 +49,13 @@ public class ListGroupsRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractResponse getErrorResponse(Throwable e) {
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         short versionId = version();
         switch (versionId) {
             case 0:
                 return new ListGroupsResponse(Errors.forException(e), Collections.<ListGroupsResponse.Group>emptyList());
+            case 1:
+                return new ListGroupsResponse(throttleTimeMs, Errors.forException(e), Collections.<ListGroupsResponse.Group>emptyList());
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
                         versionId, this.getClass().getSimpleName(), ApiKeys.LIST_GROUPS.latestVersion()));

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
index d0409ef..13f589f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
@@ -27,6 +27,7 @@ import java.util.List;
 
 public class ListGroupsResponse extends AbstractResponse {
 
+    public static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
     public static final String ERROR_CODE_KEY_NAME = "error_code";
     public static final String GROUPS_KEY_NAME = "groups";
     public static final String GROUP_ID_KEY_NAME = "group_id";
@@ -40,14 +41,21 @@ public class ListGroupsResponse extends AbstractResponse {
      */
 
     private final Errors error;
+    private final int throttleTimeMs;
     private final List<Group> groups;
 
     public ListGroupsResponse(Errors error, List<Group> groups) {
+        this(DEFAULT_THROTTLE_TIME, error, groups);
+    }
+
+    public ListGroupsResponse(int throttleTimeMs, Errors error, List<Group> groups) {
+        this.throttleTimeMs = throttleTimeMs;
         this.error = error;
         this.groups = groups;
     }
 
     public ListGroupsResponse(Struct struct) {
+        this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
         this.error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
         this.groups = new ArrayList<>();
         for (Object groupObj : struct.getArray(GROUPS_KEY_NAME)) {
@@ -58,6 +66,10 @@ public class ListGroupsResponse extends AbstractResponse {
         }
     }
 
+    public int throttleTimeMs() {
+        return throttleTimeMs;
+    }
+
     public List<Group> groups() {
         return groups;
     }
@@ -88,6 +100,8 @@ public class ListGroupsResponse extends AbstractResponse {
     @Override
     protected Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.LIST_GROUPS.responseSchema(version));
+        if (struct.hasField(THROTTLE_TIME_KEY_NAME))
+            struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
         struct.set(ERROR_CODE_KEY_NAME, error.code());
         List<Struct> groupList = new ArrayList<>();
         for (Group group : groups) {
@@ -101,7 +115,11 @@ public class ListGroupsResponse extends AbstractResponse {
     }
 
     public static ListGroupsResponse fromError(Errors error) {
-        return new ListGroupsResponse(error, Collections.<Group>emptyList());
+        return fromError(DEFAULT_THROTTLE_TIME, error);
+    }
+
+    public static ListGroupsResponse fromError(int throttleTimeMs, Errors error) {
+        return new ListGroupsResponse(throttleTimeMs, error, Collections.<Group>emptyList());
     }
 
     public static ListGroupsResponse parse(ByteBuffer buffer, short version) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
index 3327071..7dbffd1 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
@@ -169,7 +169,7 @@ public class ListOffsetRequest extends AbstractRequest {
         super(version);
         this.replicaId = replicaId;
         this.offsetData = version == 0 ? (Map<TopicPartition, PartitionData>) targetTimes : null;
-        this.partitionTimestamps = version == 1 ? (Map<TopicPartition, Long>) targetTimes : null;
+        this.partitionTimestamps = version >= 1 ? (Map<TopicPartition, Long>) targetTimes : null;
         this.duplicatePartitions = Collections.emptySet();
     }
 
@@ -202,7 +202,7 @@ public class ListOffsetRequest extends AbstractRequest {
 
     @Override
     @SuppressWarnings("deprecation")
-    public AbstractResponse getErrorResponse(Throwable e) {
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = new HashMap<>();
 
         short versionId = version();
@@ -224,6 +224,8 @@ public class ListOffsetRequest extends AbstractRequest {
             case 0:
             case 1:
                 return new ListOffsetResponse(responseData);
+            case 2:
+                return new ListOffsetResponse(throttleTimeMs, responseData);
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
                         versionId, this.getClass().getSimpleName(), ApiKeys.LIST_OFFSETS.latestVersion()));

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
index 3f049b4..61c2a55 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
@@ -33,6 +33,7 @@ public class ListOffsetResponse extends AbstractResponse {
     public static final long UNKNOWN_TIMESTAMP = -1L;
     public static final long UNKNOWN_OFFSET = -1L;
 
+    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
     private static final String RESPONSES_KEY_NAME = "responses";
 
     // topic level field names
@@ -105,16 +106,23 @@ public class ListOffsetResponse extends AbstractResponse {
         }
     }
 
+    private final int throttleTimeMs;
     private final Map<TopicPartition, PartitionData> responseData;
 
     /**
-     * Constructor for all versions.
+     * Constructor for all versions without throttle time
      */
     public ListOffsetResponse(Map<TopicPartition, PartitionData> responseData) {
+        this(DEFAULT_THROTTLE_TIME, responseData);
+    }
+
+    public ListOffsetResponse(int throttleTimeMs, Map<TopicPartition, PartitionData> responseData) {
+        this.throttleTimeMs = throttleTimeMs;
         this.responseData = responseData;
     }
 
     public ListOffsetResponse(Struct struct) {
+        this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
         responseData = new HashMap<>();
         for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
             Struct topicResponse = (Struct) topicResponseObj;
@@ -140,6 +148,10 @@ public class ListOffsetResponse extends AbstractResponse {
         }
     }
 
+    public int throttleTimeMs() {
+        return throttleTimeMs;
+    }
+
     public Map<TopicPartition, PartitionData> responseData() {
         return responseData;
     }
@@ -151,6 +163,8 @@ public class ListOffsetResponse extends AbstractResponse {
     @Override
     protected Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.LIST_OFFSETS.responseSchema(version));
+        if (struct.hasField(THROTTLE_TIME_KEY_NAME))
+            struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
         Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(responseData);
 
         List<Struct> topicArray = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
index 97072d5..3c20139 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
@@ -105,7 +105,7 @@ public class MetadataRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractResponse getErrorResponse(Throwable e) {
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         List<MetadataResponse.TopicMetadata> topicMetadatas = new ArrayList<>();
         Errors error = Errors.forException(e);
         List<MetadataResponse.PartitionMetadata> partitions = Collections.emptyList();
@@ -121,6 +121,8 @@ public class MetadataRequest extends AbstractRequest {
             case 1:
             case 2:
                 return new MetadataResponse(Collections.<Node>emptyList(), null, MetadataResponse.NO_CONTROLLER_ID, topicMetadatas);
+            case 3:
+                return new MetadataResponse(throttleTimeMs, Collections.<Node>emptyList(), null, MetadataResponse.NO_CONTROLLER_ID, topicMetadatas);
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
                         versionId, this.getClass().getSimpleName(), ApiKeys.METADATA.latestVersion()));

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index 51aaa23..bd79653 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -35,6 +35,7 @@ import java.util.Set;
 
 public class MetadataResponse extends AbstractResponse {
 
+    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
     private static final String BROKERS_KEY_NAME = "brokers";
     private static final String TOPIC_METADATA_KEY_NAME = "topic_metadata";
 
@@ -80,6 +81,7 @@ public class MetadataResponse extends AbstractResponse {
     private static final String REPLICAS_KEY_NAME = "replicas";
     private static final String ISR_KEY_NAME = "isr";
 
+    private final int throttleTimeMs;
     private final Collection<Node> brokers;
     private final Node controller;
     private final List<TopicMetadata> topicMetadata;
@@ -89,6 +91,11 @@ public class MetadataResponse extends AbstractResponse {
      * Constructor for all versions.
      */
     public MetadataResponse(List<Node> brokers, String clusterId, int controllerId, List<TopicMetadata> topicMetadata) {
+        this(DEFAULT_THROTTLE_TIME, brokers, clusterId, controllerId, topicMetadata);
+    }
+
+    public MetadataResponse(int throttleTimeMs, List<Node> brokers, String clusterId, int controllerId, List<TopicMetadata> topicMetadata) {
+        this.throttleTimeMs = throttleTimeMs;
         this.brokers = brokers;
         this.controller = getControllerNode(controllerId, brokers);
         this.topicMetadata = topicMetadata;
@@ -96,6 +103,7 @@ public class MetadataResponse extends AbstractResponse {
     }
 
     public MetadataResponse(Struct struct) {
+        this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
         Map<Integer, Node> brokers = new HashMap<>();
         Object[] brokerStructs = (Object[]) struct.get(BROKERS_KEY_NAME);
         for (Object brokerStruct : brokerStructs) {
@@ -179,6 +187,10 @@ public class MetadataResponse extends AbstractResponse {
         return null;
     }
 
+    public int throttleTimeMs() {
+        return throttleTimeMs;
+    }
+
     /**
      * Get a map of the topics which had metadata errors
      * @return the map
@@ -365,6 +377,8 @@ public class MetadataResponse extends AbstractResponse {
     @Override
     protected Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.METADATA.responseSchema(version));
+        if (struct.hasField(THROTTLE_TIME_KEY_NAME))
+            struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
         List<Struct> brokerArray = new ArrayList<>();
         for (Node node : brokers) {
             Struct broker = struct.instance(BROKERS_KEY_NAME);

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
index 45975d0..4402c4d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
@@ -133,6 +133,7 @@ public class OffsetCommitRequest extends AbstractRequest {
                             DEFAULT_RETENTION_TIME, offsetData, version);
                 case 1:
                 case 2:
+                case 3:
                     long retentionTime = version == 1 ? DEFAULT_RETENTION_TIME : this.retentionTime;
                     return new OffsetCommitRequest(groupId, generationId, memberId, retentionTime, offsetData, version);
                 default:
@@ -245,7 +246,7 @@ public class OffsetCommitRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractResponse getErrorResponse(Throwable e) {
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         Map<TopicPartition, Errors> responseData = new HashMap<>();
         for (Map.Entry<TopicPartition, PartitionData> entry: offsetData.entrySet()) {
             responseData.put(entry.getKey(), Errors.forException(e));
@@ -257,6 +258,8 @@ public class OffsetCommitRequest extends AbstractRequest {
             case 1:
             case 2:
                 return new OffsetCommitResponse(responseData);
+            case 3:
+                return new OffsetCommitResponse(throttleTimeMs, responseData);
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
                         versionId, this.getClass().getSimpleName(), ApiKeys.OFFSET_COMMIT.latestVersion()));

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
index b1dae37..d8d647d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
@@ -30,6 +30,7 @@ import java.util.Map;
 
 public class OffsetCommitResponse extends AbstractResponse {
 
+    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
     private static final String RESPONSES_KEY_NAME = "responses";
 
     // topic level fields
@@ -57,12 +58,19 @@ public class OffsetCommitResponse extends AbstractResponse {
      */
 
     private final Map<TopicPartition, Errors> responseData;
+    private final int throttleTimeMs;
 
     public OffsetCommitResponse(Map<TopicPartition, Errors> responseData) {
+        this(DEFAULT_THROTTLE_TIME, responseData);
+    }
+
+    public OffsetCommitResponse(int throttleTimeMs, Map<TopicPartition, Errors> responseData) {
+        this.throttleTimeMs = throttleTimeMs;
         this.responseData = responseData;
     }
 
     public OffsetCommitResponse(Struct struct) {
+        this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
         responseData = new HashMap<>();
         for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
             Struct topicResponse = (Struct) topicResponseObj;
@@ -79,6 +87,8 @@ public class OffsetCommitResponse extends AbstractResponse {
     @Override
     public Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.OFFSET_COMMIT.responseSchema(version));
+        if (struct.hasField(THROTTLE_TIME_KEY_NAME))
+            struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
 
         Map<String, Map<Integer, Errors>> topicsData = CollectionUtils.groupDataByTopic(responseData);
         List<Struct> topicArray = new ArrayList<>();
@@ -100,6 +110,10 @@ public class OffsetCommitResponse extends AbstractResponse {
         return struct;
     }
 
+    public int throttleTimeMs() {
+        return throttleTimeMs;
+    }
+
     public Map<TopicPartition, Errors> responseData() {
         return responseData;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
index 71ec3f6..1d810e9 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
@@ -116,6 +116,10 @@ public class OffsetFetchRequest extends AbstractRequest {
     }
 
     public OffsetFetchResponse getErrorResponse(Errors error) {
+        return getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, error);
+    }
+
+    public OffsetFetchResponse getErrorResponse(int throttleTimeMs, Errors error) {
         short versionId = version();
 
         Map<TopicPartition, OffsetFetchResponse.PartitionData> responsePartitions = new HashMap<>();
@@ -133,6 +137,8 @@ public class OffsetFetchRequest extends AbstractRequest {
             case 1:
             case 2:
                 return new OffsetFetchResponse(error, responsePartitions);
+            case 3:
+                return new OffsetFetchResponse(throttleTimeMs, error, responsePartitions);
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
                         versionId, this.getClass().getSimpleName(), ApiKeys.OFFSET_FETCH.latestVersion()));
@@ -140,8 +146,8 @@ public class OffsetFetchRequest extends AbstractRequest {
     }
 
     @Override
-    public OffsetFetchResponse getErrorResponse(Throwable e) {
-        return getErrorResponse(Errors.forException(e));
+    public OffsetFetchResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+        return getErrorResponse(throttleTimeMs, Errors.forException(e));
     }
 
     public String groupId() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
index 69507f0..f795a5b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
@@ -32,6 +32,7 @@ import org.apache.kafka.common.utils.CollectionUtils;
 
 public class OffsetFetchResponse extends AbstractResponse {
 
+    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
     private static final String RESPONSES_KEY_NAME = "responses";
     private static final String ERROR_CODE_KEY_NAME = "error_code";
 
@@ -67,6 +68,7 @@ public class OffsetFetchResponse extends AbstractResponse {
 
     private final Map<TopicPartition, PartitionData> responseData;
     private final Errors error;
+    private final int throttleTimeMs;
 
     public static final class PartitionData {
         public final long offset;
@@ -85,16 +87,28 @@ public class OffsetFetchResponse extends AbstractResponse {
     }
 
     /**
-     * Constructor for all versions.
+     * Constructor for all versions without throttle time.
      * @param error Potential coordinator or group level error code (for api version 2 and later)
      * @param responseData Fetched offset information grouped by topic-partition
      */
     public OffsetFetchResponse(Errors error, Map<TopicPartition, PartitionData> responseData) {
+        this(DEFAULT_THROTTLE_TIME, error, responseData);
+    }
+
+    /**
+     * Constructor with throttle time
+     * @param throttleTimeMs The time in milliseconds that this response was throttled
+     * @param error Potential coordinator or group level error code (for api version 2 and later)
+     * @param responseData Fetched offset information grouped by topic-partition
+     */
+    public OffsetFetchResponse(int throttleTimeMs, Errors error, Map<TopicPartition, PartitionData> responseData) {
+        this.throttleTimeMs = throttleTimeMs;
         this.responseData = responseData;
         this.error = error;
     }
 
     public OffsetFetchResponse(Struct struct) {
+        this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
         Errors topLevelError = Errors.NONE;
         this.responseData = new HashMap<>();
         for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
@@ -128,6 +142,10 @@ public class OffsetFetchResponse extends AbstractResponse {
         }
     }
 
+    public int throttleTimeMs() {
+        return throttleTimeMs;
+    }
+
     public boolean hasError() {
         return this.error != Errors.NONE;
     }
@@ -147,6 +165,8 @@ public class OffsetFetchResponse extends AbstractResponse {
     @Override
     protected Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.OFFSET_FETCH.responseSchema(version));
+        if (struct.hasField(THROTTLE_TIME_KEY_NAME))
+            struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
 
         Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(responseData);
         List<Struct> topicArray = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
index 3c285f1..f898a75 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
@@ -127,7 +127,7 @@ public class OffsetsForLeaderEpochRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractResponse getErrorResponse(Throwable e) {
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         Errors error = Errors.forException(e);
         Map<TopicPartition, EpochEndOffset> errorResponse = new HashMap();
         for (TopicPartition tp : epochsByPartition.keySet()) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
index 482811e..b63f6c2 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
@@ -224,7 +224,7 @@ public class ProduceRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractResponse getErrorResponse(Throwable e) {
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         /* In case the producer doesn't actually want any response */
         if (acks == 0)
             return null;
@@ -241,7 +241,7 @@ public class ProduceRequest extends AbstractRequest {
             case 1:
             case 2:
             case 3:
-                return new ProduceResponse(responseMap);
+                return new ProduceResponse(responseMap, throttleTimeMs);
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
                         versionId, this.getClass().getSimpleName(), ApiKeys.PRODUCE.latestVersion()));

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
index 152391e..06c1f6e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
@@ -46,7 +46,6 @@ public class ProduceResponse extends AbstractResponse {
     private static final String ERROR_CODE_KEY_NAME = "error_code";
 
     public static final long INVALID_OFFSET = -1L;
-    public static final int DEFAULT_THROTTLE_TIME = 0;
 
     /**
      * Possible error code:

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java
index 56c29c5..e49b727 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java
@@ -40,6 +40,29 @@ public class SaslHandshakeRequest extends AbstractRequest {
 
     private final String mechanism;
 
+    public static class Builder extends AbstractRequest.Builder<SaslHandshakeRequest> {
+        private final String mechanism;
+
+        public Builder(String mechanism) {
+            super(ApiKeys.SASL_HANDSHAKE);
+            this.mechanism = mechanism;
+        }
+
+        @Override
+        public SaslHandshakeRequest build(short version) {
+            return new SaslHandshakeRequest(mechanism);
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder bld = new StringBuilder();
+            bld.append("(type=SaslHandshakeRequest").
+                append(", mechanism=").append(mechanism).
+                append(")");
+            return bld.toString();
+        }
+    }
+
     public SaslHandshakeRequest(String mechanism) {
         super(ApiKeys.SASL_HANDSHAKE.latestVersion());
         this.mechanism = mechanism;
@@ -55,7 +78,7 @@ public class SaslHandshakeRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractResponse getErrorResponse(Throwable e) {
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         short versionId = version();
         switch (versionId) {
             case 0:

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java
index 325ee06..48aa16e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java
@@ -103,7 +103,7 @@ public class StopReplicaRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractResponse getErrorResponse(Throwable e) {
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         Map<TopicPartition, Errors> responses = new HashMap<>(partitions.size());
         for (TopicPartition partition : partitions) {
             responses.put(partition, Errors.forException(e));

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java
index dbc19ac..82df84a 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java
@@ -98,13 +98,18 @@ public class SyncGroupRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractResponse getErrorResponse(Throwable e) {
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         short versionId = version();
         switch (versionId) {
             case 0:
                 return new SyncGroupResponse(
                         Errors.forException(e),
                         ByteBuffer.wrap(new byte[]{}));
+            case 1:
+                return new SyncGroupResponse(
+                        throttleTimeMs,
+                        Errors.forException(e),
+                        ByteBuffer.wrap(new byte[]{}));
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
                         versionId, this.getClass().getSimpleName(), ApiKeys.SYNC_GROUP.latestVersion()));

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
index 4a06491..b99a99f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
 
 public class SyncGroupResponse extends AbstractResponse {
 
+    public static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
     public static final String ERROR_CODE_KEY_NAME = "error_code";
     public static final String MEMBER_ASSIGNMENT_KEY_NAME = "member_assignment";
 
@@ -39,18 +40,29 @@ public class SyncGroupResponse extends AbstractResponse {
      */
 
     private final Errors error;
+    private final int throttleTimeMs;
     private final ByteBuffer memberState;
 
     public SyncGroupResponse(Errors error, ByteBuffer memberState) {
+        this(DEFAULT_THROTTLE_TIME, error, memberState);
+    }
+
+    public SyncGroupResponse(int throttleTimeMs, Errors error, ByteBuffer memberState) {
+        this.throttleTimeMs = throttleTimeMs;
         this.error = error;
         this.memberState = memberState;
     }
 
     public SyncGroupResponse(Struct struct) {
+        this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
         this.error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
         this.memberState = struct.getBytes(MEMBER_ASSIGNMENT_KEY_NAME);
     }
 
+    public int throttleTimeMs() {
+        return throttleTimeMs;
+    }
+
     public Errors error() {
         return error;
     }
@@ -62,6 +74,8 @@ public class SyncGroupResponse extends AbstractResponse {
     @Override
     protected Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.SYNC_GROUP.responseSchema(version));
+        if (struct.hasField(THROTTLE_TIME_KEY_NAME))
+            struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
         struct.set(ERROR_CODE_KEY_NAME, error.code());
         struct.set(MEMBER_ASSIGNMENT_KEY_NAME, memberState);
         return struct;

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
index 584f733..cca8875 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
@@ -155,12 +155,12 @@ public class TxnOffsetCommitRequest extends AbstractRequest {
     }
 
     @Override
-    public TxnOffsetCommitResponse getErrorResponse(Throwable e) {
+    public TxnOffsetCommitResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         Errors error = Errors.forException(e);
         Map<TopicPartition, Errors> errors = new HashMap<>(offsets.size());
         for (TopicPartition partition : offsets.keySet())
             errors.put(partition, error);
-        return new TxnOffsetCommitResponse(errors);
+        return new TxnOffsetCommitResponse(throttleTimeMs, errors);
     }
 
     public static TxnOffsetCommitRequest parse(ByteBuffer buffer, short version) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
index 5574aea..37b9a50 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
@@ -27,6 +27,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 public class TxnOffsetCommitResponse extends AbstractResponse {
+    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
     private static final String TOPIC_PARTITIONS_KEY_NAME = "topics";
     private static final String PARTITIONS_KEY_NAME = "partitions";
     private static final String TOPIC_KEY_NAME = "topic";
@@ -43,12 +44,15 @@ public class TxnOffsetCommitResponse extends AbstractResponse {
     //   InvalidCommitOffsetSize
 
     private final Map<TopicPartition, Errors> errors;
+    private final int throttleTimeMs;
 
-    public TxnOffsetCommitResponse(Map<TopicPartition, Errors> errors) {
+    public TxnOffsetCommitResponse(int throttleTimeMs, Map<TopicPartition, Errors> errors) {
+        this.throttleTimeMs = throttleTimeMs;
         this.errors = errors;
     }
 
     public TxnOffsetCommitResponse(Struct struct) {
+        this.throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME);
         Map<TopicPartition, Errors> errors = new HashMap<>();
         Object[] topicPartitionsArray = struct.getArray(TOPIC_PARTITIONS_KEY_NAME);
         for (Object topicPartitionObj : topicPartitionsArray) {
@@ -67,6 +71,7 @@ public class TxnOffsetCommitResponse extends AbstractResponse {
     @Override
     protected Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.TXN_OFFSET_COMMIT.responseSchema(version));
+        struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
         Map<String, Map<Integer, Errors>> mappedPartitions = CollectionUtils.groupDataByTopic(errors);
         Object[] partitionsArray = new Object[mappedPartitions.size()];
         int i = 0;
@@ -91,6 +96,10 @@ public class TxnOffsetCommitResponse extends AbstractResponse {
         return struct;
     }
 
+    public int throttleTimeMs() {
+        return throttleTimeMs;
+    }
+
     public Map<TopicPartition, Errors> errors() {
         return errors;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
index fc7a33f..be2eaab 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
@@ -284,7 +284,7 @@ public class UpdateMetadataRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractResponse getErrorResponse(Throwable e) {
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         short versionId = version();
         if (versionId <= 3)
             return new UpdateMetadataResponse(Errors.forException(e));

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java
index 998d504..7cded24 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java
@@ -192,7 +192,7 @@ public class WriteTxnMarkersRequest extends AbstractRequest {
     }
 
     @Override
-    public WriteTxnMarkersResponse getErrorResponse(Throwable e) {
+    public WriteTxnMarkersResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         Errors error = Errors.forException(e);
 
         Map<Long, Map<TopicPartition, Errors>> errors = new HashMap<>(markers.size());

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index 1ee9d0f..55b4fc6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -158,7 +158,8 @@ public class NetworkClientTest {
     }
 
     private void maybeSetExpectedApiVersionsResponse() {
-        ByteBuffer buffer = ApiVersionsResponse.API_VERSIONS_RESPONSE.serialize((short) 0, new ResponseHeader(0));
+        short apiVersionsResponseVersion = ApiVersionsResponse.API_VERSIONS_RESPONSE.apiVersion(ApiKeys.API_VERSIONS.id).maxVersion;
+        ByteBuffer buffer = ApiVersionsResponse.API_VERSIONS_RESPONSE.serialize(apiVersionsResponseVersion, new ResponseHeader(0));
         selector.delayedReceive(new DelayedReceive(node.idString(), new NetworkReceive(node.idString(), buffer)));
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 934c895..2017ef9 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -384,7 +384,7 @@ public class SenderTest {
             public boolean matches(AbstractRequest body) {
                 return body instanceof InitPidRequest;
             }
-        }, new InitPidResponse(Errors.NONE, producerId, (short) 0));
+        }, new InitPidResponse(0, Errors.NONE, producerId, (short) 0));
         sender.run(time.milliseconds());
         assertTrue(transactionManager.hasPid());
         assertEquals(producerId, transactionManager.pidAndEpoch().producerId);

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index a1efa58..1ab86aa 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -199,7 +199,7 @@ public class TransactionManagerTest {
                 assertEquals(epoch, addOffsetsToTxnRequest.producerEpoch());
                 return true;
             }
-        }, new AddOffsetsToTxnResponse(Errors.NONE));
+        }, new AddOffsetsToTxnResponse(0, Errors.NONE));
 
         sender.run(time.milliseconds());  // Send AddOffsetsRequest
         assertTrue(transactionManager.hasPendingOffsetCommits());  // We should now have created and queued the offset commit request.
@@ -219,7 +219,7 @@ public class TransactionManagerTest {
                 assertEquals(epoch, txnOffsetCommitRequest.producerEpoch());
                 return true;
             }
-        }, new TxnOffsetCommitResponse(txnOffsetCommitResponse));
+        }, new TxnOffsetCommitResponse(0, txnOffsetCommitResponse));
 
         assertEquals(null, transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.GROUP));
         sender.run(time.milliseconds());  // try to send TxnOffsetCommitRequest, but find we don't have a group coordinator.
@@ -528,7 +528,7 @@ public class TransactionManagerTest {
                 assertEquals(initPidRequest.transactionTimeoutMs(), transactionTimeoutMs);
                 return true;
             }
-        }, new InitPidResponse(error, pid, epoch), shouldDisconnect);
+        }, new InitPidResponse(0, error, pid, epoch), shouldDisconnect);
     }
 
     private void prepareProduceResponse(Errors error, final long pid, final short epoch) {
@@ -563,7 +563,7 @@ public class TransactionManagerTest {
                 assertEquals(transactionalId, addPartitionsToTxnRequest.transactionalId());
                 return true;
             }
-        }, new AddPartitionsToTxnResponse(error));
+        }, new AddPartitionsToTxnResponse(0, error));
     }
 
     private void prepareEndTxnResponse(Errors error, final TransactionResult result, final long pid, final short epoch) {
@@ -577,7 +577,7 @@ public class TransactionManagerTest {
                 assertEquals(result, endTxnRequest.command());
                 return true;
             }
-        }, new EndTxnResponse(error));
+        }, new EndTxnResponse(0, error));
     }
 
     private ProduceResponse produceResponse(TopicPartition tp, long offset, Errors error, int throttleTimeMs) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
index 1de6789..345ace1 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.common.network;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.ByteArrayOutputStream;
@@ -27,6 +29,7 @@ import java.nio.channels.Channels;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.Map;
 
 import javax.net.ssl.SSLContext;
@@ -40,6 +43,7 @@ import org.apache.kafka.common.security.ssl.SslFactory;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -460,6 +464,41 @@ public class SslTransportLayerTest {
         NetworkTestUtils.checkClientConnection(selector, node, 64000, 10);
     }
 
+    /**
+     * Tests that time spent on the network thread is accumulated on each channel
+     */
+    @Test
+    public void testNetworkThreadTimeRecorded() throws Exception {
+        selector.close();
+        this.selector = new Selector(NetworkReceive.UNLIMITED, 5000, new Metrics(), Time.SYSTEM,
+                "MetricGroup", new HashMap<String, String>(), false, true, channelBuilder);
+
+        String node = "0";
+        server = createEchoServer(SecurityProtocol.SSL);
+        InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
+        selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
+
+        String message = TestUtils.randomString(10 * 1024);
+        NetworkTestUtils.waitForChannelReady(selector, node);
+        KafkaChannel channel = selector.channel(node);
+        assertTrue("SSL handshake time not recorded", channel.getAndResetNetworkThreadTimeNanos() > 0);
+        assertEquals("Time not reset", 0, channel.getAndResetNetworkThreadTimeNanos());
+
+        selector.mute(node);
+        selector.send(new NetworkSend(node, ByteBuffer.wrap(message.getBytes())));
+        while (selector.completedSends().isEmpty()) {
+            selector.poll(100L);
+        }
+        assertTrue("Send time not recorded", channel.getAndResetNetworkThreadTimeNanos() > 0);
+        assertEquals("Time not reset", 0, channel.getAndResetNetworkThreadTimeNanos());
+
+        selector.unmute(node);
+        while (selector.completedReceives().isEmpty()) {
+            selector.poll(100L);
+        }
+        assertTrue("Receive time not recorded", channel.getAndResetNetworkThreadTimeNanos() > 0);
+    }
+
     @Test
     public void testCloseSsl() throws Exception {
         testClose(SecurityProtocol.SSL, new SslChannelBuilder(Mode.CLIENT));

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java b/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java
index a4aeb25..8410e6a 100644
--- a/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java
@@ -16,6 +16,14 @@
  */
 package org.apache.kafka.common.protocol;
 
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.junit.Test;
 
 public class ApiKeysTest {
@@ -35,4 +43,27 @@ public class ApiKeysTest {
         ApiKeys.PRODUCE.requestSchema((short) Protocol.REQUESTS[ApiKeys.PRODUCE.id].length);
     }
 
+    /**
+     * All valid client responses which may be throttled should have a field named
+     * 'throttle_time_ms' to return the throttle time to the client. Exclusions are
+     * <ul>
+     *   <li>Cluster actions used only for inter-broker communication are throttled only if unauthorized
+     *   <li> SASL_HANDSHAKE is not throttled when used for authentication when a connection
+     *        is established. At any other time, this request returns an error response that
+     *        may be throttled.
+     * </ul>
+     */
+    @Test
+    public void testResponseThrottleTime() {
+        List<ApiKeys> authenticationKeys = Arrays.asList(ApiKeys.SASL_HANDSHAKE);
+
+        for (ApiKeys apiKey: ApiKeys.values()) {
+            Schema responseSchema = apiKey.responseSchema(apiKey.latestVersion());
+            Field throttleTimeField = responseSchema.get("throttle_time_ms");
+            if (apiKey.clusterAction || authenticationKeys.contains(apiKey))
+                assertNull("Unexpected throttle time field: " + apiKey, throttleTimeField);
+            else
+                assertNotNull("Throttle time field missing: " + apiKey, throttleTimeField);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 9e283c0..422f9e6 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -841,7 +841,7 @@ public class RequestResponseTest {
     }
 
     private InitPidResponse createInitPidResponse() {
-        return new InitPidResponse(Errors.NONE, 3332, (short) 3);
+        return new InitPidResponse(0, Errors.NONE, 3332, (short) 3);
     }
 
 
@@ -871,7 +871,7 @@ public class RequestResponseTest {
     }
 
     private AddPartitionsToTxnResponse createAddPartitionsToTxnResponse() {
-        return new AddPartitionsToTxnResponse(Errors.NONE);
+        return new AddPartitionsToTxnResponse(0, Errors.NONE);
     }
 
     private AddOffsetsToTxnRequest createAddOffsetsToTxnRequest() {
@@ -879,7 +879,7 @@ public class RequestResponseTest {
     }
 
     private AddOffsetsToTxnResponse createAddOffsetsToTxnResponse() {
-        return new AddOffsetsToTxnResponse(Errors.NONE);
+        return new AddOffsetsToTxnResponse(0, Errors.NONE);
     }
 
     private EndTxnRequest createEndTxnRequest() {
@@ -887,7 +887,7 @@ public class RequestResponseTest {
     }
 
     private EndTxnResponse createEndTxnResponse() {
-        return new EndTxnResponse(Errors.NONE);
+        return new EndTxnResponse(0, Errors.NONE);
     }
 
     private WriteTxnMarkersRequest createWriteTxnMarkersRequest() {
@@ -914,7 +914,7 @@ public class RequestResponseTest {
     private TxnOffsetCommitResponse createTxnOffsetCommitResponse() {
         final Map<TopicPartition, Errors> errorPerPartitions = new HashMap<>();
         errorPerPartitions.put(new TopicPartition("topic", 73), Errors.NONE);
-        return new TxnOffsetCommitResponse(errorPerPartitions);
+        return new TxnOffsetCommitResponse(0, errorPerPartitions);
     }
 
     private static class ByteBufferChannel implements GatheringByteChannel {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 7bfe76a..2d41869 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -41,7 +41,7 @@ import scala.reflect.{ClassTag, classTag}
 
 object RequestChannel extends Logging {
   val AllDone = Request(processor = 1, connectionId = "2", Session(KafkaPrincipal.ANONYMOUS, InetAddress.getLocalHost),
-    buffer = shutdownReceive, startTimeMs = 0, listenerName = new ListenerName(""),
+    buffer = shutdownReceive, startTimeNanos = 0, listenerName = new ListenerName(""),
     securityProtocol = SecurityProtocol.PLAINTEXT)
   private val requestLogger = Logger.getLogger("kafka.request.logger")
 
@@ -57,14 +57,15 @@ object RequestChannel extends Logging {
   }
 
   case class Request(processor: Int, connectionId: String, session: Session, private var buffer: ByteBuffer,
-                     startTimeMs: Long, listenerName: ListenerName, securityProtocol: SecurityProtocol) {
+                     startTimeNanos: Long, listenerName: ListenerName, securityProtocol: SecurityProtocol) {
     // These need to be volatile because the readers are in the network thread and the writers are in the request
     // handler threads or the purgatory threads
-    @volatile var requestDequeueTimeMs = -1L
-    @volatile var apiLocalCompleteTimeMs = -1L
-    @volatile var responseCompleteTimeMs = -1L
-    @volatile var responseDequeueTimeMs = -1L
-    @volatile var apiRemoteCompleteTimeMs = -1L
+    @volatile var requestDequeueTimeNanos = -1L
+    @volatile var apiLocalCompleteTimeNanos = -1L
+    @volatile var responseCompleteTimeNanos = -1L
+    @volatile var responseDequeueTimeNanos = -1L
+    @volatile var apiRemoteCompleteTimeNanos = -1L
+    @volatile var recordNetworkThreadTimeCallback: Option[Long => Unit] = None
 
     val requestId = buffer.getShort()
 
@@ -122,26 +123,33 @@ object RequestChannel extends Logging {
 
     trace("Processor %d received request : %s".format(processor, requestDesc(true)))
 
-    def updateRequestMetrics() {
-      val endTimeMs = Time.SYSTEM.milliseconds
-      // In some corner cases, apiLocalCompleteTimeMs may not be set when the request completes if the remote
+    def requestThreadTimeNanos = {
+      if (apiLocalCompleteTimeNanos == -1L) apiLocalCompleteTimeNanos = Time.SYSTEM.nanoseconds
+      math.max(apiLocalCompleteTimeNanos - requestDequeueTimeNanos, 0L)
+    }
+
+    def updateRequestMetrics(networkThreadTimeNanos: Long) {
+      val endTimeNanos = Time.SYSTEM.nanoseconds
+      // In some corner cases, apiLocalCompleteTimeNanos may not be set when the request completes if the remote
       // processing time is really small. This value is set in KafkaApis from a request handling thread.
       // This may be read in a network thread before the actual update happens in KafkaApis which will cause us to
-      // see a negative value here. In that case, use responseCompleteTimeMs as apiLocalCompleteTimeMs.
-      if (apiLocalCompleteTimeMs < 0)
-        apiLocalCompleteTimeMs = responseCompleteTimeMs
-      // If the apiRemoteCompleteTimeMs is not set (i.e., for requests that do not go through a purgatory), then it is
-      // the same as responseCompleteTimeMs.
-      if (apiRemoteCompleteTimeMs < 0)
-        apiRemoteCompleteTimeMs = responseCompleteTimeMs
-
-      val requestQueueTime = math.max(requestDequeueTimeMs - startTimeMs, 0)
-      val apiLocalTime = math.max(apiLocalCompleteTimeMs - requestDequeueTimeMs, 0)
-      val apiRemoteTime = math.max(apiRemoteCompleteTimeMs - apiLocalCompleteTimeMs, 0)
-      val apiThrottleTime = math.max(responseCompleteTimeMs - apiRemoteCompleteTimeMs, 0)
-      val responseQueueTime = math.max(responseDequeueTimeMs - responseCompleteTimeMs, 0)
-      val responseSendTime = math.max(endTimeMs - responseDequeueTimeMs, 0)
-      val totalTime = endTimeMs - startTimeMs
+      // see a negative value here. In that case, use responseCompleteTimeNanos as apiLocalCompleteTimeNanos.
+      if (apiLocalCompleteTimeNanos < 0)
+        apiLocalCompleteTimeNanos = responseCompleteTimeNanos
+      // If the apiRemoteCompleteTimeNanos is not set (i.e., for requests that do not go through a purgatory), then it is
+      // the same as responseCompleteTimeNanos.
+      if (apiRemoteCompleteTimeNanos < 0)
+        apiRemoteCompleteTimeNanos = responseCompleteTimeNanos
+
+      def nanosToMs(nanos: Long) = math.max(TimeUnit.NANOSECONDS.toMillis(nanos), 0)
+
+      val requestQueueTime = nanosToMs(requestDequeueTimeNanos - startTimeNanos)
+      val apiLocalTime = nanosToMs(apiLocalCompleteTimeNanos - requestDequeueTimeNanos)
+      val apiRemoteTime = nanosToMs(apiRemoteCompleteTimeNanos - apiLocalCompleteTimeNanos)
+      val apiThrottleTime = nanosToMs(responseCompleteTimeNanos - apiRemoteCompleteTimeNanos)
+      val responseQueueTime = nanosToMs(responseDequeueTimeNanos - responseCompleteTimeNanos)
+      val responseSendTime = nanosToMs(endTimeNanos - responseDequeueTimeNanos)
+      val totalTime = nanosToMs(endTimeNanos - startTimeNanos)
       val fetchMetricNames =
         if (requestId == ApiKeys.FETCH.id) {
           val isFromFollower = body[FetchRequest].isFromFollower
@@ -164,16 +172,32 @@ object RequestChannel extends Logging {
         m.totalTimeHist.update(totalTime)
       }
 
+      // Records network handler thread usage. This is included towards the request quota for the
+      // user/client. Throttling is only performed when request handler thread usage
+      // is recorded, just before responses are queued for delivery.
+      // The time recorded here is the time spent on the network thread for receiving this request
+      // and sending the response. Note that for the first request on a connection, the time includes
+      // the total time spent on authentication, which may be significant for SASL/SSL.
+      recordNetworkThreadTimeCallback.foreach(record => record(networkThreadTimeNanos))
+
       if (requestLogger.isDebugEnabled) {
         val detailsEnabled = requestLogger.isTraceEnabled
-        requestLogger.trace("Completed request:%s from connection %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d,securityProtocol:%s,principal:%s,listener:%s"
-          .format(requestDesc(detailsEnabled), connectionId, totalTime, requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime, securityProtocol, session.principal, listenerName.value))
+        def nanosToMs(nanos: Long) = TimeUnit.NANOSECONDS.toMicros(math.max(nanos, 0)).toDouble / TimeUnit.MILLISECONDS.toMicros(1)
+        val totalTimeMs = nanosToMs(endTimeNanos - startTimeNanos)
+        val requestQueueTimeMs = nanosToMs(requestDequeueTimeNanos - startTimeNanos)
+        val apiLocalTimeMs = nanosToMs(apiLocalCompleteTimeNanos - requestDequeueTimeNanos)
+        val apiRemoteTimeMs = nanosToMs(apiRemoteCompleteTimeNanos - apiLocalCompleteTimeNanos)
+        val responseQueueTimeMs = nanosToMs(responseDequeueTimeNanos - responseCompleteTimeNanos)
+        val responseSendTimeMs = nanosToMs(endTimeNanos - responseDequeueTimeNanos)
+        requestLogger.trace("Completed request:%s from connection %s;totalTime:%f,requestQueueTime:%f,localTime:%f,remoteTime:%f,responseQueueTime:%f,sendTime:%f,securityProtocol:%s,principal:%s,listener:%s"
+          .format(requestDesc(detailsEnabled), connectionId, totalTimeMs, requestQueueTimeMs, apiLocalTimeMs, apiRemoteTimeMs, responseQueueTimeMs, responseSendTimeMs, securityProtocol, session.principal, listenerName.value))
       }
     }
   }
 
   case class Response(processor: Int, request: Request, responseSend: Send, responseAction: ResponseAction) {
-    request.responseCompleteTimeMs = Time.SYSTEM.milliseconds
+    request.responseCompleteTimeNanos = Time.SYSTEM.nanoseconds
+    if (request.apiLocalCompleteTimeNanos == -1L) request.apiLocalCompleteTimeNanos = Time.SYSTEM.nanoseconds
 
     def this(request: Request, responseSend: Send) =
       this(request.processor, request, responseSend, if (responseSend == null) NoOpAction else SendAction)
@@ -253,7 +277,7 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe
   def receiveResponse(processor: Int): RequestChannel.Response = {
     val response = responseQueues(processor).poll()
     if (response != null)
-      response.request.responseDequeueTimeMs = Time.SYSTEM.milliseconds
+      response.request.responseDequeueTimeNanos = Time.SYSTEM.nanoseconds
     response
   }
 


Mime
View raw message