kafka-commits mailing list archives

From gwens...@apache.org
Subject kafka git commit: KAFKA-3713; Close `compressor` to fix memory leak
Date Sat, 14 May 2016 04:03:58 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.0 e0c741d4e -> bca95cba9


KAFKA-3713; Close `compressor` to fix memory leak

This fixes test_producer_throughput with compression_type=snappy.

Also: added heap dump on out of memory error to `producer_performance.py` and corrected the
upgrade note related to the change in buffer size for compression streams.

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Gwen Shapira

Closes #1385 from ijuma/kafka-3713-test_producer_throughput-snappy-fail and squashes the following commits:

54c7962 [Ismael Juma] Correct upgrade note about buffer size for compression stream
515040b [Ismael Juma] Call `compressor.close()` to fix memory leak
5311e5b [Ismael Juma] Dump heap on out of memory error when running `producer_performance.py`

(cherry picked from commit 13130139ff70d0127e87d2c87dd5e62e6320fa45)
Signed-off-by: Gwen Shapira <cshapi@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bca95cba
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bca95cba
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bca95cba

Branch: refs/heads/0.10.0
Commit: bca95cba96212dc3460d0a87fd4dceb6ce322ccc
Parents: e0c741d
Author: Ismael Juma <ismael@juma.me.uk>
Authored: Fri May 13 21:03:35 2016 -0700
Committer: Gwen Shapira <cshapi@gmail.com>
Committed: Fri May 13 21:03:51 2016 -0700

----------------------------------------------------------------------
 .../src/main/java/org/apache/kafka/common/record/Record.java   | 6 +++++-
 docs/upgrade.html                                              | 2 +-
 tests/kafkatest/services/performance/producer_performance.py   | 2 +-
 3 files changed, 7 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/bca95cba/clients/src/main/java/org/apache/kafka/common/record/Record.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Record.java b/clients/src/main/java/org/apache/kafka/common/record/Record.java
index baab9ab..77e4f68 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Record.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Record.java
@@ -147,7 +147,11 @@ public final class Record {
         // construct the compressor with compression type none since this function will not do any
         //compression according to the input type, it will just write the record's payload as is
         Compressor compressor = new Compressor(buffer, CompressionType.NONE);
-        compressor.putRecord(timestamp, key, value, type, valueOffset, valueSize);
+        try {
+            compressor.putRecord(timestamp, key, value, type, valueOffset, valueSize);
+        } finally {
+            compressor.close();
+        }
     }
 
     public static void write(Compressor compressor, long crc, byte attributes, long timestamp,
byte[] key, byte[] value, int valueOffset, int valueSize) {
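The fix above wraps the write in try/finally so the compressor's buffers are flushed and released even if `putRecord` throws. The same pattern can be sketched with a standard JDK compression stream (the class and method below are illustrative, not Kafka's own API):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPOutputStream;

public class CloseInFinally {
    // Write a payload through a compressing stream, guaranteeing the
    // stream's internal state is flushed and released even on error.
    static byte[] compress(byte[] payload) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        GZIPOutputStream gzip = new GZIPOutputStream(out);
        try {
            gzip.write(payload);
        } finally {
            gzip.close(); // flushes remaining compressed data and frees the Deflater
        }
        return out.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] compressed = compress("hello".getBytes("UTF-8"));
        // A gzip stream always starts with the magic bytes 0x1f 0x8b.
        System.out.println(compressed.length > 2
                && (compressed[0] & 0xff) == 0x1f
                && (compressed[1] & 0xff) == 0x8b);
    }
}
```

Without the `close()`, a stream like this leaks whatever buffers or native resources it holds, which is exactly the leak this commit plugs.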

http://git-wip-us.apache.org/repos/asf/kafka/blob/bca95cba/docs/upgrade.html
----------------------------------------------------------------------
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 3e07ef8..a6754bc 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -91,7 +91,7 @@ work with 0.10.0.x brokers. Therefore, 0.9.0.0 clients should be upgraded to 0.9
 
 <ul>
     <li> Starting from Kafka 0.10.0.0, a new client library named <b>Kafka Streams</b> is available for stream processing on data stored in Kafka topics. This new client library only works with 0.10.x and upward versioned brokers due to message format changes mentioned above. For more information please read <a href="#streams_overview">this section</a>.</li>
-    <li> If compression with snappy or gzip is enabled, the new producer will use the compression scheme's default buffer size (this is already the case for LZ4) instead of 1 KB in order to improve the compression ratio. Note that the default buffer sizes for gzip, snappy and LZ4 are 0.5 KB, 32 KB and 64KB respectively. For the snappy case, a producer with 5000 partitions will require an additional 155 MB of JVM heap.</li>
+    <li> If compression with snappy or gzip is enabled, the new producer will use the compression scheme's default buffer size (this is already the case for LZ4) instead of 1 KB in order to improve the compression ratio. Note that the default buffer sizes for gzip, snappy and LZ4 are 0.5 KB, 2x32 KB and 2x64KB respectively. For the snappy case, a producer with 5000 partitions will require an additional 315 MB of JVM heap.</li>
    <li> The default value of the configuration parameter <code>receive.buffer.bytes</code> is now 64K for the new consumer.</li>
    <li> The new consumer now exposes the configuration parameter <code>exclude.internal.topics</code> to restrict internal topics (such as the consumer offsets topic) from accidentally being included in regular expression subscriptions. By default, it is enabled.</li>
    <li> The old Scala producer has been deprecated. Users should migrate their code to the Java producer included in the kafka-clients JAR as soon as possible. </li>
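The corrected 315 MB figure follows from the note's own numbers: snappy now uses two 32 KB buffers per stream instead of the old 1 KB, so 5000 partitions cost an extra 5000 x (2 x 32 KB - 1 KB) = 315,000 KB, roughly 315 MB. A quick sanity check of that arithmetic:

```java
public class SnappyHeapEstimate {
    public static void main(String[] args) {
        int partitions = 5000;
        int newBufferKB = 2 * 32; // two 32 KB buffers per snappy stream
        int oldBufferKB = 1;      // previous fixed 1 KB buffer
        int extraKB = partitions * (newBufferKB - oldBufferKB);
        // 315000 KB of additional heap, i.e. roughly 315 MB
        System.out.println(extraKB);
    }
}
```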

http://git-wip-us.apache.org/repos/asf/kafka/blob/bca95cba/tests/kafkatest/services/performance/producer_performance.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/producer_performance.py b/tests/kafkatest/services/performance/producer_performance.py
index 7131df1..d66efec 100644
--- a/tests/kafkatest/services/performance/producer_performance.py
+++ b/tests/kafkatest/services/performance/producer_performance.py
@@ -100,7 +100,7 @@ class ProducerPerformanceService(JmxMixin, PerformanceService):
             cmd += "export CLASSPATH; "
 
         cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % ProducerPerformanceService.LOG4J_CONFIG
-        cmd += "JMX_PORT=%(jmx_port)d KAFKA_OPTS=%(kafka_opts)s %(kafka_run_class)s org.apache.kafka.tools.ProducerPerformance " \
+        cmd += "JMX_PORT=%(jmx_port)d KAFKA_OPTS=%(kafka_opts)s KAFKA_HEAP_OPTS=\"-XX:+HeapDumpOnOutOfMemoryError\" %(kafka_run_class)s org.apache.kafka.tools.ProducerPerformance " \
              "--topic %(topic)s --num-records %(num_records)d --record-size %(record_size)d --throughput %(throughput)d --producer-props bootstrap.servers=%(bootstrap_servers)s client.id=%(client_id)s" % args
 
         self.security_config.setup_node(node)
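The change above passes `-XX:+HeapDumpOnOutOfMemoryError` via `KAFKA_HEAP_OPTS`, so a future OOM in the performance run leaves a heap dump behind for analysis. Incidentally, this is a manageable HotSpot flag, so it can also be flipped on a live JVM rather than only on the command line; a minimal sketch, assuming a standard HotSpot JDK:

```java
import java.lang.management.ManagementFactory;
import com.sun.management.HotSpotDiagnosticMXBean;

public class HeapDumpFlag {
    public static void main(String[] args) {
        // HeapDumpOnOutOfMemoryError is writable at runtime ("manageable"),
        // so it can be enabled without restarting the process.
        HotSpotDiagnosticMXBean bean = ManagementFactory
                .getPlatformMXBean(HotSpotDiagnosticMXBean.class);
        bean.setVMOption("HeapDumpOnOutOfMemoryError", "true");
        System.out.println(bean.getVMOption("HeapDumpOnOutOfMemoryError").getValue());
    }
}
```

Setting it up front in `KAFKA_HEAP_OPTS`, as the commit does, is still the simpler choice for a test harness.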

