kafka-commits mailing list archives

From j...@apache.org
Subject [kafka] branch 0.10.2 updated: KAFKA-8722; Ensure record CRC is always verified when appending to log (#7124)
Date Wed, 31 Jul 2019 09:26:10 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 0.10.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/0.10.2 by this push:
     new 8007211  KAFKA-8722; Ensure record CRC is always verified when appending to log (#7124)
8007211 is described below

commit 8007211cc982d8458223e866c1ee7d94b69e0249
Author: lordcheng10 <1572139390@qq.com>
AuthorDate: Wed Jul 31 17:25:37 2019 +0800

    KAFKA-8722; Ensure record CRC is always verified when appending to log (#7124)
    
    We found that when record.offset does not equal the expected offset, Kafka sets
    inPlaceAssignment to false. When inPlaceAssignment is false, the record CRC is
    not verified. This patch adds the missing CRC verification.
    
    Reviewers: Jason Gustafson <jason@confluent.io>
---
 core/src/main/scala/kafka/log/LogValidator.scala | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index 47d4bd4..3555710 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -166,6 +166,7 @@ private[kafka] object LogValidator {
 
     records.deepEntries(true, BufferSupplier.NO_CACHING).asScala.foreach { logEntry =>
       val record = logEntry.record
+      record.ensureValid()
       validateKey(record, compactedTopic)
 
       if (record.magic > Record.MAGIC_VALUE_V0 && messageFormatVersion > Record.MAGIC_VALUE_V0) {
@@ -201,9 +202,6 @@ private[kafka] object LogValidator {
         shallowOffsetOfMaxTimestamp = info.shallowOffsetOfMaxTimestamp,
         messageSizeMaybeChanged = true)
     } else {
-      // ensure the inner messages are valid
-      validatedRecords.foreach(_.ensureValid)
-
       // we can update the wrapper message only and write the compressed payload as is
       val entry = records.shallowEntries.iterator.next()
       val offset = offsetCounter.addAndGet(validatedRecords.size) - 1
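
The effect of moving the check can be illustrated with a minimal, self-contained sketch. It is not the Kafka code: SketchRecord, CrcCheckSketch, validateBefore and validateAfter are hypothetical names, and the CRC is computed with java.util.zip.CRC32 over the payload only, which is an assumption made for brevity. It only shows why a check confined to the in-place branch is skipped once an offset mismatch sets inPlaceAssignment to false, and why a check inside the loop always runs.

```scala
import java.util.zip.CRC32

// Hypothetical stand-in for a record: an offset, a payload, and the CRC stored with it.
final case class SketchRecord(offset: Long, payload: Array[Byte], storedCrc: Long) {
  // Recompute the CRC over the payload (assumption: real records cover more fields).
  def computedCrc: Long = {
    val crc = new CRC32()
    crc.update(payload)
    crc.getValue
  }

  // Mirrors the intent of record.ensureValid(): fail if the stored CRC does not match.
  def ensureValid(): Unit =
    if (storedCrc != computedCrc)
      throw new IllegalStateException(s"Record at offset $offset is corrupt")
}

object CrcCheckSketch {
  // Shape of the old flow: the CRC is checked only on the in-place branch.
  // Returns whether offsets could be assigned in place.
  def validateBefore(records: Seq[SketchRecord], firstOffset: Long): Boolean = {
    var expected = firstOffset
    var inPlaceAssignment = true
    records.foreach { record =>
      if (record.offset != expected) inPlaceAssignment = false
      expected += 1
    }
    // Skipped entirely when any offset did not match.
    if (inPlaceAssignment) records.foreach(_.ensureValid())
    inPlaceAssignment
  }

  // Shape of the patched flow: every record is checked inside the loop.
  def validateAfter(records: Seq[SketchRecord], firstOffset: Long): Boolean = {
    var expected = firstOffset
    var inPlaceAssignment = true
    records.foreach { record =>
      record.ensureValid()
      if (record.offset != expected) inPlaceAssignment = false
      expected += 1
    }
    inPlaceAssignment
  }
}
```

With a corrupt record whose offset also differs from the expected one, validateBefore returns false without raising, while validateAfter throws before the offset comparison is reached.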

