kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davidart...@apache.org
Subject [kafka] branch 2.5 updated: KAFKA-9724 Newer clients not always sending fetch request to older brokers (#8376)
Date Wed, 27 May 2020 21:24:54 GMT
This is an automated email from the ASF dual-hosted git repository.

davidarthur pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new 31dd984  KAFKA-9724 Newer clients not always sending fetch request to older brokers
(#8376)
31dd984 is described below

commit 31dd984cbe5bd14f52e488712b737887c691b7d6
Author: David Arthur <mumrah@gmail.com>
AuthorDate: Wed May 27 17:24:17 2020 -0400

    KAFKA-9724 Newer clients not always sending fetch request to older brokers (#8376)
    
    Newer clients were getting stuck entering the validation phase even when a broker didn't
support it. This commit will bypass the AWAITING_VALIDATION state when the broker is on an
older version of the OffsetsForLeaderEpoch RPC.
---
 .../java/org/apache/kafka/clients/Metadata.java    |  55 +++++------
 .../kafka/clients/consumer/internals/Fetcher.java  |   9 +-
 .../consumer/internals/SubscriptionState.java      |  29 +++++-
 .../org/apache/kafka/clients/MetadataTest.java     |  73 ++++++++++-----
 .../clients/consumer/internals/FetcherTest.java    | 104 ++++++++++++++++++---
 .../consumer/internals/SubscriptionStateTest.java  |  46 ++++++++-
 6 files changed, 243 insertions(+), 73 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index 7116e2e..d609c2a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -41,7 +41,6 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
-import java.util.function.Predicate;
 import java.util.function.Supplier;
 
 import static org.apache.kafka.common.record.RecordBatch.NO_PARTITION_LEADER_EPOCH;
@@ -156,14 +155,35 @@ public class Metadata implements Closeable {
     }
 
     /**
-     * Request an update for the partition metadata iff the given leader epoch is newer than
the last seen leader epoch
+     * Request an update for the partition metadata iff we have seen a newer leader epoch.
This is called by the client
+     * any time it handles a response from the broker that includes leader epoch, except
for UpdateMetadata which
+     * follows a different code path ({@link #update}).
+     *
+     * @param topicPartition
+     * @param leaderEpoch
+     * @return true if we updated the last seen epoch, false otherwise
      */
     public synchronized boolean updateLastSeenEpochIfNewer(TopicPartition topicPartition,
int leaderEpoch) {
         Objects.requireNonNull(topicPartition, "TopicPartition cannot be null");
         if (leaderEpoch < 0)
             throw new IllegalArgumentException("Invalid leader epoch " + leaderEpoch + "
(must be non-negative)");
 
-        boolean updated = updateLastSeenEpoch(topicPartition, leaderEpoch, oldEpoch ->
leaderEpoch > oldEpoch);
+        Integer oldEpoch = lastSeenLeaderEpochs.get(topicPartition);
+        log.trace("Determining if we should replace existing epoch {} with new epoch {} for
partition {}", oldEpoch, leaderEpoch, topicPartition);
+
+        final boolean updated;
+        if (oldEpoch == null) {
+            log.debug("Not replacing null epoch with new epoch {} for partition {}", leaderEpoch,
topicPartition);
+            updated = false;
+        } else if (leaderEpoch > oldEpoch) {
+            log.debug("Updating last seen epoch from {} to {} for partition {}", oldEpoch,
leaderEpoch, topicPartition);
+            lastSeenLeaderEpochs.put(topicPartition, leaderEpoch);
+            updated = true;
+        } else {
+            log.debug("Not replacing existing epoch {} with new epoch {} for partition {}",
oldEpoch, leaderEpoch, topicPartition);
+            updated = false;
+        }
+
         this.needFullUpdate = this.needFullUpdate || updated;
         return updated;
     }
@@ -173,29 +193,6 @@ public class Metadata implements Closeable {
     }
 
     /**
-     * Conditionally update the leader epoch for a partition
-     *
-     * @param topicPartition       topic+partition to update the epoch for
-     * @param epoch                the new epoch
-     * @param epochTest            a predicate to determine if the old epoch should be replaced
-     * @return true if the epoch was updated, false otherwise
-     */
-    private synchronized boolean updateLastSeenEpoch(TopicPartition topicPartition,
-                                                     int epoch,
-                                                     Predicate<Integer> epochTest)
{
-        Integer oldEpoch = lastSeenLeaderEpochs.get(topicPartition);
-        log.trace("Determining if we should replace existing epoch {} with new epoch {}",
oldEpoch, epoch);
-        if (oldEpoch == null || epochTest.test(oldEpoch)) {
-            log.debug("Updating last seen epoch from {} to {} for partition {}", oldEpoch,
epoch, topicPartition);
-            lastSeenLeaderEpochs.put(topicPartition, epoch);
-            return true;
-        } else {
-            log.debug("Not replacing existing epoch {} with new epoch {} for partition {}",
oldEpoch, epoch, topicPartition);
-            return false;
-        }
-    }
-
-    /**
      * Check whether an update has been explicitly requested.
      *
      * @return true if an update was requested, false otherwise
@@ -373,10 +370,14 @@ public class Metadata implements Closeable {
         if (hasReliableLeaderEpoch && partitionMetadata.leaderEpoch.isPresent())
{
             int newEpoch = partitionMetadata.leaderEpoch.get();
             // If the received leader epoch is at least the same as the previous one, update
the metadata
-            if (updateLastSeenEpoch(tp, newEpoch, oldEpoch -> newEpoch >= oldEpoch))
{
+            Integer currentEpoch = lastSeenLeaderEpochs.get(tp);
+            if (currentEpoch == null || newEpoch >= currentEpoch) {
+                log.debug("Updating last seen epoch for partition {} from {} to epoch {}
from new metadata", tp, currentEpoch, newEpoch);
+                lastSeenLeaderEpochs.put(tp, newEpoch);
                 return Optional.of(partitionMetadata);
             } else {
                 // Otherwise ignore the new metadata and use the previously cached info
+                log.debug("Got metadata for an older epoch {} (current is {}) for partition
{}, not updating", newEpoch, currentEpoch, tp);
                 return cache.partitionMetadata(tp);
             }
         } else {
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index bcfedcd..ea1b7e7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -486,7 +486,7 @@ public class Fetcher<K, V> implements Closeable {
         // Validate each partition against the current leader and epoch
         subscriptions.assignedPartitions().forEach(topicPartition -> {
             ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = metadata.currentLeader(topicPartition);
-            subscriptions.maybeValidatePositionForCurrentLeader(topicPartition, leaderAndEpoch);
+            subscriptions.maybeValidatePositionForCurrentLeader(apiVersions, topicPartition,
leaderAndEpoch);
         });
 
         // Collect positions needing validation, with backoff
@@ -754,7 +754,7 @@ public class Fetcher<K, V> implements Closeable {
         }
     }
 
-    private boolean hasUsableOffsetForLeaderEpochVersion(NodeApiVersions nodeApiVersions)
{
+    static boolean hasUsableOffsetForLeaderEpochVersion(NodeApiVersions nodeApiVersions)
{
         ApiVersion apiVersion = nodeApiVersions.apiVersion(ApiKeys.OFFSET_FOR_LEADER_EPOCH);
         if (apiVersion == null)
             return false;
@@ -1099,8 +1099,9 @@ public class Fetcher<K, V> implements Closeable {
         Map<Node, FetchSessionHandler.Builder> fetchable = new LinkedHashMap<>();
 
         // Ensure the position has an up-to-date leader
-        subscriptions.assignedPartitions().forEach(
-            tp -> subscriptions.maybeValidatePositionForCurrentLeader(tp, metadata.currentLeader(tp)));
+        subscriptions.assignedPartitions().forEach(tp ->
+            subscriptions.maybeValidatePositionForCurrentLeader(apiVersions, tp, metadata.currentLeader(tp))
+        );
 
         long currentTimeMs = time.milliseconds();
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 6568c91..bd84777 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -16,7 +16,9 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.NodeApiVersions;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -45,6 +47,8 @@ import java.util.regex.Pattern;
 import java.util.stream.Collector;
 import java.util.stream.Collectors;
 
+import static org.apache.kafka.clients.consumer.internals.Fetcher.hasUsableOffsetForLeaderEpochVersion;
+
 /**
  * A class for tracking the topics, partitions, and offsets for the consumer. A partition
  * is "assigned" either directly with {@link #assignFromUser(Set)} (manual assignment)
@@ -422,8 +426,29 @@ public class SubscriptionState {
         assignedState(tp).position(position);
     }
 
-    public synchronized boolean maybeValidatePositionForCurrentLeader(TopicPartition tp,
Metadata.LeaderAndEpoch leaderAndEpoch) {
-        return assignedState(tp).maybeValidatePosition(leaderAndEpoch);
+    /**
+     * Enter the offset validation state if the leader for this partition is known to support
a usable version of the
+     * OffsetsForLeaderEpoch API. If the leader node does not support the API, simply complete
the offset validation.
+     *
+     * @param apiVersions
+     * @param tp
+     * @param leaderAndEpoch
+     * @return true if we enter the offset validation state
+     */
+    public synchronized boolean maybeValidatePositionForCurrentLeader(ApiVersions apiVersions,
TopicPartition tp,
+                                                                      Metadata.LeaderAndEpoch
leaderAndEpoch) {
+        if (leaderAndEpoch.leader.isPresent()) {
+            NodeApiVersions nodeApiVersions = apiVersions.get(leaderAndEpoch.leader.get().idString());
+            if (nodeApiVersions == null || hasUsableOffsetForLeaderEpochVersion(nodeApiVersions))
{
+                return assignedState(tp).maybeValidatePosition(leaderAndEpoch);
+            } else {
+                // If the broker does not support a newer version of OffsetsForLeaderEpoch,
we skip validation
+                completeValidation(tp);
+                return false;
+            }
+        } else {
+            return assignedState(tp).maybeValidatePosition(leaderAndEpoch);
+        }
     }
 
     /**
diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
index 5203745..96cd22c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
@@ -313,6 +313,10 @@ public class MetadataTest {
         boolean[] updateResult = {true, false, false, false, false, true, false, false, false,
true};
         TopicPartition tp = new TopicPartition("topic", 0);
 
+        MetadataResponse metadataResponse = TestUtils.metadataUpdateWith("dummy", 1,
+                Collections.emptyMap(), Collections.singletonMap("topic", 1), _tp -> 0);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 10L);
+
         for (int i = 0; i < epochs.length; i++) {
             metadata.updateLastSeenEpochIfNewer(tp, epochs[i]);
             if (updateResult[i]) {
@@ -326,6 +330,46 @@ public class MetadataTest {
     }
 
     @Test
+    public void testUpdateLastEpoch() {
+        TopicPartition tp = new TopicPartition("topic-1", 0);
+
+        MetadataResponse metadataResponse = emptyMetadataResponse();
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 0L);
+
+        // if we have no leader epoch, this call shouldn't do anything
+        assertFalse(metadata.updateLastSeenEpochIfNewer(tp, 0));
+        assertFalse(metadata.updateLastSeenEpochIfNewer(tp, 1));
+        assertFalse(metadata.updateLastSeenEpochIfNewer(tp, 2));
+        assertFalse(metadata.lastSeenLeaderEpoch(tp).isPresent());
+
+        // Metadata with newer epoch is handled
+        metadataResponse = TestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(),
Collections.singletonMap("topic-1", 1), _tp -> 10);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 1L);
+        assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(),
10));
+
+        // Don't update to an older one
+        assertFalse(metadata.updateLastSeenEpochIfNewer(tp, 1));
+        assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(),
10));
+
+        // Don't cause update if it's the same one
+        assertFalse(metadata.updateLastSeenEpochIfNewer(tp, 10));
+        assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(),
10));
+
+        // Update if we see newer epoch
+        assertTrue(metadata.updateLastSeenEpochIfNewer(tp, 12));
+        assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(),
12));
+
+        metadataResponse = TestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(),
Collections.singletonMap("topic-1", 1), _tp -> 12);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 2L);
+        assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(),
12));
+
+        // Don't overwrite metadata with older epoch
+        metadataResponse = TestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(),
Collections.singletonMap("topic-1", 1), _tp -> 11);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 3L);
+        assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(),
12));
+    }
+
+    @Test
     public void testRejectOldMetadata() {
         Map<String, Integer> partitionCounts = new HashMap<>();
         partitionCounts.put("topic-1", 1);
@@ -379,26 +423,6 @@ public class MetadataTest {
     }
 
     @Test
-    public void testMaybeRequestUpdate() {
-        TopicPartition tp = new TopicPartition("topic-1", 0);
-        metadata.updateWithCurrentRequestVersion(emptyMetadataResponse(), false, 0L);
-        assertTrue(metadata.updateLastSeenEpochIfNewer(tp, 1));
-        assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 1);
-
-        metadata.updateWithCurrentRequestVersion(emptyMetadataResponse(), false, 1L);
-        assertFalse(metadata.updateLastSeenEpochIfNewer(tp, 1));
-        assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 1);
-
-        metadata.updateWithCurrentRequestVersion(emptyMetadataResponse(), false, 2L);
-        assertFalse(metadata.updateLastSeenEpochIfNewer(tp, 0));
-        assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 1);
-
-        metadata.updateWithCurrentRequestVersion(emptyMetadataResponse(), false, 3L);
-        assertTrue(metadata.updateLastSeenEpochIfNewer(tp, 2));
-        assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 2);
-    }
-
-    @Test
     public void testOutOfBandEpochUpdate() {
         Map<String, Integer> partitionCounts = new HashMap<>();
         partitionCounts.put("topic-1", 5);
@@ -406,7 +430,7 @@ public class MetadataTest {
 
         metadata.updateWithCurrentRequestVersion(emptyMetadataResponse(), false, 0L);
 
-        assertTrue(metadata.updateLastSeenEpochIfNewer(tp, 99));
+        assertFalse(metadata.updateLastSeenEpochIfNewer(tp, 99));
 
         // Update epoch to 100
         MetadataResponse metadataResponse = TestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(),
partitionCounts, _tp -> 100);
@@ -414,7 +438,7 @@ public class MetadataTest {
         assertNotNull(metadata.fetch().partition(tp));
         assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 100);
 
-        // Simulate a leader epoch from another response, like a fetch response (not yet
implemented)
+        // Simulate a leader epoch from another response, like a fetch response or list offsets
         assertTrue(metadata.updateLastSeenEpochIfNewer(tp, 101));
 
         // Cache of partition stays, but current partition info is not available since it's
stale
@@ -454,6 +478,11 @@ public class MetadataTest {
         assertTrue(metadata.partitionMetadataIfCurrent(tp).isPresent());
         assertEquals(0, metadata.partitionMetadataIfCurrent(tp).get().partition());
         assertEquals(Optional.of(0), metadata.partitionMetadataIfCurrent(tp).get().leaderId);
+
+        // Since epoch was null, this shouldn't update it
+        metadata.updateLastSeenEpochIfNewer(tp, 10);
+        assertTrue(metadata.partitionMetadataIfCurrent(tp).isPresent());
+        assertFalse(metadata.partitionMetadataIfCurrent(tp).get().leaderEpoch.isPresent());
     }
 
     @Test
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 5a88750..d85df27 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -1595,6 +1595,53 @@ public class FetcherTest {
     }
 
     @Test
+    public void testListOffsetNoUpdateMissingEpoch() {
+        buildFetcher();
+
+        // Set up metadata with no leader epoch
+        subscriptions.assignFromUser(singleton(tp0));
+        MetadataResponse metadataWithNoLeaderEpochs = TestUtils.metadataUpdateWith(
+                "kafka-cluster", 1, Collections.emptyMap(), singletonMap(topicName, 4), tp
-> null);
+        client.updateMetadata(metadataWithNoLeaderEpochs);
+
+        // Return a ListOffsets response with leaderEpoch=1, we should ignore it
+        subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.LATEST);
+        client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP),
+                listOffsetResponse(tp0, Errors.NONE, 1L, 5L, 1));
+        fetcher.resetOffsetsIfNeeded();
+        consumerClient.pollNoWakeup();
+
+        // Reset should be satisfied and no metadata update requested
+        assertFalse(subscriptions.isOffsetResetNeeded(tp0));
+        assertFalse(metadata.updateRequested());
+        assertFalse(metadata.lastSeenLeaderEpoch(tp0).isPresent());
+    }
+
+    @Test
+    public void testListOffsetUpdateEpoch() {
+        buildFetcher();
+
+        // Set up metadata with leaderEpoch=1
+        subscriptions.assignFromUser(singleton(tp0));
+        MetadataResponse metadataWithLeaderEpochs = TestUtils.metadataUpdateWith(
+                "kafka-cluster", 1, Collections.emptyMap(), singletonMap(topicName, 4), tp
-> 1);
+        client.updateMetadata(metadataWithLeaderEpochs);
+
+        // Reset offsets to trigger ListOffsets call
+        subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.LATEST);
+
+        // Now we see a ListOffsets response with leaderEpoch=2, which should trigger a metadata update
+        client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP,
1),
+                listOffsetResponse(tp0, Errors.NONE, 1L, 5L, 2));
+        fetcher.resetOffsetsIfNeeded();
+        consumerClient.pollNoWakeup();
+
+        assertFalse(subscriptions.isOffsetResetNeeded(tp0));
+        assertTrue(metadata.updateRequested());
+        assertOptional(metadata.lastSeenLeaderEpoch(tp0), epoch -> assertEquals((long)
epoch, 2));
+    }
+
+    @Test
     public void testUpdateFetchPositionDisconnect() {
         buildFetcher();
         assignFromUser(singleton(tp0));
@@ -3676,18 +3723,35 @@ public class FetcherTest {
         apiVersions.update(node.idString(), NodeApiVersions.create(
             ApiKeys.OFFSET_FOR_LEADER_EPOCH.id, (short) 0, (short) 2));
 
-        // Seek with a position and leader+epoch
-        Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(
-                metadata.currentLeader(tp0).leader, Optional.of(epochOne));
-        subscriptions.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(0, Optional.of(epochOne),
leaderAndEpoch));
+        {
+            // Seek with a position and leader+epoch
+            Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(
+                    metadata.currentLeader(tp0).leader, Optional.of(epochOne));
+            subscriptions.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(0, Optional.of(epochOne),
leaderAndEpoch));
 
-        // Update metadata to epoch=2, enter validation
-        metadata.updateWithCurrentRequestVersion(TestUtils.metadataUpdateWith("dummy", 1,
-                Collections.emptyMap(), partitionCounts, tp -> epochTwo), false, 0L);
-        fetcher.validateOffsetsIfNeeded();
+            // Update metadata to epoch=2, enter validation
+            metadata.updateWithCurrentRequestVersion(TestUtils.metadataUpdateWith("dummy",
1,
+                    Collections.emptyMap(), partitionCounts, tp -> epochTwo), false, 0L);
+            fetcher.validateOffsetsIfNeeded();
 
-        // Offset validation is skipped
-        assertFalse(subscriptions.awaitingValidation(tp0));
+            // Offset validation is skipped
+            assertFalse(subscriptions.awaitingValidation(tp0));
+        }
+
+        {
+            // Seek with a position and leader+epoch
+            Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(
+                    metadata.currentLeader(tp0).leader, Optional.of(epochOne));
+            subscriptions.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(0, Optional.of(epochOne),
leaderAndEpoch));
+
+            // Update metadata to epoch=2, enter validation
+            metadata.updateWithCurrentRequestVersion(TestUtils.metadataUpdateWith("dummy",
1,
+                    Collections.emptyMap(), partitionCounts, tp -> epochTwo), false, 0L);
+
+            // Subscription should not stay in AWAITING_VALIDATION in prepareFetchRequest
+            assertEquals(1, fetcher.sendFetches());
+            assertFalse(subscriptions.awaitingValidation(tp0));
+        }
     }
 
     @Test
@@ -3767,7 +3831,7 @@ public class FetcherTest {
                 Optional.of(epochTwo),
                 new Metadata.LeaderAndEpoch(leaderAndEpoch.leader, Optional.of(epochTwo)));
         subscriptions.position(tp0, nextPosition);
-        subscriptions.maybeValidatePositionForCurrentLeader(tp0, new Metadata.LeaderAndEpoch(leaderAndEpoch.leader,
Optional.of(epochThree)));
+        subscriptions.maybeValidatePositionForCurrentLeader(apiVersions, tp0, new Metadata.LeaderAndEpoch(leaderAndEpoch.leader,
Optional.of(epochThree)));
 
         // Prepare offset list response from async validation with epoch=2
         Map<TopicPartition, EpochEndOffset> endOffsetMap = new HashMap<>();
@@ -4044,13 +4108,27 @@ public class FetcherTest {
         };
     }
 
+    private MockClient.RequestMatcher listOffsetRequestMatcher(final long timestamp, final
int leaderEpoch) {
+        // matches any list offset request with the provided timestamp
+        return body -> {
+            ListOffsetRequest req = (ListOffsetRequest) body;
+            return req.partitionTimestamps().equals(Collections.singletonMap(
+                    tp0, new ListOffsetRequest.PartitionData(timestamp, Optional.of(leaderEpoch))));
+        };
+    }
+
     private ListOffsetResponse listOffsetResponse(Errors error, long timestamp, long offset)
{
         return listOffsetResponse(tp0, error, timestamp, offset);
     }
 
     private ListOffsetResponse listOffsetResponse(TopicPartition tp, Errors error, long timestamp,
long offset) {
-        ListOffsetResponse.PartitionData partitionData = new ListOffsetResponse.PartitionData(error,
timestamp, offset,
-                Optional.empty());
+        return listOffsetResponse(tp, error, timestamp, offset, null);
+    }
+
+    private ListOffsetResponse listOffsetResponse(TopicPartition tp, Errors error, long timestamp,
long offset,
+                                                  Integer leaderEpoch) {
+        ListOffsetResponse.PartitionData partitionData = new ListOffsetResponse.PartitionData(
+            error, timestamp, offset, Optional.ofNullable(leaderEpoch));
         Map<TopicPartition, ListOffsetResponse.PartitionData> allPartitionData = new
HashMap<>();
         allPartitionData.put(tp, partitionData);
         return new ListOffsetResponse(allPartitionData);
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
index 96f08f5..cc74652 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
@@ -16,12 +16,15 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.NodeApiVersions;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.requests.EpochEndOffset;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Utils;
@@ -382,13 +385,15 @@ public class SubscriptionStateTest {
                 new Metadata.LeaderAndEpoch(Optional.of(broker1), Optional.of(5))));
         assertTrue(state.hasValidPosition(tp0));
         assertFalse(state.awaitingValidation(tp0));
+        ApiVersions apiVersions = new ApiVersions();
+        apiVersions.update(broker1.idString(), NodeApiVersions.create());
 
-        assertFalse(state.maybeValidatePositionForCurrentLeader(tp0, new Metadata.LeaderAndEpoch(
+        assertFalse(state.maybeValidatePositionForCurrentLeader(apiVersions, tp0, new Metadata.LeaderAndEpoch(
                 Optional.of(broker1), Optional.empty())));
         assertTrue(state.hasValidPosition(tp0));
         assertFalse(state.awaitingValidation(tp0));
 
-        assertFalse(state.maybeValidatePositionForCurrentLeader(tp0, new Metadata.LeaderAndEpoch(
+        assertFalse(state.maybeValidatePositionForCurrentLeader(apiVersions, tp0, new Metadata.LeaderAndEpoch(
                 Optional.of(broker1), Optional.of(10))));
         assertTrue(state.hasValidPosition(tp0));
         assertFalse(state.awaitingValidation(tp0));
@@ -414,6 +419,9 @@ public class SubscriptionStateTest {
     @Test
     public void testSeekUnvalidatedWithOffsetEpoch() {
         Node broker1 = new Node(1, "localhost", 9092);
+        ApiVersions apiVersions = new ApiVersions();
+        apiVersions.update(broker1.idString(), NodeApiVersions.create());
+
         state.assignFromUser(Collections.singleton(tp0));
 
         state.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(0L, Optional.of(2),
@@ -422,19 +430,19 @@ public class SubscriptionStateTest {
         assertTrue(state.awaitingValidation(tp0));
 
         // Update using the current leader and epoch
-        assertTrue(state.maybeValidatePositionForCurrentLeader(tp0, new Metadata.LeaderAndEpoch(
+        assertTrue(state.maybeValidatePositionForCurrentLeader(apiVersions, tp0, new Metadata.LeaderAndEpoch(
                 Optional.of(broker1), Optional.of(5))));
         assertFalse(state.hasValidPosition(tp0));
         assertTrue(state.awaitingValidation(tp0));
 
         // Update with a newer leader and epoch
-        assertTrue(state.maybeValidatePositionForCurrentLeader(tp0, new Metadata.LeaderAndEpoch(
+        assertTrue(state.maybeValidatePositionForCurrentLeader(apiVersions, tp0, new Metadata.LeaderAndEpoch(
                 Optional.of(broker1), Optional.of(15))));
         assertFalse(state.hasValidPosition(tp0));
         assertTrue(state.awaitingValidation(tp0));
 
         // If the updated leader has no epoch information, then skip validation and begin
fetching
-        assertFalse(state.maybeValidatePositionForCurrentLeader(tp0, new Metadata.LeaderAndEpoch(
+        assertFalse(state.maybeValidatePositionForCurrentLeader(apiVersions, tp0, new Metadata.LeaderAndEpoch(
                 Optional.of(broker1), Optional.empty())));
         assertTrue(state.hasValidPosition(tp0));
         assertFalse(state.awaitingValidation(tp0));
@@ -511,6 +519,34 @@ public class SubscriptionStateTest {
     }
 
     @Test
+    public void testMaybeValidatePositionForCurrentLeader() {
+        NodeApiVersions oldApis = NodeApiVersions.create(ApiKeys.OFFSET_FOR_LEADER_EPOCH.id,
(short) 0, (short) 2);
+        ApiVersions apiVersions = new ApiVersions();
+        apiVersions.update("1", oldApis);
+
+        Node broker1 = new Node(1, "localhost", 9092);
+        state.assignFromUser(Collections.singleton(tp0));
+
+        state.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(10L, Optional.of(5),
+                new Metadata.LeaderAndEpoch(Optional.of(broker1), Optional.of(10))));
+
+        // if API is too old to be usable, we just skip validation
+        assertFalse(state.maybeValidatePositionForCurrentLeader(apiVersions, tp0, new Metadata.LeaderAndEpoch(
+                Optional.of(broker1), Optional.of(10))));
+        assertTrue(state.hasValidPosition(tp0));
+
+        // New API
+        apiVersions.update("1", NodeApiVersions.create());
+        state.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(10L, Optional.of(5),
+                new Metadata.LeaderAndEpoch(Optional.of(broker1), Optional.of(10))));
+
+        // API is new enough to be usable, so we enter the offset validation state
+        assertTrue(state.maybeValidatePositionForCurrentLeader(apiVersions, tp0, new Metadata.LeaderAndEpoch(
+                Optional.of(broker1), Optional.of(10))));
+        assertFalse(state.hasValidPosition(tp0));
+    }
+
+    @Test
     public void testMaybeCompleteValidationAfterPositionChange() {
         Node broker1 = new Node(1, "localhost", 9092);
         state.assignFromUser(Collections.singleton(tp0));


Mime
View raw message