kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jjko...@apache.org
Subject [2/2] kafka git commit: Revert "KAFKA-2120; Add a request timeout to NetworkClient (KIP-19); reviewed by Jason Gustafson, Ismael Juma, Joel Koshy, Jun Rao, and Edward Ribeiro"
Date Thu, 17 Sep 2015 21:36:36 GMT
Revert "KAFKA-2120; Add a request timeout to NetworkClient (KIP-19); reviewed by Jason Gustafson, Ismael Juma, Joel Koshy, Jun Rao, and Edward Ribeiro"

This reverts commit da39931afad8008bc2b385a75a462777be051435.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9dbeb71a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9dbeb71a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9dbeb71a

Branch: refs/heads/trunk
Commit: 9dbeb71ab258955e04b46991c1baf880b07633f4
Parents: da39931
Author: Joel Koshy <jjkoshy@gmail.com>
Authored: Thu Sep 17 14:36:01 2015 -0700
Committer: Joel Koshy <jjkoshy@gmail.com>
Committed: Thu Sep 17 14:36:01 2015 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/clients/ClientRequest.java |  8 --
 .../kafka/clients/CommonClientConfigs.java      |  6 --
 .../apache/kafka/clients/InFlightRequests.java  | 26 +-----
 .../org/apache/kafka/clients/KafkaClient.java   |  3 +-
 .../org/apache/kafka/clients/NetworkClient.java | 73 ++++-------------
 .../kafka/clients/consumer/ConsumerConfig.java  | 10 ---
 .../kafka/clients/consumer/KafkaConsumer.java   |  3 +-
 .../internals/ConsumerNetworkClient.java        |  2 +-
 .../kafka/clients/producer/KafkaProducer.java   | 83 +++-----------------
 .../kafka/clients/producer/ProducerConfig.java  | 39 +--------
 .../clients/producer/internals/BufferPool.java  | 20 +++--
 .../producer/internals/RecordAccumulator.java   | 57 +++-----------
 .../clients/producer/internals/RecordBatch.java | 38 +--------
 .../clients/producer/internals/Sender.java      | 20 ++---
 .../apache/kafka/common/network/Selectable.java |  5 ++
 .../apache/kafka/common/network/Selector.java   | 11 +++
 .../org/apache/kafka/clients/MockClient.java    |  3 +-
 .../apache/kafka/clients/NetworkClientTest.java | 48 ++++++-----
 .../producer/internals/BufferPoolTest.java      | 57 ++++++--------
 .../internals/RecordAccumulatorTest.java        | 67 ++++++----------
 .../clients/producer/internals/SenderTest.java  | 21 +++--
 .../kafka/common/network/SSLSelectorTest.java   | 19 +++++
 .../kafka/common/network/SelectorTest.java      | 17 ++++
 .../org/apache/kafka/test/MockSelector.java     |  6 +-
 .../controller/ControllerChannelManager.scala   |  3 +-
 .../main/scala/kafka/server/KafkaConfig.scala   |  5 --
 .../main/scala/kafka/server/KafkaServer.scala   |  3 +-
 .../kafka/server/ReplicaFetcherThread.scala     |  3 +-
 .../scala/kafka/tools/ProducerPerformance.scala |  2 +
 .../kafka/utils/NetworkClientBlockingOps.scala  |  2 +-
 .../kafka/api/ProducerFailureHandlingTest.scala | 46 ++++++++++-
 .../unit/kafka/server/KafkaConfigTest.scala     |  1 -
 .../test/scala/unit/kafka/utils/TestUtils.scala |  1 +
 33 files changed, 251 insertions(+), 457 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9dbeb71a/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java b/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
