kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [3/3] kafka git commit: KAFKA-5494; Enable idempotence with max.in.flight.requests.per.connection > 1
Date Thu, 14 Sep 2017 23:11:42 GMT
KAFKA-5494; Enable idempotence with max.in.flight.requests.per.connection > 1

Here we introduce client and broker changes to support multiple inflight requests while still guaranteeing idempotence. Two major problems to be solved:

1. Sequence number management on the client when there are request failures. When a batch fails, future inflight batches will also fail with `OutOfOrderSequenceException`. This must be handled on the client with intelligent sequence reassignment. We must also deal with the fatal failure of some batch: the future batches must get different sequence numbers when they come back.
2. On the broker, when we have multiple inflights, we can get duplicates of multiple old batches. With this patch, we retain the record metadata for 5 older batches.

Author: Apurva Mehta <apurva@confluent.io>

Reviewers: Jason Gustafson <jason@confluent.io>

Closes #3743 from apurvam/KAFKA-5494-increase-max-in-flight-for-idempotent-producer


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5d242225
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5d242225
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5d242225

Branch: refs/heads/trunk
Commit: 5d2422258cb975a137a42a4e08f03573c49a387e
Parents: 8a5e866
Author: Apurva Mehta <apurva@confluent.io>
Authored: Thu Sep 14 16:10:14 2017 -0700
Committer: Jason Gustafson <jason@confluent.io>
Committed: Thu Sep 14 16:10:19 2017 -0700

----------------------------------------------------------------------
 .../kafka/clients/producer/KafkaProducer.java   |  18 +-
 .../producer/internals/ProducerBatch.java       |  36 +-
 .../producer/internals/RecordAccumulator.java   | 108 ++-
 .../clients/producer/internals/Sender.java      |  78 ++-
 .../producer/internals/TransactionManager.java  | 182 ++++-
 .../errors/DuplicateSequenceException.java      |  24 +
 .../DuplicateSequenceNumberException.java       |  24 -
 .../apache/kafka/common/protocol/Errors.java    |   4 +-
 .../common/record/MemoryRecordsBuilder.java     |  14 +
 .../org/apache/kafka/clients/MockClient.java    |  10 +
 .../clients/producer/internals/SenderTest.java  | 682 ++++++++++++++++++-
 .../internals/TransactionManagerTest.java       |  88 ++-
 core/src/main/scala/kafka/log/Log.scala         |  19 +-
 core/src/main/scala/kafka/log/LogCleaner.scala  |   6 +-
 .../scala/kafka/log/ProducerStateManager.scala  | 197 +++---
 .../scala/kafka/tools/DumpLogSegments.scala     |  12 +-
 .../kafka/api/ProducerBounceTest.scala          |   2 +-
 .../kafka/api/TransactionsBounceTest.scala      |  22 +-
 .../scala/unit/kafka/log/LogSegmentTest.scala   |   3 +-
 .../src/test/scala/unit/kafka/log/LogTest.scala |  97 ++-
 .../kafka/log/ProducerStateManagerTest.scala    |  42 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala |   5 +-
 .../kafka/tools/TransactionalMessageCopier.java |   6 +-
 23 files changed, 1384 insertions(+), 295 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5d242225/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 18248bb..b630d61 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -365,7 +365,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             this.requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
             this.transactionManager = configureTransactionState(config, logContext, log);
             int retries = configureRetries(config, transactionManager != null, log);
-            int maxInflightRequests = configureInflightRequests(config, transactionManager != null, log);
+            int maxInflightRequests = configureInflightRequests(config, transactionManager != null);
             short acks = configureAcks(config, transactionManager != null, log);
 
             this.apiVersions = new ApiVersions();
@@ -481,18 +481,10 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         return config.getInt(ProducerConfig.RETRIES_CONFIG);
     }
 
