kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject [kafka] branch 2.4 updated: KAFKA-9491; Increment high watermark after full log truncation (#8037)
Date Tue, 04 Feb 2020 19:36:45 GMT
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new a692a18  KAFKA-9491; Increment high watermark after full log truncation (#8037)
a692a18 is described below

commit a692a18e6b830846ec9b151488842cde0a50e45e
Author: Jason Gustafson <jason@confluent.io>
AuthorDate: Tue Feb 4 11:22:33 2020 -0800

    KAFKA-9491; Increment high watermark after full log truncation (#8037)
    
    When a follower's fetch offset is behind the leader's log start offset, the
    follower will do a full log truncation. When it does so, it must update both
    its log start offset and high watermark. The previous code did the former,
    but not the latter. Failure to update the high watermark in this case can lead
    to out of range errors if the follower becomes leader before getting the latest
    high watermark from the previous leader. The out of range errors occur when
    we attempt to resolve the log position of the high watermark in DelayedFetch
    in order to determine if a fetch is satisfied.
    
    Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>, Chia-Ping Tsai <chia7712@gmail.com>, Ismael Juma <ismael@juma.me.uk>
---
 core/src/main/scala/kafka/cluster/Partition.scala |  8 +++---
 core/src/main/scala/kafka/log/Log.scala           | 33 ++++++++++++++++-------
 core/src/test/scala/unit/kafka/log/LogTest.scala  | 20 +++++++++++---
 3 files changed, 43 insertions(+), 18 deletions(-)

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index a8796eb..7ca7965 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -506,8 +506,9 @@ class Partition(val topicPartition: TopicPartition,
 
       val leaderLog = localLogOrException
       val leaderEpochStartOffset = leaderLog.logEndOffset
-      info(s"$topicPartition starts at Leader Epoch ${partitionState.leaderEpoch} from " +
-        s"offset $leaderEpochStartOffset. Previous Leader Epoch was: $leaderEpoch")
+      info(s"$topicPartition starts at leader epoch ${partitionState.leaderEpoch} from " +
+        s"offset $leaderEpochStartOffset with high watermark ${leaderLog.highWatermark}. " +
+        s"Previous leader epoch was $leaderEpoch.")
 
      //We cache the leader epoch here, persisting it only if it's local (hence having a log dir)
       leaderEpoch = partitionState.leaderEpoch
@@ -522,12 +523,11 @@ class Partition(val topicPartition: TopicPartition,
       leaderLog.maybeAssignEpochStartOffset(leaderEpoch, leaderEpochStartOffset)
 
       val isNewLeader = !isLeader
-      val curLeaderLogEndOffset = leaderLog.logEndOffset
       val curTimeMs = time.milliseconds
       // initialize lastCaughtUpTime of replicas as well as their lastFetchTimeMs and lastFetchLeaderLogEndOffset.
       remoteReplicas.foreach { replica =>
         val lastCaughtUpTimeMs = if (inSyncReplicaIds.contains(replica.brokerId)) curTimeMs else 0L
-        replica.resetLastCaughtUpTime(curLeaderLogEndOffset, curTimeMs, lastCaughtUpTimeMs)
+        replica.resetLastCaughtUpTime(leaderEpochStartOffset, curTimeMs, lastCaughtUpTimeMs)
       }
 
       if (isNewLeader) {
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index c2cd2e2..634f216 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -302,7 +302,7 @@ class Log(@volatile var dir: File,
 
     leaderEpochCache.foreach(_.truncateFromEnd(nextOffsetMetadata.messageOffset))
 
-    logStartOffset = math.max(logStartOffset, segments.firstEntry.getValue.baseOffset)
+    updateLogStartOffset(math.max(logStartOffset, segments.firstEntry.getValue.baseOffset))
 
     // The earliest leader epoch may not be flushed during a hard failure. Recover it here.
     leaderEpochCache.foreach(_.truncateFromStart(logStartOffset))
@@ -741,14 +741,30 @@ class Log(@volatile var dir: File,
     }
   }
 
-  private def updateLogEndOffset(messageOffset: Long): Unit = {
-    nextOffsetMetadata = LogOffsetMetadata(messageOffset, activeSegment.baseOffset, activeSegment.size)
+  private def updateLogEndOffset(offset: Long): Unit = {
+    nextOffsetMetadata = LogOffsetMetadata(offset, activeSegment.baseOffset, activeSegment.size)
 
     // Update the high watermark in case it has gotten ahead of the log end offset following a truncation
     // or if a new segment has been rolled and the offset metadata needs to be updated.
-    if (highWatermark >= messageOffset) {
+    if (highWatermark >= offset) {
       updateHighWatermarkMetadata(nextOffsetMetadata)
     }
+
+    if (this.recoveryPoint > offset) {
+      this.recoveryPoint = offset
+    }
+  }
+
+  private def updateLogStartOffset(offset: Long): Unit = {
+    logStartOffset = offset
+
+    if (highWatermark < offset) {
+      updateHighWatermark(offset)
+    }
+
+    if (this.recoveryPoint < offset) {
+      this.recoveryPoint = offset
+    }
   }
 
   /**
@@ -1262,7 +1278,7 @@ class Log(@volatile var dir: File,
         checkIfMemoryMappedBufferClosed()
         if (newLogStartOffset > logStartOffset) {
           info(s"Incrementing log start offset to $newLogStartOffset")
-          logStartOffset = newLogStartOffset
+          updateLogStartOffset(newLogStartOffset)
           leaderEpochCache.foreach(_.truncateFromStart(logStartOffset))
           producerStateManager.truncateHead(newLogStartOffset)
           maybeIncrementFirstUnstableOffset()
@@ -2066,8 +2082,7 @@ class Log(@volatile var dir: File,
             removeAndDeleteSegments(deletable, asyncDelete = true)
             activeSegment.truncateTo(targetOffset)
             updateLogEndOffset(targetOffset)
-            this.recoveryPoint = math.min(targetOffset, this.recoveryPoint)
-            this.logStartOffset = math.min(targetOffset, this.logStartOffset)
+            updateLogStartOffset(math.min(targetOffset, this.logStartOffset))
             leaderEpochCache.foreach(_.truncateFromEnd(targetOffset))
             loadProducerState(targetOffset, reloadFromCleanShutdown = false)
           }
@@ -2101,9 +2116,7 @@ class Log(@volatile var dir: File,
         producerStateManager.truncate()
         producerStateManager.updateMapEndOffset(newOffset)
         maybeIncrementFirstUnstableOffset()
-
-        this.recoveryPoint = math.min(newOffset, this.recoveryPoint)
-        this.logStartOffset = newOffset
+        updateLogStartOffset(newOffset)
       }
     }
   }
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 8eaf47c..1e2e047 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -117,12 +117,13 @@ class LogTest {
   def testHighWatermarkMaintenance(): Unit = {
     val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024)
     val log = createLog(logDir, logConfig)
+    val leaderEpoch = 0
 
-    val records = TestUtils.records(List(
+    def records(offset: Long): MemoryRecords = TestUtils.records(List(
       new SimpleRecord(mockTime.milliseconds, "a".getBytes, "value".getBytes),
       new SimpleRecord(mockTime.milliseconds, "b".getBytes, "value".getBytes),
       new SimpleRecord(mockTime.milliseconds, "c".getBytes, "value".getBytes)
-    ))
+    ), baseOffset = offset, partitionLeaderEpoch= leaderEpoch)
 
     def assertHighWatermark(offset: Long): Unit = {
       assertEquals(offset, log.highWatermark)
@@ -133,7 +134,7 @@ class LogTest {
     assertHighWatermark(0L)
 
     // High watermark not changed by append
-    log.appendAsLeader(records, leaderEpoch = 0)
+    log.appendAsLeader(records(0), leaderEpoch)
     assertHighWatermark(0L)
 
     // Update high watermark as leader
@@ -145,13 +146,24 @@ class LogTest {
     assertHighWatermark(3L)
 
     // Update high watermark as follower
-    log.appendAsLeader(records, leaderEpoch = 0)
+    log.appendAsFollower(records(3L))
     log.updateHighWatermark(6L)
     assertHighWatermark(6L)
 
     // High watermark should be adjusted by truncation
     log.truncateTo(3L)
     assertHighWatermark(3L)
+
+    log.appendAsLeader(records(0L), leaderEpoch = 0)
+    assertHighWatermark(3L)
+    assertEquals(6L, log.logEndOffset)
+    assertEquals(0L, log.logStartOffset)
+
+    // Full truncation should also reset high watermark
+    log.truncateFullyAndStartAt(4L)
+    assertEquals(4L, log.logEndOffset)
+    assertEquals(4L, log.logStartOffset)
+    assertHighWatermark(4L)
   }
 
   private def assertNonEmptyFetch(log: Log, offset: Long, isolation: FetchIsolation): Unit = {


Mime
View raw message