kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From da...@apache.org
Subject [kafka] branch trunk updated: KAFKA-12541; Extend ListOffset to fetch offset with max timestamp (KIP-734) (#10760)
Date Fri, 25 Jun 2021 12:31:04 GMT
This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new bd72ef1  KAFKA-12541; Extend ListOffset to fetch offset with max timestamp (KIP-734) (#10760)
bd72ef1 is described below

commit bd72ef1bf1e40feb3bc17349a385b479fa5fa530
Author: thomaskwscott <thomaskwscott@gmail.com>
AuthorDate: Fri Jun 25 13:29:12 2021 +0100

    KAFKA-12541; Extend ListOffset to fetch offset with max timestamp (KIP-734) (#10760)
    
    This patch implements KIP-734 as described in https://cwiki.apache.org/confluence/display/KAFKA/KIP-734%3A+Improve+AdminClient.listOffsets+to+return+timestamp+and+offset+for+the+record+with+the+largest+timestamp.
    
    Reviewers: David Jacot <djacot@confluent.io>
---
 .../kafka/clients/admin/KafkaAdminClient.java      |  51 ++++++++-
 .../org/apache/kafka/clients/admin/OffsetSpec.java |  10 ++
 .../kafka/clients/consumer/internals/Fetcher.java  |   2 +-
 .../kafka/common/requests/ListOffsetsRequest.java  |   7 +-
 .../common/message/ListOffsetsRequest.json         |   4 +-
 .../common/message/ListOffsetsResponse.json        |   4 +-
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 122 ++++++++++++++++++++-
 .../common/requests/ListOffsetsRequestTest.java    |   4 +-
 .../kafka/common/requests/RequestResponseTest.java |   6 +-
 core/src/main/scala/kafka/api/ApiVersion.scala     |  11 +-
 core/src/main/scala/kafka/log/Log.scala            |  10 ++
 .../scala/kafka/server/ReplicaFetcherThread.scala  |   3 +-
 .../kafka/admin/ListOffsetsIntegrationTest.scala   |  95 ++++++++++++++++
 .../kafka/api/AuthorizerIntegrationTest.scala      |   2 +-
 core/src/test/scala/unit/kafka/log/LogTest.scala   |  29 +++++
 .../scala/unit/kafka/server/KafkaApisTest.scala    |   4 +-
 .../unit/kafka/server/ListOffsetsRequestTest.scala |  33 +++++-
 .../scala/unit/kafka/server/LogOffsetTest.scala    |  82 +++++++++++---
 .../scala/unit/kafka/server/RequestQuotaTest.scala |   2 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala    |   5 +-
 .../jmh/common/ListOffsetRequestBenchmark.java     |   2 +-
 21 files changed, 441 insertions(+), 47 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 6477bd0..7a34541 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -4209,11 +4209,7 @@ public class KafkaAdminClient extends AdminClient {
             OffsetSpec offsetSpec = entry.getValue();
             TopicPartition tp = entry.getKey();
             KafkaFutureImpl<ListOffsetsResultInfo> future = futures.get(tp);
-            long offsetQuery = (offsetSpec instanceof TimestampSpec)
-                    ? ((TimestampSpec) offsetSpec).timestamp()
-                    : (offsetSpec instanceof OffsetSpec.EarliestSpec)
-                        ? ListOffsetsRequest.EARLIEST_TIMESTAMP
-                        : ListOffsetsRequest.LATEST_TIMESTAMP;
+            long offsetQuery = getOffsetFromOffsetSpec(offsetSpec);
             // avoid sending listOffsets request for topics with errors
             if (!mr.errors().containsKey(tp.topic())) {
                 Node node = mr.cluster().leaderFor(tp);
@@ -4236,10 +4232,12 @@ public class KafkaAdminClient extends AdminClient {
 
                 final List<ListOffsetsTopic> partitionsToQuery = new ArrayList<>(entry.getValue().values());
 
+                private boolean supportsMaxTimestamp = true;
+
                 @Override
                 ListOffsetsRequest.Builder createRequest(int timeoutMs) {
                     return ListOffsetsRequest.Builder
-                            .forConsumer(true, context.options().isolationLevel())
+                            .forConsumer(true, context.options().isolationLevel(), supportsMaxTimestamp)
                             .setTargetTimes(partitionsToQuery);
                 }
 
@@ -4298,6 +4296,36 @@ public class KafkaAdminClient extends AdminClient {
                         }
                     }
                 }
+
+                @Override
+                boolean handleUnsupportedVersionException(UnsupportedVersionException exception) {
+                    if (supportsMaxTimestamp) {
+                        supportsMaxTimestamp = false;
+
+                        // fail any unsupported futures and remove partitions from the downgraded retry
+                        boolean foundMaxTimestampPartition = false;
+                        Iterator<ListOffsetsTopic> topicIterator = partitionsToQuery.iterator();
+                        while (topicIterator.hasNext()) {
+                            ListOffsetsTopic topic = topicIterator.next();
+                            Iterator<ListOffsetsPartition> partitionIterator = topic.partitions().iterator();
+                            while (partitionIterator.hasNext()) {
+                                ListOffsetsPartition partition = partitionIterator.next();
+                                if (partition.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP) {
+                                    foundMaxTimestampPartition = true;
+                                    futures.get(new TopicPartition(topic.name(), partition.partitionIndex()))
+                                        .completeExceptionally(new UnsupportedVersionException(
+                                            "Broker " + brokerId + " does not support MAX_TIMESTAMP offset spec"));
+                                    partitionIterator.remove();
+                                }
+                            }
+                            if (topic.partitions().isEmpty()) {
+                                topicIterator.remove();
+                            }
+                        }
+                        return foundMaxTimestampPartition && !partitionsToQuery.isEmpty();
+                    }
+                    return false;
+                }
             });
         }
         return calls;
@@ -4834,6 +4862,17 @@ public class KafkaAdminClient extends AdminClient {
         };
     }
 
