kafka-commits mailing list archives

From j...@apache.org
Subject [kafka] branch trunk updated: MINOR: Tighten FileRecords size checks to prevent overflow (#5332)
Date Wed, 11 Jul 2018 18:45:15 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8119683  MINOR: Tighten FileRecords size checks to prevent overflow (#5332)
8119683 is described below

commit 8119683a234543aa7b523a571b8a68a5002c5b56
Author: Jason Gustafson <jason@confluent.io>
AuthorDate: Wed Jul 11 11:44:54 2018 -0700

    MINOR: Tighten FileRecords size checks to prevent overflow (#5332)
    
    Add some additional size validation to prevent overflows when using `FileRecords`.
    
    Reviewers: Dhruvil Shah <dhruvil@confluent.io>, Ismael Juma <ismael@juma.me.uk>
---
 .../apache/kafka/common/record/FileRecords.java    | 28 +++++++++++++------
 .../kafka/common/record/FileRecordsTest.java       | 32 +++++++++++++++++++++-
 2 files changed, 51 insertions(+), 9 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
index 0aa9f46..cebb5fa 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
@@ -55,11 +55,11 @@ public class FileRecords extends AbstractRecords implements Closeable {
      * The {@code FileRecords.open} methods should be used instead of this constructor whenever possible.
      * The constructor is visible for tests.
      */
-    public FileRecords(File file,
-                       FileChannel channel,
-                       int start,
-                       int end,
-                       boolean isSlice) throws IOException {
+    FileRecords(File file,
+                FileChannel channel,
+                int start,
+                int end,
+                boolean isSlice) throws IOException {
         this.file = file;
         this.channel = channel;
         this.start = start;
@@ -71,6 +71,10 @@ public class FileRecords extends AbstractRecords implements Closeable {
             // don't check the file size if this is just a slice view
             size.set(end - start);
         } else {
+            if (channel.size() > Integer.MAX_VALUE)
+                throw new KafkaException("The size of segment " + file + " (" + channel.size() +
+                        ") is larger than the maximum allowed segment size of " + Integer.MAX_VALUE);
+
             int limit = Math.min((int) channel.size(), end);
             size.set(limit - start);
 
@@ -131,9 +135,11 @@ public class FileRecords extends AbstractRecords implements Closeable {
      */
     public FileRecords slice(int position, int size) throws IOException {
         if (position < 0)
-            throw new IllegalArgumentException("Invalid position: " + position + " in read from " + file);
+            throw new IllegalArgumentException("Invalid position: " + position + " in read from " + this);
+        if (position > sizeInBytes() - start)
+            throw new IllegalArgumentException("Slice from position " + position + " exceeds end position of " + this);
         if (size < 0)
-            throw new IllegalArgumentException("Invalid size: " + size + " in read from " + file);
+            throw new IllegalArgumentException("Invalid size: " + size + " in read from " + this);
 
         int end = this.start + position + size;
         // handle integer overflow or if end is beyond the end of the file
@@ -143,11 +149,17 @@ public class FileRecords extends AbstractRecords implements Closeable {
     }
 
     /**
-     * Append log batches to the buffer
+     * Append a set of records to the file. This method is not thread-safe and must be
+     * protected with a lock.
+     *
      * @param records The records to append
      * @return the number of bytes written to the underlying file
      */
     public int append(MemoryRecords records) throws IOException {
+        if (records.sizeInBytes() > Integer.MAX_VALUE - size.get())
+            throw new IllegalArgumentException("Append of size " + records.sizeInBytes() +
+                    " bytes is too large for segment with current file position at " + size.get());
+
         int written = records.writeFullyTo(channel);
         size.getAndAdd(written);
         return written;
diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
index f08652e..77de33f 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.test.TestUtils;
 import org.easymock.EasyMock;
+import org.easymock.EasyMockSupport;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -45,7 +46,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-public class FileRecordsTest {
+public class FileRecordsTest extends EasyMockSupport {
 
     private byte[][] values = new byte[][] {
             "abcd".getBytes(),
@@ -61,6 +62,35 @@ public class FileRecordsTest {
         this.time = new MockTime();
     }
 
+    @Test(expected = IllegalArgumentException.class)
+    public void testAppendProtectsFromOverflow() throws Exception {
+        File fileMock = mock(File.class);
+        FileChannel fileChannelMock = mock(FileChannel.class);
+        EasyMock.expect(fileChannelMock.size()).andStubReturn((long) Integer.MAX_VALUE);
+        EasyMock.expect(fileChannelMock.position(Integer.MAX_VALUE)).andReturn(fileChannelMock);
+
+        replayAll();
+
+        FileRecords records = new FileRecords(fileMock, fileChannelMock, 0, Integer.MAX_VALUE, false);
+        append(records, values);
+    }
+
+    @Test(expected = KafkaException.class)
+    public void testOpenOversizeFile() throws Exception {
+        File fileMock = mock(File.class);
+        FileChannel fileChannelMock = mock(FileChannel.class);
+        EasyMock.expect(fileChannelMock.size()).andStubReturn(Integer.MAX_VALUE + 5L);
+
+        replayAll();
+
+        new FileRecords(fileMock, fileChannelMock, 0, Integer.MAX_VALUE, false);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testOutOfRangeSlice() throws Exception {
+        this.fileRecords.slice(fileRecords.sizeInBytes() + 1, 15).sizeInBytes();
+    }
+
     /**
      * Test that the cached size variable matches the actual file size as we append messages
      */
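
The new check in FileRecords.append is phrased as a subtraction (records.sizeInBytes() > Integer.MAX_VALUE - size.get()) rather than an addition, which keeps the comparison itself from overflowing int arithmetic. A minimal standalone sketch of the difference, not part of the commit (class name and values here are hypothetical):

    public class OverflowCheckDemo {
        public static void main(String[] args) {
            int currentSize = Integer.MAX_VALUE - 10;  // bytes already in the segment
            int appendSize = 100;                      // bytes about to be appended

            // Naive form: the int addition wraps to a negative value, so the
            // comparison is false and the oversized append would slip through.
            boolean naiveRejects = currentSize + appendSize > Integer.MAX_VALUE;

            // Form used in the commit: no intermediate value can exceed int range,
            // so the oversized append is correctly rejected.
            boolean safeRejects = appendSize > Integer.MAX_VALUE - currentSize;

            System.out.println(naiveRejects);  // false
            System.out.println(safeRejects);   // true
        }
    }

The constructor check avoids the same pitfall by comparing channel.size(), a long, against Integer.MAX_VALUE before the value is narrowed to an int.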

