kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [4/5] kafka git commit: KAFKA-5259; TransactionalId auth implies ProducerId auth
Date Wed, 24 May 2017 22:29:02 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index cc30f4d..719efe9 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ClusterAuthorizationException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.MetricConfig;
@@ -231,20 +232,8 @@ public class SenderTest {
         int maxRetries = 1;
         Metrics m = new Metrics();
         try {
-            Sender sender = new Sender(client,
-                    metadata,
-                    this.accumulator,
-                    false,
-                    MAX_REQUEST_SIZE,
-                    ACKS_ALL,
-                    maxRetries,
-                    m,
-                    time,
-                    REQUEST_TIMEOUT,
-                    50,
-                    null,
-                    apiVersions
-            );
+            Sender sender = new Sender(client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
+                    maxRetries, m, time, REQUEST_TIMEOUT, 50, null, apiVersions);
             // do a successful retry
             Future<RecordMetadata> future = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
             sender.run(time.milliseconds()); // connect
@@ -290,20 +279,8 @@ public class SenderTest {
         int maxRetries = 1;
         Metrics m = new Metrics();
         try {
-            Sender sender = new Sender(client,
-                    metadata,
-                    this.accumulator,
-                    true,
-                    MAX_REQUEST_SIZE,
-                    ACKS_ALL,
-                    maxRetries,
-                    m,
-                    time,
-                    REQUEST_TIMEOUT,
-                    50,
-                    null,
-                    apiVersions
-            );
+            Sender sender = new Sender(client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
+                    m, time, REQUEST_TIMEOUT, 50, null, apiVersions);
             // Create a two broker cluster, with partition 0 on broker 0 and partition 1 on broker 1
             Cluster cluster1 = TestUtils.clusterWith(2, "test", 2);
             metadata.update(cluster1, Collections.<String>emptySet(), time.milliseconds());
@@ -375,21 +352,63 @@ public class SenderTest {
     }
 
     @Test
-    public void testInitPidRequest() throws Exception {
+    public void testInitProducerIdRequest() throws Exception {
         final long producerId = 343434L;
         TransactionManager transactionManager = new TransactionManager();
         setupWithTransactionState(transactionManager);
         client.setNode(new Node(1, "localhost", 33343));
+        prepareAndReceiveInitProducerId(producerId, Errors.NONE);
+        assertTrue(transactionManager.hasProducerId());
+        assertEquals(producerId, transactionManager.producerIdAndEpoch().producerId);
+        assertEquals((short) 0, transactionManager.producerIdAndEpoch().epoch);
+    }
+
+    @Test
+    public void testClusterAuthorizationExceptionInInitProducerIdRequest() throws Exception {
+        final long producerId = 343434L;
+        TransactionManager transactionManager = new TransactionManager();
+        setupWithTransactionState(transactionManager);
+        client.setNode(new Node(1, "localhost", 33343));
+        prepareAndReceiveInitProducerId(producerId, Errors.CLUSTER_AUTHORIZATION_FAILED);
+        assertFalse(transactionManager.hasProducerId());
+        assertTrue(transactionManager.isInErrorState());
+        assertTrue(transactionManager.lastError() instanceof ClusterAuthorizationException);
+
+        // cluster authorization is a fatal error for the producer
+        assertSendFailure(ClusterAuthorizationException.class);
+    }
+
+    @Test
+    public void testClusterAuthorizationExceptionInProduceRequest() throws Exception {
+        final long producerId = 343434L;
+        TransactionManager transactionManager = new TransactionManager();
+        setupWithTransactionState(transactionManager);
+
+        client.setNode(new Node(1, "localhost", 33343));
+        prepareAndReceiveInitProducerId(producerId, Errors.NONE);
+        assertTrue(transactionManager.hasProducerId());
+
+        // cluster authorization is a fatal error for the producer
+        Future<RecordMetadata> future = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(),
+                null, null, MAX_BLOCK_TIMEOUT).future;
         client.prepareResponse(new MockClient.RequestMatcher() {
             @Override
             public boolean matches(AbstractRequest body) {
-                return body instanceof InitProducerIdRequest;
+                return body instanceof ProduceRequest && ((ProduceRequest) body).isIdempotent();
             }
-        }, new InitProducerIdResponse(0, Errors.NONE, producerId, (short) 0));
+        }, produceResponse(tp0, -1, Errors.CLUSTER_AUTHORIZATION_FAILED, 0));
+
         sender.run(time.milliseconds());
-        assertTrue(transactionManager.hasProducerId());
-        assertEquals(producerId, transactionManager.producerIdAndEpoch().producerId);
-        assertEquals((short) 0, transactionManager.producerIdAndEpoch().epoch);
+        assertTrue(future.isDone());
+        try {
+            future.get();
+            fail("Future should have raised ClusterAuthorizationException");
+        } catch (ExecutionException e) {
+            assertTrue(e.getCause() instanceof ClusterAuthorizationException);
+        }
+
+        // cluster authorization is a fatal error for the producer
+        assertSendFailure(ClusterAuthorizationException.class);
     }
 
     @Test
@@ -402,20 +421,8 @@ public class SenderTest {
 
         int maxRetries = 10;
         Metrics m = new Metrics();
-        Sender sender = new Sender(client,
-                metadata,
-                this.accumulator,
-                true,
-                MAX_REQUEST_SIZE,
-                ACKS_ALL,
-                maxRetries,
-                m,
-                time,
-                REQUEST_TIMEOUT,
-                50,
-                transactionManager,
-                apiVersions
-        );
+        Sender sender = new Sender(client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
+                m, time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
 
         Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
         client.prepareResponse(new MockClient.RequestMatcher() {
@@ -446,7 +453,7 @@ public class SenderTest {
     }
 
     @Test
-    public void testAbortRetryWhenPidChanges() throws InterruptedException {
+    public void testAbortRetryWhenProducerIdChanges() throws InterruptedException {
         final long producerId = 343434L;
         TransactionManager transactionManager = new TransactionManager();
         transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(producerId, (short) 0));
@@ -455,20 +462,8 @@ public class SenderTest {
 
         int maxRetries = 10;
         Metrics m = new Metrics();
-        Sender sender = new Sender(client,
-                metadata,
-                this.accumulator,
-                true,
-                MAX_REQUEST_SIZE,
-                ACKS_ALL,
-                maxRetries,
-                m,
-                time,
-                REQUEST_TIMEOUT,
-                50,
-                transactionManager,
-                apiVersions
-        );
+        Sender sender = new Sender(client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
+                m, time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
 
         Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
         sender.run(time.milliseconds());  // connect.
@@ -504,20 +499,8 @@ public class SenderTest {
 
         int maxRetries = 10;
         Metrics m = new Metrics();
-        Sender sender = new Sender(client,
-                metadata,
-                this.accumulator,
-                true,
-                MAX_REQUEST_SIZE,
-                ACKS_ALL,
-                maxRetries,
-                m,
-                time,
-                REQUEST_TIMEOUT,
-                50,
-                transactionManager,
-                apiVersions
-        );
+        Sender sender = new Sender(client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
+                m, time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
 
         Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
         sender.run(time.milliseconds());  // connect.
@@ -642,20 +625,38 @@ public class SenderTest {
         metricTags.put("client-id", CLIENT_ID);
         MetricConfig metricConfig = new MetricConfig().tags(metricTags);
         this.metrics = new Metrics(metricConfig, time);
-        this.accumulator = new RecordAccumulator(batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, metrics, time, apiVersions, transactionManager);
-        this.sender = new Sender(this.client,
-                this.metadata,
-                this.accumulator,
-                true,
-                MAX_REQUEST_SIZE,
-                ACKS_ALL,
-                MAX_RETRIES,
-                this.metrics,
-                this.time,
-                REQUEST_TIMEOUT,
-                50,
-                transactionManager,
-                apiVersions);
+        this.accumulator = new RecordAccumulator(batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, metrics, time,
+                apiVersions, transactionManager);
+        this.sender = new Sender(this.client, this.metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL,
+                MAX_RETRIES, this.metrics, this.time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
         this.metadata.update(this.cluster, Collections.<String>emptySet(), time.milliseconds());
     }
+
+    private void assertSendFailure(Class<? extends RuntimeException> expectedError) throws Exception {
+        Future<RecordMetadata> future = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(),
+                null, null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());
+        assertTrue(future.isDone());
+        try {
+            future.get();
+            fail("Future should have raised " + expectedError.getSimpleName());
+        } catch (ExecutionException e) {
+            assertTrue(expectedError.isAssignableFrom(e.getCause().getClass()));
+        }
+    }
+
+    private void prepareAndReceiveInitProducerId(long producerId, Errors error) {
+        short producerEpoch = 0;
+        if (error != Errors.NONE)
+            producerEpoch = RecordBatch.NO_PRODUCER_EPOCH;
+
+        client.prepareResponse(new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(AbstractRequest body) {
+                return body instanceof InitProducerIdRequest && ((InitProducerIdRequest) body).transactionalId() == null;
+            }
+        }, new InitProducerIdResponse(0, error, producerId, producerEpoch));
+        sender.run(time.milliseconds());
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index fcf0488..e9363d0 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -20,12 +20,14 @@ import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
@@ -43,6 +45,7 @@ import org.apache.kafka.common.requests.AddPartitionsToTxnResponse;
 import org.apache.kafka.common.requests.EndTxnRequest;
 import org.apache.kafka.common.requests.EndTxnResponse;
 import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
 import org.apache.kafka.common.requests.FindCoordinatorResponse;
 import org.apache.kafka.common.requests.InitProducerIdRequest;
 import org.apache.kafka.common.requests.InitProducerIdResponse;
@@ -65,6 +68,8 @@ import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
+import static java.util.Collections.singleton;
+import static java.util.Collections.singletonMap;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -118,6 +123,7 @@ public class TransactionManagerTest {
                 transactionManager,
                 apiVersions);
         this.metadata.update(this.cluster, Collections.<String>emptySet(), time.milliseconds());
+        client.setNode(brokerNode);
     }
 
     @Test(expected = IllegalStateException.class)
@@ -134,7 +140,6 @@ public class TransactionManagerTest {
         assertEquals((int) transactionManager.sequenceNumber(tp0), 3);
     }
 
-
     @Test
     public void testProducerIdReset() {
         TransactionManager transactionManager = new TransactionManager();
@@ -147,23 +152,13 @@ public class TransactionManagerTest {
 
     @Test
     public void testBasicTransaction() throws InterruptedException {
-        client.setNode(brokerNode);
         // This is called from the initTransactions method in the producer as the first order of business.
         // It finds the coordinator and then gets a PID.
         final long pid = 13131L;
         final short epoch = 1;
-        transactionManager.initializeTransactions();
-        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
 
-        sender.run(time.milliseconds());  // find coordinator
-        sender.run(time.milliseconds());
-        assertEquals(brokerNode, transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION));
-
-        prepareInitPidResponse(Errors.NONE, false, pid, epoch);
+        doInitTransactions(pid, epoch);
 
-        sender.run(time.milliseconds());  // get pid.
-
-        assertTrue(transactionManager.hasProducerId());
         transactionManager.beginTransaction();
         transactionManager.maybeAddPartitionToTransaction(tp0);
 
@@ -199,13 +194,13 @@ public class TransactionManagerTest {
         Map<TopicPartition, Errors> txnOffsetCommitResponse = new HashMap<>();
         txnOffsetCommitResponse.put(tp1, Errors.NONE);
 
-        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.GROUP, consumerGroupId);
+        prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.GROUP, consumerGroupId);
         prepareTxnOffsetCommitResponse(consumerGroupId, pid, epoch, txnOffsetCommitResponse);
 
-        assertEquals(null, transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.GROUP));
+        assertEquals(null, transactionManager.coordinator(CoordinatorType.GROUP));
         sender.run(time.milliseconds());  // try to send TxnOffsetCommitRequest, but find we don't have a group coordinator.
         sender.run(time.milliseconds());  // send find coordinator for group request
-        assertNotNull(transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.GROUP));
+        assertNotNull(transactionManager.coordinator(CoordinatorType.GROUP));
         assertTrue(transactionManager.hasPendingOffsetCommits());
 
         sender.run(time.milliseconds());  // send TxnOffsetCommitRequest commit.
@@ -224,42 +219,40 @@ public class TransactionManagerTest {
 
     @Test
     public void testDisconnectAndRetry() {
-        client.setNode(brokerNode);
         // This is called from the initTransactions method in the producer as the first order of business.
         // It finds the coordinator and then gets a PID.
         transactionManager.initializeTransactions();
-        prepareFindCoordinatorResponse(Errors.NONE, true, FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
+        prepareFindCoordinatorResponse(Errors.NONE, true, CoordinatorType.TRANSACTION, transactionalId);
         sender.run(time.milliseconds());  // find coordinator, connection lost.
 
-        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
+        prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
         sender.run(time.milliseconds());  // find coordinator
         sender.run(time.milliseconds());
-        assertEquals(brokerNode, transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION));
+        assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION));
     }
 
     @Test
     public void testCoordinatorLost() {
-        client.setNode(brokerNode);
         // This is called from the initTransactions method in the producer as the first order of business.
         // It finds the coordinator and then gets a PID.
         final long pid = 13131L;
         final short epoch = 1;
         TransactionalRequestResult initPidResult = transactionManager.initializeTransactions();
-        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
+        prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
         sender.run(time.milliseconds());  // find coordinator
         sender.run(time.milliseconds());
-        assertEquals(brokerNode, transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION));
+        assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION));
 
         prepareInitPidResponse(Errors.NOT_COORDINATOR, false, pid, epoch);
         sender.run(time.milliseconds());  // send pid, get not coordinator. Should resend the FindCoordinator and InitPid requests
 
-        assertEquals(null, transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION));
+        assertEquals(null, transactionManager.coordinator(CoordinatorType.TRANSACTION));
         assertFalse(initPidResult.isCompleted());
         assertFalse(transactionManager.hasProducerId());
 
-        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
+        prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
         sender.run(time.milliseconds());
-        assertEquals(brokerNode, transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION));
+        assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION));
         assertFalse(initPidResult.isCompleted());
         prepareInitPidResponse(Errors.NONE, false, pid, epoch);
         sender.run(time.milliseconds());  // get pid and epoch
@@ -271,24 +264,216 @@ public class TransactionManagerTest {
     }
 
     @Test
-    public void testFlushPendingPartitionsOnCommit() throws InterruptedException {
-        client.setNode(brokerNode);
-        // This is called from the initTransactions method in the producer as the first order of business.
-        // It finds the coordinator and then gets a PID.
+    public void testTransactionalIdAuthorizationFailureInFindCoordinator() {
+        TransactionalRequestResult initPidResult = transactionManager.initializeTransactions();
+        prepareFindCoordinatorResponse(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED, false,
+                CoordinatorType.TRANSACTION, transactionalId);
+        sender.run(time.milliseconds());  // find coordinator
+        sender.run(time.milliseconds());
+
+        assertTrue(transactionManager.isInErrorState());
+        assertTrue(transactionManager.lastError() instanceof TransactionalIdAuthorizationException);
+
+        sender.run(time.milliseconds()); // one more run to fail the InitProducerId future
+        assertTrue(initPidResult.isCompleted());
+        assertFalse(initPidResult.isSuccessful());
+        assertTrue(initPidResult.error() instanceof TransactionalIdAuthorizationException);
+
+        assertFatalError(TransactionalIdAuthorizationException.class);
+    }
+
+    @Test
+    public void testTransactionalIdAuthorizationFailureInInitProducerId() {
+        final long pid = 13131L;
+        TransactionalRequestResult initPidResult = transactionManager.initializeTransactions();
+        prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
+        sender.run(time.milliseconds());  // find coordinator
+        sender.run(time.milliseconds());
+        assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION));
+
+        prepareInitPidResponse(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED, false, pid, RecordBatch.NO_PRODUCER_EPOCH);
+        sender.run(time.milliseconds());
+
+        assertTrue(transactionManager.isInErrorState());
+        assertTrue(initPidResult.isCompleted());
+        assertFalse(initPidResult.isSuccessful());
+        assertTrue(initPidResult.error() instanceof TransactionalIdAuthorizationException);
+
+        assertFatalError(TransactionalIdAuthorizationException.class);
+    }
+
+    @Test
+    public void testGroupAuthorizationFailureInFindCoordinator() {
+        final String consumerGroupId = "consumer";
         final long pid = 13131L;
         final short epoch = 1;
-        transactionManager.initializeTransactions();
-        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
 
-        sender.run(time.milliseconds());  // find coordinator
+        doInitTransactions(pid, epoch);
+
+        transactionManager.beginTransaction();
+        TransactionalRequestResult sendOffsetsResult = transactionManager.sendOffsetsToTransaction(
+                singletonMap(new TopicPartition("foo", 0), new OffsetAndMetadata(39L)), consumerGroupId);
+
+        prepareAddOffsetsToTxnResponse(Errors.NONE, consumerGroupId, pid, epoch);
+        sender.run(time.milliseconds());  // AddOffsetsToTxn Handled, TxnOffsetCommit Enqueued
+        sender.run(time.milliseconds());  // FindCoordinator Enqueued
+
+        prepareFindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED, false, CoordinatorType.GROUP, consumerGroupId);
+        sender.run(time.milliseconds());  // FindCoordinator Failed
+        sender.run(time.milliseconds());  // TxnOffsetCommit Aborted
+        assertTrue(transactionManager.isInErrorState());
+        assertTrue(transactionManager.lastError() instanceof GroupAuthorizationException);
+        assertTrue(sendOffsetsResult.isCompleted());
+        assertFalse(sendOffsetsResult.isSuccessful());
+        assertTrue(sendOffsetsResult.error() instanceof GroupAuthorizationException);
+
+        GroupAuthorizationException exception = (GroupAuthorizationException) sendOffsetsResult.error();
+        assertEquals(consumerGroupId, exception.groupId());
+
+        assertAbortableError(GroupAuthorizationException.class);
+    }
+
+    @Test
+    public void testGroupAuthorizationFailureInTxnOffsetCommit() {
+        final String consumerGroupId = "consumer";
+        final long pid = 13131L;
+        final short epoch = 1;
+        final TopicPartition tp = new TopicPartition("foo", 0);
+
+        doInitTransactions(pid, epoch);
+
+        transactionManager.beginTransaction();
+        TransactionalRequestResult sendOffsetsResult = transactionManager.sendOffsetsToTransaction(
+                singletonMap(tp, new OffsetAndMetadata(39L)), consumerGroupId);
+
+        prepareAddOffsetsToTxnResponse(Errors.NONE, consumerGroupId, pid, epoch);
+        sender.run(time.milliseconds());  // AddOffsetsToTxn Handled, TxnOffsetCommit Enqueued
+        sender.run(time.milliseconds());  // FindCoordinator Enqueued
+
+        prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.GROUP, consumerGroupId);
+        sender.run(time.milliseconds());  // FindCoordinator Returned
+
+        prepareTxnOffsetCommitResponse(consumerGroupId, pid, epoch, singletonMap(tp, Errors.GROUP_AUTHORIZATION_FAILED));
+        sender.run(time.milliseconds());  // TxnOffsetCommit Handled
+
+        assertTrue(transactionManager.isInErrorState());
+        assertTrue(transactionManager.lastError() instanceof GroupAuthorizationException);
+        assertTrue(sendOffsetsResult.isCompleted());
+        assertFalse(sendOffsetsResult.isSuccessful());
+        assertTrue(sendOffsetsResult.error() instanceof GroupAuthorizationException);
+
+        GroupAuthorizationException exception = (GroupAuthorizationException) sendOffsetsResult.error();
+        assertEquals(consumerGroupId, exception.groupId());
+
+        assertAbortableError(GroupAuthorizationException.class);
+    }
+
+    @Test
+    public void testTransactionalIdAuthorizationFailureInAddOffsetsToTxn() {
+        final String consumerGroupId = "consumer";
+        final long pid = 13131L;
+        final short epoch = 1;
+        final TopicPartition tp = new TopicPartition("foo", 0);
+
+        doInitTransactions(pid, epoch);
+
+        transactionManager.beginTransaction();
+        TransactionalRequestResult sendOffsetsResult = transactionManager.sendOffsetsToTransaction(
+                singletonMap(tp, new OffsetAndMetadata(39L)), consumerGroupId);
+
+        prepareAddOffsetsToTxnResponse(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED, consumerGroupId, pid, epoch);
+        sender.run(time.milliseconds());  // AddOffsetsToTxn Handled
+
+        assertTrue(transactionManager.isInErrorState());
+        assertTrue(transactionManager.lastError() instanceof TransactionalIdAuthorizationException);
+        assertTrue(sendOffsetsResult.isCompleted());
+        assertFalse(sendOffsetsResult.isSuccessful());
+        assertTrue(sendOffsetsResult.error() instanceof TransactionalIdAuthorizationException);
+
+        assertFatalError(TransactionalIdAuthorizationException.class);
+    }
+
+    @Test
+    public void testTransactionalIdAuthorizationFailureInTxnOffsetCommit() {
+        final String consumerGroupId = "consumer";
+        final long pid = 13131L;
+        final short epoch = 1;
+        final TopicPartition tp = new TopicPartition("foo", 0);
+
+        doInitTransactions(pid, epoch);
+
+        transactionManager.beginTransaction();
+        TransactionalRequestResult sendOffsetsResult = transactionManager.sendOffsetsToTransaction(
+                singletonMap(tp, new OffsetAndMetadata(39L)), consumerGroupId);
+
+        prepareAddOffsetsToTxnResponse(Errors.NONE, consumerGroupId, pid, epoch);
+        sender.run(time.milliseconds());  // AddOffsetsToTxn Handled, TxnOffsetCommit Enqueued
+        sender.run(time.milliseconds());  // FindCoordinator Enqueued
+
+        prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.GROUP, consumerGroupId);
+        sender.run(time.milliseconds());  // FindCoordinator Returned
+
+        prepareTxnOffsetCommitResponse(consumerGroupId, pid, epoch, singletonMap(tp, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED));
+        sender.run(time.milliseconds());  // TxnOffsetCommit Handled
+
+        assertTrue(transactionManager.isInErrorState());
+        assertTrue(transactionManager.lastError() instanceof TransactionalIdAuthorizationException);
+        assertTrue(sendOffsetsResult.isCompleted());
+        assertFalse(sendOffsetsResult.isSuccessful());
+        assertTrue(sendOffsetsResult.error() instanceof TransactionalIdAuthorizationException);
+
+        assertFatalError(TransactionalIdAuthorizationException.class);
+    }
+
+    @Test
+    public void testTopicAuthorizationFailureInAddPartitions() {
+        final long pid = 13131L;
+        final short epoch = 1;
+        final TopicPartition tp = new TopicPartition("foo", 0);
+
+        doInitTransactions(pid, epoch);
+
+        transactionManager.beginTransaction();
+        transactionManager.maybeAddPartitionToTransaction(tp);
+
+        prepareAddPartitionsToTxn(tp, Errors.TOPIC_AUTHORIZATION_FAILED);
         sender.run(time.milliseconds());
-        assertEquals(brokerNode, transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION));
 