+    private long getOffsetFromOffsetSpec(OffsetSpec offsetSpec) {
+        if (offsetSpec instanceof TimestampSpec) {
+            return ((TimestampSpec) offsetSpec).timestamp();
+        } else if (offsetSpec instanceof OffsetSpec.EarliestSpec) {
+            return ListOffsetsRequest.EARLIEST_TIMESTAMP;
+        } else if (offsetSpec instanceof OffsetSpec.MaxTimestampSpec) {
+            return ListOffsetsRequest.MAX_TIMESTAMP;
+        }
+        return ListOffsetsRequest.LATEST_TIMESTAMP;
+    }
+
     /**
      * Get a sub level error when the request is in batch. If given key was not found,
      * return an {@link IllegalArgumentException}.
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java b/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java
index 339e9cf..dcf9045 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java
@@ -25,6 +25,7 @@ public class OffsetSpec {
 
     public static class EarliestSpec extends OffsetSpec { }
     public static class LatestSpec extends OffsetSpec { }
+    public static class MaxTimestampSpec extends OffsetSpec { }
     public static class TimestampSpec extends OffsetSpec {
         private final long timestamp;
 
@@ -60,4 +61,13 @@ public class OffsetSpec {
         return new TimestampSpec(timestamp);
     }
 
+    /**
+     * Used to retrieve the offset with the largest timestamp of a partition.
+     * As message timestamps can be specified client side, this may not match
+     * the log end offset returned by LatestSpec.
+     */
+    public static OffsetSpec maxTimestamp() {
+        return new MaxTimestampSpec();
+    }
+
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index adfaae9..3f7d31b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -978,7 +978,7 @@ public class Fetcher<K, V> implements Closeable {
                                                                   final Map<TopicPartition, ListOffsetsPartition> timestampsToSearch,
                                                                   boolean requireTimestamp) {
         ListOffsetsRequest.Builder builder = ListOffsetsRequest.Builder
-                .forConsumer(requireTimestamp, isolationLevel)
+                .forConsumer(requireTimestamp, isolationLevel, false)
                 .setTargetTimes(ListOffsetsRequest.toListOffsetsTopics(timestampsToSearch));
 
         log.debug("Sending ListOffsetRequest {} to broker {}", builder, node);
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java
index 658188d..6b7734a 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java
@@ -40,6 +40,7 @@ import org.apache.kafka.common.protocol.Errors;
 public class ListOffsetsRequest extends AbstractRequest {
     public static final long EARLIEST_TIMESTAMP = -2L;
     public static final long LATEST_TIMESTAMP = -1L;
+    public static final long MAX_TIMESTAMP = -3L;
 
     public static final int CONSUMER_REPLICA_ID = -1;
     public static final int DEBUGGING_REPLICA_ID = -2;
@@ -54,9 +55,11 @@ public class ListOffsetsRequest extends AbstractRequest {
             return new Builder((short) 0, allowedVersion, replicaId, IsolationLevel.READ_UNCOMMITTED);
         }
 
-        public static Builder forConsumer(boolean requireTimestamp, IsolationLevel isolationLevel) {
+        public static Builder forConsumer(boolean requireTimestamp, IsolationLevel isolationLevel, boolean requireMaxTimestamp) {
             short minVersion = 0;
-            if (isolationLevel == IsolationLevel.READ_COMMITTED)
+            if (requireMaxTimestamp)
+                minVersion = 7;
+            else if (isolationLevel == IsolationLevel.READ_COMMITTED)
                 minVersion = 2;
             else if (requireTimestamp)
                 minVersion = 1;
diff --git a/clients/src/main/resources/common/message/ListOffsetsRequest.json b/clients/src/main/resources/common/message/ListOffsetsRequest.json
index a464c93..93c920e 100644
--- a/clients/src/main/resources/common/message/ListOffsetsRequest.json
+++ b/clients/src/main/resources/common/message/ListOffsetsRequest.json
@@ -30,7 +30,9 @@
   // Version 5 is the same as version 4.
   //
   // Version 6 enables flexible versions.
-  "validVersions": "0-6",
+  //
+  // Version 7 enables listing offsets by max timestamp (KIP-734).
+  "validVersions": "0-7",
   "flexibleVersions": "6+",
   "fields": [
     { "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": "brokerId",
diff --git a/clients/src/main/resources/common/message/ListOffsetsResponse.json b/clients/src/main/resources/common/message/ListOffsetsResponse.json
index 727bb8f..6d6be0f 100644
--- a/clients/src/main/resources/common/message/ListOffsetsResponse.json
+++ b/clients/src/main/resources/common/message/ListOffsetsResponse.json
@@ -29,7 +29,9 @@
   // Version 5 adds a new error code, OFFSET_NOT_AVAILABLE.
   //
   // Version 6 enables flexible versions.
-  "validVersions": "0-6",
+  //
+  // Version 7 is the same as version 6 (KIP-734).
+  "validVersions": "0-7",
   "flexibleVersions": "6+",
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "2+", "ignorable": true,
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 0013d33..4f6cddb 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -172,6 +172,7 @@ import org.apache.kafka.common.requests.JoinGroupRequest;
 import org.apache.kafka.common.requests.LeaveGroupResponse;
 import org.apache.kafka.common.requests.ListGroupsRequest;
 import org.apache.kafka.common.requests.ListGroupsResponse;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
 import org.apache.kafka.common.requests.ListOffsetsResponse;
 import org.apache.kafka.common.requests.ListPartitionReassignmentsResponse;
 import org.apache.kafka.common.requests.ListTransactionsRequest;
@@ -4032,6 +4033,7 @@ public class KafkaAdminClientTest {
         pInfos.add(new PartitionInfo("foo", 0, node0, new Node[]{node0}, new Node[]{node0}));
         pInfos.add(new PartitionInfo("bar", 0, node0, new Node[]{node0}, new Node[]{node0}));
         pInfos.add(new PartitionInfo("baz", 0, node0, new Node[]{node0}, new Node[]{node0}));
+        pInfos.add(new PartitionInfo("qux", 0, node0, new Node[]{node0}, new Node[]{node0}));
         final Cluster cluster =
             new Cluster(
                 "mockClusterId",
@@ -4044,6 +4046,7 @@ public class KafkaAdminClientTest {
         final TopicPartition tp0 = new TopicPartition("foo", 0);
         final TopicPartition tp1 = new TopicPartition("bar", 0);
         final TopicPartition tp2 = new TopicPartition("baz", 0);
+        final TopicPartition tp3 = new TopicPartition("qux", 0);
 
         try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
@@ -4053,15 +4056,17 @@ public class KafkaAdminClientTest {
             ListOffsetsTopicResponse t0 = ListOffsetsResponse.singletonListOffsetsTopicResponse(tp0, Errors.NONE, -1L, 123L, 321);
             ListOffsetsTopicResponse t1 = ListOffsetsResponse.singletonListOffsetsTopicResponse(tp1, Errors.NONE, -1L, 234L, 432);
             ListOffsetsTopicResponse t2 = ListOffsetsResponse.singletonListOffsetsTopicResponse(tp2, Errors.NONE, 123456789L, 345L, 543);
+            ListOffsetsTopicResponse t3 = ListOffsetsResponse.singletonListOffsetsTopicResponse(tp3, Errors.NONE, 234567890L, 456L, 654);
             ListOffsetsResponseData responseData = new ListOffsetsResponseData()
                     .setThrottleTimeMs(0)
-                    .setTopics(Arrays.asList(t0, t1, t2));
+                    .setTopics(Arrays.asList(t0, t1, t2, t3));
             env.kafkaClient().prepareResponse(new ListOffsetsResponse(responseData));
 
             Map<TopicPartition, OffsetSpec> partitions = new HashMap<>();
             partitions.put(tp0, OffsetSpec.latest());
             partitions.put(tp1, OffsetSpec.earliest());
             partitions.put(tp2, OffsetSpec.forTimestamp(System.currentTimeMillis()));
+            partitions.put(tp3, OffsetSpec.maxTimestamp());
             ListOffsetsResult result = env.adminClient().listOffsets(partitions);
 
             Map<TopicPartition, ListOffsetsResultInfo> offsets = result.all().get();
@@ -4075,9 +4080,13 @@ public class KafkaAdminClientTest {
             assertEquals(345L, offsets.get(tp2).offset());
             assertEquals(543, offsets.get(tp2).leaderEpoch().get().intValue());
             assertEquals(123456789L, offsets.get(tp2).timestamp());
+            assertEquals(456L, offsets.get(tp3).offset());
+            assertEquals(654, offsets.get(tp3).leaderEpoch().get().intValue());
+            assertEquals(234567890L, offsets.get(tp3).timestamp());
             assertEquals(offsets.get(tp0), result.partitionResult(tp0).get());
             assertEquals(offsets.get(tp1), result.partitionResult(tp1).get());
             assertEquals(offsets.get(tp2), result.partitionResult(tp2).get());
+            assertEquals(offsets.get(tp3), result.partitionResult(tp3).get());
             try {
                 result.partitionResult(new TopicPartition("unknown", 0)).get();
                 fail("should have thrown IllegalArgumentException");
@@ -4226,6 +4235,117 @@ public class KafkaAdminClientTest {
         }
     }
 
+    @Test
+    public void testListOffsetsMaxTimestampUnsupportedSingleOffsetSpec() {
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            Collections.singleton(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})),
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, AdminClientConfig.RETRIES_CONFIG, "2")) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+            // listoffsets response from broker 0
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                request -> request instanceof ListOffsetsRequest);
+
+            ListOffsetsResult result = env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.maxTimestamp()));
+
+            TestUtils.assertFutureThrows(result.all(), UnsupportedVersionException.class);
+        }
+    }
+
+    @Test
+    public void testListOffsetsMaxTimestampUnsupportedMultipleOffsetSpec() throws Exception {
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        List<PartitionInfo> pInfos = new ArrayList<>();
+        pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node}));
+        pInfos.add(new PartitionInfo("foo", 1, node, new Node[]{node}, new Node[]{node}));
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            pInfos,
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+        final TopicPartition tp1 = new TopicPartition("foo", 1);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster,
+            AdminClientConfig.RETRIES_CONFIG, "2")) {
+
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+            // listoffsets response from broker 0
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                request -> request instanceof ListOffsetsRequest);
+
+            ListOffsetsTopicResponse topicResponse = ListOffsetsResponse.singletonListOffsetsTopicResponse(tp1, Errors.NONE, -1L, 345L, 543);
+            ListOffsetsResponseData responseData = new ListOffsetsResponseData()
+                .setThrottleTimeMs(0)
+                .setTopics(Arrays.asList(topicResponse));
+            env.kafkaClient().prepareResponseFrom(
+                // ensure that no max timestamp requests are retried
+                request -> request instanceof ListOffsetsRequest && ((ListOffsetsRequest) request).topics().stream()
+                    .flatMap(t -> t.partitions().stream())
+                    .noneMatch(p -> p.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP),
+                new ListOffsetsResponse(responseData), node);
+
+            ListOffsetsResult result = env.adminClient().listOffsets(new HashMap<TopicPartition, OffsetSpec>() {{
+                    put(tp0, OffsetSpec.maxTimestamp());
+                    put(tp1, OffsetSpec.latest());
+                }});
+
+            TestUtils.assertFutureThrows(result.partitionResult(tp0), UnsupportedVersionException.class);
+
+            ListOffsetsResultInfo tp1Offset = result.partitionResult(tp1).get();
+            assertEquals(345L, tp1Offset.offset());
+            assertEquals(543, tp1Offset.leaderEpoch().get().intValue());
+            assertEquals(-1L, tp1Offset.timestamp());
+        }
+    }
+
+    @Test
+    public void testListOffsetsUnsupportedNonMaxTimestamp() {
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        List<PartitionInfo> pInfos = new ArrayList<>();
+        pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node}));
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            pInfos,
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster,
+            AdminClientConfig.RETRIES_CONFIG, "2")) {
+
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+            // listoffsets response from broker 0
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                request -> request instanceof ListOffsetsRequest);
+
+            ListOffsetsResult result = env.adminClient().listOffsets(
+                Collections.singletonMap(tp0, OffsetSpec.latest()));
+
+            TestUtils.assertFutureThrows(result.partitionResult(tp0), UnsupportedVersionException.class);
+        }
+    }
+
     private Map<String, FeatureUpdate> makeTestFeatureUpdates() {
         return Utils.mkMap(
             Utils.mkEntry("test_feature_1", new FeatureUpdate((short) 2, false)),
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/ListOffsetsRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/ListOffsetsRequestTest.java
index a7c83d6..83c4b10 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/ListOffsetsRequestTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/ListOffsetsRequestTest.java
@@ -67,7 +67,7 @@ public class ListOffsetsRequestTest {
                                 new ListOffsetsPartition()
                                     .setPartitionIndex(0))));
             ListOffsetsRequest request = ListOffsetsRequest.Builder
