Repository: kafka Updated Branches: refs/heads/trunk f29391c49 -> 96ba21e0d KAFKA-5947; Handle authentication failure in admin client, txn producer 1. Raise AuthenticationException for authentication failures in admin client 2. Handle AuthenticationException as a fatal error for transactional producer 3. Add comments to authentication exceptions Author: Rajini Sivaram Reviewers: Vahid Hashemian , Ismael Juma Closes #3928 from rajinisivaram/KAFKA-5947-auth-failure Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/96ba21e0 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/96ba21e0 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/96ba21e0 Branch: refs/heads/trunk Commit: 96ba21e0dfb1a564d5349179d844f020abf1e08b Parents: f29391c Author: Rajini Sivaram Authored: Thu Sep 21 13:58:35 2017 +0100 Committer: Ismael Juma Committed: Thu Sep 21 13:58:43 2017 +0100 ---------------------------------------------------------------------- checkstyle/import-control.xml | 1 + .../java/org/apache/kafka/clients/Metadata.java | 13 ++- .../kafka/clients/NetworkClientUtils.java | 4 +- .../kafka/clients/admin/KafkaAdminClient.java | 33 +++++++ .../internals/ConsumerNetworkClient.java | 4 +- .../clients/producer/internals/Sender.java | 59 +++++++----- .../producer/internals/TransactionManager.java | 6 ++ .../common/errors/AuthenticationException.java | 13 +++ .../errors/AuthenticationFailedException.java | 31 ------ .../errors/IllegalSaslStateException.java | 5 + .../errors/SaslAuthenticationException.java | 40 ++++++++ .../UnsupportedSaslMechanismException.java | 4 + .../kafka/common/network/KafkaChannel.java | 2 +- .../apache/kafka/common/protocol/Errors.java | 6 +- .../requests/SaslAuthenticateResponse.java | 2 +- .../authenticator/SaslClientAuthenticator.java | 3 +- .../authenticator/SaslServerAuthenticator.java | 2 +- .../ClientAuthenticationFailureTest.java | 60 +++++++++--- 
.../SaslClientsWithInvalidCredentialsTest.scala | 99 ++++++++++++++++---- 19 files changed, 286 insertions(+), 101 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/96ba21e0/checkstyle/import-control.xml ---------------------------------------------------------------------- diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 3329b2d..f4d9655 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -41,6 +41,7 @@ + http://git-wip-us.apache.org/repos/asf/kafka/blob/96ba21e0/clients/src/main/java/org/apache/kafka/clients/Metadata.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index 5790d88..3b8c18a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -149,14 +149,15 @@ public final class Metadata { /** * If any non-retriable authentication exceptions were encountered during - * metadata update, clear and throw the exception. + * metadata update, clear and return the exception. 
*/ - public synchronized void maybeThrowAuthenticationException() { + public synchronized AuthenticationException getAndClearAuthenticationException() { if (authenticationException != null) { AuthenticationException exception = authenticationException; authenticationException = null; - throw exception; - } + return exception; + } else + return null; } /** @@ -169,7 +170,9 @@ public final class Metadata { long begin = System.currentTimeMillis(); long remainingWaitMs = maxWaitMs; while (this.version <= lastVersion) { - maybeThrowAuthenticationException(); + AuthenticationException ex = getAndClearAuthenticationException(); + if (ex != null) + throw ex; if (remainingWaitMs != 0) wait(remainingWaitMs); long elapsed = System.currentTimeMillis() - begin; http://git-wip-us.apache.org/repos/asf/kafka/blob/96ba21e0/clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java index 8462979..c4559a4 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java @@ -47,7 +47,7 @@ public class NetworkClientUtils { * It returns `true` if the call completes normally or `false` if the timeoutMs expires. If the connection fails, * an `IOException` is thrown instead. Note that if the `NetworkClient` has been configured with a positive * connection timeoutMs, it is possible for this method to raise an `IOException` for a previous connection which - * has recently disconnected. + * has recently disconnected. If authentication to the node fails, an `AuthenticationException` is thrown. * * This method is useful for implementing blocking behaviour on top of the non-blocking `NetworkClient`, use it with * care. 
@@ -69,6 +69,8 @@ public class NetworkClientUtils { } long pollTimeout = expiryTime - attemptStartTime; client.poll(pollTimeout, attemptStartTime); + if (client.authenticationException(node) != null) + throw client.authenticationException(node); attemptStartTime = time.milliseconds(); } return client.isReady(node, attemptStartTime); http://git-wip-us.apache.org/repos/asf/kafka/blob/96ba21e0/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index d857ae3..fe92b15 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -41,6 +41,7 @@ import org.apache.kafka.common.acl.AclBindingFilter; import org.apache.kafka.common.annotation.InterfaceStability; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.errors.BrokerNotAvailableException; import org.apache.kafka.common.errors.DisconnectException; import org.apache.kafka.common.errors.InvalidRequestException; @@ -850,6 +851,37 @@ public class KafkaAdminClient extends AdminClient { } /** + * If an authentication exception is encountered with connection to any broker, + * fail all pending requests. 
+ */ + private void handleAuthenticationException(long now, Map> callsToSend) { + AuthenticationException authenticationException = metadata.getAndClearAuthenticationException(); + if (authenticationException == null) { + for (Node node : callsToSend.keySet()) { + authenticationException = client.authenticationException(node); + if (authenticationException != null) + break; + } + } + if (authenticationException != null) { + synchronized (this) { + failCalls(now, newCalls, authenticationException); + } + for (List calls : callsToSend.values()) { + failCalls(now, calls, authenticationException); + } + callsToSend.clear(); + } + } + + private void failCalls(long now, List calls, AuthenticationException authenticationException) { + for (Call call : calls) { + call.fail(now, authenticationException); + } + calls.clear(); + } + + /** * Handle responses from the server. * * @param now The current time in milliseconds. @@ -976,6 +1008,7 @@ public class KafkaAdminClient extends AdminClient { // Update the current time and handle the latest responses. 
now = time.milliseconds(); + handleAuthenticationException(now, callsToSend); handleResponses(now, responses, callsInFlight, correlationIdToCalls); } int numTimedOut = 0; http://git-wip-us.apache.org/repos/asf/kafka/blob/96ba21e0/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java index 43c6358..86fca9e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java @@ -135,7 +135,9 @@ public class ConsumerNetworkClient implements Closeable { int version = this.metadata.requestUpdate(); do { poll(timeout); - this.metadata.maybeThrowAuthenticationException(); + AuthenticationException ex = this.metadata.getAndClearAuthenticationException(); + if (ex != null) + throw ex; } while (this.metadata.version() == version && time.milliseconds() - startMs < timeout); return this.metadata.version() > version; } http://git-wip-us.apache.org/repos/asf/kafka/blob/96ba21e0/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 02b79c5..b96ea92 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -28,6 +28,7 @@ import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.Node; import 
org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.errors.ClusterAuthorizationException; import org.apache.kafka.common.errors.InvalidMetadataException; import org.apache.kafka.common.errors.OutOfOrderSequenceException; @@ -201,32 +202,38 @@ public class Sender implements Runnable { */ void run(long now) { if (transactionManager != null) { - if (transactionManager.shouldResetProducerStateAfterResolvingSequences()) - // Check if the previous run expired batches which requires a reset of the producer state. - transactionManager.resetProducerId(); - - if (!transactionManager.isTransactional()) { - // this is an idempotent producer, so make sure we have a producer id - maybeWaitForProducerId(); - } else if (transactionManager.hasUnresolvedSequences() && !transactionManager.hasFatalError()) { - transactionManager.transitionToFatalError(new KafkaException("The client hasn't received acknowledgment for " + - "some previously sent messages and can no longer retry them. It isn't safe to continue.")); - } else if (transactionManager.hasInFlightTransactionalRequest() || maybeSendTransactionalRequest(now)) { - // as long as there are outstanding transactional requests, we simply wait for them to return - client.poll(retryBackoffMs, now); - return; - } + try { + if (transactionManager.shouldResetProducerStateAfterResolvingSequences()) + // Check if the previous run expired batches which requires a reset of the producer state. + transactionManager.resetProducerId(); + + if (!transactionManager.isTransactional()) { + // this is an idempotent producer, so make sure we have a producer id + maybeWaitForProducerId(); + } else if (transactionManager.hasUnresolvedSequences() && !transactionManager.hasFatalError()) { + transactionManager.transitionToFatalError(new KafkaException("The client hasn't received acknowledgment for " + + "some previously sent messages and can no longer retry them. 
It isn't safe to continue.")); + } else if (transactionManager.hasInFlightTransactionalRequest() || maybeSendTransactionalRequest(now)) { + // as long as there are outstanding transactional requests, we simply wait for them to return + client.poll(retryBackoffMs, now); + return; + } - // do not continue sending if the transaction manager is in a failed state or if there - // is no producer id (for the idempotent case). - if (transactionManager.hasFatalError() || !transactionManager.hasProducerId()) { - RuntimeException lastError = transactionManager.lastError(); - if (lastError != null) - maybeAbortBatches(lastError); - client.poll(retryBackoffMs, now); - return; - } else if (transactionManager.hasAbortableError()) { - accumulator.abortUndrainedBatches(transactionManager.lastError()); + // do not continue sending if the transaction manager is in a failed state or if there + // is no producer id (for the idempotent case). + if (transactionManager.hasFatalError() || !transactionManager.hasProducerId()) { + RuntimeException lastError = transactionManager.lastError(); + if (lastError != null) + maybeAbortBatches(lastError); + client.poll(retryBackoffMs, now); + return; + } else if (transactionManager.hasAbortableError()) { + accumulator.abortUndrainedBatches(transactionManager.lastError()); + } + } catch (AuthenticationException e) { + // This is already logged as error, but propagated here to perform any clean ups. 
+ log.trace("Authentication exception while processing transactional request: {}", e); + transactionManager.authenticationFailed(e); } } @@ -407,7 +414,7 @@ public class Sender implements Runnable { private Node awaitLeastLoadedNodeReady(long remainingTimeMs) throws IOException { Node node = client.leastLoadedNode(time.milliseconds()); - if (NetworkClientUtils.awaitReady(client, node, time, remainingTimeMs)) { + if (node != null && NetworkClientUtils.awaitReady(client, node, time, remainingTimeMs)) { return node; } return null; http://git-wip-us.apache.org/repos/asf/kafka/blob/96ba21e0/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index 43abdbb..3bddded 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.errors.GroupAuthorizationException; import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.protocol.Errors; @@ -615,6 +616,11 @@ public class TransactionManager { enqueueRequest(request); } + synchronized void authenticationFailed(AuthenticationException e) { + for (TxnRequestHandler request : pendingRequests) + request.fatalError(e); + } + Node coordinator(FindCoordinatorRequest.CoordinatorType type) { switch (type) { case GROUP: 
http://git-wip-us.apache.org/repos/asf/kafka/blob/96ba21e0/clients/src/main/java/org/apache/kafka/common/errors/AuthenticationException.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/errors/AuthenticationException.java b/clients/src/main/java/org/apache/kafka/common/errors/AuthenticationException.java index aa4a111..c56ac88 100644 --- a/clients/src/main/java/org/apache/kafka/common/errors/AuthenticationException.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/AuthenticationException.java @@ -16,6 +16,19 @@ */ package org.apache.kafka.common.errors; +/** + * This exception indicates that SASL authentication has failed. + * On authentication failure, clients abort the operation requested and raise one + * of the subclasses of this exception: + *
    + * {@link SaslAuthenticationException} if SASL handshake fails with invalid credentials + * or any other failure specific to the SASL mechanism used for authentication + *
  • {@link UnsupportedSaslMechanismException} if the SASL mechanism requested by the client + * is not supported on the broker.
  • + *
  • {@link IllegalSaslStateException} if an unexpected request is received during SASL + * handshake. This could be due to misconfigured security protocol.
  • + *
+ */ public class AuthenticationException extends ApiException { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/kafka/blob/96ba21e0/clients/src/main/java/org/apache/kafka/common/errors/AuthenticationFailedException.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/errors/AuthenticationFailedException.java b/clients/src/main/java/org/apache/kafka/common/errors/AuthenticationFailedException.java deleted file mode 100644 index 3be72f0..0000000 --- a/clients/src/main/java/org/apache/kafka/common/errors/AuthenticationFailedException.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.kafka.common.errors; - -public class AuthenticationFailedException extends AuthenticationException { - - private static final long serialVersionUID = 1L; - - public AuthenticationFailedException(String message) { - super(message); - } - - public AuthenticationFailedException(String message, Throwable cause) { - super(message, cause); - } - -} http://git-wip-us.apache.org/repos/asf/kafka/blob/96ba21e0/clients/src/main/java/org/apache/kafka/common/errors/IllegalSaslStateException.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/errors/IllegalSaslStateException.java b/clients/src/main/java/org/apache/kafka/common/errors/IllegalSaslStateException.java index c45f007..691244a 100644 --- a/clients/src/main/java/org/apache/kafka/common/errors/IllegalSaslStateException.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/IllegalSaslStateException.java @@ -16,6 +16,11 @@ */ package org.apache.kafka.common.errors; +/** + * This exception indicates unexpected requests prior to SASL authentication. + * This could be due to misconfigured security, e.g. if PLAINTEXT protocol + * is used to connect to a SASL endpoint. 
+ */ public class IllegalSaslStateException extends AuthenticationException { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/kafka/blob/96ba21e0/clients/src/main/java/org/apache/kafka/common/errors/SaslAuthenticationException.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/errors/SaslAuthenticationException.java b/clients/src/main/java/org/apache/kafka/common/errors/SaslAuthenticationException.java new file mode 100644 index 0000000..d128c25 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/SaslAuthenticationException.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.errors; + +/** + * This exception indicates that SASL authentication has failed. The error message + * in the exception indicates the actual cause of failure. + *

+ * SASL authentication failures typically indicate invalid credentials, but + * could also include other failures specific to the SASL mechanism used + * for authentication. + *

+ */ +public class SaslAuthenticationException extends AuthenticationException { + + private static final long serialVersionUID = 1L; + + public SaslAuthenticationException(String message) { + super(message); + } + + public SaslAuthenticationException(String message, Throwable cause) { + super(message, cause); + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/96ba21e0/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedSaslMechanismException.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedSaslMechanismException.java b/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedSaslMechanismException.java index 9dab22a..4db4aee 100644 --- a/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedSaslMechanismException.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedSaslMechanismException.java @@ -16,6 +16,10 @@ */ package org.apache.kafka.common.errors; +/** + * This exception indicates that the SASL mechanism requested by the client + * is not enabled on the broker. 
+ */ public class UnsupportedSaslMechanismException extends AuthenticationException { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/kafka/blob/96ba21e0/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java index 68f9ed6..24cd9cf 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java +++ b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java @@ -79,7 +79,7 @@ public class KafkaChannel { authenticator.authenticate(); } catch (AuthenticationException e) { switch (authenticator.error()) { - case AUTHENTICATION_FAILED: + case SASL_AUTHENTICATION_FAILED: case ILLEGAL_SASL_STATE: case UNSUPPORTED_SASL_MECHANISM: state = new ChannelState(ChannelState.State.AUTHENTICATION_FAILED, e); http://git-wip-us.apache.org/repos/asf/kafka/blob/96ba21e0/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index 160ca26..bea6050 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -17,7 +17,7 @@ package org.apache.kafka.common.protocol; import org.apache.kafka.common.errors.ApiException; -import org.apache.kafka.common.errors.AuthenticationFailedException; +import org.apache.kafka.common.errors.SaslAuthenticationException; import org.apache.kafka.common.errors.BrokerNotAvailableException; import org.apache.kafka.common.errors.ClusterAuthorizationException; import 
org.apache.kafka.common.errors.ConcurrentTransactionsException; @@ -520,11 +520,11 @@ public enum Errors { return new LogDirNotFoundException(message); } }), - AUTHENTICATION_FAILED(58, "Authentication failed.", + SASL_AUTHENTICATION_FAILED(58, "SASL Authentication failed.", new ApiExceptionBuilder() { @Override public ApiException build(String message) { - return new AuthenticationFailedException(message); + return new SaslAuthenticationException(message); } }), UNKNOWN_PRODUCER_ID(59, "This exception is raised by the broker if it could not locate the producer metadata " + http://git-wip-us.apache.org/repos/asf/kafka/blob/96ba21e0/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java index 1dd0e76..c950cb9 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java @@ -51,7 +51,7 @@ public class SaslAuthenticateResponse extends AbstractResponse { /** * Possible error codes: - * AUTHENTICATION_FAILED(57) : Authentication failed + * SASL_AUTHENTICATION_FAILED(57) : Authentication failed */ private final Errors error; private final String errorMessage; http://git-wip-us.apache.org/repos/asf/kafka/blob/96ba21e0/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java index 7cbb756..8207a5a 100644 --- 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java +++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java @@ -104,7 +104,8 @@ public class SaslClientAuthenticator implements Authenticator { private RequestHeader currentRequestHeader; // Version of SaslAuthenticate request/responses private short saslAuthenticateVersion; - // Sasl authentication error which may be one of NONE, UNSUPPORTED_SASL_MECHANISM, ILLEGAL_SASL_STATE, AUTHENTICATION_FAILED or NETWORK_EXCEPTION + // Sasl authentication error which may be one of NONE, UNSUPPORTED_SASL_MECHANISM, ILLEGAL_SASL_STATE, + // SASL_AUTHENTICATION_FAILED or NETWORK_EXCEPTION private Errors error; public SaslClientAuthenticator(Map configs, http://git-wip-us.apache.org/repos/asf/kafka/blob/96ba21e0/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java index 46386cf..6202131 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java +++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java @@ -385,7 +385,7 @@ public class SaslServerAuthenticator implements Authenticator { ByteBuffer responseBuf = responseToken == null ? 
EMPTY_BUFFER : ByteBuffer.wrap(responseToken); sendKafkaResponse(requestContext, new SaslAuthenticateResponse(Errors.NONE, null, responseBuf)); } catch (SaslException e) { - this.error = Errors.AUTHENTICATION_FAILED; + this.error = Errors.SASL_AUTHENTICATION_FAILED; sendKafkaResponse(requestContext, new SaslAuthenticateResponse(this.error, "Authentication failed due to invalid credentials with SASL mechanism " + saslMechanism)); throw e; http://git-wip-us.apache.org/repos/asf/kafka/blob/96ba21e0/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java index b4460f7..d878b72 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java @@ -17,6 +17,8 @@ package org.apache.kafka.common.security.authenticator; import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.DescribeTopicsResult; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; @@ -24,17 +26,20 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; -import org.apache.kafka.common.errors.AuthenticationFailedException; +import org.apache.kafka.common.errors.SaslAuthenticationException; import 
org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.network.NetworkTestUtils; import org.apache.kafka.common.network.NioEchoServer; import org.apache.kafka.common.protocol.SecurityProtocol; import org.apache.kafka.common.security.TestSecurityConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -73,15 +78,15 @@ public class ClientAuthenticationFailureTest { @Test public void testConsumerWithInvalidCredentials() { - saslClientConfigs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + server.port()); - saslClientConfigs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); - saslClientConfigs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); + Map props = new HashMap<>(saslClientConfigs); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + server.port()); + StringDeserializer deserializer = new StringDeserializer(); - try (KafkaConsumer consumer = new KafkaConsumer<>(saslClientConfigs)) { + try (KafkaConsumer consumer = new KafkaConsumer<>(props, deserializer, deserializer)) { consumer.subscribe(Arrays.asList(topic)); consumer.poll(100); fail("Expected an authentication error!"); - } catch (AuthenticationFailedException e) { + } catch (SaslAuthenticationException e) { // OK } catch (Exception e) { fail("Expected only an authentication error, but another error occurred: " + e.getMessage()); @@ -90,16 +95,47 @@ public class ClientAuthenticationFailureTest { @Test public void testProducerWithInvalidCredentials() { - saslClientConfigs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + server.port()); - 
saslClientConfigs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); - saslClientConfigs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + Map props = new HashMap<>(saslClientConfigs); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + server.port()); + StringSerializer serializer = new StringSerializer(); - ProducerRecord record = new ProducerRecord<>(topic, "message"); - try (KafkaProducer producer = new KafkaProducer<>(saslClientConfigs)) { + try (KafkaProducer producer = new KafkaProducer<>(props, serializer, serializer)) { + ProducerRecord record = new ProducerRecord<>(topic, "message"); producer.send(record).get(); fail("Expected an authentication error!"); } catch (Exception e) { - assertTrue("Expected an exception of type AuthenticationFailedException", e.getCause() instanceof AuthenticationFailedException); + assertTrue("Expected SaslAuthenticationException, got " + e.getCause().getClass(), + e.getCause() instanceof SaslAuthenticationException); + } + } + + @Test + public void testAdminClientWithInvalidCredentials() { + Map props = new HashMap<>(saslClientConfigs); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + server.port()); + try (AdminClient client = AdminClient.create(props)) { + DescribeTopicsResult result = client.describeTopics(Collections.singleton("test")); + result.all().get(); + fail("Expected an authentication error!"); + } catch (Exception e) { + assertTrue("Expected SaslAuthenticationException, got " + e.getCause().getClass(), + e.getCause() instanceof SaslAuthenticationException); + } + } + + @Test + public void testTransactionalProducerWithInvalidCredentials() throws Exception { + Map props = new HashMap<>(saslClientConfigs); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + server.port()); + props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "txclient-1"); + 
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); + StringSerializer serializer = new StringSerializer(); + + try (KafkaProducer producer = new KafkaProducer<>(props, serializer, serializer)) { + producer.initTransactions(); + fail("Expected an authentication error!"); + } catch (SaslAuthenticationException e) { + // expected exception } } http://git-wip-us.apache.org/repos/asf/kafka/blob/96ba21e0/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala index 52fbdba..8765040 100644 --- a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala @@ -13,13 +13,15 @@ package kafka.api import java.io.FileOutputStream +import java.util.Collections import java.util.concurrent.{ExecutionException, Future, TimeUnit} -import scala.collection.JavaConverters.seqAsJavaListConverter +import scala.collection.JavaConverters._ +import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig} import org.apache.kafka.clients.consumer.{KafkaConsumer, ConsumerConfig} -import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata} -import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.errors.AuthenticationFailedException +import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata} +import org.apache.kafka.common.{KafkaException, TopicPartition} +import org.apache.kafka.common.errors.SaslAuthenticationException import org.apache.kafka.common.protocol.SecurityProtocol import org.apache.kafka.common.serialization.ByteArrayDeserializer import org.junit.{After, Before, Test} @@ 
-39,9 +41,13 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with val producerCount = 1 val serverCount = 1 + this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "1") + this.serverConfig.setProperty(KafkaConfig.TransactionsTopicReplicationFactorProp, "1") + this.serverConfig.setProperty(KafkaConfig.TransactionsTopicMinISRProp, "1") this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") val topic = "topic" + val numPartitions = 1 val tp = new TopicPartition(topic, 0) override def configureSecurityBeforeServersStart() { @@ -56,7 +62,7 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), Both, JaasTestUtils.KafkaServerContextName)) super.setUp() - TestUtils.createTopic(this.zkUtils, topic, 1, serverCount, this.servers) + TestUtils.createTopic(this.zkUtils, topic, numPartitions, serverCount, this.servers) } @After @@ -67,10 +73,25 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with @Test def testProducerWithAuthenticationFailure() { - verifyAuthenticationException(() => sendOneRecord(10000)) + verifyAuthenticationException(sendOneRecord(10000)) + verifyAuthenticationException(producers.head.partitionsFor(topic)) createClientCredential() - verifyWithRetry(() => sendOneRecord()) + verifyWithRetry(sendOneRecord()) + } + + @Test + def testTransactionalProducerWithAuthenticationFailure() { + val txProducer = createTransactionalProducer() + verifyAuthenticationException(txProducer.initTransactions()) + + createClientCredential() + try { + txProducer.initTransactions() + fail("Transaction initialization should fail after authentication failure") + } catch { + case _: KafkaException => // expected exception + } } @Test @@ -99,11 +120,40 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with } private def 
verifyConsumerWithAuthenticationFailure(consumer: KafkaConsumer[Array[Byte], Array[Byte]]) { - verifyAuthenticationException(() => consumer.poll(10000)) + verifyAuthenticationException(consumer.poll(10000)) + verifyAuthenticationException(consumer.partitionsFor(topic)) createClientCredential() - verifyWithRetry(() => sendOneRecord()) - verifyWithRetry(() => assertEquals(1, consumer.poll(1000).count)) + verifyWithRetry(sendOneRecord()) + verifyWithRetry(assertEquals(1, consumer.poll(1000).count)) + } + + @Test + def testKafkaAdminClientWithAuthenticationFailure() { + val props = TestUtils.adminClientSecurityConfigs(securityProtocol, trustStoreFile, clientSaslProperties) + props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) + val adminClient = AdminClient.create(props) + + def describeTopic(): Unit = { + try { + val response = adminClient.describeTopics(Collections.singleton(topic)).all.get + assertEquals(1, response.size) + response.asScala.foreach { case (topic, description) => + assertEquals(numPartitions, description.partitions.size) + } + } catch { + case e: ExecutionException => throw e.getCause + } + } + + try { + verifyAuthenticationException(describeTopic()) + + createClientCredential() + verifyWithRetry(describeTopic()) + } finally { + adminClient.close + } } @Test @@ -124,9 +174,9 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with val consumer = consumers.head consumer.subscribe(List(topic).asJava) - verifyAuthenticationException(() => consumerGroupService.listGroups) + verifyAuthenticationException(consumerGroupService.listGroups) createClientCredential() - verifyWithRetry(() => consumer.poll(1000)) + verifyWithRetry(consumer.poll(1000)) assertEquals(1, consumerGroupService.listGroups.size) } @@ -147,13 +197,13 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with } } - private def verifyAuthenticationException(f: () => Unit): Unit = { + private def 
verifyAuthenticationException(action: => Unit): Unit = { val startMs = System.currentTimeMillis try { - f() + action fail("Expected an authentication exception") } catch { - case e: AuthenticationFailedException => + case e: SaslAuthenticationException => // expected exception val elapsedMs = System.currentTimeMillis - startMs assertTrue(s"Poll took too long, elapsed=$elapsedMs", elapsedMs <= 5000) @@ -161,16 +211,29 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with } } - private def verifyWithRetry(f: () => Unit): Unit = { + private def verifyWithRetry(action: => Unit): Unit = { var attempts = 0 TestUtils.waitUntilTrue(() => { try { attempts += 1 - f() + action true } catch { - case _: AuthenticationFailedException => false + case _: SaslAuthenticationException => false } }, s"Operation did not succeed within timeout after $attempts") } + + private def createTransactionalProducer(): KafkaProducer[Array[Byte], Array[Byte]] = { + producerConfig.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "txclient-1") + producerConfig.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true") + val txProducer = TestUtils.createNewProducer(brokerList, + securityProtocol = this.securityProtocol, + saslProperties = this.clientSaslProperties, + retries = 1000, + acks = -1, + props = Some(producerConfig)) + producers += txProducer + txProducer + } }