kafka-commits mailing list archives

From j...@apache.org
Subject kafka git commit: KAFKA-5129; Add ACL checks for Transactional APIs
Date Tue, 16 May 2017 17:14:18 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk a281fe17f -> d66e7af65


KAFKA-5129; Add ACL checks for Transactional APIs

Add ACL checks for Transactional APIs

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Apurva Mehta <apurva@confluent.io>, Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>

Closes #2979 from dguy/kafka-5129
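
For orientation before the diff: the broker now gates transactional and idempotent produce traffic behind Write ACLs on two new resource types before the existing per-topic checks run. A rough Java paraphrase of the Scala gate added to KafkaApis.handleProduceRequest (see the full diff below; authorize() and sendError() are simplified stand-ins, not real methods):

    // Illustrative paraphrase only; the real code is Scala in KafkaApis.scala below.
    if (produceRequest.isTransactional()
            && !authorize(session, Write, transactionalIdResource(produceRequest.transactionalId()))) {
        sendError(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED);
    } else if (produceRequest.isIdempotent()
            && !authorize(session, Write, producerIdResource())) {
        sendError(Errors.PRODUCER_ID_AUTHORIZATION_FAILED);
    } else {
        // fall through to the existing per-topic Describe/Write authorization
    }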


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d66e7af6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d66e7af6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d66e7af6

Branch: refs/heads/trunk
Commit: d66e7af6526f208900a5d6cb588cf47058800804
Parents: a281fe1
Author: Damian Guy <damian.guy@gmail.com>
Authored: Tue May 16 09:57:15 2017 -0700
Committer: Jason Gustafson <jason@confluent.io>
Committed: Tue May 16 09:57:15 2017 -0700

----------------------------------------------------------------------
 .../kafka/clients/producer/KafkaProducer.java   |  25 +-
 .../producer/internals/ProducerBatch.java       |   4 +
 .../producer/internals/RecordAccumulator.java   |   8 +-
 .../clients/producer/internals/Sender.java      |  16 +-
 .../producer/internals/TransactionManager.java  |  72 +++--
 .../ProducerIdAuthorizationException.java       |  23 ++
 .../TransactionalIdAuthorizationException.java  |  23 ++
 .../apache/kafka/common/protocol/Errors.java    |  21 +-
 .../apache/kafka/common/protocol/Protocol.java  |  10 +-
 .../common/record/MemoryRecordsBuilder.java     |  17 +-
 .../requests/AddPartitionsToTxnRequest.java     |   7 +-
 .../requests/AddPartitionsToTxnResponse.java    |  56 +++-
 .../requests/FindCoordinatorResponse.java       |  10 +
 .../kafka/common/requests/ProduceRequest.java   |  12 +
 .../internals/TransactionManagerTest.java       |  51 +++-
 .../common/record/MemoryRecordsBuilderTest.java |  66 +++++
 .../common/requests/ProduceRequestTest.java     |  95 ++++++
 .../common/requests/RequestResponseTest.java    |   2 +-
 .../scala/kafka/security/auth/Resource.scala    |   2 +
 .../kafka/security/auth/ResourceType.scala      |  11 +-
 .../src/main/scala/kafka/server/KafkaApis.scala | 290 +++++++++++--------
 .../kafka/api/AuthorizerIntegrationTest.scala   | 113 +++++++-
 22 files changed, 752 insertions(+), 182 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d66e7af6/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 05edf65..aeef92f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -694,19 +694,18 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         if (transactionManager.isFenced())
             throw new ProducerFencedException("The current producer has been fenced off by another producer using the same transactional id.");
 
-        if (transactionManager.isInTransaction()) {
-            if (transactionManager.isInErrorState()) {
-                String errorMessage =
-                    "Cannot perform a transactional send because at least one previous transactional request has failed with errors.";
-                Exception lastError = transactionManager.lastError();
-                if (lastError != null)
-                    throw new KafkaException(errorMessage, lastError);
-                else
-                    throw new KafkaException(errorMessage);
-            }
-            if (transactionManager.isCompletingTransaction())
-                throw new IllegalStateException("Cannot call send while a commit or abort is in progress.");
+        if (transactionManager.isInErrorState()) {
+            String errorMessage =
+                    "Cannot perform send because at least one previous transactional or idempotent request has failed with errors.";
+            Exception lastError = transactionManager.lastError();
+            if (lastError != null)
+                throw new KafkaException(errorMessage, lastError);
+            else
+                throw new KafkaException(errorMessage);
         }
+        if (transactionManager.isCompletingTransaction())
+            throw new IllegalStateException("Cannot call send while a commit or abort is in progress.");
+
     }
 
     private void setReadOnly(Headers headers) {
@@ -1032,7 +1031,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                 this.userCallback.onCompletion(metadata, exception);
 
             if (exception != null && transactionManager != null)
-                transactionManager.maybeSetError(exception);
+                transactionManager.setError(exception);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d66e7af6/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
index 3c5965a..1c078c8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
@@ -247,6 +247,10 @@ public final class ProducerBatch {
         recordsBuilder.close();
     }
 
+    public void abort() {
+        recordsBuilder.abort();
+    }
+
     public boolean isClosed() {
         return recordsBuilder.isClosed();
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d66e7af6/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index d53c19d..5b8fb96 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -580,14 +580,18 @@ public final class RecordAccumulator {
      * Go through incomplete batches and abort them.
      */
     private void abortBatches() {
+        abortBatches(new IllegalStateException("Producer is closed forcefully."));
+    }
+
+    void abortBatches(final RuntimeException reason) {
         for (ProducerBatch batch : incomplete.all()) {
             Deque<ProducerBatch> dq = getDeque(batch.topicPartition);
             // Close the batch before aborting
             synchronized (dq) {
-                batch.close();
+                batch.abort();
                 dq.remove(batch);
             }
-            batch.done(-1L, RecordBatch.NO_TIMESTAMP, new IllegalStateException("Producer is closed forcefully."));
+            batch.done(-1L, RecordBatch.NO_TIMESTAMP, reason);
             deallocate(batch);
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d66e7af6/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index da09a1a..209a979 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -199,6 +199,13 @@ public class Sender implements Runnable {
         Cluster cluster = metadata.fetch();
         maybeWaitForProducerId();
 
+        if (transactionManager != null && transactionManager.isInErrorState()) {
+            final KafkaException exception = transactionManager.lastError() instanceof KafkaException
+                    ? (KafkaException) transactionManager.lastError()
+                    : new KafkaException(transactionManager.lastError());
+            this.accumulator.abortBatches(exception);
+            return Long.MAX_VALUE;
+        }
         // get the list of partitions with data ready to send
         RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
 
@@ -376,13 +383,19 @@ public class Sender implements Runnable {
         if (transactionManager == null || transactionManager.isTransactional())
             return;
 
-        while (!transactionManager.hasProducerId()) {
+        while (!transactionManager.hasProducerId() && !transactionManager.isInErrorState()) {
             try {
                 Node node = awaitLeastLoadedNodeReady(requestTimeout);
                 if (node != null) {
                     ClientResponse response = sendAndAwaitInitPidRequest(node);
+
                     if (response.hasResponse() && (response.responseBody() instanceof InitProducerIdResponse)) {
                         InitProducerIdResponse initProducerIdResponse = (InitProducerIdResponse) response.responseBody();
+                        Exception exception = initProducerIdResponse.error().exception();
+                        if (exception != null && !(exception instanceof RetriableException)) {
+                            transactionManager.setError(exception);
+                            return;
+                        }
                         ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(
                                 initProducerIdResponse.producerId(), initProducerIdResponse.epoch());
                         transactionManager.setProducerIdAndEpoch(producerIdAndEpoch);
@@ -401,6 +414,7 @@ public class Sender implements Runnable {
             time.sleep(retryBackoffMs);
             metadata.requestUpdate();
         }
+
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/d66e7af6/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index f3ed252..551a75a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -249,15 +249,11 @@ public class TransactionManager {
         return currentState == State.ERROR;
     }
 
-    public synchronized boolean maybeSetError(Exception exception) {
-        if (isTransactional() && isInTransaction()) {
-            if (exception instanceof ProducerFencedException)
-                transitionTo(State.FENCED, exception);
-            else
-                transitionTo(State.ERROR, exception);
-            return true;
-        }
-        return false;
+    public synchronized void setError(Exception exception) {
+        if (exception instanceof ProducerFencedException)
+            transitionTo(State.FENCED, exception);
+        else
+            transitionTo(State.ERROR, exception);
     }
 
     /**
@@ -579,6 +575,8 @@ public class TransactionManager {
                 reenqueue();
             } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) {
                 reenqueue();
+            } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) {
+                fatal(error.exception());
             } else {
                 fatal(new KafkaException("Unexpected error in InitProducerIdResponse; " + error.message()));
             }
@@ -605,27 +603,43 @@ public class TransactionManager {
         @Override
         public void handleResponse(AbstractResponse response) {
             AddPartitionsToTxnResponse addPartitionsToTxnResponse = (AddPartitionsToTxnResponse) response;
-            Errors error = addPartitionsToTxnResponse.error();
-            if (error == Errors.NONE) {
+            Map<TopicPartition, Errors> errors = addPartitionsToTxnResponse.errors();
+            boolean hasPartitionErrors = false;
+            for (TopicPartition topicPartition : pendingPartitionsToBeAddedToTransaction) {
+                final Errors error = errors.get(topicPartition);
+                if (error == Errors.NONE || error == null) {
+                    continue;
+                }
+
+                if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) {
+                    lookupCoordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
+                    reenqueue();
+                    return;
+                } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) {
+                    reenqueue();
+                    return;
+                } else if (error == Errors.INVALID_PRODUCER_EPOCH) {
+                    fenced();
+                    return;
+                } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) {
+                    fatal(error.exception());
+                    return;
+                } else if (error == Errors.INVALID_PRODUCER_ID_MAPPING
+                        || error == Errors.INVALID_TXN_STATE) {
+                    fatal(new KafkaException(error.exception()));
+                    return;
+                } else {
+                    log.error("Could not add partitions to transaction due to partition error. partition={}, error={}", topicPartition, error);
+                    hasPartitionErrors = true;
+                }
+            }
+
+            if (hasPartitionErrors) {
+                fatal(new KafkaException("Could not add partitions to transaction due to partition level errors"));
+            } else {
                 partitionsInTransaction.addAll(pendingPartitionsToBeAddedToTransaction);
                 pendingPartitionsToBeAddedToTransaction.clear();
                 result.done();
-            } else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) {
-                lookupCoordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
-                reenqueue();
-            } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) {
-                reenqueue();
-            } else if (error == Errors.INVALID_PRODUCER_ID_MAPPING) {
-                fatal(new KafkaException(error.exception()));
-            } else if (error == Errors.INVALID_TXN_STATE) {
-                fatal(new KafkaException(error.exception()));
-            } else if (error == Errors.INVALID_PRODUCER_EPOCH) {
-                fenced();
-            } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
-                fatal(error.exception());
-            } else {
-                fatal(new KafkaException("Could not add partitions to transaction due to unknown error: " +
-                        error.message()));
             }
         }
     }
@@ -718,6 +732,8 @@ public class TransactionManager {
                 reenqueue();
             } else if (error == Errors.INVALID_PRODUCER_EPOCH) {
                 fenced();
+            } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) {
+                fatal(error.exception());
             } else {
                 fatal(new KafkaException("Unhandled error in EndTxnResponse: " + error.message()));
             }
@@ -758,6 +774,8 @@ public class TransactionManager {
                 reenqueue();
             } else if (error == Errors.INVALID_PRODUCER_EPOCH) {
                 fenced();
+            } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) {
+                fatal(error.exception());
             } else {
                 fatal(new KafkaException("Unexpected error in AddOffsetsToTxnResponse: " + error.message()));
             }

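On the client side, these handlers make an unauthorized transactional id a fatal error rather than something the producer retries forever. A minimal sketch of what an application might observe, assuming a broker with an Authorizer configured and no Write ACL for the transactional id (config values are illustrative, and the exception is assumed to propagate out of initTransactions()):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
    import org.apache.kafka.common.serialization.StringSerializer;

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");       // illustrative
    props.put("transactional.id", "unauthorized-txn-id");   // illustrative
    props.put("key.serializer", StringSerializer.class.getName());
    props.put("value.serializer", StringSerializer.class.getName());

    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    try {
        // InitProducerId is the first transactional request; the broker's new
        // check can fail it with TRANSACTIONAL_ID_AUTHORIZATION_FAILED, which
        // the TransactionManager above treats as fatal.
        producer.initTransactions();
    } catch (TransactionalIdAuthorizationException e) {
        producer.close();   // fatal: fix the ACLs rather than retry
    }
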
http://git-wip-us.apache.org/repos/asf/kafka/blob/d66e7af6/clients/src/main/java/org/apache/kafka/common/errors/ProducerIdAuthorizationException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/ProducerIdAuthorizationException.java b/clients/src/main/java/org/apache/kafka/common/errors/ProducerIdAuthorizationException.java
new file mode 100644
index 0000000..2da9158
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/ProducerIdAuthorizationException.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class ProducerIdAuthorizationException extends ApiException {
+    public ProducerIdAuthorizationException(final String message) {
+        super(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d66e7af6/clients/src/main/java/org/apache/kafka/common/errors/TransactionalIdAuthorizationException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/TransactionalIdAuthorizationException.java b/clients/src/main/java/org/apache/kafka/common/errors/TransactionalIdAuthorizationException.java
new file mode 100644
index 0000000..9bf1fbb
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/TransactionalIdAuthorizationException.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class TransactionalIdAuthorizationException extends ApiException {
+    public TransactionalIdAuthorizationException(final String message) {
+        super(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d66e7af6/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index a0922cf..7780fbe 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -56,6 +56,7 @@ import org.apache.kafka.common.errors.OffsetOutOfRangeException;
 import org.apache.kafka.common.errors.OutOfOrderSequenceException;
 import org.apache.kafka.common.errors.PolicyViolationException;
 import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.ProducerIdAuthorizationException;
 import org.apache.kafka.common.errors.RebalanceInProgressException;
 import org.apache.kafka.common.errors.RecordBatchTooLargeException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
@@ -64,6 +65,7 @@ import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.errors.TopicExistsException;
+import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
 import org.apache.kafka.common.errors.TransactionCoordinatorFencedException;
 import org.apache.kafka.common.errors.UnknownMemberIdException;
 import org.apache.kafka.common.errors.UnknownServerException;
@@ -470,7 +472,24 @@ public enum Errors {
             public ApiException build(String message) {
                 return new TransactionCoordinatorFencedException(message);
             }
-        });
+        }),
+    TRANSACTIONAL_ID_AUTHORIZATION_FAILED(53, "Transactional Id authorization failed",
+                                                  new ApiExceptionBuilder() {
+        @Override
+        public ApiException build(String message) {
+            return new TransactionalIdAuthorizationException(message);
+        }
+    }),
+    PRODUCER_ID_AUTHORIZATION_FAILED(54, "Producer is not authorized to use producer Ids, " +
+            "which is required to write idempotent data.",
+                                             new ApiExceptionBuilder() {
+        @Override
+        public ApiException build(String message) {
+            return new ProducerIdAuthorizationException(message);
+        }
+    });
+
+
              
     private interface ApiExceptionBuilder {
         ApiException build(String message);

http://git-wip-us.apache.org/repos/asf/kafka/blob/d66e7af6/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 5e05738..08aef4b 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -1399,9 +1399,13 @@ public class Protocol {
     );
     public static final Schema ADD_PARTITIONS_TO_TXN_RESPONSE_V0 = new Schema(
             newThrottleTimeField(),
-            new Field("error_code",
-                    INT16,
-                    "An integer error code.")
+            new Field("errors",
+                      new ArrayOf(new Schema(new Field("topic", STRING),
+                                   new Field("partition_errors",
+                                             new ArrayOf(new Schema(new Field("partition",
+                                                                              INT32),
+                                                                    new Field("error_code",
+                                                                              INT16)))))))
     );
 
     public static final Schema[] ADD_PARTITIONS_TO_TXN_REQUEST = new Schema[] {ADD_PARTITIONS_TO_TXN_REQUEST_V0};

http://git-wip-us.apache.org/repos/asf/kafka/blob/d66e7af6/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index e52df76..6f90fac 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -84,6 +84,7 @@ public class MemoryRecordsBuilder {
     private Long baseTimestamp = null;
 
     private MemoryRecords builtRecords;
+    private boolean aborted = false;
 
     /**
      * Construct a new builder.
@@ -175,6 +176,9 @@ public class MemoryRecordsBuilder {
      * @return The built log buffer
      */
     public MemoryRecords build() {
+        if (aborted) {
+            throw new KafkaException("Attempting to build an aborted record batch");
+        }
         close();
         return builtRecords;
     }
@@ -246,7 +250,16 @@ public class MemoryRecordsBuilder {
         }
     }
 
+    public void abort() {
+        closeForRecordAppends();
+        buffer().position(initPos);
+        aborted = true;
+    }
+
     public void close() {
+        if (aborted)
+            throw new IllegalStateException("Cannot close MemoryRecordsBuilder as it has already been aborted");
+
         if (builtRecords != null)
             return;
 
@@ -605,13 +618,13 @@ public class MemoryRecordsBuilder {
     private void ensureOpenForRecordAppend() {
         if (appendStreamIsClosed)
             throw new IllegalStateException("Tried to append a record, but MemoryRecordsBuilder is closed for record appends");
-        if (isClosed())
-            throw new IllegalStateException("Tried to append a record, but MemoryRecordsBuilder is closed");
     }
 
     private void ensureOpenForRecordBatchWrite() {
         if (isClosed())
             throw new IllegalStateException("Tried to write record batch header, but MemoryRecordsBuilder is closed");
+        if (aborted)
+            throw new IllegalStateException("Tried to write record batch header, but MemoryRecordsBuilder is aborted");
     }
 
     /**

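The new abort() lifecycle above, in one place (the tests added later in this commit exercise the same paths); this sketch uses the MemoryRecordsBuilder constructor exactly as those tests do:

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.record.*;

    ByteBuffer buffer = ByteBuffer.allocate(128);
    MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V0,
            CompressionType.NONE, TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID,
            RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false, false,
            RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
    builder.append(0L, "a".getBytes(), "1".getBytes());
    builder.abort();   // rewinds the buffer to its initial position
    // After abort(): build() throws KafkaException, while close() and any
    // further append() throw IllegalStateException, per the checks above.
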
http://git-wip-us.apache.org/repos/asf/kafka/blob/d66e7af6/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java
index 69ae25c..148ebec 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.utils.CollectionUtils;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -126,7 +127,11 @@ public class AddPartitionsToTxnRequest extends AbstractRequest {
 
     @Override
     public AddPartitionsToTxnResponse getErrorResponse(int throttleTimeMs, Throwable e) {
-        return new AddPartitionsToTxnResponse(throttleTimeMs, Errors.forException(e));
+        final HashMap<TopicPartition, Errors> errors = new HashMap<>();
+        for (TopicPartition partition : partitions) {
+            errors.put(partition, Errors.forException(e));
+        }
+        return new AddPartitionsToTxnResponse(throttleTimeMs, errors);
     }
 
     public static AddPartitionsToTxnRequest parse(ByteBuffer buffer, short version) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d66e7af6/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java
index 893fcda..697142b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java
@@ -16,15 +16,27 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.CollectionUtils;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 public class AddPartitionsToTxnResponse extends AbstractResponse {
     private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
     private static final String ERROR_CODE_KEY_NAME = "error_code";
+    private static final String ERRORS_KEY_NAME = "errors";
+    private static final String TOPIC_NAME = "topic";
+    private static final String PARTITION = "partition";
+    private static final String PARTITION_ERRORS = "partition_errors";
+
+    private final int throttleTimeMs;
 
     // Possible error codes:
     //   NotCoordinator
@@ -34,33 +46,59 @@ public class AddPartitionsToTxnResponse extends AbstractResponse {
     //   InvalidPidMapping
     //   TopicAuthorizationFailed
     //   InvalidProducerEpoch
+    //   UnknownTopicOrPartition
+    //   TopicAuthorizationFailed
+    private final Map<TopicPartition, Errors> errors;
 
-    private final Errors error;
-    private final int throttleTimeMs;
-
-    public AddPartitionsToTxnResponse(int throttleTimeMs, Errors error) {
+    public AddPartitionsToTxnResponse(int throttleTimeMs, Map<TopicPartition, Errors> errors) {
         this.throttleTimeMs = throttleTimeMs;
-        this.error = error;
+        this.errors = errors;
     }
 
     public AddPartitionsToTxnResponse(Struct struct) {
         this.throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME);
-        this.error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
+        errors = new HashMap<>();
+        for (Object topic : struct.getArray(ERRORS_KEY_NAME)) {
+            Struct topicStruct = (Struct) topic;
+            final String topicName = topicStruct.getString(TOPIC_NAME);
+            for (Object partition : topicStruct.getArray(PARTITION_ERRORS)) {
+                Struct partitionStruct = (Struct) partition;
+                TopicPartition topicPartition = new TopicPartition(topicName, partitionStruct.getInt(PARTITION));
+                errors.put(topicPartition, Errors.forCode(partitionStruct.getShort(ERROR_CODE_KEY_NAME)));
+            }
+        }
     }
 
     public int throttleTimeMs() {
         return throttleTimeMs;
     }
 
-    public Errors error() {
-        return error;
+    public Map<TopicPartition, Errors> errors() {
+        return errors;
     }
 
     @Override
     protected Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.ADD_PARTITIONS_TO_TXN.responseSchema(version));
         struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
-        struct.set(ERROR_CODE_KEY_NAME, error.code());
+
+        Map<String, Map<Integer, Errors>> errorsByTopic = CollectionUtils.groupDataByTopic(errors);
+        List<Struct> topics = new ArrayList<>(errorsByTopic.size());
+        for (Map.Entry<String, Map<Integer, Errors>> entry : errorsByTopic.entrySet()) {
+            Struct topicErrorCodes = struct.instance(ERRORS_KEY_NAME);
+            topicErrorCodes.set(TOPIC_NAME, entry.getKey());
+            List<Struct> partitionArray = new ArrayList<>();
+            for (Map.Entry<Integer, Errors> partitionErrors : entry.getValue().entrySet()) {
+                final Struct partitionData = topicErrorCodes.instance(PARTITION_ERRORS)
+                        .set(PARTITION, partitionErrors.getKey())
+                        .set(ERROR_CODE_KEY_NAME, partitionErrors.getValue().code());
+                partitionArray.add(partitionData);
+
+            }
+            topicErrorCodes.set(PARTITION_ERRORS, partitionArray.toArray());
+            topics.add(topicErrorCodes);
+        }
+        struct.set(ERRORS_KEY_NAME, topics.toArray());
         return struct;
     }
 

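The reshaped response keys errors per TopicPartition instead of carrying a single top-level code, so one partition can fail authorization without failing the rest. A small usage sketch built only from the constructor and errors() accessor shown above:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.protocol.Errors;
    import org.apache.kafka.common.requests.AddPartitionsToTxnResponse;

    Map<TopicPartition, Errors> errors = new HashMap<>();
    errors.put(new TopicPartition("my-topic", 0), Errors.NONE);
    errors.put(new TopicPartition("my-topic", 1), Errors.TOPIC_AUTHORIZATION_FAILED);
    AddPartitionsToTxnResponse response = new AddPartitionsToTxnResponse(0, errors);

    // The TransactionManager's handler above walks this map and treats
    // partition-level failures individually.
    for (Map.Entry<TopicPartition, Errors> entry : response.errors().entrySet())
        System.out.println(entry.getKey() + " -> " + entry.getValue());
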
http://git-wip-us.apache.org/repos/asf/kafka/blob/d66e7af6/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
index b558b62..e7df8e8 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
@@ -106,4 +106,14 @@ public class FindCoordinatorResponse extends AbstractResponse {
     public static FindCoordinatorResponse parse(ByteBuffer buffer, short version) {
         return new FindCoordinatorResponse(ApiKeys.FIND_COORDINATOR.responseSchema(version).read(buffer));
     }
+
+    @Override
+    public String toString() {
+        return "FindCoordinatorResponse{" +
+                "throttleTimeMs=" + throttleTimeMs +
+                ", errorMessage='" + errorMessage + '\'' +
+                ", error=" + error +
+                ", node=" + node +
+                '}';
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d66e7af6/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
index b63f6c2..3377f91 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
@@ -109,6 +109,8 @@ public class ProduceRequest extends AbstractRequest {
     // put in the purgatory (due to client throttling, it can take a while before the response is sent).
     // Care should be taken in methods that use this field.
     private volatile Map<TopicPartition, MemoryRecords> partitionRecords;
+    private boolean transactional = false;
+    private boolean idempotent = false;
 
     private ProduceRequest(short version, short acks, int timeout, Map<TopicPartition, MemoryRecords> partitionRecords, String transactionalId) {
         super(version);
@@ -165,6 +167,8 @@ public class ProduceRequest extends AbstractRequest {
             if (iterator.hasNext())
                 throw new InvalidRecordException("Produce requests with version " + version + " are only allowed to " +
                         "contain exactly one record batch");
+            idempotent = entry.hasProducerId();
+            transactional = entry.isTransactional();
         }
 
         // Note that we do not do similar validation for older versions to ensure compatibility with
@@ -264,6 +268,14 @@ public class ProduceRequest extends AbstractRequest {
         return transactionalId;
     }
 
+    public boolean isTransactional() {
+        return transactional;
+    }
+
+    public boolean isIdempotent() {
+        return idempotent;
+    }
+
     /**
      * Returns the partition records or throws IllegalStateException if clearPartitionRecords() has been invoked.
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/d66e7af6/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index c0acfec..4db0452 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -499,6 +499,53 @@ public class TransactionManagerTest {
         assertTrue(transactionManager.isReadyForTransaction());  // make sure we are ready for a transaction now.
     }
 
+    @Test
+    public void shouldNotAddPartitionsToTransactionWhenTopicAuthorizationFailed() throws Exception {
+        verifyAddPartitionsFailsWithPartitionLevelError(Errors.TOPIC_AUTHORIZATION_FAILED);
+    }
+
+    @Test
+    public void shouldNotAddPartitionsToTransactionWhenUnknownTopicOrPartition() throws Exception {
+        verifyAddPartitionsFailsWithPartitionLevelError(Errors.UNKNOWN_TOPIC_OR_PARTITION);
+    }
+
+    private void verifyAddPartitionsFailsWithPartitionLevelError(final Errors error) throws InterruptedException {
+        client.setNode(brokerNode);
+        transactionManager.initializeTransactions();
+        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
+
+        sender.run(time.milliseconds());  // find coordinator
+        sender.run(time.milliseconds());
+
+        final long pid = 1L;
+        final short epoch = 1;
+        prepareInitPidResponse(Errors.NONE, false, pid, epoch);
+
+        sender.run(time.milliseconds());  // get pid.
+
+        assertTrue(transactionManager.hasProducerId());
+        transactionManager.beginTransaction();
+        transactionManager.maybeAddPartitionToTransaction(tp0);
+
+        Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
+                                                                   "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
+        assertFalse(responseFuture.isDone());
+        prepareAddPartitionsToTxnPartitionErrorResponse(tp0, error);
+        sender.run(time.milliseconds());  // attempt send addPartitions.
+        assertTrue(transactionManager.isInErrorState());
+        assertFalse(transactionManager.transactionContainsPartition(tp0));
+    }
+
+    private void prepareAddPartitionsToTxnPartitionErrorResponse(final TopicPartition tp0, final Errors error) {
+        client.prepareResponse(new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(AbstractRequest body) {
+                assertTrue(body instanceof AddPartitionsToTxnRequest);
+                return true;
+            }
+        }, new AddPartitionsToTxnResponse(0, Collections.singletonMap(tp0, error)));
+    }
+
     private static class MockCallback implements Callback {
         private final TransactionManager transactionManager;
         public MockCallback(TransactionManager transactionManager) {
@@ -507,7 +554,7 @@ public class TransactionManagerTest {
         @Override
         public void onCompletion(RecordMetadata metadata, Exception exception) {
             if (exception != null && transactionManager != null) {
-                transactionManager.maybeSetError(exception);
+                transactionManager.setError(exception);
             }
         }
     }
@@ -570,7 +617,7 @@ public class TransactionManagerTest {
                 assertEquals(transactionalId, addPartitionsToTxnRequest.transactionalId());
                 return true;
             }
-        }, new AddPartitionsToTxnResponse(0, error));
+        }, new AddPartitionsToTxnResponse(0, Collections.singletonMap(topicPartition, error)));
     }
 
     private void prepareEndTxnResponse(Errors error, final TransactionResult result, final long pid, final short epoch) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d66e7af6/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
index a300a65..c08a2f0 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.common.record;
 
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Test;
@@ -31,6 +32,7 @@ import java.util.List;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 @RunWith(value = Parameterized.class)
 public class MemoryRecordsBuilderTest {
@@ -471,6 +473,70 @@ public class MemoryRecordsBuilderTest {
         assertEquals(ByteBuffer.wrap("3".getBytes()), logRecords.get(2).key());
     }
 
+    @Test
+    public void shouldThrowKafkaExceptionOnBuildWhenAborted() throws Exception {
+        ByteBuffer buffer = ByteBuffer.allocate(128);
+        buffer.position(bufferOffset);
+
+        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V0, compressionType,
+                                                                TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
+                                                                false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
+        builder.abort();
+        try {
+            builder.build();
+            fail("Should have thrown KafkaException");
+        } catch (KafkaException e) {
+            // ok
+        }
+    }
+
+    @Test
+    public void shouldResetBufferToInitialPositionOnAbort() throws Exception {
+        ByteBuffer buffer = ByteBuffer.allocate(128);
+        buffer.position(bufferOffset);
+
+        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V0, compressionType,
+                                                                TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
+                                                                false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
+        builder.append(0L, "a".getBytes(), "1".getBytes());
+        builder.abort();
+        assertEquals(bufferOffset, builder.buffer().position());
+    }
+
+    @Test
+    public void shouldThrowIllegalStateExceptionOnCloseWhenAborted() throws Exception {
+        ByteBuffer buffer = ByteBuffer.allocate(128);
+        buffer.position(bufferOffset);
+
+        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V0, compressionType,
+                                                                TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
+                                                                false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
+        builder.abort();
+        try {
+            builder.close();
+            fail("Should have thrown IllegalStateException");
+        } catch (IllegalStateException e) {
+            // ok
+        }
+    }
+
+    @Test
+    public void shouldThrowIllegalStateExceptionOnAppendWhenAborted() throws Exception {
+        ByteBuffer buffer = ByteBuffer.allocate(128);
+        buffer.position(bufferOffset);
+
+        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V0, compressionType,
+                                                                TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
+                                                                false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
+        builder.abort();
+        try {
+            builder.append(0L, "a".getBytes(), "1".getBytes());
+            fail("Should have thrown IllegalStateException");
+        } catch (IllegalStateException e) {
+            // ok
+        }
+    }
+
     @Parameterized.Parameters
     public static Collection<Object[]> data() {
         List<Object[]> values = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/kafka/blob/d66e7af6/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java
new file mode 100644
index 0000000..0e8f382
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.junit.Test;
+
+import java.util.Collections;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class ProduceRequestTest {
+
+    private final SimpleRecord simpleRecord = new SimpleRecord(System.currentTimeMillis(),
+                                                               "key".getBytes(),
+                                                               "value".getBytes());
+
+    @Test
+    public void shouldBeFlaggedAsTransactionalWhenTransactionalRecords() throws Exception {
+        final MemoryRecords memoryRecords = MemoryRecords.withTransactionalRecords(0,
+                                                                                   CompressionType.NONE,
+                                                                                   1L,
+                                                                                   (short) 1,
+                                                                                   1,
+                                                                                   1,
+                                                                                   simpleRecord);
+        final ProduceRequest request = new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE,
+                                                                  (short) -1,
+                                                                  10,
+                                                                  Collections.singletonMap(
+                                                                          new TopicPartition("topic", 1), memoryRecords)).build();
+        assertTrue(request.isTransactional());
+    }
+
+    @Test
+    public void shouldNotBeFlaggedAsTransactionalWhenNoRecords() throws Exception {
+        final ProduceRequest request = createNonIdempotentNonTransactionalRecords();
+        assertFalse(request.isTransactional());
+    }
+
+    @Test
+    public void shouldNotBeFlaggedAsIdempotentWhenRecordsNotIdempotent() throws Exception {
+        final ProduceRequest request = createNonIdempotentNonTransactionalRecords();
+        assertFalse(request.isIdempotent());
+    }
+
+    @Test
+    public void shouldBeFlaggedAsIdempotentWhenIdempotentRecords() throws Exception {
+        final MemoryRecords memoryRecords = MemoryRecords.withIdempotentRecords(1,
+                                                                                CompressionType.NONE,
+                                                                                1L,
+                                                                                (short) 1,
+                                                                                1,
+                                                                                1,
+                                                                                simpleRecord);
+
+        final ProduceRequest request = new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE,
+                                                                  (short) -1,
+                                                                  10,
+                                                                  Collections.singletonMap(
+                                                                          new TopicPartition("topic", 1), memoryRecords)).build();
+        assertTrue(request.isIdempotent());
+
+    }
+
+    private ProduceRequest createNonIdempotentNonTransactionalRecords() {
+        final MemoryRecords memoryRecords = MemoryRecords.withRecords(CompressionType.NONE,
+                                                                      simpleRecord);
+        return new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE,
+                                          (short) -1,
+                                          10,
+                                          Collections.singletonMap(
+                                                  new TopicPartition("topic", 1), memoryRecords)).build();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/d66e7af6/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 1cfd6a3..4946246 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -914,7 +914,7 @@ public class RequestResponseTest {
     }
 
     private AddPartitionsToTxnResponse createAddPartitionsToTxnResponse() {
-        return new AddPartitionsToTxnResponse(0, Errors.NONE);
+        return new AddPartitionsToTxnResponse(0, Collections.singletonMap(new TopicPartition("t", 0), Errors.NONE));
     }
 
     private AddOffsetsToTxnRequest createAddOffsetsToTxnRequest() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d66e7af6/core/src/main/scala/kafka/security/auth/Resource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/Resource.scala b/core/src/main/scala/kafka/security/auth/Resource.scala
index 17d09ce..a0ed9f9 100644
--- a/core/src/main/scala/kafka/security/auth/Resource.scala
+++ b/core/src/main/scala/kafka/security/auth/Resource.scala
@@ -20,6 +20,8 @@ object Resource {
   val Separator = ":"
   val ClusterResourceName = "kafka-cluster"
   val ClusterResource = new Resource(Cluster, Resource.ClusterResourceName)
+  val ProducerIdResourceName = "producer-id"
+  val ProducerIdResource = new Resource(Cluster, Resource.ProducerIdResourceName)
   val WildCardResource = "*"
 
   def fromString(str: String): Resource = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d66e7af6/core/src/main/scala/kafka/security/auth/ResourceType.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/ResourceType.scala b/core/src/main/scala/kafka/security/auth/ResourceType.scala
index 9630c82..e58d8ec 100644
--- a/core/src/main/scala/kafka/security/auth/ResourceType.scala
+++ b/core/src/main/scala/kafka/security/auth/ResourceType.scala
@@ -41,6 +41,15 @@ case object Group extends ResourceType {
   val error = Errors.GROUP_AUTHORIZATION_FAILED
 }
 
+case object ProducerTransactionalId extends ResourceType {
+  val name = "ProducerTransactionalId"
+  val error = Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED
+}
+
+case object ProducerIdResource extends ResourceType {
+  val name = "ProducerIdResource"
+  val error = Errors.PRODUCER_ID_AUTHORIZATION_FAILED
+}
 
 object ResourceType {
 
@@ -49,5 +58,5 @@ object ResourceType {
     rType.getOrElse(throw new KafkaException(resourceType + " not a valid resourceType name. The valid names are " + values.mkString(",")))
   }
 
-  def values: Seq[ResourceType] = List(Cluster, Topic, Group)
+  def values: Seq[ResourceType] = List(Cluster, Topic, Group, ProducerTransactionalId, ProducerIdResource)
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d66e7af6/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index c746365..5e9cd9f 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -35,9 +35,9 @@ import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinat
 import kafka.log.{Log, LogManager, TimestampOffset}
 import kafka.network.{RequestChannel, RequestOrResponseSend}
 import kafka.network.RequestChannel.{Response, Session}
-import kafka.security.auth.{Authorizer, ClusterAction, Create, Delete, Describe, Group, Operation, Read, Resource, Topic, Write}
+import kafka.security.auth._
 import kafka.utils.{Exit, Logging, ZKGroupTopicDirs, ZkUtils}
-import org.apache.kafka.common.errors.{ClusterAuthorizationException, InvalidRequestException, NotLeaderForPartitionException, TopicExistsException, UnknownTopicOrPartitionException, UnsupportedForMessageFormatException}
+import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.FatalExitError
 import org.apache.kafka.common.internals.Topic.{isInternal, GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME}
 import org.apache.kafka.common.metrics.Metrics
@@ -364,88 +364,95 @@ class KafkaApis(val requestChannel: RequestChannel,
     val produceRequest = request.body[ProduceRequest]
     val numBytesAppended = request.header.toStruct.sizeOf + request.bodyAndSize.size
 
-    val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) =
-      produceRequest.partitionRecordsOrFail.asScala.partition { case (tp, _) =>
-        authorize(request.session, Describe, new Resource(Topic, tp.topic)) && metadataCache.contains(tp.topic)
-      }
+    if (produceRequest.isTransactional && !authorize(request.session, Write, new Resource(ProducerTransactionalId, produceRequest.transactionalId())))
+      sendResponseMaybeThrottle(request, (throttleMs: Int) => produceRequest.getErrorResponse(throttleMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception()))
+    else if (produceRequest.isIdempotent && !authorize(request.session, Write, Resource.ProducerIdResource))
+      sendResponseMaybeThrottle(request, (throttleMs: Int) => produceRequest.getErrorResponse(throttleMs, Errors.PRODUCER_ID_AUTHORIZATION_FAILED.exception()))
+    else {
+      val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) =
+        produceRequest.partitionRecordsOrFail.asScala.partition { case (tp, _) =>
+          authorize(request.session, Describe, new Resource(Topic, tp.topic)) && metadataCache.contains(tp.topic)
+        }
 
-    val (authorizedRequestInfo, unauthorizedForWriteRequestInfo) = existingAndAuthorizedForDescribeTopics.partition {
-      case (tp, _) => authorize(request.session, Write, new Resource(Topic, tp.topic))
-    }
+      val (authorizedRequestInfo, unauthorizedForWriteRequestInfo) = existingAndAuthorizedForDescribeTopics.partition {
+        case (tp, _) => authorize(request.session, Write, new Resource(Topic, tp.topic))
+      }
 
-    // the callback for sending a produce response
-    def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {
+      // the callback for sending a produce response
+      def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {
 
-      val mergedResponseStatus = responseStatus ++
-        unauthorizedForWriteRequestInfo.mapValues(_ => new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED)) ++
-        nonExistingOrUnauthorizedForDescribeTopics.mapValues(_ => new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION))
+        val mergedResponseStatus = responseStatus ++
+            unauthorizedForWriteRequestInfo.mapValues(_ => new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED)) ++
+            nonExistingOrUnauthorizedForDescribeTopics.mapValues(_ => new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION))
 
-      var errorInResponse = false
+        var errorInResponse = false
 
-      mergedResponseStatus.foreach { case (topicPartition, status) =>
-        if (status.error != Errors.NONE) {
-          errorInResponse = true
-          debug("Produce request with correlation id %d from client %s on partition %s failed due to %s".format(
-            request.header.correlationId,
-            request.header.clientId,
-            topicPartition,
-            status.error.exceptionName))
+        mergedResponseStatus.foreach { case (topicPartition, status) =>
+          if (status.error != Errors.NONE) {
+            errorInResponse = true
+            debug("Produce request with correlation id %d from client %s on partition %s failed due to %s".format(
+              request.header.correlationId,
+              request.header.clientId,
+              topicPartition,
+              status.error.exceptionName))
+          }
         }
-      }
 
-      def produceResponseCallback(bandwidthThrottleTimeMs: Int) {
-        if (produceRequest.acks == 0) {
-          // no operation needed if producer request.required.acks = 0; however, if there is any error in handling
-          // the request, since no response is expected by the producer, the server will close socket server so that
-          // the producer client will know that some error has happened and will refresh its metadata
-          if (errorInResponse) {
-            val exceptionsSummary = mergedResponseStatus.map { case (topicPartition, status) =>
-              topicPartition -> status.error.exceptionName
-            }.mkString(", ")
-            info(
-              s"Closing connection due to error during produce request with correlation id ${request.header.correlationId} " +
-                s"from client id ${request.header.clientId} with ack=0\n" +
-                s"Topic and partition to exceptions: $exceptionsSummary"
-            )
-            requestChannel.closeConnection(request.processor, request)
+        def produceResponseCallback(bandwidthThrottleTimeMs: Int) {
+          if (produceRequest.acks == 0) {
+            // no operation is needed if producer request.required.acks = 0; however, if there is any error in handling
+            // the request, since no response is expected by the producer, the server will close the connection so that
+            // the producer client will know that some error has happened and will refresh its metadata
+            if (errorInResponse) {
+              val exceptionsSummary = mergedResponseStatus.map { case (topicPartition, status) =>
+                topicPartition -> status.error.exceptionName
+              }.mkString(", ")
+              info(
+                s"Closing connection due to error during produce request with correlation id ${request.header.correlationId} " +
+                  s"from client id ${request.header.clientId} with acks=0\n" +
+                  s"Topic and partition to exceptions: $exceptionsSummary"
+              )
+              requestChannel.closeConnection(request.processor, request)
+            } else {
+              requestChannel.noOperation(request.processor, request)
+            }
           } else {
-            requestChannel.noOperation(request.processor, request)
-          }
-        } else {
-          def createResponseCallback(requestThrottleTimeMs: Int): AbstractResponse = {
-            new ProduceResponse(mergedResponseStatus.asJava, bandwidthThrottleTimeMs + requestThrottleTimeMs)
+            def createResponseCallback(requestThrottleTimeMs: Int): AbstractResponse = {
+              new ProduceResponse(mergedResponseStatus.asJava, bandwidthThrottleTimeMs + requestThrottleTimeMs)
+            }
+
+            sendResponseMaybeThrottle(request, createResponseCallback)
           }
-          sendResponseMaybeThrottle(request, createResponseCallback)
         }
-      }
 
-      // When this callback is triggered, the remote API call has completed
-      request.apiRemoteCompleteTimeNanos = time.nanoseconds
-
-      quotas.produce.recordAndMaybeThrottle(
-        request.session.sanitizedUser,
-        request.header.clientId,
-        numBytesAppended,
-        produceResponseCallback)
-    }
+        // When this callback is triggered, the remote API call has completed
+        request.apiRemoteCompleteTimeNanos = time.nanoseconds
 
-    if (authorizedRequestInfo.isEmpty)
-      sendResponseCallback(Map.empty)
-    else {
-      val internalTopicsAllowed = request.header.clientId == AdminUtils.AdminClientId
+        quotas.produce.recordAndMaybeThrottle(
+          request.session.sanitizedUser,
+          request.header.clientId,
+          numBytesAppended,
+          produceResponseCallback)
+      }
 
-      // call the replica manager to append messages to the replicas
-      replicaManager.appendRecords(
-        produceRequest.timeout.toLong,
-        produceRequest.acks,
-        internalTopicsAllowed,
-        isFromClient = true,
-        authorizedRequestInfo,
-        sendResponseCallback)
+      if (authorizedRequestInfo.isEmpty)
+        sendResponseCallback(Map.empty)
+      else {
+        val internalTopicsAllowed = request.header.clientId == AdminUtils.AdminClientId
+
+        // call the replica manager to append messages to the replicas
+        replicaManager.appendRecords(
+          produceRequest.timeout.toLong,
+          produceRequest.acks,
+          internalTopicsAllowed,
+          isFromClient = true,
+          authorizedRequestInfo,
+          sendResponseCallback)
 
-      // if the request is put into the purgatory, it will have a held reference and hence cannot be garbage collected;
-      // hence we clear its data here inorder to let GC re-claim its memory since it is already appended to log
-      produceRequest.clearPartitionRecords()
+        // if the request is put into the purgatory, it will have a held reference and hence cannot be garbage collected;
+        // hence we clear its data here in order to let GC reclaim its memory since it is already appended to the log
+        produceRequest.clearPartitionRecords()
+      }
     }
   }
 
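Before any per-topic check, handleProduceRequest now fails the whole request fast when the transactional or idempotent gate is not met. A client-side sketch of the idempotent path; the broker address and topic are placeholders, not taken from this patch:

    import java.util.Properties
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
    import org.apache.kafka.common.serialization.ByteArraySerializer

    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getName)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getName)
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true") // marks produce requests idempotent

    val producer = new KafkaProducer[Array[Byte], Array[Byte]](props)
    // Fails with ProducerIdAuthorizationException unless the principal has Write on Resource.ProducerIdResource
    producer.send(new ProducerRecord("my-topic", "k".getBytes, "v".getBytes)).get()
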
@@ -1391,35 +1398,45 @@ class KafkaApis(val requestChannel: RequestChannel,
     val initProducerIdRequest = request.body[InitProducerIdRequest]
     val transactionalId = initProducerIdRequest.transactionalId
 
-    // Send response callback
-    def sendResponseCallback(result: InitProducerIdResult): Unit = {
-      def createResponse(throttleTimeMs: Int): AbstractResponse = {
-        val responseBody = new InitProducerIdResponse(throttleTimeMs, result.error, result.producerId, result.producerEpoch)
-        trace(s"Completed $transactionalId's InitProducerIdRequest with result $result from client ${request.header.clientId}.")
-        responseBody
+
+    if (!authorize(request.session, Write, Resource.ProducerIdResource)) {
+      sendResponseMaybeThrottle(request, (throttleTime: Int) => new InitProducerIdResponse(throttleTime, Errors.PRODUCER_ID_AUTHORIZATION_FAILED))
+    } else if (transactionalId == null || authorize(request.session, Write, new Resource(ProducerTransactionalId, transactionalId))) {
+      // Send response callback
+      def sendResponseCallback(result: InitProducerIdResult): Unit = {
+        def createResponse(throttleTimeMs: Int): AbstractResponse = {
+          val responseBody = new InitProducerIdResponse(throttleTimeMs, result.error, result.producerId, result.producerEpoch)
+          trace(s"Completed $transactionalId's InitProducerIdRequest with result $result from client ${request.header.clientId}.")
+          responseBody
+        }
+        sendResponseMaybeThrottle(request, createResponse)
       }
-      sendResponseMaybeThrottle(request, createResponse)
-    }
-    txnCoordinator.handleInitProducerId(transactionalId, initProducerIdRequest.transactionTimeoutMs, sendResponseCallback)
+      txnCoordinator.handleInitProducerId(transactionalId, initProducerIdRequest.transactionTimeoutMs, sendResponseCallback)
+    } else
+      sendResponseMaybeThrottle(request, (throttleTimeMs: Int) => new InitProducerIdResponse(throttleTimeMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED))
   }
 
   def handleEndTxnRequest(request: RequestChannel.Request): Unit = {
     val endTxnRequest = request.body[EndTxnRequest]
-
-    def sendResponseCallback(error: Errors) {
-      def createResponse(throttleTimeMs: Int): AbstractResponse = {
-        val responseBody = new EndTxnResponse(throttleTimeMs, error)
-        trace(s"Completed ${endTxnRequest.transactionalId}'s EndTxnRequest with command: ${endTxnRequest.command}, errors: $error from client ${request.header.clientId}.")
-        responseBody
+    val transactionalId = endTxnRequest.transactionalId
+
+    if (authorize(request.session, Write, new Resource(ProducerTransactionalId, transactionalId))) {
+      def sendResponseCallback(error: Errors) {
+        def createResponse(throttleTimeMs: Int): AbstractResponse = {
+          val responseBody = new EndTxnResponse(throttleTimeMs, error)
+          trace(s"Completed ${endTxnRequest.transactionalId}'s EndTxnRequest with command: ${endTxnRequest.command}, errors: $error from client ${request.header.clientId}.")
+          responseBody
+        }
+        sendResponseMaybeThrottle(request, createResponse)
       }
-      sendResponseMaybeThrottle(request, createResponse)
-    }
 
-    txnCoordinator.handleEndTransaction(endTxnRequest.transactionalId(),
-      endTxnRequest.producerId(),
-      endTxnRequest.producerEpoch(),
-      endTxnRequest.command(),
-      sendResponseCallback)
+      txnCoordinator.handleEndTransaction(endTxnRequest.transactionalId,
+        endTxnRequest.producerId,
+        endTxnRequest.producerEpoch,
+        endTxnRequest.command,
+        sendResponseCallback)
+    } else
+      sendResponseMaybeThrottle(request, (throttleTimeMs: Int) => new EndTxnResponse(throttleTimeMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED))
   }
 
   def handleWriteTxnMarkersRequest(request: RequestChannel.Request): Unit = {
@@ -1484,21 +1501,53 @@ class KafkaApis(val requestChannel: RequestChannel,
     val transactionalId = addPartitionsToTxnRequest.transactionalId
     val partitionsToAdd = addPartitionsToTxnRequest.partitions
 
-    // Send response callback
-    def sendResponseCallback(error: Errors): Unit = {
-      def createResponse(throttleTimeMs: Int): AbstractResponse = {
-        val responseBody: AddPartitionsToTxnResponse = new AddPartitionsToTxnResponse(throttleTimeMs, error)
-        trace(s"Completed $transactionalId's AddPartitionsToTxnRequest with partitions $partitionsToAdd: errors: $error from client ${request.header.clientId}")
-        responseBody
+    if (!authorize(request.session, Write, new Resource(ProducerTransactionalId, transactionalId)))
+      sendResponseMaybeThrottle(request, (throttleTimeMs: Int) => addPartitionsToTxnRequest.getErrorResponse(throttleTimeMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception()))
+    else {
+      val internalTopics = partitionsToAdd.asScala.filter { tp => isInternal(tp.topic) }
+
+      val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) =
+        partitionsToAdd.asScala.partition { tp =>
+          authorize(request.session, Describe, new Resource(Topic, tp.topic)) && metadataCache.contains(tp.topic)
+        }
+
+      val (authorizedRequestInfo, unauthorizedForWriteRequestInfo) = existingAndAuthorizedForDescribeTopics.partition {
+        tp => authorize(request.session, Write, new Resource(Topic, tp.topic))
+      }
+
+      if (nonExistingOrUnauthorizedForDescribeTopics.nonEmpty
+        || unauthorizedForWriteRequestInfo.nonEmpty
+        || internalTopics.nonEmpty) {
+
+        // Only send back error responses for the partitions that failed. If any partition fails,
+        // the entire request fails
+        val partitionErrors = unauthorizedForWriteRequestInfo.map { tp => (tp, Errors.TOPIC_AUTHORIZATION_FAILED) }.toMap ++
+          nonExistingOrUnauthorizedForDescribeTopics.map { tp => (tp, Errors.UNKNOWN_TOPIC_OR_PARTITION) }.toMap ++
+          internalTopics.map { tp => (tp, Errors.TOPIC_AUTHORIZATION_FAILED) }
+
+        sendResponseMaybeThrottle(request, (throttleTimeMs: Int) => new AddPartitionsToTxnResponse(throttleTimeMs, partitionErrors.asJava))
+      } else {
+        // Send response callback
+        def sendResponseCallback(error: Errors): Unit = {
+          def createResponse(throttleTimeMs: Int): AbstractResponse = {
+            val responseBody: AddPartitionsToTxnResponse = new AddPartitionsToTxnResponse(throttleTimeMs,
+              partitionsToAdd.asScala.map { tp => (tp, error) }.toMap.asJava)
+            trace(s"Completed $transactionalId's AddPartitionsToTxnRequest with partitions $partitionsToAdd: errors: $error from client ${request.header.clientId}")
+            responseBody
+          }
+
+          sendResponseMaybeThrottle(request, createResponse)
+        }
+
+        txnCoordinator.handleAddPartitionsToTransaction(transactionalId,
+          addPartitionsToTxnRequest.producerId(),
+          addPartitionsToTxnRequest.producerEpoch(),
+          partitionsToAdd.asScala.toSet,
+          sendResponseCallback)
       }
-      sendResponseMaybeThrottle(request, createResponse)
     }
 
-    txnCoordinator.handleAddPartitionsToTransaction(transactionalId,
-      addPartitionsToTxnRequest.producerId(),
-      addPartitionsToTxnRequest.producerEpoch(),
-      partitionsToAdd.asScala.toSet,
-      sendResponseCallback)
+
   }
 
   def handleAddOffsetsToTxnRequest(request: RequestChannel.Request): Unit = {
@@ -1507,22 +1556,29 @@ class KafkaApis(val requestChannel: RequestChannel,
     val groupId = addOffsetsToTxnRequest.consumerGroupId
     val offsetTopicPartition = new TopicPartition(GROUP_METADATA_TOPIC_NAME, groupCoordinator.partitionFor(groupId))
 
-    // Send response callback
-    def sendResponseCallback(error: Errors): Unit = {
-      def createResponse(throttleTimeMs: Int): AbstractResponse = {
-        val responseBody: AddOffsetsToTxnResponse = new AddOffsetsToTxnResponse(throttleTimeMs, error)
-        trace(s"Completed $transactionalId's AddOffsetsToTxnRequest for group $groupId as on partition $offsetTopicPartition: errors: $error from client ${request.header.clientId}")
-        responseBody
+    if (!authorize(request.session, Write, new Resource(ProducerTransactionalId, transactionalId)))
+      sendResponseMaybeThrottle(request, (throttleTimeMs: Int) => new AddOffsetsToTxnResponse(throttleTimeMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED))
+    else if (!authorize(request.session, Read, new Resource(Group, groupId)))
+      sendResponseMaybeThrottle(request, (throttleTimeMs: Int) => new AddOffsetsToTxnResponse(throttleTimeMs, Errors.GROUP_AUTHORIZATION_FAILED))
+    else {
+        // Send response callback
+        def sendResponseCallback(error: Errors): Unit = {
+          def createResponse(throttleTimeMs: Int): AbstractResponse = {
+            val responseBody: AddOffsetsToTxnResponse = new AddOffsetsToTxnResponse(throttleTimeMs, error)
+            trace(s"Completed $transactionalId's AddOffsetsToTxnRequest for group $groupId on partition $offsetTopicPartition: errors: $error from client ${request.header.clientId}")
+            responseBody
+          }
+          sendResponseMaybeThrottle(request, createResponse)
+        }
+
+        txnCoordinator.handleAddPartitionsToTransaction(transactionalId,
+          addOffsetsToTxnRequest.producerId,
+          addOffsetsToTxnRequest.producerEpoch,
+          Set(offsetTopicPartition),
+          sendResponseCallback)
       }
-      sendResponseMaybeThrottle(request, createResponse)
     }
 
-    txnCoordinator.handleAddPartitionsToTransaction(transactionalId,
-      addOffsetsToTxnRequest.producerId(),
-      addOffsetsToTxnRequest.producerEpoch(),
-      Set[TopicPartition](offsetTopicPartition),
-      sendResponseCallback)
-  }
 
   def handleTxnOffsetCommitRequest(request: RequestChannel.Request): Unit = {
     val header = request.header

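Taken together, the KafkaApis changes gate every step of a transaction: InitProducerId checks Write on ProducerIdResource and, when a transactional id is set, Write on that id; transactional produce, AddPartitionsToTxn, and AddOffsetsToTxn re-check the transactional id (plus topic Write, and group Read for offsets); EndTxn checks it once more. A sketch of the client flow that walks through each check; the broker address, topic, and transactional id are placeholders:

    import java.util.Properties
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
    import org.apache.kafka.common.serialization.ByteArraySerializer

    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")     // placeholder
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getName)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getName)
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")              // needs Write on ProducerIdResource
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id") // needs Write on this transactional id

    val producer = new KafkaProducer[Array[Byte], Array[Byte]](props)
    producer.initTransactions()  // InitProducerId: both resources checked here
    producer.beginTransaction()
    producer.send(new ProducerRecord("my-topic", "k".getBytes, "v".getBytes)) // Produce + AddPartitionsToTxn checked
    producer.commitTransaction() // EndTxn: transactional id checked again
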
http://git-wip-us.apache.org/repos/asf/kafka/blob/d66e7af6/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 52a90d8..9eb1275 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -24,7 +24,7 @@ import kafka.server.{BaseRequestTest, KafkaConfig}
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
 import org.apache.kafka.clients.consumer._
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.Topic.GROUP_METADATA_TOPIC_NAME
 import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol}
@@ -42,7 +42,7 @@ import org.apache.kafka.common.KafkaException
 import kafka.admin.AdminUtils
 import kafka.network.SocketServer
 import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.record.{CompressionType, SimpleRecord, RecordBatch, MemoryRecords}
+import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch, SimpleRecord}
 
 class AuthorizerIntegrationTest extends BaseRequestTest {
 
@@ -53,6 +53,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   val topicPattern = "topic.*"
   val createTopic = "topic-new"
   val deleteTopic = "topic-delete"
+  val transactionalId = "transactional.id"
   val part = 0
   val correlationId = 0
   val clientId = "client-Id"
@@ -62,6 +63,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   val topicResource = new Resource(Topic, topic)
   val groupResource = new Resource(Group, group)
   val deleteTopicResource = new Resource(Topic, deleteTopic)
+  val producerTransactionalIdResource = new Resource(ProducerTransactionalId, transactionalId)
 
   val GroupReadAcl = Map(groupResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)))
   val ClusterAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, ClusterAction)))
@@ -70,9 +72,11 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   val TopicWriteAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)))
   val TopicDescribeAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)))
   val TopicDeleteAcl = Map(deleteTopicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Delete)))
+  val producerTransactionalIdWriteAcl = Map(producerTransactionalIdResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)))
 
   val consumers = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
   val producers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]()
+  var transactionalProducer: KafkaProducer[Array[Byte], Array[Byte]] = _
 
   val producerCount = 1
   val consumerCount = 2
@@ -83,6 +87,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[SimpleAclAuthorizer].getName)
     properties.put(KafkaConfig.BrokerIdProp, brokerId.toString)
     properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1")
+    properties.put(KafkaConfig.TransactionsTopicPartitionsProp, "1")
+    properties.put(KafkaConfig.TransactionsTopicReplicationFactorProp, "1")
+    properties.put(KafkaConfig.TransactionsTopicMinISRProp, "1")
   }
 
   val RequestKeyToResponseDeserializer: Map[ApiKeys, Class[_ <: Any]] =
@@ -158,6 +165,15 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       producers += TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(servers),
         maxBlockMs = 3000,
         acks = 1)
+
+    val transactionalProperties = new Properties()
+    transactionalProperties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
+    transactionalProperties.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId)
+    transactionalProducer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(servers),
+      retries = 3,
+      props = Some(transactionalProperties)
+    )
+
     for (_ <- 0 until consumerCount)
       consumers += TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers), groupId = group, securityProtocol = SecurityProtocol.PLAINTEXT)
 
@@ -810,6 +826,99 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     assertEquals(Errors.NONE, deleteResponse.errors.asScala.head._2)
   }
 
+  @Test(expected = classOf[TransactionalIdAuthorizationException])
+  def shouldThrowTransactionalIdAuthorizationExceptionWhenNoTransactionAccessOnInitTransactions(): Unit = {
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), Resource.ProducerIdResource)
+    transactionalProducer.initTransactions()
+  }
+
+  @Test
+  def shouldInitTransactionsWhenAclSet(): Unit = {
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), producerTransactionalIdResource)
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), Resource.ProducerIdResource)
+    transactionalProducer.initTransactions()
+  }
+
+
+  @Test
+  def shouldThrowTransactionalIdAuthorizationExceptionWhenNoTransactionAccessDuringSend(): Unit = {
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), producerTransactionalIdResource)
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), Resource.ProducerIdResource)
+    transactionalProducer.initTransactions()
+    removeAllAcls()
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+    try {
+      transactionalProducer.beginTransaction()
+      transactionalProducer.send(new ProducerRecord(tp.topic(), tp.partition(), "1".getBytes, "1".getBytes)).get
+      Assert.fail("expected TransactionalIdAuthorizationException")
+    } catch {
+      case e: ExecutionException => assertTrue(s"expected TransactionalIdAuthorizationException, but got ${e.getCause}", e.getCause.isInstanceOf[TransactionalIdAuthorizationException])
+    }
+  }
+
+  @Test
+  def shouldThrowTransactionalIdAuthorizationExceptionWhenNoTransactionAccessOnEndTransaction(): Unit = {
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), producerTransactionalIdResource)
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), Resource.ProducerIdResource)
+    transactionalProducer.initTransactions()
+    transactionalProducer.beginTransaction()
+    removeAllAcls()
+    try {
+      transactionalProducer.commitTransaction()
+      Assert.fail("expected TransactionalIdAuthorizationException")
+    } catch {
+      case _: TransactionalIdAuthorizationException => // ok
+    }
+  }
+
+  @Test
+  def shouldThrowTransactionalIdAuthorizationExceptionWhenNoTransactionAccessOnSendOffsetsToTxn(): Unit = {
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), producerTransactionalIdResource)
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), Resource.ProducerIdResource)
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), groupResource)
+    transactionalProducer.initTransactions()
+    transactionalProducer.beginTransaction()
+    removeAllAcls()
+    try {
+      val offsets: util.Map[TopicPartition, OffsetAndMetadata] = Map(new TopicPartition(topicAndPartition.topic, topicAndPartition.partition) -> new OffsetAndMetadata(1L)).asJava
+      transactionalProducer.sendOffsetsToTransaction(offsets, group)
+      Assert.fail("expected TransactionalIdAuthorizationException")
+    } catch {
+      case _: TransactionalIdAuthorizationException => // ok
+    }
+  }
+
+
+  @Test
+  def shouldThrowProducerIdAuthorizationExceptionWhenAclNotSet(): Unit = {
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+    val idempotentProperties = new Properties()
+    idempotentProperties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
+    val idempotentProducer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(servers),
+      retries = 3,
+      props = Some(idempotentProperties)
+    )
+    try {
+      idempotentProducer.send(new ProducerRecord(tp.topic(), tp.partition(), "1".getBytes, "1".getBytes)).get
+      Assert.fail("expected ProducerIdAuthorizationException")
+    } catch {
+      case e: ExecutionException => assertTrue(s"expected ProducerIdAuthorizationException, but got ${e.getCause}", e.getCause.isInstanceOf[ProducerIdAuthorizationException])
+    }
+  }
+
+  @Test
+  def shouldSendSuccessfullyWhenIdempotentAndHasCorrectACL(): Unit = {
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), Resource.ProducerIdResource)
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+    val idempotentProperties = new Properties()
+    idempotentProperties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
+    val idempotentProducer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(servers),
+      retries = 3,
+      props = Some(idempotentProperties)
+    )
+    idempotentProducer.send(new ProducerRecord(tp.topic(), tp.partition(), "1".getBytes, "1".getBytes)).get
+  }
+
   def removeAllAcls() = {
     servers.head.apis.authorizer.get.getAcls().keys.foreach { resource =>
       servers.head.apis.authorizer.get.removeAcls(resource)


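Read together, the tests pin down the minimal grant set for the full transactional flow. A sketch using this file's own helper and resource vals (Read on the group is what handleAddOffsetsToTxnRequest checks; the combined set is an inference from the tests, not a block in this patch):

    val writeAcl = Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write))
    addAndVerifyAcls(writeAcl, Resource.ProducerIdResource)     // idempotence / producer id
    addAndVerifyAcls(writeAcl, producerTransactionalIdResource) // InitProducerId, AddPartitionsToTxn, EndTxn
    addAndVerifyAcls(writeAcl, topicResource)                   // transactional produce
    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) // sendOffsetsToTransaction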