kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch trunk updated: MINOR: Producers should set delivery timeout instead of retries (#5425)
Date Wed, 01 Aug 2018 18:04:23 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c3e7c0b  MINOR: Producers should set delivery timeout instead of retries  (#5425)
c3e7c0b is described below

commit c3e7c0bcb258061568294c0d96b62fea94ef8ee7
Author: Jason Gustafson <jason@confluent.io>
AuthorDate: Wed Aug 1 11:04:17 2018 -0700

    MINOR: Producers should set delivery timeout instead of retries  (#5425)
    
    Use delivery timeout instead of retries when possible and remove various TODOs associated with completion of KIP-91.
    
    Reviewers: Ismael Juma <ismael@juma.me.uk>, Guozhang Wang <wangguoz@gmail.com>
---
 .../kafka/clients/producer/KafkaProducer.java      |  2 +-
 .../kafka/clients/producer/ProducerConfig.java     | 24 ++++++++++++++--------
 .../org/apache/kafka/connect/runtime/Worker.java   |  1 -
 .../connect/storage/KafkaConfigBackingStore.java   |  2 +-
 .../connect/storage/KafkaOffsetBackingStore.java   |  2 +-
 core/src/main/scala/kafka/tools/MirrorMaker.scala  | 10 ++++-----
 .../kafka/api/AuthorizerIntegrationTest.scala      |  2 --
 .../kafka/api/BaseProducerSendTest.scala           |  6 ++++--
 .../kafka/api/EndToEndAuthorizationTest.scala      | 10 ++++-----
 .../kafka/api/IntegrationTestHarness.scala         | 20 +++++++++---------
 .../kafka/api/PlaintextConsumerTest.scala          |  2 +-
 .../kafka/api/PlaintextProducerSendTest.scala      |  3 +--
 .../kafka/api/ProducerFailureHandlingTest.scala    |  6 +++---
 .../kafka/api/RackAwareAutoTopicCreationTest.scala |  2 +-
 .../SaslClientsWithInvalidCredentialsTest.scala    |  1 -
 .../server/DynamicBrokerReconfigurationTest.scala  | 17 +++++++++------
 .../other/kafka/ReplicationQuotasTestRig.scala     |  2 +-
 .../FetchRequestDownConversionConfigTest.scala     |  2 +-
 .../scala/unit/kafka/server/FetchRequestTest.scala |  9 ++++----
 .../unit/kafka/server/LogDirFailureTest.scala      | 16 ++++++++++-----
 .../scala/unit/kafka/server/LogRecoveryTest.scala  |  1 -
 .../scala/unit/kafka/server/ReplicaFetchTest.scala |  1 -
 .../unit/kafka/server/ReplicationQuotasTest.scala  |  8 ++++----
 .../unit/kafka/server/ServerShutdownTest.scala     |  1 -
 ...chDrivenReplicationProtocolAcceptanceTest.scala | 12 +++++------
 .../server/epoch/LeaderEpochIntegrationTest.scala  |  7 ++++---
 .../test/scala/unit/kafka/utils/TestUtils.scala    | 23 ++++++++-------------
 docs/upgrade.html                                  |  4 ++++
 .../kafka/log4jappender/KafkaLog4jAppender.java    | 23 +++++++++++++++------
 .../org/apache/kafka/streams/StreamsConfig.java    | 11 +++++-----
 .../apache/kafka/streams/StreamsConfigTest.java    | 14 ++++++-------
 .../integration/QueryableStateIntegrationTest.java |  1 -
 .../apache/kafka/streams/perf/SimpleBenchmark.java |  3 ---
 .../apache/kafka/streams/perf/YahooBenchmark.java  |  2 --
 .../streams/tests/BrokerCompatibilityTest.java     |  2 --
 .../apache/kafka/streams/tests/EosTestClient.java  |  3 ---
 .../kafka/streams/tests/SmokeTestClient.java       |  4 ----
 .../kafka/streams/tests/SmokeTestDriver.java       |  5 +----
 .../kafka/tools/TransactionalMessageCopier.java    |  6 ------
 39 files changed, 133 insertions(+), 137 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index b40b09a..2a35b30 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -96,7 +96,7 @@ import static org.apache.kafka.common.serialization.ExtendedSerializer.Wrapper.e
  * Properties props = new Properties();
  * props.put("bootstrap.servers", "localhost:9092");
  * props.put("acks", "all");
- * props.put("retries", 0);
+ * props.put("delivery.timeout.ms", 30000);
  * props.put("batch.size", 16384);
  * props.put("linger.ms", 1);
  * props.put("buffer.memory", 33554432);
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index ab55353..6b37e3c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -107,10 +107,14 @@ public class ProducerConfig extends AbstractConfig {
 
     /** <code>delivery.timeout.ms</code> */
     public static final String DELIVERY_TIMEOUT_MS_CONFIG = "delivery.timeout.ms";
-    private static final String DELIVERY_TIMEOUT_MS_DOC = "An upper bound on the time to report success or failure after Producer.send() returns. "
-                                                          + "Producer may report failure to send a message earlier than this config if all the retries are exhausted or "
-                                                          + "a record is added to a batch nearing expiration. " + DELIVERY_TIMEOUT_MS_CONFIG + "should be equal to or "
-                                                          + "greater than " + REQUEST_TIMEOUT_MS_CONFIG + " + " + LINGER_MS_CONFIG;
+    private static final String DELIVERY_TIMEOUT_MS_DOC = "An upper bound on the time to report success or failure "
+            + "after a call to <code>send()</code> returns. This limits the total time that a record will be delayed "
+            + "prior to sending, the time to await acknowledgement from the broker (if expected), and the time allowed "
+            + "for retriable send failures. The producer may report failure to send a record earlier than this config if "
+            + "either an unrecoverable error is encountered, the retries have been exhausted, "
+            + "or the record is added to a batch which reached an earlier delivery expiration deadline."
+            + "The value of this config should be greater than or equal to the sum of " + REQUEST_TIMEOUT_MS_CONFIG
+            + " and " + LINGER_MS_CONFIG + ". ";
 
     /** <code>client.id</code> */
     public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG;
@@ -181,10 +185,14 @@ public class ProducerConfig extends AbstractConfig {
     /** <code>retries</code> */
     public static final String RETRIES_CONFIG = CommonClientConfigs.RETRIES_CONFIG;
     private static final String RETRIES_DOC = "Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error."
-                                              + " Note that this retry is no different than if the client resent the record upon receiving the error."
-                                              + " Allowing retries without setting <code>" + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + "</code> to 1 will potentially change the"
-                                              + " ordering of records because if two batches are sent to a single partition, and the first fails and is retried but the second"
-                                              + " succeeds, then the records in the second batch may appear first.";
+            + " Note that this retry is no different than if the client resent the record upon receiving the error."
+            + " Allowing retries without setting <code>" + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + "</code> to 1 will potentially change the"
+            + " ordering of records because if two batches are sent to a single partition, and the first fails and is retried but the second"
+            + " succeeds, then the records in the second batch may appear first. Note additionall that produce requests will be"
+            + " failed before the number of retries has been exhausted if the timeout configured by"
+            + " " + DELIVERY_TIMEOUT_MS_CONFIG + " expires first before successful acknowledgement. Users should generally"
+            + " prefer to leave this config unset and instead use " + DELIVERY_TIMEOUT_MS_CONFIG + " to control"
+            + " retry behavior.";
 
     /** <code>key.serializer</code> */
     public static final String KEY_SERIALIZER_CLASS_CONFIG = "key.serializer";
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 1f62103..e2fe6b6 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -135,7 +135,6 @@ public class Worker {
         // These settings are designed to ensure there is no data loss. They *may* be overridden via configs passed to the
         // worker, but this may compromise the delivery guarantees of Kafka Connect.
         producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, Integer.toString(Integer.MAX_VALUE));
-        producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
         producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.toString(Long.MAX_VALUE));
         producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
         producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index ea19665..e7ee632 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -415,7 +415,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
         Map<String, Object> producerProps = new HashMap<>(originals);
         producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
-        producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
+        producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE);
         Map<String, Object> consumerProps = new HashMap<>(originals);
         consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
         consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
index fb8ad97..195c498 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
@@ -71,7 +71,7 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
         Map<String, Object> producerProps = new HashMap<>(originals);
         producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
-        producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
+        producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE);
 
         Map<String, Object> consumerProps = new HashMap<>(originals);
         consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 9cc6ebe..d7e09e4 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -32,7 +32,7 @@ import org.apache.kafka.clients.consumer.{CommitFailedException, Consumer, Consu
 import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
 import org.apache.kafka.common.{KafkaException, TopicPartition}
-import org.apache.kafka.common.serialization.ByteArrayDeserializer
+import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer}
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.errors.WakeupException
 import org.apache.kafka.common.record.RecordBatch
@@ -50,7 +50,7 @@ import scala.util.control.ControlThrowable
  * @note For mirror maker, the following settings are set by default to make sure there is no data loss:
  *       1. use producer with following settings
  *            acks=all
- *            retries=max integer
+ *            delivery.timeout.ms=max integer
  *            max.block.ms=max long
  *            max.in.flight.requests.per.connection=1
  *       2. Consumer Settings
@@ -193,13 +193,13 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
       val sync = producerProps.getProperty("producer.type", "async").equals("sync")
       producerProps.remove("producer.type")
       // Defaults to no data loss settings.
-      maybeSetDefaultProperty(producerProps, ProducerConfig.RETRIES_CONFIG, Int.MaxValue.toString)
+      maybeSetDefaultProperty(producerProps, ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Int.MaxValue.toString)
       maybeSetDefaultProperty(producerProps, ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.MaxValue.toString)
       maybeSetDefaultProperty(producerProps, ProducerConfig.ACKS_CONFIG, "all")
       maybeSetDefaultProperty(producerProps, ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
       // Always set producer key and value serializer to ByteArraySerializer.
-      producerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
-      producerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
+      producerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getName)
+      producerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getName)
       producer = new MirrorMakerProducer(sync, producerProps)
 
       // Create consumers
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index c6b3af2..dc9ca85 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -1442,7 +1442,6 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     val transactionalProperties = new Properties()
     transactionalProperties.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId)
     val producer = TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers),
