kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject kafka git commit: KAFKA-5793; Tighten up the semantics of the OutOfOrderSequenceException
Date Thu, 21 Sep 2017 03:33:32 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 69d2a1771 -> 94692288b


KAFKA-5793; Tighten up the semantics of the OutOfOrderSequenceException

Description of the solution can be found here: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Exactly+Once+-+Solving+the+problem+of+spurious+OutOfOrderSequence+errors

Author: Apurva Mehta <apurva@confluent.io>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>

Closes #3865 from apurvam/KAFKA-5793-tighten-up-out-of-order-sequence-v2


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/94692288
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/94692288
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/94692288

Branch: refs/heads/trunk
Commit: 94692288bedc353383e014914937a4798a1d4caa
Parents: 69d2a17
Author: Apurva Mehta <apurva@confluent.io>
Authored: Wed Sep 20 20:31:33 2017 -0700
Committer: Jason Gustafson <jason@confluent.io>
Committed: Wed Sep 20 20:31:33 2017 -0700

----------------------------------------------------------------------
 .../producer/internals/RecordAccumulator.java   |  31 +-
 .../clients/producer/internals/Sender.java      |  17 +-
 .../producer/internals/TransactionManager.java  |  81 ++++-
 .../errors/UnknownProducerIdException.java      |  32 ++
 .../apache/kafka/common/protocol/Errors.java    |  14 +-
 .../kafka/common/protocol/types/Field.java      |  10 +
 .../kafka/common/protocol/types/Struct.java     |   6 +
 .../kafka/common/requests/ProduceRequest.java   |  14 +-
 .../kafka/common/requests/ProduceResponse.java  |  38 ++-
 .../clients/producer/internals/SenderTest.java  | 335 +++++++++++++++++--
 .../internals/TransactionManagerTest.java       |   2 +-
 .../common/requests/RequestResponseTest.java    |  33 +-
 .../main/scala/kafka/cluster/Partition.scala    |   6 +
 core/src/main/scala/kafka/log/Log.scala         |  11 +-
 .../scala/kafka/log/ProducerStateManager.scala  |  12 +-
 .../scala/kafka/server/ReplicaManager.scala     |  12 +-
 .../group/GroupCoordinatorTest.scala            |   6 +-
 .../group/GroupMetadataManagerTest.scala        |  10 +-
 .../TransactionStateManagerTest.scala           |   4 +-
 .../kafka/log/ProducerStateManagerTest.scala    |   6 +-
 .../unit/kafka/server/EdgeCaseRequestTest.scala |   4 +-
 .../unit/kafka/server/ReplicaManagerTest.scala  |  49 +++
 22 files changed, 651 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/94692288/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index eb162be..46cf6c4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -347,14 +347,16 @@ public final class RecordAccumulator {
         return numSplitBatches;
     }
 
-    // The deque for the partition may have to be reordered in situations where leadership changes in between
-    // batch drains. Since the requests are on different connections, we no longer have any guarantees about ordering
-    // of the responses. Hence we will have to check if there is anything out of order and ensure the batch is queued
-    // in the correct sequence order.
+    // We will have to do extra work to ensure the queue is in order when requests are being retried and there are
+    // multiple requests in flight to that partition. If the first inflight request fails to append, then all the subsequent
+    // in flight requests will also fail because the sequence numbers will not be accepted.
+    //
+    // Further, once batches are being retried, we are reduced to a single in flight request for that partition. So when
+    // the subsequent batches come back in sequence order, they will have to be placed further back in the queue.
     //
     // Note that this assumes that all the batches in the queue which have an assigned sequence also have the current
