kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From maniku...@apache.org
Subject [kafka] branch trunk updated: KAFKA-8599: Use automatic RPC generation in ExpireDelegationToken
Date Wed, 07 Aug 2019 08:02:54 GMT
This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 926fb35  KAFKA-8599: Use automatic RPC generation in ExpireDelegationToken
926fb35 is described below

commit 926fb35d9dcefd45c1e1d276ee7252b15875f23e
Author: Mickael Maison <mickael.maison@gmail.com>
AuthorDate: Wed Aug 7 13:32:26 2019 +0530

    KAFKA-8599: Use automatic RPC generation in ExpireDelegationToken
    
    Author: Mickael Maison <mickael.maison@gmail.com>
    
    Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Viktor Somogyi <viktorsomogyi@gmail.com>
    
    Closes #7098 from mimaison/KAFKA-8599
---
 .../kafka/clients/admin/KafkaAdminClient.java      |  8 ++-
 .../org/apache/kafka/common/protocol/ApiKeys.java  |  6 +-
 .../apache/kafka/common/protocol/types/Struct.java |  1 +
 .../kafka/common/requests/AbstractResponse.java    |  2 +-
 .../requests/ExpireDelegationTokenRequest.java     | 77 +++++++---------------
 .../requests/ExpireDelegationTokenResponse.java    | 70 +++++---------------
 .../kafka/common/requests/RequestResponseTest.java | 13 +++-
 core/src/main/scala/kafka/server/KafkaApis.scala   |  7 +-
 .../scala/unit/kafka/server/RequestQuotaTest.scala |  8 ++-
 9 files changed, 73 insertions(+), 119 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 8092eec..5256e36 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -73,6 +73,7 @@ import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicRe
 import org.apache.kafka.common.message.DescribeGroupsRequestData;
 import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroup;
 import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroupMember;
+import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
 import org.apache.kafka.common.message.FindCoordinatorRequestData;
 import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData;
 import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfigCollection;
