kafka-commits mailing list archives

From: j...@apache.org
Subject: kafka git commit: KAFKA-4019; Update log cleaner to handle max message size of topics
Date: Tue, 04 Oct 2016 03:23:52 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.1 0397c7777 -> 3d7e3d685


KAFKA-4019; Update log cleaner to handle max message size of topics

Grow the cleaner's read and write buffers up to the maximum message size of the log being cleaned,
when the topic is configured with a larger max message size than the broker's default.
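
In short: before this change, growBuffers() doubled the cleaner's I/O buffers only up to the
broker-wide maxIoBufferSize, so cleaning a topic whose max.message.bytes exceeds the broker
default could fail with the IllegalStateException shown in the diff. The cap is now the larger
of maxIoBufferSize and the per-topic limit, threaded through cleanInto() and
buildOffsetMapForSegment(). A minimal self-contained sketch of the new doubling rule follows;
the BufferSketch class is illustrative only, not the actual Cleaner:

    import java.nio.ByteBuffer

    // Illustrative stand-in for the Cleaner's buffer-growth logic (see diff below).
    class BufferSketch(val maxIoBufferSize: Int, initialSize: Int) {
      var readBuffer: ByteBuffer = ByteBuffer.allocate(initialSize)
      var writeBuffer: ByteBuffer = ByteBuffer.allocate(initialSize)

      def growBuffers(maxLogMessageSize: Int): Unit = {
        // The cap is now the larger of the broker-level limit and the
        // per-topic max message size of the log being cleaned.
        val maxBufferSize = math.max(maxLogMessageSize, maxIoBufferSize)
        if (readBuffer.capacity >= maxBufferSize || writeBuffer.capacity >= maxBufferSize)
          throw new IllegalStateException(
            "This log contains a message larger than maximum allowable size of %s.".format(maxBufferSize))
        // Double the buffers, but never past the cap.
        val newSize = math.min(readBuffer.capacity * 2, maxBufferSize)
        readBuffer = ByteBuffer.allocate(newSize)
        writeBuffer = ByteBuffer.allocate(newSize)
      }
    }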

Author: Rajini Sivaram <rajinisivaram@googlemail.com>

Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Jiangjie Qin <becket.qin@gmail.com>, Jason Gustafson <jason@confluent.io>

Closes #1758 from rajinisivaram/KAFKA-4019

(cherry picked from commit 672dfaa243850a835c1b01da4fb20311ba6455bc)
Signed-off-by: Jason Gustafson <jason@confluent.io>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3d7e3d68
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3d7e3d68
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3d7e3d68

Branch: refs/heads/0.10.1
Commit: 3d7e3d68570f8a1ec759f1bd671e9271c3dd99cc
Parents: 0397c77
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
Authored: Mon Oct 3 20:02:47 2016 -0700
Committer: Jason Gustafson <jason@confluent.io>
Committed: Mon Oct 3 20:24:00 2016 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/LogCleaner.scala  | 26 +++++++-----
 .../test/scala/unit/kafka/log/CleanerTest.scala | 43 +++++++++++++++++---
 2 files changed, 53 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3d7e3d68/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 820d123..3d9a20d 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -401,7 +401,7 @@ private[log] class Cleaner(val id: Int,
         val retainDeletes = old.largestTimestamp > deleteHorizonMs
         info("Cleaning segment %s in log %s (largest timestamp %s) into %s, %s deletes."
            .format(old.baseOffset, log.name, new Date(old.largestTimestamp), cleaned.baseOffset, if(retainDeletes) "retaining" else "discarding"))
-        cleanInto(log.topicAndPartition, old, cleaned, map, retainDeletes, log.config.messageFormatVersion.messageFormatVersion)
+        cleanInto(log.topicAndPartition, old, cleaned, map, retainDeletes, log.config.messageFormatVersion.messageFormatVersion, log.config.maxMessageSize)
       }
 
       // trim excess index
@@ -434,18 +434,21 @@ private[log] class Cleaner(val id: Int,
   * Clean the given source log segment into the destination segment using the key=>offset mapping
    * provided
    *
+   * @param topicAndPartition The topic and partition of the log segment to clean
    * @param source The dirty log segment
    * @param dest The cleaned log segment
    * @param map The key=>offset mapping
    * @param retainDeletes Should delete tombstones be retained while cleaning this segment
-   * @param messageFormatVersion the message format version to use after compaction
+   * @param messageFormatVersion The message format version to use after compaction
+   * @param maxLogMessageSize The maximum message size of the corresponding topic
    */
   private[log] def cleanInto(topicAndPartition: TopicAndPartition,
                              source: LogSegment,
                              dest: LogSegment,
                              map: OffsetMap,
                              retainDeletes: Boolean,
-                             messageFormatVersion: Byte) {
+                             messageFormatVersion: Byte,
+                             maxLogMessageSize: Int) {
     var position = 0
     while (position < source.log.sizeInBytes) {
       checkDone(topicAndPartition)
@@ -508,7 +511,7 @@ private[log] class Cleaner(val id: Int,
       
      // if we read bytes but didn't get even one complete message, our I/O buffer is too small, grow it and try again
       if (readBuffer.limit > 0 && messagesRead == 0)
-        growBuffers()
+        growBuffers(maxLogMessageSize)
     }
     restoreBuffers()
   }
@@ -577,10 +580,11 @@ private[log] class Cleaner(val id: Int,
   /**
    * Double the I/O buffer capacity
    */
-  def growBuffers() {
-    if(readBuffer.capacity >= maxIoBufferSize || writeBuffer.capacity >= maxIoBufferSize)
-      throw new IllegalStateException("This log contains a message larger than maximum allowable size of %s.".format(maxIoBufferSize))
-    val newSize = math.min(this.readBuffer.capacity * 2, maxIoBufferSize)
+  def growBuffers(maxLogMessageSize: Int) {
+    val maxBufferSize = math.max(maxLogMessageSize, maxIoBufferSize)
+    if(readBuffer.capacity >= maxBufferSize || writeBuffer.capacity >= maxBufferSize)
+      throw new IllegalStateException("This log contains a message larger than maximum allowable size of %s.".format(maxBufferSize))
+    val newSize = math.min(this.readBuffer.capacity * 2, maxBufferSize)
     info("Growing cleaner I/O buffers from " + readBuffer.capacity + "bytes to " + newSize
+ " bytes.")
     this.readBuffer = ByteBuffer.allocate(newSize)
     this.writeBuffer = ByteBuffer.allocate(newSize)
@@ -650,7 +654,7 @@ private[log] class Cleaner(val id: Int,
     for (segment <- dirty if !full) {
       checkDone(log.topicAndPartition)
 
-      full = buildOffsetMapForSegment(log.topicAndPartition, segment, map, start)
+      full = buildOffsetMapForSegment(log.topicAndPartition, segment, map, start, log.config.maxMessageSize)
       if (full)
         debug("Offset map is full, %d segments fully mapped, segment with base offset %d
is partially mapped".format(dirty.indexOf(segment), segment.baseOffset))
     }
@@ -665,7 +669,7 @@ private[log] class Cleaner(val id: Int,
    *
    * @return If the map was filled whilst loading from this segment
    */
-  private def buildOffsetMapForSegment(topicAndPartition: TopicAndPartition, segment: LogSegment, map: OffsetMap, start: Long): Boolean = {
+  private def buildOffsetMapForSegment(topicAndPartition: TopicAndPartition, segment: LogSegment, map: OffsetMap, start: Long, maxLogMessageSize: Int): Boolean = {
     var position = segment.index.lookup(start).position
     val maxDesiredMapSize = (map.slots * this.dupBufferLoadFactor).toInt
     while (position < segment.log.sizeInBytes) {
@@ -689,7 +693,7 @@ private[log] class Cleaner(val id: Int,
 
       // if we didn't read even one complete message, our read buffer may be too small
       if(position == startPosition)
-        growBuffers()
+        growBuffers(maxLogMessageSize)
     }
     restoreBuffers()
     return false

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d7e3d68/core/src/test/scala/unit/kafka/log/CleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/CleanerTest.scala b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
index c7c3dab..f4458a0 100755
--- a/core/src/test/scala/unit/kafka/log/CleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
@@ -80,6 +80,36 @@ class CleanerTest extends JUnitSuite {
     assertEquals(shouldRemain, keysInLog(log))
   }
 
+  /**
+   * Test log cleaning with logs containing messages larger than default message size
+   */
+  @Test
+  def testLargeMessage() {
+    val largeMessageSize = 1024 * 1024
+    // Create cleaner with very small default max message size
+    val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024)
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, largeMessageSize * 16: java.lang.Integer)
+    logProps.put(LogConfig.MaxMessageBytesProp, largeMessageSize * 2: java.lang.Integer)
+
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+    while(log.numberOfSegments < 2)
+      log.append(message(log.logEndOffset.toInt, Array.fill(largeMessageSize)(0: Byte)))
+    val keysFound = keysInLog(log)
+    assertEquals(0L until log.logEndOffset, keysFound)
+
+    // pretend we have the following keys
+    val keys = immutable.ListSet(1, 3, 5, 7, 9)
+    val map = new FakeOffsetMap(Int.MaxValue)
+    keys.foreach(k => map.put(key(k), Long.MaxValue))
+
+    // clean the log
+    cleaner.cleanSegments(log, Seq(log.logSegments.head), map, 0L)
+    val shouldRemain = keysInLog(log).filter(!keys.contains(_))
+    assertEquals(shouldRemain, keysInLog(log))
+  }
+
   @Test
   def testCleaningWithDeletes(): Unit = {
     val cleaner = makeCleaner(Int.MaxValue)
@@ -598,11 +628,11 @@ class CleanerTest extends JUnitSuite {
 
   def noOpCheckDone(topicAndPartition: TopicAndPartition) { /* do nothing */  }
 
-  def makeCleaner(capacity: Int, checkDone: (TopicAndPartition) => Unit = noOpCheckDone) =
+  def makeCleaner(capacity: Int, checkDone: (TopicAndPartition) => Unit = noOpCheckDone, maxMessageSize: Int = 64*1024) =
     new Cleaner(id = 0, 
                 offsetMap = new FakeOffsetMap(capacity), 
-                ioBufferSize = 64*1024, 
-                maxIoBufferSize = 64*1024,
+                ioBufferSize = maxMessageSize,
+                maxIoBufferSize = maxMessageSize,
                 dupBufferLoadFactor = 0.75,
                 throttler = throttler, 
                 time = time,
@@ -615,9 +645,12 @@ class CleanerTest extends JUnitSuite {
   
   def key(id: Int) = ByteBuffer.wrap(id.toString.getBytes)
   
-  def message(key: Int, value: Int) = 
+  def message(key: Int, value: Int): ByteBufferMessageSet =
+    message(key, value.toString.getBytes)
+
+  def message(key: Int, value: Array[Byte]) =
     new ByteBufferMessageSet(new Message(key = key.toString.getBytes,
-                                         bytes = value.toString.getBytes,
+                                         bytes = value,
                                          timestamp = Message.NoTimestamp,
                                          magicValue = Message.MagicValue_V1))
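
For intuition: with the settings in testLargeMessage above (cleaner buffers start at 1024 bytes
and the topic's max message size is 2 * 1024 * 1024), each growBuffers() call doubles the
buffers until the topic-level cap is reached, so the 1 MiB test messages can be read. A small
self-contained sketch of that arithmetic (the GrowthArithmetic object is illustrative only):

    object GrowthArithmetic extends App {
      val maxIoBufferSize   = 1024               // cleaner default set via makeCleaner above
      val maxLogMessageSize = 2 * 1024 * 1024    // MaxMessageBytesProp in the test
      val cap = math.max(maxLogMessageSize, maxIoBufferSize)

      var size = 1024                            // initial ioBufferSize
      while (size < cap) {
        size = math.min(size * 2, cap)           // same capped doubling as growBuffers
        println(size)                            // 2048, 4096, ..., 2097152
      }
    }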
 