-                    .forConsumer(true, IsolationLevel.READ_COMMITTED)
+                    .forConsumer(true, IsolationLevel.READ_COMMITTED, false)
                     .setTargetTimes(topics)
                     .build(version);
             ListOffsetsResponse response = (ListOffsetsResponse) request.getErrorResponse(0, Errors.NOT_LEADER_OR_FOLLOWER.exception());
@@ -100,7 +100,7 @@ public class ListOffsetsRequestTest {
                             new ListOffsetsPartition()
                                 .setPartitionIndex(0))));
         ListOffsetsRequest request = ListOffsetsRequest.Builder
-                .forConsumer(true, IsolationLevel.READ_UNCOMMITTED)
+                .forConsumer(true, IsolationLevel.READ_UNCOMMITTED, false)
                 .setTargetTimes(topics)
                 .build((short) 0);
         ListOffsetsResponse response = (ListOffsetsResponse) request.getErrorResponse(0, Errors.NOT_LEADER_OR_FOLLOWER.exception());
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index fc709dc..0208c05 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -1454,7 +1454,7 @@ public class RequestResponseTest {
                             .setMaxNumOffsets(10)
                             .setCurrentLeaderEpoch(5)));
             return ListOffsetsRequest.Builder
-                    .forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
+                    .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false)
                     .setTargetTimes(Collections.singletonList(topic))
                     .build((short) version);
         } else if (version == 1) {
@@ -1465,7 +1465,7 @@ public class RequestResponseTest {
                             .setTimestamp(1000000L)
                             .setCurrentLeaderEpoch(5)));
             return ListOffsetsRequest.Builder
