kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject git commit: KAFKA-1488; new metrics for measuring the ratio when the new producer is block for space allocation; patched by Jun Rao; reviewed by Guozhang Wang and Joel Koshy
Date Wed, 11 Jun 2014 00:14:49 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk bb0184757 -> dcc88408c


KAFKA-1488; new metrics for measuring the ratio when the new producer is block for space allocation;
patched by Jun Rao; reviewed by Guozhang Wang and Joel Koshy


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/dcc88408
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/dcc88408
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/dcc88408

Branch: refs/heads/trunk
Commit: dcc88408c98a07cb9a816ab55cd81e55f1d2217d
Parents: bb01847
Author: Jun Rao <junrao@gmail.com>
Authored: Tue Jun 10 17:14:45 2014 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Tue Jun 10 17:14:45 2014 -0700

----------------------------------------------------------------------
 .../clients/producer/internals/BufferPool.java  | 31 ++++++++++++++++----
 .../producer/internals/RecordAccumulator.java   |  2 +-
 .../kafka/clients/producer/BufferPoolTest.java  | 26 ++++++++--------
 3 files changed, 40 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/dcc88408/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
index d1d6c4b..169a656 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
@@ -16,14 +16,19 @@
  */
 package org.apache.kafka.clients.producer.internals;
 
+import org.apache.kafka.clients.producer.BufferExhaustedException;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.utils.Time;
+
 import java.nio.ByteBuffer;
 import java.util.ArrayDeque;
 import java.util.Deque;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
-import org.apache.kafka.clients.producer.BufferExhaustedException;
-
 
 /**
  * A pool of ByteBuffers kept under a given memory limit. This class is fairly specific to
the needs of the producer. In
@@ -44,6 +49,9 @@ public final class BufferPool {
     private final Deque<ByteBuffer> free;
     private final Deque<Condition> waiters;
     private long availableMemory;
+    private final Metrics metrics;
+    private final Time time;
+    private final Sensor waitTime;
 
     /**
      * Create a new buffer pool
@@ -54,7 +62,7 @@ public final class BufferPool {
      *        {@link #allocate(int)} call will block and wait for memory to be returned to
the pool. If false
      *        {@link #allocate(int)} will throw an exception if the buffer is out of memory.
      */
