kafka-commits mailing list archives

From ij...@apache.org
Subject kafka git commit: KAFKA-5236; Increase the block/buffer size when compressing with Snappy or Gzip
Date Fri, 02 Jun 2017 20:20:11 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.11.0 f9de919c1 -> bf8378a84


KAFKA-5236; Increase the block/buffer size when compressing with Snappy or Gzip

We had originally increased Snappy’s block size as part of KAFKA-3704. However, we
saw excessive memory usage in the producer and reverted the change in 7c6ee8d5e.

After more investigation, we fixed the underlying cause of the unexpectedly high
memory usage via KAFKA-3747 (included in 0.10.0.1).

In 0.10.2, we changed the broker to use the same classes as the producer, and the
broker’s Snappy block size consequently changed from 32 KB to 1 KB. As reported in
KAFKA-5236, the on-disk size is, in some cases, 50% larger when the data is compressed
with a 1 KB block size instead of 32 KB.
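
To see the effect directly, one can compress the same payload at both block sizes:
Snappy only finds matches within a single block, so a 1 KB block gives it far less
history to work with. A minimal sketch (not part of this patch; assumes snappy-java
on the classpath, with a made-up payload for illustration):

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.Collections;
    import org.xerial.snappy.SnappyOutputStream;

    // Compress the same payload with 1 KB and 32 KB block sizes and compare sizes.
    public class SnappyBlockSizeComparison {
        public static void main(String[] args) throws IOException {
            byte[] payload = String.join("",
                    Collections.nCopies(4096, "ts=1496434811 level=INFO msg=\"request handled\"\n"))
                    .getBytes(StandardCharsets.UTF_8);
            System.out.println("1 KB blocks:  " + compressedSize(payload, 1024) + " bytes");
            System.out.println("32 KB blocks: " + compressedSize(payload, 32 * 1024) + " bytes");
        }

        private static int compressedSize(byte[] payload, int blockSize) throws IOException {
            ByteArrayOutputStream sink = new ByteArrayOutputStream();
            try (SnappyOutputStream out = new SnappyOutputStream(sink, blockSize)) {
                out.write(payload);
            }
            return sink.size();
        }
    }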

As discussed in KAFKA-3704, it may be worth making this configurable and/or allocating
the compression buffers from the producer pool. However, for 0.11.0.0, I think the
simplest thing to do is to default to 32 KB for Snappy (the default when no block size
is provided).
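
For reference, dropping the explicit argument simply falls back to snappy-java's own
default block size, which is 32 KB. A sketch of the equivalence (hypothetical demo
class, not part of the patch):

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;
    import org.xerial.snappy.SnappyOutputStream;

    public class SnappyDefaultBlockSize {
        public static void main(String[] args) throws IOException {
            ByteArrayOutputStream sink = new ByteArrayOutputStream();
            // The no-block-size constructor uses snappy-java's default (32 KB),
            // so these two streams are configured identically:
            OutputStream a = new SnappyOutputStream(sink);
            OutputStream b = new SnappyOutputStream(sink, 32 * 1024);
            a.close();
            b.close();
        }
    }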

I also increased the Gzip buffer size. 1 KB is too small and the default is smaller
still (512 bytes). 8 KB (which is the default buffer size for BufferedOutputStream)
seemed like a reasonable default.
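
The two GZIPOutputStream constructors involved, for reference (hypothetical demo
class, not part of the patch):

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.util.zip.GZIPOutputStream;

    public class GzipBufferSize {
        public static void main(String[] args) throws IOException {
            ByteArrayOutputStream sink = new ByteArrayOutputStream();
            // One-arg constructor: uses the JDK's 512-byte default buffer.
            new GZIPOutputStream(sink).close();
            // Two-arg constructor: explicit buffer size; the patch passes 8 * 1024,
            // matching BufferedOutputStream's default.
            new GZIPOutputStream(sink, 8 * 1024).close();
        }
    }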

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Jason Gustafson <jason@confluent.io>

Closes #3205 from ijuma/kafka-5236-snappy-block-size

(cherry picked from commit 4959444afc927026f48f5c7d9babed7b9f1bea50)
Signed-off-by: Ismael Juma <ismael@juma.me.uk>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bf8378a8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bf8378a8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bf8378a8

Branch: refs/heads/0.11.0
Commit: bf8378a84fbe874b1091555e58e0a13ad874cda5
Parents: f9de919
Author: Ismael Juma <ismael@juma.me.uk>
Authored: Fri Jun 2 21:20:02 2017 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Fri Jun 2 21:20:23 2017 +0100

----------------------------------------------------------------------
 .../kafka/common/record/CompressionType.java       | 17 +++++++++--------
 .../kafka/common/record/MemoryRecordsBuilder.java  |  4 +---
 .../kafka/common/record/CompressionTypeTest.java   |  4 ++--
 .../common/record/SimpleLegacyRecordTest.java      |  2 +-
 .../kafka/message/MessageCompressionTest.scala     | 12 ++++++------
 docs/upgrade.html                                  |  6 ++++++
 6 files changed, 25 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/bf8378a8/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
index 742493b..16d6e01 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
@@ -35,7 +35,7 @@ import java.util.zip.GZIPOutputStream;
 public enum CompressionType {
     NONE(0, "none", 1.0f) {
         @Override
-        public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion, int bufferSize) {
+        public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) {
             return buffer;
         }
 
@@ -47,9 +47,10 @@ public enum CompressionType {
 
     GZIP(1, "gzip", 1.0f) {
         @Override
-        public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion, int bufferSize) {
+        public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) {
             try {
-                return new GZIPOutputStream(buffer, bufferSize);
+                // GZIPOutputStream has a default buffer size of 512 bytes, which is too small
+                return new GZIPOutputStream(buffer, 8 * 1024);
             } catch (Exception e) {
                 throw new KafkaException(e);
             }
@@ -67,9 +68,9 @@ public enum CompressionType {
 
     SNAPPY(2, "snappy", 1.0f) {
         @Override
-        public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion, int bufferSize) {
+        public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) {
             try {
-                return (OutputStream) SnappyConstructors.OUTPUT.invoke(buffer, bufferSize);
+                return (OutputStream) SnappyConstructors.OUTPUT.invoke(buffer);
             } catch (Throwable e) {
                 throw new KafkaException(e);
             }
@@ -87,7 +88,7 @@ public enum CompressionType {
 
     LZ4(3, "lz4", 1.0f) {
         @Override
-        public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion, int bufferSize) {
+        public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) {
             try {
                 return new KafkaLZ4BlockOutputStream(buffer, messageVersion == RecordBatch.MAGIC_VALUE_V0);
             } catch (Throwable e) {
@@ -124,7 +125,7 @@ public enum CompressionType {
      * write to the underlying buffer in the given {@link ByteBufferOutputStream} after the compressed data has been written.
      * In the event that the buffer needs to be expanded while writing the data, access to the underlying buffer needs to be preserved.
      */
-    public abstract OutputStream wrapForOutput(ByteBufferOutputStream bufferStream, byte messageVersion, int bufferSize);
+    public abstract OutputStream wrapForOutput(ByteBufferOutputStream bufferStream, byte messageVersion);
 
     /**
      * Wrap buffer with an InputStream that will decompress data with this CompressionType.
@@ -178,7 +179,7 @@ public enum CompressionType {
         static final MethodHandle INPUT = findConstructor("org.xerial.snappy.SnappyInputStream",
                 MethodType.methodType(void.class, InputStream.class));
         static final MethodHandle OUTPUT = findConstructor("org.xerial.snappy.SnappyOutputStream",
-                MethodType.methodType(void.class, OutputStream.class, Integer.TYPE));
+                MethodType.methodType(void.class, OutputStream.class));
     }
 
     private static MethodHandle findConstructor(String className, MethodType methodType) {

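For context, the SnappyConstructors holder resolves the Snappy stream classes reflectively
so that snappy-java stays an optional dependency; the hunk above just switches the cached
OUTPUT handle to the constructor overload without a block size. The body of findConstructor
is cut off by the archive; the lookup pattern is along these lines (a sketch with a
hypothetical class name and exception choice, not the patch itself):

    import java.io.ByteArrayOutputStream;
    import java.io.OutputStream;
    import java.lang.invoke.MethodHandle;
    import java.lang.invoke.MethodHandles;
    import java.lang.invoke.MethodType;

    final class LazyConstructorLookup {
        // Resolve a public constructor as a MethodHandle by class name so the
        // compression library is only loaded when this code path actually runs.
        static MethodHandle findConstructor(String className, MethodType methodType) {
            try {
                return MethodHandles.publicLookup().findConstructor(
                        Class.forName(className), methodType);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("cannot resolve " + className, e);
            }
        }

        public static void main(String[] args) throws Throwable {
            // Matches the patched MethodType: SnappyOutputStream(OutputStream)
            MethodHandle output = findConstructor("org.xerial.snappy.SnappyOutputStream",
                    MethodType.methodType(void.class, OutputStream.class));
            OutputStream snappy = (OutputStream) output.invoke(new ByteArrayOutputStream());
            snappy.close();
        }
    }
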
http://git-wip-us.apache.org/repos/asf/kafka/blob/bf8378a8/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index 89d314d..cd9ba0e 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -38,7 +38,6 @@ import static org.apache.kafka.common.utils.Utils.wrapNullable;
  */
 public class MemoryRecordsBuilder {
     private static final float COMPRESSION_RATE_ESTIMATION_FACTOR = 1.05f;
-    private static final int COMPRESSION_DEFAULT_BUFFER_SIZE = 1024;
 
     private final TimestampType timestampType;
     private final CompressionType compressionType;
@@ -124,8 +123,7 @@ public class MemoryRecordsBuilder {
         }
 
         this.bufferStream = bufferStream;
-        this.appendStream = new DataOutputStream(compressionType.wrapForOutput(this.bufferStream, magic,
-                COMPRESSION_DEFAULT_BUFFER_SIZE));
+        this.appendStream = new DataOutputStream(compressionType.wrapForOutput(this.bufferStream, magic));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/bf8378a8/clients/src/test/java/org/apache/kafka/common/record/CompressionTypeTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/CompressionTypeTest.java b/clients/src/test/java/org/apache/kafka/common/record/CompressionTypeTest.java
index fe196c8..d76a577 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/CompressionTypeTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/CompressionTypeTest.java
@@ -30,7 +30,7 @@ public class CompressionTypeTest {
     public void testLZ4FramingMagicV0() {
         ByteBuffer buffer = ByteBuffer.allocate(256);
         KafkaLZ4BlockOutputStream out = (KafkaLZ4BlockOutputStream) CompressionType.LZ4.wrapForOutput(
-                new ByteBufferOutputStream(buffer), RecordBatch.MAGIC_VALUE_V0, 256);
+                new ByteBufferOutputStream(buffer), RecordBatch.MAGIC_VALUE_V0);
         assertTrue(out.useBrokenFlagDescriptorChecksum());
 
         buffer.rewind();
@@ -44,7 +44,7 @@ public class CompressionTypeTest {
     public void testLZ4FramingMagicV1() {
         ByteBuffer buffer = ByteBuffer.allocate(256);
         KafkaLZ4BlockOutputStream out = (KafkaLZ4BlockOutputStream) CompressionType.LZ4.wrapForOutput(
-                new ByteBufferOutputStream(buffer), RecordBatch.MAGIC_VALUE_V1, 256);
+                new ByteBufferOutputStream(buffer), RecordBatch.MAGIC_VALUE_V1);
         assertFalse(out.useBrokenFlagDescriptorChecksum());
 
         buffer.rewind();

http://git-wip-us.apache.org/repos/asf/kafka/blob/bf8378a8/clients/src/test/java/org/apache/kafka/common/record/SimpleLegacyRecordTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/SimpleLegacyRecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/SimpleLegacyRecordTest.java
index b409af6..5f578a8 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/SimpleLegacyRecordTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/SimpleLegacyRecordTest.java
@@ -48,7 +48,7 @@ public class SimpleLegacyRecordTest {
     public void testCompressedIterationWithEmptyRecords() throws Exception {
         ByteBuffer emptyCompressedValue = ByteBuffer.allocate(64);
         OutputStream gzipOutput = CompressionType.GZIP.wrapForOutput(new ByteBufferOutputStream(emptyCompressedValue),
-                RecordBatch.MAGIC_VALUE_V1, 64);
+                RecordBatch.MAGIC_VALUE_V1);
         gzipOutput.close();
         emptyCompressedValue.flip();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/bf8378a8/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala b/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
index b0913db..0e74f04 100644
--- a/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
+++ b/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
@@ -28,11 +28,11 @@ class MessageCompressionTest extends JUnitSuite {
   @Test
   def testSimpleCompressDecompress() {
     val codecs = mutable.ArrayBuffer[CompressionCodec](GZIPCompressionCodec)
-    if(isSnappyAvailable)
+    if (isSnappyAvailable)
       codecs += SnappyCompressionCodec
-    if(isLZ4Available)
+    if (isLZ4Available)
       codecs += LZ4CompressionCodec
-    for(codec <- codecs)
+    for (codec <- codecs)
       testSimpleCompressDecompress(codec)
   }
 
@@ -48,10 +48,10 @@ class MessageCompressionTest extends JUnitSuite {
 
     testCompressSize(GZIPCompressionCodec, messages, 396)
 
-    if(isSnappyAvailable)
-      testCompressSize(SnappyCompressionCodec, messages, 1063)
+    if (isSnappyAvailable)
+      testCompressSize(SnappyCompressionCodec, messages, 502)
 
-    if(isLZ4Available)
+    if (isLZ4Available)
       testCompressSize(LZ4CompressionCodec, messages, 387)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/bf8378a8/docs/upgrade.html
----------------------------------------------------------------------
diff --git a/docs/upgrade.html b/docs/upgrade.html
index aae058b..ca905ae 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -60,6 +60,12 @@
     <li>The <code>offsets.topic.replication.factor</code> broker config is now enforced upon auto topic creation. Internal
         auto topic creation will fail with a GROUP_COORDINATOR_NOT_AVAILABLE error until the cluster size meets this
         replication factor requirement.</li>
+    <li> When compressing data with snappy, the producer and broker will use the compression scheme's default block size (2 x 32 KB)
+         instead of 1 KB in order to improve the compression ratio. There have been reports of data compressed with the smaller
+         block size being 50% larger than when compressed with the larger block size. For the snappy case, a producer with 5000
+         partitions will require an additional 315 MB of JVM heap.</li>
+    <li> Similarly, when compressing data with gzip, the producer and broker will use 8 KB instead of 1 KB as the buffer size. The default
+         for gzip is excessively low (512 bytes). </li>
     <li>The broker configuration <code>max.message.bytes</code> now applies to the total size of a batch of messages.
         Previously the setting applied to batches of compressed messages, or to non-compressed messages individually. In practice,
         the change is minor since a message batch may consist of only a single message, so the limitation on the size of

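For reference, the 315 MB figure in the snappy upgrade note above is consistent with each of
the 5000 partition batches holding two 32 KB compression buffers in place of the previous
1 KB: 5000 x (2 x 32 KB - 1 KB) = 315,000 KB = 315 MB. This is a back-of-the-envelope
reading of the note, not arithmetic stated in the patch itself.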
