kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [5/5] kafka git commit: KAFKA-5259; TransactionalId auth implies ProducerId auth
Date Wed, 24 May 2017 22:29:03 GMT
KAFKA-5259; TransactionalId auth implies ProducerId auth

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Apurva Mehta <apurva@confluent.io>, Jun Rao <junrao@gmail.com>

Closes #3075 from hachikuji/KAFKA-5259-FIXED

(cherry picked from commit 38f6cae9e879baa35c5dbc5829bf09ecd59930c2)
Signed-off-by: Jason Gustafson <jason@confluent.io>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9a21bf20
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9a21bf20
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9a21bf20

Branch: refs/heads/0.11.0
Commit: 9a21bf20b623b790c9910813e373d2d61fe84c2c
Parents: 7cb57dc
Author: Jason Gustafson <jason@confluent.io>
Authored: Wed May 24 15:26:46 2017 -0700
Committer: Jason Gustafson <jason@confluent.io>
Committed: Wed May 24 15:28:55 2017 -0700

----------------------------------------------------------------------
 .../apache/kafka/clients/ClientResponse.java    |    7 +-
 .../kafka/clients/admin/AclOperation.java       |    7 +-
 .../clients/consumer/internals/Fetcher.java     |    2 +-
 .../kafka/clients/producer/KafkaProducer.java   |   35 +-
 .../clients/producer/internals/Sender.java      |  169 +-
 .../producer/internals/TransactionManager.java  |  268 ++--
 .../ProducerIdAuthorizationException.java       |   23 -
 .../TransactionalIdAuthorizationException.java  |    2 +-
 .../apache/kafka/common/protocol/Errors.java    |   13 +-
 .../apache/kafka/common/protocol/Protocol.java  |    3 +
 .../common/requests/AddOffsetsToTxnRequest.java |   11 +
 .../requests/AddOffsetsToTxnResponse.java       |    5 +-
 .../requests/AddPartitionsToTxnRequest.java     |   11 +
 .../requests/AddPartitionsToTxnResponse.java    |    3 +-
 .../kafka/common/requests/EndTxnRequest.java    |   11 +
 .../kafka/common/requests/EndTxnResponse.java   |    3 +-
 .../common/requests/InitProducerIdResponse.java |   12 +-
 .../kafka/common/requests/ProduceRequest.java   |    5 +-
 .../kafka/common/requests/ProduceResponse.java  |    3 +
 .../common/requests/TxnOffsetCommitRequest.java |   31 +-
 .../requests/TxnOffsetCommitResponse.java       |    1 +
 .../requests/WriteTxnMarkersResponse.java       |    1 +
 .../kafka/clients/admin/AclOperationTest.java   |    3 +-
 .../clients/producer/internals/SenderTest.java  |  183 +--
 .../internals/TransactionManagerTest.java       |  445 ++++--
 .../common/requests/RequestResponseTest.java    |   10 +-
 .../src/main/scala/kafka/admin/AclCommand.scala |   54 +-
 .../kafka/coordinator/group/GroupMetadata.scala |   43 +-
 .../group/GroupMetadataManager.scala            |    5 +-
 .../coordinator/group/MemberMetadata.scala      |   14 +-
 .../scala/kafka/security/auth/Operation.scala   |    6 +-
 .../scala/kafka/security/auth/Resource.scala    |    3 +-
 .../kafka/security/auth/ResourceType.scala      |   16 +-
 .../src/main/scala/kafka/server/KafkaApis.scala |  316 ++--
 .../kafka/api/AuthorizerIntegrationTest.scala   |  292 +++-
 .../kafka/api/TransactionsBounceTest.scala      |   31 +-
 .../kafka/api/TransactionsTest.scala            |    7 +-
 .../scala/unit/kafka/admin/AclCommandTest.scala |   36 +-
 .../group/GroupCoordinatorResponseTest.scala    | 1492 ------------------
 .../group/GroupCoordinatorTest.scala            | 1492 ++++++++++++++++++
 .../coordinator/group/GroupMetadataTest.scala   |    2 +-
 .../unit/kafka/server/RequestQuotaTest.scala    |    9 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala |   10 +-
 43 files changed, 2771 insertions(+), 2324 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/clients/src/main/java/org/apache/kafka/clients/ClientResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientResponse.java b/clients/src/main/java/org/apache/kafka/clients/ClientResponse.java
index 715eae7..0ff30e9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClientResponse.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientResponse.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.clients;
 
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.requests.AbstractResponse;
 import org.apache.kafka.common.requests.RequestHeader;
 
@@ -31,7 +32,7 @@ public class ClientResponse {
     private final long receivedTimeMs;
     private final long latencyMs;
     private final boolean disconnected;
-    private final RuntimeException versionMismatch;
+    private final UnsupportedVersionException versionMismatch;
     private final AbstractResponse responseBody;
 
     /**
@@ -51,7 +52,7 @@ public class ClientResponse {
                           long createdTimeMs,
                           long receivedTimeMs,
                           boolean disconnected,
-                          RuntimeException versionMismatch,
+                          UnsupportedVersionException versionMismatch,
                           AbstractResponse responseBody) {
         this.requestHeader = requestHeader;
         this.callback = callback;
@@ -71,7 +72,7 @@ public class ClientResponse {
         return disconnected;
     }
 
-    public RuntimeException versionMismatch() {
+    public UnsupportedVersionException versionMismatch() {
         return versionMismatch;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/clients/src/main/java/org/apache/kafka/clients/admin/AclOperation.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AclOperation.java b/clients/src/main/java/org/apache/kafka/clients/admin/AclOperation.java
index 062e5e3..0c3ff50 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AclOperation.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AclOperation.java
@@ -83,7 +83,12 @@ public enum AclOperation {
     /**
      * ALTER_CONFIGS operation.
      */
-    ALTER_CONFIGS((byte) 11);
+    ALTER_CONFIGS((byte) 11),
+
+    /**
+     * IDEMPOTENT_WRITE operation.
+     */
+    IDEMPOTENT_WRITE((byte) 12);
 
     private final static HashMap<Byte, AclOperation> CODE_TO_VALUE = new HashMap<>();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 509993f..6917a1d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -416,7 +416,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
             }
             // we might lose the assignment while fetching the offset, so check it is still active
             if (subscriptions.isAssigned(partition)) {
-                log.debug("Resetting offset for partition {} to {} offset.", partition, offsetData.offset);
+                log.debug("Resetting offset for partition {} to offset {}.", partition, offsetData.offset);
                 this.subscriptions.seek(partition, offsetData.offset);
             }
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index ac0169a..c11ecc7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -51,7 +51,6 @@ import org.apache.kafka.common.metrics.MetricsReporter;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.network.ChannelBuilder;
 import org.apache.kafka.common.network.Selector;
-import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.AbstractRecords;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.RecordBatch;
@@ -607,7 +606,9 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      * Implementation of asynchronously send a record to a topic.
      */
     private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
-        ensureProperTransactionalState();
+        if (transactionManager != null)
+            ensureProperTransactionalState();
+
         TopicPartition tp = null;
         try {
             // first make sure the metadata for the topic is available
@@ -642,9 +643,9 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
             log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
             // producer callback will make sure to call both 'callback' and interceptor callback
-            Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp, transactionManager);
+            Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);
 
-            if (transactionManager != null)
+            if (transactionManager != null && transactionManager.isTransactional())
                 transactionManager.maybeAddPartitionToTransaction(tp);
 
             RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
