kafka-commits mailing list archives

From: ij...@apache.org
Subject: kafka git commit: MINOR: Some cleanups in the transactional producer
Date: Tue, 02 May 2017 00:11:49 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 1f2451d4e -> 67f1f4d27


MINOR: Some cleanups in the transactional producer

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Apurva Mehta <apurva@confluent.io>, Ismael Juma <ismael@juma.me.uk>

Closes #2933 from hachikuji/minor-transactional-client-cleanup


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/67f1f4d2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/67f1f4d2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/67f1f4d2

Branch: refs/heads/trunk
Commit: 67f1f4d270c279480cea5e6120ca157637910cba
Parents: 1f2451d
Author: Jason Gustafson <jason@confluent.io>
Authored: Tue May 2 01:09:13 2017 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Tue May 2 01:10:40 2017 +0100

----------------------------------------------------------------------
 .../kafka/clients/producer/KafkaProducer.java   |  18 +-
 .../internals/FutureTransactionalResult.java    |  64 --
 .../clients/producer/internals/PidAndEpoch.java |  36 +
 .../producer/internals/ProducerBatch.java       |   2 +-
 .../producer/internals/RecordAccumulator.java   |   2 +-
 .../clients/producer/internals/Sender.java      |  48 +-
 .../producer/internals/TransactionManager.java  | 651 +++++++++----------
 .../internals/TransactionalRequestResult.java   |  15 +-
 .../common/requests/AddOffsetsToTxnRequest.java |   4 +
 .../common/requests/FindCoordinatorRequest.java |   8 +
 .../common/requests/TxnOffsetCommitRequest.java |   4 +
 .../clients/producer/internals/SenderTest.java  |   8 +-
 .../internals/TransactionManagerTest.java       |  45 +-
 13 files changed, 443 insertions(+), 462 deletions(-)
----------------------------------------------------------------------
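
For context, the following is a minimal, illustrative sketch (not part of this commit) of the
public transactional producer API that these internals back. The broker address, transactional.id,
topic name and serializer settings are assumptions. Each blocking call below enqueues one of the
TxnRequestHandler implementations introduced in TransactionManager, wakes the Sender, and waits on
TransactionalRequestResult.await(), as the KafkaProducer changes below show.

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.KafkaException;
    import org.apache.kafka.common.errors.ProducerFencedException;

    public class TransactionalProducerExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");          // assumed broker address
            props.put("transactional.id", "example-transactional-id"); // hypothetical id
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            KafkaProducer<String, String> producer = new KafkaProducer<>(props);
            // Enqueues the init-PID request, wakes the Sender and blocks until the
            // producer id and epoch have been assigned.
            producer.initTransactions();
            try {
                producer.beginTransaction();
                producer.send(new ProducerRecord<>("example-topic", "key", "value"));
                // Enqueues an EndTxn request; the Sender flushes any unflushed batches
                // before it is actually sent.
                producer.commitTransaction();
            } catch (ProducerFencedException e) {
                // A newer producer with the same transactional.id is active; this instance is unusable.
            } catch (KafkaException e) {
                // Abort so the transaction is rolled back; the producer can then retry.
                producer.abortTransaction();
            } finally {
                producer.close();
            }
        }
    }

The order in which the corresponding handlers are drained is governed by the Priority enum added in
TransactionManager: FindCoordinator first, then InitProducerId, then AddPartitions/AddOffsets, and
EndTxn last.
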


http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1f4d2/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index deca51f..286387b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -21,11 +21,11 @@ import org.apache.kafka.clients.ClientUtils;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.clients.producer.internals.FutureTransactionalResult;
 import org.apache.kafka.clients.producer.internals.ProducerInterceptors;
 import org.apache.kafka.clients.producer.internals.RecordAccumulator;
 import org.apache.kafka.clients.producer.internals.Sender;
 import org.apache.kafka.clients.producer.internals.TransactionManager;
+import org.apache.kafka.clients.producer.internals.TransactionalRequestResult;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Metric;
@@ -484,7 +484,9 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     public void initTransactions() {
         if (transactionManager == null)
             throw new IllegalStateException("Cannot call initTransactions without setting a transactional id.");
-        transactionManager.initializeTransactions().get();
+        TransactionalRequestResult result = transactionManager.initializeTransactions();
+        sender.wakeup();
+        result.await();
     }
 
     /**
@@ -515,9 +517,9 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                                          String consumerGroupId) throws ProducerFencedException {
         if (transactionManager == null)
             throw new IllegalStateException("Cannot send offsets to transaction since transactions are not enabled.");
-        FutureTransactionalResult result = transactionManager.sendOffsetsToTransaction(offsets, consumerGroupId);
+        TransactionalRequestResult result = transactionManager.sendOffsetsToTransaction(offsets, consumerGroupId);
         sender.wakeup();
-        result.get();
+        result.await();
     }
 
     /**
@@ -529,9 +531,9 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     public void commitTransaction() throws ProducerFencedException {
         if (transactionManager == null)
             throw new IllegalStateException("Cannot commit transaction since transactions are not enabled");
-        FutureTransactionalResult result = transactionManager.beginCommittingTransaction();
+        TransactionalRequestResult result = transactionManager.beginCommittingTransaction();
         sender.wakeup();
-        result.get();
+        result.await();
     }
 
     /**
@@ -543,9 +545,9 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     public void abortTransaction() throws ProducerFencedException {
         if (transactionManager == null)
             throw new IllegalStateException("Cannot abort transaction since transactions are not enabled.");
-        FutureTransactionalResult result = transactionManager.beginAbortingTransaction();
+        TransactionalRequestResult result = transactionManager.beginAbortingTransaction();
         sender.wakeup();
-        result.get();
+        result.await();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1f4d2/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureTransactionalResult.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureTransactionalResult.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureTransactionalResult.java
deleted file mode 100644
index d05bc6a..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureTransactionalResult.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.clients.producer.internals;
-
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-public final class FutureTransactionalResult implements Future<TransactionalRequestResult> {
-
-    private final TransactionalRequestResult result;
-
-    public FutureTransactionalResult(TransactionalRequestResult result) {
-        this.result = result;
-    }
-
-    @Override
-    public boolean isDone() {
-        return this.result.isCompleted();
-    }
-
-    @Override
-    public boolean isCancelled() {
-        return false;
-    }
-
-    @Override
-    public boolean cancel(boolean mayInterruptIfRunning) {
-        return false;
-    }
-
-    @Override
-    public TransactionalRequestResult get() {
-        this.result.await();
-        if (!result.isSuccessful()) {
-            throw result.error();
-        }
-        return result;
-    }
-
-    @Override
-    public TransactionalRequestResult get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
-        boolean occurred = this.result.await(timeout, unit);
-        if (!occurred) {
-            throw new TimeoutException("Could not complete transactional operation within " + TimeUnit.MILLISECONDS.convert(timeout, unit) + "ms.");
-        }
-        return result;
-    }
-
-}
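
The removal above is possible because the Future wrapper never supported cancellation (cancel() and
isCancelled() always return false), so callers now block on TransactionalRequestResult.await()
directly. As a rough illustration only, a latch-backed result exposing the same surface used in this
diff (done(), setError(), isSuccessful(), isCompleted(), await()) might look like the sketch below;
this is an assumption about the implementation style, not the actual TransactionalRequestResult
class from this commit.

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    // Hypothetical latch-backed result; method names mirror the diff, the body is assumed.
    final class LatchBackedResult {
        private final CountDownLatch latch = new CountDownLatch(1);
        private volatile RuntimeException error;

        void setError(RuntimeException error) {
            this.error = error;
        }

        void done() {
            latch.countDown();
        }

        boolean isCompleted() {
            return latch.getCount() == 0;
        }

        boolean isSuccessful() {
            return error == null;
        }

        // Block until done() is called, then rethrow any error set by the Sender thread.
        void await() {
            try {
                latch.await();
                if (!isSuccessful())
                    throw error;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Interrupted while awaiting transactional result", e);
            }
        }

        boolean await(long timeout, TimeUnit unit) throws InterruptedException {
            return latch.await(timeout, unit);
        }
    }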

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1f4d2/clients/src/main/java/org/apache/kafka/clients/producer/internals/PidAndEpoch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/PidAndEpoch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/PidAndEpoch.java
new file mode 100644
index 0000000..8647a7b
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/PidAndEpoch.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import static org.apache.kafka.common.record.RecordBatch.NO_PRODUCER_EPOCH;
+import static org.apache.kafka.common.record.RecordBatch.NO_PRODUCER_ID;
+
+class PidAndEpoch {
+    static final PidAndEpoch NONE = new PidAndEpoch(NO_PRODUCER_ID, NO_PRODUCER_EPOCH);
+
+    public final long producerId;
+    public final short epoch;
+
+    PidAndEpoch(long producerId, short epoch) {
+        this.producerId = producerId;
+        this.epoch = epoch;
+    }
+
+    public boolean isValid() {
+        return NO_PRODUCER_ID < producerId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1f4d2/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
index 6d5ca15..f5fe8e6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
@@ -231,7 +231,7 @@ public final class ProducerBatch {
         return recordsBuilder.isFull();
     }
 
-    public void setProducerState(TransactionManager.PidAndEpoch pidAndEpoch, int baseSequence) {
+    public void setProducerState(PidAndEpoch pidAndEpoch, int baseSequence) {
         recordsBuilder.setProducerState(pidAndEpoch.producerId, pidAndEpoch.epoch, baseSequence);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1f4d2/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index dcbf691..4ffab0a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -444,7 +444,7 @@ public final class RecordAccumulator {
                                         // request
                                         break;
                                     } else {
-                                        TransactionManager.PidAndEpoch pidAndEpoch = null;
+                                        PidAndEpoch pidAndEpoch = null;
                                         if (transactionManager != null) {
                                             pidAndEpoch = transactionManager.pidAndEpoch();
                                             if (!pidAndEpoch.isValid())

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1f4d2/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 698f4de..4d95ac0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -274,39 +274,39 @@ public class Sender implements Runnable {
     }
 
     private boolean maybeSendTransactionalRequest(long now) {
-        if (transactionManager != null && transactionManager.hasInflightTransactionalRequest())
+        if (transactionManager != null && transactionManager.hasInflightRequest())
             return true;
 
         if (transactionManager == null || !transactionManager.hasPendingTransactionalRequests())
             return false;
 
-        TransactionManager.TransactionalRequest nextRequest = transactionManager.nextTransactionalRequest();
-
-        if (nextRequest.isEndTxnRequest() && transactionManager.isCompletingTransaction() && accumulator.hasUnflushedBatches()) {
-            if (!accumulator.flushInProgress())
-                accumulator.beginFlush();
-            transactionManager.reenqueue(nextRequest);
-            return false;
-        }
-
-        if (nextRequest.isEndTxnRequest() && transactionManager.isInErrorState()) {
-            nextRequest.maybeTerminateWithError(new KafkaException("Cannot commit transaction when there are " +
-                    "request errors. Please check your logs for the details of the errors encountered."));
-            return false;
+        TransactionManager.TxnRequestHandler nextRequestHandler = transactionManager.nextRequestHandler();
+
+        if (nextRequestHandler.isEndTxn()) {
+            if (transactionManager.isCompletingTransaction() && accumulator.hasUnflushedBatches()) {
+                if (!accumulator.flushInProgress())
+                    accumulator.beginFlush();
+                transactionManager.reenqueue(nextRequestHandler);
+                return false;
+            } else if (transactionManager.isInErrorState()) {
+                nextRequestHandler.fatal(new KafkaException("Cannot commit transaction when there are " +
+                        "request errors. Please check your logs for the details of the errors encountered."));
+                return false;
+            }
         }
 
         Node targetNode = null;
 
         while (targetNode == null) {
             try {
-                if (nextRequest.needsCoordinator()) {
-                    targetNode = transactionManager.coordinator(nextRequest.coordinatorType());
+                if (nextRequestHandler.needsCoordinator()) {
+                    targetNode = transactionManager.coordinator(nextRequestHandler.coordinatorType());
                     if (targetNode == null) {
-                        transactionManager.needsCoordinator(nextRequest);
+                        transactionManager.lookupCoordinator(nextRequestHandler);
                         break;
                     }
                     if (!NetworkClientUtils.awaitReady(client, targetNode, time, requestTimeout)) {
-                        transactionManager.needsCoordinator(nextRequest);
+                        transactionManager.lookupCoordinator(nextRequestHandler);
                         targetNode = null;
                         break;
                     }
@@ -314,11 +314,11 @@ public class Sender implements Runnable {
                     targetNode = awaitLeastLoadedNodeReady(requestTimeout);
                 }
                 if (targetNode != null) {
-                    if (nextRequest.isRetry()) {
+                    if (nextRequestHandler.isRetry()) {
                         time.sleep(retryBackoffMs);
                     }
-                    ClientRequest clientRequest = client.newClientRequest(targetNode.idString(), nextRequest.requestBuilder(),
-                            now, true, nextRequest.responseHandler());
+                    ClientRequest clientRequest = client.newClientRequest(targetNode.idString(), nextRequestHandler.requestBuilder(),
+                            now, true, nextRequestHandler);
                     transactionManager.setInFlightRequestCorrelationId(clientRequest.correlationId());
                     client.send(clientRequest, now);
                     return true;
@@ -331,7 +331,7 @@ public class Sender implements Runnable {
         }
 
         if (targetNode == null)
-            transactionManager.needsRetry(nextRequest);
+            transactionManager.retry(nextRequestHandler);
 
         return true;
     }
@@ -383,7 +383,9 @@ public class Sender implements Runnable {
                     ClientResponse response = sendAndAwaitInitPidRequest(node);
                     if (response.hasResponse() && (response.responseBody() instanceof InitPidResponse)) {
                         InitPidResponse initPidResponse = (InitPidResponse) response.responseBody();
-                        transactionManager.setPidAndEpoch(initPidResponse.producerId(), initPidResponse.epoch());
+                        PidAndEpoch pidAndEpoch = new PidAndEpoch(
+                                initPidResponse.producerId(), initPidResponse.epoch());
+                        transactionManager.setPidAndEpoch(pidAndEpoch);
                     } else {
                         log.error("Received an unexpected response type for an InitPidRequest from {}. " +
                                 "We will back off and try again.", node);

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1f4d2/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index 644d4f8..ff3f114 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.AbstractRequest;
@@ -39,6 +40,7 @@ import org.apache.kafka.common.requests.InitPidResponse;
 import org.apache.kafka.common.requests.OffsetCommitRequest;
 import org.apache.kafka.common.requests.TransactionResult;
 import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
+import org.apache.kafka.common.requests.TxnOffsetCommitRequest.CommittedOffset;
 import org.apache.kafka.common.requests.TxnOffsetCommitResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -59,24 +61,25 @@ import static org.apache.kafka.common.record.RecordBatch.NO_PRODUCER_ID;
  */
 public class TransactionManager {
     private static final Logger log = LoggerFactory.getLogger(TransactionManager.class);
-
     private static final int NO_INFLIGHT_REQUEST_CORRELATION_ID = -1;
 
-    private volatile PidAndEpoch pidAndEpoch;
-    private final Map<TopicPartition, Integer> sequenceNumbers;
     private final String transactionalId;
     private final int transactionTimeoutMs;
-    private final PriorityQueue<TransactionalRequest> pendingTransactionalRequests;
+
+    private final Map<TopicPartition, Integer> sequenceNumbers;
+    private final PriorityQueue<TxnRequestHandler> pendingRequests;
     private final Set<TopicPartition> newPartitionsToBeAddedToTransaction;
     private final Set<TopicPartition> pendingPartitionsToBeAddedToTransaction;
     private final Set<TopicPartition> partitionsInTransaction;
-    private final Map<TopicPartition, TxnOffsetCommitRequest.CommittedOffset> pendingTxnOffsetCommits;
-    private int inFlightRequestCorrelationId = NO_INFLIGHT_REQUEST_CORRELATION_ID;
+    private final Map<TopicPartition, CommittedOffset> pendingTxnOffsetCommits;
 
+    private int inFlightRequestCorrelationId = NO_INFLIGHT_REQUEST_CORRELATION_ID;
     private Node transactionCoordinator;
     private Node consumerGroupCoordinator;
+
     private volatile State currentState = State.UNINITIALIZED;
-    private Exception lastError = null;
+    private volatile Exception lastError = null;
+    private volatile PidAndEpoch pidAndEpoch;
 
     private enum State {
         UNINITIALIZED,
@@ -109,143 +112,55 @@ public class TransactionManager {
         }
     }
 
+
+    // We use the priority to determine the order in which requests need to be sent out. For instance, if we have
+    // a pending FindCoordinator request, that must always go first. Next, If we need a PID, that must go second.
+    // The endTxn request must always go last.
+    private enum Priority {
+        FIND_COORDINATOR(0),
+        INIT_PRODUCER_ID(1),
+        ADD_PARTITIONS_OR_OFFSETS(2),
+        END_TXN(3);
+
+        final int priority;
+
+        Priority(int priority) {
+            this.priority = priority;
+        }
+    }
+
     public TransactionManager(String transactionalId, int transactionTimeoutMs) {
-        pidAndEpoch = new PidAndEpoch(NO_PRODUCER_ID, NO_PRODUCER_EPOCH);
-        sequenceNumbers = new HashMap<>();
+        this.pidAndEpoch = new PidAndEpoch(NO_PRODUCER_ID, NO_PRODUCER_EPOCH);
+        this.sequenceNumbers = new HashMap<>();
         this.transactionalId = transactionalId;
         this.transactionTimeoutMs = transactionTimeoutMs;
-        this.pendingTransactionalRequests = new PriorityQueue<>(10, new Comparator<TransactionalRequest>() {
-            @Override
-            public int compare(TransactionalRequest o1, TransactionalRequest o2) {
-                return Integer.compare(o1.priority().priority(), o2.priority.priority());
-            }
-        });
         this.transactionCoordinator = null;
         this.consumerGroupCoordinator = null;
         this.newPartitionsToBeAddedToTransaction = new HashSet<>();
         this.pendingPartitionsToBeAddedToTransaction = new HashSet<>();
         this.partitionsInTransaction = new HashSet<>();
         this.pendingTxnOffsetCommits = new HashMap<>();
-    }
-
-    public TransactionManager() {
-        this("", 0);
-    }
-
-    static class TransactionalRequest {
-        private enum Priority {
-            FIND_COORDINATOR(0),
-            INIT_PRODUCER_ID(1),
-            ADD_PARTITIONS_OR_OFFSETS(2),
-            END_TXN(4);
-
-            private final int priority;
-
-            Priority(int priority) {
-                this.priority = priority;
-            }
-
-            public int priority() {
-                return this.priority;
-            }
-        }
-
-        private final AbstractRequest.Builder<?> requestBuilder;
-
-        private final FindCoordinatorRequest.CoordinatorType coordinatorType;
-        private final String coordinatorKey;
-        private final RequestCompletionHandler handler;
-        // We use the priority to determine the order in which requests need to be sent out. For instance, if we have
-        // a pending FindCoordinator request, that must always go first. Next, If we need a Pid, that must go second.
-        // The endTxn request must always go last.
-        private final Priority priority;
-        private final TransactionalRequestResult result;
-        private boolean isRetry;
-
-        private TransactionalRequest(AbstractRequest.Builder<?> requestBuilder, RequestCompletionHandler handler,
-                                     FindCoordinatorRequest.CoordinatorType coordinatorType, Priority priority,
-                                     boolean isRetry, String coordinatorKey, TransactionalRequestResult result) {
-            this.requestBuilder = requestBuilder;
-            this.handler = handler;
-            this.coordinatorType = coordinatorType;
-            this.priority = priority;
-            this.isRetry = isRetry;
-            this.coordinatorKey = coordinatorKey;
-            this.result = result;
-        }
-
-        AbstractRequest.Builder<?> requestBuilder() {
-            return requestBuilder;
-        }
-
-        boolean needsCoordinator() {
-            return coordinatorType != null;
-        }
-
-        FindCoordinatorRequest.CoordinatorType coordinatorType() {
-            return coordinatorType;
-        }
-
-        RequestCompletionHandler responseHandler() {
-            return handler;
-        }
-
-        boolean isRetry() {
-            return isRetry;
-        }
-
-        boolean isEndTxnRequest() {
-            return priority == Priority.END_TXN;
-        }
-
-        boolean maybeTerminateWithError(RuntimeException error) {
-            if (result != null) {
-                result.setError(error);
-                result.done();
-                return true;
+        this.pendingRequests = new PriorityQueue<>(10, new Comparator<TxnRequestHandler>() {
+            @Override
+            public int compare(TxnRequestHandler o1, TxnRequestHandler o2) {
+                return Integer.compare(o1.priority().priority, o2.priority().priority);
             }
-            return false;
-        }
-
-        private void setRetry() {
-            isRetry = true;
-        }
-
-        private Priority priority() {
-            return priority;
-        }
+        });
     }
 
-    static class PidAndEpoch {
-        public final long producerId;
-        public final short epoch;
-
-        PidAndEpoch(long producerId, short epoch) {
-            this.producerId = producerId;
-            this.epoch = epoch;
-        }
-
-        public boolean isValid() {
-            return NO_PRODUCER_ID < producerId;
-        }
+    TransactionManager() {
+        this("", 0);
     }
 
-    public synchronized FutureTransactionalResult initializeTransactions() {
+    public synchronized TransactionalRequestResult initializeTransactions() {
         ensureTransactional();
         transitionTo(State.INITIALIZING);
-        setPidAndEpoch(NO_PRODUCER_ID, NO_PRODUCER_EPOCH);
+        setPidAndEpoch(PidAndEpoch.NONE);
         this.sequenceNumbers.clear();
-        if (transactionCoordinator == null)
-            pendingTransactionalRequests.add(findCoordinatorRequest(FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId, false));
-
-        TransactionalRequestResult result = new TransactionalRequestResult();
-        FutureTransactionalResult resultFuture = new FutureTransactionalResult(result);
-        if (!hasPid())
-            pendingTransactionalRequests.add(initPidRequest(false, result));
-        else
-            result.done();
-
-        return resultFuture;
+        InitPidRequest.Builder builder = new InitPidRequest.Builder(transactionalId, transactionTimeoutMs);
+        InitPidHandler handler = new InitPidHandler(builder);
+        pendingRequests.add(handler);
+        return handler.result;
     }
 
     public synchronized void beginTransaction() {
@@ -254,14 +169,14 @@ public class TransactionManager {
         transitionTo(State.IN_TRANSACTION);
     }
 
-    public synchronized FutureTransactionalResult beginCommittingTransaction() {
+    public synchronized TransactionalRequestResult beginCommittingTransaction() {
         ensureTransactional();
         maybeFailWithError();
         transitionTo(State.COMMITTING_TRANSACTION);
         return beginCompletingTransaction(true);
     }
 
-    public synchronized FutureTransactionalResult beginAbortingTransaction() {
+    public synchronized TransactionalRequestResult beginAbortingTransaction() {
         ensureTransactional();
         if (isFenced())
             throw new ProducerFencedException("There is a newer producer using the same transactional.id.");
@@ -269,18 +184,20 @@ public class TransactionManager {
         return beginCompletingTransaction(false);
     }
 
-    private FutureTransactionalResult beginCompletingTransaction(boolean isCommit) {
-        TransactionalRequestResult result = new TransactionalRequestResult();
-        FutureTransactionalResult resultFuture = new FutureTransactionalResult(result);
-
+    private TransactionalRequestResult beginCompletingTransaction(boolean isCommit) {
         if (!newPartitionsToBeAddedToTransaction.isEmpty()) {
-            pendingTransactionalRequests.add(addPartitionsToTransactionRequest(false));
+            pendingRequests.add(addPartitionsToTransactionHandler());
         }
-        pendingTransactionalRequests.add(endTxnRequest(isCommit, false, result));
-        return resultFuture;
+
+        TransactionResult transactionResult = isCommit ? TransactionResult.COMMIT : TransactionResult.ABORT;
+        EndTxnRequest.Builder builder = new EndTxnRequest.Builder(transactionalId, pidAndEpoch.producerId,
+                pidAndEpoch.epoch, transactionResult);
+        EndTxnHandler handler = new EndTxnHandler(builder);
+        pendingRequests.add(handler);
+        return handler.result;
     }
 
-    public synchronized FutureTransactionalResult sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
+    public synchronized TransactionalRequestResult sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
                                                                            String consumerGroupId) {
         ensureTransactional();
         maybeFailWithError();
@@ -288,10 +205,11 @@ public class TransactionManager {
             throw new KafkaException("Cannot send offsets to transaction either because the producer is not in an " +
                     "active transaction");
 
-        TransactionalRequestResult result = new TransactionalRequestResult();
-        FutureTransactionalResult resultFuture = new FutureTransactionalResult(result);
-        pendingTransactionalRequests.add(addOffsetsToTxnRequest(offsets, consumerGroupId, false, result));
-        return resultFuture;
+        AddOffsetsToTxnRequest.Builder builder = new AddOffsetsToTxnRequest.Builder(transactionalId,
+                pidAndEpoch.producerId, pidAndEpoch.epoch, consumerGroupId);
+        AddOffsetsToTxnHandler handler = new AddOffsetsToTxnHandler(builder, offsets);
+        pendingRequests.add(handler);
+        return handler.result;
     }
 
     public synchronized void maybeAddPartitionToTransaction(TopicPartition topicPartition) {
@@ -354,11 +272,10 @@ public class TransactionManager {
     }
 
     /**
-     * Set the pid and epoch atomically. This method will signal any callers blocked on the `pidAndEpoch` method
-     * once the pid is set. This method will be called on the background thread when the broker responds with the pid.
+     * Set the pid and epoch atomically.
      */
-    synchronized void setPidAndEpoch(long pid, short epoch) {
-        this.pidAndEpoch = new PidAndEpoch(pid, epoch);
+    void setPidAndEpoch(PidAndEpoch pidAndEpoch) {
+        this.pidAndEpoch = pidAndEpoch;
     }
 
     /**
@@ -382,7 +299,7 @@ public class TransactionManager {
         if (isTransactional())
             throw new IllegalStateException("Cannot reset producer state for a transactional producer. " +
                     "You must either abort the ongoing transaction or reinitialize the transactional producer instead");
-        setPidAndEpoch(NO_PRODUCER_ID, NO_PRODUCER_EPOCH);
+        setPidAndEpoch(PidAndEpoch.NONE);
         this.sequenceNumbers.clear();
     }
 
@@ -408,27 +325,26 @@ public class TransactionManager {
     }
 
     boolean hasPendingTransactionalRequests() {
-        return !(pendingTransactionalRequests.isEmpty()
-                && newPartitionsToBeAddedToTransaction.isEmpty());
+        return !(pendingRequests.isEmpty() && newPartitionsToBeAddedToTransaction.isEmpty());
     }
 
-    TransactionalRequest nextTransactionalRequest() {
+    TxnRequestHandler nextRequestHandler() {
         if (!hasPendingTransactionalRequests())
             return null;
 
         if (!newPartitionsToBeAddedToTransaction.isEmpty())
-            pendingTransactionalRequests.add(addPartitionsToTransactionRequest(false));
+            pendingRequests.add(addPartitionsToTransactionHandler());
 
-        return pendingTransactionalRequests.poll();
+        return pendingRequests.poll();
     }
 
-    void needsRetry(TransactionalRequest request) {
+    void retry(TxnRequestHandler request) {
         request.setRetry();
-        pendingTransactionalRequests.add(request);
+        pendingRequests.add(request);
     }
 
-    void reenqueue(TransactionalRequest request) {
-        pendingTransactionalRequests.add(request);
+    void reenqueue(TxnRequestHandler request) {
+        pendingRequests.add(request);
     }
 
     Node coordinator(FindCoordinatorRequest.CoordinatorType type) {
@@ -442,19 +358,19 @@ public class TransactionManager {
         }
     }
 
-    void needsCoordinator(TransactionalRequest request) {
-        needsCoordinator(request.coordinatorType, request.coordinatorKey);
+    void lookupCoordinator(TxnRequestHandler request) {
+        lookupCoordinator(request.coordinatorType(), request.coordinatorKey());
     }
 
     void setInFlightRequestCorrelationId(int correlationId) {
         inFlightRequestCorrelationId = correlationId;
     }
 
-    void resetInFlightRequestCorrelationId() {
+    void clearInFlightRequestCorrelationId() {
         inFlightRequestCorrelationId = NO_INFLIGHT_REQUEST_CORRELATION_ID;
     }
 
-    boolean hasInflightTransactionalRequest() {
+    boolean hasInflightRequest() {
         return inFlightRequestCorrelationId != NO_INFLIGHT_REQUEST_CORRELATION_ID;
     }
 
@@ -477,13 +393,14 @@ public class TransactionManager {
         transitionTo(target, null);
     }
 
-    private void transitionTo(State target, Exception error) {
+    private synchronized void transitionTo(State target, Exception error) {
         if (target == State.ERROR && error != null)
             lastError = error;
         if (currentState.isTransitionValid(currentState, target)) {
             currentState = target;
         } else {
-            throw new KafkaException("Invalid transition attempted from state " + currentState.name() + " to state " + target.name());
+            throw new KafkaException("Invalid transition attempted from state " + currentState.name() +
+                    " to state " + target.name());
         }
     }
 
@@ -504,7 +421,7 @@ public class TransactionManager {
         }
     }
 
-    private void needsCoordinator(FindCoordinatorRequest.CoordinatorType type, String coordinatorKey) {
+    private void lookupCoordinator(FindCoordinatorRequest.CoordinatorType type, String coordinatorKey) {
         switch (type) {
             case GROUP:
                 consumerGroupCoordinator = null;
@@ -515,9 +432,11 @@ public class TransactionManager {
             default:
                 throw new IllegalStateException("Got an invalid coordinator type: " + type);
         }
-        pendingTransactionalRequests.add(findCoordinatorRequest(type, coordinatorKey, false));
-    }
 
+        FindCoordinatorRequest.Builder builder = new FindCoordinatorRequest.Builder(type, coordinatorKey);
+        FindCoordinatorHandler request = new FindCoordinatorHandler(builder);
+        pendingRequests.add(request);
+    }
 
     private void completeTransaction() {
         transitionTo(State.READY);
@@ -525,139 +444,163 @@ public class TransactionManager {
         partitionsInTransaction.clear();
     }
 
-    private TransactionalRequest initPidRequest(boolean isRetry, TransactionalRequestResult result) {
-        InitPidRequest.Builder builder = new InitPidRequest.Builder(transactionalId, transactionTimeoutMs);
-        return new TransactionalRequest(builder, new InitPidCallback(result),
-                FindCoordinatorRequest.CoordinatorType.TRANSACTION, TransactionalRequest.Priority.INIT_PRODUCER_ID, isRetry, transactionalId, result);
-    }
-
-    private synchronized TransactionalRequest addPartitionsToTransactionRequest(boolean isRetry) {
+    private synchronized TxnRequestHandler addPartitionsToTransactionHandler() {
         pendingPartitionsToBeAddedToTransaction.addAll(newPartitionsToBeAddedToTransaction);
         newPartitionsToBeAddedToTransaction.clear();
         AddPartitionsToTxnRequest.Builder builder = new AddPartitionsToTxnRequest.Builder(transactionalId,
                 pidAndEpoch.producerId, pidAndEpoch.epoch, new ArrayList<>(pendingPartitionsToBeAddedToTransaction));
-        return new TransactionalRequest(builder, new AddPartitionsToTransactionCallback(),
-                FindCoordinatorRequest.CoordinatorType.TRANSACTION, TransactionalRequest.Priority.ADD_PARTITIONS_OR_OFFSETS, isRetry, transactionalId, null);
+        return new AddPartitionsToTxnHandler(builder);
     }
 
-    private TransactionalRequest findCoordinatorRequest(FindCoordinatorRequest.CoordinatorType type, String coordinatorKey, boolean isRetry) {
-        FindCoordinatorRequest.Builder builder = new FindCoordinatorRequest.Builder(type, coordinatorKey);
-        return new TransactionalRequest(builder, new FindCoordinatorCallback(type, coordinatorKey),
-                null, TransactionalRequest.Priority.FIND_COORDINATOR, isRetry, null, null);
-    }
-
-    private TransactionalRequest endTxnRequest(boolean isCommit, boolean isRetry, TransactionalRequestResult result) {
-        EndTxnRequest.Builder builder = new EndTxnRequest.Builder(transactionalId,
-                pidAndEpoch.producerId, pidAndEpoch.epoch, isCommit ? TransactionResult.COMMIT : TransactionResult.ABORT);
-        return new TransactionalRequest(builder, new EndTxnCallback(isCommit, result),
-                FindCoordinatorRequest.CoordinatorType.TRANSACTION, TransactionalRequest.Priority.END_TXN, isRetry, transactionalId, result);
-    }
-
-    private TransactionalRequest addOffsetsToTxnRequest(Map<TopicPartition, OffsetAndMetadata> offsets,
-                                                        String consumerGroupId, boolean isRetry, TransactionalRequestResult result) {
-        AddOffsetsToTxnRequest.Builder builder = new AddOffsetsToTxnRequest.Builder(transactionalId,
-                pidAndEpoch.producerId, pidAndEpoch.epoch, consumerGroupId);
-        return new TransactionalRequest(builder, new AddOffsetsToTxnCallback(offsets, consumerGroupId, result),
-                FindCoordinatorRequest.CoordinatorType.TRANSACTION, TransactionalRequest.Priority.ADD_PARTITIONS_OR_OFFSETS, isRetry, transactionalId, result);
-    }
-
-    private TransactionalRequest txnOffsetCommitRequest(Map<TopicPartition, OffsetAndMetadata> offsets,
-                                                        String consumerGroupId, boolean isRetry, TransactionalRequestResult result) {
+    private TxnOffsetCommitHandler txnOffsetCommitHandler(TransactionalRequestResult result,
+                                                          Map<TopicPartition, OffsetAndMetadata> offsets,
+                                                          String consumerGroupId) {
         for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
             OffsetAndMetadata offsetAndMetadata = entry.getValue();
-            pendingTxnOffsetCommits.put(entry.getKey(),
-                    new TxnOffsetCommitRequest.CommittedOffset(offsetAndMetadata.offset(), offsetAndMetadata.metadata()));
+            CommittedOffset committedOffset = new CommittedOffset(offsetAndMetadata.offset(), offsetAndMetadata.metadata());
+            pendingTxnOffsetCommits.put(entry.getKey(), committedOffset);
         }
-        return txnOffsetCommitRequest(consumerGroupId, isRetry, result);
-    }
-
-    private TransactionalRequest txnOffsetCommitRequest(String consumerGroupId, boolean isRetry, TransactionalRequestResult result) {
         TxnOffsetCommitRequest.Builder builder = new TxnOffsetCommitRequest.Builder(consumerGroupId,
-                pidAndEpoch.producerId, pidAndEpoch.epoch, OffsetCommitRequest.DEFAULT_RETENTION_TIME, pendingTxnOffsetCommits);
-        return new TransactionalRequest(builder, new TxnOffsetCommitCallback(consumerGroupId, result),
-                FindCoordinatorRequest.CoordinatorType.GROUP, TransactionalRequest.Priority.ADD_PARTITIONS_OR_OFFSETS, isRetry, consumerGroupId, result);
+                pidAndEpoch.producerId, pidAndEpoch.epoch, OffsetCommitRequest.DEFAULT_RETENTION_TIME,
+                pendingTxnOffsetCommits);
+        return new TxnOffsetCommitHandler(result, builder);
     }
 
-    private abstract class TransactionalRequestCallBack implements RequestCompletionHandler {
+    abstract class TxnRequestHandler implements  RequestCompletionHandler {
         protected final TransactionalRequestResult result;
+        private boolean isRetry = false;
 
-        TransactionalRequestCallBack(TransactionalRequestResult result) {
+        TxnRequestHandler(TransactionalRequestResult result) {
             this.result = result;
         }
 
+        TxnRequestHandler() {
+            this(new TransactionalRequestResult());
+        }
+
+        void fatal(RuntimeException e) {
+            result.setError(e);
+            transitionTo(State.ERROR, e);
+            result.done();
+        }
+
+        void fenced() {
+            log.error("Producer has become invalid, which typically means another producer with the same " +
+                            "transactional.id has been started: producerId: {}. epoch: {}.",
+                    pidAndEpoch.producerId, pidAndEpoch.epoch);
+            result.setError(Errors.INVALID_PRODUCER_EPOCH.exception());
+            transitionTo(State.FENCED, Errors.INVALID_PRODUCER_EPOCH.exception());
+            result.done();
+        }
+
+        void reenqueue() {
+            this.isRetry = true;
+            pendingRequests.add(this);
+        }
+
         @Override
+        @SuppressWarnings("unchecked")
         public void onComplete(ClientResponse response) {
             if (response.requestHeader().correlationId() != inFlightRequestCorrelationId) {
-                log.error("Detected more than one inflight transactional request. This should never happen.");
-                transitionTo(State.ERROR, new RuntimeException("Detected more than one inflight transactional request. This should never happen."));
-                return;
-            }
-
-            resetInFlightRequestCorrelationId();
-            if (response.wasDisconnected()) {
-                reenqueue();
-            } else if (response.versionMismatch() != null) {
-                if (result != null) {
-                    result.setError(Errors.UNSUPPORTED_VERSION.exception());
-                    result.done();
-                }
-                log.error("Could not execute transactional request because the broker isn't on the right version.");
-                transitionTo(State.ERROR, Errors.UNSUPPORTED_VERSION.exception());
-            } else if (response.hasResponse()) {
-                handleResponse(response.responseBody());
+                fatal(new RuntimeException("Detected more than one in-flight transactional request."));
             } else {
-                if (result != null) {
-                    result.setError(Errors.UNKNOWN.exception());
-                    result.done();
+                clearInFlightRequestCorrelationId();
+                if (response.wasDisconnected()) {
+                    reenqueue();
+                } else if (response.versionMismatch() != null) {
+                    fatal(response.versionMismatch());
+                } else if (response.hasResponse()) {
+                    handleResponse(response.responseBody());
+                } else {
+                    fatal(new KafkaException("Could not execute transactional request for unknown reasons"));
                 }
-                log.error("Could not execute transactional request for unknown reasons");
-                transitionTo(State.ERROR, Errors.UNKNOWN.exception());
             }
         }
 
-        public abstract void handleResponse(AbstractResponse responseBody);
+        boolean needsCoordinator() {
+            return coordinatorType() != null;
+        }
 
-        public abstract void reenqueue();
+        FindCoordinatorRequest.CoordinatorType coordinatorType() {
+            return FindCoordinatorRequest.CoordinatorType.TRANSACTION;
+        }
+
+        String coordinatorKey() {
+            return transactionalId;
+        }
+
+        void setRetry() {
+            this.isRetry = true;
+        }
+
+        boolean isRetry() {
+            return isRetry;
+        }
+
+        boolean isEndTxn() {
+            return false;
+        }
+
+        abstract AbstractRequest.Builder<?> requestBuilder();
+
+        abstract void handleResponse(AbstractResponse responseBody);
+
+        abstract Priority priority();
     }
 
-    private class InitPidCallback extends TransactionalRequestCallBack {
+    private class InitPidHandler extends TxnRequestHandler {
+        private final InitPidRequest.Builder builder;
+
+        private InitPidHandler(InitPidRequest.Builder builder) {
+            this.builder = builder;
+        }
+
+        @Override
+        InitPidRequest.Builder requestBuilder() {
+            return builder;
+        }
 
-        InitPidCallback(TransactionalRequestResult result) {
-           super(result);
+        @Override
+        Priority priority() {
+            return Priority.INIT_PRODUCER_ID;
         }
 
         @Override
-        public void handleResponse(AbstractResponse responseBody) {
-            InitPidResponse initPidResponse = (InitPidResponse) responseBody;
+        public void handleResponse(AbstractResponse response) {
+            InitPidResponse initPidResponse = (InitPidResponse) response;
             Errors error = initPidResponse.error();
             if (error == Errors.NONE) {
-                setPidAndEpoch(initPidResponse.producerId(), initPidResponse.epoch());
+                PidAndEpoch pidAndEpoch = new PidAndEpoch(initPidResponse.producerId(), initPidResponse.epoch());
+                setPidAndEpoch(pidAndEpoch);
                 transitionTo(State.READY);
                 lastError = null;
+                result.done();
             } else if (error == Errors.NOT_COORDINATOR || error == Errors.COORDINATOR_NOT_AVAILABLE) {
-                needsCoordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
+                lookupCoordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
                 reenqueue();
             } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) {
                 reenqueue();
             } else {
-                result.setError(error.exception());
-                transitionTo(State.ERROR, error.exception());
+                fatal(new KafkaException("Unexpected error in InitPidResponse; " + error.message()));
             }
+        }
+    }
 
-            if (error == Errors.NONE || !result.isSuccessful())
-                result.done();
+    private class AddPartitionsToTxnHandler extends TxnRequestHandler {
+        private final AddPartitionsToTxnRequest.Builder builder;
+
+        private AddPartitionsToTxnHandler(AddPartitionsToTxnRequest.Builder builder) {
+            this.builder = builder;
         }
 
         @Override
-        public void reenqueue() {
-            pendingTransactionalRequests.add(initPidRequest(true, result));
+        AddPartitionsToTxnRequest.Builder requestBuilder() {
+            return builder;
         }
-    }
-
-    private class AddPartitionsToTransactionCallback extends TransactionalRequestCallBack {
 
-        AddPartitionsToTransactionCallback() {
-            super(null);
+        @Override
+        Priority priority() {
+            return Priority.ADD_PARTITIONS_OR_OFFSETS;
         }
 
         @Override
@@ -667,160 +610,193 @@ public class TransactionManager {
             if (error == Errors.NONE) {
                 partitionsInTransaction.addAll(pendingPartitionsToBeAddedToTransaction);
                 pendingPartitionsToBeAddedToTransaction.clear();
+                result.done();
             } else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) {
-                needsCoordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
+                lookupCoordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
                 reenqueue();
             } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) {
                 reenqueue();
-            } else if (error == Errors.INVALID_PID_MAPPING || error == Errors.INVALID_TXN_STATE) {
-                log.error("Seems like the broker has bad transaction state. producerId: {}, error: {}. message: {}",
-                        pidAndEpoch.producerId, error, error.message());
-                transitionTo(State.ERROR, error.exception());
+            } else if (error == Errors.INVALID_PID_MAPPING) {
+                fatal(new KafkaException(error.exception()));
+            } else if (error == Errors.INVALID_TXN_STATE) {
+                fatal(new KafkaException(error.exception()));
             } else if (error == Errors.INVALID_PRODUCER_EPOCH) {
-                transitionTo(State.FENCED, error.exception());
-                log.error("Epoch has become invalid: producerId: {}. epoch: {}. Message: {}", pidAndEpoch.producerId, pidAndEpoch.epoch, error.message());
+                fenced();
             } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
-                transitionTo(State.ERROR, error.exception());
-                log.error("No permissions add some partitions to the transaction: {}", error.message());
+                fatal(error.exception());
             } else {
-                transitionTo(State.ERROR, error.exception());
-                log.error("Could not add partitions to transaction due to unknown error: {}", error.message());
+                fatal(new KafkaException("Could not add partitions to transaction due to unknown error: " +
+                        error.message()));
             }
         }
+    }
+
+    private class FindCoordinatorHandler extends TxnRequestHandler {
+        private final FindCoordinatorRequest.Builder builder;
+
+        private FindCoordinatorHandler(FindCoordinatorRequest.Builder builder) {
+            this.builder = builder;
+        }
 
         @Override
-        public void reenqueue() {
-            pendingTransactionalRequests.add(addPartitionsToTransactionRequest(true));
+        FindCoordinatorRequest.Builder requestBuilder() {
+            return builder;
         }
-    }
 
-    private class FindCoordinatorCallback extends TransactionalRequestCallBack {
-        private final FindCoordinatorRequest.CoordinatorType type;
-        private final String coordinatorKey;
+        @Override
+        Priority priority() {
+            return Priority.FIND_COORDINATOR;
+        }
+
+        @Override
+        FindCoordinatorRequest.CoordinatorType coordinatorType() {
+            return null;
+        }
 
-        FindCoordinatorCallback(FindCoordinatorRequest.CoordinatorType type, String coordinatorKey) {
-            super(null);
-            this.type = type;
-            this.coordinatorKey = coordinatorKey;
+        @Override
+        String coordinatorKey() {
+            return null;
         }
+
         @Override
-        public void handleResponse(AbstractResponse responseBody) {
-            FindCoordinatorResponse findCoordinatorResponse = (FindCoordinatorResponse) responseBody;
+        public void handleResponse(AbstractResponse response) {
+            FindCoordinatorResponse findCoordinatorResponse = (FindCoordinatorResponse) response;
             if (findCoordinatorResponse.error() == Errors.NONE) {
                 Node node = findCoordinatorResponse.node();
-                switch (type) {
+                switch (builder.coordinatorType()) {
                     case GROUP:
                         consumerGroupCoordinator = node;
                         break;
                     case TRANSACTION:
                         transactionCoordinator = node;
                 }
+                result.done();
             } else if (findCoordinatorResponse.error() == Errors.COORDINATOR_NOT_AVAILABLE) {
                 reenqueue();
             } else if (findCoordinatorResponse.error() == Errors.GROUP_AUTHORIZATION_FAILED) {
-                transitionTo(State.ERROR, findCoordinatorResponse.error().exception());
-                log.error("Not authorized to access the group with type {} and key {}. Message: {} ", type,
-                        coordinatorKey, findCoordinatorResponse.error().message());
+                fatal(new GroupAuthorizationException("Not authorized to commit offsets " + builder.coordinatorKey()));
             } else {
-                transitionTo(State.ERROR, findCoordinatorResponse.error().exception());
-                log.error("Could not find a coordinator with type {} for unknown reasons. coordinatorKey: {}", type,
-                        coordinatorKey, findCoordinatorResponse.error().message());
+                fatal(new KafkaException(String.format("Could not find a coordinator with type %s with key %s due to" +
+                        "unexpected error: %s", builder.coordinatorType(), builder.coordinatorKey(),
+                        findCoordinatorResponse.error().message())));
             }
         }
+    }
+
+    private class EndTxnHandler extends TxnRequestHandler {
+        private final EndTxnRequest.Builder builder;
+
+        private EndTxnHandler(EndTxnRequest.Builder builder) {
+            this.builder = builder;
+        }
 
         @Override
-        public void reenqueue() {
-            pendingTransactionalRequests.add(findCoordinatorRequest(type, coordinatorKey, true));
+        EndTxnRequest.Builder requestBuilder() {
+            return builder;
         }
-    }
 
-    private class EndTxnCallback extends TransactionalRequestCallBack {
-        private final boolean isCommit;
+        @Override
+        Priority priority() {
+            return Priority.END_TXN;
+        }
 
-        EndTxnCallback(boolean isCommit, TransactionalRequestResult result) {
-            super(result);
-            this.isCommit = isCommit;
+        @Override
+        boolean isEndTxn() {
+            return true;
         }
 
         @Override
-        public void handleResponse(AbstractResponse responseBody) {
-            EndTxnResponse endTxnResponse = (EndTxnResponse) responseBody;
+        public void handleResponse(AbstractResponse response) {
+            EndTxnResponse endTxnResponse = (EndTxnResponse) response;
             Errors error = endTxnResponse.error();
             if (error == Errors.NONE) {
                 completeTransaction();
+                result.done();
             } else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) {
-                needsCoordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
+                lookupCoordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
                 reenqueue();
             } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) {
                 reenqueue();
             } else if (error == Errors.INVALID_PRODUCER_EPOCH) {
-                transitionTo(State.FENCED, error.exception());
-                result.setError(error.exception());
+                fenced();
             } else {
-                result.setError(error.exception());
-                transitionTo(State.ERROR, error.exception());
+                fatal(new KafkaException("Unhandled error in EndTxnResponse: " + error.message()));
             }
+        }
+    }
 
-            if (error == Errors.NONE || !result.isSuccessful())
-                result.done();
+    private class AddOffsetsToTxnHandler extends TxnRequestHandler {
+        private final AddOffsetsToTxnRequest.Builder builder;
+        private final Map<TopicPartition, OffsetAndMetadata> offsets;
+
+        private AddOffsetsToTxnHandler(AddOffsetsToTxnRequest.Builder builder,
+                                       Map<TopicPartition, OffsetAndMetadata> offsets) {
+            this.builder = builder;
+            this.offsets = offsets;
         }
 
         @Override
-        public void reenqueue() {
-            pendingTransactionalRequests.add(endTxnRequest(isCommit, true, result));
+        AddOffsetsToTxnRequest.Builder requestBuilder() {
+            return builder;
         }
-    }
-
-    private class AddOffsetsToTxnCallback extends TransactionalRequestCallBack {
-        String consumerGroupId;
-        Map<TopicPartition, OffsetAndMetadata> offsets;
 
-        AddOffsetsToTxnCallback(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId, TransactionalRequestResult result) {
-            super(result);
-            this.offsets = offsets;
-            this.consumerGroupId = consumerGroupId;
+        @Override
+        Priority priority() {
+            return Priority.ADD_PARTITIONS_OR_OFFSETS;
         }
 
         @Override
-        public void handleResponse(AbstractResponse responseBody) {
-            AddOffsetsToTxnResponse addOffsetsToTxnResponse = (AddOffsetsToTxnResponse) responseBody;
+        public void handleResponse(AbstractResponse response) {
+            AddOffsetsToTxnResponse addOffsetsToTxnResponse = (AddOffsetsToTxnResponse) response;
             Errors error = addOffsetsToTxnResponse.error();
             if (error == Errors.NONE) {
-                pendingTransactionalRequests.add(txnOffsetCommitRequest(offsets, consumerGroupId, false, result));
+                // note the result is not completed until the TxnOffsetCommit returns
+                pendingRequests.add(txnOffsetCommitHandler(result, offsets, builder.consumerGroupId()));
             } else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) {
-                needsCoordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
+                lookupCoordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
                 reenqueue();
             } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) {
                 reenqueue();
             } else if (error == Errors.INVALID_PRODUCER_EPOCH) {
-                transitionTo(State.FENCED, error.exception());
-                result.setError(error.exception());
+                fenced();
             } else {
-                transitionTo(State.ERROR, error.exception());
-                result.setError(error.exception());
+                fatal(new KafkaException("Unexpected error in AddOffsetsToTxnResponse: " + error.message()));
             }
+        }
+    }
 
-            if (!result.isSuccessful())
-                result.done();
+    private class TxnOffsetCommitHandler extends TxnRequestHandler {
+        private final TxnOffsetCommitRequest.Builder builder;
+
+        private TxnOffsetCommitHandler(TransactionalRequestResult result,
+                                       TxnOffsetCommitRequest.Builder builder) {
+            super(result);
+            this.builder = builder;
         }
 
         @Override
-        public void reenqueue() {
-            pendingTransactionalRequests.add(addOffsetsToTxnRequest(offsets, consumerGroupId, true, result));
+        TxnOffsetCommitRequest.Builder requestBuilder() {
+            return builder;
         }
-    }
 
-    private class TxnOffsetCommitCallback extends TransactionalRequestCallBack {
-        private final String consumerGroupId;
+        @Override
+        Priority priority() {
+            return Priority.ADD_PARTITIONS_OR_OFFSETS;
+        }
 
-        TxnOffsetCommitCallback(String consumerGroupId, TransactionalRequestResult result) {
-            super(result);
-            this.consumerGroupId = consumerGroupId;
+        @Override
+        FindCoordinatorRequest.CoordinatorType coordinatorType() {
+            return FindCoordinatorRequest.CoordinatorType.GROUP;
+        }
+
+        @Override
+        String coordinatorKey() {
+            return builder.consumerGroupId();
         }
 
         @Override
-        public void handleResponse(AbstractResponse responseBody) {
-            TxnOffsetCommitResponse txnOffsetCommitResponse = (TxnOffsetCommitResponse) responseBody;
+        public void handleResponse(AbstractResponse response) {
+            TxnOffsetCommitResponse txnOffsetCommitResponse = (TxnOffsetCommitResponse) response;
             boolean coordinatorReloaded = false;
             boolean hadFailure = false;
             for (Map.Entry<TopicPartition, Errors> entry : txnOffsetCommitResponse.errors().entrySet()) {
@@ -832,16 +808,14 @@ public class TransactionManager {
                     hadFailure = true;
                     if (!coordinatorReloaded) {
                         coordinatorReloaded = true;
-                        needsCoordinator(FindCoordinatorRequest.CoordinatorType.GROUP, consumerGroupId);
+                        lookupCoordinator(FindCoordinatorRequest.CoordinatorType.GROUP, builder.consumerGroupId());
                     }
                 } else if (error == Errors.INVALID_PRODUCER_EPOCH) {
-                    transitionTo(State.FENCED, error.exception());
-                    result.setError(error.exception());
-                    break;
+                    fenced();
+                    return;
                 } else {
-                    result.setError(error.exception());
-                    transitionTo(State.ERROR, error.exception());
-                    break;
+                    fatal(new KafkaException("Unexpected error in TxnOffsetCommitResponse: " + error.message()));
+                    return;
                 }
             }
 
@@ -854,13 +828,8 @@ public class TransactionManager {
 
             // retry the commits which failed with a retriable error.
             if (!pendingTxnOffsetCommits.isEmpty())
-                pendingTransactionalRequests.add(txnOffsetCommitRequest(consumerGroupId, true, result));
-
-        }
-
-        @Override
-        public void reenqueue() {
-            pendingTransactionalRequests.add(txnOffsetCommitRequest(consumerGroupId, true, result));
+                reenqueue();
         }
     }
+
 }
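
For orientation, a rough, self-contained sketch of the handler pattern the refactor above converges on: each request handler exposes its builder and priority, and completes (or fails) a shared result from handleResponse(). The names below (HandlerPatternSketch, Result, Handler) are simplified stand-ins, not the actual Kafka types, and priority ordering of the queue is omitted for brevity.

import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;

// Simplified stand-in for the handler pattern: each pending transactional
// request carries a priority and a result that is completed (or failed)
// from its handleResponse() callback. Not the actual Kafka classes.
public class HandlerPatternSketch {

    enum Priority { FIND_COORDINATOR, INIT_PRODUCER_ID, ADD_PARTITIONS_OR_OFFSETS, END_TXN }

    static final class Result {
        private final CountDownLatch latch = new CountDownLatch(1);
        private volatile RuntimeException error;

        void done() { latch.countDown(); }
        void fail(RuntimeException e) { error = e; latch.countDown(); }
        boolean isCompleted() { return latch.getCount() == 0; }
        boolean isSuccessful() { return isCompleted() && error == null; }
    }

    abstract static class Handler {
        final Result result = new Result();

        abstract Priority priority();

        // The real code receives an AbstractResponse; Object keeps this sketch standalone.
        abstract void handleResponse(Object response);
    }

    // A plain queue here; the real manager orders pending requests by priority.
    private final Queue<Handler> pendingRequests = new ArrayDeque<>();

    void enqueue(Handler handler) {
        pendingRequests.add(handler);
    }

    // A sender loop would dequeue the next handler, send its request, and
    // route the broker's response back into handleResponse().
    void onResponse(Object response) {
        Handler handler = pendingRequests.poll();
        if (handler != null)
            handler.handleResponse(response);
    }
}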

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1f4d2/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java
index 88f146b..840cb1e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java
@@ -21,9 +21,19 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 public final class TransactionalRequestResult {
-    private final CountDownLatch latch = new CountDownLatch(1);
+    static final TransactionalRequestResult COMPLETE = new TransactionalRequestResult(new CountDownLatch(0));
+
+    private final CountDownLatch latch;
     private RuntimeException error = null;
 
+    public TransactionalRequestResult() {
+        this(new CountDownLatch(1));
+    }
+
+    private TransactionalRequestResult(CountDownLatch latch) {
+        this.latch = latch;
+    }
+
     public void setError(RuntimeException error) {
         this.error = error;
     }
@@ -43,6 +53,9 @@ public final class TransactionalRequestResult {
                 // Keep waiting until done, we have no other option for these transactional requests.
             }
         }
+
+        if (!isSuccessful())
+            throw error();
     }
 
     public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
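
A minimal, self-contained sketch of the await-then-throw behavior added above, assuming a blocking caller such as KafkaProducer.initTransactions(): once the latch is released, any recorded error is rethrown instead of being left for an isSuccessful() check. ResultSketch is a stand-in, not the actual TransactionalRequestResult.

import java.util.concurrent.CountDownLatch;

// Stand-in class mirroring the shape of the change: await() rethrows the
// recorded error once the latch is released, so blocking callers see failures.
final class ResultSketch {
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile RuntimeException error;

    void done() {
        latch.countDown();
    }

    void fail(RuntimeException e) {
        error = e;
        latch.countDown();
    }

    void await() {
        boolean completed = false;
        while (!completed) {
            try {
                latch.await();
                completed = true;
            } catch (InterruptedException e) {
                // keep waiting, as in the patch: there is no other option
                // for these transactional requests
            }
        }
        if (error != null)
            throw error; // surfaces the failure to the blocking caller
    }
}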

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1f4d2/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
index 733e806..b017242 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
@@ -42,6 +42,10 @@ public class AddOffsetsToTxnRequest extends AbstractRequest {
             this.consumerGroupId = consumerGroupId;
         }
 
+        public String consumerGroupId() {
+            return consumerGroupId;
+        }
+
         @Override
         public AddOffsetsToTxnRequest build(short version) {
             return new AddOffsetsToTxnRequest(version, transactionalId, producerId, producerEpoch, consumerGroupId);

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1f4d2/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java
index b2eaf63..fbc7fa2 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java
@@ -49,6 +49,14 @@ public class FindCoordinatorRequest extends AbstractRequest {
             return new FindCoordinatorRequest(coordinatorType, coordinatorKey, version);
         }
 
+        public String coordinatorKey() {
+            return coordinatorKey;
+        }
+
+        public CoordinatorType coordinatorType() {
+            return coordinatorType;
+        }
+
         @Override
         public String toString() {
             StringBuilder bld = new StringBuilder();

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1f4d2/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
index cca8875..8778b49 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
@@ -55,6 +55,10 @@ public class TxnOffsetCommitRequest extends AbstractRequest {
             this.offsets = offsets;
         }
 
+        public String consumerGroupId() {
+            return consumerGroupId;
+        }
+
         @Override
         public TxnOffsetCommitRequest build(short version) {
             return new TxnOffsetCommitRequest(version, consumerGroupId, producerId, producerEpoch, retentionTimeMs, offsets);
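
The consumerGroupId(), coordinatorKey() and coordinatorType() accessors added to these builders all serve the same purpose: a handler keeps only its request builder and reads parameters back from it instead of carrying duplicate fields. A small hypothetical builder illustrating the pattern (not one of the Kafka request builders):

// Hypothetical builder showing the accessor pattern added above: callers
// construct it with the parameters they need and can later read them back,
// so a handler does not have to carry its own copy of consumerGroupId.
final class OffsetCommitBuilderSketch {
    private final String consumerGroupId;

    OffsetCommitBuilderSketch(String consumerGroupId) {
        this.consumerGroupId = consumerGroupId;
    }

    String consumerGroupId() {
        return consumerGroupId;
    }
}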

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1f4d2/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 2017ef9..c01a375 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -395,7 +395,7 @@ public class SenderTest {
     public void testSequenceNumberIncrement() throws InterruptedException {
         final long producerId = 343434L;
         TransactionManager transactionManager = new TransactionManager();
-        transactionManager.setPidAndEpoch(producerId, (short) 0);
+        transactionManager.setPidAndEpoch(new PidAndEpoch(producerId, (short) 0));
         setupWithTransactionState(transactionManager);
         client.setNode(new Node(1, "localhost", 33343));
 
@@ -448,7 +448,7 @@ public class SenderTest {
     public void testAbortRetryWhenPidChanges() throws InterruptedException {
         final long producerId = 343434L;
         TransactionManager transactionManager = new TransactionManager();
-        transactionManager.setPidAndEpoch(producerId, (short) 0);
+        transactionManager.setPidAndEpoch(new PidAndEpoch(producerId, (short) 0));
         setupWithTransactionState(transactionManager);
         client.setNode(new Node(1, "localhost", 33343));
 
@@ -480,7 +480,7 @@ public class SenderTest {
         assertEquals(0, client.inFlightRequestCount());
         assertFalse("Client ready status should be false", client.isReady(node, 0L));
 
-        transactionManager.setPidAndEpoch(producerId + 1, (short) 0);
+        transactionManager.setPidAndEpoch(new PidAndEpoch(producerId + 1, (short) 0));
         sender.run(time.milliseconds()); // receive error
         sender.run(time.milliseconds()); // reconnect
         sender.run(time.milliseconds()); // nothing to do, since the pid has changed. We should check the metrics for errors.
@@ -497,7 +497,7 @@ public class SenderTest {
     public void testResetWhenOutOfOrderSequenceReceived() throws InterruptedException {
         final long producerId = 343434L;
         TransactionManager transactionManager = new TransactionManager();
-        transactionManager.setPidAndEpoch(producerId, (short) 0);
+        transactionManager.setPidAndEpoch(new PidAndEpoch(producerId, (short) 0));
         setupWithTransactionState(transactionManager);
         client.setNode(new Node(1, "localhost", 33343));
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1f4d2/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 1ab86aa..8e46eb7 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -156,6 +156,7 @@ public class TransactionManagerTest {
         prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
 
         sender.run(time.milliseconds());  // find coordinator
+        sender.run(time.milliseconds());
         assertEquals(brokerNode, transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION));
 
         prepareInitPidResponse(Errors.NONE, false, pid, epoch);
@@ -185,7 +186,7 @@ public class TransactionManagerTest {
         Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
         offsets.put(tp1, new OffsetAndMetadata(1));
         final String consumerGroupId = "myconsumergroup";
-        FutureTransactionalResult addOffsetsResult = transactionManager.sendOffsetsToTransaction(offsets, consumerGroupId);
+        TransactionalRequestResult addOffsetsResult = transactionManager.sendOffsetsToTransaction(offsets, consumerGroupId);
 
         assertFalse(transactionManager.hasPendingOffsetCommits());
 
@@ -203,7 +204,7 @@ public class TransactionManagerTest {
 
         sender.run(time.milliseconds());  // Send AddOffsetsRequest
         assertTrue(transactionManager.hasPendingOffsetCommits());  // We should now have created and queued the offset commit request.
-        assertFalse(addOffsetsResult.isDone());
+        assertFalse(addOffsetsResult.isCompleted()); // the result doesn't complete until TxnOffsetCommit returns
 
         Map<TopicPartition, Errors> txnOffsetCommitResponse = new HashMap<>();
         txnOffsetCommitResponse.put(tp1, Errors.NONE);
@@ -230,7 +231,7 @@ public class TransactionManagerTest {
         sender.run(time.milliseconds());  // send TxnOffsetCommitRequest commit.
 
         assertFalse(transactionManager.hasPendingOffsetCommits());
-        assertTrue(addOffsetsResult.isDone());  // We should only be done after both RPCs complete.
+        assertTrue(addOffsetsResult.isCompleted());  // We should only be done after both RPCs complete.
 
         transactionManager.beginCommittingTransaction();
         prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, pid, epoch);
@@ -252,6 +253,7 @@ public class TransactionManagerTest {
 
         prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
         sender.run(time.milliseconds());  // find coordinator
+        sender.run(time.milliseconds());
         assertEquals(brokerNode, transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION));
     }
 
@@ -262,26 +264,27 @@ public class TransactionManagerTest {
         // It finds the coordinator and then gets a PID.
         final long pid = 13131L;
         final short epoch = 1;
-        FutureTransactionalResult initPidResult = transactionManager.initializeTransactions();
+        TransactionalRequestResult initPidResult = transactionManager.initializeTransactions();
         prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
         sender.run(time.milliseconds());  // find coordinator
+        sender.run(time.milliseconds());
         assertEquals(brokerNode, transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION));
 
         prepareInitPidResponse(Errors.NOT_COORDINATOR, false, pid, epoch);
         sender.run(time.milliseconds());  // send pid, get not coordinator. Should resend the FindCoordinator and InitPid requests
 
         assertEquals(null, transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION));
-        assertFalse(initPidResult.isDone());
+        assertFalse(initPidResult.isCompleted());
         assertFalse(transactionManager.hasPid());
 
         prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
         sender.run(time.milliseconds());
         assertEquals(brokerNode, transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION));
-        assertFalse(initPidResult.isDone());
+        assertFalse(initPidResult.isCompleted());
         prepareInitPidResponse(Errors.NONE, false, pid, epoch);
         sender.run(time.milliseconds());  // get pid and epoch
 
-        assertTrue(initPidResult.isDone()); // The future should only return after the second round of retries succeed.
+        assertTrue(initPidResult.isCompleted()); // The future should only return after the second round of retries succeeds.
         assertTrue(transactionManager.hasPid());
         assertEquals(pid, transactionManager.pidAndEpoch().producerId);
         assertEquals(epoch, transactionManager.pidAndEpoch().epoch);
@@ -298,6 +301,7 @@ public class TransactionManagerTest {
         prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
 
         sender.run(time.milliseconds());  // find coordinator
+        sender.run(time.milliseconds());
         assertEquals(brokerNode, transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION));
 
         prepareInitPidResponse(Errors.NONE, false, pid, epoch);
@@ -314,7 +318,7 @@ public class TransactionManagerTest {
 
         assertFalse(responseFuture.isDone());
 
-        FutureTransactionalResult commitResult = transactionManager.beginCommittingTransaction();
+        TransactionalRequestResult commitResult = transactionManager.beginCommittingTransaction();
 
         // we have an append, an add partitions request, and now also an endtxn.
         // The order should be:
@@ -327,19 +331,19 @@ public class TransactionManagerTest {
         sender.run(time.milliseconds());  // AddPartitions.
         assertTrue(transactionManager.transactionContainsPartition(tp0));
         assertFalse(responseFuture.isDone());
-        assertFalse(commitResult.isDone());
+        assertFalse(commitResult.isCompleted());
 
         prepareProduceResponse(Errors.NONE, pid, epoch);
         sender.run(time.milliseconds());  // Produce.
         assertTrue(responseFuture.isDone());
 
         prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, pid, epoch);
-        assertFalse(commitResult.isDone());
+        assertFalse(commitResult.isCompleted());
         assertTrue(transactionManager.isInTransaction());
         assertTrue(transactionManager.isCompletingTransaction());
 
         sender.run(time.milliseconds());
-        assertTrue(commitResult.isDone());
+        assertTrue(commitResult.isCompleted());
         assertFalse(transactionManager.isInTransaction());
     }
 
@@ -354,6 +358,7 @@ public class TransactionManagerTest {
         prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
 
         sender.run(time.milliseconds());  // find coordinator
+        sender.run(time.milliseconds());
         assertEquals(brokerNode, transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION));
 
         prepareInitPidResponse(Errors.NONE, false, pid, epoch);
@@ -391,7 +396,7 @@ public class TransactionManagerTest {
         assertFalse(responseFuture.isDone());
         assertFalse(secondResponseFuture.isDone());
 
-        // The second add partitionsh should go out here.
+        // The second add partitions should go out here.
         sender.run(time.milliseconds());  // send second add partitions request
         assertTrue(transactionManager.transactionContainsPartition(tp1));
 
@@ -416,6 +421,7 @@ public class TransactionManagerTest {
         prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
 
         sender.run(time.milliseconds());  // find coordinator
+        sender.run(time.milliseconds());
         assertEquals(brokerNode, transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION));
 
         prepareInitPidResponse(Errors.NONE, false, pid, epoch);
@@ -450,6 +456,7 @@ public class TransactionManagerTest {
         prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
 
         sender.run(time.milliseconds());  // find coordinator
+        sender.run(time.milliseconds());
         assertEquals(brokerNode, transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION));
 
         prepareInitPidResponse(Errors.NONE, false, pid, epoch);
@@ -463,32 +470,32 @@ public class TransactionManagerTest {
         Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
                 "value".getBytes(), Record.EMPTY_HEADERS, new MockCallback(transactionManager), MAX_BLOCK_TIMEOUT).future;
 
-        FutureTransactionalResult commitResult = transactionManager.beginCommittingTransaction();
+        TransactionalRequestResult commitResult = transactionManager.beginCommittingTransaction();
         assertFalse(responseFuture.isDone());
         prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
         prepareProduceResponse(Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, pid, epoch);
 
         sender.run(time.milliseconds());  // Send AddPartitionsRequest
-        assertFalse(commitResult.isDone());
+        assertFalse(commitResult.isCompleted());
 
         sender.run(time.milliseconds());  // Send Produce Request, returns OutOfOrderSequenceException.
         sender.run(time.milliseconds());  // try to commit.
-        assertTrue(commitResult.isDone());  // commit should be cancelled with exception without being sent.
+        assertTrue(commitResult.isCompleted());  // commit should be cancelled with exception without being sent.
 
         try {
-            commitResult.get();
+            commitResult.await();
            fail();  // the await() must throw an exception.
         } catch (RuntimeException e) {
             assertTrue(e instanceof KafkaException);
         }
 
         // Commit is not allowed, so let's abort and try again.
-        FutureTransactionalResult abortResult = transactionManager.beginAbortingTransaction();
+        TransactionalRequestResult abortResult = transactionManager.beginAbortingTransaction();
         prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, epoch);
         sender.run(time.milliseconds());  // Send abort request. It is valid to transition from ERROR to ABORT
 
-        assertTrue(abortResult.isDone());
-        assertTrue(abortResult.get().isSuccessful());
+        assertTrue(abortResult.isCompleted());
+        assertTrue(abortResult.isSuccessful());
         assertTrue(transactionManager.isReadyForTransaction());  // make sure we are ready for a transaction now.
     }
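
The last test walks the recovery path the manager enforces: after a fatal produce error the pending commit fails, and the application is expected to abort and start over. A sketch of that flow from the public producer API, using the released method names (initTransactions, beginTransaction, commitTransaction, abortTransaction), which may differ slightly on trunk at this point:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;

public class TransactionalProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("transactional.id", "my-transactional-id");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();       // blocks on the transactional result
            producer.beginTransaction();
            try {
                producer.send(new ProducerRecord<>("my-topic", "key", "value"));
                producer.commitTransaction();  // EndTxn(COMMIT) behind the scenes
            } catch (KafkaException e) {
                // Mirrors the test above: once the transaction is in an error
                // state, commit is rejected and the application aborts instead.
                producer.abortTransaction();   // EndTxn(ABORT); ready for a new transaction
            }
        }
    }
}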
 