-        prepareInitPidResponse(Errors.NONE, false, pid, epoch);
+        assertTrue(transactionManager.isInErrorState());
+        assertTrue(transactionManager.lastError() instanceof TopicAuthorizationException);
 
-        sender.run(time.milliseconds());  // get pid.
+        TopicAuthorizationException exception = (TopicAuthorizationException) transactionManager.lastError();
+        assertEquals(singleton(tp.topic()), exception.unauthorizedTopics());
 
-        assertTrue(transactionManager.hasProducerId());
+        assertAbortableError(TopicAuthorizationException.class);
+    }
+
+    @Test
+    public void testTransactionalIdAuthorizationFailureInAddPartitions() {
+        final long pid = 13131L;
+        final short epoch = 1;
+        final TopicPartition tp = new TopicPartition("foo", 0);
+
+        doInitTransactions(pid, epoch);
+
+        transactionManager.beginTransaction();
+        transactionManager.maybeAddPartitionToTransaction(tp);
+
+        prepareAddPartitionsToTxn(tp, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED);
+        sender.run(time.milliseconds());
+
+        assertTrue(transactionManager.isInErrorState());
+        assertTrue(transactionManager.lastError() instanceof TransactionalIdAuthorizationException);
+
+        assertFatalError(TransactionalIdAuthorizationException.class);
+    }
+
+    @Test
+    public void testFlushPendingPartitionsOnCommit() throws InterruptedException {
+        final long pid = 13131L;
+        final short epoch = 1;
+
+        doInitTransactions(pid, epoch);
 
         transactionManager.beginTransaction();
         transactionManager.maybeAddPartitionToTransaction(tp0);
@@ -329,23 +514,11 @@ public class TransactionManagerTest {
 
     @Test
     public void testMultipleAddPartitionsPerForOneProduce() throws InterruptedException {
-        client.setNode(brokerNode);
-        // This is called from the initTransactions method in the producer as the first order of business.
-        // It finds the coordinator and then gets a PID.
         final long pid = 13131L;
         final short epoch = 1;
-        transactionManager.initializeTransactions();
-        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
-
-        sender.run(time.milliseconds());  // find coordinator
-        sender.run(time.milliseconds());
-        assertEquals(brokerNode, transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION));
-
-        prepareInitPidResponse(Errors.NONE, false, pid, epoch);
 
-        sender.run(time.milliseconds());  // get pid.
+        doInitTransactions(pid, epoch);
 
-        assertTrue(transactionManager.hasProducerId());
         transactionManager.beginTransaction();
         // User does one producer.sed
         transactionManager.maybeAddPartitionToTransaction(tp0);
@@ -392,28 +565,16 @@ public class TransactionManagerTest {
 
     @Test(expected = ExecutionException.class)
     public void testProducerFencedException() throws InterruptedException, ExecutionException {
-        client.setNode(brokerNode);
-        // This is called from the initTransactions method in the producer as the first order of business.
-        // It finds the coordinator and then gets a PID.
         final long pid = 13131L;
         final short epoch = 1;
-        transactionManager.initializeTransactions();
-        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
 
-        sender.run(time.milliseconds());  // find coordinator
-        sender.run(time.milliseconds());
-        assertEquals(brokerNode, transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION));
-
-        prepareInitPidResponse(Errors.NONE, false, pid, epoch);
+        doInitTransactions(pid, epoch);
 
-        sender.run(time.milliseconds());  // get pid.
-
-        assertTrue(transactionManager.hasProducerId());
         transactionManager.beginTransaction();
         transactionManager.maybeAddPartitionToTransaction(tp0);
 
         Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
-                "value".getBytes(), Record.EMPTY_HEADERS, new MockCallback(transactionManager), MAX_BLOCK_TIMEOUT).future;
+                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
 
         assertFalse(responseFuture.isDone());
         prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
@@ -429,28 +590,16 @@ public class TransactionManagerTest {
 
     @Test
     public void testDisallowCommitOnProduceFailure() throws InterruptedException {
-        client.setNode(brokerNode);
-        // This is called from the initTransactions method in the producer as the first order of business.
-        // It finds the coordinator and then gets a PID.
         final long pid = 13131L;
         final short epoch = 1;
-        transactionManager.initializeTransactions();
-        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
 
-        sender.run(time.milliseconds());  // find coordinator
-        sender.run(time.milliseconds());
-        assertEquals(brokerNode, transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION));
-
-        prepareInitPidResponse(Errors.NONE, false, pid, epoch);
+        doInitTransactions(pid, epoch);
 
-        sender.run(time.milliseconds());  // get pid.
-
-        assertTrue(transactionManager.hasProducerId());
         transactionManager.beginTransaction();
         transactionManager.maybeAddPartitionToTransaction(tp0);
 
         Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
-                "value".getBytes(), Record.EMPTY_HEADERS, new MockCallback(transactionManager), MAX_BLOCK_TIMEOUT).future;
+                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
 
         TransactionalRequestResult commitResult = transactionManager.beginCommittingTransaction();
         assertFalse(responseFuture.isDone());
@@ -483,28 +632,16 @@ public class TransactionManagerTest {
 
     @Test
     public void testAllowAbortOnProduceFailure() throws InterruptedException {
-        client.setNode(brokerNode);
-        // This is called from the initTransactions method in the producer as the first order of business.
-        // It finds the coordinator and then gets a PID.
         final long pid = 13131L;
         final short epoch = 1;
-        transactionManager.initializeTransactions();
-        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
-
-        sender.run(time.milliseconds());  // find coordinator
-        sender.run(time.milliseconds());
-
-        prepareInitPidResponse(Errors.NONE, false, pid, epoch);
-        sender.run(time.milliseconds());  // get pid.
 
-        assertEquals(brokerNode, transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION));
-        assertTrue(transactionManager.hasProducerId());
+        doInitTransactions(pid, epoch);
 
         transactionManager.beginTransaction();
         transactionManager.maybeAddPartitionToTransaction(tp0);
 
         Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
-                "value".getBytes(), Record.EMPTY_HEADERS, new MockCallback(transactionManager), MAX_BLOCK_TIMEOUT).future;
+                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
 
         TransactionalRequestResult abortResult = transactionManager.beginAbortingTransaction();
         assertFalse(responseFuture.isDone());
@@ -524,28 +661,16 @@ public class TransactionManagerTest {
 
     @Test
     public void testHandlingOfUnknownTopicPartitionErrorOnAddPartitions() throws InterruptedException {
-        client.setNode(brokerNode);
-        // This is called from the initTransactions method in the producer as the first order of business.
-        // It finds the coordinator and then gets a PID.
         final long pid = 13131L;
         final short epoch = 1;
-        transactionManager.initializeTransactions();
-        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
 
-        sender.run(time.milliseconds());  // find coordinator
-        sender.run(time.milliseconds());
-        assertEquals(brokerNode, transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION));
-
-        prepareInitPidResponse(Errors.NONE, false, pid, epoch);
-
-        sender.run(time.milliseconds());  // get pid.
+        doInitTransactions(pid, epoch);
 
-        assertTrue(transactionManager.hasProducerId());
         transactionManager.beginTransaction();
         transactionManager.maybeAddPartitionToTransaction(tp0);
 
         Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
-                "value".getBytes(), Record.EMPTY_HEADERS, new MockCallback(transactionManager), MAX_BLOCK_TIMEOUT).future;
+                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
 
         assertFalse(responseFuture.isDone());
         prepareAddPartitionsToTxnResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION, tp0, epoch, pid);
@@ -564,23 +689,11 @@ public class TransactionManagerTest {
 
     @Test
     public void testHandlingOfUnknownTopicPartitionErrorOnTxnOffsetCommit() throws InterruptedException {
-        client.setNode(brokerNode);
-        // This is called from the initTransactions method in the producer as the first order of business.
-        // It finds the coordinator and then gets a PID.
         final long pid = 13131L;
         final short epoch = 1;
-        transactionManager.initializeTransactions();
-        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
 
-        sender.run(time.milliseconds());  // find coordinator
-        sender.run(time.milliseconds());
-        assertEquals(brokerNode, transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION));
-
-        prepareInitPidResponse(Errors.NONE, false, pid, epoch);
+        doInitTransactions(pid, epoch);
 
-        sender.run(time.milliseconds());  // get pid.
-
-        assertTrue(transactionManager.hasProducerId());
         transactionManager.beginTransaction();
 
         Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
@@ -597,13 +710,13 @@ public class TransactionManagerTest {
         Map<TopicPartition, Errors> txnOffsetCommitResponse = new HashMap<>();
         txnOffsetCommitResponse.put(tp1, Errors.UNKNOWN_TOPIC_OR_PARTITION);
 
-        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.GROUP, consumerGroupId);
+        prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.GROUP, consumerGroupId);
         prepareTxnOffsetCommitResponse(consumerGroupId, pid, epoch, txnOffsetCommitResponse);
 
-        assertEquals(null, transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.GROUP));
+        assertEquals(null, transactionManager.coordinator(CoordinatorType.GROUP));
         sender.run(time.milliseconds());  // try to send TxnOffsetCommitRequest, but find we don't have a group coordinator.
         sender.run(time.milliseconds());  // send find coordinator for group request
