kafka-commits mailing list archives

From ij...@apache.org
Subject [kafka] branch 2.3 updated: KAFKA-8764: LogCleanerManager endless loop while compacting/cleaning (#7932)
Date Fri, 31 Jan 2020 11:54:51 GMT
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
     new 8de3084  KAFKA-8764: LogCleanerManager endless loop while compacting/cleaning (#7932)
8de3084 is described below

commit 8de308491e0a54792de53e430bc242ce52d2ac6d
Author: Tomislav <tomislav.rajakovic@gmail.com>
AuthorDate: Tue Jan 14 23:03:44 2020 +0100

    KAFKA-8764: LogCleanerManager endless loop while compacting/cleaning (#7932)
    
    This fix makes the LogCleaner tolerant of gaps in the offset sequence. Previously,
    this could lead to endless loops of cleaning which required manual intervention.
    
    Reviewers: Jun Rao <junrao@gmail.com>, David Arthur <mumrah@gmail.com>
---
 core/src/main/scala/kafka/log/LogCleaner.scala     | 15 +++++++-
 .../test/scala/unit/kafka/log/LogCleanerTest.scala | 43 ++++++++++++++++++++++
 2 files changed, 56 insertions(+), 2 deletions(-)
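
To make the pairing in this change concrete before the diff: every dirty segment is
matched with the base offset of the segment after it, so the cleaner knows the first
offset that should no longer belong to the current segment, even when offsets are
missing from its tail. A minimal standalone sketch of that idea (Segment and
nextStartOffsets are hypothetical simplified stand-ins, not Kafka types):

    object OffsetGapSketch {
      final case class Segment(baseOffset: Long)

      // For each dirty segment, the expected upper bound is the next segment's base
      // offset; the last dirty segment is bounded by the end of the cleaning range.
      def nextStartOffsets(dirty: Seq[Segment], end: Long): Seq[Long] =
        if (dirty.isEmpty) Seq.empty
        else dirty.tail.map(_.baseOffset) :+ end

      def main(args: Array[String]): Unit = {
        // Segments starting at 0, 11 and 30, with the cleaning range ending at 40,
        // yield the pairs (0, 11), (11, 30), (30, 40).
        val dirty = Seq(Segment(0L), Segment(11L), Segment(30L))
        for ((seg, nextStart) <- dirty.zip(nextStartOffsets(dirty, end = 40L)))
          println(s"segment ${seg.baseOffset} expects offsets in [${seg.baseOffset}, $nextStart)")
      }
    }

The diff below applies exactly this zip in buildOffsetMap and threads the resulting
nextSegmentStartOffset into buildOffsetMapForSegment.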

diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 47abae5..beb3a47 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -37,6 +37,7 @@ import org.apache.kafka.common.utils.Time
 
 import scala.collection.JavaConverters._
 import scala.collection.{Iterable, Set, mutable}
+import scala.collection.mutable.ListBuffer
 import scala.util.control.ControlThrowable
 
 /**
@@ -865,6 +866,11 @@ private[log] class Cleaner(val id: Int,
                                   stats: CleanerStats) {
     map.clear()
     val dirty = log.logSegments(start, end).toBuffer
+    val nextSegmentStartOffsets = new ListBuffer[Long]
+    if (dirty.nonEmpty) {
+      for (nextSegment <- dirty.tail) nextSegmentStartOffsets.append(nextSegment.baseOffset)
+      nextSegmentStartOffsets.append(end)
+    }
     info("Building offset map for log %s for %d segments in offset range [%d, %d).".format(log.name,
dirty.size, start, end))
 
     val transactionMetadata = new CleanedTransactionMetadata
@@ -874,10 +880,10 @@ private[log] class Cleaner(val id: Int,
     // Add all the cleanable dirty segments. We must take at least map.slots * load_factor,
     // but we may be able to fit more (if there is lots of duplication in the dirty section of the log)
     var full = false
-    for (segment <- dirty if !full) {
+    for ((segment, nextSegmentStartOffset) <- dirty.zip(nextSegmentStartOffsets) if !full) {
       checkDone(log.topicPartition)
 
-      full = buildOffsetMapForSegment(log.topicPartition, segment, map, start, log.config.maxMessageSize,
+      full = buildOffsetMapForSegment(log.topicPartition, segment, map, start, nextSegmentStartOffset, log.config.maxMessageSize,
         transactionMetadata, stats)
       if (full)
         debug("Offset map is full, %d segments fully mapped, segment with base offset %d
is partially mapped".format(dirty.indexOf(segment), segment.baseOffset))
@@ -898,6 +904,7 @@ private[log] class Cleaner(val id: Int,
                                        segment: LogSegment,
                                        map: OffsetMap,
                                        startOffset: Long,
+                                       nextSegmentStartOffset: Long,
                                        maxLogMessageSize: Int,
                                        transactionMetadata: CleanedTransactionMetadata,
                                        stats: CleanerStats): Boolean = {
@@ -951,6 +958,10 @@ private[log] class Cleaner(val id: Int,
       if(position == startPosition)
         growBuffersOrFail(segment.log, position, maxLogMessageSize, records)
     }
+
+    // In the case of an offset gap, fast forward to the latest expected offset in this segment.
+    map.updateLatestOffset(nextSegmentStartOffset - 1L)
+
     restoreBuffers()
     false
   }
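
Why the fast-forward matters: cleaning progress is tracked through the highest offset
recorded in the offset map, so a gap at a segment's tail would otherwise leave that
offset stuck below the next segment's base offset, and the same range would be picked
up as dirty again on every pass. A toy illustration of that bookkeeping (ToyOffsetMap
is a hypothetical stand-in for kafka.log.OffsetMap, keeping only the latest-offset
tracking):

    class ToyOffsetMap {
      private var latest: Long = -1L
      def put(offset: Long): Unit = latest = math.max(latest, offset)
      def updateLatestOffset(offset: Long): Unit = latest = math.max(latest, offset)
      def latestOffset: Long = latest
    }

    object FastForwardDemo extends App {
      val map = new ToyOffsetMap
      // Segment covers [0, 11) but offsets 9 and 10 were never written.
      (0L to 8L).foreach(map.put)
      println(map.latestOffset) // 8: progress stalls short of base offset 11
      // The fix: after scanning the segment, jump to nextSegmentStartOffset - 1.
      map.updateLatestOffset(11L - 1L)
      println(map.latestOffset) // 10: the next pass can start cleanly at offset 11
    }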
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index ed19f34..aa0fb96 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -1554,6 +1554,49 @@ class LogCleanerTest {
     assertEquals("The tombstone should be retained.", 1, log.logSegments.head.log.batches.iterator.next().lastOffset)
   }
 
+  /**
+   * Verify that the cleaner is able to move beyond missing offset records in the dirty log
+   */
+  @Test
+  def testCleaningBeyondMissingOffsets(): Unit = {
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 1024*1024: java.lang.Integer)
+    logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
+    val logConfig = LogConfig(logProps)
+    val cleaner = makeCleaner(Int.MaxValue)
+
+    {
+      val log = makeLog(dir = TestUtils.randomPartitionLogDir(tmpdir), config = logConfig)
+      writeToLog(log, (0 to 9) zip (0 to 9), (0L to 9L))
+    // roll new segment with baseOffset 11, leaving previous with a hole at offset 10
+      log.roll(Some(11L))
+
+      // active segment record
+      log.appendAsFollower(messageWithOffset(1015, 1015, 11L))
+
+      val (nextDirtyOffset, _) = cleaner.clean(LogToClean(log.topicPartition, log, 0L, log.activeSegment.baseOffset, needCompactionNow = true))
+      assertEquals("Cleaning point should pass offset gap", log.activeSegment.baseOffset,
nextDirtyOffset)
+    }
+
+
+    {
+      val log = makeLog(dir = TestUtils.randomPartitionLogDir(tmpdir), config = logConfig)
+      writeToLog(log, (0 to 9) zip (0 to 9), (0L to 9L))
+      // roll new segment with baseOffset 15, leaving previous with holes in offset range [10, 14]
+      log.roll(Some(15L))
+
+      writeToLog(log, (15 to 24) zip (15 to 24), (15L to 24L))
+      // roll new segment with baseOffset 30, leaving previous with holes in offset range [25, 29]
+      log.roll(Some(30L))
+
+      // active segment record
+      log.appendAsFollower(messageWithOffset(1015, 1015, 30L))
+
+      val (nextDirtyOffset, _) = cleaner.clean(LogToClean(log.topicPartition, log, 0L, log.activeSegment.baseOffset, needCompactionNow = true))
+      assertEquals("Cleaning point should pass offset gap in multiple segments", log.activeSegment.baseOffset,
nextDirtyOffset)
+    }
+  }
+
   private def writeToLog(log: Log, keysAndValues: Iterable[(Int, Int)], offsetSeq: Iterable[Long]): Iterable[Long] = {
     for(((key, value), offset) <- keysAndValues.zip(offsetSeq))
       yield log.appendAsFollower(messageWithOffset(key, value, offset)).lastOffset
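
For context on the scenario these tests construct: in a compacted log, offsets are
immutable but the records between them can disappear (for example when old records
for a key are removed), so a replica can legitimately hold non-contiguous offsets.
Below is a self-contained simulation of the failure mode being guarded against, with
and without the fast-forward (all names here are hypothetical, not Kafka APIs):

    object EndlessLoopSimulation extends App {
      val presentOffsets = (0L to 9L).toSeq // records actually present in the dirty segment
      val activeSegmentBase = 11L           // offset 10 was never written

      // Returns the new cleaner checkpoint after one cleaning pass.
      def cleanOnce(checkpoint: Long, fastForward: Boolean): Long = {
        // Highest offset observed while scanning from the checkpoint onward.
        val lastSeen = presentOffsets.filter(_ >= checkpoint).lastOption.getOrElse(checkpoint - 1)
        val latest = if (fastForward) math.max(lastSeen, activeSegmentBase - 1) else lastSeen
        latest + 1
      }

      println(cleanOnce(0L, fastForward = false)) // 10: stuck below base offset 11, range stays dirty forever
      println(cleanOnce(0L, fastForward = true))  // 11: checkpoint reaches the active segment
    }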

