kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-12889: log clean relative index range check of group consider empty log segment to avoid too many empty log segment left (#10818)
Date Sat, 19 Jun 2021 22:35:15 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ac5ddc5  KAFKA-12889: log clean relative index range check of group consider empty
log segment to avoid too many empty log segment left (#10818)
ac5ddc5 is described below

commit ac5ddc574ef23279267a8f9bda737a840be30c85
Author: iamgd67 <iamgd67@sina.com>
AuthorDate: Sun Jun 20 06:33:52 2021 +0800

    KAFKA-12889: log clean relative index range check of group consider empty log segment
to avoid too many empty log segment left (#10818)
    
    To avoid overflowing the 4-byte relative offset used by the log index, the log cleaner checks segment
offsets when grouping segments, to make sure a group's offset range does not exceed Int.MaxValue.
    
    This offset check currently does not consider the case where the next log segment is empty, so an
empty log file is left behind roughly every 2^31 messages.
    
    The leftover empty segments are reprocessed on every clean cycle, which rewrites them with the
same empty content and causes a small amount of unnecessary I/O.
    
    For __consumer_offsets topic, normally we can set cleanup.policy to compact,delete to
get rid of this.
    
    My cluster runs 0.10.1.1, but after analyzing the trunk code, it appears to have the same problem.
    
    Co-authored-by: Liu Qiang(BSS-HZ) <qliu.zj@best-inc.com>
    
    Reviewers: Luke Chen <showuon@gmail.com>, Guozhang Wang <wangguoz@gmail.com>
---
 core/src/main/scala/kafka/log/LogCleaner.scala     |  5 ++-
 .../test/scala/unit/kafka/log/LogCleanerTest.scala | 46 ++++++++++++++++++++++
 2 files changed, 50 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 80916cd..1f1d776 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -837,7 +837,10 @@ private[log] class Cleaner(val id: Int,
             logSize + segs.head.size <= maxSize &&
             indexSize + segs.head.offsetIndex.sizeInBytes <= maxIndexSize &&
             timeIndexSize + segs.head.timeIndex.sizeInBytes <= maxIndexSize &&
-            lastOffsetForFirstSegment(segs, firstUncleanableOffset) - group.last.baseOffset
<= Int.MaxValue) {
+            //if first segment size is 0, we don't need to do the index offset range check.
+            //this will avoid empty log left every 2^31 message.
+            (segs.head.size == 0 ||
+              lastOffsetForFirstSegment(segs, firstUncleanableOffset) - group.last.baseOffset
<= Int.MaxValue)) {
         group = segs.head :: group
         logSize += segs.head.size
         indexSize += segs.head.offsetIndex.sizeInBytes
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 5c91041..9352f10 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -1174,6 +1174,52 @@ class LogCleanerTest {
       "All but the last group should be the target size.")
   }
 
+  @Test
+  def testSegmentGroupingWithSparseOffsetsAndEmptySegments(): Unit ={
+    val cleaner = makeCleaner(Int.MaxValue)
+    val logProps = new Properties()
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+    val k="key".getBytes()
+    val v="val".getBytes()
+
+    //create 3 segments
+    for(i <- 0 until 3){
+      log.appendAsLeader(TestUtils.singletonRecords(value = v, key = k), leaderEpoch = 0)
+      //0 to Int.MaxValue is Int.MaxValue+1 message, -1 will be the last message of i-th
segment
+      val records = messageWithOffset(k, v, (i + 1L) * (Int.MaxValue + 1L) -1 )
+      log.appendAsFollower(records)
+      assertEquals(i + 1, log.numberOfSegments)
+    }
+
+    //4th active segment, not clean
+    log.appendAsLeader(TestUtils.singletonRecords(value = v, key = k), leaderEpoch = 0)
+
+    val totalSegments = 4
+    //last segment not cleanable
+    val firstUncleanableOffset = log.logEndOffset - 1
+    val notCleanableSegments = 1
+
+    assertEquals(totalSegments, log.numberOfSegments)
+    var groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize
= Int.MaxValue, firstUncleanableOffset)
+    //because index file uses 4 byte relative index offset and current segments all none
empty,
+    //segments will not group even their size is very small.
+    assertEquals(totalSegments - notCleanableSegments, groups.size)
+    //do clean to clean first 2 segments to empty
+    cleaner.clean(LogToClean(log.topicPartition, log, 0, firstUncleanableOffset))
+    assertEquals(totalSegments, log.numberOfSegments)
+    assertEquals(0, log.logSegments.head.size)
+
+    //after clean we got 2 empty segment, they will group together this time
+    groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize
= Int.MaxValue, firstUncleanableOffset)
+    val noneEmptySegment = 1
+    assertEquals(noneEmptySegment + 1, groups.size)
+
+    //trigger a clean and 2 empty segments should cleaned to 1
+    cleaner.clean(LogToClean(log.topicPartition, log, 0, firstUncleanableOffset))
+    assertEquals(totalSegments - 1, log.numberOfSegments)
+  }
+
   /**
    * Validate the logic for grouping log segments together for cleaning when only a small
number of
    * messages are retained, but the range of offsets is greater than Int.MaxValue. A group
should not

Mime
View raw message