kafka-commits mailing list archives

From j...@apache.org
Subject [2/3] kafka git commit: KAFKA-5494; Enable idempotence with max.in.flight.requests.per.connection > 1
Date Thu, 14 Sep 2017 23:11:41 GMT
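For orientation before the diff: with idempotence enabled, the producer tracks two counters per partition, the next sequence number to assign and the last sequence the broker has acknowledged. The tests below assert exactly this pair via transactionManager.sequenceNumber(tp0) and transactionManager.lastAckedSequence(tp0). A minimal sketch of that bookkeeping (illustrative names, not the actual TransactionManager API):

    import java.util.HashMap;
    import java.util.Map;

    // Illustrative per-partition sequence bookkeeping for an idempotent producer.
    final class SequenceBookkeepingSketch {
        private final Map<String, Integer> nextSequence = new HashMap<>();
        private final Map<String, Integer> lastAckedSequence = new HashMap<>();

        // Assign sequences to a batch of recordCount records and advance the counter.
        int assignBaseSequence(String partition, int recordCount) {
            int base = nextSequence.getOrDefault(partition, 0);
            nextSequence.put(partition, base + recordCount);
            return base;
        }

        // On a successful produce response, remember the highest acknowledged sequence.
        void ack(String partition, int lastSequenceInBatch) {
            lastAckedSequence.put(partition, lastSequenceInBatch);
        }

        // -1 means nothing has been acknowledged yet, matching the assertions below.
        int lastAcked(String partition) {
            return lastAckedSequence.getOrDefault(partition, -1);
        }
    }
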
http://git-wip-us.apache.org/repos/asf/kafka/blob/5d242225/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 6f98e52..26e3e6c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -29,6 +29,8 @@ import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
+import org.apache.kafka.common.errors.RecordTooLargeException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
@@ -66,6 +68,7 @@ import org.junit.Test;
 
 import java.nio.ByteBuffer;
 import java.util.Collections;
+import java.util.Deque;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -467,6 +470,657 @@ public class SenderTest {
         assertSendFailure(ClusterAuthorizationException.class);
     }
 
+
+    @Test
+    public void testIdempotenceWithMultipleInflights() throws Exception {
+        final long producerId = 343434L;
+        TransactionManager transactionManager = new TransactionManager();
+        setupWithTransactionState(transactionManager);
+        prepareAndReceiveInitProducerId(producerId, Errors.NONE);
+        assertTrue(transactionManager.hasProducerId());
+
+        assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
+
+        // Send first ProduceRequest
+        Future<RecordMetadata> request1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());
+        String nodeId = client.requests().peek().destination();
+        Node node = new Node(Integer.valueOf(nodeId), "localhost", 0);
+        assertEquals(1, client.inFlightRequestCount());
+        assertEquals(1, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+
+        // Send second ProduceRequest
+        Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());
+        assertEquals(2, client.inFlightRequestCount());
+        assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+        assertFalse(request1.isDone());
+        assertFalse(request2.isDone());
+        assertTrue(client.isReady(node, time.milliseconds()));
+
+        sendIdempotentProducerResponse(0, tp0, Errors.NONE, 0L);
+
+        sender.run(time.milliseconds()); // receive response 0
+
+        assertEquals(1, client.inFlightRequestCount());
+        assertEquals(0, transactionManager.lastAckedSequence(tp0));
+        assertTrue(request1.isDone());
+        assertEquals(0, request1.get().offset());
+        assertFalse(request2.isDone());
+
+        sendIdempotentProducerResponse(1, tp0, Errors.NONE, 1L);
+        sender.run(time.milliseconds()); // receive response 1
+        assertEquals(1, transactionManager.lastAckedSequence(tp0));
+        assertFalse(client.hasInFlightRequests());
+        assertTrue(request2.isDone());
+        assertEquals(1, request2.get().offset());
+    }
+
+
+    @Test
+    public void testIdempotenceWithMultipleInflightsFirstFails() throws Exception {
+        final long producerId = 343434L;
+        TransactionManager transactionManager = new TransactionManager();
+        setupWithTransactionState(transactionManager);
+        prepareAndReceiveInitProducerId(producerId, Errors.NONE);
+        assertTrue(transactionManager.hasProducerId());
+
+        assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
+
+        // Send first ProduceRequest
+        Future<RecordMetadata> request1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());
+        String nodeId = client.requests().peek().destination();
+        Node node = new Node(Integer.valueOf(nodeId), "localhost", 0);
+        assertEquals(1, client.inFlightRequestCount());
+        assertEquals(1, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+
+        // Send second ProduceRequest
+        Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());
+        assertEquals(2, client.inFlightRequestCount());
+        assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+        assertFalse(request1.isDone());
+        assertFalse(request2.isDone());
+        assertTrue(client.isReady(node, time.milliseconds()));
+
+        sendIdempotentProducerResponse(0, tp0, Errors.LEADER_NOT_AVAILABLE, -1L);
+
+        sender.run(time.milliseconds()); // receive response 0
+
+        assertEquals(1, client.inFlightRequestCount());
+        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+
+        sendIdempotentProducerResponse(1, tp0, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, -1L);
+
+        sender.run(time.milliseconds()); // resend request 0, receive response 1
+
+        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+        assertEquals(1, client.inFlightRequestCount());
+
+        sender.run(time.milliseconds()); // Do nothing, we are reduced to one in flight request during retries.
+
+        assertEquals(1, client.inFlightRequestCount());
+
+        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+
+        sendIdempotentProducerResponse(0, tp0, Errors.NONE, 0L);
+        sender.run(time.milliseconds());  // receive response 0
+        assertEquals(0, transactionManager.lastAckedSequence(tp0));
+        assertEquals(0, client.inFlightRequestCount());
+
+        assertFalse(request2.isDone());
+        assertTrue(request1.isDone());
+        assertEquals(0, request1.get().offset());
+
+        sender.run(time.milliseconds()); // send request 1
+        assertEquals(1, client.inFlightRequestCount());
+        sendIdempotentProducerResponse(1, tp0, Errors.NONE, 1L);
+        sender.run(time.milliseconds());  // receive response 1
+
+        assertTrue(request2.isDone());
+        assertEquals(1, request2.get().offset());
+        assertFalse(client.hasInFlightRequests());
+        assertEquals(1, transactionManager.lastAckedSequence(tp0));
+    }
+
+    @Test
+    public void testIdempotenceWithMultipleInflightsWhereFirstFailsFatallyAndSequenceOfFutureBatchesIsAdjusted() throws Exception {
+        final long producerId = 343434L;
+        TransactionManager transactionManager = new TransactionManager();
+        setupWithTransactionState(transactionManager);
+        prepareAndReceiveInitProducerId(producerId, Errors.NONE);
+        assertTrue(transactionManager.hasProducerId());
+
+        assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
+
+        // Send first ProduceRequest
+        Future<RecordMetadata> request1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());
+        String nodeId = client.requests().peek().destination();
+        Node node = new Node(Integer.valueOf(nodeId), "localhost", 0);
+        assertEquals(1, client.inFlightRequestCount());
+        assertEquals(1, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+
+        // Send second ProduceRequest
+        Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());
+        assertEquals(2, client.inFlightRequestCount());
+        assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+        assertFalse(request1.isDone());
+        assertFalse(request2.isDone());
+        assertTrue(client.isReady(node, time.milliseconds()));
+
+        sendIdempotentProducerResponse(0, tp0, Errors.MESSAGE_TOO_LARGE, -1L);
+
+        sender.run(time.milliseconds()); // receive response 0, should adjust sequences of future batches.
+
+        assertTrue(request1.isDone());
+        try {
+            request1.get();
+            fail("Should have raised an error");
+        } catch (Exception e) {
+            assertTrue(e.getCause() instanceof RecordTooLargeException);
+        }
+
+        assertEquals(1, client.inFlightRequestCount());
+        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+
+        sendIdempotentProducerResponse(1, tp0, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, -1L);
+
+        sender.run(time.milliseconds()); // receive response 1
+
+        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+        assertEquals(0, client.inFlightRequestCount());
+
+        sender.run(time.milliseconds()); // resend request 1
+
+        assertEquals(1, client.inFlightRequestCount());
+
+        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+
+        sendIdempotentProducerResponse(0, tp0, Errors.NONE, 0L);
+        sender.run(time.milliseconds());  // receive response 1
+        assertEquals(0, transactionManager.lastAckedSequence(tp0));
+        assertEquals(0, client.inFlightRequestCount());
+
+        assertTrue(request2.isDone());
+        assertEquals(0, request2.get().offset());
+    }
+
+    @Test
+    public void testMustNotRetryOutOfOrderSequenceForNextBatch() throws Exception {
+        final long producerId = 343434L;
+        TransactionManager transactionManager = new TransactionManager();
+        setupWithTransactionState(transactionManager);
+        prepareAndReceiveInitProducerId(producerId, Errors.NONE);
+        assertTrue(transactionManager.hasProducerId());
+
+        assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
+
+        // Send first ProduceRequest with multiple messages.
+        Future<RecordMetadata> request1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT);
+        sender.run(time.milliseconds());
+        String nodeId = client.requests().peek().destination();
+        Node node = new Node(Integer.valueOf(nodeId), "localhost", 0);
+        assertEquals(1, client.inFlightRequestCount());
+
+        // make sure the next sequence number accounts for multi-message batches.
+        assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+        sendIdempotentProducerResponse(0, tp0, Errors.NONE, 0);
+
+        sender.run(time.milliseconds());
+
+        // Send second ProduceRequest
+        Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());
+        assertEquals(1, client.inFlightRequestCount());
+        assertEquals(3, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(1, transactionManager.lastAckedSequence(tp0));
+        assertTrue(request1.isDone());
+        assertEquals(0, request1.get().offset());
+        assertFalse(request2.isDone());
+        assertTrue(client.isReady(node, time.milliseconds()));
+
+        // This OutOfOrderSequence is fatal since it is returned for the batch succeeding the last acknowledged batch.
+        sendIdempotentProducerResponse(2, tp0, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, -1L);
+
+        sender.run(time.milliseconds());
+        assertTrue(request2.isDone());
+
+        try {
+            request2.get();
+            fail("Expected an OutOfOrderSequenceException");
+        } catch (ExecutionException e) {
+            assertTrue(e.getCause() instanceof OutOfOrderSequenceException);
+        }
+    }
+
+    @Test
+    public void testCorrectHandlingOfOutOfOrderResponses() throws Exception {
+        final long producerId = 343434L;
+        TransactionManager transactionManager = new TransactionManager();
+        setupWithTransactionState(transactionManager);
+        prepareAndReceiveInitProducerId(producerId, Errors.NONE);
+        assertTrue(transactionManager.hasProducerId());
+
+        assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
+
+        // Send first ProduceRequest
+        Future<RecordMetadata> request1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());
+        String nodeId = client.requests().peek().destination();
+        Node node = new Node(Integer.valueOf(nodeId), "localhost", 0);
+        assertEquals(1, client.inFlightRequestCount());
+        assertEquals(1, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+
+        // Send second ProduceRequest
+        Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());
+        assertEquals(2, client.inFlightRequestCount());
+        assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+        assertFalse(request1.isDone());
+        assertFalse(request2.isDone());
+        assertTrue(client.isReady(node, time.milliseconds()));
+
+        ClientRequest firstClientRequest = client.requests().peek();
+        ClientRequest secondClientRequest = (ClientRequest) client.requests().toArray()[1];
+
+        client.respondToRequest(secondClientRequest, produceResponse(tp0, -1, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, -1));
+
+        sender.run(time.milliseconds()); // receive response 1
+        Deque<ProducerBatch> queuedBatches = accumulator.batches().get(tp0);
+
+        // Make sure that we are queueing the second batch first.
+        assertEquals(1, queuedBatches.size());
+        assertEquals(1, queuedBatches.peekFirst().baseSequence());
+        assertEquals(1, client.inFlightRequestCount());
+        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+
+        client.respondToRequest(firstClientRequest, produceResponse(tp0, -1, Errors.NOT_LEADER_FOR_PARTITION, -1));
+
+        sender.run(time.milliseconds()); // receive response 0
+
+        // Make sure we requeued both batches in the correct order.
+        assertEquals(2, queuedBatches.size());
+        assertEquals(0, queuedBatches.peekFirst().baseSequence());
+        assertEquals(1, queuedBatches.peekLast().baseSequence());
+        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+        assertEquals(0, client.inFlightRequestCount());
+        assertFalse(request1.isDone());
+        assertFalse(request2.isDone());
+
+        sender.run(time.milliseconds()); // send request 0
+        assertEquals(1, client.inFlightRequestCount());
+        sender.run(time.milliseconds()); // don't do anything, only one inflight allowed once we are retrying.
+
+        assertEquals(1, client.inFlightRequestCount());
+        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+
+        // Make sure that the requests are sent in order, even though the previous responses were not in order.
+        sendIdempotentProducerResponse(0, tp0, Errors.NONE, 0L);
+        sender.run(time.milliseconds());  // receive response 0
+        assertEquals(0, transactionManager.lastAckedSequence(tp0));
+        assertEquals(0, client.inFlightRequestCount());
+        assertTrue(request1.isDone());
+        assertEquals(0, request1.get().offset());
+
+        sender.run(time.milliseconds()); // send request 1
+        assertEquals(1, client.inFlightRequestCount());
+        sendIdempotentProducerResponse(1, tp0, Errors.NONE, 1L);
+        sender.run(time.milliseconds());  // receive response 1
+
+        assertFalse(client.hasInFlightRequests());
+        assertEquals(1, transactionManager.lastAckedSequence(tp0));
+        assertTrue(request2.isDone());
+        assertEquals(1, request2.get().offset());
+    }
+
+    @Test
+    public void testCorrectHandlingOfOutOfOrderResponsesWhenSecondSucceeds() throws Exception {
+        final long producerId = 343434L;
+        TransactionManager transactionManager = new TransactionManager();
+        setupWithTransactionState(transactionManager);
+        prepareAndReceiveInitProducerId(producerId, Errors.NONE);
+        assertTrue(transactionManager.hasProducerId());
+
+        assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
+
+        // Send first ProduceRequest
+        Future<RecordMetadata> request1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());
+        String nodeId = client.requests().peek().destination();
+        Node node = new Node(Integer.valueOf(nodeId), "localhost", 0);
+        assertEquals(1, client.inFlightRequestCount());
+
+        // Send second ProduceRequest
+        Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());
+        assertEquals(2, client.inFlightRequestCount());
+        assertFalse(request1.isDone());
+        assertFalse(request2.isDone());
+        assertTrue(client.isReady(node, time.milliseconds()));
+
+        ClientRequest firstClientRequest = client.requests().peek();
+        ClientRequest secondClientRequest = (ClientRequest) client.requests().toArray()[1];
+
+        client.respondToRequest(secondClientRequest, produceResponse(tp0, 1, Errors.NONE, 1));
+
+        sender.run(time.milliseconds()); // receive response 1
+        assertTrue(request2.isDone());
+        assertEquals(1, request2.get().offset());
+        assertFalse(request1.isDone());
+        Deque<ProducerBatch> queuedBatches = accumulator.batches().get(tp0);
+
+        assertEquals(0, queuedBatches.size());
+        assertEquals(1, client.inFlightRequestCount());
+        assertEquals(1, transactionManager.lastAckedSequence(tp0));
+
+        client.respondToRequest(firstClientRequest, produceResponse(tp0, -1, Errors.REQUEST_TIMED_OUT, -1));
+
+        sender.run(time.milliseconds()); // receive response 0
+
+        // Make sure we requeued the remaining batch with its original sequence.
+        assertEquals(1, queuedBatches.size());
+        assertEquals(0, queuedBatches.peekFirst().baseSequence());
+        assertEquals(1, transactionManager.lastAckedSequence(tp0));
+        assertEquals(0, client.inFlightRequestCount());
+
+        sender.run(time.milliseconds()); // resend request 0
+        assertEquals(1, client.inFlightRequestCount());
+
+        assertEquals(1, transactionManager.lastAckedSequence(tp0));
+
+        // Make sure we handle the out of order successful responses correctly.
+        sendIdempotentProducerResponse(0, tp0, Errors.NONE, 0L);
+        sender.run(time.milliseconds());  // receive response 0
+        assertEquals(0, queuedBatches.size());
+        assertEquals(1, transactionManager.lastAckedSequence(tp0));
+        assertEquals(0, client.inFlightRequestCount());
+
+        assertFalse(client.hasInFlightRequests());
+        assertTrue(request1.isDone());
+        assertEquals(0, request1.get().offset());
+    }
+
+    @Test
+    public void testExpiryOfUnsentBatchesShouldNotCauseUnresolvedSequences() throws Exception {
+        final long producerId = 343434L;
+        TransactionManager transactionManager = new TransactionManager();
+        setupWithTransactionState(transactionManager);
+        prepareAndReceiveInitProducerId(producerId, Errors.NONE);
+        assertTrue(transactionManager.hasProducerId());
+
+        assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
+
+        // Send first ProduceRequest
+        Future<RecordMetadata> request1 = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        Node node = this.cluster.nodes().get(0);
+        time.sleep(10000L);
+        client.disconnect(node.idString());
+        client.blackout(node, 10);
+
+        sender.run(time.milliseconds());
+
+        assertTrue(request1.isDone());
+        try {
+            request1.get();
+            fail("Should have raised timeout exception");
+        } catch (ExecutionException e) {
+            assertTrue(e.getCause() instanceof TimeoutException);
+        }
+        assertFalse(transactionManager.hasUnresolvedSequence(tp0));
+    }
+
+    @Test
+    public void testExpiryOfFirstBatchShouldNotCauseUnresolvedSequencesIfFutureBatchesSucceed() throws Exception {
+        final long producerId = 343434L;
+        TransactionManager transactionManager = new TransactionManager();
+        setupWithTransactionState(transactionManager);
+        prepareAndReceiveInitProducerId(producerId, Errors.NONE);
+        assertTrue(transactionManager.hasProducerId());
+
+        assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
+
+        // Send first ProduceRequest
+        Future<RecordMetadata> request1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());  // send request
+
+        Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());  // send request
+
+        assertEquals(2, client.inFlightRequestCount());
+
+        sendIdempotentProducerResponse(0, tp0, Errors.REQUEST_TIMED_OUT, -1);
+        sender.run(time.milliseconds());  // receive first response
+
+        Node node = this.cluster.nodes().get(0);
+        time.sleep(10000L);
+        client.disconnect(node.idString());
+        client.blackout(node, 10);
+
+        sender.run(time.milliseconds()); // now expire the first batch.
+        assertTrue(request1.isDone());
+        try {
+            request1.get();
+            fail("Should have raised timeout exception");
+        } catch (ExecutionException e) {
+            assertTrue(e.getCause() instanceof TimeoutException);
+        }
+        assertTrue(transactionManager.hasUnresolvedSequence(tp0));
+        // let's enqueue another batch, which should not be dequeued until the unresolved state is clear.
+        Future<RecordMetadata> request3 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+
+        time.sleep(20);
+
+        assertFalse(request2.isDone());
+
+        sender.run(time.milliseconds());  // send second request
+        sendIdempotentProducerResponse(1, tp0, Errors.NONE, 1);
+        sender.run(time.milliseconds()); // receive second response, the third request shouldn't be sent since we are in an unresolved state.
+        assertTrue(request2.isDone());
+        assertEquals(1, request2.get().offset());
+        Deque<ProducerBatch> batches = accumulator.batches().get(tp0);
+
+        assertEquals(1, batches.size());
+        assertFalse(batches.peekFirst().hasSequence());
+        assertFalse(client.hasInFlightRequests());
+        assertEquals(2L, transactionManager.sequenceNumber(tp0).longValue());
+        assertTrue(transactionManager.hasUnresolvedSequence(tp0));
+
+        sender.run(time.milliseconds());  // clear the unresolved state, send the pending request.
+        assertFalse(transactionManager.hasUnresolvedSequence(tp0));
+        assertTrue(transactionManager.hasProducerId());
+        assertEquals(0, batches.size());
+        assertEquals(1, client.inFlightRequestCount());
+        assertFalse(request3.isDone());
+    }
+
+    @Test
+    public void testExpiryOfFirstBatchShouldCauseResetIfFutureBatchesFail() throws Exception {
+        final long producerId = 343434L;
+        TransactionManager transactionManager = new TransactionManager();
+        setupWithTransactionState(transactionManager);
+        prepareAndReceiveInitProducerId(producerId, Errors.NONE);
+        assertTrue(transactionManager.hasProducerId());
+
+        assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
+
+        // Send first ProduceRequest
+        Future<RecordMetadata> request1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());  // send request
+
+        Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());  // send request
+
+        assertEquals(2, client.inFlightRequestCount());
+
+        sendIdempotentProducerResponse(0, tp0, Errors.NOT_LEADER_FOR_PARTITION, -1);
+        sender.run(time.milliseconds());  // receive first response
+
+        Node node = this.cluster.nodes().get(0);
+        time.sleep(10000L);
+        client.disconnect(node.idString());
+        client.blackout(node, 10);
+
+        sender.run(time.milliseconds()); // now expire the first batch.
+        assertTrue(request1.isDone());
+        try {
+            request1.get();
+            fail("Should have raised timeout exception");
+        } catch (ExecutionException e) {
+            assertTrue(e.getCause() instanceof TimeoutException);
+        }
+        assertTrue(transactionManager.hasUnresolvedSequence(tp0));
+        // let's enqueue another batch, which should not be dequeued until the unresolved state is clear.
+        Future<RecordMetadata> request3 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+
+        time.sleep(20);
+
+        assertFalse(request2.isDone());
+
+        sender.run(time.milliseconds());  // send second request
+        sendIdempotentProducerResponse(1, tp0, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, 1);
+        sender.run(time.milliseconds()); // receive second response, the third request shouldn't be sent since we are in an unresolved state.
+        assertTrue(request2.isDone());
+
+        try {
+            request2.get();
+            fail("should have failed with an exception");
+        } catch (Exception e) {
+            assertTrue(e.getCause() instanceof OutOfOrderSequenceException);
+        }
+
+        Deque<ProducerBatch> batches = accumulator.batches().get(tp0);
+
+        // The second request should not be requeued.
+        assertEquals(1, batches.size());
+        assertFalse(batches.peekFirst().hasSequence());
+        assertFalse(client.hasInFlightRequests());
+
+        // The producer state should be reset.
+        assertFalse(transactionManager.hasProducerId());
+        assertFalse(transactionManager.hasUnresolvedSequence(tp0));
+    }
+
+    @Test
+    public void testExpiryOfAllSentBatchesShouldCauseUnresolvedSequences() throws Exception {
+        final long producerId = 343434L;
+        TransactionManager transactionManager = new TransactionManager();
+        setupWithTransactionState(transactionManager);
+        prepareAndReceiveInitProducerId(producerId, Errors.NONE);
+        assertTrue(transactionManager.hasProducerId());
+
+        assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
+
+        // Send first ProduceRequest
+        Future<RecordMetadata> request1 = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());  // send request
+        sendIdempotentProducerResponse(0, tp0, Errors.NOT_LEADER_FOR_PARTITION, -1);
+        sender.run(time.milliseconds());  // receive response
+
+        assertEquals(1L, transactionManager.sequenceNumber(tp0).longValue());
+
+        Node node = this.cluster.nodes().get(0);
+        time.sleep(10000L);
+        client.disconnect(node.idString());
+        client.blackout(node, 10);
+
+        sender.run(time.milliseconds()); // now expire the batch.
+
+        assertTrue(request1.isDone());
+        try {
+            request1.get();
+            fail("Should have raised timeout exception");
+        } catch (ExecutionException e) {
+            assertTrue(e.getCause() instanceof TimeoutException);
+        }
+        assertTrue(transactionManager.hasUnresolvedSequence(tp0));
+        assertFalse(client.hasInFlightRequests());
+        Deque<ProducerBatch> batches = accumulator.batches().get(tp0);
+        assertEquals(0, batches.size());
+        assertTrue(transactionManager.hasProducerId(producerId));
+        // We should now clear the old producerId and get a new one in a single run loop.
+        prepareAndReceiveInitProducerId(producerId + 1, Errors.NONE);
+        assertTrue(transactionManager.hasProducerId(producerId + 1));
+    }
+
+    @Test
+    public void testCorrectHandlingOfDuplicateSequenceError() throws Exception {
+        final long producerId = 343434L;
+        TransactionManager transactionManager = new TransactionManager();
+        setupWithTransactionState(transactionManager);
+        prepareAndReceiveInitProducerId(producerId, Errors.NONE);
+        assertTrue(transactionManager.hasProducerId());
+
+        assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
+
+        // Send first ProduceRequest
+        Future<RecordMetadata> request1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());
+        String nodeId = client.requests().peek().destination();
+        Node node = new Node(Integer.valueOf(nodeId), "localhost", 0);
+        assertEquals(1, client.inFlightRequestCount());
+        assertEquals(1, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+
+        // Send second ProduceRequest
+        Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());
+        assertEquals(2, client.inFlightRequestCount());
+        assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+        assertFalse(request1.isDone());
+        assertFalse(request2.isDone());
+        assertTrue(client.isReady(node, time.milliseconds()));
+
+        ClientRequest firstClientRequest = client.requests().peek();
+        ClientRequest secondClientRequest = (ClientRequest) client.requests().toArray()[1];
+
+        client.respondToRequest(secondClientRequest, produceResponse(tp0, 1, Errors.NONE, -1));
+
+        sender.run(time.milliseconds()); // receive response 1
+
+        assertEquals(1, transactionManager.lastAckedSequence(tp0));
+
+        client.respondToRequest(firstClientRequest, produceResponse(tp0, -1, Errors.DUPLICATE_SEQUENCE_NUMBER, -1));
+
+        sender.run(time.milliseconds()); // receive response 0
+
+        // Make sure that the last ack'd sequence doesn't change.
+        assertEquals(1, transactionManager.lastAckedSequence(tp0));
+        assertFalse(client.hasInFlightRequests());
+    }
+
+    void sendIdempotentProducerResponse(final int expectedSequence, final TopicPartition tp, Errors responseError, long responseOffset) {
+        client.respond(new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(AbstractRequest body) {
+                ProduceRequest produceRequest = (ProduceRequest) body;
+                assertTrue(produceRequest.isIdempotent());
+
+                MemoryRecords records = produceRequest.partitionRecordsOrFail().get(tp);
+                Iterator<MutableRecordBatch> batchIterator = records.batches().iterator();
+                RecordBatch firstBatch = batchIterator.next();
+                assertFalse(batchIterator.hasNext());
+                assertEquals(expectedSequence, firstBatch.baseSequence());
+
+                return true;
+            }
+        }, produceResponse(tp, responseOffset, responseError, 0));
+    }
+
     @Test
     public void testClusterAuthorizationExceptionInProduceRequest() throws Exception {
         final long producerId = 343434L;
@@ -604,7 +1258,8 @@ public class SenderTest {
 
         sender.run(time.milliseconds());  // receive response
         assertTrue(responseFuture.isDone());
-        assertEquals((long) transactionManager.sequenceNumber(tp0), 1L);
+        assertEquals(0L, (long) transactionManager.lastAckedSequence(tp0));
+        assertEquals(1L, (long) transactionManager.sequenceNumber(tp0));
     }
 
     @Test
@@ -632,6 +1287,7 @@ public class SenderTest {
         assertEquals(0, client.inFlightRequestCount());
         assertFalse("Client ready status should be false", client.isReady(node, 0L));
 
+        transactionManager.resetProducerId();
         transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(producerId + 1, (short) 0));
         sender.run(time.milliseconds()); // receive error
         sender.run(time.milliseconds()); // reconnect
@@ -642,7 +1298,7 @@ public class SenderTest {
         assertTrue("Expected non-zero value for record send errors", recordErrors.value() > 0);
 
         assertTrue(responseFuture.isDone());
-        assertEquals((long) transactionManager.sequenceNumber(tp0), 0L);
+        assertEquals(0, (long) transactionManager.sequenceNumber(tp0));
     }
 
     @Test
@@ -720,7 +1376,8 @@ public class SenderTest {
                     accumulator.append(tp, 0L, "key2".getBytes(), new byte[batchSize / 2], null, null, MAX_BLOCK_TIMEOUT).future;
             sender.run(time.milliseconds()); // connect
             sender.run(time.milliseconds()); // send produce request
-            assertEquals("The sequence number should be 0", 0, txnManager.sequenceNumber(tp).longValue());
+
+            assertEquals("The next sequence should be 2", 2, txnManager.sequenceNumber(tp).longValue());
             String id = client.requests().peek().destination();
             assertEquals(ApiKeys.PRODUCE, client.requests().peek().requestBuilder().apiKey());
             Node node = new Node(Integer.valueOf(id), "localhost", 0);
@@ -731,11 +1388,12 @@ public class SenderTest {
             responseMap.put(tp, new ProduceResponse.PartitionResponse(Errors.MESSAGE_TOO_LARGE));
             client.respond(new ProduceResponse(responseMap));
             sender.run(time.milliseconds()); // split and reenqueue
+            assertEquals("The next sequence should be 2", 2, txnManager.sequenceNumber(tp).longValue());
             // The compression ratio should have been improved once.
             assertEquals(CompressionType.GZIP.rate - CompressionRatioEstimator.COMPRESSION_RATIO_IMPROVING_STEP,
                     CompressionRatioEstimator.estimation(topic, CompressionType.GZIP), 0.01);
-            sender.run(time.milliseconds()); // send produce request
-            assertEquals("The sequence number should be 0", 0, txnManager.sequenceNumber(tp).longValue());
+            sender.run(time.milliseconds()); // send the first produce request
+            assertEquals("The next sequence number should be 2", 2, txnManager.sequenceNumber(tp).longValue());
             assertFalse("The future shouldn't have been done.", f1.isDone());
             assertFalse("The future shouldn't have been done.", f2.isDone());
             id = client.requests().peek().destination();
@@ -750,10 +1408,11 @@ public class SenderTest {
 
             sender.run(time.milliseconds()); // receive
             assertTrue("The future should have been done.", f1.isDone());
-            assertEquals("The sequence number should be 1", 1, txnManager.sequenceNumber(tp).longValue());
+            assertEquals("The next sequence number should still be 2", 2, txnManager.sequenceNumber(tp).longValue());
+            assertEquals("The last ack'd sequence number should be 0", 0, txnManager.lastAckedSequence(tp));
             assertFalse("The future shouldn't have been done.", f2.isDone());
             assertEquals("Offset of the first message should be 0", 0L, f1.get().offset());
-            sender.run(time.milliseconds()); // send produce request
+            sender.run(time.milliseconds()); // send the second produce request
             id = client.requests().peek().destination();
             assertEquals(ApiKeys.PRODUCE, client.requests().peek().requestBuilder().apiKey());
             node = new Node(Integer.valueOf(id), "localhost", 0);
@@ -766,7 +1425,8 @@ public class SenderTest {
 
             sender.run(time.milliseconds()); // receive
             assertTrue("The future should have been done.", f2.isDone());
-            assertEquals("The sequence number should be 2", 2, txnManager.sequenceNumber(tp).longValue());
+            assertEquals("The next sequence number should be 2", 2, txnManager.sequenceNumber(tp).longValue());
+            assertEquals("The last ack'd sequence number should be 1", 1, txnManager.lastAckedSequence(tp));
             assertEquals("Offset of the first message should be 1", 1L, f2.get().offset());
             assertTrue("There should be no batch in the accumulator", accumulator.batches().get(tp).isEmpty());
 
@@ -828,9 +1488,11 @@ public class SenderTest {
         this.metrics = new Metrics(metricConfig, time);
         this.accumulator = new RecordAccumulator(logContext, batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, metrics, time,
                 apiVersions, transactionManager);
+
         this.senderMetricsRegistry = new SenderMetricsRegistry(metricTags.keySet());
-        this.sender = new Sender(logContext, this.client, this.metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL,
-                MAX_RETRIES, this.metrics, this.senderMetricsRegistry, this.time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
+
+        this.sender = new Sender(logContext, this.client, this.metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
+                Integer.MAX_VALUE, this.metrics, this.senderMetricsRegistry, this.time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
         this.metadata.update(this.cluster, Collections.<String>emptySet(), time.milliseconds());
     }
 

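The requeue-ordering assertions in the tests above (queuedBatches.peekFirst().baseSequence() and friends) come down to re-inserting a failed batch ahead of any batch with a higher base sequence. A hedged sketch of that invariant, with simplified stand-in types rather than the real ProducerBatch/RecordAccumulator code:

    import java.util.ArrayDeque;
    import java.util.Deque;

    final class RequeueSketch {
        // Simplified stand-in for a producer batch with an assigned base sequence.
        static final class Batch {
            final int baseSequence;
            Batch(int baseSequence) { this.baseSequence = baseSequence; }
        }

        // Re-insert a failed batch so the partition's deque stays ordered by base
        // sequence; this is the order the peekFirst()/peekLast() assertions check.
        static void reenqueueInSequenceOrder(Deque<Batch> queue, Batch failed) {
            Deque<Batch> ahead = new ArrayDeque<>();
            while (!queue.isEmpty() && queue.peekFirst().baseSequence < failed.baseSequence)
                ahead.addLast(queue.pollFirst()); // set aside batches that must stay in front
            queue.addFirst(failed);
            while (!ahead.isEmpty())
                queue.addFirst(ahead.pollLast()); // restore them in their original order
        }
    }
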
http://git-wip-us.apache.org/repos/asf/kafka/blob/5d242225/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 53bba1c..28f9c82 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -1171,6 +1171,7 @@ public class TransactionManagerTest {
         assertFutureFailed(unauthorizedTopicProduceFuture);
         assertTrue(authorizedTopicProduceFuture.isDone());
         assertNotNull(authorizedTopicProduceFuture.get());
+        assertTrue(authorizedTopicProduceFuture.isDone());
 
         prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, epoch);
         transactionManager.beginAbort();
@@ -1481,14 +1482,14 @@ public class TransactionManagerTest {
         sender.run(time.milliseconds());
         assertFalse(accumulator.hasUndrained());
         assertTrue(accumulator.hasIncomplete());
-        assertFalse(transactionManager.hasInFlightRequest());
+        assertFalse(transactionManager.hasInFlightTransactionalRequest());
         assertFalse(responseFuture.isDone());
 
         // until the produce future returns, we will not send EndTxn
         sender.run(time.milliseconds());
         assertFalse(accumulator.hasUndrained());
         assertTrue(accumulator.hasIncomplete());
-        assertFalse(transactionManager.hasInFlightRequest());
+        assertFalse(transactionManager.hasInFlightTransactionalRequest());
         assertFalse(responseFuture.isDone());
 
         // now the produce response returns
@@ -1497,14 +1498,14 @@ public class TransactionManagerTest {
         assertTrue(responseFuture.isDone());
         assertFalse(accumulator.hasUndrained());
         assertFalse(accumulator.hasIncomplete());
-        assertFalse(transactionManager.hasInFlightRequest());
+        assertFalse(transactionManager.hasInFlightTransactionalRequest());
 
         // now we send EndTxn
         sender.run(time.milliseconds());
-        assertTrue(transactionManager.hasInFlightRequest());
+        assertTrue(transactionManager.hasInFlightTransactionalRequest());
         sendEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, pid, epoch);
         sender.run(time.milliseconds());
-        assertFalse(transactionManager.hasInFlightRequest());
+        assertFalse(transactionManager.hasInFlightTransactionalRequest());
         assertTrue(transactionManager.isReady());
     }
 
@@ -1529,21 +1530,21 @@ public class TransactionManagerTest {
         sender.run(time.milliseconds());
         assertFalse(accumulator.hasUndrained());
         assertTrue(accumulator.hasIncomplete());
-        assertFalse(transactionManager.hasInFlightRequest());
+        assertFalse(transactionManager.hasInFlightTransactionalRequest());
 
         // now we begin the commit with the produce request still pending
         transactionManager.beginCommit();
         sender.run(time.milliseconds());
         assertFalse(accumulator.hasUndrained());
         assertTrue(accumulator.hasIncomplete());
-        assertFalse(transactionManager.hasInFlightRequest());
+        assertFalse(transactionManager.hasInFlightTransactionalRequest());
         assertFalse(responseFuture.isDone());
 
         // until the produce future returns, we will not send EndTxn
         sender.run(time.milliseconds());
         assertFalse(accumulator.hasUndrained());
         assertTrue(accumulator.hasIncomplete());
-        assertFalse(transactionManager.hasInFlightRequest());
+        assertFalse(transactionManager.hasInFlightTransactionalRequest());
         assertFalse(responseFuture.isDone());
 
         // now the produce response returns
@@ -1552,14 +1553,14 @@ public class TransactionManagerTest {
         assertTrue(responseFuture.isDone());
         assertFalse(accumulator.hasUndrained());
         assertFalse(accumulator.hasIncomplete());
-        assertFalse(transactionManager.hasInFlightRequest());
+        assertFalse(transactionManager.hasInFlightTransactionalRequest());
 
         // now we send EndTxn
         sender.run(time.milliseconds());
-        assertTrue(transactionManager.hasInFlightRequest());
+        assertTrue(transactionManager.hasInFlightTransactionalRequest());
         sendEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, pid, epoch);
         sender.run(time.milliseconds());
-        assertFalse(transactionManager.hasInFlightRequest());
+        assertFalse(transactionManager.hasInFlightTransactionalRequest());
         assertTrue(transactionManager.isReady());
     }
 
@@ -1960,17 +1961,17 @@ public class TransactionManagerTest {
 
         prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
         prepareProduceResponse(Errors.NOT_LEADER_FOR_PARTITION, pid, epoch);
-        sender.run(time.milliseconds()); // AddPartitions
+        sender.run(time.milliseconds()); // Add partitions
         sender.run(time.milliseconds()); // Produce
 
         assertFalse(responseFuture.isDone());
 
         transactionManager.transitionToAbortableError(new KafkaException());
         prepareProduceResponse(Errors.NONE, pid, epoch);
-
         sender.run(time.milliseconds());
+
         assertTrue(responseFuture.isDone());
-        assertNotNull(responseFuture.get());
+        assertNotNull(responseFuture.get()); // the produce succeeded before the abort took effect, so the future completes normally.
     }
 
     @Test
@@ -2151,6 +2152,65 @@ public class TransactionManagerTest {
         assertFalse(transactionManager.transactionContainsPartition(tp0));
     }
 
+    @Test
+    public void testTransitionToFatalErrorWhenRetriedBatchIsExpired() throws InterruptedException, ExecutionException {
+        final long pid = 13131L;
+        final short epoch = 1;
+
+        doInitTransactions(pid, epoch);
+
+        transactionManager.beginTransaction();
+        transactionManager.maybeAddPartitionToTransaction(tp0);
+
+        Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
+                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
+
+        assertFalse(responseFuture.isDone());
+
+        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
+
+        assertFalse(transactionManager.transactionContainsPartition(tp0));
+        assertFalse(transactionManager.isSendToPartitionAllowed(tp0));
+        sender.run(time.milliseconds());  // send addPartitions.
+        // Check that only addPartitions was sent.
+        assertTrue(transactionManager.transactionContainsPartition(tp0));
+        assertTrue(transactionManager.isSendToPartitionAllowed(tp0));
+
+        prepareProduceResponse(Errors.NOT_LEADER_FOR_PARTITION, pid, epoch);
+        sender.run(time.milliseconds());  // send the produce request.
+
+        assertFalse(responseFuture.isDone());
+
+        TransactionalRequestResult commitResult = transactionManager.beginCommit();
+
+        // Sleep 10 seconds to make sure that the batches in the queue would be expired if they can't be drained.
+        time.sleep(10000);
+        // Disconnect the target node for the pending produce request. This will ensure that sender will try to
+        // expire the batch.
+        Node clusterNode = this.cluster.nodes().get(0);
+        client.disconnect(clusterNode.idString());
+        client.blackout(clusterNode, 100);
+
+        sender.run(time.milliseconds());  // We should try to flush the produce, but expire it instead without sending anything.
+        assertTrue(responseFuture.isDone());
+
+        try {
+            // make sure the produce was expired.
+            responseFuture.get();
+            fail("Expected to get a TimeoutException since the queued ProducerBatch should have been expired");
+        } catch (ExecutionException e) {
+            assertTrue(e.getCause() instanceof TimeoutException);
+        }
+        sender.run(time.milliseconds());  // Transition to fatal error since we have unresolved batches.
+        sender.run(time.milliseconds());  // Fail the queued transactional requests
+
+        assertTrue(commitResult.isCompleted());
+        assertFalse(commitResult.isSuccessful());  // the commit should have been dropped.
+
+        assertTrue(transactionManager.hasFatalError());
+        assertFalse(transactionManager.hasOngoingTransaction());
+    }
+
     private void verifyAddPartitionsFailsWithPartitionLevelError(final Errors error) throws InterruptedException {
         final long pid = 1L;
         final short epoch = 1;

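testTransitionToFatalErrorWhenRetriedBatchIsExpired, like the expiry tests in SenderTest above, turns on one rule: a batch that expires after it was sent leaves its partition in an unresolved state, because the broker may or may not have written it, and a later batch settles the question. A sketch of that decision (illustrative names, not the TransactionManager's actual control flow; for a transactional producer the reset path instead becomes the fatal error this test asserts):

    // Illustrative: how a later batch's outcome resolves an expired, previously
    // sent batch whose fate on the broker is unknown.
    final class UnresolvedSequenceSketch {
        enum Resolution { KEEP_WAITING, RESOLVED_OK, RESET_PRODUCER_STATE }

        static Resolution onLaterBatchOutcome(boolean succeeded, boolean outOfOrderSequence) {
            if (succeeded)
                return Resolution.RESOLVED_OK;          // broker state lines up; keep the producer id
            if (outOfOrderSequence)
                return Resolution.RESET_PRODUCER_STATE; // a sequence gap is confirmed; get a new producer id
            return Resolution.KEEP_WAITING;             // retriable error: retry before deciding
        }
    }
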
http://git-wip-us.apache.org/repos/asf/kafka/blob/5d242225/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index d3de24d..d98f443 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -514,8 +514,10 @@ class Log(@volatile var dir: File,
     completedTxns.foreach(producerStateManager.completeTxn)
   }
 
-  private[log] def activeProducers: Map[Long, ProducerIdEntry] = lock synchronized {
-    producerStateManager.activeProducers
+  private[log] def activeProducersWithLastSequence: Map[Long, Int] = lock synchronized {
+    producerStateManager.activeProducers.map { case (producerId, producerIdEntry) =>
+      (producerId, producerIdEntry.lastSeq)
+    }
   }
 
   /**
@@ -765,17 +767,18 @@ class Log(@volatile var dir: File,
   }
 
   private def analyzeAndValidateProducerState(records: MemoryRecords, isFromClient: Boolean):
-  (mutable.Map[Long, ProducerAppendInfo], List[CompletedTxn], Option[ProducerIdEntry]) = {
+  (mutable.Map[Long, ProducerAppendInfo], List[CompletedTxn], Option[BatchMetadata]) = {
     val updatedProducers = mutable.Map.empty[Long, ProducerAppendInfo]
     val completedTxns = ListBuffer.empty[CompletedTxn]
     for (batch <- records.batches.asScala if batch.hasProducerId) {
       val maybeLastEntry = producerStateManager.lastEntry(batch.producerId)
 
-      // if this is a client produce request, there will be only one batch. If that batch matches
-      // the last appended entry for that producer, then this request is a duplicate and we return
-      // the last appended entry to the client.
-      if (isFromClient && maybeLastEntry.exists(_.isDuplicate(batch)))
-        return (updatedProducers, completedTxns.toList, maybeLastEntry)
+      // if this is a client produce request, there will be up to 5 batches, any of which could be a duplicate.
+      // If we find a duplicate, we return the metadata of the appended batch to the client.
+      if (isFromClient)
+        maybeLastEntry.flatMap(_.duplicateOf(batch)).foreach { duplicate =>
+          return (updatedProducers, completedTxns.toList, Some(duplicate))
+        }
 
       val maybeCompletedTxn = updateProducers(batch, updatedProducers, loadingFromLog = false)
       maybeCompletedTxn.foreach(completedTxns += _)

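The Log.scala hunk above replaces the single-entry duplicate check with a lookup across the producer's recently appended batches. The mechanics live in ProducerIdEntry below: a bounded queue of batch metadata, evicting the oldest entry once five are retained, where a duplicate is an exact match on the sequence range. A sketch of that structure (illustrative Java, epoch checks omitted, not the Scala code in this commit):

    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.Optional;

    // Illustrative cache of recent batch metadata for a single producer id.
    final class RecentBatchesSketch {
        static final int MAX_RETAINED = 5; // mirrors ProducerIdEntry.NumBatchesToRetain

        static final class Meta {
            final int firstSeq;
            final int lastSeq;
            final long lastOffset;
            Meta(int firstSeq, int lastSeq, long lastOffset) {
                this.firstSeq = firstSeq;
                this.lastSeq = lastSeq;
                this.lastOffset = lastOffset;
            }
        }

        private final Deque<Meta> retained = new ArrayDeque<>();

        // Record an appended batch, evicting the oldest entry once at capacity.
        void record(Meta m) {
            if (retained.size() == MAX_RETAINED)
                retained.pollFirst();
            retained.addLast(m);
        }

        // A resend is a duplicate iff its sequence range exactly matches a retained batch.
        Optional<Meta> duplicateOf(int firstSeq, int lastSeq) {
            return retained.stream()
                    .filter(m -> m.firstSeq == firstSeq && m.lastSeq == lastSeq)
                    .findFirst();
        }
    }
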
http://git-wip-us.apache.org/repos/asf/kafka/blob/5d242225/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 4f53b41..61dd0fc 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -450,7 +450,7 @@ private[log] class Cleaner(val id: Int,
         info("Cleaning segment %s in log %s (largest timestamp %s) into %s, %s deletes."
           .format(startOffset, log.name, new Date(oldSegmentOpt.largestTimestamp), cleaned.baseOffset, if(retainDeletes) "retaining" else "discarding"))
         cleanInto(log.topicPartition, oldSegmentOpt, cleaned, map, retainDeletes, log.config.maxMessageSize, transactionMetadata,
-          log.activeProducers, stats)
+          log.activeProducersWithLastSequence, stats)
 
         currentSegmentOpt = nextSegmentOpt
       }
@@ -503,7 +503,7 @@ private[log] class Cleaner(val id: Int,
                              retainDeletes: Boolean,
                              maxLogMessageSize: Int,
                              transactionMetadata: CleanedTransactionMetadata,
-                             activeProducers: Map[Long, ProducerIdEntry],
+                             activeProducers: Map[Long, Int],
                              stats: CleanerStats) {
     val logCleanerFilter = new RecordFilter {
       var discardBatchRecords: Boolean = _
@@ -515,7 +515,7 @@ private[log] class Cleaner(val id: Int,
 
         // check if the batch contains the last sequence number for the producer. if so, we cannot
         // remove the batch just yet or the producer may see an out of sequence error.
-        if (batch.hasProducerId && activeProducers.get(batch.producerId).exists(_.lastSeq == batch.lastSequence))
+        if (batch.hasProducerId && activeProducers.get(batch.producerId).contains(batch.lastSequence))
           BatchRetention.RETAIN_EMPTY
         else if (discardBatchRecords)
           BatchRetention.DELETE

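The cleaner change above keeps an otherwise-empty batch alive when it holds a producer's most recent sequence number, since dropping it would make that producer's next append look out of order. With the map now carrying only each producer's last sequence, the check reduces to a single comparison, restated here as a sketch:

    import java.util.Map;

    // Illustrative restatement of the cleaner's retention check.
    final class CleanerRetentionSketch {
        static boolean retainEmptyBatch(boolean hasProducerId, long producerId,
                                        int batchLastSequence,
                                        Map<Long, Integer> lastSequenceByProducer) {
            Integer lastSeq = lastSequenceByProducer.get(producerId);
            return hasProducerId && lastSeq != null && lastSeq == batchLastSequence;
        }
    }
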
http://git-wip-us.apache.org/repos/asf/kafka/blob/5d242225/core/src/main/scala/kafka/log/ProducerStateManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index fc2e340..4c3d1a1 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -17,7 +17,7 @@
 package kafka.log
 
 import java.io._
-import java.nio.ByteBuffer
+import java.nio.{BufferUnderflowException, ByteBuffer}
 import java.nio.file.Files
 
 import kafka.common.KafkaException
@@ -48,33 +48,82 @@ private[log] case class TxnMetadata(producerId: Long, var firstOffset: LogOffset
 }
 
 private[log] object ProducerIdEntry {
-  val Empty = ProducerIdEntry(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
-    -1, 0, RecordBatch.NO_TIMESTAMP, -1, None)
+  private[log] val NumBatchesToRetain = 5
+  def empty(producerId: Long) = new ProducerIdEntry(producerId, mutable.Queue[BatchMetadata](), RecordBatch.NO_PRODUCER_EPOCH, -1, None)
 }
 
-private[log] case class ProducerIdEntry(producerId: Long, producerEpoch: Short, lastSeq: Int, lastOffset: Long,
-                                        offsetDelta: Int, timestamp: Long, coordinatorEpoch: Int,
-                                        currentTxnFirstOffset: Option[Long]) {
-  def firstSeq: Int = lastSeq - offsetDelta
-  def firstOffset: Long = lastOffset - offsetDelta
+private[log] case class BatchMetadata(lastSeq: Int, lastOffset: Long, offsetDelta: Int, timestamp: Long) {
+  def firstSeq = lastSeq - offsetDelta
+  def firstOffset = lastOffset - offsetDelta
 
-  def isDuplicate(batch: RecordBatch): Boolean = {
-    batch.producerEpoch == producerEpoch &&
-      batch.baseSequence == firstSeq &&
-      batch.lastSequence == lastSeq
+  override def toString: String = {
+    "BatchMetadata(" +
+      s"firstSeq=$firstSeq, " +
+      s"lastSeq=$lastSeq, " +
+      s"firstOffset=$firstOffset, " +
+      s"lastOffset=$lastOffset, " +
+      s"timestamp=$timestamp)"
+  }
+}
+
+// the batchMetadata is ordered such that the batch with the lowest sequence is at the head of the queue while the
+// batch with the highest sequence is at the tail of the queue. We will retain at most ProducerIdEntry.NumBatchesToRetain
+// elements in the queue. When the queue is at capacity, we remove the first element to make space for the incoming batch.
+private[log] class ProducerIdEntry(val producerId: Long, val batchMetadata: mutable.Queue[BatchMetadata],
+                                   var producerEpoch: Short, var coordinatorEpoch: Int,
+                                   var currentTxnFirstOffset: Option[Long]) {
+
+  def firstSeq: Int = if (batchMetadata.isEmpty) RecordBatch.NO_SEQUENCE else batchMetadata.front.firstSeq
+  def firstOffset: Long = if (batchMetadata.isEmpty) -1L else batchMetadata.front.firstOffset
+
+  def lastSeq: Int = if (batchMetadata.isEmpty) RecordBatch.NO_SEQUENCE else batchMetadata.last.lastSeq
+  def lastDataOffset: Long = if (batchMetadata.isEmpty) -1L else batchMetadata.last.lastOffset
+  def lastTimestamp = if (batchMetadata.isEmpty) RecordBatch.NO_TIMESTAMP else batchMetadata.last.timestamp
+  def lastOffsetDelta: Int = if (batchMetadata.isEmpty) 0 else batchMetadata.last.offsetDelta
+
+  def addBatchMetadata(producerEpoch: Short, lastSeq: Int, lastOffset: Long, offsetDelta: Int, timestamp: Long) = {
+    maybeUpdateEpoch(producerEpoch)
+
+    if (batchMetadata.size == ProducerIdEntry.NumBatchesToRetain)
+      batchMetadata.dequeue()
+
+    batchMetadata.enqueue(BatchMetadata(lastSeq, lastOffset, offsetDelta, timestamp))
+  }
+
+  def maybeUpdateEpoch(producerEpoch: Short): Boolean = {
+    if (this.producerEpoch != producerEpoch) {
+      batchMetadata.clear()
+      this.producerEpoch = producerEpoch
+      true
+    } else {
+      false
+    }
+  }
+
+  // Note that Queue.dropWhile returns a new collection without mutating the receiver, so we
+  // dequeue in place to actually trim the cached metadata.
+  def removeBatchesOlderThan(offset: Long): Unit =
+    while (batchMetadata.nonEmpty && batchMetadata.front.lastOffset < offset)
+      batchMetadata.dequeue()
+
+  def duplicateOf(batch: RecordBatch): Option[BatchMetadata] = {
+    if (batch.producerEpoch() != producerEpoch)
+      None
+    else
+      batchWithSequenceRange(batch.baseSequence(), batch.lastSequence())
+  }
+
+  // Return the metadata of the cached batch with exactly this sequence range, if any.
+  def batchWithSequenceRange(firstSeq: Int, lastSeq: Int): Option[BatchMetadata] = {
+    batchMetadata.find { metadata =>
+      firstSeq == metadata.firstSeq && lastSeq == metadata.lastSeq
+    }
   }
 
   override def toString: String = {
     "ProducerIdEntry(" +
       s"producerId=$producerId, " +
       s"producerEpoch=$producerEpoch, " +
-      s"firstSequence=$firstSeq, " +
-      s"lastSequence=$lastSeq, " +
-      s"firstOffset=$firstOffset, " +
-      s"lastOffset=$lastOffset, " +
-      s"timestamp=$timestamp, " +
       s"currentTxnFirstOffset=$currentTxnFirstOffset, " +
-      s"coordinatorEpoch=$coordinatorEpoch)"
+      s"coordinatorEpoch=$coordinatorEpoch, " +
+      s"batchMetadata=$batchMetadata"
   }
 }
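
To make the retention policy above concrete, here is a minimal standalone sketch of a
bounded batch-metadata window (the names and the driver are illustrative, not Kafka's
classes):

    import scala.collection.mutable

    object BoundedBatchWindowSketch extends App {
      case class Batch(firstSeq: Int, lastSeq: Int)

      val NumBatchesToRetain = 5
      val window = mutable.Queue[Batch]()

      def add(batch: Batch): Unit = {
        if (window.size == NumBatchesToRetain)
          window.dequeue()                      // evict the batch with the lowest sequence
        window.enqueue(batch)                   // the newest batch goes to the tail
      }

      def isDuplicate(firstSeq: Int, lastSeq: Int): Boolean =
        window.exists(b => b.firstSeq == firstSeq && b.lastSeq == lastSeq)

      (0 to 6).foreach(i => add(Batch(i, i)))   // seven 1-record batches; window now holds 2..6
      println(isDuplicate(6, 6))                // true: still cached
      println(isDuplicate(1, 1))                // false: already evicted
    }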
 
@@ -85,8 +134,10 @@ private[log] case class ProducerIdEntry(producerId: Long, producerEpoch: Short,
  * as the incoming records are validated.
  *
  * @param producerId The id of the producer appending to the log
- * @param initialEntry The last entry associated with the producer id. Validation of the first append will be
- *                     based off of this entry initially
+ * @param currentEntry  The current entry associated with the producer id which contains metadata for a fixed number of
+ *                      the most recent appends made by the producer. Validation of the first incoming append will
+ *                      be made against the latest append in the current entry. New appends will replace older appends
+ *                      in the current entry so that the space overhead is constant.
  * @param validateSequenceNumbers Whether or not sequence numbers should be validated. The only current use
  *                                of this is the consumer offsets topic which uses producer ids from incoming
  *                                TxnOffsetCommit, but has no sequence number to validate and does not depend
@@ -98,48 +149,46 @@ private[log] case class ProducerIdEntry(producerId: Long, producerEpoch: Short,
  *                       retention enforcement.
  */
 private[log] class ProducerAppendInfo(val producerId: Long,
-                                      initialEntry: ProducerIdEntry,
+                                      currentEntry: ProducerIdEntry,
                                       validateSequenceNumbers: Boolean,
                                       loadingFromLog: Boolean) {
-  private var producerEpoch = initialEntry.producerEpoch
-  private var firstSeq = initialEntry.firstSeq
-  private var lastSeq = initialEntry.lastSeq
-  private var lastOffset = initialEntry.lastOffset
-  private var maxTimestamp = initialEntry.timestamp
-  private var currentTxnFirstOffset = initialEntry.currentTxnFirstOffset
-  private var coordinatorEpoch = initialEntry.coordinatorEpoch
+
   private val transactions = ListBuffer.empty[TxnMetadata]
 
   private def validateAppend(producerEpoch: Short, firstSeq: Int, lastSeq: Int) = {
     if (isFenced(producerEpoch)) {
       throw new ProducerFencedException(s"Producer's epoch is no longer valid. There is probably another producer " +
-        s"with a newer epoch. $producerEpoch (request epoch), ${this.producerEpoch} (server epoch)")
+        s"with a newer epoch. $producerEpoch (request epoch), ${currentEntry.producerEpoch} (server epoch)")
     } else if (validateSequenceNumbers) {
-      if (producerEpoch != this.producerEpoch) {
+      if (producerEpoch != currentEntry.producerEpoch) {
         if (firstSeq != 0)
           throw new OutOfOrderSequenceException(s"Invalid sequence number for new epoch: $producerEpoch " +
             s"(request epoch), $firstSeq (seq. number)")
-      } else if (this.firstSeq == RecordBatch.NO_SEQUENCE && firstSeq != 0) {
+      } else if (currentEntry.lastSeq == RecordBatch.NO_SEQUENCE && firstSeq != 0) {
         // the epoch was bumped by a control record, so we expect the sequence number to be reset
         throw new OutOfOrderSequenceException(s"Out of order sequence number for producerId $producerId: found $firstSeq " +
           s"(incoming seq. number), but expected 0")
-      } else if (firstSeq == this.firstSeq && lastSeq == this.lastSeq) {
-        throw new DuplicateSequenceNumberException(s"Duplicate sequence number for producerId $producerId: (incomingBatch.firstSeq, " +
-          s"incomingBatch.lastSeq): ($firstSeq, $lastSeq), (lastEntry.firstSeq, lastEntry.lastSeq): " +
-          s"(${this.firstSeq}, ${this.lastSeq}).")
+      } else if (isDuplicate(firstSeq, lastSeq)) {
+        throw new DuplicateSequenceException(s"Duplicate sequence number for producerId $producerId: (incomingBatch.firstSeq, " +
+          s"incomingBatch.lastSeq): ($firstSeq, $lastSeq).")
       } else if (!inSequence(firstSeq, lastSeq)) {
         throw new OutOfOrderSequenceException(s"Out of order sequence number for producerId $producerId: $firstSeq " +
-          s"(incoming seq. number), ${this.lastSeq} (current end sequence number)")
+          s"(incoming seq. number), ${currentEntry.lastSeq} (current end sequence number)")
       }
     }
   }
 
+  private def isDuplicate(firstSeq: Int, lastSeq: Int): Boolean = {
+    ((lastSeq != 0 && currentEntry.firstSeq != Int.MaxValue && lastSeq < currentEntry.firstSeq)
+      || currentEntry.batchWithSequenceRange(firstSeq, lastSeq).isDefined)
+  }
+
   private def inSequence(firstSeq: Int, lastSeq: Int): Boolean = {
-    firstSeq == this.lastSeq + 1L || (firstSeq == 0 && this.lastSeq == Int.MaxValue)
+    firstSeq == currentEntry.lastSeq + 1L || (firstSeq == 0 && currentEntry.lastSeq == Int.MaxValue)
   }
 
   private def isFenced(producerEpoch: Short): Boolean = {
-    producerEpoch < this.producerEpoch
+    producerEpoch < currentEntry.producerEpoch
   }
 
   def append(batch: RecordBatch): Option[CompletedTxn] = {
@@ -166,18 +215,14 @@ private[log] class ProducerAppendInfo(val producerId: Long,
       // will generally have removed the beginning entries from each producer id
       validateAppend(epoch, firstSeq, lastSeq)
 
-    this.producerEpoch = epoch
-    this.firstSeq = firstSeq
-    this.lastSeq = lastSeq
-    this.maxTimestamp = lastTimestamp
-    this.lastOffset = lastOffset
+    currentEntry.addBatchMetadata(epoch, lastSeq, lastOffset, lastSeq - firstSeq, lastTimestamp)
 
-    if (currentTxnFirstOffset.isDefined && !isTransactional)
+    if (currentEntry.currentTxnFirstOffset.isDefined && !isTransactional)
       throw new InvalidTxnStateException(s"Expected transactional write from producer $producerId")
 
-    if (isTransactional && currentTxnFirstOffset.isEmpty) {
+    if (isTransactional && currentEntry.currentTxnFirstOffset.isEmpty) {
       val firstOffset = lastOffset - (lastSeq - firstSeq)
-      currentTxnFirstOffset = Some(firstOffset)
+      currentEntry.currentTxnFirstOffset = Some(firstOffset)
       transactions += new TxnMetadata(producerId, firstOffset)
     }
   }
@@ -187,44 +232,27 @@ private[log] class ProducerAppendInfo(val producerId: Long,
                          offset: Long,
                          timestamp: Long): CompletedTxn = {
     if (isFenced(producerEpoch))
-      throw new ProducerFencedException(s"Invalid producer epoch: $producerEpoch (zombie): ${this.producerEpoch} (current)")
+      throw new ProducerFencedException(s"Invalid producer epoch: $producerEpoch (zombie): ${currentEntry.producerEpoch} (current)")
 
-    if (this.coordinatorEpoch > endTxnMarker.coordinatorEpoch)
+    if (currentEntry.coordinatorEpoch > endTxnMarker.coordinatorEpoch)
       throw new TransactionCoordinatorFencedException(s"Invalid coordinator epoch: ${endTxnMarker.coordinatorEpoch} " +
-        s"(zombie), $coordinatorEpoch (current)")
+        s"(zombie), ${currentEntry.coordinatorEpoch} (current)")
 
-    if (producerEpoch != this.producerEpoch) {
-      // it is possible that this control record is the first record seen from a new epoch (the producer
-      // may fail before sending to the partition or the request itself could fail for some reason). In this
-      // case, we bump the epoch and reset the sequence numbers
-      this.producerEpoch = producerEpoch
-      this.firstSeq = RecordBatch.NO_SEQUENCE
-      this.lastSeq = RecordBatch.NO_SEQUENCE
-    } else {
-      // the control record is the last append to the log, so the last offset will be updated to point to it.
-      // However, the sequence numbers still point to the previous batch, so the duplicate check would no longer
-      // be correct: it would return the wrong offset. To fix this, we treat the control record as a batch
-      // of size 1 which uses the last appended sequence number.
-      this.firstSeq = this.lastSeq
-    }
+    currentEntry.maybeUpdateEpoch(producerEpoch)
 
-    val firstOffset = currentTxnFirstOffset match {
+    val firstOffset = currentEntry.currentTxnFirstOffset match {
       case Some(txnFirstOffset) => txnFirstOffset
       case None =>
         transactions += new TxnMetadata(producerId, offset)
         offset
     }
 
-    this.lastOffset = offset
-    this.currentTxnFirstOffset = None
-    this.maxTimestamp = timestamp
-    this.coordinatorEpoch = endTxnMarker.coordinatorEpoch
+    currentEntry.currentTxnFirstOffset = None
+    currentEntry.coordinatorEpoch = endTxnMarker.coordinatorEpoch
     CompletedTxn(producerId, firstOffset, offset, endTxnMarker.controlType == ControlRecordType.ABORT)
   }
 
-  def lastEntry: ProducerIdEntry =
-    ProducerIdEntry(producerId, producerEpoch, lastSeq, lastOffset, lastSeq - firstSeq, maxTimestamp,
-      coordinatorEpoch, currentTxnFirstOffset)
+  def latestEntry: ProducerIdEntry = currentEntry
 
   def startedTransactions: List[TxnMetadata] = transactions.toList
 
@@ -243,11 +271,11 @@ private[log] class ProducerAppendInfo(val producerId: Long,
   override def toString: String = {
     "ProducerAppendInfo(" +
       s"producerId=$producerId, " +
-      s"producerEpoch=$producerEpoch, " +
-      s"firstSequence=$firstSeq, " +
-      s"lastSequence=$lastSeq, " +
-      s"currentTxnFirstOffset=$currentTxnFirstOffset, " +
-      s"coordinatorEpoch=$coordinatorEpoch, " +
+      s"producerEpoch=${currentEntry.producerEpoch}, " +
+      s"firstSequence=${currentEntry.firstSeq}, " +
+      s"lastSequence=${currentEntry.lastSeq}, " +
+      s"currentTxnFirstOffset=${currentEntry.currentTxnFirstOffset}, " +
+      s"coordinatorEpoch=${currentEntry.coordinatorEpoch}, " +
       s"startedTransactions=$transactions)"
   }
 }
@@ -309,7 +337,7 @@ object ProducerStateManager {
         val offsetDelta = producerEntryStruct.getInt(OffsetDeltaField)
         val coordinatorEpoch = producerEntryStruct.getInt(CoordinatorEpochField)
         val currentTxnFirstOffset = producerEntryStruct.getLong(CurrentTxnFirstOffsetField)
-        val newEntry = ProducerIdEntry(producerId, producerEpoch, seq, offset, offsetDelta, timestamp,
+        val newEntry = new ProducerIdEntry(producerId, mutable.Queue[BatchMetadata](BatchMetadata(seq, offset, offsetDelta, timestamp)), producerEpoch,
           coordinatorEpoch, if (currentTxnFirstOffset >= 0) Some(currentTxnFirstOffset) else None)
         newEntry
       }
@@ -329,9 +357,9 @@ object ProducerStateManager {
         producerEntryStruct.set(ProducerIdField, producerId)
           .set(ProducerEpochField, entry.producerEpoch)
           .set(LastSequenceField, entry.lastSeq)
-          .set(LastOffsetField, entry.lastOffset)
-          .set(OffsetDeltaField, entry.offsetDelta)
-          .set(TimestampField, entry.timestamp)
+          .set(LastOffsetField, entry.lastDataOffset)
+          .set(OffsetDeltaField, entry.lastOffsetDelta)
+          .set(TimestampField, entry.lastTimestamp)
           .set(CoordinatorEpochField, entry.coordinatorEpoch)
           .set(CurrentTxnFirstOffsetField, entry.currentTxnFirstOffset.getOrElse(-1L))
         producerEntryStruct
@@ -472,7 +500,7 @@ class ProducerStateManager(val topicPartition: TopicPartition,
   }
 
   private def isProducerExpired(currentTimeMs: Long, producerIdEntry: ProducerIdEntry): Boolean =
-    producerIdEntry.currentTxnFirstOffset.isEmpty && currentTimeMs - producerIdEntry.timestamp >= maxProducerIdExpirationMs
+    producerIdEntry.currentTxnFirstOffset.isEmpty && currentTimeMs - producerIdEntry.lastTimestamp >= maxProducerIdExpirationMs
 
   /**
    * Expire any producer ids which have been idle longer than the configured maximum expiration timeout.
@@ -508,7 +536,7 @@ class ProducerStateManager(val topicPartition: TopicPartition,
   }
 
   def prepareUpdate(producerId: Long, loadingFromLog: Boolean): ProducerAppendInfo =
-    new ProducerAppendInfo(producerId, lastEntry(producerId).getOrElse(ProducerIdEntry.Empty), validateSequenceNumbers,
+    new ProducerAppendInfo(producerId, lastEntry(producerId).getOrElse(ProducerIdEntry.empty(producerId)), validateSequenceNumbers,
       loadingFromLog)
 
   /**
@@ -520,7 +548,7 @@ class ProducerStateManager(val topicPartition: TopicPartition,
 
     trace(s"Updated producer ${appendInfo.producerId} state to $appendInfo")
 
-    val entry = appendInfo.lastEntry
+    val entry = appendInfo.latestEntry
     producers.put(appendInfo.producerId, entry)
     appendInfo.startedTransactions.foreach { txn =>
       ongoingTxns.put(txn.firstOffset.messageOffset, txn)
@@ -562,7 +590,8 @@ class ProducerStateManager(val topicPartition: TopicPartition,
   def oldestSnapshotOffset: Option[Long] = oldestSnapshotFile.map(file => offsetFromFilename(file.getName))
 
   private def isProducerRetained(producerIdEntry: ProducerIdEntry, logStartOffset: Long): Boolean = {
-    producerIdEntry.lastOffset >= logStartOffset
+    producerIdEntry.removeBatchesOlderThan(logStartOffset)
+    producerIdEntry.lastDataOffset >= logStartOffset
   }
 
   /**

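The sequence checks above hinge on two cases worth spelling out: the incoming batch must
continue from the last cached sequence, and sequence numbers wrap from Int.MaxValue back
to 0. A small self-contained sketch of that rule (not Kafka's code):

    object SequenceCheckSketch extends App {
      def inSequence(lastSeq: Int, incomingFirstSeq: Int): Boolean =
        incomingFirstSeq.toLong == lastSeq.toLong + 1L ||
          (incomingFirstSeq == 0 && lastSeq == Int.MaxValue)

      println(inSequence(4, 5))             // true: the next expected sequence
      println(inSequence(Int.MaxValue, 0))  // true: the wraparound case
      println(inSequence(4, 6))             // false: a gap, i.e. out of order
    }
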
http://git-wip-us.apache.org/repos/asf/kafka/blob/5d242225/core/src/main/scala/kafka/tools/DumpLogSegments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index 025617f..c4f7ce0 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -115,7 +115,7 @@ object DumpLogSegments {
         case Log.TimeIndexFileSuffix =>
           dumpTimeIndex(file, indexSanityOnly, verifyOnly, timeIndexDumpErrors, maxMessageSize)
         case Log.PidSnapshotFileSuffix =>
-          dumpPidSnapshot(file)
+          dumpProducerIdSnapshot(file)
         case Log.TxnIndexFileSuffix =>
           dumpTxnIndex(file)
         case _ =>
@@ -152,12 +152,12 @@ object DumpLogSegments {
     }
   }
 
-  private def dumpPidSnapshot(file: File): Unit = {
+  private def dumpProducerIdSnapshot(file: File): Unit = {
     try {
-      ProducerStateManager.readSnapshot(file).foreach { entry=>
-        println(s"producerId: ${entry.producerId} producerEpoch: ${entry.producerEpoch} lastSequence: ${entry.lastSeq} " +
-          s"lastOffset: ${entry.lastOffset} offsetDelta: ${entry.offsetDelta} lastTimestamp: ${entry.timestamp} " +
-          s"coordinatorEpoch: ${entry.coordinatorEpoch} currentTxnFirstOffset: ${entry.currentTxnFirstOffset}")
+      ProducerStateManager.readSnapshot(file).foreach { entry =>
+        println(s"producerId: ${entry.producerId} producerEpoch: ${entry.producerEpoch} " +
+          s"coordinatorEpoch: ${entry.coordinatorEpoch} currentTxnFirstOffset: ${entry.currentTxnFirstOffset} " +
+          s"cachedMetadata: ${entry.batchMetadata}")
       }
     } catch {
       case e: CorruptSnapshotException =>

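With the new entry layout, a dumped producer snapshot line would look roughly like the
following (all values are invented for illustration):

    producerId: 1000 producerEpoch: 0 coordinatorEpoch: -1 currentTxnFirstOffset: None cachedMetadata: Queue(BatchMetadata(firstSeq=0, lastSeq=9, firstOffset=42, lastOffset=51, timestamp=1505430000000))
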
http://git-wip-us.apache.org/repos/asf/kafka/blob/5d242225/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
index a11972e..1bde7b1 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
@@ -122,7 +122,7 @@ class ProducerBounceTest extends KafkaServerTestHarness {
 
     val producerConfig = new Properties()
     producerConfig.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
-    producerConfig.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
+    producerConfig.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5")
     val producerConfigWithCompression = new Properties()
     producerConfigWithCompression ++= producerConfig
     producerConfigWithCompression.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4")

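The test now runs with five concurrent in-flight requests, which is exactly the scenario
this change enables. A hedged sketch of the equivalent client configuration (the broker
address is illustrative):

    import java.util.Properties
    import org.apache.kafka.clients.producer.ProducerConfig

    object IdempotentProducerConfigSketch extends App {
      val props = new Properties()
      props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")   // illustrative address
      props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
      props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5")   // > 1 is now safe with idempotence
      props.put(ProducerConfig.ACKS_CONFIG, "all")                           // idempotence requires acks=all
      println(props)
    }
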
http://git-wip-us.apache.org/repos/asf/kafka/blob/5d242225/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
index 810f481..1a85d34 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
@@ -24,12 +24,15 @@ import kafka.server.KafkaConfig
 import kafka.utils.{ShutdownableThread, TestUtils}
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.junit.Test
 
 import scala.collection.JavaConverters._
 import org.junit.Assert._
 
+import scala.collection.mutable
+
 
 class TransactionsBounceTest extends KafkaServerTestHarness {
   private val producerBufferSize =  65536
@@ -76,12 +79,12 @@ class TransactionsBounceTest extends KafkaServerTestHarness {
     // basic idea is to seed a topic with 10000 records, and copy it transactionally while bouncing brokers
     // constantly through the period.
     val consumerGroup = "myGroup"
-    val numInputRecords = 5000
+    val numInputRecords = 10000
     createTopics()
 
     TestUtils.seedTopicWithNumberedRecords(inputTopic, numInputRecords, servers)
     val consumer = createConsumerAndSubscribeToTopics(consumerGroup, List(inputTopic))
-    val producer = TestUtils.createTransactionalProducer("test-txn", servers)
+    val producer = TestUtils.createTransactionalProducer("test-txn", servers, 512)
 
     producer.initTransactions()
 
@@ -125,9 +128,20 @@ class TransactionsBounceTest extends KafkaServerTestHarness {
     scheduler.shutdown()
 
     val verifyingConsumer = createConsumerAndSubscribeToTopics("randomGroup", List(outputTopic), readCommitted = true)
-    val outputRecords = TestUtils.pollUntilAtLeastNumRecords(verifyingConsumer, numInputRecords).map { record =>
-      TestUtils.assertCommittedAndGetValue(record).toInt
+    val recordsByPartition = new mutable.HashMap[TopicPartition, mutable.ListBuffer[Int]]()
+    TestUtils.pollUntilAtLeastNumRecords(verifyingConsumer, numInputRecords).foreach { record =>
+      val value = TestUtils.assertCommittedAndGetValue(record).toInt
+      val topicPartition = new TopicPartition(record.topic(), record.partition())
+      recordsByPartition.getOrElseUpdate(topicPartition, new mutable.ListBuffer[Int])
+        .append(value)
+    }
+
+    val outputRecords = new mutable.ListBuffer[Int]()
+    recordsByPartition.values.foreach { partitionValues =>
+      assertEquals("Out of order messages detected", partitionValues, partitionValues.sorted)
+      outputRecords.appendAll(partitionValues)
     }
+
     val recordSet = outputRecords.toSet
     assertEquals(numInputRecords, recordSet.size)
 

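The verification step relies on the input records being numbered, so any broker-side
reordering within a partition shows up as an unsorted per-partition value list. A toy
sketch of that check (the data is made up; partition 1 is deliberately reordered):

    object OrderingCheckSketch extends App {
      val valuesByPartition = Map(0 -> List(1, 2, 3), 1 -> List(4, 6, 5))
      valuesByPartition.foreach { case (partition, values) =>
        println(s"partition $partition in order: ${values == values.sorted}")
      }
    }
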
http://git-wip-us.apache.org/repos/asf/kafka/blob/5d242225/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index 2a07532..61a8492 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -17,6 +17,7 @@
  package kafka.log
 
 import java.io.File
+import java.util.concurrent.ConcurrentLinkedDeque
 
 import kafka.utils.TestUtils
 import kafka.utils.TestUtils.checkEquals
@@ -317,7 +318,7 @@ class LogSegmentTest {
 
     // recover again, but this time assuming the transaction from pid2 began on a previous segment
     stateManager = new ProducerStateManager(topicPartition, logDir)
-    stateManager.loadProducerEntry(ProducerIdEntry(pid2, producerEpoch, 10, 90L, 5, RecordBatch.NO_TIMESTAMP, 0, Some(75L)))
+    stateManager.loadProducerEntry(new ProducerIdEntry(pid2, mutable.Queue[BatchMetadata](BatchMetadata(10, 90L, 5, RecordBatch.NO_TIMESTAMP)), producerEpoch, 0, Some(75L)))
     segment.recover(stateManager)
     assertEquals(108L, stateManager.mapEndOffset)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/5d242225/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index ad64e39..2ae62c5 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -418,14 +418,11 @@ class LogTest {
 
     log.truncateTo(baseOffset + 4)
 
-    val activeProducers = log.activeProducers
+    val activeProducers = log.activeProducersWithLastSequence
     assertTrue(activeProducers.contains(pid))
 
-    val entry = activeProducers(pid)
-    assertEquals(0, entry.firstSeq)
-    assertEquals(baseOffset, entry.firstOffset)
-    assertEquals(3, entry.lastSeq)
-    assertEquals(baseOffset + 3, entry.lastOffset)
+    val lastSeq = activeProducers(pid)
+    assertEquals(3, lastSeq)
   }
 
   @Test
@@ -462,14 +459,11 @@ class LogTest {
 
     log.truncateTo(baseOffset + 2)
 
-    val activeProducers = log.activeProducers
+    val activeProducers = log.activeProducersWithLastSequence
     assertTrue(activeProducers.contains(pid))
 
-    val entry = activeProducers(pid)
-    assertEquals(0, entry.firstSeq)
-    assertEquals(baseOffset, entry.firstOffset)
-    assertEquals(1, entry.lastSeq)
-    assertEquals(baseOffset + 1, entry.lastOffset)
+    val lastSeq = activeProducers(pid)
+    assertEquals(1, lastSeq)
   }
 
   @Test
@@ -498,14 +492,11 @@ class LogTest {
     val filteredRecords = MemoryRecords.readableRecords(filtered)
 
     log.appendAsFollower(filteredRecords)
-    val activeProducers = log.activeProducers
+    val activeProducers = log.activeProducersWithLastSequence
     assertTrue(activeProducers.contains(pid))
 
-    val entry = activeProducers(pid)
-    assertEquals(0, entry.firstSeq)
-    assertEquals(baseOffset, entry.firstOffset)
-    assertEquals(3, entry.lastSeq)
-    assertEquals(baseOffset + 3, entry.lastOffset)
+    val lastSeq = activeProducers(pid)
+    assertEquals(3, lastSeq)
   }
 
   @Test
@@ -547,13 +538,13 @@ class LogTest {
     }
 
     log.truncateTo(1L)
-    assertEquals(1, log.activeProducers.size)
+    assertEquals(1, log.activeProducersWithLastSequence.size)
 
-    val pidEntryOpt = log.activeProducers.get(pid)
-    assertTrue(pidEntryOpt.isDefined)
+    val lastSeqOpt = log.activeProducersWithLastSequence.get(pid)
+    assertTrue(lastSeqOpt.isDefined)
 
-    val pidEntry = pidEntryOpt.get
-    assertEquals(0, pidEntry.lastSeq)
+    val lastSeq = lastSeqOpt.get
+    assertEquals(0, lastSeq)
   }
 
   @Test
@@ -568,21 +559,21 @@ class LogTest {
       producerEpoch = epoch, sequence = 0), leaderEpoch = 0)
     log.appendAsLeader(TestUtils.records(List(new SimpleRecord(mockTime.milliseconds(), "b".getBytes)), producerId = pid2,
       producerEpoch = epoch, sequence = 0), leaderEpoch = 0)
-    assertEquals(2, log.activeProducers.size)
+    assertEquals(2, log.activeProducersWithLastSequence.size)
 
     log.maybeIncrementLogStartOffset(1L)
 
-    assertEquals(1, log.activeProducers.size)
-    val retainedEntryOpt = log.activeProducers.get(pid2)
-    assertTrue(retainedEntryOpt.isDefined)
-    assertEquals(0, retainedEntryOpt.get.lastSeq)
+    assertEquals(1, log.activeProducersWithLastSequence.size)
+    val retainedLastSeqOpt = log.activeProducersWithLastSequence.get(pid2)
+    assertTrue(retainedLastSeqOpt.isDefined)
+    assertEquals(0, retainedLastSeqOpt.get)
 
     log.close()
 
     val reloadedLog = createLog(logDir, logConfig, logStartOffset = 1L)
-    assertEquals(1, reloadedLog.activeProducers.size)
-    val reloadedEntryOpt = log.activeProducers.get(pid2)
-    assertEquals(retainedEntryOpt, reloadedEntryOpt)
+    assertEquals(1, reloadedLog.activeProducersWithLastSequence.size)
+    val reloadedLastSeqOpt = log.activeProducersWithLastSequence.get(pid2)
+    assertEquals(retainedLastSeqOpt, reloadedLastSeqOpt)
   }
 
   @Test
@@ -600,24 +591,24 @@ class LogTest {
       producerEpoch = epoch, sequence = 0), leaderEpoch = 0)
 
     assertEquals(2, log.logSegments.size)
-    assertEquals(2, log.activeProducers.size)
+    assertEquals(2, log.activeProducersWithLastSequence.size)
 
     log.maybeIncrementLogStartOffset(1L)
     log.onHighWatermarkIncremented(log.logEndOffset)
     log.deleteOldSegments()
 
     assertEquals(1, log.logSegments.size)
-    assertEquals(1, log.activeProducers.size)
-    val retainedEntryOpt = log.activeProducers.get(pid2)
-    assertTrue(retainedEntryOpt.isDefined)
-    assertEquals(0, retainedEntryOpt.get.lastSeq)
+    assertEquals(1, log.activeProducersWithLastSequence.size)
+    val retainedLastSeqOpt = log.activeProducersWithLastSequence.get(pid2)
+    assertTrue(retainedLastSeqOpt.isDefined)
+    assertEquals(0, retainedLastSeqOpt.get)
 
     log.close()
 
     val reloadedLog = createLog(logDir, logConfig, logStartOffset = 1L)
-    assertEquals(1, reloadedLog.activeProducers.size)
-    val reloadedEntryOpt = log.activeProducers.get(pid2)
-    assertEquals(retainedEntryOpt, reloadedEntryOpt)
+    assertEquals(1, reloadedLog.activeProducersWithLastSequence.size)
+    val reloadedEntryOpt = log.activeProducersWithLastSequence.get(pid2)
+    assertEquals(retainedLastSeqOpt, reloadedEntryOpt)
   }
 
   @Test
@@ -659,13 +650,13 @@ class LogTest {
     log.takeProducerSnapshot()
 
     assertEquals(3, log.logSegments.size)
-    assertEquals(Set(pid1, pid2), log.activeProducers.keySet)
+    assertEquals(Set(pid1, pid2), log.activeProducersWithLastSequence.keySet)
 
     log.onHighWatermarkIncremented(log.logEndOffset)
     log.deleteOldSegments()
 
     assertEquals(2, log.logSegments.size)
-    assertEquals(Set(pid2), log.activeProducers.keySet)
+    assertEquals(Set(pid2), log.activeProducersWithLastSequence.keySet)
   }
 
   @Test
@@ -749,13 +740,13 @@ class LogTest {
     val records = Seq(new SimpleRecord(mockTime.milliseconds(), "foo".getBytes))
     log.appendAsLeader(TestUtils.records(records, producerId = pid, producerEpoch = 0, sequence = 0), leaderEpoch = 0)
 
-    assertEquals(Set(pid), log.activeProducers.keySet)
+    assertEquals(Set(pid), log.activeProducersWithLastSequence.keySet)
 
     mockTime.sleep(producerIdExpirationCheckIntervalMs)
-    assertEquals(Set(pid), log.activeProducers.keySet)
+    assertEquals(Set(pid), log.activeProducersWithLastSequence.keySet)
 
     mockTime.sleep(producerIdExpirationCheckIntervalMs)
-    assertEquals(Set(), log.activeProducers.keySet)
+    assertEquals(Set(), log.activeProducersWithLastSequence.keySet)
   }
 
   @Test
@@ -805,16 +796,22 @@ class LogTest {
       case _: OutOfOrderSequenceException => // Good!
     }
 
-    // Append a Duplicate of an entry in the middle of the log. This is not allowed.
+    // Append a duplicate of the batch which is 4th from the tail. This should succeed without error since we
+    // retain the batch metadata of the last 5 batches.
+    val duplicateOfFourth = TestUtils.records(List(new SimpleRecord(mockTime.milliseconds, "key".getBytes, "value".getBytes)),
+      producerId = pid, producerEpoch = epoch, sequence = 2)
+    log.appendAsLeader(duplicateOfFourth, leaderEpoch = 0)
+
+    // Append a duplicate of an entry older than the last 5 appended batches. This should result in a DuplicateSequenceException.
      try {
       val records = TestUtils.records(
         List(new SimpleRecord(mockTime.milliseconds, s"key-1".getBytes, s"value-1".getBytes)),
         producerId = pid, producerEpoch = epoch, sequence = 1)
       log.appendAsLeader(records, leaderEpoch = 0)
-      fail ("Should have received an OutOfOrderSequenceException since we attempted to append a duplicate of a records " +
-        "in the middle of the log.")
+      fail ("Should have received an DuplicateSequenceNumberException since we attempted to append a duplicate of a batch" +
+        "which is older than the last 5 appended batches.")
     } catch {
-      case _: OutOfOrderSequenceException => // Good!
+      case _: DuplicateSequenceException => // Good!
     }
 
     // Append a duplicate entry with a single record at the tail of the log. This should return the appendInfo of the original entry.
@@ -872,7 +869,7 @@ class LogTest {
     }
   }
 
-  @Test(expected = classOf[DuplicateSequenceNumberException])
+  @Test(expected = classOf[DuplicateSequenceException])
   def testDuplicateAppendToFollower() : Unit = {
     val logConfig = createLogConfig(segmentBytes = 1024 * 1024 * 5)
     val log = createLog(logDir, logConfig)
@@ -888,7 +885,7 @@ class LogTest {
       partitionLeaderEpoch, new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
   }
 
-  @Test(expected = classOf[DuplicateSequenceNumberException])
+  @Test(expected = classOf[DuplicateSequenceException])
   def testMultipleProducersWithDuplicatesInSingleAppend() : Unit = {
     val logConfig = createLogConfig(segmentBytes = 1024 * 1024 * 5)
     val log = createLog(logDir, logConfig)