-                    .forConsumer(true, IsolationLevel.READ_UNCOMMITTED)
+                    .forConsumer(true, IsolationLevel.READ_UNCOMMITTED, false)
                     .setTargetTimes(Collections.singletonList(topic))
                     .build((short) version);
         } else if (version >= 2 && version <= LIST_OFFSETS.latestVersion()) {
@@ -1478,7 +1478,7 @@ public class RequestResponseTest {
                     .setName("test")
                     .setPartitions(Arrays.asList(partition));
             return ListOffsetsRequest.Builder
-                    .forConsumer(true, IsolationLevel.READ_COMMITTED)
+                    .forConsumer(true, IsolationLevel.READ_COMMITTED, false)
                     .setTargetTimes(Collections.singletonList(topic))
                     .build((short) version);
         } else {
diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala b/core/src/main/scala/kafka/api/ApiVersion.scala
index 724bc98..d73b0a2 100644
--- a/core/src/main/scala/kafka/api/ApiVersion.scala
+++ b/core/src/main/scala/kafka/api/ApiVersion.scala
@@ -114,7 +114,9 @@ object ApiVersion {
     // Introduced topic IDs to LeaderAndIsr and UpdateMetadata requests/responses (KIP-516)
     KAFKA_2_8_IV1,
     // Introduce AllocateProducerIds (KIP-730)
-    KAFKA_3_0_IV0
+    KAFKA_3_0_IV0,
+    // Introduce ListOffsets V7 which supports listing offsets by max timestamp (KIP-734)
+    KAFKA_3_0_IV1
   )
 
   // Map keys are the union of the short and full versions
@@ -458,6 +460,13 @@ case object KAFKA_3_0_IV0 extends DefaultApiVersion {
   val id: Int = 33
 }
 
+case object KAFKA_3_0_IV1 extends DefaultApiVersion {
+  val shortVersion: String = "3.0"
+  val subVersion = "IV1"
+  val recordVersion = RecordVersion.V2
+  val id: Int = 34
+}
+
 object ApiVersionValidator extends Validator {
 
   override def ensureValid(name: String, value: Any): Unit = {
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 35ab50c..bccd2a3 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -1338,6 +1338,16 @@ class Log(@volatile private var _dir: File,
         val latestEpochOpt = leaderEpochCache.flatMap(_.latestEpoch).map(_.asInstanceOf[Integer])
         val epochOptional = Optional.ofNullable(latestEpochOpt.orNull)
         Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logEndOffset, epochOptional))
+      } else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
+        // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides
+        // constant time access while being safe to use with concurrent collections unlike `toArray`.
+        val segmentsCopy = logSegments.toBuffer
+        val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar)
+        val latestEpochOpt = leaderEpochCache.flatMap(_.latestEpoch).map(_.asInstanceOf[Integer])
+        val epochOptional = Optional.ofNullable(latestEpochOpt.orNull)
+        Some(new TimestampAndOffset(latestTimestampSegment.maxTimestampSoFar,
+          latestTimestampSegment.offsetOfMaxTimestampSoFar,
+          epochOptional))
       } else {
         // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides
         // constant time access while being safe to use with concurrent collections unlike `toArray`.
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 170b5b9..840c567 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -94,7 +94,8 @@ class ReplicaFetcherThread(name: String,
 
   // Visible for testing
   private[server] val listOffsetRequestVersion: Short =
-    if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_8_IV0) 6
+    if (brokerConfig.interBrokerProtocolVersion >= KAFKA_3_0_IV1) 7
+    else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_8_IV0) 6
     else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_2_IV1) 5
     else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_1_IV1) 4
     else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_0_IV1) 3
