kafka-commits mailing list archives

From ij...@apache.org
Subject [kafka] branch trunk updated: KAFKA-9820: validateMessagesAndAssignOffsetsCompressed allocates unused iterator (#8422)
Date Sat, 04 Apr 2020 17:06:16 GMT
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 46540eb  KAFKA-9820: validateMessagesAndAssignOffsetsCompressed allocates unused iterator (#8422)
46540eb is described below

commit 46540eb5e0a7abb32a159250564d42137bd8b99f
Author: Lucas Bradstreet <lucas@confluent.io>
AuthorDate: Sat Apr 4 10:05:51 2020 -0700

    KAFKA-9820: validateMessagesAndAssignOffsetsCompressed allocates unused iterator (#8422)
    
    https://github.com/apache/kafka/commit/3e9d1c1411c5268de382f9dfcc95bdf66d0063a0 introduced
    skipKeyValueIterator(s) with the intent that they be used, but in offset validation the
    iterator was created and then never consumed.
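
    In essence, the fix is of this shape (a minimal sketch, not the actual LogValidator
    code; validate() is an illustrative placeholder, and recordsIterator is the
    skipKeyValueIterator created earlier in the same method):

        // Before: the skip iterator was allocated but ignored; the loop walked the
        // batch itself, materializing every key and value a second time.
        for ((record, batchIndex) <- batch.asScala.view.zipWithIndex) {
          validate(record, batchIndex)
        }

        // After: consume the iterator that was created, tracking the index manually.
        var batchIndex = 0
        for (record <- recordsIterator.asScala) {
          validate(record, batchIndex)
          batchIndex += 1
        }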
    
    A subset of the benchmark results follows. They show roughly a 20% improvement in
    validation throughput and a 40% reduction in garbage allocation for batch sizes of 1 and 2.
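
    The numbers should be reproducible with the JMH wrapper script in jmh-benchmarks,
    assuming jmh.sh forwards standard JMH flags:

        ./jmh-benchmarks/jmh.sh -p bufferSupplierStr=NO_CACHING -p maxBatchSize=1,2 \
            RecordBatchIterationBenchmark.measureValidation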
    
    **# Parameters: (bufferSupplierStr = NO_CACHING, bytes = RANDOM, compressionType = LZ4, maxBatchSize = 1, messageSize = 1000, messageVersion = 2)**
    
    Before:
    Result "org.apache.kafka.jmh.record.RecordBatchIterationBenchmark.measureValidation":
      64851.837 ±(99.9%) 944.248 ops/s [Average]
      (min, avg, max) = (64505.317, 64851.837, 65114.359), stdev = 245.218
      CI (99.9%): [63907.589, 65796.084] (assumes normal distribution)
    
    "org.apache.kafka.jmh.record.RecordBatchIterationBenchmark.measureValidation:·gc.alloc.rate.norm":
      164088.003 ±(99.9%) 0.004 B/op [Average]
      (min, avg, max) = (164088.001, 164088.003, 164088.004), stdev = 0.001
      CI (99.9%): [164087.998, 164088.007] (assumes normal distribution)
    
    After:
    
    Result "org.apache.kafka.jmh.record.RecordBatchIterationBenchmark.measureValidation":
      78910.273 ±(99.9%) 707.024 ops/s [Average]
      (min, avg, max) = (78785.486, 78910.273, 79234.007), stdev = 183.612
      CI (99.9%): [78203.249, 79617.297] (assumes normal distribution)
    
    "org.apache.kafka.jmh.record.RecordBatchIterationBenchmark.measureValidation:·gc.alloc.rate.norm":
      96440.002 ±(99.9%) 0.001 B/op [Average]
      (min, avg, max) = (96440.002, 96440.002, 96440.002), stdev = 0.001
      CI (99.9%): [96440.002, 96440.003] (assumes normal distribution)
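
    (gc.alloc.rate.norm is bytes allocated per benchmark operation, so the drop from
    164088 B/op to 96440 B/op is the ~41% allocation reduction cited above.)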
    
    **# Parameters: (bufferSupplierStr = NO_CACHING, bytes = RANDOM, compressionType = LZ4, maxBatchSize = 2, messageSize = 1000, messageVersion = 2)**
    
    Before:
    Result "org.apache.kafka.jmh.record.RecordBatchIterationBenchmark.measureValidation":
      64815.364 ±(99.9%) 639.309 ops/s [Average]
      (min, avg, max) = (64594.545, 64815.364, 64983.305), stdev = 166.026
      CI (99.9%): [64176.056, 65454.673] (assumes normal distribution)

    "org.apache.kafka.jmh.record.RecordBatchIterationBenchmark.measureValidation:·gc.alloc.rate.norm":
      163944.003 ±(99.9%) 0.001 B/op [Average]
      (min, avg, max) = (163944.002, 163944.003, 163944.003), stdev = 0.001
      CI (99.9%): [163944.002, 163944.004] (assumes normal distribution)
    
    After:
    Result "org.apache.kafka.jmh.record.RecordBatchIterationBenchmark.measureValidation":
      77075.096 ±(99.9%) 201.092 ops/s [Average]
      (min, avg, max) = (77021.537, 77075.096, 77129.693), stdev = 52.223
      CI (99.9%): [76874.003, 77276.188] (assumes normal distribution)
    
    "org.apache.kafka.jmh.record.RecordBatchIterationBenchmark.measureValidation:·gc.alloc.rate.norm":
      96504.002 ±(99.9%) 0.003 B/op [Average]
      (min, avg, max) = (96504.001, 96504.002, 96504.003), stdev = 0.001
      CI (99.9%): [96503.999, 96504.005] (assumes normal distribution)
    
    Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Ismael Juma <ismael@juma.me.uk>
---
 checkstyle/import-control-jmh-benchmarks.xml       |  2 +
 core/src/main/scala/kafka/log/LogValidator.scala   |  4 +-
 .../jmh/record/RecordBatchIterationBenchmark.java  | 43 ++++++++++++++++++++--
 3 files changed, 45 insertions(+), 4 deletions(-)

diff --git a/checkstyle/import-control-jmh-benchmarks.xml b/checkstyle/import-control-jmh-benchmarks.xml
index 4b546cb..e5017e4 100644
--- a/checkstyle/import-control-jmh-benchmarks.xml
+++ b/checkstyle/import-control-jmh-benchmarks.xml
@@ -44,6 +44,8 @@
     <allow class="kafka.utils.Pool"/>
     <allow class="kafka.utils.KafkaScheduler"/>
     <allow class="org.apache.kafka.clients.FetchSessionHandler"/>
+    <allow pkg="kafka.common"/>
+    <allow pkg="kafka.message"/>
     <allow pkg="org.mockito"/>
     <allow pkg="kafka.security.authorizer"/>
     <allow pkg="org.apache.kafka.server"/>
diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index 08c2270..4a30777 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -414,7 +414,8 @@ private[log] object LogValidator extends Logging {
 
       try {
         val recordErrors = new ArrayBuffer[ApiRecordError](0)
-        for ((record, batchIndex) <- batch.asScala.view.zipWithIndex) {
+        var batchIndex = 0
+        for (record <- recordsIterator.asScala) {
           val expectedOffset = expectedInnerOffset.getAndIncrement()
           val recordError = validateRecordCompression(batchIndex, record).orElse {
             validateRecord(batch, topicPartition, record, batchIndex, now,
@@ -433,6 +434,7 @@ private[log] object LogValidator extends Logging {
               uncompressedSizeInBytes += record.sizeInBytes()
               validatedRecords += record
           }
+          batchIndex += 1
         }
         processRecordErrors(recordErrors)
       } finally {
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/RecordBatchIterationBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/RecordBatchIterationBenchmark.java
index 73552c2..da1b645 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/RecordBatchIterationBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/RecordBatchIterationBenchmark.java
@@ -16,6 +16,13 @@
  */
 package org.apache.kafka.jmh.record;
 
+import kafka.api.ApiVersion;
+import kafka.common.LongRef;
+import kafka.log.AppendOrigin;
+import kafka.log.LogValidator;
+import kafka.message.CompressionCodec;
+import kafka.server.BrokerTopicStats;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.record.AbstractRecords;
 import org.apache.kafka.common.record.BufferSupplier;
 import org.apache.kafka.common.record.CompressionType;
@@ -26,6 +33,7 @@ import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.utils.CloseableIterator;
+import org.apache.kafka.common.utils.Time;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.Fork;
 import org.openjdk.jmh.annotations.Measurement;
@@ -57,7 +65,7 @@ public class RecordBatchIterationBenchmark {
         RANDOM, ONES
     }
 
-    @Param(value = {"10", "50", "200", "500"})
+    @Param(value = {"1", "2", "10", "50", "200", "500"})
     private int maxBatchSize = 200;
 
     @Param(value = {"LZ4", "SNAPPY", "GZIP", "ZSTD", "NONE"})
@@ -72,8 +80,11 @@ public class RecordBatchIterationBenchmark {
     @Param(value = {"RANDOM", "ONES"})
     private Bytes bytes = Bytes.RANDOM;
 
+    @Param(value = {"NO_CACHING", "CREATE"})
+    private String bufferSupplierStr;
+
     // zero starting offset is much faster for v1 batches, but that will almost never happen
-    private final int startingOffset = 42;
+    private int startingOffset;
 
     // Used by measureSingleMessage
     private ByteBuffer singleBatchBuffer;
@@ -83,9 +94,22 @@ public class RecordBatchIterationBenchmark {
     private int[] batchSizes;
     private BufferSupplier bufferSupplier;
 
+
+
     @Setup
     public void init() {
-        bufferSupplier = BufferSupplier.create();
+        // For v0 batches a zero starting offset is much faster but that will almost never happen.
+        // For v2 batches we use starting offset = 0 as these batches are relative to the base
+        // offset and measureValidation will mutate these batches between iterations
+        startingOffset = messageVersion == 2 ? 0 : 42;
+
+        if (bufferSupplierStr.equals("NO_CACHING")) {
+            bufferSupplier = BufferSupplier.NO_CACHING;
+        } else if (bufferSupplierStr.equals("CREATE")) {
+            bufferSupplier = BufferSupplier.create();
+        } else {
+            throw new IllegalArgumentException("Unsupported buffer supplier " + bufferSupplierStr);
+        }
         singleBatchBuffer = createBatch(1);
 
         batchBuffers = new ByteBuffer[batchCount];
@@ -123,6 +147,19 @@ public class RecordBatchIterationBenchmark {
     }
 
     @Benchmark
+    public void measureValidation(Blackhole bh) throws IOException {
+        MemoryRecords records = MemoryRecords.readableRecords(singleBatchBuffer.duplicate());
+        LogValidator.validateMessagesAndAssignOffsetsCompressed(records, new TopicPartition("a", 0),
+                new LongRef(startingOffset), Time.SYSTEM, System.currentTimeMillis(),
+                CompressionCodec.getCompressionCodec(compressionType.id),
+                CompressionCodec.getCompressionCodec(compressionType.id),
+                false,  messageVersion, TimestampType.CREATE_TIME, Long.MAX_VALUE, 0,
+                new AppendOrigin.Client$(),
+                ApiVersion.latestVersion(),
+                new BrokerTopicStats());
+    }
+
+    @Benchmark
     public void measureIteratorForBatchWithSingleMessage(Blackhole bh) throws IOException {
         for (RecordBatch batch : MemoryRecords.readableRecords(singleBatchBuffer.duplicate()).batches()) {
             try (CloseableIterator<Record> iterator = batch.streamingIterator(bufferSupplier)) {
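
A note on the new bufferSupplierStr parameter: the two values select between the two
BufferSupplier strategies available for decompression buffers. Roughly, as an
illustrative Scala sketch (not code from this patch):

    import org.apache.kafka.common.record.BufferSupplier

    val noCaching = BufferSupplier.NO_CACHING // get() always allocates; release() discards
    val caching   = BufferSupplier.create()   // caches released buffers for reuse, keyed by capacity

    val buf = caching.get(1024)
    caching.release(buf)           // buffer goes back into the supplier's cache
    val buf2 = caching.get(1024)   // may come from the cache instead of a fresh allocation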