index 6410f09..dc8f0f1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
@@ -24,7 +24,6 @@ public final class ClientRequest {
     private final RequestSend request;
     private final RequestCompletionHandler callback;
     private final boolean isInitiatedByNetworkClient;
-    private long sendMs;
 
     /**
      * @param createdMs The unix timestamp in milliseconds for the time at which this request was created.
@@ -87,11 +86,4 @@ public final class ClientRequest {
         return isInitiatedByNetworkClient;
     }
 
-    public long getSendMs() {
-        return sendMs;
-    }
-
-    public void setSendMs(long sendMs) {
-        this.sendMs = sendMs;
-    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/9dbeb71a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
index 48e4919..7d24c6f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
@@ -61,10 +61,4 @@ public class CommonClientConfigs {
 
     public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = "connections.max.idle.ms";
     public static final String CONNECTIONS_MAX_IDLE_MS_DOC = "Close idle connections after the number of milliseconds specified by this config.";
-
-    public static final String REQUEST_TIMEOUT_MS_CONFIG = "request.timeout.ms";
-    public static final String REQUEST_TIMEOUT_MS_DOC = "The configuration controls the maximum amount of time the client will wait "
-                                                         + "for the response of a request. If the response is not received before the timeout "
-                                                         + "elapses the client will resend the request if necessary or fail the request if "
-                                                         + "retries are exhausted.";
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9dbeb71a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
index f9956af..15d00d4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
+++ b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
@@ -16,8 +16,6 @@ import java.util.ArrayDeque;
 import java.util.Collections;
 import java.util.Deque;
 import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
 import java.util.Map;
 
 /**
@@ -35,13 +33,12 @@ final class InFlightRequests {
     /**
      * Add the given request to the queue for the connection it was directed to
      */
-    public void add(ClientRequest request, long now) {
+    public void add(ClientRequest request) {
         Deque<ClientRequest> reqs = this.requests.get(request.request().destination());
         if (reqs == null) {
             reqs = new ArrayDeque<ClientRequest>();
             this.requests.put(request.request().destination(), reqs);
         }
-        request.setSendMs(now);
         reqs.addFirst(request);
     }
 
@@ -126,25 +123,4 @@ final class InFlightRequests {
         }
     }
 
-    /**
-     * Returns a list of nodes with pending inflight request, that need to be timed out
-     *
-     * @param now current time in milliseconds
-     * @param requestTimeout max time to wait for the request to be completed
-     * @return list of nodes
-     */
-    public List<String> getNodesWithTimedOutRequests(long now, int requestTimeout) {
-        List<String> nodeIds = new LinkedList<String>();
-        for (String nodeId : requests.keySet()) {
-            if (inFlightRequestCount(nodeId) > 0) {
-                ClientRequest request = requests.get(nodeId).peekLast();
-                long timeSinceSend = now - request.getSendMs();
-                if (timeSinceSend > requestTimeout) {
-                    nodeIds.add(nodeId);
-                }
-            }
-        }
-
-        return nodeIds;
-    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/9dbeb71a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
index 478368e..f46c0d9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
@@ -68,9 +68,8 @@ public interface KafkaClient extends Closeable {
      * Queue up the given request for sending. Requests can only be sent on ready connections.
      * 
      * @param request The request
-     * @param now The current timestamp
      */
-    public void send(ClientRequest request, long now);
+    public void send(ClientRequest request);
 
     /**
      * Do actual reads and writes from sockets.

http://git-wip-us.apache.org/repos/asf/kafka/blob/9dbeb71a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 8475a76..51a6c5d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -73,19 +73,15 @@ public class NetworkClient implements KafkaClient {
     /* the current correlation id to use when sending requests to servers */
     private int correlation;
 
-    /* max time in ms for the producer to wait for acknowledgement from server*/
-    private final int requestTimeoutMs;
-
     public NetworkClient(Selectable selector,
                          Metadata metadata,
                          String clientId,
                          int maxInFlightRequestsPerConnection,
                          long reconnectBackoffMs,
                          int socketSendBuffer,
-                         int socketReceiveBuffer,
-                         int requestTimeoutMs) {
+                         int socketReceiveBuffer) {
         this(null, metadata, selector, clientId, maxInFlightRequestsPerConnection,
-                reconnectBackoffMs, socketSendBuffer, socketReceiveBuffer, requestTimeoutMs);
+                reconnectBackoffMs, socketSendBuffer, socketReceiveBuffer);
     }
 
     public NetworkClient(Selectable selector,
@@ -94,10 +90,9 @@ public class NetworkClient implements KafkaClient {
                          int maxInFlightRequestsPerConnection,
                          long reconnectBackoffMs,
                          int socketSendBuffer,
-                         int socketReceiveBuffer,
-                         int requestTimeoutMs) {
+                         int socketReceiveBuffer) {
         this(metadataUpdater, null, selector, clientId, maxInFlightRequestsPerConnection, reconnectBackoffMs,
-                socketSendBuffer, socketReceiveBuffer, requestTimeoutMs);
+                socketSendBuffer, socketReceiveBuffer);
     }
 
     private NetworkClient(MetadataUpdater metadataUpdater,
@@ -107,7 +102,7 @@ public class NetworkClient implements KafkaClient {
                           int maxInFlightRequestsPerConnection,
                           long reconnectBackoffMs,
                           int socketSendBuffer,
-                          int socketReceiveBuffer, int requestTimeoutMs) {
+                          int socketReceiveBuffer) {
 
         /* It would be better if we could pass `DefaultMetadataUpdater` from the public constructor, but it's not
          * possible because `DefaultMetadataUpdater` is an inner class and it can only be instantiated after the
@@ -120,6 +115,7 @@ public class NetworkClient implements KafkaClient {
         } else {
             this.metadataUpdater = metadataUpdater;
         }
+
         this.selector = selector;
         this.clientId = clientId;
         this.inFlightRequests = new InFlightRequests(maxInFlightRequestsPerConnection);
@@ -128,7 +124,6 @@ public class NetworkClient implements KafkaClient {
         this.socketReceiveBuffer = socketReceiveBuffer;
         this.correlation = 0;
         this.nodeIndexOffset = new Random().nextInt(Integer.MAX_VALUE);
-        this.requestTimeoutMs = requestTimeoutMs;
     }
 
     /**
@@ -227,18 +222,17 @@ public class NetworkClient implements KafkaClient {
      * Queue up the given request for sending. Requests can only be sent out to ready nodes.
      *
      * @param request The request
-     * @param now The current timestamp
      */
     @Override
-    public void send(ClientRequest request, long now) {
+    public void send(ClientRequest request) {
         String nodeId = request.request().destination();
         if (!canSendRequest(nodeId))
             throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
-        doSend(request, now);
+        doSend(request);
     }
 
-    private void doSend(ClientRequest request, long now) {
-        this.inFlightRequests.add(request, now);
+    private void doSend(ClientRequest request) {
+        this.inFlightRequests.add(request);
         selector.send(request.request());
     }
 
@@ -264,7 +258,6 @@ public class NetworkClient implements KafkaClient {
         handleCompletedReceives(responses, now);
         handleDisconnections(responses, now);
         handleConnections();
-        handleTimedOutRequests(responses, now);
 
         // invoke callbacks
         for (ClientResponse response : responses) {
@@ -397,43 +390,6 @@ public class NetworkClient implements KafkaClient {
     }
 
     /**
-     * Post process disconnection of a node
-     *
-     * @param responses The list of responses to update
-     * @param nodeId Id of the node to be disconnected
-     * @param now The current time
-     */
-    private void processDisconnection(List<ClientResponse> responses, String nodeId, long now) {
-        connectionStates.disconnected(nodeId);
-        for (ClientRequest request : this.inFlightRequests.clearAll(nodeId)) {
-            log.trace("Cancelled request {} due to node {} being disconnected", request, nodeId);
-            if (!metadataUpdater.maybeHandleDisconnection(request))
-                responses.add(new ClientResponse(request, now, true, null));
-        }
-    }
-
-    /**
-     * Iterate over all the inflight requests and expire any requests that have exceeded the configured the requestTimeout.
-     * The connection to the node associated with the request will be terminated and will be treated as a disconnection.
-     *
-     * @param responses The list of responses to update
-     * @param now The current time
-     */
-    private void handleTimedOutRequests(List<ClientResponse> responses, long now) {
-        List<String> nodeIds = this.inFlightRequests.getNodesWithTimedOutRequests(now, this.requestTimeoutMs);
-        for (String nodeId : nodeIds) {
-            // close connection to the node
-            this.selector.close(nodeId);
-            log.debug("Disconnecting from node {} due to request timeout.", nodeId);
-            processDisconnection(responses, nodeId, now);
-        }
-
-        // we disconnected, so we should probably refresh our metadata
-        if (nodeIds.size() > 0)
-            metadataUpdater.requestUpdate();
-    }
-
-    /**
      * Handle any completed request send. In particular if no response is expected consider the request complete.
      *
      * @param responses The list of responses to update
@@ -477,8 +433,13 @@ public class NetworkClient implements KafkaClient {
      */
     private void handleDisconnections(List<ClientResponse> responses, long now) {
         for (String node : this.selector.disconnected()) {
+            connectionStates.disconnected(node);
             log.debug("Node {} disconnected.", node);
-            processDisconnection(responses, node, now);
+            for (ClientRequest request : this.inFlightRequests.clearAll(node)) {
+                log.trace("Cancelled request {} due to node {} being disconnected", request, node);
+                if (!metadataUpdater.maybeHandleDisconnection(request))
+                    responses.add(new ClientResponse(request, now, true, null));
+            }
         }
         // we got a disconnect so we should probably refresh our metadata and see if that broker is dead
         if (this.selector.disconnected().size() > 0)
@@ -643,7 +604,7 @@ public class NetworkClient implements KafkaClient {
                 this.metadataFetchInProgress = true;
                 ClientRequest metadataRequest = request(now, nodeConnectionId, topics);
                 log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());
-                doSend(metadataRequest, now);
+                doSend(metadataRequest);
             } else if (connectionStates.canConnect(nodeConnectionId, now)) {
                 // we don't have a connection to this node right now, make one
                 log.debug("Initialize connection to node {} for sending metadata request", node.id());

http://git-wip-us.apache.org/repos/asf/kafka/blob/9dbeb71a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index dad0bc0..b9a2d4e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -159,10 +159,6 @@ public class ConsumerConfig extends AbstractConfig {
     /** <code>connections.max.idle.ms</code> */
     public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG;
 
-    /** <code>request.timeout.ms</code> */
-    public static final String REQUEST_TIMEOUT_MS_CONFIG = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG;
-    private static final String REQUEST_TIMEOUT_MS_DOC = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC;
-
 
     static {
         CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
@@ -302,12 +298,6 @@ public class ConsumerConfig extends AbstractConfig {
                                 .define(SSLConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM, Importance.LOW, SSLConfigs.SSL_KEYMANAGER_ALGORITHM_DOC)
                                 .define(SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, Importance.LOW, SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC)
                                 .define(SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, Type.STRING, Importance.LOW, SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC, false)
-                                .define(REQUEST_TIMEOUT_MS_CONFIG,
-                                        Type.INT,
-                                        40 * 1000,
-                                        atLeast(0),
-                                        Importance.MEDIUM,
-                                        REQUEST_TIMEOUT_MS_DOC)
                                 /* default is set to be a bit lower than the server default (10 min), to avoid both client and server closing connection at same time */
                                 .define(CONNECTIONS_MAX_IDLE_MS_CONFIG,
                                         Type.LONG,

http://git-wip-us.apache.org/repos/asf/kafka/blob/9dbeb71a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 540c04a..3ac2be8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -527,8 +527,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V>, Metadata.Listener {
                     100, // a fixed large enough value will suffice
                     config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),
                     config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG),
-                    config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG),
-                    config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG));
+                    config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG));
             this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs);
             OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase());
             this.subscriptions = new SubscriptionState(offsetResetStrategy);

http://git-wip-us.apache.org/repos/asf/kafka/blob/9dbeb71a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index 0b611fb..9517d9d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -256,7 +256,7 @@ public class ConsumerNetworkClient implements Closeable {
             while (iterator.hasNext()) {
                 ClientRequest request = iterator.next();
                 if (client.ready(node, now)) {
-                    client.send(request, now);
+                    client.send(request);
                     iterator.remove();
                     requestsSent = true;
                 } else if (client.connectionFailed(node)) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/9dbeb71a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index aaf4670..804d569 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -128,6 +128,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     private String clientId;
     private final Partitioner partitioner;
     private final int maxRequestSize;
+    private final long metadataFetchTimeoutMs;
     private final long totalMemorySize;
     private final Metadata metadata;
     private final RecordAccumulator accumulator;
@@ -140,8 +141,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     private final Serializer<K> keySerializer;
     private final Serializer<V> valueSerializer;
     private final ProducerConfig producerConfig;
-    private final long maxBlockTimeMs;
-    private final int requestTimeoutMs;
 
     /**
      * A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
@@ -198,10 +197,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
         try {
             log.trace("Starting the Kafka producer");
-            Map<String, Object> userProvidedConfigs = config.originals();
             this.producerConfig = config;
             this.time = new SystemTime();
-
             MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
                     .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
                             TimeUnit.MILLISECONDS);
@@ -214,47 +211,11 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             this.metrics = new Metrics(metricConfig, reporters, time);
             this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
             long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
+            this.metadataFetchTimeoutMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
             this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG));
             this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
             this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
             this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
-            /* check for user defined settings.
-             * If the BLOCK_ON_BUFFER_FULL is set to true,we do not honor METADATA_FETCH_TIMEOUT_CONFIG.
-             * This should be removed with release 0.9 when the deprecated configs are removed.
-             */
-            if (userProvidedConfigs.containsKey(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG)) {
-                log.warn(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG + " config is deprecated and will be removed soon. " +
-                        "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
-                boolean blockOnBufferFull = config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG);
-                if (blockOnBufferFull) {
-                    this.maxBlockTimeMs = Long.MAX_VALUE;
-                } else if (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) {
-                    log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. " +
-                            "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
-                    this.maxBlockTimeMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
-                } else {
-                    this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
-                }
-            } else if (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) {
-                log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. " +
-                        "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
-                this.maxBlockTimeMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
-            } else {
-                this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
-            }
-
-            /* check for user defined settings.
-             * If the TIME_OUT config is set use that for request timeout.
-             * This should be removed with release 0.9
-             */
-            if (userProvidedConfigs.containsKey(ProducerConfig.TIMEOUT_CONFIG)) {
-                log.warn(ProducerConfig.TIMEOUT_CONFIG + " config is deprecated and will be removed soon. Please use " +
-                        ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
-                this.requestTimeoutMs = config.getInt(ProducerConfig.TIMEOUT_CONFIG);
-            } else {
-                this.requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
-            }
-
             Map<String, String> metricTags = new LinkedHashMap<String, String>();
             metricTags.put("client-id", clientId);
             this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
@@ -262,6 +223,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                     this.compressionType,
                     config.getLong(ProducerConfig.LINGER_MS_CONFIG),
                     retryBackoffMs,
+                    config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG),
                     metrics,
                     time,
                     metricTags);
@@ -275,18 +237,17 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                     config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
                     config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
                     config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
-                    config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
-                    this.requestTimeoutMs);
+                    config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG));
             this.sender = new Sender(client,
                     this.metadata,
                     this.accumulator,
                     config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
                     (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
                     config.getInt(ProducerConfig.RETRIES_CONFIG),
+                    config.getInt(ProducerConfig.TIMEOUT_CONFIG),
                     this.metrics,
                     new SystemTime(),
-                    clientId,
-                    this.requestTimeoutMs);
+                    clientId);
             String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "");
             this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
             this.ioThread.start();
