kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch 2.3 updated: KAFKA-8724; Improve range checking when computing cleanable partitions (#7264)
Date Thu, 05 Sep 2019 16:39:15 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
     new 064017b  KAFKA-8724; Improve range checking when computing cleanable partitions (#7264)
064017b is described below

commit 064017b2b7aff56675c10f27f2d1eb76ac221c2d
Author: Jason Gustafson <jason@confluent.io>
AuthorDate: Thu Sep 5 09:10:52 2019 -0700

    KAFKA-8724; Improve range checking when computing cleanable partitions (#7264)
    
    This patch contains a few improvements on the offset range handling when computing the
cleanable range of offsets.
    
    1. It adds bounds checking to ensure the dirty offset cannot be larger than the log end
offset. If it is, we reset to the log start offset.
    2. It adds a method to get the non-active segments in the log while holding the lock.
This ensures that a truncation cannot lead to an invalid segment range.
    3. It improves exception messages in the case that an inconsistent segment range is provided
so that we have more information to find the root cause.
    
    The patch also fixes a few problems in `LogCleanerManagerTest` due to unintended reuse
of the underlying log directory.
    
    Reviewers: Vikas Singh <soondenana@users.noreply.github.com>, Jun Rao <junrao@gmail.com>
---
 core/src/main/scala/kafka/log/Log.scala            |  20 +++
 core/src/main/scala/kafka/log/LogCleaner.scala     |  11 +-
 .../main/scala/kafka/log/LogCleanerManager.scala   |  38 ++--
 .../unit/kafka/log/LogCleanerManagerTest.scala     | 193 ++++++++++++---------
 .../test/scala/unit/kafka/log/LogCleanerTest.scala |  46 ++---
 core/src/test/scala/unit/kafka/log/LogTest.scala   |  37 ++++
 6 files changed, 216 insertions(+), 129 deletions(-)

diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 7a9b76a..491a4c4 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -1917,12 +1917,32 @@ class Log(@volatile var dir: File,
   def logSegments(from: Long, to: Long): Iterable[LogSegment] = {
     lock synchronized {
       val view = Option(segments.floorKey(from)).map { floor =>
+        if (to < floor)
+          throw new IllegalArgumentException(s"Invalid log segment range: requested segments
from offset $from " +
+            s"mapping to segment with base offset $floor, which is greater than limit offset
$to")
         segments.subMap(floor, to)
       }.getOrElse(segments.headMap(to))
       view.values.asScala
     }
   }
 
+  def nonActiveLogSegmentsFrom(from: Long): Iterable[LogSegment] = {
+    lock synchronized {
+      if (from > activeSegment.baseOffset)
+        throw new IllegalArgumentException("Illegal request for non-active segments beginning
at " +
+          s"offset $from, which is larger than the active segment's base offset ${activeSegment.baseOffset}")
+      logSegments(from, activeSegment.baseOffset)
+    }
+  }
+
+  /**
+   * Get the largest log segment with a base offset less than or equal to the given offset,
if one exists.
+   * @return the optional log segment
+   */
+  private def floorLogSegment(offset: Long): Option[LogSegment] = {
+    Option(segments.floorEntry(offset)).map(_.getValue)
+  }
+
   override def toString = "Log(" + dir + ")"
 
   /**
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 3180f3d..7e51ff4 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -440,7 +440,7 @@ object LogCleaner {
     * @return the biggest uncleanable offset and the total amount of cleanable bytes
     */
   def calculateCleanableBytes(log: Log, firstDirtyOffset: Long, uncleanableOffset: Long):
(Long, Long) = {
-    val firstUncleanableSegment = log.logSegments(uncleanableOffset, log.activeSegment.baseOffset).headOption.getOrElse(log.activeSegment)
+    val firstUncleanableSegment = log.nonActiveLogSegmentsFrom(uncleanableOffset).headOption.getOrElse(log.activeSegment)
     val firstUncleanableOffset = firstUncleanableSegment.baseOffset
     val cleanableBytes = log.logSegments(firstDirtyOffset, math.max(firstDirtyOffset, firstUncleanableOffset)).map(_.size.toLong).sum
 
@@ -466,7 +466,7 @@ private[log] class Cleaner(val id: Int,
                            dupBufferLoadFactor: Double,
                            throttler: Throttler,
                            time: Time,
-                           checkDone: (TopicPartition) => Unit) extends Logging {
+                           checkDone: TopicPartition => Unit) extends Logging {
 
   protected override def loggerName = classOf[LogCleaner].getName
 
@@ -1042,8 +1042,11 @@ private class CleanerStats(time: Time = Time.SYSTEM) {
   * Helper class for a log, its topic/partition, the first cleanable position, the first
uncleanable dirty position,
   * and whether it needs compaction immediately.
   */
-private case class LogToClean(topicPartition: TopicPartition, log: Log, firstDirtyOffset:
Long,
-                              uncleanableOffset: Long, needCompactionNow: Boolean = false)
extends Ordered[LogToClean] {
+private case class LogToClean(topicPartition: TopicPartition,
+                              log: Log,
+                              firstDirtyOffset: Long,
+                              uncleanableOffset: Long,
+                              needCompactionNow: Boolean = false) extends Ordered[LogToClean]
{
   val cleanBytes = log.logSegments(-1, firstDirtyOffset).map(_.size.toLong).sum
   val (firstUncleanableOffset, cleanableBytes) = LogCleaner.calculateCleanableBytes(log,
firstDirtyOffset, uncleanableOffset)
   val totalBytes = cleanBytes + cleanableBytes
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index f8dce22..efa4d94 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -58,6 +58,8 @@ private[log] case class LogCleaningPaused(pausedCount: Int) extends LogCleaningS
 private[log] class LogCleanerManager(val logDirs: Seq[File],
                                      val logs: Pool[TopicPartition, Log],
                                      val logDirFailureChannel: LogDirFailureChannel) extends
Logging with KafkaMetricsGroup {
+  import LogCleanerManager._
+
 
   protected override def loggerName = classOf[LogCleaner].getName
 
@@ -103,7 +105,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
                   val now = Time.SYSTEM.milliseconds
                   partitions.map { tp =>
                     val log = logs.get(tp)
-                    val (firstDirtyOffset, firstUncleanableDirtyOffset) = LogCleanerManager.cleanableOffsets(log,
tp, lastClean, now)
+                    val (firstDirtyOffset, firstUncleanableDirtyOffset) = cleanableOffsets(log,
tp, lastClean, now)
                     val (_, uncleanableBytes) = LogCleaner.calculateCleanableBytes(log, firstDirtyOffset,
firstUncleanableDirtyOffset)
                     uncleanableBytes
                   }.sum
@@ -178,10 +180,8 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
           inProgress.contains(topicPartition) || isUncleanablePartition(log, topicPartition)
       }.map {
         case (topicPartition, log) => // create a LogToClean instance for each
-          val (firstDirtyOffset, firstUncleanableDirtyOffset) =
-            LogCleanerManager.cleanableOffsets(log, topicPartition, lastClean, now)
-
-          val compactionDelayMs = LogCleanerManager.getMaxCompactionDelay(log, firstDirtyOffset,
now)
+          val (firstDirtyOffset, firstUncleanableDirtyOffset) = cleanableOffsets(log, topicPartition,
lastClean, now)
+          val compactionDelayMs = maxCompactionDelay(log, firstDirtyOffset, now)
           preCleanStats.updateMaxCompactionDelay(compactionDelayMs)
 
           LogToClean(topicPartition, log, firstDirtyOffset, firstUncleanableDirtyOffset,
compactionDelayMs > 0)
@@ -487,10 +487,8 @@ private[log] object LogCleanerManager extends Logging {
     * get max delay between the time when log is required to be compacted as determined
     * by maxCompactionLagMs and the current time.
     */
-  def getMaxCompactionDelay(log: Log, firstDirtyOffset: Long, now: Long) : Long = {
-
-    val dirtyNonActiveSegments = log.logSegments(firstDirtyOffset, log.activeSegment.baseOffset)
-
+  def maxCompactionDelay(log: Log, firstDirtyOffset: Long, now: Long) : Long = {
+    val dirtyNonActiveSegments = log.nonActiveLogSegmentsFrom(firstDirtyOffset)
     val firstBatchTimestamps = log.getFirstBatchTimestampForSegments(dirtyNonActiveSegments).filter(_
> 0)
 
     val earliestDirtySegmentTimestamp = {
@@ -523,16 +521,24 @@ private[log] object LogCleanerManager extends Logging {
 
     // If the log segments are abnormally truncated and hence the checkpointed offset is
no longer valid;
     // reset to the log starting offset and log the error
-    val logStartOffset = log.logSegments.head.baseOffset
     val firstDirtyOffset = {
-      val offset = lastCleanOffset.getOrElse(logStartOffset)
-      if (offset < logStartOffset) {
-        // don't bother with the warning if compact and delete are enabled.
+      val logStartOffset = log.logStartOffset
+      val checkpointDirtyOffset = lastCleanOffset.getOrElse(logStartOffset)
+
+      if (checkpointDirtyOffset < logStartOffset) {
+        // Don't bother with the warning if compact and delete are enabled.
         if (!isCompactAndDelete(log))
-          warn(s"Resetting first dirty offset of ${log.name} to log start offset $logStartOffset
since the checkpointed offset $offset is invalid.")
+          warn(s"Resetting first dirty offset of ${log.name} to log start offset $logStartOffset
" +
+            s"since the checkpointed offset $checkpointDirtyOffset is invalid.")
+        logStartOffset
+      } else if (checkpointDirtyOffset > log.logEndOffset) {
+        // The dirty offset has gotten ahead of the log end offset. This could happen if
there was data
+        // corruption at the end of the log. We conservatively assume that the full log needs
cleaning.
+        warn(s"The last checkpoint dirty offset for partition $topicPartition is $checkpointDirtyOffset,
" +
+          s"which is larger than the log end offset ${log.logEndOffset}. Resetting to the
log start offset $logStartOffset.")
         logStartOffset
       } else {
-        offset
+        checkpointDirtyOffset
       }
     }
 
@@ -552,7 +558,7 @@ private[log] object LogCleanerManager extends Logging {
       // the first segment whose largest message timestamp is within a minimum time lag from
now
       if (minCompactionLagMs > 0) {
         // dirty log segments
-        val dirtyNonActiveSegments = log.logSegments(firstDirtyOffset, log.activeSegment.baseOffset)
+        val dirtyNonActiveSegments = log.nonActiveLogSegmentsFrom(firstDirtyOffset)
         dirtyNonActiveSegments.find { s =>
           val isUncleanable = s.largestTimestamp > now - minCompactionLagMs
           debug(s"Checking if log segment may be cleaned: log='${log.name}' segment.baseOffset=${s.baseOffset}
segment.largestTimestamp=${s.largestTimestamp}; now - compactionLag=${now - minCompactionLagMs};
is uncleanable=$isUncleanable")
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index 7c83cde..83098bf 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -61,107 +61,120 @@ class LogCleanerManagerTest extends Logging {
     Utils.delete(tmpDir)
   }
 
+  private def setupIncreasinglyFilthyLogs(partitions: Seq[TopicPartition],
+                                          startNumBatches: Int,
+                                          batchIncrement: Int): Pool[TopicPartition, Log]
= {
+    val logs = new Pool[TopicPartition, Log]()
+    var numBatches = startNumBatches
+
+    for (tp <- partitions) {
+      val log = createLog(2048, LogConfig.Compact, topicPartition = tp)
+      logs.put(tp, log)
+
+      writeRecords(log, numBatches = numBatches, recordsPerBatch = 1, batchesPerSegment =
5)
+      numBatches += batchIncrement
+    }
+    logs
+  }
+
   @Test
   def testGrabFilthiestCompactedLogReturnsLogWithDirtiestRatio(): Unit = {
-    val records = TestUtils.singletonRecords("test".getBytes)
-    val log1: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact, 1)
-    val log2: Log = createLog(records.sizeInBytes * 10, LogConfig.Compact, 2)
-    val log3: Log = createLog(records.sizeInBytes * 15, LogConfig.Compact, 3)
+    val tp0 = new TopicPartition("wishing-well", 0)
+    val tp1 = new TopicPartition("wishing-well", 1)
+    val tp2 = new TopicPartition("wishing-well", 2)
+    val partitions = Seq(tp0, tp1, tp2)
 
-    val logs = new Pool[TopicPartition, Log]()
-    val tp1 = new TopicPartition("wishing well", 0) // active segment starts at 0
-    logs.put(tp1, log1)
-    val tp2 = new TopicPartition("wishing well", 1) // active segment starts at 10
-    logs.put(tp2, log2)
-    val tp3 = new TopicPartition("wishing well", 2) // // active segment starts at 20
-    logs.put(tp3, log3)
-    val cleanerManager: LogCleanerManagerMock = createCleanerManager(logs, toMock = true).asInstanceOf[LogCleanerManagerMock]
-    cleanerCheckpoints.put(tp1, 0) // all clean
-    cleanerCheckpoints.put(tp2, 1) // dirtiest - 9 dirty messages
-    cleanerCheckpoints.put(tp3, 15) // 5 dirty messages
+    // setup logs with cleanable range: [20, 20], [20, 25], [20, 30]
+    val logs = setupIncreasinglyFilthyLogs(partitions, startNumBatches = 20, batchIncrement
= 5)
+    val cleanerManager = createCleanerManagerMock(logs)
+    partitions.foreach(partition => cleanerCheckpoints.put(partition, 20))
 
     val filthiestLog: LogToClean = cleanerManager.grabFilthiestCompactedLog(time).get
-
-    assertEquals(log2, filthiestLog.log)
     assertEquals(tp2, filthiestLog.topicPartition)
+    assertEquals(tp2, filthiestLog.log.topicPartition)
   }
 
   @Test
   def testGrabFilthiestCompactedLogIgnoresUncleanablePartitions(): Unit = {
-    val records = TestUtils.singletonRecords("test".getBytes)
-    val log1: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact, 1)
-    val log2: Log = createLog(records.sizeInBytes * 10, LogConfig.Compact, 2)
-    val log3: Log = createLog(records.sizeInBytes * 15, LogConfig.Compact, 3)
+    val tp0 = new TopicPartition("wishing-well", 0)
+    val tp1 = new TopicPartition("wishing-well", 1)
+    val tp2 = new TopicPartition("wishing-well", 2)
+    val partitions = Seq(tp0, tp1, tp2)
 
-    val logs = new Pool[TopicPartition, Log]()
-    val tp1 = new TopicPartition("wishing well", 0) // active segment starts at 0
-    logs.put(tp1, log1)
-    val tp2 = new TopicPartition("wishing well", 1) // active segment starts at 10
-    logs.put(tp2, log2)
-    val tp3 = new TopicPartition("wishing well", 2) // // active segment starts at 20
-    logs.put(tp3, log3)
-    val cleanerManager: LogCleanerManagerMock = createCleanerManager(logs, toMock = true).asInstanceOf[LogCleanerManagerMock]
-    cleanerCheckpoints.put(tp1, 0) // all clean
-    cleanerCheckpoints.put(tp2, 1) // dirtiest - 9 dirty messages
-    cleanerCheckpoints.put(tp3, 15) // 5 dirty messages
-    cleanerManager.markPartitionUncleanable(log2.dir.getParent, tp2)
+    // setup logs with cleanable range: [20, 20], [20, 25], [20, 30]
+    val logs = setupIncreasinglyFilthyLogs(partitions, startNumBatches = 20, batchIncrement
= 5)
+    val cleanerManager = createCleanerManagerMock(logs)
+    partitions.foreach(partition => cleanerCheckpoints.put(partition, 20))
 
-    val filthiestLog: LogToClean = cleanerManager.grabFilthiestCompactedLog(time).get
+    cleanerManager.markPartitionUncleanable(logs.get(tp2).dir.getParent, tp2)
 
-    assertEquals(log3, filthiestLog.log)
-    assertEquals(tp3, filthiestLog.topicPartition)
+    val filthiestLog: LogToClean = cleanerManager.grabFilthiestCompactedLog(time).get
+    assertEquals(tp1, filthiestLog.topicPartition)
+    assertEquals(tp1, filthiestLog.log.topicPartition)
   }
 
   @Test
   def testGrabFilthiestCompactedLogIgnoresInProgressPartitions(): Unit = {
-    val records = TestUtils.singletonRecords("test".getBytes)
-    val log1: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact, 1)
-    val log2: Log = createLog(records.sizeInBytes * 10, LogConfig.Compact, 2)
-    val log3: Log = createLog(records.sizeInBytes * 15, LogConfig.Compact, 3)
+    val tp0 = new TopicPartition("wishing-well", 0)
+    val tp1 = new TopicPartition("wishing-well", 1)
+    val tp2 = new TopicPartition("wishing-well", 2)
+    val partitions = Seq(tp0, tp1, tp2)
+
+    // setup logs with cleanable range: [20, 20], [20, 25], [20, 30]
+    val logs = setupIncreasinglyFilthyLogs(partitions, startNumBatches = 20, batchIncrement
= 5)
+    val cleanerManager = createCleanerManagerMock(logs)
+    partitions.foreach(partition => cleanerCheckpoints.put(partition, 20))
 
-    val logs = new Pool[TopicPartition, Log]()
-    val tp1 = new TopicPartition("wishing well", 0) // active segment starts at 0
-    logs.put(tp1, log1)
-    val tp2 = new TopicPartition("wishing well", 1) // active segment starts at 10
-    logs.put(tp2, log2)
-    val tp3 = new TopicPartition("wishing well", 2) // // active segment starts at 20
-    logs.put(tp3, log3)
-    val cleanerManager: LogCleanerManagerMock = createCleanerManager(logs, toMock = true).asInstanceOf[LogCleanerManagerMock]
-    cleanerCheckpoints.put(tp1, 0) // all clean
-    cleanerCheckpoints.put(tp2, 1) // dirtiest - 9 dirty messages
-    cleanerCheckpoints.put(tp3, 15) // 5 dirty messages
     cleanerManager.setCleaningState(tp2, LogCleaningInProgress)
 
     val filthiestLog: LogToClean = cleanerManager.grabFilthiestCompactedLog(time).get
 
-    assertEquals(log3, filthiestLog.log)
-    assertEquals(tp3, filthiestLog.topicPartition)
+    assertEquals(tp1, filthiestLog.topicPartition)
+    assertEquals(tp1, filthiestLog.log.topicPartition)
   }
 
   @Test
   def testGrabFilthiestCompactedLogIgnoresBothInProgressPartitionsAndUncleanablePartitions():
Unit = {
-    val records = TestUtils.singletonRecords("test".getBytes)
-    val log1: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact, 1)
-    val log2: Log = createLog(records.sizeInBytes * 10, LogConfig.Compact, 2)
-    val log3: Log = createLog(records.sizeInBytes * 15, LogConfig.Compact, 3)
+    val tp0 = new TopicPartition("wishing-well", 0)
+    val tp1 = new TopicPartition("wishing-well", 1)
+    val tp2 = new TopicPartition("wishing-well", 2)
+    val partitions = Seq(tp0, tp1, tp2)
+
+    // setup logs with cleanable range: [20, 20], [20, 25], [20, 30]
+    val logs = setupIncreasinglyFilthyLogs(partitions, startNumBatches = 20, batchIncrement
= 5)
+    val cleanerManager = createCleanerManagerMock(logs)
+    partitions.foreach(partition => cleanerCheckpoints.put(partition, 20))
 
-    val logs = new Pool[TopicPartition, Log]()
-    val tp1 = new TopicPartition("wishing well", 0) // active segment starts at 0
-    logs.put(tp1, log1)
-    val tp2 = new TopicPartition("wishing well", 1) // active segment starts at 10
-    logs.put(tp2, log2)
-    val tp3 = new TopicPartition("wishing well", 2) // // active segment starts at 20
-    logs.put(tp3, log3)
-    val cleanerManager: LogCleanerManagerMock = createCleanerManager(logs, toMock = true).asInstanceOf[LogCleanerManagerMock]
-    cleanerCheckpoints.put(tp1, 0) // all clean
-    cleanerCheckpoints.put(tp2, 1) // dirtiest - 9 dirty messages
-    cleanerCheckpoints.put(tp3, 15) // 5 dirty messages
     cleanerManager.setCleaningState(tp2, LogCleaningInProgress)
-    cleanerManager.markPartitionUncleanable(log3.dir.getParent, tp3)
+    cleanerManager.markPartitionUncleanable(logs.get(tp1).dir.getParent, tp1)
 
     val filthiestLog: Option[LogToClean] = cleanerManager.grabFilthiestCompactedLog(time)
+    assertEquals(None, filthiestLog)
+  }
+
+  @Test
+  def testDirtyOffsetResetIfLargerThanEndOffset(): Unit = {
+    val tp = new TopicPartition("foo", 0)
+    val logs = setupIncreasinglyFilthyLogs(Seq(tp), startNumBatches = 20, batchIncrement
= 5)
+    val cleanerManager = createCleanerManagerMock(logs)
+    cleanerCheckpoints.put(tp, 200)
+
+    val filthiestLog = cleanerManager.grabFilthiestCompactedLog(time).get
+    assertEquals(0L, filthiestLog.firstDirtyOffset)
+  }
+
+  @Test
+  def testDirtyOffsetResetIfSmallerThanStartOffset(): Unit = {
+    val tp = new TopicPartition("foo", 0)
+    val logs = setupIncreasinglyFilthyLogs(Seq(tp), startNumBatches = 20, batchIncrement
= 5)
+
+    logs.get(tp).maybeIncrementLogStartOffset(10L)
 
-    assertTrue(filthiestLog.isEmpty)
+    val cleanerManager = createCleanerManagerMock(logs)
+    cleanerCheckpoints.put(tp, 0L)
+
+    val filthiestLog = cleanerManager.grabFilthiestCompactedLog(time).get
+    assertEquals(10L, filthiestLog.firstDirtyOffset)
   }
 
   /**
@@ -482,17 +495,16 @@ class LogCleanerManagerTest extends Logging {
   private def createCleanerManager(log: Log): LogCleanerManager = {
     val logs = new Pool[TopicPartition, Log]()
     logs.put(topicPartition, log)
-    createCleanerManager(logs)
+    new LogCleanerManager(Array(logDir), logs, null)
   }
 
-  private def createCleanerManager(pool: Pool[TopicPartition, Log], toMock: Boolean = false):
LogCleanerManager = {
-    if (toMock)
-      new LogCleanerManagerMock(Array(logDir), pool, null)
-    else
-      new LogCleanerManager(Array(logDir), pool, null)
+  private def createCleanerManagerMock(pool: Pool[TopicPartition, Log]): LogCleanerManagerMock
= {
+    new LogCleanerManagerMock(Array(logDir), pool, null)
   }
 
-  private def createLog(segmentSize: Int, cleanupPolicy: String, segmentsCount: Int = 0):
Log = {
+  private def createLog(segmentSize: Int,
+                        cleanupPolicy: String,
+                        topicPartition: TopicPartition = new TopicPartition("log", 0)): Log
= {
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, segmentSize: Integer)
     logProps.put(LogConfig.RetentionMsProp, 1: Integer)
@@ -500,8 +512,9 @@ class LogCleanerManagerTest extends Logging {
     logProps.put(LogConfig.MinCleanableDirtyRatioProp, 0.05: java.lang.Double) // small for
easier and clearer tests
 
     val config = LogConfig(logProps)
-    val partitionDir = new File(logDir, "log-0")
-    val log = Log(partitionDir,
+    val partitionDir = new File(logDir, Log.logDirName(topicPartition))
+
+    Log(partitionDir,
       config,
       logStartOffset = 0L,
       recoveryPoint = 0L,
@@ -511,10 +524,15 @@ class LogCleanerManagerTest extends Logging {
       maxProducerIdExpirationMs = 60 * 60 * 1000,
       producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
       logDirFailureChannel = new LogDirFailureChannel(10))
-    for (i <- 0 until segmentsCount) {
-      val startOffset = i * 10
-      val endOffset = startOffset + 10
-      val segment = LogUtils.createSegment(startOffset, logDir)
+  }
+
+  private def writeRecords(log: Log,
+                           numBatches: Int,
+                           recordsPerBatch: Int,
+                           batchesPerSegment: Int): Unit = {
+    for (i <- 0 until numBatches) {
+      val startOffset = i * recordsPerBatch
+      val endOffset = startOffset + recordsPerBatch
       var lastTimestamp = 0L
       val records = (startOffset until endOffset).map { offset =>
         val currentTimestamp = time.milliseconds()
@@ -524,10 +542,13 @@ class LogCleanerManagerTest extends Logging {
         new SimpleRecord(currentTimestamp, s"key-$offset".getBytes, s"value-$offset".getBytes)
       }
 
-      segment.append(endOffset, lastTimestamp, endOffset, MemoryRecords.withRecords(CompressionType.NONE,
records:_*))
-      log.addSegment(segment)
+      log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE, records:_*), leaderEpoch
= 1)
+      log.onHighWatermarkIncremented(log.logEndOffsetMetadata.messageOffset)
+
+      if (i % batchesPerSegment == 0)
+        log.roll()
     }
-    log
+    log.roll()
   }
 
   private def makeLog(dir: File = logDir, config: LogConfig) =
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 825fa90..ed19f34 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -346,7 +346,7 @@ class LogCleanerTest {
     log.roll()
 
     // cannot remove the marker in this pass because there are still valid records
-    var dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, 100L), deleteHorizonMs = Long.MaxValue)._1
+    var dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset),
deleteHorizonMs = Long.MaxValue)._1
     assertEquals(List(1, 3, 2), LogTest.keysInLog(log))
     assertEquals(List(0, 2, 3, 4, 5), offsetsInLog(log))
 
@@ -355,17 +355,17 @@ class LogCleanerTest {
     log.roll()
 
     // the first cleaning preserves the commit marker (at offset 3) since there were still
records for the transaction
-    dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, 100L), deleteHorizonMs
= Long.MaxValue)._1
+    dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset),
deleteHorizonMs = Long.MaxValue)._1
     assertEquals(List(2, 1, 3), LogTest.keysInLog(log))
     assertEquals(List(3, 4, 5, 6, 7, 8), offsetsInLog(log))
 
     // delete horizon forced to 0 to verify marker is not removed early
-    dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, 100L), deleteHorizonMs
= 0L)._1
+    dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset),
deleteHorizonMs = 0L)._1
     assertEquals(List(2, 1, 3), LogTest.keysInLog(log))
     assertEquals(List(3, 4, 5, 6, 7, 8), offsetsInLog(log))
 
     // clean again with large delete horizon and verify the marker is removed
-    dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, 100L), deleteHorizonMs
= Long.MaxValue)._1
+    dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset),
deleteHorizonMs = Long.MaxValue)._1
     assertEquals(List(2, 1, 3), LogTest.keysInLog(log))
     assertEquals(List(4, 5, 6, 7, 8), offsetsInLog(log))
   }
@@ -394,11 +394,11 @@ class LogCleanerTest {
     log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 0, isFromClient
= false)
     log.roll()
 
-    cleaner.doClean(LogToClean(tp, log, 0L, 100L), deleteHorizonMs = Long.MaxValue)
+    cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs
= Long.MaxValue)
     assertEquals(List(2), LogTest.keysInLog(log))
     assertEquals(List(1, 3, 4), offsetsInLog(log))
 
-    cleaner.doClean(LogToClean(tp, log, 0L, 100L), deleteHorizonMs = Long.MaxValue)
+    cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs
= Long.MaxValue)
     assertEquals(List(2), LogTest.keysInLog(log))
     assertEquals(List(3, 4), offsetsInLog(log))
   }
@@ -433,14 +433,14 @@ class LogCleanerTest {
 
     // first time through the records are removed
     // Expected State: [{Producer1: EmptyBatch}, {Producer2: EmptyBatch}, {Producer2: Commit},
{2}, {3}, {Producer1: Commit}]
-    var dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, 100L), deleteHorizonMs = Long.MaxValue)._1
+    var dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset),
deleteHorizonMs = Long.MaxValue)._1
     assertEquals(List(2, 3), LogTest.keysInLog(log))
     assertEquals(List(4, 5, 6, 7), offsetsInLog(log))
     assertEquals(List(1, 3, 4, 5, 6, 7), lastOffsetsPerBatchInLog(log))
 
     // the empty batch remains if cleaned again because it still holds the last sequence
     // Expected State: [{Producer1: EmptyBatch}, {Producer2: EmptyBatch}, {Producer2: Commit},
{2}, {3}, {Producer1: Commit}]
-    dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, 100L), deleteHorizonMs
= Long.MaxValue)._1
+    dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset),
deleteHorizonMs = Long.MaxValue)._1
     assertEquals(List(2, 3), LogTest.keysInLog(log))
     assertEquals(List(4, 5, 6, 7), offsetsInLog(log))
     assertEquals(List(1, 3, 4, 5, 6, 7), lastOffsetsPerBatchInLog(log))
@@ -453,13 +453,13 @@ class LogCleanerTest {
     log.roll()
 
     // Expected State: [{Producer1: EmptyBatch}, {Producer2: Commit}, {2}, {3}, {Producer1:
Commit}, {Producer2: 1}, {Producer2: Commit}]
-    dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, 100L), deleteHorizonMs
= Long.MaxValue)._1
+    dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset),
deleteHorizonMs = Long.MaxValue)._1
     assertEquals(List(2, 3, 1), LogTest.keysInLog(log))
     assertEquals(List(4, 5, 6, 7, 8, 9), offsetsInLog(log))
     assertEquals(List(1, 4, 5, 6, 7, 8, 9), lastOffsetsPerBatchInLog(log))
 
     // Expected State: [{Producer1: EmptyBatch}, {2}, {3}, {Producer1: Commit}, {Producer2:
1}, {Producer2: Commit}]
-    dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, 100L), deleteHorizonMs
= Long.MaxValue)._1
+    dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset),
deleteHorizonMs = Long.MaxValue)._1
     assertEquals(List(2, 3, 1), LogTest.keysInLog(log))
     assertEquals(List(5, 6, 7, 8, 9), offsetsInLog(log))
     assertEquals(List(1, 5, 6, 7, 8, 9), lastOffsetsPerBatchInLog(log))
@@ -483,14 +483,14 @@ class LogCleanerTest {
 
     // first time through the control batch is retained as an empty batch
     // Expected State: [{Producer1: EmptyBatch}], [{2}, {3}]
-    var dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, 100L), deleteHorizonMs = Long.MaxValue)._1
+    var dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset),
deleteHorizonMs = Long.MaxValue)._1
     assertEquals(List(2, 3), LogTest.keysInLog(log))
     assertEquals(List(1, 2), offsetsInLog(log))
     assertEquals(List(0, 1, 2), lastOffsetsPerBatchInLog(log))
 
     // the empty control batch does not cause an exception when cleaned
     // Expected State: [{Producer1: EmptyBatch}], [{2}, {3}]
-    dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, 100L), deleteHorizonMs
= Long.MaxValue)._1
+    dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset),
deleteHorizonMs = Long.MaxValue)._1
     assertEquals(List(2, 3), LogTest.keysInLog(log))
     assertEquals(List(1, 2), offsetsInLog(log))
     assertEquals(List(0, 1, 2), lastOffsetsPerBatchInLog(log))
@@ -514,7 +514,7 @@ class LogCleanerTest {
     log.roll()
 
     // Both the record and the marker should remain after cleaning
-    cleaner.doClean(LogToClean(tp, log, 0L, 100L), deleteHorizonMs = Long.MaxValue)
+    cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs
= Long.MaxValue)
     assertEquals(List(0, 1), offsetsInLog(log))
     assertEquals(List(0, 1), lastOffsetsPerBatchInLog(log))
   }
@@ -539,12 +539,12 @@ class LogCleanerTest {
     // Both the batch and the marker should remain after cleaning. The batch is retained
     // because it is the last entry for this producerId. The marker is retained because
     // there are still batches remaining from this transaction.
-    cleaner.doClean(LogToClean(tp, log, 0L, 100L), deleteHorizonMs = Long.MaxValue)
+    cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)
     assertEquals(List(1), offsetsInLog(log))
     assertEquals(List(0, 1), lastOffsetsPerBatchInLog(log))
 
     // The empty batch and the marker is still retained after a second cleaning.
-    cleaner.doClean(LogToClean(tp, log, 0L, 100L), deleteHorizonMs = Long.MaxValue)
+    cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)
     assertEquals(List(1), offsetsInLog(log))
     assertEquals(List(0, 1), lastOffsetsPerBatchInLog(log))
   }
@@ -569,12 +569,12 @@ class LogCleanerTest {
     log.roll()
 
     // delete horizon set to 0 to verify marker is not removed early
-    val dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, 100L), deleteHorizonMs = 0L)._1
+    val dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs = 0L)._1
     assertEquals(List(3), LogTest.keysInLog(log))
     assertEquals(List(3, 4, 5), offsetsInLog(log))
 
     // clean again with large delete horizon and verify the marker is removed
-    cleaner.doClean(LogToClean(tp, log, dirtyOffset, 100L), deleteHorizonMs = Long.MaxValue)
+    cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)
     assertEquals(List(3), LogTest.keysInLog(log))
     assertEquals(List(4, 5), offsetsInLog(log))
   }
@@ -608,12 +608,12 @@ class LogCleanerTest {
 
     // Both transactional batches will be cleaned. The last one will remain in the log
     // as an empty batch in order to preserve the producer sequence number and epoch
-    cleaner.doClean(LogToClean(tp, log, 0L, 100L), deleteHorizonMs = Long.MaxValue)
+    cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)
     assertEquals(List(1, 3, 4, 5), offsetsInLog(log))
     assertEquals(List(1, 2, 3, 4, 5), lastOffsetsPerBatchInLog(log))
 
     // On the second round of cleaning, the marker from the first transaction should be removed.
-    cleaner.doClean(LogToClean(tp, log, 0L, 100L), deleteHorizonMs = Long.MaxValue)
+    cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)
     assertEquals(List(3, 4, 5), offsetsInLog(log))
     assertEquals(List(2, 3, 4, 5), lastOffsetsPerBatchInLog(log))
   }
@@ -645,14 +645,14 @@ class LogCleanerTest {
     assertAbortedTransactionIndexed()
 
     // first time through the records are removed
-    var dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, 100L), deleteHorizonMs = Long.MaxValue)._1
+    var dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
     assertAbortedTransactionIndexed()
     assertEquals(List(), LogTest.keysInLog(log))
     assertEquals(List(2), offsetsInLog(log)) // abort marker is retained
     assertEquals(List(1, 2), lastOffsetsPerBatchInLog(log)) // empty batch is retained
 
     // the empty batch remains if cleaned again because it still holds the last sequence
-    dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, 100L), deleteHorizonMs = Long.MaxValue)._1
+    dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
     assertAbortedTransactionIndexed()
     assertEquals(List(), LogTest.keysInLog(log))
     assertEquals(List(2), offsetsInLog(log)) // abort marker is still retained
@@ -662,13 +662,13 @@ class LogCleanerTest {
     appendProducer(Seq(1))
     log.roll()
 
-    dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, 100L), deleteHorizonMs = Long.MaxValue)._1
+    dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
     assertAbortedTransactionIndexed()
     assertEquals(List(1), LogTest.keysInLog(log))
     assertEquals(List(2, 3), offsetsInLog(log)) // abort marker is not yet gone because we read the empty batch
     assertEquals(List(2, 3), lastOffsetsPerBatchInLog(log)) // but we do not preserve the empty batch
 
-    dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, 100L), deleteHorizonMs = Long.MaxValue)._1
+    dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
     assertEquals(List(1), LogTest.keysInLog(log))
     assertEquals(List(3), offsetsInLog(log)) // abort marker is gone
     assertEquals(List(3), lastOffsetsPerBatchInLog(log))
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index e529fe9..a864da0 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -336,6 +336,43 @@ class LogTest {
     assertEquals(startOffset, log.logEndOffset)
   }
 
+  @Test
+  def testNonActiveSegmentsFrom(): Unit = {
+    val logConfig = LogTest.createLogConfig()
+    val log = createLog(logDir, logConfig)
+
+    for (i <- 0 until 5) {
+      val record = new SimpleRecord(mockTime.milliseconds, i.toString.getBytes)
+      log.appendAsLeader(TestUtils.records(List(record)), leaderEpoch = 0)
+      log.roll()
+    }
+
+    def nonActiveBaseOffsetsFrom(startOffset: Long): Seq[Long] = {
+      log.nonActiveLogSegmentsFrom(startOffset).map(_.baseOffset).toSeq
+    }
+
+    assertEquals(5L, log.activeSegment.baseOffset)
+    assertEquals(0 until 5, nonActiveBaseOffsetsFrom(0L))
+    assertEquals(Seq.empty, nonActiveBaseOffsetsFrom(5L))
+    assertEquals(2 until 5, nonActiveBaseOffsetsFrom(2L))
+  }
+
+  @Test
+  def testInconsistentLogSegmentRange(): Unit = {
+    val logConfig = LogTest.createLogConfig()
+    val log = createLog(logDir, logConfig)
+
+    for (i <- 0 until 5) {
+      val record = new SimpleRecord(mockTime.milliseconds, i.toString.getBytes)
+      log.appendAsLeader(TestUtils.records(List(record)), leaderEpoch = 0)
+      log.roll()
+    }
+
+    assertThrows[IllegalArgumentException] {
+      log.logSegments(5, 1)
+    }
+  }
+
   private def testProducerSnapshotsRecoveryAfterUncleanShutdown(messageFormatVersion: String): Unit = {
     val logConfig = LogTest.createLogConfig(segmentBytes = 64 * 10, messageFormatVersion = messageFormatVersion)
     var log = createLog(logDir, logConfig)


Mime
View raw message