kafka-commits mailing list archives

From j...@apache.org
Subject [kafka] branch 2.4 updated: KAFKA-9449; Adds support for closing the producer's BufferPool. (#7967)
Date Fri, 17 Jan 2020 22:33:01 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new c8d8dc9  KAFKA-9449; Adds support for closing the producer's BufferPool. (#7967)
c8d8dc9 is described below

commit c8d8dc9d1e298bd0f06dee7f4c8a78c00a247d6e
Author: Brian Byrne <bbyrne@confluent.io>
AuthorDate: Fri Jan 17 14:27:31 2020 -0800

    KAFKA-9449; Adds support for closing the producer's BufferPool. (#7967)
    
    The producer's BufferPool may block allocations if its memory limit has hit capacity.
    If the producer is closed, it's possible for the allocation waiters to wait for
    max.block.ms if progress cannot be made, even when force-closed (immediate), which can
    cause indefinite blocking if max.block.ms is particularly high.
    
    This patch fixes the problem by adding a `close()` method to `BufferPool`, which wakes
    up any waiters that have pending allocations and throws an exception.
    
    Reviewers: Jason Gustafson <jason@confluent.io>
---
 .../clients/producer/internals/BufferPool.java     | 27 ++++++++++++
 .../producer/internals/RecordAccumulator.java      |  1 +
 .../clients/producer/internals/BufferPoolTest.java | 51 +++++++++++++++++++++-
 3 files changed, 78 insertions(+), 1 deletion(-)
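
For readers of this patch, below is a minimal editorial sketch (not part of this commit) of
the user-visible effect. The class name, broker address, topic name, and buffer sizes are
illustrative assumptions; the sketch also assumes a reachable cluster and an existing topic.
With a small buffer.memory and a very large max.block.ms, a send() blocked waiting for buffer
space could previously wait out max.block.ms even on a force close; with BufferPool.close()
the wait is aborted promptly with a KafkaException.

    // Editorial sketch only -- not part of the commit. Broker address, topic name,
    // and sizes are illustrative assumptions.
    import java.time.Duration;
    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.KafkaException;
    import org.apache.kafka.common.serialization.ByteArraySerializer;

    public class CloseWhileBlockedSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
            // Small buffer and a very long max.block.ms so send() can block on allocation.
            props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 64 * 1024L);
            props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.MAX_VALUE);

            KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);

            Thread sender = new Thread(() -> {
                try {
                    byte[] payload = new byte[16 * 1024];
                    while (true) {
                        // Once buffer.memory is exhausted, this call can block inside
                        // BufferPool.allocate() waiting for space to be freed.
                        producer.send(new ProducerRecord<>("test-topic", payload));
                    }
                } catch (KafkaException e) {
                    // With this patch the blocked allocation is aborted promptly:
                    // "Producer closed while allocating memory"
                    System.out.println("send aborted: " + e.getMessage());
                }
            });
            sender.start();

            Thread.sleep(1000);
            // Force close from another thread. Previously the sender could stay blocked for
            // up to max.block.ms; now RecordAccumulator.close() calls BufferPool.close(),
            // which wakes the waiter.
            producer.close(Duration.ZERO);
            sender.join();
        }
    }
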

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
index 22d7472..b49a7e2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.metrics.Metrics;
@@ -55,6 +56,7 @@ public class BufferPool {
     private final Metrics metrics;
     private final Time time;
     private final Sensor waitTime;
+    private boolean closed;
 
     /**
      * Create a new buffer pool
@@ -82,6 +84,7 @@ public class BufferPool {
                                                    metricGrpName,
                                                    "The total time an appender waits for
space allocation.");
         this.waitTime.add(new Meter(TimeUnit.NANOSECONDS, rateMetricName, totalMetricName));
+        this.closed = false;
     }
 
     /**
@@ -104,6 +107,12 @@ public class BufferPool {
 
         ByteBuffer buffer = null;
         this.lock.lock();
+
+        if (this.closed) {
+            this.lock.unlock();
+            throw new KafkaException("Producer closed while allocating memory");
+        }
+
         try {
             // check if we have a free buffer of the right size pooled
             if (size == poolableSize && !this.free.isEmpty())
@@ -138,6 +147,9 @@ public class BufferPool {
                             recordWaitTime(timeNs);
                         }
 
+                        if (this.closed)
+                            throw new KafkaException("Producer closed while allocating memory");
+
                         if (waitingTimeElapsed) {
                             throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");
                         }
@@ -316,4 +328,19 @@ public class BufferPool {
     Deque<Condition> waiters() {
         return this.waiters;
     }
+
+    /**
+     * Closes the buffer pool. Memory will be prevented from being allocated, but may be deallocated. All allocations
+     * awaiting available memory will be notified to abort.
+     */
+    public void close() {
+        this.lock.lock();
+        this.closed = true;
+        try {
+            for (Condition waiter : this.waiters)
+                waiter.signal();
+        } finally {
+            this.lock.unlock();
+        }
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 745382d..d002fc4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -796,6 +796,7 @@ public final class RecordAccumulator {
      */
     public void close() {
         this.closed = true;
+        this.free.close();
     }
 
     /*
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
index 8e44fa1..724d9d4 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.clients.producer.internals;
 
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.MockTime;
@@ -28,7 +29,10 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Deque;
 import java.util.List;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -36,6 +40,7 @@ import java.util.concurrent.locks.Condition;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.anyLong;
@@ -232,7 +237,7 @@ public class BufferPoolTest {
         // both the allocate() called by threads t1 and t2 should have been interrupted and the waiters queue should be empty
         assertEquals(pool.queued(), 0);
     }
-    
+
     @Test
     public void testCleanupMemoryAvailabilityOnMetricsException() throws Exception {
         BufferPool bufferPool = spy(new BufferPool(2, 1, new Metrics(), time, metricGroup));
@@ -377,4 +382,48 @@ public class BufferPoolTest {
         }
     }
 
+    @Test
+    public void testCloseAllocations() throws Exception {
+        BufferPool pool = new BufferPool(10, 1, metrics, Time.SYSTEM, metricGroup);
+        ByteBuffer buffer = pool.allocate(1, maxBlockTimeMs);
+
+        // Close the buffer pool. This should prevent any further allocations.
+        pool.close();
+
+        assertThrows(KafkaException.class, () -> pool.allocate(1, maxBlockTimeMs));
+
+        // Ensure deallocation still works.
+        pool.deallocate(buffer);
+    }
+
+    @Test
+    public void testCloseNotifyWaiters() throws Exception {
+        final int numWorkers = 2;
+
+        BufferPool pool = new BufferPool(1, 1, metrics, Time.SYSTEM, metricGroup);
+        ByteBuffer buffer = pool.allocate(1, Long.MAX_VALUE);
+
+        CountDownLatch completed = new CountDownLatch(numWorkers);
+        ExecutorService executor = Executors.newFixedThreadPool(numWorkers);
+        Callable<Void> work = new Callable<Void>() {
+                public Void call() throws Exception {
+                    assertThrows(KafkaException.class, () -> pool.allocate(1, Long.MAX_VALUE));
+                    completed.countDown();
+                    return null;
+                }
+            };
+        for (int i = 0; i < numWorkers; ++i) {
+            executor.submit(work);
+        }
+
+        assertEquals("Allocation shouldn't have happened yet, waiting on memory", numWorkers,
completed.getCount());
+
+        // Close the buffer pool. This should notify all waiters.
+        pool.close();
+
+        completed.await(15, TimeUnit.SECONDS);
+
+        pool.deallocate(buffer);
+    }
+
 }

