kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: KAFKA-5947; Handle authentication failure in admin client, txn producer
Date Thu, 21 Sep 2017 12:59:11 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk f29391c49 -> 96ba21e0d


KAFKA-5947; Handle authentication failure in admin client, txn producer

1. Raise AuthenticationException for authentication failures in admin client
2. Handle AuthenticationException as a fatal error for transactional producer
3. Add comments to authentication exceptions

Author: Rajini Sivaram <rajinisivaram@googlemail.com>

Reviewers: Vahid Hashemian <vahidhashemian@us.ibm.com>, Ismael Juma <ismael@juma.me.uk>

Closes #3928 from rajinisivaram/KAFKA-5947-auth-failure


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/96ba21e0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/96ba21e0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/96ba21e0

Branch: refs/heads/trunk
Commit: 96ba21e0dfb1a564d5349179d844f020abf1e08b
Parents: f29391c
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
Authored: Thu Sep 21 13:58:35 2017 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Thu Sep 21 13:58:43 2017 +0100

----------------------------------------------------------------------
 checkstyle/import-control.xml                   |  1 +
 .../java/org/apache/kafka/clients/Metadata.java | 13 ++-
 .../kafka/clients/NetworkClientUtils.java       |  4 +-
 .../kafka/clients/admin/KafkaAdminClient.java   | 33 +++++++
 .../internals/ConsumerNetworkClient.java        |  4 +-
 .../clients/producer/internals/Sender.java      | 59 +++++++-----
 .../producer/internals/TransactionManager.java  |  6 ++
 .../common/errors/AuthenticationException.java  | 13 +++
 .../errors/AuthenticationFailedException.java   | 31 ------
 .../errors/IllegalSaslStateException.java       |  5 +
 .../errors/SaslAuthenticationException.java     | 40 ++++++++
 .../UnsupportedSaslMechanismException.java      |  4 +
 .../kafka/common/network/KafkaChannel.java      |  2 +-
 .../apache/kafka/common/protocol/Errors.java    |  6 +-
 .../requests/SaslAuthenticateResponse.java      |  2 +-
 .../authenticator/SaslClientAuthenticator.java  |  3 +-
 .../authenticator/SaslServerAuthenticator.java  |  2 +-
 .../ClientAuthenticationFailureTest.java        | 60 +++++++++---
 .../SaslClientsWithInvalidCredentialsTest.scala | 99 ++++++++++++++++----
 19 files changed, 286 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/96ba21e0/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 3329b2d..f4d9655 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -41,6 +41,7 @@
   <!-- anyone can use public classes -->
   <allow pkg="org.apache.kafka.common" exact-match="true" />
   <allow pkg="org.apache.kafka.common.security" />
+  <allow pkg="org.apache.kafka.common.serialization" />
   <allow pkg="org.apache.kafka.common.utils" />
   <allow pkg="org.apache.kafka.common.errors" exact-match="true" />
   <allow pkg="org.apache.kafka.common.memory" />

http://git-wip-us.apache.org/repos/asf/kafka/blob/96ba21e0/clients/src/main/java/org/apache/kafka/clients/Metadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index 5790d88..3b8c18a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -149,14 +149,15 @@ public final class Metadata {
 
     /**
      * If any non-retriable authentication exceptions were encountered during
-     * metadata update, clear and throw the exception.
+     * metadata update, clear and return the exception.
      */
