kafka-commits mailing list archives

From ij...@apache.org
Subject kafka git commit: KAFKA-3587; LogCleaner fails due to incorrect offset map computation
Date Mon, 09 May 2016 17:10:47 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 29a682e2e -> 2caf872c2


KAFKA-3587; LogCleaner fails due to incorrect offset map computation

Removed the over-pessimistic require and instead attempt to fill the dedup buffer. Use the
(only) map until it is full; this may allow all dirty segments to be processed (the optimistic
case) or may stop in the middle of a dirty segment.
In either case, do compaction using the map loaded that way.

This patch was developed with edoardocomar

Author: Mickael Maison <mickael.maison@gmail.com>

Reviewers: Jun Rao <junrao@gmail.com>, Guozhang Wang <wangguoz@gmail.com>

Closes #1332 from mimaison/KAFKA-3587
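
The over-pessimistic require computed a segment's "size" as nextOffset - baseOffset (see the
removed line in the diff below), which overstates the message count whenever offsets are sparse,
as in the new test where two messages span offsets 0 and 7206178, so cleaning could abort even
when the messages would easily fit. As a reading aid, here is a minimal, self-contained REPL-style
sketch of the new strategy; the types and helper names are simplified stand-ins for illustration,
not the real kafka.log.Cleaner code:

import scala.collection.mutable

// A "segment" reduced to its base offset and a sequence of (key, offset) entries.
case class FakeSegment(baseOffset: Long, entries: Seq[(String, Long)])

// Mirrors the new contract of buildOffsetMapForSegment in this patch:
// returns the last offset mapped, or -1L if the map filled up mid-segment.
def buildOffsetMapForSegment(segment: FakeSegment, map: mutable.Map[String, Long], maxSize: Int): Long = {
  var offset = segment.baseOffset
  var i = 0
  while (i < segment.entries.length) {
    val (key, entryOffset) = segment.entries(i)
    if (map.size >= maxSize)
      return -1L                      // map is full: bail out mid-segment
    map.put(key, entryOffset)
    offset = entryOffset
    i += 1
  }
  offset
}

// The calling loop: map whole segments until the buffer is full; a partially
// mapped segment still leaves everything mapped so far usable for compaction,
// as long as at least one full segment made it into the map.
def buildOffsetMap(dirty: Seq[FakeSegment], start: Long, map: mutable.Map[String, Long], maxSize: Int): Long = {
  var offset = dirty.head.baseOffset
  var full = false
  for (segment <- dirty if !full) {
    val newOffset = buildOffsetMapForSegment(segment, map, maxSize)
    if (newOffset > -1L)
      offset = newOffset
    else {
      require(offset > start, "not even one segment fits in the dedup buffer")
      full = true
    }
  }
  offset  // end of the cleanable range for this pass
}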


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2caf872c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2caf872c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2caf872c

Branch: refs/heads/trunk
Commit: 2caf872c2e51d689c6ac20240c4a306e36d98b15
Parents: 29a682e
Author: Mickael Maison <mickael.maison@gmail.com>
Authored: Mon May 9 18:10:40 2016 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Mon May 9 18:10:40 2016 +0100

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/LogCleaner.scala  | 27 ++++++++++------
 .../test/scala/unit/kafka/log/CleanerTest.scala | 33 ++++++++++++++++++++
 2 files changed, 51 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/2caf872c/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 0f742f9..c6636be 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -615,17 +615,19 @@ private[log] class Cleaner(val id: Int,
     // but we may be able to fit more (if there is lots of duplication in the dirty section of the log)
     var offset = dirty.head.baseOffset
     require(offset == start, "Last clean offset is %d but segment base offset is %d for log %s.".format(start, offset, log.name))
-    val maxDesiredMapSize = (map.slots * this.dupBufferLoadFactor).toInt
     var full = false
     for (segment <- dirty if !full) {
       checkDone(log.topicAndPartition)
-      val segmentSize = segment.nextOffset() - segment.baseOffset
 
-      require(segmentSize <= maxDesiredMapSize, "%d messages in segment %s/%s but offset map can fit only %d. You can increase log.cleaner.dedupe.buffer.size or decrease log.cleaner.threads".format(segmentSize, log.name, segment.log.file.getName, maxDesiredMapSize))
-      if (map.size + segmentSize <= maxDesiredMapSize)
-        offset = buildOffsetMapForSegment(log.topicAndPartition, segment, map)
-      else
+      val newOffset = buildOffsetMapForSegment(log.topicAndPartition, segment, map)
+      if (newOffset > -1L)
+        offset = newOffset
+      else {
+        // If not even one segment can fit in the map, compaction cannot happen
+        require(offset > start, "Unable to build the offset map for segment %s/%s. You can increase log.cleaner.dedupe.buffer.size or decrease log.cleaner.threads".format(log.name, segment.log.file.getName))
+        debug("Offset map is full, %d segments fully mapped, segment with base offset %d
is partially mapped".format(dirty.indexOf(segment), segment.baseOffset))
         full = true
+      }
     }
     info("Offset map for log %s complete.".format(log.name))
     offset
@@ -637,11 +639,12 @@ private[log] class Cleaner(val id: Int,
    * @param segment The segment to index
    * @param map The map in which to store the key=>offset mapping
    *
-   * @return The final offset covered by the map
+   * @return The final offset covered by the map or -1 if the map is full
    */
   private def buildOffsetMapForSegment(topicAndPartition: TopicAndPartition, segment: LogSegment, map: OffsetMap): Long = {
     var position = 0
     var offset = segment.baseOffset
+    val maxDesiredMapSize = (map.slots * this.dupBufferLoadFactor).toInt
     while (position < segment.log.sizeInBytes) {
       checkDone(topicAndPartition)
       readBuffer.clear()
@@ -650,8 +653,14 @@ private[log] class Cleaner(val id: Int,
       val startPosition = position
       for (entry <- messages) {
         val message = entry.message
-        if (message.hasKey)
-          map.put(message.key, entry.offset)
+        if (message.hasKey) {
+          if (map.size < maxDesiredMapSize)
+            map.put(message.key, entry.offset)
+          else {
+            // The map is full, stop looping and return
+            return -1L
+          }
+        }
         offset = entry.offset
         stats.indexMessagesRead(1)
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/2caf872c/core/src/test/scala/unit/kafka/log/CleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/CleanerTest.scala b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
index b6849f0..752a260 100755
--- a/core/src/test/scala/unit/kafka/log/CleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
@@ -423,6 +423,39 @@ class CleanerTest extends JUnitSuite {
     recoverAndCheck(config, cleanedKeys)
     
   }
+
+  @Test
+  def testBuildOffsetMapFakeLarge() {
+    val map = new FakeOffsetMap(1000)
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 72: java.lang.Integer)
+    logProps.put(LogConfig.SegmentIndexBytesProp, 72: java.lang.Integer)
+    logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
+    val logConfig = LogConfig(logProps)
+    val log = makeLog(config = logConfig)
+    val cleaner = makeCleaner(Int.MaxValue)
+    val start = 0
+    val end = 2
+    val offsetSeq = Seq(0L, 7206178L)
+    val offsets = writeToLog(log, (start until end) zip (start until end), offsetSeq)
+    val endOffset = cleaner.buildOffsetMap(log, start, end, map)
+    assertEquals("Last offset should be the end offset.", 7206178L, endOffset)
+    assertEquals("Should have the expected number of messages in the map.", end - start,
map.size)
+    assertEquals("Map should contain first value", 0L, map.get(key(0)))
+    assertEquals("Map should contain second value", 7206178L, map.get(key(1)))
+  }
+
+  private def writeToLog(log: Log, keysAndValues: Iterable[(Int, Int)], offsetSeq: Iterable[Long]): Iterable[Long] = {
+    for(((key, value), offset) <- keysAndValues.zip(offsetSeq))
+      yield log.append(messageWithOffset(key, value, offset), assignOffsets = false).firstOffset
+  }
+
+  private def messageWithOffset(key: Int, value: Int, offset: Long) =
+    new ByteBufferMessageSet(NoCompressionCodec, Seq(offset),
+                             new Message(key = key.toString.getBytes,
+                                         bytes = value.toString.getBytes,
+                                         timestamp = Message.NoTimestamp,
+                                         magicValue = Message.MagicValue_V1))
   
   
   def makeLog(dir: File = dir, config: LogConfig = logConfig) =