-      retries = 3,
       props = Some(transactionalProperties))
     producers += producer
     producer
@@ -1452,7 +1451,6 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     val idempotentProperties = new Properties()
     idempotentProperties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
     val producer = TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers),
-      retries = 3,
       props = Some(idempotentProperties))
     producers += producer
     producer
diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index 739675e..ad44425 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -68,9 +68,11 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
     super.tearDown()
   }
 
-  protected def createProducer(brokerList: String, retries: Int = 0, lingerMs: Int = 0, props: Option[Properties] = None): KafkaProducer[Array[Byte],Array[Byte]] = {
+  protected def createProducer(brokerList: String,
+                               lingerMs: Int = 0,
+                               props: Option[Properties] = None): KafkaProducer[Array[Byte],Array[Byte]] = {
     val producer = TestUtils.createProducer(brokerList, securityProtocol = securityProtocol, trustStoreFile = trustStoreFile,
-      saslProperties = clientSaslProperties, retries = retries, lingerMs = lingerMs, props = props)
+      saslProperties = clientSaslProperties, lingerMs = lingerMs, props = props)
     registerProducer(producer)
   }
 
diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
index a002309..7ea761f 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
@@ -189,11 +189,11 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
 
   override def createProducer: KafkaProducer[Array[Byte], Array[Byte]] = {
     TestUtils.createProducer(brokerList,
-                                maxBlockMs = 3000L,
-                                securityProtocol = this.securityProtocol,
-                                trustStoreFile = this.trustStoreFile,
-                                saslProperties = this.clientSaslProperties,
-                                props = Some(producerConfig))
+      maxBlockMs = 3000L,
+      securityProtocol = this.securityProtocol,
+      trustStoreFile = this.trustStoreFile,
+      saslProperties = this.clientSaslProperties,
+      props = Some(producerConfig))
   }
 
   /**
diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index 053f04e..4601417 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -93,19 +93,19 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
   }
 
   def createProducer: KafkaProducer[Array[Byte], Array[Byte]] = {
-      TestUtils.createProducer(brokerList,
-                                  securityProtocol = this.securityProtocol,
-                                  trustStoreFile = this.trustStoreFile,
-                                  saslProperties = this.clientSaslProperties,
-                                  props = Some(producerConfig))
+    TestUtils.createProducer(brokerList,
+      securityProtocol = this.securityProtocol,
+      trustStoreFile = this.trustStoreFile,
+      saslProperties = this.clientSaslProperties,
+      props = Some(producerConfig))
   }
 
   def createConsumer: KafkaConsumer[Array[Byte], Array[Byte]] = {
-      TestUtils.createConsumer(brokerList,
-                                  securityProtocol = this.securityProtocol,
-                                  trustStoreFile = this.trustStoreFile,
-                                  saslProperties = this.clientSaslProperties,
-                                  props = Some(consumerConfig))
+    TestUtils.createConsumer(brokerList,
+      securityProtocol = this.securityProtocol,
+      trustStoreFile = this.trustStoreFile,
+      saslProperties = this.clientSaslProperties,
+      props = Some(consumerConfig))
   }
 
   @After
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index ba4df7d..0c8c771 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -619,7 +619,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     producerProps.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, CompressionType.GZIP.name)
     producerProps.setProperty(ProducerConfig.LINGER_MS_CONFIG, Int.MaxValue.toString)
     val producer = TestUtils.createProducer(brokerList, securityProtocol = securityProtocol, trustStoreFile = trustStoreFile,
-        saslProperties = clientSaslProperties, retries = 0, lingerMs = Int.MaxValue, props = Some(producerProps))
+        saslProperties = clientSaslProperties, lingerMs = Int.MaxValue, props = Some(producerProps))
     (0 until numRecords).foreach { i =>
       producer.send(new ProducerRecord(tp.topic, tp.partition, i.toLong, s"key $i".getBytes, s"value $i".getBytes))
     }
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
index 1da6f9e..8ae3952 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
@@ -70,8 +70,7 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
    */
   @Test
   def testAutoCreateTopic() {
-    val producer = createProducer(brokerList, retries = 5)
-
+    val producer = createProducer(brokerList)
     try {
       // Send a message to auto-create the topic
       val record = new ProducerRecord(topic, null, "key".getBytes, "value".getBytes)
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index 0227690..b7d3ecb 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -64,11 +64,11 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
   override def setUp() {
     super.setUp()
 
-    producer1 = TestUtils.createProducer(brokerList, acks = 0, requestTimeoutMs = 30000, maxBlockMs = 10000L,
+    producer1 = TestUtils.createProducer(brokerList, acks = 0, retries = 0, requestTimeoutMs = 30000, maxBlockMs = 10000L,
       bufferSize = producerBufferSize)
-    producer2 = TestUtils.createProducer(brokerList, acks = 1, requestTimeoutMs = 30000, maxBlockMs = 10000L,
+    producer2 = TestUtils.createProducer(brokerList, acks = 1, retries = 0, requestTimeoutMs = 30000, maxBlockMs = 10000L,
       bufferSize = producerBufferSize)
-    producer3 = TestUtils.createProducer(brokerList, acks = -1, requestTimeoutMs = 30000, maxBlockMs = 10000L,
+    producer3 = TestUtils.createProducer(brokerList, acks = -1, retries = 0, requestTimeoutMs = 30000, maxBlockMs = 10000L,
       bufferSize = producerBufferSize)
   }
 
diff --git a/core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala b/core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala
index bfd2924..5fc626b 100644
--- a/core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala
@@ -44,7 +44,7 @@ class RackAwareAutoTopicCreationTest extends KafkaServerTestHarness with RackAwa
 
   @Test
   def testAutoCreateTopic() {
-    val producer = TestUtils.createProducer(brokerList, retries = 5)
+    val producer = TestUtils.createProducer(brokerList)
     try {
       // Send a message to auto-create the topic
       val record = new ProducerRecord(topic, null, "key".getBytes, "value".getBytes)
diff --git a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
index b58ba74..ebc587e 100644
--- a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
@@ -246,7 +246,6 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with
     val txProducer = TestUtils.createProducer(brokerList,
                                   securityProtocol = this.securityProtocol,
                                   saslProperties = this.clientSaslProperties,
-                                  retries = 1000,
                                   acks = -1,
                                   props = Some(producerConfig))
     producers += txProducer
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 61d5919..5694cff 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -304,7 +304,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
 
   @Test
   def testLogCleanerConfig(): Unit = {
-    val (producerThread, consumerThread) = startProduceConsume(0)
+    val (producerThread, consumerThread) = startProduceConsume(retries = 0)
 
     verifyThreads("kafka-log-cleaner-thread-", countPerBroker = 1)
 
@@ -437,7 +437,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
   def testUncleanLeaderElectionEnable(): Unit = {
     val topic = "testtopic2"
     TestUtils.createTopic(zkClient, topic, 1, replicationFactor = 2, servers)
-    val producer = ProducerBuilder().maxRetries(1000).acks(1).build()
+    val producer = ProducerBuilder().acks(1).build()
     val consumer = ConsumerBuilder("unclean-leader-test").enableAutoCommit(false).topic(topic).build()
     verifyProduceConsume(producer, consumer, numRecords = 10, topic)
     consumer.commitSync()
@@ -543,7 +543,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     def verifyThreadPoolResize(propName: String, currentSize: => Int, threadPrefix: String, mayReceiveDuplicates: Boolean): Unit = {
       maybeVerifyThreadPoolSize(propName, currentSize, threadPrefix)
       val numRetries = if (mayReceiveDuplicates) 100 else 0
-      val (producerThread, consumerThread) = startProduceConsume(numRetries)
+      val (producerThread, consumerThread) = startProduceConsume(retries = numRetries)
       var threadPoolSize = currentSize
       (1 to 2).foreach { _ =>
         threadPoolSize = reducePoolSize(propName, threadPoolSize, threadPrefix)
@@ -736,6 +736,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     val producer1 = ProducerBuilder().trustStoreProps(sslProperties1)
       .maxRetries(0)
       .requestTimeoutMs(1000)
+      .deliveryTimeoutMs(1000)
       .bootstrapServers(bootstrap)
       .build()
 
@@ -1366,18 +1367,21 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
   }
 
   private case class ProducerBuilder() extends ClientBuilder[KafkaProducer[String, String]] {
-    private var _retries = 0
+    private var _retries = Int.MaxValue
     private var _acks = -1
     private var _requestTimeoutMs = 30000
+    private var _deliveryTimeoutMs = 30000
 
     def maxRetries(retries: Int): ProducerBuilder = { _retries = retries; this }
     def acks(acks: Int): ProducerBuilder = { _acks = acks; this }
     def requestTimeoutMs(timeoutMs: Int): ProducerBuilder = { _requestTimeoutMs = timeoutMs; this }
+    def deliveryTimeoutMs(timeoutMs: Int): ProducerBuilder = { _deliveryTimeoutMs= timeoutMs; this }
 
     override def build(): KafkaProducer[String, String] = {
       val producer = TestUtils.createProducer(bootstrapServers,
         acks = _acks,
         requestTimeoutMs = _requestTimeoutMs,
+        deliveryTimeoutMs = _deliveryTimeoutMs,
         retries = _retries,
         securityProtocol = _securityProtocol,
         trustStoreFile = Some(trustStoreFile1),
@@ -1417,8 +1421,9 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     }
   }
 
-  private class ProducerThread(clientId: String, retries: Int) extends
-      ShutdownableThread(clientId, isInterruptible = false) {
+  private class ProducerThread(clientId: String, retries: Int)
+    extends ShutdownableThread(clientId, isInterruptible = false) {
+
     private val producer = ProducerBuilder().maxRetries(retries).clientId(clientId).build()
     val lastSent = new ConcurrentHashMap[Int, Int]()
     @volatile var sent = 0
diff --git a/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala b/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
index 98e568b..b2568c1 100644
--- a/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
+++ b/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
@@ -127,7 +127,7 @@ object ReplicationQuotasTestRig {
       createTopic(zkClient, topicName, replicas, servers)
 
       println("Writing Data")
-      val producer = TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers), retries = 5, acks = 0)
+      val producer = TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers), acks = 0)
       (0 until config.msgsPerPartition).foreach { x =>
         (0 until config.partitions).foreach { partition =>
           producer.send(new ProducerRecord(topicName, partition, null, new Array[Byte](config.msgSize)))
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala
index 1e762b3..1bf6f28 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala
@@ -52,7 +52,7 @@ class FetchRequestDownConversionConfigTest extends BaseRequestTest {
 
   private def initProducer(): Unit = {
     producer = TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers),
-      retries = 5, keySerializer = new StringSerializer, valueSerializer = new StringSerializer)
+      keySerializer = new StringSerializer, valueSerializer = new StringSerializer)
   }
 
   private def createTopics(numTopics: Int, numPartitions: Int,
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
index 57aca1e..388b0f8 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
@@ -70,7 +70,7 @@ class FetchRequestTest extends BaseRequestTest {
 
   private def initProducer(): Unit = {
     producer = TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers),
-      retries = 5, keySerializer = new StringSerializer, valueSerializer = new StringSerializer)
+      keySerializer = new StringSerializer, valueSerializer = new StringSerializer)
   }
 
   @Test
@@ -204,8 +204,8 @@ class FetchRequestTest extends BaseRequestTest {
     val propsOverride = new Properties
     propsOverride.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize.toString)
     val producer = TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers),
-      retries = 5, lingerMs = Int.MaxValue,
-      keySerializer = new StringSerializer, valueSerializer = new ByteArraySerializer, props = Some(propsOverride))
+      lingerMs = Int.MaxValue, keySerializer = new StringSerializer,
+      valueSerializer = new ByteArraySerializer, props = Some(propsOverride))
     val bytes = new Array[Byte](msgValueLen)
     val futures = try {
       (0 to 1000).map { _ =>
@@ -263,7 +263,8 @@ class FetchRequestTest extends BaseRequestTest {
     // Increase linger so that we have control over the batches created
     producer = TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers),
       retries = 5, keySerializer = new StringSerializer, valueSerializer = new StringSerializer,
-      lingerMs = 300 * 1000)
+      lingerMs = 30 * 1000,
+      deliveryTimeoutMs = 60 * 1000)
 
     val topicConfig = Map(LogConfig.MessageFormatVersionProp -> KAFKA_0_11_0_IV2.version)
     val (topicPartition, leaderId) = createTopics(numTopics = 1, numPartitions = 1, topicConfig).head
diff --git a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
index 2087363..9b32cd2 100644
--- a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
@@ -24,9 +24,8 @@ import kafka.server.LogDirFailureTest._
 import kafka.api.IntegrationTestHarness
 import kafka.controller.{OfflineReplica, PartitionAndReplica}
 import kafka.utils.{CoreUtils, Exit, TestUtils}
-
 import org.apache.kafka.clients.consumer.KafkaConsumer
-import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.errors.{KafkaStorageException, NotLeaderForPartitionException}
@@ -47,8 +46,6 @@ class LogDirFailureTest extends IntegrationTestHarness {
   private val partitionNum = 12
 
   this.logDirCount = 3
-  this.producerConfig.setProperty(ProducerConfig.RETRIES_CONFIG, "0")
-  this.producerConfig.setProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100")
   this.serverConfig.setProperty(KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp, "60000")
   this.serverConfig.setProperty(KafkaConfig.NumReplicaFetchersProp, "1")
 
@@ -58,6 +55,16 @@ class LogDirFailureTest extends IntegrationTestHarness {
     createTopic(topic, partitionNum, serverCount)
   }
 
+  override def createProducer: KafkaProducer[Array[Byte], Array[Byte]] = {
+    TestUtils.createProducer(brokerList,
+      retries = 0,
+      securityProtocol = this.securityProtocol,
+      trustStoreFile = this.trustStoreFile,
+      saslProperties = this.clientSaslProperties,
+      props = Some(producerConfig))
+  }
+
+
   @Test
   def testIOExceptionDuringLogRoll() {
     testProduceAfterLogDirFailureOnLeader(Roll)
@@ -175,7 +182,6 @@ class LogDirFailureTest extends IntegrationTestHarness {
           case t: NotLeaderForPartitionException => // This may happen if ProduceRequest version <= 3
           case t: Throwable => fail(s"send() should fail with either KafkaStorageException or NotLeaderForPartitionException instead of ${t.toString}")
         }
-      case e: Throwable => fail(s"send() should fail with either KafkaStorageException or NotLeaderForPartitionException instead of ${e.toString}")
     }
 
     // Wait for producer to update metadata for the partition
diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index 1bd15f7..82b95a8 100755
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@ -67,7 +67,6 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
       producer.close()
     producer = TestUtils.createProducer(
       TestUtils.getBrokerListStrFromServers(servers),
-      retries = 5,
       keySerializer = new IntegerSerializer,
       valueSerializer = new StringSerializer
     )
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
index 0dd22f1..8eba824 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
@@ -56,7 +56,6 @@ class ReplicaFetchTest extends ZooKeeperTestHarness  {
 
     // send test messages to leader
     val producer = TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(brokers),
-                                               retries = 5,
                                                keySerializer = new StringSerializer,
                                                valueSerializer = new StringSerializer)
     val records = testMessageList1.map(m => new ProducerRecord(topic1, m, m)) ++
diff --git a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
index 0bbe637..5125486 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
@@ -114,7 +114,7 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness {
       adminZkClient.changeTopicConfig(topic, propsWith(FollowerReplicationThrottledReplicasProp, "0:106,1:106,2:106,3:107,4:107,5:107"))
 
     //Add data equally to each partition
-    producer = createProducer(getBrokerListStrFromServers(brokers), retries = 5, acks = 1)
+    producer = createProducer(getBrokerListStrFromServers(brokers), acks = 1)
     (0 until msgCount).foreach { _ =>
       (0 to 7).foreach { partition =>
         producer.send(new ProducerRecord(topic, partition, null, msg))
@@ -203,14 +203,14 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness {
 
     val throttledTook = System.currentTimeMillis() - start
 
-    assertTrue((s"Throttled replication of ${throttledTook}ms should be > ${expectedDuration * 1000 * 0.9}ms"),
+    assertTrue(s"Throttled replication of ${throttledTook}ms should be > ${expectedDuration * 1000 * 0.9}ms",
       throttledTook > expectedDuration * 1000 * 0.9)
-    assertTrue((s"Throttled replication of ${throttledTook}ms should be < ${expectedDuration * 1500}ms"),
+    assertTrue(s"Throttled replication of ${throttledTook}ms should be < ${expectedDuration * 1500}ms",
       throttledTook < expectedDuration * 1000 * 1.5)
   }
 
   def addData(msgCount: Int, msg: Array[Byte]): Unit = {
-    producer = createProducer(getBrokerListStrFromServers(brokers), retries = 5, acks = 0)
+    producer = createProducer(getBrokerListStrFromServers(brokers), acks = 0)
     (0 until msgCount).map(_ => producer.send(new ProducerRecord(topic, msg))).foreach(_.get)
     waitForOffsetsToMatch(msgCount, 0, 100)
   }
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index 9f966b4..ff09749 100755
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -54,7 +54,6 @@ class ServerShutdownTest extends ZooKeeperTestHarness {
     def createProducer(server: KafkaServer): KafkaProducer[Integer, String] =
       TestUtils.createProducer(
         TestUtils.getBrokerListStrFromServers(Seq(server)),
-        retries = 5,
         keySerializer = new IntegerSerializer,
         valueSerializer = new StringSerializer
       )
diff --git a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
index 3dcf4ff..149f05f 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
@@ -305,7 +305,7 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness
     TestUtils.createTopic(zkClient, topic, Map(0 -> Seq(100, 101)), brokers,
       CoreUtils.propsWith((KafkaConfig.MinInSyncReplicasProp, "1")))
 
-    producer = TestUtils.createProducer(getBrokerListStrFromServers(brokers), retries = 5, acks = 1)
+    producer = TestUtils.createProducer(getBrokerListStrFromServers(brokers), acks = 1)
 
     // Write one message while both brokers are up
     (0 until 1).foreach { i =>
@@ -328,7 +328,7 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness
 
     //Bounce the producer (this is required, probably because the broker port changes on restart?)
     producer.close()
-    producer = TestUtils.createProducer(getBrokerListStrFromServers(brokers), retries = 5, acks = 1)
+    producer = TestUtils.createProducer(getBrokerListStrFromServers(brokers), acks = 1)
 
     //Write 3 messages
     (0 until 3).foreach { i =>
@@ -340,7 +340,7 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness
 
     //Bounce the producer (this is required, probably because the broker port changes on restart?)
     producer.close()
-    producer = TestUtils.createProducer(getBrokerListStrFromServers(brokers), retries = 5, acks = 1)
+    producer = TestUtils.createProducer(getBrokerListStrFromServers(brokers), acks = 1)
 
     //Write 1 message
     (0 until 1).foreach { i =>
@@ -352,7 +352,7 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness
 
     //Bounce the producer (this is required, probably because the broker port changes on restart?)
     producer.close()
-    producer = TestUtils.createProducer(getBrokerListStrFromServers(brokers), retries = 5, acks = 1)
+    producer = TestUtils.createProducer(getBrokerListStrFromServers(brokers), acks = 1)
 
     //Write 2 messages
     (0 until 2).foreach { i =>
@@ -411,7 +411,7 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness
   }
 
   private def createBufferingProducer: KafkaProducer[Array[Byte], Array[Byte]] = {
-    TestUtils.createProducer(getBrokerListStrFromServers(brokers), retries = 5, acks = -1, lingerMs = 10000,
+    TestUtils.createProducer(getBrokerListStrFromServers(brokers), acks = -1, lingerMs = 10000,
       props = Option(CoreUtils.propsWith(
         (ProducerConfig.BATCH_SIZE_CONFIG, String.valueOf(msg.length * 1000))
         , (ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy")
@@ -451,7 +451,7 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness
   }
 
   private def createProducer: KafkaProducer[Array[Byte], Array[Byte]] = {
-    TestUtils.createProducer(getBrokerListStrFromServers(brokers), retries = 5, acks = -1)
+    TestUtils.createProducer(getBrokerListStrFromServers(brokers), acks = -1)
   }
 
   private def leader(): KafkaServer = {
diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
index 4b5a092..a6b7732 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
@@ -104,7 +104,7 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging {
     TestUtils.createTopic(zkClient, topic2, assignment2, brokers)
 
     //Send messages equally to the two partitions, then half as many to a third
-    producer = createProducer(getBrokerListStrFromServers(brokers), retries = 5, acks = -1)
+    producer = createProducer(getBrokerListStrFromServers(brokers), acks = -1)
     (0 until 10).foreach { _ =>
       producer.send(new ProducerRecord(topic1, 0, null, "IHeartLogs".getBytes))
     }
@@ -144,7 +144,7 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging {
     brokers = Seq(100, 101).map { id => createServer(fromProps(createBrokerConfig(id, zkConnect))) }
     def leo() = brokers(1).replicaManager.getReplica(tp).get.logEndOffset.messageOffset
     TestUtils.createTopic(zkClient, tp.topic, Map(tp.partition -> Seq(101)), brokers)
-    producer = createProducer(getBrokerListStrFromServers(brokers), retries = 10, acks = -1)
+    producer = createProducer(getBrokerListStrFromServers(brokers), acks = -1)
 
     //1. Given a single message
     producer.send(new ProducerRecord(tp.topic, tp.partition, null, "IHeartLogs".getBytes)).get
@@ -251,7 +251,8 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging {
   private def sendFourMessagesToEachTopic() = {
     val testMessageList1 = List("test1", "test2", "test3", "test4")
     val testMessageList2 = List("test5", "test6", "test7", "test8")
-    val producer = TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(brokers), retries = 5, keySerializer = new StringSerializer, valueSerializer = new StringSerializer)
+    val producer = TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(brokers),
+      keySerializer = new StringSerializer, valueSerializer = new StringSerializer)
     val records =
       testMessageList1.map(m => new ProducerRecord(topic1, m, m)) ++
         testMessageList2.map(m => new ProducerRecord(topic2, m, m))
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index aa902f2..9783627 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -547,9 +547,10 @@ object TestUtils extends Logging {
                            acks: Int = -1,
                            maxBlockMs: Long = 60 * 1000L,
                            bufferSize: Long = 1024L * 1024L,
-                           retries: Int = 0,
+                           retries: Int = Int.MaxValue,
+                           deliveryTimeoutMs: Int = 30 * 1000,
                            lingerMs: Int = 0,
-                           requestTimeoutMs: Int = 30 * 1000,
+                           requestTimeoutMs: Int = 20 * 1000,
                            securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT,
                            trustStoreFile: Option[File] = None,
                            saslProperties: Option[Properties] = None,
@@ -563,13 +564,10 @@ object TestUtils extends Logging {
     producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs.toString)
     producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferSize.toString)
     producerProps.put(ProducerConfig.RETRIES_CONFIG, retries.toString)
+    producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, deliveryTimeoutMs.toString)
     producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMs.toString)
     producerProps.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs.toString)
 
-    // In case of overflow set maximum possible value for deliveryTimeoutMs
-    val deliveryTimeoutMs = if (lingerMs + requestTimeoutMs < 0) Int.MaxValue else lingerMs + requestTimeoutMs
-    producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, deliveryTimeoutMs.toString)
-
     /* Only use these if not already set */
     val defaultProps = Map(
       ProducerConfig.RETRY_BACKOFF_MS_CONFIG -> "100",
@@ -993,7 +991,7 @@ object TestUtils extends Logging {
                       compressionType: CompressionType = CompressionType.NONE): Unit = {
     val props = new Properties()
     props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType.name)
-    val producer = createProducer(TestUtils.getBrokerListStrFromServers(servers), retries = 5, acks = acks)
+    val producer = createProducer(TestUtils.getBrokerListStrFromServers(servers), acks = acks)
     try {
       val futures = records.map(producer.send)
       futures.foreach(_.get)
@@ -1017,10 +1015,7 @@ object TestUtils extends Logging {
   }
 
   def produceMessage(servers: Seq[KafkaServer], topic: String, message: String) {
-    val producer = createProducer(
-      TestUtils.getBrokerListStrFromServers(servers),
-      retries = 5
-    )
+    val producer = createProducer(TestUtils.getBrokerListStrFromServers(servers))
     producer.send(new ProducerRecord(topic, topic.getBytes, message.getBytes)).get
     producer.close()
   }
@@ -1273,19 +1268,17 @@ object TestUtils extends Logging {
                                   transactionTimeoutMs: Long = 60000) = {
     val props = new Properties()
     props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId)
-    props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5")
     props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
     props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize.toString)
     props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeoutMs.toString)
-    TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers), retries = Integer.MAX_VALUE, acks = -1, props = Some(props))
+    TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers), acks = -1, props = Some(props))
   }
 
   // Seeds the given topic with records with keys and values in the range [0..numRecords)
   def seedTopicWithNumberedRecords(topic: String, numRecords: Int, servers: Seq[KafkaServer]): Unit = {
     val props = new Properties()
     props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
-    val producer = TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers),
-      retries = Integer.MAX_VALUE, acks = -1, props = Some(props))
+    val producer = TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers), acks = -1, props = Some(props))
     try {
       for (i <- 0 until numRecords) {
         producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, asBytes(i.toString), asBytes(i.toString)))
diff --git a/docs/upgrade.html b/docs/upgrade.html
index ac1388e..264e26d 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -35,6 +35,10 @@
         which sets an upper bound on the total time between sending a record and receiving acknowledgement from the broker. By default,
         the delivery timeout is set to 2 minutes.
     </li>
+    <li>By default, MirrorMaker now overrides <code>delivery.timeout.ms</code> to <code>Integer.MAX_VALUE</code> when
+        configuring the producer. If you have overridden the value of <code>retries</code> in order to fail faster,
+        you will instead need to override <code>delivery.timeout.ms</code>.
+    </li>
 </ol>
 
 
diff --git a/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java b/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java
index 6a09cab..6ddaf92 100644
--- a/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java
+++ b/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java
@@ -34,6 +34,7 @@ import java.util.concurrent.Future;
 
 import static org.apache.kafka.clients.producer.ProducerConfig.COMPRESSION_TYPE_CONFIG;
 import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG;
 import static org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG;
 import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
 import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
@@ -64,8 +65,9 @@ public class KafkaLog4jAppender extends AppenderSkeleton {
     private String clientJaasConfPath;
     private String kerb5ConfPath;
 
-    private int retries;
-    private int requiredNumAcks = Integer.MAX_VALUE;
+    private int retries = Integer.MAX_VALUE;
+    private int requiredNumAcks = 1;
+    private int deliveryTimeoutMs = 120000;
     private boolean syncSend;
     private Producer<byte[], byte[]> producer;
     
@@ -97,6 +99,14 @@ public class KafkaLog4jAppender extends AppenderSkeleton {
         this.retries = retries;
     }
 
+    public int getDeliveryTimeoutMs() {
+        return deliveryTimeoutMs;
+    }
+
+    public void setDeliveryTimeoutMs(int deliveryTimeoutMs) {
+        this.deliveryTimeoutMs = deliveryTimeoutMs;
+    }
+
     public String getCompressionType() {
         return compressionType;
     }
@@ -205,10 +215,11 @@ public class KafkaLog4jAppender extends AppenderSkeleton {
             throw new ConfigException("Topic must be specified by the Kafka log4j appender");
         if (compressionType != null)
             props.put(COMPRESSION_TYPE_CONFIG, compressionType);
-        if (requiredNumAcks != Integer.MAX_VALUE)
-            props.put(ACKS_CONFIG, Integer.toString(requiredNumAcks));
-        if (retries > 0)
-            props.put(RETRIES_CONFIG, retries);
+
+        props.put(ACKS_CONFIG, Integer.toString(requiredNumAcks));
+        props.put(RETRIES_CONFIG, retries);
+        props.put(DELIVERY_TIMEOUT_MS_CONFIG, deliveryTimeoutMs);
+
         if (securityProtocol != null) {
             props.put(SECURITY_PROTOCOL_CONFIG, securityProtocol);
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 54fcbc0..e9fe3c4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -98,10 +98,11 @@ import static org.apache.kafka.common.requests.IsolationLevel.READ_COMMITTED;
  * StreamsConfig streamsConfig = new StreamsConfig(streamsProperties);
  * }</pre>
  *
- * When increasing both {@link ProducerConfig#RETRIES_CONFIG} and {@link ProducerConfig#MAX_BLOCK_MS_CONFIG} to be more resilient to non-available brokers you should also
- * consider increasing {@link ConsumerConfig#MAX_POLL_INTERVAL_MS_CONFIG} using the following guidance:
+ *
+ * When increasing {@link ProducerConfig#MAX_BLOCK_MS_CONFIG} to be more resilient to non-available brokers you should also
+ * increase {@link ConsumerConfig#MAX_POLL_INTERVAL_MS_CONFIG} using the following guidance:
  * <pre>
- *     max.poll.interval.ms > min ( max.block.ms, (retries +1) * request.timeout.ms )
+ *     max.poll.interval.ms > max.block.ms
  * </pre>
  *
  *
@@ -687,15 +688,13 @@ public class StreamsConfig extends AbstractConfig {
     static {
         final Map<String, Object> tempProducerDefaultOverrides = new HashMap<>();
         tempProducerDefaultOverrides.put(ProducerConfig.LINGER_MS_CONFIG, "100");
-        tempProducerDefaultOverrides.put(ProducerConfig.RETRIES_CONFIG, 10);
-
         PRODUCER_DEFAULT_OVERRIDES = Collections.unmodifiableMap(tempProducerDefaultOverrides);
     }
 
     private static final Map<String, Object> PRODUCER_EOS_OVERRIDES;
     static {
         final Map<String, Object> tempProducerDefaultOverrides = new HashMap<>(PRODUCER_DEFAULT_OVERRIDES);
-        tempProducerDefaultOverrides.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
+        tempProducerDefaultOverrides.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE);
         tempProducerDefaultOverrides.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
 
         PRODUCER_EOS_OVERRIDES = Collections.unmodifiableMap(tempProducerDefaultOverrides);
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index cdd4d09..9755334 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -456,9 +456,7 @@ public class StreamsConfigTest {
         props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "anyValue");
         final StreamsConfig streamsConfig = new StreamsConfig(props);
         final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs("groupId", "clientId");
-        String isoLevel = (String) consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG);
-        String name = READ_COMMITTED.name();
-        assertThat((String) consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG), equalTo(READ_COMMITTED.name().toLowerCase(Locale.ROOT)));
+        assertThat(consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG), equalTo(READ_COMMITTED.name().toLowerCase(Locale.ROOT)));
     }
 
     @Test
@@ -466,7 +464,7 @@ public class StreamsConfigTest {
         props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, READ_UNCOMMITTED.name().toLowerCase(Locale.ROOT));
         final StreamsConfig streamsConfig = new StreamsConfig(props);
         final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs("groupId", "clientrId");
-        assertThat((String) consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG), equalTo(READ_UNCOMMITTED.name().toLowerCase(Locale.ROOT)));
+        assertThat(consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG), equalTo(READ_UNCOMMITTED.name().toLowerCase(Locale.ROOT)));
     }
 
 
@@ -484,7 +482,7 @@ public class StreamsConfigTest {
         props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
         final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("clientId");
-        assertThat((Boolean) producerConfigs.get(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG), equalTo(false));
+        assertThat(producerConfigs.get(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG), equalTo(false));
     }
 
     @Test
@@ -495,9 +493,9 @@ public class StreamsConfigTest {
         final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs("groupId", "clientId");
         final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("clientId");
 
-        assertThat((String) consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG), equalTo(READ_COMMITTED.name().toLowerCase(Locale.ROOT)));
+        assertThat(consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG), equalTo(READ_COMMITTED.name().toLowerCase(Locale.ROOT)));
         assertTrue((Boolean) producerConfigs.get(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG));
-        assertThat((Integer) producerConfigs.get(ProducerConfig.RETRIES_CONFIG), equalTo(Integer.MAX_VALUE));
+        assertThat(producerConfigs.get(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG), equalTo(Integer.MAX_VALUE));
         assertThat(streamsConfig.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG), equalTo(100L));
     }
 
@@ -510,7 +508,7 @@ public class StreamsConfigTest {
 
         final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("clientId");
 
-        assertThat((Integer) producerConfigs.get(ProducerConfig.RETRIES_CONFIG), equalTo(numberOfRetries));
+        assertThat(producerConfigs.get(ProducerConfig.RETRIES_CONFIG), equalTo(numberOfRetries));
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index ff791be..496ba58 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -1065,7 +1065,6 @@ public class QueryableStateIntegrationTest {
             final Properties producerConfig = new Properties();
             producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
             producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
-            producerConfig.put(ProducerConfig.RETRIES_CONFIG, 10);
             producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
             producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
index 654fd03..9301e5f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
@@ -262,9 +262,6 @@ public class SimpleBenchmark {
         // improve producer throughput
         props.put(ProducerConfig.LINGER_MS_CONFIG, 5000);
         props.put(ProducerConfig.BATCH_SIZE_CONFIG, 128 * 1024);
-
-        //TODO remove this config or set to smaller value when KIP-91 is merged
-        props.put(StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG), 60000);
     }
 
     private Properties setProduceConsumeProperties(final String clientId) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
index ae53870..2104221 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
@@ -177,8 +177,6 @@ public class YahooBenchmark {
 
         final CountDownLatch latch = new CountDownLatch(1);
         parent.setStreamProperties("simple-benchmark-yahoo" + new Random().nextInt());
-        //TODO remove this config or set to smaller value when KIP-91 is merged
-        parent.props.put(StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG), 60000);
 
         final KafkaStreams streams = createYahooBenchmarkStreams(parent.props, campaignsTopic, eventsTopic, latch, parent.numRecords);
         parent.runGenericBenchmark(streams, "Streams Yahoo Performance [records/latency/rec-sec/MB-sec counted]: ", latch);
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
index 3c8446c..767c9f1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
@@ -80,8 +80,6 @@ public class BrokerCompatibilityTest {
         streamsProperties.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), timeout);
         streamsProperties.put(StreamsConfig.consumerPrefix(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG), timeout);
         streamsProperties.put(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG, timeout + 1);
-        //TODO remove this config or set to smaller value when KIP-91 is merged
-        streamsProperties.put(StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG), 60000);
         Serde<String> stringSerde = Serdes.String();
 
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
index 2d39d53..6069298 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.tests;
 
-import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KafkaStreams;
@@ -119,8 +118,6 @@ public class EosTestClient extends SmokeTestUtil {
         props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
-        //TODO remove this config or set to smaller value when KIP-91 is merged
-        props.put(StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG), 60000);
 
         final StreamsBuilder builder = new StreamsBuilder();
         final KStream<String, Integer> data = builder.stream("data");
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
index 268dd2f..79dfb30 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
@@ -105,12 +105,8 @@ public class SmokeTestClient extends SmokeTestUtil {
         fullProps.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
         fullProps.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
         fullProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        fullProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
         fullProps.put(ProducerConfig.ACKS_CONFIG, "all");
 
-        //TODO remove this config or set to smaller value when KIP-91 is merged
-        fullProps.put(StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG), 80000);
-
         fullProps.putAll(props);
         return fullProps;
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
index 7533fdd..a504333 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -153,10 +153,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
         producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
         producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
-        // the next 2 config values make sure that all records are produced with no loss and no duplicates
-        producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
         producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
-        producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 80000);
 
         final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);
 
@@ -166,7 +163,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
         final ValueList[] data = new ValueList[numKeys];
         for (int i = 0; i < numKeys; i++) {
             data[i] = new ValueList(i, i + maxRecordsPerKey - 1);
-            allData.put(data[i].key, new HashSet<Integer>());
+            allData.put(data[i].key, new HashSet<>());
         }
         final Random rand = new Random();
 
diff --git a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
index 87d27e8..0d74645 100644
--- a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
+++ b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
@@ -155,12 +155,6 @@ public class TransactionalMessageCopier {
         props.put(ProducerConfig.BATCH_SIZE_CONFIG, "512");
         props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
 
-        // Multiple inflights means that when there are rolling bounces and other cluster instability, there is an
-        // increased likelihood of having previously tried batch expire in the accumulator. This is a fatal error
-        // for a transaction, causing the copier to exit. To work around this, we bump the request timeout.
-        // We can get rid of this when KIP-91 is merged.
-        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000");
-
         return new KafkaProducer<>(props);
     }
 


Mime
View raw message