-    // producer id. We will not attempt to reorder messages if the producer id has changed.
-
+    // producer id. We will not attempt to reorder messages if the producer id has changed, we will throw an
+    // IllegalStateException instead.
     private void insertInSequenceOrder(Deque<ProducerBatch> deque, ProducerBatch batch) {
         // When we are requeing and have enabled idempotence, the reenqueued batch must always have a sequence.
         if (batch.baseSequence() == RecordBatch.NO_SEQUENCE)
@@ -365,19 +367,16 @@ public final class RecordAccumulator {
             throw new IllegalStateException("We are reenqueueing a batch which is not tracked as part of the in flight " +
                     "requests. batch.topicPartition: " + batch.topicPartition + "; batch.baseSequence: " + batch.baseSequence());
 
-        // If there are no inflight batches being tracked by the transaction manager, it means that the producer
-        // id must have changed and the batches being re enqueued are from the old producer id. In this case
-        // we don't try to ensure ordering amongst them. They will eventually fail with an OutOfOrderSequence,
-        // or they will succeed.
-        if (batch.baseSequence() != transactionManager.nextBatchBySequence(batch.topicPartition).baseSequence()) {
+        ProducerBatch firstBatchInQueue = deque.peekFirst();
+        if (firstBatchInQueue != null && firstBatchInQueue.hasSequence() && firstBatchInQueue.baseSequence() < batch.baseSequence()) {
             // The incoming batch can't be inserted at the front of the queue without violating the sequence ordering.
             // This means that the incoming batch should be placed somewhere further back.
             // We need to find the right place for the incoming batch and insert it there.
-            // We will only enter this branch if we have multiple inflights sent to different brokers, perhaps
-            // because a leadership change occurred in between the drains. In this scenario, responses can come
-            // back out of order, requiring us to re order the batches ourselves rather than relying on the
-            // implicit ordering guarantees of the network client which are only on a per connection basis.
-
+            // We will only enter this branch if we have multiple inflights sent to different brokers and we need to retry
+            // the inflight batches.
+            //
+            // Since we reenqueue exactly one batch a time and ensure that the queue is ordered by sequence always, it
+            // is a simple linear scan of a subset of the in flight batches to find the right place in the queue each time.
             List<ProducerBatch> orderedBatches = new ArrayList<>();
             while (deque.peekFirst() != null && deque.peekFirst().hasSequence() && deque.peekFirst().baseSequence() < batch.baseSequence())
                 orderedBatches.add(deque.pollFirst());

http://git-wip-us.apache.org/repos/asf/kafka/blob/94692288/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index ecfad70..02b79c5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -35,6 +35,7 @@ import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
+import org.apache.kafka.common.errors.UnknownProducerIdException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.metrics.Measurable;
@@ -511,7 +512,7 @@ public class Sender implements Runnable {
             this.accumulator.deallocate(batch);
             this.sensors.recordBatchSplit();
         } else if (error != Errors.NONE) {
-            if (canRetry(batch, error)) {
+            if (canRetry(batch, response)) {
                 log.warn("Got error produce response with correlation id {} on topic-partition {}, retrying ({} attempts left). Error: {}",
                         correlationId,
                         batch.topicPartition,
@@ -578,6 +579,7 @@ public class Sender implements Runnable {
                 log.debug("ProducerId: {}; Set last ack'd sequence number for topic-partition {} to {}", batch.producerId(), batch.topicPartition,
                         transactionManager.lastAckedSequence(batch.topicPartition));
             }
+            transactionManager.updateLastAckedOffset(response, batch);
             transactionManager.removeInFlightBatch(batch);
         }
 
@@ -591,12 +593,12 @@ public class Sender implements Runnable {
 
     private void failBatch(ProducerBatch batch, long baseOffset, long logAppendTime, RuntimeException exception, boolean adjustSequenceNumbers) {
         if (transactionManager != null) {
-            if (exception instanceof OutOfOrderSequenceException
+            if ((exception instanceof OutOfOrderSequenceException || exception instanceof UnknownProducerIdException)
                     && !transactionManager.isTransactional()
                     && transactionManager.hasProducerId(batch.producerId())) {
-                log.error("The broker received an out of order sequence number for topic-partition " +
+                log.error("The broker returned {} for topic-partition " +
                                 "{} at offset {}. This indicates data loss on the broker, and should be investigated.",
-                        batch.topicPartition, baseOffset);
+                        exception, batch.topicPartition, baseOffset);
 
                 // Reset the transaction state since we have hit an irrecoverable exception and cannot make any guarantees
                 // about the previously committed message. Note that this will discard the producer id and sequence
@@ -616,6 +618,7 @@ public class Sender implements Runnable {
         }
 
         this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount);
+
         batch.done(baseOffset, logAppendTime, exception);
         this.accumulator.deallocate(batch);
     }
@@ -625,11 +628,9 @@ public class Sender implements Runnable {
      * We can also retry OutOfOrderSequence exceptions for future batches, since if the first batch has failed, the future
      * batches are certain to fail with an OutOfOrderSequence exception.
      */
-    private boolean canRetry(ProducerBatch batch, Errors error) {
+    private boolean canRetry(ProducerBatch batch, ProduceResponse.PartitionResponse response) {
         return batch.attempts() < this.retries &&
-                ((error.exception() instanceof RetriableException) ||
-                        (error.exception() instanceof OutOfOrderSequenceException
-                                && transactionManager.canRetryOutOfOrderSequenceException(batch)));
+                ((response.error.exception() instanceof RetriableException) || (transactionManager != null && transactionManager.canRetry(response, batch)));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/94692288/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index b2387a0..43abdbb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -37,6 +37,7 @@ import org.apache.kafka.common.requests.FindCoordinatorRequest;
 import org.apache.kafka.common.requests.FindCoordinatorResponse;
 import org.apache.kafka.common.requests.InitProducerIdRequest;
 import org.apache.kafka.common.requests.InitProducerIdResponse;
+import org.apache.kafka.common.requests.ProduceResponse;
 import org.apache.kafka.common.requests.TransactionResult;
 import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
 import org.apache.kafka.common.requests.TxnOffsetCommitRequest.CommittedOffset;
@@ -86,6 +87,10 @@ public class TransactionManager {
     // (either successfully or through a fatal failure).
     private final Map<TopicPartition, PriorityQueue<ProducerBatch>> inflightBatchesBySequence;
 
+    // We keep track of the last acknowledged offset on a per partition basis in order to disambiguate UnknownProducer
+    // responses which are due to the retention period elapsing, and those which are due to actual lost data.
+    private final Map<TopicPartition, Long> lastAckedOffset;
+
     private final PriorityQueue<TxnRequestHandler> pendingRequests;
     private final Set<TopicPartition> newPartitionsInTransaction;
     private final Set<TopicPartition> pendingPartitionsInTransaction;
@@ -182,6 +187,7 @@ public class TransactionManager {
 
         this.partitionsWithUnresolvedSequences = new HashSet<>();
         this.inflightBatchesBySequence = new HashMap<>();
+        this.lastAckedOffset = new HashMap<>();
 
         this.retryBackoffMs = retryBackoffMs;
     }
@@ -390,6 +396,7 @@ public class TransactionManager {
         this.lastAckedSequence.clear();
         this.inflightBatchesBySequence.clear();
         this.partitionsWithUnresolvedSequences.clear();
+        this.lastAckedOffset.clear();
     }
 
     /**
@@ -454,6 +461,21 @@ public class TransactionManager {
         return currentLastAckedSequence;
     }
 
+    synchronized long lastAckedOffset(TopicPartition topicPartition) {
+        Long offset = lastAckedOffset.get(topicPartition);
+        if (offset == null)
+            return -1;
+        return offset;
+    }
+
+    synchronized void updateLastAckedOffset(ProduceResponse.PartitionResponse response, ProducerBatch batch) {
+        if (response.baseOffset == ProduceResponse.INVALID_OFFSET)
+            return;
+        long lastOffset = response.baseOffset + batch.recordCount - 1;
+        if (lastOffset > lastAckedOffset(batch.topicPartition))
+            lastAckedOffset.put(batch.topicPartition, lastOffset);
+    }
+
     // If a batch is failed fatally, the sequence numbers for future batches bound for the partition must be adjusted
     // so that they don't fail with the OutOfOrderSequenceException.
     //
@@ -486,6 +508,20 @@ public class TransactionManager {
         }
     }
 
+    private synchronized void startSequencesAtBeginning(TopicPartition topicPartition) {
+        int sequence = 0;
+        for (ProducerBatch inFlightBatch : inflightBatchesBySequence.get(topicPartition)) {
+            log.info("Resetting sequence number of batch with current sequence {} for partition {} to {}",
+                    inFlightBatch.baseSequence(), inFlightBatch.topicPartition, sequence);
+            inFlightBatch.resetProducerState(new ProducerIdAndEpoch(inFlightBatch.producerId(),
+                    inFlightBatch.producerEpoch()), sequence, inFlightBatch.isTransactional());
+            // NOTE(review): PriorityQueue iteration is unordered; resetting sequences in place may also break heap order.
+            sequence += inFlightBatch.recordCount;
+        }
+        setNextSequence(topicPartition, sequence);
+        lastAckedSequence.remove(topicPartition);
+    }
+
     synchronized boolean hasInflightBatches(TopicPartition topicPartition) {
         return inflightBatchesBySequence.containsKey(topicPartition) && !inflightBatchesBySequence.get(topicPartition).isEmpty();
     }
@@ -632,9 +668,48 @@ public class TransactionManager {
         return currentState == State.IN_TRANSACTION || isCompleting() || hasAbortableError();
     }
 
-    synchronized boolean canRetryOutOfOrderSequenceException(ProducerBatch batch) {
-        return hasProducerId(batch.producerId()) && !hasUnresolvedSequence(batch.topicPartition) &&
-                (batch.sequenceHasBeenReset() || !isNextSequence(batch.topicPartition, batch.baseSequence()));
+    synchronized boolean canRetry(ProduceResponse.PartitionResponse response, ProducerBatch batch) {
+        if (!hasProducerId(batch.producerId()))
+            return false;
+
+        Errors error = response.error;
+        if (error == Errors.OUT_OF_ORDER_SEQUENCE_NUMBER && !hasUnresolvedSequence(batch.topicPartition) &&
+                (batch.sequenceHasBeenReset() || !isNextSequence(batch.topicPartition, batch.baseSequence())))
+            // We should retry the OutOfOrderSequenceException if the batch is _not_ the next batch, i.e. its base
+            // sequence isn't the lastAckedSequence + 1. However, if the first in flight batch fails fatally, we will
+            // adjust the sequences of the other inflight batches to account for the 'loss' of the sequence range in
+            // the batch which failed. In this case, an inflight batch will have a base sequence which is
+            // the lastAckedSequence + 1 after adjustment. When this batch fails with an OutOfOrderSequence, we want to retry it.
+            // To account for the latter case, we check whether the sequence has been reset since the last drain.
+            // If it has, we will retry it anyway.
+            return true;
+
+        if (error == Errors.UNKNOWN_PRODUCER_ID) {
+            if (response.logStartOffset == -1)
+                // We don't know the log start offset with this response. We should just retry the request until we get it.
+                // The UNKNOWN_PRODUCER_ID error code was added along with the new ProduceResponse which includes the
+                // logStartOffset. So the '-1' sentinel is not for backward compatibility. Instead, it is possible for
+                // a broker to not know the logStartOffset when it is returning the response because the partition
+                // may have moved away from the broker from the time the error was initially raised to the time the
+                // response was being constructed. In these cases, we should just retry the request: we are guaranteed
+                // to eventually get a logStartOffset once things settle down.
+                return true;
+
+            if (batch.sequenceHasBeenReset()) {
+                // When the first inflight batch fails due to the truncation case, then the sequences of all the other
+                // in flight batches would have been restarted from the beginning. However, when those responses
+                // come back from the broker, they would also come with an UNKNOWN_PRODUCER_ID error. In this case, we should not
+                // reset the sequence numbers to the beginning.
+                return true;
+            } else if (lastAckedOffset(batch.topicPartition) < response.logStartOffset) {
+                // The head of the log has been removed, probably due to the retention time elapsing. In this case,
+                // we expect to lose the producer state. Reset the sequences of all inflight batches to be from the beginning
+                // and retry them.
+                startSequencesAtBeginning(batch.topicPartition);
+                return true;
+            }
+        }
+        return false;
+    }
 
     // visible for testing

http://git-wip-us.apache.org/repos/asf/kafka/blob/94692288/clients/src/main/java/org/apache/kafka/common/errors/UnknownProducerIdException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/UnknownProducerIdException.java b/clients/src/main/java/org/apache/kafka/common/errors/UnknownProducerIdException.java
new file mode 100644
index 0000000..ce17345
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/UnknownProducerIdException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.errors;
+
+/**
+ * This exception is raised by the broker if it could not locate the producer metadata associated with the producerId
+ * in question. This could happen if, for instance, the producer's records were deleted because their retention time
+ * had elapsed. Once the last records of the producerId are removed, the producer's metadata is removed from the broker,
+ * and future appends by the producer will return this exception.
+ */
+public class UnknownProducerIdException extends OutOfOrderSequenceException {
+
+    public UnknownProducerIdException(String message) {
+        super(message);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/94692288/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 1039ca0..d9a95d4 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -72,6 +72,7 @@ import org.apache.kafka.common.errors.TopicExistsException;
 import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
 import org.apache.kafka.common.errors.TransactionCoordinatorFencedException;
 import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.common.errors.UnknownProducerIdException;
 import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
@@ -524,7 +525,18 @@ public enum Errors {
             public ApiException build(String message) {
                 return new AuthenticationFailedException(message);
             }
-        });
+    }),
+    UNKNOWN_PRODUCER_ID(59, "This exception is raised by the broker if it could not locate the producer metadata " +
+            "associated with the producerId in question. This could happen if, for instance, the producer's records " +
+            "were deleted because their retention time had elapsed. Once the last records of the producerId are " +
+            "removed, the producer's metadata is removed from the broker, and future appends by the producer will " +
+            "return this exception.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new UnknownProducerIdException(message);
+            }
+    });
 
     private interface ApiExceptionBuilder {
         ApiException build(String message);

http://git-wip-us.apache.org/repos/asf/kafka/blob/94692288/clients/src/main/java/org/apache/kafka/common/protocol/types/Field.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Field.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Field.java
index 8da848b..ec217f5 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Field.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Field.java
@@ -62,6 +62,16 @@ public class Field {
         }
     }
 
+    public static class Int64 extends Field {
+        public Int64(String name, String docString) {
+            super(name, Type.INT64, docString, false, null);
+        }
+
+        public Int64(String name, String docString, long defaultValue) {
+            super(name, Type.INT64, docString, true, defaultValue);
+        }
+    }
+
     public static class Int16 extends Field {
         public Int16(String name, String docString) {
             super(name, Type.INT16, docString, false, null);

http://git-wip-us.apache.org/repos/asf/kafka/blob/94692288/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
index b3e9975..1cbbcb3 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
@@ -95,6 +95,12 @@ public class Struct {
         return getString(field.name);
     }
 
+    public Long getOrElse(Field.Int64 field, long alternative) {
+        if (hasField(field.name))
+            return getLong(field.name);
+        return alternative;
+    }
+
     public Integer getOrElse(Field.Int32 field, int alternative) {
         if (hasField(field.name))
             return getInt(field.name);

http://git-wip-us.apache.org/repos/asf/kafka/blob/94692288/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
index eac7661..8ab0b20 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
@@ -102,9 +102,17 @@ public class ProduceRequest extends AbstractRequest {
      */
     private static final Schema PRODUCE_REQUEST_V4 = PRODUCE_REQUEST_V3;
 
+    /**
+     * The body of the PRODUCE_REQUEST_V5 is the same as PRODUCE_REQUEST_V4.
+     * The version number is bumped since the PRODUCE_RESPONSE_V5 includes an additional partition level
+     * field: the log_start_offset.
+     */
+    private static final Schema PRODUCE_REQUEST_V5 = PRODUCE_REQUEST_V4;
+
+
     public static Schema[] schemaVersions() {
         return new Schema[] {PRODUCE_REQUEST_V0, PRODUCE_REQUEST_V1, PRODUCE_REQUEST_V2, PRODUCE_REQUEST_V3,
-            PRODUCE_REQUEST_V4};
+            PRODUCE_REQUEST_V4, PRODUCE_REQUEST_V5};
     }
 
     public static class Builder extends AbstractRequest.Builder<ProduceRequest> {
@@ -119,7 +127,7 @@ public class ProduceRequest extends AbstractRequest {
                        int timeout,
                        Map<TopicPartition, MemoryRecords> partitionRecords,
                        String transactionalId) {
-            super(ApiKeys.PRODUCE, (short) (magic == RecordBatch.MAGIC_VALUE_V2 ? 3 : 2));
+            super(ApiKeys.PRODUCE, (short) (magic == RecordBatch.MAGIC_VALUE_V2 ? ApiKeys.PRODUCE.latestVersion() : 2));
             this.magic = magic;
             this.acks = acks;
             this.timeout = timeout;
@@ -304,6 +312,7 @@ public class ProduceRequest extends AbstractRequest {
             case 2:
             case 3:
             case 4:
+            case 5:
                 return new ProduceResponse(responseMap, throttleTimeMs);
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
@@ -366,6 +375,7 @@ public class ProduceRequest extends AbstractRequest {
 
             case 3:
             case 4:
+            case 5:
                 return RecordBatch.MAGIC_VALUE_V2;
 
             default:

http://git-wip-us.apache.org/repos/asf/kafka/blob/94692288/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
index 4786307..e1978dd 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
@@ -71,6 +71,10 @@ public class ProduceResponse extends AbstractResponse {
 
     private static final String BASE_OFFSET_KEY_NAME = "base_offset";
     private static final String LOG_APPEND_TIME_KEY_NAME = "log_append_time";
+    private static final String LOG_START_OFFSET_KEY_NAME = "log_start_offset";
+
+    private static final Field.Int64 LOG_START_OFFSET_FIELD = new Field.Int64(LOG_START_OFFSET_KEY_NAME,
+            "The start offset of the log at the time this produce response was created", INVALID_OFFSET);
 
     private static final Schema PRODUCE_RESPONSE_V0 = new Schema(
             new Field(RESPONSES_KEY_NAME, new ArrayOf(new Schema(
@@ -116,9 +120,29 @@ public class ProduceResponse extends AbstractResponse {
      */
     private static final Schema PRODUCE_RESPONSE_V4 = PRODUCE_RESPONSE_V3;
 
+
+    /**
+     * Adds the log_start_offset field to the partition response so that the client can filter out spurious
+     * OutOfOrderSequenceExceptions.
+     */
+    public static final Schema PRODUCE_RESPONSE_V5 = new Schema(
+            new Field(RESPONSES_KEY_NAME, new ArrayOf(new Schema(
+                    TOPIC_NAME,
+                    new Field(PARTITION_RESPONSES_KEY_NAME, new ArrayOf(new Schema(
+                            PARTITION_ID,
+                            ERROR_CODE,
+                            new Field(BASE_OFFSET_KEY_NAME, INT64),
+                            new Field(LOG_APPEND_TIME_KEY_NAME, INT64, "The timestamp returned by broker after appending " +
+                                    "the messages. If CreateTime is used for the topic, the timestamp will be -1. " +
+                                    "If LogAppendTime is used for the topic, the timestamp will be the broker local " +
+                                    "time when the messages are appended."),
+                            LOG_START_OFFSET_FIELD)))))),
+            THROTTLE_TIME_MS);
+
+
     public static Schema[] schemaVersions() {
         return new Schema[]{PRODUCE_RESPONSE_V0, PRODUCE_RESPONSE_V1, PRODUCE_RESPONSE_V2, PRODUCE_RESPONSE_V3,
-            PRODUCE_RESPONSE_V4};
+            PRODUCE_RESPONSE_V4, PRODUCE_RESPONSE_V5};
     }
 
     private final Map<TopicPartition, PartitionResponse> responses;
@@ -156,8 +180,9 @@ public class ProduceResponse extends AbstractResponse {
                 Errors error = Errors.forCode(partRespStruct.get(ERROR_CODE));
                 long offset = partRespStruct.getLong(BASE_OFFSET_KEY_NAME);
                 long logAppendTime = partRespStruct.getLong(LOG_APPEND_TIME_KEY_NAME);
+                long logStartOffset = partRespStruct.getOrElse(LOG_START_OFFSET_FIELD, INVALID_OFFSET);
                 TopicPartition tp = new TopicPartition(topic, partition);
-                responses.put(tp, new PartitionResponse(error, offset, logAppendTime));
+                responses.put(tp, new PartitionResponse(error, offset, logAppendTime, logStartOffset));
             }
         }
         this.throttleTime = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
@@ -188,6 +213,7 @@ public class ProduceResponse extends AbstractResponse {
                         .set(BASE_OFFSET_KEY_NAME, part.baseOffset);
                 if (partStruct.hasField(LOG_APPEND_TIME_KEY_NAME))
                     partStruct.set(LOG_APPEND_TIME_KEY_NAME, part.logAppendTime);
+                partStruct.setIfExists(LOG_START_OFFSET_FIELD, part.logStartOffset);
                 partitionArray.add(partStruct);
             }
             topicData.set(PARTITION_RESPONSES_KEY_NAME, partitionArray.toArray());
@@ -211,15 +237,17 @@ public class ProduceResponse extends AbstractResponse {
         public Errors error;
         public long baseOffset;
         public long logAppendTime;
+        public long logStartOffset;
 
         public PartitionResponse(Errors error) {
-            this(error, INVALID_OFFSET, RecordBatch.NO_TIMESTAMP);
+            this(error, INVALID_OFFSET, RecordBatch.NO_TIMESTAMP, INVALID_OFFSET);
         }
 
-        public PartitionResponse(Errors error, long baseOffset, long logAppendTime) {
+        public PartitionResponse(Errors error, long baseOffset, long logAppendTime, long logStartOffset) {
             this.error = error;
             this.baseOffset = baseOffset;
             this.logAppendTime = logAppendTime;
+            this.logStartOffset = logStartOffset;
         }
 
         @Override
@@ -232,6 +260,8 @@ public class ProduceResponse extends AbstractResponse {
             b.append(baseOffset);
             b.append(",logAppendTime: ");
             b.append(logAppendTime);
+            b.append(", logStartOffset: ");
+            b.append(logStartOffset);
             b.append('}');
             return b.toString();
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/94692288/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index a64bc56..0995e36 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -203,7 +203,7 @@ public class SenderTest {
         // start off support produce request v3
         apiVersions.update("0", NodeApiVersions.create());
 
-        ProduceResponse.PartitionResponse resp = new ProduceResponse.PartitionResponse(Errors.NONE, offset, RecordBatch.NO_TIMESTAMP);
+        ProduceResponse.PartitionResponse resp = new ProduceResponse.PartitionResponse(Errors.NONE, offset, RecordBatch.NO_TIMESTAMP, 100);
         Map<TopicPartition, ProduceResponse.PartitionResponse> partResp = new HashMap<>();
         partResp.put(tp0, resp);
         partResp.put(tp1, resp);
@@ -550,7 +550,8 @@ public class SenderTest {
 
 
     @Test
-    public void testIdempotenceWithMultipleInflightsFirstFails() throws Exception {
+    public void testIdempotenceWithMultipleInflightsRetriedInOrder() throws Exception {
+        // Send multiple in flight requests, retry them all one at a time, in the correct order.
         final long producerId = 343434L;
         TransactionManager transactionManager = new TransactionManager();
         setupWithTransactionState(transactionManager);
@@ -571,51 +572,79 @@ public class SenderTest {
         // Send second ProduceRequest
         Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
         sender.run(time.milliseconds());
-        assertEquals(2, client.inFlightRequestCount());
-        assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
+
+         // Send third ProduceRequest
+        Future<RecordMetadata> request3 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());
+
+        assertEquals(3, client.inFlightRequestCount());
+        assertEquals(3, transactionManager.sequenceNumber(tp0).longValue());
         assertEquals(-1, transactionManager.lastAckedSequence(tp0));
         assertFalse(request1.isDone());
         assertFalse(request2.isDone());
+        assertFalse(request3.isDone());
         assertTrue(client.isReady(node, time.milliseconds()));
 
         sendIdempotentProducerResponse(0, tp0, Errors.LEADER_NOT_AVAILABLE, -1L);
-
         sender.run(time.milliseconds()); // receive response 0
 
-        assertEquals(1, client.inFlightRequestCount());
+        // Queue the fourth request, it shouldn't be sent until the first 3 complete.
+        Future<RecordMetadata> request4 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+
+        assertEquals(2, client.inFlightRequestCount());
         assertEquals(-1, transactionManager.lastAckedSequence(tp0));
 
         sendIdempotentProducerResponse(1, tp0, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, -1L);
+        sender.run(time.milliseconds()); // re send request 1, receive response 2
 
-        sender.run(time.milliseconds()); // re send request 0, receive response 1
+        sendIdempotentProducerResponse(2, tp0, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, -1L);
+        sender.run(time.milliseconds()); // receive response 3
 
         assertEquals(-1, transactionManager.lastAckedSequence(tp0));
         assertEquals(1, client.inFlightRequestCount());
 
         sender.run(time.milliseconds()); // Do nothing, we are reduced to one in flight request during retries.
 
+        assertEquals(3, transactionManager.sequenceNumber(tp0).longValue());  // the batch for request 4 shouldn't have been drained, and hence the sequence should not have been incremented.
         assertEquals(1, client.inFlightRequestCount());
 
         assertEquals(-1, transactionManager.lastAckedSequence(tp0));
 
         sendIdempotentProducerResponse(0, tp0, Errors.NONE, 0L);
-        sender.run(time.milliseconds());  // receive response 0
+        sender.run(time.milliseconds());  // receive response 1
         assertEquals(0, transactionManager.lastAckedSequence(tp0));
-        assertEquals(0, client.inFlightRequestCount());
-
-        assertFalse(request2.isDone());
         assertTrue(request1.isDone());
         assertEquals(0, request1.get().offset());
 
-        sender.run(time.milliseconds()); // send request 1
+
+        assertFalse(client.hasInFlightRequests());
+        sender.run(time.milliseconds()); // send request 2;
         assertEquals(1, client.inFlightRequestCount());
-        sendIdempotentProducerResponse(1, tp0, Errors.NONE, 1L);
-        sender.run(time.milliseconds());  // receive response 1
 
+        sendIdempotentProducerResponse(1, tp0, Errors.NONE, 1L);
+        sender.run(time.milliseconds());  // receive response 2
+        assertEquals(1, transactionManager.lastAckedSequence(tp0));
         assertTrue(request2.isDone());
         assertEquals(1, request2.get().offset());
+
         assertFalse(client.hasInFlightRequests());
-        assertEquals(1, transactionManager.lastAckedSequence(tp0));
+
+        sender.run(time.milliseconds()); // send request 3
+        assertEquals(1, client.inFlightRequestCount());
+
+        sendIdempotentProducerResponse(2, tp0, Errors.NONE, 2L);
+        sender.run(time.milliseconds());  // receive response 3, send request 4 since we are out of 'retry' mode.
+        assertEquals(2, transactionManager.lastAckedSequence(tp0));
+        assertTrue(request3.isDone());
+        assertEquals(2, request3.get().offset());
+
+        assertEquals(1, client.inFlightRequestCount());
+
+        sendIdempotentProducerResponse(3, tp0, Errors.NONE, 3L);
+        sender.run(time.milliseconds());  // receive response 4
+        assertEquals(3, transactionManager.lastAckedSequence(tp0));
+        assertTrue(request4.isDone());
+        assertEquals(3, request4.get().offset());
     }
 
     @Test
@@ -1118,10 +1147,11 @@ public class SenderTest {
         ClientRequest firstClientRequest = client.requests().peek();
         ClientRequest secondClientRequest = (ClientRequest) client.requests().toArray()[1];
 
-        client.respondToRequest(secondClientRequest, produceResponse(tp0, 1, Errors.NONE, -1));
+        client.respondToRequest(secondClientRequest, produceResponse(tp0, 1000, Errors.NONE, 0));
 
         sender.run(time.milliseconds()); // receive response 1
 
+        assertEquals(1000, transactionManager.lastAckedOffset(tp0));
         assertEquals(1, transactionManager.lastAckedSequence(tp0));
 
         client.respondToRequest(firstClientRequest, produceResponse(tp0, -1, Errors.DUPLICATE_SEQUENCE_NUMBER, -1));
@@ -1130,10 +1160,267 @@ public class SenderTest {
 
         // Make sure that the last ack'd sequence doesn't change.
         assertEquals(1, transactionManager.lastAckedSequence(tp0));
+        assertEquals(1000, transactionManager.lastAckedOffset(tp0));
+        assertFalse(client.hasInFlightRequests());
+    }
+
+    @Test
+    public void testUnknownProducerHandlingWhenRetentionLimitReached() throws Exception {
+        final long producerId = 343434L;
+        TransactionManager transactionManager = new TransactionManager();
+        setupWithTransactionState(transactionManager);
+        prepareAndReceiveInitProducerId(producerId, Errors.NONE);
+        assertTrue(transactionManager.hasProducerId());
+
+        assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
+
+        // Send first ProduceRequest
+        Future<RecordMetadata> request1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());
+
+        assertEquals(1, client.inFlightRequestCount());
+        assertEquals(1, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+
+        sendIdempotentProducerResponse(0, tp0, Errors.NONE, 1000L, 10L);
+
+        sender.run(time.milliseconds());  // receive the response.
+
+        assertTrue(request1.isDone());
+        assertEquals(1000L, request1.get().offset());
+        assertEquals(0L, transactionManager.lastAckedSequence(tp0));
+        assertEquals(1000L, transactionManager.lastAckedOffset(tp0));
+
+        // Send second ProduceRequest, a single batch with 2 records.
+        accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT);
+        Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());
+        assertEquals(3, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(0, transactionManager.lastAckedSequence(tp0));
+
+        assertFalse(request2.isDone());
+
+        sendIdempotentProducerResponse(1, tp0, Errors.UNKNOWN_PRODUCER_ID, -1L, 1010L);
+        sender.run(time.milliseconds()); // receive response 0, should be retried since the logStartOffset > lastAckedOffset.
+
+        // We should have reset the sequence number state of the partition because the state was lost on the broker.
+        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+        assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
+        assertFalse(request2.isDone());
+        assertFalse(client.hasInFlightRequests());
+
+        sender.run(time.milliseconds()); // should retry request 1
+
+        // resend the request. Note that the expected sequence is 0, since we have lost producer state on the broker.
+        sendIdempotentProducerResponse(0, tp0, Errors.NONE, 1011L, 1010L);
+        sender.run(time.milliseconds()); // receive response 1
+        assertEquals(1, transactionManager.lastAckedSequence(tp0));
+        assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
+        assertFalse(client.hasInFlightRequests());
+        assertTrue(request2.isDone());
+        assertEquals(1012L, request2.get().offset());
+        assertEquals(1012L, transactionManager.lastAckedOffset(tp0));
+    }
+
+    @Test
+    public void testUnknownProducerErrorShouldBeRetriedWhenLogStartOffsetIsUnknown() throws Exception {
+        final long producerId = 343434L;
+        TransactionManager transactionManager = new TransactionManager();
+        setupWithTransactionState(transactionManager);
+        prepareAndReceiveInitProducerId(producerId, Errors.NONE);
+        assertTrue(transactionManager.hasProducerId());
+
+        assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
+
+        // Send first ProduceRequest
+        Future<RecordMetadata> request1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());
+
+        assertEquals(1, client.inFlightRequestCount());
+        assertEquals(1, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+
+        sendIdempotentProducerResponse(0, tp0, Errors.NONE, 1000L, 10L);
+
+        sender.run(time.milliseconds());  // receive the response.
+
+        assertTrue(request1.isDone());
+        assertEquals(1000L, request1.get().offset());
+        assertEquals(0L, transactionManager.lastAckedSequence(tp0));
+        assertEquals(1000L, transactionManager.lastAckedOffset(tp0));
+
+        // Send second ProduceRequest
+        Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());
+        assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(0, transactionManager.lastAckedSequence(tp0));
+
+        assertFalse(request2.isDone());
+
+        sendIdempotentProducerResponse(1, tp0, Errors.UNKNOWN_PRODUCER_ID, -1L, -1L);
+        sender.run(time.milliseconds()); // receive response 1, should be retried without resetting the sequence numbers since the log start offset is unknown.
+
+        // We should NOT have reset the sequence number state: without a log start offset we can't tell whether the broker lost it.
+        assertEquals(0, transactionManager.lastAckedSequence(tp0));
+        assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
+        assertFalse(request2.isDone());
+        assertFalse(client.hasInFlightRequests());
+
+        sender.run(time.milliseconds()); // should retry request 1
+
+        // resend the request. Note that the expected sequence is 1, since we never got the logStartOffset in the previous
+        // response and hence we didn't reset the sequence numbers.
+        sendIdempotentProducerResponse(1, tp0, Errors.NONE, 1011L, 1010L);
+        sender.run(time.milliseconds()); // receive response 1
+        assertEquals(1, transactionManager.lastAckedSequence(tp0));
+        assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
         assertFalse(client.hasInFlightRequests());
+        assertTrue(request2.isDone());
+        assertEquals(1011L, request2.get().offset());
+        assertEquals(1011L, transactionManager.lastAckedOffset(tp0));
     }
 
-    void sendIdempotentProducerResponse(final int expectedSequence, TopicPartition tp, Errors responseError, long responseOffset) {
+    @Test
+    public void testUnknownProducerErrorShouldBeRetriedForFutureBatchesWhenFirstFails() throws Exception {
+        final long producerId = 343434L;
+        TransactionManager transactionManager = new TransactionManager();
+        setupWithTransactionState(transactionManager);
+        prepareAndReceiveInitProducerId(producerId, Errors.NONE);
+        assertTrue(transactionManager.hasProducerId());
+
+        assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
+
+        // Send first ProduceRequest
+        Future<RecordMetadata> request1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());
+
+        assertEquals(1, client.inFlightRequestCount());
+        assertEquals(1, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+
+        sendIdempotentProducerResponse(0, tp0, Errors.NONE, 1000L, 10L);
+
+        sender.run(time.milliseconds());  // receive the response.
+
+        assertTrue(request1.isDone());
+        assertEquals(1000L, request1.get().offset());
+        assertEquals(0L, transactionManager.lastAckedSequence(tp0));
+        assertEquals(1000L, transactionManager.lastAckedOffset(tp0));
+
+        // Send second ProduceRequest
+        Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());
+        assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(0, transactionManager.lastAckedSequence(tp0));
+
+        // Send the third ProduceRequest, in parallel with the second. It should be retried even though the
+        // lastAckedOffset > logStartOffset when its UnknownProducerResponse comes back.
+        Future<RecordMetadata> request3 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());
+        assertEquals(3, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(0, transactionManager.lastAckedSequence(tp0));
+
+        assertFalse(request2.isDone());
+        assertFalse(request3.isDone());
+        assertEquals(2, client.inFlightRequestCount());
+
+
+        sendIdempotentProducerResponse(1, tp0, Errors.UNKNOWN_PRODUCER_ID, -1L, 1010L);
+        sender.run(time.milliseconds()); // receive response 2, should reset the sequence numbers and be retried.
+
+        // We should have reset the sequence number state of the partition because the state was lost on the broker.
+        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+        assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
+        assertFalse(request2.isDone());
+        assertFalse(request3.isDone());
+        assertEquals(1, client.inFlightRequestCount());
+
+        sender.run(time.milliseconds()); // resend request 2.
+
+        assertEquals(2, client.inFlightRequestCount());
+
+        // receive the original response 3. note the expected sequence is still the originally assigned sequence.
+        sendIdempotentProducerResponse(2, tp0, Errors.UNKNOWN_PRODUCER_ID, -1, 1010L);
+        sender.run(time.milliseconds()); // receive response 3
+
+        assertEquals(1, client.inFlightRequestCount());
+        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+        assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
+
+        sendIdempotentProducerResponse(0, tp0, Errors.NONE, 1011L, 1010L);
+        sender.run(time.milliseconds());  // receive response 2, don't send request 3 since we can have at most 1 in flight when retrying
+
+        assertTrue(request2.isDone());
+        assertFalse(request3.isDone());
+        assertFalse(client.hasInFlightRequests());
+        assertEquals(0, transactionManager.lastAckedSequence(tp0));
+        assertEquals(1011L, request2.get().offset());
+        assertEquals(1011L, transactionManager.lastAckedOffset(tp0));
+
+        sender.run(time.milliseconds());  // resend request 3.
+        assertEquals(1, client.inFlightRequestCount());
+
+        sendIdempotentProducerResponse(1, tp0, Errors.NONE, 1012L, 1010L);
+        sender.run(time.milliseconds());  // receive response 3.
+
+        assertFalse(client.hasInFlightRequests());
+        assertTrue(request3.isDone());
+        assertEquals(1012L, request3.get().offset());
+        assertEquals(1012L, transactionManager.lastAckedOffset(tp0));
+    }
+
+    @Test
+    public void testShouldRaiseOutOfOrderSequenceExceptionToUserIfLogWasNotTruncated() throws Exception {
+        final long producerId = 343434L;
+        TransactionManager transactionManager = new TransactionManager();
+        setupWithTransactionState(transactionManager);
+        prepareAndReceiveInitProducerId(producerId, Errors.NONE);
+        assertTrue(transactionManager.hasProducerId());
+
+        assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
+
+        // Send first ProduceRequest
+        Future<RecordMetadata> request1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());
+
+        assertEquals(1, client.inFlightRequestCount());
+        assertEquals(1, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+
+        sendIdempotentProducerResponse(0, tp0, Errors.NONE, 1000L, 10L);
+
+        sender.run(time.milliseconds());  // receive the response.
+
+        assertTrue(request1.isDone());
+        assertEquals(1000L, request1.get().offset());
+        assertEquals(0L, transactionManager.lastAckedSequence(tp0));
+        assertEquals(1000L, transactionManager.lastAckedOffset(tp0));
+
+        // Send second ProduceRequest,
+        Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());
+        assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(0, transactionManager.lastAckedSequence(tp0));
+
+        assertFalse(request2.isDone());
+
+        sendIdempotentProducerResponse(1, tp0, Errors.UNKNOWN_PRODUCER_ID, -1L, 10L);
+        sender.run(time.milliseconds()); // receive response 0, should cause a producerId reset since the logStartOffset < lastAckedOffset
+
+        assertTrue(request2.isDone());
+        try {
+            request2.get();
+            fail("Should have raised an OutOfOrderSequenceException");
+        } catch (Exception e) {
+            assertTrue(e.getCause() instanceof OutOfOrderSequenceException);
+        }
+
+    }
+    void sendIdempotentProducerResponse(int expectedSequence, TopicPartition tp, Errors responseError, long responseOffset) {
+        sendIdempotentProducerResponse(expectedSequence, tp, responseError, responseOffset, -1L);
+    }
+
+    void sendIdempotentProducerResponse(final int expectedSequence, TopicPartition tp, Errors responseError, long responseOffset, long logStartOffset) {
         client.respond(new MockClient.RequestMatcher() {
             @Override
             public boolean matches(AbstractRequest body) {
@@ -1148,7 +1435,7 @@ public class SenderTest {
 
                 return true;
             }
-        }, produceResponse(tp, responseOffset, responseError, 0));
+        }, produceResponse(tp, responseOffset, responseError, 0, logStartOffset));
     }
 
     @Test
@@ -1432,7 +1719,7 @@ public class SenderTest {
             assertEquals(1, client.inFlightRequestCount());
             assertTrue("Client ready status should be true", client.isReady(node, 0L));
 
-            responseMap.put(tp, new ProduceResponse.PartitionResponse(Errors.NONE, 0L, 0L));
+            responseMap.put(tp, new ProduceResponse.PartitionResponse(Errors.NONE, 0L, 0L, 0L));
             client.respond(produceRequestMatcher(tp, producerIdAndEpoch, 0, txnManager.isTransactional()),
                     new ProduceResponse(responseMap));
 
@@ -1449,7 +1736,7 @@ public class SenderTest {
             assertEquals(1, client.inFlightRequestCount());
             assertTrue("Client ready status should be true", client.isReady(node, 0L));
 
-            responseMap.put(tp, new ProduceResponse.PartitionResponse(Errors.NONE, 1L, 0L));
+            responseMap.put(tp, new ProduceResponse.PartitionResponse(Errors.NONE, 1L, 0L, 0L));
             client.respond(produceRequestMatcher(tp, producerIdAndEpoch, 1, txnManager.isTransactional()),
                     new ProduceResponse(responseMap));
 
@@ -1505,12 +1792,16 @@ public class SenderTest {
         }
     }
 
-    private ProduceResponse produceResponse(TopicPartition tp, long offset, Errors error, int throttleTimeMs) {
-        ProduceResponse.PartitionResponse resp = new ProduceResponse.PartitionResponse(error, offset, RecordBatch.NO_TIMESTAMP);
+    private ProduceResponse produceResponse(TopicPartition tp, long offset, Errors error, int throttleTimeMs, long logStartOffset) {
+        ProduceResponse.PartitionResponse resp = new ProduceResponse.PartitionResponse(error, offset, RecordBatch.NO_TIMESTAMP, logStartOffset);
         Map<TopicPartition, ProduceResponse.PartitionResponse> partResp = Collections.singletonMap(tp, resp);
         return new ProduceResponse(partResp, throttleTimeMs);
     }
 
+    private ProduceResponse produceResponse(TopicPartition tp, long offset, Errors error, int throttleTimeMs) {
+        return produceResponse(tp, offset, error, throttleTimeMs, -1L);
+    }
+
     private void setupWithTransactionState(TransactionManager transactionManager) {
         Map<String, String> metricTags = new LinkedHashMap<>();
         metricTags.put("client-id", CLIENT_ID);

http://git-wip-us.apache.org/repos/asf/kafka/blob/94692288/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 28f9c82..8fc43df 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -2377,7 +2377,7 @@ public class TransactionManagerTest {
     }
 
     private ProduceResponse produceResponse(TopicPartition tp, long offset, Errors error, int throttleTimeMs) {
-        ProduceResponse.PartitionResponse resp = new ProduceResponse.PartitionResponse(error, offset, RecordBatch.NO_TIMESTAMP);
+        ProduceResponse.PartitionResponse resp = new ProduceResponse.PartitionResponse(error, offset, RecordBatch.NO_TIMESTAMP, 10);
         Map<TopicPartition, ProduceResponse.PartitionResponse> partResp = singletonMap(tp, resp);
         return new ProduceResponse(partResp, throttleTimeMs);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/94692288/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index cc65003..e2e458d1 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -340,10 +340,39 @@ public class RequestResponseTest {
     }
 
     @Test
+    public void produceResponseV5Test() {
+        Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<>();
+        TopicPartition tp0 = new TopicPartition("test", 0);
+        responseData.put(tp0, new ProduceResponse.PartitionResponse(Errors.NONE,
+                10000, RecordBatch.NO_TIMESTAMP, 100));
+
+        ProduceResponse v5Response = new ProduceResponse(responseData, 10);
+        short version = 5;
+
+        ByteBuffer buffer = v5Response.serialize(version, new ResponseHeader(0));
+        buffer.rewind();
+
+        ResponseHeader.parse(buffer); // throw away.
+
+        Struct deserializedStruct = ApiKeys.PRODUCE.parseResponse(version, buffer);
+
+        ProduceResponse v5FromBytes = (ProduceResponse) AbstractResponse.parseResponse(ApiKeys.PRODUCE,
+                deserializedStruct);
+
+        assertEquals(1, v5FromBytes.responses().size());
+        assertTrue(v5FromBytes.responses().containsKey(tp0));
+        ProduceResponse.PartitionResponse partitionResponse = v5FromBytes.responses().get(tp0);
+        assertEquals(100, partitionResponse.logStartOffset);
+        assertEquals(10000, partitionResponse.baseOffset);
+        assertEquals(10, v5FromBytes.getThrottleTime());
+        assertEquals(responseData, v5Response.responses());
+    }
+
+    @Test
     public void produceResponseVersionTest() {
         Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<>();
         responseData.put(new TopicPartition("test", 0), new ProduceResponse.PartitionResponse(Errors.NONE,
-                10000, RecordBatch.NO_TIMESTAMP));
+                10000, RecordBatch.NO_TIMESTAMP, 100));
         ProduceResponse v0Response = new ProduceResponse(responseData);
         ProduceResponse v1Response = new ProduceResponse(responseData, 10);
         ProduceResponse v2Response = new ProduceResponse(responseData, 10);
@@ -744,7 +773,7 @@ public class RequestResponseTest {
     private ProduceResponse createProduceResponse() {
         Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<>();
         responseData.put(new TopicPartition("test", 0), new ProduceResponse.PartitionResponse(Errors.NONE,
-                10000, RecordBatch.NO_TIMESTAMP));
+                10000, RecordBatch.NO_TIMESTAMP, 100));
         return new ProduceResponse(responseData, 0);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/94692288/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 3b5fee0..f6f825f 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -518,6 +518,12 @@ class Partition(val topic: String,
     info
   }
 
+  def logStartOffset: Long = {
+    inReadLock(leaderIsrUpdateLock) {
+      leaderReplicaIfLocal.map(_.log.get.logStartOffset).getOrElse(-1)
+    }
+  }
+
   /**
    * Update logStartOffset and low watermark if 1) offset <= highWatermark and 2) it is the leader replica.
    * This function can trigger log segment deletion and log rolling.

http://git-wip-us.apache.org/repos/asf/kafka/blob/94692288/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index d98f443..f32da4b 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -47,8 +47,12 @@ import java.lang.{Long => JLong}
 import java.util.regex.Pattern
 
 object LogAppendInfo {
-  val UnknownLogAppendInfo = LogAppendInfo(-1, -1, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP,
+  val UnknownLogAppendInfo = LogAppendInfo(-1, -1, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, -1L, // 6th arg: logStartOffset unknown (-1L)
     NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false)
+
+  def unknownLogAppendInfoWithLogStartOffset(logStartOffset: Long): LogAppendInfo = // same values as UnknownLogAppendInfo except logStartOffset
+    LogAppendInfo(-1, -1, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, logStartOffset,
+      NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false)
 }
 
 /**
@@ -60,6 +64,7 @@ object LogAppendInfo {
  * @param maxTimestamp The maximum timestamp of the message set.
  * @param offsetOfMaxTimestamp The offset of the message with the maximum timestamp.
  * @param logAppendTime The log append time (if used) of the message set, otherwise Message.NoTimestamp
+ * @param logStartOffset The start offset of the log at the time of this append.
  * @param sourceCodec The source codec used in the message set (send by the producer)
  * @param targetCodec The target codec of the message set(after applying the broker compression configuration if any)
  * @param shallowCount The number of shallow messages
@@ -71,6 +76,7 @@ case class LogAppendInfo(var firstOffset: Long,
                          var maxTimestamp: Long,
                          var offsetOfMaxTimestamp: Long,
                          var logAppendTime: Long,
+                         var logStartOffset: Long,
                          sourceCodec: CompressionCodec,
                          targetCodec: CompressionCodec,
                          shallowCount: Int,
@@ -669,6 +675,7 @@ class Log(@volatile var dir: File,
           appendInfo.firstOffset = duplicate.firstOffset
           appendInfo.lastOffset = duplicate.lastOffset
           appendInfo.logAppendTime = duplicate.timestamp
+          appendInfo.logStartOffset = logStartOffset
           return appendInfo
         }
 
@@ -861,7 +868,7 @@ class Log(@volatile var dir: File,
 
     // Apply broker-side compression if any
     val targetCodec = BrokerCompressionCodec.getTargetCompressionCodec(config.compressionType, sourceCodec)
-    LogAppendInfo(firstOffset, lastOffset, maxTimestamp, offsetOfMaxTimestamp, RecordBatch.NO_TIMESTAMP, sourceCodec,
+    LogAppendInfo(firstOffset, lastOffset, maxTimestamp, offsetOfMaxTimestamp, RecordBatch.NO_TIMESTAMP, logStartOffset, sourceCodec,
       targetCodec, shallowMessageCount, validBytesCount, monotonic)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/94692288/core/src/main/scala/kafka/log/ProducerStateManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index 69d4e36..1cf9a14 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -161,9 +161,15 @@ private[log] class ProducerAppendInfo(val producerId: Long,
         s"with a newer epoch. $producerEpoch (request epoch), ${currentEntry.producerEpoch} (server epoch)")
     } else if (validateSequenceNumbers) {
       if (producerEpoch != currentEntry.producerEpoch) {
-        if (firstSeq != 0)
-          throw new OutOfOrderSequenceException(s"Invalid sequence number for new epoch: $producerEpoch " +
-            s"(request epoch), $firstSeq (seq. number)")
+        if (firstSeq != 0) {
+          if (currentEntry.producerEpoch != RecordBatch.NO_PRODUCER_EPOCH) {
+            throw new OutOfOrderSequenceException(s"Invalid sequence number for new epoch: $producerEpoch " +
+              s"(request epoch), $firstSeq (seq. number)")
+          } else {
+            throw new UnknownProducerIdException(s"Found no record of producerId=$producerId on the broker. It is possible " +
+              s"that the last message with the producerId=$producerId has been removed due to hitting the retention limit.")
+          }
+        }
       } else if (currentEntry.lastSeq == RecordBatch.NO_SEQUENCE && firstSeq != 0) {
         // the epoch was bumped by a control record, so we expect the sequence number to be reset
         throw new OutOfOrderSequenceException(s"Out of order sequence number for producerId $producerId: found $firstSeq " +

http://git-wip-us.apache.org/repos/asf/kafka/blob/94692288/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 064472d..9cc6317 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -445,7 +445,7 @@ class ReplicaManager(val config: KafkaConfig,
         topicPartition ->
                 ProducePartitionStatus(
                   result.info.lastOffset + 1, // required offset
-                  new PartitionResponse(result.error, result.info.firstOffset, result.info.logAppendTime)) // response status
+                  new PartitionResponse(result.error, result.info.firstOffset, result.info.logAppendTime, result.info.logStartOffset)) // response status
       }
 
       if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {
@@ -471,7 +471,7 @@ class ReplicaManager(val config: KafkaConfig,
       // Just return an error and don't handle the request at all
       val responseStatus = entriesPerPartition.map { case (topicPartition, _) =>
         topicPartition -> new PartitionResponse(Errors.INVALID_REQUIRED_ACKS,
-          LogAppendInfo.UnknownLogAppendInfo.firstOffset, RecordBatch.NO_TIMESTAMP)
+          LogAppendInfo.UnknownLogAppendInfo.firstOffset, RecordBatch.NO_TIMESTAMP, LogAppendInfo.UnknownLogAppendInfo.logStartOffset)
       }
       responseCallback(responseStatus)
     }
@@ -734,10 +734,16 @@ class ReplicaManager(val config: KafkaConfig,
                    _: InvalidTimestampException) =>
             (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(e)))
           case t: Throwable =>
+            val logStartOffset = getPartition(topicPartition) match {
+              case Some(partition) =>
+                partition.logStartOffset
+              case _ =>
+                -1
+            }
             brokerTopicStats.topicStats(topicPartition.topic).failedProduceRequestRate.mark()
             brokerTopicStats.allTopicsStats.failedProduceRequestRate.mark()
             error("Error processing append operation on partition %s".format(topicPartition), t)
-            (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(t)))
+            (topicPartition, LogAppendResult(LogAppendInfo.unknownLogAppendInfoWithLogStartOffset(logStartOffset), Some(t)))
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/94692288/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 592e343..2ffd828 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -1354,7 +1354,7 @@ class GroupCoordinatorTest extends JUnitSuite {
       EasyMock.anyObject().asInstanceOf[Option[Object]])).andAnswer(new IAnswer[Unit] {
       override def answer = capturedArgument.getValue.apply(
         Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) ->
-          new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP)
+          new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP, 0L)
         )
       )})
     EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(RecordBatch.MAGIC_VALUE_V1)).anyTimes()
@@ -1438,7 +1438,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     ).andAnswer(new IAnswer[Unit] {
       override def answer = capturedArgument.getValue.apply(
           Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) ->
-            new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP)
+            new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP, 0L)
           )
         )
       })
@@ -1467,7 +1467,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     ).andAnswer(new IAnswer[Unit] {
       override def answer = capturedArgument.getValue.apply(
         Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupCoordinator.partitionFor(groupId)) ->
-          new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP)
+          new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP, 0L)
         )
       )})
     EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(RecordBatch.MAGIC_VALUE_V2)).anyTimes()

http://git-wip-us.apache.org/repos/asf/kafka/blob/94692288/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index 24e2920..a2f5f92 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -837,7 +837,7 @@ class GroupMetadataManagerTest {
     assertTrue(group.hasOffsets)
     assertTrue(group.allOffsets.isEmpty)
     capturedResponseCallback.getValue.apply(Map(groupTopicPartition ->
-      new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP)))
+      new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP, 0L)))
 
     assertTrue(group.hasOffsets)
     assertTrue(group.allOffsets.isEmpty)
@@ -877,7 +877,7 @@ class GroupMetadataManagerTest {
     assertTrue(group.hasOffsets)
     assertTrue(group.allOffsets.isEmpty)
     capturedResponseCallback.getValue.apply(Map(groupTopicPartition ->
-      new PartitionResponse(Errors.NOT_ENOUGH_REPLICAS, 0L, RecordBatch.NO_TIMESTAMP)))
+      new PartitionResponse(Errors.NOT_ENOUGH_REPLICAS, 0L, RecordBatch.NO_TIMESTAMP, 0L)))
 
     assertFalse(group.hasOffsets)
     assertTrue(group.allOffsets.isEmpty)
@@ -916,7 +916,7 @@ class GroupMetadataManagerTest {
     assertTrue(group.hasOffsets)
     assertTrue(group.allOffsets.isEmpty)
     capturedResponseCallback.getValue.apply(Map(groupTopicPartition ->
-      new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP)))
+      new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP, 0L)))
 
     assertTrue(group.hasOffsets)
     assertTrue(group.allOffsets.isEmpty)
@@ -995,7 +995,7 @@ class GroupMetadataManagerTest {
     groupMetadataManager.storeOffsets(group, memberId, offsets, callback)
     assertTrue(group.hasOffsets)
     capturedResponseCallback.getValue.apply(Map(groupTopicPartition ->
-      new PartitionResponse(appendError, 0L, RecordBatch.NO_TIMESTAMP)))
+      new PartitionResponse(appendError, 0L, RecordBatch.NO_TIMESTAMP, 0L)))
 
     assertFalse(commitErrors.isEmpty)
     val maybeError = commitErrors.get.get(topicPartition)
@@ -1324,7 +1324,7 @@ class GroupMetadataManagerTest {
     ).andAnswer(new IAnswer[Unit] {
       override def answer = capturedArgument.getValue.apply(
         Map(groupTopicPartition ->
-          new PartitionResponse(error, 0L, RecordBatch.NO_TIMESTAMP)
+          new PartitionResponse(error, 0L, RecordBatch.NO_TIMESTAMP, 0L)
         )
       )})
     EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))

http://git-wip-us.apache.org/repos/asf/kafka/blob/94692288/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index e86e088..49c8e6a 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -503,7 +503,7 @@ class TransactionStateManagerTest {
           override def answer(): Unit = {
             capturedArgument.getValue.apply(
               Map(partition ->
-                new PartitionResponse(error, 0L, RecordBatch.NO_TIMESTAMP)
+                new PartitionResponse(error, 0L, RecordBatch.NO_TIMESTAMP, 0L)
               )
             )
           }
@@ -602,7 +602,7 @@ class TransactionStateManagerTest {
     ).andAnswer(new IAnswer[Unit] {
         override def answer(): Unit = capturedArgument.getValue.apply(
           Map(new TopicPartition(TRANSACTION_STATE_TOPIC_NAME, partitionId) ->
-            new PartitionResponse(error, 0L, RecordBatch.NO_TIMESTAMP)
+            new PartitionResponse(error, 0L, RecordBatch.NO_TIMESTAMP, 0L)
           )
         )
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/94692288/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
index 976bbd7..9eb9ae7 100644
--- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
@@ -305,7 +305,7 @@ class ProducerStateManagerTest extends JUnitSuite {
     append(recoveredMapping, producerId, epoch, 2, 2L)
   }
 
-  @Test(expected = classOf[OutOfOrderSequenceException])
+  @Test(expected = classOf[UnknownProducerIdException])
   def testRemoveExpiredPidsOnReload(): Unit = {
     val epoch = 0.toShort
     append(stateManager, producerId, epoch, 0, 0L, 0)
@@ -482,14 +482,14 @@ class ProducerStateManagerTest extends JUnitSuite {
     append(stateManager, producerId, epoch, 2, 3L, 4L)
     stateManager.takeSnapshot()
 
-    intercept[OutOfOrderSequenceException] {
+    intercept[UnknownProducerIdException] {
       val recoveredMapping = new ProducerStateManager(partition, logDir, maxPidExpirationMs)
       recoveredMapping.truncateAndReload(0L, 1L, time.milliseconds)
       append(recoveredMapping, pid2, epoch, 1, 4L, 5L)
     }
   }
 
-  @Test(expected = classOf[OutOfOrderSequenceException])
+  @Test(expected = classOf[UnknownProducerIdException])
   def testPidExpirationTimeout() {
     val epoch = 5.toShort
     val sequence = 37

http://git-wip-us.apache.org/repos/asf/kafka/blob/94692288/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
index a7817ad..b9a4dfe 100755
--- a/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
@@ -114,9 +114,9 @@ class EdgeCaseRequestTest extends KafkaServerTestHarness {
     val correlationId = -1
     TestUtils.createTopic(zkUtils, topic, numPartitions = 1, replicationFactor = 1, servers = servers)
 
-    val version = 2: Short
+    val version = ApiKeys.PRODUCE.latestVersion: Short
     val serializedBytes = {
-      val headerBytes = requestHeaderBytes(ApiKeys.PRODUCE.id, ApiKeys.PRODUCE.latestVersion, null,
+      val headerBytes = requestHeaderBytes(ApiKeys.PRODUCE.id, version, null,
         correlationId)
       val records = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("message".getBytes))
       val request = new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, 1, 10000,

http://git-wip-us.apache.org/repos/asf/kafka/blob/94692288/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index f5581f8..27d4312 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -175,6 +175,55 @@ class ReplicaManagerTest {
   }
 
   @Test
+  def testReceiveOutOfOrderSequenceExceptionWithLogStartOffset(): Unit = {
+    val timer = new MockTimer
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(timer)
+    // Checks that an append with a sequence gap fails with OUT_OF_ORDER_SEQUENCE_NUMBER and still reports logStartOffset.
+    try {
+      val brokerList = Seq[Integer](0, 1).asJava
+
+      val partition = replicaManager.getOrCreatePartition(new TopicPartition(topic, 0))
+      partition.getOrCreateReplica(0)
+
+      // Make this replica the leader.
+      val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0,
+        collection.immutable.Map(new TopicPartition(topic, 0) -> new LeaderAndIsrRequest.PartitionState(0, 0, 0, brokerList, 0, brokerList, true)).asJava,
+        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+      replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ())
+      replicaManager.getLeaderReplicaIfLocal(new TopicPartition(topic, 0)) // NOTE(review): return value is unused
+
+      val producerId = 234L
+      val epoch = 5.toShort
+
+      // write a few idempotent batches with consecutive sequence numbers (0 until numRecords)
+      val numRecords = 3
+      for (sequence <- 0 until numRecords) {
+        val records = MemoryRecords.withIdempotentRecords(CompressionType.NONE, producerId, epoch, sequence,
+          new SimpleRecord(s"message $sequence".getBytes))
+        appendRecords(replicaManager, new TopicPartition(topic, 0), records).onFire { response =>
+          assertEquals(Errors.NONE, response.error)
+        }
+      }
+
+      assertEquals(0, partition.logStartOffset)
+
+      // Append a record whose sequence skips ahead of the last one written; expect OUT_OF_ORDER_SEQUENCE_NUMBER with the
+      // log start offset populated. NOTE(review): the asserts inside onFire run only if its callback is actually invoked.
+      val outOfRangeSequence = numRecords + 10
+      val record = MemoryRecords.withIdempotentRecords(CompressionType.NONE, producerId, epoch, outOfRangeSequence,
+        new SimpleRecord(s"message: $outOfRangeSequence".getBytes))
+      appendRecords(replicaManager, new TopicPartition(topic, 0), record).onFire { response =>
+        assertEquals(Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, response.error)
+        assertEquals(0, response.logStartOffset)
+      }
+
+    } finally {
+      replicaManager.shutdown(checkpointHW = false)
+    }
+
+  }
+
+  @Test
   def testReadCommittedFetchLimitedAtLSO(): Unit = {
     val timer = new MockTimer
     val replicaManager = setupReplicaManagerWithMockedPurgatories(timer)


Mime
View raw message