kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch 2.4 updated: KAFKA-9669; Loosen validation of inner offsets for older message formats (#8647)
Date Tue, 12 May 2020 19:16:54 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new cc26fcc  KAFKA-9669; Loosen validation of inner offsets for older message formats (#8647)
cc26fcc is described below

commit cc26fccea330087441fe276f1d2047f067e49e46
Author: Jason Gustafson <jason@confluent.io>
AuthorDate: Tue May 12 12:06:08 2020 -0700

    KAFKA-9669; Loosen validation of inner offsets for older message formats (#8647)
    
    Prior to KAFKA-8106, we allowed the v0 and v1 message formats to contain non-consecutive
    inner offsets. Inside `LogValidator`, we would detect this case and rewrite the batch. After
    KAFKA-8106, we changed the logic to raise an error for the v1 message format (v0 batches
    were still rewritten). This broke compatibility with older clients that depended on the
    looser validation. This patch restores the old behavior of rewriting the batch to fix
    the invalid inner offsets.
    
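    Note that the v2 message format has always had stricter validation. This patch also adds
    a test case covering both behaviors: v2 batches with non-sequential inner offsets are
    rejected, while v0 and v1 batches are rewritten with sequential offsets.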
    
    Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>, Ismael Juma <ismael@juma.me.uk>
---
 .../kafka/common/record/MemoryRecordsBuilder.java  | 30 +++++++++++++
 core/src/main/scala/kafka/log/LogValidator.scala   | 14 +++----
 .../scala/unit/kafka/log/LogValidatorTest.scala    | 49 ++++++++++++++++++++--
 3 files changed, 83 insertions(+), 10 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index 054fb86..fd5a680 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.apache.kafka.common.utils.Utils;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -588,6 +589,35 @@ public class MemoryRecordsBuilder implements AutoCloseable {
     }
 
     /**
+     * Append a record without doing offset/magic validation (this should only be used in
testing).
+     *
+     * @param offset The offset of the record
+     * @param record The record to add
+     */
+    public void appendUncheckedWithOffset(long offset, SimpleRecord record) throws IOException
{
+        if (magic >= RecordBatch.MAGIC_VALUE_V2) {
+            int offsetDelta = (int) (offset - baseOffset);
+            long timestamp = record.timestamp();
+            if (firstTimestamp == null)
+                firstTimestamp = timestamp;
+
+            int sizeInBytes = DefaultRecord.writeTo(appendStream,
+                offsetDelta,
+                timestamp - firstTimestamp,
+                record.key(),
+                record.value(),
+                record.headers());
+            recordWritten(offset, timestamp, sizeInBytes);
+        } else {
+            LegacyRecord legacyRecord = LegacyRecord.create(magic,
+                record.timestamp(),
+                Utils.toNullableArray(record.key()),
+                Utils.toNullableArray(record.value()));
+            appendUncheckedWithOffset(offset, legacyRecord);
+        }
+    }
+
+    /**
      * Append a record at the next sequential offset.
      * @param record the record to add
      */
diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index b72ab1f..b31d45a 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -394,14 +394,14 @@ private[log] object LogValidator extends Logging {
 
           uncompressedSizeInBytes += record.sizeInBytes()
           if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && toMagic > RecordBatch.MAGIC_VALUE_V0)
{
-            // inner records offset should always be continuous
+            // Some older clients do not implement the V1 internal offsets correctly.
+            // Historically the broker handled this by rewriting the batches rather
+            // than rejecting the request. We must continue this handling here to avoid
+            // breaking these clients.
             val expectedOffset = expectedInnerOffset.getAndIncrement()
-            if (record.offset != expectedOffset) {
-              brokerTopicStats.allTopicsStats.invalidOffsetOrSequenceRecordsPerSec.mark()
-              throw new RecordValidationException(
-                new InvalidRecordException(s"Inner record $record inside the compressed record
batch does not have incremental offsets, expected offset is $expectedOffset in topic partition
$topicPartition."),
-                List(new RecordError(batchIndex)))
-            }
+            if (record.offset != expectedOffset)
+              inPlaceAssignment = false
+
             if (record.timestamp > maxTimestamp)
               maxTimestamp = record.timestamp
           }
diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
index 0515e79..ba56f14 100644
--- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
@@ -22,14 +22,14 @@ import java.util.concurrent.TimeUnit
 import com.yammer.metrics.Metrics
 import kafka.api.{ApiVersion, KAFKA_2_0_IV1, KAFKA_2_3_IV1}
 import kafka.common.{LongRef, RecordValidationException}
+import kafka.log.LogValidator.ValidationAndOffsetAssignResult
 import kafka.message._
 import kafka.server.BrokerTopicStats
 import kafka.utils.TestUtils.meterCount
-import org.apache.kafka.common.InvalidRecordException
-import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.{InvalidTimestampException, UnsupportedCompressionTypeException,
UnsupportedForMessageFormatException}
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.Time
+import org.apache.kafka.common.{InvalidRecordException, TopicPartition}
 import org.apache.kafka.test.TestUtils
 import org.junit.Assert._
 import org.junit.Test
@@ -65,6 +65,29 @@ class LogValidatorTest {
   }
 
   @Test
+  def testValidationOfBatchesWithNonSequentialInnerOffsets(): Unit = {
+    def testMessageValidation(magicValue: Byte): Unit = {
+      val numRecords = 20
+      val invalidRecords = recordsWithNonSequentialInnerOffsets(magicValue, CompressionType.GZIP,
numRecords)
+
+      // Validation for v2 and above is strict for this case. For older formats, we fix invalid
+      // internal offsets by rewriting the batch.
+      if (magicValue >= RecordBatch.MAGIC_VALUE_V2) {
+        assertThrows[InvalidRecordException] {
+          validateMessages(invalidRecords, magicValue, CompressionType.GZIP, CompressionType.GZIP)
+        }
+      } else {
+        val result = validateMessages(invalidRecords, magicValue, CompressionType.GZIP, CompressionType.GZIP)
+        assertEquals(0 until numRecords, result.validatedRecords.records.asScala.map(_.offset))
+      }
+    }
+
+    for (version <- RecordVersion.values) {
+      testMessageValidation(version.value)
+    }
+  }
+
+  @Test
   def testMisMatchMagic(): Unit = {
     checkMismatchMagic(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, CompressionType.GZIP)
     checkMismatchMagic(RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V0, CompressionType.GZIP)
@@ -88,7 +111,10 @@ class LogValidatorTest {
     assertTrue(meterCount(s"${BrokerTopicStats.InvalidMagicNumberRecordsPerSec}") > 0)
   }
 
-  private def validateMessages(records: MemoryRecords, magic: Byte, sourceCompressionType:
CompressionType, targetCompressionType: CompressionType): Unit = {
+  private def validateMessages(records: MemoryRecords,
+                               magic: Byte,
+                               sourceCompressionType: CompressionType,
+                               targetCompressionType: CompressionType): ValidationAndOffsetAssignResult
= {
     LogValidator.validateMessagesAndAssignOffsets(records,
       topicPartition,
       new LongRef(0L),
@@ -1371,6 +1397,23 @@ class LogValidatorTest {
     }
   }
 
+  private def recordsWithNonSequentialInnerOffsets(magicValue: Byte,
+                                                   codec: CompressionType,
+                                                   numRecords: Int): MemoryRecords = {
+    val records = (0 until numRecords).map { id =>
+      new SimpleRecord(id.toString.getBytes)
+    }
+
+    val buffer = ByteBuffer.allocate(1024)
+    val builder = MemoryRecords.builder(buffer, magicValue, codec, TimestampType.CREATE_TIME,
0L)
+
+    records.foreach { record =>
+      builder.appendUncheckedWithOffset(0, record)
+    }
+
+    builder.build()
+  }
+
   private def recordsWithInvalidInnerMagic(batchMagicValue: Byte,
                                            recordMagicValue: Byte,
                                            codec: CompressionType): MemoryRecords = {


Mime
View raw message