-        assertNotNull(transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.GROUP));
+        assertNotNull(transactionManager.coordinator(CoordinatorType.GROUP));
         assertTrue(transactionManager.hasPendingOffsetCommits());
 
         sender.run(time.milliseconds());  // send TxnOffsetCommitRequest request.
@@ -625,58 +738,35 @@ public class TransactionManagerTest {
     }
 
     private void verifyAddPartitionsFailsWithPartitionLevelError(final Errors error) throws InterruptedException {
-        client.setNode(brokerNode);
-        transactionManager.initializeTransactions();
-        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
-
-        sender.run(time.milliseconds());  // find coordinator
-        sender.run(time.milliseconds());
-
         final long pid = 1L;
         final short epoch = 1;
 
-        prepareInitPidResponse(Errors.NONE, false, pid, epoch);
+        doInitTransactions(pid, epoch);
 
-        sender.run(time.milliseconds());  // get pid.
-
-        assertTrue(transactionManager.hasProducerId());
         transactionManager.beginTransaction();
         transactionManager.maybeAddPartitionToTransaction(tp0);
 
         Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
-                                                                   "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
+                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
         assertFalse(responseFuture.isDone());
-        prepareAddPartitionsToTxnPartitionErrorResponse(tp0, error);
+        prepareAddPartitionsToTxn(tp0, error);
         sender.run(time.milliseconds());  // attempt send addPartitions.
         assertTrue(transactionManager.isInErrorState());
         assertFalse(transactionManager.transactionContainsPartition(tp0));
     }
 
