kafka-commits mailing list archives

From ij...@apache.org
Subject kafka git commit: KAFKA-4741; Fix potential buffer leak in RecordAccumulator in case of exception
Date Wed, 08 Feb 2017 13:38:23 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 2e662a061 -> effa19c04


KAFKA-4741; Fix potential buffer leak in RecordAccumulator in case of exception

Author: Satish Duggana <sduggana@hortonworks.com>

Reviewers: Dong Lin <lindong28@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #2509 from satishd/buffer-cleanup
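
In outline, the problem: append() allocated a ByteBuffer from the BufferPool but
deallocated it only on the early-return path, so any exception thrown between the
allocation and the hand-off to the new RecordBatch leaked the buffer from the pool.
As the diff below shows, the fix hoists the buffer variable out of the try block,
releases it in a finally block, and nulls the reference once the batch takes
ownership. A minimal sketch of that idiom, with hypothetical
allocate/deallocate/mayThrow/consume stand-ins rather than the Kafka classes
themselves:

    import java.nio.ByteBuffer;

    public class BufferOwnershipSketch {

        // Hypothetical stand-ins for BufferPool.allocate/deallocate.
        static ByteBuffer allocate(int size) { return ByteBuffer.allocate(size); }
        static void deallocate(ByteBuffer buffer) { /* return it to the pool */ }

        static void appendLike(int size) {
            ByteBuffer buffer = null;   // declared outside try so finally can see it
            try {
                buffer = allocate(size);
                mayThrow();             // a failure here must not leak the buffer
                consume(buffer);        // hand-off: the consumer now owns the buffer
                buffer = null;          // keep finally from freeing the transferred buffer
            } finally {
                if (buffer != null)     // non-null only if the hand-off never happened
                    deallocate(buffer);
            }
        }

        static void mayThrow() { /* e.g. tryAppend or building the batch */ }
        static void consume(ByteBuffer buffer) { /* e.g. new RecordBatch(...) */ }
    }

Nulling the local after the hand-off keeps a single cleanup point in the finally
block and also covers the early-return case: when another thread's batch accepted
the record first, the still-non-null buffer is released on the way out, which is
why the explicit free.deallocate(buffer) before that return could be dropped.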


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/effa19c0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/effa19c0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/effa19c0

Branch: refs/heads/trunk
Commit: effa19c045b15c0f82568e91d7f485ae564123a2
Parents: 2e662a0
Author: Satish Duggana <sduggana@hortonworks.com>
Authored: Wed Feb 8 12:58:43 2017 +0000
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Wed Feb 8 13:38:05 2017 +0000

----------------------------------------------------------------------
 .../clients/producer/internals/RecordAccumulator.java | 14 ++++++++++----
 1 file changed, 10 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/effa19c0/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index d3ae89e..800a857 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -12,8 +12,6 @@
  */
 package org.apache.kafka.clients.producer.internals;
 
-import java.util.Iterator;
-
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.MetricName;
@@ -44,6 +42,7 @@ import java.util.Collections;
 import java.util.Deque;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -166,6 +165,7 @@ public final class RecordAccumulator {
         // We keep track of the number of appending thread to make sure we do not miss batches in
         // abortIncompleteBatches().
         appendsInProgress.incrementAndGet();
+        ByteBuffer buffer = null;
         try {
             // check if we have an in-progress batch
             Deque<RecordBatch> dq = getOrCreateDeque(tp);
@@ -180,7 +180,7 @@ public final class RecordAccumulator {
             // we don't have an in-progress record batch try to allocate a new batch
             int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
             log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
-            ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
+            buffer = free.allocate(size, maxTimeToBlock);
             synchronized (dq) {
                 // Need to check if producer is closed again after grabbing the dequeue lock.
                 if (closed)
@@ -189,18 +189,24 @@ public final class RecordAccumulator {
                 RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
                 if (appendResult != null) {
                     // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
-                    free.deallocate(buffer);
                     return appendResult;
                 }
+
                 MemoryRecordsBuilder recordsBuilder = MemoryRecords.builder(buffer, compression, TimestampType.CREATE_TIME, this.batchSize);
                 RecordBatch batch = new RecordBatch(tp, recordsBuilder, time.milliseconds());
                 FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));
 
                 dq.addLast(batch);
                 incomplete.add(batch);
+
+                // Don't deallocate this buffer in the finally block as it's being used in the record batch
+                buffer = null;
+                
                 return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
             }
         } finally {
+            if (buffer != null)
+                free.deallocate(buffer);
             appendsInProgress.decrementAndGet();
         }
     }

