kafka-commits mailing list archives

From j...@apache.org
Subject [kafka] branch 1.1 updated: KAFKA-8570; Grow buffer to hold down converted records if it was insufficiently sized (#7071)
Date Fri, 12 Jul 2019 17:28:38 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new 8c93b7e  KAFKA-8570; Grow buffer to hold down converted records if it was insufficiently sized (#7071)
8c93b7e is described below

commit 8c93b7ecd651c17e61d89fdb533b8ec4d1b0bfd7
Author: Dhruvil Shah <dhruvil@confluent.io>
AuthorDate: Fri Jul 12 10:28:25 2019 -0700

    KAFKA-8570; Grow buffer to hold down converted records if it was insufficiently sized (#7071)
    
    Backport https://github.com/apache/kafka/pull/6974 to 1.1
    
    When the log contains out-of-order message formats (for example, a v2 message followed by
    a v1 message) and consists of compressed batches typically greater than 1 kB in size, it is
    possible for down-conversion to fail. With compressed batches, we estimate the size of
    down-converted batches using:
    
    ```
        private static int estimateCompressedSizeInBytes(int size, CompressionType compressionType) {
            return compressionType == CompressionType.NONE ? size : Math.min(Math.max(size / 2, 1024), 1 << 16);
        }
    ```
    
    This almost always underestimates the size of down-converted records if the batch is
    between 1 kB and 64 kB in size. In general, this means we may underestimate the total size
    required for compressed batches.
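
    As a rough worked illustration of how that estimate behaves for batches in the 1 kB-64 kB
    range, here is a standalone sketch (the nested enum is only a stand-in for
    org.apache.kafka.common.record.CompressionType so the arithmetic can run on its own):

    ```
    public class EstimateExample {
        // Stand-in for org.apache.kafka.common.record.CompressionType.
        enum CompressionType { NONE, GZIP }

        static int estimateCompressedSizeInBytes(int size, CompressionType compressionType) {
            return compressionType == CompressionType.NONE ? size : Math.min(Math.max(size / 2, 1024), 1 << 16);
        }

        public static void main(String[] args) {
            // A 3000-byte GZIP batch is estimated at max(3000 / 2, 1024) = 1500 bytes, about
            // half its actual size; incompressible payloads will not shrink that much when
            // down-converted, so the estimate falls short.
            System.out.println(estimateCompressedSizeInBytes(3000, CompressionType.GZIP));    // 1500
            // Very large batches are capped at 1 << 16 = 65536 bytes.
            System.out.println(estimateCompressedSizeInBytes(200_000, CompressionType.GZIP)); // 65536
        }
    }
    ```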
    
    Because of an implicit assumption in the code that messages with a lower message format
    appear before any with a higher message format, we do not grow the buffer we copy the
    down-converted records into when we see a batch whose message format is <= the target
    format. This assumption becomes incorrect when the log contains out-of-order message
    formats, for example because of leaders flapping while upgrading the message format.
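
    The one-line fix in the patch below grows the destination buffer before writing a batch
    whose format is already at or below the target. A minimal grow-by-copy sketch of what a
    helper such as Utils.ensureCapacity is assumed to do (Kafka's actual implementation may
    differ in detail):

    ```
    import java.nio.ByteBuffer;

    final class EnsureCapacitySketch {
        // Return the same buffer if it can hold newLength bytes, otherwise allocate a larger
        // one, copy over everything written so far, and return the new buffer.
        static ByteBuffer ensureCapacity(ByteBuffer existing, int newLength) {
            if (newLength <= existing.capacity())
                return existing;
            ByteBuffer grown = ByteBuffer.allocate(newLength);
            existing.flip();     // expose the bytes written so far
            grown.put(existing); // copy them into the larger buffer
            return grown;
        }

        public static void main(String[] args) {
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            buffer.put(new byte[1000]);
            // Before writing a 2000-byte batch verbatim, ensure writeTo() cannot overflow.
            buffer = ensureCapacity(buffer, buffer.position() + 2000);
            System.out.println(buffer.capacity()); // 3000
        }
    }
    ```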
    
    Reviewers: Jason Gustafson <jason@confluent.io>
---
 .../kafka/common/record/AbstractRecords.java       |  1 +
 .../kafka/common/record/FileRecordsTest.java       | 36 ++++++++++++++++++++++
 2 files changed, 37 insertions(+)

diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
index 89a5413..0552e6b 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
@@ -104,6 +104,7 @@ public abstract class AbstractRecords implements Records {
         for (RecordBatchAndRecords recordBatchAndRecords : recordBatchAndRecordsList) {
             temporaryMemoryBytes += recordBatchAndRecords.batch.sizeInBytes();
             if (recordBatchAndRecords.batch.magic() <= toMagic) {
+                buffer = Utils.ensureCapacity(buffer, buffer.position() + recordBatchAndRecords.batch.sizeInBytes());
                 recordBatchAndRecords.batch.writeTo(buffer);
             } else {
                 MemoryRecordsBuilder builder = convertRecordBatch(toMagic, buffer, recordBatchAndRecords);
diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
index fdd3ede..d1bf3d3 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
@@ -35,6 +35,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Random;
 
 import static java.util.Arrays.asList;
 import static org.apache.kafka.test.TestUtils.tempFile;
@@ -359,6 +360,41 @@ public class FileRecordsTest {
         doTestConversion(CompressionType.GZIP, RecordBatch.MAGIC_VALUE_V2);
     }
 
+    @Test
+    public void testDownconversionAfterMessageFormatDowngrade() throws IOException {
+        // random bytes
+        Random random = new Random();
+        byte[] bytes = new byte[3000];
+        random.nextBytes(bytes);
+
+        // records
+        CompressionType compressionType = CompressionType.GZIP;
+        List<Long> offsets = asList(0L, 1L);
+        List<Byte> magic = asList(RecordBatch.MAGIC_VALUE_V2, RecordBatch.MAGIC_VALUE_V1); // downgrade message format from v2 to v1
+        List<SimpleRecord> records = asList(
+                new SimpleRecord(1L, "k1".getBytes(), bytes),
+                new SimpleRecord(2L, "k2".getBytes(), bytes));
+        byte toMagic = 1;
+
+        // create MemoryRecords
+        ByteBuffer buffer = ByteBuffer.allocate(8000);
+        for (int i = 0; i < records.size(); i++) {
+            MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic.get(i), compressionType, TimestampType.CREATE_TIME, 0L);
+            builder.appendWithOffset(offsets.get(i), records.get(i));
+            builder.close();
+        }
+        buffer.flip();
+
+        // create FileRecords, down-convert and verify
+        try (FileRecords fileRecords = FileRecords.open(tempFile())) {
+            fileRecords.append(MemoryRecords.readableRecords(buffer));
+            fileRecords.flush();
+
+            Records convertedRecords = fileRecords.downConvert(toMagic, 0, time).records();
+            verifyConvertedRecords(records, offsets, convertedRecords, compressionType, toMagic);
+        }
+    }
+
     private void doTestConversion(CompressionType compressionType, byte toMagic) throws IOException {
         List<Long> offsets = asList(0L, 2L, 3L, 9L, 11L, 15L, 16L, 17L, 22L, 24L);
 

