kafka-commits mailing list archives

From jun...@apache.org
Subject [kafka] branch 2.4 updated: KAFKA-6266: Repeated occurrence of WARN Resetting first dirty offset … (#8136)
Date Fri, 21 Feb 2020 02:15:04 GMT
This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new ce4aa9c  KAFKA-6266: Repeated occurrence of WARN Resetting first dirty offset … (#8136)
ce4aa9c is described below

commit ce4aa9c0943436f77393609b8db986d5ffb7a8b6
Author: David Mao <47232755+splett2@users.noreply.github.com>
AuthorDate: Thu Feb 20 18:14:26 2020 -0800

    KAFKA-6266: Repeated occurrence of WARN Resetting first dirty offset … (#8136)
    
    Previously, the checkpointed offset for a log was only updated once a cleaning job
    completed, and only if that log had been chosen for cleaning. As a result, a log with
    an invalid checkpointed offset would repeatedly emit the warning for as long as it was
    not chosen for cleaning.
    
    The fix is to update the checkpointed offset for any log with an invalid checkpoint,
    regardless of whether that log is chosen for cleaning.
    
    Reviewers: Anna Povzner <anna@confluent.io>, Jun Rao <junrao@gmail.com>
---
 .../main/scala/kafka/log/LogCleanerManager.scala   | 83 +++++++++++++--------
 .../unit/kafka/log/LogCleanerManagerTest.scala     | 87 +++++++++++++++++++---
 2 files changed, 125 insertions(+), 45 deletions(-)
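
For readers who want the gist before the full patch, here is a condensed, self-contained
sketch of the approach the diff takes. The sketch is not part of the commit: the
simplified signature, the logStartOffset/logEndOffset parameters, and the
CheckpointResetSketch object are assumptions for illustration, while OffsetsToClean,
firstDirtyOffset, firstUncleanableDirtyOffset and forceUpdateCheckpoint mirror the diff
below. The idea: cleanableOffsets reports an invalid checkpoint via a flag, so the caller
can repair the checkpoint even when the log is never picked for cleaning.

    // Simplified stand-in for the OffsetsToClean case class added by this patch.
    case class OffsetsToClean(firstDirtyOffset: Long,
                              firstUncleanableDirtyOffset: Long,
                              forceUpdateCheckpoint: Boolean = false)

    object CheckpointResetSketch {
      // Simplified version of LogCleanerManager.cleanableOffsets: instead of
      // silently resetting an out-of-range checkpoint on every pass (and warning
      // each time), it flags the checkpoint for a one-time repair by the caller.
      def cleanableOffsets(logStartOffset: Long,
                           logEndOffset: Long,
                           lastCleanOffset: Option[Long]): OffsetsToClean = {
        val checkpoint = lastCleanOffset.getOrElse(logStartOffset)
        if (checkpoint < logStartOffset || checkpoint > logEndOffset)
          OffsetsToClean(logStartOffset, logEndOffset, forceUpdateCheckpoint = true)
        else
          OffsetsToClean(checkpoint, logEndOffset)
      }

      def main(args: Array[String]): Unit = {
        // Checkpoint (15) below the log start offset (20): the caller should persist
        // the repaired checkpoint even if this log is never chosen for cleaning.
        val offsets = cleanableOffsets(logStartOffset = 20L, logEndOffset = 100L,
                                       lastCleanOffset = Some(15L))
        assert(offsets.forceUpdateCheckpoint)
        assert(offsets.firstDirtyOffset == 20L)
      }
    }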

diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index 32ad708..19f7e4d 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -97,32 +97,32 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
   }
 
   /* gauges for tracking the number of uncleanable bytes from uncleanable partitions for each log directory */
-    for (dir <- logDirs) {
-      newGauge(
-        "uncleanable-bytes",
-        new Gauge[Long] {
-          def value = {
-            inLock(lock) {
-              uncleanablePartitions.get(dir.getAbsolutePath) match {
-                case Some(partitions) => {
-                  val lastClean = allCleanerCheckpoints
-                  val now = Time.SYSTEM.milliseconds
-                  partitions.map { tp =>
-                    val log = logs.get(tp)
-                    val lastCleanOffset = lastClean.get(tp)
-                    val (firstDirtyOffset, firstUncleanableDirtyOffset) = cleanableOffsets(log, lastCleanOffset, now)
-                    val (_, uncleanableBytes) = calculateCleanableBytes(log, firstDirtyOffset, firstUncleanableDirtyOffset)
-                    uncleanableBytes
-                  }.sum
-                }
-                case _ => 0
+  for (dir <- logDirs) {
+    newGauge(
+      "uncleanable-bytes",
+      new Gauge[Long] {
+        def value = {
+          inLock(lock) {
+            uncleanablePartitions.get(dir.getAbsolutePath) match {
+              case Some(partitions) => {
+                val lastClean = allCleanerCheckpoints
+                val now = Time.SYSTEM.milliseconds
+                partitions.map { tp =>
+                  val log = logs.get(tp)
+                  val lastCleanOffset = lastClean.get(tp)
+                  val offsetsToClean = cleanableOffsets(log, lastCleanOffset, now)
+                  val (_, uncleanableBytes) = calculateCleanableBytes(log, offsetsToClean.firstDirtyOffset, offsetsToClean.firstUncleanableDirtyOffset)
+                  uncleanableBytes
+                }.sum
               }
+              case _ => 0
             }
           }
-        },
-        Map("logDirectory" -> dir.getAbsolutePath)
-      )
-    }
+        }
+      },
+      Map("logDirectory" -> dir.getAbsolutePath)
+    )
+  }
 
   /* a gauge for tracking the cleanable ratio of the dirtiest log */
   @volatile private var dirtiestLogCleanableRatio = 0.0
@@ -187,11 +187,14 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
         case (topicPartition, log) => // create a LogToClean instance for each
           try {
             val lastCleanOffset = lastClean.get(topicPartition)
-            val (firstDirtyOffset, firstUncleanableDirtyOffset) = cleanableOffsets(log, lastCleanOffset, now)
-            val compactionDelayMs = maxCompactionDelay(log, firstDirtyOffset, now)
+            val offsetsToClean = cleanableOffsets(log, lastCleanOffset, now)
+            // update checkpoint for logs with invalid checkpointed offsets
+            if (offsetsToClean.forceUpdateCheckpoint)
+              updateCheckpoints(log.dir.getParentFile(), Option(topicPartition, offsetsToClean.firstDirtyOffset))
+            val compactionDelayMs = maxCompactionDelay(log, offsetsToClean.firstDirtyOffset, now)
             preCleanStats.updateMaxCompactionDelay(compactionDelayMs)
 
-            LogToClean(topicPartition, log, firstDirtyOffset, firstUncleanableDirtyOffset, compactionDelayMs > 0)
+            LogToClean(topicPartition, log, offsetsToClean.firstDirtyOffset, offsetsToClean.firstUncleanableDirtyOffset, compactionDelayMs > 0)
           } catch {
             case e: Throwable => throw new LogCleaningException(log,
               s"Failed to calculate log cleaning stats for partition $topicPartition", e)
@@ -488,6 +491,20 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
   }
 }
 
+/**
+ * Helper class for the range of cleanable dirty offsets of a log and whether to update the checkpoint associated with
+ * the log
+ *
+ * @param firstDirtyOffset the lower (inclusive) offset to begin cleaning from
+ * @param firstUncleanableDirtyOffset the upper (exclusive) offset to clean to
+ * @param forceUpdateCheckpoint whether to update the checkpoint associated with this log. If true, the checkpoint should be
+ *                             reset to firstDirtyOffset
+ */
+private case class OffsetsToClean(firstDirtyOffset: Long,
+                                  firstUncleanableDirtyOffset: Long,
+                                  forceUpdateCheckpoint: Boolean = false) {
+}
+
 private[log] object LogCleanerManager extends Logging {
 
   def isCompactAndDelete(log: Log): Boolean = {
@@ -523,12 +540,12 @@ private[log] object LogCleanerManager extends Logging {
     * @param log the log
     * @param lastCleanOffset the last checkpointed offset
     * @param now the current time in milliseconds of the cleaning operation
-    * @return the lower (inclusive) and upper (exclusive) offsets
+    * @return OffsetsToClean containing offsets for cleanable portion of log and whether the log checkpoint needs updating
     */
-  def cleanableOffsets(log: Log, lastCleanOffset: Option[Long], now: Long): (Long, Long) = {
+  def cleanableOffsets(log: Log, lastCleanOffset: Option[Long], now: Long): OffsetsToClean = {
     // If the log segments are abnormally truncated and hence the checkpointed offset is no longer valid;
     // reset to the log starting offset and log the error
-    val firstDirtyOffset = {
+    val (firstDirtyOffset, forceUpdateCheckpoint) = {
       val logStartOffset = log.logStartOffset
       val checkpointDirtyOffset = lastCleanOffset.getOrElse(logStartOffset)
 
@@ -537,15 +554,15 @@ private[log] object LogCleanerManager extends Logging {
         if (!isCompactAndDelete(log))
           warn(s"Resetting first dirty offset of ${log.name} to log start offset $logStartOffset
" +
             s"since the checkpointed offset $checkpointDirtyOffset is invalid.")
-        logStartOffset
+        (logStartOffset, true)
       } else if (checkpointDirtyOffset > log.logEndOffset) {
         // The dirty offset has gotten ahead of the log end offset. This could happen if there was data
         // corruption at the end of the log. We conservatively assume that the full log needs cleaning.
         warn(s"The last checkpoint dirty offset for partition ${log.name} is $checkpointDirtyOffset, " +
           s"which is larger than the log end offset ${log.logEndOffset}. Resetting to the log start offset $logStartOffset.")
-        logStartOffset
+        (logStartOffset, true)
       } else {
-        checkpointDirtyOffset
+        (checkpointDirtyOffset, false)
       }
     }
 
@@ -580,7 +597,7 @@ private[log] object LogCleanerManager extends Logging {
       s"now=$now => firstDirtyOffset=$firstDirtyOffset firstUncleanableOffset=$firstUncleanableDirtyOffset
" +
       s"activeSegment.baseOffset=${log.activeSegment.baseOffset}")
 
-    (firstDirtyOffset, math.max(firstDirtyOffset, firstUncleanableDirtyOffset))
+    OffsetsToClean(firstDirtyOffset, math.max(firstDirtyOffset, firstUncleanableDirtyOffset), forceUpdateCheckpoint)
   }
 
   /**
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index bdf5cb7..5e2620c 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -54,6 +54,11 @@ class LogCleanerManagerTest extends Logging {
     override def allCleanerCheckpoints: Map[TopicPartition, Long] = {
       cleanerCheckpoints.toMap
     }
+
+    override def updateCheckpoints(dataDir: File, update: Option[(TopicPartition,Long)]): Unit = {
+      val (tp, offset) = update.getOrElse(throw new IllegalArgumentException("update=None argument not yet handled"))
+      cleanerCheckpoints.put(tp, offset)
+    }
   }
 
   @After
@@ -423,8 +428,8 @@ class LogCleanerManagerTest extends Logging {
 
     val lastCleanOffset = Some(0L)
     val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, lastCleanOffset, time.milliseconds)
-    assertEquals("The first cleanable offset starts at the beginning of the log.", 0L, cleanableOffsets._1)
-    assertEquals("The first uncleanable offset begins with the active segment.", log.activeSegment.baseOffset,
cleanableOffsets._2)
+    assertEquals("The first cleanable offset starts at the beginning of the log.", 0L, cleanableOffsets.firstDirtyOffset)
+    assertEquals("The first uncleanable offset begins with the active segment.", log.activeSegment.baseOffset,
cleanableOffsets.firstUncleanableDirtyOffset)
   }
 
   /**
@@ -453,8 +458,8 @@ class LogCleanerManagerTest extends Logging {
 
     val lastCleanOffset = Some(0L)
     val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, lastCleanOffset, time.milliseconds)
-    assertEquals("The first cleanable offset starts at the beginning of the log.", 0L, cleanableOffsets._1)
-    assertEquals("The first uncleanable offset begins with the second block of log entries.",
activeSegAtT0.baseOffset, cleanableOffsets._2)
+    assertEquals("The first cleanable offset starts at the beginning of the log.", 0L, cleanableOffsets.firstDirtyOffset)
+    assertEquals("The first uncleanable offset begins with the second block of log entries.",
activeSegAtT0.baseOffset, cleanableOffsets.firstUncleanableDirtyOffset)
   }
 
   /**
@@ -478,8 +483,27 @@ class LogCleanerManagerTest extends Logging {
 
     val lastCleanOffset = Some(0L)
     val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, lastCleanOffset, time.milliseconds)
-    assertEquals("The first cleanable offset starts at the beginning of the log.", 0L, cleanableOffsets._1)
-    assertEquals("The first uncleanable offset begins with active segment.", log.activeSegment.baseOffset,
cleanableOffsets._2)
+    assertEquals("The first cleanable offset starts at the beginning of the log.", 0L, cleanableOffsets.firstDirtyOffset)
+    assertEquals("The first uncleanable offset begins with active segment.", log.activeSegment.baseOffset,
cleanableOffsets.firstUncleanableDirtyOffset)
+  }
+
+  @Test
+  def testCleanableOffsetsNeedsCheckpointReset(): Unit = {
+    val tp = new TopicPartition("foo", 0)
+    val logs = setupIncreasinglyFilthyLogs(Seq(tp), startNumBatches = 20, batchIncrement = 5)
+    logs.get(tp).maybeIncrementLogStartOffset(10L)
+
+    var lastCleanOffset = Some(15L)
+    var cleanableOffsets = LogCleanerManager.cleanableOffsets(logs.get(tp), lastCleanOffset, time.milliseconds)
+    assertFalse("Checkpoint offset should not be reset if valid", cleanableOffsets.forceUpdateCheckpoint)
+
+    logs.get(tp).maybeIncrementLogStartOffset(20L)
+    cleanableOffsets = LogCleanerManager.cleanableOffsets(logs.get(tp), lastCleanOffset, time.milliseconds)
+    assertTrue("Checkpoint offset needs to be reset if less than log start offset", cleanableOffsets.forceUpdateCheckpoint)
+
+    lastCleanOffset = Some(25L)
+    cleanableOffsets = LogCleanerManager.cleanableOffsets(logs.get(tp), lastCleanOffset, time.milliseconds)
+    assertTrue("Checkpoint offset needs to be reset if greater than log end offset", cleanableOffsets.forceUpdateCheckpoint)
   }
 
   @Test
@@ -505,8 +529,8 @@ class LogCleanerManagerTest extends Logging {
     time.sleep(compactionLag + 1)
     // although the compaction lag has been exceeded, the undecided data should not be cleaned
     var cleanableOffsets = LogCleanerManager.cleanableOffsets(log, Some(0L), time.milliseconds())
-    assertEquals(0L, cleanableOffsets._1)
-    assertEquals(0L, cleanableOffsets._2)
+    assertEquals(0L, cleanableOffsets.firstDirtyOffset)
+    assertEquals(0L, cleanableOffsets.firstUncleanableDirtyOffset)
 
     log.appendAsLeader(MemoryRecords.withEndTransactionMarker(time.milliseconds(), producerId, producerEpoch,
       new EndTransactionMarker(ControlRecordType.ABORT, 15)), leaderEpoch = 0,
@@ -516,15 +540,15 @@ class LogCleanerManagerTest extends Logging {
 
     // the first segment should now become cleanable immediately
     cleanableOffsets = LogCleanerManager.cleanableOffsets(log, Some(0L), time.milliseconds())
-    assertEquals(0L, cleanableOffsets._1)
-    assertEquals(3L, cleanableOffsets._2)
+    assertEquals(0L, cleanableOffsets.firstDirtyOffset)
+    assertEquals(3L, cleanableOffsets.firstUncleanableDirtyOffset)
 
     time.sleep(compactionLag + 1)
 
     // the second segment becomes cleanable after the compaction lag
     cleanableOffsets = LogCleanerManager.cleanableOffsets(log, Some(0L), time.milliseconds())
-    assertEquals(0L, cleanableOffsets._1)
-    assertEquals(4L, cleanableOffsets._2)
+    assertEquals(0L, cleanableOffsets.firstDirtyOffset)
+    assertEquals(4L, cleanableOffsets.firstUncleanableDirtyOffset)
   }
 
   @Test
@@ -574,6 +598,45 @@ class LogCleanerManagerTest extends Logging {
     assertEquals(LogCleaningPaused(1), cleanerManager.cleaningState(tp).get)
   }
 
+  /**
+   * Logs with invalid checkpoint offsets should update their checkpoint offset even if the log doesn't need cleaning
+   */
+  @Test
+  def testCheckpointUpdatedForInvalidOffsetNoCleaning(): Unit = {
+    val tp = new TopicPartition("foo", 0)
+    val logs = setupIncreasinglyFilthyLogs(Seq(tp), startNumBatches = 20, batchIncrement = 5)
+
+    logs.get(tp).maybeIncrementLogStartOffset(20L)
+    val cleanerManager = createCleanerManagerMock(logs)
+    cleanerCheckpoints.put(tp, 15L)
+
+    val filthiestLog = cleanerManager.grabFilthiestCompactedLog(time)
+    assertEquals("Log should not be selected for cleaning", None, filthiestLog)
+    assertEquals("Unselected log should have checkpoint offset updated", 20L, cleanerCheckpoints.get(tp).get)
+  }
+
+  /**
+   * Logs with invalid checkpoint offsets should update their checkpoint offset even if they aren't selected
+   * for immediate cleaning
+   */
+  @Test
+  def testCheckpointUpdatedForInvalidOffsetNotSelected(): Unit = {
+    val tp0 = new TopicPartition("foo", 0)
+    val tp1 = new TopicPartition("foo", 1)
+    val partitions = Seq(tp0, tp1)
+
+    // create two logs, one with an invalid offset, and one that is dirtier than the log with an invalid offset
+    val logs = setupIncreasinglyFilthyLogs(partitions, startNumBatches = 20, batchIncrement = 5)
+    logs.get(tp0).maybeIncrementLogStartOffset(15L)
+    val cleanerManager = createCleanerManagerMock(logs)
+    cleanerCheckpoints.put(tp0, 10L)
+    cleanerCheckpoints.put(tp1, 5L)
+
+    val filthiestLog = cleanerManager.grabFilthiestCompactedLog(time).get
+    assertEquals("Dirtier log should be selected", tp1, filthiestLog.topicPartition)
+    assertEquals("Unselected log should have checkpoint offset updated", 15L, cleanerCheckpoints.get(tp0).get)
+  }
+
   private def createCleanerManager(log: Log): LogCleanerManager = {
     val logs = new Pool[TopicPartition, Log]()
     logs.put(topicPartition, log)
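
The tests above exercise the fix by overriding updateCheckpoints in a test subclass so
that checkpoint writes land in the in-memory cleanerCheckpoints map instead of the
checkpoint file on disk. Below is a minimal stand-alone sketch of that stubbing pattern;
the class and type names are illustrative stand-ins, not Kafka's, and only the
cleanerCheckpoints/updateCheckpoints idea mirrors the diff.

    import scala.collection.mutable

    // Illustrative stand-in for TopicPartition.
    case class TopicPartitionId(topic: String, partition: Int)

    // Illustrative stand-in for LogCleanerManager's checkpointing behavior.
    class CheckpointingManager {
      // In production this would persist offsets to the cleaner checkpoint file.
      def updateCheckpoints(update: Option[(TopicPartitionId, Long)]): Unit = ()
    }

    // Test double: capture checkpoint updates in memory so tests can assert on them.
    class RecordingManager extends CheckpointingManager {
      val cleanerCheckpoints = mutable.Map.empty[TopicPartitionId, Long]

      override def updateCheckpoints(update: Option[(TopicPartitionId, Long)]): Unit = {
        val (tp, offset) = update.getOrElse(sys.error("update=None not handled in tests"))
        cleanerCheckpoints.put(tp, offset)
      }
    }

    object RecordingManagerDemo {
      def main(args: Array[String]): Unit = {
        val mgr = new RecordingManager
        mgr.updateCheckpoints(Some((TopicPartitionId("foo", 0), 20L)))
        assert(mgr.cleanerCheckpoints(TopicPartitionId("foo", 0)) == 20L)
      }
    }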

