kafka-commits mailing list archives

From jun...@apache.org
Subject kafka git commit: KAFKA-3894; log cleaner can partially clean a segment
Date Mon, 22 Aug 2016 17:58:45 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 113d23e8f -> 6ed3e6b1c


KAFKA-3894; log cleaner can partially clean a segment

As discussed in https://issues.apache.org/jira/browse/KAFKA-3894, this PR lets the log cleaner
do a "partial" clean of a segment, whereby it builds an offset map that covers the segment only
up to a particular offset. When cleaning resumes, it continues from the next dirty offset, which
can now fall in the middle of a segment.
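
To make the mechanism concrete, here is a minimal, self-contained Scala sketch of that flow
(BoundedOffsetMap and the other names below are illustrative stand-ins, not the actual Cleaner
or OffsetMap classes): the map accepts key -> offset entries until it is full, the clean range
ends at latestOffset + 1, and the next round resumes from that offset, possibly mid-segment.

    object PartialCleanSketch {
      // Stand-in for kafka.log.OffsetMap with the latestOffset tracking this commit adds.
      class BoundedOffsetMap(maxEntries: Int) {
        private val entries = scala.collection.mutable.Map.empty[String, Long]
        var latestOffset: Long = -1L
        // Returns false when the map is full and the key is new, mirroring buildOffsetMapForSegment.
        def put(key: String, offset: Long): Boolean = {
          if (entries.size < maxEntries || entries.contains(key)) {
            entries.put(key, offset)
            latestOffset = offset
            true
          } else false
        }
      }

      def main(args: Array[String]): Unit = {
        // A "segment" holding more distinct keys than the map can fit.
        val segment = Seq("a" -> 0L, "b" -> 1L, "c" -> 2L, "d" -> 3L, "e" -> 4L)
        var firstDirtyOffset = 0L
        // Each round maps as much of the dirty range as fits, then advances the dirty
        // offset to latestOffset + 1 instead of failing outright.
        while (firstDirtyOffset < segment.size) {
          val map = new BoundedOffsetMap(maxEntries = 2)
          var full = false
          for ((k, o) <- segment if o >= firstDirtyOffset && !full)
            full = !map.put(k, o)
          firstDirtyOffset = map.latestOffset + 1
          println(s"cleaned up to offset ${map.latestOffset}, next round starts at $firstDirtyOffset")
        }
      }
    }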

Prior to this PR, a segment with too many distinct keys could crash the log cleaner thread,
because the cleaner was required to fit at least one entire segment in the offset map.
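
The new up-front check only demands that the dedupe buffer can hold a single entry. As a rough
back-of-the-envelope illustration (the numbers below are assumptions for the sketch, not values
taken from a specific broker configuration): the number of map slots is the per-thread buffer
size divided by the per-entry size (an MD5 hash plus an 8-byte offset), and the cleaner only
requires that slots * load factor exceeds 1.

    object OffsetMapCapacitySketch extends App {
      // Assumed values; the real buffer size and thread count come from
      // log.cleaner.dedupe.buffer.size and log.cleaner.threads, and 0.9 is an assumed load factor.
      val dedupeBufferPerThread = 128 * 1024 * 1024        // bytes available to one cleaner thread
      val bytesPerEntry         = 16 + 8                   // MD5 hash + 8-byte offset, as in SkimpyOffsetMap
      val dupBufferLoadFactor   = 0.9d

      val slots             = dedupeBufferPerThread / bytesPerEntry
      val maxDesiredMapSize = (slots * dupBufferLoadFactor).toInt

      // The relaxed constraint introduced by this commit: refuse to run only if
      // not even one message key fits in the map.
      require(slots * dupBufferLoadFactor > 1,
        "offset map is too small to fit in even a single message, so log cleaning will never make progress")

      println(s"one cleaning round can dedupe at most $maxDesiredMapSize distinct keys")
    }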

Author: Tom Crayford <tcrayford@googlemail.com>

Reviewers: Jun Rao <junrao@gmail.com>

Closes #1725 from tcrayford/dont_crash_log_cleaner_thread_if_segment_overflows_buffer


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6ed3e6b1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6ed3e6b1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6ed3e6b1

Branch: refs/heads/trunk
Commit: 6ed3e6b1cb8a73b1f5f78926ccb247a8953a554c
Parents: 113d23e
Author: Tom Crayford <tcrayford@googlemail.com>
Authored: Mon Aug 22 10:58:40 2016 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Mon Aug 22 10:58:40 2016 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/LogCleaner.scala  | 46 +++++------
 core/src/main/scala/kafka/log/OffsetMap.scala   | 19 ++++-
 .../test/scala/unit/kafka/log/CleanerTest.scala | 85 ++++++++++++++++++--
 3 files changed, 111 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/6ed3e6b1/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index d4bb1f2..ef880e6 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -311,6 +311,8 @@ private[log] class Cleaner(val id: Int,
   /* buffer used for write i/o */
   private var writeBuffer = ByteBuffer.allocate(ioBufferSize)
 
+  require(offsetMap.slots * dupBufferLoadFactor > 1, "offset map is too small to fit in even a single message, so log cleaning will never make progress. You can increase log.cleaner.dedupe.buffer.size or decrease log.cleaner.threads")
+
   /**
    * Clean the given log
    *
@@ -326,7 +328,8 @@ private[log] class Cleaner(val id: Int,
     // build the offset map
     info("Building offset map for %s...".format(cleanable.log.name))
     val upperBoundOffset = log.activeSegment.baseOffset
-    val endOffset = buildOffsetMap(log, cleanable.firstDirtyOffset, upperBoundOffset, offsetMap) + 1
+    buildOffsetMap(log, cleanable.firstDirtyOffset, upperBoundOffset, offsetMap)
+    val endOffset = offsetMap.latestOffset + 1
     stats.indexDone()
     
     // figure out the timestamp below which it is safe to remove delete tombstones
@@ -341,7 +344,7 @@ private[log] class Cleaner(val id: Int,
     info("Cleaning log %s (discarding tombstones prior to %s)...".format(log.name, new Date(deleteHorizonMs)))
     for (group <- groupSegmentsBySize(log.logSegments(0, endOffset), log.config.segmentSize, log.config.maxIndexSize))
       cleanSegments(log, group, offsetMap, deleteHorizonMs)
-      
+
     // record buffer utilization
     stats.bufferUtilization = offsetMap.utilization
     
@@ -533,6 +536,10 @@ private[log] class Cleaner(val id: Int,
                                   map: kafka.log.OffsetMap,
                                   retainDeletes: Boolean,
                                   entry: kafka.message.MessageAndOffset): Boolean = {
+    val pastLatestOffset = entry.offset > map.latestOffset
+    if (pastLatestOffset)
+      return true
+
     val key = entry.message.key
     if (key != null) {
       val foundOffset = map.get(key)
@@ -613,34 +620,23 @@ private[log] class Cleaner(val id: Int,
    * @param start The offset at which dirty messages begin
    * @param end The ending offset for the map that is being built
    * @param map The map in which to store the mappings
-   *
-   * @return The final offset the map covers
    */
-  private[log] def buildOffsetMap(log: Log, start: Long, end: Long, map: OffsetMap): Long = {
+  private[log] def buildOffsetMap(log: Log, start: Long, end: Long, map: OffsetMap) {
     map.clear()
     val dirty = log.logSegments(start, end).toBuffer
     info("Building offset map for log %s for %d segments in offset range [%d, %d).".format(log.name, dirty.size, start, end))
     
     // Add all the dirty segments. We must take at least map.slots * load_factor,
     // but we may be able to fit more (if there is lots of duplication in the dirty section of the log)
-    var offset = dirty.head.baseOffset
-    require(offset == start, "Last clean offset is %d but segment base offset is %d for log %s.".format(start, offset, log.name))
     var full = false
     for (segment <- dirty if !full) {
       checkDone(log.topicAndPartition)
 
-      val newOffset = buildOffsetMapForSegment(log.topicAndPartition, segment, map)
-      if (newOffset > -1L)
-        offset = newOffset
-      else {
-        // If not even one segment can fit in the map, compaction cannot happen
-        require(offset > start, "Unable to build the offset map for segment %s/%s. You can increase log.cleaner.dedupe.buffer.size or decrease log.cleaner.threads".format(log.name, segment.log.file.getName))
+      full = buildOffsetMapForSegment(log.topicAndPartition, segment, map, start)
+      if (full)
+        debug("Offset map is full, %d segments fully mapped, segment with base offset %d is partially mapped".format(dirty.indexOf(segment), segment.baseOffset))
-        full = true
-      }
     }
     info("Offset map for log %s complete.".format(log.name))
-    offset
   }
 
   /**
@@ -649,11 +645,10 @@ private[log] class Cleaner(val id: Int,
    * @param segment The segment to index
    * @param map The map in which to store the key=>offset mapping
    *
-   * @return The final offset covered by the map or -1 if the map is full
+   * @return If the map was filled whilst loading from this segment
    */
-  private def buildOffsetMapForSegment(topicAndPartition: TopicAndPartition, segment: LogSegment, map: OffsetMap): Long = {
-    var position = 0
-    var offset = segment.baseOffset
+  private def buildOffsetMapForSegment(topicAndPartition: TopicAndPartition, segment: LogSegment, map: OffsetMap, start: Long): Boolean = {
+    var position = segment.index.lookup(start).position
     val maxDesiredMapSize = (map.slots * this.dupBufferLoadFactor).toInt
     while (position < segment.log.sizeInBytes) {
       checkDone(topicAndPartition)
@@ -663,15 +658,12 @@ private[log] class Cleaner(val id: Int,
       val startPosition = position
       for (entry <- messages) {
         val message = entry.message
-        if (message.hasKey) {
+        if (message.hasKey && entry.offset >= start) {
           if (map.size < maxDesiredMapSize)
             map.put(message.key, entry.offset)
-          else {
-            // The map is full, stop looping and return
-            return -1L
-          }
+          else
+            return true
         }
-        offset = entry.offset
         stats.indexMessagesRead(1)
       }
       position += messages.validBytes
@@ -682,7 +674,7 @@ private[log] class Cleaner(val id: Int,
         growBuffers()
     }
     restoreBuffers()
-    offset
+    return false
   }
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/6ed3e6b1/core/src/main/scala/kafka/log/OffsetMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/OffsetMap.scala b/core/src/main/scala/kafka/log/OffsetMap.scala
index f453030..1df0615 100755
--- a/core/src/main/scala/kafka/log/OffsetMap.scala
+++ b/core/src/main/scala/kafka/log/OffsetMap.scala
@@ -30,6 +30,7 @@ trait OffsetMap {
   def clear()
   def size: Int
   def utilization: Double = size.toDouble / slots
+  def latestOffset: Long
 }
 
 /**
@@ -60,7 +61,10 @@ class SkimpyOffsetMap(val memory: Int, val hashAlgorithm: String = "MD5") extend
   
   /* the number of probes for all lookups */
   private var probes = 0L
-  
+
+  /* the latest offset written into the map */
+  private var lastOffset = -1L
+
   /**
    * The number of bytes of space each entry uses (the number of bytes in the hash plus an 8 byte offset)
    */
@@ -89,6 +93,7 @@ class SkimpyOffsetMap(val memory: Int, val hashAlgorithm: String = "MD5") extend
       if(Arrays.equals(hash1, hash2)) {
         // we found an existing entry, overwrite it and return (size does not change)
         bytes.putLong(offset)
+        lastOffset = offset
         return
       }
       attempt += 1
@@ -98,6 +103,7 @@ class SkimpyOffsetMap(val memory: Int, val hashAlgorithm: String = "MD5") extend
     bytes.position(pos)
     bytes.put(hash1)
     bytes.putLong(offset)
+    lastOffset = offset
     entries += 1
   }
   
@@ -106,7 +112,7 @@ class SkimpyOffsetMap(val memory: Int, val hashAlgorithm: String = "MD5") extend
    */
   private def isEmpty(position: Int): Boolean = 
     bytes.getLong(position) == 0 && bytes.getLong(position + 8) == 0 && bytes.getLong(position + 16) == 0
-  
+
   /**
    * Get the offset associated with this key.
    * @param key The key
@@ -136,12 +142,12 @@ class SkimpyOffsetMap(val memory: Int, val hashAlgorithm: String = "MD5") extend
   
   /**
    * Change the salt used for key hashing making all existing keys unfindable.
-   * Doesn't actually zero out the array.
    */
   override def clear() {
     this.entries = 0
     this.lookups = 0L
     this.probes = 0L
+    this.lastOffset = -1L
     Arrays.fill(bytes.array, bytes.arrayOffset, bytes.arrayOffset + bytes.limit, 0.toByte)
   }
   
@@ -155,7 +161,12 @@ class SkimpyOffsetMap(val memory: Int, val hashAlgorithm: String = "MD5") extend
    */
   def collisionRate: Double = 
     (this.probes - this.lookups) / this.lookups.toDouble
-  
+
+  /**
+   * The latest offset put into the map
+   */
+  override def latestOffset: Long = lastOffset
+
   /**
    * Calculate the ith probe position. We first try reading successive integers from the hash itself
    * then if all of those fail we degrade to linear probing.

http://git-wip-us.apache.org/repos/asf/kafka/blob/6ed3e6b1/core/src/test/scala/unit/kafka/log/CleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/CleanerTest.scala b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
index 15920ad..5b0ce9a 100755
--- a/core/src/test/scala/unit/kafka/log/CleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
@@ -108,6 +108,38 @@ class CleanerTest extends JUnitSuite {
   }
 
   @Test
+  def testPartialSegmentClean() {
+    // because loadFactor is 0.75, this means we can fit 2 messages in the map
+    var cleaner = makeCleaner(2)
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
+
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+    log.append(message(0,0)) // offset 0
+    log.append(message(1,1)) // offset 1
+    log.append(message(0,0)) // offset 2
+    log.append(message(1,1)) // offset 3
+    log.append(message(0,0)) // offset 4
+    // roll the segment, so we can clean the messages already appended
+    log.roll()
+
+    // clean the log with only one message removed
+    cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 2))
+    assertEquals(immutable.List(1,0,1,0), keysInLog(log))
+    assertEquals(immutable.List(1,2,3,4), offsetsInLog(log))
+
+    // continue to make progress, even though we can only clean one message at a time
+    cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 3))
+    assertEquals(immutable.List(0,1,0), keysInLog(log))
+    assertEquals(immutable.List(2,3,4), offsetsInLog(log))
+
+    cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 4))
+    assertEquals(immutable.List(1,0), keysInLog(log))
+    assertEquals(immutable.List(3,4), offsetsInLog(log))
+  }
+
+  @Test
   def testLogToClean: Unit = {
     // create a log with small segment size
     val logProps = new Properties()
@@ -159,6 +191,10 @@ class CleanerTest extends JUnitSuite {
   def keysInLog(log: Log): Iterable[Int] =
     log.logSegments.flatMap(s => s.log.filter(!_.message.isNull).filter(_.message.hasKey).map(m => TestUtils.readString(m.message.key).toInt))
 
+  /* extract all the offsets from a log */
+  def offsetsInLog(log: Log): Iterable[Long] =
+    log.logSegments.flatMap(s => s.log.filter(!_.message.isNull).filter(_.message.hasKey).map(m => m.offset))
+
   def unkeyedMessageCountInLog(log: Log) =
     log.logSegments.map(s => s.log.filter(!_.message.isNull).count(m => !m.message.hasKey)).sum
 
@@ -307,7 +343,8 @@ class CleanerTest extends JUnitSuite {
     val end = 500
     val offsets = writeToLog(log, (start until end) zip (start until end))
     def checkRange(map: FakeOffsetMap, start: Int, end: Int) {
-      val endOffset = cleaner.buildOffsetMap(log, start, end, map) + 1
+      cleaner.buildOffsetMap(log, start, end, map)
+      val endOffset = map.latestOffset + 1
       assertEquals("Last offset should be the end offset.", end, endOffset)
       assertEquals("Should have the expected number of messages in the map.", end-start, map.size)
       for(i <- start until end)
@@ -439,13 +476,39 @@ class CleanerTest extends JUnitSuite {
     val end = 2
     val offsetSeq = Seq(0L, 7206178L)
     val offsets = writeToLog(log, (start until end) zip (start until end), offsetSeq)
-    val endOffset = cleaner.buildOffsetMap(log, start, end, map)
+    cleaner.buildOffsetMap(log, start, end, map)
+    val endOffset = map.latestOffset
     assertEquals("Last offset should be the end offset.", 7206178L, endOffset)
     assertEquals("Should have the expected number of messages in the map.", end - start, map.size)
     assertEquals("Map should contain first value", 0L, map.get(key(0)))
     assertEquals("Map should contain second value", 7206178L, map.get(key(1)))
   }
 
+  /**
+   * Test building a partial offset map of part of a log segment
+   */
+  @Test
+  def testBuildPartialOffsetMap() {
+    // because loadFactor is 0.75, this means we can fit 2 messages in the map
+    val map = new FakeOffsetMap(3)
+    val log = makeLog()
+    val cleaner = makeCleaner(2)
+
+    log.append(message(0,0))
+    log.append(message(1,1))
+    log.append(message(2,2))
+    log.append(message(3,3))
+    log.append(message(4,4))
+    log.roll()
+
+    cleaner.buildOffsetMap(log, 2, Int.MaxValue, map)
+    assertEquals(2, map.size)
+    assertEquals(-1, map.get(key(0)))
+    assertEquals(2, map.get(key(2)))
+    assertEquals(3, map.get(key(3)))
+    assertEquals(-1, map.get(key(4)))
+  }
+
   private def writeToLog(log: Log, keysAndValues: Iterable[(Int, Int)], offsetSeq: Iterable[Long]): Iterable[Long] = {
     for(((key, value), offset) <- keysAndValues.zip(offsetSeq))
       yield log.append(messageWithOffset(key, value, offset), assignOffsets = false).firstOffset
@@ -469,7 +532,7 @@ class CleanerTest extends JUnitSuite {
                 offsetMap = new FakeOffsetMap(capacity), 
                 ioBufferSize = 64*1024, 
                 maxIoBufferSize = 64*1024,
-                dupBufferLoadFactor = 0.75,                
+                dupBufferLoadFactor = 0.75,
                 throttler = throttler, 
                 time = time,
                 checkDone = checkDone )
@@ -500,12 +563,15 @@ class CleanerTest extends JUnitSuite {
 
 class FakeOffsetMap(val slots: Int) extends OffsetMap {
   val map = new java.util.HashMap[String, Long]()
-  
-  private def keyFor(key: ByteBuffer) = 
+  var lastOffset = -1L
+
+  private def keyFor(key: ByteBuffer) =
     new String(Utils.readBytes(key.duplicate), "UTF-8")
-  
-  def put(key: ByteBuffer, offset: Long): Unit = 
+
+  def put(key: ByteBuffer, offset: Long): Unit = {
+    lastOffset = offset
     map.put(keyFor(key), offset)
+  }
   
   def get(key: ByteBuffer): Long = {
     val k = keyFor(key)
@@ -518,5 +584,8 @@ class FakeOffsetMap(val slots: Int) extends OffsetMap {
   def clear() = map.clear()
   
   def size: Int = map.size
-  
+
+  def latestOffset: Long = lastOffset
+
+  override def toString: String = map.toString()
 }

