kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7736; Consolidate Map usages in TransactionManager (#6270)
Date Wed, 06 Mar 2019 17:52:19 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1db4667  KAFKA-7736; Consolidate Map usages in TransactionManager (#6270)
1db4667 is described below

commit 1db46673661cf6dcb7f2fa2565262b04cf580367
Author: Viktor Somogyi <viktorsomogyi@gmail.com>
AuthorDate: Wed Mar 6 18:52:09 2019 +0100

    KAFKA-7736; Consolidate Map usages in TransactionManager (#6270)
    
    Refactors the various maps used in TransactionManager into one map to simplify bookkeeping of inflight batches, offsets and sequence numbers.
    
    Reviewers: Jason Gustafson <jason@confluent.io>
---
 .../kafka/clients/producer/internals/Sender.java   |   2 +-
 .../producer/internals/TransactionManager.java     | 222 ++++++++++++---------
 .../clients/producer/internals/SenderTest.java     | 146 +++++++-------
 .../producer/internals/TransactionManagerTest.java |   6 -
 4 files changed, 200 insertions(+), 176 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 1879d6f..3de2eb4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -668,7 +668,7 @@ public class Sender implements Runnable {
                 log.debug("ProducerId: {}; Set last ack'd sequence number for topic-partition {} to {}",
                     batch.producerId(),
                     batch.topicPartition,
-                    transactionManager.lastAckedSequence(batch.topicPartition));
+                    transactionManager.lastAckedSequence(batch.topicPartition).orElse(-1));
             }
             transactionManager.updateLastAckedOffset(response, batch);
             transactionManager.removeInFlightBatch(batch);
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index 3d41a3c..b619093 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -54,6 +54,8 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
 import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.function.Supplier;
@@ -66,17 +68,84 @@ import static org.apache.kafka.common.record.RecordBatch.NO_PRODUCER_ID;
  */
 public class TransactionManager {
     private static final int NO_INFLIGHT_REQUEST_CORRELATION_ID = -1;
+    private static final int NO_LAST_ACKED_SEQUENCE_NUMBER = -1;
 
     private final Logger log;
     private final String transactionalId;
     private final int transactionTimeoutMs;
 
-    // The base sequence of the next batch bound for a given partition.
-    private final Map<TopicPartition, Integer> nextSequence;
+    private static class TopicPartitionBookkeeper {
 
-    // The sequence of the last record of the last ack'd batch from the given partition. When there are no
-    // in flight requests for a partition, the lastAckedSequence(topicPartition) == nextSequence(topicPartition) - 1.
-    private final Map<TopicPartition, Integer> lastAckedSequence;
+        private final Map<TopicPartition, TopicPartitionEntry> topicPartitionBookkeeping = new HashMap<>();
+
+        public TopicPartitionEntry getPartition(TopicPartition topic) {
+            TopicPartitionEntry ent = topicPartitionBookkeeping.get(topic);
+            if (ent == null)
+                throw new IllegalStateException("Trying to get the sequence number for " + topic +
+                        ", but the sequence number was never set for this partition.");
+            return ent;
+        }
+
+        public void addPartition(TopicPartition topic) {
+            if (!topicPartitionBookkeeping.containsKey(topic))
+                topicPartitionBookkeeping.put(topic, new TopicPartitionEntry());
+        }
+
+        boolean contains(TopicPartition partition) {
+            return topicPartitionBookkeeping.containsKey(partition);
+        }
+
+        public void reset() {
+            topicPartitionBookkeeping.clear();
+        }
+
+        OptionalLong lastAckedOffset(TopicPartition partition) {
+            TopicPartitionEntry entry = topicPartitionBookkeeping.get(partition);
+            if (entry != null && entry.lastAckedOffset != ProduceResponse.INVALID_OFFSET)
+                return OptionalLong.of(entry.lastAckedOffset);
+            else
+                return OptionalLong.empty();
+        }
+
+        OptionalInt lastAckedSequence(TopicPartition partition) {
+            TopicPartitionEntry entry = topicPartitionBookkeeping.get(partition);
+            if (entry != null && entry.lastAckedSequence != NO_LAST_ACKED_SEQUENCE_NUMBER)
+                return OptionalInt.of(entry.lastAckedSequence);
+            else
+                return OptionalInt.empty();
+        }
+    }
+
+    private static class TopicPartitionEntry {
+
+        // The base sequence of the next batch bound for a given partition.
+        private int nextSequence;
+
+        // The sequence number of the last record of the last ack'd batch from the given partition. When there are no
+        // in flight requests for a partition, the lastAckedSequence(topicPartition) == nextSequence(topicPartition) - 1.
+        private int lastAckedSequence;
+
+        // Keep track of the in flight batches bound for a partition, ordered by sequence. This helps us to ensure that
+        // we continue to order batches by the sequence numbers even when the responses come back out of order during
+        // leader failover. We add a batch to the queue when it is drained, and remove it when the batch completes
+        // (either successfully or through a fatal failure).
+        private PriorityQueue<ProducerBatch> inflightBatchesBySequence;
+
+        // We keep track of the last acknowledged offset on a per partition basis in order to disambiguate UnknownProducer
+        // responses which are due to the retention period elapsing, and those which are due to actual lost data.
+        private long lastAckedOffset;
+
+        TopicPartitionEntry() {
+            this.nextSequence = 0;
+            this.lastAckedSequence = NO_LAST_ACKED_SEQUENCE_NUMBER;
+            this.lastAckedOffset = ProduceResponse.INVALID_OFFSET;
+            this.inflightBatchesBySequence = new PriorityQueue<>(5, Comparator.comparingInt(ProducerBatch::baseSequence));
+        }
+    }
+
+    private final TopicPartitionBookkeeper topicPartitionBookkeeper;
+
+    private final Map<TopicPartition, CommittedOffset> pendingTxnOffsetCommits;
 
     // If a batch bound for a partition expired locally after being sent at least once, the partition has is considered
     // to have an unresolved state. We keep track fo such partitions here, and cannot assign any more sequence numbers
@@ -86,21 +155,10 @@ public class TransactionManager {
     // consequently clear this data structure as well.
     private final Set<TopicPartition> partitionsWithUnresolvedSequences;
 
-    // Keep track of the in flight batches bound for a partition, ordered by sequence. This helps us to ensure that
-    // we continue to order batches by the sequence numbers even when the responses come back out of order during
-    // leader failover. We add a batch to the queue when it is drained, and remove it when the batch completes
-    // (either successfully or through a fatal failure).
-    private final Map<TopicPartition, PriorityQueue<ProducerBatch>> inflightBatchesBySequence;
-
-    // We keep track of the last acknowledged offset on a per partition basis in order to disambiguate UnknownProducer
-    // responses which are due to the retention period elapsing, and those which are due to actual lost data.
-    private final Map<TopicPartition, Long> lastAckedOffset;
-
     private final PriorityQueue<TxnRequestHandler> pendingRequests;
     private final Set<TopicPartition> newPartitionsInTransaction;
     private final Set<TopicPartition> pendingPartitionsInTransaction;
     private final Set<TopicPartition> partitionsInTransaction;
-    private final Map<TopicPartition, CommittedOffset> pendingTxnOffsetCommits;
     private TransactionalRequestResult pendingResult;
 
     // This is used by the TxnRequestHandlers to control how long to back off before a given request is retried.
@@ -173,8 +231,6 @@ public class TransactionManager {
 
     public TransactionManager(LogContext logContext, String transactionalId, int transactionTimeoutMs, long retryBackoffMs) {
         this.producerIdAndEpoch = new ProducerIdAndEpoch(NO_PRODUCER_ID, NO_PRODUCER_EPOCH);
-        this.nextSequence = new HashMap<>();
-        this.lastAckedSequence = new HashMap<>();
         this.transactionalId = transactionalId;
         this.log = logContext.logger(TransactionManager.class);
         this.transactionTimeoutMs = transactionTimeoutMs;
@@ -183,19 +239,11 @@ public class TransactionManager {
         this.newPartitionsInTransaction = new HashSet<>();
         this.pendingPartitionsInTransaction = new HashSet<>();
         this.partitionsInTransaction = new HashSet<>();
+        this.pendingRequests = new PriorityQueue<>(10, Comparator.comparingInt(o -> o.priority().priority));
         this.pendingTxnOffsetCommits = new HashMap<>();
-        this.pendingRequests = new PriorityQueue<>(10, new Comparator<TxnRequestHandler>() {
-            @Override
-            public int compare(TxnRequestHandler o1, TxnRequestHandler o2) {
-                return Integer.compare(o1.priority().priority, o2.priority().priority);
-            }
-        });
-
         this.partitionsWithUnresolvedSequences = new HashSet<>();
-        this.inflightBatchesBySequence = new HashMap<>();
-        this.lastAckedOffset = new HashMap<>();
-
         this.retryBackoffMs = retryBackoffMs;
+        this.topicPartitionBookkeeper = new TopicPartitionBookkeeper();
     }
 
     TransactionManager() {
@@ -206,7 +254,6 @@ public class TransactionManager {
         return handleCachedTransactionRequestResult(() -> {
             transitionTo(State.INITIALIZING);
             setProducerIdAndEpoch(ProducerIdAndEpoch.NONE);
-            this.nextSequence.clear();
             InitProducerIdRequest.Builder builder = new InitProducerIdRequest.Builder(transactionalId, transactionTimeoutMs);
             InitProducerIdHandler handler = new InitProducerIdHandler(builder);
             enqueueRequest(handler);
@@ -273,6 +320,7 @@ public class TransactionManager {
             return;
 
         log.debug("Begin adding new partition {} to transaction", topicPartition);
+        topicPartitionBookkeeper.addPartition(topicPartition);
         newPartitionsInTransaction.add(topicPartition);
     }
 
@@ -280,7 +328,7 @@ public class TransactionManager {
         return lastError;
     }
 
-    public synchronized void failIfNotReadyForSend() {
+    synchronized void failIfNotReadyForSend() {
         if (hasError())
             throw new KafkaException("Cannot perform send because at least one previous transactional or " +
                     "idempotent request has failed with errors.", lastError);
@@ -401,46 +449,31 @@ public class TransactionManager {
             throw new IllegalStateException("Cannot reset producer state for a transactional producer. " +
                     "You must either abort the ongoing transaction or reinitialize the transactional producer instead");
         setProducerIdAndEpoch(ProducerIdAndEpoch.NONE);
-        this.nextSequence.clear();
-        this.lastAckedSequence.clear();
-        this.inflightBatchesBySequence.clear();
+        topicPartitionBookkeeper.reset();
         this.partitionsWithUnresolvedSequences.clear();
-        this.lastAckedOffset.clear();
     }
 
     /**
      * Returns the next sequence number to be written to the given TopicPartition.
      */
     synchronized Integer sequenceNumber(TopicPartition topicPartition) {
-        Integer currentSequenceNumber = nextSequence.get(topicPartition);
-        if (currentSequenceNumber == null) {
-            currentSequenceNumber = 0;
-            nextSequence.put(topicPartition, currentSequenceNumber);
-        }
-        return currentSequenceNumber;
+        if (!isTransactional())
+            topicPartitionBookkeeper.addPartition(topicPartition);
+
+        return topicPartitionBookkeeper.getPartition(topicPartition).nextSequence;
     }
 
     synchronized void incrementSequenceNumber(TopicPartition topicPartition, int increment) {
-        Integer currentSequenceNumber = nextSequence.get(topicPartition);
-        if (currentSequenceNumber == null)
-            throw new IllegalStateException("Attempt to increment sequence number for a partition with no current sequence.");
+        Integer currentSequence = sequenceNumber(topicPartition);
 
-        currentSequenceNumber = DefaultRecordBatch.incrementSequence(currentSequenceNumber, increment);
-        nextSequence.put(topicPartition, currentSequenceNumber);
+        currentSequence = DefaultRecordBatch.incrementSequence(currentSequence, increment);
+        topicPartitionBookkeeper.getPartition(topicPartition).nextSequence = currentSequence;
     }
 
     synchronized void addInFlightBatch(ProducerBatch batch) {
         if (!batch.hasSequence())
             throw new IllegalStateException("Can't track batch for partition " + batch.topicPartition + " when sequence is not set.");
-        if (!inflightBatchesBySequence.containsKey(batch.topicPartition)) {
-            inflightBatchesBySequence.put(batch.topicPartition, new PriorityQueue<>(5, new Comparator<ProducerBatch>() {
-                @Override
-                public int compare(ProducerBatch o1, ProducerBatch o2) {
-                    return o1.baseSequence() - o2.baseSequence();
-                }
-            }));
-        }
-        inflightBatchesBySequence.get(batch.topicPartition).offer(batch);
+        topicPartitionBookkeeper.getPartition(batch.topicPartition).inflightBatchesBySequence.offer(batch);
     }
 
     /**
@@ -451,56 +484,54 @@ public class TransactionManager {
      *         RecordBatch.NO_SEQUENCE.
      */
     synchronized int firstInFlightSequence(TopicPartition topicPartition) {
-        PriorityQueue<ProducerBatch> inFlightBatches = inflightBatchesBySequence.get(topicPartition);
-        if (inFlightBatches == null)
+        if (!hasInflightBatches(topicPartition))
             return RecordBatch.NO_SEQUENCE;
 
-        ProducerBatch firstInFlightBatch = inFlightBatches.peek();
-        if (firstInFlightBatch == null)
+        ProducerBatch first = topicPartitionBookkeeper.getPartition(topicPartition).inflightBatchesBySequence.peek();
+        if (first == null)
             return RecordBatch.NO_SEQUENCE;
 
-        return firstInFlightBatch.baseSequence();
+        return first.baseSequence();
     }
 
     synchronized ProducerBatch nextBatchBySequence(TopicPartition topicPartition) {
-        PriorityQueue<ProducerBatch> queue = inflightBatchesBySequence.get(topicPartition);
-        if (queue == null)
-            return null;
+        PriorityQueue<ProducerBatch> queue = topicPartitionBookkeeper.getPartition(topicPartition).inflightBatchesBySequence;
         return queue.peek();
     }
 
     synchronized void removeInFlightBatch(ProducerBatch batch) {
-        PriorityQueue<ProducerBatch> queue = inflightBatchesBySequence.get(batch.topicPartition);
-        if (queue == null)
-            return;
-        queue.remove(batch);
+        if (hasInflightBatches(batch.topicPartition)) {
+            PriorityQueue<ProducerBatch> queue = topicPartitionBookkeeper.getPartition(batch.topicPartition).inflightBatchesBySequence;
+            queue.remove(batch);
+        }
     }
 
     synchronized void maybeUpdateLastAckedSequence(TopicPartition topicPartition, int sequence) {
-        if (sequence > lastAckedSequence(topicPartition))
-            lastAckedSequence.put(topicPartition, sequence);
+        if (sequence > lastAckedSequence(topicPartition).orElse(NO_LAST_ACKED_SEQUENCE_NUMBER))
+            topicPartitionBookkeeper.getPartition(topicPartition).lastAckedSequence = sequence;
     }
 
-    synchronized int lastAckedSequence(TopicPartition topicPartition) {
-        Integer currentLastAckedSequence = lastAckedSequence.get(topicPartition);
-        if (currentLastAckedSequence == null)
-            return -1;
-        return currentLastAckedSequence;
+    synchronized OptionalInt lastAckedSequence(TopicPartition topicPartition) {
+        return topicPartitionBookkeeper.lastAckedSequence(topicPartition);
     }
 
-    synchronized long lastAckedOffset(TopicPartition topicPartition) {
-        Long offset = lastAckedOffset.get(topicPartition);
-        if (offset == null)
-            return ProduceResponse.INVALID_OFFSET;
-        return offset;
+    synchronized OptionalLong lastAckedOffset(TopicPartition topicPartition) {
+        return topicPartitionBookkeeper.lastAckedOffset(topicPartition);
     }
 
     synchronized void updateLastAckedOffset(ProduceResponse.PartitionResponse response, ProducerBatch batch) {
         if (response.baseOffset == ProduceResponse.INVALID_OFFSET)
             return;
         long lastOffset = response.baseOffset + batch.recordCount - 1;
-        if (lastOffset > lastAckedOffset(batch.topicPartition)) {
-            lastAckedOffset.put(batch.topicPartition, lastOffset);
+        OptionalLong lastAckedOffset = lastAckedOffset(batch.topicPartition);
+        // It might happen that the TransactionManager has been reset while a request was reenqueued and got a valid
+        // response for this. This can happen only if the producer is only idempotent (not transactional) and in
+        // this case there will be no tracked bookkeeper entry about it, so we have to insert one.
+        if (!lastAckedOffset.isPresent() && !isTransactional()) {
+            topicPartitionBookkeeper.addPartition(batch.topicPartition);
+        }
+        if (lastOffset > lastAckedOffset.orElse(ProduceResponse.INVALID_OFFSET)) {
+            topicPartitionBookkeeper.getPartition(batch.topicPartition).lastAckedOffset = lastOffset;
         } else {
             log.trace("Partition {} keeps lastOffset at {}", batch.topicPartition, lastOffset);
         }
@@ -512,7 +543,7 @@ public class TransactionManager {
     // This method must only be called when we know that the batch is question has been unequivocally failed by the broker,
     // ie. it has received a confirmed fatal status code like 'Message Too Large' or something similar.
     synchronized void adjustSequencesDueToFailedBatch(ProducerBatch batch) {
-        if (!this.nextSequence.containsKey(batch.topicPartition))
+        if (!topicPartitionBookkeeper.contains(batch.topicPartition))
             // Sequence numbers are not being tracked for this partition. This could happen if the producer id was just
             // reset due to a previous OutOfOrderSequenceException.
             return;
@@ -525,7 +556,7 @@ public class TransactionManager {
 
         setNextSequence(batch.topicPartition, currentSequence);
 
-        for (ProducerBatch inFlightBatch : inflightBatchesBySequence.get(batch.topicPartition)) {
+        for (ProducerBatch inFlightBatch : topicPartitionBookkeeper.getPartition(batch.topicPartition).inflightBatchesBySequence) {
             if (inFlightBatch.baseSequence() < batch.baseSequence())
                 continue;
             int newSequence = inFlightBatch.baseSequence() - batch.recordCount;
@@ -540,7 +571,7 @@ public class TransactionManager {
 
     private synchronized void startSequencesAtBeginning(TopicPartition topicPartition) {
         int sequence = 0;
-        for (ProducerBatch inFlightBatch : inflightBatchesBySequence.get(topicPartition)) {
+        for (ProducerBatch inFlightBatch : topicPartitionBookkeeper.getPartition(topicPartition).inflightBatchesBySequence) {
             log.info("Resetting sequence number of batch with current sequence {} for partition {} to {}",
                     inFlightBatch.baseSequence(), inFlightBatch.topicPartition, sequence);
             inFlightBatch.resetProducerState(new ProducerIdAndEpoch(inFlightBatch.producerId(),
@@ -549,11 +580,12 @@ public class TransactionManager {
             sequence += inFlightBatch.recordCount;
         }
         setNextSequence(topicPartition, sequence);
-        lastAckedSequence.remove(topicPartition);
+        topicPartitionBookkeeper.getPartition(topicPartition).lastAckedSequence = NO_LAST_ACKED_SEQUENCE_NUMBER;
     }
 
-    synchronized boolean hasInflightBatches(TopicPartition topicPartition) {
-        return inflightBatchesBySequence.containsKey(topicPartition) && !inflightBatchesBySequence.get(topicPartition).isEmpty();
+    private synchronized boolean hasInflightBatches(TopicPartition topicPartition) {
+        return topicPartitionBookkeeper.contains(topicPartition)
+                && !topicPartitionBookkeeper.getPartition(topicPartition).inflightBatchesBySequence.isEmpty();
     }
 
     synchronized boolean hasUnresolvedSequences() {
@@ -587,7 +619,8 @@ public class TransactionManager {
                 } else {
                     // We would enter this branch if all in flight batches were ultimately expired in the producer.
                     log.info("No inflight batches remaining for {}, last ack'd sequence for partition is {}, next sequence is {}. " +
-                            "Going to reset producer state.", topicPartition, lastAckedSequence(topicPartition), sequenceNumber(topicPartition));
+                            "Going to reset producer state.", topicPartition,
+                            lastAckedSequence(topicPartition).orElse(NO_LAST_ACKED_SEQUENCE_NUMBER), sequenceNumber(topicPartition));
                     return true;
                 }
             }
@@ -595,15 +628,12 @@ public class TransactionManager {
         return false;
     }
 
-    synchronized boolean isNextSequence(TopicPartition topicPartition, int sequence) {
-        return sequence - lastAckedSequence(topicPartition) == 1;
+    private synchronized boolean isNextSequence(TopicPartition topicPartition, int sequence) {
+        return sequence - lastAckedSequence(topicPartition).orElse(NO_LAST_ACKED_SEQUENCE_NUMBER) == 1;
     }
 
     private synchronized void setNextSequence(TopicPartition topicPartition, int sequence) {
-        if (!nextSequence.containsKey(topicPartition) && sequence != 0)
-            throw new IllegalStateException("Trying to set the sequence number for " + topicPartition + " to " + sequence +
-            ", but the sequence number was never set for this partition.");
-        nextSequence.put(topicPartition, sequence);
+        topicPartitionBookkeeper.getPartition(topicPartition).nextSequence = sequence;
     }
 
     synchronized TxnRequestHandler nextRequestHandler(boolean hasIncompleteBatches) {
@@ -670,7 +700,7 @@ public class TransactionManager {
         inFlightRequestCorrelationId = correlationId;
     }
 
-    void clearInFlightTransactionalRequestCorrelationId() {
+    private void clearInFlightCorrelationId() {
         inFlightRequestCorrelationId = NO_INFLIGHT_REQUEST_CORRELATION_ID;
     }
 
@@ -737,7 +767,7 @@ public class TransactionManager {
                 // come back from the broker, they would also come with an UNKNOWN_PRODUCER_ID error. In this case, we should not
                 // reset the sequence numbers to the beginning.
                 return true;
-            } else if (lastAckedOffset(batch.topicPartition) < response.logStartOffset) {
+            } else if (lastAckedOffset(batch.topicPartition).orElse(NO_LAST_ACKED_SEQUENCE_NUMBER) < response.logStartOffset) {
                 // The head of the log has been removed, probably due to the retention time elapsing. In this case,
                 // we expect to lose the producer state. Reset the sequences of all inflight batches to be from the beginning
                 // and retry them.
@@ -915,7 +945,7 @@ public class TransactionManager {
             if (response.requestHeader().correlationId() != inFlightRequestCorrelationId) {
                 fatalError(new RuntimeException("Detected more than one in-flight transactional request."));
             } else {
-                clearInFlightTransactionalRequestCorrelationId();
+                clearInFlightCorrelationId();
                 if (response.wasDisconnected()) {
                     log.debug("Disconnected from {}. Will retry.", response.destination());
                     if (this.needsCoordinator())
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 1fae14d..8d599c9 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -26,6 +26,8 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -589,14 +591,14 @@ public class SenderTest {
         Node node = new Node(Integer.valueOf(nodeId), "localhost", 0);
         assertEquals(1, client.inFlightRequestCount());
         assertEquals(1, transactionManager.sequenceNumber(tp0).longValue());
-        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+        assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
 
         // Send second ProduceRequest
         Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
         sender.run(time.milliseconds());
         assertEquals(2, client.inFlightRequestCount());
         assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
-        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+        assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
         assertFalse(request1.isDone());
         assertFalse(request2.isDone());
         assertTrue(client.isReady(node, time.milliseconds()));
@@ -606,14 +608,14 @@ public class SenderTest {
         sender.run(time.milliseconds()); // receive response 0
 
         assertEquals(1, client.inFlightRequestCount());
-        assertEquals(0, transactionManager.lastAckedSequence(tp0));
+        assertEquals(OptionalInt.of(0), transactionManager.lastAckedSequence(tp0));
         assertTrue(request1.isDone());
         assertEquals(0, request1.get().offset());
         assertFalse(request2.isDone());
 
         sendIdempotentProducerResponse(1, tp0, Errors.NONE, 1L);
         sender.run(time.milliseconds()); // receive response 1
-        assertEquals(1, transactionManager.lastAckedSequence(tp0));
+        assertEquals(OptionalInt.of(1), transactionManager.lastAckedSequence(tp0));
         assertFalse(client.hasInFlightRequests());
         assertEquals(0, sender.inFlightBatches(tp0).size());
         assertTrue(request2.isDone());
@@ -639,7 +641,7 @@ public class SenderTest {
         Node node = new Node(Integer.valueOf(nodeId), "localhost", 0);
         assertEquals(1, client.inFlightRequestCount());
         assertEquals(1, transactionManager.sequenceNumber(tp0).longValue());
-        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+        assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
 
         // Send second ProduceRequest
         Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
@@ -651,7 +653,7 @@ public class SenderTest {
 
         assertEquals(3, client.inFlightRequestCount());
         assertEquals(3, transactionManager.sequenceNumber(tp0).longValue());
-        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+        assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
         assertFalse(request1.isDone());
         assertFalse(request2.isDone());
         assertFalse(request3.isDone());
@@ -664,7 +666,7 @@ public class SenderTest {
         Future<RecordMetadata> request4 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
 
         assertEquals(2, client.inFlightRequestCount());
-        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+        assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
 
         sendIdempotentProducerResponse(1, tp0, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, -1L);
         sender.run(time.milliseconds()); // re send request 1, receive response 2
@@ -672,7 +674,7 @@ public class SenderTest {
         sendIdempotentProducerResponse(2, tp0, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, -1L);
         sender.run(time.milliseconds()); // receive response 3
 
-        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+        assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
         assertEquals(1, client.inFlightRequestCount());
 
         sender.run(time.milliseconds()); // Do nothing, we are reduced to one in flight request during retries.
@@ -680,11 +682,11 @@ public class SenderTest {
         assertEquals(3, transactionManager.sequenceNumber(tp0).longValue());  // the batch for request 4 shouldn't have been drained, and hence the sequence should not have been incremented.
         assertEquals(1, client.inFlightRequestCount());
 
-        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+        assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
 
         sendIdempotentProducerResponse(0, tp0, Errors.NONE, 0L);
         sender.run(time.milliseconds());  // receive response 1
-        assertEquals(0, transactionManager.lastAckedSequence(tp0));
+        assertEquals(OptionalInt.of(0), transactionManager.lastAckedSequence(tp0));
         assertTrue(request1.isDone());
         assertEquals(0, request1.get().offset());
         assertFalse(client.hasInFlightRequests());
@@ -696,7 +698,7 @@ public class SenderTest {
 
         sendIdempotentProducerResponse(1, tp0, Errors.NONE, 1L);
         sender.run(time.milliseconds());  // receive response 2
-        assertEquals(1, transactionManager.lastAckedSequence(tp0));
+        assertEquals(OptionalInt.of(1), transactionManager.lastAckedSequence(tp0));
         assertTrue(request2.isDone());
         assertEquals(1, request2.get().offset());
 
@@ -709,7 +711,7 @@ public class SenderTest {
 
         sendIdempotentProducerResponse(2, tp0, Errors.NONE, 2L);
         sender.run(time.milliseconds());  // receive response 3, send request 4 since we are out of 'retry' mode.
-        assertEquals(2, transactionManager.lastAckedSequence(tp0));
+        assertEquals(OptionalInt.of(2), transactionManager.lastAckedSequence(tp0));
         assertTrue(request3.isDone());
         assertEquals(2, request3.get().offset());
         assertEquals(1, client.inFlightRequestCount());
@@ -717,7 +719,7 @@ public class SenderTest {
 
         sendIdempotentProducerResponse(3, tp0, Errors.NONE, 3L);
         sender.run(time.milliseconds());  // receive response 4
-        assertEquals(3, transactionManager.lastAckedSequence(tp0));
+        assertEquals(OptionalInt.of(3), transactionManager.lastAckedSequence(tp0));
         assertTrue(request4.isDone());
         assertEquals(3, request4.get().offset());
     }
@@ -739,14 +741,14 @@ public class SenderTest {
         Node node = new Node(Integer.valueOf(nodeId), "localhost", 0);
         assertEquals(1, client.inFlightRequestCount());
         assertEquals(1, transactionManager.sequenceNumber(tp0).longValue());
-        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+        assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
 
         // Send second ProduceRequest
         Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
         sender.run(time.milliseconds());
         assertEquals(2, client.inFlightRequestCount());
         assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
-        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+        assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
         assertFalse(request1.isDone());
         assertFalse(request2.isDone());
         assertTrue(client.isReady(node, time.milliseconds()));
@@ -757,24 +759,24 @@ public class SenderTest {
         assertFutureFailure(request1, RecordTooLargeException.class);
 
         assertEquals(1, client.inFlightRequestCount());
-        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+        assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
 
         sendIdempotentProducerResponse(1, tp0, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, -1L);
 
         sender.run(time.milliseconds()); // receive response 1
 
-        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+        assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
         assertEquals(0, client.inFlightRequestCount());
 
         sender.run(time.milliseconds()); // resend request 1
 
         assertEquals(1, client.inFlightRequestCount());
 
-        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+        assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
 
         sendIdempotentProducerResponse(0, tp0, Errors.NONE, 0L);
         sender.run(time.milliseconds());  // receive response 1
-        assertEquals(0, transactionManager.lastAckedSequence(tp0));
+        assertEquals(OptionalInt.of(0), transactionManager.lastAckedSequence(tp0));
         assertEquals(0, client.inFlightRequestCount());
 
         assertTrue(request1.isDone());
@@ -801,7 +803,7 @@ public class SenderTest {
 
         // make sure the next sequence number accounts for multi-message batches.
         assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
-        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+        assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
         sendIdempotentProducerResponse(0, tp0, Errors.NONE, 0);
 
         sender.run(time.milliseconds());
@@ -811,7 +813,7 @@ public class SenderTest {
         sender.run(time.milliseconds());
         assertEquals(1, client.inFlightRequestCount());
         assertEquals(3, transactionManager.sequenceNumber(tp0).longValue());
-        assertEquals(1, transactionManager.lastAckedSequence(tp0));
+        assertEquals(OptionalInt.of(1), transactionManager.lastAckedSequence(tp0));
         assertTrue(request1.isDone());
         assertEquals(0, request1.get().offset());
         assertFalse(request2.isDone());
@@ -840,14 +842,14 @@ public class SenderTest {
         Node node = new Node(Integer.valueOf(nodeId), "localhost", 0);
         assertEquals(1, client.inFlightRequestCount());
         assertEquals(1, transactionManager.sequenceNumber(tp0).longValue());
-        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+        assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
 
         // Send second ProduceRequest
         Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
         sender.run(time.milliseconds());
         assertEquals(2, client.inFlightRequestCount());
         assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
-        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+        assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
         assertFalse(request1.isDone());
         assertFalse(request2.isDone());
         assertTrue(client.isReady(node, time.milliseconds()));
@@ -864,7 +866,7 @@ public class SenderTest {
         assertEquals(1, queuedBatches.size());
         assertEquals(1, queuedBatches.peekFirst().baseSequence());
         assertEquals(1, client.inFlightRequestCount());
-        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+        assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
 
         client.respondToRequest(firstClientRequest, produceResponse(tp0, -1, Errors.NOT_LEADER_FOR_PARTITION, -1));
 
@@ -874,7 +876,7 @@ public class SenderTest {
         assertEquals(2, queuedBatches.size());
         assertEquals(0, queuedBatches.peekFirst().baseSequence());
         assertEquals(1, queuedBatches.peekLast().baseSequence());
-        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+        assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
         assertEquals(0, client.inFlightRequestCount());
         assertFalse(request1.isDone());
         assertFalse(request2.isDone());
@@ -884,12 +886,12 @@ public class SenderTest {
         sender.run(time.milliseconds()); // don't do anything, only one inflight allowed once we are retrying.
 
         assertEquals(1, client.inFlightRequestCount());
-        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+        assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
 
         // Make sure that the requests are sent in order, even though the previous responses were not in order.
         sendIdempotentProducerResponse(0, tp0, Errors.NONE, 0L);
         sender.run(time.milliseconds());  // receive response 0
-        assertEquals(0, transactionManager.lastAckedSequence(tp0));
+        assertEquals(OptionalInt.of(0), transactionManager.lastAckedSequence(tp0));
         assertEquals(0, client.inFlightRequestCount());
         assertTrue(request1.isDone());
         assertEquals(0, request1.get().offset());
@@ -900,7 +902,7 @@ public class SenderTest {
         sender.run(time.milliseconds());  // receive response 1
 
         assertFalse(client.hasInFlightRequests());
-        assertEquals(1, transactionManager.lastAckedSequence(tp0));
+        assertEquals(OptionalInt.of(1), transactionManager.lastAckedSequence(tp0));
         assertTrue(request2.isDone());
         assertEquals(1, request2.get().offset());
     }
@@ -943,7 +945,7 @@ public class SenderTest {
 
         assertEquals(0, queuedBatches.size());
         assertEquals(1, client.inFlightRequestCount());
-        assertEquals(1, transactionManager.lastAckedSequence(tp0));
+        assertEquals(OptionalInt.of(1), transactionManager.lastAckedSequence(tp0));
 
         client.respondToRequest(firstClientRequest, produceResponse(tp0, -1, Errors.REQUEST_TIMED_OUT, -1));
 
@@ -952,20 +954,20 @@ public class SenderTest {
         // Make sure we requeued both batches in the correct order.
         assertEquals(1, queuedBatches.size());
         assertEquals(0, queuedBatches.peekFirst().baseSequence());
-        assertEquals(1, transactionManager.lastAckedSequence(tp0));
+        assertEquals(OptionalInt.of(1), transactionManager.lastAckedSequence(tp0));
         assertEquals(0, client.inFlightRequestCount());
 
         sender.run(time.milliseconds()); // resend request 0
         assertEquals(1, client.inFlightRequestCount());
 
         assertEquals(1, client.inFlightRequestCount());
-        assertEquals(1, transactionManager.lastAckedSequence(tp0));
+        assertEquals(OptionalInt.of(1), transactionManager.lastAckedSequence(tp0));
 
         // Make sure we handle the out of order successful responses correctly.
         sendIdempotentProducerResponse(0, tp0, Errors.NONE, 0L);
         sender.run(time.milliseconds());  // receive response 0
         assertEquals(0, queuedBatches.size());
-        assertEquals(1, transactionManager.lastAckedSequence(tp0));
+        assertEquals(OptionalInt.of(1), transactionManager.lastAckedSequence(tp0));
         assertEquals(0, client.inFlightRequestCount());
 
         assertFalse(client.hasInFlightRequests());
@@ -1169,8 +1171,7 @@ public class SenderTest {
                 "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
         Future<RecordMetadata> successfulResponse = accumulator.append(tp1, time.milliseconds(), "key".getBytes(),
                 "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
-        sender.run(time.milliseconds());  // connect.
-        sender.run(time.milliseconds());  // send.
+        sender.run(time.milliseconds());  // connect and send.
 
         assertEquals(1, client.inFlightRequestCount());
 
@@ -1181,9 +1182,8 @@ public class SenderTest {
         sender.run(time.milliseconds());
         assertTrue(failedResponse.isDone());
         assertFalse("Expected transaction state to be reset upon receiving an OutOfOrderSequenceException", transactionManager.hasProducerId());
-        prepareAndReceiveInitProducerId(producerId + 1, Errors.NONE);
+        prepareAndReceiveInitProducerId(producerId + 1, Errors.NONE); // also send request to tp1
         assertEquals(producerId + 1, transactionManager.producerIdAndEpoch().producerId);
-        sender.run(time.milliseconds());  // send request to tp1
 
         assertFalse(successfulResponse.isDone());
         client.respond(produceResponse(tp1, 10, Errors.NONE, -1));
@@ -1263,14 +1263,14 @@ public class SenderTest {
         Node node = new Node(Integer.valueOf(nodeId), "localhost", 0);
         assertEquals(1, client.inFlightRequestCount());
         assertEquals(1, transactionManager.sequenceNumber(tp0).longValue());
-        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+        assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
 
         // Send second ProduceRequest
         Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
         sender.run(time.milliseconds());
         assertEquals(2, client.inFlightRequestCount());
         assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
-        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+        assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
         assertFalse(request1.isDone());
         assertFalse(request2.isDone());
         assertTrue(client.isReady(node, time.milliseconds()));
@@ -1282,16 +1282,16 @@ public class SenderTest {
 
         sender.run(time.milliseconds()); // receive response 1
 
-        assertEquals(1000, transactionManager.lastAckedOffset(tp0));
-        assertEquals(1, transactionManager.lastAckedSequence(tp0));
+        assertEquals(OptionalLong.of(1000), transactionManager.lastAckedOffset(tp0));
+        assertEquals(OptionalInt.of(1), transactionManager.lastAckedSequence(tp0));
 
         client.respondToRequest(firstClientRequest, produceResponse(tp0, ProduceResponse.INVALID_OFFSET, Errors.DUPLICATE_SEQUENCE_NUMBER, 0));
 
         sender.run(time.milliseconds()); // receive response 0
 
         // Make sure that the last ack'd sequence doesn't change.
-        assertEquals(1, transactionManager.lastAckedSequence(tp0));
-        assertEquals(1000, transactionManager.lastAckedOffset(tp0));
+        assertEquals(OptionalInt.of(1), transactionManager.lastAckedSequence(tp0));
+        assertEquals(OptionalLong.of(1000), transactionManager.lastAckedOffset(tp0));
         assertFalse(client.hasInFlightRequests());
 
         RecordMetadata unknownMetadata = request1.get();
@@ -1315,7 +1315,7 @@ public class SenderTest {
 
         assertEquals(1, client.inFlightRequestCount());
         assertEquals(1, transactionManager.sequenceNumber(tp0).longValue());
-        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+        assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
 
         sendIdempotentProducerResponse(0, tp0, Errors.NONE, 1000L, 10L);
 
@@ -1323,15 +1323,15 @@ public class SenderTest {
 
         assertTrue(request1.isDone());
         assertEquals(1000L, request1.get().offset());
-        assertEquals(0L, transactionManager.lastAckedSequence(tp0));
-        assertEquals(1000L, transactionManager.lastAckedOffset(tp0));
+        assertEquals(OptionalInt.of(0), transactionManager.lastAckedSequence(tp0));
+        assertEquals(OptionalLong.of(1000L), transactionManager.lastAckedOffset(tp0));
 
         // Send second ProduceRequest, a single batch with 2 records.
         accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT);
         Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
         sender.run(time.milliseconds());
         assertEquals(3, transactionManager.sequenceNumber(tp0).longValue());
-        assertEquals(0, transactionManager.lastAckedSequence(tp0));
+        assertEquals(OptionalInt.of(0), transactionManager.lastAckedSequence(tp0));
 
         assertFalse(request2.isDone());
 
@@ -1339,7 +1339,7 @@ public class SenderTest {
         sender.run(time.milliseconds()); // receive response 0, should be retried since the logStartOffset > lastAckedOffset.
 
         // We should have reset the sequence number state of the partition because the state was lost on the broker.
-        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+        assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
         assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
         assertFalse(request2.isDone());
         assertFalse(client.hasInFlightRequests());
@@ -1349,12 +1349,12 @@ public class SenderTest {
         // resend the request. Note that the expected sequence is 0, since we have lost producer state on the broker.
         sendIdempotentProducerResponse(0, tp0, Errors.NONE, 1011L, 1010L);
         sender.run(time.milliseconds()); // receive response 1
-        assertEquals(1, transactionManager.lastAckedSequence(tp0));
+        assertEquals(OptionalInt.of(1), transactionManager.lastAckedSequence(tp0));
         assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
         assertFalse(client.hasInFlightRequests());
         assertTrue(request2.isDone());
         assertEquals(1012L, request2.get().offset());
-        assertEquals(1012L, transactionManager.lastAckedOffset(tp0));
+        assertEquals(OptionalLong.of(1012L), transactionManager.lastAckedOffset(tp0));
     }
 
     @Test
@@ -1373,7 +1373,7 @@ public class SenderTest {
 
         assertEquals(1, client.inFlightRequestCount());
         assertEquals(1, transactionManager.sequenceNumber(tp0).longValue());
-        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+        assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
 
         sendIdempotentProducerResponse(0, tp0, Errors.NONE, 1000L, 10L);
 
@@ -1381,14 +1381,14 @@ public class SenderTest {
 
         assertTrue(request1.isDone());
         assertEquals(1000L, request1.get().offset());
-        assertEquals(0L, transactionManager.lastAckedSequence(tp0));
-        assertEquals(1000L, transactionManager.lastAckedOffset(tp0));
+        assertEquals(OptionalInt.of(0), transactionManager.lastAckedSequence(tp0));
+        assertEquals(OptionalLong.of(1000L), transactionManager.lastAckedOffset(tp0));
 
         // Send second ProduceRequest
         Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
         sender.run(time.milliseconds());
         assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
-        assertEquals(0, transactionManager.lastAckedSequence(tp0));
+        assertEquals(OptionalInt.of(0), transactionManager.lastAckedSequence(tp0));
 
         assertFalse(request2.isDone());
 
@@ -1396,7 +1396,7 @@ public class SenderTest {
         sender.run(time.milliseconds()); // receive response 0, should be retried without resetting the sequence numbers since the log start offset is unknown.
 
         // We should have reset the sequence number state of the partition because the state was lost on the broker.
-        assertEquals(0, transactionManager.lastAckedSequence(tp0));
+        assertEquals(OptionalInt.of(0), transactionManager.lastAckedSequence(tp0));
         assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
         assertFalse(request2.isDone());
         assertFalse(client.hasInFlightRequests());
@@ -1407,12 +1407,12 @@ public class SenderTest {
         // response and hence we didn't reset the sequence numbers.
         sendIdempotentProducerResponse(1, tp0, Errors.NONE, 1011L, 1010L);
         sender.run(time.milliseconds()); // receive response 1
-        assertEquals(1, transactionManager.lastAckedSequence(tp0));
+        assertEquals(OptionalInt.of(1), transactionManager.lastAckedSequence(tp0));
         assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
         assertFalse(client.hasInFlightRequests());
         assertTrue(request2.isDone());
         assertEquals(1011L, request2.get().offset());
-        assertEquals(1011L, transactionManager.lastAckedOffset(tp0));
+        assertEquals(OptionalLong.of(1011L), transactionManager.lastAckedOffset(tp0));
     }
 
     @Test
@@ -1431,7 +1431,7 @@ public class SenderTest {
 
         assertEquals(1, client.inFlightRequestCount());
         assertEquals(1, transactionManager.sequenceNumber(tp0).longValue());
-        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+        assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
 
         sendIdempotentProducerResponse(0, tp0, Errors.NONE, 1000L, 10L);
 
@@ -1439,21 +1439,21 @@ public class SenderTest {
 
         assertTrue(request1.isDone());
         assertEquals(1000L, request1.get().offset());
-        assertEquals(0L, transactionManager.lastAckedSequence(tp0));
-        assertEquals(1000L, transactionManager.lastAckedOffset(tp0));
+        assertEquals(OptionalInt.of(0), transactionManager.lastAckedSequence(tp0));
+        assertEquals(OptionalLong.of(1000L), transactionManager.lastAckedOffset(tp0));
 
         // Send second ProduceRequest
         Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
         sender.run(time.milliseconds());
         assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
-        assertEquals(0, transactionManager.lastAckedSequence(tp0));
+        assertEquals(OptionalInt.of(0), transactionManager.lastAckedSequence(tp0));
 
         // Send the third ProduceRequest, in parallel with the second. It should be retried even though the
         // lastAckedOffset > logStartOffset when its UnknownProducerResponse comes back.
         Future<RecordMetadata> request3 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
         sender.run(time.milliseconds());
         assertEquals(3, transactionManager.sequenceNumber(tp0).longValue());
-        assertEquals(0, transactionManager.lastAckedSequence(tp0));
+        assertEquals(OptionalInt.of(0), transactionManager.lastAckedSequence(tp0));
 
         assertFalse(request2.isDone());
         assertFalse(request3.isDone());
@@ -1464,7 +1464,7 @@ public class SenderTest {
         sender.run(time.milliseconds()); // receive response 2, should reset the sequence numbers and be retried.
 
         // We should have reset the sequence number state of the partition because the state was lost on the broker.
-        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+        assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
         assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
         assertFalse(request2.isDone());
         assertFalse(request3.isDone());
@@ -1479,7 +1479,7 @@ public class SenderTest {
         sender.run(time.milliseconds()); // receive response 3
 
         assertEquals(1, client.inFlightRequestCount());
-        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+        assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
         assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
 
         sendIdempotentProducerResponse(0, tp0, Errors.NONE, 1011L, 1010L);
@@ -1488,9 +1488,9 @@ public class SenderTest {
         assertTrue(request2.isDone());
         assertFalse(request3.isDone());
         assertFalse(client.hasInFlightRequests());
-        assertEquals(0, transactionManager.lastAckedSequence(tp0));
+        assertEquals(OptionalInt.of(0), transactionManager.lastAckedSequence(tp0));
         assertEquals(1011L, request2.get().offset());
-        assertEquals(1011L, transactionManager.lastAckedOffset(tp0));
+        assertEquals(OptionalLong.of(1011L), transactionManager.lastAckedOffset(tp0));
 
         sender.run(time.milliseconds());  // resend request 3.
         assertEquals(1, client.inFlightRequestCount());
@@ -1501,7 +1501,7 @@ public class SenderTest {
         assertFalse(client.hasInFlightRequests());
         assertTrue(request3.isDone());
         assertEquals(1012L, request3.get().offset());
-        assertEquals(1012L, transactionManager.lastAckedOffset(tp0));
+        assertEquals(OptionalLong.of(1012L), transactionManager.lastAckedOffset(tp0));
     }
 
     @Test
@@ -1520,7 +1520,7 @@ public class SenderTest {
 
         assertEquals(1, client.inFlightRequestCount());
         assertEquals(1, transactionManager.sequenceNumber(tp0).longValue());
-        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+        assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
 
         sendIdempotentProducerResponse(0, tp0, Errors.NONE, 1000L, 10L);
 
@@ -1528,14 +1528,14 @@ public class SenderTest {
 
         assertTrue(request1.isDone());
         assertEquals(1000L, request1.get().offset());
-        assertEquals(0L, transactionManager.lastAckedSequence(tp0));
-        assertEquals(1000L, transactionManager.lastAckedOffset(tp0));
+        assertEquals(OptionalInt.of(0), transactionManager.lastAckedSequence(tp0));
+        assertEquals(OptionalLong.of(1000L), transactionManager.lastAckedOffset(tp0));
 
         // Send second ProduceRequest,
         Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
         sender.run(time.milliseconds());
         assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
-        assertEquals(0, transactionManager.lastAckedSequence(tp0));
+        assertEquals(OptionalInt.of(0), transactionManager.lastAckedSequence(tp0));
 
         assertFalse(request2.isDone());
 
@@ -1724,7 +1724,7 @@ public class SenderTest {
 
         sender.run(time.milliseconds());  // receive response
         assertTrue(responseFuture.isDone());
-        assertEquals(0L, (long) transactionManager.lastAckedSequence(tp0));
+        assertEquals(OptionalInt.of(0), transactionManager.lastAckedSequence(tp0));
         assertEquals(1L, (long) transactionManager.sequenceNumber(tp0));
     }
 
@@ -1883,7 +1883,7 @@ public class SenderTest {
             sender.run(time.milliseconds()); // receive
             assertTrue("The future should have been done.", f1.isDone());
             assertEquals("The next sequence number should still be 2", 2, txnManager.sequenceNumber(tp).longValue());
-            assertEquals("The last ack'd sequence number should be 0", 0, txnManager.lastAckedSequence(tp));
+            assertEquals("The last ack'd sequence number should be 0", OptionalInt.of(0), txnManager.lastAckedSequence(tp));
             assertFalse("The future shouldn't have been done.", f2.isDone());
             assertEquals("Offset of the first message should be 0", 0L, f1.get().offset());
             sender.run(time.milliseconds()); // send the seconcd produce request
@@ -1900,7 +1900,7 @@ public class SenderTest {
             sender.run(time.milliseconds()); // receive
             assertTrue("The future should have been done.", f2.isDone());
             assertEquals("The next sequence number should be 2", 2, txnManager.sequenceNumber(tp).longValue());
-            assertEquals("The last ack'd sequence number should be 1", 1, txnManager.lastAckedSequence(tp));
+            assertEquals("The last ack'd sequence number should be 1", OptionalInt.of(1), txnManager.lastAckedSequence(tp));
             assertEquals("Offset of the first message should be 1", 1L, f2.get().offset());
             assertTrue("There should be no batch in the accumulator", accumulator.batches().get(tp).isEmpty());
             assertTrue("There should be a split", (Double) (m.metrics().get(senderMetrics.batchSplitRate).metricValue()) > 0);
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 7335419..b476961 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -563,12 +563,6 @@ public class TransactionManagerTest {
         assertFalse(transactionManager.isSendToPartitionAllowed(tp0));
     }
 
-    @Test(expected = IllegalStateException.class)
-    public void testInvalidSequenceIncrement() {
-        TransactionManager transactionManager = new TransactionManager();
-        transactionManager.incrementSequenceNumber(tp0, 3333);
-    }
-
     @Test
     public void testDefaultSequenceNumber() {
         TransactionManager transactionManager = new TransactionManager();


Mime
View raw message