@@ -690,27 +691,17 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     }
 
     private void ensureProperTransactionalState() {
-        if (transactionManager == null)
-            return;
-
         if (transactionManager.isTransactional() && !transactionManager.hasProducerId())
-            throw new IllegalStateException("Cannot perform a 'send' before completing a call to initTransactions when transactions are enabled.");
-
-        if (transactionManager.isFenced())
-            throw Errors.INVALID_PRODUCER_EPOCH.exception();
+            throw new IllegalStateException("Cannot perform a 'send' before completing a call to initTransactions " +
+                    "when transactions are enabled.");
 
         if (transactionManager.isInErrorState()) {
-            String errorMessage =
-                    "Cannot perform send because at least one previous transactional or idempotent request has failed with errors.";
             Exception lastError = transactionManager.lastError();
-            if (lastError != null)
-                throw new KafkaException(errorMessage, lastError);
-            else
-                throw new KafkaException(errorMessage);
+            throw new KafkaException("Cannot perform send because at least one previous transactional or " +
+                    "idempotent request has failed with errors.", lastError);
         }
         if (transactionManager.isCompletingTransaction())
             throw new IllegalStateException("Cannot call send while a commit or abort is in progress.");
-
     }
 
     private void setReadOnly(Headers headers) {
@@ -1013,14 +1004,11 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         private final Callback userCallback;
         private final ProducerInterceptors<K, V> interceptors;
         private final TopicPartition tp;
-        private final TransactionManager transactionManager;
 
-        public InterceptorCallback(Callback userCallback, ProducerInterceptors<K, V> interceptors,
-                                   TopicPartition tp, TransactionManager transactionManager) {
+        private InterceptorCallback(Callback userCallback, ProducerInterceptors<K, V> interceptors, TopicPartition tp) {
             this.userCallback = userCallback;
             this.interceptors = interceptors;
             this.tp = tp;
-            this.transactionManager = transactionManager;
         }
 
         public void onCompletion(RecordMetadata metadata, Exception exception) {
@@ -1034,9 +1022,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             }
             if (this.userCallback != null)
                 this.userCallback.onCompletion(metadata, exception);
-
-            if (exception != null && transactionManager != null)
-                transactionManager.setError(exception);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 4c3b99d..116a1c5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -24,15 +24,18 @@ import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.NetworkClientUtils;
 import org.apache.kafka.clients.RequestCompletionHandler;
 import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ClusterAuthorizationException;
 import org.apache.kafka.common.errors.InvalidMetadataException;
 import org.apache.kafka.common.errors.OutOfOrderSequenceException;
+import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.metrics.Measurable;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
@@ -189,28 +192,34 @@ public class Sender implements Runnable {
      * @param now The current POSIX time in milliseconds
      */
     void run(long now) {
-        long pollTimeout = retryBackoffMs;
-        if (!maybeSendTransactionalRequest(now)) {
-            pollTimeout = sendProducerData(now);
+        if (transactionManager != null) {
+            if (!transactionManager.isTransactional()) {
+                // this is an idempotent producer, so make sure we have a producer id
+                maybeWaitForProducerId();
+            } else if (transactionManager.hasInflightRequest() || maybeSendTransactionalRequest(now)) {
+                // as long as there are outstanding transactional requests, we simply wait for them to return
+                client.poll(retryBackoffMs, now);
+                return;
+            }
+
+            // do not continue sending if the transaction manager is in a failed state or if there
+            // is no producer id (for the idempotent case).
+            if (transactionManager.isInErrorState() || !transactionManager.hasProducerId()) {
+                RuntimeException lastError = transactionManager.lastError();
+                if (lastError != null)
+                    maybeAbortBatches(lastError);
+                client.poll(retryBackoffMs, now);
+                return;
+            }
         }
 
+        long pollTimeout = sendProducerData(now);
         log.trace("waiting {}ms in poll", pollTimeout);
-        this.client.poll(pollTimeout, now);
+        client.poll(pollTimeout, now);
     }
 
-
     private long sendProducerData(long now) {
         Cluster cluster = metadata.fetch();
-        maybeWaitForProducerId();
-
-        if (transactionManager != null && transactionManager.isInErrorState()) {
-            final KafkaException exception = transactionManager.lastError() instanceof KafkaException
-                    ? (KafkaException) transactionManager.lastError()
-                    : new KafkaException(transactionManager.lastError());
-            log.error("aborting producer batches because the transaction manager is in an error state.", exception);
-            this.accumulator.abortBatches(exception);
-            return Long.MAX_VALUE;
-        }
         // get the list of partitions with data ready to send
         RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
 
@@ -286,22 +295,13 @@ public class Sender implements Runnable {
     }
 
     private boolean maybeSendTransactionalRequest(long now) {
-        if (transactionManager == null || !transactionManager.isTransactional())
-            return false;
-
-        if (transactionManager.hasInflightRequest()) {
-            log.trace("TransactionalId: {} -- There is already an inflight transactional request. Going to wait for the response.",
+        TransactionManager.TxnRequestHandler nextRequestHandler = transactionManager.nextRequestHandler();
+        if (nextRequestHandler == null) {
+            log.trace("TransactionalId: {} -- There are no pending transactional requests to send",
                     transactionManager.transactionalId());
-            return true;
-        }
-
-        if (!transactionManager.hasPendingTransactionalRequests()) {
-            log.trace("TransactionalId: {} -- There are no pending transactional requests to send", transactionManager.transactionalId());
             return false;
         }
 
-        TransactionManager.TxnRequestHandler nextRequestHandler = transactionManager.nextRequestHandler();
-
         if (nextRequestHandler.isEndTxn() && transactionManager.isCompletingTransaction() && accumulator.hasUnflushedBatches()) {
             if (!accumulator.flushInProgress())
                 accumulator.beginFlush();
@@ -311,15 +311,11 @@ public class Sender implements Runnable {
             return false;
         }
 
-        if (transactionManager.maybeTerminateRequestWithError(nextRequestHandler)) {
-            log.trace("TransactionalId: {} -- Not sending a transactional request because we are in an error state",
-                    transactionManager.transactionalId());
-            return false;
-        }
-
-        Node targetNode = null;
+        log.debug("TransactionalId: {} -- Sending transactional request {}", transactionManager.transactionalId(),
+                nextRequestHandler.requestBuilder());
 
-        while (targetNode == null) {
+        while (true) {
+            Node targetNode = null;
             try {
                 if (nextRequestHandler.needsCoordinator()) {
                     targetNode = transactionManager.coordinator(nextRequestHandler.coordinatorType());
@@ -340,8 +336,8 @@ public class Sender implements Runnable {
                                 transactionManager.transactionalId(), retryBackoffMs, nextRequestHandler.requestBuilder());
                         time.sleep(retryBackoffMs);
                     }
-                    ClientRequest clientRequest = client.newClientRequest(targetNode.idString(), nextRequestHandler.requestBuilder(),
-                            now, true, nextRequestHandler);
+                    ClientRequest clientRequest = client.newClientRequest(targetNode.idString(),
+                            nextRequestHandler.requestBuilder(), now, true, nextRequestHandler);
                     transactionManager.setInFlightRequestCorrelationId(clientRequest.correlationId());
                     log.trace("TransactionalId: {} -- Sending transactional request {} to node {}", transactionManager.transactionalId(),
                             nextRequestHandler.requestBuilder(), clientRequest.destination());
@@ -349,9 +345,9 @@ public class Sender implements Runnable {
                     return true;
                 }
             } catch (IOException e) {
-                targetNode = null;
-                log.warn("TransactionalId: " + transactionManager.transactionalId() + " -- Got an exception when trying " +
-                        "to find a node to send transactional request " + nextRequestHandler.requestBuilder() + ". Going to back off and retry", e);
+                log.debug("TransactionalId: {} -- Disconnect from {} while trying to send transactional " +
+                                "request {}. Going to back off and retry", transactionManager.transactionalId(),
+                        targetNode, nextRequestHandler.requestBuilder());
             }
             log.trace("TransactionalId: {}. About to wait for {}ms before trying to send another transactional request.",
                     transactionManager.transactionalId(), retryBackoffMs);
@@ -364,6 +360,13 @@ public class Sender implements Runnable {
         return true;
     }
 
+    private void maybeAbortBatches(RuntimeException exception) {
+        if (accumulator.hasUnflushedBatches()) {
+            log.error("Aborting producer batches due to fatal error", exception);
+            accumulator.abortBatches(exception);
+        }
+    }
+
     /**
      * Start closing the sender (won't actually complete until all data is sent out)
      */
@@ -383,7 +386,7 @@ public class Sender implements Runnable {
         initiateClose();
     }
 
-    private ClientResponse sendAndAwaitInitPidRequest(Node node) throws IOException {
+    private ClientResponse sendAndAwaitInitProducerIdRequest(Node node) throws IOException {
         String nodeId = node.idString();
         InitProducerIdRequest.Builder builder = new InitProducerIdRequest.Builder(null);
         ClientRequest request = client.newClientRequest(nodeId, builder, time.milliseconds(), true, null);
@@ -399,43 +402,37 @@ public class Sender implements Runnable {
     }
 
     private void maybeWaitForProducerId() {
-        // If this is a transactional producer, the producer id will be received when recovering transactions in the
-        // initTransactions() method of the producer.
-        if (transactionManager == null || transactionManager.isTransactional())
-            return;
-
         while (!transactionManager.hasProducerId() && !transactionManager.isInErrorState()) {
             try {
                 Node node = awaitLeastLoadedNodeReady(requestTimeout);
                 if (node != null) {
-                    ClientResponse response = sendAndAwaitInitPidRequest(node);
-
-                    if (response.hasResponse() && (response.responseBody() instanceof InitProducerIdResponse)) {
-                        InitProducerIdResponse initProducerIdResponse = (InitProducerIdResponse) response.responseBody();
-                        Exception exception = initProducerIdResponse.error().exception();
-                        if (exception != null && !(exception instanceof  RetriableException)) {
-                            transactionManager.setError(exception);
-                            return;
-                        }
+                    ClientResponse response = sendAndAwaitInitProducerIdRequest(node);
+                    InitProducerIdResponse initProducerIdResponse = (InitProducerIdResponse) response.responseBody();
+                    Errors error = initProducerIdResponse.error();
+                    if (error == Errors.NONE) {
                         ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(
                                 initProducerIdResponse.producerId(), initProducerIdResponse.epoch());
                         transactionManager.setProducerIdAndEpoch(producerIdAndEpoch);
+                    } else if (error.exception() instanceof RetriableException) {
+                        log.debug("Retriable error from InitProducerId response", error.message());
                     } else {
-                        log.error("Received an unexpected response type for an InitProducerIdRequest from {}. " +
-                                "We will back off and try again.", node);
+                        transactionManager.transitionToFatalError(error.exception());
+                        break;
                     }
                 } else {
                     log.debug("Could not find an available broker to send InitProducerIdRequest to. " +
                             "We will back off and try again.");
                 }
-            } catch (Exception e) {
-                log.warn("Received an exception while trying to get a producer id. Will back off and retry.", e);
+            } catch (UnsupportedVersionException e) {
+                transactionManager.transitionToFatalError(e);
+                break;
+            } catch (IOException e) {
+                log.debug("Broker {} disconnected while awaiting InitProducerId response", e);
             }
             log.trace("Retry InitProducerIdRequest in {}ms.", retryBackoffMs);
             time.sleep(retryBackoffMs);
             metadata.requestUpdate();
         }
-
     }
 
     /**
@@ -507,9 +504,9 @@ public class Sender implements Runnable {
                         error);
                 if (transactionManager == null) {
                     reenqueueBatch(batch, now);
-                } else if (transactionManager.producerIdAndEpoch().producerId == batch.producerId() &&
-                        transactionManager.producerIdAndEpoch().epoch == batch.producerEpoch()) {
-                    // If idempotence is enabled only retry the request if the current producer id is the same as the producer id of the batch.
+                } else if (transactionManager.hasProducerIdAndEpoch(batch.producerId(), batch.producerEpoch())) {
+                    // If idempotence is enabled only retry the request if the current producer id is the same as
+                    // the producer id of the batch.
                     log.debug("Retrying batch to topic-partition {}. Sequence number : {}", batch.topicPartition,
                             transactionManager.sequenceNumber(batch.topicPartition));
                     reenqueueBatch(batch, now);
@@ -523,12 +520,10 @@ public class Sender implements Runnable {
                 final RuntimeException exception;
                 if (error == Errors.TOPIC_AUTHORIZATION_FAILED)
                     exception = new TopicAuthorizationException(batch.topicPartition.topic());
+                else if (error == Errors.CLUSTER_AUTHORIZATION_FAILED)
+                    exception = new ClusterAuthorizationException("The producer is not authorized to do idempotent sends");
                 else
                     exception = error.exception();
-                if (error == Errors.OUT_OF_ORDER_SEQUENCE_NUMBER && batch.producerId() == transactionManager.producerIdAndEpoch().producerId)
-                    log.error("The broker received an out of order sequence number for correlation id {}, topic-partition " +
-                                    "{} at offset {}. This indicates data loss on the broker, and should be investigated.",
-                            correlationId, batch.topicPartition, response.baseOffset);
                 // tell the user the result of their request
                 failBatch(batch, response, exception);
                 this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount);
@@ -543,12 +538,6 @@ public class Sender implements Runnable {
         } else {
             completeBatch(batch, response);
 
-            if (transactionManager != null && transactionManager.producerIdAndEpoch().producerId == batch.producerId()
-                    && transactionManager.producerIdAndEpoch().epoch == batch.producerEpoch()) {
-                transactionManager.incrementSequenceNumber(batch.topicPartition, batch.recordCount);
-                log.debug("Incremented sequence number for topic-partition {} to {}", batch.topicPartition,
-                        transactionManager.sequenceNumber(batch.topicPartition));
-            }
         }
 
         // Unmute the completed partition.
@@ -562,18 +551,38 @@ public class Sender implements Runnable {
     }
 
     private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response) {
+        if (transactionManager != null && transactionManager.hasProducerIdAndEpoch(batch.producerId(), batch.producerEpoch())) {
+            transactionManager.incrementSequenceNumber(batch.topicPartition, batch.recordCount);
+            log.debug("Incremented sequence number for topic-partition {} to {}", batch.topicPartition,
+                    transactionManager.sequenceNumber(batch.topicPartition));
+        }
+
         batch.done(response.baseOffset, response.logAppendTime, null);
         this.accumulator.deallocate(batch);
     }
 
     private void failBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, RuntimeException exception) {
-        if (transactionManager != null && !transactionManager.isTransactional()
-                && batch.producerId() == transactionManager.producerIdAndEpoch().producerId) {
-            // Reset the transaction state since we have hit an irrecoverable exception and cannot make any guarantees
-            // about the previously committed message. Note that this will discard the producer id and sequence
-            // numbers for all existing partitions.
-            transactionManager.resetProducerId();
+        if (transactionManager != null) {
+            if (exception instanceof OutOfOrderSequenceException
+                    && !transactionManager.isTransactional()
+                    && transactionManager.hasProducerId(batch.producerId())) {
+                log.error("The broker received an out of order sequence number for topic-partition " +
+                                "{} at offset {}. This indicates data loss on the broker, and should be investigated.",
+                        batch.topicPartition, response.baseOffset);
+
+                // Reset the transaction state since we have hit an irrecoverable exception and cannot make any guarantees
+                // about the previously committed message. Note that this will discard the producer id and sequence
+                // numbers for all existing partitions.
+                transactionManager.resetProducerId();
+            } else if (exception instanceof ClusterAuthorizationException
+                    || exception instanceof TransactionalIdAuthorizationException
+                    || exception instanceof ProducerFencedException) {
+                transactionManager.transitionToFatalError(exception);
+            } else if (transactionManager.isTransactional()) {
+                transactionManager.transitionToAbortableError(exception);
+            }
         }
+
         batch.done(response.baseOffset, response.logAppendTime, exception);
         this.accumulator.deallocate(batch);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index d84a88e..d674697 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -23,7 +23,7 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
-import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.AbstractResponse;
@@ -77,7 +77,7 @@ public class TransactionManager {
     private Node consumerGroupCoordinator;
 
     private volatile State currentState = State.UNINITIALIZED;
-    private volatile Exception lastError = null;
+    private volatile RuntimeException lastError = null;
     private volatile ProducerIdAndEpoch producerIdAndEpoch;
 
     private enum State {
@@ -87,32 +87,34 @@ public class TransactionManager {
         IN_TRANSACTION,
         COMMITTING_TRANSACTION,
         ABORTING_TRANSACTION,
-        FENCED,
-        ERROR;
+        ABORTABLE_ERROR,
+        FATAL_ERROR;
 
         private boolean isTransitionValid(State source, State target) {
             switch (target) {
                 case INITIALIZING:
-                    return source == UNINITIALIZED || source == ERROR;
+                    return source == UNINITIALIZED;
                 case READY:
                     return source == INITIALIZING || source == COMMITTING_TRANSACTION
-                            || source == ABORTING_TRANSACTION || source == ERROR;
+                            || source == ABORTING_TRANSACTION || source == ABORTABLE_ERROR;
                 case IN_TRANSACTION:
                     return source == READY;
                 case COMMITTING_TRANSACTION:
                     return source == IN_TRANSACTION;
                 case ABORTING_TRANSACTION:
-                    return source == IN_TRANSACTION || source == ERROR;
+                    return source == IN_TRANSACTION || source == ABORTABLE_ERROR;
+                case ABORTABLE_ERROR:
+                    return source == IN_TRANSACTION || source == COMMITTING_TRANSACTION || source == ABORTING_TRANSACTION;
+                case FATAL_ERROR:
                 default:
-                    // We can transition to FENCED or ERROR unconditionally.
-                    // FENCED is never a valid starting state for any transition. So the only option is to close the
+                    // We can transition to FATAL_ERROR unconditionally.
+                    // FATAL_ERROR is never a valid starting state for any transition. So the only option is to close the
                     // producer or do purely non transactional requests.
                     return true;
             }
         }
     }
 
-
     // We use the priority to determine the order in which requests need to be sent out. For instance, if we have
     // a pending FindCoordinator request, that must always go first. Next, If we need a producer id, that must go second.
     // The endTxn request must always go last.
@@ -149,7 +151,7 @@ public class TransactionManager {
     }
 
     TransactionManager() {
-        this("", 0);
+        this(null, 0);
     }
 
     public synchronized TransactionalRequestResult initializeTransactions() {
@@ -178,8 +180,8 @@ public class TransactionManager {
 
     public synchronized TransactionalRequestResult beginAbortingTransaction() {
         ensureTransactional();
-        if (isFenced())
-            throw Errors.INVALID_PRODUCER_EPOCH.exception();
+        if (currentState != State.ABORTABLE_ERROR)
+            maybeFailWithError();
         transitionTo(State.ABORTING_TRANSACTION);
         return beginCompletingTransaction(false);
     }
@@ -213,12 +215,16 @@ public class TransactionManager {
     }
 
     public synchronized void maybeAddPartitionToTransaction(TopicPartition topicPartition) {
-        if (!isInTransaction() || partitionsInTransaction.contains(topicPartition))
+        if (!isInTransaction())
+            throw new IllegalArgumentException("Cannot add partitions to a transaction in state " + currentState);
+
+        if (partitionsInTransaction.contains(topicPartition))
             return;
+
         newPartitionsToBeAddedToTransaction.add(topicPartition);
     }
 
-    public Exception lastError() {
+    public RuntimeException lastError() {
         return lastError;
     }
 
@@ -231,11 +237,7 @@ public class TransactionManager {
     }
 
     public boolean isTransactional() {
-        return transactionalId != null && !transactionalId.isEmpty();
-    }
-
-    public boolean isFenced() {
-        return currentState == State.FENCED;
+        return transactionalId != null;
     }
 
     public boolean isCompletingTransaction() {
@@ -247,31 +249,15 @@ public class TransactionManager {
     }
 
     public boolean isInErrorState() {
-        return currentState == State.ERROR || currentState == State.FENCED;
-    }
-
-    public synchronized void setError(Exception exception) {
-        if (exception instanceof ProducerFencedException)
-            transitionTo(State.FENCED, exception);
-        else
-            transitionTo(State.ERROR, exception);
-    }
-
-    boolean maybeTerminateRequestWithError(TxnRequestHandler requestHandler) {
-        if (isInErrorState() && requestHandler.isEndTxn()) {
-            // We shouldn't terminate abort requests from error states.
-            EndTxnHandler endTxnHandler = (EndTxnHandler) requestHandler;
-            if (endTxnHandler.builder.result() == TransactionResult.ABORT)
-                return false;
-            String errorMessage = "Cannot commit transaction because at least one previous transactional request " +
-                    "was not completed successfully.";
-            if (lastError != null)
-                requestHandler.fatal(new KafkaException(errorMessage, lastError));
-            else
-                requestHandler.fatal(new KafkaException(errorMessage));
-            return true;
-        }
-        return false;
+        return currentState == State.ABORTABLE_ERROR || currentState == State.FATAL_ERROR;
+    }
+
+    public synchronized void transitionToAbortableError(RuntimeException exception) {
+        transitionTo(State.ABORTABLE_ERROR, exception);
+    }
+
+    public synchronized void transitionToFatalError(RuntimeException exception) {
+        transitionTo(State.FATAL_ERROR, exception);
     }
 
     /**
@@ -284,6 +270,15 @@ public class TransactionManager {
         return producerIdAndEpoch;
     }
 
+    boolean hasProducerId(long producerId) {
+        return producerIdAndEpoch.producerId == producerId;
+    }
+
+    boolean hasProducerIdAndEpoch(long producerId, short producerEpoch) {
+        ProducerIdAndEpoch idAndEpoch = this.producerIdAndEpoch;
+        return idAndEpoch.producerId == producerId && idAndEpoch.epoch == producerEpoch;
+    }
+
     /**
      * Set the producer id and epoch atomically.
      */
@@ -337,26 +332,26 @@ public class TransactionManager {
         sequenceNumbers.put(topicPartition, currentSequenceNumber);
     }
 
-    boolean hasPendingTransactionalRequests() {
-        return !(pendingRequests.isEmpty() && newPartitionsToBeAddedToTransaction.isEmpty());
-    }
-
-    TxnRequestHandler nextRequestHandler() {
-        if (!hasPendingTransactionalRequests())
-            return null;
-
+    synchronized TxnRequestHandler nextRequestHandler() {
         if (!newPartitionsToBeAddedToTransaction.isEmpty())
             pendingRequests.add(addPartitionsToTransactionHandler());
 
-        return pendingRequests.poll();
+        TxnRequestHandler nextRequestHandler = pendingRequests.poll();
+        if (nextRequestHandler != null && maybeTerminateRequestWithError(nextRequestHandler)) {
+            log.trace("TransactionalId: {} -- Not sending transactional request {} because we are in an error state",
+                    transactionalId, nextRequestHandler.requestBuilder());
+            return null;
+        }
+
+        return nextRequestHandler;
     }
 
-    void retry(TxnRequestHandler request) {
+    synchronized void retry(TxnRequestHandler request) {
         request.setRetry();
         pendingRequests.add(request);
     }
 
-    void reenqueue(TxnRequestHandler request) {
+    synchronized void reenqueue(TxnRequestHandler request) {
         pendingRequests.add(request);
     }
 
@@ -406,15 +401,21 @@ public class TransactionManager {
         transitionTo(target, null);
     }
 
-    private synchronized void transitionTo(State target, Exception error) {
-        if (currentState.isTransitionValid(currentState, target)) {
-            currentState = target;
-            if (target == State.ERROR && error != null)
-                lastError = error;
-        } else {
+    private synchronized void transitionTo(State target, RuntimeException error) {
+        if (!currentState.isTransitionValid(currentState, target))
             throw new KafkaException("Invalid transition attempted from state " + currentState.name() +
                     " to state " + target.name());
+
+        if (target == State.FATAL_ERROR || target == State.ABORTABLE_ERROR) {
+            if (error == null)
+                throw new IllegalArgumentException("Cannot transition to " + target + " with a null exception");
+            lastError = error;
+        } else {
+            lastError = null;
         }
+
+        log.debug("TransactionalId {} -- Transition from state {} to {}", transactionalId, currentState, target);
+        currentState = target;
     }
 
     private void ensureTransactional() {
@@ -423,15 +424,23 @@ public class TransactionManager {
     }
 
     private void maybeFailWithError() {
-        if (isFenced())
-            throw Errors.INVALID_PRODUCER_EPOCH.exception();
+        if (isInErrorState())
+            throw new KafkaException("Cannot execute transactional method because we are in an error state", lastError);
+    }
+
+    private boolean maybeTerminateRequestWithError(TxnRequestHandler requestHandler) {
         if (isInErrorState()) {
-            String errorMessage = "Cannot execute transactional method because we are in an error state.";
-            if (lastError != null)
-                throw new KafkaException(errorMessage, lastError);
-            else
-                throw new KafkaException(errorMessage);
+            if (requestHandler instanceof EndTxnHandler) {
+                // we allow abort requests to break out of the error state. The state and the last error
+                // will be cleared when the request returns
+                EndTxnHandler endTxnHandler = (EndTxnHandler) requestHandler;
+                if (endTxnHandler.builder.result() == TransactionResult.ABORT)
+                    return false;
+            }
+            requestHandler.fail(lastError);
+            return true;
         }
+        return false;
     }
 
     private void lookupCoordinator(FindCoordinatorRequest.CoordinatorType type, String coordinatorKey) {
@@ -443,12 +452,11 @@ public class TransactionManager {
                 transactionCoordinator = null;
                 break;
             default:
-                throw new IllegalStateException("Got an invalid coordinator type: " + type);
+                throw new IllegalStateException("Invalid coordinator type: " + type);
         }
 
         FindCoordinatorRequest.Builder builder = new FindCoordinatorRequest.Builder(type, coordinatorKey);
-        FindCoordinatorHandler request = new FindCoordinatorHandler(builder);
-        pendingRequests.add(request);
+        pendingRequests.add(new FindCoordinatorHandler(builder));
     }
 
     private void completeTransaction() {
@@ -473,9 +481,8 @@ public class TransactionManager {
             CommittedOffset committedOffset = new CommittedOffset(offsetAndMetadata.offset(), offsetAndMetadata.metadata());
             pendingTxnOffsetCommits.put(entry.getKey(), committedOffset);
         }
-        TxnOffsetCommitRequest.Builder builder = new TxnOffsetCommitRequest.Builder(consumerGroupId,
-                producerIdAndEpoch.producerId, producerIdAndEpoch.epoch,
-                pendingTxnOffsetCommits);
+        TxnOffsetCommitRequest.Builder builder = new TxnOffsetCommitRequest.Builder(transactionalId, consumerGroupId,
+                producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, pendingTxnOffsetCommits);
         return new TxnOffsetCommitHandler(result, builder);
     }
 
@@ -491,19 +498,20 @@ public class TransactionManager {
             this(new TransactionalRequestResult());
         }
 
-        void fatal(RuntimeException e) {
+        void fatalError(RuntimeException e) {
+            result.setError(e);
+            transitionToFatalError(e);
+            result.done();
+        }
+
+        void abortableError(RuntimeException e) {
             result.setError(e);
-            transitionTo(State.ERROR, e);
+            transitionToAbortableError(e);
             result.done();
         }
 
-        void fenced() {
-            log.error("Producer has become invalid, which typically means another producer with the same " +
-                            "transactional.id has been started: producerId: {}. epoch: {}.",
-                    producerIdAndEpoch.producerId, producerIdAndEpoch.epoch);
-            result.setError(Errors.INVALID_PRODUCER_EPOCH.exception());
-            lastError = Errors.INVALID_PRODUCER_EPOCH.exception();
-            transitionTo(State.FENCED, Errors.INVALID_PRODUCER_EPOCH.exception());
+        void fail(RuntimeException e) {
+            result.setError(e);
             result.done();
         }
 
@@ -516,19 +524,19 @@ public class TransactionManager {
         @SuppressWarnings("unchecked")
         public void onComplete(ClientResponse response) {
             if (response.requestHeader().correlationId() != inFlightRequestCorrelationId) {
-                fatal(new RuntimeException("Detected more than one in-flight transactional request."));
+                fatalError(new RuntimeException("Detected more than one in-flight transactional request."));
             } else {
                 clearInFlightRequestCorrelationId();
                 if (response.wasDisconnected()) {
                     log.trace("disconnected from " + response.destination() + ". Will retry.");
                     reenqueue();
                 } else if (response.versionMismatch() != null) {
-                    fatal(response.versionMismatch());
+                    fatalError(response.versionMismatch());
                 } else if (response.hasResponse()) {
                    log.trace("Got transactional response for request: " + requestBuilder());
                     handleResponse(response.responseBody());
                 } else {
-                    fatal(new KafkaException("Could not execute transactional request for unknown reasons"));
+                    fatalError(new KafkaException("Could not execute transactional request for unknown reasons"));
                 }
             }
         }
@@ -585,6 +593,10 @@ public class TransactionManager {
         public void handleResponse(AbstractResponse response) {
             InitProducerIdResponse initProducerIdResponse = (InitProducerIdResponse) response;
             Errors error = initProducerIdResponse.error();
+
+            log.debug("TransactionalId {} -- Received InitProducerId response with error {}",
+                    transactionalId, error);
+
             if (error == Errors.NONE) {
                 ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(initProducerIdResponse.producerId(), initProducerIdResponse.epoch());
                 setProducerIdAndEpoch(producerIdAndEpoch);
@@ -597,9 +609,9 @@ public class TransactionManager {
             } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) {
                 reenqueue();
             } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) {
-                fatal(error.exception());
+                fatalError(error.exception());
             } else {
-                fatal(new KafkaException("Unexpected error in InitProducerIdResponse; " + error.message()));
+                fatalError(new KafkaException("Unexpected error in InitProducerIdResponse; " + error.message()));
             }
         }
     }
@@ -626,6 +638,11 @@ public class TransactionManager {
             AddPartitionsToTxnResponse addPartitionsToTxnResponse = (AddPartitionsToTxnResponse) response;
             Map<TopicPartition, Errors> errors = addPartitionsToTxnResponse.errors();
             boolean hasPartitionErrors = false;
+            Set<String> unauthorizedTopics = new HashSet<>();
+
+            log.debug("TransactionalId {} -- Received AddPartitionsToTxn response with errors {}",
+                    transactionalId, errors);
+
             for (TopicPartition topicPartition : pendingPartitionsToBeAddedToTransaction) {
                 final Errors error = errors.get(topicPartition);
                 if (error == Errors.NONE || error == null) {
@@ -640,23 +657,28 @@ public class TransactionManager {
                     reenqueue();
                     return;
                 } else if (error == Errors.INVALID_PRODUCER_EPOCH) {
-                    fenced();
+                    fatalError(error.exception());
                     return;
                 } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) {
-                    fatal(error.exception());
+                    fatalError(error.exception());
                     return;
                 } else if (error == Errors.INVALID_PRODUCER_ID_MAPPING
                         || error == Errors.INVALID_TXN_STATE) {
-                    fatal(new KafkaException(error.exception()));
+                    fatalError(new KafkaException(error.exception()));
                     return;
+                } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
+                    unauthorizedTopics.add(topicPartition.topic());
                 } else {
-                    log.error("Could not add partitions to transaction due to partition error. partition={}, error={}", topicPartition, error);
+                    log.error("TransactionalId: {} -- Could not add partition {} due to unexpected error {}",
+                            transactionalId, topicPartition, error);
                     hasPartitionErrors = true;
                 }
             }
 
-            if (hasPartitionErrors) {
-                fatal(new KafkaException("Could not add partitions to transaction due to partition level errors"));
+            if (!unauthorizedTopics.isEmpty()) {
+                abortableError(new TopicAuthorizationException(unauthorizedTopics));
+            } else if (hasPartitionErrors) {
+                abortableError(new KafkaException("Could not add partitions to transaction due to partition level errors"));
             } else {
                 partitionsInTransaction.addAll(pendingPartitionsToBeAddedToTransaction);
                 pendingPartitionsToBeAddedToTransaction.clear();
@@ -695,7 +717,12 @@ public class TransactionManager {
         @Override
         public void handleResponse(AbstractResponse response) {
             FindCoordinatorResponse findCoordinatorResponse = (FindCoordinatorResponse) response;
-            if (findCoordinatorResponse.error() == Errors.NONE) {
+            Errors error = findCoordinatorResponse.error();
+
+            log.debug("TransactionalId {} -- Received FindCoordinator response with error {}",
+                    transactionalId, error);
+
+            if (error == Errors.NONE) {
                 Node node = findCoordinatorResponse.node();
                 switch (builder.coordinatorType()) {
                     case GROUP:
@@ -705,12 +732,14 @@ public class TransactionManager {
                         transactionCoordinator = node;
                 }
                 result.done();
-            } else if (findCoordinatorResponse.error() == Errors.COORDINATOR_NOT_AVAILABLE) {
+            } else if (error == Errors.COORDINATOR_NOT_AVAILABLE) {
                 reenqueue();
+            } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) {
+                fatalError(error.exception());
             } else if (findCoordinatorResponse.error() == Errors.GROUP_AUTHORIZATION_FAILED) {
-                fatal(new GroupAuthorizationException("Not authorized to commit offsets " + builder.coordinatorKey()));
+                abortableError(new GroupAuthorizationException(builder.coordinatorKey()));
             } else {
-                fatal(new KafkaException(String.format("Could not find a coordinator with type %s with key %s due to" +
+                fatalError(new KafkaException(String.format("Could not find a coordinator with type %s with key %s due to " +
                         "unexpected error: %s", builder.coordinatorType(), builder.coordinatorKey(),
                         findCoordinatorResponse.error().message())));
             }
@@ -743,6 +772,10 @@ public class TransactionManager {
         public void handleResponse(AbstractResponse response) {
             EndTxnResponse endTxnResponse = (EndTxnResponse) response;
             Errors error = endTxnResponse.error();
+
+            log.debug("TransactionalId {} -- Received EndTxn response with error {}",
+                    transactionalId, error);
+
             if (error == Errors.NONE) {
                 completeTransaction();
                 result.done();
@@ -752,11 +785,13 @@ public class TransactionManager {
             } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) {
                 reenqueue();
             } else if (error == Errors.INVALID_PRODUCER_EPOCH) {
-                fenced();
+                fatalError(error.exception());
             } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) {
-                fatal(error.exception());
+                fatalError(error.exception());
+            } else if (error == Errors.INVALID_TXN_STATE) {
+                fatalError(error.exception());
             } else {
-                fatal(new KafkaException("Unhandled error in EndTxnResponse: " + error.message()));
+                fatalError(new KafkaException("Unhandled error in EndTxnResponse: " + error.message()));
             }
         }
     }
@@ -785,6 +820,10 @@ public class TransactionManager {
         public void handleResponse(AbstractResponse response) {
             AddOffsetsToTxnResponse addOffsetsToTxnResponse = (AddOffsetsToTxnResponse) response;
             Errors error = addOffsetsToTxnResponse.error();
+
+            log.debug("TransactionalId {} -- Received AddOffsetsToTxn response with error {}",
+                    transactionalId, error);
+
             if (error == Errors.NONE) {
                 // note the result is not completed until the TxnOffsetCommit returns
                 pendingRequests.add(txnOffsetCommitHandler(result, offsets, builder.consumerGroupId()));
@@ -794,11 +833,13 @@ public class TransactionManager {
             } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) {
                 reenqueue();
             } else if (error == Errors.INVALID_PRODUCER_EPOCH) {
-                fenced();
+                fatalError(error.exception());
             } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) {
-                fatal(error.exception());
+                fatalError(error.exception());
+            } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
+                abortableError(new GroupAuthorizationException(builder.consumerGroupId()));
             } else {
-                fatal(new KafkaException("Unexpected error in AddOffsetsToTxnResponse: " + error.message()));
+                fatalError(new KafkaException("Unexpected error in AddOffsetsToTxnResponse: " + error.message()));
             }
         }
     }
@@ -837,7 +878,12 @@ public class TransactionManager {
             TxnOffsetCommitResponse txnOffsetCommitResponse = (TxnOffsetCommitResponse) response;
             boolean coordinatorReloaded = false;
             boolean hadFailure = false;
-            for (Map.Entry<TopicPartition, Errors> entry : txnOffsetCommitResponse.errors().entrySet()) {
+            Map<TopicPartition, Errors> errors = txnOffsetCommitResponse.errors();
+
+            log.debug("TransactionalId {} -- Received TxnOffsetCommit response with errors {}",
+                    transactionalId, errors);
+
+            for (Map.Entry<TopicPartition, Errors> entry : errors.entrySet()) {
                 TopicPartition topicPartition = entry.getKey();
                 Errors error = entry.getValue();
                 if (error == Errors.NONE) {
@@ -850,11 +896,17 @@ public class TransactionManager {
                     }
                 } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
                     hadFailure = true;
+                } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
+                    abortableError(new GroupAuthorizationException(builder.consumerGroupId()));
+                    return;
+                } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) {
+                    fatalError(error.exception());
+                    return;
                 } else if (error == Errors.INVALID_PRODUCER_EPOCH) {
-                    fenced();
+                    fatalError(error.exception());
                     return;
                 } else {
-                    fatal(new KafkaException("Unexpected error in TxnOffsetCommitResponse: " + error.message()));
+                    fatalError(new KafkaException("Unexpected error in TxnOffsetCommitResponse: " + error.message()));
                     return;
                 }
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/clients/src/main/java/org/apache/kafka/common/errors/ProducerIdAuthorizationException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/ProducerIdAuthorizationException.java b/clients/src/main/java/org/apache/kafka/common/errors/ProducerIdAuthorizationException.java
deleted file mode 100644
index 2da9158..0000000
--- a/clients/src/main/java/org/apache/kafka/common/errors/ProducerIdAuthorizationException.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.errors;
-
-public class ProducerIdAuthorizationException extends ApiException {
-    public ProducerIdAuthorizationException(final String message) {
-        super(message);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/clients/src/main/java/org/apache/kafka/common/errors/TransactionalIdAuthorizationException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/TransactionalIdAuthorizationException.java b/clients/src/main/java/org/apache/kafka/common/errors/TransactionalIdAuthorizationException.java
index 9bf1fbb..3f85513 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/TransactionalIdAuthorizationException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/TransactionalIdAuthorizationException.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.common.errors;
 
-public class TransactionalIdAuthorizationException extends ApiException {
+public class TransactionalIdAuthorizationException extends AuthorizationException {
     public TransactionalIdAuthorizationException(final String message) {
         super(message);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index f94fb4d..9444eb5 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -57,7 +57,6 @@ import org.apache.kafka.common.errors.OffsetOutOfRangeException;
 import org.apache.kafka.common.errors.OutOfOrderSequenceException;
 import org.apache.kafka.common.errors.PolicyViolationException;
 import org.apache.kafka.common.errors.ProducerFencedException;
-import org.apache.kafka.common.errors.ProducerIdAuthorizationException;
 import org.apache.kafka.common.errors.RebalanceInProgressException;
 import org.apache.kafka.common.errors.RecordBatchTooLargeException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
@@ -483,21 +482,13 @@ public enum Errors {
             return new TransactionalIdAuthorizationException(message);
         }
     }),
-    PRODUCER_ID_AUTHORIZATION_FAILED(54, "Producer is not authorized to use producer Ids, " +
-            "which is required to write idempotent data.",
-                                             new ApiExceptionBuilder() {
-        @Override
-        public ApiException build(String message) {
-            return new ProducerIdAuthorizationException(message);
-        }
-    }),
-    SECURITY_DISABLED(55, "Security features are disabled.", new ApiExceptionBuilder() {
+    SECURITY_DISABLED(54, "Security features are disabled.", new ApiExceptionBuilder() {
         @Override
         public ApiException build(String message) {
             return new SecurityDisabledException(message);
         }
     }),
-    BROKER_AUTHORIZATION_FAILED(56, "Broker authorization failed", new ApiExceptionBuilder() {
+    BROKER_AUTHORIZATION_FAILED(55, "Broker authorization failed", new ApiExceptionBuilder() {
         @Override
         public ApiException build(String message) {
             return new BrokerAuthorizationException(message);

http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index d5ce469..91391e9 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -1516,6 +1516,9 @@ public class Protocol {
     );
 
     public static final Schema TXN_OFFSET_COMMIT_REQUEST_V0 = new Schema(
+            new Field("transactional_id",
+                    STRING,
+                    "The transactional id corresponding to the transaction."),
             new Field("consumer_group_id",
                     STRING,
                     "Id of the associated consumer group to commit offsets for."),

http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
index 4bf8b3e..3339470 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
@@ -50,6 +50,17 @@ public class AddOffsetsToTxnRequest extends AbstractRequest {
         public AddOffsetsToTxnRequest build(short version) {
             return new AddOffsetsToTxnRequest(version, transactionalId, producerId, producerEpoch, consumerGroupId);
         }
+
+        @Override
+        public String toString() {
+            StringBuilder bld = new StringBuilder();
+            bld.append("(transactionalId=").append(transactionalId).
+                    append(", producerId=").append(producerId).
+                    append(", producerEpoch=").append(producerEpoch).
+                    append(", consumerGroupId=").append(consumerGroupId).
+                    append(")");
+            return bld.toString();
+        }
     }
 
     private final String transactionalId;

http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java
index 8b3a589..754f242 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java
@@ -30,10 +30,11 @@ public class AddOffsetsToTxnResponse extends AbstractResponse {
     //   NotCoordinator
     //   CoordinatorNotAvailable
     //   CoordinatorLoadInProgress
-    //   InvalidPidMapping
+    //   InvalidProducerIdMapping
+    //   InvalidProducerEpoch
     //   InvalidTxnState
     //   GroupAuthorizationFailed
-    //   InvalidProducerEpoch
+    //   TransactionalIdAuthorizationFailed
 
     private final Errors error;
     private final int throttleTimeMs;

http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java
index 148ebec..e24fa5a 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java
@@ -54,6 +54,17 @@ public class AddPartitionsToTxnRequest extends AbstractRequest {
         public AddPartitionsToTxnRequest build(short version) {
             return new AddPartitionsToTxnRequest(version, transactionalId, producerId, producerEpoch, partitions);
         }
+
+        @Override
+        public String toString() {
+            StringBuilder bld = new StringBuilder();
+            bld.append("(transactionalId=").append(transactionalId).
+                    append(", producerId=").append(producerId).
+                    append(", producerEpoch=").append(producerEpoch).
+                    append(", partitions=").append(partitions).
+                    append(")");
+            return bld.toString();
+        }
     }
 
     private final String transactionalId;

http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java
index 697142b..39172ee 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java
@@ -43,11 +43,12 @@ public class AddPartitionsToTxnResponse extends AbstractResponse {
     //   CoordinatorNotAvailable
     //   CoordinatorLoadInProgress
     //   InvalidTxnState
-    //   InvalidPidMapping
+    //   InvalidProducerIdMapping
     //   TopicAuthorizationFailed
     //   InvalidProducerEpoch
     //   UnknownTopicOrPartition
+    //   TransactionalIdAuthorizationFailed
     private final Map<TopicPartition, Errors> errors;
 
     public AddPartitionsToTxnResponse(int throttleTimeMs, Map<TopicPartition, Errors> errors) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java
index 77ec137..b9f052c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java
@@ -50,6 +50,17 @@ public class EndTxnRequest extends AbstractRequest {
         public EndTxnRequest build(short version) {
             return new EndTxnRequest(version, transactionalId, producerId, producerEpoch, result);
         }
+
+        @Override
+        public String toString() {
+            StringBuilder bld = new StringBuilder();
+            bld.append("(transactionalId=").append(transactionalId).
+                    append(", producerId=").append(producerId).
+                    append(", producerEpoch=").append(producerEpoch).
+                    append(", result=").append(result).
+                    append(")");
+            return bld.toString();
+        }
     }
 
     private final String transactionalId;

http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java
index 99e4e8c..17cf68d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java
@@ -31,8 +31,9 @@ public class EndTxnResponse extends AbstractResponse {
     //   CoordinatorNotAvailable
     //   CoordinatorLoadInProgress
     //   InvalidTxnState
-    //   InvalidPidMapping
+    //   InvalidProducerIdMapping
     //   InvalidProducerEpoch
+    //   TransactionalIdAuthorizationFailed
 
     private final Errors error;
     private final int throttleTimeMs;

http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java
index 7c8a6e5..96e1cdf 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java
@@ -24,11 +24,13 @@ import org.apache.kafka.common.record.RecordBatch;
 import java.nio.ByteBuffer;
 
 public class InitProducerIdResponse extends AbstractResponse {
-    /**
-     * Possible Error codes:
-     * OK
-     *
-     */
+    // Possible error codes:
+    //   NotCoordinator
+    //   CoordinatorNotAvailable
+    //   CoordinatorLoadInProgress
+    //   TransactionalIdAuthorizationFailed
+    //   ClusterAuthorizationFailed
+
     private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
     private static final String PRODUCER_ID_KEY_NAME = "producer_id";
     private static final String EPOCH_KEY_NAME = "producer_epoch";

http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
index 3377f91..3d696c1 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
@@ -228,13 +228,14 @@ public class ProduceRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+    public ProduceResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         /* In case the producer doesn't actually want any response */
         if (acks == 0)
             return null;
 
+        Errors error = Errors.forException(e);
         Map<TopicPartition, ProduceResponse.PartitionResponse> responseMap = new HashMap<>();
-        ProduceResponse.PartitionResponse partitionResponse = new ProduceResponse.PartitionResponse(Errors.forException(e));
+        ProduceResponse.PartitionResponse partitionResponse = new ProduceResponse.PartitionResponse(error);
 
         for (TopicPartition tp : partitions())
             responseMap.put(tp, partitionResponse);

http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
index 42ae434..55332f6 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
@@ -61,6 +61,9 @@ public class ProduceResponse extends AbstractResponse {
      * INVALID_REQUIRED_ACKS (21)
      * TOPIC_AUTHORIZATION_FAILED (29)
      * UNSUPPORTED_FOR_MESSAGE_FORMAT (43)
+     * CLUSTER_AUTHORIZATION_FAILED (31)
+     * INVALID_PRODUCER_EPOCH (47)
+     * TRANSACTIONAL_ID_AUTHORIZATION_FAILED (53)
      */
 
     private static final String BASE_OFFSET_KEY_NAME = "base_offset";

http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
index f5334f2..68fa3d2 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
@@ -27,6 +27,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 public class TxnOffsetCommitRequest extends AbstractRequest {
+    private static final String TRANSACTIONAL_ID_KEY_NAME = "transactional_id";
     private static final String CONSUMER_GROUP_ID_KEY_NAME = "consumer_group_id";
     private static final String PRODUCER_ID_KEY_NAME = "producer_id";
     private static final String PRODUCER_EPOCH_KEY_NAME = "producer_epoch";
@@ -38,14 +39,16 @@ public class TxnOffsetCommitRequest extends AbstractRequest {
     private static final String METADATA_KEY_NAME = "metadata";
 
     public static class Builder extends AbstractRequest.Builder<TxnOffsetCommitRequest> {
+        private final String transactionalId;
         private final String consumerGroupId;
         private final long producerId;
         private final short producerEpoch;
         private final Map<TopicPartition, CommittedOffset> offsets;
 
-        public Builder(String consumerGroupId, long producerId, short producerEpoch,
+        public Builder(String transactionalId, String consumerGroupId, long producerId, short producerEpoch,
                        Map<TopicPartition, CommittedOffset> offsets) {
             super(ApiKeys.TXN_OFFSET_COMMIT);
+            this.transactionalId = transactionalId;
             this.consumerGroupId = consumerGroupId;
             this.producerId = producerId;
             this.producerEpoch = producerEpoch;
@@ -58,18 +61,32 @@ public class TxnOffsetCommitRequest extends AbstractRequest {
 
         @Override
         public TxnOffsetCommitRequest build(short version) {
-            return new TxnOffsetCommitRequest(version, consumerGroupId, producerId, producerEpoch, offsets);
+            return new TxnOffsetCommitRequest(version, transactionalId, consumerGroupId, producerId, producerEpoch, offsets);
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder bld = new StringBuilder();
+            bld.append("(transactionalId=").append(transactionalId).
+                    append(", producerId=").append(producerId).
+                    append(", producerEpoch=").append(producerEpoch).
+                    append(", consumerGroupId=").append(consumerGroupId).
+                    append(", offsets=").append(offsets).
+                    append(")");
+            return bld.toString();
         }
     }
 
+    private final String transactionalId;
     private final String consumerGroupId;
     private final long producerId;
     private final short producerEpoch;
     private final Map<TopicPartition, CommittedOffset> offsets;
 
-    public TxnOffsetCommitRequest(short version, String consumerGroupId, long producerId, short producerEpoch,
-                                  Map<TopicPartition, CommittedOffset> offsets) {
+    public TxnOffsetCommitRequest(short version, String transactionalId, String consumerGroupId, long producerId,
+                                  short producerEpoch, Map<TopicPartition, CommittedOffset> offsets) {
         super(version);
+        this.transactionalId = transactionalId;
         this.consumerGroupId = consumerGroupId;
         this.producerId = producerId;
         this.producerEpoch = producerEpoch;
@@ -78,6 +95,7 @@ public class TxnOffsetCommitRequest extends AbstractRequest {
 
     public TxnOffsetCommitRequest(Struct struct, short version) {
         super(version);
+        this.transactionalId = struct.getString(TRANSACTIONAL_ID_KEY_NAME);
         this.consumerGroupId = struct.getString(CONSUMER_GROUP_ID_KEY_NAME);
         this.producerId = struct.getLong(PRODUCER_ID_KEY_NAME);
         this.producerEpoch = struct.getShort(PRODUCER_EPOCH_KEY_NAME);
@@ -98,6 +116,10 @@ public class TxnOffsetCommitRequest extends AbstractRequest {
         this.offsets = offsets;
     }
 
+    public String transactionalId() {
+        return transactionalId;
+    }
+
     public String consumerGroupId() {
         return consumerGroupId;
     }
@@ -117,6 +139,7 @@ public class TxnOffsetCommitRequest extends AbstractRequest {
     @Override
     protected Struct toStruct() {
         Struct struct = new Struct(ApiKeys.TXN_OFFSET_COMMIT.requestSchema(version()));
+        struct.set(TRANSACTIONAL_ID_KEY_NAME, transactionalId);
         struct.set(CONSUMER_GROUP_ID_KEY_NAME, consumerGroupId);
         struct.set(PRODUCER_ID_KEY_NAME, producerId);
         struct.set(PRODUCER_EPOCH_KEY_NAME, producerEpoch);

http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
index 37b9a50..a62568f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
@@ -42,6 +42,7 @@ public class TxnOffsetCommitResponse extends AbstractResponse {
     //   OffsetMetadataTooLarge
     //   GroupAuthorizationFailed
     //   InvalidCommitOffsetSize
+    //   TransactionalIdAuthorizationFailed
 
     private final Map<TopicPartition, Errors> errors;
     private final int throttleTimeMs;

http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java
index 06f6662..ddddc42 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java
@@ -48,6 +48,7 @@ public class WriteTxnMarkersResponse extends AbstractResponse {
     //   InvalidRequiredAcks
     //   TransactionCoordinatorFenced
     //   RequestTimeout
+    //   ClusterAuthorizationFailed
 
     private final Map<Long, Map<TopicPartition, Errors>> errors;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/clients/src/test/java/org/apache/kafka/clients/admin/AclOperationTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/AclOperationTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/AclOperationTest.java
index 06ace63..0e3441f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/AclOperationTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/AclOperationTest.java
@@ -47,7 +47,8 @@ public class AclOperationTest {
         new AclOperationTestInfo(AclOperation.DESCRIBE, 8, "describe", false),
         new AclOperationTestInfo(AclOperation.CLUSTER_ACTION, 9, "cluster_action", false),
         new AclOperationTestInfo(AclOperation.DESCRIBE_CONFIGS, 10, "describe_configs", false),
-        new AclOperationTestInfo(AclOperation.ALTER_CONFIGS, 11, "alter_configs", false)
+        new AclOperationTestInfo(AclOperation.ALTER_CONFIGS, 11, "alter_configs", false),
+        new AclOperationTestInfo(AclOperation.IDEMPOTENT_WRITE, 12, "idempotent_write", false)
     };
 
     @Test


Mime
View raw message