kafka-commits mailing list archives

From jjko...@apache.org
Subject [2/2] kafka git commit: KAFKA-2120; Add a request timeout to NetworkClient (KIP-19); reviewed by Jason Gustafson, Ismael Juma, Joel Koshy, Jun Rao, and Edward Ribeiro
Date Wed, 16 Sep 2015 17:40:02 GMT
KAFKA-2120; Add a request timeout to NetworkClient (KIP-19); reviewed by Jason Gustafson, Ismael Juma, Joel Koshy, Jun Rao, and Edward Ribeiro


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/da39931a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/da39931a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/da39931a

Branch: refs/heads/trunk
Commit: da39931afad8008bc2b385a75a462777be051435
Parents: b658e25
Author: Mayuresh Gharat <gharatmayuresh15@gmail.com>
Authored: Wed Sep 16 10:31:57 2015 -0700
Committer: Joel Koshy <jjkoshy@gmail.com>
Committed: Wed Sep 16 10:31:57 2015 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/clients/ClientRequest.java |  8 ++
 .../kafka/clients/CommonClientConfigs.java      |  6 ++
 .../apache/kafka/clients/InFlightRequests.java  | 26 +++++-
 .../org/apache/kafka/clients/KafkaClient.java   |  3 +-
 .../org/apache/kafka/clients/NetworkClient.java | 73 +++++++++++++----
 .../kafka/clients/consumer/ConsumerConfig.java  | 10 +++
 .../kafka/clients/consumer/KafkaConsumer.java   |  3 +-
 .../internals/ConsumerNetworkClient.java        |  2 +-
 .../kafka/clients/producer/KafkaProducer.java   | 83 +++++++++++++++++---
 .../kafka/clients/producer/ProducerConfig.java  | 39 ++++++++-
 .../clients/producer/internals/BufferPool.java  | 20 ++---
 .../producer/internals/RecordAccumulator.java   | 57 +++++++++++---
 .../clients/producer/internals/RecordBatch.java | 38 ++++++++-
 .../clients/producer/internals/Sender.java      | 20 +++--
 .../apache/kafka/common/network/Selectable.java |  5 --
 .../apache/kafka/common/network/Selector.java   | 11 ---
 .../org/apache/kafka/clients/MockClient.java    |  3 +-
 .../apache/kafka/clients/NetworkClientTest.java | 48 +++++------
 .../producer/internals/BufferPoolTest.java      | 57 ++++++++------
 .../internals/RecordAccumulatorTest.java        | 67 ++++++++++------
 .../clients/producer/internals/SenderTest.java  | 21 ++---
 .../kafka/common/network/SSLSelectorTest.java   | 19 -----
 .../kafka/common/network/SelectorTest.java      | 17 ----
 .../org/apache/kafka/test/MockSelector.java     |  6 +-
 .../controller/ControllerChannelManager.scala   |  3 +-
 .../main/scala/kafka/server/KafkaConfig.scala   |  5 ++
 .../main/scala/kafka/server/KafkaServer.scala   |  3 +-
 .../kafka/server/ReplicaFetcherThread.scala     |  3 +-
 .../scala/kafka/tools/ProducerPerformance.scala |  2 -
 .../kafka/utils/NetworkClientBlockingOps.scala  |  2 +-
 .../kafka/api/ProducerFailureHandlingTest.scala | 46 +----------
 .../unit/kafka/server/KafkaConfigTest.scala     |  1 +
 .../test/scala/unit/kafka/utils/TestUtils.scala |  1 -
 33 files changed, 457 insertions(+), 251 deletions(-)
----------------------------------------------------------------------
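
For context, a minimal sketch of how a producer would opt into the timeouts
introduced by this patch (the broker address, topic, and serializer choices
are placeholders, not part of the commit):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class TimeoutConfigSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            // Added by this patch: cap how long the client waits for a response.
            props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
            // Added by this patch: cap how long send()/partitionsFor() may block.
            props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000L);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            KafkaProducer<String, String> producer = new KafkaProducer<>(props);
            producer.send(new ProducerRecord<>("test-topic", "key", "value"));
            producer.close();
        }
    }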


http://git-wip-us.apache.org/repos/asf/kafka/blob/da39931a/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java b/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
index dc8f0f1..6410f09 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
@@ -24,6 +24,7 @@ public final class ClientRequest {
     private final RequestSend request;
     private final RequestCompletionHandler callback;
     private final boolean isInitiatedByNetworkClient;
+    private long sendMs;
 
     /**
      * @param createdMs The unix timestamp in milliseconds for the time at which this request was created.
@@ -86,4 +87,11 @@ public final class ClientRequest {
         return isInitiatedByNetworkClient;
     }
 
+    public long getSendMs() {
+        return sendMs;
+    }
+
+    public void setSendMs(long sendMs) {
+        this.sendMs = sendMs;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/da39931a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
index 7d24c6f..48e4919 100644
--- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
@@ -61,4 +61,10 @@ public class CommonClientConfigs {
 
     public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = "connections.max.idle.ms";
     public static final String CONNECTIONS_MAX_IDLE_MS_DOC = "Close idle connections after the number of milliseconds specified by this config.";
+
+    public static final String REQUEST_TIMEOUT_MS_CONFIG = "request.timeout.ms";
+    public static final String REQUEST_TIMEOUT_MS_DOC = "The configuration controls the maximum amount of time the client will wait "
+                                                         + "for the response of a request. If the response is not received before the timeout "
+                                                         + "elapses the client will resend the request if necessary or fail the request if "
+                                                         + "retries are exhausted.";
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/da39931a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
index 15d00d4..f9956af 100644
--- a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
+++ b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
@@ -16,6 +16,8 @@ import java.util.ArrayDeque;
 import java.util.Collections;
 import java.util.Deque;
 import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -33,12 +35,13 @@ final class InFlightRequests {
     /**
      * Add the given request to the queue for the connection it was directed to
      */
-    public void add(ClientRequest request) {
+    public void add(ClientRequest request, long now) {
         Deque<ClientRequest> reqs = this.requests.get(request.request().destination());
         if (reqs == null) {
             reqs = new ArrayDeque<ClientRequest>();
             this.requests.put(request.request().destination(), reqs);
         }
+        request.setSendMs(now);
         reqs.addFirst(request);
     }
 
@@ -123,4 +126,25 @@ final class InFlightRequests {
         }
     }
 
+    /**
+     * Returns a list of nodes with pending in-flight requests that need to be timed out
+     *
+     * @param now current time in milliseconds
+     * @param requestTimeout max time to wait for the request to be completed
+     * @return list of nodes
+     */
+    public List<String> getNodesWithTimedOutRequests(long now, int requestTimeout) {
+        List<String> nodeIds = new LinkedList<String>();
+        for (String nodeId : requests.keySet()) {
+            if (inFlightRequestCount(nodeId) > 0) {
+                ClientRequest request = requests.get(nodeId).peekLast();
+                long timeSinceSend = now - request.getSendMs();
+                if (timeSinceSend > requestTimeout) {
+                    nodeIds.add(nodeId);
+                }
+            }
+        }
+
+        return nodeIds;
+    }
 }
\ No newline at end of file
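
To make the new scan concrete, here is a self-contained sketch of the same
logic with the request type reduced to a bare send timestamp (requests are
added with addFirst() above, so peekLast() yields the oldest in-flight
request for each node):

    import java.util.Deque;
    import java.util.HashMap;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Map;

    class TimeoutScanSketch {
        // node id -> send timestamps in ms, newest first (mirrors the Deque usage above)
        private final Map<String, Deque<Long>> sendTimes = new HashMap<>();

        List<String> nodesWithTimedOutRequests(long now, int requestTimeout) {
            List<String> nodeIds = new LinkedList<>();
            for (Map.Entry<String, Deque<Long>> entry : sendTimes.entrySet()) {
                Long oldestSendMs = entry.getValue().peekLast(); // oldest in-flight request
                if (oldestSendMs != null && now - oldestSendMs > requestTimeout)
                    nodeIds.add(entry.getKey());
            }
            return nodeIds;
        }
    }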

http://git-wip-us.apache.org/repos/asf/kafka/blob/da39931a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
index f46c0d9..478368e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
@@ -68,8 +68,9 @@ public interface KafkaClient extends Closeable {
      * Queue up the given request for sending. Requests can only be sent on ready connections.
      * 
      * @param request The request
+     * @param now The current timestamp
      */
-    public void send(ClientRequest request);
+    public void send(ClientRequest request, long now);
 
     /**
      * Do actual reads and writes from sockets.

http://git-wip-us.apache.org/repos/asf/kafka/blob/da39931a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 51a6c5d..8475a76 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -73,15 +73,19 @@ public class NetworkClient implements KafkaClient {
     /* the current correlation id to use when sending requests to servers */
     private int correlation;
 
+    /* the max time in ms the client will wait for the response of a request */
+    private final int requestTimeoutMs;
+
     public NetworkClient(Selectable selector,
                          Metadata metadata,
                          String clientId,
                          int maxInFlightRequestsPerConnection,
                          long reconnectBackoffMs,
                          int socketSendBuffer,
-                         int socketReceiveBuffer) {
+                         int socketReceiveBuffer,
+                         int requestTimeoutMs) {
         this(null, metadata, selector, clientId, maxInFlightRequestsPerConnection,
-                reconnectBackoffMs, socketSendBuffer, socketReceiveBuffer);
+                reconnectBackoffMs, socketSendBuffer, socketReceiveBuffer, requestTimeoutMs);
     }
 
     public NetworkClient(Selectable selector,
@@ -90,9 +94,10 @@ public class NetworkClient implements KafkaClient {
                          int maxInFlightRequestsPerConnection,
                          long reconnectBackoffMs,
                          int socketSendBuffer,
-                         int socketReceiveBuffer) {
+                         int socketReceiveBuffer,
+                         int requestTimeoutMs) {
         this(metadataUpdater, null, selector, clientId, maxInFlightRequestsPerConnection, reconnectBackoffMs,
-                socketSendBuffer, socketReceiveBuffer);
+                socketSendBuffer, socketReceiveBuffer, requestTimeoutMs);
     }
 
     private NetworkClient(MetadataUpdater metadataUpdater,
@@ -102,7 +107,7 @@ public class NetworkClient implements KafkaClient {
                           int maxInFlightRequestsPerConnection,
                           long reconnectBackoffMs,
                           int socketSendBuffer,
-                          int socketReceiveBuffer) {
+                          int socketReceiveBuffer, int requestTimeoutMs) {
 
         /* It would be better if we could pass `DefaultMetadataUpdater` from the public constructor, but it's not
          * possible because `DefaultMetadataUpdater` is an inner class and it can only be instantiated after the
@@ -115,7 +120,6 @@ public class NetworkClient implements KafkaClient {
         } else {
             this.metadataUpdater = metadataUpdater;
         }
-
         this.selector = selector;
         this.clientId = clientId;
         this.inFlightRequests = new InFlightRequests(maxInFlightRequestsPerConnection);
@@ -124,6 +128,7 @@ public class NetworkClient implements KafkaClient {
         this.socketReceiveBuffer = socketReceiveBuffer;
         this.correlation = 0;
         this.nodeIndexOffset = new Random().nextInt(Integer.MAX_VALUE);
+        this.requestTimeoutMs = requestTimeoutMs;
     }
 
     /**
@@ -222,17 +227,18 @@ public class NetworkClient implements KafkaClient {
      * Queue up the given request for sending. Requests can only be sent out to ready nodes.
      *
      * @param request The request
+     * @param now The current timestamp
      */
     @Override
-    public void send(ClientRequest request) {
+    public void send(ClientRequest request, long now) {
         String nodeId = request.request().destination();
         if (!canSendRequest(nodeId))
             throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
-        doSend(request);
+        doSend(request, now);
     }
 
-    private void doSend(ClientRequest request) {
-        this.inFlightRequests.add(request);
+    private void doSend(ClientRequest request, long now) {
+        this.inFlightRequests.add(request, now);
         selector.send(request.request());
     }
 
@@ -258,6 +264,7 @@ public class NetworkClient implements KafkaClient {
         handleCompletedReceives(responses, now);
         handleDisconnections(responses, now);
         handleConnections();
+        handleTimedOutRequests(responses, now);
 
         // invoke callbacks
         for (ClientResponse response : responses) {
@@ -390,6 +397,43 @@ public class NetworkClient implements KafkaClient {
     }
 
     /**
+     * Post-process the disconnection of a node
+     *
+     * @param responses The list of responses to update
+     * @param nodeId Id of the node to be disconnected
+     * @param now The current time
+     */
+    private void processDisconnection(List<ClientResponse> responses, String nodeId, long now) {
+        connectionStates.disconnected(nodeId);
+        for (ClientRequest request : this.inFlightRequests.clearAll(nodeId)) {
+            log.trace("Cancelled request {} due to node {} being disconnected", request, nodeId);
+            if (!metadataUpdater.maybeHandleDisconnection(request))
+                responses.add(new ClientResponse(request, now, true, null));
+        }
+    }
+
+    /**
+     * Iterate over all the in-flight requests and expire any that have exceeded the configured requestTimeout.
+     * The connection to the node associated with the request will be terminated and will be treated as a disconnection.
+     *
+     * @param responses The list of responses to update
+     * @param now The current time
+     */
+    private void handleTimedOutRequests(List<ClientResponse> responses, long now) {
+        List<String> nodeIds = this.inFlightRequests.getNodesWithTimedOutRequests(now, this.requestTimeoutMs);
+        for (String nodeId : nodeIds) {
+            // close connection to the node
+            this.selector.close(nodeId);
+            log.debug("Disconnecting from node {} due to request timeout.", nodeId);
+            processDisconnection(responses, nodeId, now);
+        }
+
+        // we disconnected, so we should probably refresh our metadata
+        if (nodeIds.size() > 0)
+            metadataUpdater.requestUpdate();
+    }
+
+    /**
      * Handle any completed request send. In particular if no response is expected consider the request complete.
      *
      * @param responses The list of responses to update
@@ -433,13 +477,8 @@ public class NetworkClient implements KafkaClient {
      */
     private void handleDisconnections(List<ClientResponse> responses, long now) {
         for (String node : this.selector.disconnected()) {
-            connectionStates.disconnected(node);
             log.debug("Node {} disconnected.", node);
-            for (ClientRequest request : this.inFlightRequests.clearAll(node)) {
-                log.trace("Cancelled request {} due to node {} being disconnected", request, node);
-                if (!metadataUpdater.maybeHandleDisconnection(request))
-                    responses.add(new ClientResponse(request, now, true, null));
-            }
+            processDisconnection(responses, node, now);
         }
         // we got a disconnect so we should probably refresh our metadata and see if that broker is dead
         if (this.selector.disconnected().size() > 0)
@@ -604,7 +643,7 @@ public class NetworkClient implements KafkaClient {
                 this.metadataFetchInProgress = true;
                 ClientRequest metadataRequest = request(now, nodeConnectionId, topics);
                 log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());
-                doSend(metadataRequest);
+                doSend(metadataRequest, now);
             } else if (connectionStates.canConnect(nodeConnectionId, now)) {
                 // we don't have a connection to this node right now, make one
                 log.debug("Initialize connection to node {} for sending metadata request", node.id());
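
From a caller's point of view, a timed-out request now surfaces from poll()
exactly like a broker disconnect. A rough sketch of the consuming side,
assuming the poll() and wasDisconnected() signatures of this codebase:

    import java.util.List;
    import org.apache.kafka.clients.ClientResponse;
    import org.apache.kafka.clients.NetworkClient;

    class PollSketch {
        static void pollOnce(NetworkClient client, long nowMs) {
            List<ClientResponse> responses = client.poll(100, nowMs);
            for (ClientResponse response : responses) {
                if (response.wasDisconnected()) {
                    // The request timed out or the connection dropped; the caller
                    // decides whether to retry or fail the request upward.
                }
            }
        }
    }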

http://git-wip-us.apache.org/repos/asf/kafka/blob/da39931a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index b9a2d4e..dad0bc0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -159,6 +159,10 @@ public class ConsumerConfig extends AbstractConfig {
     /** <code>connections.max.idle.ms</code> */
     public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG;
 
+    /** <code>request.timeout.ms</code> */
+    public static final String REQUEST_TIMEOUT_MS_CONFIG = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG;
+    private static final String REQUEST_TIMEOUT_MS_DOC = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC;
+
 
     static {
         CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
@@ -298,6 +302,12 @@ public class ConsumerConfig extends AbstractConfig {
                                 .define(SSLConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM, Importance.LOW, SSLConfigs.SSL_KEYMANAGER_ALGORITHM_DOC)
                                 .define(SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, Importance.LOW, SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC)
                                 .define(SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, Type.STRING, Importance.LOW, SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC, false)
+                                .define(REQUEST_TIMEOUT_MS_CONFIG,
+                                        Type.INT,
+                                        40 * 1000,
+                                        atLeast(0),
+                                        Importance.MEDIUM,
+                                        REQUEST_TIMEOUT_MS_DOC)
                                 /* default is set to be a bit lower than the server default (10 min), to avoid both client and server closing connection at same time */
                                 .define(CONNECTIONS_MAX_IDLE_MS_CONFIG,
                                         Type.LONG,

http://git-wip-us.apache.org/repos/asf/kafka/blob/da39931a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 3ac2be8..540c04a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -527,7 +527,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V>, Metadata.Listener {
                     100, // a fixed large enough value will suffice
                     config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),
                     config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG),
-                    config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG));
+                    config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG),
+                    config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG));
             this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs);
             OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase());
             this.subscriptions = new SubscriptionState(offsetResetStrategy);

http://git-wip-us.apache.org/repos/asf/kafka/blob/da39931a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index 9517d9d..0b611fb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -256,7 +256,7 @@ public class ConsumerNetworkClient implements Closeable {
             while (iterator.hasNext()) {
                 ClientRequest request = iterator.next();
                 if (client.ready(node, now)) {
-                    client.send(request);
+                    client.send(request, now);
                     iterator.remove();
                     requestsSent = true;
                 } else if (client.connectionFailed(node)) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/da39931a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 804d569..aaf4670 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -128,7 +128,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     private String clientId;
     private final Partitioner partitioner;
     private final int maxRequestSize;
-    private final long metadataFetchTimeoutMs;
     private final long totalMemorySize;
     private final Metadata metadata;
     private final RecordAccumulator accumulator;
@@ -141,6 +140,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     private final Serializer<K> keySerializer;
     private final Serializer<V> valueSerializer;
     private final ProducerConfig producerConfig;
+    private final long maxBlockTimeMs;
+    private final int requestTimeoutMs;
 
     /**
      * A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
@@ -197,8 +198,10 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
         try {
             log.trace("Starting the Kafka producer");
+            Map<String, Object> userProvidedConfigs = config.originals();
             this.producerConfig = config;
             this.time = new SystemTime();
+
             MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
                     .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
                             TimeUnit.MILLISECONDS);
@@ -211,11 +214,47 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             this.metrics = new Metrics(metricConfig, reporters, time);
             this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
             long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
-            this.metadataFetchTimeoutMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
             this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG));
             this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
             this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
             this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
+            /* Check for user-defined settings.
+             * If BLOCK_ON_BUFFER_FULL is set to true, we do not honor METADATA_FETCH_TIMEOUT_CONFIG.
+             * This should be removed with release 0.9 when the deprecated configs are removed.
+             */
+            if (userProvidedConfigs.containsKey(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG)) {
+                log.warn(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG + " config is deprecated and will be removed soon. " +
+                        "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
+                boolean blockOnBufferFull = config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG);
+                if (blockOnBufferFull) {
+                    this.maxBlockTimeMs = Long.MAX_VALUE;
+                } else if (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) {
+                    log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. " +
+                            "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
+                    this.maxBlockTimeMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
+                } else {
+                    this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
+                }
+            } else if (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) {
+                log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. " +
+                        "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
+                this.maxBlockTimeMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
+            } else {
+                this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
+            }
+
+            /* Check for user-defined settings.
+             * If TIMEOUT_CONFIG is set, use it for the request timeout.
+             * This should be removed with release 0.9.
+             */
+            if (userProvidedConfigs.containsKey(ProducerConfig.TIMEOUT_CONFIG)) {
+                log.warn(ProducerConfig.TIMEOUT_CONFIG + " config is deprecated and will be removed soon. Please use " +
+                        ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+                this.requestTimeoutMs = config.getInt(ProducerConfig.TIMEOUT_CONFIG);
+            } else {
+                this.requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+            }
+
             Map<String, String> metricTags = new LinkedHashMap<String, String>();
             metricTags.put("client-id", clientId);
             this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
@@ -223,7 +262,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                     this.compressionType,
                     config.getLong(ProducerConfig.LINGER_MS_CONFIG),
                     retryBackoffMs,
-                    config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG),
                     metrics,
                     time,
                     metricTags);
@@ -237,17 +275,18 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                     config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
                     config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
                     config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
-                    config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG));
+                    config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
+                    this.requestTimeoutMs);
             this.sender = new Sender(client,
                     this.metadata,
                     this.accumulator,
                     config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
                     (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
                     config.getInt(ProducerConfig.RETRIES_CONFIG),
-                    config.getInt(ProducerConfig.TIMEOUT_CONFIG),
                     this.metrics,
                     new SystemTime(),
-                    clientId);
+                    clientId,
+                    this.requestTimeoutMs);
             String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "");
             this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
             this.ioThread.start();
@@ -367,7 +406,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
         try {
             // first make sure the metadata for the topic is available
-            waitOnMetadata(record.topic(), this.metadataFetchTimeoutMs);
+            long startTime = time.milliseconds();
+            waitOnMetadata(record.topic(), this.maxBlockTimeMs);
             byte[] serializedKey;
             try {
                 serializedKey = keySerializer.serialize(record.topic(), record.key());
@@ -376,6 +416,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                         " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                         " specified in key.serializer");
             }
+            long remainingTime = checkMaybeGetRemainingTime(startTime);
             byte[] serializedValue;
             try {
                 serializedValue = valueSerializer.serialize(record.topic(), record.value());
@@ -384,12 +425,15 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                         " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                         " specified in value.serializer");
             }
+            remainingTime = checkMaybeGetRemainingTime(startTime);
             int partition = partition(record, serializedKey, serializedValue, metadata.fetch());
+            remainingTime = checkMaybeGetRemainingTime(startTime);
             int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
             ensureValidRecordSize(serializedSize);
             TopicPartition tp = new TopicPartition(record.topic(), partition);
             log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
-            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, callback);
+            remainingTime = checkMaybeGetRemainingTime(startTime);
+            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, callback, remainingTime);
             if (result.batchIsFull || result.newBatchCreated) {
                 log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                 this.sender.wakeup();
@@ -508,7 +552,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     @Override
     public List<PartitionInfo> partitionsFor(String topic) {
         try {
-            waitOnMetadata(topic, this.metadataFetchTimeoutMs);
+            waitOnMetadata(topic, this.maxBlockTimeMs);
         } catch (InterruptedException e) {
             throw new InterruptException(e);
         }
@@ -628,7 +672,26 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                                                    + "].");
             return partition;
         }
-        return this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
+        return this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue,
+            cluster);
+    }
+
+    /**
+     * Check the time elapsed since startTime and return the time remaining before max.block.ms is reached.
+     * Throws a {@link org.apache.kafka.common.errors.TimeoutException} if the elapsed time
+     * is more than the max time to block (max.block.ms).
+     *
+     * @param startTime timestamp used to check the elapsed time
+     * @return remainingTime
+     */
+    private long checkMaybeGetRemainingTime(long startTime) {
+        long elapsedTime = time.milliseconds() - startTime;
+        if (elapsedTime > maxBlockTimeMs) {
+            throw new TimeoutException("Request timed out");
+        }
+        long remainingTime = maxBlockTimeMs - elapsedTime;
+
+        return remainingTime;
     }
 
     private static class FutureFailure implements Future<RecordMetadata> {
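
The remaining-time bookkeeping in send() above follows a simple budget
pattern: every blocking step draws down one shared max.block.ms budget. A
standalone sketch of the idea (names illustrative; the patch throws Kafka's
TimeoutException where this sketch uses RuntimeException):

    class BlockBudgetSketch {
        private final long startMs;
        private final long maxBlockTimeMs;

        BlockBudgetSketch(long startMs, long maxBlockTimeMs) {
            this.startMs = startMs;
            this.maxBlockTimeMs = maxBlockTimeMs;
        }

        // Mirrors checkMaybeGetRemainingTime: fail fast once the budget is spent,
        // otherwise hand the remainder to the next blocking step.
        long remainingOrThrow(long nowMs) {
            long elapsed = nowMs - startMs;
            if (elapsed > maxBlockTimeMs)
                throw new RuntimeException("Request timed out");
            return maxBlockTimeMs - elapsed;
        }
    }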

http://git-wip-us.apache.org/repos/asf/kafka/blob/da39931a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 06f00a9..83dad2a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -45,6 +45,10 @@ public class ProducerConfig extends AbstractConfig {
     public static final String BOOTSTRAP_SERVERS_CONFIG = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
 
     /** <code>metadata.fetch.timeout.ms</code> */
+    /**
+     * @deprecated This config will be removed in a future release. Please use {@link #MAX_BLOCK_MS_CONFIG}
+     */
+    @Deprecated
     public static final String METADATA_FETCH_TIMEOUT_CONFIG = "metadata.fetch.timeout.ms";
     private static final String METADATA_FETCH_TIMEOUT_DOC = "The first time data is sent to a topic we must fetch metadata about that topic to know which servers host the topic's partitions. This "
                                                              + "configuration controls the maximum amount of time we will block waiting for the metadata fetch to succeed before throwing an exception back to the client.";
@@ -93,6 +97,11 @@ public class ProducerConfig extends AbstractConfig {
                                            + " remains alive. This is the strongest available guarantee.";
 
     /** <code>timeout.ms</code> */
+
+    /**
+     * @deprecated This config will be removed in a future release. Please use {@link #REQUEST_TIMEOUT_MS_CONFIG}
+     */
+    @Deprecated
     public static final String TIMEOUT_CONFIG = "timeout.ms";
     private static final String TIMEOUT_DOC = "The configuration controls the maximum amount of time the server will wait for acknowledgments from followers to " + "meet the acknowledgment requirements the producer has specified with the <code>acks</code> configuration. If the "
                                               + "requested number of acknowledgments are not met when the timeout elapses an error will be returned. This timeout "
@@ -128,6 +137,10 @@ public class ProducerConfig extends AbstractConfig {
     public static final String RECONNECT_BACKOFF_MS_CONFIG = CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG;
 
     /** <code>block.on.buffer.full</code> */
+    /**
+     * @deprecated This config will be removed in a future release. Also, the {@link #METADATA_FETCH_TIMEOUT_CONFIG} is no longer honored when this property is set to true.
+     */
+    @Deprecated
     public static final String BLOCK_ON_BUFFER_FULL_CONFIG = "block.on.buffer.full";
     private static final String BLOCK_ON_BUFFER_FULL_DOC = "When our memory buffer is exhausted we must either stop accepting new records (block) or throw errors. By default " + "this setting is true and we block, however in some scenarios blocking is not desirable and it is better to "
                                                            + "immediately give an error. Setting this to <code>false</code> will accomplish that: the producer will throw a BufferExhaustedException if a record is sent and the buffer space is full.";
@@ -177,6 +190,18 @@ public class ProducerConfig extends AbstractConfig {
     public static final String PARTITIONER_CLASS_CONFIG = "partitioner.class";
     private static final String PARTITIONER_CLASS_DOC = "Partitioner class that implements the <code>Partitioner</code> interface.";
 
+    /** <code>max.block.ms</code> */
+    public static final String MAX_BLOCK_MS_CONFIG = "max.block.ms";
+    private static final String MAX_BLOCK_MS_DOC = "The configuration controls how long {@link KafkaProducer#send()} and {@link KafkaProducer#partitionsFor} will block. "
+                                                    + "These methods can block for multiple reasons, e.g. the buffer is full or metadata is unavailable. "
+                                                    + "This configuration imposes a maximum limit on the total time spent fetching metadata, serializing the key and value, partitioning and "
+                                                    + "allocating buffer memory when doing a send(). In the case of partitionsFor(), this configuration imposes a maximum time threshold on waiting "
+                                                    + "for metadata.";
+
+    /** <code>request.timeout.ms</code> */
+    public static final String REQUEST_TIMEOUT_MS_CONFIG = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG;
+    private static final String REQUEST_TIMEOUT_MS_DOC = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC;
+
     static {
         CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, CommonClientConfigs.BOOSTRAP_SERVERS_DOC)
                                 .define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC)
@@ -200,7 +225,7 @@ public class ProducerConfig extends AbstractConfig {
                                         atLeast(0),
                                         Importance.MEDIUM,
                                         MAX_REQUEST_SIZE_DOC)
-                                .define(BLOCK_ON_BUFFER_FULL_CONFIG, Type.BOOLEAN, true, Importance.LOW, BLOCK_ON_BUFFER_FULL_DOC)
+                                .define(BLOCK_ON_BUFFER_FULL_CONFIG, Type.BOOLEAN, false, Importance.LOW, BLOCK_ON_BUFFER_FULL_DOC)
                                 .define(RECONNECT_BACKOFF_MS_CONFIG, Type.LONG, 50L, atLeast(0L), Importance.LOW, CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC)
                                 .define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST, "", Importance.LOW, CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
                                 .define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, 100L, atLeast(0L), Importance.LOW, CommonClientConfigs.RETRY_BACKOFF_MS_DOC)
@@ -210,6 +235,18 @@ public class ProducerConfig extends AbstractConfig {
                                         atLeast(0),
                                         Importance.LOW,
                                         METADATA_FETCH_TIMEOUT_DOC)
+                                .define(MAX_BLOCK_MS_CONFIG,
+                                        Type.LONG,
+                                        60 * 1000,
+                                        atLeast(0),
+                                        Importance.MEDIUM,
+                                        MAX_BLOCK_MS_DOC)
+                                .define(REQUEST_TIMEOUT_MS_CONFIG,
+                                        Type.INT,
+                                        30 * 1000,
+                                        atLeast(0),
+                                        Importance.MEDIUM,
+                                        REQUEST_TIMEOUT_MS_DOC)
                                 .define(METADATA_MAX_AGE_CONFIG, Type.LONG, 5 * 60 * 1000, atLeast(0), Importance.LOW, METADATA_MAX_AGE_DOC)
                                 .define(METRICS_SAMPLE_WINDOW_MS_CONFIG,
                                         Type.LONG,

http://git-wip-us.apache.org/repos/asf/kafka/blob/da39931a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
index 4cb1e50..2a45075 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
@@ -24,8 +24,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
-import org.apache.kafka.clients.producer.BufferExhaustedException;
 import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Rate;
@@ -46,7 +46,6 @@ public final class BufferPool {
 
     private final long totalMemory;
     private final int poolableSize;
-    private final boolean blockOnExhaustion;
     private final ReentrantLock lock;
     private final Deque<ByteBuffer> free;
     private final Deque<Condition> waiters;
@@ -60,17 +59,13 @@ public final class BufferPool {
      * 
      * @param memory The maximum amount of memory that this buffer pool can allocate
      * @param poolableSize The buffer size to cache in the free list rather than deallocating
-     * @param blockOnExhaustion This controls the behavior when the buffer pool is out of memory. If true the
-     *        {@link #allocate(int)} call will block and wait for memory to be returned to the pool. If false
-     *        {@link #allocate(int)} will throw an exception if the buffer is out of memory.
      * @param metrics instance of Metrics
      * @param time time instance
      * @param metricGrpName logical group name for metrics
      * @param metricTags additional key/val attributes for metrics
      */
-    public BufferPool(long memory, int poolableSize, boolean blockOnExhaustion, Metrics metrics, Time time , String metricGrpName , Map<String, String> metricTags) {
+    public BufferPool(long memory, int poolableSize, Metrics metrics, Time time , String metricGrpName , Map<String, String> metricTags) {
         this.poolableSize = poolableSize;
-        this.blockOnExhaustion = blockOnExhaustion;
         this.lock = new ReentrantLock();
         this.free = new ArrayDeque<ByteBuffer>();
         this.waiters = new ArrayDeque<Condition>();
@@ -91,13 +86,13 @@ public final class BufferPool {
      * is configured with blocking mode.
      * 
      * @param size The buffer size to allocate in bytes
+     * @param maxTimeToBlock The maximum time in milliseconds to block for buffer memory to be available
      * @return The buffer
      * @throws InterruptedException If the thread is interrupted while blocked
      * @throws IllegalArgumentException if size is larger than the total memory controlled by the pool (and hence we would block
      *         forever)
-     * @throws BufferExhaustedException if the pool is in non-blocking mode and size exceeds the free memory in the pool
      */
-    public ByteBuffer allocate(int size) throws InterruptedException {
+    public ByteBuffer allocate(int size, long maxTimeToBlock) throws InterruptedException {
         if (size > this.totalMemory)
             throw new IllegalArgumentException("Attempt to allocate " + size
                                                + " bytes, but there is a hard limit of "
@@ -120,10 +115,6 @@ public final class BufferPool {
                 this.availableMemory -= size;
                 lock.unlock();
                 return ByteBuffer.allocate(size);
-            } else if (!blockOnExhaustion) {
-                throw new BufferExhaustedException("You have exhausted the " + this.totalMemory
-                                                   + " bytes of memory you configured for the client and the client is configured to error"
-                                                   + " rather than block when memory is exhausted.");
             } else {
                 // we are out of memory and will have to block
                 int accumulated = 0;
@@ -134,7 +125,8 @@ public final class BufferPool {
                 // enough memory to allocate one
                 while (accumulated < size) {
                     long startWait = time.nanoseconds();
-                    moreMemory.await();
+                    if (!moreMemory.await(maxTimeToBlock, TimeUnit.MILLISECONDS))
+                        throw new TimeoutException("Failed to allocate memory within the configured max blocking time");
                     long endWait = time.nanoseconds();
                     this.waitTime.record(endWait - startWait, time.milliseconds());
 
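
The essential change above is replacing the unbounded moreMemory.await()
with the timed overload. A minimal standalone illustration of that
java.util.concurrent idiom (RuntimeException stands in for Kafka's
TimeoutException):

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;

    class BoundedWaitSketch {
        private final ReentrantLock lock = new ReentrantLock();
        private final Condition moreMemory = lock.newCondition();

        void awaitMemory(long maxTimeToBlockMs) throws InterruptedException {
            lock.lock();
            try {
                // await(timeout) returns false when the deadline passes without
                // a signal; the pool turns that into an exception instead of
                // blocking forever.
                if (!moreMemory.await(maxTimeToBlockMs, TimeUnit.MILLISECONDS))
                    throw new RuntimeException("Failed to allocate memory within the configured max blocking time");
            } finally {
                lock.unlock();
            }
        }
    }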

http://git-wip-us.apache.org/repos/asf/kafka/blob/da39931a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index a152bd7..eed2a5e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -12,6 +12,7 @@
  */
 package org.apache.kafka.clients.producer.internals;
 
+import java.util.Iterator;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.MetricName;
@@ -82,8 +83,6 @@ public final class RecordAccumulator {
      *        latency for potentially better throughput due to more batching (and hence fewer, larger requests).
      * @param retryBackoffMs An artificial delay time to retry the produce request upon receiving an error. This avoids
      *        exhausting all retries in a short period of time.
-     * @param blockOnBufferFull If true block when we are out of memory; if false throw an exception when we are out of
-     *        memory
      * @param metrics The metrics
      * @param time The time instance to use
      * @param metricTags additional key/value attributes of the metric
@@ -93,7 +92,6 @@ public final class RecordAccumulator {
                              CompressionType compression,
                              long lingerMs,
                              long retryBackoffMs,
-                             boolean blockOnBufferFull,
                              Metrics metrics,
                              Time time,
                              Map<String, String> metricTags) {
@@ -107,7 +105,7 @@ public final class RecordAccumulator {
         this.retryBackoffMs = retryBackoffMs;
         this.batches = new CopyOnWriteMap<TopicPartition, Deque<RecordBatch>>();
         String metricGrpName = "producer-metrics";
-        this.free = new BufferPool(totalSize, batchSize, blockOnBufferFull, metrics, time , metricGrpName , metricTags);
+        this.free = new BufferPool(totalSize, batchSize, metrics, time , metricGrpName , metricTags);
         this.incomplete = new IncompleteRecordBatches();
         this.time = time;
         registerMetrics(metrics, metricGrpName, metricTags);
@@ -153,8 +151,9 @@ public final class RecordAccumulator {
      * @param key The key for the record
      * @param value The value for the record
      * @param callback The user-supplied callback to execute when the request is complete
+     * @param maxTimeToBlock The maximum time in milliseconds to block for buffer memory to be available
      */
-    public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, Callback callback) throws InterruptedException {
+    public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, Callback callback, long maxTimeToBlock) throws InterruptedException {
         // We keep track of the number of appending thread to make sure we do not miss batches in
         // abortIncompleteBatches().
         appendsInProgress.incrementAndGet();
@@ -166,7 +165,7 @@ public final class RecordAccumulator {
             synchronized (dq) {
                 RecordBatch last = dq.peekLast();
                 if (last != null) {
-                    FutureRecordMetadata future = last.tryAppend(key, value, callback);
+                    FutureRecordMetadata future = last.tryAppend(key, value, callback, time.milliseconds());
                     if (future != null)
                         return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
                 }
@@ -175,14 +174,14 @@ public final class RecordAccumulator {
             // we don't have an in-progress record batch try to allocate a new batch
             int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
             log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
-            ByteBuffer buffer = free.allocate(size);
+            ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
             synchronized (dq) {
                 // Need to check if producer is closed again after grabbing the dequeue lock.
                 if (closed)
                     throw new IllegalStateException("Cannot send after the producer is closed.");
                 RecordBatch last = dq.peekLast();
                 if (last != null) {
-                    FutureRecordMetadata future = last.tryAppend(key, value, callback);
+                    FutureRecordMetadata future = last.tryAppend(key, value, callback, time.milliseconds());
                     if (future != null) {
                         // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                         free.deallocate(buffer);
@@ -191,7 +190,7 @@ public final class RecordAccumulator {
                 }
                 MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
                 RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
-                FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, callback));
+                FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, callback, time.milliseconds()));
 
                 dq.addLast(batch);
                 incomplete.add(batch);
@@ -203,11 +202,51 @@ public final class RecordAccumulator {
     }
 
     /**
+     * Abort the batches that have been sitting in the RecordAccumulator for more than the configured requestTimeout
+     * due to metadata being unavailable.
+     */
+    public List<RecordBatch> abortExpiredBatches(int requestTimeout, Cluster cluster, long now) {
+        List<RecordBatch> expiredBatches = new ArrayList<RecordBatch>();
+        int count = 0;
+        for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
+            TopicPartition topicAndPartition = entry.getKey();
+            Deque<RecordBatch> dq = entry.getValue();
+            synchronized (dq) {
+                // iterate over the batches and expire them if they have stayed in the accumulator for more than requestTimeout
+                Iterator<RecordBatch> batchIterator = dq.iterator();
+                while (batchIterator.hasNext()) {
+                    RecordBatch batch = batchIterator.next();
+                    Node leader = cluster.leaderFor(topicAndPartition);
+                    if (leader == null) {
+                        // check if the batch is expired
+                        if (batch.maybeExpire(requestTimeout, now, this.lingerMs)) {
+                            expiredBatches.add(batch);
+                            count++;
+                            batchIterator.remove();
+                            deallocate(batch);
+                        } else {
+                            if (!batch.inRetry()) {
+                                break;
+                            }
+                        }
+                    }
+                }
+            }
+        }
+        if (expiredBatches.size() > 0)
+            log.trace("Expired {} batches in accumulator", count);
+
+        return expiredBatches;
+    }
+
+    /**
      * Re-enqueue the given record batch in the accumulator to retry
      */
     public void reenqueue(RecordBatch batch, long now) {
         batch.attempts++;
         batch.lastAttemptMs = now;
+        batch.lastAppendTime = now;
+        batch.setRetry();
         Deque<RecordBatch> deque = dequeFor(batch.topicPartition);
         synchronized (deque) {
             deque.addFirst(batch);

http://git-wip-us.apache.org/repos/asf/kafka/blob/da39931a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
index 06182db..3f18582 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
@@ -18,6 +18,7 @@ import java.util.List;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.Record;
 import org.slf4j.Logger;
@@ -41,7 +42,9 @@ public final class RecordBatch {
     public final MemoryRecords records;
     public final TopicPartition topicPartition;
     public final ProduceRequestResult produceFuture;
+    public long lastAppendTime;
     private final List<Thunk> thunks;
+    private boolean retry;
 
     public RecordBatch(TopicPartition tp, MemoryRecords records, long now) {
         this.createdMs = now;
@@ -50,6 +53,8 @@ public final class RecordBatch {
         this.topicPartition = tp;
         this.produceFuture = new ProduceRequestResult();
         this.thunks = new ArrayList<Thunk>();
+        this.lastAppendTime = createdMs;
+        this.retry = false;
     }
 
     /**
@@ -57,12 +62,13 @@ public final class RecordBatch {
      * 
      * @return The RecordSend corresponding to this record or null if there isn't sufficient room.
      */
-    public FutureRecordMetadata tryAppend(byte[] key, byte[] value, Callback callback) {
+    public FutureRecordMetadata tryAppend(byte[] key, byte[] value, Callback callback, long now) {
         if (!this.records.hasRoomFor(key, value)) {
             return null;
         } else {
             this.records.append(0L, key, value);
             this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));
+            this.lastAppendTime = now;
             FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount);
             if (callback != null)
                 thunks.add(new Thunk(callback, future));
@@ -116,4 +122,34 @@ public final class RecordBatch {
     public String toString() {
         return "RecordBatch(topicPartition=" + topicPartition + ", recordCount=" + recordCount + ")";
     }
+
+    /**
+     * Expire a batch that is ready but has been sitting in the accumulator for longer than requestTimeout
+     * because its metadata is unavailable. We explicitly check whether the batch is full or the linger time
+     * has been met, because the partition may not be reported as ready while its leader is unavailable.
+     */
+    public boolean maybeExpire(int requestTimeout, long now, long lingerMs) {
+        boolean expire = false;
+        if ((this.records.isFull() && requestTimeout < (now - this.lastAppendTime)) || requestTimeout < (now - (this.lastAttemptMs + lingerMs))) {
+            expire = true;
+            this.records.close();
+            this.done(-1L, new TimeoutException("Batch Expired"));
+        }
+
+        return expire;
+    }
+
+    /**
+     * Returns whether the batch has been retried for sending to Kafka
+     */
+    public boolean inRetry() {
+        return this.retry;
+    }
+
+    /**
+     * Mark the batch as being retried (for send)
+     */
+    public void setRetry() {
+        this.retry = true;
+    }
 }
\ No newline at end of file

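To make the expiry condition in maybeExpire concrete, consider some hypothetical sample values
(all times in milliseconds, with requestTimeout = 30000 and lingerMs = 100):

    // Case 1: full batch, lastAppendTime = 1000, now = 32000
    //   records.isFull() && 30000 < (32000 - 1000)   ->  30000 < 31000  ->  expired
    // Case 2: batch not full, lastAttemptMs = 1000, now = 32000
    //   30000 < (32000 - (1000 + 100))               ->  30000 < 30900  ->  expired
    // Case 3: batch not full, lastAttemptMs = 10000, now = 32000
    //   30000 < (32000 - (10000 + 100))              ->  30000 < 21900  ->  not expired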
http://git-wip-us.apache.org/repos/asf/kafka/blob/da39931a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index d2e64f7..134d45a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -71,9 +71,6 @@ public class Sender implements Runnable {
     /* the number of acknowledgements to request from the server */
     private final short acks;
 
-    /* the max time in ms for the server to wait for acknowlegements */
-    private final int requestTimeout;
-
     /* the number of times to retry a failed request before giving up */
     private final int retries;
 
@@ -92,27 +89,30 @@ public class Sender implements Runnable {
     /* param clientId of the client */
     private String clientId;
 
+    /* the max time to wait for the server to respond to the request */
+    private final int requestTimeout;
+
     public Sender(KafkaClient client,
                   Metadata metadata,
                   RecordAccumulator accumulator,
                   int maxRequestSize,
                   short acks,
                   int retries,
-                  int requestTimeout,
                   Metrics metrics,
                   Time time,
-                  String clientId) {
+                  String clientId,
+                  int requestTimeout) {
         this.client = client;
         this.accumulator = accumulator;
         this.metadata = metadata;
         this.maxRequestSize = maxRequestSize;
         this.running = true;
-        this.requestTimeout = requestTimeout;
         this.acks = acks;
         this.retries = retries;
         this.time = time;
         this.clientId = clientId;
         this.sensors = new SenderMetrics(metrics);
+        this.requestTimeout = requestTimeout;
     }
 
     /**
@@ -187,6 +187,12 @@ public class Sender implements Runnable {
                                                                          result.readyNodes,
                                                                          this.maxRequestSize,
                                                                          now);
+
+        List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, cluster, now);
+        // update sensors
+        for (RecordBatch expiredBatch : expiredBatches)
+            this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);
+
         sensors.updateProduceRequestMetrics(batches);
         List<ClientRequest> requests = createProduceRequests(batches, now);
         // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
@@ -200,7 +206,7 @@ public class Sender implements Runnable {
             pollTimeout = 0;
         }
         for (ClientRequest request : requests)
-            client.send(request);
+            client.send(request, now);
 
         // if some partitions are already ready to be sent, the select time would be 0;
         // otherwise if some partition already has some data accumulated but not ready yet,

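The Sender now receives the request timeout from configuration rather than a hard-coded value, so
applications control it through the producer's request.timeout.ms setting. A minimal sketch of
wiring it up (the broker address and serializer choices below are placeholders):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");  // placeholder broker
    props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
    // how long the client waits for a broker response before failing the request (KIP-19)
    props.put("request.timeout.ms", "30000");
    KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);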
http://git-wip-us.apache.org/repos/asf/kafka/blob/da39931a/clients/src/main/java/org/apache/kafka/common/network/Selectable.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selectable.java b/clients/src/main/java/org/apache/kafka/common/network/Selectable.java
index 70e74bd..629fa0d 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selectable.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selectable.java
@@ -38,11 +38,6 @@ public interface Selectable {
     public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException;
 
     /**
-     * Begin disconnecting the connection identified by the given id
-     */
-    public void disconnect(String id);
-
-    /**
      * Wakeup this selector if it is blocked on I/O
      */
     public void wakeup();

http://git-wip-us.apache.org/repos/asf/kafka/blob/da39931a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 5a4909e..9e52078 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -177,17 +177,6 @@ public class Selector implements Selectable {
     }
 
     /**
-     * Disconnect any connections for the given id (if there are any). The disconnection is asynchronous and will not be
-     * processed until the next {@link #poll(long) poll()} call.
-     */
-    @Override
-    public void disconnect(String id) {
-        KafkaChannel channel = this.channels.get(id);
-        if (channel != null)
-            channel.disconnect();
-    }
-
-    /**
      * Interrupt the nioSelector if it is blocked waiting to do I/O.
      */
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/da39931a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index e5815f5..ee72328 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -100,7 +100,7 @@ public class MockClient implements KafkaClient {
     }
 
     @Override
-    public void send(ClientRequest request) {
+    public void send(ClientRequest request, long now) {
         if (!futureResponses.isEmpty()) {
             FutureResponse futureResp = futureResponses.poll();
             if (!futureResp.requestMatcher.matches(request))
@@ -109,6 +109,7 @@ public class MockClient implements KafkaClient {
             ClientResponse resp = new ClientResponse(request, time.milliseconds(), futureResp.disconnected, futureResp.responseBody);
             responses.add(resp);
         } else {
+            request.setSendMs(now);
             this.requests.add(request);
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/da39931a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index 69c93c3..2379896 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -45,42 +45,30 @@ import org.junit.Test;
 
 public class NetworkClientTest {
 
+    private final int requestTimeoutMs = 1000;
     private MockTime time = new MockTime();
     private MockSelector selector = new MockSelector(time);
     private Metadata metadata = new Metadata(0, Long.MAX_VALUE);
     private int nodeId = 1;
     private Cluster cluster = TestUtils.singletonCluster("test", nodeId);
     private Node node = cluster.nodes().get(0);
-    private NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE, 0, 64 * 1024, 64 * 1024);
+    private NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE, 0, 64 * 1024, 64 * 1024, requestTimeoutMs);
+
     private NetworkClient clientWithStaticNodes = new NetworkClient(selector, new ManualMetadataUpdater(Arrays.asList(node)),
-            "mock-static", Integer.MAX_VALUE, 0, 64 * 1024, 64 * 1024);
+            "mock-static", Integer.MAX_VALUE, 0, 64 * 1024, 64 * 1024, requestTimeoutMs);
 
     @Before
     public void setup() {
         metadata.update(cluster, time.milliseconds());
     }
 
-    @Test
-    public void testReadyAndDisconnect() {
-        assertFalse("Client begins unready as it has no connection.", client.ready(node, time.milliseconds()));
-        assertEquals("The connection is established as a side-effect of the readiness check", 1, selector.connected().size());
-        client.poll(1, time.milliseconds());
-        selector.clear();
-        assertTrue("Now the client is ready", client.ready(node, time.milliseconds()));
-        selector.disconnect(node.idString());
-        client.poll(1, time.milliseconds());
-        selector.clear();
-        assertFalse("After we forced the disconnection the client is no longer ready.", client.ready(node, time.milliseconds()));
-        assertTrue("Metadata should get updated.", metadata.timeToNextUpdate(time.milliseconds()) == 0);
-    }
-
     @Test(expected = IllegalStateException.class)
     public void testSendToUnreadyNode() {
         RequestSend send = new RequestSend("5",
                                            client.nextRequestHeader(ApiKeys.METADATA),
                                            new MetadataRequest(Arrays.asList("test")).toStruct());
         ClientRequest request = new ClientRequest(time.milliseconds(), false, send, null);
-        client.send(request);
+        client.send(request, time.milliseconds());
         client.poll(1, time.milliseconds());
     }
 
@@ -105,7 +93,7 @@ public class NetworkClientTest {
         RequestHeader reqHeader = client.nextRequestHeader(ApiKeys.PRODUCE);
         RequestSend send = new RequestSend(node.idString(), reqHeader, produceRequest.toStruct());
         ClientRequest request = new ClientRequest(time.milliseconds(), true, send, null);
-        client.send(request);
+        client.send(request, time.milliseconds());
         assertEquals("There should be 1 in-flight request after send", 1, client.inFlightRequestCount(node.idString()));
 
         client.close(node.idString());
@@ -120,7 +108,7 @@ public class NetworkClientTest {
         TestCallbackHandler handler = new TestCallbackHandler();
         ClientRequest request = new ClientRequest(time.milliseconds(), true, send, handler);
         awaitReady(networkClient, node);
-        networkClient.send(request);
+        networkClient.send(request, time.milliseconds());
         networkClient.poll(1, time.milliseconds());
         assertEquals(1, networkClient.inFlightRequestCount());
         ResponseHeader respHeader = new ResponseHeader(reqHeader.correlationId());
@@ -143,15 +131,31 @@ public class NetworkClientTest {
         while (!client.ready(node, time.milliseconds()))
             client.poll(1, time.milliseconds());
     }
-    
+
+    @Test
+    public void testRequestTimeout() {
+        ProduceRequest produceRequest = new ProduceRequest((short) 1, 1000, Collections.<TopicPartition, ByteBuffer>emptyMap());
+        RequestHeader reqHeader = client.nextRequestHeader(ApiKeys.PRODUCE);
+        RequestSend send = new RequestSend(node.idString(), reqHeader, produceRequest.toStruct());
+        TestCallbackHandler handler = new TestCallbackHandler();
+        ClientRequest request = new ClientRequest(time.milliseconds(), true, send, handler);
+        awaitReady(client, node);
+        long now = time.milliseconds();
+        client.send(request, now);
+        // sleep to ensure that the time since the last send exceeds requestTimeoutMs
+        time.sleep(3000);
+        client.poll(3000, time.milliseconds());
+        String disconnectedNode = selector.disconnected().get(0);
+        assertEquals(node.idString(), disconnectedNode);
+    }
+
     private static class TestCallbackHandler implements RequestCompletionHandler {
         public boolean executed = false;
         public ClientResponse response;
-        
+
         public void onComplete(ClientResponse response) {
             this.executed = true;
             this.response = response;
         }
     }
-
 }

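From an application's point of view, a request that hits this timeout (or a batch that expires in
the accumulator) completes the send with a TimeoutException. A hedged sketch of observing that in a
callback (topic name and payload are placeholders):

    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.errors.TimeoutException;

    producer.send(new ProducerRecord<byte[], byte[]>("test", "hello".getBytes()),
        new Callback() {
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception instanceof TimeoutException)
                    System.err.println("Request or batch expired: " + exception.getMessage());
            }
        });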
http://git-wip-us.apache.org/repos/asf/kafka/blob/da39931a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
index 2c69382..ded5d3e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.clients.producer.internals;
 
-import org.apache.kafka.clients.producer.BufferExhaustedException;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.test.TestUtils;
@@ -35,6 +35,7 @@ import static org.junit.Assert.*;
 public class BufferPoolTest {
     private MockTime time = new MockTime();
     private Metrics metrics = new Metrics(time);
+    private final long maxBlockTimeMs = 2000;
     String metricGroup = "TestMetrics";
     Map<String, String> metricTags = new LinkedHashMap<String, String>();
 
@@ -45,8 +46,8 @@ public class BufferPoolTest {
     public void testSimple() throws Exception {
         long totalMemory = 64 * 1024;
         int size = 1024;
-        BufferPool pool = new BufferPool(totalMemory, size, false, metrics, time, metricGroup, metricTags);
-        ByteBuffer buffer = pool.allocate(size);
+        BufferPool pool = new BufferPool(totalMemory, size, metrics, time, metricGroup, metricTags);
+        ByteBuffer buffer = pool.allocate(size, maxBlockTimeMs);
         assertEquals("Buffer size should equal requested size.", size, buffer.limit());
         assertEquals("Unallocated memory should have shrunk", totalMemory - size, pool.unallocatedMemory());
         assertEquals("Available memory should have shrunk", totalMemory - size, pool.availableMemory());
@@ -55,13 +56,13 @@ public class BufferPoolTest {
         pool.deallocate(buffer);
         assertEquals("All memory should be available", totalMemory, pool.availableMemory());
         assertEquals("But now some is on the free list", totalMemory - size, pool.unallocatedMemory());
-        buffer = pool.allocate(size);
+        buffer = pool.allocate(size, maxBlockTimeMs);
         assertEquals("Recycled buffer should be cleared.", 0, buffer.position());
         assertEquals("Recycled buffer should be cleared.", buffer.capacity(), buffer.limit());
         pool.deallocate(buffer);
         assertEquals("All memory should be available", totalMemory, pool.availableMemory());
         assertEquals("Still a single buffer on the free list", totalMemory - size, pool.unallocatedMemory());
-        buffer = pool.allocate(2 * size);
+        buffer = pool.allocate(2 * size, maxBlockTimeMs);
         pool.deallocate(buffer);
         assertEquals("All memory should be available", totalMemory, pool.availableMemory());
         assertEquals("Non-standard size didn't go to the free list.", totalMemory - size, pool.unallocatedMemory());
@@ -72,23 +73,11 @@ public class BufferPoolTest {
      */
     @Test(expected = IllegalArgumentException.class)
     public void testCantAllocateMoreMemoryThanWeHave() throws Exception {
-        BufferPool pool = new BufferPool(1024, 512, true, metrics, time, metricGroup, metricTags);
-        ByteBuffer buffer = pool.allocate(1024);
+        BufferPool pool = new BufferPool(1024, 512, metrics, time, metricGroup, metricTags);
+        ByteBuffer buffer = pool.allocate(1024, maxBlockTimeMs);
         assertEquals(1024, buffer.limit());
         pool.deallocate(buffer);
-        buffer = pool.allocate(1025);
-    }
-
-    @Test
-    public void testNonblockingMode() throws Exception {
-        BufferPool pool = new BufferPool(2, 1, false, metrics, time, metricGroup, metricTags);
-        pool.allocate(1);
-        try {
-            pool.allocate(2);
-            fail("The buffer allocated more than it has!");
-        } catch (BufferExhaustedException e) {
-            // this is good
-        }
+        buffer = pool.allocate(1025, maxBlockTimeMs);
     }
 
     /**
@@ -96,8 +85,8 @@ public class BufferPoolTest {
      */
     @Test
     public void testDelayedAllocation() throws Exception {
-        BufferPool pool = new BufferPool(5 * 1024, 1024, true, metrics, time, metricGroup, metricTags);
-        ByteBuffer buffer = pool.allocate(1024);
+        BufferPool pool = new BufferPool(5 * 1024, 1024, metrics, time, metricGroup, metricTags);
+        ByteBuffer buffer = pool.allocate(1024, maxBlockTimeMs);
         CountDownLatch doDealloc = asyncDeallocate(pool, buffer);
         CountDownLatch allocation = asyncAllocate(pool, 5 * 1024);
         assertEquals("Allocation shouldn't have happened yet, waiting on memory.", 1L, allocation.getCount());
@@ -126,7 +115,7 @@ public class BufferPoolTest {
         Thread thread = new Thread() {
             public void run() {
                 try {
-                    pool.allocate(size);
+                    pool.allocate(size, maxBlockTimeMs);
                 } catch (InterruptedException e) {
                     e.printStackTrace();
                 } finally {
@@ -139,6 +128,23 @@ public class BufferPoolTest {
     }
 
     /**
+     * Test that a TimeoutException is thrown when there is not enough memory to satisfy an allocation and the elapsed time exceeds the maximum specified block time
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testBlockTimeout() throws Exception {
+        BufferPool pool = new BufferPool(2, 1, metrics, time, metricGroup, metricTags);
+        pool.allocate(1, maxBlockTimeMs);
+        try {
+            pool.allocate(2, maxBlockTimeMs);
+            fail("The buffer allocated more memory than its maximum value 2");
+        } catch (TimeoutException e) {
+            // this is good
+        }
+    }
+
+    /**
      * This test creates lots of threads that hammer on the pool
      */
     @Test
@@ -147,7 +153,7 @@ public class BufferPoolTest {
         final int iterations = 50000;
         final int poolableSize = 1024;
         final long totalMemory = numThreads / 2 * poolableSize;
-        final BufferPool pool = new BufferPool(totalMemory, poolableSize, true, metrics, time, metricGroup, metricTags);
+        final BufferPool pool = new BufferPool(totalMemory, poolableSize, metrics, time, metricGroup, metricTags);
         List<StressTestThread> threads = new ArrayList<StressTestThread>();
         for (int i = 0; i < numThreads; i++)
             threads.add(new StressTestThread(pool, iterations));
@@ -163,6 +169,7 @@ public class BufferPoolTest {
     public static class StressTestThread extends Thread {
         private final int iterations;
         private final BufferPool pool;
+        private final long maxBlockTimeMs = 2000;
         public final AtomicBoolean success = new AtomicBoolean(false);
 
         public StressTestThread(BufferPool pool, int iterations) {
@@ -180,7 +187,7 @@ public class BufferPoolTest {
                     else
                         // allocate a random size
                         size = TestUtils.RANDOM.nextInt((int) pool.totalMemory());
-                    ByteBuffer buffer = pool.allocate(size);
+                    ByteBuffer buffer = pool.allocate(size, maxBlockTimeMs);
                     pool.deallocate(buffer);
                 }
                 success.set(true);

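With the nonblocking mode gone, allocate must instead bound its wait by the caller's maxBlockTimeMs
and throw a TimeoutException once the deadline passes. A minimal sketch of that bounded-wait
pattern (lock, moreMemory, and space() are hypothetical stand-ins for BufferPool's internals, not
its actual fields):

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;
    import org.apache.kafka.common.errors.TimeoutException;

    // Inside a method declared to throw InterruptedException:
    long remainingNs = TimeUnit.MILLISECONDS.toNanos(maxBlockTimeMs);
    lock.lock();
    try {
        while (space() < size) {
            if (remainingNs <= 0)
                throw new TimeoutException("Failed to allocate memory within the configured max blocking time");
            // awaitNanos returns the time remaining, so repeated wakeups share one deadline
            remainingNs = moreMemory.awaitNanos(remainingNs);
        }
    } finally {
        lock.unlock();
    }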