-    private void prepareAddPartitionsToTxnPartitionErrorResponse(final TopicPartition tp0, final Errors error) {
+    private void prepareAddPartitionsToTxn(final TopicPartition tp, final Errors error) {
         client.prepareResponse(new MockClient.RequestMatcher() {
             @Override
             public boolean matches(AbstractRequest body) {
-                assertTrue(body instanceof AddPartitionsToTxnRequest);
-                return true;
-            }
-        }, new AddPartitionsToTxnResponse(0, Collections.singletonMap(tp0, error)));
-    }
-
-    private static class MockCallback implements Callback {
-        private final TransactionManager transactionManager;
-        public MockCallback(TransactionManager transactionManager) {
-            this.transactionManager = transactionManager;
-        }
-        @Override
-        public void onCompletion(RecordMetadata metadata, Exception exception) {
-            if (exception != null && transactionManager != null) {
-                transactionManager.setError(exception);
+                return body instanceof AddPartitionsToTxnRequest &&
+                        ((AddPartitionsToTxnRequest) body).partitions().contains(tp);
             }
-        }
+        }, new AddPartitionsToTxnResponse(0, singletonMap(tp, error)));
     }
 
     private void prepareFindCoordinatorResponse(Errors error, boolean shouldDisconnect,
-                                                final FindCoordinatorRequest.CoordinatorType coordinatorType,
+                                                final CoordinatorType coordinatorType,
                                                 final String coordinatorKey) {
         client.prepareResponse(new MockClient.RequestMatcher() {
             @Override
@@ -733,7 +823,7 @@ public class TransactionManagerTest {
                 assertEquals(transactionalId, addPartitionsToTxnRequest.transactionalId());
                 return true;
             }
-        }, new AddPartitionsToTxnResponse(0, Collections.singletonMap(topicPartition, error)));
+        }, new AddPartitionsToTxnResponse(0, singletonMap(topicPartition, error)));
     }
 
     private void prepareEndTxnResponse(Errors error, final TransactionResult result, final long pid, final short epoch) {
@@ -782,9 +872,54 @@ public class TransactionManagerTest {
 
     private ProduceResponse produceResponse(TopicPartition tp, long offset, Errors error, int throttleTimeMs) {
         ProduceResponse.PartitionResponse resp = new ProduceResponse.PartitionResponse(error, offset, RecordBatch.NO_TIMESTAMP);
-        Map<TopicPartition, ProduceResponse.PartitionResponse> partResp = Collections.singletonMap(tp, resp);
+        Map<TopicPartition, ProduceResponse.PartitionResponse> partResp = singletonMap(tp, resp);
         return new ProduceResponse(partResp, throttleTimeMs);
     }
 
+    private void doInitTransactions(long pid, short epoch) {
+        transactionManager.initializeTransactions();
+        prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
+        sender.run(time.milliseconds());  // find coordinator
+        sender.run(time.milliseconds());
+        assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION));
+
+        prepareInitPidResponse(Errors.NONE, false, pid, epoch);
+        sender.run(time.milliseconds());  // get pid.
+        assertTrue(transactionManager.hasProducerId());
+    }
 