@@ -406,8 +367,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
         try {
             // first make sure the metadata for the topic is available
-            long startTime = time.milliseconds();
-            waitOnMetadata(record.topic(), this.maxBlockTimeMs);
+            waitOnMetadata(record.topic(), this.metadataFetchTimeoutMs);
             byte[] serializedKey;
             try {
                 serializedKey = keySerializer.serialize(record.topic(), record.key());
@@ -416,7 +376,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                         " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                         " specified in key.serializer");
             }
-            long remainingTime = checkMaybeGetRemainingTime(startTime);
             byte[] serializedValue;
             try {
                 serializedValue = valueSerializer.serialize(record.topic(), record.value());
@@ -425,15 +384,12 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                         " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                         " specified in value.serializer");
             }
-            remainingTime = checkMaybeGetRemainingTime(startTime);
             int partition = partition(record, serializedKey, serializedValue, metadata.fetch());
-            remainingTime = checkMaybeGetRemainingTime(startTime);
             int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
             ensureValidRecordSize(serializedSize);
             TopicPartition tp = new TopicPartition(record.topic(), partition);
             log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
-            remainingTime = checkMaybeGetRemainingTime(startTime);
-            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, callback, remainingTime);
+            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, callback);
             if (result.batchIsFull || result.newBatchCreated) {
                 log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                 this.sender.wakeup();
@@ -552,7 +508,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     @Override
     public List<PartitionInfo> partitionsFor(String topic) {
         try {
-            waitOnMetadata(topic, this.maxBlockTimeMs);
+            waitOnMetadata(topic, this.metadataFetchTimeoutMs);
         } catch (InterruptedException e) {
             throw new InterruptException(e);
         }
@@ -672,26 +628,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                                                    + "].");
             return partition;
         }
