kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6292; Improve FileLogInputStream batch position checks to avoid type overflow (#4928)
Date Wed, 09 May 2018 00:07:59 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new fcb15e3  KAFKA-6292; Improve FileLogInputStream batch position checks to avoid type
overflow (#4928)
fcb15e3 is described below

commit fcb15e357c1b818d2d543dc9db3e011ddc1fbf5e
Author: Roman Khlebnov <suppie.rk@gmail.com>
AuthorDate: Wed May 9 03:07:50 2018 +0300

    KAFKA-6292; Improve FileLogInputStream batch position checks to avoid type overflow (#4928)
    
    Switch from sum operations to subtraction to avoid type casting in checks and type overflow
during `FlieLogInputStream` work, especially in cases where property `log.segment.bytes` was
set close to the `Integer.MAX_VALUE` and used as a `position` inside `nextBatch()` function.
    
    Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>
---
 .../kafka/common/record/FileLogInputStream.java      |  4 ++--
 .../kafka/common/record/FileLogInputStreamTest.java  | 20 ++++++++++++++++++--
 2 files changed, 20 insertions(+), 4 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
b/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
index a1e3a2f..92e8864 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
@@ -60,7 +60,7 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil
     @Override
     public FileChannelRecordBatch nextBatch() throws IOException {
         FileChannel channel = fileRecords.channel();
-        if (position + HEADER_SIZE_UP_TO_MAGIC >= end)
+        if (position >= end - HEADER_SIZE_UP_TO_MAGIC)
             return null;
 
         logHeaderBuffer.rewind();
@@ -75,7 +75,7 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil
             throw new CorruptRecordException(String.format("Found record size %d smaller
than minimum record " +
                             "overhead (%d) in file %s.", size, LegacyRecord.RECORD_OVERHEAD_V0,
fileRecords.file()));
 
-        if (position + LOG_OVERHEAD + size > end)
+        if (position > end - LOG_OVERHEAD - size)
             return null;
 
         byte magic = logHeaderBuffer.get(MAGIC_OFFSET);
diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java
b/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java
index 95b2a0c..77aaae8 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java
@@ -112,8 +112,8 @@ public class FileLogInputStreamTest {
             SimpleRecord[] firstBatchRecords = new SimpleRecord[]{
                 new SimpleRecord(3241324L, "a".getBytes(), "1".getBytes()),
                 new SimpleRecord(234280L, "b".getBytes(), "2".getBytes())
-
             };
+
             SimpleRecord[] secondBatchRecords = new SimpleRecord[]{
                 new SimpleRecord(238423489L, "c".getBytes(), "3".getBytes()),
                 new SimpleRecord(897839L, null, "4".getBytes()),
@@ -152,8 +152,8 @@ public class FileLogInputStreamTest {
             SimpleRecord[] firstBatchRecords = new SimpleRecord[]{
                 new SimpleRecord(3241324L, "a".getBytes(), "1".getBytes()),
                 new SimpleRecord(234280L, "b".getBytes(), "2".getBytes())
-
             };
+
             SimpleRecord[] secondBatchRecords = new SimpleRecord[]{
                 new SimpleRecord(238423489L, "c".getBytes(), "3".getBytes()),
                 new SimpleRecord(897839L, null, "4".getBytes()),
@@ -204,6 +204,22 @@ public class FileLogInputStreamTest {
         }
     }
 
+    @Test
+    public void testNextBatchSelectionWithMaxedParams() throws IOException {
+        try (FileRecords fileRecords = FileRecords.open(tempFile())) {
+            FileLogInputStream logInputStream = new FileLogInputStream(fileRecords, Integer.MAX_VALUE,
Integer.MAX_VALUE);
+            assertNull(logInputStream.nextBatch());
+        }
+    }
+
+    @Test
+    public void testNextBatchSelectionWithZeroedParams() throws IOException {
+        try (FileRecords fileRecords = FileRecords.open(tempFile())) {
+            FileLogInputStream logInputStream = new FileLogInputStream(fileRecords, 0, 0);
+            assertNull(logInputStream.nextBatch());
+        }
+    }
+
     private void assertProducerData(RecordBatch batch, long producerId, short producerEpoch,
int baseSequence,
                                     boolean isTransactional, SimpleRecord ... records) {
         assertEquals(producerId, batch.producerId());

-- 
To stop receiving notification emails like this one, please contact
jgus@apache.org.

Mime
View raw message