-    public BufferPool(long memory, int poolableSize, boolean blockOnExhaustion) {
+    public BufferPool(long memory, int poolableSize, boolean blockOnExhaustion, Metrics metrics,
Time time) {
         this.poolableSize = poolableSize;
         this.blockOnExhaustion = blockOnExhaustion;
         this.lock = new ReentrantLock();
@@ -62,7 +70,13 @@ public final class BufferPool {
         this.waiters = new ArrayDeque<Condition>();
         this.totalMemory = memory;
         this.availableMemory = memory;
-    }
+        this.metrics = metrics;
+        this.time = time;
+        this.waitTime = this.metrics.sensor("bufferpool-wait-time");
+        this.waitTime.add("bufferpool-wait-ratio",
+                          "The fraction of time an appender waits for space allocation.",
+                          new Rate(TimeUnit.NANOSECONDS));
+   }
 
     /**
      * Allocate a buffer of the given size. This method blocks if there is not enough memory
and the buffer pool
@@ -111,7 +125,14 @@ public final class BufferPool {
                 // loop over and over until we have a buffer or have reserved
                 // enough memory to allocate one
                 while (accumulated < size) {
-                    moreMemory.await();
+                    try {
+                        long startWait = time.nanoseconds();
+                        moreMemory.await(300, TimeUnit.MILLISECONDS);
+                        long endWait = time.nanoseconds();
+                        this.waitTime.record(endWait - startWait, time.milliseconds());
+                     } catch (InterruptedException e) {
+                        // This should never happen. Just let it go.
+                    }
                     // check if we can satisfy this request from the free list,
                     // otherwise allocate memory
                     if (accumulated == 0 && size == this.poolableSize &&
!this.free.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/dcc88408/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 5ededcc..4010d42 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -82,7 +82,7 @@ public final class RecordAccumulator {
         this.lingerMs = lingerMs;
         this.retryBackoffMs = retryBackoffMs;
         this.batches = new CopyOnWriteMap<TopicPartition, Deque<RecordBatch>>();
-        this.free = new BufferPool(totalSize, batchSize, blockOnBufferFull);
+        this.free = new BufferPool(totalSize, batchSize, blockOnBufferFull, metrics, time);
         this.time = time;
         registerMetrics(metrics);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/dcc88408/clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java
index f227b5c..fe3c13f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java
@@ -16,9 +16,11 @@
  */
 package org.apache.kafka.clients.producer;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import org.apache.kafka.clients.producer.internals.BufferPool;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Test;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -26,13 +28,11 @@ import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-
-import org.apache.kafka.clients.producer.BufferExhaustedException;
-import org.apache.kafka.clients.producer.internals.BufferPool;
-import org.apache.kafka.test.TestUtils;
-import org.junit.Test;
+import static org.junit.Assert.*;
 
 public class BufferPoolTest {
+    private MockTime time = new MockTime();
+    private Metrics metrics = new Metrics(time);
 
     /**
      * Test the simple non-blocking allocation paths
@@ -41,7 +41,7 @@ public class BufferPoolTest {
     public void testSimple() throws Exception {
         int totalMemory = 64 * 1024;
         int size = 1024;
-        BufferPool pool = new BufferPool(totalMemory, size, false);
+        BufferPool pool = new BufferPool(totalMemory, size, false, metrics, time);
         ByteBuffer buffer = pool.allocate(size);
         assertEquals("Buffer size should equal requested size.", size, buffer.limit());
         assertEquals("Unallocated memory should have shrunk", totalMemory - size, pool.unallocatedMemory());
@@ -68,7 +68,7 @@ public class BufferPoolTest {
      */
     @Test(expected = IllegalArgumentException.class)
     public void testCantAllocateMoreMemoryThanWeHave() throws Exception {
-        BufferPool pool = new BufferPool(1024, 512, true);
+        BufferPool pool = new BufferPool(1024, 512, true, metrics, time);
         ByteBuffer buffer = pool.allocate(1024);
         assertEquals(1024, buffer.limit());
         pool.deallocate(buffer);
@@ -77,7 +77,7 @@ public class BufferPoolTest {
 
     @Test
     public void testNonblockingMode() throws Exception {
-        BufferPool pool = new BufferPool(2, 1, false);
+        BufferPool pool = new BufferPool(2, 1, false, metrics, time);
         pool.allocate(1);
         try {
             pool.allocate(2);
@@ -92,7 +92,7 @@ public class BufferPoolTest {
      */
     @Test
     public void testDelayedAllocation() throws Exception {
-        BufferPool pool = new BufferPool(5 * 1024, 1024, true);
+        BufferPool pool = new BufferPool(5 * 1024, 1024, true, metrics, time);
         ByteBuffer buffer = pool.allocate(1024);
         CountDownLatch doDealloc = asyncDeallocate(pool, buffer);
         CountDownLatch allocation = asyncAllocate(pool, 5 * 1024);
@@ -141,7 +141,7 @@ public class BufferPoolTest {
         final int iterations = 50000;
         final int poolableSize = 1024;
         final int totalMemory = numThreads / 2 * poolableSize;
-        final BufferPool pool = new BufferPool(totalMemory, poolableSize, true);
+        final BufferPool pool = new BufferPool(totalMemory, poolableSize, true, metrics,
time);
         List<StressTestThread> threads = new ArrayList<StressTestThread>();
         for (int i = 0; i < numThreads; i++)
             threads.add(new StressTestThread(pool, iterations));


Mime
View raw message