-        return this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue,
-            cluster);
-    }
-
-    /**
-     * Check and may be get the time elapsed since startTime.
-     * Throws a {@link org.apache.kafka.common.errors.TimeoutException} if the  elapsed time
-     * is more than the max time to block (max.block.ms)
-     *
-     * @param startTime timestamp used to check the elapsed time
-     * @return remainingTime
-     */
-    private long checkMaybeGetRemainingTime(long startTime) {
-        long elapsedTime = time.milliseconds() - startTime;
-        if (elapsedTime > maxBlockTimeMs) {
-            throw new TimeoutException("Request timed out");
-        }
-        long remainingTime = maxBlockTimeMs - elapsedTime;
-
-        return remainingTime;
+        return this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
     }
 
     private static class FutureFailure implements Future<RecordMetadata> {

http://git-wip-us.apache.org/repos/asf/kafka/blob/9dbeb71a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 83dad2a..06f00a9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -45,10 +45,6 @@ public class ProducerConfig extends AbstractConfig {
     public static final String BOOTSTRAP_SERVERS_CONFIG = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
 
     /** <code>metadata.fetch.timeout.ms</code> */
-    /**
-     * @deprecated This config will be removed in a future release. Please use {@link #MAX_BLOCK_MS_CONFIG}
-     */
-    @Deprecated
     public static final String METADATA_FETCH_TIMEOUT_CONFIG = "metadata.fetch.timeout.ms";
     private static final String METADATA_FETCH_TIMEOUT_DOC = "The first time data is sent to a topic we must fetch metadata about that topic to know which servers host the topic's partitions. This "
                                                              + "fetch to succeed before throwing an exception back to the client.";
@@ -97,11 +93,6 @@ public class ProducerConfig extends AbstractConfig {
                                            + " remains alive. This is the strongest available guarantee.";
 
     /** <code>timeout.ms</code> */
-
-    /**
-     * @deprecated This config will be removed in a future release. Please use {@link #REQUEST_TIMEOUT_MS_CONFIG}
-     */
-    @Deprecated
     public static final String TIMEOUT_CONFIG = "timeout.ms";
     private static final String TIMEOUT_DOC = "The configuration controls the maximum amount of time the server will wait for acknowledgments from followers to " + "meet the acknowledgment requirements the producer has specified with the <code>acks</code> configuration. If the "
                                               + "requested number of acknowledgments are not met when the timeout elapses an error will be returned. This timeout "
@@ -137,10 +128,6 @@ public class ProducerConfig extends AbstractConfig {
     public static final String RECONNECT_BACKOFF_MS_CONFIG = CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG;
 
     /** <code>block.on.buffer.full</code> */
-    /**
-     * @deprecated This config will be removed in a future release. Also, the {@link #METADATA_FETCH_TIMEOUT_CONFIG} is no longer honored when this property is set to true.
-     */
-    @Deprecated
     public static final String BLOCK_ON_BUFFER_FULL_CONFIG = "block.on.buffer.full";
     private static final String BLOCK_ON_BUFFER_FULL_DOC = "When our memory buffer is exhausted we must either stop accepting new records (block) or throw errors. By default " + "this setting is true and we block, however in some scenarios blocking is not desirable and it is better to "
                                                            + "immediately give an error. Setting this to <code>false</code> will accomplish that: the producer will throw a BufferExhaustedException if a recrord is sent and the buffer space is full.";
@@ -190,18 +177,6 @@ public class ProducerConfig extends AbstractConfig {
     public static final String PARTITIONER_CLASS_CONFIG = "partitioner.class";
     private static final String PARTITIONER_CLASS_DOC = "Partitioner class that implements the <code>Partitioner</code> interface.";
 
-    /** <code>max.block.ms</code> */
-    public static final String MAX_BLOCK_MS_CONFIG = "max.block.ms";
-    private static final String MAX_BLOCK_MS_DOC = "The configuration controls how long {@link KafkaProducer#send()} and {@link KafkaProducer#partitionsFor} will block."
-                                                    + "These methods can be blocked for multiple reasons. For e.g: buffer full, metadata unavailable."
-                                                    + "This configuration imposes maximum limit on the total time spent in fetching metadata, serialization of key and value, partitioning and "
-                                                    + "allocation of buffer memory when doing a send(). In case of partitionsFor(), this configuration imposes a maximum time threshold on waiting "
-                                                    + "for metadata";
-
-    /** <code>request.timeout.ms</code> */
-    public static final String REQUEST_TIMEOUT_MS_CONFIG = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG;
-    private static final String REQUEST_TIMEOUT_MS_DOC = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC;
-
     static {
         CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, CommonClientConfigs.BOOSTRAP_SERVERS_DOC)
                                 .define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC)
@@ -225,7 +200,7 @@ public class ProducerConfig extends AbstractConfig {
                                         atLeast(0),
                                         Importance.MEDIUM,
                                         MAX_REQUEST_SIZE_DOC)
-                                .define(BLOCK_ON_BUFFER_FULL_CONFIG, Type.BOOLEAN, false, Importance.LOW, BLOCK_ON_BUFFER_FULL_DOC)
+                                .define(BLOCK_ON_BUFFER_FULL_CONFIG, Type.BOOLEAN, true, Importance.LOW, BLOCK_ON_BUFFER_FULL_DOC)
                                 .define(RECONNECT_BACKOFF_MS_CONFIG, Type.LONG, 50L, atLeast(0L), Importance.LOW, CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC)
                                 .define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST, "", Importance.LOW, CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
                                 .define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, 100L, atLeast(0L), Importance.LOW, CommonClientConfigs.RETRY_BACKOFF_MS_DOC)
@@ -235,18 +210,6 @@ public class ProducerConfig extends AbstractConfig {
                                         atLeast(0),
                                         Importance.LOW,
                                         METADATA_FETCH_TIMEOUT_DOC)
-                                .define(MAX_BLOCK_MS_CONFIG,
-                                        Type.LONG,
-                                        60 * 1000,
-                                        atLeast(0),
-                                        Importance.MEDIUM,
-                                        MAX_BLOCK_MS_DOC)
-                                .define(REQUEST_TIMEOUT_MS_CONFIG,
-                                        Type.INT,
-                                        30 * 1000,
-                                        atLeast(0),
-                                        Importance.MEDIUM,
-                                        REQUEST_TIMEOUT_MS_DOC)
                                 .define(METADATA_MAX_AGE_CONFIG, Type.LONG, 5 * 60 * 1000, atLeast(0), Importance.LOW, METADATA_MAX_AGE_DOC)
                                 .define(METRICS_SAMPLE_WINDOW_MS_CONFIG,
                                         Type.LONG,

http://git-wip-us.apache.org/repos/asf/kafka/blob/9dbeb71a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
index 2a45075..4cb1e50 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
@@ -24,8 +24,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.kafka.clients.producer.BufferExhaustedException;
 import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Rate;
@@ -46,6 +46,7 @@ public final class BufferPool {
 
     private final long totalMemory;
     private final int poolableSize;
+    private final boolean blockOnExhaustion;
     private final ReentrantLock lock;
     private final Deque<ByteBuffer> free;
     private final Deque<Condition> waiters;
@@ -59,13 +60,17 @@ public final class BufferPool {
      * 
      * @param memory The maximum amount of memory that this buffer pool can allocate
      * @param poolableSize The buffer size to cache in the free list rather than deallocating
+     * @param blockOnExhaustion This controls the behavior when the buffer pool is out of memory. If true the
+     *        {@link #allocate(int)} call will block and wait for memory to be returned to the pool. If false
+     *        {@link #allocate(int)} will throw an exception if the buffer is out of memory.
      * @param metrics instance of Metrics
      * @param time time instance
      * @param metricGrpName logical group name for metrics
      * @param metricTags additional key/val attributes for metrics
      */
-    public BufferPool(long memory, int poolableSize, Metrics metrics, Time time , String metricGrpName , Map<String, String> metricTags) {
+    public BufferPool(long memory, int poolableSize, boolean blockOnExhaustion, Metrics metrics, Time time , String metricGrpName , Map<String, String> metricTags) {
         this.poolableSize = poolableSize;
+        this.blockOnExhaustion = blockOnExhaustion;
         this.lock = new ReentrantLock();
         this.free = new ArrayDeque<ByteBuffer>();
         this.waiters = new ArrayDeque<Condition>();
@@ -86,13 +91,13 @@ public final class BufferPool {
      * is configured with blocking mode.
      * 
      * @param size The buffer size to allocate in bytes
-     * @param maxTimeToBlock The maximum time in milliseconds to block for buffer memory to be available
      * @return The buffer
      * @throws InterruptedException If the thread is interrupted while blocked
      * @throws IllegalArgumentException if size is larger than the total memory controlled by the pool (and hence we would block
      *         forever)
+     * @throws BufferExhaustedException if the pool is in non-blocking mode and size exceeds the free memory in the pool
      */
-    public ByteBuffer allocate(int size, long maxTimeToBlock) throws InterruptedException {
+    public ByteBuffer allocate(int size) throws InterruptedException {
         if (size > this.totalMemory)
             throw new IllegalArgumentException("Attempt to allocate " + size
                                                + " bytes, but there is a hard limit of "
@@ -115,6 +120,10 @@ public final class BufferPool {
                 this.availableMemory -= size;
                 lock.unlock();
                 return ByteBuffer.allocate(size);
+            } else if (!blockOnExhaustion) {
+                throw new BufferExhaustedException("You have exhausted the " + this.totalMemory
+                                                   + " bytes of memory you configured for the client and the client is configured to error"
+                                                   + " rather than block when memory is exhausted.");
             } else {
                 // we are out of memory and will have to block
                 int accumulated = 0;
@@ -125,8 +134,7 @@ public final class BufferPool {
                 // enough memory to allocate one
                 while (accumulated < size) {
                     long startWait = time.nanoseconds();
-                    if (!moreMemory.await(maxTimeToBlock, TimeUnit.MILLISECONDS))
-                        throw new TimeoutException("Failed to allocate memory within the configured max blocking time");
+                    moreMemory.await();
                     long endWait = time.nanoseconds();
                     this.waitTime.record(endWait - startWait, time.milliseconds());
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9dbeb71a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index eed2a5e..a152bd7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -12,7 +12,6 @@
  */
 package org.apache.kafka.clients.producer.internals;
 
-import java.util.Iterator;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.MetricName;
@@ -83,6 +82,8 @@ public final class RecordAccumulator {
      *        latency for potentially better throughput due to more batching (and hence fewer, larger requests).
      * @param retryBackoffMs An artificial delay time to retry the produce request upon receiving an error. This avoids
      *        exhausting all retries in a short period of time.
+     * @param blockOnBufferFull If true block when we are out of memory; if false throw an exception when we are out of
+     *        memory
      * @param metrics The metrics
      * @param time The time instance to use
      * @param metricTags additional key/value attributes of the metric
@@ -92,6 +93,7 @@ public final class RecordAccumulator {
                              CompressionType compression,
                              long lingerMs,
                              long retryBackoffMs,
+                             boolean blockOnBufferFull,
                              Metrics metrics,
                              Time time,
                              Map<String, String> metricTags) {
@@ -105,7 +107,7 @@ public final class RecordAccumulator {
         this.retryBackoffMs = retryBackoffMs;
         this.batches = new CopyOnWriteMap<TopicPartition, Deque<RecordBatch>>();
         String metricGrpName = "producer-metrics";
-        this.free = new BufferPool(totalSize, batchSize, metrics, time , metricGrpName , metricTags);
+        this.free = new BufferPool(totalSize, batchSize, blockOnBufferFull, metrics, time , metricGrpName , metricTags);
         this.incomplete = new IncompleteRecordBatches();
         this.time = time;
         registerMetrics(metrics, metricGrpName, metricTags);
@@ -151,9 +153,8 @@ public final class RecordAccumulator {
      * @param key The key for the record
      * @param value The value for the record
      * @param callback The user-supplied callback to execute when the request is complete
-     * @param maxTimeToBlock The maximum time in milliseconds to block for buffer memory to be available
      */
-    public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, Callback callback, long maxTimeToBlock) throws InterruptedException {
+    public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, Callback callback) throws InterruptedException {
         // We keep track of the number of appending thread to make sure we do not miss batches in
         // abortIncompleteBatches().
         appendsInProgress.incrementAndGet();
@@ -165,7 +166,7 @@ public final class RecordAccumulator {
             synchronized (dq) {
                 RecordBatch last = dq.peekLast();
                 if (last != null) {
-                    FutureRecordMetadata future = last.tryAppend(key, value, callback, time.milliseconds());
+                    FutureRecordMetadata future = last.tryAppend(key, value, callback);
                     if (future != null)
                         return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
                 }
@@ -174,14 +175,14 @@ public final class RecordAccumulator {
             // we don't have an in-progress record batch try to allocate a new batch
             int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
             log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
-            ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
+            ByteBuffer buffer = free.allocate(size);
             synchronized (dq) {
                 // Need to check if producer is closed again after grabbing the dequeue lock.
                 if (closed)
                     throw new IllegalStateException("Cannot send after the producer is closed.");
                 RecordBatch last = dq.peekLast();
                 if (last != null) {
-                    FutureRecordMetadata future = last.tryAppend(key, value, callback, time.milliseconds());
+                    FutureRecordMetadata future = last.tryAppend(key, value, callback);
                     if (future != null) {
                         // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                         free.deallocate(buffer);
@@ -190,7 +191,7 @@ public final class RecordAccumulator {
                 }
                 MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
                 RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
-                FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, callback, time.milliseconds()));
+                FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, callback));
 
                 dq.addLast(batch);
                 incomplete.add(batch);
@@ -202,51 +203,11 @@ public final class RecordAccumulator {
     }
 
     /**
-     * Abort the batches that have been sitting in RecordAccumulator for more than the configured requestTimeout
-     * due to metadata being unavailable
-     */
-    public List<RecordBatch> abortExpiredBatches(int requestTimeout, Cluster cluster, long now) {
-        List<RecordBatch> expiredBatches = new ArrayList<RecordBatch>();
-        int count = 0;
-        for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
-            TopicPartition topicAndPartition = entry.getKey();
-            Deque<RecordBatch> dq = entry.getValue();
-            synchronized (dq) {
-                // iterate over the batches and expire them if they have stayed in accumulator for more than requestTimeOut
-                Iterator<RecordBatch> batchIterator = dq.iterator();
-                while (batchIterator.hasNext()) {
-                    RecordBatch batch = batchIterator.next();
-                    Node leader = cluster.leaderFor(topicAndPartition);
-                    if (leader == null) {
-                        // check if the batch is expired
-                        if (batch.maybeExpire(requestTimeout, now, this.lingerMs)) {
-                            expiredBatches.add(batch);
-                            count++;
-                            batchIterator.remove();
-                            deallocate(batch);
-                        } else {
-                            if (!batch.inRetry()) {
-                                break;
-                            }
-                        }
-                    }
-                }
-            }
-        }
-        if (expiredBatches.size() > 0)
-            log.trace("Expired {} batches in accumulator", count);
-
-        return expiredBatches;
-    }
-
-    /**
      * Re-enqueue the given record batch in the accumulator to retry
      */
     public void reenqueue(RecordBatch batch, long now) {
         batch.attempts++;
         batch.lastAttemptMs = now;
-        batch.lastAppendTime = now;
-        batch.setRetry();
         Deque<RecordBatch> deque = dequeFor(batch.topicPartition);
         synchronized (deque) {
             deque.addFirst(batch);

http://git-wip-us.apache.org/repos/asf/kafka/blob/9dbeb71a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
index 3f18582..06182db 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
@@ -18,7 +18,6 @@ import java.util.List;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.Record;
 import org.slf4j.Logger;
@@ -42,9 +41,7 @@ public final class RecordBatch {
     public final MemoryRecords records;
     public final TopicPartition topicPartition;
     public final ProduceRequestResult produceFuture;
-    public long lastAppendTime;
     private final List<Thunk> thunks;
-    private boolean retry;
 
     public RecordBatch(TopicPartition tp, MemoryRecords records, long now) {
         this.createdMs = now;
@@ -53,8 +50,6 @@ public final class RecordBatch {
         this.topicPartition = tp;
         this.produceFuture = new ProduceRequestResult();
         this.thunks = new ArrayList<Thunk>();
-        this.lastAppendTime = createdMs;
-        this.retry = false;
     }
 
     /**
@@ -62,13 +57,12 @@ public final class RecordBatch {
      * 
      * @return The RecordSend corresponding to this record or null if there isn't sufficient room.
      */
-    public FutureRecordMetadata tryAppend(byte[] key, byte[] value, Callback callback, long now) {
+    public FutureRecordMetadata tryAppend(byte[] key, byte[] value, Callback callback) {
         if (!this.records.hasRoomFor(key, value)) {
             return null;
         } else {
             this.records.append(0L, key, value);
             this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));
-            this.lastAppendTime = now;
             FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount);
             if (callback != null)
                 thunks.add(new Thunk(callback, future));
@@ -122,34 +116,4 @@ public final class RecordBatch {
     public String toString() {
         return "RecordBatch(topicPartition=" + topicPartition + ", recordCount=" + recordCount + ")";
     }
-
-    /**
-     * Expire the batch that is ready but is sitting in accumulator for more than requestTimeout due to metadata being unavailable.
-     * We need to explicitly check if the record is full or linger time is met because the accumulator's partition may not be ready
-     * if the leader is unavailable.
-     */
-    public boolean maybeExpire(int requestTimeout, long now, long lingerMs) {
-        boolean expire = false;
-        if ((this.records.isFull() && requestTimeout < (now - this.lastAppendTime)) || requestTimeout < (now - (this.lastAttemptMs + lingerMs))) {
-            expire = true;
-            this.records.close();
-            this.done(-1L, new TimeoutException("Batch Expired"));
-        }
-
-        return expire;
-    }
-
-    /**
-     * Returns if the batch is been retried for sending to kafka
-     */
-    public boolean inRetry() {
-        return this.retry;
-    }
-
-    /**
-     * Set retry to true if the batch is being retried (for send)
-     */
-    public void setRetry() {
-        this.retry = true;
-    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/9dbeb71a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 134d45a..d2e64f7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -71,6 +71,9 @@ public class Sender implements Runnable {
     /* the number of acknowledgements to request from the server */
     private final short acks;
 
+    /* the max time in ms for the server to wait for acknowlegements */
+    private final int requestTimeout;
+
     /* the number of times to retry a failed request before giving up */
     private final int retries;
 
@@ -89,30 +92,27 @@ public class Sender implements Runnable {
     /* param clientId of the client */
     private String clientId;
 
-    /* the max time to wait for the server to respond to the request*/
-    private final int requestTimeout;
-
     public Sender(KafkaClient client,
                   Metadata metadata,
                   RecordAccumulator accumulator,
                   int maxRequestSize,
                   short acks,
                   int retries,
+                  int requestTimeout,
                   Metrics metrics,
                   Time time,
-                  String clientId,
-                  int requestTimeout) {
+                  String clientId) {
         this.client = client;
         this.accumulator = accumulator;
         this.metadata = metadata;
         this.maxRequestSize = maxRequestSize;
         this.running = true;
+        this.requestTimeout = requestTimeout;
         this.acks = acks;
         this.retries = retries;
         this.time = time;
         this.clientId = clientId;
         this.sensors = new SenderMetrics(metrics);
-        this.requestTimeout = requestTimeout;
     }
 
     /**
@@ -187,12 +187,6 @@ public class Sender implements Runnable {
                                                                          result.readyNodes,
                                                                          this.maxRequestSize,
                                                                          now);
-
-        List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, cluster, now);
-        // update sensors
-        for (RecordBatch expiredBatch : expiredBatches)
-            this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);
-
         sensors.updateProduceRequestMetrics(batches);
         List<ClientRequest> requests = createProduceRequests(batches, now);
         // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
@@ -206,7 +200,7 @@ public class Sender implements Runnable {
             pollTimeout = 0;
         }
         for (ClientRequest request : requests)
-            client.send(request, now);
+            client.send(request);
 
         // if some partitions are already ready to be sent, the select time would be 0;
         // otherwise if some partition already has some data accumulated but not ready yet,

http://git-wip-us.apache.org/repos/asf/kafka/blob/9dbeb71a/clients/src/main/java/org/apache/kafka/common/network/Selectable.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selectable.java b/clients/src/main/java/org/apache/kafka/common/network/Selectable.java
index 629fa0d..70e74bd 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selectable.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selectable.java
@@ -38,6 +38,11 @@ public interface Selectable {
     public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException;
 
     /**
+     * Begin disconnecting the connection identified by the given id
+     */
+    public void disconnect(String id);
+
+    /**
      * Wakeup this selector if it is blocked on I/O
      */
     public void wakeup();

http://git-wip-us.apache.org/repos/asf/kafka/blob/9dbeb71a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 9e52078..5a4909e 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -177,6 +177,17 @@ public class Selector implements Selectable {
     }
 
     /**
+     * Disconnect any connections for the given id (if there are any). The disconnection is asynchronous and will not be
+     * processed until the next {@link #poll(long) poll()} call.
+     */
+    @Override
+    public void disconnect(String id) {
+        KafkaChannel channel = this.channels.get(id);
+        if (channel != null)
+            channel.disconnect();
+    }
+
+    /**
      * Interrupt the nioSelector if it is blocked waiting to do I/O.
      */
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/9dbeb71a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index ee72328..e5815f5 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -100,7 +100,7 @@ public class MockClient implements KafkaClient {
     }
 
     @Override
-    public void send(ClientRequest request, long now) {
+    public void send(ClientRequest request) {
         if (!futureResponses.isEmpty()) {
             FutureResponse futureResp = futureResponses.poll();
             if (!futureResp.requestMatcher.matches(request))
@@ -109,7 +109,6 @@ public class MockClient implements KafkaClient {
             ClientResponse resp = new ClientResponse(request, time.milliseconds(), futureResp.disconnected, futureResp.responseBody);
             responses.add(resp);
         } else {
-            request.setSendMs(now);
             this.requests.add(request);
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9dbeb71a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index 2379896..69c93c3 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -45,30 +45,42 @@ import org.junit.Test;
 
 public class NetworkClientTest {
 
-    private final int requestTimeoutMs = 1000;
     private MockTime time = new MockTime();
     private MockSelector selector = new MockSelector(time);
     private Metadata metadata = new Metadata(0, Long.MAX_VALUE);
     private int nodeId = 1;
     private Cluster cluster = TestUtils.singletonCluster("test", nodeId);
     private Node node = cluster.nodes().get(0);
-    private NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE, 0, 64 * 1024, 64 * 1024, requestTimeoutMs);
-
+    private NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE, 0, 64 * 1024, 64 * 1024);
     private NetworkClient clientWithStaticNodes = new NetworkClient(selector, new ManualMetadataUpdater(Arrays.asList(node)),
-            "mock-static", Integer.MAX_VALUE, 0, 64 * 1024, 64 * 1024, requestTimeoutMs);
+            "mock-static", Integer.MAX_VALUE, 0, 64 * 1024, 64 * 1024);
 
     @Before
     public void setup() {
         metadata.update(cluster, time.milliseconds());
     }
 
+    @Test
+    public void testReadyAndDisconnect() {
+        assertFalse("Client begins unready as it has no connection.", client.ready(node, time.milliseconds()));
+        assertEquals("The connection is established as a side-effect of the readiness check", 1, selector.connected().size());
+        client.poll(1, time.milliseconds());
+        selector.clear();
+        assertTrue("Now the client is ready", client.ready(node, time.milliseconds()));
+        selector.disconnect(node.idString());
+        client.poll(1, time.milliseconds());
+        selector.clear();
+        assertFalse("After we forced the disconnection the client is no longer ready.", client.ready(node, time.milliseconds()));
+        assertTrue("Metadata should get updated.", metadata.timeToNextUpdate(time.milliseconds()) == 0);
+    }
+
     @Test(expected = IllegalStateException.class)
     public void testSendToUnreadyNode() {
         RequestSend send = new RequestSend("5",
                                            client.nextRequestHeader(ApiKeys.METADATA),
                                            new MetadataRequest(Arrays.asList("test")).toStruct());
         ClientRequest request = new ClientRequest(time.milliseconds(), false, send, null);
-        client.send(request, time.milliseconds());
+        client.send(request);
         client.poll(1, time.milliseconds());
     }
 
@@ -93,7 +105,7 @@ public class NetworkClientTest {
         RequestHeader reqHeader = client.nextRequestHeader(ApiKeys.PRODUCE);
         RequestSend send = new RequestSend(node.idString(), reqHeader, produceRequest.toStruct());
         ClientRequest request = new ClientRequest(time.milliseconds(), true, send, null);
-        client.send(request, time.milliseconds());
+        client.send(request);
         assertEquals("There should be 1 in-flight request after send", 1, client.inFlightRequestCount(node.idString()));
 
         client.close(node.idString());
@@ -108,7 +120,7 @@ public class NetworkClientTest {
         TestCallbackHandler handler = new TestCallbackHandler();
         ClientRequest request = new ClientRequest(time.milliseconds(), true, send, handler);
         awaitReady(networkClient, node);
-        networkClient.send(request, time.milliseconds());
+        networkClient.send(request);
         networkClient.poll(1, time.milliseconds());
         assertEquals(1, networkClient.inFlightRequestCount());
         ResponseHeader respHeader = new ResponseHeader(reqHeader.correlationId());
@@ -131,31 +143,15 @@ public class NetworkClientTest {
         while (!client.ready(node, time.milliseconds()))
             client.poll(1, time.milliseconds());
     }
-
-    @Test
-    public void testRequestTimeout() {
-        ProduceRequest produceRequest = new ProduceRequest((short) 1, 1000, Collections.<TopicPartition, ByteBuffer>emptyMap());
-        RequestHeader reqHeader = client.nextRequestHeader(ApiKeys.PRODUCE);
-        RequestSend send = new RequestSend(node.idString(), reqHeader, produceRequest.toStruct());
-        TestCallbackHandler handler = new TestCallbackHandler();
-        ClientRequest request = new ClientRequest(time.milliseconds(), true, send, handler);
-        awaitReady(client, node);
-        long now = time.milliseconds();
-        client.send(request, now);
-        // sleeping to make sure that the time since last send is greater than requestTimeOut
-        time.sleep(3000);
-        client.poll(3000, time.milliseconds());
-        String disconnectedNode = selector.disconnected().get(0);
-        assertEquals(node.idString(), disconnectedNode);
-    }
-
+    
     private static class TestCallbackHandler implements RequestCompletionHandler {
         public boolean executed = false;
         public ClientResponse response;
-
+        
         public void onComplete(ClientResponse response) {
             this.executed = true;
             this.response = response;
         }
     }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9dbeb71a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
index ded5d3e..2c69382 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.clients.producer.internals;
 
-import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.clients.producer.BufferExhaustedException;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.test.TestUtils;
@@ -35,7 +35,6 @@ import static org.junit.Assert.*;
 public class BufferPoolTest {
     private MockTime time = new MockTime();
     private Metrics metrics = new Metrics(time);
-    private final long maxBlockTimeMs =  2000;
     String metricGroup = "TestMetrics";
     Map<String, String> metricTags = new LinkedHashMap<String, String>();
 
@@ -46,8 +45,8 @@ public class BufferPoolTest {
     public void testSimple() throws Exception {
         long totalMemory = 64 * 1024;
         int size = 1024;
-        BufferPool pool = new BufferPool(totalMemory, size, metrics, time, metricGroup, metricTags);
-        ByteBuffer buffer = pool.allocate(size, maxBlockTimeMs);
+        BufferPool pool = new BufferPool(totalMemory, size, false, metrics, time, metricGroup, metricTags);
+        ByteBuffer buffer = pool.allocate(size);
         assertEquals("Buffer size should equal requested size.", size, buffer.limit());
         assertEquals("Unallocated memory should have shrunk", totalMemory - size, pool.unallocatedMemory());
         assertEquals("Available memory should have shrunk", totalMemory - size, pool.availableMemory());
@@ -56,13 +55,13 @@ public class BufferPoolTest {
         pool.deallocate(buffer);
         assertEquals("All memory should be available", totalMemory, pool.availableMemory());
         assertEquals("But now some is on the free list", totalMemory - size, pool.unallocatedMemory());
-        buffer = pool.allocate(size, maxBlockTimeMs);
+        buffer = pool.allocate(size);
         assertEquals("Recycled buffer should be cleared.", 0, buffer.position());
         assertEquals("Recycled buffer should be cleared.", buffer.capacity(), buffer.limit());
         pool.deallocate(buffer);
         assertEquals("All memory should be available", totalMemory, pool.availableMemory());
         assertEquals("Still a single buffer on the free list", totalMemory - size, pool.unallocatedMemory());
-        buffer = pool.allocate(2 * size, maxBlockTimeMs);
+        buffer = pool.allocate(2 * size);
         pool.deallocate(buffer);
         assertEquals("All memory should be available", totalMemory, pool.availableMemory());
         assertEquals("Non-standard size didn't go to the free list.", totalMemory - size, pool.unallocatedMemory());
@@ -73,11 +72,23 @@ public class BufferPoolTest {
      */
     @Test(expected = IllegalArgumentException.class)
     public void testCantAllocateMoreMemoryThanWeHave() throws Exception {
-        BufferPool pool = new BufferPool(1024, 512, metrics, time, metricGroup, metricTags);
-        ByteBuffer buffer = pool.allocate(1024, maxBlockTimeMs);
+        BufferPool pool = new BufferPool(1024, 512, true, metrics, time, metricGroup, metricTags);
+        ByteBuffer buffer = pool.allocate(1024);
         assertEquals(1024, buffer.limit());
         pool.deallocate(buffer);
-        buffer = pool.allocate(1025, maxBlockTimeMs);
+        buffer = pool.allocate(1025);
+    }
+
+    @Test
+    public void testNonblockingMode() throws Exception {
+        BufferPool pool = new BufferPool(2, 1, false, metrics, time, metricGroup, metricTags);
+        pool.allocate(1);
+        try {
+            pool.allocate(2);
+            fail("The buffer allocated more than it has!");
+        } catch (BufferExhaustedException e) {
+            // this is good
+        }
     }
 
     /**
@@ -85,8 +96,8 @@ public class BufferPoolTest {
      */
     @Test
     public void testDelayedAllocation() throws Exception {
-        BufferPool pool = new BufferPool(5 * 1024, 1024, metrics, time, metricGroup, metricTags);
-        ByteBuffer buffer = pool.allocate(1024, maxBlockTimeMs);
+        BufferPool pool = new BufferPool(5 * 1024, 1024, true, metrics, time, metricGroup, metricTags);
+        ByteBuffer buffer = pool.allocate(1024);
         CountDownLatch doDealloc = asyncDeallocate(pool, buffer);
         CountDownLatch allocation = asyncAllocate(pool, 5 * 1024);
         assertEquals("Allocation shouldn't have happened yet, waiting on memory.", 1L, allocation.getCount());
@@ -115,7 +126,7 @@ public class BufferPoolTest {
         Thread thread = new Thread() {
             public void run() {
                 try {
-                    pool.allocate(size, maxBlockTimeMs);
+                    pool.allocate(size);
                 } catch (InterruptedException e) {
                     e.printStackTrace();
                 } finally {
@@ -128,23 +139,6 @@ public class BufferPoolTest {
     }
 
     /**
-     * Test if Timeout exception is thrown when there is not enough memory to allocate and the elapsed time is greater than the max specified block time
-     *
-     * @throws Exception
-     */
-    @Test
-    public void testBlockTimeout() throws Exception {
-        BufferPool pool = new BufferPool(2, 1, metrics, time, metricGroup, metricTags);
-        pool.allocate(1, maxBlockTimeMs);
-        try {
-            pool.allocate(2, maxBlockTimeMs);
-            fail("The buffer allocated more memory than its maximum value 2");
-        } catch (TimeoutException e) {
-            // this is good
-        }
-    }
-
-    /**
      * This test creates lots of threads that hammer on the pool
      */
     @Test
@@ -153,7 +147,7 @@ public class BufferPoolTest {
         final int iterations = 50000;
         final int poolableSize = 1024;
         final long totalMemory = numThreads / 2 * poolableSize;
-        final BufferPool pool = new BufferPool(totalMemory, poolableSize, metrics, time, metricGroup, metricTags);
+        final BufferPool pool = new BufferPool(totalMemory, poolableSize, true, metrics, time, metricGroup, metricTags);
         List<StressTestThread> threads = new ArrayList<StressTestThread>();
         for (int i = 0; i < numThreads; i++)
             threads.add(new StressTestThread(pool, iterations));
@@ -169,7 +163,6 @@ public class BufferPoolTest {
     public static class StressTestThread extends Thread {
         private final int iterations;
         private final BufferPool pool;
-        private final long maxBlockTimeMs =  2000;
         public final AtomicBoolean success = new AtomicBoolean(false);
 
         public StressTestThread(BufferPool pool, int iterations) {
@@ -187,7 +180,7 @@ public class BufferPoolTest {
                     else
                         // allocate a random size
                         size = TestUtils.RANDOM.nextInt((int) pool.totalMemory());
-                    ByteBuffer buffer = pool.allocate(size, maxBlockTimeMs);
+                    ByteBuffer buffer = pool.allocate(size);
                     pool.deallocate(buffer);
                 }
                 success.set(true);


Mime
View raw message