kafka-commits mailing list archives

From rsiva...@apache.org
Subject [kafka] branch 2.7 updated: KAFKA-12996; Return OFFSET_OUT_OF_RANGE for fetchOffset < startOffset even for diverging epochs (#10930)
Date Tue, 29 Jun 2021 17:48:48 GMT
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.7 by this push:
     new 51a1aec  KAFKA-12996; Return OFFSET_OUT_OF_RANGE for fetchOffset < startOffset even for diverging epochs (#10930)
51a1aec is described below

commit 51a1aecc0914f116fbef151c2e16db5d517a980b
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
AuthorDate: Tue Jun 29 16:49:36 2021 +0100

    KAFKA-12996; Return OFFSET_OUT_OF_RANGE for fetchOffset < startOffset even for diverging epochs (#10930)
    
    If fetchOffset < startOffset, we currently throw OffsetOutOfRangeException when attempting
    to read from the log in the regular case. But for diverging epochs, we return Errors.NONE
    with the new leader start offset, hwm, etc. ReplicaFetcherThread throws OffsetOutOfRangeException
    when processing responses with Errors.NONE if the leader's offsets in the response are out
    of range, and this moves the partition to failed state. The PR adds a check for this case when
    processing fetch requests [...]
    
    Reviewers: Luke Chen <showuon@gmail.com>, Nikhil Bhatia <rite2nikhil@gmail.com>, Guozhang Wang <wangguoz@gmail.com>
---
 core/src/main/scala/kafka/cluster/Partition.scala          | 6 ++++++
 core/src/test/scala/unit/kafka/cluster/PartitionTest.scala | 9 +++++++++
 2 files changed, 15 insertions(+)

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 586afbf..295be26 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -1118,6 +1118,12 @@ class Partition(val topicPartition: TopicPartition,
           s"$lastFetchedEpoch from the request")
       }
 
+      // If fetch offset is less than log start, fail with OffsetOutOfRangeException, regardless of whether epochs are diverging
+      if (fetchOffset < initialLogStartOffset) {
+        throw new OffsetOutOfRangeException(s"Received request for offset $fetchOffset for partition $topicPartition, " +
+          s"but we only have log segments in the range $initialLogStartOffset to $initialLogEndOffset.")
+      }
+
       if (epochEndOffset.leaderEpoch < fetchEpoch || epochEndOffset.endOffset < fetchOffset) {
         val emptyFetchData = FetchDataInfo(
           fetchOffsetMetadata = LogOffsetMetadata(fetchOffset),
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index d7aa33b..9cf9fc7 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -127,6 +127,15 @@ class PartitionTest extends AbstractPartitionTest {
     assertThrows[OffsetOutOfRangeException] {
       read(lastFetchedEpoch = 0, fetchOffset = 0)
     }
+
+    // Fetch offset lower than start offset should throw OffsetOutOfRangeException
+    log.maybeIncrementLogStartOffset(newLogStartOffset = 10, ClientRecordDeletion)
+    assertThrows[OffsetOutOfRangeException] {
+      read(lastFetchedEpoch = 5, fetchOffset = 6) // diverging
+    }
+    assertThrows[OffsetOutOfRangeException] {
+      read(lastFetchedEpoch = 3, fetchOffset = 6) // not diverging
+    }
   }
 
   @Test
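
The ordering of checks that this commit introduces can be illustrated with a small standalone model. This is only a sketch under stated assumptions, not the code in Partition.scala: the object name `FetchCheckSketch`, the `FetchOutcome` types, and the `classify` function are hypothetical, and the numeric values in the usage lines are made up for illustration rather than taken from the PartitionTest fixture. The one thing it mirrors from the diff is that the log-start-offset check runs before the diverging-epoch check.

```scala
// Hypothetical, simplified model of the check order; not the real kafka.cluster.Partition.
object FetchCheckSketch {
  sealed trait FetchOutcome
  // Stands in for throwing OffsetOutOfRangeException.
  case object OffsetOutOfRange extends FetchOutcome
  // Stands in for the empty fetch response that reports a diverging epoch.
  final case class Diverging(leaderEpoch: Int, endOffset: Long) extends FetchOutcome
  // Stands in for the regular read path.
  case object ReadFromLog extends FetchOutcome

  // epochEndLeaderEpoch / epochEndOffset model the leader's end offset lookup
  // for the epoch the follower last fetched (assumed to be resolved already).
  def classify(fetchOffset: Long,
               fetchEpoch: Int,
               logStartOffset: Long,
               epochEndLeaderEpoch: Int,
               epochEndOffset: Long): FetchOutcome = {
    if (fetchOffset < logStartOffset)
      OffsetOutOfRange // checked first, whether or not epochs diverge
    else if (epochEndLeaderEpoch < fetchEpoch || epochEndOffset < fetchOffset)
      Diverging(epochEndLeaderEpoch, epochEndOffset)
    else
      ReadFromLog
  }
}
```

With a log start offset of 10, `FetchCheckSketch.classify(6L, 5, 10L, 4, 11L)` yields `OffsetOutOfRange` even though the epoch inputs alone would indicate divergence, while `FetchCheckSketch.classify(12L, 5, 10L, 4, 11L)` yields `Diverging(4, 11)`.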