@@ -2473,8 +2474,11 @@ public class KafkaAdminClient extends AdminClient {
             new LeastLoadedNodeProvider()) {
 
             @Override
-            AbstractRequest.Builder createRequest(int timeoutMs) {
-                return new ExpireDelegationTokenRequest.Builder(hmac, options.expiryTimePeriodMs());
+            AbstractRequest.Builder<ExpireDelegationTokenRequest> createRequest(int timeoutMs) {
+                return new ExpireDelegationTokenRequest.Builder(
+                        new ExpireDelegationTokenRequestData()
+                            .setHmac(hmac)
+                            .setExpiryTimePeriodMs(options.expiryTimePeriodMs()));
             }
 
             @Override
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 3f0f5e8..e05f692 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -30,6 +30,8 @@ import org.apache.kafka.common.message.DescribeGroupsRequestData;
 import org.apache.kafka.common.message.DescribeGroupsResponseData;
 import org.apache.kafka.common.message.ElectLeadersRequestData;
 import org.apache.kafka.common.message.ElectLeadersResponseData;
+import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
+import org.apache.kafka.common.message.ExpireDelegationTokenResponseData;
 import org.apache.kafka.common.message.FindCoordinatorRequestData;
 import org.apache.kafka.common.message.FindCoordinatorResponseData;
 import org.apache.kafka.common.message.HeartbeatRequestData;
@@ -93,8 +95,6 @@ import org.apache.kafka.common.requests.DescribeLogDirsRequest;
 import org.apache.kafka.common.requests.DescribeLogDirsResponse;
 import org.apache.kafka.common.requests.EndTxnRequest;
 import org.apache.kafka.common.requests.EndTxnResponse;
-import org.apache.kafka.common.requests.ExpireDelegationTokenRequest;
-import org.apache.kafka.common.requests.ExpireDelegationTokenResponse;
 import org.apache.kafka.common.requests.FetchRequest;
 import org.apache.kafka.common.requests.FetchResponse;
 import org.apache.kafka.common.requests.LeaderAndIsrRequest;
@@ -191,7 +191,7 @@ public enum ApiKeys {
             CreatePartitionsResponse.schemaVersions()),
     CREATE_DELEGATION_TOKEN(38, "CreateDelegationToken", CreateDelegationTokenRequestData.SCHEMAS, CreateDelegationTokenResponseData.SCHEMAS),
     RENEW_DELEGATION_TOKEN(39, "RenewDelegationToken", RenewDelegationTokenRequest.schemaVersions(), RenewDelegationTokenResponse.schemaVersions()),
-    EXPIRE_DELEGATION_TOKEN(40, "ExpireDelegationToken", ExpireDelegationTokenRequest.schemaVersions(), ExpireDelegationTokenResponse.schemaVersions()),
+    EXPIRE_DELEGATION_TOKEN(40, "ExpireDelegationToken", ExpireDelegationTokenRequestData.SCHEMAS, ExpireDelegationTokenResponseData.SCHEMAS),
     DESCRIBE_DELEGATION_TOKEN(41, "DescribeDelegationToken", DescribeDelegationTokenRequest.schemaVersions(), DescribeDelegationTokenResponse.schemaVersions()),
     DELETE_GROUPS(42, "DeleteGroups", DeleteGroupsRequestData.SCHEMAS, DeleteGroupsResponseData.SCHEMAS),
     ELECT_LEADERS(43, "ElectLeaders", ElectLeadersRequestData.SCHEMAS,
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
index 3114aea..e47a2cd 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
@@ -290,6 +290,7 @@ public class Struct {
         ByteBuffer buf = (ByteBuffer) result;
         byte[] arr = new byte[buf.remaining()];
         buf.get(arr);
+        buf.flip();
         return arr;
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index eb52fb8..da2b837 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -151,7 +151,7 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
             case RENEW_DELEGATION_TOKEN:
                 return new RenewDelegationTokenResponse(struct);
             case EXPIRE_DELEGATION_TOKEN:
-                return new ExpireDelegationTokenResponse(struct);
+                return new ExpireDelegationTokenResponse(struct, version);
             case DESCRIBE_DELEGATION_TOKEN:
                 return new DescribeDelegationTokenResponse(struct);
             case DELETE_GROUPS:
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenRequest.java
index 5b99676..ca6d2d6 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenRequest.java
@@ -16,102 +16,69 @@
  */
 package org.apache.kafka.common.requests;
 
+import java.nio.ByteBuffer;
+
+import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
+import org.apache.kafka.common.message.ExpireDelegationTokenResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
-import java.nio.ByteBuffer;
-
-import static org.apache.kafka.common.protocol.types.Type.BYTES;
-import static org.apache.kafka.common.protocol.types.Type.INT64;
-
 public class ExpireDelegationTokenRequest extends AbstractRequest {
 
-    private static final String HMAC_KEY_NAME = "hmac";
-    private static final String EXPIRY_TIME_PERIOD_KEY_NAME = "expiry_time_period";
-    private final ByteBuffer hmac;
-    private final long expiryTimePeriod;
-
-    private static final Schema TOKEN_EXPIRE_REQUEST_V0 = new Schema(
-        new Field(HMAC_KEY_NAME, BYTES, "HMAC of the delegation token to be expired."),
-        new Field(EXPIRY_TIME_PERIOD_KEY_NAME, INT64, "expiry time period in milli seconds."));
-
-    /**
-     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
-     */
-    private static final Schema TOKEN_EXPIRE_REQUEST_V1 = TOKEN_EXPIRE_REQUEST_V0;
+    private final ExpireDelegationTokenRequestData data;
 
-    private ExpireDelegationTokenRequest(short version, ByteBuffer hmac, long renewTimePeriod) {
+    private ExpireDelegationTokenRequest(ExpireDelegationTokenRequestData data, short version) {
         super(ApiKeys.EXPIRE_DELEGATION_TOKEN, version);
-
-        this.hmac = hmac;
-        this.expiryTimePeriod = renewTimePeriod;
+        this.data = data;
     }
 
-    public ExpireDelegationTokenRequest(Struct struct, short versionId) {
-        super(ApiKeys.EXPIRE_DELEGATION_TOKEN, versionId);
-
-        hmac = struct.getBytes(HMAC_KEY_NAME);
-        expiryTimePeriod = struct.getLong(EXPIRY_TIME_PERIOD_KEY_NAME);
+    public ExpireDelegationTokenRequest(Struct struct, short version) {
+        super(ApiKeys.EXPIRE_DELEGATION_TOKEN, version);
+        this.data = new ExpireDelegationTokenRequestData(struct, version);
     }
 
     public static ExpireDelegationTokenRequest parse(ByteBuffer buffer, short version) {
         return new ExpireDelegationTokenRequest(ApiKeys.EXPIRE_DELEGATION_TOKEN.parseRequest(version, buffer), version);
     }
 
-    public static Schema[] schemaVersions() {
-        return new Schema[] {TOKEN_EXPIRE_REQUEST_V0, TOKEN_EXPIRE_REQUEST_V1};
-    }
-
     @Override
     protected Struct toStruct() {
-        short version = version();
-        Struct struct = new Struct(ApiKeys.EXPIRE_DELEGATION_TOKEN.requestSchema(version));
-
-        struct.set(HMAC_KEY_NAME, hmac);
-        struct.set(EXPIRY_TIME_PERIOD_KEY_NAME, expiryTimePeriod);
-
-        return struct;
+        return data.toStruct(version());
     }
 
     @Override
     public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
-        return new ExpireDelegationTokenResponse(throttleTimeMs, Errors.forException(e));
+        return new ExpireDelegationTokenResponse(
+                new ExpireDelegationTokenResponseData()
+                    .setErrorCode(Errors.forException(e).code())
+                    .setThrottleTimeMs(throttleTimeMs));
     }
 
     public ByteBuffer hmac() {
-        return hmac;
+        return ByteBuffer.wrap(data.hmac());
     }
 
     public long expiryTimePeriod() {
-        return expiryTimePeriod;
+        return data.expiryTimePeriodMs();
     }
 
     public static class Builder extends AbstractRequest.Builder<ExpireDelegationTokenRequest> {
-        private final ByteBuffer hmac;
-        private final long expiryTimePeriod;
+        private final ExpireDelegationTokenRequestData data;
 
-        public Builder(byte[] hmac, long expiryTimePeriod) {
+        public Builder(ExpireDelegationTokenRequestData data) {
             super(ApiKeys.EXPIRE_DELEGATION_TOKEN);
-            this.hmac = ByteBuffer.wrap(hmac);
-            this.expiryTimePeriod = expiryTimePeriod;
+            this.data = data;
         }
 
         @Override
         public ExpireDelegationTokenRequest build(short version) {
-            return new ExpireDelegationTokenRequest(version, hmac, expiryTimePeriod);
+            return new ExpireDelegationTokenRequest(data, version);
         }
 
         @Override
         public String toString() {
-            StringBuilder bld = new StringBuilder();
-            bld.append("(type: ExpireDelegationTokenRequest").
-                append(", hmac=").append(hmac).
-                append(", expiryTimePeriod=").append(expiryTimePeriod).
-                append(")");
-            return bld.toString();
+            return data.toString();
         }
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenResponse.java
index 9491a35..16a6e8c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenResponse.java
@@ -16,92 +16,56 @@
  */
 package org.apache.kafka.common.requests;
 
-import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
-import org.apache.kafka.common.protocol.types.Struct;
-
 import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.Map;
 
-import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
-import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
-import static org.apache.kafka.common.protocol.types.Type.INT64;
+import org.apache.kafka.common.message.ExpireDelegationTokenResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
 
 public class ExpireDelegationTokenResponse extends AbstractResponse {
 
-    private static final String EXPIRY_TIMESTAMP_KEY_NAME = "expiry_timestamp";
-
-    private final Errors error;
-    private final long expiryTimestamp;
-    private final int throttleTimeMs;
+    private final ExpireDelegationTokenResponseData data;
 
-    private  static final Schema TOKEN_EXPIRE_RESPONSE_V0 = new Schema(
-        ERROR_CODE,
-        new Field(EXPIRY_TIMESTAMP_KEY_NAME, INT64, "timestamp (in msec) at which this token expires.."),
-        THROTTLE_TIME_MS);
-
-    /**
-     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
-     */
-    private static final Schema TOKEN_EXPIRE_RESPONSE_V1 = TOKEN_EXPIRE_RESPONSE_V0;
-
-    public ExpireDelegationTokenResponse(int throttleTimeMs, Errors error, long expiryTimestamp) {
-        this.throttleTimeMs = throttleTimeMs;
-        this.error = error;
-        this.expiryTimestamp = expiryTimestamp;
-    }
-
-    public ExpireDelegationTokenResponse(int throttleTimeMs, Errors error) {
-        this(throttleTimeMs, error, -1);
+    public ExpireDelegationTokenResponse(ExpireDelegationTokenResponseData data) {
+        this.data = data;
     }
 
-    public ExpireDelegationTokenResponse(Struct struct) {
-        error = Errors.forCode(struct.get(ERROR_CODE));
-        this.expiryTimestamp = struct.getLong(EXPIRY_TIMESTAMP_KEY_NAME);
-        this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
+    public ExpireDelegationTokenResponse(Struct struct, short version) {
+        this.data = new ExpireDelegationTokenResponseData(struct, version);
     }
 
     public static ExpireDelegationTokenResponse parse(ByteBuffer buffer, short version) {
-        return new ExpireDelegationTokenResponse(ApiKeys.EXPIRE_DELEGATION_TOKEN.responseSchema(version).read(buffer));
-    }
-
-    public static Schema[] schemaVersions() {
-        return new Schema[] {TOKEN_EXPIRE_RESPONSE_V0, TOKEN_EXPIRE_RESPONSE_V1};
+        return new ExpireDelegationTokenResponse(ApiKeys.EXPIRE_DELEGATION_TOKEN.responseSchema(version).read(buffer), version);
     }
 
     public Errors error() {
-        return error;
+        return Errors.forCode(data.errorCode());
     }
 
     public long expiryTimestamp() {
-        return expiryTimestamp;
+        return data.expiryTimestampMs();
     }
 
     @Override
     public Map<Errors, Integer> errorCounts() {
-        return errorCounts(error);
+        return Collections.singletonMap(error(), 1);
     }
 
     @Override
     protected Struct toStruct(short version) {
-        Struct struct = new Struct(ApiKeys.EXPIRE_DELEGATION_TOKEN.responseSchema(version));
-
-        struct.set(ERROR_CODE, error.code());
-        struct.set(EXPIRY_TIMESTAMP_KEY_NAME, expiryTimestamp);
-        struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);
-
-        return struct;
+        return data.toStruct(version);
     }
 
     @Override
     public int throttleTimeMs() {
-        return throttleTimeMs;
+        return data.throttleTimeMs();
     }
 
     public boolean hasError() {
-        return this.error != Errors.NONE;
+        return error() != Errors.NONE;
     }
 
     @Override
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 4218eff..0b8d98d 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -58,6 +58,8 @@ import org.apache.kafka.common.message.DescribeGroupsResponseData;
 import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroup;
 import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
 import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult;
+import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
+import org.apache.kafka.common.message.ExpireDelegationTokenResponseData;
 import org.apache.kafka.common.message.FindCoordinatorRequestData;
 import org.apache.kafka.common.message.HeartbeatRequestData;
 import org.apache.kafka.common.message.HeartbeatResponseData;
@@ -1551,11 +1553,18 @@ public class RequestResponseTest {
     }
 
     private ExpireDelegationTokenRequest createExpireTokenRequest() {
-        return new ExpireDelegationTokenRequest.Builder("test".getBytes(), System.currentTimeMillis()).build();
+        ExpireDelegationTokenRequestData data = new ExpireDelegationTokenRequestData()
+                .setHmac("test".getBytes())
+                .setExpiryTimePeriodMs(System.currentTimeMillis());
+        return new ExpireDelegationTokenRequest.Builder(data).build();
     }
 
     private ExpireDelegationTokenResponse createExpireTokenResponse() {
-        return new ExpireDelegationTokenResponse(20, Errors.NONE, System.currentTimeMillis());
+        ExpireDelegationTokenResponseData data = new ExpireDelegationTokenResponseData()
+                .setThrottleTimeMs(20)
+                .setErrorCode(Errors.NONE.code())
+                .setExpiryTimestampMs(System.currentTimeMillis());
+        return new ExpireDelegationTokenResponse(data);
     }
 
     private DescribeDelegationTokenRequest createDescribeTokenRequest() {
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index a88cd92..2b49982 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -59,6 +59,7 @@ import org.apache.kafka.common.message.DeleteTopicsResponseData.{DeletableTopicR
 import org.apache.kafka.common.message.DescribeGroupsResponseData
 import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult
 import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult
+import org.apache.kafka.common.message.ExpireDelegationTokenResponseData
 import org.apache.kafka.common.message.FindCoordinatorResponseData
 import org.apache.kafka.common.message.HeartbeatResponseData
 import org.apache.kafka.common.message.InitProducerIdResponseData
@@ -2484,7 +2485,11 @@ class KafkaApis(val requestChannel: RequestChannel,
       trace("Sending expire token response for correlation id %d to client %s."
         .format(request.header.correlationId, request.header.clientId))
       sendResponseMaybeThrottle(request, requestThrottleMs =>
-        new ExpireDelegationTokenResponse(requestThrottleMs, error, expiryTimestamp))
+        new ExpireDelegationTokenResponse(
+            new ExpireDelegationTokenResponseData()
+              .setThrottleTimeMs(requestThrottleMs)
+              .setErrorCode(error.code)
+              .setExpiryTimestampMs(expiryTimestamp)))
     }
 
     if (!allowTokenRequests(request))
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index a8d29fd..db7ab69 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -33,6 +33,7 @@ import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic,
 import org.apache.kafka.common.message.DeleteGroupsRequestData
 import org.apache.kafka.common.message.DeleteTopicsRequestData
 import org.apache.kafka.common.message.DescribeGroupsRequestData
+import org.apache.kafka.common.message.ExpireDelegationTokenRequestData
 import org.apache.kafka.common.message.FindCoordinatorRequestData
 import org.apache.kafka.common.message.HeartbeatRequestData
 import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData
@@ -444,7 +445,10 @@ class RequestQuotaTest extends BaseRequestTest {
           )
 
         case ApiKeys.EXPIRE_DELEGATION_TOKEN =>
-          new ExpireDelegationTokenRequest.Builder("".getBytes, 1000)
+          new ExpireDelegationTokenRequest.Builder(
+              new ExpireDelegationTokenRequestData()
+                .setHmac("".getBytes)
+                .setExpiryTimePeriodMs(1000L))
 
         case ApiKeys.DESCRIBE_DELEGATION_TOKEN =>
           new DescribeDelegationTokenRequest.Builder(Collections.singletonList(SecurityUtils.parseKafkaPrincipal("User:test")))
@@ -573,7 +577,7 @@ class RequestQuotaTest extends BaseRequestTest {
       case ApiKeys.CREATE_PARTITIONS => new CreatePartitionsResponse(response).throttleTimeMs
       case ApiKeys.CREATE_DELEGATION_TOKEN => new CreateDelegationTokenResponse(response, ApiKeys.CREATE_DELEGATION_TOKEN.latestVersion).throttleTimeMs
       case ApiKeys.DESCRIBE_DELEGATION_TOKEN=> new DescribeDelegationTokenResponse(response).throttleTimeMs
-      case ApiKeys.EXPIRE_DELEGATION_TOKEN => new ExpireDelegationTokenResponse(response).throttleTimeMs
+      case ApiKeys.EXPIRE_DELEGATION_TOKEN => new ExpireDelegationTokenResponse(response, ApiKeys.EXPIRE_DELEGATION_TOKEN.latestVersion).throttleTimeMs
       case ApiKeys.RENEW_DELEGATION_TOKEN => new RenewDelegationTokenResponse(response).throttleTimeMs
       case ApiKeys.DELETE_GROUPS => new DeleteGroupsResponse(response).throttleTimeMs
       case ApiKeys.OFFSET_FOR_LEADER_EPOCH => new OffsetsForLeaderEpochResponse(response).throttleTimeMs


Mime
View raw message