-    public synchronized void maybeThrowAuthenticationException() {
+    public synchronized AuthenticationException getAndClearAuthenticationException() {
         if (authenticationException != null) {
             AuthenticationException exception = authenticationException;
             authenticationException = null;
-            throw exception;
-        }
+            return exception;
+        } else
+            return null;
     }
 
     /**
@@ -169,7 +170,9 @@ public final class Metadata {
         long begin = System.currentTimeMillis();
         long remainingWaitMs = maxWaitMs;
         while (this.version <= lastVersion) {
-            maybeThrowAuthenticationException();
+            AuthenticationException ex = getAndClearAuthenticationException();
+            if (ex != null)
+                throw ex;
             if (remainingWaitMs != 0)
                 wait(remainingWaitMs);
             long elapsed = System.currentTimeMillis() - begin;

http://git-wip-us.apache.org/repos/asf/kafka/blob/96ba21e0/clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java
index 8462979..c4559a4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java
@@ -47,7 +47,7 @@ public class NetworkClientUtils {
      * It returns `true` if the call completes normally or `false` if the timeoutMs expires.
If the connection fails,
      * an `IOException` is thrown instead. Note that if the `NetworkClient` has been configured
with a positive
      * connection timeoutMs, it is possible for this method to raise an `IOException` for
a previous connection which
-     * has recently disconnected.
+     * has recently disconnected. If authentication to the node fails, an `AuthenticationException`
is thrown.
      *
      * This method is useful for implementing blocking behaviour on top of the non-blocking
`NetworkClient`, use it with
      * care.
@@ -69,6 +69,8 @@ public class NetworkClientUtils {
             }
             long pollTimeout = expiryTime - attemptStartTime;
             client.poll(pollTimeout, attemptStartTime);
+            if (client.authenticationException(node) != null)
+                throw client.authenticationException(node);
             attemptStartTime = time.milliseconds();
         }
         return client.isReady(node, attemptStartTime);

http://git-wip-us.apache.org/repos/asf/kafka/blob/96ba21e0/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index d857ae3..fe92b15 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -41,6 +41,7 @@ import org.apache.kafka.common.acl.AclBindingFilter;
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.BrokerNotAvailableException;
 import org.apache.kafka.common.errors.DisconnectException;
 import org.apache.kafka.common.errors.InvalidRequestException;
@@ -850,6 +851,37 @@ public class KafkaAdminClient extends AdminClient {
         }
 
         /**
+         * If an authentication exception is encountered with connection to any broker,
+         * fail all pending requests.
+         */
+        private void handleAuthenticationException(long now, Map<Node, List<Call>>
callsToSend) {
+            AuthenticationException authenticationException = metadata.getAndClearAuthenticationException();
+            if (authenticationException == null) {
+                for (Node node : callsToSend.keySet()) {
+                    authenticationException = client.authenticationException(node);
+                    if (authenticationException != null)
+                        break;
+                }
+            }
+            if (authenticationException != null) {
+                synchronized (this) {
+                    failCalls(now, newCalls, authenticationException);
+                }
+                for (List<Call> calls : callsToSend.values()) {
+                    failCalls(now, calls, authenticationException);
+                }
+                callsToSend.clear();
+            }
+        }
+
+        private void failCalls(long now, List<Call> calls, AuthenticationException
authenticationException) {
+            for (Call call : calls) {
+                call.fail(now, authenticationException);
+            }
+            calls.clear();
+        }
+
+        /**
          * Handle responses from the server.
          *
          * @param now                   The current time in milliseconds.
@@ -976,6 +1008,7 @@ public class KafkaAdminClient extends AdminClient {
 
                 // Update the current time and handle the latest responses.
                 now = time.milliseconds();
+                handleAuthenticationException(now, callsToSend);
                 handleResponses(now, responses, callsInFlight, correlationIdToCalls);
             }
             int numTimedOut = 0;

http://git-wip-us.apache.org/repos/asf/kafka/blob/96ba21e0/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index 43c6358..86fca9e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -135,7 +135,9 @@ public class ConsumerNetworkClient implements Closeable {
         int version = this.metadata.requestUpdate();
         do {
             poll(timeout);
-            this.metadata.maybeThrowAuthenticationException();
+            AuthenticationException ex = this.metadata.getAndClearAuthenticationException();
+            if (ex != null)
+                throw ex;
         } while (this.metadata.version() == version && time.milliseconds() - startMs
< timeout);
         return this.metadata.version() > version;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/96ba21e0/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 02b79c5..b96ea92 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.ClusterAuthorizationException;
 import org.apache.kafka.common.errors.InvalidMetadataException;
 import org.apache.kafka.common.errors.OutOfOrderSequenceException;
@@ -201,32 +202,38 @@ public class Sender implements Runnable {
      */
     void run(long now) {
         if (transactionManager != null) {
-            if (transactionManager.shouldResetProducerStateAfterResolvingSequences())
-                // Check if the previous run expired batches which requires a reset of the
producer state.
-                transactionManager.resetProducerId();
-
-            if (!transactionManager.isTransactional()) {
-                // this is an idempotent producer, so make sure we have a producer id
-                maybeWaitForProducerId();
-            } else if (transactionManager.hasUnresolvedSequences() && !transactionManager.hasFatalError())
{
-                transactionManager.transitionToFatalError(new KafkaException("The client
hasn't received acknowledgment for " +
-                        "some previously sent messages and can no longer retry them. It isn't
safe to continue."));
-            } else if (transactionManager.hasInFlightTransactionalRequest() || maybeSendTransactionalRequest(now))
{
-                // as long as there are outstanding transactional requests, we simply wait
for them to return
-                client.poll(retryBackoffMs, now);
-                return;
-            }
+            try {
+                if (transactionManager.shouldResetProducerStateAfterResolvingSequences())
+                    // Check if the previous run expired batches which requires a reset of
the producer state.
+                    transactionManager.resetProducerId();
+
+                if (!transactionManager.isTransactional()) {
+                    // this is an idempotent producer, so make sure we have a producer id
+                    maybeWaitForProducerId();
+                } else if (transactionManager.hasUnresolvedSequences() && !transactionManager.hasFatalError())
{
+                    transactionManager.transitionToFatalError(new KafkaException("The client
hasn't received acknowledgment for " +
+                            "some previously sent messages and can no longer retry them.
It isn't safe to continue."));
+                } else if (transactionManager.hasInFlightTransactionalRequest() || maybeSendTransactionalRequest(now))
{
+                    // as long as there are outstanding transactional requests, we simply
wait for them to return
+                    client.poll(retryBackoffMs, now);
+                    return;
+                }
 
-            // do not continue sending if the transaction manager is in a failed state or
if there
-            // is no producer id (for the idempotent case).
-            if (transactionManager.hasFatalError() || !transactionManager.hasProducerId())
{
-                RuntimeException lastError = transactionManager.lastError();
-                if (lastError != null)
-                    maybeAbortBatches(lastError);
-                client.poll(retryBackoffMs, now);
-                return;
-            } else if (transactionManager.hasAbortableError()) {
-                accumulator.abortUndrainedBatches(transactionManager.lastError());
+                // do not continue sending if the transaction manager is in a failed state
or if there
+                // is no producer id (for the idempotent case).
+                if (transactionManager.hasFatalError() || !transactionManager.hasProducerId())
{
+                    RuntimeException lastError = transactionManager.lastError();
+                    if (lastError != null)
+                        maybeAbortBatches(lastError);
+                    client.poll(retryBackoffMs, now);
+                    return;
+                } else if (transactionManager.hasAbortableError()) {
+                    accumulator.abortUndrainedBatches(transactionManager.lastError());
+                }
+            } catch (AuthenticationException e) {
+                // This is already logged as error, but propagated here to perform any clean
ups.
+                log.trace("Authentication exception while processing transactional request:
{}", e);
+                transactionManager.authenticationFailed(e);
             }
         }
 
@@ -407,7 +414,7 @@ public class Sender implements Runnable {
 
     private Node awaitLeastLoadedNodeReady(long remainingTimeMs) throws IOException {
         Node node = client.leastLoadedNode(time.milliseconds());
-        if (NetworkClientUtils.awaitReady(client, node, time, remainingTimeMs)) {
+        if (node != null && NetworkClientUtils.awaitReady(client, node, time, remainingTimeMs))
{
             return node;
         }
         return null;

http://git-wip-us.apache.org/repos/asf/kafka/blob/96ba21e0/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index 43abdbb..3bddded 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.protocol.Errors;
@@ -615,6 +616,11 @@ public class TransactionManager {
         enqueueRequest(request);
     }
 
+    synchronized void authenticationFailed(AuthenticationException e) {
+        for (TxnRequestHandler request : pendingRequests)
+            request.fatalError(e);
+    }
+
     Node coordinator(FindCoordinatorRequest.CoordinatorType type) {
         switch (type) {
             case GROUP:

http://git-wip-us.apache.org/repos/asf/kafka/blob/96ba21e0/clients/src/main/java/org/apache/kafka/common/errors/AuthenticationException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/AuthenticationException.java
b/clients/src/main/java/org/apache/kafka/common/errors/AuthenticationException.java
index aa4a111..c56ac88 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/AuthenticationException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/AuthenticationException.java
@@ -16,6 +16,19 @@
  */
 package org.apache.kafka.common.errors;
 
+/**
+ * This exception indicates that SASL authentication has failed.
+ * On authentication failure, clients abort the operation requested and raise one
+ * of the subclasses of this exception:
+ * <ul>
+ *   <li>{@link SaslAuthenticationException} if SASL handshake fails with invalid
credentials
+ *   or any other failure specific to the SASL mechanism used for authentication</li>
+ *   <li>{@link UnsupportedSaslMechanismException} if the SASL mechanism requested
by the client
+ *   is not supported on the broker.</li>
+ *   <li>{@link IllegalSaslStateException} if an unexpected request is received
during SASL
+ *   handshake. This could be due to misconfigured security protocol.</li>
+ * </ul>
+ */
 public class AuthenticationException extends ApiException {
 
     private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/kafka/blob/96ba21e0/clients/src/main/java/org/apache/kafka/common/errors/AuthenticationFailedException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/AuthenticationFailedException.java
b/clients/src/main/java/org/apache/kafka/common/errors/AuthenticationFailedException.java
deleted file mode 100644
index 3be72f0..0000000
--- a/clients/src/main/java/org/apache/kafka/common/errors/AuthenticationFailedException.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.errors;
-
-public class AuthenticationFailedException extends AuthenticationException {
-
-    private static final long serialVersionUID = 1L;
-
-    public AuthenticationFailedException(String message) {
-        super(message);
-    }
-
-    public AuthenticationFailedException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/96ba21e0/clients/src/main/java/org/apache/kafka/common/errors/IllegalSaslStateException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/IllegalSaslStateException.java
b/clients/src/main/java/org/apache/kafka/common/errors/IllegalSaslStateException.java
index c45f007..691244a 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/IllegalSaslStateException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/IllegalSaslStateException.java
@@ -16,6 +16,11 @@
  */
 package org.apache.kafka.common.errors;
 
+/**
+ * This exception indicates unexpected requests prior to SASL authentication.
+ * This could be due to misconfigured security, e.g. if PLAINTEXT protocol
+ * is used to connect to a SASL endpoint.
+ */
 public class IllegalSaslStateException extends AuthenticationException {
 
     private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/kafka/blob/96ba21e0/clients/src/main/java/org/apache/kafka/common/errors/SaslAuthenticationException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/SaslAuthenticationException.java
b/clients/src/main/java/org/apache/kafka/common/errors/SaslAuthenticationException.java
new file mode 100644
index 0000000..d128c25
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/SaslAuthenticationException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * This exception indicates that SASL authentication has failed. The error message
+ * in the exception indicates the actual cause of failure.
+ * <p>
+ * SASL authentication failures typically indicate invalid credentials, but
+ * could also include other failures specific to the SASL mechanism used
+ * for authentication.
+ * </p>
+ */
+public class SaslAuthenticationException extends AuthenticationException {
+
+    private static final long serialVersionUID = 1L;
+
+    public SaslAuthenticationException(String message) {
+        super(message);
+    }
+
+    public SaslAuthenticationException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/96ba21e0/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedSaslMechanismException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedSaslMechanismException.java
b/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedSaslMechanismException.java
index 9dab22a..4db4aee 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedSaslMechanismException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedSaslMechanismException.java
@@ -16,6 +16,10 @@
  */
 package org.apache.kafka.common.errors;
 
+/**
+ * This exception indicates that the SASL mechanism requested by the client
+ * is not enabled on the broker.
+ */
 public class UnsupportedSaslMechanismException extends AuthenticationException {
 
     private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/kafka/blob/96ba21e0/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
index 68f9ed6..24cd9cf 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
@@ -79,7 +79,7 @@ public class KafkaChannel {
                 authenticator.authenticate();
             } catch (AuthenticationException e) {
                 switch (authenticator.error()) {
-                    case AUTHENTICATION_FAILED:
+                    case SASL_AUTHENTICATION_FAILED:
                     case ILLEGAL_SASL_STATE:
                     case UNSUPPORTED_SASL_MECHANISM:
                         state = new ChannelState(ChannelState.State.AUTHENTICATION_FAILED,
e);

http://git-wip-us.apache.org/repos/asf/kafka/blob/96ba21e0/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 160ca26..bea6050 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -17,7 +17,7 @@
 package org.apache.kafka.common.protocol;
 
 import org.apache.kafka.common.errors.ApiException;
-import org.apache.kafka.common.errors.AuthenticationFailedException;
+import org.apache.kafka.common.errors.SaslAuthenticationException;
 import org.apache.kafka.common.errors.BrokerNotAvailableException;
 import org.apache.kafka.common.errors.ClusterAuthorizationException;
 import org.apache.kafka.common.errors.ConcurrentTransactionsException;
@@ -520,11 +520,11 @@ public enum Errors {
                 return new LogDirNotFoundException(message);
             }
     }),
-    AUTHENTICATION_FAILED(58, "Authentication failed.",
+    SASL_AUTHENTICATION_FAILED(58, "SASL Authentication failed.",
         new ApiExceptionBuilder() {
             @Override
             public ApiException build(String message) {
-                return new AuthenticationFailedException(message);
+                return new SaslAuthenticationException(message);
             }
     }),
     UNKNOWN_PRODUCER_ID(59, "This exception is raised by the broker if it could not locate
the producer metadata " +

http://git-wip-us.apache.org/repos/asf/kafka/blob/96ba21e0/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java
index 1dd0e76..c950cb9 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java
@@ -51,7 +51,7 @@ public class SaslAuthenticateResponse extends AbstractResponse {
 
     /**
      * Possible error codes:
-     *   AUTHENTICATION_FAILED(57) : Authentication failed
+     *   SASL_AUTHENTICATION_FAILED(58) : Authentication failed
      */
     private final Errors error;
     private final String errorMessage;

http://git-wip-us.apache.org/repos/asf/kafka/blob/96ba21e0/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
index 7cbb756..8207a5a 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
@@ -104,7 +104,8 @@ public class SaslClientAuthenticator implements Authenticator {
     private RequestHeader currentRequestHeader;
     // Version of SaslAuthenticate request/responses
     private short saslAuthenticateVersion;
-    // Sasl authentication error which may be one of NONE, UNSUPPORTED_SASL_MECHANISM, ILLEGAL_SASL_STATE,
AUTHENTICATION_FAILED or NETWORK_EXCEPTION
+    // Sasl authentication error which may be one of NONE, UNSUPPORTED_SASL_MECHANISM, ILLEGAL_SASL_STATE,
+    // SASL_AUTHENTICATION_FAILED or NETWORK_EXCEPTION
     private Errors error;
 
     public SaslClientAuthenticator(Map<String, ?> configs,

http://git-wip-us.apache.org/repos/asf/kafka/blob/96ba21e0/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index 46386cf..6202131 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -385,7 +385,7 @@ public class SaslServerAuthenticator implements Authenticator {
                 ByteBuffer responseBuf = responseToken == null ? EMPTY_BUFFER : ByteBuffer.wrap(responseToken);
                 sendKafkaResponse(requestContext, new SaslAuthenticateResponse(Errors.NONE,
null, responseBuf));
             } catch (SaslException e) {
-                this.error = Errors.AUTHENTICATION_FAILED;
+                this.error = Errors.SASL_AUTHENTICATION_FAILED;
                 sendKafkaResponse(requestContext, new SaslAuthenticateResponse(this.error,
                         "Authentication failed due to invalid credentials with SASL mechanism
" + saslMechanism));
                 throw e;

http://git-wip-us.apache.org/repos/asf/kafka/blob/96ba21e0/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
index b4460f7..d878b72 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
@@ -17,6 +17,8 @@
 package org.apache.kafka.common.security.authenticator;
 
 import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.DescribeTopicsResult;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
@@ -24,17 +26,20 @@ import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
-import org.apache.kafka.common.errors.AuthenticationFailedException;
+import org.apache.kafka.common.errors.SaslAuthenticationException;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.network.NetworkTestUtils;
 import org.apache.kafka.common.network.NioEchoServer;
 import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.apache.kafka.common.security.TestSecurityConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -73,15 +78,15 @@ public class ClientAuthenticationFailureTest {
 
     @Test
     public void testConsumerWithInvalidCredentials() {
-        saslClientConfigs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + server.port());
-        saslClientConfigs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
-        saslClientConfigs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+        Map<String, Object> props = new HashMap<>(saslClientConfigs);
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + server.port());
+        StringDeserializer deserializer = new StringDeserializer();
 
-        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(saslClientConfigs)) {
+        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props, deserializer, deserializer)) {
             consumer.subscribe(Arrays.asList(topic));
             consumer.poll(100);
             fail("Expected an authentication error!");
-        } catch (AuthenticationFailedException e) {
+        } catch (SaslAuthenticationException e) {
             // OK
         } catch (Exception e) {
             fail("Expected only an authentication error, but another error occurred: " + e.getMessage());
@@ -90,16 +95,47 @@ public class ClientAuthenticationFailureTest {
 
     @Test
     public void testProducerWithInvalidCredentials() {
-        saslClientConfigs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + server.port());
-        saslClientConfigs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
-        saslClientConfigs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+        Map<String, Object> props = new HashMap<>(saslClientConfigs);
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + server.port());
+        StringSerializer serializer = new StringSerializer();
 
-        ProducerRecord<String, String> record = new ProducerRecord<>(topic, "message");
-        try (KafkaProducer<String, String> producer = new KafkaProducer<>(saslClientConfigs)) {
+        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props, serializer, serializer)) {
+            ProducerRecord<String, String> record = new ProducerRecord<>(topic, "message");
             producer.send(record).get();
             fail("Expected an authentication error!");
         } catch (Exception e) {
-            assertTrue("Expected an exception of type AuthenticationFailedException", e.getCause() instanceof AuthenticationFailedException);
+            assertTrue("Expected SaslAuthenticationException, got " + e.getCause().getClass(),
+                    e.getCause() instanceof SaslAuthenticationException);
+        }
+    }
+
+    @Test
+    public void testAdminClientWithInvalidCredentials() {
+        Map<String, Object> props = new HashMap<>(saslClientConfigs);
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + server.port());
+        try (AdminClient client = AdminClient.create(props)) {
+            DescribeTopicsResult result = client.describeTopics(Collections.singleton("test"));
+            result.all().get();
+            fail("Expected an authentication error!");
+        } catch (Exception e) {
+            assertTrue("Expected SaslAuthenticationException, got " + e.getCause().getClass(),
+                    e.getCause() instanceof SaslAuthenticationException);
+        }
+    }
+
+    @Test
+    public void testTransactionalProducerWithInvalidCredentials() throws Exception {
+        Map<String, Object> props = new HashMap<>(saslClientConfigs);
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + server.port());
+        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "txclient-1");
+        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
+        StringSerializer serializer = new StringSerializer();
+
+        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props, serializer, serializer)) {
+            producer.initTransactions();
+            fail("Expected an authentication error!");
+        } catch (SaslAuthenticationException e) {
+            // expected exception
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/96ba21e0/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
index 52fbdba..8765040 100644
--- a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
@@ -13,13 +13,15 @@
 package kafka.api
 
 import java.io.FileOutputStream
+import java.util.Collections
 import java.util.concurrent.{ExecutionException, Future, TimeUnit}
-import scala.collection.JavaConverters.seqAsJavaListConverter
+import scala.collection.JavaConverters._
 
+import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
 import org.apache.kafka.clients.consumer.{KafkaConsumer, ConsumerConfig}
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.AuthenticationFailedException
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+import org.apache.kafka.common.errors.SaslAuthenticationException
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.junit.{After, Before, Test}
@@ -39,9 +41,13 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with
   val producerCount = 1
   val serverCount = 1
 
+  this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "1")
+  this.serverConfig.setProperty(KafkaConfig.TransactionsTopicReplicationFactorProp, "1")
+  this.serverConfig.setProperty(KafkaConfig.TransactionsTopicMinISRProp, "1")
   this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
 
   val topic = "topic"
+  val numPartitions = 1
   val tp = new TopicPartition(topic, 0)
 
   override def configureSecurityBeforeServersStart() {
@@ -56,7 +62,7 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with
     startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), Both,
       JaasTestUtils.KafkaServerContextName))
     super.setUp()
-    TestUtils.createTopic(this.zkUtils, topic, 1, serverCount, this.servers)
+    TestUtils.createTopic(this.zkUtils, topic, numPartitions, serverCount, this.servers)
   }
 
   @After
@@ -67,10 +73,25 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with
 
   @Test
   def testProducerWithAuthenticationFailure() {
-    verifyAuthenticationException(() => sendOneRecord(10000))
+    verifyAuthenticationException(sendOneRecord(10000))
+    verifyAuthenticationException(producers.head.partitionsFor(topic))
 
     createClientCredential()
-    verifyWithRetry(() => sendOneRecord())
+    verifyWithRetry(sendOneRecord())
+  }
+
+  @Test
+  def testTransactionalProducerWithAuthenticationFailure() {
+    val txProducer = createTransactionalProducer()
+    verifyAuthenticationException(txProducer.initTransactions())
+
+    createClientCredential()
+    try {
+      txProducer.initTransactions()
+      fail("Transaction initialization should fail after authentication failure")
+    } catch {
+      case _: KafkaException => // expected exception
+    }
   }
 
   @Test
@@ -99,11 +120,40 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with
   }
 
   private def verifyConsumerWithAuthenticationFailure(consumer: KafkaConsumer[Array[Byte], Array[Byte]]) {
-    verifyAuthenticationException(() => consumer.poll(10000))
+    verifyAuthenticationException(consumer.poll(10000))
+    verifyAuthenticationException(consumer.partitionsFor(topic))
 
     createClientCredential()
-    verifyWithRetry(() => sendOneRecord())
-    verifyWithRetry(() => assertEquals(1, consumer.poll(1000).count))
+    verifyWithRetry(sendOneRecord())
+    verifyWithRetry(assertEquals(1, consumer.poll(1000).count))
+  }
+
+  @Test
+  def testKafkaAdminClientWithAuthenticationFailure() {
+    val props = TestUtils.adminClientSecurityConfigs(securityProtocol, trustStoreFile, clientSaslProperties)
+    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    val adminClient = AdminClient.create(props)
+
+    def describeTopic(): Unit = {
+      try {
+        val response = adminClient.describeTopics(Collections.singleton(topic)).all.get
+        assertEquals(1, response.size)
+        response.asScala.foreach { case (topic, description) =>
+          assertEquals(numPartitions, description.partitions.size)
+        }
+      } catch {
+        case e: ExecutionException => throw e.getCause
+      }
+    }
+
+    try {
+      verifyAuthenticationException(describeTopic())
+
+      createClientCredential()
+      verifyWithRetry(describeTopic())
+    } finally {
+      adminClient.close
+    }
   }
 
   @Test
@@ -124,9 +174,9 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with
     val consumer = consumers.head
     consumer.subscribe(List(topic).asJava)
 
-    verifyAuthenticationException(() => consumerGroupService.listGroups)
+    verifyAuthenticationException(consumerGroupService.listGroups)
     createClientCredential()
-    verifyWithRetry(() => consumer.poll(1000))
+    verifyWithRetry(consumer.poll(1000))
     assertEquals(1, consumerGroupService.listGroups.size)
   }
 
@@ -147,13 +197,13 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with
     }
   }
 
-  private def verifyAuthenticationException(f: () => Unit): Unit = {
+  private def verifyAuthenticationException(action: => Unit): Unit = {
     val startMs = System.currentTimeMillis
     try {
-      f()
+      action
       fail("Expected an authentication exception")
     } catch {
-      case e: AuthenticationFailedException =>
+      case e: SaslAuthenticationException =>
         // expected exception
         val elapsedMs = System.currentTimeMillis - startMs
         assertTrue(s"Poll took too long, elapsed=$elapsedMs", elapsedMs <= 5000)
@@ -161,16 +211,29 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with
     }
   }
 
-  private def verifyWithRetry(f: () => Unit): Unit = {
+  private def verifyWithRetry(action: => Unit): Unit = {
     var attempts = 0
     TestUtils.waitUntilTrue(() => {
       try {
         attempts += 1
-        f()
+        action
         true
       } catch {
-        case _: AuthenticationFailedException => false
+        case _: SaslAuthenticationException => false
       }
     }, s"Operation did not succeed within timeout after $attempts")
   }
+
+  private def createTransactionalProducer(): KafkaProducer[Array[Byte], Array[Byte]] = {
+    producerConfig.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "txclient-1")
+    producerConfig.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
+    val txProducer = TestUtils.createNewProducer(brokerList,
+                                  securityProtocol = this.securityProtocol,
+                                  saslProperties = this.clientSaslProperties,
+                                  retries = 1000,
+                                  acks = -1,
+                                  props = Some(producerConfig))
+    producers += txProducer
+    txProducer
+  }
 }


Mime
View raw message