-    private static int configureInflightRequests(ProducerConfig config, boolean idempotenceEnabled, Logger log) {
-        boolean userConfiguredInflights = false;
-        if (config.originals().containsKey(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)) {
-            userConfiguredInflights = true;
-        }
-        if (idempotenceEnabled && !userConfiguredInflights) {
-            log.info("Overriding the default {} to 1 since idempontence is enabled.", ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION);
-            return 1;
-        }
-        if (idempotenceEnabled && config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) != 1) {
-            throw new ConfigException("Must set " + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to 1 in order" +
-                    "to use the idempotent producer. Otherwise we cannot guarantee idempotence.");
+    private static int configureInflightRequests(ProducerConfig config, boolean idempotenceEnabled) {
+        if (idempotenceEnabled && 5 < config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)) {
+            throw new ConfigException("Must set " + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to at most 5" +
+                    " to use the idempotent producer.");
         }
         return config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5d242225/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
index ee7d21a..93c843b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
@@ -75,6 +75,7 @@ public final class ProducerBatch {
     private long drainedMs;
     private String expiryErrorMessage;
     private boolean retry;
+    private boolean reopened = false;
 
     public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, long now) {
         this(tp, recordsBuilder, now, false);
@@ -249,6 +250,15 @@ public final class ProducerBatch {
 
         produceFuture.set(ProduceResponse.INVALID_OFFSET, NO_TIMESTAMP, new RecordBatchTooLargeException());
         produceFuture.done();
+
+        if (hasSequence()) {
+            int sequence = baseSequence();
+            ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(producerId(), producerEpoch());
+            for (ProducerBatch newBatch : batches) {
+                newBatch.setProducerState(producerIdAndEpoch, sequence, isTransactional());
+                sequence += newBatch.recordCount;
+            }
+        }
         return batches;
     }
 
@@ -375,8 +385,12 @@ public final class ProducerBatch {
     }
 
     public void setProducerState(ProducerIdAndEpoch producerIdAndEpoch, int baseSequence, boolean isTransactional) {
-        recordsBuilder.setProducerState(producerIdAndEpoch.producerId, producerIdAndEpoch.epoch,
-                baseSequence, isTransactional);
+        recordsBuilder.setProducerState(producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, baseSequence, isTransactional);
+    }
+
+    public void resetProducerState(ProducerIdAndEpoch producerIdAndEpoch, int baseSequence, boolean isTransactional) {
+        reopened = true;
+        recordsBuilder.reopenAndRewriteProducerState(producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, baseSequence, isTransactional);
     }
 
     /**
@@ -394,6 +408,7 @@ public final class ProducerBatch {
                                                        recordsBuilder.compressionType(),
                                                        (float) recordsBuilder.compressionRatio());
         }
+        reopened = false;
     }
 
     /**
@@ -434,4 +449,21 @@ public final class ProducerBatch {
     public short producerEpoch() {
         return recordsBuilder.producerEpoch();
     }
+
+    public int baseSequence() {
+        return recordsBuilder.baseSequence();
+    }
+
+    public boolean hasSequence() {
+        return baseSequence() != RecordBatch.NO_SEQUENCE;
+    }
+
+    public boolean isTransactional() {
+        return recordsBuilder.isTransactional();
+    }
+
+    public boolean sequenceHasBeenReset() {
+        return reopened;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5d242225/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 72d3b29..eb162be 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -252,7 +252,8 @@ public final class RecordAccumulator {
      *  and memory records built) in one of the following cases (whichever comes first): right before send,
      *  if it is expired, or when the producer is closed.
      */
-    private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, Deque<ProducerBatch> deque) {
+    private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
+                                         Callback callback, Deque<ProducerBatch> deque) {
         ProducerBatch last = deque.peekLast();
         if (last != null) {
             FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, time.milliseconds());
@@ -260,7 +261,6 @@ public final class RecordAccumulator {
                 last.closeForRecordAppends();
             else
                 return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false);
-
         }
         return null;
     }
@@ -310,7 +310,10 @@ public final class RecordAccumulator {
         batch.reenqueued(now);
         Deque<ProducerBatch> deque = getOrCreateDeque(batch.topicPartition);
         synchronized (deque) {
-            deque.addFirst(batch);
+            if (transactionManager != null)
+                insertInSequenceOrder(deque, batch);
+            else
+                deque.addFirst(batch);
         }
     }
 
@@ -332,12 +335,71 @@ public final class RecordAccumulator {
             incomplete.add(batch);
             // We treat the newly split batches as if they are not even tried.
             synchronized (partitionDequeue) {
-                partitionDequeue.addFirst(batch);
+                if (transactionManager != null) {
+                    // We should track the newly created batches since they already have assigned sequences.
+                    transactionManager.addInFlightBatch(batch);
+                    insertInSequenceOrder(partitionDequeue, batch);
+                } else {
+                    partitionDequeue.addFirst(batch);
+                }
             }
         }
         return numSplitBatches;
     }
 