diff --git a/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala b/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala
new file mode 100644
index 0000000..c937030
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package integration.kafka.admin
+
+import kafka.integration.KafkaServerTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.admin._
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.utils.Utils
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+
+import scala.collection.{Map, Seq}
+import scala.jdk.CollectionConverters._
+
+class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
+
+  val topicName = "foo"
+  var adminClient: Admin = null
+
+  @BeforeEach
+  override def setUp(): Unit = {
+    super.setUp()
+    createTopic(topicName, 1, 1.toShort)
+    produceMessages()
+    adminClient = Admin.create(Map[String, Object](
+      AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> brokerList
+    ).asJava)
+  }
+
+  @AfterEach
+  override def tearDown(): Unit = {
+    Utils.closeQuietly(adminClient, "ListOffsetsAdminClient")
+    super.tearDown()
+  }
+
+  @Test
+  def testEarliestOffset(): Unit = {
+    val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest())
+    assertEquals(0, earliestOffset.offset())
+  }
+
+  @Test
+  def testLatestOffset(): Unit = {
+    val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest())
+    assertEquals(3, latestOffset.offset())
+  }
+
+  @Test
+  def testMaxTimestampOffset(): Unit = {
+    val maxTimestampOffset = runFetchOffsets(adminClient, OffsetSpec.maxTimestamp())
+    assertEquals(1, maxTimestampOffset.offset())
+  }
+
+  private def runFetchOffsets(adminClient: Admin,
+                              offsetSpec: OffsetSpec): ListOffsetsResult.ListOffsetsResultInfo = {
+    val tp = new TopicPartition(topicName, 0)
+    adminClient.listOffsets(Map(
+      tp -> offsetSpec
+    ).asJava, new ListOffsetsOptions()).all().get().get(tp)
+  }
+
+  def produceMessages(): Unit = {
+    val records = Seq(
+      new ProducerRecord[Array[Byte], Array[Byte]](topicName, 0, 100L,
+        null, new Array[Byte](10000)),
+      new ProducerRecord[Array[Byte], Array[Byte]](topicName, 0, 999L,
+        null, new Array[Byte](10000)),
+      new ProducerRecord[Array[Byte], Array[Byte]](topicName, 0, 200L,
+        null, new Array[Byte](10000)),
+    )
+    TestUtils.produceMessages(servers, records, -1)
+  }
+
+  def generateConfigs: Seq[KafkaConfig] =
+    TestUtils.createBrokerConfigs(1, zkConnect).map(KafkaConfig.fromProps)
+}
+
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index c730c47..0204b89 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -354,7 +354,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   }
 
   private def createListOffsetsRequest = {
-    requests.ListOffsetsRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED).setTargetTimes(
+    requests.ListOffsetsRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false).setTargetTimes(
       List(new ListOffsetsTopic()
         .setName(tp.topic)
         .setPartitions(List(new ListOffsetsPartition()
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 4a0674d..f515f13 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -2059,6 +2059,35 @@ class LogTest {
       log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP))
   }
 
+  /**
+   * A MAX_TIMESTAMP lookup is expected to return the offset of the record with the largest
+   * timestamp together with that timestamp, even when a later record has a smaller one.
+   */
+  @Test
+  def testFetchOffsetByTimestampWithMaxTimestampIncludesTimestamp(): Unit = {
+    val config = LogTestUtils.createLogConfig(segmentBytes = 200, indexIntervalBytes = 1)
+    val log = createLog(logDir, config)
+
+    // Nothing has been appended yet, so a lookup by timestamp finds no record.
+    assertEquals(None, log.fetchOffsetByTimestamp(0L))
+
+    val epoch = 0
+    val baseTimestamp = mockTime.milliseconds
+    val largestTimestamp = baseTimestamp + 1
+
+    // Three single-record appends; the largest timestamp goes in the middle one.
+    for (recordTimestamp <- Seq(baseTimestamp, largestTimestamp, baseTimestamp)) {
+      val batch = TestUtils.singletonRecords(
+        value = TestUtils.randomBytes(10),
+        timestamp = recordTimestamp)
+      log.appendAsLeader(batch, leaderEpoch = epoch)
+    }
+
+    // The middle record is expected at offset 1, reported with its timestamp and the append epoch.
+    val expected = new TimestampAndOffset(largestTimestamp, 1L, Optional.of(epoch))
+    assertEquals(Some(expected), log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP))
+  }
+
   /**
    * Test the Log truncate operations
    */
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 399a7bd..f61688f 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -2080,7 +2080,7 @@ class KafkaApisTest {
         .setPartitionIndex(tp.partition)
         .setTimestamp(ListOffsetsRequest.EARLIEST_TIMESTAMP)
         .setCurrentLeaderEpoch(currentLeaderEpoch.get)).asJava)).asJava
