kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch trunk updated: KAFKA-5886; Introduce delivery.timeout.ms producer config (KIP-91) (#5270)
Date Thu, 26 Jul 2018 16:14:01 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7fc7136  KAFKA-5886; Introduce delivery.timeout.ms producer config (KIP-91) (#5270)
7fc7136 is described below

commit 7fc7136ffd28245629f00870fc7f7cbe1fe1c6ce
Author: Yu Yang <yuyang08@gmail.com>
AuthorDate: Thu Jul 26 09:13:50 2018 -0700

    KAFKA-5886; Introduce delivery.timeout.ms producer config (KIP-91) (#5270)
    
    Co-authored-by: Sumant Tambe <sutambe@yahoo.com>
    Co-authored-by: Yu Yang <yuyang@pinterest.com>
    
    Reviewers: Ted Yu <yuzhihong@gmail.com>, Apurva Mehta <apurva@confluent.io>, Jason Gustafson <jason@confluent.io>
---
 .../kafka/clients/producer/KafkaProducer.java      |  58 ++--
 .../kafka/clients/producer/ProducerConfig.java     |  24 +-
 .../clients/producer/internals/ProducerBatch.java  |  95 +++---
 .../producer/internals/RecordAccumulator.java      | 318 +++++++++++----------
 .../kafka/clients/producer/internals/Sender.java   | 188 ++++++++----
 .../apache/kafka/common/config/AbstractConfig.java |   2 +-
 .../org/apache/kafka/common/config/ConfigDef.java  |  11 +-
 .../java/org/apache/kafka/clients/MockClient.java  |   5 +-
 .../kafka/clients/consumer/KafkaConsumerTest.java  |   4 +-
 .../producer/internals/ProducerBatchTest.java      |  32 +--
 .../producer/internals/RecordAccumulatorTest.java  | 248 ++++++++++++----
 .../clients/producer/internals/SenderTest.java     | 291 +++++++++++++++----
 .../producer/internals/TransactionManagerTest.java |   6 +-
 .../org/apache/kafka/connect/runtime/Worker.java   |   1 +
 .../kafka/api/BaseProducerSendTest.scala           |  12 +-
 .../kafka/api/PlaintextConsumerTest.scala          |   4 +-
 .../kafka/api/PlaintextProducerSendTest.scala      |   6 +-
 .../kafka/api/ProducerFailureHandlingTest.scala    |   6 +-
 .../server/DynamicBrokerReconfigurationTest.scala  |   4 +-
 .../scala/unit/kafka/server/FetchRequestTest.scala |   2 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala    |   9 +-
 docs/upgrade.html                                  |   5 +
 22 files changed, 900 insertions(+), 431 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 3991467..b40b09a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -16,6 +16,17 @@
  */
 package org.apache.kafka.clients.producer;
 
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.ClientUtils;
 import org.apache.kafka.clients.KafkaClient;
@@ -24,6 +35,7 @@ import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.clients.producer.internals.BufferPool;
 import org.apache.kafka.clients.producer.internals.ProducerInterceptors;
 import org.apache.kafka.clients.producer.internals.ProducerMetrics;
 import org.apache.kafka.clients.producer.internals.RecordAccumulator;
@@ -69,18 +81,6 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.slf4j.Logger;
 
-import java.net.InetSocketAddress;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Properties;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
 import static org.apache.kafka.common.serialization.ExtendedSerializer.Wrapper.ensureExtended;
 
 /**
@@ -235,6 +235,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     private static final AtomicInteger PRODUCER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
     private static final String JMX_PREFIX = "kafka.producer";
     public static final String NETWORK_THREAD_PREFIX = "kafka-producer-network-thread";
+    public static final String PRODUCER_METRIC_GROUP_NAME = "producer-metrics";
 
     private final String clientId;
     // Visible for testing
@@ -392,18 +393,21 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             int retries = configureRetries(config, transactionManager != null, log);
             int maxInflightRequests = configureInflightRequests(config, transactionManager != null);
             short acks = configureAcks(config, transactionManager != null, log);
+            int deliveryTimeoutMs = configureDeliveryTimeout(config, log);
 
             this.apiVersions = new ApiVersions();
             this.accumulator = new RecordAccumulator(logContext,
                     config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
-                    this.totalMemorySize,
                     this.compressionType,
-                    config.getLong(ProducerConfig.LINGER_MS_CONFIG),
+                    config.getInt(ProducerConfig.LINGER_MS_CONFIG),
                     retryBackoffMs,
+                    deliveryTimeoutMs,
                     metrics,
+                    PRODUCER_METRIC_GROUP_NAME,
                     time,
                     apiVersions,
-                    transactionManager);
+                    transactionManager,
+                    new BufferPool(this.totalMemorySize, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), metrics, time, PRODUCER_METRIC_GROUP_NAME));
             List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
             if (metadata != null) {
                 this.metadata = metadata;
@@ -459,10 +463,30 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         }
     }
 
-    private static TransactionManager configureTransactionState(ProducerConfig config, LogContext logContext, Logger log) {
+    private static int configureDeliveryTimeout(ProducerConfig config, Logger log) {
+        int deliveryTimeoutMs = config.getInt(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG);
+        int lingerMs = config.getInt(ProducerConfig.LINGER_MS_CONFIG);
+        int requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+
+        if (deliveryTimeoutMs < Integer.MAX_VALUE && deliveryTimeoutMs < lingerMs + requestTimeoutMs) {
+            if (config.originals().containsKey(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG)) {
+                // throw an exception if the user explicitly set an inconsistent value
+                throw new ConfigException(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG
+                    + " should be equal to or larger than " + ProducerConfig.LINGER_MS_CONFIG
+                    + " + " + ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+            } else {
+                // override deliveryTimeoutMs default value to lingerMs + requestTimeoutMs for backward compatibility
+                deliveryTimeoutMs = lingerMs + requestTimeoutMs;
+                log.warn("{} should be equal to or larger than {} + {}. Setting it to {}.",
+                    ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, ProducerConfig.LINGER_MS_CONFIG,
+                    ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, deliveryTimeoutMs);
+            }
+        }
+        return deliveryTimeoutMs;
+    }
 
+    private static TransactionManager configureTransactionState(ProducerConfig config, LogContext logContext, Logger log) {
         TransactionManager transactionManager = null;
-
         boolean userConfiguredIdempotence = false;
         if (config.originals().containsKey(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG))
             userConfiguredIdempotence = true;
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 8e7b662..ab55353 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -99,6 +99,19 @@ public class ProducerConfig extends AbstractConfig {
                                                 + "specified time waiting for more records to show up. This setting defaults to 0 (i.e. no delay). Setting <code>" + LINGER_MS_CONFIG + "=5</code>, "
                                                 + "for example, would have the effect of reducing the number of requests sent but would add up to 5ms of latency to records sent in the absence of load.";
 
+    /** <code>request.timeout.ms</code> */
+    public static final String REQUEST_TIMEOUT_MS_CONFIG = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG;
+    private static final String REQUEST_TIMEOUT_MS_DOC = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC
+        + " This should be larger than replica.lag.time.max.ms (a broker configuration)"
+        + " to reduce the possibility of message duplication due to unnecessary producer retries.";
+
+    /** <code>delivery.timeout.ms</code> */
+    public static final String DELIVERY_TIMEOUT_MS_CONFIG = "delivery.timeout.ms";
+    private static final String DELIVERY_TIMEOUT_MS_DOC = "An upper bound on the time to report success or failure after Producer.send() returns. "
+                                                          + "Producer may report failure to send a message earlier than this config if all the retries are exhausted or "
+                                                          + "a record is added to a batch nearing expiration. " + DELIVERY_TIMEOUT_MS_CONFIG + "should be equal to or "
+                                                          + "greater than " + REQUEST_TIMEOUT_MS_CONFIG + " + " + LINGER_MS_CONFIG;
+
     /** <code>client.id</code> */
     public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG;
 
@@ -188,12 +201,6 @@ public class ProducerConfig extends AbstractConfig {
     public static final String PARTITIONER_CLASS_CONFIG = "partitioner.class";
     private static final String PARTITIONER_CLASS_DOC = "Partitioner class that implements the <code>org.apache.kafka.clients.producer.Partitioner</code> interface.";
 
-    /** <code>request.timeout.ms</code> */
-    public static final String REQUEST_TIMEOUT_MS_CONFIG = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG;
-    private static final String REQUEST_TIMEOUT_MS_DOC = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC
-                                                        + " This should be larger than replica.lag.time.max.ms (a broker configuration)"
-                                                        + " to reduce the possibility of message duplication due to unnecessary producer retries.";
-
     /** <code>interceptor.classes</code> */
     public static final String INTERCEPTOR_CLASSES_CONFIG = "interceptor.classes";
     public static final String INTERCEPTOR_CLASSES_DOC = "A list of classes to use as interceptors. "
@@ -224,7 +231,7 @@ public class ProducerConfig extends AbstractConfig {
     static {
         CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Collections.emptyList(), new ConfigDef.NonNullValidator(), Importance.HIGH, CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
                                 .define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC)
-                                .define(RETRIES_CONFIG, Type.INT, 0, between(0, Integer.MAX_VALUE), Importance.HIGH, RETRIES_DOC)
+                                .define(RETRIES_CONFIG, Type.INT, Integer.MAX_VALUE, between(0, Integer.MAX_VALUE), Importance.HIGH, RETRIES_DOC)
                                 .define(ACKS_CONFIG,
                                         Type.STRING,
                                         "1",
@@ -233,7 +240,8 @@ public class ProducerConfig extends AbstractConfig {
                                         ACKS_DOC)
                                 .define(COMPRESSION_TYPE_CONFIG, Type.STRING, "none", Importance.HIGH, COMPRESSION_TYPE_DOC)
                                 .define(BATCH_SIZE_CONFIG, Type.INT, 16384, atLeast(0), Importance.MEDIUM, BATCH_SIZE_DOC)
-                                .define(LINGER_MS_CONFIG, Type.LONG, 0, atLeast(0L), Importance.MEDIUM, LINGER_MS_DOC)
+                                .define(LINGER_MS_CONFIG, Type.INT, 0, atLeast(0), Importance.MEDIUM, LINGER_MS_DOC)
+                                .define(DELIVERY_TIMEOUT_MS_CONFIG, Type.INT, 120 * 1000, atLeast(0), Importance.MEDIUM, DELIVERY_TIMEOUT_MS_DOC)
                                 .define(CLIENT_ID_CONFIG, Type.STRING, "", Importance.MEDIUM, CommonClientConfigs.CLIENT_ID_DOC)
                                 .define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(-1), Importance.MEDIUM, CommonClientConfigs.SEND_BUFFER_DOC)
                                 .define(RECEIVE_BUFFER_CONFIG, Type.INT, 32 * 1024, atLeast(-1), Importance.MEDIUM, CommonClientConfigs.RECEIVE_BUFFER_DOC)
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
index ea0f0f7..6e08185 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
@@ -20,7 +20,6 @@ import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.RecordBatchTooLargeException;
-import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.record.AbstractRecords;
 import org.apache.kafka.common.record.CompressionRatioEstimator;
@@ -77,13 +76,13 @@ public final class ProducerBatch {
     private boolean retry;
     private boolean reopened = false;
 
-    public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, long now) {
-        this(tp, recordsBuilder, now, false);
+    public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, long createdMs) {
+        this(tp, recordsBuilder, createdMs, false);
     }
 
-    public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, long now, boolean isSplitBatch) {
-        this.createdMs = now;
-        this.lastAttemptMs = now;
+    public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, long createdMs, boolean isSplitBatch) {
+        this.createdMs = createdMs;
+        this.lastAttemptMs = createdMs;
         this.recordsBuilder = recordsBuilder;
         this.topicPartition = tp;
         this.lastAppendTime = createdMs;
@@ -158,7 +157,17 @@ public final class ProducerBatch {
     }
 
     /**
-     * Complete the request. If the batch was previously aborted, this is a no-op.
+     * Finalize the state of a batch. Final state, once set, is immutable. This function may be called
+     * once or twice on a batch. It may be called twice if
+     * 1. An inflight batch expires before a response from the broker is received. The batch's final
+     * state is set to FAILED. But it could succeed on the broker and second time around batch.done() may
+     * try to set SUCCEEDED final state.
+     * 2. If a transaction abortion happens or if the producer is closed forcefully, the final state is
+     * ABORTED but again it could succeed if broker responds with a success.
+     *
+     * Attempted transitions from [FAILED | ABORTED] --> SUCCEEDED are logged.
+     * Attempted transitions from one failure state to the same or a different failed state are ignored.
+     * Attempted transitions from SUCCEEDED to the same or a failed state throw an exception.
      *
      * @param baseOffset The base offset of the messages assigned by the server
      * @param logAppendTime The log append time or -1 if CreateTime is being used
@@ -166,26 +175,34 @@ public final class ProducerBatch {
      * @return true if the batch was completed successfully and false if the batch was previously aborted
      */
     public boolean done(long baseOffset, long logAppendTime, RuntimeException exception) {
-        final FinalState finalState;
-        if (exception == null) {
+        final FinalState tryFinalState = (exception == null) ? FinalState.SUCCEEDED : FinalState.FAILED;
+
+        if (tryFinalState == FinalState.SUCCEEDED) {
             log.trace("Successfully produced messages to {} with base offset {}.", topicPartition, baseOffset);
-            finalState = FinalState.SUCCEEDED;
         } else {
-            log.trace("Failed to produce messages to {}.", topicPartition, exception);
-            finalState = FinalState.FAILED;
+            log.trace("Failed to produce messages to {} with base offset {}.", topicPartition, baseOffset, exception);
+        }
+
+        if (this.finalState.compareAndSet(null, tryFinalState)) {
+            completeFutureAndFireCallbacks(baseOffset, logAppendTime, exception);
+            return true;
         }
 
-        if (!this.finalState.compareAndSet(null, finalState)) {
-            if (this.finalState.get() == FinalState.ABORTED) {
-                log.debug("ProduceResponse returned for {} after batch had already been aborted.", topicPartition);
-                return false;
+        if (this.finalState.get() != FinalState.SUCCEEDED) {
+            if (tryFinalState == FinalState.SUCCEEDED) {
+                // Log if a previously unsuccessful batch succeeded later on.
+                log.debug("ProduceResponse returned {} for {} after batch with base offset {} had already been {}.",
+                    tryFinalState, topicPartition, baseOffset, this.finalState.get());
             } else {
-                throw new IllegalStateException("Batch has already been completed in final state " + this.finalState.get());
+                // FAILED --> FAILED and ABORTED --> FAILED transitions are ignored.
+                log.debug("Ignored state transition {} -> {} for {} batch with base offset {}",
+                    this.finalState.get(), tryFinalState, topicPartition, baseOffset);
             }
+        } else {
+            // A SUCCEEDED batch must not attempt another state change.
+            throw new IllegalStateException("A " + this.finalState.get() + " batch must not attempt another state change to " + tryFinalState);
         }
-
-        completeFutureAndFireCallbacks(baseOffset, logAppendTime, exception);
-        return true;
+        return false;
     }
 
     private void completeFutureAndFireCallbacks(long baseOffset, long logAppendTime, RuntimeException exception) {
@@ -299,37 +316,12 @@ public final class ProducerBatch {
         return "ProducerBatch(topicPartition=" + topicPartition + ", recordCount=" + recordCount + ")";
     }
 
-    /**
-     * A batch whose metadata is not available should be expired if one of the following is true:
-     * <ol>
-     *     <li> the batch is not in retry AND request timeout has elapsed after it is ready (full or linger.ms has reached).
-     *     <li> the batch is in retry AND request timeout has elapsed after the backoff period ended.
-     * </ol>
-     * This methods closes this batch and sets {@code expiryErrorMessage} if the batch has timed out.
-     */
-    boolean maybeExpire(int requestTimeoutMs, long retryBackoffMs, long now, long lingerMs, boolean isFull) {
-        if (!this.inRetry() && isFull && requestTimeoutMs < (now - this.lastAppendTime))
-            expiryErrorMessage = (now - this.lastAppendTime) + " ms has passed since last append";
-        else if (!this.inRetry() && requestTimeoutMs < (createdTimeMs(now) - lingerMs))
-            expiryErrorMessage = (createdTimeMs(now) - lingerMs) + " ms has passed since batch creation plus linger time";
-        else if (this.inRetry() && requestTimeoutMs < (waitedTimeMs(now) - retryBackoffMs))
-            expiryErrorMessage = (waitedTimeMs(now) - retryBackoffMs) + " ms has passed since last attempt plus backoff time";
-
-        boolean expired = expiryErrorMessage != null;
-        if (expired)
-            abortRecordAppends();
-        return expired;
+    boolean hasReachedDeliveryTimeout(long deliveryTimeoutMs, long now) {
+        return deliveryTimeoutMs <= now - this.createdMs;
     }
 
-    /**
-     * If {@link #maybeExpire(int, long, long, long, boolean)} returned true, the sender will fail the batch with
-     * the exception returned by this method.
-     * @return An exception indicating the batch expired.
-     */
-    TimeoutException timeoutException() {
-        if (expiryErrorMessage == null)
-            throw new IllegalStateException("Batch has not expired");
-        return new TimeoutException("Expiring " + recordCount + " record(s) for " + topicPartition + ": " + expiryErrorMessage);
+    public FinalState finalState() {
+        return this.finalState.get();
     }
 
     int attempts() {
@@ -347,10 +339,6 @@ public final class ProducerBatch {
         return drainedMs - createdMs;
     }
 
-    long createdTimeMs(long nowMs) {
-        return Math.max(0, nowMs - createdMs);
-    }
-
     long waitedTimeMs(long nowMs) {
         return Math.max(0, nowMs - lastAttemptMs);
     }
@@ -467,5 +455,4 @@ public final class ProducerBatch {
     public boolean sequenceHasBeenReset() {
         return reopened;
     }
-
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 31c6d75..964ac3c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -16,6 +16,18 @@
  */
 package org.apache.kafka.clients.producer.internals;
 
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.common.Cluster;
@@ -34,10 +46,10 @@ import org.apache.kafka.common.metrics.stats.Meter;
 import org.apache.kafka.common.record.AbstractRecords;
 import org.apache.kafka.common.record.CompressionRatioEstimator;
 import org.apache.kafka.common.record.CompressionType;
-import org.apache.kafka.common.record.Record;
-import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.MemoryRecordsBuilder;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.utils.CopyOnWriteMap;
 import org.apache.kafka.common.utils.LogContext;
@@ -45,20 +57,6 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 
-import java.nio.ByteBuffer;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicInteger;
-
 /**
  * This class acts as a queue that accumulates records into {@link MemoryRecords}
  * instances to be sent to the server.
@@ -76,6 +74,7 @@ public final class RecordAccumulator {
     private final CompressionType compression;
     private final long lingerMs;
     private final long retryBackoffMs;
+    private final long deliveryTimeoutMs;
     private final BufferPool free;
     private final Time time;
     private final ApiVersions apiVersions;
@@ -85,13 +84,13 @@ public final class RecordAccumulator {
     private final Map<TopicPartition, Long> muted;
     private int drainIndex;
     private final TransactionManager transactionManager;
+    private long nextBatchExpiryTimeMs = Long.MAX_VALUE; // the earliest time (absolute) a batch will expire.
 
     /**
      * Create a new record accumulator
      *
      * @param logContext The log context used for logging
      * @param batchSize The size to use when allocating {@link MemoryRecords} instances
-     * @param totalSize The maximum memory the record accumulator can use.
      * @param compression The compression codec for the records
      * @param lingerMs An artificial delay time to add before declaring a records instance that isn't full ready for
      *        sending. This allows time for more records to arrive. Setting a non-zero lingerMs will trade off some
@@ -106,14 +105,16 @@ public final class RecordAccumulator {
      */
     public RecordAccumulator(LogContext logContext,
                              int batchSize,
-                             long totalSize,
                              CompressionType compression,
                              long lingerMs,
                              long retryBackoffMs,
+                             long deliveryTimeoutMs,
                              Metrics metrics,
+                             String metricGrpName,
                              Time time,
                              ApiVersions apiVersions,
-                             TransactionManager transactionManager) {
+                             TransactionManager transactionManager,
+                             BufferPool bufferPool) {
         this.log = logContext.logger(RecordAccumulator.class);
         this.drainIndex = 0;
         this.closed = false;
@@ -123,9 +124,9 @@ public final class RecordAccumulator {
         this.compression = compression;
         this.lingerMs = lingerMs;
         this.retryBackoffMs = retryBackoffMs;
+        this.deliveryTimeoutMs = deliveryTimeoutMs;
         this.batches = new CopyOnWriteMap<>();
-        String metricGrpName = "producer-metrics";
-        this.free = new BufferPool(totalSize, batchSize, metrics, time, metricGrpName);
+        this.free = bufferPool;
         this.incomplete = new IncompleteBatches();
         this.muted = new HashMap<>();
         this.time = time;
@@ -227,7 +228,6 @@ public final class RecordAccumulator {
 
                 // Don't deallocate this buffer in the finally block as it's being used in the record batch
                 buffer = null;
-
                 return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
             }
         } finally {
@@ -240,7 +240,7 @@ public final class RecordAccumulator {
     private MemoryRecordsBuilder recordsBuilder(ByteBuffer buffer, byte maxUsableMagic) {
         if (transactionManager != null && maxUsableMagic < RecordBatch.MAGIC_VALUE_V2) {
             throw new UnsupportedVersionException("Attempting to use idempotence with a broker which does not " +
-                    "support the required message format (v2). The broker must be version 0.11 or later.");
+                "support the required message format (v2). The broker must be version 0.11 or later.");
         }
         return MemoryRecords.builder(buffer, maxUsableMagic, compression, TimestampType.CREATE_TIME, 0L);
     }
@@ -273,37 +273,35 @@ public final class RecordAccumulator {
         return result;
     }
 
+    public void maybeUpdateNextBatchExpiryTime(ProducerBatch batch) {
+        if (batch.createdMs + deliveryTimeoutMs  > 0) {
+            // the non-negative check is to guard us against potential overflow due to setting
+            // a large value for deliveryTimeoutMs
+            nextBatchExpiryTimeMs = Math.min(nextBatchExpiryTimeMs, batch.createdMs + deliveryTimeoutMs);
+        } else {
+            log.warn("Skipping next batch expiry time update due to addition overflow: "
+                + "batch.createMs={}, deliveryTimeoutMs={}", batch.createdMs, deliveryTimeoutMs);
+        }
+    }
+
     /**
      * Get a list of batches which have been sitting in the accumulator too long and need to be expired.
      */
-    public List<ProducerBatch> expiredBatches(int requestTimeout, long now) {
+    public List<ProducerBatch> expiredBatches(long now) {
         List<ProducerBatch> expiredBatches = new ArrayList<>();
         for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {
-            Deque<ProducerBatch> dq = entry.getValue();
-            TopicPartition tp = entry.getKey();
-            // We only check if the batch should be expired if the partition does not have a batch in flight.
-            // This is to prevent later batches from being expired while an earlier batch is still in progress.
-            // Note that `muted` is only ever populated if `max.in.flight.request.per.connection=1` so this protection
-            // is only active in this case. Otherwise the expiration order is not guaranteed.
-            if (!isMuted(tp, now)) {
-                synchronized (dq) {
-                    // iterate over the batches and expire them if they have been in the accumulator for more than requestTimeOut
-                    ProducerBatch lastBatch = dq.peekLast();
-                    Iterator<ProducerBatch> batchIterator = dq.iterator();
-                    while (batchIterator.hasNext()) {
-                        ProducerBatch batch = batchIterator.next();
-                        boolean isFull = batch != lastBatch || batch.isFull();
-                        // Check if the batch has expired. Expired batches are closed by maybeExpire, but callbacks
-                        // are invoked after completing the iterations, since sends invoked from callbacks
-                        // may append more batches to the deque being iterated. The batch is deallocated after
-                        // callbacks are invoked.
-                        if (batch.maybeExpire(requestTimeout, retryBackoffMs, now, this.lingerMs, isFull)) {
-                            expiredBatches.add(batch);
-                            batchIterator.remove();
-                        } else {
-                            // Stop at the first batch that has not expired.
-                            break;
-                        }
+            // expire the batches in the order of sending
+            Deque<ProducerBatch> deque = entry.getValue();
+            synchronized (deque) {
+                while (!deque.isEmpty()) {
+                    ProducerBatch batch = deque.getFirst();
+                    if (batch.hasReachedDeliveryTimeout(deliveryTimeoutMs, now)) {
+                        deque.poll();
+                        batch.abortRecordAppends();
+                        expiredBatches.add(batch);
+                    } else {
+                        maybeUpdateNextBatchExpiryTime(batch);
+                        break;
                     }
                 }
             }
@@ -311,8 +309,13 @@ public final class RecordAccumulator {
         return expiredBatches;
     }
 
+    public long getDeliveryTimeoutMs() {
+        return deliveryTimeoutMs; // Sender passes this to ProducerBatch.hasReachedDeliveryTimeout for in-flight batches
+    }
+
     /**
-     * Re-enqueue the given record batch in the accumulator to retry
+     * Re-enqueue the given record batch in the accumulator. In Sender.completeBatch method, we check
+     * whether the batch has reached deliveryTimeoutMs or not. Hence we do not do the delivery timeout check here.
      */
     public void reenqueue(ProducerBatch batch, long now) {
         batch.reenqueued(now);
@@ -356,8 +359,8 @@ public final class RecordAccumulator {
     }
 
     // We will have to do extra work to ensure the queue is in order when requests are being retried and there are
-    // multiple requests in flight to that partition. If the first inflight request fails to append, then all the subsequent
-    // in flight requests will also fail because the sequence numbers will not be accepted.
+    // multiple requests in flight to that partition. If the first in flight request fails to append, then all the
+    // subsequent in flight requests will also fail because the sequence numbers will not be accepted.
     //
     // Further, once batches are being retried, we are reduced to a single in flight request for that partition. So when
     // the subsequent batches come back in sequence order, they will have to be placed further back in the queue.
@@ -368,12 +371,12 @@ public final class RecordAccumulator {
     private void insertInSequenceOrder(Deque<ProducerBatch> deque, ProducerBatch batch) {
         // When we are requeing and have enabled idempotence, the reenqueued batch must always have a sequence.
         if (batch.baseSequence() == RecordBatch.NO_SEQUENCE)
-            throw new IllegalStateException("Trying to reenqueue a batch which doesn't have a sequence even " +
-                    "though idempotence is enabled.");
+            throw new IllegalStateException("Trying to re-enqueue a batch which doesn't have a sequence even " +
+                "though idempotency is enabled.");
 
         if (transactionManager.nextBatchBySequence(batch.topicPartition) == null)
-            throw new IllegalStateException("We are reenqueueing a batch which is not tracked as part of the in flight " +
-                    "requests. batch.topicPartition: " + batch.topicPartition + "; batch.baseSequence: " + batch.baseSequence());
+            throw new IllegalStateException("We are re-enqueueing a batch which is not tracked as part of the in flight " +
+                "requests. batch.topicPartition: " + batch.topicPartition + "; batch.baseSequence: " + batch.baseSequence());
 
         ProducerBatch firstBatchInQueue = deque.peekFirst();
         if (firstBatchInQueue != null && firstBatchInQueue.hasSequence() && firstBatchInQueue.baseSequence() < batch.baseSequence()) {
@@ -390,7 +393,7 @@ public final class RecordAccumulator {
                 orderedBatches.add(deque.pollFirst());
 
             log.debug("Reordered incoming batch with sequence {} for partition {}. It was placed in the queue at " +
-                    "position {}", batch.baseSequence(), batch.topicPartition, orderedBatches.size());
+                "position {}", batch.baseSequence(), batch.topicPartition, orderedBatches.size());
             // Either we have reached a point where there are batches without a sequence (ie. never been drained
             // and are hence in order by default), or the batch at the front of the queue has a sequence greater
             // than the incoming batch. This is the right place to add the incoming batch.
@@ -466,7 +469,6 @@ public final class RecordAccumulator {
                 }
             }
         }
-
         return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
     }
 
@@ -484,6 +486,106 @@ public final class RecordAccumulator {
         return false;
     }
 
+    private boolean shouldStopDrainBatchesForPartition(ProducerBatch first, TopicPartition tp) {
+        // Every check below consults the transaction manager; without one there is nothing to stop for.
+        if (transactionManager == null)
+            return false;
+
+        if (!transactionManager.isSendToPartitionAllowed(tp))
+            return true;
+
+        // we cannot send the batch until we have refreshed the producer id
+        if (!transactionManager.producerIdAndEpoch().isValid())
+            return true;
+
+        // Don't drain any new batches while the state of previous sequence numbers
+        // is unknown. The previous batches would be unknown if they were aborted
+        // on the client after being sent to the broker at least once.
+        if (!first.hasSequence() && transactionManager.hasUnresolvedSequence(first.topicPartition))
+            return true;
+
+        // If the queued batch already has an assigned sequence, then it is being retried.
+        // In this case, we wait until the next immediate batch is ready and drain that.
+        // We only move on when the next in line batch is complete (either successfully or due to
+        // a fatal broker error). This effectively reduces our in flight request count to 1.
+        int firstInFlightSequence = transactionManager.firstInFlightSequence(first.topicPartition);
+        return firstInFlightSequence != RecordBatch.NO_SEQUENCE && first.hasSequence()
+            && first.baseSequence() != firstInFlightSequence;
+    }
+
+    private List<ProducerBatch> drainBatchesForOneNode(Cluster cluster, Node node, int maxSize, long now) {
+        int size = 0; // bytes of the batches collected in 'ready' so far
+        List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
+        List<ProducerBatch> ready = new ArrayList<>();
+        /* To make starvation less likely, start at the drainIndex field (carried over between calls) rather than at 0. */
+        int start = drainIndex = drainIndex % parts.size();
+        do {
+            PartitionInfo part = parts.get(drainIndex);
+            TopicPartition tp = new TopicPartition(part.topic(), part.partition());
+            this.drainIndex = (this.drainIndex + 1) % parts.size(); // advance first so every 'continue' below reaches the loop test with the next index
+
+            // Skip partitions that are currently muted.
+            if (isMuted(tp, now))
+                continue;
+
+            Deque<ProducerBatch> deque = getDeque(tp);
+            if (deque == null)
+                continue;
+
+            synchronized (deque) {
+                // invariant: !isMuted(tp,now) && deque != null
+                ProducerBatch first = deque.peekFirst();
+                if (first == null)
+                    continue;
+
+                // first != null
+                boolean backoff = first.attempts() > 0 && first.waitedTimeMs(now) < retryBackoffMs;
+                // A batch that has been attempted before is only drained once it has waited retryBackoffMs.
+                if (backoff)
+                    continue;
+
+                if (size + first.estimatedSizeInBytes() > maxSize && !ready.isEmpty()) {
+                    // The !ready.isEmpty() guard covers the rare case where a single batch is larger than the
+                    // request size due to compression: such a batch is still sent, alone, in a single request.
+                    break;
+                } else {
+                    if (shouldStopDrainBatchesForPartition(first, tp))
+                        break; // leaves the do/while, so draining stops for this node, not just this partition
+
+                    boolean isTransactional = transactionManager != null ? transactionManager.isTransactional() : false;
+                    ProducerIdAndEpoch producerIdAndEpoch =
+                        transactionManager != null ? transactionManager.producerIdAndEpoch() : null;
+                    ProducerBatch batch = deque.pollFirst();
+                    if (producerIdAndEpoch != null && !batch.hasSequence()) {
+                        // If the batch already has an assigned sequence, then we should not change the producer id and
+                        // sequence number, since this may introduce duplicates. In particular, the previous attempt
+                        // may actually have been accepted, and if we change the producer id and sequence here, this
+                        // attempt will also be accepted, causing a duplicate.
+                        //
+                        // Additionally, we update the next sequence number bound for the partition, and also have
+                        // the transaction manager track the batch so as to ensure that sequence ordering is maintained
+                        // even if we receive out of order responses.
+                        batch.setProducerState(producerIdAndEpoch, transactionManager.sequenceNumber(batch.topicPartition), isTransactional);
+                        transactionManager.incrementSequenceNumber(batch.topicPartition, batch.recordCount);
+                        log.debug("Assigned producerId {} and producerEpoch {} to batch with base sequence " +
+                                "{} being sent to partition {}", producerIdAndEpoch.producerId,
+                            producerIdAndEpoch.epoch, batch.baseSequence(), tp);
+
+                        transactionManager.addInFlightBatch(batch);
+                    }
+                    batch.close();
+                    size += batch.records().sizeInBytes();
+                    ready.add(batch);
+
+                    batch.drained(now);
+                }
+            }
+        } while (start != drainIndex);
+        return ready;
+    }
+
     /**
      * Drain all the data for the given nodes and collate them into a list of batches that will fit within the specified
      * size on a per-node basis. This method attempts to avoid choosing the same topic-node over and over.
@@ -494,106 +596,25 @@ public final class RecordAccumulator {
      * @param now The current unix time in milliseconds
      * @return A list of {@link ProducerBatch} for each node specified with total size less than the requested maxSize.
      */
-    public Map<Integer, List<ProducerBatch>> drain(Cluster cluster,
-                                                   Set<Node> nodes,
-                                                   int maxSize,
-                                                   long now) {
+    public Map<Integer, List<ProducerBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) {
         if (nodes.isEmpty())
             return Collections.emptyMap();
 
         Map<Integer, List<ProducerBatch>> batches = new HashMap<>();
         for (Node node : nodes) {
-            int size = 0;
-            List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
-            List<ProducerBatch> ready = new ArrayList<>();
-            /* to make starvation less likely this loop doesn't start at 0 */
-            int start = drainIndex = drainIndex % parts.size();
-            do {
-                PartitionInfo part = parts.get(drainIndex);
-                TopicPartition tp = new TopicPartition(part.topic(), part.partition());
-                // Only proceed if the partition has no in-flight batches.
-                if (!isMuted(tp, now)) {
-                    Deque<ProducerBatch> deque = getDeque(tp);
-                    if (deque != null) {
-                        synchronized (deque) {
-                            ProducerBatch first = deque.peekFirst();
-                            if (first != null) {
-                                boolean backoff = first.attempts() > 0 && first.waitedTimeMs(now) < retryBackoffMs;
-                                // Only drain the batch if it is not during backoff period.
-                                if (!backoff) {
-                                    if (size + first.estimatedSizeInBytes() > maxSize && !ready.isEmpty()) {
-                                        // there is a rare case that a single batch size is larger than the request size due
-                                        // to compression; in this case we will still eventually send this batch in a single
-                                        // request
-                                        break;
-                                    } else {
-                                        ProducerIdAndEpoch producerIdAndEpoch = null;
-                                        boolean isTransactional = false;
-                                        if (transactionManager != null) {
-                                            if (!transactionManager.isSendToPartitionAllowed(tp))
-                                                break;
-
-                                            producerIdAndEpoch = transactionManager.producerIdAndEpoch();
-                                            if (!producerIdAndEpoch.isValid())
-                                                // we cannot send the batch until we have refreshed the producer id
-                                                break;
-
-                                            isTransactional = transactionManager.isTransactional();
-
-                                            if (!first.hasSequence() && transactionManager.hasUnresolvedSequence(first.topicPartition))
-                                                // Don't drain any new batches while the state of previous sequence numbers
-                                                // is unknown. The previous batches would be unknown if they were aborted
-                                                // on the client after being sent to the broker at least once.
-                                                break;
-
-                                            int firstInFlightSequence = transactionManager.firstInFlightSequence(first.topicPartition);
-                                            if (firstInFlightSequence != RecordBatch.NO_SEQUENCE && first.hasSequence()
-                                                    && first.baseSequence() != firstInFlightSequence)
-                                                // If the queued batch already has an assigned sequence, then it is being
-                                                // retried. In this case, we wait until the next immediate batch is ready
-                                                // and drain that. We only move on when the next in line batch is complete (either successfully
-                                                // or due to a fatal broker error). This effectively reduces our
-                                                // in flight request count to 1.
-                                                break;
-                                        }
-
-                                        ProducerBatch batch = deque.pollFirst();
-                                        if (producerIdAndEpoch != null && !batch.hasSequence()) {
-                                            // If the batch already has an assigned sequence, then we should not change the producer id and
-                                            // sequence number, since this may introduce duplicates. In particular,
-                                            // the previous attempt may actually have been accepted, and if we change
-                                            // the producer id and sequence here, this attempt will also be accepted,
-                                            // causing a duplicate.
-                                            //
-                                            // Additionally, we update the next sequence number bound for the partition,
-                                            // and also have the transaction manager track the batch so as to ensure
-                                            // that sequence ordering is maintained even if we receive out of order
-                                            // responses.
-                                            batch.setProducerState(producerIdAndEpoch, transactionManager.sequenceNumber(batch.topicPartition), isTransactional);
-                                            transactionManager.incrementSequenceNumber(batch.topicPartition, batch.recordCount);
-                                            log.debug("Assigned producerId {} and producerEpoch {} to batch with base sequence " +
-                                                            "{} being sent to partition {}", producerIdAndEpoch.producerId,
-                                                    producerIdAndEpoch.epoch, batch.baseSequence(), tp);
-
-                                            transactionManager.addInFlightBatch(batch);
-                                        }
-                                        batch.close();
-                                        size += batch.records().sizeInBytes();
-                                        ready.add(batch);
-                                        batch.drained(now);
-                                    }
-                                }
-                            }
-                        }
-                    }
-                }
-                this.drainIndex = (this.drainIndex + 1) % parts.size();
-            } while (start != drainIndex);
+            List<ProducerBatch> ready = drainBatchesForOneNode(cluster, node, maxSize, now);
             batches.put(node.id(), ready);
         }
         return batches;
     }
 
+    /**
+     * The earliest absolute time (in milliseconds) at which a batch will expire. Sender.sendProducerData
+     * subtracts the current time from this value when computing its poll timeout.
+     */
+    public Long nextExpiryTimeMs() {
+        return this.nextBatchExpiryTimeMs;
+    }
+
     private Deque<ProducerBatch> getDeque(TopicPartition tp) {
         return batches.get(tp);
     }
@@ -784,5 +805,4 @@ public final class RecordAccumulator {
             this.unknownLeaderTopics = unknownLeaderTopics;
         }
     }
-
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index d1a7bc9..7077f15 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.clients.producer.internals;
 
+import java.util.ArrayList;
 import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.ClientRequest;
 import org.apache.kafka.clients.ClientResponse;
@@ -34,6 +35,7 @@ import org.apache.kafka.common.errors.InvalidMetadataException;
 import org.apache.kafka.common.errors.OutOfOrderSequenceException;
 import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
@@ -120,6 +122,9 @@ public class Sender implements Runnable {
     /* all the state related to transactions, in particular the producer id, producer epoch, and sequence numbers */
     private final TransactionManager transactionManager;
 
+    // A per-partition queue of batches ordered by creation time for tracking the in-flight batches
+    private final Map<TopicPartition, List<ProducerBatch>> inFlightBatches;
+
     public Sender(LogContext logContext,
                   KafkaClient client,
                   Metadata metadata,
@@ -149,6 +154,73 @@ public class Sender implements Runnable {
         this.retryBackoffMs = retryBackoffMs;
         this.apiVersions = apiVersions;
         this.transactionManager = transactionManager;
+        this.inFlightBatches = new HashMap<>();
+    }
+
+    public List<ProducerBatch> inFlightBatches(TopicPartition tp) {
+        // A partition with no entry yields a fresh empty list that is not stored in the map.
+        if (!inFlightBatches.containsKey(tp))
+            return new ArrayList<>();
+        return inFlightBatches.get(tp);
+    }
+
+    public void maybeRemoveFromInflightBatches(ProducerBatch batch) {
+        TopicPartition partition = batch.topicPartition;
+        List<ProducerBatch> tracked = inFlightBatches.get(partition);
+        // No list for this partition means the batch is not being tracked.
+        if (tracked == null)
+            return;
+
+        tracked.remove(batch);
+        // Drop the partition's entry once its list is empty.
+        if (tracked.isEmpty())
+            inFlightBatches.remove(partition);
+    }
+
+    /**
+     * Collect the in-flight batches that have reached the delivery timeout and stop tracking them.
+     *
+     * Each partition's list is scanned from the front and the scan stops at the first batch that has not
+     * timed out; that batch is passed to accumulator.maybeUpdateNextBatchExpiryTime. Partitions whose list
+     * ends up empty are removed from inFlightBatches.
+     *
+     * @param now The current time in milliseconds
+     * @return The batches that were removed from inFlightBatches because they reached the delivery timeout
+     * @throws IllegalStateException if a timed-out batch already has a final state
+     */
+    private List<ProducerBatch> getExpiredInflightBatches(long now) {
+        List<ProducerBatch> expiredBatches = new ArrayList<>();
+        // Use an explicit iterator over the map entries: emptied partitions are removed while iterating, and
+        // calling inFlightBatches.remove(...) from inside a for-each over entrySet() makes the next iteration
+        // step throw ConcurrentModificationException.
+        Iterator<Map.Entry<TopicPartition, List<ProducerBatch>>> entryIter = inFlightBatches.entrySet().iterator();
+        while (entryIter.hasNext()) {
+            List<ProducerBatch> partitionInFlightBatches = entryIter.next().getValue();
+            if (partitionInFlightBatches != null) {
+                Iterator<ProducerBatch> iter = partitionInFlightBatches.iterator();
+                while (iter.hasNext()) {
+                    ProducerBatch batch = iter.next();
+                    if (batch.hasReachedDeliveryTimeout(accumulator.getDeliveryTimeoutMs(), now)) {
+                        iter.remove();
+                        // expireBatches is called in Sender.sendProducerData, before client.poll.
+                        // The batch.finalState() == null invariant should always hold. An IllegalStateException
+                        // exception will be thrown if the invariant is violated.
+                        if (batch.finalState() == null) {
+                            expiredBatches.add(batch);
+                        } else {
+                            throw new IllegalStateException(batch.topicPartition + " batch created at " +
+                                batch.createdMs + " gets unexpected final state " + batch.finalState());
+                        }
+                    } else {
+                        accumulator.maybeUpdateNextBatchExpiryTime(batch);
+                        break;
+                    }
+                }
+                if (partitionInFlightBatches.isEmpty())
+                    entryIter.remove();
+            }
+        }
+        return expiredBatches;
+    }
+
+    private void addToInflightBatches(List<ProducerBatch> batches) {
+        // Append each batch to its partition's in-flight list, creating that list on first use.
+        for (ProducerBatch drained : batches)
+            inFlightBatches.computeIfAbsent(drained.topicPartition, tp -> new ArrayList<>()).add(drained);
+    }
+
+    public void addToInflightBatches(Map<Integer, List<ProducerBatch>> batches) {
+        // Track every batch list in the map; the Integer keys are not used here.
+        batches.values().forEach(this::addToInflightBatches);
     }
 
     /**
@@ -204,12 +276,12 @@ public class Sender implements Runnable {
                 if (transactionManager.shouldResetProducerStateAfterResolvingSequences())
                     // Check if the previous run expired batches which requires a reset of the producer state.
                     transactionManager.resetProducerId();
-
                 if (!transactionManager.isTransactional()) {
                     // this is an idempotent producer, so make sure we have a producer id
                     maybeWaitForProducerId();
                 } else if (transactionManager.hasUnresolvedSequences() && !transactionManager.hasFatalError()) {
-                    transactionManager.transitionToFatalError(new KafkaException("The client hasn't received acknowledgment for " +
+                    transactionManager.transitionToFatalError(
+                        new KafkaException("The client hasn't received acknowledgment for " +
                             "some previously sent messages and can no longer retry them. It isn't safe to continue."));
                 } else if (transactionManager.hasInFlightTransactionalRequest() || maybeSendTransactionalRequest(now)) {
                     // as long as there are outstanding transactional requests, we simply wait for them to return
@@ -241,7 +313,6 @@ public class Sender implements Runnable {
 
     private long sendProducerData(long now) {
         Cluster cluster = metadata.fetch();
-
         // get the list of partitions with data ready to send
         RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
 
@@ -253,8 +324,8 @@ public class Sender implements Runnable {
             for (String topic : result.unknownLeaderTopics)
                 this.metadata.add(topic);
 
-            log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}", result.unknownLeaderTopics);
-
+            log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}",
+                result.unknownLeaderTopics);
             this.metadata.requestUpdate();
         }
 
@@ -270,8 +341,8 @@ public class Sender implements Runnable {
         }
 
         // create produce requests
-        Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes,
-                this.maxRequestSize, now);
+        Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
+        addToInflightBatches(batches);
         if (guaranteeMessageOrder) {
             // Mute all the partitions drained
             for (List<ProducerBatch> batchList : batches.values()) {
@@ -280,27 +351,34 @@ public class Sender implements Runnable {
             }
         }
 
-        List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(this.requestTimeoutMs, now);
+        List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);
+        List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
+        expiredBatches.addAll(expiredInflightBatches);
+
         // Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics
         // for expired batches. see the documentation of @TransactionState.resetProducerId to understand why
         // we need to reset the producer id here.
         if (!expiredBatches.isEmpty())
             log.trace("Expired {} batches in accumulator", expiredBatches.size());
         for (ProducerBatch expiredBatch : expiredBatches) {
-            failBatch(expiredBatch, -1, NO_TIMESTAMP, expiredBatch.timeoutException(), false);
+            String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
+                + ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
+            failBatch(expiredBatch, -1, NO_TIMESTAMP, new TimeoutException(errorMessage), false);
             if (transactionManager != null && expiredBatch.inRetry()) {
                 // This ensures that no new batches are drained until the current in flight batches are fully resolved.
                 transactionManager.markSequenceUnresolved(expiredBatch.topicPartition);
             }
         }
-
         sensors.updateProduceRequestMetrics(batches);
 
         // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
-        // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
-        // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
-        // with sendable data that aren't ready to send since they would cause busy looping.
+        // loop and try sending more data. Otherwise, the timeout will be the smaller value between next batch expiry
+        // time, and the delay time for checking data availability. Note that the nodes may have data that isn't yet
+        // sendable due to lingering, backing off, etc. This specifically does not include nodes with sendable data
+        // that aren't ready to send since they would cause busy looping.
         long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
+        pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now);
+        pollTimeout = Math.max(pollTimeout, 0);
         if (!result.readyNodes.isEmpty()) {
             log.trace("Nodes with data ready to send: {}", result.readyNodes);
             // if some partitions are already ready to be sent, the select time would be 0;
@@ -310,7 +388,6 @@ public class Sender implements Runnable {
             pollTimeout = 0;
         }
         sendProduceRequests(batches, now);
-
         return pollTimeout;
     }
 
@@ -318,7 +395,6 @@ public class Sender implements Runnable {
         if (transactionManager.isCompleting() && accumulator.hasIncomplete()) {
             if (transactionManager.isAborting())
                 accumulator.abortUndrainedBatches(new KafkaException("Failing batch since transaction was aborted"));
-
             // There may still be requests left which are being retried. Since we do not know whether they had
             // been successfully appended to the broker log, we must resend them until their final status is clear.
             // If they had been appended and we did not receive the error, then our sequence number would no longer
@@ -341,7 +417,6 @@ public class Sender implements Runnable {
                         transactionManager.lookupCoordinator(nextRequestHandler);
                         break;
                     }
-
                     if (!NetworkClientUtils.awaitReady(client, targetNode, time, requestTimeoutMs)) {
                         transactionManager.lookupCoordinator(nextRequestHandler);
                         break;
@@ -353,12 +428,10 @@ public class Sender implements Runnable {
                 if (targetNode != null) {
                     if (nextRequestHandler.isRetry())
                         time.sleep(nextRequestHandler.retryBackoffMs());
-
-                    ClientRequest clientRequest = client.newClientRequest(targetNode.idString(),
-                            requestBuilder, now, true, requestTimeoutMs, nextRequestHandler);
+                    ClientRequest clientRequest = client.newClientRequest(
+                        targetNode.idString(), requestBuilder, now, true, requestTimeoutMs, nextRequestHandler);
                     transactionManager.setInFlightTransactionalRequestCorrelationId(clientRequest.correlationId());
                     log.debug("Sending transactional request {} to node {}", requestBuilder, targetNode);
-
                     client.send(clientRequest, now);
                     return true;
                 }
@@ -371,11 +444,9 @@ public class Sender implements Runnable {
                     break;
                 }
             }
-
             time.sleep(retryBackoffMs);
             metadata.requestUpdate();
         }
-
         transactionManager.retry(nextRequestHandler);
         return true;
     }
@@ -442,8 +513,7 @@ public class Sender implements Runnable {
                         break;
                     }
                 } else {
-                    log.debug("Could not find an available broker to send InitProducerIdRequest to. " +
-                            "We will back off and try again.");
+                    log.debug("Could not find an available broker to send InitProducerIdRequest to. Will back off and retry.");
                 }
             } catch (UnsupportedVersionException e) {
                 transactionManager.transitionToFatalError(e);
@@ -466,7 +536,7 @@ public class Sender implements Runnable {
         int correlationId = requestHeader.correlationId();
         if (response.wasDisconnected()) {
             log.trace("Cancelled request with header {} due to node {} being disconnected",
-                    requestHeader, response.destination());
+                requestHeader, response.destination());
             for (ProducerBatch batch : batches.values())
                 completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.NETWORK_EXCEPTION), correlationId, now, 0L);
         } else if (response.versionMismatch() != null) {
@@ -511,23 +581,25 @@ public class Sender implements Runnable {
                 (batch.magic() >= RecordBatch.MAGIC_VALUE_V2 || batch.isCompressed())) {
             // If the batch is too large, we split the batch and send the split batches again. We do not decrement
             // the retry attempts in this case.
-            log.warn("Got error produce response in correlation id {} on topic-partition {}, splitting and retrying ({} attempts left). Error: {}",
-                     correlationId,
-                     batch.topicPartition,
-                     this.retries - batch.attempts(),
-                     error);
+            log.warn(
+                "Got error produce response in correlation id {} on topic-partition {}, splitting and retrying ({} attempts left). Error: {}",
+                correlationId,
+                batch.topicPartition,
+                this.retries - batch.attempts(),
+                error);
             if (transactionManager != null)
                 transactionManager.removeInFlightBatch(batch);
             this.accumulator.splitAndReenqueue(batch);
             this.accumulator.deallocate(batch);
             this.sensors.recordBatchSplit();
         } else if (error != Errors.NONE) {
-            if (canRetry(batch, response)) {
-                log.warn("Got error produce response with correlation id {} on topic-partition {}, retrying ({} attempts left). Error: {}",
-                        correlationId,
-                        batch.topicPartition,
-                        this.retries - batch.attempts() - 1,
-                        error);
+            if (canRetry(batch, response, now)) {
+                log.warn(
+                    "Got error produce response with correlation id {} on topic-partition {}, retrying ({} attempts left). Error: {}",
+                    correlationId,
+                    batch.topicPartition,
+                    this.retries - batch.attempts() - 1,
+                    error);
                 if (transactionManager == null) {
                     reenqueueBatch(batch, now);
                 } else if (transactionManager.hasProducerIdAndEpoch(batch.producerId(), batch.producerEpoch())) {
@@ -564,14 +636,14 @@ public class Sender implements Runnable {
             if (error.exception() instanceof InvalidMetadataException) {
                 if (error.exception() instanceof UnknownTopicOrPartitionException) {
                     log.warn("Received unknown topic or partition error in produce request on partition {}. The " +
-                            "topic/partition may not exist or the user may not have Describe access to it", batch.topicPartition);
+                            "topic-partition may not exist or the user may not have Describe access to it",
+                        batch.topicPartition);
                 } else {
                     log.warn("Received invalid metadata error in produce request on partition {} due to {}. Going " +
                             "to request metadata update now", batch.topicPartition, error.exception().toString());
                 }
                 metadata.requestUpdate();
             }
-
         } else {
             completeBatch(batch, response);
         }
@@ -583,35 +655,43 @@ public class Sender implements Runnable {
 
     private void reenqueueBatch(ProducerBatch batch, long currentTimeMs) {
         this.accumulator.reenqueue(batch, currentTimeMs);
+        maybeRemoveFromInflightBatches(batch);
         this.sensors.recordRetries(batch.topicPartition.topic(), batch.recordCount);
     }
 
     private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response) {
         if (transactionManager != null) {
             if (transactionManager.hasProducerIdAndEpoch(batch.producerId(), batch.producerEpoch())) {
-                transactionManager.maybeUpdateLastAckedSequence(batch.topicPartition, batch.baseSequence() + batch.recordCount - 1);
-                log.debug("ProducerId: {}; Set last ack'd sequence number for topic-partition {} to {}", batch.producerId(), batch.topicPartition,
-                        transactionManager.lastAckedSequence(batch.topicPartition));
+                transactionManager
+                    .maybeUpdateLastAckedSequence(batch.topicPartition, batch.baseSequence() + batch.recordCount - 1);
+                log.debug("ProducerId: {}; Set last ack'd sequence number for topic-partition {} to {}",
+                    batch.producerId(),
+                    batch.topicPartition,
+                    transactionManager.lastAckedSequence(batch.topicPartition));
             }
             transactionManager.updateLastAckedOffset(response, batch);
             transactionManager.removeInFlightBatch(batch);
         }
 
-        if (batch.done(response.baseOffset, response.logAppendTime, null))
+        if (batch.done(response.baseOffset, response.logAppendTime, null)) {
+            maybeRemoveFromInflightBatches(batch);
             this.accumulator.deallocate(batch);
+        }
     }
 
-    private void failBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, RuntimeException exception, boolean adjustSequenceNumbers) {
+    private void failBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, RuntimeException exception,
+                           boolean adjustSequenceNumbers) {
         failBatch(batch, response.baseOffset, response.logAppendTime, exception, adjustSequenceNumbers);
     }
 
-    private void failBatch(ProducerBatch batch, long baseOffset, long logAppendTime, RuntimeException exception, boolean adjustSequenceNumbers) {
+    private void failBatch(ProducerBatch batch, long baseOffset, long logAppendTime, RuntimeException exception,
+        boolean adjustSequenceNumbers) {
         if (transactionManager != null) {
             if (exception instanceof OutOfOrderSequenceException
                     && !transactionManager.isTransactional()
                     && transactionManager.hasProducerId(batch.producerId())) {
                 log.error("The broker returned {} for topic-partition " +
-                                "{} at offset {}. This indicates data loss on the broker, and should be investigated.",
+                            "{} at offset {}. This indicates data loss on the broker, and should be investigated.",
                         exception, batch.topicPartition, baseOffset);
 
                 // Reset the transaction state since we have hit an irrecoverable exception and cannot make any guarantees
@@ -633,19 +713,23 @@ public class Sender implements Runnable {
 
         this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount);
 
-        if (batch.done(baseOffset, logAppendTime, exception))
+        if (batch.done(baseOffset, logAppendTime, exception)) {
+            maybeRemoveFromInflightBatches(batch);
             this.accumulator.deallocate(batch);
+        }
     }
 
     /**
      * We can retry a send if the error is transient and the number of attempts taken is fewer than the maximum allowed.
-     * We can also retry OutOfOrderSequence exceptions for future batches, since if the first batch has failed, the future
-     * batches are certain to fail with an OutOfOrderSequence exception.
+     * We can also retry OutOfOrderSequence exceptions for future batches, since if the first batch has failed, the
+     * future batches are certain to fail with an OutOfOrderSequence exception.
      */
-    private boolean canRetry(ProducerBatch batch, ProduceResponse.PartitionResponse response) {
-        return batch.attempts() < this.retries &&
-                ((response.error.exception() instanceof RetriableException) ||
-                        (transactionManager != null && transactionManager.canRetry(response, batch)));
+    private boolean canRetry(ProducerBatch batch, ProduceResponse.PartitionResponse response, long now) {
+        return !batch.hasReachedDeliveryTimeout(accumulator.getDeliveryTimeoutMs(), now) &&
+            batch.attempts() < this.retries &&
+            batch.finalState() == null &&
+            ((response.error.exception() instanceof RetriableException) ||
+                (transactionManager != null && transactionManager.canRetry(response, batch)));
     }
 
     /**
diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index 1713d78..4f8420b 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -65,7 +65,7 @@ public class AbstractConfig {
             this.values.put(update.getKey(), update.getValue());
         }
         definition.parse(this.values);
-        this.used = Collections.synchronizedSet(new HashSet<String>());
+        this.used = Collections.synchronizedSet(new HashSet<>());
         this.definition = definition;
         if (doLog)
             logAll();
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index c9efb82..12e467c 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -57,7 +57,7 @@ import java.util.Set;
  * Map&lt;String, String&gt; props = new HashMap&lt;&gt();
  * props.put(&quot;config_with_default&quot;, &quot;some value&quot;);
  * props.put(&quot;config_with_dependents&quot;, &quot;some other value&quot;);
- * 
+ *
  * Map&lt;String, Object&gt; configs = defs.parse(props);
  * // will return &quot;some value&quot;
  * String someConfig = (String) configs.get(&quot;config_with_default&quot;);
@@ -595,10 +595,8 @@ public class ConfigDef {
         if (!configKeys.containsKey(name)) {
             return;
         }
-        
         ConfigKey key = configKeys.get(name);
         ConfigValue value = configs.get(name);
-        
         if (key.recommender != null) {
             try {
                 List<Object> recommendedValues = key.recommender.validValues(name, parsed);
@@ -845,6 +843,11 @@ public class ConfigDef {
         private final Number min;
         private final Number max;
 
+        /**
+         * A numeric range with an inclusive lower bound and an inclusive upper bound
+         * @param min the lower bound
+         * @param max the upper bound
+         */
         private Range(Number min, Number max) {
             this.min = min;
             this.max = max;
@@ -860,7 +863,7 @@ public class ConfigDef {
         }
 
         /**
-         * A numeric range that checks both the upper and lower bound
+         * A numeric range that checks both the upper (inclusive) and lower (inclusive) bound
          */
         public static Range between(Number min, Number max) {
             return new Range(min, max);
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 6b41a9e..a586af8 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -280,7 +280,6 @@ public class MockClient implements KafkaClient {
         checkTimeoutOfPendingRequests(now);
 
         List<ClientResponse> copy = new ArrayList<>(this.responses);
-
         if (metadata != null && metadata.updateRequested()) {
             MetadataUpdate metadataUpdate = metadataUpdates.poll();
             if (cluster != null)
@@ -351,7 +350,9 @@ public class MockClient implements KafkaClient {
 
 
     public void respond(AbstractResponse response, boolean disconnected) {
-        ClientRequest request = requests.remove();
+        ClientRequest request = null;
+        if (requests.size() > 0)
+            request = requests.remove();
         short version = request.requestBuilder().latestAllowedVersion();
         responses.add(new ClientResponse(request.makeHeader(version), request.callback(), request.destination(),
                 request.createdTimeMs(), time.milliseconds(), disconnected, null, null, response));
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index c83fe06..634a1ab 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -1679,7 +1679,7 @@ public class KafkaConsumerTest {
     }
 
     private ListOffsetResponse listOffsetsResponse(Map<TopicPartition, Long> offsets) {
-        return listOffsetsResponse(offsets, Collections.<TopicPartition, Errors>emptyMap());
+        return listOffsetsResponse(offsets, Collections.emptyMap());
     }
 
     private ListOffsetResponse listOffsetsResponse(Map<TopicPartition, Long> partitionOffsets,
@@ -1818,7 +1818,7 @@ public class KafkaConsumerTest {
                 requestTimeoutMs,
                 IsolationLevel.READ_UNCOMMITTED);
 
-        return new KafkaConsumer<String, String>(
+        return new KafkaConsumer<>(
                 loggerFactory,
                 clientId,
                 consumerCoordinator,
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
index 2f89d79..6a85449 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
@@ -226,40 +226,30 @@ public class ProducerBatchTest {
     }
 
     /**
-     * A {@link ProducerBatch} configured using a very large linger value and a timestamp preceding its create
-     * time is interpreted correctly as not expired when the linger time is larger than the difference
-     * between now and create time by {@link ProducerBatch#maybeExpire(int, long, long, long, boolean)}.
+     * A {@link ProducerBatch} configured using a timestamp preceding its create time is interpreted correctly
+     * as not expired by {@link ProducerBatch#hasReachedDeliveryTimeout(long, long)}.
      */
     @Test
-    public void testLargeLingerOldNowExpire() {
+    public void testBatchExpiration() {
+        long deliveryTimeoutMs = 10240;
         ProducerBatch batch = new ProducerBatch(new TopicPartition("topic", 1), memoryRecordsBuilder, now);
         // Set `now` to 2ms before the create time.
-        assertFalse(batch.maybeExpire(10240, 100L, now - 2L, Long.MAX_VALUE, false));
+        assertFalse(batch.hasReachedDeliveryTimeout(deliveryTimeoutMs, now - 2));
+        // Set `now` to exactly deliveryTimeoutMs after the create time.
+        assertTrue(batch.hasReachedDeliveryTimeout(deliveryTimeoutMs, now + deliveryTimeoutMs));
     }
 
     /**
-     * A {@link ProducerBatch} configured using a very large retryBackoff value with retry = true and a timestamp
-     * preceding its create time is interpreted correctly as not expired when the retryBackoff time is larger than the
-     * difference between now and create time by {@link ProducerBatch#maybeExpire(int, long, long, long, boolean)}.
+     * A {@link ProducerBatch} that has been re-enqueued and is checked with a timestamp preceding its create
+     * time is interpreted correctly as not expired by {@link ProducerBatch#hasReachedDeliveryTimeout(long, long)}.
      */
     @Test
-    public void testLargeRetryBackoffOldNowExpire() {
+    public void testBatchExpirationAfterReenqueue() {
         ProducerBatch batch = new ProducerBatch(new TopicPartition("topic", 1), memoryRecordsBuilder, now);
         // Set batch.retry = true
         batch.reenqueued(now);
         // Set `now` to 2ms before the create time.
-        assertFalse(batch.maybeExpire(10240, Long.MAX_VALUE, now - 2L, 10240L, false));
-    }
-
-    /**
-     * A {@link ProducerBatch#maybeExpire(int, long, long, long, boolean)} call with a now value before the create
-     * time of the ProducerBatch is correctly recognized as not expired when invoked with parameter isFull = true.
-     */
-    @Test
-    public void testLargeFullOldNowExpire() {
-        ProducerBatch batch = new ProducerBatch(new TopicPartition("topic", 1), memoryRecordsBuilder, now);
-        // Set `now` to 2ms before the create time.
-        assertFalse(batch.maybeExpire(10240, 10240L, now - 2L, 10240L, true));
+        assertFalse(batch.hasReachedDeliveryTimeout(10240, now - 2L));
     }
 
     @Test
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index 5f48410..13b0d1b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -83,10 +83,9 @@ public class RecordAccumulatorTest {
     private MockTime time = new MockTime();
     private byte[] key = "key".getBytes();
     private byte[] value = "value".getBytes();
-    private int msgSize = DefaultRecord.sizeInBytes(0, 0, key.length, value.length,
-            Record.EMPTY_HEADERS);
+    private int msgSize = DefaultRecord.sizeInBytes(0, 0, key.length, value.length, Record.EMPTY_HEADERS);
     private Cluster cluster = new Cluster(null, Arrays.asList(node1, node2), Arrays.asList(part1, part2, part3),
-            Collections.<String>emptySet(), Collections.<String>emptySet());
+        Collections.emptySet(), Collections.emptySet());
     private Metrics metrics = new Metrics(time);
     private final long maxBlockTimeMs = 1000;
     private final LogContext logContext = new LogContext();
@@ -255,7 +254,7 @@ public class RecordAccumulatorTest {
         final int msgs = 10000;
         final int numParts = 2;
         final RecordAccumulator accum = createTestRecordAccumulator(
-                1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, CompressionType.NONE, 0L);
+            1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, CompressionType.NONE, 0L);
         List<Thread> threads = new ArrayList<>();
         for (int i = 0; i < numThreads; i++) {
             threads.add(new Thread() {
@@ -299,8 +298,8 @@ public class RecordAccumulatorTest {
         // test case assumes that the records do not fill the batch completely
         int batchSize = 1025;
 
-        RecordAccumulator accum = createTestRecordAccumulator(
-                batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize, CompressionType.NONE, lingerMs);
+        RecordAccumulator accum = createTestRecordAccumulator(batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD,
+                10 * batchSize, CompressionType.NONE, lingerMs);
         // Just short of going over the limit so we trigger linger time
         int appends = expectedNumAppends(batchSize);
 
@@ -332,10 +331,17 @@ public class RecordAccumulatorTest {
 
     @Test
     public void testRetryBackoff() throws Exception {
-        long lingerMs = Long.MAX_VALUE / 4;
-        long retryBackoffMs = Long.MAX_VALUE / 2;
-        final RecordAccumulator accum = new RecordAccumulator(logContext, 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024,
-                CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time, new ApiVersions(), null);
+        long lingerMs = Integer.MAX_VALUE / 16;
+        long retryBackoffMs = Integer.MAX_VALUE / 8;
+        int requestTimeoutMs = Integer.MAX_VALUE / 4;
+        long deliveryTimeoutMs = Integer.MAX_VALUE;
+        long totalSize = 10 * 1024;
+        int batchSize = 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD;
+        String metricGrpName = "producer-metrics";
+
+        final RecordAccumulator accum = new RecordAccumulator(logContext, batchSize,
+            CompressionType.NONE, lingerMs, retryBackoffMs, deliveryTimeoutMs, metrics, metricGrpName, time, new ApiVersions(), null,
+            new BufferPool(totalSize, batchSize, metrics, time, metricGrpName));
 
         long now = time.milliseconds();
         accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs);
@@ -371,7 +377,7 @@ public class RecordAccumulatorTest {
 
     @Test
     public void testFlush() throws Exception {
-        long lingerMs = Long.MAX_VALUE;
+        long lingerMs = Integer.MAX_VALUE;
         final RecordAccumulator accum = createTestRecordAccumulator(
                 4 * 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024, CompressionType.NONE, lingerMs);
 
@@ -413,7 +419,7 @@ public class RecordAccumulatorTest {
     @Test
     public void testAwaitFlushComplete() throws Exception {
         RecordAccumulator accum = createTestRecordAccumulator(
-                4 * 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024, CompressionType.NONE, Long.MAX_VALUE);
+            4 * 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024, CompressionType.NONE, Long.MAX_VALUE);
         accum.append(new TopicPartition(topic, 0), 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs);
 
         accum.beginFlush();
@@ -429,12 +435,12 @@ public class RecordAccumulatorTest {
 
     @Test
     public void testAbortIncompleteBatches() throws Exception {
-        long lingerMs = Long.MAX_VALUE;
+        int lingerMs = Integer.MAX_VALUE;
         int numRecords = 100;
 
         final AtomicInteger numExceptionReceivedInCallback = new AtomicInteger(0);
         final RecordAccumulator accum = createTestRecordAccumulator(
-                128 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024, CompressionType.NONE, lingerMs);
+            128 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024, CompressionType.NONE, lingerMs);
         class TestCallback implements Callback {
             @Override
             public void onCompletion(RecordMetadata metadata, Exception exception) {
@@ -468,7 +474,7 @@ public class RecordAccumulatorTest {
 
     @Test
     public void testAbortUnsentBatches() throws Exception {
-        long lingerMs = Long.MAX_VALUE;
+        int lingerMs = Integer.MAX_VALUE;
         int numRecords = 100;
 
         final AtomicInteger numExceptionReceivedInCallback = new AtomicInteger(0);
@@ -509,17 +515,65 @@ public class RecordAccumulatorTest {
         assertTrue(accum.hasIncomplete());
     }
 
+    private void doExpireBatchSingle(long deliveryTimeoutMs) throws InterruptedException {
+        long lingerMs = 300L;
+        List<Boolean> muteStates = Arrays.asList(false, true);
+        Set<Node> readyNodes = null;
+        List<ProducerBatch> expiredBatches = new ArrayList<>();
+        // test case assumes that the records do not fill the batch completely
+        int batchSize = 1025;
+        RecordAccumulator accum = createTestRecordAccumulator(deliveryTimeoutMs,
+            batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize, CompressionType.NONE, lingerMs);
+
+        // Make the batches ready due to linger. These batches are not in retry
+        for (Boolean mute: muteStates) {
+            if (time.milliseconds() < System.currentTimeMillis())
+                time.setCurrentTimeMs(System.currentTimeMillis());
+            accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs);
+            assertEquals("No partition should be ready.", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size());
+
+            time.sleep(lingerMs);
+            readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes;
+            assertEquals("Our partition's leader should be ready", Collections.singleton(node1), readyNodes);
+
+            expiredBatches = accum.expiredBatches(time.milliseconds());
+            assertEquals("The batch should not expire when just linger has passed", 0, expiredBatches.size());
+
+            if (mute)
+                accum.mutePartition(tp1);
+            else
+                accum.unmutePartition(tp1, 0L);
+
+            // Advance the clock to expire the batch.
+            time.sleep(deliveryTimeoutMs - lingerMs);
+            expiredBatches = accum.expiredBatches(time.milliseconds());
+            assertEquals("The batch may expire when the partition is muted", 1, expiredBatches.size());
+            assertEquals("No partitions should be ready.", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size());
+        }
+    }
+
+    @Test
+    public void testExpiredBatchSingle() throws InterruptedException {
+        doExpireBatchSingle(3200L);
+    }
+
+    @Test
+    public void testExpiredBatchSingleMaxValue() throws InterruptedException {
+        doExpireBatchSingle(Long.MAX_VALUE);
+    }
+
     @Test
     public void testExpiredBatches() throws InterruptedException {
         long retryBackoffMs = 100L;
-        long lingerMs = 3000L;
+        long lingerMs = 30L;
         int requestTimeout = 60;
+        long deliveryTimeoutMs = 3200L;
 
         // test case assumes that the records do not fill the batch completely
         int batchSize = 1025;
 
         RecordAccumulator accum = createTestRecordAccumulator(
-                batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize, CompressionType.NONE, lingerMs);
+            deliveryTimeoutMs, batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize, CompressionType.NONE, lingerMs);
         int appends = expectedNumAppends(batchSize);
 
         // Test batches not in retry
@@ -532,14 +586,14 @@ public class RecordAccumulatorTest {
         Set<Node> readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes;
         assertEquals("Our partition's leader should be ready", Collections.singleton(node1), readyNodes);
         // Advance the clock to expire the batch.
-        time.sleep(requestTimeout + 1);
+        time.sleep(deliveryTimeoutMs + 1);
         accum.mutePartition(tp1);
-        List<ProducerBatch> expiredBatches = accum.expiredBatches(requestTimeout, time.milliseconds());
-        assertEquals("The batch should not be expired when the partition is muted", 0, expiredBatches.size());
+        List<ProducerBatch> expiredBatches = accum.expiredBatches(time.milliseconds());
+        assertEquals("The batches will be muted no matter if the partition is muted or not", 2, expiredBatches.size());
 
         accum.unmutePartition(tp1, 0L);
-        expiredBatches = accum.expiredBatches(requestTimeout, time.milliseconds());
-        assertEquals("The batch should be expired", 1, expiredBatches.size());
+        expiredBatches = accum.expiredBatches(time.milliseconds());
+        assertEquals("All batches should have been expired earlier", 0, expiredBatches.size());
         assertEquals("No partitions should be ready.", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size());
 
         // Advance the clock to make the next batch ready due to linger.ms
@@ -548,12 +602,12 @@ public class RecordAccumulatorTest {
         time.sleep(requestTimeout + 1);
 
         accum.mutePartition(tp1);
-        expiredBatches = accum.expiredBatches(requestTimeout, time.milliseconds());
+        expiredBatches = accum.expiredBatches(time.milliseconds());
         assertEquals("The batch should not be expired when metadata is still available and partition is muted", 0, expiredBatches.size());
 
         accum.unmutePartition(tp1, 0L);
-        expiredBatches = accum.expiredBatches(requestTimeout, time.milliseconds());
-        assertEquals("The batch should be expired when the partition is not muted", 1, expiredBatches.size());
+        expiredBatches = accum.expiredBatches(time.milliseconds());
+        assertEquals("All batches should have been expired", 0, expiredBatches.size());
         assertEquals("No partitions should be ready.", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size());
 
         // Test batches in retry.
@@ -569,17 +623,17 @@ public class RecordAccumulatorTest {
 
         // test expiration.
         time.sleep(requestTimeout + retryBackoffMs);
-        expiredBatches = accum.expiredBatches(requestTimeout, time.milliseconds());
+        expiredBatches = accum.expiredBatches(time.milliseconds());
         assertEquals("The batch should not be expired.", 0, expiredBatches.size());
         time.sleep(1L);
 
         accum.mutePartition(tp1);
-        expiredBatches = accum.expiredBatches(requestTimeout, time.milliseconds());
+        expiredBatches = accum.expiredBatches(time.milliseconds());
         assertEquals("The batch should not be expired when the partition is muted", 0, expiredBatches.size());
 
         accum.unmutePartition(tp1, 0L);
-        expiredBatches = accum.expiredBatches(requestTimeout, time.milliseconds());
-        assertEquals("The batch should be expired when the partition is not muted.", 1, expiredBatches.size());
+        expiredBatches = accum.expiredBatches(time.milliseconds());
+        assertEquals("All batches should have been expired.", 0, expiredBatches.size());
 
         // Test that when being throttled muted batches are expired before the throttle time is over.
         accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, 0);
@@ -589,20 +643,20 @@ public class RecordAccumulatorTest {
         // Advance the clock to expire the batch.
         time.sleep(requestTimeout + 1);
         accum.mutePartition(tp1);
-        expiredBatches = accum.expiredBatches(requestTimeout, time.milliseconds());
+        expiredBatches = accum.expiredBatches(time.milliseconds());
         assertEquals("The batch should not be expired when the partition is muted", 0, expiredBatches.size());
 
         long throttleTimeMs = 100L;
         accum.unmutePartition(tp1, time.milliseconds() + throttleTimeMs);
         // The batch shouldn't be expired yet.
-        expiredBatches = accum.expiredBatches(requestTimeout, time.milliseconds());
+        expiredBatches = accum.expiredBatches(time.milliseconds());
         assertEquals("The batch should not be expired when the partition is muted", 0, expiredBatches.size());
 
         // Once the throttle time is over, the batch can be expired.
         time.sleep(throttleTimeMs);
-        expiredBatches = accum.expiredBatches(requestTimeout, time.milliseconds());
-        assertEquals("The batch should be expired", 1, expiredBatches.size());
-        assertEquals("No partitions should be ready.", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size());
+        expiredBatches = accum.expiredBatches(time.milliseconds());
+        assertEquals("All batches should have been expired earlier", 0, expiredBatches.size());
+        assertEquals("No partitions should be ready.", 1, accum.ready(cluster, time.milliseconds()).readyNodes.size());
     }
 
     @Test
@@ -646,10 +700,18 @@ public class RecordAccumulatorTest {
         // Simulate talking to an older broker, ie. one which supports a lower magic.
         ApiVersions apiVersions = new ApiVersions();
         int batchSize = 1025;
+        int requestTimeoutMs = 1600;
+        long deliveryTimeoutMs = 3200L;
+        long lingerMs = 10L;
+        long retryBackoffMs = 100L;
+        long totalSize = 10 * batchSize;
+        String metricGrpName = "producer-metrics";
+
         apiVersions.update("foobar", NodeApiVersions.create(Arrays.asList(new ApiVersionsResponse.ApiVersion(ApiKeys.PRODUCE.id,
                 (short) 0, (short) 2))));
-        RecordAccumulator accum = new RecordAccumulator(logContext, batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize,
-                CompressionType.NONE, 10, 100L, metrics, time, apiVersions, new TransactionManager());
+        RecordAccumulator accum = new RecordAccumulator(logContext, batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD,
+            CompressionType.NONE, lingerMs, retryBackoffMs, deliveryTimeoutMs, metrics, metricGrpName, time, apiVersions, new TransactionManager(),
+            new BufferPool(totalSize, batchSize, metrics, time, metricGrpName));
         accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, 0);
     }
 
@@ -727,9 +789,9 @@ public class RecordAccumulatorTest {
             assertFalse(drained.get(node1.id()).isEmpty());
         }
         assertTrue("All the batches should have been drained.",
-                   accum.ready(cluster, time.milliseconds()).readyNodes.isEmpty());
+                accum.ready(cluster, time.milliseconds()).readyNodes.isEmpty());
         assertEquals("The split batches should be allocated off the accumulator",
-                     bufferCapacity, accum.bufferPoolAvailableMemory());
+                bufferCapacity, accum.bufferPoolAvailableMemory());
     }
 
     @Test
@@ -760,8 +822,78 @@ public class RecordAccumulatorTest {
             numSplit += result.numSplit;
             numBatches += result.numBatches;
             assertTrue(String.format("Total num batches = %d, split batches = %d, more than 10%% of the batch splits. "
-                                         + "Random seed is " + seed,
-                                     numBatches, numSplit), (double) numSplit / numBatches < 0.1f);
+                    + "Random seed is " + seed,
+                numBatches, numSplit), (double) numSplit / numBatches < 0.1f);
+        }
+    }
+
+    @Test
+    public void testSoonToExpireBatchesArePickedUpForExpiry() throws InterruptedException {
+        long lingerMs = 500L;
+        int batchSize = 1025;
+
+        RecordAccumulator accum = createTestRecordAccumulator(
+            batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize, CompressionType.NONE, lingerMs);
+
+        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs);
+        Set<Node> readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes;
+        Map<Integer, List<ProducerBatch>> drained = accum.drain(cluster, readyNodes, Integer.MAX_VALUE, time.milliseconds());
+        assertTrue(drained.isEmpty());
+        //assertTrue(accum.soonToExpireInFlightBatches().isEmpty());
+
+        // Advance the clock and send one batch out; it should not be included in the soon-to-expire in-flight
+        // batches because the batch's expiry is still far away.
+        time.sleep(lingerMs + 1);
+        readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes;
+        drained = accum.drain(cluster, readyNodes, Integer.MAX_VALUE, time.milliseconds());
+        assertEquals("A batch did not drain after linger", 1, drained.size());
+        //assertTrue(accum.soonToExpireInFlightBatches().isEmpty());
+
+        // Queue another batch and advance clock such that batch expiry time is earlier than request timeout.
+        accum.append(tp2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs);
+        time.sleep(lingerMs * 4);
+
+        // Now drain and check that accumulator picked up the drained batch because its expiry is soon.
+        readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes;
+        drained = accum.drain(cluster, readyNodes, Integer.MAX_VALUE, time.milliseconds());
+        assertEquals("A batch did not drain after linger", 1, drained.size());
+    }
+
+    @Test
+    public void testExpiredBatchesRetry() throws InterruptedException {
+        int lingerMs = 3000;
+        int rtt = 1000;
+        int deliveryTimeoutMs = 3200;
+        Set<Node> readyNodes;
+        List<ProducerBatch> expiredBatches;
+        List<Boolean> muteStates = Arrays.asList(false, true);
+
+        // test case assumes that the records do not fill the batch completely
+        int batchSize = 1025;
+        RecordAccumulator accum = createTestRecordAccumulator(
+            batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize, CompressionType.NONE, lingerMs);
+
+        // Test batches in retry.
+        for (Boolean mute: muteStates) {
+            accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, 0);
+            time.sleep(lingerMs);
+            readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes;
+            assertEquals("Our partition's leader should be ready", Collections.singleton(node1), readyNodes);
+            Map<Integer, List<ProducerBatch>> drained = accum.drain(cluster, readyNodes, Integer.MAX_VALUE, time.milliseconds());
+            assertEquals("There should be only one batch.", 1, drained.get(node1.id()).size());
+            time.sleep(rtt);
+            accum.reenqueue(drained.get(node1.id()).get(0), time.milliseconds());
+
+            if (mute)
+                accum.mutePartition(tp1);
+            else
+                accum.unmutePartition(tp1, 0L);
+
+            // test expiration
+            time.sleep(deliveryTimeoutMs - rtt);
+            accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE, time.milliseconds());
+            expiredBatches = accum.expiredBatches(time.milliseconds());
+            assertEquals("RecordAccumulator has expired batches if the partition is not muted", mute  ? 1 : 0, expiredBatches.size());
         }
     }
 
@@ -852,7 +984,7 @@ public class RecordAccumulatorTest {
         int offsetDelta = 0;
         while (true) {
             int recordSize = DefaultRecord.sizeInBytes(offsetDelta, 0, key.length, value.length,
-                    Record.EMPTY_HEADERS);
+                Record.EMPTY_HEADERS);
             if (size + recordSize > batchSize)
                 return offsetDelta;
             offsetDelta += 1;
@@ -860,20 +992,32 @@ public class RecordAccumulatorTest {
         }
     }
 
+
+    private RecordAccumulator createTestRecordAccumulator(int batchSize, long totalSize, CompressionType type, long lingerMs) {
+        long deliveryTimeoutMs = 3200L;
+        return createTestRecordAccumulator(deliveryTimeoutMs, batchSize, totalSize, type, lingerMs);
+    }
+
     /**
      * Return a test RecordAccumulator instance
      */
-    private RecordAccumulator createTestRecordAccumulator(int batchSize, long totalSize, CompressionType type, long lingerMs) {
+    private RecordAccumulator createTestRecordAccumulator(long deliveryTimeoutMs, int batchSize, long totalSize, CompressionType type, long lingerMs) {
+        long retryBackoffMs = 100L;
+        int requestTimeoutMs = 1600;
+        String metricGrpName = "producer-metrics";
+
         return new RecordAccumulator(
-                logContext,
-                batchSize,
-                totalSize,
-                type,
-                lingerMs,
-                100L,
-                metrics,
-                time,
-                new ApiVersions(),
-                null);
+            logContext,
+            batchSize,
+            type,
+            lingerMs,
+            retryBackoffMs,
+            deliveryTimeoutMs,
+            metrics,
+            metricGrpName,
+            time,
+            new ApiVersions(),
+            null,
+            new BufferPool(totalSize, batchSize, metrics, time, metricGrpName));
     }
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index d87c8f9..2fbe3df 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -16,6 +16,21 @@
  */
 package org.apache.kafka.clients.producer.internals;
 
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.ClientRequest;
 import org.apache.kafka.clients.Metadata;
@@ -62,6 +77,7 @@ import org.apache.kafka.common.requests.ProduceResponse;
 import org.apache.kafka.common.requests.ResponseHeader;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.test.DelayedReceive;
 import org.apache.kafka.test.MockSelector;
 import org.apache.kafka.test.TestUtils;
@@ -69,25 +85,10 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 public class SenderTest {
@@ -131,10 +132,12 @@ public class SenderTest {
         sender.run(time.milliseconds()); // connect
         sender.run(time.milliseconds()); // send produce request
         assertEquals("We should have a single produce request in flight.", 1, client.inFlightRequestCount());
+        assertEquals(1, sender.inFlightBatches(tp0).size());
         assertTrue(client.hasInFlightRequests());
         client.respond(produceResponse(tp0, offset, Errors.NONE, 0));
         sender.run(time.milliseconds());
         assertEquals("All requests completed.", 0, client.inFlightRequestCount());
+        assertEquals(0, sender.inFlightBatches(tp0).size());
         assertFalse(client.hasInFlightRequests());
         sender.run(time.milliseconds());
         assertTrue("Request should be completed", future.isDone());
@@ -328,33 +331,42 @@ public class SenderTest {
             Node node = new Node(Integer.parseInt(id), "localhost", 0);
             assertEquals(1, client.inFlightRequestCount());
             assertTrue(client.hasInFlightRequests());
+            assertEquals(1, sender.inFlightBatches(tp0).size());
             assertTrue("Client ready status should be true", client.isReady(node, 0L));
             client.disconnect(id);
             assertEquals(0, client.inFlightRequestCount());
             assertFalse(client.hasInFlightRequests());
             assertFalse("Client ready status should be false", client.isReady(node, 0L));
+            // the batch is in accumulator.inFlightBatches until it expires
+            assertEquals(1, sender.inFlightBatches(tp0).size());
             sender.run(time.milliseconds()); // receive error
             sender.run(time.milliseconds()); // reconnect
             sender.run(time.milliseconds()); // resend
             assertEquals(1, client.inFlightRequestCount());
             assertTrue(client.hasInFlightRequests());
+            assertEquals(1, sender.inFlightBatches(tp0).size());
             long offset = 0;
             client.respond(produceResponse(tp0, offset, Errors.NONE, 0));
             sender.run(time.milliseconds());
             assertTrue("Request should have retried and completed", future.isDone());
             assertEquals(offset, future.get().offset());
+            assertEquals(0, sender.inFlightBatches(tp0).size());
 
             // do an unsuccessful retry
             future = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
             sender.run(time.milliseconds()); // send produce request
+            assertEquals(1, sender.inFlightBatches(tp0).size());
             for (int i = 0; i < maxRetries + 1; i++) {
                 client.disconnect(client.requests().peek().destination());
                 sender.run(time.milliseconds()); // receive error
+                assertEquals(0, sender.inFlightBatches(tp0).size());
                 sender.run(time.milliseconds()); // reconnect
                 sender.run(time.milliseconds()); // resend
+                assertEquals(i > 0 ? 0 : 1, sender.inFlightBatches(tp0).size());
             }
             sender.run(time.milliseconds());
             assertFutureFailure(future, NetworkException.class);
+            assertEquals(0, sender.inFlightBatches(tp0).size());
         } finally {
             m.close();
         }
@@ -371,7 +383,7 @@ public class SenderTest {
                     senderMetrics, time, REQUEST_TIMEOUT, 50, null, apiVersions);
             // Create a two broker cluster, with partition 0 on broker 0 and partition 1 on broker 1
             Cluster cluster1 = TestUtils.clusterWith(2, "test", 2);
-            metadata.update(cluster1, Collections.<String>emptySet(), time.milliseconds());
+            metadata.update(cluster1, Collections.emptySet(), time.milliseconds());
 
             // Send the first message.
             TopicPartition tp2 = new TopicPartition("test", 1);
@@ -384,6 +396,7 @@ public class SenderTest {
             assertEquals(1, client.inFlightRequestCount());
             assertTrue(client.hasInFlightRequests());
             assertTrue("Client ready status should be true", client.isReady(node, 0L));
+            assertEquals(1, sender.inFlightBatches(tp2).size());
 
             time.sleep(900);
             // Now send another message to tp2
@@ -391,11 +404,13 @@ public class SenderTest {
 
             // Update metadata before sender receives response from broker 0. Now partition 2 moves to broker 0
             Cluster cluster2 = TestUtils.singletonCluster("test", 2);
-            metadata.update(cluster2, Collections.<String>emptySet(), time.milliseconds());
+            metadata.update(cluster2, Collections.emptySet(), time.milliseconds());
             // Sender should not send the second message to node 0.
-            sender.run(time.milliseconds());
+            assertEquals(1, sender.inFlightBatches(tp2).size());
+            sender.run(time.milliseconds());  // receive the response for the previous send, and send the new batch
             assertEquals(1, client.inFlightRequestCount());
             assertTrue(client.hasInFlightRequests());
+            assertEquals(1, sender.inFlightBatches(tp2).size());
         } finally {
             m.close();
         }
@@ -429,14 +444,18 @@ public class SenderTest {
 
         // Advance the clock to expire the first batch.
         time.sleep(10000);
+
+        Node clusterNode = this.cluster.nodes().get(0);
+        Map<Integer, List<ProducerBatch>> drainedBatches =
+            accumulator.drain(cluster, Collections.singleton(clusterNode), Integer.MAX_VALUE, time.milliseconds());
+        sender.addToInflightBatches(drainedBatches);
+
         // Disconnect the target node for the pending produce request. This will ensure that sender will try to
         // expire the batch.
-        Node clusterNode = this.cluster.nodes().get(0);
         client.disconnect(clusterNode.idString());
         client.blackout(clusterNode, 100);
 
         sender.run(time.milliseconds());  // We should try to flush the batch, but we expire it instead without sending anything.
-
         assertEquals("Callbacks not invoked for expiry", messagesPerBatch, expiryCallbackCount.get());
         assertNull("Unexpected exception", unexpectedException.get());
         // Make sure that the reconds were appended back to the batch.
@@ -463,6 +482,7 @@ public class SenderTest {
         sender.run(time.milliseconds());
         assertEquals("Request completed.", 0, client.inFlightRequestCount());
         assertFalse(client.hasInFlightRequests());
+        assertEquals(0, sender.inFlightBatches(tp0).size());
         sender.run(time.milliseconds());
         assertTrue("Request should be completed", future.isDone());
 
@@ -479,6 +499,7 @@ public class SenderTest {
         sender.run(time.milliseconds());
         assertEquals("Request completed.", 0, client.inFlightRequestCount());
         assertFalse(client.hasInFlightRequests());
+        assertEquals(0, sender.inFlightBatches(tp0).size());
         sender.run(time.milliseconds());
         assertTrue("Request should be completed", future.isDone());
     }
@@ -520,6 +541,7 @@ public class SenderTest {
         Node node = new Node(Integer.parseInt(id), "localhost", 0);
         assertEquals(1, client.inFlightRequestCount());
         assertTrue(client.hasInFlightRequests());
+        assertEquals(1, sender.inFlightBatches(tp0).size());
         assertTrue("Client ready status should be true", client.isReady(node, 0L));
         assertFalse(future.isDone());
 
@@ -583,6 +605,7 @@ public class SenderTest {
         sender.run(time.milliseconds()); // receive response 1
         assertEquals(1, transactionManager.lastAckedSequence(tp0));
         assertFalse(client.hasInFlightRequests());
+        assertEquals(0, sender.inFlightBatches(tp0).size());
         assertTrue(request2.isDone());
         assertEquals(1, request2.get().offset());
     }
@@ -654,11 +677,12 @@ public class SenderTest {
         assertEquals(0, transactionManager.lastAckedSequence(tp0));
         assertTrue(request1.isDone());
         assertEquals(0, request1.get().offset());
-
-
         assertFalse(client.hasInFlightRequests());
+        assertEquals(0, sender.inFlightBatches(tp0).size());
+
         sender.run(time.milliseconds()); // send request 2;
         assertEquals(1, client.inFlightRequestCount());
+        assertEquals(1, sender.inFlightBatches(tp0).size());
 
         sendIdempotentProducerResponse(1, tp0, Errors.NONE, 1L);
         sender.run(time.milliseconds());  // receive response 2
@@ -667,17 +691,19 @@ public class SenderTest {
         assertEquals(1, request2.get().offset());
 
         assertFalse(client.hasInFlightRequests());
+        assertEquals(0, sender.inFlightBatches(tp0).size());
 
         sender.run(time.milliseconds()); // send request 3
         assertEquals(1, client.inFlightRequestCount());
+        assertEquals(1, sender.inFlightBatches(tp0).size());
 
         sendIdempotentProducerResponse(2, tp0, Errors.NONE, 2L);
         sender.run(time.milliseconds());  // receive response 3, send request 4 since we are out of 'retry' mode.
         assertEquals(2, transactionManager.lastAckedSequence(tp0));
         assertTrue(request3.isDone());
         assertEquals(2, request3.get().offset());
-
         assertEquals(1, client.inFlightRequestCount());
+        assertEquals(1, sender.inFlightBatches(tp0).size());
 
         sendIdempotentProducerResponse(3, tp0, Errors.NONE, 3L);
         sender.run(time.milliseconds());  // receive response 4
@@ -795,7 +821,6 @@ public class SenderTest {
         setupWithTransactionState(transactionManager);
         prepareAndReceiveInitProducerId(producerId, Errors.NONE);
         assertTrue(transactionManager.hasProducerId());
-
         assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
 
         // Send first ProduceRequest
@@ -965,46 +990,54 @@ public class SenderTest {
     public void testExpiryOfFirstBatchShouldNotCauseUnresolvedSequencesIfFutureBatchesSucceed() throws Exception {
         final long producerId = 343434L;
         TransactionManager transactionManager = new TransactionManager();
-        setupWithTransactionState(transactionManager);
+        setupWithTransactionState(transactionManager, false, null);
         prepareAndReceiveInitProducerId(producerId, Errors.NONE);
         assertTrue(transactionManager.hasProducerId());
-
         assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
 
         // Send first ProduceRequest
         Future<RecordMetadata> request1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
         sender.run(time.milliseconds());  // send request
+        // We separate the two appends by 1 second so that the two batches
+        // don't expire at the same time.
+        time.sleep(1000L);
 
         Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
         sender.run(time.milliseconds());  // send request
-
         assertEquals(2, client.inFlightRequestCount());
+        assertEquals(2, sender.inFlightBatches(tp0).size());
 
         sendIdempotentProducerResponse(0, tp0, Errors.REQUEST_TIMED_OUT, -1);
         sender.run(time.milliseconds());  // receive first response
+        assertEquals(1, sender.inFlightBatches(tp0).size());
 
         Node node = this.cluster.nodes().get(0);
-        time.sleep(10000L);
+        // Advance the clock by a further 600 millis (on top of the 1000 millis sleep above) to expire
+        // the first batch but not the second. Note deliveryTimeoutMs is 1500.
+        time.sleep(600L);
         client.disconnect(node.idString());
         client.blackout(node, 10);
 
         sender.run(time.milliseconds()); // now expire the first batch.
         assertFutureFailure(request1, TimeoutException.class);
         assertTrue(transactionManager.hasUnresolvedSequence(tp0));
+        assertEquals(0, sender.inFlightBatches(tp0).size());
+
         // let's enqueue another batch, which should not be dequeued until the unresolved state is clear.
         Future<RecordMetadata> request3 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
-
         time.sleep(20);
-
         assertFalse(request2.isDone());
 
         sender.run(time.milliseconds());  // send second request
         sendIdempotentProducerResponse(1, tp0, Errors.NONE, 1);
+        assertEquals(1, sender.inFlightBatches(tp0).size());
+
         sender.run(time.milliseconds()); // receive second response, the third request shouldn't be sent since we are in an unresolved state.
         assertTrue(request2.isDone());
         assertEquals(1, request2.get().offset());
-        Deque<ProducerBatch> batches = accumulator.batches().get(tp0);
+        assertEquals(0, sender.inFlightBatches(tp0).size());
 
+        Deque<ProducerBatch> batches = accumulator.batches().get(tp0);
         assertEquals(1, batches.size());
         assertFalse(batches.peekFirst().hasSequence());
         assertFalse(client.hasInFlightRequests());
@@ -1017,6 +1050,7 @@ public class SenderTest {
         assertEquals(0, batches.size());
         assertEquals(1, client.inFlightRequestCount());
         assertFalse(request3.isDone());
+        assertEquals(1, sender.inFlightBatches(tp0).size());
     }
 
     @Test
@@ -1026,13 +1060,13 @@ public class SenderTest {
         setupWithTransactionState(transactionManager);
         prepareAndReceiveInitProducerId(producerId, Errors.NONE);
         assertTrue(transactionManager.hasProducerId());
-
         assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
 
         // Send first ProduceRequest
         Future<RecordMetadata> request1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
         sender.run(time.milliseconds());  // send request
 
+        time.sleep(1000L);
         Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
         sender.run(time.milliseconds());  // send request
 
@@ -1042,7 +1076,7 @@ public class SenderTest {
         sender.run(time.milliseconds());  // receive first response
 
         Node node = this.cluster.nodes().get(0);
-        time.sleep(10000L);
+        time.sleep(1000L);
         client.disconnect(node.idString());
         client.blackout(node, 10);
 
@@ -1053,9 +1087,7 @@ public class SenderTest {
         Future<RecordMetadata> request3 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
 
         time.sleep(20);
-
         assertFalse(request2.isDone());
-
         sender.run(time.milliseconds());  // send second request
         sendIdempotentProducerResponse(1, tp0, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, 1);
         sender.run(time.milliseconds()); // receive second response, the third request shouldn't be sent since we are in an unresolved state.
@@ -1087,12 +1119,12 @@ public class SenderTest {
         Future<RecordMetadata> request1 = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
         sender.run(time.milliseconds());  // send request
         sendIdempotentProducerResponse(0, tp0, Errors.NOT_LEADER_FOR_PARTITION, -1);
-        sender.run(time.milliseconds());  // receive response
 
+        sender.run(time.milliseconds());  // receive response
         assertEquals(1L, transactionManager.sequenceNumber(tp0).longValue());
 
         Node node = this.cluster.nodes().get(0);
-        time.sleep(10000L);
+        time.sleep(15000L);
         client.disconnect(node.idString());
         client.blackout(node, 10);
 
@@ -1520,7 +1552,6 @@ public class SenderTest {
                 RecordBatch firstBatch = batchIterator.next();
                 assertFalse(batchIterator.hasNext());
                 assertEquals(expectedSequence, firstBatch.baseSequence());
-
                 return true;
             }
         }, produceResponse(tp, responseOffset, responseError, 0, logStartOffset));
@@ -1754,11 +1785,13 @@ public class SenderTest {
         sender.run(time.milliseconds());  // send.
 
         assertEquals(1, client.inFlightRequestCount());
+        assertEquals(1, sender.inFlightBatches(tp0).size());
 
         client.respond(produceResponse(tp0, 0, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, 0));
 
         sender.run(time.milliseconds());
         assertTrue(responseFuture.isDone());
+        assertEquals(0, sender.inFlightBatches(tp0).size());
         assertFalse("Expected transaction state to be reset upon receiving an OutOfOrderSequenceException", transactionManager.hasProducerId());
     }
 
@@ -1794,11 +1827,15 @@ public class SenderTest {
                                        TopicPartition tp) throws Exception {
         int maxRetries = 1;
         String topic = tp.topic();
+        long deliveryTimeoutMs = 3000L;
+        long totalSize = 1024 * 1024;
+        String metricGrpName = "producer-metrics";
         // Set a good compression ratio.
         CompressionRatioEstimator.setEstimation(topic, CompressionType.GZIP, 0.2f);
         try (Metrics m = new Metrics()) {
-            accumulator = new RecordAccumulator(logContext, batchSize, 1024 * 1024, CompressionType.GZIP, 0L, 0L, m, time,
-                    new ApiVersions(), txnManager);
+            accumulator = new RecordAccumulator(logContext, batchSize, CompressionType.GZIP,
+                0L, 0L, deliveryTimeoutMs, m, metricGrpName, time, new ApiVersions(), txnManager,
+                new BufferPool(totalSize, batchSize, metrics, time, "producer-internal-metrics"));
             SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
             Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
                     senderMetrics, time, REQUEST_TIMEOUT, 1000L, txnManager, new ApiVersions());
@@ -1865,9 +1902,153 @@ public class SenderTest {
             assertEquals("The last ack'd sequence number should be 1", 1, txnManager.lastAckedSequence(tp));
             assertEquals("Offset of the first message should be 1", 1L, f2.get().offset());
             assertTrue("There should be no batch in the accumulator", accumulator.batches().get(tp).isEmpty());
+            assertTrue("There should be a split", m.metrics().get(senderMetrics.batchSplitRate).value() > 0);
+        }
+    }
+
+    @Test
+    public void testNoDoubleDeallocation() throws Exception {
+        long deliverTimeoutMs = 1500L;
+        long totalSize = 1024 * 1024;
+        String metricGrpName = "producer-custom-metrics";
+        MatchingBufferPool pool = new MatchingBufferPool(totalSize, batchSize, metrics, time, metricGrpName);
+        setupWithTransactionState(null, false, pool);
 
-            assertTrue("There should be a split",
-                    m.metrics().get(senderMetrics.batchSplitRate).value() > 0);
+        // Send first ProduceRequest
+        Future<RecordMetadata> request1 =
+            accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());  // send request
+        assertEquals(1, client.inFlightRequestCount());
+        assertEquals(1, sender.inFlightBatches(tp0).size());
+
+        time.sleep(deliverTimeoutMs);
+        assertFalse(pool.allMatch());
+
+        sender.run(time.milliseconds());  // expire the batch
+        assertTrue(request1.isDone());
+        assertTrue("The batch should have been de-allocated", pool.allMatch());
+        assertTrue(pool.allMatch());
+
+        sender.run(time.milliseconds());
+        assertTrue("The batch should have been de-allocated", pool.allMatch());
+        assertEquals(0, client.inFlightRequestCount());
+        assertEquals(0, sender.inFlightBatches(tp0).size());
+    }
+
+    @Test
+    public void testInflightBatchesExpireOnDeliveryTimeout() throws InterruptedException {
+        long deliveryTimeoutMs = 1500L;
+        setupWithTransactionState(null, true, null);
+
+        // Send first ProduceRequest
+        Future<RecordMetadata> request = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());  // send request
+        assertEquals(1, client.inFlightRequestCount());
+        assertEquals("Expect one in-flight batch in accumulator", 1, sender.inFlightBatches(tp0).size());
+
+        Map<TopicPartition, ProduceResponse.PartitionResponse> responseMap = new HashMap<>();
+        responseMap.put(tp0, new ProduceResponse.PartitionResponse(Errors.NONE, 0L, 0L, 0L));
+        client.respond(new ProduceResponse(responseMap));
+
+        time.sleep(deliveryTimeoutMs);
+        sender.run(time.milliseconds());  // receive first response
+        assertEquals("Expect zero in-flight batch in accumulator", 0, sender.inFlightBatches(tp0).size());
+        try {
+            request.get();
+            fail("The expired batch should throw a TimeoutException");
+        } catch (ExecutionException e) {
+            assertTrue(e.getCause() instanceof TimeoutException);
+        }
+    }
+
+    @Test
+    public void testWhenFirstBatchExpireNoSendSecondBatchIfGuaranteeOrder() throws InterruptedException {
+        long deliveryTimeoutMs = 1500L;
+        setupWithTransactionState(null, true, null);
+
+        // Send first ProduceRequest
+        accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT);
+        sender.run(time.milliseconds());  // send request
+        assertEquals(1, client.inFlightRequestCount());
+        assertEquals(1, sender.inFlightBatches(tp0).size());
+
+        time.sleep(deliveryTimeoutMs / 2);
+
+        // Send second ProduceRequest
+        accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT);
+        sender.run(time.milliseconds());  // must not send request because the partition is muted
+        assertEquals(1, client.inFlightRequestCount());
+        assertEquals(1, sender.inFlightBatches(tp0).size());
+
+        time.sleep(deliveryTimeoutMs / 2); // expire the first batch only
+
+        client.respond(produceResponse(tp0, 0L, Errors.NONE, 0, 0L));
+        sender.run(time.milliseconds());  // receive response (offset=0)
+        assertEquals(0, client.inFlightRequestCount());
+        assertEquals(0, sender.inFlightBatches(tp0).size());
+
+        sender.run(time.milliseconds());  // Drain the second request only this time
+        assertEquals(1, client.inFlightRequestCount());
+        assertEquals(1, sender.inFlightBatches(tp0).size());
+    }
+
+    @Test
+    public void testExpiredBatchDoesNotRetry() throws Exception {
+        long deliverTimeoutMs = 1500L;
+        setupWithTransactionState(null, false, null);
+
+        // Send first ProduceRequest
+        Future<RecordMetadata> request1 =
+            accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null,
+                MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());  // send request
+        assertEquals(1, client.inFlightRequestCount());
+        time.sleep(deliverTimeoutMs);
+
+        Map<TopicPartition, ProduceResponse.PartitionResponse> responseMap = new HashMap<>();
+        responseMap.put(tp0, new ProduceResponse.PartitionResponse(Errors.NONE, 0L, 0L, 0L));
+        client.respond(produceResponse(tp0, -1, Errors.NOT_LEADER_FOR_PARTITION, -1)); // return a retriable error
+
+        sender.run(time.milliseconds());  // expire the batch
+        assertTrue(request1.isDone());
+        assertEquals(0, client.inFlightRequestCount());
+        assertEquals(0, sender.inFlightBatches(tp0).size());
+
+        sender.run(time.milliseconds()); // receive first response and do not reenqueue.
+        assertEquals(0, client.inFlightRequestCount());
+        assertEquals(0, sender.inFlightBatches(tp0).size());
+
+        sender.run(time.milliseconds()); // run again and must not send anything.
+        assertEquals(0, client.inFlightRequestCount());
+        assertEquals(0, sender.inFlightBatches(tp0).size());
+    }
+
+    private class MatchingBufferPool extends BufferPool {
+        IdentityHashMap<ByteBuffer, Boolean> allocatedBuffers;
+
+        MatchingBufferPool(long totalSize, int batchSize, Metrics metrics, Time time, String metricGrpName) {
+            super(totalSize, batchSize, metrics, time, metricGrpName);
+            allocatedBuffers = new IdentityHashMap<>();
+        }
+
+        @Override
+        public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
+            ByteBuffer buffer = super.allocate(size, maxTimeToBlockMs);
+            allocatedBuffers.put(buffer, Boolean.TRUE);
+            return buffer;
+        }
+
+        @Override
+        public void deallocate(ByteBuffer buffer, int size) {
+            if (!allocatedBuffers.containsKey(buffer)) {
+                throw new IllegalStateException("Deallocating a buffer that is not allocated");
+            }
+            allocatedBuffers.remove(buffer);
+            super.deallocate(buffer, size);
+        }
+
+        public boolean allMatch() {
+            return allocatedBuffers.isEmpty();
         }
     }
 
@@ -1931,17 +2112,29 @@ public class SenderTest {
     }
 
     private void setupWithTransactionState(TransactionManager transactionManager) {
+        setupWithTransactionState(transactionManager, false, null);
+    }
+
+    private void setupWithTransactionState(TransactionManager transactionManager, boolean guaranteeOrder, BufferPool customPool) {
+        long totalSize = 1024 * 1024;
+        String metricGrpName = "producer-metrics";
         Map<String, String> metricTags = new LinkedHashMap<>();
         metricTags.put("client-id", CLIENT_ID);
         MetricConfig metricConfig = new MetricConfig().tags(metricTags);
         this.metrics = new Metrics(metricConfig, time);
-        this.accumulator = new RecordAccumulator(logContext, batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, metrics, time,
-                apiVersions, transactionManager);
-        this.senderMetricsRegistry = new SenderMetricsRegistry(this.metrics);
+        BufferPool pool = (customPool == null) ? new BufferPool(totalSize, batchSize, metrics, time, metricGrpName) : customPool;
+        setupWithTransactionState(transactionManager, guaranteeOrder, metricTags, pool);
+    }
 
-        this.sender = new Sender(logContext, this.client, this.metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
-                Integer.MAX_VALUE, this.senderMetricsRegistry, this.time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
-        this.metadata.update(this.cluster, Collections.<String>emptySet(), time.milliseconds());
+    private void setupWithTransactionState(TransactionManager transactionManager, boolean guaranteeOrder, Map<String, String> metricTags, BufferPool pool) {
+        long deliveryTimeoutMs = 1500L;
+        String metricGrpName = "producer-metrics";
+        this.accumulator = new RecordAccumulator(logContext, batchSize, CompressionType.NONE, 0L, 0L,
+            deliveryTimeoutMs, metrics, metricGrpName, time, apiVersions, transactionManager, pool);
+        this.senderMetricsRegistry = new SenderMetricsRegistry(this.metrics);
+        this.sender = new Sender(logContext, this.client, this.metadata, this.accumulator, guaranteeOrder, MAX_REQUEST_SIZE, ACKS_ALL,
+            Integer.MAX_VALUE, this.senderMetricsRegistry, this.time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
+        this.metadata.update(this.cluster, Collections.emptySet(), time.milliseconds());
     }
 
     private void assertSendFailure(Class<? extends RuntimeException> expectedError) throws Exception {
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 558ec72..550d003 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -118,6 +118,10 @@ public class TransactionManagerTest {
         Map<String, String> metricTags = new LinkedHashMap<>();
         metricTags.put("client-id", CLIENT_ID);
         int batchSize = 16 * 1024;
+        int requestTimeoutMs = 1500;
+        long deliveryTimeoutMs = 3000L;
+        long totalSize = 1024 * 1024;
+        String metricGrpName = "producer-metrics";
         MetricConfig metricConfig = new MetricConfig().tags(metricTags);
         this.brokerNode = new Node(0, "localhost", 2211);
         this.transactionManager = new TransactionManager(logContext, transactionalId, transactionTimeoutMs,
@@ -125,7 +129,7 @@ public class TransactionManagerTest {
         Metrics metrics = new Metrics(metricConfig, time);
         SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(metrics);
 
-        this.accumulator = new RecordAccumulator(logContext, batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, metrics, time, apiVersions, transactionManager);
+        this.accumulator = new RecordAccumulator(logContext, batchSize, CompressionType.NONE, 0L, 0L, deliveryTimeoutMs, metrics, metricGrpName, time, apiVersions, transactionManager, new BufferPool(totalSize, batchSize, metrics, time, metricGrpName));
         this.sender = new Sender(logContext, this.client, this.metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL,
                 MAX_RETRIES, senderMetrics, this.time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
         this.metadata.update(this.cluster, Collections.<String>emptySet(), time.milliseconds());
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 7291d4f..1f62103 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -139,6 +139,7 @@ public class Worker {
         producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.toString(Long.MAX_VALUE));
         producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
         producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
+        producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.toString(Integer.MAX_VALUE));
         // User-specified overrides
         producerProps.putAll(config.originalsWithPrefix("producer."));
     }
diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index dc4041f..739675e 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -68,7 +68,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
     super.tearDown()
   }
 
-  protected def createProducer(brokerList: String, retries: Int = 0, lingerMs: Long = 0, props: Option[Properties] = None): KafkaProducer[Array[Byte],Array[Byte]] = {
+  protected def createProducer(brokerList: String, retries: Int = 0, lingerMs: Int = 0, props: Option[Properties] = None): KafkaProducer[Array[Byte],Array[Byte]] = {
     val producer = TestUtils.createProducer(brokerList, securityProtocol = securityProtocol, trustStoreFile = trustStoreFile,
       saslProperties = clientSaslProperties, retries = retries, lingerMs = lingerMs, props = props)
     registerProducer(producer)
@@ -170,13 +170,13 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
   def testSendCompressedMessageWithCreateTime() {
     val producerProps = new Properties()
     producerProps.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip")
-    val producer = createProducer(brokerList = brokerList, lingerMs = Long.MaxValue, props = Some(producerProps))
+    val producer = createProducer(brokerList = brokerList, lingerMs = Int.MaxValue, props = Some(producerProps))
     sendAndVerifyTimestamp(producer, TimestampType.CREATE_TIME)
   }
 
   @Test
   def testSendNonCompressedMessageWithCreateTime() {
-    val producer = createProducer(brokerList = brokerList, lingerMs = Long.MaxValue)
+    val producer = createProducer(brokerList = brokerList, lingerMs = Int.MaxValue)
     sendAndVerifyTimestamp(producer, TimestampType.CREATE_TIME)
   }
 
@@ -409,7 +409,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
    */
   @Test
   def testFlush() {
-    val producer = createProducer(brokerList, lingerMs = Long.MaxValue)
+    val producer = createProducer(brokerList, lingerMs = Int.MaxValue)
     try {
       createTopic(topic, 2, 2)
       val record = new ProducerRecord[Array[Byte], Array[Byte]](topic,
@@ -438,7 +438,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
 
     // Test closing from caller thread.
     for (_ <- 0 until 50) {
-      val producer = createProducer(brokerList, lingerMs = Long.MaxValue)
+      val producer = createProducer(brokerList, lingerMs = Int.MaxValue)
       val responses = (0 until numRecords) map (_ => producer.send(record0))
       assertTrue("No request is complete.", responses.forall(!_.isDone()))
       producer.close(0, TimeUnit.MILLISECONDS)
@@ -478,7 +478,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
       }
     }
     for (i <- 0 until 50) {
-      val producer = createProducer(brokerList, lingerMs = Long.MaxValue)
+      val producer = createProducer(brokerList, lingerMs = Int.MaxValue)
       try {
         // send message to partition 0
         // Only send the records in the first callback since we close the producer in the callback and no records
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 02396dd..ba4df7d 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -617,9 +617,9 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   private def sendCompressedMessages(numRecords: Int, tp: TopicPartition) {
     val producerProps = new Properties()
     producerProps.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, CompressionType.GZIP.name)
-    producerProps.setProperty(ProducerConfig.LINGER_MS_CONFIG, Long.MaxValue.toString)
+    producerProps.setProperty(ProducerConfig.LINGER_MS_CONFIG, Int.MaxValue.toString)
     val producer = TestUtils.createProducer(brokerList, securityProtocol = securityProtocol, trustStoreFile = trustStoreFile,
-        saslProperties = clientSaslProperties, retries = 0, lingerMs = Long.MaxValue, props = Some(producerProps))
+        saslProperties = clientSaslProperties, retries = 0, lingerMs = Int.MaxValue, props = Some(producerProps))
     (0 until numRecords).foreach { i =>
       producer.send(new ProducerRecord(tp.topic, tp.partition, i.toLong, s"key $i".getBytes, s"value $i".getBytes))
     }
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
index 929dbe4..1da6f9e 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
@@ -45,7 +45,7 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
   def testBatchSizeZero() {
     val producerProps = new Properties()
     producerProps.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "0")
-    val producer = createProducer(brokerList = brokerList, lingerMs = Long.MaxValue, props = Some(producerProps))
+    val producer = createProducer(brokerList = brokerList, lingerMs = Int.MaxValue, props = Some(producerProps))
     sendAndVerify(producer)
   }
 
@@ -53,13 +53,13 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
   def testSendCompressedMessageWithLogAppendTime() {
     val producerProps = new Properties()
     producerProps.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip")
-    val producer = createProducer(brokerList = brokerList, lingerMs = Long.MaxValue, props = Some(producerProps))
+    val producer = createProducer(brokerList = brokerList, lingerMs = Int.MaxValue, props = Some(producerProps))
     sendAndVerifyTimestamp(producer, TimestampType.LOG_APPEND_TIME)
   }
 
   @Test
   def testSendNonCompressedMessageWithLogAppendTime() {
-    val producer = createProducer(brokerList = brokerList, lingerMs = Long.MaxValue)
+    val producer = createProducer(brokerList = brokerList, lingerMs = Int.MaxValue)
     sendAndVerifyTimestamp(producer, TimestampType.LOG_APPEND_TIME)
   }
 
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index 9b77c2d..0227690 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -64,11 +64,11 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
   override def setUp() {
     super.setUp()
 
-    producer1 = TestUtils.createProducer(brokerList, acks = 0, requestTimeoutMs = 30000L, maxBlockMs = 10000L,
+    producer1 = TestUtils.createProducer(brokerList, acks = 0, requestTimeoutMs = 30000, maxBlockMs = 10000L,
       bufferSize = producerBufferSize)
-    producer2 = TestUtils.createProducer(brokerList, acks = 1, requestTimeoutMs = 30000L, maxBlockMs = 10000L,
+    producer2 = TestUtils.createProducer(brokerList, acks = 1, requestTimeoutMs = 30000, maxBlockMs = 10000L,
       bufferSize = producerBufferSize)
-    producer3 = TestUtils.createProducer(brokerList, acks = -1, requestTimeoutMs = 30000L, maxBlockMs = 10000L,
+    producer3 = TestUtils.createProducer(brokerList, acks = -1, requestTimeoutMs = 30000, maxBlockMs = 10000L,
       bufferSize = producerBufferSize)
   }
 
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 49553e8..61d5919 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -1368,11 +1368,11 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
   private case class ProducerBuilder() extends ClientBuilder[KafkaProducer[String, String]] {
     private var _retries = 0
     private var _acks = -1
-    private var _requestTimeoutMs = 30000L
+    private var _requestTimeoutMs = 30000
 
     def maxRetries(retries: Int): ProducerBuilder = { _retries = retries; this }
     def acks(acks: Int): ProducerBuilder = { _acks = acks; this }
-    def requestTimeoutMs(timeoutMs: Long): ProducerBuilder = { _requestTimeoutMs = timeoutMs; this }
+    def requestTimeoutMs(timeoutMs: Int): ProducerBuilder = { _requestTimeoutMs = timeoutMs; this }
 
     override def build(): KafkaProducer[String, String] = {
       val producer = TestUtils.createProducer(bootstrapServers,
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
index 67f33eb..57aca1e 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
@@ -204,7 +204,7 @@ class FetchRequestTest extends BaseRequestTest {
     val propsOverride = new Properties
     propsOverride.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize.toString)
     val producer = TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers),
-      retries = 5, lingerMs = Long.MaxValue,
+      retries = 5, lingerMs = Int.MaxValue,
       keySerializer = new StringSerializer, valueSerializer = new ByteArraySerializer, props = Some(propsOverride))
     val bytes = new Array[Byte](msgValueLen)
     val futures = try {
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index cf60c78..aa902f2 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -548,8 +548,8 @@ object TestUtils extends Logging {
                            maxBlockMs: Long = 60 * 1000L,
                            bufferSize: Long = 1024L * 1024L,
                            retries: Int = 0,
-                           lingerMs: Long = 0,
-                           requestTimeoutMs: Long = 30 * 1000L,
+                           lingerMs: Int = 0,
+                           requestTimeoutMs: Int = 30 * 1000,
                            securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT,
                            trustStoreFile: Option[File] = None,
                            saslProperties: Option[Properties] = None,
@@ -564,6 +564,11 @@ object TestUtils extends Logging {
     producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferSize.toString)
     producerProps.put(ProducerConfig.RETRIES_CONFIG, retries.toString)
     producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMs.toString)
+    producerProps.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs.toString)
+
+    // In case of overflow set maximum possible value for deliveryTimeoutMs
+    val deliveryTimeoutMs = if (lingerMs + requestTimeoutMs < 0) Int.MaxValue else lingerMs + requestTimeoutMs
+    producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, deliveryTimeoutMs.toString)
 
     /* Only use these if not already set */
     val defaultProps = Map(
diff --git a/docs/upgrade.html b/docs/upgrade.html
index ccfab95..ac1388e 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -30,6 +30,11 @@
         offset retention period (or the one set by broker) has passed since their last commit.</li>
     <li>The default for console consumer's <code>enable.auto.commit</code> property when no <code>group.id</code> is provided is now set to <code>false</code>.
         This is to avoid polluting the consumer coordinator cache as the auto-generated group is not likely to be used by other consumers.</li>
+    <li>The default value for the producer's <code>retries</code> config was changed to <code>Integer.MAX_VALUE</code>, as we introduced <code>delivery.timeout.ms</code>
+        in <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-91+Provide+Intuitive+User+Timeouts+in+The+Producer">KIP-91</a>,
+        which sets an upper bound on the total time between sending a record and receiving acknowledgement from the broker. By default,
+        the delivery timeout is set to 2 minutes.
+    </li>
 </ol>
 
 


Mime
View raw message