kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject [kafka] branch 0.11.0 updated: KAFKA-6857; Leader should reply with undefined offset if undefined leader epoch requested (#4967)
Date Mon, 14 May 2018 21:25:51 GMT
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 0.11.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/0.11.0 by this push:
     new 906b507  KAFKA-6857; Leader should reply with undefined offset if undefined leader
epoch requested (#4967)
906b507 is described below

commit 906b50722d486a8567c9566de9d8607958fc8de9
Author: Anna Povzner <anna@confluent.io>
AuthorDate: Thu May 3 23:06:34 2018 -0700

    KAFKA-6857; Leader should reply with undefined offset if undefined leader epoch requested
(#4967)
    
    The leader must explicitly check if requested leader epoch is undefined, and return undefined
offset so that the follower can fall back to truncating to high watermark. Otherwise, if the
leader also is not tracking leader epochs, it may return its LEO, which will the follower
to truncate to the incorrect offset.
---
 .../scala/kafka/server/epoch/LeaderEpochFileCache.scala |  9 ++++++---
 .../kafka/server/epoch/LeaderEpochFileCacheTest.scala   | 17 ++++++++++++++++-
 2 files changed, 22 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
index ffca900..dcae5b1 100644
--- a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
+++ b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
@@ -92,10 +92,13 @@ class LeaderEpochFileCache(topicPartition: TopicPartition, leo: () =>
LogOffsetM
   override def endOffsetFor(requestedEpoch: Int): Long = {
     inReadLock(lock) {
       val offset =
-        if (requestedEpoch == latestEpoch) {
+        if (requestedEpoch == UNDEFINED_EPOCH) {
+          // this may happen if a bootstrapping follower sends a request with undefined epoch
or
+          // a follower is on the older message format where leader epochs are not recorded
+          UNDEFINED_EPOCH_OFFSET
+        } else if (requestedEpoch == latestEpoch) {
           leo().messageOffset
-        }
-        else {
+        } else {
           val subsequentEpochs = epochs.filter(e => e.epoch > requestedEpoch)
           if (subsequentEpochs.isEmpty || requestedEpoch < epochs.head.epoch)
             UNDEFINED_EPOCH_OFFSET
diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala
index 8460fe4..4a8df11 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala
@@ -144,6 +144,21 @@ class LeaderEpochFileCacheTest {
   }
 
   @Test
+  def shouldReturnUnsupportedIfNoEpochRecordedAndUndefinedEpochRequested(){
+    val leo = 73
+    def leoFinder() = new LogOffsetMetadata(leo)
+
+    //Given
+    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
+
+    //When (say a follower on older message format version) sends request for UNDEFINED_EPOCH
+    val offsetFor = cache.endOffsetFor(UNDEFINED_EPOCH)
+
+    //Then
+    assertEquals(UNDEFINED_EPOCH_OFFSET, offsetFor)
+  }
+
+  @Test
   def shouldReturnUnsupportedIfRequestedEpochLessThanFirstEpoch(){
     def leoFinder() = new LogOffsetMetadata(0)
 
@@ -664,4 +679,4 @@ class LeaderEpochFileCacheTest {
   def setUp() {
     checkpoint = new LeaderEpochCheckpointFile(TestUtils.tempFile())
   }
-}
\ No newline at end of file
+}

-- 
To stop receiving notification emails like this one, please contact
ijuma@apache.org.

Mime
View raw message