-    val listOffsetRequest = ListOffsetsRequest.Builder.forConsumer(true, isolationLevel)
+    val listOffsetRequest = ListOffsetsRequest.Builder.forConsumer(true, isolationLevel, false)
       .setTargetTimes(targetTimes).build()
     val request = buildRequest(listOffsetRequest)
     val capturedResponse = expectNoThrottling(request)
@@ -3192,7 +3192,7 @@ class KafkaApisTest {
       .setPartitions(List(new ListOffsetsPartition()
         .setPartitionIndex(tp.partition)
         .setTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP)).asJava)).asJava
-    val listOffsetRequest = ListOffsetsRequest.Builder.forConsumer(true, isolationLevel)
+    val listOffsetRequest = ListOffsetsRequest.Builder.forConsumer(true, isolationLevel, false)
       .setTargetTimes(targetTimes).build()
     val request = buildRequest(listOffsetRequest)
     val capturedResponse = expectNoThrottling(request)
diff --git a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
index 6bbf0a0..1988ad6 100644
--- a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
@@ -43,7 +43,7 @@ class ListOffsetsRequestTest extends BaseRequestTest {
         .setCurrentLeaderEpoch(0)).asJava)).asJava
 
     val consumerRequest = ListOffsetsRequest.Builder
-      .forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
+      .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false)
       .setTargetTimes(targetTimes)
       .build()
 
@@ -80,6 +80,18 @@ class ListOffsetsRequestTest extends BaseRequestTest {
     assertResponseError(Errors.NOT_LEADER_OR_FOLLOWER, nonReplica, debugReplicaRequest)
   }
 
+  @Test
+  def testListOffsetsMaxTimeStampOldestVersion(): Unit = {
+    // The builders differ only in the third forConsumer argument (presumably "max timestamp required");
+    // with it set, the oldest allowed request version is expected to be 7 instead of 0.
+    def oldestVersionFor(requireMaxTimestamp: Boolean): Short =
+      ListOffsetsRequest.Builder
+        .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, requireMaxTimestamp)
+        .oldestAllowedVersion()
+    assertEquals(0.toShort, oldestVersionFor(requireMaxTimestamp = false))
+    assertEquals(7.toShort, oldestVersionFor(requireMaxTimestamp = true))
+  }
+
   def assertResponseErrorForEpoch(error: Errors, brokerId: Int, currentLeaderEpoch: Optional[Integer]): Unit = {
     val listOffsetPartition = new ListOffsetsPartition()
       .setPartitionIndex(partition.partition)
@@ -90,7 +102,7 @@ class ListOffsetsRequestTest extends BaseRequestTest {
       .setName(topic)
       .setPartitions(List(listOffsetPartition).asJava)).asJava
     val request = ListOffsetsRequest.Builder
-      .forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
+      .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false)
       .setTargetTimes(targetTimes)
       .build()
     assertResponseError(error, brokerId, request)
@@ -133,7 +145,7 @@ class ListOffsetsRequestTest extends BaseRequestTest {
         .setTimestamp(timestamp)).asJava)).asJava
 
     val builder = ListOffsetsRequest.Builder
-      .forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
+      .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false)
       .setTargetTimes(targetTimes)
 
     val request = if (version == -1) builder.build() else builder.build(version)
@@ -162,11 +174,13 @@ class ListOffsetsRequestTest extends BaseRequestTest {
     val partitionToLeader = TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 3, servers)
     val firstLeaderId = partitionToLeader(partition.partition)
 
-    TestUtils.generateAndProduceMessages(servers, topic, 10)
+    TestUtils.generateAndProduceMessages(servers, topic, 9)
+    TestUtils.produceMessage(servers, topic, "test-10", System.currentTimeMillis() + 10L)
 
     assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, 0L, -1))
     assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.EARLIEST_TIMESTAMP, -1))
     assertEquals((10L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.LATEST_TIMESTAMP, -1))
+    assertEquals((9L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.MAX_TIMESTAMP, -1))
 
     // Kill the first leader so that we can verify the epoch change when fetching the latest offset
     killBroker(firstLeaderId)
