kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rsiva...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6834: Handle compaction with batches bigger than max.message.bytes (#4953)
Date Wed, 09 May 2018 10:46:44 GMT
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0ecb72f  KAFKA-6834: Handle compaction with batches bigger than max.message.bytes (#4953)
0ecb72f is described below

commit 0ecb72f59da6491edc96b99b147b4983be794acf
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
AuthorDate: Wed May 9 11:46:36 2018 +0100

    KAFKA-6834: Handle compaction with batches bigger than max.message.bytes (#4953)
    
    Grow buffers in log cleaner to hold one message set after sanity check even if message set is bigger than max.message.bytes.
    
    Reviewers: Jason Gustafson <jason@confluent.io>, Ismael Juma <ismael@juma.me.uk>, Jun Rao <junrao@gmail.com>
---
 .../common/record/ByteBufferLogInputStream.java    | 41 ++++++++----
 .../apache/kafka/common/record/MemoryRecords.java  | 13 ++++
 .../kafka/common/record/MemoryRecordsTest.java     | 42 ++++++++++++
 core/src/main/scala/kafka/log/LogCleaner.scala     | 41 ++++++++++--
 .../test/scala/unit/kafka/log/LogCleanerTest.scala | 75 +++++++++++++++++++++-
 5 files changed, 194 insertions(+), 18 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java
index 22f417f..7f91f26 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.errors.CorruptRecordException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import static org.apache.kafka.common.record.Records.HEADER_SIZE_UP_TO_MAGIC;
 import static org.apache.kafka.common.record.Records.LOG_OVERHEAD;
 import static org.apache.kafka.common.record.Records.MAGIC_OFFSET;
 import static org.apache.kafka.common.record.Records.SIZE_OFFSET;
@@ -40,9 +41,33 @@ class ByteBufferLogInputStream implements LogInputStream<MutableRecordBatch> {
 
     public MutableRecordBatch nextBatch() throws IOException {
         int remaining = buffer.remaining();
-        if (remaining < LOG_OVERHEAD)
+
+        Integer batchSize = nextBatchSize();
+        if (batchSize == null || remaining < batchSize)
             return null;
 
+        byte magic = buffer.get(buffer.position() + MAGIC_OFFSET);
+
+        ByteBuffer batchSlice = buffer.slice();
+        batchSlice.limit(batchSize);
+        buffer.position(buffer.position() + batchSize);
+
+        if (magic > RecordBatch.MAGIC_VALUE_V1)
+            return new DefaultRecordBatch(batchSlice);
+        else
+            return new AbstractLegacyRecordBatch.ByteBufferLegacyRecordBatch(batchSlice);
+    }
+
+    /**
+     * Validates the header of the next batch and returns batch size.
+     * @return next batch size including LOG_OVERHEAD if buffer contains header up to
+     *         magic byte, null otherwise
+     * @throws CorruptRecordException if record size or magic is invalid
+     */
+    Integer nextBatchSize() throws CorruptRecordException {
+        int remaining = buffer.remaining();
+        if (remaining < LOG_OVERHEAD)
+            return null;
         int recordSize = buffer.getInt(buffer.position() + SIZE_OFFSET);
         // V0 has the smallest overhead, stricter checking is done later
         if (recordSize < LegacyRecord.RECORD_OVERHEAD_V0)
@@ -52,23 +77,13 @@ class ByteBufferLogInputStream implements LogInputStream<MutableRecordBatch> {
             throw new CorruptRecordException(String.format("Record size %d exceeds the largest allowable message size (%d).",
                     recordSize, maxMessageSize));
 
-        int batchSize = recordSize + LOG_OVERHEAD;
-        if (remaining < batchSize)
+        if (remaining < HEADER_SIZE_UP_TO_MAGIC)
             return null;
 
         byte magic = buffer.get(buffer.position() + MAGIC_OFFSET);
-
-        ByteBuffer batchSlice = buffer.slice();
-        batchSlice.limit(batchSize);
-        buffer.position(buffer.position() + batchSize);
-
         if (magic < 0 || magic > RecordBatch.CURRENT_MAGIC_VALUE)
             throw new CorruptRecordException("Invalid magic found in record: " + magic);
 
-        if (magic > RecordBatch.MAGIC_VALUE_V1)
-            return new DefaultRecordBatch(batchSlice);
-        else
-            return new AbstractLegacyRecordBatch.ByteBufferLegacyRecordBatch(batchSlice);
+        return recordSize + LOG_OVERHEAD;
     }
-
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index ea6aa4c..eb4e31b 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -18,6 +18,7 @@ package org.apache.kafka.common.record;
 
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.CorruptRecordException;
 import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention;
 import org.apache.kafka.common.utils.ByteBufferOutputStream;
 import org.apache.kafka.common.utils.CloseableIterator;
@@ -118,6 +119,18 @@ public class MemoryRecords extends AbstractRecords {
     }
 
     /**
+     * Validates the header of the first batch and returns batch size.
+     * @return first batch size including LOG_OVERHEAD if buffer contains header up to
+     *         magic byte, null otherwise
+     * @throws CorruptRecordException if record size or magic is invalid
+     */
+    public Integer firstBatchSize() {
+        if (buffer.remaining() < HEADER_SIZE_UP_TO_MAGIC)
+            return null;
+        return new ByteBufferLogInputStream(buffer, Integer.MAX_VALUE).nextBatchSize();
+    }
+
+    /**
      * Filter the records into the provided ByteBuffer.
      *
      * @param partition                   The partition that is filtered (used only for logging)
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
index e1409e0..61d8a00 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.common.record;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.CorruptRecordException;
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention;
 import org.apache.kafka.common.utils.Utils;
@@ -794,6 +795,47 @@ public class MemoryRecordsTest {
         }
     }
 
+    @Test
+    public void testNextBatchSize() {
+        ByteBuffer buffer = ByteBuffer.allocate(2048);
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression,
+                TimestampType.LOG_APPEND_TIME, 0L, logAppendTime, pid, epoch, firstSequence);
+        builder.append(10L, null, "abc".getBytes());
+        builder.close();
+
+        buffer.flip();
+        int size = buffer.remaining();
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+        assertEquals(size, records.firstBatchSize().intValue());
+        assertEquals(0, buffer.position());
+
+        buffer.limit(1); // size not in buffer
+        assertEquals(null, records.firstBatchSize());
+        buffer.limit(Records.LOG_OVERHEAD); // magic not in buffer
+        assertEquals(null, records.firstBatchSize());
+        buffer.limit(Records.HEADER_SIZE_UP_TO_MAGIC); // payload not in buffer
+        assertEquals(size, records.firstBatchSize().intValue());
+
+        buffer.limit(size);
+        byte magic = buffer.get(Records.MAGIC_OFFSET);
+        buffer.put(Records.MAGIC_OFFSET, (byte) 10);
+        try {
+            records.firstBatchSize();
+            fail("Did not fail with corrupt magic");
+        } catch (CorruptRecordException e) {
+            // Expected exception
+        }
+        buffer.put(Records.MAGIC_OFFSET, magic);
+
+        buffer.put(Records.SIZE_OFFSET + 3, (byte) 0);
+        try {
+            records.firstBatchSize();
+            fail("Did not fail with corrupt size");
+        } catch (CorruptRecordException e) {
+            // Expected exception
+        }
+    }
+
     @Parameterized.Parameters(name = "{index} magic={0}, firstOffset={1}, compressionType={2}")
     public static Collection<Object[]> data() {
         List<Object[]> values = new ArrayList<>();
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index ee31274..aa7cfe2 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -32,7 +32,7 @@ import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.config.ConfigException
-import org.apache.kafka.common.errors.KafkaStorageException
+import org.apache.kafka.common.errors.{CorruptRecordException, KafkaStorageException}
 import org.apache.kafka.common.record.MemoryRecords.RecordFilter
 import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention
 
@@ -621,13 +621,46 @@ private[log] class Cleaner(val id: Int,
       }
 
       // if we read bytes but didn't get even one complete batch, our I/O buffer is too small, grow it and try again
-      // `result.bytesRead` contains bytes from the `messagesRead` and any discarded batches.
+      // `result.bytesRead` contains bytes from `messagesRead` and any discarded batches.
       if (readBuffer.limit() > 0 && result.bytesRead == 0)
-        growBuffers(maxLogMessageSize)
+        growBuffersOrFail(sourceRecords, position, maxLogMessageSize, records)
     }
     restoreBuffers()
   }
 
+
+  /**
+   * Grow buffers to process next batch of records from `sourceRecords.` Buffers are doubled in size
+   * up to a maximum of `maxLogMessageSize`. In some scenarios, a record could be bigger than the
+   * current maximum size configured for the log. For example:
+   *   1. A compacted topic using compression may contain a message set slightly larger than max.message.bytes
+   *   2. max.message.bytes of a topic could have been reduced after writing larger messages
+   * In these cases, grow the buffer to hold the next batch.
+   */
+  private def growBuffersOrFail(sourceRecords: FileRecords,
+                                position: Int,
+                                maxLogMessageSize: Int,
+                                memoryRecords: MemoryRecords): Unit = {
+
+    val maxSize = if (readBuffer.capacity >= maxLogMessageSize) {
+      val nextBatchSize = memoryRecords.firstBatchSize
+      val logDesc = s"log segment ${sourceRecords.file} at position $position"
+      if (nextBatchSize == null)
+        throw new IllegalStateException(s"Could not determine next batch size for $logDesc")
+      if (nextBatchSize <= 0)
+        throw new IllegalStateException(s"Invalid batch size $nextBatchSize for $logDesc")
+      if (nextBatchSize <= readBuffer.capacity)
+        throw new IllegalStateException(s"Batch size $nextBatchSize < buffer size ${readBuffer.capacity}, but not processed for $logDesc")
+      val bytesLeft = sourceRecords.channel.size - position
+      if (nextBatchSize > bytesLeft)
+        throw new CorruptRecordException(s"Log segment may be corrupt, batch size $nextBatchSize > $bytesLeft bytes left in segment for $logDesc")
+      nextBatchSize.intValue
+    } else
+      maxLogMessageSize
+
+    growBuffers(maxSize)
+  }
+
   private def shouldDiscardBatch(batch: RecordBatch,
                                  transactionMetadata: CleanedTransactionMetadata,
                                  retainTxnMarkers: Boolean): Boolean = {
@@ -844,7 +877,7 @@ private[log] class Cleaner(val id: Int,
 
       // if we didn't read even one complete message, our read buffer may be too small
       if(position == startPosition)
-        growBuffers(maxLogMessageSize)
+        growBuffersOrFail(segment.log, position, maxLogMessageSize, records)
     }
     restoreBuffers()
     false
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index edc1744..537c561 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -17,7 +17,7 @@
 
 package kafka.log
 
-import java.io.File
+import java.io.{File, RandomAccessFile}
 import java.nio._
 import java.nio.file.Paths
 import java.util.Properties
@@ -26,6 +26,7 @@ import kafka.common._
 import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
 import kafka.utils._
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.CorruptRecordException
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.Utils
 import org.junit.Assert._
@@ -500,6 +501,78 @@ class LogCleanerTest extends JUnitSuite {
     assertEquals(shouldRemain, keysInLog(log))
   }
 
+  /**
+   * Test log cleaning with logs containing messages larger than topic's max message size
+   */
+  @Test
+  def testMessageLargerThanMaxMessageSize() {
+    val (log, offsetMap) = createLogWithMessagesLargerThanMaxSize(largeMessageSize = 1024 * 1024)
+
+    val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024)
+    cleaner.cleanSegments(log, Seq(log.logSegments.head), offsetMap, 0L, new CleanerStats)
+    val shouldRemain = keysInLog(log).filter(k => !offsetMap.map.containsKey(k.toString))
+    assertEquals(shouldRemain, keysInLog(log))
+  }
+
+  /**
+   * Test log cleaning with logs containing messages larger than topic's max message size
+   * where header is corrupt
+   */
+  @Test
+  def testMessageLargerThanMaxMessageSizeWithCorruptHeader() {
+    val (log, offsetMap) = createLogWithMessagesLargerThanMaxSize(largeMessageSize = 1024 * 1024)
+    val file = new RandomAccessFile(log.logSegments.head.log.file, "rw")
+    file.seek(Records.MAGIC_OFFSET)
+    file.write(0xff)
+    file.close()
+
+    val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024)
+    intercept[CorruptRecordException] {
+      cleaner.cleanSegments(log, Seq(log.logSegments.head), offsetMap, 0L, new CleanerStats)
+    }
+  }
+
+  /**
+   * Test log cleaning with logs containing messages larger than topic's max message size
+   * where message size is corrupt and larger than bytes available in log segment.
+   */
+  @Test
+  def testCorruptMessageSizeLargerThanBytesAvailable() {
+    val (log, offsetMap) = createLogWithMessagesLargerThanMaxSize(largeMessageSize = 1024 * 1024)
+    val file = new RandomAccessFile(log.logSegments.head.log.file, "rw")
+    file.setLength(1024)
+    file.close()
+
+    val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024)
+    intercept[CorruptRecordException] {
+      cleaner.cleanSegments(log, Seq(log.logSegments.head), offsetMap, 0L, new CleanerStats)
+    }
+  }
+
+  def createLogWithMessagesLargerThanMaxSize(largeMessageSize: Int): (Log, FakeOffsetMap) = {
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, largeMessageSize * 16: java.lang.Integer)
+    logProps.put(LogConfig.MaxMessageBytesProp, largeMessageSize * 2: java.lang.Integer)
+
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+    while(log.numberOfSegments < 2)
+      log.appendAsLeader(record(log.logEndOffset.toInt, Array.fill(largeMessageSize)(0: Byte)), leaderEpoch = 0)
+    val keysFound = keysInLog(log)
+    assertEquals(0L until log.logEndOffset, keysFound)
+
+    // Decrease the log's max message size
+    logProps.put(LogConfig.MaxMessageBytesProp, largeMessageSize / 2: java.lang.Integer)
+    log.config = LogConfig.fromProps(logConfig.originals, logProps)
+
+    // pretend we have the following keys
+    val keys = immutable.ListSet(1, 3, 5, 7, 9)
+    val map = new FakeOffsetMap(Int.MaxValue)
+    keys.foreach(k => map.put(key(k), Long.MaxValue))
+
+    (log, map)
+  }
+
   @Test
   def testCleaningWithDeletes(): Unit = {
     val cleaner = makeCleaner(Int.MaxValue)

-- 
To stop receiving notification emails like this one, please contact
rsivaram@apache.org.

Mime
View raw message