kafka-commits mailing list archives

From: j...@apache.org
Subject: kafka git commit: KAFKA-5360; Down-converted uncompressed batches should respect fetch offset
Date: Thu, 01 Jun 2017 17:19:24 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk a08634642 -> 8e8b3c565


KAFKA-5360; Down-converted uncompressed batches should respect fetch offset

More specifically, V2 messages are always batched (whether compressed or not) while
V0/V1 are only batched if they are compressed.

Clients like librdkafka expect to receive messages from the fetch offset when dealing with
uncompressed V0/V1 messages. When converting from V2 to V0/V1, we were returning all the
messages in the V2 batch, including those before the requested fetch offset.
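
For illustration only (not part of the patch): a minimal standalone sketch of the fixed
behaviour using the downConvert(toMagic, firstOffset) API introduced below. The class name
and the sample offsets/values are made up.

    import java.nio.ByteBuffer;

    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.common.record.MemoryRecordsBuilder;
    import org.apache.kafka.common.record.Record;
    import org.apache.kafka.common.record.RecordBatch;
    import org.apache.kafka.common.record.Records;
    import org.apache.kafka.common.record.SimpleRecord;
    import org.apache.kafka.common.record.TimestampType;

    public class DownConvertSketch {
        public static void main(String[] args) {
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            // A single uncompressed v2 batch holding offsets 0..2
            MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2,
                    CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
            builder.appendWithOffset(0L, new SimpleRecord(1L, "k0".getBytes(), "v0".getBytes()));
            builder.appendWithOffset(1L, new SimpleRecord(2L, "k1".getBytes(), "v1".getBytes()));
            builder.appendWithOffset(2L, new SimpleRecord(3L, "k2".getBytes(), "v2".getBytes()));
            builder.close();
            buffer.flip();

            // Down-convert to v1 as if the fetch offset were 1: offset 0 is dropped, because
            // uncompressed v0/v1 "batches" only ever contain a single record.
            Records converted = MemoryRecords.readableRecords(buffer)
                    .downConvert(RecordBatch.MAGIC_VALUE_V1, 1L);
            for (Record record : converted.records())
                System.out.println(record.offset()); // prints 1 and 2
        }
    }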

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Jason Gustafson <jason@confluent.io>

Closes #3191 from ijuma/kafka-5360-down-converted-uncompressed-respect-offset


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8e8b3c56
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8e8b3c56
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8e8b3c56

Branch: refs/heads/trunk
Commit: 8e8b3c56572a825d3c1beb6ad77ce88571354f51
Parents: a086346
Author: Ismael Juma <ismael@juma.me.uk>
Authored: Thu Jun 1 10:17:03 2017 -0700
Committer: Jason Gustafson <jason@confluent.io>
Committed: Thu Jun 1 10:17:03 2017 -0700

----------------------------------------------------------------------
 .../clients/producer/internals/Sender.java      |  2 +-
 .../kafka/common/record/AbstractRecords.java    | 22 ++++-
 .../apache/kafka/common/record/FileRecords.java |  4 +-
 .../kafka/common/record/MemoryRecords.java      |  4 +-
 .../org/apache/kafka/common/record/Records.java |  4 +-
 .../kafka/common/record/FileRecordsTest.java    | 54 +++++++++---
 .../common/record/MemoryRecordsBuilderTest.java | 43 ++++++++--
 .../src/main/scala/kafka/server/KafkaApis.scala | 29 ++++---
 .../unit/kafka/server/FetchRequestTest.scala    | 89 +++++++++++++++++---
 9 files changed, 201 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8e8b3c56/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 01ff91a..4f1c7d4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -648,7 +648,7 @@ public class Sender implements Runnable {
             // not all support the same message format version. For example, if a partition migrates from a broker
             // which is supporting the new magic version to one which doesn't, then we will need to convert.
             if (!records.hasMatchingMagic(minUsedMagic))
-                records = batch.records().downConvert(minUsedMagic);
+                records = batch.records().downConvert(minUsedMagic, 0);
             produceRecordsByPartition.put(tp, records);
             recordsByPartition.put(tp, batch);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e8b3c56/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
index 2771ab7..04d7071 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
@@ -50,7 +50,16 @@ public abstract class AbstractRecords implements Records {
         return true;
     }
 
-    protected MemoryRecords downConvert(Iterable<? extends RecordBatch> batches, byte toMagic) {
+    /**
+     * Down convert batches to the provided message format version. The first offset parameter is only relevant in the
+     * conversion from uncompressed v2 or higher to v1 or lower. The reason is that uncompressed records in v0 and v1
+     * are not batched (put another way, each batch always has 1 record).
+     *
+     * If a client requests records in v1 format starting from the middle of an uncompressed batch in v2 format, we
+     * need to drop records from the batch during the conversion. Some versions of librdkafka rely on this for
+     * correctness.
+     */
+    protected MemoryRecords downConvert(Iterable<? extends RecordBatch> batches, byte toMagic, long firstOffset) {
         // maintain the batch along with the decompressed records to avoid the need to decompress again
         List<RecordBatchAndRecords> recordBatchAndRecordsList = new ArrayList<>();
         int totalSizeEstimate = 0;
@@ -63,9 +72,16 @@ public abstract class AbstractRecords implements Records {
                 totalSizeEstimate += batch.sizeInBytes();
                 recordBatchAndRecordsList.add(new RecordBatchAndRecords(batch, null, null));
             } else {
-                List<Record> records = Utils.toList(batch.iterator());
+                List<Record> records = new ArrayList<>();
+                for (Record record : batch) {
+                    // See the method javadoc for an explanation
+                    if (toMagic > RecordBatch.MAGIC_VALUE_V1 || batch.isCompressed() || record.offset() >= firstOffset)
+                        records.add(record);
+                }
+                if (records.isEmpty())
+                    continue;
                 final long baseOffset;
-                if (batch.magic() >= RecordBatch.MAGIC_VALUE_V2)
+                if (batch.magic() >= RecordBatch.MAGIC_VALUE_V2 && toMagic >= RecordBatch.MAGIC_VALUE_V2)
                     baseOffset = batch.baseOffset();
                 else
                     baseOffset = records.get(0).offset();

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e8b3c56/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
index 32ca1a7..35431d8 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
@@ -230,7 +230,7 @@ public class FileRecords extends AbstractRecords implements Closeable {
     }
 
     @Override
-    public Records downConvert(byte toMagic) {
+    public Records downConvert(byte toMagic, long firstOffset) {
         List<? extends RecordBatch> batches = Utils.toList(batches().iterator());
         if (batches.isEmpty()) {
             // This indicates that the message is too large, which means that the buffer is not large
@@ -242,7 +242,7 @@ public class FileRecords extends AbstractRecords implements Closeable {
             // one full message, even if it requires exceeding the max fetch size requested by the client.
             return this;
         } else {
-            return downConvert(batches, toMagic);
+            return downConvert(batches, toMagic, firstOffset);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e8b3c56/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index 46798cf..e158e2f 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -109,8 +109,8 @@ public class MemoryRecords extends AbstractRecords {
     }
 
     @Override
-    public MemoryRecords downConvert(byte toMagic) {
-        return downConvert(batches(), toMagic);
+    public MemoryRecords downConvert(byte toMagic, long firstOffset) {
+        return downConvert(batches(), toMagic, firstOffset);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e8b3c56/clients/src/main/java/org/apache/kafka/common/record/Records.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Records.java b/clients/src/main/java/org/apache/kafka/common/record/Records.java
index a5a5036..ec2e717 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Records.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Records.java
@@ -96,9 +96,11 @@ public interface Records {
      * Convert all batches in this buffer to the format passed as a parameter. Note that this requires
      * deep iteration since all of the deep records must also be converted to the desired format.
      * @param toMagic The magic value to convert to
+     * @param firstOffset The starting offset for returned records. This only impacts some cases. See
+     *                    {@link AbstractRecords#downConvert(Iterable, byte, long)} for an explanation.
      * @return A Records instance (which may or may not be the same instance)
      */
-    Records downConvert(byte toMagic);
+    Records downConvert(byte toMagic, long firstOffset);
 
     /**
      * Get an iterator over the records in this log. Note that this generally requires decompression,

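Callers that do not need offset-based trimming keep the old behaviour by passing 0, as the
producer's Sender does above. A hedged sketch of a convenience wrapper (hypothetical helper,
not in the patch) that makes the two call patterns explicit:

    import org.apache.kafka.common.record.Records;

    final class DownConvertHelper {
        // Hypothetical wrapper around the new interface method. Brokers pass the partition's
        // fetch offset so uncompressed v2 batches get trimmed to the requested position.
        static Records downConvertForFetch(Records records, byte toMagic, long fetchOffset) {
            return records.downConvert(toMagic, fetchOffset);
        }

        // Callers that want every record regardless of offset (like the producer Sender) pass 0.
        static Records downConvertAll(Records records, byte toMagic) {
            return records.downConvert(toMagic, 0L);
        }
    }
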
http://git-wip-us.apache.org/repos/asf/kafka/blob/8e8b3c56/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
index 8b9c900..b41db67 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.common.record;
 
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.test.TestUtils;
 import org.easymock.EasyMock;
 import org.junit.Before;
@@ -26,6 +27,7 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -308,7 +310,7 @@ public class FileRecordsTest {
         int start = fileRecords.searchForOffsetWithSize(1, 0).position;
         int size = batch.sizeInBytes();
         FileRecords slice = fileRecords.read(start, size - 1);
-        Records messageV0 = slice.downConvert(RecordBatch.MAGIC_VALUE_V0);
+        Records messageV0 = slice.downConvert(RecordBatch.MAGIC_VALUE_V0, 0);
         assertTrue("No message should be there", batches(messageV0).isEmpty());
         assertEquals("There should be " + (size - 1) + " bytes", size - 1, messageV0.sizeInBytes());
     }
@@ -324,31 +326,34 @@ public class FileRecordsTest {
     }
 
     private void doTestConversion(CompressionType compressionType, byte toMagic) throws IOException {
-        List<Long> offsets = asList(0L, 2L, 3L, 9L, 11L, 15L);
+        List<Long> offsets = asList(0L, 2L, 3L, 9L, 11L, 15L, 16L, 17L, 22L, 24L);
         List<SimpleRecord> records = asList(
                 new SimpleRecord(1L, "k1".getBytes(), "hello".getBytes()),
                 new SimpleRecord(2L, "k2".getBytes(), "goodbye".getBytes()),
                 new SimpleRecord(3L, "k3".getBytes(), "hello again".getBytes()),
                 new SimpleRecord(4L, "k4".getBytes(), "goodbye for now".getBytes()),
                 new SimpleRecord(5L, "k5".getBytes(), "hello again".getBytes()),
-                new SimpleRecord(6L, "k6".getBytes(), "goodbye forever".getBytes()));
+                new SimpleRecord(6L, "k6".getBytes(), "I sense indecision".getBytes()),
+                new SimpleRecord(7L, "k7".getBytes(), "what now".getBytes()),
+                new SimpleRecord(8L, "k8".getBytes(), "running out".getBytes()),
+                new SimpleRecord(9L, "k9".getBytes(), "ok, almost done".getBytes()),
+                new SimpleRecord(10L, "k10".getBytes(), "finally".getBytes()));
 
         ByteBuffer buffer = ByteBuffer.allocate(1024);
         MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V0, compressionType,
                 TimestampType.CREATE_TIME, 0L);
-        for (int i = 0; i < 2; i++)
+        for (int i = 0; i < 3; i++)
             builder.appendWithOffset(offsets.get(i), records.get(i));
         builder.close();
 
-        builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType,
-                TimestampType.CREATE_TIME, 0L);
-        for (int i = 2; i < 4; i++)
+        builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType, TimestampType.CREATE_TIME,
+                0L);
+        for (int i = 3; i < 6; i++)
             builder.appendWithOffset(offsets.get(i), records.get(i));
         builder.close();
 
-        builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, compressionType,
-                TimestampType.CREATE_TIME, 0L);
-        for (int i = 4; i < 6; i++)
+        builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, compressionType, TimestampType.CREATE_TIME, 0L);
+        for (int i = 6; i < 10; i++)
             builder.appendWithOffset(offsets.get(i), records.get(i));
         builder.close();
 
@@ -357,11 +362,34 @@ public class FileRecordsTest {
         try (FileRecords fileRecords = FileRecords.open(tempFile())) {
             fileRecords.append(MemoryRecords.readableRecords(buffer));
             fileRecords.flush();
-            Records convertedRecords = fileRecords.downConvert(toMagic);
+            Records convertedRecords = fileRecords.downConvert(toMagic, 0L);
             verifyConvertedRecords(records, offsets, convertedRecords, compressionType, toMagic);
+
+            if (toMagic <= RecordBatch.MAGIC_VALUE_V1 && compressionType == CompressionType.NONE) {
+                long firstOffset;
+                if (toMagic == RecordBatch.MAGIC_VALUE_V0)
+                    firstOffset = 11L; // v1 record
+                else
+                    firstOffset = 17; // v2 record
+                Records convertedRecords2 = fileRecords.downConvert(toMagic, firstOffset);
+                List<Long> filteredOffsets = new ArrayList<>(offsets);
+                List<SimpleRecord> filteredRecords = new ArrayList<>(records);
+                int index = filteredOffsets.indexOf(firstOffset) - 1;
+                filteredRecords.remove(index);
+                filteredOffsets.remove(index);
+                verifyConvertedRecords(filteredRecords, filteredOffsets, convertedRecords2, compressionType, toMagic);
+            } else {
+                // firstOffset doesn't have any effect in this case
+                Records convertedRecords2 = fileRecords.downConvert(toMagic, 10L);
+                verifyConvertedRecords(records, offsets, convertedRecords2, compressionType, toMagic);
+            }
         }
     }
 
+    private String utf8(ByteBuffer buffer) {
+        return Utils.utf8(buffer, buffer.remaining());
+    }
+
     private void verifyConvertedRecords(List<SimpleRecord> initialRecords,
                                         List<Long> initialOffsets,
                                         Records convertedRecords,
@@ -378,8 +406,8 @@ public class FileRecordsTest {
             for (Record record : batch) {
                 assertTrue("Inner record should have magic " + magicByte, record.hasMagic(batch.magic()));
                 assertEquals("Offset should not change", initialOffsets.get(i).longValue(),
record.offset());
-                assertEquals("Key should not change", initialRecords.get(i).key(), record.key());
-                assertEquals("Value should not change", initialRecords.get(i).value(), record.value());
+                assertEquals("Key should not change", utf8(initialRecords.get(i).key()),
utf8(record.key()));
+                assertEquals("Value should not change", utf8(initialRecords.get(i).value()),
utf8(record.value()));
                 assertFalse(record.hasTimestampType(TimestampType.LOG_APPEND_TIME));
                 if (batch.magic() == RecordBatch.MAGIC_VALUE_V0) {
                     assertEquals(RecordBatch.NO_TIMESTAMP, record.timestamp());

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e8b3c56/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
index 9734f59..f10bd98 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
@@ -432,7 +432,7 @@ public class MemoryRecordsBuilderTest {
 
         buffer.flip();
 
-        Records records = MemoryRecords.readableRecords(buffer).downConvert(RecordBatch.MAGIC_VALUE_V1);
+        Records records = MemoryRecords.readableRecords(buffer).downConvert(RecordBatch.MAGIC_VALUE_V1, 0);
 
         List<? extends RecordBatch> batches = Utils.toList(records.batches().iterator());
         if (compressionType != CompressionType.NONE) {
@@ -469,24 +469,57 @@ public class MemoryRecordsBuilderTest {
 
         buffer.flip();
 
-        Records records = MemoryRecords.readableRecords(buffer).downConvert(RecordBatch.MAGIC_VALUE_V1);
+        Records records = MemoryRecords.readableRecords(buffer).downConvert(RecordBatch.MAGIC_VALUE_V1, 0);
 
         List<? extends RecordBatch> batches = Utils.toList(records.batches().iterator());
         if (compressionType != CompressionType.NONE) {
             assertEquals(2, batches.size());
             assertEquals(RecordBatch.MAGIC_VALUE_V0, batches.get(0).magic());
+            assertEquals(0, batches.get(0).baseOffset());
             assertEquals(RecordBatch.MAGIC_VALUE_V1, batches.get(1).magic());
+            assertEquals(1, batches.get(1).baseOffset());
         } else {
             assertEquals(3, batches.size());
             assertEquals(RecordBatch.MAGIC_VALUE_V0, batches.get(0).magic());
+            assertEquals(0, batches.get(0).baseOffset());
             assertEquals(RecordBatch.MAGIC_VALUE_V1, batches.get(1).magic());
+            assertEquals(1, batches.get(1).baseOffset());
             assertEquals(RecordBatch.MAGIC_VALUE_V1, batches.get(2).magic());
+            assertEquals(2, batches.get(2).baseOffset());
         }
 
         List<Record> logRecords = Utils.toList(records.records().iterator());
-        assertEquals(ByteBuffer.wrap("1".getBytes()), logRecords.get(0).key());
-        assertEquals(ByteBuffer.wrap("2".getBytes()), logRecords.get(1).key());
-        assertEquals(ByteBuffer.wrap("3".getBytes()), logRecords.get(2).key());
+        assertEquals("1", utf8(logRecords.get(0).key()));
+        assertEquals("2", utf8(logRecords.get(1).key()));
+        assertEquals("3", utf8(logRecords.get(2).key()));
+
+        records = MemoryRecords.readableRecords(buffer).downConvert(RecordBatch.MAGIC_VALUE_V1, 2L);
+
+        batches = Utils.toList(records.batches().iterator());
+        logRecords = Utils.toList(records.records().iterator());
+
+        if (compressionType != CompressionType.NONE) {
+            assertEquals(2, batches.size());
+            assertEquals(RecordBatch.MAGIC_VALUE_V0, batches.get(0).magic());
+            assertEquals(0, batches.get(0).baseOffset());
+            assertEquals(RecordBatch.MAGIC_VALUE_V1, batches.get(1).magic());
+            assertEquals(1, batches.get(1).baseOffset());
+            assertEquals("1", utf8(logRecords.get(0).key()));
+            assertEquals("2", utf8(logRecords.get(1).key()));
+            assertEquals("3", utf8(logRecords.get(2).key()));
+        } else {
+            assertEquals(2, batches.size());
+            assertEquals(RecordBatch.MAGIC_VALUE_V0, batches.get(0).magic());
+            assertEquals(0, batches.get(0).baseOffset());
+            assertEquals(RecordBatch.MAGIC_VALUE_V1, batches.get(1).magic());
+            assertEquals(2, batches.get(1).baseOffset());
+            assertEquals("1", utf8(logRecords.get(0).key()));
+            assertEquals("3", utf8(logRecords.get(1).key()));
+        }
+    }
+
+    private String utf8(ByteBuffer buffer) {
+        return Utils.utf8(buffer, buffer.remaining());
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e8b3c56/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index eb0bf3b..5ce590f 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -508,19 +508,24 @@ class KafkaApis(val requestChannel: RequestChannel,
       // know it must be supported. However, if the magic version is changed from a higher version back to a
       // lower version, this check will no longer be valid and we will fail to down-convert the messages
       // which were written in the new format prior to the version downgrade.
-      replicaManager.getMagic(tp) match {
-        case Some(magic) if magic > 0 && versionId <= 1 && !data.records.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V0) =>
-          trace(s"Down converting message to V0 for fetch request from $clientId")
-          new FetchResponse.PartitionData(data.error, data.highWatermark, FetchResponse.INVALID_LAST_STABLE_OFFSET,
-              data.logStartOffset, data.abortedTransactions, data.records.downConvert(RecordBatch.MAGIC_VALUE_V0))
+      replicaManager.getMagic(tp).flatMap { magic =>
+        val downConvertMagic = {
+          if (magic > RecordBatch.MAGIC_VALUE_V0 && versionId <= 1 && !data.records.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V0))
+            Some(RecordBatch.MAGIC_VALUE_V0)
+          else if (magic > RecordBatch.MAGIC_VALUE_V1 && versionId <= 3 && !data.records.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V1))
+            Some(RecordBatch.MAGIC_VALUE_V1)
+          else
+            None
+        }
 
-        case Some(magic) if magic > 1 && versionId <= 3 && !data.records.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V1) =>
-          trace(s"Down converting message to V1 for fetch request from $clientId")
+        downConvertMagic.map { magic =>
+          trace(s"Down converting records from partition $tp to message format version $magic
for fetch request from $clientId")
+          val converted = data.records.downConvert(magic, fetchRequest.fetchData.get(tp).fetchOffset)
           new FetchResponse.PartitionData(data.error, data.highWatermark, FetchResponse.INVALID_LAST_STABLE_OFFSET,
-              data.logStartOffset, data.abortedTransactions, data.records.downConvert(RecordBatch.MAGIC_VALUE_V1))
+            data.logStartOffset, data.abortedTransactions, converted)
+        }
 
-        case _ => data
-      }
+      }.getOrElse(data)
     }
 
     // the callback for process a fetch response, invoked before throttling
@@ -549,7 +554,9 @@ class KafkaApis(val requestChannel: RequestChannel,
       def fetchResponseCallback(bandwidthThrottleTimeMs: Int) {
         def createResponse(requestThrottleTimeMs: Int): RequestChannel.Response = {
           val convertedData = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData]
-          fetchedPartitionData.asScala.foreach(e => convertedData.put(e._1, convertedPartitionData(e._1, e._2)))
+          fetchedPartitionData.asScala.foreach { case (tp, partitionData) =>
+            convertedData.put(tp, convertedPartitionData(tp, partitionData))
+          }
           val response = new FetchResponse(convertedData, 0)
           val responseStruct = response.toStruct(versionId)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e8b3c56/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
index 48b3945..c5d40f6 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
@@ -19,13 +19,14 @@ package kafka.server
 import java.util
 import java.util.Properties
 
+import kafka.api.KAFKA_0_11_0_IV2
 import kafka.log.LogConfig
 import kafka.utils.TestUtils
 import kafka.utils.TestUtils._
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.record.Record
+import org.apache.kafka.common.record.{Record, RecordBatch}
 import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, IsolationLevel}
 import org.apache.kafka.common.serialization.StringSerializer
 import org.junit.Assert._
@@ -42,12 +43,6 @@ class FetchRequestTest extends BaseRequestTest {
 
   private var producer: KafkaProducer[String, String] = null
 
-  override def setUp() {
-    super.setUp()
-    producer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(servers),
-      retries = 5, keySerializer = new StringSerializer, valueSerializer = new StringSerializer)
-  }
-
   override def tearDown() {
     producer.close()
     super.tearDown()
@@ -67,14 +62,20 @@ class FetchRequestTest extends BaseRequestTest {
     partitionMap
   }
 
-  private def sendFetchRequest(leaderId: Int, request: FetchRequest,
-                               version: Short = ApiKeys.FETCH.latestVersion): FetchResponse = {
+  private def sendFetchRequest(leaderId: Int, request: FetchRequest): FetchResponse = {
     val response = connectAndSend(request, ApiKeys.FETCH, destination = brokerSocketServer(leaderId))
-    FetchResponse.parse(response, version)
+    FetchResponse.parse(response, request.version)
+  }
+
+  private def initProducer(): Unit = {
+    producer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(servers),
+      retries = 5, keySerializer = new StringSerializer, valueSerializer = new StringSerializer)
   }
 
   @Test
   def testBrokerRespectsPartitionsOrderAndSizeLimits(): Unit = {
+    initProducer()
+
     val messagesPerPartition = 9
     val maxResponseBytes = 800
     val maxPartitionBytes = 190
@@ -152,13 +153,14 @@ class FetchRequestTest extends BaseRequestTest {
 
   @Test
   def testFetchRequestV2WithOversizedMessage(): Unit = {
+    initProducer()
     val maxPartitionBytes = 200
     val (topicPartition, leaderId) = createTopics(numTopics = 1, numPartitions = 1).head
     producer.send(new ProducerRecord(topicPartition.topic, topicPartition.partition,
       "key", new String(new Array[Byte](maxPartitionBytes + 1)))).get
     val fetchRequest = FetchRequest.Builder.forConsumer(Int.MaxValue, 0, createPartitionMap(maxPartitionBytes,
       Seq(topicPartition))).build(2)
-    val fetchResponse = sendFetchRequest(leaderId, fetchRequest, version = 2)
+    val fetchResponse = sendFetchRequest(leaderId, fetchRequest)
     val partitionData = fetchResponse.responseData.get(topicPartition)
     assertEquals(Errors.NONE, partitionData.error)
     assertTrue(partitionData.highWatermark > 0)
@@ -166,6 +168,68 @@ class FetchRequestTest extends BaseRequestTest {
     assertEquals(0, records(partitionData).map(_.sizeInBytes).sum)
   }
 
+  /**
+    * Ensure that we respect the fetch offset when returning records that were converted from an uncompressed v2
+    * record batch to multiple v0/v1 record batches with size 1. If the fetch offset points to inside the record batch,
+    * some records have to be dropped during the conversion.
+    */
+  @Test
+  def testDownConversionFromBatchedToUnbatchedRespectsOffset(): Unit = {
+    // Increase linger so that we have control over the batches created
+    producer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(servers),
+      retries = 5, keySerializer = new StringSerializer, valueSerializer = new StringSerializer,
+      lingerMs = 300 * 1000)
+
+    val topicConfig = Map(LogConfig.MessageFormatVersionProp -> KAFKA_0_11_0_IV2.version)
+    val (topicPartition, leaderId) = createTopics(numTopics = 1, numPartitions = 1, topicConfig).head
+    val topic = topicPartition.topic
+
+    val firstBatchFutures = (0 until 10).map(i => producer.send(new ProducerRecord(topic, s"key-$i", s"value-$i")))
+    producer.flush()
+    val secondBatchFutures = (10 until 25).map(i => producer.send(new ProducerRecord(topic, s"key-$i", s"value-$i")))
+    producer.flush()
+
+    firstBatchFutures.foreach(_.get)
+    secondBatchFutures.foreach(_.get)
+
+    def check(fetchOffset: Long, requestVersion: Short, expectedOffset: Long, expectedNumBatches: Int, expectedMagic: Byte): Unit = {
+      val fetchRequest = FetchRequest.Builder.forConsumer(Int.MaxValue, 0, createPartitionMap(Int.MaxValue,
+        Seq(topicPartition), Map(topicPartition -> fetchOffset))).build(requestVersion)
+      val fetchResponse = sendFetchRequest(leaderId, fetchRequest)
+      val partitionData = fetchResponse.responseData.get(topicPartition)
+      assertEquals(Errors.NONE, partitionData.error)
+      assertTrue(partitionData.highWatermark > 0)
+      val batches = partitionData.records.batches.asScala.toBuffer
+      assertEquals(expectedNumBatches, batches.size)
+      val batch = batches.head
+      assertEquals(expectedMagic, batch.magic)
+      assertEquals(expectedOffset, batch.baseOffset)
+    }
+
+    // down conversion to message format 0, batches of 1 message are returned so we receive the exact offset we requested
+    check(fetchOffset = 3, expectedOffset = 3, requestVersion = 1, expectedNumBatches = 22,
+      expectedMagic = RecordBatch.MAGIC_VALUE_V0)
+    check(fetchOffset = 15, expectedOffset = 15, requestVersion = 1, expectedNumBatches = 10,
+      expectedMagic = RecordBatch.MAGIC_VALUE_V0)
+
+    // down conversion to message format 1, batches of 1 message are returned so we receive the exact offset we requested
+    check(fetchOffset = 3, expectedOffset = 3, requestVersion = 3, expectedNumBatches = 22,
+      expectedMagic = RecordBatch.MAGIC_VALUE_V1)
+    check(fetchOffset = 15, expectedOffset = 15, requestVersion = 3, expectedNumBatches = 10,
+      expectedMagic = RecordBatch.MAGIC_VALUE_V1)
+
+    // no down conversion, we receive a single batch so the received offset won't necessarily be the same
+    check(fetchOffset = 3, expectedOffset = 0, requestVersion = 4, expectedNumBatches = 2,
+      expectedMagic = RecordBatch.MAGIC_VALUE_V2)
+    check(fetchOffset = 15, expectedOffset = 10, requestVersion = 4, expectedNumBatches = 1,
+      expectedMagic = RecordBatch.MAGIC_VALUE_V2)
+
+    // no down conversion, we receive a single batch and the exact offset we requested because it happens to be the
+    // offset of the first record in the batch
+    check(fetchOffset = 10, expectedOffset = 10, requestVersion = 4, expectedNumBatches = 1,
+      expectedMagic = RecordBatch.MAGIC_VALUE_V2)
+  }
+
   private def records(partitionData: FetchResponse.PartitionData): Seq[Record] = {
     partitionData.records.records.asScala.toIndexedSeq
   }
@@ -207,10 +271,11 @@ class FetchRequestTest extends BaseRequestTest {
     assertTrue(responseSize <= maxResponseBytes)
   }
 
-  private def createTopics(numTopics: Int, numPartitions: Int): Map[TopicPartition, Int] = {
+  private def createTopics(numTopics: Int, numPartitions: Int, configs: Map[String, String] = Map.empty): Map[TopicPartition, Int] = {
     val topics = (0 until numPartitions).map(t => s"topic$t")
     val topicConfig = new Properties
     topicConfig.setProperty(LogConfig.MinInSyncReplicasProp, 2.toString)
+    configs.foreach { case (k, v) => topicConfig.setProperty(k, v) }
     topics.flatMap { topic =>
       val partitionToLeader = createTopic(zkUtils, topic, numPartitions = numPartitions, replicationFactor = 2,
         servers = servers, topicConfig = topicConfig)

