kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch 2.6 updated: KAFKA-9840; Skip End Offset validation when the leader epoch is not reliable (#8486)
Date Fri, 05 Jun 2020 23:04:21 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new 0685ba6  KAFKA-9840; Skip End Offset validation when the leader epoch is not reliable (#8486)
0685ba6 is described below

commit 0685ba69f5742e8600c03e2c823d9345dbbd2fb4
Author: Boyang Chen <boyang@confluent.io>
AuthorDate: Fri Jun 5 15:53:13 2020 -0700

    KAFKA-9840; Skip End Offset validation when the leader epoch is not reliable (#8486)
    
    This PR provides two fixes:
    1. Skip offset validation if the current leader epoch cannot be reliably determined.
    2. Raise an out of range error if the leader returns an undefined offset in response to the OffsetsForLeaderEpoch request.
    
    Reviewers: Guozhang Wang <wangguoz@gmail.com>, Jason Gustafson <jason@confluent.io>
---
 .../java/org/apache/kafka/clients/Metadata.java    |   6 +-
 .../clients/consumer/LogTruncationException.java   |   3 +-
 .../consumer/OffsetOutOfRangeException.java        |   7 +-
 .../kafka/clients/consumer/internals/Fetcher.java  | 117 ++++++++----
 .../internals/OffsetsForLeaderEpochClient.java     |   4 -
 .../consumer/internals/SubscriptionState.java      |  17 +-
 .../kafka/common/requests/EpochEndOffset.java      |   9 +-
 .../kafka/common/requests/MetadataResponse.java    |  40 +++--
 .../org/apache/kafka/clients/MetadataTest.java     |  33 ++--
 .../kafka/clients/consumer/KafkaConsumerTest.java  | 112 +++++-------
 .../internals/AbstractCoordinatorTest.java         |   2 +-
 .../clients/consumer/internals/FetcherTest.java    | 200 +++++++++++++++++----
 .../consumer/internals/SubscriptionStateTest.java  |  11 +-
 .../kafka/common/requests/EpochEndOffsetTest.java  |  64 +++++++
 .../test/java/org/apache/kafka/test/TestUtils.java |  30 +++-
 15 files changed, 462 insertions(+), 193 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index cedbe15..1dc1f39 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -234,7 +234,9 @@ public class Metadata implements Closeable {
     }
 
     /**
-     * Update metadata assuming the current request version. This is mainly for convenience in testing.
+     * Update metadata assuming the current request version.
+     *
+     * For testing only.
      */
     public synchronized void updateWithCurrentRequestVersion(MetadataResponse response, boolean isPartialUpdate, long nowMs) {
         this.update(this.requestVersion, response, isPartialUpdate, nowMs);
@@ -401,7 +403,7 @@ public class Metadata implements Closeable {
      * the producer to abort waiting for metadata if there were fatal exceptions (e.g. authentication failures)
      * in the last metadata update.
      */
-    public synchronized void maybeThrowFatalException() {
+    protected synchronized void maybeThrowFatalException() {
         KafkaException metadataException = this.fatalException;
         if (metadataException != null) {
             fatalException = null;
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/LogTruncationException.java b/clients/src/main/java/org/apache/kafka/clients/consumer/LogTruncationException.java
index f8af50d..60d2449 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/LogTruncationException.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/LogTruncationException.java
@@ -36,7 +36,8 @@ public class LogTruncationException extends OffsetOutOfRangeException {
     private final Map<TopicPartition, OffsetAndMetadata> divergentOffsets;
 
     public LogTruncationException(Map<TopicPartition, OffsetAndMetadata> divergentOffsets) {
-        super(Utils.transformMap(divergentOffsets, Function.identity(), OffsetAndMetadata::offset));
+        super("Detected log truncation with diverging offsets " + divergentOffsets,
+            Utils.transformMap(divergentOffsets, Function.identity(), OffsetAndMetadata::offset));
         this.divergentOffsets = Collections.unmodifiableMap(divergentOffsets);
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetOutOfRangeException.java b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetOutOfRangeException.java
index dae19b2..6b66293 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetOutOfRangeException.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetOutOfRangeException.java
@@ -31,7 +31,12 @@ public class OffsetOutOfRangeException extends InvalidOffsetException {
     private final Map<TopicPartition, Long> offsetOutOfRangePartitions;
 
     public OffsetOutOfRangeException(Map<TopicPartition, Long> offsetOutOfRangePartitions) {
-        super("Offsets out of range with no configured reset policy for partitions: " + offsetOutOfRangePartitions);
+        this("Offsets out of range with no configured reset policy for partitions: " +
+            offsetOutOfRangePartitions, offsetOutOfRangePartitions);
+    }
+
+    public OffsetOutOfRangeException(String message, Map<TopicPartition, Long> offsetOutOfRangePartitions) {
+        super(message);
         this.offsetOutOfRangePartitions = offsetOutOfRangePartitions;
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index c3f10ba..3b34f14 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -30,6 +30,8 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.OffsetForEpochResult;
+import org.apache.kafka.clients.consumer.internals.SubscriptionState.FetchPosition;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.KafkaException;
@@ -476,7 +478,7 @@ public class Fetcher<K, V> implements Closeable {
     }
 
     /**
-     *  Validate offsets for all assigned partitions for which a leader change has been detected.
+     * Validate offsets for all assigned partitions for which a leader change has been detected.
      */
     public void validateOffsetsIfNeeded() {
         RuntimeException exception = cachedOffsetForLeaderException.getAndSet(null);
@@ -490,7 +492,7 @@ public class Fetcher<K, V> implements Closeable {
         });
 
         // Collect positions needing validation, with backoff
-        Map<TopicPartition, SubscriptionState.FetchPosition> partitionsToValidate = subscriptions
+        Map<TopicPartition, FetchPosition> partitionsToValidate = subscriptions
                 .partitionsNeedingValidation(time.milliseconds())
                 .stream()
                 .collect(Collectors.toMap(Function.identity(), subscriptions::position));
@@ -672,7 +674,7 @@ public class Fetcher<K, V> implements Closeable {
             log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable",
                     completedFetch.partition);
         } else {
-            SubscriptionState.FetchPosition position = subscriptions.position(completedFetch.partition);
+            FetchPosition position = subscriptions.position(completedFetch.partition);
             if (completedFetch.nextFetchOffset == position.offset) {
                 List<ConsumerRecord<K, V>> partRecords = completedFetch.fetchRecords(maxRecords);
 
@@ -680,7 +682,7 @@ public class Fetcher<K, V> implements Closeable {
                         partRecords.size(), position, completedFetch.partition);
 
                 if (completedFetch.nextFetchOffset > position.offset) {
-                    SubscriptionState.FetchPosition nextPosition = new SubscriptionState.FetchPosition(
+                    FetchPosition nextPosition = new FetchPosition(
                             completedFetch.nextFetchOffset,
                             completedFetch.lastEpoch,
                             position.currentLeader);
@@ -713,7 +715,7 @@ public class Fetcher<K, V> implements Closeable {
     }
 
     private void resetOffsetIfNeeded(TopicPartition partition, OffsetResetStrategy requestedResetStrategy, ListOffsetData offsetData) {
-        SubscriptionState.FetchPosition position = new SubscriptionState.FetchPosition(
+        FetchPosition position = new FetchPosition(
                 offsetData.offset, offsetData.leaderEpoch, metadata.currentLeader(partition));
         offsetData.leaderEpoch.ifPresent(epoch -> metadata.updateLastSeenEpochIfNewer(partition, epoch));
         subscriptions.maybeSeekUnvalidated(partition, position.offset, requestedResetStrategy);
@@ -770,11 +772,11 @@ public class Fetcher<K, V> implements Closeable {
      *
      * Requests are grouped by Node for efficiency.
      */
-    private void validateOffsetsAsync(Map<TopicPartition, SubscriptionState.FetchPosition> partitionsToValidate) {
-        final Map<Node, Map<TopicPartition, SubscriptionState.FetchPosition>> regrouped =
-                regroupFetchPositionsByLeader(partitionsToValidate);
+    private void validateOffsetsAsync(Map<TopicPartition, FetchPosition> partitionsToValidate) {
+        final Map<Node, Map<TopicPartition, FetchPosition>> regrouped =
+            regroupFetchPositionsByLeader(partitionsToValidate);
 
-        regrouped.forEach((node, fetchPostitions) -> {
+        regrouped.forEach((node, fetchPositions) -> {
             if (node.isEmpty()) {
                 metadata.requestUpdate();
                 return;
@@ -788,21 +790,22 @@ public class Fetcher<K, V> implements Closeable {
 
             if (!hasUsableOffsetForLeaderEpochVersion(nodeApiVersions)) {
                 log.debug("Skipping validation of fetch offsets for partitions {} since the broker does not " +
-                                "support the required protocol version (introduced in Kafka 2.3)",
-                        fetchPostitions.keySet());
-                for (TopicPartition partition : fetchPostitions.keySet()) {
+                              "support the required protocol version (introduced in Kafka 2.3)",
+                    fetchPositions.keySet());
+                for (TopicPartition partition : fetchPositions.keySet()) {
                     subscriptions.completeValidation(partition);
                 }
                 return;
             }
 
-            subscriptions.setNextAllowedRetry(fetchPostitions.keySet(), time.milliseconds() + requestTimeoutMs);
+            subscriptions.setNextAllowedRetry(fetchPositions.keySet(), time.milliseconds() + requestTimeoutMs);
 
-            RequestFuture<OffsetsForLeaderEpochClient.OffsetForEpochResult> future =
-                offsetsForLeaderEpochClient.sendAsyncRequest(node, fetchPostitions);
-            future.addListener(new RequestFutureListener<OffsetsForLeaderEpochClient.OffsetForEpochResult>() {
+            RequestFuture<OffsetForEpochResult> future =
+                offsetsForLeaderEpochClient.sendAsyncRequest(node, fetchPositions);
+
+            future.addListener(new RequestFutureListener<OffsetForEpochResult>() {
                 @Override
-                public void onSuccess(OffsetsForLeaderEpochClient.OffsetForEpochResult offsetsResult) {
+                public void onSuccess(OffsetForEpochResult offsetsResult) {
                     Map<TopicPartition, OffsetAndMetadata> truncationWithoutResetPolicy = new HashMap<>();
                     if (!offsetsResult.partitionsToRetry().isEmpty()) {
                         subscriptions.setNextAllowedRetry(offsetsResult.partitionsToRetry(), time.milliseconds() + retryBackoffMs);
@@ -812,33 +815,55 @@ public class Fetcher<K, V> implements Closeable {
                     // For each OffsetsForLeader response, check if the end-offset is lower than our current offset
                     // for the partition. If so, it means we have experienced log truncation and need to reposition
                     // that partition's offset.
+                    //
+                    // In addition, check whether the returned offset and epoch are valid. If not, then we should reset
+                    // its offset if reset policy is configured, or throw out of range exception.
                     offsetsResult.endOffsets().forEach((respTopicPartition, respEndOffset) -> {
-                        SubscriptionState.FetchPosition requestPosition = fetchPostitions.get(respTopicPartition);
-                        Optional<OffsetAndMetadata> divergentOffsetOpt = subscriptions.maybeCompleteValidation(
+                        FetchPosition requestPosition = fetchPositions.get(respTopicPartition);
+
+                        if (respEndOffset.hasUndefinedEpochOrOffset()) {
+                            try {
+                                handleOffsetOutOfRange(requestPosition, respTopicPartition,
+                                    "Failed leader offset epoch validation for " + requestPosition
+                                        + " since no end offset larger than current fetch epoch was reported");
+                            } catch (OffsetOutOfRangeException e) {
+                                // Catch the exception here to ensure finishing other partitions validation.
+                                setFatalOffsetForLeaderException(e);
+                            }
+                        } else {
+                            Optional<OffsetAndMetadata> divergentOffsetOpt = subscriptions.maybeCompleteValidation(
                                 respTopicPartition, requestPosition, respEndOffset);
-                        divergentOffsetOpt.ifPresent(divergentOffset -> {
-                            truncationWithoutResetPolicy.put(respTopicPartition, divergentOffset);
-                        });
+                            divergentOffsetOpt.ifPresent(
+                                divergentOffset -> {
+                                    log.info("Detected log truncation for topic partition {} with diverging offset {}",
+                                        respTopicPartition, divergentOffset);
+                                    truncationWithoutResetPolicy.put(respTopicPartition, divergentOffset);
+                                });
+                        }
                     });
 
                     if (!truncationWithoutResetPolicy.isEmpty()) {
-                        throw new LogTruncationException(truncationWithoutResetPolicy);
+                        setFatalOffsetForLeaderException(new LogTruncationException(truncationWithoutResetPolicy));
                     }
                 }
 
                 @Override
                 public void onFailure(RuntimeException e) {
-                    subscriptions.requestFailed(fetchPostitions.keySet(), time.milliseconds() + retryBackoffMs);
+                    subscriptions.requestFailed(fetchPositions.keySet(), time.milliseconds() + retryBackoffMs);
                     metadata.requestUpdate();
 
-                    if (!(e instanceof RetriableException) && !cachedOffsetForLeaderException.compareAndSet(null, e)) {
-                        log.error("Discarding error in OffsetsForLeaderEpoch because another error is pending", e);
-                    }
+                    setFatalOffsetForLeaderException(e);
                 }
             });
         });
     }
 
+    private void setFatalOffsetForLeaderException(RuntimeException e) {
+        if (!(e instanceof RetriableException) && !cachedOffsetForLeaderException.compareAndSet(null, e)) {
+            log.error("Discarding error in OffsetsForLeaderEpoch because another error is pending", e);
+        }
+    }
+
     /**
      * Search the offsets by target times for the specified partitions.
      *
@@ -966,7 +991,6 @@ public class Fetcher<K, V> implements Closeable {
      *               value of each partition may be null only for v0. In v1 and later the ListOffset API would not
      *               return a null timestamp (-1 is returned instead when necessary).
      */
-    @SuppressWarnings("deprecation")
     private void handleListOffsetResponse(Map<TopicPartition, ListOffsetRequest.PartitionData> timestampsToSearch,
                                           ListOffsetResponse listOffsetResponse,
                                           RequestFuture<ListOffsetResult> future) {
@@ -1051,12 +1075,12 @@ public class Fetcher<K, V> implements Closeable {
         private final Map<TopicPartition, ListOffsetData> fetchedOffsets;
         private final Set<TopicPartition> partitionsToRetry;
 
-        public ListOffsetResult(Map<TopicPartition, ListOffsetData> fetchedOffsets, Set<TopicPartition> partitionsNeedingRetry) {
+        ListOffsetResult(Map<TopicPartition, ListOffsetData> fetchedOffsets, Set<TopicPartition> partitionsNeedingRetry) {
             this.fetchedOffsets = fetchedOffsets;
             this.partitionsToRetry = partitionsNeedingRetry;
         }
 
-        public ListOffsetResult() {
+        ListOffsetResult() {
             this.fetchedOffsets = new HashMap<>();
             this.partitionsToRetry = new HashSet<>();
         }
@@ -1109,7 +1133,7 @@ public class Fetcher<K, V> implements Closeable {
 
         for (TopicPartition partition : fetchablePartitions()) {
             // Use the preferred read replica if set, or the position's leader
-            SubscriptionState.FetchPosition position = this.subscriptions.position(partition);
+            FetchPosition position = this.subscriptions.position(partition);
             Optional<Node> leaderOpt = position.currentLeader.leader;
             if (!leaderOpt.isPresent()) {
                 metadata.requestUpdate();
@@ -1155,8 +1179,8 @@ public class Fetcher<K, V> implements Closeable {
         return reqs;
     }
 
-    private Map<Node, Map<TopicPartition, SubscriptionState.FetchPosition>> regroupFetchPositionsByLeader(
-            Map<TopicPartition, SubscriptionState.FetchPosition> partitionMap) {
+    private Map<Node, Map<TopicPartition, FetchPosition>> regroupFetchPositionsByLeader(
+            Map<TopicPartition, FetchPosition> partitionMap) {
         return partitionMap.entrySet()
                 .stream()
                 .filter(entry -> entry.getValue().currentLeader.leader.isPresent())
@@ -1188,7 +1212,7 @@ public class Fetcher<K, V> implements Closeable {
             } else if (error == Errors.NONE) {
                 // we are interested in this fetch only if the beginning offset matches the
                 // current consumed position
-                SubscriptionState.FetchPosition position = subscriptions.position(tp);
+                FetchPosition position = subscriptions.position(tp);
                 if (position == null || position.offset != fetchOffset) {
                     log.debug("Discarding stale fetch response for partition {} since its offset {} does not match " +
                             "the expected offset {}", tp, fetchOffset, position);
@@ -1260,11 +1284,8 @@ public class Fetcher<K, V> implements Closeable {
                     if (fetchOffset != subscriptions.position(tp).offset) {
                         log.debug("Discarding stale fetch response for partition {} since the fetched offset {} " +
                                 "does not match the current offset {}", tp, fetchOffset, subscriptions.position(tp));
-                    } else if (subscriptions.hasDefaultOffsetResetPolicy()) {
-                        log.info("Fetch offset {} is out of range for partition {}, resetting offset", fetchOffset, tp);
-                        subscriptions.requestOffsetReset(tp);
                     } else {
-                        throw new OffsetOutOfRangeException(Collections.singletonMap(tp, fetchOffset));
+                        handleOffsetOutOfRange(subscriptions.position(tp), tp, "error response in offset fetch");
                     }
                 } else {
                     log.debug("Unset the preferred read replica {} for partition {} since we got {} when fetching {}",
@@ -1304,6 +1325,26 @@ public class Fetcher<K, V> implements Closeable {
         return completedFetch;
     }
 
+    private void handleOffsetOutOfRange(FetchPosition fetchPosition,
+                                        TopicPartition topicPartition,
+                                        String reason) {
+        if (subscriptions.hasDefaultOffsetResetPolicy()) {
+            log.info("Fetch offset epoch {} is out of range for partition {}, resetting offset",
+                fetchPosition, topicPartition);
+            subscriptions.requestOffsetReset(topicPartition);
+        } else {
+            Map<TopicPartition, Long> offsetOutOfRangePartitions =
+                Collections.singletonMap(topicPartition, fetchPosition.offset);
+            String errorMessage = String.format("Offsets out of range " +
+                "with no configured reset policy for partitions: %s" +
+                ", for fetch offset: %d, " +
+                "root cause: %s",
+                offsetOutOfRangePartitions, fetchPosition.offset, reason);
+            log.info(errorMessage);
+            throw new OffsetOutOfRangeException(errorMessage, offsetOutOfRangePartitions);
+        }
+    }
+
     /**
      * Parse the record entry, deserializing the key / value fields if necessary
      */
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java
index 7e372c7..b05e01f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java
@@ -85,10 +85,6 @@ public class OffsetsForLeaderEpochClient extends AsyncClient<
                 case KAFKA_STORAGE_ERROR:
                 case OFFSET_NOT_AVAILABLE:
                 case LEADER_NOT_AVAILABLE:
-                    logger().debug("Attempt to fetch offsets for partition {} failed due to {}, retrying.",
-                            topicPartition, error);
-                    partitionsToRetry.add(topicPartition);
-                    break;
                 case FENCED_LEADER_EPOCH:
                 case UNKNOWN_LEADER_EPOCH:
                     logger().debug("Attempt to fetch offsets for partition {} failed due to {}, retrying.",
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 86f4e68..0d64942 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -440,12 +440,13 @@ public class SubscriptionState {
      * Enter the offset validation state if the leader for this partition is known to support a usable version of the
      * OffsetsForLeaderEpoch API. If the leader node does not support the API, simply complete the offset validation.
      *
-     * @param apiVersions
-     * @param tp
-     * @param leaderAndEpoch
+     * @param apiVersions supported API versions
+     * @param tp topic partition to validate
+     * @param leaderAndEpoch leader epoch of the topic partition
      * @return true if we enter the offset validation state
      */
-    public synchronized boolean maybeValidatePositionForCurrentLeader(ApiVersions apiVersions, TopicPartition tp,
+    public synchronized boolean maybeValidatePositionForCurrentLeader(ApiVersions apiVersions,
+                                                                      TopicPartition tp,
                                                                       Metadata.LeaderAndEpoch leaderAndEpoch) {
         if (leaderAndEpoch.leader.isPresent()) {
             NodeApiVersions nodeApiVersions = apiVersions.get(leaderAndEpoch.leader.get().idString());
@@ -786,7 +787,7 @@ public class SubscriptionState {
                 return false;
             }
 
-            if (!currentLeaderAndEpoch.leader.isPresent() && !currentLeaderAndEpoch.epoch.isPresent()) {
+            if (!currentLeaderAndEpoch.leader.isPresent()) {
                 return false;
             }
 
@@ -818,9 +819,7 @@ public class SubscriptionState {
          */
         private void completeValidation() {
             if (hasPosition()) {
-                transitionState(FetchStates.FETCHING, () -> {
-                    this.nextRetryTimeMs = null;
-                });
+                transitionState(FetchStates.FETCHING, () -> this.nextRetryTimeMs = null);
             }
         }
 
@@ -1011,8 +1010,6 @@ public class SubscriptionState {
      *
      * This includes the offset and epoch from the last record in
      * the batch from a FetchResponse. It also includes the leader epoch at the time the batch was consumed.
-     *
-     * The last fetch epoch is used to
      */
     public static class FetchPosition {
         public final long offset;
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/EpochEndOffset.java b/clients/src/main/java/org/apache/kafka/common/requests/EpochEndOffset.java
index 06dfef9..47161b2 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/EpochEndOffset.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/EpochEndOffset.java
@@ -40,9 +40,7 @@ public class EpochEndOffset {
     }
 
     public EpochEndOffset(int leaderEpoch, long endOffset) {
-        this.error = Errors.NONE;
-        this.leaderEpoch = leaderEpoch;
-        this.endOffset = endOffset;
+        this(Errors.NONE, leaderEpoch, endOffset);
     }
 
     public Errors error() {
@@ -86,4 +84,9 @@ public class EpochEndOffset {
     public int hashCode() {
         return Objects.hash(error, leaderEpoch, endOffset);
     }
+
+    public boolean hasUndefinedEpochOrOffset() {
+        return this.endOffset == UNDEFINED_EPOCH_OFFSET ||
+            this.leaderEpoch == UNDEFINED_EPOCH;
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index 465346b..8aefc67 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -371,7 +371,7 @@ public class MetadataResponse extends AbstractResponse {
         @Override
         public String toString() {
             return "PartitionMetadata(" +
-                    ", error=" + error +
+                    "error=" + error +
                     ", partition=" + topicPartition +
                     ", leader=" + leaderId +
                     ", leaderEpoch=" + leaderEpoch +
@@ -429,7 +429,8 @@ public class MetadataResponse extends AbstractResponse {
 
     public static MetadataResponse prepareResponse(int throttleTimeMs, Collection<Node> brokers, String clusterId,
                                                    int controllerId, List<TopicMetadata> topicMetadataList,
-                                                   int clusterAuthorizedOperations) {
+                                                   int clusterAuthorizedOperations,
+                                                   short responseVersion) {
         MetadataResponseData responseData = new MetadataResponseData();
         responseData.setThrottleTimeMs(throttleTimeMs);
         brokers.forEach(broker ->
@@ -464,22 +465,41 @@ public class MetadataResponse extends AbstractResponse {
             }
             responseData.topics().add(metadataResponseTopic);
         });
-        return new MetadataResponse(responseData);
+        return new MetadataResponse(responseData.toStruct(responseVersion), responseVersion);
     }
 
-    public static MetadataResponse prepareResponse(int throttleTimeMs, Collection<Node> brokers, String clusterId,
-                                                   int controllerId, List<TopicMetadata> topicMetadataList) {
+    public static MetadataResponse prepareResponse(int throttleTimeMs,
+                                                   Collection<Node> brokers,
+                                                   String clusterId,
+                                                   int controllerId,
+                                                   List<TopicMetadata> topicMetadataList,
+                                                   short responseVersion) {
         return prepareResponse(throttleTimeMs, brokers, clusterId, controllerId, topicMetadataList,
-                MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED);
+                MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED, responseVersion);
+    }
+
+    public static MetadataResponse prepareResponse(Collection<Node> brokers,
+                                                   String clusterId,
+                                                   int controllerId,
+                                                   List<TopicMetadata> topicMetadata,
+                                                   short responseVersion) {
+        return prepareResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, brokers, clusterId, controllerId,
+            topicMetadata, responseVersion);
     }
 
-    public static MetadataResponse prepareResponse(Collection<Node> brokers, String clusterId, int controllerId,
+    public static MetadataResponse prepareResponse(Collection<Node> brokers,
+                                                   String clusterId,
+                                                   int controllerId,
                                                    List<TopicMetadata> topicMetadata) {
-        return prepareResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, brokers, clusterId, controllerId, topicMetadata);
+        return prepareResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, brokers, clusterId, controllerId,
+            topicMetadata, ApiKeys.METADATA.latestVersion());
     }
 
-    public static MetadataResponse prepareResponse(int throttleTimeMs, List<MetadataResponseTopic> topicMetadataList,
-                                                   Collection<Node> brokers, String clusterId, int controllerId,
+    public static MetadataResponse prepareResponse(int throttleTimeMs,
+                                                   List<MetadataResponseTopic> topicMetadataList,
+                                                   Collection<Node> brokers,
+                                                   String clusterId,
+                                                   int controllerId,
                                                    int clusterAuthorizedOperations) {
         MetadataResponseData responseData = new MetadataResponseData();
         responseData.setThrottleTimeMs(throttleTimeMs);
diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
index 96cd22c..ba965db 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
@@ -47,6 +47,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
@@ -141,7 +142,7 @@ public class MetadataTest {
     }
 
     @Test
-    public void testTimeToNextUpdate_RetryBackoff() {
+    public void testTimeToNextUpdateRetryBackoff() {
         long now = 10000;
 
         // lastRefreshMs updated to now.
@@ -200,8 +201,8 @@ public class MetadataTest {
             assertFalse(response.hasReliableLeaderEpochs());
             metadata.updateWithCurrentRequestVersion(response, false, 100);
             assertTrue(metadata.partitionMetadataIfCurrent(tp).isPresent());
-            MetadataResponse.PartitionMetadata metadata = this.metadata.partitionMetadataIfCurrent(tp).get();
-            assertEquals(Optional.empty(), metadata.leaderEpoch);
+            MetadataResponse.PartitionMetadata responseMetadata = this.metadata.partitionMetadataIfCurrent(tp).get();
+            assertEquals(Optional.empty(), responseMetadata.leaderEpoch);
         }
 
         for (short version = 9; version <= ApiKeys.METADATA.latestVersion(); version++) {
@@ -210,8 +211,8 @@ public class MetadataTest {
             assertTrue(response.hasReliableLeaderEpochs());
             metadata.updateWithCurrentRequestVersion(response, false, 100);
             assertTrue(metadata.partitionMetadataIfCurrent(tp).isPresent());
-            MetadataResponse.PartitionMetadata info = metadata.partitionMetadataIfCurrent(tp).get();
-            assertEquals(Optional.of(10), info.leaderEpoch);
+            MetadataResponse.PartitionMetadata responseMetadata = metadata.partitionMetadataIfCurrent(tp).get();
+            assertEquals(Optional.of(10), responseMetadata.leaderEpoch);
         }
     }
 
@@ -259,10 +260,10 @@ public class MetadataTest {
         assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp));
 
         assertTrue(metadata.partitionMetadataIfCurrent(tp).isPresent());
-        MetadataResponse.PartitionMetadata metadata = this.metadata.partitionMetadataIfCurrent(tp).get();
+        MetadataResponse.PartitionMetadata responseMetadata = this.metadata.partitionMetadataIfCurrent(tp).get();
 
-        assertEquals(Arrays.asList(1, 2, 3), metadata.inSyncReplicaIds);
-        assertEquals(Optional.of(10), metadata.leaderEpoch);
+        assertEquals(Arrays.asList(1, 2, 3), responseMetadata.inSyncReplicaIds);
+        assertEquals(Optional.of(10), responseMetadata.leaderEpoch);
     }
 
     @Test
@@ -382,6 +383,7 @@ public class MetadataTest {
             MetadataResponse metadataResponse = TestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), partitionCounts, _tp -> 100);
             metadata.updateWithCurrentRequestVersion(metadataResponse, false, 10L);
             assertNotNull(metadata.fetch().partition(tp));
+            assertTrue(metadata.lastSeenLeaderEpoch(tp).isPresent());
             assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 100);
         }
 
@@ -389,7 +391,8 @@ public class MetadataTest {
         {
             MetadataResponse metadataResponse = TestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), partitionCounts, _tp -> 99,
                 (error, partition, leader, leaderEpoch, replicas, isr, offlineReplicas) ->
-                        new MetadataResponse.PartitionMetadata(error, partition, leader, leaderEpoch, replicas, Collections.emptyList(), offlineReplicas));
+                        new MetadataResponse.PartitionMetadata(error, partition, leader,
+                            leaderEpoch, replicas, Collections.emptyList(), offlineReplicas), ApiKeys.METADATA.latestVersion());
             metadata.updateWithCurrentRequestVersion(metadataResponse, false, 20L);
             assertEquals(metadata.fetch().partition(tp).inSyncReplicas().length, 1);
             assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 100);
@@ -399,7 +402,8 @@ public class MetadataTest {
         {
             MetadataResponse metadataResponse = TestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), partitionCounts, _tp -> 100,
                 (error, partition, leader, leaderEpoch, replicas, isr, offlineReplicas) ->
-                        new MetadataResponse.PartitionMetadata(error, partition, leader, leaderEpoch, replicas, Collections.emptyList(), offlineReplicas));
+                        new MetadataResponse.PartitionMetadata(error, partition, leader,
+                            leaderEpoch, replicas, Collections.emptyList(), offlineReplicas), ApiKeys.METADATA.latestVersion());
             metadata.updateWithCurrentRequestVersion(metadataResponse, false, 20L);
             assertEquals(metadata.fetch().partition(tp).inSyncReplicas().length, 0);
             assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 100);
@@ -436,6 +440,7 @@ public class MetadataTest {
         MetadataResponse metadataResponse = TestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), partitionCounts, _tp -> 100);
         metadata.updateWithCurrentRequestVersion(metadataResponse, false, 10L);
         assertNotNull(metadata.fetch().partition(tp));
+        assertTrue(metadata.lastSeenLeaderEpoch(tp).isPresent());
         assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 100);
 
         // Simulate a leader epoch from another response, like a fetch response or list offsets
@@ -443,14 +448,14 @@ public class MetadataTest {
 
         // Cache of partition stays, but current partition info is not available since it's stale
         assertNotNull(metadata.fetch().partition(tp));
-        assertEquals(metadata.fetch().partitionCountForTopic("topic-1").longValue(), 5);
+        assertEquals(Objects.requireNonNull(metadata.fetch().partitionCountForTopic("topic-1")).longValue(), 5);
         assertFalse(metadata.partitionMetadataIfCurrent(tp).isPresent());
         assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 101);
 
         // Metadata with older epoch is rejected, metadata state is unchanged
         metadata.updateWithCurrentRequestVersion(metadataResponse, false, 20L);
         assertNotNull(metadata.fetch().partition(tp));
-        assertEquals(metadata.fetch().partitionCountForTopic("topic-1").longValue(), 5);
+        assertEquals(Objects.requireNonNull(metadata.fetch().partitionCountForTopic("topic-1")).longValue(), 5);
         assertFalse(metadata.partitionMetadataIfCurrent(tp).isPresent());
         assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 101);
 
@@ -458,7 +463,7 @@ public class MetadataTest {
         metadataResponse = TestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), partitionCounts, _tp -> 101);
         metadata.updateWithCurrentRequestVersion(metadataResponse, false, 30L);
         assertNotNull(metadata.fetch().partition(tp));
-        assertEquals(metadata.fetch().partitionCountForTopic("topic-1").longValue(), 5);
+        assertEquals(Objects.requireNonNull(metadata.fetch().partitionCountForTopic("topic-1")).longValue(), 5);
         assertTrue(metadata.partitionMetadataIfCurrent(tp).isPresent());
         assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 101);
     }
@@ -706,7 +711,7 @@ public class MetadataTest {
             (error, partition, leader, leaderEpoch, replicas, isr, offlineReplicas) ->
                 new MetadataResponse.PartitionMetadata(error, partition, Optional.of(node0.id()), leaderEpoch,
                     Collections.singletonList(node0.id()), Collections.emptyList(),
-                        Collections.singletonList(node1.id())));
+                        Collections.singletonList(node1.id())), ApiKeys.METADATA.latestVersion());
         metadata.updateWithCurrentRequestVersion(emptyMetadataResponse(), false, 0L);
         metadata.updateWithCurrentRequestVersion(metadataResponse, false, 10L);
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 022c688..e2ec870 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -243,7 +243,7 @@ public class KafkaConsumerTest {
         assertEquals(singleton(topic), consumer.subscription());
         assertTrue(consumer.assignment().isEmpty());
 
-        consumer.subscribe(Collections.<String>emptyList());
+        consumer.subscribe(Collections.emptyList());
         assertTrue(consumer.subscription().isEmpty());
         assertTrue(consumer.assignment().isEmpty());
 
@@ -323,7 +323,7 @@ public class KafkaConsumerTest {
     @Test
     public void testAssignOnEmptyTopicPartition() {
         try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupId)) {
-            consumer.assign(Collections.<TopicPartition>emptyList());
+            consumer.assign(Collections.emptyList());
             assertTrue(consumer.subscription().isEmpty());
             assertTrue(consumer.assignment().isEmpty());
         }
@@ -344,7 +344,7 @@ public class KafkaConsumerTest {
     }
 
     @Test
-    public void testInterceptorConstructorClose() throws Exception {
+    public void testInterceptorConstructorClose() {
         try {
             Properties props = new Properties();
             // test with client ID assigned by KafkaConsumer
@@ -416,8 +416,8 @@ public class KafkaConsumerTest {
         props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
         if (groupId != null)
             props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
-        if (enableAutoCommit.isPresent())
-            props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit.get().toString());
+        enableAutoCommit.ifPresent(
+            autoCommit -> props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit.toString()));
         return newConsumer(props);
     }
 
@@ -585,26 +585,20 @@ public class KafkaConsumerTest {
         consumer.seekToBeginning(singleton(tp1));
 
         client.prepareResponse(
-                new MockClient.RequestMatcher() {
-                    @Override
-                    public boolean matches(AbstractRequest body) {
-                        ListOffsetRequest request = (ListOffsetRequest) body;
-                        Map<TopicPartition, ListOffsetRequest.PartitionData> timestamps = request.partitionTimestamps();
-                        return timestamps.get(tp0).timestamp == ListOffsetRequest.LATEST_TIMESTAMP &&
-                                timestamps.get(tp1).timestamp == ListOffsetRequest.EARLIEST_TIMESTAMP;
-                    }
-                }, listOffsetsResponse(Collections.singletonMap(tp0, 50L),
+            body -> {
+                ListOffsetRequest request = (ListOffsetRequest) body;
+                Map<TopicPartition, ListOffsetRequest.PartitionData> timestamps = request.partitionTimestamps();
+                return timestamps.get(tp0).timestamp == ListOffsetRequest.LATEST_TIMESTAMP &&
+                        timestamps.get(tp1).timestamp == ListOffsetRequest.EARLIEST_TIMESTAMP;
+            }, listOffsetsResponse(Collections.singletonMap(tp0, 50L),
                         Collections.singletonMap(tp1, Errors.NOT_LEADER_FOR_PARTITION)));
         client.prepareResponse(
-                new MockClient.RequestMatcher() {
-                    @Override
-                    public boolean matches(AbstractRequest body) {
-                        FetchRequest request = (FetchRequest) body;
-                        return request.fetchData().keySet().equals(singleton(tp0)) &&
-                                request.fetchData().get(tp0).fetchOffset == 50L;
+            body -> {
+                FetchRequest request = (FetchRequest) body;
+                return request.fetchData().keySet().equals(singleton(tp0)) &&
+                        request.fetchData().get(tp0).fetchOffset == 50L;
 
-                    }
-                }, fetchResponse(tp0, 50L, 5));
+            }, fetchResponse(tp0, 50L, 5));
 
         ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1));
         assertEquals(5, records.count());
@@ -613,6 +607,7 @@ public class KafkaConsumerTest {
 
     private void initMetadata(MockClient mockClient, Map<String, Integer> partitionCounts) {
         MetadataResponse initialMetadata = TestUtils.metadataUpdateWith(1, partitionCounts);
+
         mockClient.updateMetadata(initialMetadata);
     }
 
@@ -954,11 +949,7 @@ public class KafkaConsumerTest {
 
         consumer.wakeup();
 
-        try {
-            consumer.poll(Duration.ZERO);
-            fail();
-        } catch (WakeupException e) {
-        }
+        assertThrows(WakeupException.class, () -> consumer.poll(Duration.ZERO));
 
         // make sure the position hasn't been updated
         assertEquals(0, consumer.position(tp0));
@@ -1062,7 +1053,7 @@ public class KafkaConsumerTest {
         consumer.subscribe(Arrays.asList(topic, topic2), getConsumerRebalanceListener(consumer));
 
         // verify that subscription has changed but assignment is still unchanged
-        assertTrue(consumer.subscription().size() == 2);
+        assertEquals(2, consumer.subscription().size());
         assertTrue(consumer.subscription().contains(topic) && consumer.subscription().contains(topic2));
         assertTrue(consumer.assignment().isEmpty());
 
@@ -1102,9 +1093,9 @@ public class KafkaConsumerTest {
         consumer.subscribe(Arrays.asList(topic, topic3), getConsumerRebalanceListener(consumer));
 
         // verify that subscription has changed but assignment is still unchanged
-        assertTrue(consumer.subscription().size() == 2);
+        assertEquals(2, consumer.subscription().size());
         assertTrue(consumer.subscription().contains(topic) && consumer.subscription().contains(topic3));
-        assertTrue(consumer.assignment().size() == 2);
+        assertEquals(2, consumer.assignment().size());
         assertTrue(consumer.assignment().contains(tp0) && consumer.assignment().contains(t2p0));
 
         // mock the offset commit response for to be revoked partitions
@@ -1133,9 +1124,9 @@ public class KafkaConsumerTest {
         assertTrue(commitReceived.get());
 
         // verify that subscription is still the same, and now assignment has caught up
-        assertTrue(consumer.subscription().size() == 2);
+        assertEquals(2, consumer.subscription().size());
         assertTrue(consumer.subscription().contains(topic) && consumer.subscription().contains(topic3));
-        assertTrue(consumer.assignment().size() == 2);
+        assertEquals(2, consumer.assignment().size());
         assertTrue(consumer.assignment().contains(tp0) && consumer.assignment().contains(t3p0));
 
         consumer.unsubscribe();
@@ -1307,7 +1298,7 @@ public class KafkaConsumerTest {
         assertEquals(0, consumer.committed(Collections.singleton(tp0)).get(tp0).offset());
 
         // verify that assignment immediately changes
-        assertTrue(consumer.assignment().equals(singleton(tp0)));
+        assertEquals(consumer.assignment(), singleton(tp0));
 
         // there shouldn't be any need to lookup the coordinator or fetch committed offsets.
         // we just lookup the starting position and send the record fetch.
@@ -1315,6 +1306,7 @@ public class KafkaConsumerTest {
         client.prepareResponse(fetchResponse(tp0, 10L, 1));
 
         ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1));
+
         assertEquals(1, records.count());
         assertEquals(11L, consumer.position(tp0));
 
@@ -1325,7 +1317,7 @@ public class KafkaConsumerTest {
         consumer.assign(singleton(t2p0));
 
         // verify that assignment immediately changes
-        assertTrue(consumer.assignment().equals(singleton(t2p0)));
+        assertEquals(consumer.assignment(), singleton(t2p0));
         // verify that the offset commits occurred as expected
         assertTrue(commitReceived.get());
 
@@ -1365,7 +1357,7 @@ public class KafkaConsumerTest {
         assertEquals(0, consumer.committed(Collections.singleton(tp0)).get(tp0).offset());
 
         // verify that assignment immediately changes
-        assertTrue(consumer.assignment().equals(singleton(tp0)));
+        assertEquals(consumer.assignment(), singleton(tp0));
 
         // there shouldn't be any need to lookup the coordinator or fetch committed offsets.
         // we just lookup the starting position and send the record fetch.
@@ -1380,11 +1372,11 @@ public class KafkaConsumerTest {
         consumer.assign(singleton(t2p0));
 
         // verify that assignment immediately changes
-        assertTrue(consumer.assignment().equals(singleton(t2p0)));
+        assertEquals(consumer.assignment(), singleton(t2p0));
 
         // the auto commit is disabled, so no offset commit request should be sent
         for (ClientRequest req : client.requests())
-            assertTrue(req.requestBuilder().apiKey() != ApiKeys.OFFSET_COMMIT);
+            assertNotSame(req.requestBuilder().apiKey(), ApiKeys.OFFSET_COMMIT);
 
         client.requests().clear();
         consumer.close();
@@ -1453,7 +1445,7 @@ public class KafkaConsumerTest {
     @Test(expected = IllegalStateException.class)
     public void testPollWithEmptySubscription() {
         try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupId)) {
-            consumer.subscribe(Collections.<String>emptyList());
+            consumer.subscribe(Collections.emptyList());
             consumer.poll(Duration.ZERO);
         }
     }
@@ -1461,7 +1453,7 @@ public class KafkaConsumerTest {
     @Test(expected = IllegalStateException.class)
     public void testPollWithEmptyUserAssignment() {
         try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupId)) {
-            consumer.assign(Collections.<TopicPartition>emptySet());
+            consumer.assign(Collections.emptySet());
             consumer.poll(Duration.ZERO);
         }
     }
@@ -1477,7 +1469,7 @@ public class KafkaConsumerTest {
 
     @Test
     public void testCloseTimeout() throws Exception {
-        consumerCloseTest(5000, Collections.<AbstractResponse>emptyList(), 5000, false);
+        consumerCloseTest(5000, Collections.emptyList(), 5000, false);
     }
 
     @Test
@@ -1490,7 +1482,7 @@ public class KafkaConsumerTest {
 
     @Test
     public void testCloseNoWait() throws Exception {
-        consumerCloseTest(0, Collections.<AbstractResponse>emptyList(), 0, false);
+        consumerCloseTest(0, Collections.emptyList(), 0, false);
     }
 
     @Test
@@ -1688,15 +1680,12 @@ public class KafkaConsumerTest {
         ExecutorService executor = Executors.newSingleThreadExecutor();
         final AtomicReference<Exception> closeException = new AtomicReference<>();
         try {
-            Future<?> future = executor.submit(new Runnable() {
-                @Override
-                public void run() {
-                    consumer.commitAsync();
-                    try {
-                        consumer.close(Duration.ofMillis(closeTimeoutMs));
-                    } catch (Exception e) {
-                        closeException.set(e);
-                    }
+            Future<?> future = executor.submit(() -> {
+                consumer.commitAsync();
+                try {
+                    consumer.close(Duration.ofMillis(closeTimeoutMs));
+                } catch (Exception e) {
+                    closeException.set(e);
                 }
             });
 
@@ -2095,21 +2084,18 @@ public class KafkaConsumerTest {
         for (TopicPartition partition : partitionOffsets.keySet())
             response.put(partition, Errors.NONE);
 
-        client.prepareResponseFrom(new MockClient.RequestMatcher() {
-            @Override
-            public boolean matches(AbstractRequest body) {
-                OffsetCommitRequest commitRequest = (OffsetCommitRequest) body;
-                Map<TopicPartition, Long> commitErrors = commitRequest.offsets();
-
-                for (Map.Entry<TopicPartition, Long> partitionOffset : partitionOffsets.entrySet()) {
-                    // verify that the expected offset has been committed
-                    if (!commitErrors.get(partitionOffset.getKey()).equals(partitionOffset.getValue())) {
-                        commitReceived.set(false);
-                        return false;
-                    }
+        client.prepareResponseFrom(body -> {
+            OffsetCommitRequest commitRequest = (OffsetCommitRequest) body;
+            Map<TopicPartition, Long> commitErrors = commitRequest.offsets();
+
+            for (Map.Entry<TopicPartition, Long> partitionOffset : partitionOffsets.entrySet()) {
+                // verify that the expected offset has been committed
+                if (!commitErrors.get(partitionOffset.getKey()).equals(partitionOffset.getValue())) {
+                    commitReceived.set(false);
+                    return false;
                 }
-                return true;
             }
+            return true;
         }, offsetCommitResponse(response), coordinator);
         return commitReceived;
     }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index b62483d..331a6f3 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -611,7 +611,7 @@ public class AbstractCoordinatorTest {
 
         final AbstractCoordinator.Generation currGen = coordinator.generation();
 
-        // let the heartbeat request to send out a request
+        // let the heartbeat thread send out a request
         mockTime.sleep(HEARTBEAT_INTERVAL_MS);
 
         TestUtils.waitForCondition(() -> !mockClient.requests().isEmpty(), 2000,
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index c36b823..2010965 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.clients.NodeApiVersions;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.LogTruncationException;
 import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
@@ -66,7 +67,6 @@ import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.Records;
 import org.apache.kafka.common.record.SimpleRecord;
 import org.apache.kafka.common.record.TimestampType;
-import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.apache.kafka.common.requests.EpochEndOffset;
 import org.apache.kafka.common.requests.FetchRequest;
@@ -149,7 +149,9 @@ public class FetcherTest {
     private TopicPartition tp1 = new TopicPartition(topicName, 1);
     private TopicPartition tp2 = new TopicPartition(topicName, 2);
     private TopicPartition tp3 = new TopicPartition(topicName, 3);
-    private MetadataResponse initialUpdateResponse = TestUtils.metadataUpdateWith(1, singletonMap(topicName, 4));
+    private int validLeaderEpoch = 0;
+    private MetadataResponse initialUpdateResponse =
+        TestUtils.metadataUpdateWith(1, singletonMap(topicName, 4));
 
     private int minBytes = 1;
     private int maxBytes = Integer.MAX_VALUE;
@@ -185,6 +187,11 @@ public class FetcherTest {
     private void assignFromUser(Set<TopicPartition> partitions) {
         subscriptions.assignFromUser(partitions);
         client.updateMetadata(initialUpdateResponse);
+
+        // A dummy metadata update to ensure valid leader epoch.
+        metadata.updateWithCurrentRequestVersion(TestUtils.metadataUpdateWith("dummy", 1,
+            Collections.emptyMap(), singletonMap(topicName, 4),
+            tp -> validLeaderEpoch), false, 0L);
     }
 
     @After
@@ -337,7 +344,6 @@ public class FetcherTest {
 
         assignFromUser(singleton(tp0));
         subscriptions.seek(tp0, 0);
-        client.updateMetadata(initialUpdateResponse);
         Node node = initialUpdateResponse.brokers().iterator().next();
 
         client.blackout(node, 500);
@@ -855,7 +861,8 @@ public class FetcherTest {
         subscriptions.assignFromSubscribed(singleton(tp0));
         subscriptions.seek(tp0, 0);
 
-        client.updateMetadata(initialUpdateResponse);
+        client.updateMetadata(TestUtils.metadataUpdateWith(
+            1, singletonMap(topicName, 4), tp -> validLeaderEpoch));
 
         assertEquals(1, fetcher.sendFetches());
 
@@ -878,7 +885,8 @@ public class FetcherTest {
         subscriptions.assignFromSubscribed(singleton(tp0));
         subscriptions.seek(tp0, 0);
 
-        client.updateMetadata(initialUpdateResponse);
+        client.updateMetadata(TestUtils.metadataUpdateWith(
+            1, singletonMap(topicName, 4), tp -> validLeaderEpoch));
 
         assertEquals(1, fetcher.sendFetches());
 
@@ -1289,7 +1297,6 @@ public class FetcherTest {
         buildFetcher(OffsetResetStrategy.NONE, new ByteArrayDeserializer(),
                 new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_UNCOMMITTED);
         assignFromUser(Utils.mkSet(tp0, tp1, tp2, tp3));
-        client.updateMetadata(initialUpdateResponse);
 
         subscriptions.seek(tp0, 1);
         subscriptions.seek(tp1, 1);
@@ -1425,8 +1432,8 @@ public class FetcherTest {
         assignFromUser(singleton(tp0));
         subscriptions.requestOffsetReset(tp0);
 
-        client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.EARLIEST_TIMESTAMP),
-                listOffsetResponse(Errors.NONE, 1L, 5L));
+        client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.EARLIEST_TIMESTAMP,
+            Optional.of(validLeaderEpoch)), listOffsetResponse(Errors.NONE, 1L, 5L));
         fetcher.resetOffsetsIfNeeded();
         consumerClient.pollNoWakeup();
         assertFalse(subscriptions.isOffsetResetNeeded(tp0));
@@ -1461,8 +1468,8 @@ public class FetcherTest {
         subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.LATEST);
 
         // Fail with OFFSET_NOT_AVAILABLE
-        client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP),
-                listOffsetResponse(Errors.OFFSET_NOT_AVAILABLE, 1L, 5L), false);
+        client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP,
+            Optional.of(validLeaderEpoch)), listOffsetResponse(Errors.OFFSET_NOT_AVAILABLE, 1L, 5L), false);
         fetcher.resetOffsetsIfNeeded();
         consumerClient.pollNoWakeup();
         assertFalse(subscriptions.hasValidPosition(tp0));
@@ -1471,8 +1478,8 @@ public class FetcherTest {
 
         // Fail with LEADER_NOT_AVAILABLE
         time.sleep(retryBackoffMs);
-        client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP),
-                listOffsetResponse(Errors.LEADER_NOT_AVAILABLE, 1L, 5L), false);
+        client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP,
+            Optional.of(validLeaderEpoch)), listOffsetResponse(Errors.LEADER_NOT_AVAILABLE, 1L, 5L), false);
         fetcher.resetOffsetsIfNeeded();
         consumerClient.pollNoWakeup();
         assertFalse(subscriptions.hasValidPosition(tp0));
@@ -1553,8 +1560,8 @@ public class FetcherTest {
         assignFromUser(singleton(tp0));
         subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.EARLIEST);
 
-        client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.EARLIEST_TIMESTAMP),
-                listOffsetResponse(Errors.NONE, 1L, 5L));
+        client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.EARLIEST_TIMESTAMP,
+            Optional.of(validLeaderEpoch)), listOffsetResponse(Errors.NONE, 1L, 5L));
         fetcher.resetOffsetsIfNeeded();
         consumerClient.pollNoWakeup();
 
@@ -1570,8 +1577,8 @@ public class FetcherTest {
         subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.LATEST);
 
         // First fetch fails with stale metadata
-        client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP),
-                listOffsetResponse(Errors.NOT_LEADER_FOR_PARTITION, 1L, 5L), false);
+        client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP,
+            Optional.of(validLeaderEpoch)), listOffsetResponse(Errors.NOT_LEADER_FOR_PARTITION, 1L, 5L), false);
         fetcher.resetOffsetsIfNeeded();
         consumerClient.pollNoWakeup();
         assertFalse(subscriptions.hasValidPosition(tp0));
@@ -1647,8 +1654,8 @@ public class FetcherTest {
         subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.LATEST);
 
         // First request gets a disconnect
-        client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP),
-                listOffsetResponse(Errors.NONE, 1L, 5L), true);
+        client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP,
+            Optional.of(validLeaderEpoch)), listOffsetResponse(Errors.NONE, 1L, 5L), true);
         fetcher.resetOffsetsIfNeeded();
         consumerClient.pollNoWakeup();
         assertFalse(subscriptions.hasValidPosition(tp0));
@@ -1824,8 +1831,8 @@ public class FetcherTest {
         subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.LATEST);
 
         // First request gets a disconnect
-        client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP),
-                listOffsetResponse(Errors.TOPIC_AUTHORIZATION_FAILED, -1, -1), false);
+        client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP,
+            Optional.of(validLeaderEpoch)), listOffsetResponse(Errors.TOPIC_AUTHORIZATION_FAILED, -1, -1), false);
         fetcher.resetOffsetsIfNeeded();
         consumerClient.pollNoWakeup();
         assertFalse(subscriptions.hasValidPosition(tp0));
@@ -1862,8 +1869,8 @@ public class FetcherTest {
         subscriptions.pause(tp0); // paused partition does not have a valid position
         subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.LATEST);
 
-        client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP),
-                listOffsetResponse(Errors.NONE, 1L, 10L));
+        client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP,
+            Optional.of(validLeaderEpoch)), listOffsetResponse(Errors.NONE, 1L, 10L));
         fetcher.resetOffsetsIfNeeded();
         consumerClient.pollNoWakeup();
 
@@ -2222,7 +2229,7 @@ public class FetcherTest {
         Map<String, Integer> partitionCounts = new HashMap<>();
         partitionCounts.put(topic1, 1);
         partitionCounts.put(topic2, 1);
-        client.updateMetadata(TestUtils.metadataUpdateWith(1, partitionCounts));
+        client.updateMetadata(TestUtils.metadataUpdateWith(1, partitionCounts, tp -> validLeaderEpoch));
 
         int expectedBytes = 0;
         LinkedHashMap<TopicPartition, FetchResponse.PartitionData<MemoryRecords>> fetchPartitionData = new LinkedHashMap<>();
@@ -3285,7 +3292,7 @@ public class FetcherTest {
         };
 
         MetadataResponse initialMetadataResponse = TestUtils.metadataUpdateWith(1,
-                singletonMap(topicName, numPartitions));
+                singletonMap(topicName, numPartitions), tp -> validLeaderEpoch);
         client.updateMetadata(initialMetadataResponse);
         fetchSize = 10000;
 
@@ -3637,12 +3644,9 @@ public class FetcherTest {
             ));
 
             OffsetsForLeaderEpochResponse response = new OffsetsForLeaderEpochResponse(endOffsets);
-            client.prepareResponseFrom(new MockClient.RequestMatcher() {
-                @Override
-                public boolean matches(AbstractRequest body) {
-                    OffsetsForLeaderEpochRequest request = (OffsetsForLeaderEpochRequest) body;
-                    return expectedPartitions.equals(request.epochsByTopicPartition().keySet());
-                }
+            client.prepareResponseFrom(body -> {
+                OffsetsForLeaderEpochRequest request = (OffsetsForLeaderEpochRequest) body;
+                return expectedPartitions.equals(request.epochsByTopicPartition().keySet());
             }, response, node);
         }
 
@@ -3752,6 +3756,121 @@ public class FetcherTest {
     }
 
     @Test
+    public void testOffsetValidationSkippedForOldResponse() {
+        // Old responses may provide unreliable leader epoch,
+        // so we should skip offset validation and not send the request.
+        buildFetcher();
+        assignFromUser(singleton(tp0));
+
+        Map<String, Integer> partitionCounts = new HashMap<>();
+        partitionCounts.put(tp0.topic(), 4);
+
+        final int epochOne = 1;
+
+        metadata.updateWithCurrentRequestVersion(TestUtils.metadataUpdateWith("dummy", 1,
+            Collections.emptyMap(), partitionCounts, tp -> epochOne), false, 0L);
+
+        Node node = metadata.fetch().nodes().get(0);
+        assertFalse(client.isConnected(node.idString()));
+
+        // Seek with a position and leader+epoch
+        Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(
+            metadata.currentLeader(tp0).leader, Optional.of(epochOne));
+        subscriptions.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(20L, Optional.of(epochOne), leaderAndEpoch));
+        assertFalse(client.isConnected(node.idString()));
+        assertTrue(subscriptions.awaitingValidation(tp0));
+
+        // Inject an older version of the metadata response
+        final short responseVersion = 8;
+        metadata.updateWithCurrentRequestVersion(TestUtils.metadataUpdateWith("dummy", 1,
+            Collections.emptyMap(), partitionCounts, responseVersion), false, 0L);
+        fetcher.validateOffsetsIfNeeded();
+        // Offset validation is skipped
+        assertFalse(subscriptions.awaitingValidation(tp0));
+    }
+
+    @Test
+    public void testOffsetValidationResetOffsetForUndefinedEpochWithDefinedResetPolicy() {
+        testOffsetValidationWithGivenEpochOffset(
+            new EpochEndOffset(EpochEndOffset.UNDEFINED_EPOCH, 0L), OffsetResetStrategy.EARLIEST);
+    }
+
+    @Test
+    public void testOffsetValidationResetOffsetForUndefinedOffsetWithDefinedResetPolicy() {
+        testOffsetValidationWithGivenEpochOffset(
+            new EpochEndOffset(2, EpochEndOffset.UNDEFINED_EPOCH_OFFSET), OffsetResetStrategy.EARLIEST);
+    }
+
+    @Test
+    public void testOffsetValidationResetOffsetForUndefinedEpochWithUndefinedResetPolicy() {
+        testOffsetValidationWithGivenEpochOffset(
+            new EpochEndOffset(EpochEndOffset.UNDEFINED_EPOCH, 0L), OffsetResetStrategy.NONE);
+    }
+
+    @Test
+    public void testOffsetValidationResetOffsetForUndefinedOffsetWithUndefinedResetPolicy() {
+        testOffsetValidationWithGivenEpochOffset(
+            new EpochEndOffset(2, EpochEndOffset.UNDEFINED_EPOCH_OFFSET), OffsetResetStrategy.NONE);
+    }
+
+    @Test
+    public void testOffsetValidationTriggerLogTruncationForBadOffsetWithUndefinedResetPolicy() {
+        testOffsetValidationWithGivenEpochOffset(
+            new EpochEndOffset(1, 1L), OffsetResetStrategy.NONE);
+    }
+
+    private void testOffsetValidationWithGivenEpochOffset(final EpochEndOffset epochEndOffset,
+                                                          OffsetResetStrategy offsetResetStrategy) {
+        buildFetcher(offsetResetStrategy);
+        assignFromUser(singleton(tp0));
+
+        Map<String, Integer> partitionCounts = new HashMap<>();
+        partitionCounts.put(tp0.topic(), 4);
+
+        final int epochOne = 1;
+        final long initialOffset = 5;
+
+        metadata.updateWithCurrentRequestVersion(TestUtils.metadataUpdateWith("dummy", 1,
+            Collections.emptyMap(), partitionCounts, tp -> epochOne), false, 0L);
+
+        // Offset validation requires OffsetForLeaderEpoch request v3 or higher
+        Node node = metadata.fetch().nodes().get(0);
+        apiVersions.update(node.idString(), NodeApiVersions.create());
+
+        Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(metadata.currentLeader(tp0).leader, Optional.of(epochOne));
+        subscriptions.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(initialOffset, Optional.of(epochOne), leaderAndEpoch));
+
+        fetcher.validateOffsetsIfNeeded();
+
+        consumerClient.poll(time.timer(Duration.ZERO));
+        assertTrue(subscriptions.awaitingValidation(tp0));
+        assertTrue(client.hasInFlightRequests());
+
+        client.respond(request -> {
+            OffsetsForLeaderEpochRequest epochRequest = (OffsetsForLeaderEpochRequest) request;
+            OffsetsForLeaderEpochRequest.PartitionData partitionData = epochRequest.epochsByTopicPartition().get(tp0);
+            return partitionData.currentLeaderEpoch.equals(Optional.of(epochOne)) && partitionData.leaderEpoch == epochOne;
+        }, new OffsetsForLeaderEpochResponse(singletonMap(tp0, epochEndOffset)));
+        consumerClient.poll(time.timer(Duration.ZERO));
+
+        assertEquals(initialOffset, subscriptions.position(tp0).offset);
+
+        if (offsetResetStrategy == OffsetResetStrategy.NONE) {
+            OffsetOutOfRangeException thrown =
+                assertThrows(OffsetOutOfRangeException.class, () -> fetcher.validateOffsetsIfNeeded());
+
+            // If epoch offset is valid, we are testing for the log truncation case.
+            if (!epochEndOffset.hasUndefinedEpochOrOffset()) {
+                assertTrue(thrown instanceof LogTruncationException);
+            }
+            assertTrue(subscriptions.awaitingValidation(tp0));
+        } else {
+            fetcher.validateOffsetsIfNeeded();
+            assertFalse(subscriptions.awaitingValidation(tp0));
+        }
+    }
+
+    @Test
     public void testOffsetValidationHandlesSeekWithInflightOffsetForLeaderRequest() {
         buildFetcher();
         assignFromUser(singleton(tp0));
@@ -3924,7 +4043,7 @@ public class FetcherTest {
                 Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED, Duration.ofMinutes(5).toMillis());
 
         subscriptions.assignFromUser(singleton(tp0));
-        client.updateMetadata(TestUtils.metadataUpdateWith(2, singletonMap(topicName, 4)));
+        client.updateMetadata(TestUtils.metadataUpdateWith(2, singletonMap(topicName, 4), tp -> validLeaderEpoch));
         subscriptions.seek(tp0, 0);
 
         // Node preferred replica before first fetch response
@@ -3967,7 +4086,8 @@ public class FetcherTest {
                 Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED, Duration.ofMinutes(5).toMillis());
 
         subscriptions.assignFromUser(singleton(tp0));
-        client.updateMetadata(TestUtils.metadataUpdateWith(2, singletonMap(topicName, 4)));
+        client.updateMetadata(TestUtils.metadataUpdateWith(2, singletonMap(topicName, 4), tp -> validLeaderEpoch));
+
         subscriptions.seek(tp0, 0);
 
         assertEquals(1, fetcher.sendFetches());
@@ -4047,9 +4167,7 @@ public class FetcherTest {
         assertTrue(fetcher.hasCompletedFetches());
 
         // Trigger the exception.
-        assertThrows(KafkaException.class, () -> {
-            fetchedRecords();
-        });
+        assertThrows(KafkaException.class, this::fetchedRecords);
     }
 
     @Test
@@ -4121,11 +4239,15 @@ public class FetcherTest {
     }
 
     private MockClient.RequestMatcher listOffsetRequestMatcher(final long timestamp) {
+        return listOffsetRequestMatcher(timestamp, Optional.empty());
+    }
+
+    private MockClient.RequestMatcher listOffsetRequestMatcher(final long timestamp, Optional<Integer> leaderEpoch) {
         // matches any list offset request with the provided timestamp
         return body -> {
             ListOffsetRequest req = (ListOffsetRequest) body;
             return req.partitionTimestamps().equals(Collections.singletonMap(
-                tp0, new ListOffsetRequest.PartitionData(timestamp, Optional.empty())));
+                tp0, new ListOffsetRequest.PartitionData(timestamp, leaderEpoch)));
         };
     }
 
@@ -4241,6 +4363,12 @@ public class FetcherTest {
                 Integer.MAX_VALUE, IsolationLevel.READ_UNCOMMITTED);
     }
 
+    private void buildFetcher(OffsetResetStrategy offsetResetStrategy) {
+        buildFetcher(new MetricConfig(), offsetResetStrategy,
+            new ByteArrayDeserializer(), new ByteArrayDeserializer(),
+            Integer.MAX_VALUE, IsolationLevel.READ_UNCOMMITTED);
+    }
+
     private <K, V> void buildFetcher(OffsetResetStrategy offsetResetStrategy,
                                      Deserializer<K> keyDeserializer,
                                      Deserializer<V> valueDeserializer,
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
index ae00b8e..7aeba97 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
@@ -589,17 +589,18 @@ public class SubscriptionStateTest {
         int initialOffsetEpoch = 5;
 
         SubscriptionState.FetchPosition initialPosition = new SubscriptionState.FetchPosition(initialOffset,
-                Optional.of(initialOffsetEpoch), new Metadata.LeaderAndEpoch(Optional.of(broker1), Optional.of(currentEpoch)));
+            Optional.of(initialOffsetEpoch), new Metadata.LeaderAndEpoch(Optional.of(broker1), Optional.of(currentEpoch)));
         state.seekUnvalidated(tp0, initialPosition);
         assertTrue(state.awaitingValidation(tp0));
 
         state.requestOffsetReset(tp0);
 
         Optional<OffsetAndMetadata> divergentOffsetMetadataOpt = state.maybeCompleteValidation(tp0, initialPosition,
-                new EpochEndOffset(initialOffsetEpoch, initialOffset + 5));
+            new EpochEndOffset(initialOffsetEpoch, initialOffset + 5));
         assertEquals(Optional.empty(), divergentOffsetMetadataOpt);
         assertFalse(state.awaitingValidation(tp0));
         assertTrue(state.isOffsetResetNeeded(tp0));
+        assertEquals(initialPosition, state.position(tp0));
     }
 
     @Test
@@ -653,10 +654,10 @@ public class SubscriptionStateTest {
     }
 
     private static class MockRebalanceListener implements ConsumerRebalanceListener {
-        public Collection<TopicPartition> revoked;
+        Collection<TopicPartition> revoked;
         public Collection<TopicPartition> assigned;
-        public int revokedCount = 0;
-        public int assignedCount = 0;
+        int revokedCount = 0;
+        int assignedCount = 0;
 
         @Override
         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/EpochEndOffsetTest.java b/clients/src/test/java/org/apache/kafka/common/requests/EpochEndOffsetTest.java
new file mode 100644
index 0000000..3ab1f32
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/requests/EpochEndOffsetTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.Errors;
+import org.junit.Test;
+
+import static org.apache.kafka.common.requests.EpochEndOffset.UNDEFINED_EPOCH;
+import static org.apache.kafka.common.requests.EpochEndOffset.UNDEFINED_EPOCH_OFFSET;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class EpochEndOffsetTest {
+
+    @Test
+    public void testConstructor() {
+        int leaderEpoch = 5;
+        long endOffset = 10L;
+        EpochEndOffset epochEndOffset = new EpochEndOffset(Errors.FENCED_LEADER_EPOCH, leaderEpoch, endOffset);
+
+        assertEquals(leaderEpoch, epochEndOffset.leaderEpoch());
+        assertEquals(endOffset, epochEndOffset.endOffset());
+        assertTrue(epochEndOffset.hasError());
+        assertEquals(Errors.FENCED_LEADER_EPOCH, epochEndOffset.error());
+        assertFalse(epochEndOffset.hasUndefinedEpochOrOffset());
+    }
+
+    @Test
+    public void testWithUndefinedEpoch() {
+        EpochEndOffset epochEndOffset = new EpochEndOffset(-1, 2L);
+
+        assertEquals(UNDEFINED_EPOCH, epochEndOffset.leaderEpoch());
+        assertEquals(2L, epochEndOffset.endOffset());
+        assertFalse(epochEndOffset.hasError());
+        assertEquals(Errors.NONE, epochEndOffset.error());
+        assertTrue(epochEndOffset.hasUndefinedEpochOrOffset());
+    }
+
+    @Test
+    public void testWithUndefinedEndOffset() {
+        EpochEndOffset epochEndOffset = new EpochEndOffset(3, -1L);
+
+        assertEquals(3, epochEndOffset.leaderEpoch());
+        assertEquals(UNDEFINED_EPOCH_OFFSET, epochEndOffset.endOffset());
+        assertFalse(epochEndOffset.hasError());
+        assertEquals(Errors.NONE, epochEndOffset.error());
+        assertTrue(epochEndOffset.hasUndefinedEpochOrOffset());
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index 23fd5ed..80cb16a 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -120,17 +120,35 @@ public class TestUtils {
         return metadataUpdateWith("kafka-cluster", numNodes, topicPartitionCounts);
     }
 
+    public static MetadataResponse metadataUpdateWith(final int numNodes,
+                                                      final Map<String, Integer> topicPartitionCounts,
+                                                      final Function<TopicPartition, Integer> epochSupplier) {
+        return metadataUpdateWith("kafka-cluster", numNodes, Collections.emptyMap(),
+            topicPartitionCounts, epochSupplier, MetadataResponse.PartitionMetadata::new, ApiKeys.METADATA.latestVersion());
+    }
+
     public static MetadataResponse metadataUpdateWith(final String clusterId,
                                                       final int numNodes,
                                                       final Map<String, Integer> topicPartitionCounts) {
-        return metadataUpdateWith(clusterId, numNodes, Collections.emptyMap(), topicPartitionCounts, tp -> null, MetadataResponse.PartitionMetadata::new);
+        return metadataUpdateWith(clusterId, numNodes, Collections.emptyMap(),
+            topicPartitionCounts, tp -> null, MetadataResponse.PartitionMetadata::new, ApiKeys.METADATA.latestVersion());
     }
 
     public static MetadataResponse metadataUpdateWith(final String clusterId,
                                                       final int numNodes,
                                                       final Map<String, Errors> topicErrors,
                                                       final Map<String, Integer> topicPartitionCounts) {
-        return metadataUpdateWith(clusterId, numNodes, topicErrors, topicPartitionCounts, tp -> null, MetadataResponse.PartitionMetadata::new);
+        return metadataUpdateWith(clusterId, numNodes, topicErrors,
+            topicPartitionCounts, tp -> null, MetadataResponse.PartitionMetadata::new, ApiKeys.METADATA.latestVersion());
+    }
+
+    public static MetadataResponse metadataUpdateWith(final String clusterId,
+                                                      final int numNodes,
+                                                      final Map<String, Errors> topicErrors,
+                                                      final Map<String, Integer> topicPartitionCounts,
+                                                      final short responseVersion) {
+        return metadataUpdateWith(clusterId, numNodes, topicErrors,
+            topicPartitionCounts, tp -> null, MetadataResponse.PartitionMetadata::new, responseVersion);
     }
 
     public static MetadataResponse metadataUpdateWith(final String clusterId,
@@ -138,7 +156,8 @@ public class TestUtils {
                                                       final Map<String, Errors> topicErrors,
                                                       final Map<String, Integer> topicPartitionCounts,
                                                       final Function<TopicPartition, Integer> epochSupplier) {
-        return metadataUpdateWith(clusterId, numNodes, topicErrors, topicPartitionCounts, epochSupplier, MetadataResponse.PartitionMetadata::new);
+        return metadataUpdateWith(clusterId, numNodes, topicErrors,
+            topicPartitionCounts, epochSupplier, MetadataResponse.PartitionMetadata::new, ApiKeys.METADATA.latestVersion());
     }
 
     public static MetadataResponse metadataUpdateWith(final String clusterId,
@@ -146,7 +165,8 @@ public class TestUtils {
                                                       final Map<String, Errors> topicErrors,
                                                       final Map<String, Integer> topicPartitionCounts,
                                                       final Function<TopicPartition, Integer> epochSupplier,
-                                                      final PartitionMetadataSupplier partitionSupplier) {
+                                                      final PartitionMetadataSupplier partitionSupplier,
+                                                      final short responseVersion) {
         final List<Node> nodes = new ArrayList<>(numNodes);
         for (int i = 0; i < numNodes; i++)
             nodes.add(new Node(i, "localhost", 1969 + i));
@@ -176,7 +196,7 @@ public class TestUtils {
                     Topic.isInternal(topic), Collections.emptyList()));
         }
 
-        return MetadataResponse.prepareResponse(nodes, clusterId, 0, topicMetadata);
+        return MetadataResponse.prepareResponse(nodes, clusterId, 0, topicMetadata, responseVersion);
     }
 
     @FunctionalInterface


Mime
View raw message