kafka-commits mailing list archives

From gwens...@apache.org
Subject kafka git commit: KAFKA-3704; Revert "Remove hard-coded block size in KafkaProducer"
Date Mon, 16 May 2016 19:00:08 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 61f4c8b09 -> 9a3fcf413


KAFKA-3704; Revert "Remove hard-coded block size in KafkaProducer"

This is not an exact revert as the code changed a bit since the
original commit. We also include a note in `upgrade.html`.

The original commit is 1182d61deb23b5cd86cbe462471f7df583a796e1.

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Gwen Shapira, Guozhang Wang

Closes #1391 from ijuma/kafka-3704-revert and squashes the following commits:

7891b67 [Ismael Juma] Tweak upgrade note based on Gwen's feedback
1673cd0 [Ismael Juma] Revert "KAFKA-3704: Remove hard-coded block size in KafkaProducer"
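
For background, the reverted commit had switched the producer's compression streams over to each codec's default buffer size; this revert restores the previous hard-coded 1 KB buffer. Below is a minimal, self-contained Java sketch of the restored pattern for gzip, assuming the 1 KB constant and a ByteArrayOutputStream sink purely for illustration; it is not the Kafka code itself.

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.zip.GZIPOutputStream;

    public class GzipBufferSketch {
        // Fixed buffer size this revert restores for the producer's compression streams (assumption: 1 KB).
        private static final int COMPRESSION_DEFAULT_BUFFER_SIZE = 1024;

        public static void main(String[] args) throws IOException {
            ByteArrayOutputStream sink = new ByteArrayOutputStream();
            // GZIPOutputStream(OutputStream, int) lets the caller choose the deflater buffer size;
            // the single-argument constructor would fall back to gzip's own 512-byte default.
            try (DataOutputStream out =
                     new DataOutputStream(new GZIPOutputStream(sink, COMPRESSION_DEFAULT_BUFFER_SIZE))) {
                out.writeBytes("hello kafka");
            }
            System.out.println("compressed bytes: " + sink.size());
        }
    }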


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9a3fcf41
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9a3fcf41
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9a3fcf41

Branch: refs/heads/trunk
Commit: 9a3fcf41350ddda9a41db18cde718f892b95177c
Parents: 61f4c8b
Author: Ismael Juma <ismael@juma.me.uk>
Authored: Mon May 16 12:00:06 2016 -0700
Committer: Gwen Shapira <cshapi@gmail.com>
Committed: Mon May 16 12:00:06 2016 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/common/record/Compressor.java   | 11 ++++++-----
 docs/upgrade.html                                    | 15 +++++++++++----
 2 files changed, 17 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9a3fcf41/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
index 60c15e6..e23a52e 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
@@ -33,6 +33,7 @@ public class Compressor {
 
     static private final float COMPRESSION_RATE_DAMPING_FACTOR = 0.9f;
     static private final float COMPRESSION_RATE_ESTIMATION_FACTOR = 1.05f;
+    static private final int COMPRESSION_DEFAULT_BUFFER_SIZE = 1024;
 
     private static final float[] TYPE_TO_RATE;
 
@@ -52,7 +53,7 @@ public class Compressor {
         @Override
         public Constructor get() throws ClassNotFoundException, NoSuchMethodException {
             return Class.forName("org.xerial.snappy.SnappyOutputStream")
-                .getConstructor(OutputStream.class);
+                .getConstructor(OutputStream.class, Integer.TYPE);
         }
     });
 
@@ -107,7 +108,7 @@ public class Compressor {
 
         // create the stream
         bufferStream = new ByteBufferOutputStream(buffer);
-        appendStream = wrapForOutput(bufferStream, type);
+        appendStream = wrapForOutput(bufferStream, type, COMPRESSION_DEFAULT_BUFFER_SIZE);
     }
 
     public ByteBuffer buffer() {
@@ -241,16 +242,16 @@ public class Compressor {
 
     // the following two functions also need to be public since they are used in MemoryRecords.iteration
 
-    static public DataOutputStream wrapForOutput(ByteBufferOutputStream buffer, CompressionType type) {
+    static public DataOutputStream wrapForOutput(ByteBufferOutputStream buffer, CompressionType type, int bufferSize) {
         try {
             switch (type) {
                 case NONE:
                     return new DataOutputStream(buffer);
                 case GZIP:
-                    return new DataOutputStream(new GZIPOutputStream(buffer));
+                    return new DataOutputStream(new GZIPOutputStream(buffer, bufferSize));
                 case SNAPPY:
                     try {
-                        OutputStream stream = (OutputStream) snappyOutputStreamSupplier.get().newInstance(buffer);
+                        OutputStream stream = (OutputStream) snappyOutputStreamSupplier.get().newInstance(buffer, bufferSize);
                         return new DataOutputStream(stream);
                     } catch (Exception e) {
                         throw new KafkaException(e);
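
For reference, the restored Compressor path constructs SnappyOutputStream reflectively via its (OutputStream, int) constructor, so snappy-java stays an optional dependency while the buffer size can still be passed in. Below is a hedged, standalone sketch of that reflective pattern, assuming snappy-java is on the classpath; only the class name and constructor signature come from the diff, the rest is illustrative.

    import java.io.ByteArrayOutputStream;
    import java.io.OutputStream;
    import java.lang.reflect.Constructor;

    public class SnappyReflectionSketch {
        public static void main(String[] args) throws Exception {
            int bufferSize = 1024; // the hard-coded size restored by this revert
            ByteArrayOutputStream sink = new ByteArrayOutputStream();
            // Look up SnappyOutputStream(OutputStream, int) reflectively so the class is only
            // needed at runtime when snappy compression is actually selected.
            Constructor<?> ctor = Class.forName("org.xerial.snappy.SnappyOutputStream")
                .getConstructor(OutputStream.class, Integer.TYPE);
            try (OutputStream snappyOut = (OutputStream) ctor.newInstance(sink, bufferSize)) {
                snappyOut.write("hello kafka".getBytes("UTF-8"));
            }
            System.out.println("compressed bytes: " + sink.size());
        }
    }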

http://git-wip-us.apache.org/repos/asf/kafka/blob/9a3fcf41/docs/upgrade.html
----------------------------------------------------------------------
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 2972f26..d09b9d7 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -68,10 +68,18 @@ work with 0.10.0.x brokers. Therefore, 0.9.0.0 clients should be upgraded to 0.9
     message throughput degradation because of the increased overhead.
     Likewise, replication now transmits an additional 8 bytes per message.
     If you're running close to the network capacity of your cluster, it's possible that you'll overwhelm the network cards
-    and see failures and performance issues due to the overload. When receiving compressed messages, 0.10.0
+    and see failures and performance issues due to the overload.
+</p>
+    <b>Note:</b> If you have enabled compression on producers, you may notice reduced producer throughput and/or
+    lower compression rate on the broker in some cases. When receiving compressed messages, 0.10.0
     brokers avoid recompressing the messages, which in general reduces the latency and improves the throughput. In
-    certain cases, this may reduce the batching size on the producer, which could lead to worse throughput. If this
-    happens, users can tune linger.ms and batch.size of the producer for better throughput.
+    certain cases, however, this may reduce the batching size on the producer, which could lead to worse throughput. If this
+    happens, users can tune linger.ms and batch.size of the producer for better throughput. In addition, the producer buffer
+    used for compressing messages with snappy is smaller than the one used by the broker, which may have a negative
+    impact on the compression ratio for the messages on disk. We intend to make this configurable in a future Kafka
+    release.
+<p>
+
 </p>
 
 <h5><a id="upgrade_10_breaking" href="#upgrade_10_breaking">Potential breaking changes in 0.10.0.0</a></h5>
@@ -101,7 +109,6 @@ work with 0.10.0.x brokers. Therefore, 0.9.0.0 clients should be upgraded to 0.9
 
 <ul>
     <li> Starting from Kafka 0.10.0.0, a new client library named <b>Kafka Streams</b> is available for stream processing on data stored in Kafka topics. This new client library only works with 0.10.x and upward versioned brokers due to message format changes mentioned above. For more information please read <a href="#streams_overview">this section</a>.</li>
-    <li> If compression with snappy or gzip is enabled, the new producer will use the compression scheme's default buffer size (this is already the case for LZ4) instead of 1 KB in order to improve the compression ratio. Note that the default buffer sizes for gzip, snappy and LZ4 are 0.5 KB, 2x32 KB and 2x64KB respectively. For the snappy case, a producer with 5000 partitions will require an additional 315 MB of JVM heap.</li>
     <li> The default value of the configuration parameter <code>receive.buffer.bytes</code> is now 64K for the new consumer.</li>
     <li> The new consumer now exposes the configuration parameter <code>exclude.internal.topics</code> to restrict internal topics (such as the consumer offsets topic) from accidentally being included in regular expression subscriptions. By default, it is enabled.</li>
     <li> The old Scala producer has been deprecated. Users should migrate their code to the Java producer included in the kafka-clients JAR as soon as possible. </li>
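
The upgrade note added above suggests tuning linger.ms and batch.size when compressed-message batching shrinks on 0.10.0. Below is a sketch of what that tuning might look like with the Java producer; the broker address, topic name, and the specific values are placeholders for illustration, not recommendations from this commit.

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class TunedProducerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("compression.type", "snappy");
            // Illustrative values: allow a short linger and a larger batch so the producer
            // can accumulate bigger compressed batches, per the upgrade note.
            props.put("linger.ms", "5");
            props.put("batch.size", "65536");

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                producer.send(new ProducerRecord<>("test-topic", "key", "value"));
            }
        }
    }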

