kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch trunk updated: MINOR: Missing throttle time in OffsetsForLeaderEpoch response (#5635)
Date Tue, 11 Sep 2018 16:08:32 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 79ad902  MINOR: Missing throttle time in OffsetsForLeaderEpoch response (#5635)
79ad902 is described below

commit 79ad9026a667469a2013ce82961c0c90f3bb0877
Author: Jason Gustafson <jason@confluent.io>
AuthorDate: Tue Sep 11 09:08:22 2018 -0700

    MINOR: Missing throttle time in OffsetsForLeaderEpoch response (#5635)
    
    With KIP-320, the OffsetsForLeaderEpoch API is intended to be used by consumers to detect
log truncation. Therefore the new response schema should expose a field for the throttle time
like all the other APIs.
    
    Reviewers: Dong Lin <lindong28@gmail.com>
---
 .../org/apache/kafka/common/protocol/ApiKeys.java  |  2 +-
 .../requests/OffsetsForLeaderEpochRequest.java     |  4 ++--
 .../requests/OffsetsForLeaderEpochResponse.java    | 25 +++++++++++++++++-----
 .../kafka/common/requests/RequestResponseTest.java |  2 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   |  3 ++-
 .../scala/unit/kafka/server/RequestQuotaTest.scala |  1 +
 6 files changed, 27 insertions(+), 10 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index e0cdfd9..3d77100 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -155,7 +155,7 @@ public enum ApiKeys {
     DELETE_RECORDS(21, "DeleteRecords", DeleteRecordsRequest.schemaVersions(), DeleteRecordsResponse.schemaVersions()),
     INIT_PRODUCER_ID(22, "InitProducerId", InitProducerIdRequest.schemaVersions(),
             InitProducerIdResponse.schemaVersions()),
-    OFFSET_FOR_LEADER_EPOCH(23, "OffsetForLeaderEpoch", true, OffsetsForLeaderEpochRequest.schemaVersions(),
+    OFFSET_FOR_LEADER_EPOCH(23, "OffsetForLeaderEpoch", false, OffsetsForLeaderEpochRequest.schemaVersions(),
             OffsetsForLeaderEpochResponse.schemaVersions()),
     ADD_PARTITIONS_TO_TXN(24, "AddPartitionsToTxn", false, RecordBatch.MAGIC_VALUE_V2,
             AddPartitionsToTxnRequest.schemaVersions(), AddPartitionsToTxnResponse.schemaVersions()),
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
index 9de5d02..1c9009c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
@@ -56,7 +56,7 @@ public class OffsetsForLeaderEpochRequest extends AbstractRequest {
     // V1 request is the same as v0. Per-partition leader epoch has been added to response
     private static final Schema OFFSET_FOR_LEADER_EPOCH_REQUEST_V1 = OFFSET_FOR_LEADER_EPOCH_REQUEST_V0;
 
-    // V2 adds the current leader epoch to support fencing
+    // V2 adds the current leader epoch to support fencing and the addition of the throttle
time in the response
     private static final Field PARTITIONS_V2 = PARTITIONS.withFields(
             PARTITION_ID,
             CURRENT_LEADER_EPOCH,
@@ -177,7 +177,7 @@ public class OffsetsForLeaderEpochRequest extends AbstractRequest {
             errorResponse.put(tp, new EpochEndOffset(
                 error, EpochEndOffset.UNDEFINED_EPOCH, EpochEndOffset.UNDEFINED_EPOCH_OFFSET));
         }
-        return new OffsetsForLeaderEpochResponse(errorResponse);
+        return new OffsetsForLeaderEpochResponse(throttleTimeMs, errorResponse);
     }
 
     public static class PartitionData {
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
index 324a2ed..55aa71b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
@@ -34,6 +34,7 @@ import java.util.Map;
 import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
 import static org.apache.kafka.common.protocol.CommonFields.LEADER_EPOCH;
 import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
+import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
 import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
 
 public class OffsetsForLeaderEpochResponse extends AbstractResponse {
@@ -65,18 +66,23 @@ public class OffsetsForLeaderEpochResponse extends AbstractResponse {
     private static final Schema OFFSET_FOR_LEADER_EPOCH_RESPONSE_V1 = new Schema(
             TOPICS_V1);
 
-    // V2 bumped for addition of current leader epoch to the request schema.
-    private static final Schema OFFSET_FOR_LEADER_EPOCH_RESPONSE_V2 = OFFSET_FOR_LEADER_EPOCH_RESPONSE_V1;
+    // V2 bumped for addition of current leader epoch to the request schema and the addition
of the throttle
+    // time in the response
+    private static final Schema OFFSET_FOR_LEADER_EPOCH_RESPONSE_V2 = new Schema(
+            THROTTLE_TIME_MS,
+            TOPICS_V1);
 
     public static Schema[] schemaVersions() {
         return new Schema[]{OFFSET_FOR_LEADER_EPOCH_RESPONSE_V0, OFFSET_FOR_LEADER_EPOCH_RESPONSE_V1,
             OFFSET_FOR_LEADER_EPOCH_RESPONSE_V2};
     }
 
-    private Map<TopicPartition, EpochEndOffset> epochEndOffsetsByPartition;
+    private final int throttleTimeMs;
+    private final Map<TopicPartition, EpochEndOffset> epochEndOffsetsByPartition;
 
     public OffsetsForLeaderEpochResponse(Struct struct) {
-        epochEndOffsetsByPartition = new HashMap<>();
+        this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
+        this.epochEndOffsetsByPartition = new HashMap<>();
         for (Object topicAndEpocsObj : struct.get(TOPICS)) {
             Struct topicAndEpochs = (Struct) topicAndEpocsObj;
             String topic = topicAndEpochs.get(TOPIC_NAME);
@@ -93,6 +99,11 @@ public class OffsetsForLeaderEpochResponse extends AbstractResponse {
     }
 
     public OffsetsForLeaderEpochResponse(Map<TopicPartition, EpochEndOffset> epochsByTopic)
{
+        this(DEFAULT_THROTTLE_TIME, epochsByTopic);
+    }
+
+    public OffsetsForLeaderEpochResponse(int throttleTimeMs, Map<TopicPartition, EpochEndOffset>
epochsByTopic) {
+        this.throttleTimeMs = throttleTimeMs;
         this.epochEndOffsetsByPartition = epochsByTopic;
     }
 
@@ -108,6 +119,10 @@ public class OffsetsForLeaderEpochResponse extends AbstractResponse {
         return errorCounts;
     }
 
+    public int throttleTimeMs() {
+        return throttleTimeMs;
+    }
+
     public static OffsetsForLeaderEpochResponse parse(ByteBuffer buffer, short versionId)
{
         return new OffsetsForLeaderEpochResponse(ApiKeys.OFFSET_FOR_LEADER_EPOCH.responseSchema(versionId).read(buffer));
     }
@@ -115,9 +130,9 @@ public class OffsetsForLeaderEpochResponse extends AbstractResponse {
     @Override
     protected Struct toStruct(short version) {
         Struct responseStruct = new Struct(ApiKeys.OFFSET_FOR_LEADER_EPOCH.responseSchema(version));
+        responseStruct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);
 
         Map<String, Map<Integer, EpochEndOffset>> endOffsetsByTopic = CollectionUtils.groupPartitionDataByTopic(epochEndOffsetsByPartition);
-
         List<Struct> topics = new ArrayList<>(endOffsetsByTopic.size());
         for (Map.Entry<String, Map<Integer, EpochEndOffset>> topicToPartitionEpochs
: endOffsetsByTopic.entrySet()) {
             Struct topicStruct = responseStruct.instance(TOPICS);
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 05b9926..e34348a 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -1053,7 +1053,7 @@ public class RequestResponseTest {
         epochs.put(new TopicPartition("topic1", 1), new EpochEndOffset(Errors.NONE, 1, 1));
         epochs.put(new TopicPartition("topic2", 2), new EpochEndOffset(Errors.NONE, 1, 2));
 
-        return new OffsetsForLeaderEpochResponse(epochs);
+        return new OffsetsForLeaderEpochResponse(0, epochs);
     }
 
     private AddPartitionsToTxnRequest createAddPartitionsToTxnRequest() {
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 9f32b94..5a82cc4 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -2042,7 +2042,8 @@ class KafkaApis(val requestChannel: RequestChannel,
     authorizeClusterAction(request)
 
     val lastOffsetForLeaderEpoch = replicaManager.lastOffsetForLeaderEpoch(requestInfo.asScala).asJava
-    sendResponseExemptThrottle(request, new OffsetsForLeaderEpochResponse(lastOffsetForLeaderEpoch))
+    sendResponseMaybeThrottle(request, requestThrottleMs =>
+      new OffsetsForLeaderEpochResponse(requestThrottleMs, lastOffsetForLeaderEpoch))
   }
 
   def handleAlterConfigsRequest(request: RequestChannel.Request): Unit = {
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 3b7ecfb..7df1bd6 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -450,6 +450,7 @@ class RequestQuotaTest extends BaseRequestTest {
       case ApiKeys.EXPIRE_DELEGATION_TOKEN => new ExpireDelegationTokenResponse(response).throttleTimeMs
       case ApiKeys.RENEW_DELEGATION_TOKEN => new RenewDelegationTokenResponse(response).throttleTimeMs
       case ApiKeys.DELETE_GROUPS => new DeleteGroupsResponse(response).throttleTimeMs
+      case ApiKeys.OFFSET_FOR_LEADER_EPOCH => new OffsetsForLeaderEpochResponse(response).throttleTimeMs
       case requestId => throw new IllegalArgumentException(s"No throttle time for $requestId")
     }
   }


Mime
View raw message