@@ -185,6 +199,7 @@ class ListOffsetsRequestTest extends BaseRequestTest {
 
     // The latest offset reflects the updated epoch
     assertEquals((10L, secondLeaderEpoch), fetchOffsetAndEpoch(secondLeaderId, ListOffsetsRequest.LATEST_TIMESTAMP, -1))
+    assertEquals((9L, secondLeaderEpoch), fetchOffsetAndEpoch(secondLeaderId, ListOffsetsRequest.MAX_TIMESTAMP, -1))
   }
 
   @Test
@@ -192,7 +207,8 @@ class ListOffsetsRequestTest extends BaseRequestTest {
     val partitionToLeader = TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 3, servers)
     val firstLeaderId = partitionToLeader(partition.partition)
 
-    TestUtils.generateAndProduceMessages(servers, topic, 10)
+    TestUtils.generateAndProduceMessages(servers, topic, 9)
+    TestUtils.produceMessage(servers, topic, "test-10", System.currentTimeMillis() + 10L)
 
     for (version <- ApiKeys.LIST_OFFSETS.oldestVersion to ApiKeys.LIST_OFFSETS.latestVersion) {
       if (version == 0) {
@@ -203,10 +219,15 @@ class ListOffsetsRequestTest extends BaseRequestTest {
         assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, version.toShort))
         assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.EARLIEST_TIMESTAMP, version.toShort))
         assertEquals((10L, -1), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.LATEST_TIMESTAMP, version.toShort))
-      } else if (version >= 4) {
+      } else if (version >= 4  && version <= 6) {
+        assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, 0L, version.toShort))
+        assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.EARLIEST_TIMESTAMP, version.toShort))
+        assertEquals((10L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.LATEST_TIMESTAMP, version.toShort))
+      } else if (version >= 7) {
         assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, 0L, version.toShort))
         assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.EARLIEST_TIMESTAMP, version.toShort))
         assertEquals((10L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.LATEST_TIMESTAMP, version.toShort))
+        assertEquals((9L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.MAX_TIMESTAMP, version.toShort))
       }
     }
   }
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index bf9ad3e..b742bfe 100755
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -54,7 +54,7 @@ class LogOffsetTest extends BaseRequestTest {
   @Test
   def testGetOffsetsForUnknownTopic(): Unit = {
     val topicPartition = new TopicPartition("foo", 0)
-    val request = ListOffsetsRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
+    val request = ListOffsetsRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false)
       .setTargetTimes(buildTargetTimes(topicPartition, ListOffsetsRequest.LATEST_TIMESTAMP, 10).asJava).build(0)
     val response = sendListOffsetsRequest(request)
     assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code, findPartition(response.topics.asScala, topicPartition).errorCode)