+    // The deque for the partition may have to be reordered in situations where leadership changes in between
+    // batch drains. Since the requests are on different connections, we no longer have any guarantees about ordering
+    // of the responses. Hence we will have to check if there is anything out of order and ensure the batch is queued
+    // in the correct sequence order.
+    //
+    // Note that this assumes that all the batches in the queue which have an assigned sequence also have the current
+    // producer id. We will not attempt to reorder messages if the producer id has changed.
+
+    private void insertInSequenceOrder(Deque<ProducerBatch> deque, ProducerBatch batch) {
+        // When we are requeing and have enabled idempotence, the reenqueued batch must always have a sequence.
+        if (batch.baseSequence() == RecordBatch.NO_SEQUENCE)
+            throw new IllegalStateException("Trying to reenqueue a batch which doesn't have a sequence even " +
+                    "though idempotence is enabled.");
+
+        if (transactionManager.nextBatchBySequence(batch.topicPartition) == null)
+            throw new IllegalStateException("We are reenqueueing a batch which is not tracked as part of the in flight " +
+                    "requests. batch.topicPartition: " + batch.topicPartition + "; batch.baseSequence: " + batch.baseSequence());
+
+        // If there are no inflight batches being tracked by the transaction manager, it means that the producer
+        // id must have changed and the batches being re enqueued are from the old producer id. In this case
+        // we don't try to ensure ordering amongst them. They will eventually fail with an OutOfOrderSequence,
+        // or they will succeed.
+        if (batch.baseSequence() != transactionManager.nextBatchBySequence(batch.topicPartition).baseSequence()) {
+            // The incoming batch can't be inserted at the front of the queue without violating the sequence ordering.
+            // This means that the incoming batch should be placed somewhere further back.
+            // We need to find the right place for the incoming batch and insert it there.
+            // We will only enter this branch if we have multiple inflights sent to different brokers, perhaps
+            // because a leadership change occurred in between the drains. In this scenario, responses can come
+            // back out of order, requiring us to re order the batches ourselves rather than relying on the
+            // implicit ordering guarantees of the network client which are only on a per connection basis.
+
+            List<ProducerBatch> orderedBatches = new ArrayList<>();
+            while (deque.peekFirst() != null && deque.peekFirst().hasSequence() && deque.peekFirst().baseSequence() < batch.baseSequence())
+                orderedBatches.add(deque.pollFirst());
+
+            log.debug("Reordered incoming batch with sequence {} for partition {}. It was placed in the queue at " +
+                    "position {}", batch.baseSequence(), batch.topicPartition, orderedBatches.size());
+            // Either we have reached a point where there are batches without a sequence (ie. never been drained
+            // and are hence in order by default), or the batch at the front of the queue has a sequence greater
+            // than the incoming batch. This is the right place to add the incoming batch.
+            deque.addFirst(batch);
+
+            // Now we have to re insert the previously queued batches in the right order.
+            for (int i = orderedBatches.size() - 1; i >= 0; --i) {
+                deque.addFirst(orderedBatches.get(i));
+            }
+
+            // At this point, the incoming batch has been queued in the correct place according to its sequence.
+        } else {
+            deque.addFirst(batch);
+        }
+    }
+
     /**
      * Get a list of nodes whose partitions are ready to be sent, and the earliest time at which any non-sendable
      * partition will be ready; Also return the flag for whether there are any unknown leaders for the accumulated
@@ -470,20 +532,42 @@ public final class RecordAccumulator {
                                                 break;
 
                                             isTransactional = transactionManager.isTransactional();
+
+                                            if (!first.hasSequence() && transactionManager.hasUnresolvedSequence(first.topicPartition))
+                                                // Don't drain any new batches while the state of previous sequence numbers
+                                                // is unknown. The previous batches would be unknown if they were aborted
+                                                // on the client after being sent to the broker at least once.
+                                                break;
+
+                                            if (first.hasSequence()
+                                                    && first.baseSequence() != transactionManager.nextBatchBySequence(first.topicPartition).baseSequence())
+                                                // If the queued batch already has an assigned sequence, then it is being
+                                                // retried. In this case, we wait until the next immediate batch is ready
+                                                // and drain that. We only move on when the next in line batch is complete (either successfully
+                                                // or due to a fatal broker error). This effectively reduces our
+                                                // in flight request count to 1.
+                                                break;
                                         }
 
                                         ProducerBatch batch = deque.pollFirst();
-                                        if (producerIdAndEpoch != null && !batch.inRetry()) {
-                                            // If the batch is in retry, then we should not change the producer id and
+                                        if (producerIdAndEpoch != null && !batch.hasSequence()) {
+                                            // If the batch already has an assigned sequence, then we should not change the producer id and
                                             // sequence number, since this may introduce duplicates. In particular,
                                             // the previous attempt may actually have been accepted, and if we change
                                             // the producer id and sequence here, this attempt will also be accepted,
                                             // causing a duplicate.
-                                            int sequenceNumber = transactionManager.sequenceNumber(batch.topicPartition);
-                                            log.debug("Assigning sequence number {} from producer {} to dequeued " +
-                                                            "batch from partition {} bound for {}.",
-                                                    sequenceNumber, producerIdAndEpoch, batch.topicPartition, node);
-                                            batch.setProducerState(producerIdAndEpoch, sequenceNumber, isTransactional);
+                                            //
+                                            // Additionally, we update the next sequence number bound for the partition,
+                                            // and also have the transaction manager track the batch so as to ensure
+                                            // that sequence ordering is maintained even if we receive out of order
+                                            // responses.
+                                            batch.setProducerState(producerIdAndEpoch, transactionManager.sequenceNumber(batch.topicPartition), isTransactional);
+                                            transactionManager.incrementSequenceNumber(batch.topicPartition, batch.recordCount);
+                                            log.debug("Assigned producerId {} and producerEpoch {} to batch with base sequence " +
+                                                            "{} being sent to partition {}", producerIdAndEpoch.producerId,
+                                                    producerIdAndEpoch.epoch, batch.baseSequence(), tp);
+
+                                            transactionManager.addInFlightBatch(batch);
                                         }
                                         batch.close();
                                         size += batch.records().sizeInBytes();
@@ -635,7 +719,7 @@ public final class RecordAccumulator {
             Deque<ProducerBatch> dq = getDeque(batch.topicPartition);
             boolean aborted = false;
             synchronized (dq) {
-                if (!batch.isClosed()) {
+                if ((transactionManager != null && !batch.hasSequence()) || (transactionManager == null && !batch.isClosed())) {
                     aborted = true;
                     batch.abortRecordAppends();
                     dq.remove(batch);

http://git-wip-us.apache.org/repos/asf/kafka/blob/5d242225/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 466bdd5..ecfad70 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -200,10 +200,17 @@ public class Sender implements Runnable {
      */
     void run(long now) {
         if (transactionManager != null) {
+            if (transactionManager.shouldResetProducerStateAfterResolvingSequences())
+                // Check if the previous run expired batches which requires a reset of the producer state.
+                transactionManager.resetProducerId();
+
             if (!transactionManager.isTransactional()) {
                 // this is an idempotent producer, so make sure we have a producer id
                 maybeWaitForProducerId();
-            } else if (transactionManager.hasInFlightRequest() || maybeSendTransactionalRequest(now)) {
+            } else if (transactionManager.hasUnresolvedSequences() && !transactionManager.hasFatalError()) {
+                transactionManager.transitionToFatalError(new KafkaException("The client hasn't received acknowledgment for " +
+                        "some previously sent messages and can no longer retry them. It isn't safe to continue."));
+            } else if (transactionManager.hasInFlightTransactionalRequest() || maybeSendTransactionalRequest(now)) {
                 // as long as there are outstanding transactional requests, we simply wait for them to return
                 client.poll(retryBackoffMs, now);
                 return;
@@ -228,6 +235,7 @@ public class Sender implements Runnable {
 
     private long sendProducerData(long now) {
         Cluster cluster = metadata.fetch();
+
         // get the list of partitions with data ready to send
         RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
 
@@ -264,23 +272,17 @@ public class Sender implements Runnable {
         }
 
         List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(this.requestTimeout, now);
-        boolean needsTransactionStateReset = false;
         // Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics
         // for expired batches. see the documentation of @TransactionState.resetProducerId to understand why
         // we need to reset the producer id here.
         if (!expiredBatches.isEmpty())
             log.trace("Expired {} batches in accumulator", expiredBatches.size());
         for (ProducerBatch expiredBatch : expiredBatches) {
-            failBatch(expiredBatch, -1, NO_TIMESTAMP, expiredBatch.timeoutException());
+            failBatch(expiredBatch, -1, NO_TIMESTAMP, expiredBatch.timeoutException(), false);
             if (transactionManager != null && expiredBatch.inRetry()) {
-                needsTransactionStateReset = true;
+                // This ensures that no new batches are drained until the current in flight batches are fully resolved.
+                transactionManager.markSequenceUnresolved(expiredBatch.topicPartition);
             }
-            this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);
-        }
-
-        if (needsTransactionStateReset) {
-            transactionManager.resetProducerId();
-            return 0;
         }
 
         sensors.updateProduceRequestMetrics(batches);
@@ -345,7 +347,7 @@ public class Sender implements Runnable {
 
                     ClientRequest clientRequest = client.newClientRequest(targetNode.idString(),
                             requestBuilder, now, true, nextRequestHandler);
-                    transactionManager.setInFlightRequestCorrelationId(clientRequest.correlationId());
+                    transactionManager.setInFlightTransactionalRequestCorrelationId(clientRequest.correlationId());
                     log.debug("Sending transactional request {} to node {}", requestBuilder, targetNode);
 
                     client.send(clientRequest, now);
@@ -422,6 +424,7 @@ public class Sender implements Runnable {
                         ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(
                                 initProducerIdResponse.producerId(), initProducerIdResponse.epoch());
                         transactionManager.setProducerIdAndEpoch(producerIdAndEpoch);
+                        return;
                     } else if (error.exception() instanceof RetriableException) {
                         log.debug("Retriable error from InitProducerId response", error.message());
                     } else {
@@ -492,6 +495,7 @@ public class Sender implements Runnable {
     private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, long correlationId,
                                long now) {
         Errors error = response.error;
+
         if (error == Errors.MESSAGE_TOO_LARGE && batch.recordCount > 1 &&
                 (batch.magic() >= RecordBatch.MAGIC_VALUE_V2 || batch.isCompressed())) {
             // If the batch is too large, we split the batch and send the split batches again. We do not decrement
@@ -501,6 +505,8 @@ public class Sender implements Runnable {
                      batch.topicPartition,
                      this.retries - batch.attempts(),
                      error);
+            if (transactionManager != null)
+                transactionManager.removeInFlightBatch(batch);
             this.accumulator.splitAndReenqueue(batch);
             this.accumulator.deallocate(batch);
             this.sensors.recordBatchSplit();
@@ -517,14 +523,20 @@ public class Sender implements Runnable {
                     // If idempotence is enabled only retry the request if the current producer id is the same as
                     // the producer id of the batch.
                     log.debug("Retrying batch to topic-partition {}. Sequence number : {}", batch.topicPartition,
-                            transactionManager.sequenceNumber(batch.topicPartition));
+                            batch.baseSequence());
                     reenqueueBatch(batch, now);
                 } else {
                     failBatch(batch, response, new OutOfOrderSequenceException("Attempted to retry sending a " +
                             "batch but the producer id changed from " + batch.producerId() + " to " +
-                            transactionManager.producerIdAndEpoch().producerId + " in the mean time. This batch will be dropped."));
-                    this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount);
+                            transactionManager.producerIdAndEpoch().producerId + " in the mean time. This batch will be dropped."), false);
                 }
+            } else if (error == Errors.DUPLICATE_SEQUENCE_NUMBER) {
+                // If we have received a duplicate sequence error, it means that the sequence number has advanced beyond
+                // the sequence of the current batch, and we haven't retained batch metadata on the broker to return
+                // the correct offset and timestamp.
+                //
+                // The only thing we can do is to return success to the user and not return a valid offset and timestamp.
+                completeBatch(batch, response);
             } else {
                 final RuntimeException exception;
                 if (error == Errors.TOPIC_AUTHORIZATION_FAILED)
@@ -533,9 +545,10 @@ public class Sender implements Runnable {
                     exception = new ClusterAuthorizationException("The producer is not authorized to do idempotent sends");
                 else
                     exception = error.exception();
-                // tell the user the result of their request
-                failBatch(batch, response, exception);
-                this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount);
+                // tell the user the result of their request. We only adjust sequence numbers if the batch didn't exhaust
+                // its retries -- if it did, we don't know whether the sequence number was accepted or not, and
+                // thus it is not safe to reassign the sequence.
+                failBatch(batch, response, exception, batch.attempts() < this.retries);
             }
             if (error.exception() instanceof InvalidMetadataException) {
                 if (error.exception() instanceof UnknownTopicOrPartitionException)
@@ -559,21 +572,24 @@ public class Sender implements Runnable {
     }
 
     private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response) {
-        if (transactionManager != null && transactionManager.hasProducerIdAndEpoch(batch.producerId(), batch.producerEpoch())) {
-            transactionManager.incrementSequenceNumber(batch.topicPartition, batch.recordCount);
-            log.debug("Incremented sequence number for topic-partition {} to {}", batch.topicPartition,
-                    transactionManager.sequenceNumber(batch.topicPartition));
+        if (transactionManager != null) {
+            if (transactionManager.hasProducerIdAndEpoch(batch.producerId(), batch.producerEpoch())) {
+                transactionManager.maybeUpdateLastAckedSequence(batch.topicPartition, batch.baseSequence() + batch.recordCount - 1);
+                log.debug("ProducerId: {}; Set last ack'd sequence number for topic-partition {} to {}", batch.producerId(), batch.topicPartition,
+                        transactionManager.lastAckedSequence(batch.topicPartition));
+            }
+            transactionManager.removeInFlightBatch(batch);
         }
 
         batch.done(response.baseOffset, response.logAppendTime, null);
         this.accumulator.deallocate(batch);
     }
 
-    private void failBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, RuntimeException exception) {
-        failBatch(batch, response.baseOffset, response.logAppendTime, exception);
+    private void failBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, RuntimeException exception, boolean adjustSequenceNumbers) {
+        failBatch(batch, response.baseOffset, response.logAppendTime, exception, adjustSequenceNumbers);
     }
 
-    private void failBatch(ProducerBatch batch, long baseOffset, long logAppendTime, RuntimeException exception) {
+    private void failBatch(ProducerBatch batch, long baseOffset, long logAppendTime, RuntimeException exception, boolean adjustSequenceNumbers) {
         if (transactionManager != null) {
             if (exception instanceof OutOfOrderSequenceException
                     && !transactionManager.isTransactional()
@@ -594,16 +610,26 @@ public class Sender implements Runnable {
             } else if (transactionManager.isTransactional()) {
                 transactionManager.transitionToAbortableError(exception);
             }
+            transactionManager.removeInFlightBatch(batch);
+            if (adjustSequenceNumbers)
+                transactionManager.adjustSequencesDueToFailedBatch(batch);
         }
+
+        this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount);
         batch.done(baseOffset, logAppendTime, exception);
         this.accumulator.deallocate(batch);
     }
 
     /**
-     * We can retry a send if the error is transient and the number of attempts taken is fewer than the maximum allowed
+     * We can retry a send if the error is transient and the number of attempts taken is fewer than the maximum allowed.
+     * We can also retry OutOfOrderSequence exceptions for future batches, since if the first batch has failed, the future
+     * batches are certain to fail with an OutOfOrderSequence exception.
      */
     private boolean canRetry(ProducerBatch batch, Errors error) {
-        return batch.attempts() < this.retries && error.exception() instanceof RetriableException;
+        return batch.attempts() < this.retries &&
+                ((error.exception() instanceof RetriableException) ||
+                        (error.exception() instanceof OutOfOrderSequenceException
+                                && transactionManager.canRetryOutOfOrderSequenceException(batch)));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/5d242225/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index 05d943c..b2387a0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -65,7 +65,27 @@ public class TransactionManager {
     private final String transactionalId;
     private final int transactionTimeoutMs;
 
-    private final Map<TopicPartition, Integer> sequenceNumbers;
+    // The base sequence of the next batch bound for a given partition.
+    private final Map<TopicPartition, Integer> nextSequence;
+
+    // The sequence of the last record of the last ack'd batch from the given partition. When there are no
+    // in flight requests for a partition, the lastAckedSequence(topicPartition) == nextSequence(topicPartition) - 1.
+    private final Map<TopicPartition, Integer> lastAckedSequence;
+
+    // If a batch bound for a partition expired locally after being sent at least once, the partition is considered
+    // to have an unresolved state. We keep track of such partitions here, and cannot assign any more sequence numbers
+    // for this partition until the unresolved state gets cleared. This may happen if other inflight batches returned
+    // successfully (indicating that the expired batch actually made it to the broker). If we don't get any successful
+    // responses for the partition once the inflight request count falls to zero, we reset the producer id and
+    // consequently clear this data structure as well.
+    private final Set<TopicPartition> partitionsWithUnresolvedSequences;
+
+    // Keep track of the in flight batches bound for a partition, ordered by sequence. This helps us to ensure that
+    // we continue to order batches by the sequence numbers even when the responses come back out of order during
+    // leader failover. We add a batch to the queue when it is drained, and remove it when the batch completes
+    // (either successfully or through a fatal failure).
+    private final Map<TopicPartition, PriorityQueue<ProducerBatch>> inflightBatchesBySequence;
+
     private final PriorityQueue<TxnRequestHandler> pendingRequests;
     private final Set<TopicPartition> newPartitionsInTransaction;
     private final Set<TopicPartition> pendingPartitionsInTransaction;
@@ -142,7 +162,8 @@ public class TransactionManager {
 
     public TransactionManager(LogContext logContext, String transactionalId, int transactionTimeoutMs, long retryBackoffMs) {
         this.producerIdAndEpoch = new ProducerIdAndEpoch(NO_PRODUCER_ID, NO_PRODUCER_EPOCH);
-        this.sequenceNumbers = new HashMap<>();
+        this.nextSequence = new HashMap<>();
+        this.lastAckedSequence = new HashMap<>();
         this.transactionalId = transactionalId;
         this.log = logContext.logger(TransactionManager.class);
         this.transactionTimeoutMs = transactionTimeoutMs;
@@ -159,6 +180,9 @@ public class TransactionManager {
             }
         });
 
+        this.partitionsWithUnresolvedSequences = new HashSet<>();
+        this.inflightBatchesBySequence = new HashMap<>();
+
         this.retryBackoffMs = retryBackoffMs;
     }
 
@@ -170,7 +194,7 @@ public class TransactionManager {
         ensureTransactional();
         transitionTo(State.INITIALIZING);
         setProducerIdAndEpoch(ProducerIdAndEpoch.NONE);
-        this.sequenceNumbers.clear();
+        this.nextSequence.clear();
         InitProducerIdRequest.Builder builder = new InitProducerIdRequest.Builder(transactionalId, transactionTimeoutMs);
         InitProducerIdHandler handler = new InitProducerIdHandler(builder);
         enqueueRequest(handler);
@@ -362,28 +386,157 @@ public class TransactionManager {
             throw new IllegalStateException("Cannot reset producer state for a transactional producer. " +
                     "You must either abort the ongoing transaction or reinitialize the transactional producer instead");
         setProducerIdAndEpoch(ProducerIdAndEpoch.NONE);
-        this.sequenceNumbers.clear();
+        this.nextSequence.clear();
+        this.lastAckedSequence.clear();
+        this.inflightBatchesBySequence.clear();
+        this.partitionsWithUnresolvedSequences.clear();
     }
 
     /**
      * Returns the next sequence number to be written to the given TopicPartition.
      */
     synchronized Integer sequenceNumber(TopicPartition topicPartition) {
-        Integer currentSequenceNumber = sequenceNumbers.get(topicPartition);
+        Integer currentSequenceNumber = nextSequence.get(topicPartition);
         if (currentSequenceNumber == null) {
             currentSequenceNumber = 0;
-            sequenceNumbers.put(topicPartition, currentSequenceNumber);
+            nextSequence.put(topicPartition, currentSequenceNumber);
         }
         return currentSequenceNumber;
     }
 
     synchronized void incrementSequenceNumber(TopicPartition topicPartition, int increment) {
-        Integer currentSequenceNumber = sequenceNumbers.get(topicPartition);
+        Integer currentSequenceNumber = nextSequence.get(topicPartition);
         if (currentSequenceNumber == null)
             throw new IllegalStateException("Attempt to increment sequence number for a partition with no current sequence.");
 
         currentSequenceNumber += increment;
-        sequenceNumbers.put(topicPartition, currentSequenceNumber);
+        nextSequence.put(topicPartition, currentSequenceNumber);
+    }
+
+    synchronized void addInFlightBatch(ProducerBatch batch) {
+        if (!batch.hasSequence())
+            throw new IllegalStateException("Can't track batch for partition " + batch.topicPartition + " when sequence is not set.");
+        if (!inflightBatchesBySequence.containsKey(batch.topicPartition)) {
+            inflightBatchesBySequence.put(batch.topicPartition, new PriorityQueue<>(5, new Comparator<ProducerBatch>() {
+                @Override
+                public int compare(ProducerBatch o1, ProducerBatch o2) {
+                    return o1.baseSequence() - o2.baseSequence();
+                }
+            }));
+        }
+        inflightBatchesBySequence.get(batch.topicPartition).offer(batch);
+    }
+
+
+    synchronized ProducerBatch nextBatchBySequence(TopicPartition topicPartition) {
+        PriorityQueue<ProducerBatch> queue = inflightBatchesBySequence.get(topicPartition);
+        if (queue == null)
+            return null;
+        return queue.peek();
+    }
+
+    synchronized void removeInFlightBatch(ProducerBatch batch) {
+        PriorityQueue<ProducerBatch> queue = inflightBatchesBySequence.get(batch.topicPartition);
+        if (queue == null)
+            return;
+        queue.remove(batch);
+    }
+
+    synchronized void maybeUpdateLastAckedSequence(TopicPartition topicPartition, int sequence) {
+        if (sequence > lastAckedSequence(topicPartition))
+            lastAckedSequence.put(topicPartition, sequence);
+    }
+
+    synchronized int lastAckedSequence(TopicPartition topicPartition) {
+        Integer currentLastAckedSequence = lastAckedSequence.get(topicPartition);
+        if (currentLastAckedSequence == null)
+            return -1;
+        return currentLastAckedSequence;
+    }
+
+    // If a batch is failed fatally, the sequence numbers for future batches bound for the partition must be adjusted
+    // so that they don't fail with the OutOfOrderSequenceException.
+    //
+    // This method must only be called when we know that the batch in question has been unequivocally failed by the broker,
+    // i.e. it has received a confirmed fatal status code like 'Message Too Large' or something similar.
+    synchronized void adjustSequencesDueToFailedBatch(ProducerBatch batch) {
+        if (!this.nextSequence.containsKey(batch.topicPartition))
+            // Sequence numbers are not being tracked for this partition. This could happen if the producer id was just
+            // reset due to a previous OutOfOrderSequenceException.
+            return;
+        log.debug("producerId: {}, send to partition {} failed fatally. Reducing future sequence numbers by {}",
+                batch.producerId(), batch.topicPartition, batch.recordCount);
+        int currentSequence = sequenceNumber(batch.topicPartition);
+        currentSequence -= batch.recordCount;
+        if (currentSequence < 0)
+            throw new IllegalStateException("Sequence number for partition " + batch.topicPartition + " is going to become negative : " + currentSequence);
+
+        setNextSequence(batch.topicPartition, currentSequence);
+
+        for (ProducerBatch inFlightBatch : inflightBatchesBySequence.get(batch.topicPartition)) {
+            if (inFlightBatch.baseSequence() < batch.baseSequence())
+                continue;
+            int newSequence = inFlightBatch.baseSequence() - batch.recordCount;
+            if (newSequence < 0)
+                throw new IllegalStateException("Sequence number for batch with sequence " + inFlightBatch.baseSequence()
+                        + " for partition " + batch.topicPartition + " is going to become negative :" + newSequence);
+
+            log.info("Resetting sequence number of batch with current sequence {} for partition {} to {}", inFlightBatch.baseSequence(), batch.topicPartition, newSequence);
+            inFlightBatch.resetProducerState(new ProducerIdAndEpoch(inFlightBatch.producerId(), inFlightBatch.producerEpoch()), newSequence, inFlightBatch.isTransactional());
+        }
+    }
+
+    synchronized boolean hasInflightBatches(TopicPartition topicPartition) {
+        return inflightBatchesBySequence.containsKey(topicPartition) && !inflightBatchesBySequence.get(topicPartition).isEmpty();
+    }
+
+    synchronized boolean hasUnresolvedSequences() {
+        return !partitionsWithUnresolvedSequences.isEmpty();
+    }
+
+    synchronized boolean hasUnresolvedSequence(TopicPartition topicPartition) {
+        return partitionsWithUnresolvedSequences.contains(topicPartition);
+    }
+
+    synchronized void markSequenceUnresolved(TopicPartition topicPartition) {
+        log.debug("Marking partition {} unresolved", topicPartition);
+        partitionsWithUnresolvedSequences.add(topicPartition);
+    }
+
+    // Checks if there are any partitions with unresolved partitions which may now be resolved. Returns true if
+    // the producer id needs a reset, false otherwise.
+    synchronized boolean shouldResetProducerStateAfterResolvingSequences() {
+        if (isTransactional())
+            // We should not reset producer state if we are transactional. We will transition to a fatal error instead.
+            return false;
+        for (TopicPartition topicPartition : partitionsWithUnresolvedSequences) {
+            if (!hasInflightBatches(topicPartition)) {
+                // The partition has been fully drained. At this point, the last ack'd sequence should be one less than
+                // next sequence destined for the partition. If so, the partition is fully resolved. If not, we should
+                // reset the sequence number if necessary.
+                if (isNextSequence(topicPartition, sequenceNumber(topicPartition))) {
+                    // This would happen when a batch was expired, but subsequent batches succeeded.
+                    partitionsWithUnresolvedSequences.remove(topicPartition);
+                } else {
+                    // We would enter this branch if all in flight batches were ultimately expired in the producer.
+                    log.info("No inflight batches remaining for {}, last ack'd sequence for partition is {}, next sequence is {}. " +
+                            "Going to reset producer state.", topicPartition, lastAckedSequence(topicPartition), sequenceNumber(topicPartition));
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
+    synchronized boolean isNextSequence(TopicPartition topicPartition, int sequence) {
+        return sequence - lastAckedSequence(topicPartition) == 1;
+    }
+
+    private synchronized void setNextSequence(TopicPartition topicPartition, int sequence) {
+        if (!nextSequence.containsKey(topicPartition) && sequence != 0)
+            throw new IllegalStateException("Trying to set the sequence number for " + topicPartition + " to " + sequence +
+            ", but the sequence number was never set for this partition.");
+        nextSequence.put(topicPartition, sequence);
     }
 
     synchronized TxnRequestHandler nextRequestHandler(boolean hasIncompleteBatches) {
@@ -441,15 +594,15 @@ public class TransactionManager {
         lookupCoordinator(request.coordinatorType(), request.coordinatorKey());
     }
 
-    void setInFlightRequestCorrelationId(int correlationId) {
+    void setInFlightTransactionalRequestCorrelationId(int correlationId) {
         inFlightRequestCorrelationId = correlationId;
     }
 
-    void clearInFlightRequestCorrelationId() {
+    void clearInFlightTransactionalRequestCorrelationId() {
         inFlightRequestCorrelationId = NO_INFLIGHT_REQUEST_CORRELATION_ID;
     }
 
-    boolean hasInFlightRequest() {
+    boolean hasInFlightTransactionalRequest() {
         return inFlightRequestCorrelationId != NO_INFLIGHT_REQUEST_CORRELATION_ID;
     }
 
@@ -479,6 +632,11 @@ public class TransactionManager {
         return currentState == State.IN_TRANSACTION || isCompleting() || hasAbortableError();
     }
 
+    synchronized boolean canRetryOutOfOrderSequenceException(ProducerBatch batch) {
+        return hasProducerId(batch.producerId()) && !hasUnresolvedSequence(batch.topicPartition) &&
+                (batch.sequenceHasBeenReset() || !isNextSequence(batch.topicPartition, batch.baseSequence()));
+    }
+
     // visible for testing
     synchronized boolean isReady() {
         return isTransactional() && currentState == State.READY;
@@ -629,7 +787,7 @@ public class TransactionManager {
             if (response.requestHeader().correlationId() != inFlightRequestCorrelationId) {
                 fatalError(new RuntimeException("Detected more than one in-flight transactional request."));
             } else {
-                clearInFlightRequestCorrelationId();
+                clearInFlightTransactionalRequestCorrelationId();
                 if (response.wasDisconnected()) {
                     log.debug("Disconnected from {}. Will retry.", response.destination());
                     if (this.needsCoordinator())

http://git-wip-us.apache.org/repos/asf/kafka/blob/5d242225/clients/src/main/java/org/apache/kafka/common/errors/DuplicateSequenceException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/DuplicateSequenceException.java b/clients/src/main/java/org/apache/kafka/common/errors/DuplicateSequenceException.java
new file mode 100644
index 0000000..11f81af
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/DuplicateSequenceException.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class DuplicateSequenceException extends ApiException {
+
+    public DuplicateSequenceException(String message) {
+        super(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5d242225/clients/src/main/java/org/apache/kafka/common/errors/DuplicateSequenceNumberException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/DuplicateSequenceNumberException.java b/clients/src/main/java/org/apache/kafka/common/errors/DuplicateSequenceNumberException.java
deleted file mode 100644
index 469ba98..0000000
--- a/clients/src/main/java/org/apache/kafka/common/errors/DuplicateSequenceNumberException.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.errors;
-
-public class DuplicateSequenceNumberException extends RetriableException {
-
-    public DuplicateSequenceNumberException(String message) {
-        super(message);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5d242225/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 9decef2..bbc3486 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -25,7 +25,7 @@ import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
 import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
 import org.apache.kafka.common.errors.CorruptRecordException;
 import org.apache.kafka.common.errors.LogDirNotFoundException;
-import org.apache.kafka.common.errors.DuplicateSequenceNumberException;
+import org.apache.kafka.common.errors.DuplicateSequenceException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.IllegalGenerationException;
 import org.apache.kafka.common.errors.IllegalSaslStateException;
@@ -432,7 +432,7 @@ public enum Errors {
         new ApiExceptionBuilder() {
             @Override
             public ApiException build(String message) {
-                return new DuplicateSequenceNumberException(message);
+                return new DuplicateSequenceException(message);
             }
         }),
     INVALID_PRODUCER_EPOCH(47, "Producer attempted an operation with an old epoch. Either there is a newer producer " +

http://git-wip-us.apache.org/repos/asf/kafka/blob/5d242225/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index 19d25d7..fc83134 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -279,6 +279,17 @@ public class MemoryRecordsBuilder {
         aborted = true;
     }
 
+    public void reopenAndRewriteProducerState(long producerId, short producerEpoch, int baseSequence, boolean isTransactional) {
+        if (aborted)
+            throw new IllegalStateException("Should not reopen a batch which is already aborted.");
+        builtRecords = null;
+        this.producerId = producerId;
+        this.producerEpoch = producerEpoch;
+        this.baseSequence = baseSequence;
+        this.isTransactional = isTransactional;
+    }
+
+
     public void close() {
         if (aborted)
             throw new IllegalStateException("Cannot close MemoryRecordsBuilder as it has already been aborted");
@@ -766,4 +777,7 @@ public class MemoryRecordsBuilder {
         return this.producerEpoch;
     }
 
+    public int baseSequence() {
+        return this.baseSequence;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5d242225/clients/src/test/java/org/apache/kafka/clients/MockClient.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 9960cce..71e32ff 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -224,6 +224,16 @@ public class MockClient implements KafkaClient {
         respond(response);
     }
 
+    // Utility method to enable out of order responses
+    public void respondToRequest(ClientRequest clientRequest, AbstractResponse response) {
+        AbstractRequest request = clientRequest.requestBuilder().build();
+        requests.remove(clientRequest);
+        short version = clientRequest.requestBuilder().desiredOrLatestVersion();
+        responses.add(new ClientResponse(clientRequest.makeHeader(version), clientRequest.callback(), clientRequest.destination(),
+                clientRequest.createdTimeMs(), time.milliseconds(), false, null, response));
+    }
+
+
     public void respond(AbstractResponse response, boolean disconnected) {
         ClientRequest request = requests.remove();
         short version = request.requestBuilder().desiredOrLatestVersion();


Mime
View raw message