+    private void assertAbortableError(Class<? extends RuntimeException> cause) {
+        try {
+            transactionManager.beginTransaction();
+            fail("Should have raised " + cause.getSimpleName());
+        } catch (KafkaException e) {
+            assertTrue(cause.isAssignableFrom(e.getCause().getClass()));
+            assertTrue(transactionManager.isInErrorState());
+        }
+
+        assertTrue(transactionManager.isInErrorState());
+        transactionManager.beginAbortingTransaction();
+        assertFalse(transactionManager.isInErrorState());
+    }
+
+    private void assertFatalError(Class<? extends RuntimeException> cause) {
+        assertTrue(transactionManager.isInErrorState());
+
+        try {
+            transactionManager.beginAbortingTransaction();
+            fail("Should have raised " + cause.getSimpleName());
+        } catch (KafkaException e) {
+            assertTrue(cause.isAssignableFrom(e.getCause().getClass()));
+            assertTrue(transactionManager.isInErrorState());
+        }
+
+        // Transaction abort cannot clear fatal error state
+        try {
+            transactionManager.beginAbortingTransaction();
+            fail("Should have raised " + cause.getSimpleName());
+        } catch (KafkaException e) {
+            assertTrue(cause.isAssignableFrom(e.getCause().getClass()));
+            assertTrue(transactionManager.isInErrorState());
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 9142c90..2e9a688 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -115,10 +115,10 @@ public class RequestResponseTest {
         checkErrorResponse(createListOffsetRequest(2), new UnknownServerException());
         checkResponse(createListOffsetResponse(2), 2);
         checkRequest(MetadataRequest.Builder.allTopics().build((short) 2));
-        checkRequest(createMetadataRequest(1, asList("topic1")));
-        checkErrorResponse(createMetadataRequest(1, asList("topic1")), new UnknownServerException());
+        checkRequest(createMetadataRequest(1, singletonList("topic1")));
+        checkErrorResponse(createMetadataRequest(1, singletonList("topic1")), new UnknownServerException());
         checkResponse(createMetadataResponse(), 2);
-        checkErrorResponse(createMetadataRequest(2, asList("topic1")), new UnknownServerException());
+        checkErrorResponse(createMetadataRequest(2, singletonList("topic1")), new UnknownServerException());
         checkRequest(createOffsetCommitRequest(2));
         checkErrorResponse(createOffsetCommitRequest(2), new UnknownServerException());
         checkResponse(createOffsetCommitResponse(), 0);
@@ -183,7 +183,7 @@ public class RequestResponseTest {
         checkOlderFetchVersions();
         checkResponse(createMetadataResponse(), 0);
         checkResponse(createMetadataResponse(), 1);
-        checkErrorResponse(createMetadataRequest(1, asList("topic1")), new UnknownServerException());
+        checkErrorResponse(createMetadataRequest(1, singletonList("topic1")), new UnknownServerException());
         checkRequest(createOffsetCommitRequest(0));
         checkErrorResponse(createOffsetCommitRequest(0), new UnknownServerException());
         checkRequest(createOffsetCommitRequest(1));
@@ -984,7 +984,7 @@ public class RequestResponseTest {
         final Map<TopicPartition, TxnOffsetCommitRequest.CommittedOffset> offsets = new HashMap<>();
         offsets.put(new TopicPartition("topic", 73),
                     new TxnOffsetCommitRequest.CommittedOffset(100, null));
-        return new TxnOffsetCommitRequest.Builder("gid", 21L, (short) 42, offsets).build();
+        return new TxnOffsetCommitRequest.Builder("transactionalId", "groupId", 21L, (short) 42, offsets).build();
     }
 
     private TxnOffsetCommitResponse createTxnOffsetCommitResponse() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/core/src/main/scala/kafka/admin/AclCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala b/core/src/main/scala/kafka/admin/AclCommand.scala
index 925c407..e02b5dc 100644
--- a/core/src/main/scala/kafka/admin/AclCommand.scala
+++ b/core/src/main/scala/kafka/admin/AclCommand.scala
@@ -34,7 +34,8 @@ object AclCommand {
     Broker -> Set(DescribeConfigs),
     Topic -> Set(Read, Write, Describe, Delete, DescribeConfigs, AlterConfigs, All),
     Group -> Set(Read, Describe, All),
-    Cluster -> Set(Create, ClusterAction, DescribeConfigs, AlterConfigs, All)
+    Cluster -> Set(Create, ClusterAction, DescribeConfigs, AlterConfigs, IdempotentWrite, All),
+    TransactionalId -> Set(Describe, Write, All)
   )
 
   def main(args: Array[String]) {
@@ -88,7 +89,7 @@ object AclCommand {
         CommandLineUtils.printUsageAndDie(opts.parser, "You must specify one of: --allow-principal, --deny-principal when trying to add ACLs.")
 
       for ((resource, acls) <- resourceToAcl) {
-        println(s"Adding ACLs for resource `${resource}`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline")
+        println(s"Adding ACLs for resource `$resource`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline")
         authorizer.addAcls(acls, resource)
       }
 
@@ -102,10 +103,10 @@ object AclCommand {
 
       for ((resource, acls) <- resourceToAcl) {
         if (acls.isEmpty) {
-          if (confirmAction(opts, s"Are you sure you want to delete all ACLs for resource `${resource}`? (y/n)"))
+          if (confirmAction(opts, s"Are you sure you want to delete all ACLs for resource `$resource`? (y/n)"))
             authorizer.removeAcls(resource)
         } else {
-          if (confirmAction(opts, s"Are you sure you want to remove ACLs: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline from resource `${resource}`? (y/n)"))
+          if (confirmAction(opts, s"Are you sure you want to remove ACLs: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline from resource `$resource`? (y/n)"))
             authorizer.removeAcls(acls, resource)
         }
       }
@@ -123,7 +124,7 @@ object AclCommand {
         else resources.map(resource => resource -> authorizer.getAcls(resource))
 
       for ((resource, acls) <- resourceToAcls)
-        println(s"Current ACLs for resource `${resource}`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline")
+        println(s"Current ACLs for resource `$resource`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline")
     }
   }
 
@@ -149,12 +150,16 @@ object AclCommand {
 
   private def getProducerResourceToAcls(opts: AclCommandOptions): Map[Resource, Set[Acl]] = {
     val topics: Set[Resource] = getResource(opts).filter(_.resourceType == Topic)
+    val transactionalIds: Set[Resource] = getResource(opts).filter(_.resourceType == TransactionalId)
+    val enableIdempotence = opts.options.has(opts.idempotentOpt)
 
     val acls = getAcl(opts, Set(Write, Describe))
 
-    //Write, Describe permission on topics, Create permission on cluster
-    topics.map(_ -> acls).toMap[Resource, Set[Acl]] +
-      (Resource.ClusterResource -> getAcl(opts, Set(Create)))
+    //Write, Describe permission on topics, Create permission on cluster, Write, Describe on transactionalIds
+    topics.map(_ -> acls).toMap[Resource, Set[Acl]] ++
+      transactionalIds.map(_ -> acls).toMap[Resource, Set[Acl]] +
+      (Resource.ClusterResource -> (getAcl(opts, Set(Create)) ++
+        (if (enableIdempotence) getAcl(opts, Set(IdempotentWrite)) else Set.empty[Acl])))
   }
 
   private def getConsumerResourceToAcls(opts: AclCommandOptions): Map[Resource, Set[Acl]] = {
@@ -184,7 +189,7 @@ object AclCommand {
 
     val allowedHosts = getHosts(opts, opts.allowHostsOpt, opts.allowPrincipalsOpt)
 
-    val deniedHosts = getHosts(opts, opts.denyHostssOpt, opts.denyPrincipalsOpt)
+    val deniedHosts = getHosts(opts, opts.denyHostsOpt, opts.denyPrincipalsOpt)
 
     val acls = new collection.mutable.HashSet[Acl]
     if (allowedHosts.nonEmpty && allowedPrincipals.nonEmpty)
@@ -232,7 +237,7 @@ object AclCommand {
     if (opts.options.has(opts.topicOpt))
       opts.options.valuesOf(opts.topicOpt).asScala.foreach(topic => resources += new Resource(Topic, topic.trim))
 
-    if (opts.options.has(opts.clusterOpt))
+    if (opts.options.has(opts.clusterOpt) || opts.options.has(opts.idempotentOpt))
       resources += Resource.ClusterResource
 
     if (opts.options.has(opts.groupOpt))
@@ -241,6 +246,10 @@ object AclCommand {
     if (opts.options.has(opts.brokerOpt))
       opts.options.valuesOf(opts.brokerOpt).asScala.foreach(broker => resources += new Resource(Broker, broker.toString))
 
+    if (opts.options.has(opts.transactionalIdOpt))
+      opts.options.valuesOf(opts.transactionalIdOpt).asScala.foreach(transactionalId =>
+        resources += new Resource(TransactionalId, transactionalId))
+
     if (resources.isEmpty && dieIfNoResourceFound)
       CommandLineUtils.printUsageAndDie(opts.parser, "You must provide at least one resource: --topic <topic> or --cluster or --group <group>")
 
@@ -295,6 +304,16 @@ object AclCommand {
       .describedAs("broker")
       .ofType(classOf[Int])
 
+    val transactionalIdOpt = parser.accepts("transactional-id", "The transactionalId to which ACLs should " +
+      "be added or removed. A value of * indicates the ACLs should apply to all transactionalIds.")
+      .withRequiredArg
+      .describedAs("transactional-id")
+      .ofType(classOf[String])
+
+    val idempotentOpt = parser.accepts("idempotent", "Enable idempotence for the producer. This should be " +
+      "used in combination with the --producer option. Note that idempotence is enabled automatically if " +
+      "the producer is authorized to a particular transactional-id.")
+
     val addOpt = parser.accepts("add", "Indicates you are trying to add ACLs.")
     val removeOpt = parser.accepts("remove", "Indicates you are trying to remove ACLs.")
     val listOpt = parser.accepts("list", "List ACLs for the specified resource, use --topic <topic> or --group <group> or --cluster to specify a resource.")
@@ -329,7 +348,7 @@ object AclCommand {
       .describedAs("allow-host")
       .ofType(classOf[String])
 
-    val denyHostssOpt = parser.accepts("deny-host", "Host from which principals listed in --deny-principal will be denied access. " +
+    val denyHostsOpt = parser.accepts("deny-host", "Host from which principals listed in --deny-principal will be denied access. " +
       "If you have specified --deny-principal then the default for this option will be set to * which denies access from all hosts.")
       .withRequiredArg
       .describedAs("deny-host")
@@ -354,17 +373,20 @@ object AclCommand {
       if (actions != 1)
         CommandLineUtils.printUsageAndDie(parser, "Command must include exactly one action: --list, --add, --remove. ")
 
-      CommandLineUtils.checkInvalidArgs(parser, options, listOpt, Set(producerOpt, consumerOpt, allowHostsOpt, allowPrincipalsOpt, denyHostssOpt, denyPrincipalsOpt))
+      CommandLineUtils.checkInvalidArgs(parser, options, listOpt, Set(producerOpt, consumerOpt, allowHostsOpt, allowPrincipalsOpt, denyHostsOpt, denyPrincipalsOpt))
 
       //when --producer or --consumer is specified , user should not specify operations as they are inferred and we also disallow --deny-principals and --deny-hosts.
-      CommandLineUtils.checkInvalidArgs(parser, options, producerOpt, Set(operationsOpt, denyPrincipalsOpt, denyHostssOpt))
-      CommandLineUtils.checkInvalidArgs(parser, options, consumerOpt, Set(operationsOpt, denyPrincipalsOpt, denyHostssOpt))
+      CommandLineUtils.checkInvalidArgs(parser, options, producerOpt, Set(operationsOpt, denyPrincipalsOpt, denyHostsOpt))
+      CommandLineUtils.checkInvalidArgs(parser, options, consumerOpt, Set(operationsOpt, denyPrincipalsOpt, denyHostsOpt))
 
       if (options.has(producerOpt) && !options.has(topicOpt))
         CommandLineUtils.printUsageAndDie(parser, "With --producer you must specify a --topic")
 
-      if (options.has(consumerOpt) && (!options.has(topicOpt) || !options.has(groupOpt) || (!options.has(producerOpt) && options.has(clusterOpt))))
-        CommandLineUtils.printUsageAndDie(parser, "With --consumer you must specify a --topic and a --group and no --cluster option should be specified.")
+      if (options.has(idempotentOpt) && !options.has(producerOpt))
+        CommandLineUtils.printUsageAndDie(parser, "The --idempotent option is only available if --producer is set")
+
+      if (options.has(consumerOpt) && (!options.has(topicOpt) || !options.has(groupOpt) || (!options.has(producerOpt) && (options.has(clusterOpt) || options.has(transactionalIdOpt)))))
+        CommandLineUtils.printUsageAndDie(parser, "With --consumer you must specify a --topic and a --group and no --cluster or --transactional-id option should be specified.")
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
index 302fcb5..7bde4e2 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
@@ -323,6 +323,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
   }
 
   def prepareTxnOffsetCommit(producerId: Long, offsets: Map[TopicPartition, OffsetAndMetadata]) {
+    trace(s"TxnOffsetCommit for producer $producerId and group $groupId with offsets $offsets is pending")
     receivedTransactionalOffsetCommits = true
     val producerOffsets = pendingTransactionalOffsetCommits.getOrElseUpdate(producerId,
       mutable.Map.empty[TopicPartition, CommitRecordMetadataAndOffset])
@@ -339,10 +340,12 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
   /* Remove a pending transactional offset commit if the actual offset commit record was not written to the log.
    * We will return an error and the client will retry the request, potentially to a different coordinator.
    */
-  def failPendingTxnOffsetCommit(producerId: Long, topicPartition: TopicPartition, offsetAndMetadata: OffsetAndMetadata): Unit = {
+  def failPendingTxnOffsetCommit(producerId: Long, topicPartition: TopicPartition): Unit = {
     pendingTransactionalOffsetCommits.get(producerId) match {
       case Some(pendingOffsets) =>
-        pendingOffsets.remove(topicPartition)
+        val pendingOffsetCommit = pendingOffsets.remove(topicPartition)
+        trace(s"TxnOffsetCommit for producer $producerId and group $groupId with offsets $pendingOffsetCommit failed " +
+          s"to be appended to the log")
         if (pendingOffsets.isEmpty)
           pendingTransactionalOffsetCommits.remove(producerId)
       case _ =>
@@ -366,18 +369,28 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
    * to the log.
    */
   def completePendingTxnOffsetCommit(producerId: Long, isCommit: Boolean): Unit = {
-    trace(s"Completing transactional offset commit for producer $producerId and group $groupId. isCommit: $isCommit")
+    val pendingOffsetsOpt = pendingTransactionalOffsetCommits.remove(producerId)
     if (isCommit) {
-      val producerOffsets = pendingTransactionalOffsetCommits.getOrElse(producerId, Map.empty[TopicPartition, CommitRecordMetadataAndOffset])
-      producerOffsets.foreach { case (topicPartition, commitRecordMetadataAndOffset) =>
-        if (commitRecordMetadataAndOffset.appendedBatchOffset.isEmpty)
-          throw new IllegalStateException(s"Trying to complete a transactional offset commit for producerId $producerId " +
-            s"and groupId $groupId even though the the offset commit record itself hasn't been appended to the log.")
-        if (!offsets.contains(topicPartition) || offsets(topicPartition).olderThan(commitRecordMetadataAndOffset))
-          offsets.put(topicPartition, commitRecordMetadataAndOffset)
+      pendingOffsetsOpt.foreach { pendingOffsets =>
+        pendingOffsets.foreach { case (topicPartition, commitRecordMetadataAndOffset) =>
+          if (commitRecordMetadataAndOffset.appendedBatchOffset.isEmpty)
+            throw new IllegalStateException(s"Trying to complete a transactional offset commit for producerId $producerId " +
+              s"and groupId $groupId even though the the offset commit record itself hasn't been appended to the log.")
+
+          val currentOffsetOpt = offsets.get(topicPartition)
+          if (currentOffsetOpt.forall(_.olderThan(commitRecordMetadataAndOffset))) {
+            trace(s"TxnOffsetCommit for producer $producerId and group $groupId with offset $commitRecordMetadataAndOffset " +
+              "committed and loaded into the cache.")
+            offsets.put(topicPartition, commitRecordMetadataAndOffset)
+          } else {
+            trace(s"TxnOffsetCommit for producer $producerId and group $groupId with offset $commitRecordMetadataAndOffset " +
+              s"committed, but not loaded since its offset is older than current offset $currentOffsetOpt.")
+          }
+        }
       }
+    } else {
+      trace(s"TxnOffsetCommit for producer $producerId and group $groupId with offsets $pendingOffsetsOpt aborted")
     }
-    pendingTransactionalOffsetCommits.remove(producerId)
   }
 
   def activeProducers = pendingTransactionalOffsetCommits.keySet
@@ -430,7 +443,13 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
   }
 
   override def toString: String = {
-    "[%s,%s,%s,%s]".format(groupId, protocolType, currentState.toString, members)
+    "GroupMetadata(" +
+      s"groupId=$groupId, " +
+      s"generation=$generationId, " +
+      s"protocolType=$protocolType, " +
+      s"currentState=$currentState, " +
+      s"members=$members)"
   }
+
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index a7eb28b..8e5135d 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -324,7 +324,7 @@ class GroupMetadataManager(brokerId: Int,
                     removeProducerGroup(producerId, group.groupId)
                   filteredOffsetMetadata.foreach { case (topicPartition, offsetAndMetadata) =>
                     if (isTxnOffsetCommit)
-                      group.failPendingTxnOffsetCommit(producerId, topicPartition, offsetAndMetadata)
+                      group.failPendingTxnOffsetCommit(producerId, topicPartition)
                     else
                       group.failPendingOffsetWrite(topicPartition, offsetAndMetadata)
                   }
@@ -536,7 +536,6 @@ class GroupMetadataManager(brokerId: Int,
                     val groupId = groupMetadataKey.key
                     val groupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, record.value)
                     if (groupMetadata != null) {
-                      trace(s"Loaded group metadata for group $groupId with generation ${groupMetadata.generationId}")
                       removedGroups.remove(groupId)
                       loadedGroups.put(groupId, groupMetadata)
                     } else {
@@ -577,6 +576,7 @@ class GroupMetadataManager(brokerId: Int,
           loadedGroups.values.foreach { group =>
             val offsets = groupOffsets.getOrElse(group.groupId, Map.empty[TopicPartition, CommitRecordMetadataAndOffset])
             val pendingOffsets = pendingGroupOffsets.getOrElse(group.groupId, Map.empty[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]])
+            trace(s"Loaded group metadata $group with offsets $offsets and pending offsets $pendingOffsets")
             loadGroup(group, offsets, pendingOffsets)
             onGroupLoaded(group)
           }
@@ -587,6 +587,7 @@ class GroupMetadataManager(brokerId: Int,
             val group = new GroupMetadata(groupId)
             val offsets = emptyGroupOffsets.getOrElse(groupId, Map.empty[TopicPartition, CommitRecordMetadataAndOffset])
             val pendingOffsets = pendingEmptyGroupOffsets.getOrElse(groupId, Map.empty[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]])
+            trace(s"Loaded group metadata $group with offsets $offsets and pending offsets $pendingOffsets")
             loadGroup(group, offsets, pendingOffsets)
             onGroupLoaded(group)
           }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala b/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala
index ce542b1..b082b9b 100644
--- a/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala
@@ -85,7 +85,7 @@ private[group] class MemberMetadata(val memberId: String,
     if (protocols.size != this.supportedProtocols.size)
       return false
 
-    for (i <- 0 until protocols.size) {
+    for (i <- protocols.indices) {
       val p1 = protocols(i)
       val p2 = supportedProtocols(i)
       if (p1._1 != p2._1 || !util.Arrays.equals(p1._2, p2._2))
@@ -114,7 +114,15 @@ private[group] class MemberMetadata(val memberId: String,
     }
   }
 
-  override def toString = {
-    "[%s,%s,%s,%d]".format(memberId, clientId, clientHost, sessionTimeoutMs)
+  override def toString: String = {
+    "MemberMetadata(" +
+      s"memberId=$memberId, " +
+      s"clientId=$clientId, " +
+      s"clientHost=$clientHost, " +
+      s"sessionTimeoutMs=$sessionTimeoutMs, " +
+      s"rebalanceTimeoutMs=$rebalanceTimeoutMs, " +
+      s"supportedProtocols=${supportedProtocols.map(_._1)}, " +
+      ")"
   }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/core/src/main/scala/kafka/security/auth/Operation.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/Operation.scala b/core/src/main/scala/kafka/security/auth/Operation.scala
index f65d9f0..420c3eb 100644
--- a/core/src/main/scala/kafka/security/auth/Operation.scala
+++ b/core/src/main/scala/kafka/security/auth/Operation.scala
@@ -65,6 +65,10 @@ case object AlterConfigs extends Operation {
   val name = "AlterConfigs"
   val toJava = AclOperation.ALTER_CONFIGS
 }
+case object IdempotentWrite extends Operation {
+  val name = "IdempotentWrite"
+  val toJava = AclOperation.IDEMPOTENT_WRITE
+}
 case object All extends Operation {
   val name = "All"
   val toJava = AclOperation.ALL
@@ -86,5 +90,5 @@ object Operation {
   }
 
   def values: Seq[Operation] = List(Read, Write, Create, Delete, Alter, Describe, ClusterAction, AlterConfigs,
-     DescribeConfigs, All)
+     DescribeConfigs, IdempotentWrite, All)
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/core/src/main/scala/kafka/security/auth/Resource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/Resource.scala b/core/src/main/scala/kafka/security/auth/Resource.scala
index a0ed9f9..311f5b5 100644
--- a/core/src/main/scala/kafka/security/auth/Resource.scala
+++ b/core/src/main/scala/kafka/security/auth/Resource.scala
@@ -21,7 +21,6 @@ object Resource {
   val ClusterResourceName = "kafka-cluster"
   val ClusterResource = new Resource(Cluster, Resource.ClusterResourceName)
   val ProducerIdResourceName = "producer-id"
-  val ProducerIdResource = new Resource(Cluster, Resource.ProducerIdResourceName)
   val WildCardResource = "*"
 
   def fromString(str: String): Resource = {
@@ -38,7 +37,7 @@ object Resource {
  * @param name name of the resource, for topic this will be topic name , for group it will be group name. For cluster type
  *             it will be a constant string kafka-cluster.
  */
-case class Resource(val resourceType: ResourceType, val name: String) {
+case class Resource(resourceType: ResourceType, name: String) {
 
   override def toString: String = {
     resourceType.name + Resource.Separator + name

http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/core/src/main/scala/kafka/security/auth/ResourceType.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/ResourceType.scala b/core/src/main/scala/kafka/security/auth/ResourceType.scala
index ea7ce3c..9cfe1cd 100644
--- a/core/src/main/scala/kafka/security/auth/ResourceType.scala
+++ b/core/src/main/scala/kafka/security/auth/ResourceType.scala
@@ -19,11 +19,6 @@ package kafka.security.auth
 import kafka.common.{BaseEnum, KafkaException}
 import org.apache.kafka.common.protocol.Errors
 
-/**
- * ResourceTypes.
- */
-
-
 sealed trait ResourceType extends BaseEnum { def error: Errors }
 
 case object Cluster extends ResourceType {
@@ -46,16 +41,11 @@ case object Group extends ResourceType {
   val error = Errors.GROUP_AUTHORIZATION_FAILED
 }
 
-case object ProducerTransactionalId extends ResourceType {
-  val name = "ProducerTransactionalId"
+case object TransactionalId extends ResourceType {
+  val name = "TransactionalId"
   val error = Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED
 }
 
-case object ProducerIdResource extends ResourceType {
-  val name = "ProducerIdResource"
-  val error = Errors.PRODUCER_ID_AUTHORIZATION_FAILED
-}
-
 object ResourceType {
 
   def fromString(resourceType: String): ResourceType = {
@@ -63,5 +53,5 @@ object ResourceType {
     rType.getOrElse(throw new KafkaException(resourceType + " not a valid resourceType name. The valid names are " + values.mkString(",")))
   }
 
-  def values: Seq[ResourceType] = List(Cluster, Topic, Group, ProducerTransactionalId, ProducerIdResource, Broker)
+  def values: Seq[ResourceType] = List(Cluster, Topic, Group, TransactionalId, Broker)
 }


Mime
View raw message