@@ -65,13 +65,7 @@ class LogOffsetTest extends BaseRequestTest {
   def testGetOffsetsAfterDeleteRecords(): Unit = {
     val topic = "kafka-"
     val topicPartition = new TopicPartition(topic, 0)
-
-    createTopic(topic, 1, 1)
-
-    val logManager = server.getLogManager
-    TestUtils.waitUntilTrue(() => logManager.getLog(topicPartition).isDefined,
-                  "Log for partition [topic,0] should be created")
-    val log = logManager.getLog(topicPartition).get
+    val log = createTopicAndGetLog(topic, topicPartition)
 
     for (_ <- 0 until 20)
       log.appendAsLeader(TestUtils.singletonRecords(value = Integer.toString(42).getBytes()), leaderEpoch = 0)
@@ -93,16 +87,51 @@ class LogOffsetTest extends BaseRequestTest {
   }
 
   @Test
-  def testGetOffsetsBeforeLatestTime(): Unit = {
+  def testFetchOffsetByTimestampForMaxTimestampAfterTruncate(): Unit = {
     val topic = "kafka-"
     val topicPartition = new TopicPartition(topic, 0)
+    val log = createTopicAndGetLog(topic, topicPartition)
 
-    createTopic(topic, 1, 1)
+    for (timestamp <- 0 until 20)
+      log.appendAsLeader(TestUtils.singletonRecords(value = Integer.toString(42).getBytes(), timestamp = timestamp.toLong), leaderEpoch = 0)
+    log.flush()
 
-    val logManager = server.getLogManager
-    TestUtils.waitUntilTrue(() => logManager.getLog(topicPartition).isDefined,
-      s"Log for partition $topicPartition should be created")
-    val log = logManager.getLog(topicPartition).get
+    log.updateHighWatermark(log.logEndOffset)
+
+    val firstOffset = log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP)
+    assertEquals(19L, firstOffset.get.offset)
+    assertEquals(19L, firstOffset.get.timestamp)
+
+    log.truncateTo(0)
+
+    val secondOffset = log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP)
+    assertEquals(0L, secondOffset.get.offset)
+    assertEquals(-1L, secondOffset.get.timestamp)
+  }
+
+  @Test
+  def testFetchOffsetByTimestampForMaxTimestampWithUnorderedTimestamps(): Unit = {
+    val topic = "kafka-"
+    val log = createTopicAndGetLog(topic, new TopicPartition(topic, 0))
+    // Seven single-record appends; timestamp 6 is the sixth one and is followed by a smaller timestamp.
+    List(0L, 1L, 2L, 3L, 4L, 6L, 5L).foreach { recordTimestamp =>
+      val batch = TestUtils.singletonRecords(value = Integer.toString(42).getBytes(), timestamp = recordTimestamp)
+      log.appendAsLeader(batch, leaderEpoch = 0)
+    }
+    log.flush()
+    log.updateHighWatermark(log.logEndOffset)
+
+    val found = log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP)
+    assertEquals(7L, log.logEndOffset)
+    assertEquals(5L, found.get.offset)
+    assertEquals(6L, found.get.timestamp)
+  }
+
+  @Test
+  def testGetOffsetsBeforeLatestTime(): Unit = {
+    val topic = "kafka-"
+    val topicPartition = new TopicPartition(topic, 0)
+    val log = createTopicAndGetLog(topic, topicPartition)
 
     for (_ <- 0 until 20)
       log.appendAsLeader(TestUtils.singletonRecords(value = Integer.toString(42).getBytes()), leaderEpoch = 0)
@@ -111,7 +140,7 @@ class LogOffsetTest extends BaseRequestTest {
     val offsets = log.legacyFetchOffsetsBefore(ListOffsetsRequest.LATEST_TIMESTAMP, 15)
     assertEquals(Seq(20L, 18L, 16L, 14L, 12L, 10L, 8L, 6L, 4L, 2L, 0L), offsets)
 
-    TestUtils.waitUntilTrue(() => TestUtils.isLeaderLocalOnBroker(topic, topicPartition.partition, server),
+    TestUtils.waitUntilTrue(() => TestUtils.isLeaderLocalOnBroker(topic, 0, server),
       "Leader should be elected")
     val request = ListOffsetsRequest.Builder.forReplica(0, 0)
       .setTargetTimes(buildTargetTimes(topicPartition, ListOffsetsRequest.LATEST_TIMESTAMP, 15).asJava).build()
@@ -149,6 +178,20 @@ class LogOffsetTest extends BaseRequestTest {
     assertFalse(offsetChanged)
   }
 
+  @Test
+  def testFetchOffsetByTimestampForMaxTimestampWithEmptyLog(): Unit = {
+    val topic = "kafka-"
+    val log = createTopicAndGetLog(topic, new TopicPartition(topic, 0))
+    // Nothing is appended: the high watermark is simply moved to the log end offset.
+    log.updateHighWatermark(log.logEndOffset)
+    val found = log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP)
+
+    // With no records, the lookup is expected to yield offset 0 and timestamp -1 rather than None.
+    assertEquals(0L, log.logEndOffset)
+    assertEquals(0L, found.get.offset)
+    assertEquals(-1L, found.get.timestamp)
+  }
+
   @deprecated("legacyFetchOffsetsBefore", since = "")
   @Test
   def testGetOffsetsBeforeNow(): Unit = {
@@ -266,4 +309,13 @@ class LogOffsetTest extends BaseRequestTest {
       .partitions.asScala.find(_.partitionIndex == tp.partition).get
   }
 
+  // Creates `topic`, waits until the broker's LogManager exposes a log for `topicPartition`, and returns that log.
+  private def createTopicAndGetLog(topic: String, topicPartition: TopicPartition): Log = {
+    createTopic(topic, 1, 1)
+    val logManager = server.getLogManager
+    // Name the awaited partition in the failure message instead of a fixed "[topic,0]" placeholder.
+    TestUtils.waitUntilTrue(() => logManager.getLog(topicPartition).isDefined,
+      s"Log for partition $topicPartition should be created")
+    logManager.getLog(topicPartition).get
+  }
+
 }
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 757b82c..1f04f08 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -240,7 +240,7 @@ class RequestQuotaTest extends BaseRequestTest {
               .setPartitionIndex(tp.partition)
               .setTimestamp(0L)
               .setCurrentLeaderEpoch(15)).asJava)
-          ListOffsetsRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
+          ListOffsetsRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false)
             .setTargetTimes(List(topic).asJava)
 
         case ApiKeys.LEADER_AND_ISR =>
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 53bc88e..0802d87 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -1207,17 +1207,18 @@ object TestUtils extends Logging {
     values
   }
 
-  def produceMessage(servers: Seq[KafkaServer], topic: String, message: String,
+  def produceMessage(servers: Seq[KafkaServer], topic: String, message: String, timestamp: java.lang.Long = null,
                      deliveryTimeoutMs: Int = 30 * 1000, requestTimeoutMs: Int = 20 * 1000): Unit = {
     val producer = createProducer(TestUtils.getBrokerListStrFromServers(servers),
       deliveryTimeoutMs = deliveryTimeoutMs, requestTimeoutMs = requestTimeoutMs)
     try {
-      producer.send(new ProducerRecord(topic, topic.getBytes, message.getBytes)).get
+      producer.send(new ProducerRecord(topic, null, timestamp, topic.getBytes, message.getBytes)).get
     } finally {
       producer.close()
     }
   }
 
+
   def verifyTopicDeletion(zkClient: KafkaZkClient, topic: String, numPartitions: Int, servers: Seq[KafkaServer]): Unit = {
     val topicPartitions = (0 until numPartitions).map(new TopicPartition(topic, _))
     // wait until admin path for delete topic is deleted, signaling completion of topic deletion
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/ListOffsetRequestBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/ListOffsetRequestBenchmark.java
index 326916e..e6fc2dc 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/ListOffsetRequestBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/ListOffsetRequestBenchmark.java
@@ -69,7 +69,7 @@ public class ListOffsetRequestBenchmark {
             }
         }
 
-        this.offsetRequest = ListOffsetsRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
+        this.offsetRequest = ListOffsetsRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false)
                 .build(ApiKeys.LIST_OFFSETS.latestVersion());
     }
 

Mime
View raw message