kafka-commits mailing list archives

From: ij...@apache.org
Subject: kafka git commit: MINOR: Reuse decompression buffers in log cleaner
Date: Sat, 03 Jun 2017 01:07:51 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 8104c0de2 -> 1c786c589


MINOR: Reuse decompression buffers in log cleaner

Follow-up to KAFKA-5150, reuse decompression buffers in the log cleaner thread.
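
The gist of the API change: MemoryRecords.filterTo now takes a BufferSupplier, so a long-lived
caller such as a cleaner thread can reuse decompression buffers across calls instead of
allocating a fresh one per batch. A minimal sketch of that usage pattern follows; the class
and method names are illustrative only, and it assumes RecordFilter is the nested
MemoryRecords.RecordFilter class and that BufferSupplier lives in org.apache.kafka.common.record.

import java.nio.ByteBuffer;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.BufferSupplier;
import org.apache.kafka.common.record.MemoryRecords;

// Illustrative sketch of a long-lived component calling the new filterTo overload.
public class FilterToSketch {

    // Created once and reused for every filterTo call, so the decompression buffer
    // (up to 64 KB for LZ4) is not reallocated per batch. One-off callers and tests
    // can pass BufferSupplier.NO_CACHING to keep the old allocate-per-batch behaviour.
    private final BufferSupplier decompressionBufferSupplier = BufferSupplier.create();

    public MemoryRecords.FilterResult filter(MemoryRecords records,
                                             TopicPartition partition,
                                             MemoryRecords.RecordFilter filter,
                                             ByteBuffer destination,
                                             int maxRecordBatchSize) {
        // The new trailing argument supplies (and may cache) decompression buffers.
        return records.filterTo(partition, filter, destination, maxRecordBatchSize,
                decompressionBufferSupplier);
    }
}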

Author: Xavier Léauté <xavier@confluent.io>

Reviewers: Jason Gustafson <jason@confluent.io>, Ismael Juma <ismael@juma.me.uk>

Closes #3180 from xvrl/logcleaner-decompression-buffers


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1c786c58
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1c786c58
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1c786c58

Branch: refs/heads/trunk
Commit: 1c786c589ab6737e1660981e739582935f9e0f0d
Parents: 8104c0d
Author: Xavier Léauté <xavier@confluent.io>
Authored: Sat Jun 3 02:06:48 2017 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Sat Jun 3 02:07:01 2017 +0100

----------------------------------------------------------------------
 .../kafka/common/record/MemoryRecords.java      | 53 ++++++++++++--------
 .../kafka/common/record/MemoryRecordsTest.java  | 16 +++---
 core/src/main/scala/kafka/log/LogCleaner.scala  |  4 +-
 .../src/test/scala/unit/kafka/log/LogTest.scala |  4 +-
 4 files changed, 46 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1c786c58/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index e158e2f..1427421 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -18,6 +18,7 @@ package org.apache.kafka.common.record;
 
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.apache.kafka.common.utils.CloseableIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -115,21 +116,28 @@ public class MemoryRecords extends AbstractRecords {
 
     /**
      * Filter the records into the provided ByteBuffer.
-     * @param partition The partition that is filtered (used only for logging)
-     * @param filter The filter function
-     * @param destinationBuffer The byte buffer to write the filtered records to
-     * @param maxRecordBatchSize The maximum record batch size. Note this is not a hard limit: if a batch
-     *                           exceeds this after filtering, we log a warning, but the batch will still be
-     *                           created.
+     *
+     * @param partition                   The partition that is filtered (used only for logging)
+     * @param filter                      The filter function
+     * @param destinationBuffer           The byte buffer to write the filtered records to
+     * @param maxRecordBatchSize          The maximum record batch size. Note this is not a hard limit: if a batch
+     *                                    exceeds this after filtering, we log a warning, but the batch will still be
+     *                                    created.
+     * @param decompressionBufferSupplier The supplier of ByteBuffer(s) used for decompression if supported. For small
+     *                                    record batches, allocating a potentially large buffer (64 KB for LZ4) will
+     *                                    dominate the cost of decompressing and iterating over the records in the
+     *                                    batch. As such, a supplier that reuses buffers will have a significant
+     *                                    performance impact.
      * @return A FilterResult with a summary of the output (for metrics) and potentially an overflow buffer
      */
     public FilterResult filterTo(TopicPartition partition, RecordFilter filter, ByteBuffer destinationBuffer,
-                                 int maxRecordBatchSize) {
-        return filterTo(partition, batches(), filter, destinationBuffer, maxRecordBatchSize);
+                                 int maxRecordBatchSize, BufferSupplier decompressionBufferSupplier) {
+        return filterTo(partition, batches(), filter, destinationBuffer, maxRecordBatchSize, decompressionBufferSupplier);
     }
 
     private static FilterResult filterTo(TopicPartition partition, Iterable<MutableRecordBatch> batches,
-                                         RecordFilter filter, ByteBuffer destinationBuffer, int maxRecordBatchSize) {
+                                         RecordFilter filter, ByteBuffer destinationBuffer, int maxRecordBatchSize,
+                                         BufferSupplier decompressionBufferSupplier) {
         long maxTimestamp = RecordBatch.NO_TIMESTAMP;
         long maxOffset = -1L;
         long shallowOffsetOfMaxTimestamp = -1L;
@@ -155,21 +163,24 @@ public class MemoryRecords extends AbstractRecords {
             boolean writeOriginalBatch = true;
             List<Record> retainedRecords = new ArrayList<>();
 
-            for (Record record : batch) {
-                messagesRead += 1;
+            try (final CloseableIterator<Record> iterator = batch.streamingIterator(decompressionBufferSupplier)) {
+                while (iterator.hasNext()) {
+                    Record record = iterator.next();
+                    messagesRead += 1;
 
-                if (filter.shouldRetain(batch, record)) {
-                    // Check for log corruption due to KAFKA-4298. If we find it, make sure that we overwrite
-                    // the corrupted batch with correct data.
-                    if (!record.hasMagic(batchMagic))
-                        writeOriginalBatch = false;
+                    if (filter.shouldRetain(batch, record)) {
+                        // Check for log corruption due to KAFKA-4298. If we find it, make sure that we overwrite
+                        // the corrupted batch with correct data.
+                        if (!record.hasMagic(batchMagic))
+                            writeOriginalBatch = false;
 
-                    if (record.offset() > maxOffset)
-                        maxOffset = record.offset();
+                        if (record.offset() > maxOffset)
+                            maxOffset = record.offset();
 
-                    retainedRecords.add(record);
-                } else {
-                    writeOriginalBatch = false;
+                        retainedRecords.add(record);
+                    } else {
+                        writeOriginalBatch = false;
+                    }
                 }
             }
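
The buffer reuse happens through the streamingIterator path shown above: the batch borrows a
decompression buffer from the supplier and hands it back when the iterator is closed. A minimal
standalone sketch of that pattern, under the same assumption that BufferSupplier lives in
org.apache.kafka.common.record (the class and method names here are made up for illustration):

import java.nio.ByteBuffer;

import org.apache.kafka.common.record.BufferSupplier;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.utils.CloseableIterator;

public class StreamingIterationSketch {

    // One supplier for the lifetime of this object; BufferSupplier.NO_CACHING would
    // hand out a fresh buffer for every batch instead.
    private final BufferSupplier decompressionBufferSupplier = BufferSupplier.create();

    public long countRecords(ByteBuffer readBuffer) {
        long count = 0;
        MemoryRecords records = MemoryRecords.readableRecords(readBuffer);
        for (MutableRecordBatch batch : records.batches()) {
            // streamingIterator decompresses the batch lazily using a buffer obtained from
            // the supplier; closing the iterator lets the buffer be reused for the next batch.
            try (CloseableIterator<Record> iterator = batch.streamingIterator(decompressionBufferSupplier)) {
                while (iterator.hasNext()) {
                    iterator.next();
                    count += 1;
                }
            }
        }
        return count;
    }
}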
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c786c58/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
index 619fbbd..b251d6c 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
@@ -205,7 +205,7 @@ public class MemoryRecordsTest {
 
             ByteBuffer filtered = ByteBuffer.allocate(2048);
             builder.build().filterTo(new TopicPartition("foo", 0), new RetainNonNullKeysFilter(), filtered,
-                    Integer.MAX_VALUE);
+                    Integer.MAX_VALUE, BufferSupplier.NO_CACHING);
 
             filtered.flip();
             MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
@@ -292,7 +292,7 @@ public class MemoryRecordsTest {
                 protected boolean shouldRetain(RecordBatch recordBatch, Record record) {
                     return true;
                 }
-            }, filtered, Integer.MAX_VALUE);
+            }, filtered, Integer.MAX_VALUE, BufferSupplier.NO_CACHING);
 
             filtered.flip();
             MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
@@ -320,7 +320,7 @@ public class MemoryRecordsTest {
 
         ByteBuffer filtered = ByteBuffer.allocate(2048);
         MemoryRecords.readableRecords(buffer).filterTo(new TopicPartition("foo", 0), new RetainNonNullKeysFilter(),
-                filtered, Integer.MAX_VALUE);
+                filtered, Integer.MAX_VALUE, BufferSupplier.NO_CACHING);
         filtered.flip();
         MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
 
@@ -388,7 +388,7 @@ public class MemoryRecordsTest {
 
             ByteBuffer filtered = ByteBuffer.allocate(2048);
             MemoryRecords.readableRecords(buffer).filterTo(new TopicPartition("foo", 0), new RetainNonNullKeysFilter(),
-                    filtered, Integer.MAX_VALUE);
+                    filtered, Integer.MAX_VALUE, BufferSupplier.NO_CACHING);
 
             filtered.flip();
             MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
@@ -476,7 +476,8 @@ public class MemoryRecordsTest {
             output.rewind();
 
             MemoryRecords.FilterResult result = MemoryRecords.readableRecords(buffer)
-                    .filterTo(new TopicPartition("foo", 0), new RetainNonNullKeysFilter(), output, Integer.MAX_VALUE);
+                    .filterTo(new TopicPartition("foo", 0), new RetainNonNullKeysFilter(), output, Integer.MAX_VALUE,
+                              BufferSupplier.NO_CACHING);
 
             buffer.position(buffer.position() + result.bytesRead);
             result.output.flip();
@@ -519,7 +520,8 @@ public class MemoryRecordsTest {
 
         ByteBuffer filtered = ByteBuffer.allocate(2048);
         MemoryRecords.FilterResult result = MemoryRecords.readableRecords(buffer).filterTo(
-                new TopicPartition("foo", 0), new RetainNonNullKeysFilter(), filtered, Integer.MAX_VALUE);
+                new TopicPartition("foo", 0), new RetainNonNullKeysFilter(), filtered, Integer.MAX_VALUE,
+                BufferSupplier.NO_CACHING);
 
         filtered.flip();
 
@@ -632,7 +634,7 @@ public class MemoryRecordsTest {
 
         ByteBuffer filtered = ByteBuffer.allocate(2048);
         MemoryRecords.readableRecords(buffer).filterTo(new TopicPartition("foo", 0), new RetainNonNullKeysFilter(),
-                filtered, Integer.MAX_VALUE);
+                filtered, Integer.MAX_VALUE, BufferSupplier.NO_CACHING);
 
         filtered.flip();
         MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c786c58/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index b05e37f..5aa8672 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -322,6 +322,8 @@ private[log] class Cleaner(val id: Int,
   /* buffer used for write i/o */
   private var writeBuffer = ByteBuffer.allocate(ioBufferSize)
 
+  private val decompressionBufferSupplier = BufferSupplier.create();
+
   require(offsetMap.slots * dupBufferLoadFactor > 1, "offset map is too small to fit in even a single message, so log cleaning will never make progress. You can increase log.cleaner.dedupe.buffer.size or decrease log.cleaner.threads")
 
   /**
@@ -516,7 +518,7 @@ private[log] class Cleaner(val id: Int,
       source.log.readInto(readBuffer, position)
       val records = MemoryRecords.readableRecords(readBuffer)
       throttler.maybeThrottle(records.sizeInBytes)
-      val result = records.filterTo(topicPartition, logCleanerFilter, writeBuffer, maxLogMessageSize)
+      val result = records.filterTo(topicPartition, logCleanerFilter, writeBuffer, maxLogMessageSize, decompressionBufferSupplier)
       stats.readMessages(result.messagesRead, result.bytesRead)
       stats.recopyMessages(result.messagesRetained, result.bytesRetained)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c786c58/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 52e9140..6fcc7ae 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -224,7 +224,7 @@ class LogTest {
     val filtered = ByteBuffer.allocate(2048)
     records.filterTo(new TopicPartition("foo", 0), new RecordFilter {
       override def shouldRetain(recordBatch: RecordBatch, record: Record): Boolean = !record.hasKey
-    }, filtered, Int.MaxValue)
+    }, filtered, Int.MaxValue, BufferSupplier.NO_CACHING)
     filtered.flip()
     val filteredRecords = MemoryRecords.readableRecords(filtered)
 
@@ -268,7 +268,7 @@ class LogTest {
     val filtered = ByteBuffer.allocate(2048)
     records.filterTo(new TopicPartition("foo", 0), new RecordFilter {
       override def shouldRetain(recordBatch: RecordBatch, record: Record): Boolean = !record.hasKey
-    }, filtered, Int.MaxValue)
+    }, filtered, Int.MaxValue, BufferSupplier.NO_CACHING)
     filtered.flip()
     val filteredRecords = MemoryRecords.readableRecords(filtered)
 

