kafka-commits mailing list archives

From j...@apache.org
Subject [kafka] branch trunk updated: MINOR: Fix chunked down-conversion behavior when no valid batch exists after conversion (#5173)
Date Fri, 15 Jun 2018 06:00:48 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a8c17e3  MINOR: Fix chunked down-conversion behavior when no valid batch exists after conversion (#5173)
a8c17e3 is described below

commit a8c17e36c3239f33925b061d99a5d1d1074bbc67
Author: Dhruvil Shah <dhruvil@confluent.io>
AuthorDate: Thu Jun 14 23:00:33 2018 -0700

    MINOR: Fix chunked down-conversion behavior when no valid batch exists after conversion (#5173)
    
    We might decide to drop certain message batches during down-conversion because older clients
    might not be able to interpret them. One such example is control batches, which are typically
    removed by the broker if down-conversion to V0 or V1 is required. This patch makes sure the
    chunked down-conversion implementation is able to handle such cases.
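    
    For illustration, here is a minimal, self-contained sketch of the idea behind the fix as seen
    from the reading side (this is not the broker or client implementation; the class name, method
    name, and plain ByteBuffer input are assumptions): when every batch is dropped during
    down-conversion, the broker sends only an overflow placeholder whose declared batch length runs
    past the bytes actually available, so a reader can detect that no real batches follow.
    
    import java.nio.ByteBuffer;
    
    // Hypothetical helper, not part of Kafka: recognize the overflow placeholder emitted when
    // down-conversion drops every batch (e.g. control batches converted to V0/V1). The placeholder
    // starts with an 8-byte base offset of -1 followed by a 4-byte length pointing past the data.
    public class OverflowPlaceholderCheck {
        private static final int LOG_OVERHEAD = 12; // 8-byte base offset + 4-byte batch length
    
        public static boolean isOverflowPlaceholder(ByteBuffer convertedBytes) {
            ByteBuffer buf = convertedBytes.duplicate();  // leave the caller's position untouched
            if (buf.remaining() < LOG_OVERHEAD)
                return true;                              // not even a complete offset/length header
            buf.getLong();                                // base offset (-1 for the placeholder)
            int batchLength = buf.getInt();               // declared length of the batch that "follows"
            return batchLength > buf.remaining();         // length exceeds what was actually sent
        }
    }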
---
 .../common/record/LazyDownConversionRecords.java   |  44 +++--
 .../record/LazyDownConversionRecordsSend.java      |  32 ++-
 .../java/org/apache/kafka/common/utils/Utils.java  |  11 ++
 .../kafka/common/record/FileRecordsTest.java       |  23 +--
 .../record/LazyDownConversionRecordsTest.java      | 214 +++++++++++----------
 .../common/record/MemoryRecordsBuilderTest.java    |   5 +-
 docs/upgrade.html                                  |   8 +
 7 files changed, 174 insertions(+), 163 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecords.java b/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecords.java
index da14b5b..d58689d 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecords.java
@@ -21,7 +21,6 @@ import org.apache.kafka.common.utils.AbstractIterator;
 import org.apache.kafka.common.utils.Time;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 
@@ -57,13 +56,15 @@ public class LazyDownConversionRecords implements BaseRecords {
         // need to make sure that we are able to accommodate one full batch of down-converted messages. The way we achieve
         // this is by having sizeInBytes method factor in the size of the first down-converted batch and return at least
         // its size.
-        AbstractIterator<? extends RecordBatch> it = records.batchIterator();
+        java.util.Iterator<ConvertedRecords> it = iterator(0);
         if (it.hasNext()) {
-            firstConvertedBatch = RecordsUtil.downConvert(Collections.singletonList(it.peek()), toMagic, firstOffset, time);
+            firstConvertedBatch = it.next();
             sizeInBytes = Math.max(records.sizeInBytes(), firstConvertedBatch.records().sizeInBytes());
         } else {
+            // If there are no messages left after down-conversion, make sure we are able to send at least an overflow
+            // message to the consumer. Typically, the consumer would need to increase the fetch size in such cases.
             firstConvertedBatch = null;
-            sizeInBytes = 0;
+            sizeInBytes = LazyDownConversionRecordsSend.MIN_OVERFLOW_MESSAGE_LENGTH;
         }
     }
 
@@ -148,21 +149,28 @@ public class LazyDownConversionRecords implements BaseRecords {
                 return convertedBatch;
             }
 
-            if (!batchIterator.hasNext())
-                return allDone();
-
-            // Figure out batches we should down-convert based on the size constraints
-            List<RecordBatch> batches = new ArrayList<>();
-            boolean isFirstBatch = true;
-            long sizeSoFar = 0;
-            while (batchIterator.hasNext() &&
-                    (isFirstBatch || (batchIterator.peek().sizeInBytes() + sizeSoFar) <= maximumReadSize)) {
-                RecordBatch currentBatch = batchIterator.next();
-                batches.add(currentBatch);
-                sizeSoFar += currentBatch.sizeInBytes();
-                isFirstBatch = false;
+            while (batchIterator.hasNext()) {
+                List<RecordBatch> batches = new ArrayList<>();
+                boolean isFirstBatch = true;
+                long sizeSoFar = 0;
+
+                // Figure out batches we should down-convert based on the size constraints
+                while (batchIterator.hasNext() &&
+                        (isFirstBatch || (batchIterator.peek().sizeInBytes() + sizeSoFar) <= maximumReadSize)) {
+                    RecordBatch currentBatch = batchIterator.next();
+                    batches.add(currentBatch);
+                    sizeSoFar += currentBatch.sizeInBytes();
+                    isFirstBatch = false;
+                }
+                ConvertedRecords convertedRecords = RecordsUtil.downConvert(batches, toMagic, firstOffset, time);
+                // During conversion, it is possible that we drop certain batches because they do not have an equivalent
+                // representation in the message format we want to convert to. For example, V0 and V1 message formats
+                // have no notion of transaction markers which were introduced in V2 so they get dropped during conversion.
+                // We return converted records only when we have at least one valid batch of messages after conversion.
+                if (convertedRecords.records().sizeInBytes() > 0)
+                    return convertedRecords;
             }
-            return RecordsUtil.downConvert(batches, toMagic, firstOffset, time);
+            return allDone();
         }
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecordsSend.java b/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecordsSend.java
index e60e1ed..f0fab7d 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecordsSend.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecordsSend.java
@@ -20,7 +20,6 @@ import org.apache.kafka.common.TopicPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.EOFException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.GatheringByteChannel;
@@ -33,6 +32,7 @@ import java.util.Iterator;
 public final class LazyDownConversionRecordsSend extends RecordsSend<LazyDownConversionRecords> {
     private static final Logger log = LoggerFactory.getLogger(LazyDownConversionRecordsSend.class);
     private static final int MAX_READ_SIZE = 128 * 1024;
+    static final int MIN_OVERFLOW_MESSAGE_LENGTH = Records.LOG_OVERHEAD;
 
     private RecordConversionStats recordConversionStats;
     private RecordsSend convertedRecordsWriter;
@@ -49,39 +49,31 @@ public final class LazyDownConversionRecordsSend extends RecordsSend<LazyDownCon
     public long writeTo(GatheringByteChannel channel, long previouslyWritten, int remaining) throws IOException {
         if (convertedRecordsWriter == null || convertedRecordsWriter.completed()) {
             MemoryRecords convertedRecords;
-
             // Check if we have more chunks left to down-convert
             if (convertedRecordsIterator.hasNext()) {
                 // Get next chunk of down-converted messages
                 ConvertedRecords<MemoryRecords> recordsAndStats = convertedRecordsIterator.next();
                 convertedRecords = recordsAndStats.records();
-
-                int sizeOfFirstConvertedBatch = convertedRecords.batchIterator().next().sizeInBytes();
-                if (previouslyWritten == 0 && sizeOfFirstConvertedBatch > size())
-                    throw new EOFException("Unable to send first batch completely." +
-                            " maximum_size: " + size() +
-                            " converted_records_size: " + sizeOfFirstConvertedBatch);
-
                 recordConversionStats.add(recordsAndStats.recordConversionStats());
-                log.debug("Got lazy converted records for partition {} with length={}", topicPartition(),
convertedRecords.sizeInBytes());
+                log.debug("Down-converted records for partition {} with length={}", topicPartition(),
convertedRecords.sizeInBytes());
             } else {
-                if (previouslyWritten == 0)
-                    throw new EOFException("Unable to get the first batch of down-converted
records");
-
-                // We do not have any records left to down-convert. Construct a "fake" message for the length remaining.
+                // We do not have any records left to down-convert. Construct an overflow message for the length remaining.
                 // This message will be ignored by the consumer because its length will be past the length of maximum
                 // possible response size.
                 // DefaultRecordBatch =>
                 //      BaseOffset => Int64
                 //      Length => Int32
                 //      ...
-                log.debug("Constructing fake message batch for partition {} for remaining
length={}", topicPartition(), remaining);
-                ByteBuffer fakeMessageBatch = ByteBuffer.allocate(Math.max(Records.LOG_OVERHEAD,
Math.min(remaining + 1, MAX_READ_SIZE)));
-                fakeMessageBatch.putLong(-1L);
-                fakeMessageBatch.putInt(remaining + 1);
-                convertedRecords = MemoryRecords.readableRecords(fakeMessageBatch);
-            }
+                ByteBuffer overflowMessageBatch = ByteBuffer.allocate(
+                        Math.max(MIN_OVERFLOW_MESSAGE_LENGTH, Math.min(remaining + 1, MAX_READ_SIZE)));
+                overflowMessageBatch.putLong(-1L);
 
+                // Fill in the length of the overflow batch. A valid batch must be at least as long as the minimum batch
+                // overhead.
+                overflowMessageBatch.putInt(Math.max(remaining + 1, DefaultRecordBatch.RECORD_BATCH_OVERHEAD));
+                convertedRecords = MemoryRecords.readableRecords(overflowMessageBatch);
+                log.debug("Constructed overflow message batch for partition {} with length={}",
topicPartition(), remaining);
+            }
             convertedRecordsWriter = new DefaultRecordsSend(destination(), convertedRecords, Math.min(convertedRecords.sizeInBytes(), remaining));
         }
         return convertedRecordsWriter.writeTo(channel);
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index ebe87ba..31fa01c 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -112,6 +112,17 @@ public final class Utils {
     }
 
     /**
+     * Read a UTF8 string from the current position till the end of a byte buffer. The position of the byte buffer is
+     * not affected by this method.
+     *
+     * @param buffer The buffer to read from
+     * @return The UTF8 string
+     */
+    public static String utf8(ByteBuffer buffer) {
+        return utf8(buffer, buffer.remaining());
+    }
+
+    /**
      * Read a UTF8 string from a byte buffer at a given offset. Note that the position of the byte buffer
      * is not affected by this method.
      *
diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
index bbe84b2..f08652e 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.test.TestUtils;
 import org.easymock.EasyMock;
 import org.junit.Before;
@@ -38,6 +37,7 @@ import java.util.Iterator;
 import java.util.List;
 
 import static java.util.Arrays.asList;
+import static org.apache.kafka.common.utils.Utils.utf8;
 import static org.apache.kafka.test.TestUtils.tempFile;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
@@ -430,10 +430,6 @@ public class FileRecordsTest {
         }
     }
 
-    private String utf8(ByteBuffer buffer) {
-        return Utils.utf8(buffer, buffer.remaining());
-    }
-
     private void downConvertAndVerifyRecords(List<SimpleRecord> initialRecords,
                                              List<Long> initialOffsets,
                                              FileRecords fileRecords,
@@ -441,13 +437,11 @@ public class FileRecordsTest {
                                              byte toMagic,
                                              long firstOffset,
                                              Time time) {
-        long numBatches = 0;
         long minBatchSize = Long.MAX_VALUE;
         long maxBatchSize = Long.MIN_VALUE;
         for (RecordBatch batch : fileRecords.batches()) {
             minBatchSize = Math.min(minBatchSize, batch.sizeInBytes());
             maxBatchSize = Math.max(maxBatchSize, batch.sizeInBytes());
-            numBatches++;
         }
 
         // Test the normal down-conversion path
@@ -469,21 +463,6 @@ public class FileRecordsTest {
             Iterator<ConvertedRecords> it = lazyRecords.iterator(readSize);
             while (it.hasNext())
                 convertedRecords.add(it.next().records());
-
-            // Check if chunking works as expected. The only way to predictably test for this is by testing the edge cases.
-            // 1. If maximum read size is greater than the size of all batches combined, we must get all down-conversion
-            //    records in exactly two batches; the first chunk is pre down-converted and returned, and the second chunk
-            //    contains the remaining batches.
-            // 2. If maximum read size is just smaller than the size of all batches combined, we must get results in two
-            //    chunks.
-            // 3. If maximum read size is less than the size of a single record, we get one batch in each chunk.
-            if (readSize >= fileRecords.sizeInBytes())
-                assertEquals(2, convertedRecords.size());
-            else if (readSize == fileRecords.sizeInBytes() - 1)
-                assertEquals(2, convertedRecords.size());
-            else if (readSize <= minBatchSize)
-                assertEquals(numBatches, convertedRecords.size());
-
             verifyConvertedRecords(initialRecords, initialOffsets, convertedRecords, compressionType, toMagic);
             convertedRecords.clear();
         }
diff --git a/clients/src/test/java/org/apache/kafka/common/record/LazyDownConversionRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/LazyDownConversionRecordsTest.java
index 8765603..89c1aea 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/LazyDownConversionRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/LazyDownConversionRecordsTest.java
@@ -20,7 +20,6 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Utils;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -35,86 +34,126 @@ import java.util.Collection;
 import java.util.List;
 
 import static java.util.Arrays.asList;
+import static org.apache.kafka.common.utils.Utils.utf8;
 import static org.apache.kafka.test.TestUtils.tempFile;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-@RunWith(value = Parameterized.class)
 public class LazyDownConversionRecordsTest {
-    private final CompressionType compressionType;
-    private final byte toMagic;
-    private final DownConversionTest test;
-
-    public LazyDownConversionRecordsTest(CompressionType compressionType, byte toMagic, DownConversionTest test) {
-        this.compressionType = compressionType;
-        this.toMagic = toMagic;
-        this.test = test;
+    /**
+     * Test the lazy down-conversion path in the presence of commit markers. When converting to V0 or V1, these batches
+     * are dropped. If there happen to be no more batches left to convert, we must get an overflow message batch after
+     * conversion.
+     */
+    @Test
+    public void testConversionOfCommitMarker() throws IOException {
+        MemoryRecords recordsToConvert = MemoryRecords.withEndTransactionMarker(0, Time.SYSTEM.milliseconds(), RecordBatch.NO_PARTITION_LEADER_EPOCH,
+                1, (short) 1, new EndTransactionMarker(ControlRecordType.COMMIT, 0));
+        MemoryRecords convertedRecords = convertRecords(recordsToConvert, (byte) 1, recordsToConvert.sizeInBytes());
+        ByteBuffer buffer = convertedRecords.buffer();
+
+        // read the offset and the batch length
+        buffer.getLong();
+        int sizeOfConvertedRecords = buffer.getInt();
+
+        // assert we got an overflow message batch
+        assertTrue(sizeOfConvertedRecords > buffer.limit());
+        assertFalse(convertedRecords.batchIterator().hasNext());
     }
 
-    enum DownConversionTest {
-        DEFAULT,
-        OVERFLOW,
-    }
+    @RunWith(value = Parameterized.class)
+    public static class ParameterizedConversionTest {
+        private final CompressionType compressionType;
+        private final byte toMagic;
 
-    @Parameterized.Parameters(name = "compressionType={0}, toMagic={1}, test={2}")
-    public static Collection<Object[]> data() {
-        List<Object[]> values = new ArrayList<>();
-        for (byte toMagic = RecordBatch.MAGIC_VALUE_V0; toMagic <= RecordBatch.CURRENT_MAGIC_VALUE; toMagic++) {
-            for (DownConversionTest test : DownConversionTest.values()) {
-                values.add(new Object[]{CompressionType.NONE, toMagic, test});
-                values.add(new Object[]{CompressionType.GZIP, toMagic, test});
+        public ParameterizedConversionTest(CompressionType compressionType, byte toMagic) {
+            this.compressionType = compressionType;
+            this.toMagic = toMagic;
+        }
+
+        @Parameterized.Parameters(name = "compressionType={0}, toMagic={1}")
+        public static Collection<Object[]> data() {
+            List<Object[]> values = new ArrayList<>();
+            for (byte toMagic = RecordBatch.MAGIC_VALUE_V0; toMagic <= RecordBatch.CURRENT_MAGIC_VALUE; toMagic++) {
+                values.add(new Object[]{CompressionType.NONE, toMagic});
+                values.add(new Object[]{CompressionType.GZIP, toMagic});
             }
+            return values;
         }
-        return values;
-    }
 
-    @Test
-    public void doTestConversion() throws IOException {
-        List<Long> offsets = asList(0L, 2L, 3L, 9L, 11L, 15L, 16L, 17L, 22L, 24L);
-
-        Header[] headers = {new RecordHeader("headerKey1", "headerValue1".getBytes()),
-                            new RecordHeader("headerKey2", "headerValue2".getBytes()),
-                            new RecordHeader("headerKey3", "headerValue3".getBytes())};
-
-        List<SimpleRecord> records = asList(
-                new SimpleRecord(1L, "k1".getBytes(), "hello".getBytes()),
-                new SimpleRecord(2L, "k2".getBytes(), "goodbye".getBytes()),
-                new SimpleRecord(3L, "k3".getBytes(), "hello again".getBytes()),
-                new SimpleRecord(4L, "k4".getBytes(), "goodbye for now".getBytes()),
-                new SimpleRecord(5L, "k5".getBytes(), "hello again".getBytes()),
-                new SimpleRecord(6L, "k6".getBytes(), "I sense indecision".getBytes()),
-                new SimpleRecord(7L, "k7".getBytes(), "what now".getBytes()),
-                new SimpleRecord(8L, "k8".getBytes(), "running out".getBytes(), headers),
-                new SimpleRecord(9L, "k9".getBytes(), "ok, almost done".getBytes()),
-                new SimpleRecord(10L, "k10".getBytes(), "finally".getBytes(), headers));
-        assertEquals("incorrect test setup", offsets.size(), records.size());
-
-        ByteBuffer buffer = ByteBuffer.allocate(1024);
-        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType,
-                TimestampType.CREATE_TIME, 0L);
-        for (int i = 0; i < 3; i++)
-            builder.appendWithOffset(offsets.get(i), records.get(i));
-        builder.close();
-
-        builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME,
-                0L);
-        for (int i = 3; i < 6; i++)
-            builder.appendWithOffset(offsets.get(i), records.get(i));
-        builder.close();
-
-        builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME,
-                0L);
-        for (int i = 6; i < 10; i++)
-            builder.appendWithOffset(offsets.get(i), records.get(i));
-        builder.close();
-
-        buffer.flip();
+        /**
+         * Test the lazy down-conversion path.
+         */
+        @Test
+        public void testConversion() throws IOException {
+            doTestConversion(false);
+        }
+
+        /**
+         * Test the lazy down-conversion path where the number of bytes we want to convert is much larger than the
+         * number of bytes we get after conversion. This causes overflow message batch(es) to be appended towards the
+         * end of the converted output.
+         */
+        @Test
+        public void testConversionWithOverflow() throws IOException {
+            doTestConversion(true);
+        }
 
+        private void doTestConversion(boolean testConversionOverflow) throws IOException {
+            List<Long> offsets = asList(0L, 2L, 3L, 9L, 11L, 15L, 16L, 17L, 22L, 24L);
+
+            Header[] headers = {new RecordHeader("headerKey1", "headerValue1".getBytes()),
+                                new RecordHeader("headerKey2", "headerValue2".getBytes()),
+                                new RecordHeader("headerKey3", "headerValue3".getBytes())};
+
+            List<SimpleRecord> records = asList(
+                    new SimpleRecord(1L, "k1".getBytes(), "hello".getBytes()),
+                    new SimpleRecord(2L, "k2".getBytes(), "goodbye".getBytes()),
+                    new SimpleRecord(3L, "k3".getBytes(), "hello again".getBytes()),
+                    new SimpleRecord(4L, "k4".getBytes(), "goodbye for now".getBytes()),
+                    new SimpleRecord(5L, "k5".getBytes(), "hello again".getBytes()),
+                    new SimpleRecord(6L, "k6".getBytes(), "I sense indecision".getBytes()),
+                    new SimpleRecord(7L, "k7".getBytes(), "what now".getBytes()),
+                    new SimpleRecord(8L, "k8".getBytes(), "running out".getBytes(), headers),
+                    new SimpleRecord(9L, "k9".getBytes(), "ok, almost done".getBytes()),
+                    new SimpleRecord(10L, "k10".getBytes(), "finally".getBytes(), headers));
+            assertEquals("incorrect test setup", offsets.size(), records.size());
+
+            ByteBuffer buffer = ByteBuffer.allocate(1024);
+            MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType,
+                    TimestampType.CREATE_TIME, 0L);
+            for (int i = 0; i < 3; i++)
+                builder.appendWithOffset(offsets.get(i), records.get(i));
+            builder.close();
+
+            builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME,
+                    0L);
+            for (int i = 3; i < 6; i++)
+                builder.appendWithOffset(offsets.get(i), records.get(i));
+            builder.close();
+
+            builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME,
+                    0L);
+            for (int i = 6; i < 10; i++)
+                builder.appendWithOffset(offsets.get(i), records.get(i));
+            builder.close();
+            buffer.flip();
+
+            MemoryRecords recordsToConvert = MemoryRecords.readableRecords(buffer);
+            int numBytesToConvert = recordsToConvert.sizeInBytes();
+            if (testConversionOverflow)
+                numBytesToConvert *= 2;
+
+            MemoryRecords convertedRecords = convertRecords(recordsToConvert, toMagic, numBytesToConvert);
+            verifyDownConvertedRecords(records, offsets, convertedRecords, compressionType, toMagic);
+        }
+    }
+
+    private static MemoryRecords convertRecords(MemoryRecords recordsToConvert, byte toMagic, int bytesToConvert) throws IOException {
         try (FileRecords inputRecords = FileRecords.open(tempFile())) {
-            MemoryRecords memoryRecords = MemoryRecords.readableRecords(buffer);
-            inputRecords.append(memoryRecords);
+            inputRecords.append(recordsToConvert);
             inputRecords.flush();
 
             LazyDownConversionRecords lazyRecords = new LazyDownConversionRecords(new TopicPartition("test", 1),
@@ -123,50 +162,27 @@ public class LazyDownConversionRecordsTest {
             File outputFile = tempFile();
             FileChannel channel = new RandomAccessFile(outputFile, "rw").getChannel();
 
-            // Size of lazy records is at least as much as the size of underlying records
-            assertTrue(lazyRecords.sizeInBytes() >= inputRecords.sizeInBytes());
-
-            int toWrite;
             int written = 0;
-            List<SimpleRecord> recordsBeingConverted;
-            List<Long> offsetsOfRecords;
-            switch (test) {
-                case DEFAULT:
-                    toWrite = inputRecords.sizeInBytes();
-                    recordsBeingConverted = records;
-                    offsetsOfRecords = offsets;
-                    break;
-                case OVERFLOW:
-                    toWrite = inputRecords.sizeInBytes() * 2;
-                    recordsBeingConverted = records;
-                    offsetsOfRecords = offsets;
-                    break;
-                default:
-                    throw new IllegalArgumentException();
-            }
-            while (written < toWrite)
-                written += lazySend.writeTo(channel, written, toWrite - written);
+            while (written < bytesToConvert)
+                written += lazySend.writeTo(channel, written, bytesToConvert - written);
 
             FileRecords convertedRecords = FileRecords.open(outputFile, true, (int) channel.size(), false);
             ByteBuffer convertedRecordsBuffer = ByteBuffer.allocate(convertedRecords.sizeInBytes());
             convertedRecords.readInto(convertedRecordsBuffer, 0);
-            MemoryRecords convertedMemoryRecords = MemoryRecords.readableRecords(convertedRecordsBuffer);
-            verifyDownConvertedRecords(recordsBeingConverted, offsetsOfRecords, convertedMemoryRecords, compressionType, toMagic);
 
+            // cleanup
             convertedRecords.close();
             channel.close();
-        }
-    }
 
-    private String utf8(ByteBuffer buffer) {
-        return Utils.utf8(buffer, buffer.remaining());
+            return MemoryRecords.readableRecords(convertedRecordsBuffer);
+        }
     }
 
-    private void verifyDownConvertedRecords(List<SimpleRecord> initialRecords,
-                                            List<Long> initialOffsets,
-                                            MemoryRecords downConvertedRecords,
-                                            CompressionType compressionType,
-                                            byte toMagic) {
+    private static void verifyDownConvertedRecords(List<SimpleRecord> initialRecords,
+                                                   List<Long> initialOffsets,
+                                                   MemoryRecords downConvertedRecords,
+                                                   CompressionType compressionType,
+                                                   byte toMagic) {
         int i = 0;
         for (RecordBatch batch : downConvertedRecords.batches()) {
             assertTrue("Magic byte should be lower than or equal to " + toMagic, batch.magic()
<= toMagic);
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
index 36b14a2..5d5221e 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
@@ -30,6 +30,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Random;
 
+import static org.apache.kafka.common.utils.Utils.utf8;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -568,10 +569,6 @@ public class MemoryRecordsBuilderTest {
         }
     }
 
-    private String utf8(ByteBuffer buffer) {
-        return Utils.utf8(buffer, buffer.remaining());
-    }
-
     @Test
     public void shouldThrowIllegalStateExceptionOnBuildWhenAborted() throws Exception {
         ByteBuffer buffer = ByteBuffer.allocate(128);
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 6119536..89c90d1 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -119,6 +119,14 @@
     </li>
     <li><a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-290%3A+Support+for+Prefixed+ACLs">KIP-290</a>
adds the ability
         to define ACLs on prefixed resources, e.g. any topic starting with 'foo'.</li>
+    <li><a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-283%3A+Efficient+Memory+Usage+for+Down-Conversion">KIP-283</a>
improves message down-conversion
+        handling on Kafka broker, which has typically been a memory-intensive operation.
The KIP adds a mechanism by which the operation becomes less memory intensive
+        by down-converting chunks of partition data at a time which helps put an upper bound
on memory consumption. With this improvement, there is a change in
+        <code>FetchResponse</code> protocol behavior where the broker could send
an oversized message batch towards the end of the response with an invalid offset.
+        Such oversized messages must be ignored by consumer clients, as is done by <code>KafkaConsumer</code>.
+    <p>KIP-283 also adds new topic and broker configurations <code>message.downconversion.enable</code>
and <code>log.message.downconversion.enable</code> respectively
+       to control whether down-conversion is enabled. When disabled, broker does not perform
any down-conversion and instead sends an <code>UNSUPPORTED_VERSION</code>
+       error to the client.</p></li>
 </ul>
 
 <h5><a id="upgrade_200_new_protocols" href="#upgrade_200_new_protocols">New Protocol
Versions</a></h5>
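
The upgrade note above mentions the new message.downconversion.enable topic configuration. As a
rough sketch of how an operator might disable down-conversion for one topic via the Java
AdminClient (the bootstrap address and topic name are assumptions, and note that alterConfigs
replaces the topic's full config set, so kafka-configs.sh --alter --add-config is the more common
route):

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.Config;
    import org.apache.kafka.clients.admin.ConfigEntry;
    import org.apache.kafka.common.config.ConfigResource;

    public class DisableDownConversionExample {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // assumed broker address

            try (AdminClient admin = AdminClient.create(props)) {
                ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic"); // assumed topic
                // With message.downconversion.enable=false, fetches that would require
                // down-conversion fail with UNSUPPORTED_VERSION instead of being converted.
                Config config = new Config(Collections.singletonList(
                        new ConfigEntry("message.downconversion.enable", "false")));
                admin.alterConfigs(Collections.singletonMap(topic, config)).all().get();
            }
        }
    }

Consumers that still require the older message format would then need to upgrade (or the setting
would need to be re-enabled) before they can fetch from that topic.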

-- 
To stop receiving notification emails like this one, please contact
jgus@apache.org.
