kafka-commits mailing list archives

From guozh...@apache.org
Subject kafka git commit: KAFKA-3704: Remove hard-coded block size in KafkaProducer
Date Thu, 12 May 2016 00:01:27 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.0 dca78b586 -> 05640c86e


KAFKA-3704: Remove hard-coded block size in KafkaProducer

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Ismael Juma

Closes #1371 from guozhangwang/K3565-remove-compression-blocksize

(cherry picked from commit 1182d61deb23b5cd86cbe462471f7df583a796e1)
Signed-off-by: Guozhang Wang <wangguoz@gmail.com>

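In caller terms, the patch deletes the Compressor constructor that took an explicit block size, so each codec's own default buffer size applies. A minimal sketch of the caller-visible change, assuming kafka-clients 0.10.0 on the classpath (the 64 KB ByteBuffer capacity is an arbitrary choice for illustration):

    import java.nio.ByteBuffer;

    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.Compressor;

    public class CompressorApiChange {
        public static void main(String[] args) {
            ByteBuffer buffer = ByteBuffer.allocate(64 * 1024); // arbitrary capacity
            // Before this commit the producer pinned the codec buffer size to
            // 1 KB through the now-removed three-argument constructor:
            //   new Compressor(buffer, CompressionType.GZIP, 1024);
            // Now only the two-argument form exists and the codec's own
            // default applies (0.5 KB for gzip, 32 KB for snappy).
            Compressor compressor = new Compressor(buffer, CompressionType.GZIP);
            compressor.close();
        }
    }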

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/05640c86
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/05640c86
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/05640c86

Branch: refs/heads/0.10.0
Commit: 05640c86eebc673f110604e0c7b3fa3315dfcd7b
Parents: dca78b5
Author: Guozhang Wang <wangguoz@gmail.com>
Authored: Wed May 11 17:01:14 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed May 11 17:01:23 2016 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/common/record/Compressor.java | 17 ++++++-----------
 .../org/apache/kafka/common/record/Record.java     |  2 +-
 docs/upgrade.html                                  |  1 +
 3 files changed, 8 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/05640c86/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
index 37d53b8..60c15e6 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
@@ -33,7 +33,6 @@ public class Compressor {
 
     static private final float COMPRESSION_RATE_DAMPING_FACTOR = 0.9f;
     static private final float COMPRESSION_RATE_ESTIMATION_FACTOR = 1.05f;
-    static private final int COMPRESSION_DEFAULT_BUFFER_SIZE = 1024;
 
     private static final float[] TYPE_TO_RATE;
 
@@ -53,7 +52,7 @@ public class Compressor {
         @Override
         public Constructor get() throws ClassNotFoundException, NoSuchMethodException {
             return Class.forName("org.xerial.snappy.SnappyOutputStream")
-                .getConstructor(OutputStream.class, Integer.TYPE);
+                .getConstructor(OutputStream.class);
         }
     });
 
@@ -91,7 +90,7 @@ public class Compressor {
     public float compressionRate;
     public long maxTimestamp;
 
-    public Compressor(ByteBuffer buffer, CompressionType type, int blockSize) {
+    public Compressor(ByteBuffer buffer, CompressionType type) {
         this.type = type;
         this.initPos = buffer.position();
 
@@ -108,11 +107,7 @@ public class Compressor {
 
         // create the stream
         bufferStream = new ByteBufferOutputStream(buffer);
-        appendStream = wrapForOutput(bufferStream, type, blockSize);
-    }
-
-    public Compressor(ByteBuffer buffer, CompressionType type) {
-        this(buffer, type, COMPRESSION_DEFAULT_BUFFER_SIZE);
+        appendStream = wrapForOutput(bufferStream, type);
     }
 
     public ByteBuffer buffer() {
@@ -246,16 +241,16 @@ public class Compressor {
 
     // the following two functions also need to be public since they are used in MemoryRecords.iteration
 
-    static public DataOutputStream wrapForOutput(ByteBufferOutputStream buffer, CompressionType type, int bufferSize) {
+    static public DataOutputStream wrapForOutput(ByteBufferOutputStream buffer, CompressionType type) {
         try {
             switch (type) {
                 case NONE:
                     return new DataOutputStream(buffer);
                 case GZIP:
-                    return new DataOutputStream(new GZIPOutputStream(buffer, bufferSize));
+                    return new DataOutputStream(new GZIPOutputStream(buffer));
                 case SNAPPY:
                     try {
-                        OutputStream stream = (OutputStream) snappyOutputStreamSupplier.get().newInstance(buffer, bufferSize);
+                        OutputStream stream = (OutputStream) snappyOutputStreamSupplier.get().newInstance(buffer);
                         return new DataOutputStream(stream);
                     } catch (Exception e) {
                         throw new KafkaException(e);

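The one-argument constructors that the patch switches to are where the new defaults come from. A small sketch of what they resolve to (the snappy call is left as a comment because snappy-java is an optional dependency that Kafka loads reflectively, as the supplier above shows):

    import java.io.ByteArrayOutputStream;
    import java.util.zip.GZIPOutputStream;

    public class CodecDefaults {
        public static void main(String[] args) throws Exception {
            ByteArrayOutputStream sink = new ByteArrayOutputStream();
            // The JDK's one-argument constructor is equivalent to
            // new GZIPOutputStream(sink, 512), i.e. a 0.5 KB deflate buffer.
            GZIPOutputStream gzip = new GZIPOutputStream(sink);
            gzip.close();
            // snappy-java's one-argument constructor defaults to a 32 KB block:
            //   new org.xerial.snappy.SnappyOutputStream(sink);
        }
    }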
http://git-wip-us.apache.org/repos/asf/kafka/blob/05640c86/clients/src/main/java/org/apache/kafka/common/record/Record.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Record.java b/clients/src/main/java/org/apache/kafka/common/record/Record.java
index 147ad86..baab9ab 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Record.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Record.java
@@ -146,7 +146,7 @@ public final class Record {
     public static void write(ByteBuffer buffer, long timestamp, byte[] key, byte[] value, CompressionType type, int valueOffset, int valueSize) {
         // construct the compressor with compression type none since this function will not do any
         // compression according to the input type, it will just write the record's payload as is
-        Compressor compressor = new Compressor(buffer, CompressionType.NONE, buffer.capacity());
+        Compressor compressor = new Compressor(buffer, CompressionType.NONE);
         compressor.putRecord(timestamp, key, value, type, valueOffset, valueSize);
     }
 

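For the Record.write call site above, dropping the old buffer.capacity() argument is behavior-neutral: with CompressionType.NONE, wrapForOutput never consults a buffer size. A minimal sketch of that path, using only the public API shown in the diff:

    import java.io.DataOutputStream;
    import java.nio.ByteBuffer;

    import org.apache.kafka.common.record.ByteBufferOutputStream;
    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.Compressor;

    public class NonePathSketch {
        public static void main(String[] args) throws Exception {
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            // With NONE, wrapForOutput wraps the buffer stream in a plain
            // DataOutputStream; no codec buffer is allocated, so the size
            // argument removed by this commit was never used on this path.
            DataOutputStream out = Compressor.wrapForOutput(
                    new ByteBufferOutputStream(buffer), CompressionType.NONE);
            out.writeByte(0); // lands directly in the backing ByteBuffer
        }
    }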
http://git-wip-us.apache.org/repos/asf/kafka/blob/05640c86/docs/upgrade.html
----------------------------------------------------------------------
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 4b8ec7e..3c98540 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -91,6 +91,7 @@ work with 0.10.0.x brokers. Therefore, 0.9.0.0 clients should be upgraded to 0.9
 
 <ul>
     <li> Starting from Kafka 0.10.0.0, a new client library named <b>Kafka Streams</b> is available for stream processing on data stored in Kafka topics. This new client library only works with 0.10.x and upward versioned brokers due to message format changes mentioned above. For more information please read <a href="#streams_overview">this section</a>.</li>
+    <li> If compression with snappy or gzip is enabled, the new producer will use the compression scheme's default buffer size (this is already the case for LZ4) instead of 1 KB in order to improve the compression ratio. Note that the default buffer sizes for gzip, snappy and LZ4 are 0.5 KB, 32 KB and 64 KB respectively. For the snappy case, a producer with 5000 partitions will require an additional 155 MB of JVM heap.</li>
     <li> The default value of the configuration parameter <code>receive.buffer.bytes</code> is now 64K for the new consumer.</li>
     <li> The new consumer now exposes the configuration parameter <code>exclude.internal.topics</code> to restrict internal topics (such as the consumer offsets topic) from accidentally being included in regular expression subscriptions. By default, it is enabled.</li>
     <li> The old Scala producer has been deprecated. Users should migrate their code to the Java producer included in the kafka-clients JAR as soon as possible. </li>

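The 155 MB figure in the note above is per-partition buffer arithmetic; a sketch of the calculation, assuming one open snappy compression buffer per partition (which is what the note implies):

    public class SnappyHeapDelta {
        public static void main(String[] args) {
            int partitions = 5000;
            int oldKb = 1;  // previously hard-coded 1 KB buffer size
            int newKb = 32; // snappy's default block size
            // 5000 * (32 - 1) KB = 155,000 KB, i.e. the ~155 MB quoted above.
            int extraKb = partitions * (newKb - oldKb);
            System.out.printf("extra heap: %,d KB (~%d MB)%n", extraKb, extraKb / 1000);
        }
    }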
