kafka-commits mailing list archives

From rsiva...@apache.org
Subject [kafka] branch 3.0 updated: KAFKA-13207: Skip truncation on fetch response with diverging epoch if partition removed from fetcher (#11221)
Date Tue, 17 Aug 2021 19:00:11 GMT
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 4d1efc9  KAFKA-13207: Skip truncation on fetch response with diverging epoch if partition removed from fetcher (#11221)
4d1efc9 is described below

commit 4d1efc90af02a090e6313f86577cfbb4714597a5
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
AuthorDate: Tue Aug 17 17:12:51 2021 +0100

    KAFKA-13207: Skip truncation on fetch response with diverging epoch if partition removed from fetcher (#11221)
    
    AbstractFetcherThread#truncateOnFetchResponse is used with IBP 2.7 and above to truncate
    partitions based on the diverging epoch returned in fetch responses. Truncation should only
    be performed for partitions that are still owned by the fetcher, and this check should be
    done while holding partitionMapLock to ensure that any partitions removed from the fetcher
    thread are not truncated. Truncation will be performed by any new fetcher that owns the
    partition when it restarts fetching.
    
    Reviewers: David Jacot <djacot@confluent.io>, Jason Gustafson <jason@confluent.io>
---
 .../scala/kafka/server/AbstractFetcherThread.scala | 38 +++++++++++--------
 .../kafka/server/AbstractFetcherThreadTest.scala   | 43 ++++++++++++++++++++++
 2 files changed, 66 insertions(+), 15 deletions(-)
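
The fix reduces to one pattern: the ownership check and the truncation must happen under the
same lock that guards partition state. A minimal Scala sketch of that pattern, with
hypothetical, simplified names (the real code lives in AbstractFetcherThread and uses
PartitionStates and OffsetTruncationState rather than a plain map):

    import java.util.concurrent.locks.ReentrantLock

    class FetcherSketch {
      private val partitionMapLock = new ReentrantLock()
      // Maps each partition this fetcher owns to its current fetch offset.
      private val partitionStates = scala.collection.mutable.Map.empty[String, Long]

      // Truncate only partitions this fetcher still owns. Because the ownership check
      // and the truncation run under one lock, a partition that a concurrent
      // removePartitions call has migrated to another fetcher is never touched here.
      def truncateOnFetchResponse(epochEndOffsets: Map[String, Long]): Unit = {
        partitionMapLock.lock()
        try {
          epochEndOffsets.foreach { case (tp, divergentOffset) =>
            if (partitionStates.contains(tp))
              partitionStates.update(tp, divergentOffset) // stand-in for real truncation
            // else: removed while we awaited the fetch response; skip it
          }
        } finally partitionMapLock.unlock()
      }
    }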

diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 6118c5f..7b315be 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -229,7 +229,8 @@ abstract class AbstractFetcherThread(name: String,
     }
   }
 
-  protected def truncateOnFetchResponse(epochEndOffsets: Map[TopicPartition, EpochEndOffset]): Unit = {
+  // Visibility for unit tests
+  protected[server] def truncateOnFetchResponse(epochEndOffsets: Map[TopicPartition, EpochEndOffset]): Unit = {
     inLock(partitionMapLock) {
      val ResultWithPartitions(fetchOffsets, partitionsWithError) = maybeTruncateToEpochEndOffsets(epochEndOffsets, Map.empty)
       handlePartitionsWithErrors(partitionsWithError, "truncateOnFetchResponse")
@@ -262,21 +263,28 @@ abstract class AbstractFetcherThread(name: String,
     val partitionsWithError = mutable.HashSet.empty[TopicPartition]
 
     fetchedEpochs.forKeyValue { (tp, leaderEpochOffset) =>
-      Errors.forCode(leaderEpochOffset.errorCode) match {
-        case Errors.NONE =>
-          val offsetTruncationState = getOffsetTruncationState(tp, leaderEpochOffset)
-          if (doTruncate(tp, offsetTruncationState))
-            fetchOffsets.put(tp, offsetTruncationState)
-
-        case Errors.FENCED_LEADER_EPOCH =>
-          val currentLeaderEpoch = latestEpochsForPartitions.get(tp)
-            .map(epochEndOffset => Int.box(epochEndOffset.currentLeaderEpoch)).asJava
-          if (onPartitionFenced(tp, currentLeaderEpoch))
+      if (partitionStates.contains(tp)) {
+        Errors.forCode(leaderEpochOffset.errorCode) match {
+          case Errors.NONE =>
+            val offsetTruncationState = getOffsetTruncationState(tp, leaderEpochOffset)
+            if (doTruncate(tp, offsetTruncationState))
+              fetchOffsets.put(tp, offsetTruncationState)
+
+          case Errors.FENCED_LEADER_EPOCH =>
+            val currentLeaderEpoch = latestEpochsForPartitions.get(tp)
+              .map(epochEndOffset => Int.box(epochEndOffset.currentLeaderEpoch)).asJava
+            if (onPartitionFenced(tp, currentLeaderEpoch))
+              partitionsWithError += tp
+
+          case error =>
+            info(s"Retrying leaderEpoch request for partition $tp as the leader reported
an error: $error")
             partitionsWithError += tp
-
-        case error =>
-          info(s"Retrying leaderEpoch request for partition $tp as the leader reported an
error: $error")
-          partitionsWithError += tp
+        }
+      } else {
+        // Partitions may have been removed from the fetcher while the thread was waiting for fetch
+        // response. Removed partitions are filtered out while holding `partitionMapLock` to ensure that we
+        // don't update state for any partition that may have already been migrated to another thread.
+        trace(s"Ignoring epoch offsets for partition $tp since it has been removed from this fetcher thread.")
       }
     }
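
For context, the other half of the race: partitions are removed from a fetcher, for example
when they are migrated to another fetcher thread, under the same partitionMapLock. A hedged
sketch of that side, written as a method that would sit inside the hypothetical FetcherSketch
class from the note above (the real removePartitions also returns the latest fetch state of
each removed partition, as the new test below relies on):

    // Runs on another thread while this fetcher is blocked on a fetch response.
    def removePartitions(tps: Set[String]): Unit = {
      partitionMapLock.lock()
      try tps.foreach(partitionStates.remove)
      finally partitionMapLock.unlock()
    }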
 
diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
index 394897f..738c1ea 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
@@ -39,6 +39,7 @@ import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED
 import org.apache.kafka.common.requests.{FetchRequest, FetchResponse}
 import org.apache.kafka.common.utils.Time
 import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Assumptions.assumeTrue
 import org.junit.jupiter.api.{BeforeEach, Test}
 
 import scala.jdk.CollectionConverters._
@@ -437,6 +438,48 @@ class AbstractFetcherThreadTest {
   }
 
   @Test
+  def testTruncationOnFetchSkippedIfPartitionRemoved(): Unit = {
+    assumeTrue(truncateOnFetch)
+    val partition = new TopicPartition("topic", 0)
+    var truncations = 0
+    val fetcher = new MockFetcherThread {
+      override def truncate(topicPartition: TopicPartition, truncationState: OffsetTruncationState): Unit = {
+        truncations += 1
+        super.truncate(topicPartition, truncationState)
+      }
+    }
+    val replicaLog = Seq(
+      mkBatch(baseOffset = 0, leaderEpoch = 0, new SimpleRecord("a".getBytes)),
+      mkBatch(baseOffset = 1, leaderEpoch = 2, new SimpleRecord("b".getBytes)),
+      mkBatch(baseOffset = 2, leaderEpoch = 4, new SimpleRecord("c".getBytes)))
+
+    val replicaState = MockFetcherThread.PartitionState(replicaLog, leaderEpoch = 5, highWatermark = 2L)
+    fetcher.setReplicaState(partition, replicaState)
+
+    // Verify that truncation based on fetch response is performed if partition is owned by fetcher thread
+    fetcher.addPartitions(Map(partition -> initialFetchState(6L, leaderEpoch = 4)))
+    val endOffset = new EpochEndOffset()
+      .setPartition(partition.partition)
+      .setErrorCode(Errors.NONE.code)
+      .setLeaderEpoch(4)
+      .setEndOffset(3L)
+    fetcher.truncateOnFetchResponse(Map(partition -> endOffset))
+    assertEquals(1, truncations)
+
+    // Verify that truncation based on fetch response is not performed if partition is removed from fetcher thread
+    val offsets = fetcher.removePartitions(Set(partition))
+    assertEquals(Set(partition), offsets.keySet)
+    assertEquals(3L, offsets(partition).fetchOffset)
+    val newEndOffset = new EpochEndOffset()
+      .setPartition(partition.partition)
+      .setErrorCode(Errors.NONE.code)
+      .setLeaderEpoch(4)
+      .setEndOffset(2L)
+    fetcher.truncateOnFetchResponse(Map(partition -> newEndOffset))
+    assertEquals(1, truncations)
+  }
+
+  @Test
   def testFollowerFetchOutOfRangeHigh(): Unit = {
     val partition = new TopicPartition("topic", 0)
     val fetcher = new MockFetcherThread()
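
Two details of the new test are worth noting: assumeTrue(truncateOnFetch) marks the test as
skipped rather than failed when the suite runs with the pre-IBP-2.7 truncation path, and the
anonymous MockFetcherThread subclass counts truncations by overriding the hook it wants to
observe while delegating to the original behavior. A self-contained JUnit 5 sketch of that
counting pattern, with hypothetical names:

    import org.junit.jupiter.api.Assertions.assertEquals
    import org.junit.jupiter.api.Assumptions.assumeTrue
    import org.junit.jupiter.api.Test

    class OverrideCountingSketch {
      // Hypothetical stand-in for MockFetcherThread.
      class Worker {
        def truncate(tp: String, offset: Long): Unit = ()
      }

      @Test
      def countsCallsThroughAnOverride(): Unit = {
        assumeTrue(true) // the real test gates on the truncateOnFetch flag here
        var truncations = 0
        val worker = new Worker {
          override def truncate(tp: String, offset: Long): Unit = {
            truncations += 1           // observe the call...
            super.truncate(tp, offset) // ...then keep the original behavior
          }
        }
        worker.truncate("topic-0", 3L)
        assertEquals(1, truncations)
      }
    }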
