kafka-commits mailing list archives

From rsiva...@apache.org
Subject [2/2] kafka git commit: KAFKA-4764; Wrap SASL tokens in Kafka headers to improve diagnostics (KIP-152)
Date Fri, 15 Sep 2017 16:16:52 GMT
KAFKA-4764; Wrap SASL tokens in Kafka headers to improve diagnostics (KIP-152)

SASL handshake protocol changes from KIP-152.

Author: Rajini Sivaram <rajinisivaram@googlemail.com>

Reviewers: Jun Rao <junrao@gmail.com>, Ismael Juma <ismael@juma.me.uk>, Manikumar Reddy <manikumar.reddy@gmail.com>

Closes #3708 from rajinisivaram/KAFKA-4764-SASL-diagnostics
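
A minimal sketch of the framing change described above, written against the classes added
in this commit (the client id and correlation id below are placeholder values):

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.protocol.ApiKeys;
    import org.apache.kafka.common.requests.RequestHeader;
    import org.apache.kafka.common.requests.SaslAuthenticateRequest;

    // A SASL token that was previously written to the socket as raw bytes...
    byte[] saslToken = new byte[0];   // placeholder for the first token from the SaslClient
    // ...is now wrapped in a SaslAuthenticateRequest with a standard Kafka request header,
    // so the broker can return an error code and message instead of just closing the connection.
    SaslAuthenticateRequest request =
            new SaslAuthenticateRequest.Builder(ByteBuffer.wrap(saslToken)).build((short) 0);
    ByteBuffer wireBytes =
            request.serialize(new RequestHeader(ApiKeys.SASL_AUTHENTICATE, (short) 0, "client-1", 1));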


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8fca4322
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8fca4322
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8fca4322

Branch: refs/heads/trunk
Commit: 8fca432223da521b78e60e0cf8fa881ced19589c
Parents: 985cc53
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
Authored: Fri Sep 15 17:16:29 2017 +0100
Committer: Rajini Sivaram <rajinisivaram@googlemail.com>
Committed: Fri Sep 15 17:16:29 2017 +0100

----------------------------------------------------------------------
 checkstyle/suppressions.xml                     |   8 +-
 .../org/apache/kafka/clients/NetworkClient.java |   6 +-
 .../errors/AuthenticationFailedException.java   |  31 ++
 .../kafka/common/network/Authenticator.java     |   7 +
 .../kafka/common/network/ChannelState.java      |  56 ++-
 .../kafka/common/network/KafkaChannel.java      |  20 +-
 .../common/network/PlaintextChannelBuilder.java |   7 +
 .../common/network/SaslChannelBuilder.java      |  26 +-
 .../kafka/common/network/SslChannelBuilder.java |   7 +
 .../apache/kafka/common/protocol/ApiKeys.java   |   3 +-
 .../apache/kafka/common/protocol/Errors.java    |  10 +-
 .../apache/kafka/common/protocol/Protocol.java  |  22 +-
 .../kafka/common/requests/AbstractRequest.java  |   2 +
 .../kafka/common/requests/AbstractResponse.java |   2 +
 .../requests/SaslAuthenticateRequest.java       | 102 +++++
 .../requests/SaslAuthenticateResponse.java      |  88 +++++
 .../common/requests/SaslHandshakeRequest.java   |   9 +-
 .../authenticator/SaslClientAuthenticator.java  | 175 +++++++--
 .../authenticator/SaslServerAuthenticator.java  | 133 ++++++-
 .../kafka/common/network/NetworkTestUtils.java  |  11 +-
 .../kafka/common/network/NioEchoServer.java     |   4 +-
 .../common/network/SslTransportLayerTest.java   |  22 +-
 .../kafka/common/protocol/ApiKeysTest.java      |   8 +-
 .../kafka/common/protocol/ProtoUtilsTest.java   |   2 +-
 .../authenticator/SaslAuthenticatorTest.java    | 393 +++++++++++++++++--
 .../security/authenticator/TestJaasConfig.java  |   5 +-
 .../src/main/scala/kafka/server/KafkaApis.scala |   8 +-
 .../unit/kafka/server/RequestQuotaTest.scala    |   6 +-
 .../server/SaslApiVersionsRequestTest.scala     |   9 +-
 29 files changed, 1017 insertions(+), 165 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8fca4322/checkstyle/suppressions.xml
----------------------------------------------------------------------
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 027d07f..734b07e 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -10,7 +10,9 @@
 
     <!-- Clients -->
     <suppress checks="ClassFanOutComplexity"
-              files="(Fetcher|Sender|SenderTest|ConsumerCoordinator|KafkaConsumer|KafkaProducer|SaslServerAuthenticator|Utils|TransactionManagerTest|KafkaAdminClient|NetworkClient).java"/>
+              files="(Fetcher|Sender|SenderTest|ConsumerCoordinator|KafkaConsumer|KafkaProducer|Utils|TransactionManagerTest|KafkaAdminClient|NetworkClient).java"/>
+    <suppress checks="ClassFanOutComplexity"
+              files="(SaslServerAuthenticator|SaslAuthenticatorTest).java"/>
     <suppress checks="ClassFanOutComplexity"
               files="Errors.java"/>
     <suppress checks="ClassFanOutComplexity"
@@ -43,13 +45,13 @@
     <suppress checks="ClassDataAbstractionCoupling"
               files="(KafkaConsumer|ConsumerCoordinator|Fetcher|KafkaProducer|AbstractRequest|AbstractResponse|TransactionManager|KafkaAdminClient).java"/>
     <suppress checks="ClassDataAbstractionCoupling"
-              files="Errors.java"/>
+              files="(Errors|SaslAuthenticatorTest).java"/>
 
     <suppress checks="BooleanExpressionComplexity"
               files="(Utils|Topic|KafkaLZ4BlockOutputStream|AclData).java"/>
 
     <suppress checks="CyclomaticComplexity"
-              files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslTransportLayer).java"/>
+              files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslTransportLayer|SaslClientAuthenticator).java"/>
 
     <suppress checks="JavaNCSS"
               files="AbstractRequest.java|KerberosLogin.java|WorkerSinkTaskTest.java"/>

http://git-wip-us.apache.org/repos/asf/kafka/blob/8fca4322/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 4a20a3f..f046696 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -586,8 +586,12 @@ public class NetworkClient implements KafkaClient {
         connectionStates.disconnected(nodeId, now);
         apiVersions.remove(nodeId);
         nodesNeedingApiVersionsFetch.remove(nodeId);
-        switch (disconnectState) {
+        switch (disconnectState.state()) {
+            case AUTHENTICATION_FAILED:
+                log.error("Connection to node {} failed authentication due to: {}", nodeId, disconnectState.exception().getMessage());
+                break;
             case AUTHENTICATE:
+                // This warning applies to older brokers which don't provide feedback on authentication failures
                 log.warn("Connection to node {} terminated during authentication. This may indicate " +
                         "that authentication failed due to invalid credentials.", nodeId);
                 break;

http://git-wip-us.apache.org/repos/asf/kafka/blob/8fca4322/clients/src/main/java/org/apache/kafka/common/errors/AuthenticationFailedException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/AuthenticationFailedException.java b/clients/src/main/java/org/apache/kafka/common/errors/AuthenticationFailedException.java
new file mode 100644
index 0000000..3be72f0
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/AuthenticationFailedException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class AuthenticationFailedException extends AuthenticationException {
+
+    private static final long serialVersionUID = 1L;
+
+    public AuthenticationFailedException(String message) {
+        super(message);
+    }
+
+    public AuthenticationFailedException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/8fca4322/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java b/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java
index 1fe3beb..fa1123e 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.common.network;
 
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 
 import java.io.Closeable;
@@ -32,6 +33,12 @@ public interface Authenticator extends Closeable {
     void authenticate() throws IOException;
 
     /**
+     * Returns the first error encountered during authentication
+     * @return authentication error if authentication failed, Errors.NONE otherwise
+     */
+    Errors error();
+
+    /**
      * Returns Principal using PrincipalBuilder
      */
     KafkaPrincipal principal();

http://git-wip-us.apache.org/repos/asf/kafka/blob/8fca4322/clients/src/main/java/org/apache/kafka/common/network/ChannelState.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/ChannelState.java b/clients/src/main/java/org/apache/kafka/common/network/ChannelState.java
index 23e877c..4370fd8 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/ChannelState.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelState.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.common.network;
 
+import org.apache.kafka.common.errors.ApiException;
+
 /**
  * States for KafkaChannel:
  * <ul>
@@ -25,14 +27,18 @@ package org.apache.kafka.common.network;
  *       to AUTHENTICATE. Failures in NOT_CONNECTED state typically indicate that the
  *       remote endpoint is unavailable, which may be due to misconfigured endpoints.</li>
  *   <li>AUTHENTICATE: SSL, SASL_SSL and SASL_PLAINTEXT channels are in AUTHENTICATE state during SSL and
- *       SASL handshake. Disconnections in AUTHENTICATE state may indicate that SSL or SASL
- *       authentication failed. Channels transition to READY state when authentication completes
+ *       SASL handshake. Disconnections in AUTHENTICATE state may indicate that authentication failed with
+ *       SSL or SASL (broker version < 1.0.0). Channels transition to READY state when authentication completes
  *       successfully.</li>
  *   <li>READY: Connected, authenticated channels are in READY state. Channels may transition from
  *       READY to EXPIRED, FAILED_SEND or LOCAL_CLOSE.</li>
  *   <li>EXPIRED: Idle connections are moved to EXPIRED state on idle timeout and the channel is closed.</li>
  *   <li>FAILED_SEND: Channels transition from READY to FAILED_SEND state if the channel is closed due
  *       to a send failure.</li>
+ *   <li>AUTHENTICATION_FAILED: Channels are moved to this state if the requested SASL mechanism is not
+ *       enabled in the broker or when brokers with versions 1.0.0 and above provide an error response
+ *       during SASL authentication. {@link #exception()} gives the reason provided by the broker for
+ *       authentication failure.</li>
  *   <li>LOCAL_CLOSE: Channels are moved to LOCAL_CLOSE state if close() is initiated locally.</li>
  * </ul>
  * If the remote endpoint closes a channel, the state of the channel reflects the state the channel
@@ -43,14 +49,44 @@ package org.apache.kafka.common.network;
  *   <li>PLAINTEXT Good path: NOT_CONNECTED => READY => LOCAL_CLOSE</li>
  *   <li>SASL/SSL Good path: NOT_CONNECTED => AUTHENTICATE => READY => LOCAL_CLOSE</li>
  *   <li>Bootstrap server misconfiguration: NOT_CONNECTED, disconnected in NOT_CONNECTED state</li>
- *   <li>Security misconfiguration: NOT_CONNECTED => AUTHENTICATE, disconnected in AUTHENTICATE state</li>
+ *   <li>Security misconfiguration: NOT_CONNECTED => AUTHENTICATE => AUTHENTICATION_FAILED, disconnected in AUTHENTICATION_FAILED state</li>
+ *   <li>Security misconfiguration with older broker: NOT_CONNECTED => AUTHENTICATE, disconnected in AUTHENTICATE state</li>
  * </ul>
  */
-public enum ChannelState {
-    NOT_CONNECTED,
-    AUTHENTICATE,
-    READY,
-    EXPIRED,
-    FAILED_SEND,
-    LOCAL_CLOSE
+public class ChannelState {
+    public enum State {
+        NOT_CONNECTED,
+        AUTHENTICATE,
+        READY,
+        EXPIRED,
+        FAILED_SEND,
+        AUTHENTICATION_FAILED,
+        LOCAL_CLOSE
+    };
+    // AUTHENTICATION_FAILED has a custom exception. For other states,
+    // create a reusable `ChannelState` instance per-state.
+    public static final ChannelState NOT_CONNECTED = new ChannelState(State.NOT_CONNECTED);
+    public static final ChannelState AUTHENTICATE = new ChannelState(State.AUTHENTICATE);
+    public static final ChannelState READY = new ChannelState(State.READY);
+    public static final ChannelState EXPIRED = new ChannelState(State.EXPIRED);
+    public static final ChannelState FAILED_SEND = new ChannelState(State.FAILED_SEND);
+    public static final ChannelState LOCAL_CLOSE = new ChannelState(State.LOCAL_CLOSE);
+
+    private final State state;
+    private final ApiException exception;
+    public ChannelState(State state) {
+        this(state, null);
+    }
+    public ChannelState(State state, ApiException exception) {
+        this.state = state;
+        this.exception = exception;
+    }
+
+    public State state() {
+        return state;
+    }
+
+    public ApiException exception() {
+        return exception;
+    }
 }
\ No newline at end of file
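
A short usage sketch of the richer ChannelState (values are illustrative; Errors.exception(String)
is the helper used elsewhere in this patch):

    import org.apache.kafka.common.network.ChannelState;
    import org.apache.kafka.common.protocol.Errors;

    // A channel that failed SASL authentication now carries the broker-provided reason.
    ChannelState state = new ChannelState(ChannelState.State.AUTHENTICATION_FAILED,
            Errors.AUTHENTICATION_FAILED.exception("Invalid credentials"));

    if (state.state() == ChannelState.State.AUTHENTICATION_FAILED)
        System.err.println("Authentication failed: " + state.exception().getMessage());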

http://git-wip-us.apache.org/repos/asf/kafka/blob/8fca4322/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
index c759f00..68f9ed6 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.common.network;
 
-
+import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.memory.MemoryPool;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.utils.Utils;
@@ -74,8 +74,22 @@ public class KafkaChannel {
     public void prepare() throws IOException {
         if (!transportLayer.ready())
             transportLayer.handshake();
-        if (transportLayer.ready() && !authenticator.complete())
-            authenticator.authenticate();
+        if (transportLayer.ready() && !authenticator.complete()) {
+            try {
+                authenticator.authenticate();
+            } catch (AuthenticationException e) {
+                switch (authenticator.error()) {
+                    case AUTHENTICATION_FAILED:
+                    case ILLEGAL_SASL_STATE:
+                    case UNSUPPORTED_SASL_MECHANISM:
+                        state = new ChannelState(ChannelState.State.AUTHENTICATION_FAILED, e);
+                        break;
+                    default:
+                        // Other errors are handled as network exceptions in Selector
+                }
+                throw e;
+            }
+        }
         if (ready())
             state = ChannelState.READY;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8fca4322/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
index c0d1059..95fd903 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
@@ -18,6 +18,7 @@ package org.apache.kafka.common.network;
 
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.memory.MemoryPool;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
 import org.apache.kafka.common.security.auth.PlaintextAuthenticationContext;
@@ -79,6 +80,12 @@ public class PlaintextChannelBuilder implements ChannelBuilder {
         }
 
         @Override
+        public Errors error() {
+            // PLAINTEXT never fails authentication
+            return Errors.NONE;
+        }
+
+        @Override
         public void close() {
             if (principalBuilder instanceof Closeable)
                 Utils.closeQuietly((Closeable) principalBuilder, "principal builder");

http://git-wip-us.apache.org/repos/asf/kafka/blob/8fca4322/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
index 9d7eac0..d00442e 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
@@ -35,11 +35,14 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.net.Socket;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
 import java.util.List;
 import java.util.Map;
 
+import javax.security.auth.Subject;
+
 public class SaslChannelBuilder implements ChannelBuilder {
     private static final Logger log = LoggerFactory.getLogger(SaslChannelBuilder.class);
 
@@ -113,15 +116,14 @@ public class SaslChannelBuilder implements ChannelBuilder {
     public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize, MemoryPool memoryPool) throws KafkaException {
         try {
             SocketChannel socketChannel = (SocketChannel) key.channel();
+            Socket socket = socketChannel.socket();
             TransportLayer transportLayer = buildTransportLayer(id, key, socketChannel);
             Authenticator authenticator;
             if (mode == Mode.SERVER)
-                authenticator = new SaslServerAuthenticator(configs, id, jaasContext, loginManager.subject(),
-                        kerberosShortNamer, credentialCache, listenerName, securityProtocol, transportLayer);
+                authenticator = buildServerAuthenticator(configs, id, transportLayer, loginManager.subject());
             else
-                authenticator = new SaslClientAuthenticator(configs, id, loginManager.subject(), loginManager.serviceName(),
-                        socketChannel.socket().getInetAddress().getHostName(), clientSaslMechanism,
-                        handshakeRequestEnable, transportLayer);
+                authenticator = buildClientAuthenticator(configs, id, socket.getInetAddress().getHostName(),
+                        loginManager.serviceName(), transportLayer, loginManager.subject());
             return new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize, memoryPool != null ? memoryPool : MemoryPool.NONE);
         } catch (Exception e) {
             log.info("Failed to create channel due to ", e);
@@ -146,6 +148,20 @@ public class SaslChannelBuilder implements ChannelBuilder {
         }
     }
 
+    // Visible to override for testing
+    protected SaslServerAuthenticator buildServerAuthenticator(Map<String, ?> configs, String id,
+            TransportLayer transportLayer, Subject subject) throws IOException {
+        return new SaslServerAuthenticator(configs, id, jaasContext, subject,
+                kerberosShortNamer, credentialCache, listenerName, securityProtocol, transportLayer);
+    }
+
+    // Visible to override for testing
+    protected SaslClientAuthenticator buildClientAuthenticator(Map<String, ?> configs, String id,
+            String serverHost, String servicePrincipal, TransportLayer transportLayer, Subject subject) throws IOException {
+        return new SaslClientAuthenticator(configs, id, subject, servicePrincipal,
+                serverHost, clientSaslMechanism, handshakeRequestEnable, transportLayer);
+    }
+
     // Package private for testing
     LoginManager loginManager() {
         return loginManager;

http://git-wip-us.apache.org/repos/asf/kafka/blob/8fca4322/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
index b6ef625..80b9e9a 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
@@ -18,6 +18,7 @@ package org.apache.kafka.common.network;
 
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.memory.MemoryPool;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
 import org.apache.kafka.common.security.auth.SslAuthenticationContext;
@@ -158,5 +159,11 @@ public class SslChannelBuilder implements ChannelBuilder {
             return true;
         }
 
+        @Override
+        public Errors error() {
+            // SSL authentication failures are currently not propagated to clients
+            return Errors.NONE;
+        }
+
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8fca4322/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 5ac02fa..d37eddf 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -70,7 +70,8 @@ public enum ApiKeys {
     DESCRIBE_CONFIGS(32, "DescribeConfigs"),
     ALTER_CONFIGS(33, "AlterConfigs"),
     ALTER_REPLICA_DIR(34, "AlterReplicaDir"),
-    DESCRIBE_LOG_DIRS(35, "DescribeLogDirs");
+    DESCRIBE_LOG_DIRS(35, "DescribeLogDirs"),
+    SASL_AUTHENTICATE(36, "SaslAuthenticate");
 
     private static final ApiKeys[] ID_TO_TYPE;
     private static final int MIN_API_KEY = 0;

http://git-wip-us.apache.org/repos/asf/kafka/blob/8fca4322/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index bbc3486..1039ca0 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.common.protocol;
 
 import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.AuthenticationFailedException;
 import org.apache.kafka.common.errors.BrokerNotAvailableException;
 import org.apache.kafka.common.errors.ClusterAuthorizationException;
 import org.apache.kafka.common.errors.ConcurrentTransactionsException;
@@ -516,7 +517,14 @@ public enum Errors {
             public ApiException build(String message) {
                 return new LogDirNotFoundException(message);
             }
-    });
+    }),
+    AUTHENTICATION_FAILED(58, "Authentication failed.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new AuthenticationFailedException(message);
+            }
+        });
 
     private interface ApiExceptionBuilder {
         ApiException build(String message);
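
A small sketch of the new error code in use, relying only on Errors helpers referenced in
this patch (forCode, code, exception):

    import org.apache.kafka.common.errors.AuthenticationFailedException;
    import org.apache.kafka.common.protocol.Errors;

    Errors error = Errors.forCode((short) 58);                   // AUTHENTICATION_FAILED
    Exception e = error.exception("SASL credentials rejected");  // builds an AuthenticationFailedException
    assert e instanceof AuthenticationFailedException;
    assert Errors.AUTHENTICATION_FAILED.code() == 58;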

http://git-wip-us.apache.org/repos/asf/kafka/blob/8fca4322/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 10b1823..c7431d0 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -1297,8 +1297,12 @@ public class Protocol {
             new Field("error_code", INT16),
             new Field("enabled_mechanisms", new ArrayOf(Type.STRING), "Array of mechanisms enabled in the server."));
 
-    public static final Schema[] SASL_HANDSHAKE_REQUEST = {SASL_HANDSHAKE_REQUEST_V0};
-    public static final Schema[] SASL_HANDSHAKE_RESPONSE = {SASL_HANDSHAKE_RESPONSE_V0};
+    // SASL_HANDSHAKE_REQUEST_V1 added to support SASL_AUTHENTICATE request to improve diagnostics
+    public static final Schema SASL_HANDSHAKE_REQUEST_V1 = SASL_HANDSHAKE_REQUEST_V0;
+    public static final Schema SASL_HANDSHAKE_RESPONSE_V1 = SASL_HANDSHAKE_RESPONSE_V0;
+
+    public static final Schema[] SASL_HANDSHAKE_REQUEST = {SASL_HANDSHAKE_REQUEST_V0, SASL_HANDSHAKE_REQUEST_V1};
+    public static final Schema[] SASL_HANDSHAKE_RESPONSE = {SASL_HANDSHAKE_RESPONSE_V0, SASL_HANDSHAKE_RESPONSE_V1};
 
     /* ApiVersion api */
     public static final Schema API_VERSIONS_REQUEST_V0 = new Schema();
@@ -1880,6 +1884,18 @@ public class Protocol {
     public static final Schema[] DESCRIBE_LOG_DIRS_REQUEST = {DESCRIBE_LOG_DIRS_REQUEST_V0};
     public static final Schema[] DESCRIBE_LOG_DIRS_RESPONSE = {DESCRIBE_LOG_DIRS_RESPONSE_V0};
 
+    /* SASL authentication api */
+    public static final Schema SASL_AUTHENTICATE_REQUEST_V0 = new Schema(
+            new Field("sasl_auth_bytes", BYTES, "SASL authentication bytes from client as defined by the SASL mechanism."));
+
+    public static final Schema SASL_AUTHENTICATE_RESPONSE_V0 = new Schema(
+            new Field("error_code", INT16),
+            new Field("error_message", NULLABLE_STRING),
+            new Field("sasl_auth_bytes", BYTES, "SASL authentication bytes from server as defined by the SASL mechanism."));
+    public static final Schema[] SASL_AUTHENTICATE_REQUEST = {SASL_AUTHENTICATE_REQUEST_V0};
+    public static final Schema[] SASL_AUTHENTICATE_RESPONSE = {SASL_AUTHENTICATE_RESPONSE_V0};
+
+
     /* an array of all requests and responses with all schema versions; a null value in the inner array means that the
      * particular version is not supported */
     public static final Schema[][] REQUESTS = new Schema[ApiKeys.MAX_API_KEY + 1][];
@@ -1927,6 +1943,7 @@ public class Protocol {
         REQUESTS[ApiKeys.ALTER_CONFIGS.id] = ALTER_CONFIGS_REQUEST;
         REQUESTS[ApiKeys.ALTER_REPLICA_DIR.id] = ALTER_REPLICA_DIR_REQUEST;
         REQUESTS[ApiKeys.DESCRIBE_LOG_DIRS.id] = DESCRIBE_LOG_DIRS_REQUEST;
+        REQUESTS[ApiKeys.SASL_AUTHENTICATE.id] = SASL_AUTHENTICATE_REQUEST;
 
         RESPONSES[ApiKeys.PRODUCE.id] = PRODUCE_RESPONSE;
         RESPONSES[ApiKeys.FETCH.id] = FETCH_RESPONSE;
@@ -1964,6 +1981,7 @@ public class Protocol {
         RESPONSES[ApiKeys.ALTER_CONFIGS.id] = ALTER_CONFIGS_RESPONSE;
         RESPONSES[ApiKeys.ALTER_REPLICA_DIR.id] = ALTER_REPLICA_DIR_RESPONSE;
         RESPONSES[ApiKeys.DESCRIBE_LOG_DIRS.id] = DESCRIBE_LOG_DIRS_RESPONSE;
+        RESPONSES[ApiKeys.SASL_AUTHENTICATE.id] = SASL_AUTHENTICATE_RESPONSE;
 
         /* set the minimum and maximum version of each api */
         for (ApiKeys api : ApiKeys.values()) {
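
For illustration, the v0 request body can be built through the generic Struct API; the empty
token below is only a placeholder:

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.protocol.ApiKeys;
    import org.apache.kafka.common.protocol.types.Struct;

    // SASL_AUTHENTICATE v0 request body: a single bytes field holding the opaque SASL token.
    Struct body = new Struct(ApiKeys.SASL_AUTHENTICATE.requestSchema((short) 0));
    body.set("sasl_auth_bytes", ByteBuffer.wrap(new byte[0]));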

http://git-wip-us.apache.org/repos/asf/kafka/blob/8fca4322/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index f819371..34fda50 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -181,6 +181,8 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
                 return new AlterReplicaDirRequest(struct, apiVersion);
             case DESCRIBE_LOG_DIRS:
                 return new DescribeLogDirsRequest(struct, apiVersion);
+            case SASL_AUTHENTICATE:
+                return new SaslAuthenticateRequest(struct, apiVersion);
             default:
                 throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " +
                         "code should be updated to do so.", apiKey));

http://git-wip-us.apache.org/repos/asf/kafka/blob/8fca4322/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index f9ff6e8..95d1ef9 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -114,6 +114,8 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
                 return new AlterReplicaDirResponse(struct);
             case DESCRIBE_LOG_DIRS:
                 return new DescribeLogDirsResponse(struct);
+            case SASL_AUTHENTICATE:
+                return new SaslAuthenticateResponse(struct);
             default:
                 throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " +
                         "code should be updated to do so.", apiKey));

http://git-wip-us.apache.org/repos/asf/kafka/blob/8fca4322/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateRequest.java
new file mode 100644
index 0000000..f21c896
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateRequest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import java.nio.ByteBuffer;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+
+
+/**
+ * Request from SASL client containing client SASL authentication token as defined by the
+ * SASL protocol for the configured SASL mechanism.
+ * <p/>
+ * For interoperability with versions prior to Kafka 1.0.0, this request is used only with brokers
+ * of version 1.0.0 and higher that support SaslHandshake request v1. Clients connecting to older
+ * brokers will send SaslHandshake request v0 followed by SASL tokens without the Kafka request headers.
+ */
+public class SaslAuthenticateRequest extends AbstractRequest {
+
+    private static final String SASL_AUTH_BYTES_KEY_NAME = "sasl_auth_bytes";
+
+    private final ByteBuffer saslAuthBytes;
+
+    public static class Builder extends AbstractRequest.Builder<SaslAuthenticateRequest> {
+        private final ByteBuffer saslAuthBytes;
+
+        public Builder(ByteBuffer saslAuthBytes) {
+            super(ApiKeys.SASL_AUTHENTICATE);
+            this.saslAuthBytes = saslAuthBytes;
+        }
+
+        @Override
+        public SaslAuthenticateRequest build(short version) {
+            return new SaslAuthenticateRequest(saslAuthBytes, version);
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder bld = new StringBuilder();
+            bld.append("(type=SaslAuthenticateRequest)");
+            return bld.toString();
+        }
+    }
+
+    public SaslAuthenticateRequest(ByteBuffer saslAuthBytes) {
+        this(saslAuthBytes, ApiKeys.SASL_AUTHENTICATE.latestVersion());
+    }
+
+    public SaslAuthenticateRequest(ByteBuffer saslAuthBytes, short version) {
+        super(version);
+        this.saslAuthBytes = saslAuthBytes;
+    }
+
+    public SaslAuthenticateRequest(Struct struct, short version) {
+        super(version);
+        saslAuthBytes = struct.getBytes(SASL_AUTH_BYTES_KEY_NAME);
+    }
+
+    public ByteBuffer saslAuthBytes() {
+        return saslAuthBytes;
+    }
+
+    @Override
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+        short versionId = version();
+        switch (versionId) {
+            case 0:
+                return new SaslAuthenticateResponse(Errors.forException(e), e.getMessage());
+            default:
+                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+                        versionId, this.getClass().getSimpleName(), ApiKeys.SASL_AUTHENTICATE.latestVersion()));
+        }
+    }
+
+    public static SaslAuthenticateRequest parse(ByteBuffer buffer, short version) {
+        return new SaslAuthenticateRequest(ApiKeys.SASL_AUTHENTICATE.parseRequest(version, buffer), version);
+    }
+
+    @Override
+    protected Struct toStruct() {
+        Struct struct = new Struct(ApiKeys.SASL_AUTHENTICATE.requestSchema(version()));
+        struct.set(SASL_AUTH_BYTES_KEY_NAME, saslAuthBytes);
+        return struct;
+    }
+}
+
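
A sketch of the broker-side handling path for this request type (version 0 assumed; the empty
token and the error are placeholder values):

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.protocol.Errors;
    import org.apache.kafka.common.requests.SaslAuthenticateRequest;
    import org.apache.kafka.common.requests.SaslAuthenticateResponse;
    import org.apache.kafka.common.utils.Utils;

    SaslAuthenticateRequest request = new SaslAuthenticateRequest(ByteBuffer.wrap(new byte[0]), (short) 0);
    // Hand the client token to the SaslServer for evaluation.
    byte[] clientToken = Utils.readBytes(request.saslAuthBytes());
    // On failure, the standard error-response path produces a SaslAuthenticateResponse.
    SaslAuthenticateResponse errorResponse = (SaslAuthenticateResponse)
            request.getErrorResponse(0, Errors.AUTHENTICATION_FAILED.exception("Invalid credentials"));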

http://git-wip-us.apache.org/repos/asf/kafka/blob/8fca4322/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java
new file mode 100644
index 0000000..2119f21
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import java.nio.ByteBuffer;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+
+
+/**
+ * Response from SASL server containing a SASL challenge as defined by the SASL protocol
+ * for the mechanism configured for the client.
+ */
+public class SaslAuthenticateResponse extends AbstractResponse {
+
+    private static final String ERROR_CODE_KEY_NAME = "error_code";
+    private static final String ERROR_MESSAGE_KEY_NAME = "error_message";
+    private static final String SASL_AUTH_BYTES_KEY_NAME = "sasl_auth_bytes";
+
+    private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
+
+    private final ByteBuffer saslAuthBytes;
+
+    /**
+     * Possible error codes:
+     *   AUTHENTICATION_FAILED(58) : Authentication failed
+     */
+    private final Errors error;
+    private final String errorMessage;
+
+    public SaslAuthenticateResponse(Errors error, String errorMessage) {
+        this(error, errorMessage, EMPTY_BUFFER);
+    }
+
+    public SaslAuthenticateResponse(Errors error, String errorMessage, ByteBuffer saslAuthBytes) {
+        this.error = error;
+        this.errorMessage = errorMessage;
+        this.saslAuthBytes = saslAuthBytes;
+    }
+
+    public SaslAuthenticateResponse(Struct struct) {
+        error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
+        errorMessage = struct.getString(ERROR_MESSAGE_KEY_NAME);
+        saslAuthBytes = struct.getBytes(SASL_AUTH_BYTES_KEY_NAME);
+    }
+
+    public Errors error() {
+        return error;
+    }
+
+    public String errorMessage() {
+        return errorMessage;
+    }
+
+    public ByteBuffer saslAuthBytes() {
+        return saslAuthBytes;
+    }
+
+    @Override
+    public Struct toStruct(short version) {
+        Struct struct = new Struct(ApiKeys.SASL_AUTHENTICATE.responseSchema(version));
+        struct.set(ERROR_CODE_KEY_NAME, error.code());
+        struct.set(ERROR_MESSAGE_KEY_NAME, errorMessage);
+        struct.set(SASL_AUTH_BYTES_KEY_NAME, saslAuthBytes);
+        return struct;
+    }
+
+    public static SaslAuthenticateResponse parse(ByteBuffer buffer, short version) {
+        return new SaslAuthenticateResponse(ApiKeys.SASL_AUTHENTICATE.parseResponse(version, buffer));
+    }
+}
+
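
A sketch of the two response shapes a broker can produce with this class (messages and token
are placeholders):

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.protocol.Errors;
    import org.apache.kafka.common.requests.SaslAuthenticateResponse;

    // Success: no error, and an opaque (possibly empty) server token for the client's SaslClient.
    SaslAuthenticateResponse ok =
            new SaslAuthenticateResponse(Errors.NONE, null, ByteBuffer.wrap(new byte[0]));

    // Failure: an error code plus a human-readable reason the client can log.
    SaslAuthenticateResponse failed =
            new SaslAuthenticateResponse(Errors.AUTHENTICATION_FAILED, "Invalid SASL credentials");

    assert failed.error() == Errors.AUTHENTICATION_FAILED;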

http://git-wip-us.apache.org/repos/asf/kafka/blob/8fca4322/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java
index e49b727..9906d13 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java
@@ -50,7 +50,7 @@ public class SaslHandshakeRequest extends AbstractRequest {
 
         @Override
         public SaslHandshakeRequest build(short version) {
-            return new SaslHandshakeRequest(mechanism);
+            return new SaslHandshakeRequest(mechanism, version);
         }
 
         @Override
@@ -64,7 +64,11 @@ public class SaslHandshakeRequest extends AbstractRequest {
     }
 
     public SaslHandshakeRequest(String mechanism) {
-        super(ApiKeys.SASL_HANDSHAKE.latestVersion());
+        this(mechanism, ApiKeys.SASL_HANDSHAKE.latestVersion());
+    }
+
+    public SaslHandshakeRequest(String mechanism, short version) {
+        super(version);
         this.mechanism = mechanism;
     }
 
@@ -82,6 +86,7 @@ public class SaslHandshakeRequest extends AbstractRequest {
         short versionId = version();
         switch (versionId) {
             case 0:
+            case 1:
                 List<String> enabledMechanisms = Collections.emptyList();
                 return new SaslHandshakeResponse(Errors.forException(e), enabledMechanisms);
             default:
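
With the version now plumbed through the builder, a client can pin the handshake to the
negotiated version; a brief sketch (the mechanism name is an example):

    import org.apache.kafka.common.requests.SaslHandshakeRequest;

    // v1 indicates that subsequent SASL tokens will be wrapped in SaslAuthenticate requests;
    // v0 keeps the pre-1.0.0 behaviour of sending raw tokens after the handshake.
    short negotiatedVersion = 1;
    SaslHandshakeRequest handshake =
            new SaslHandshakeRequest.Builder("PLAIN").build(negotiatedVersion);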

http://git-wip-us.apache.org/repos/asf/kafka/blob/8fca4322/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
index 7b68abe..7cbb756 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
@@ -33,11 +33,17 @@ import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.SchemaException;
 import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.ApiVersionsRequest;
+import org.apache.kafka.common.requests.ApiVersionsResponse;
+import org.apache.kafka.common.requests.ApiVersionsResponse.ApiVersion;
 import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.requests.SaslAuthenticateRequest;
+import org.apache.kafka.common.requests.SaslAuthenticateResponse;
 import org.apache.kafka.common.requests.SaslHandshakeRequest;
 import org.apache.kafka.common.requests.SaslHandshakeResponse;
 import org.apache.kafka.common.security.auth.AuthCallbackHandler;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -59,10 +65,19 @@ import java.util.Set;
 public class SaslClientAuthenticator implements Authenticator {
 
     public enum SaslState {
-        SEND_HANDSHAKE_REQUEST, RECEIVE_HANDSHAKE_RESPONSE, INITIAL, INTERMEDIATE, COMPLETE, FAILED
+        SEND_APIVERSIONS_REQUEST,     // Initial state: client sends ApiVersionsRequest in this state
+        RECEIVE_APIVERSIONS_RESPONSE, // Awaiting ApiVersionsResponse from server
+        SEND_HANDSHAKE_REQUEST,       // Received ApiVersionsResponse, send SaslHandshake request
+        RECEIVE_HANDSHAKE_RESPONSE,   // Awaiting SaslHandshake response from server
+        INITIAL,                      // Initial state starting SASL token exchange for configured mechanism, send first token
+        INTERMEDIATE,                 // Intermediate state during SASL token exchange, process challenges and send responses
+        CLIENT_COMPLETE,              // Sent response to last challenge. If using SaslAuthenticate, wait for authentication status from server, else COMPLETE
+        COMPLETE,                     // Authentication sequence complete. If using SaslAuthenticate, this state implies successful authentication.
+        FAILED                        // Failed authentication due to an error at some stage
     }
 
     private static final Logger LOG = LoggerFactory.getLogger(SaslClientAuthenticator.class);
+    private static final short DISABLE_KAFKA_SASL_AUTHENTICATE_HEADER = -1;
 
     private final Subject subject;
     private final String servicePrincipal;
@@ -87,6 +102,10 @@ public class SaslClientAuthenticator implements Authenticator {
     private int correlationId;
     // Request header for which response from the server is pending
     private RequestHeader currentRequestHeader;
+    // Version of SaslAuthenticate request/responses
+    private short saslAuthenticateVersion;
+    // Sasl authentication error which may be one of NONE, UNSUPPORTED_SASL_MECHANISM, ILLEGAL_SASL_STATE, AUTHENTICATION_FAILED or NETWORK_EXCEPTION
+    private Errors error;
 
     public SaslClientAuthenticator(Map<String, ?> configs,
                                    String node,
@@ -104,9 +123,11 @@ public class SaslClientAuthenticator implements Authenticator {
         this.correlationId = -1;
         this.transportLayer = transportLayer;
         this.configs = configs;
+        this.saslAuthenticateVersion = DISABLE_KAFKA_SASL_AUTHENTICATE_HEADER;
+        this.error = Errors.NONE;
 
         try {
-            setSaslState(handshakeRequestEnable ? SaslState.SEND_HANDSHAKE_REQUEST : SaslState.INITIAL);
+            setSaslState(handshakeRequestEnable ? SaslState.SEND_APIVERSIONS_REQUEST : SaslState.INITIAL);
 
             // determine client principal from subject for Kerberos to use as authorization id for the SaslClient.
             // For other mechanisms, the authenticated principal (username for PLAIN and SCRAM) is used as
@@ -148,57 +169,93 @@ public class SaslClientAuthenticator implements Authenticator {
      * followed by N bytes representing the opaque payload.
      */
     public void authenticate() throws IOException {
+        short saslHandshakeVersion = 0;
         if (netOutBuffer != null && !flushNetOutBufferAndUpdateInterestOps())
             return;
 
         switch (saslState) {
+            case SEND_APIVERSIONS_REQUEST:
+                // Always use version 0 request since brokers treat requests with schema exceptions as GSSAPI tokens
+                ApiVersionsRequest apiVersionsRequest = new ApiVersionsRequest((short) 0);
+                send(apiVersionsRequest.toSend(node, nextRequestHeader(ApiKeys.API_VERSIONS, apiVersionsRequest.version())));
+                setSaslState(SaslState.RECEIVE_APIVERSIONS_RESPONSE);
+                break;
+            case RECEIVE_APIVERSIONS_RESPONSE:
+                ApiVersionsResponse apiVersionsResponse = (ApiVersionsResponse) receiveKafkaResponse();
+                if (apiVersionsResponse == null)
+                    break;
+                else {
+                    saslHandshakeVersion = apiVersionsResponse.apiVersion(ApiKeys.SASL_HANDSHAKE.id).maxVersion;
+                    ApiVersion authenticateVersion = apiVersionsResponse.apiVersion(ApiKeys.SASL_AUTHENTICATE.id);
+                    if (authenticateVersion != null)
+                        saslAuthenticateVersion((short) Math.min(authenticateVersion.maxVersion, ApiKeys.SASL_AUTHENTICATE.latestVersion()));
+                    setSaslState(SaslState.SEND_HANDSHAKE_REQUEST);
+                    // Fall through to send handshake request with the latest supported version
+                }
             case SEND_HANDSHAKE_REQUEST:
-                // When multiple versions of SASL_HANDSHAKE_REQUEST are to be supported,
-                // API_VERSIONS_REQUEST must be sent prior to sending SASL_HANDSHAKE_REQUEST to
-                // fetch supported versions.
-                String clientId = (String) configs.get(CommonClientConfigs.CLIENT_ID_CONFIG);
-                SaslHandshakeRequest handshakeRequest = new SaslHandshakeRequest(mechanism);
-                currentRequestHeader = new RequestHeader(ApiKeys.SASL_HANDSHAKE,
-                        handshakeRequest.version(), clientId, correlationId++);
-                send(handshakeRequest.toSend(node, currentRequestHeader));
+                SaslHandshakeRequest handshakeRequest = createSaslHandshakeRequest(saslHandshakeVersion);
+                send(handshakeRequest.toSend(node, nextRequestHeader(ApiKeys.SASL_HANDSHAKE, handshakeRequest.version())));
                 setSaslState(SaslState.RECEIVE_HANDSHAKE_RESPONSE);
                 break;
             case RECEIVE_HANDSHAKE_RESPONSE:
-                byte[] responseBytes = receiveResponseOrToken();
-                if (responseBytes == null)
+                SaslHandshakeResponse handshakeResponse = (SaslHandshakeResponse) receiveKafkaResponse();
+                if (handshakeResponse == null)
                     break;
                 else {
-                    try {
-                        handleKafkaResponse(currentRequestHeader, responseBytes);
-                        currentRequestHeader = null;
-                    } catch (Exception e) {
-                        setSaslState(SaslState.FAILED);
-                        throw e;
-                    }
+                    handleSaslHandshakeResponse(handshakeResponse);
                     setSaslState(SaslState.INITIAL);
                     // Fall through and start SASL authentication using the configured client mechanism
                 }
             case INITIAL:
-                sendSaslToken(new byte[0], true);
+                sendSaslClientToken(new byte[0], true);
                 setSaslState(SaslState.INTERMEDIATE);
                 break;
             case INTERMEDIATE:
-                byte[] serverToken = receiveResponseOrToken();
-                if (serverToken != null) {
-                    sendSaslToken(serverToken, false);
-                }
+                byte[] serverToken = receiveToken();
+                boolean noResponsesPending = serverToken != null && !sendSaslClientToken(serverToken, false);
+                // For versions without SASL_AUTHENTICATE header, SASL exchange may be complete after a token is sent to server.
+                // For versions with SASL_AUTHENTICATE header, server always sends a response to each SASL_AUTHENTICATE request.
                 if (saslClient.isComplete()) {
-                    setSaslState(SaslState.COMPLETE);
-                    transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
+                    if (saslAuthenticateVersion == DISABLE_KAFKA_SASL_AUTHENTICATE_HEADER || noResponsesPending)
+                        setSaslState(SaslState.COMPLETE);
+                    else
+                        setSaslState(SaslState.CLIENT_COMPLETE);
                 }
                 break;
+            case CLIENT_COMPLETE:
+                byte[] serverResponse = receiveToken();
+                if (serverResponse != null)
+                    setSaslState(SaslState.COMPLETE);
+                break;
             case COMPLETE:
                 break;
             case FAILED:
-                throw new IOException("SASL handshake failed");
+                // Should never get here since exception would have been propagated earlier
+                throw new IllegalStateException("SASL handshake has already failed");
         }
     }
 
+    @Override
+    public Errors error() {
+        return error;
+    }
+
+    private RequestHeader nextRequestHeader(ApiKeys apiKey, short version) {
+        String clientId = (String) configs.get(CommonClientConfigs.CLIENT_ID_CONFIG);
+        currentRequestHeader = new RequestHeader(apiKey, version, clientId, correlationId++);
+        return currentRequestHeader;
+    }
+
+    // Visible to override for testing
+    protected SaslHandshakeRequest createSaslHandshakeRequest(short version) {
+        return new SaslHandshakeRequest.Builder(mechanism).build(version);
+    }
+
+    // Visible to override for testing
+    protected void saslAuthenticateVersion(short version) {
+        this.saslAuthenticateVersion = version;
+    }
+
     private void setSaslState(SaslState saslState) {
         if (netOutBuffer != null && !netOutBuffer.completed())
             pendingSaslState = saslState;
@@ -206,15 +263,30 @@ public class SaslClientAuthenticator implements Authenticator {
             this.pendingSaslState = null;
             this.saslState = saslState;
             LOG.debug("Set SASL client state to {}", saslState);
+            if (saslState == SaslState.COMPLETE)
+                transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
         }
     }
 
-    private void sendSaslToken(byte[] serverToken, boolean isInitial) throws IOException {
+    /**
+     * Sends a SASL client token to server if required. This may be an initial token to start
+     * SASL token exchange or response to a challenge from the server.
+     * @return true if a token was sent to the server
+     */
+    private boolean sendSaslClientToken(byte[] serverToken, boolean isInitial) throws IOException {
         if (!saslClient.isComplete()) {
             byte[] saslToken = createSaslToken(serverToken, isInitial);
-            if (saslToken != null)
-                send(new NetworkSend(node, ByteBuffer.wrap(saslToken)));
+            if (saslToken != null) {
+                ByteBuffer tokenBuf = ByteBuffer.wrap(saslToken);
+                if (saslAuthenticateVersion != DISABLE_KAFKA_SASL_AUTHENTICATE_HEADER) {
+                    SaslAuthenticateRequest request = new SaslAuthenticateRequest.Builder(tokenBuf).build(saslAuthenticateVersion);
+                    tokenBuf = request.serialize(nextRequestHeader(ApiKeys.SASL_AUTHENTICATE, saslAuthenticateVersion));
+                }
+                send(new NetworkSend(node, tokenBuf));
+                return true;
+            }
         }
+        return false;
     }
 
     private void send(Send send) throws IOException {
@@ -266,6 +338,25 @@ public class SaslClientAuthenticator implements Authenticator {
             callbackHandler.close();
     }
 
+    private byte[] receiveToken() throws IOException {
+        if (saslAuthenticateVersion == DISABLE_KAFKA_SASL_AUTHENTICATE_HEADER) {
+            return receiveResponseOrToken();
+        } else {
+            SaslAuthenticateResponse response = (SaslAuthenticateResponse) receiveKafkaResponse();
+            if (response != null) {
+                this.error = response.error();
+                if (this.error != Errors.NONE) {
+                    setSaslState(SaslState.FAILED);
+                    String errMsg = response.errorMessage();
+                    throw errMsg == null ? error.exception() : error.exception(errMsg);
+                }
+                return Utils.readBytes(response.saslAuthBytes());
+            } else
+                return null;
+        }
+    }
+
+
     private byte[] createSaslToken(final byte[] saslToken, boolean isInitial) throws SaslException {
         if (saslToken == null)
             throw new SaslException("Error authenticating with the Kafka Broker: received a `null` saslToken.");
@@ -305,25 +396,27 @@ public class SaslClientAuthenticator implements Authenticator {
         return netOutBuffer.completed();
     }
 
-    private void handleKafkaResponse(RequestHeader requestHeader, byte[] responseBytes) {
-        AbstractResponse response;
+    private AbstractResponse receiveKafkaResponse() throws IOException {
         try {
-            response = NetworkClient.parseResponse(ByteBuffer.wrap(responseBytes), requestHeader);
+            byte[] responseBytes = receiveResponseOrToken();
+            if (responseBytes == null)
+                return null;
+            else {
+                AbstractResponse response = NetworkClient.parseResponse(ByteBuffer.wrap(responseBytes), currentRequestHeader);
+                currentRequestHeader = null;
+                return response;
+            }
         } catch (SchemaException | IllegalArgumentException e) {
             LOG.debug("Invalid SASL mechanism response, server may be expecting only GSSAPI tokens");
+            setSaslState(SaslState.FAILED);
             throw new AuthenticationException("Invalid SASL mechanism response", e);
         }
-        switch (requestHeader.apiKey()) {
-            case SASL_HANDSHAKE:
-                handleSaslHandshakeResponse((SaslHandshakeResponse) response);
-                break;
-            default:
-                throw new IllegalStateException("Unexpected API key during handshake: " + requestHeader.apiKey());
-        }
     }
 
     private void handleSaslHandshakeResponse(SaslHandshakeResponse response) {
-        Errors error = response.error();
+        this.error = response.error();
+        if (error != Errors.NONE)
+            setSaslState(SaslState.FAILED);
         switch (error) {
             case NONE:
                 break;
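
Restated for clarity, the version negotiation performed in RECEIVE_APIVERSIONS_RESPONSE
(identifiers as in the diff above); when the broker does not advertise SASL_AUTHENTICATE,
the client falls back to sending unframed SASL tokens:

    ApiVersion authenticateVersion = apiVersionsResponse.apiVersion(ApiKeys.SASL_AUTHENTICATE.id);
    short saslAuthenticateVersion = (authenticateVersion == null)
            ? DISABLE_KAFKA_SASL_AUTHENTICATE_HEADER  // -1: pre-1.0.0 broker, send raw SASL tokens
            : (short) Math.min(authenticateVersion.maxVersion, ApiKeys.SASL_AUTHENTICATE.latestVersion());
    short saslHandshakeVersion = apiVersionsResponse.apiVersion(ApiKeys.SASL_HANDSHAKE.id).maxVersion;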

http://git-wip-us.apache.org/repos/asf/kafka/blob/8fca4322/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index 28aa995..ce6a6b0 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.IllegalSaslStateException;
 import org.apache.kafka.common.errors.InvalidRequestException;
 import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.network.Authenticator;
 import org.apache.kafka.common.network.ChannelBuilders;
 import org.apache.kafka.common.network.ListenerName;
@@ -34,12 +35,15 @@ import org.apache.kafka.common.network.TransportLayer;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.kafka.common.protocol.Protocol;
 import org.apache.kafka.common.requests.AbstractResponse;
 import org.apache.kafka.common.requests.ApiVersionsRequest;
 import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.apache.kafka.common.requests.RequestAndSize;
 import org.apache.kafka.common.requests.RequestContext;
 import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.requests.SaslAuthenticateRequest;
+import org.apache.kafka.common.requests.SaslAuthenticateResponse;
 import org.apache.kafka.common.requests.SaslHandshakeRequest;
 import org.apache.kafka.common.requests.SaslHandshakeResponse;
 import org.apache.kafka.common.security.JaasContext;
@@ -83,9 +87,15 @@ public class SaslServerAuthenticator implements Authenticator {
     // GSSAPI limits requests to 64K, but we allow a bit extra for custom SASL mechanisms
     static final int MAX_RECEIVE_SIZE = 524288;
     private static final Logger LOG = LoggerFactory.getLogger(SaslServerAuthenticator.class);
+    private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
 
     private enum SaslState {
-        GSSAPI_OR_HANDSHAKE_REQUEST, HANDSHAKE_REQUEST, AUTHENTICATE, COMPLETE, FAILED
+        INITIAL_REQUEST,               // May be GSSAPI token, SaslHandshake or ApiVersions
+        HANDSHAKE_OR_VERSIONS_REQUEST, // May be SaslHandshake or ApiVersions
+        HANDSHAKE_REQUEST,             // After an ApiVersions request, next request must be SaslHandshake
+        AUTHENTICATE,                  // Authentication tokens (SaslHandshake v1 and above indicate SaslAuthenticate headers)
+        COMPLETE,                      // Authentication completed successfully
+        FAILED                         // Authentication failed
     }
 
     private final SecurityProtocol securityProtocol;
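
The expanded SaslState enum above captures the server-side protocol: ApiVersions and SaslHandshake requests are only legal before authentication tokens start flowing, and a v1+ handshake switches the token exchange to SaslAuthenticate framing. The sketch below walks those states for a single-round mechanism such as PLAIN; it is a simplified stand-in for the real transitions, which are driven by handleKafkaRequest and handleSaslToken in the hunks that follow.

// Illustrative sketch only: approximate server-side state progression for a
// single-round mechanism such as PLAIN. The real transitions are driven by the
// parsed requests in handleKafkaRequest/handleSaslToken, not by strings.
public class SaslServerStateSketch {

    enum State { INITIAL_REQUEST, HANDSHAKE_OR_VERSIONS_REQUEST, HANDSHAKE_REQUEST, AUTHENTICATE, COMPLETE, FAILED }

    static State onRequest(State state, String request) {
        switch (state) {
            case INITIAL_REQUEST:
                if (request.equals("GSSAPI_TOKEN"))    // 0.9.0.x client: raw token, no handshake at all
                    return State.AUTHENTICATE;
                // otherwise it must be a Kafka request; fall through
            case HANDSHAKE_OR_VERSIONS_REQUEST:
                if (request.equals("API_VERSIONS"))    // allowed at most once, before the handshake
                    return State.HANDSHAKE_REQUEST;
                if (request.equals("SASL_HANDSHAKE"))  // v1+ also turns on SaslAuthenticate framing
                    return State.AUTHENTICATE;
                return State.FAILED;
            case HANDSHAKE_REQUEST:
                return request.equals("SASL_HANDSHAKE") ? State.AUTHENTICATE : State.FAILED;
            case AUTHENTICATE:
                // One SaslAuthenticate round completes PLAIN; SCRAM and GSSAPI need several.
                return request.equals("SASL_AUTHENTICATE") ? State.COMPLETE : State.FAILED;
            default:
                return state;                          // COMPLETE and FAILED are terminal
        }
    }

    public static void main(String[] args) {
        State state = State.INITIAL_REQUEST;
        for (String request : new String[] {"API_VERSIONS", "SASL_HANDSHAKE", "SASL_AUTHENTICATE"})
            System.out.println(request + " -> " + (state = onRequest(state, request)));
    }
}
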
@@ -100,9 +110,11 @@ public class SaslServerAuthenticator implements Authenticator {
     private final KafkaPrincipalBuilder principalBuilder;
 
     // Current SASL state
-    private SaslState saslState = SaslState.GSSAPI_OR_HANDSHAKE_REQUEST;
+    private SaslState saslState = SaslState.INITIAL_REQUEST;
     // Next SASL state to be set when outgoing writes associated with the current SASL state complete
     private SaslState pendingSaslState = null;
+    // Exception that will be thrown by `authenticate()` when SaslState is set to FAILED after outbound writes complete
+    private IOException pendingException = null;
     private SaslServer saslServer;
     private String saslMechanism;
     private AuthCallbackHandler callbackHandler;
@@ -110,6 +122,10 @@ public class SaslServerAuthenticator implements Authenticator {
     // buffers used in `authenticate`
     private NetworkReceive netInBuffer;
     private Send netOutBuffer;
+    // flag indicating if sasl tokens are sent as Kafka SaslAuthenticate request/responses
+    private boolean enableKafkaSaslAuthenticateHeaders;
+    // authentication error if authentication failed
+    private Errors error;
 
     public SaslServerAuthenticator(Map<String, ?> configs,
                                    String connectionId,
@@ -128,6 +144,9 @@ public class SaslServerAuthenticator implements Authenticator {
         this.credentialCache = credentialCache;
         this.listenerName = listenerName;
         this.securityProtocol = securityProtocol;
+        this.enableKafkaSaslAuthenticateHeaders = false;
+        this.error = Errors.NONE;
+
         this.transportLayer = transportLayer;
 
         this.configs = configs;
@@ -238,20 +257,17 @@ public class SaslServerAuthenticator implements Authenticator {
             netInBuffer = null; // reset the networkReceive as we read all the data.
             try {
                 switch (saslState) {
+                    case HANDSHAKE_OR_VERSIONS_REQUEST:
                     case HANDSHAKE_REQUEST:
                         handleKafkaRequest(clientToken);
                         break;
-                    case GSSAPI_OR_HANDSHAKE_REQUEST:
+                    case INITIAL_REQUEST:
                         if (handleKafkaRequest(clientToken))
                             break;
                         // For default GSSAPI, fall through to authenticate using the client token as the first GSSAPI packet.
                         // This is required for interoperability with 0.9.0.x clients which do not send handshake request
                     case AUTHENTICATE:
-                        byte[] response = saslServer.evaluateResponse(clientToken);
-                        if (response != null) {
-                            netOutBuffer = new NetworkSend(connectionId, ByteBuffer.wrap(response));
-                            flushNetOutBufferAndUpdateInterestOps();
-                        }
+                        handleSaslToken(clientToken);
                         // When the authentication exchange is complete and no more tokens are expected from the client,
                         // update SASL state. Current SASL state will be updated when outgoing writes to the client complete.
                         if (saslServer.isComplete())
@@ -261,8 +277,7 @@ public class SaslServerAuthenticator implements Authenticator {
                         break;
                 }
             } catch (Exception e) {
-                setSaslState(SaslState.FAILED);
-                throw new IOException(e);
+                setSaslState(SaslState.FAILED, new IOException(e));
             }
         }
     }
@@ -274,6 +289,11 @@ public class SaslServerAuthenticator implements Authenticator {
     }
 
     @Override
+    public Errors error() {
+        return error;
+    }
+
+    @Override
     public boolean complete() {
         return saslState == SaslState.COMPLETE;
     }
@@ -288,13 +308,21 @@ public class SaslServerAuthenticator implements Authenticator {
             callbackHandler.close();
     }
 
-    private void setSaslState(SaslState saslState) {
-        if (netOutBuffer != null && !netOutBuffer.completed())
+    private void setSaslState(SaslState saslState) throws IOException {
+        setSaslState(saslState, null);
+    }
+
+    private void setSaslState(SaslState saslState, IOException exception) throws IOException {
+        if (netOutBuffer != null && !netOutBuffer.completed()) {
             pendingSaslState = saslState;
-        else {
-            this.pendingSaslState = null;
+            pendingException = exception;
+        } else {
             this.saslState = saslState;
             LOG.debug("Set SASL server state to {}", saslState);
+            this.pendingSaslState = null;
+            this.pendingException = null;
+            if (exception != null)
+                throw exception;
         }
     }
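
The reworked setSaslState() above takes an optional exception so that a failure detected while an error response is still being written is not raised immediately: it is parked next to pendingSaslState and only thrown once the outgoing buffer drains, which is what lets the client read the SaslAuthenticateResponse describing the failure before the connection closes. A minimal sketch of that pattern, with illustrative names:

import java.io.IOException;

// Illustrative sketch only: the "defer the failure until the response has been
// flushed" pattern. Field and method names here are stand-ins, not the real ones.
public class DeferredFailureSketch {

    private boolean outBufferDrained = false;
    private IOException pendingException;

    void fail(IOException cause) throws IOException {
        if (!outBufferDrained)
            pendingException = cause;                 // error response still in flight; hold the exception
        else
            throw cause;                              // nothing queued, fail immediately
    }

    void onWriteCompleted() throws IOException {
        outBufferDrained = true;
        if (pendingException != null) {
            IOException e = pendingException;
            pendingException = null;
            throw e;                                  // the client has now seen the error response
        }
    }

    public static void main(String[] args) {
        DeferredFailureSketch sketch = new DeferredFailureSketch();
        try {
            sketch.fail(new IOException("Authentication failed: invalid credentials"));
            System.out.println("failure deferred while the error response is flushing");
            sketch.onWriteCompleted();
        } catch (IOException e) {
            System.out.println("surfaced after flush: " + e.getMessage());
        }
    }
}
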
 
@@ -303,7 +331,7 @@ public class SaslServerAuthenticator implements Authenticator {
         if (flushedCompletely) {
             transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
             if (pendingSaslState != null)
-                setSaslState(pendingSaslState);
+                setSaslState(pendingSaslState, pendingException);
         } else
             transportLayer.addInterestOps(SelectionKey.OP_WRITE);
         return flushedCompletely;
@@ -323,6 +351,49 @@ public class SaslServerAuthenticator implements Authenticator {
         return transportLayer.socketChannel().socket().getInetAddress();
     }
 
+    private void handleSaslToken(byte[] clientToken) throws IOException {
+        if (!enableKafkaSaslAuthenticateHeaders) {
+            byte[] response = saslServer.evaluateResponse(clientToken);
+            if (response != null) {
+                netOutBuffer = new NetworkSend(connectionId, ByteBuffer.wrap(response));
+                flushNetOutBufferAndUpdateInterestOps();
+            }
+        } else {
+            ByteBuffer requestBuffer = ByteBuffer.wrap(clientToken);
+            RequestHeader header = RequestHeader.parse(requestBuffer);
+            ApiKeys apiKey = header.apiKey();
+            short version = header.apiVersion();
+            RequestContext requestContext = new RequestContext(header, connectionId, clientAddress(),
+                    KafkaPrincipal.ANONYMOUS, listenerName, securityProtocol);
+            RequestAndSize requestAndSize = requestContext.parseRequest(requestBuffer);
+            if (apiKey != ApiKeys.SASL_AUTHENTICATE) {
+                this.error = Errors.ILLEGAL_SASL_STATE;
+                IllegalSaslStateException e = new IllegalSaslStateException("Unexpected Kafka request of type " + apiKey + " during SASL authentication.");
+                sendKafkaResponse(requestContext, requestAndSize.request.getErrorResponse(e));
+                throw e;
+            }
+            if (!Protocol.apiVersionSupported(apiKey.id, version)) {
+                this.error = Errors.UNSUPPORTED_VERSION;
+                // We cannot create an error response if the request version of SaslAuthenticate is not supported
+                // This should not normally occur since clients typically check supported versions using ApiVersionsRequest
+                throw new UnsupportedVersionException("Version " + version + " is not supported for apiKey " + apiKey);
+            }
+            SaslAuthenticateRequest saslAuthenticateRequest = (SaslAuthenticateRequest) requestAndSize.request;
+
+            try {
+                byte[] responseToken = saslServer.evaluateResponse(Utils.readBytes(saslAuthenticateRequest.saslAuthBytes()));
+                // For versions with SASL_AUTHENTICATE header, send a response to SASL_AUTHENTICATE request even if token is empty.
+                ByteBuffer responseBuf = responseToken == null ? EMPTY_BUFFER : ByteBuffer.wrap(responseToken);
+                sendKafkaResponse(requestContext, new SaslAuthenticateResponse(Errors.NONE, null, responseBuf));
+            } catch (SaslException e) {
+                this.error = Errors.AUTHENTICATION_FAILED;
+                sendKafkaResponse(requestContext, new SaslAuthenticateResponse(this.error,
+                        "Authentication failed due to invalid credentials with SASL mechanism " + saslMechanism));
+                throw e;
+            }
+        }
+    }
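
With SaslAuthenticate framing enabled, handleSaslToken() above applies two gates before passing the token to the SaslServer: the request must actually be a SASL_AUTHENTICATE request, and its version must be one the broker supports; anything else is answered with ILLEGAL_SASL_STATE or rejected as UNSUPPORTED_VERSION. A tiny sketch of those checks against a simplified request header follows; the constants and parsing are illustrative stand-ins.

import java.nio.ByteBuffer;

// Illustrative sketch only: the two checks applied to an incoming frame once
// SaslAuthenticate framing is enabled. Constants and header parsing are stand-ins.
public class SaslAuthenticateGateSketch {

    static final short SASL_AUTHENTICATE_KEY = 36;    // ApiKeys.SASL_AUTHENTICATE
    static final short MAX_SUPPORTED_VERSION = 0;     // only v0 exists at the time of this patch

    static String check(ByteBuffer frame) {
        short apiKey = frame.getShort();
        short version = frame.getShort();
        if (apiKey != SASL_AUTHENTICATE_KEY)
            return "ILLEGAL_SASL_STATE: unexpected request " + apiKey + " during SASL authentication";
        if (version < 0 || version > MAX_SUPPORTED_VERSION)
            return "UNSUPPORTED_VERSION: version " + version + " of SaslAuthenticate is not supported";
        return "OK: hand sasl_auth_bytes to SaslServer.evaluateResponse()";
    }

    public static void main(String[] args) {
        System.out.println(check(ByteBuffer.wrap(new byte[] {0, 36, 0, 0})));   // well-formed SaslAuthenticate v0
        System.out.println(check(ByteBuffer.wrap(new byte[] {0, 17, 0, 0})));   // a handshake in the middle of authentication
        System.out.println(check(ByteBuffer.wrap(new byte[] {0, 36, 0, 9})));   // a version this broker does not know
    }
}
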
+
     private boolean handleKafkaRequest(byte[] requestBytes) throws IOException, AuthenticationException {
         boolean isKafkaRequest = false;
         String clientMechanism = null;
@@ -333,7 +404,8 @@ public class SaslServerAuthenticator implements Authenticator {
 
             // A valid Kafka request header was received. SASL authentication tokens are now expected only
             // following a SaslHandshakeRequest since this is not a GSSAPI client token from a Kafka 0.9.0.x client.
-            setSaslState(SaslState.HANDSHAKE_REQUEST);
+            if (saslState == SaslState.INITIAL_REQUEST)
+                setSaslState(SaslState.HANDSHAKE_OR_VERSIONS_REQUEST);
             isKafkaRequest = true;
 
             // Raise an error prior to parsing if the api cannot be handled at this layer. This avoids
@@ -352,7 +424,7 @@ public class SaslServerAuthenticator implements Authenticator {
             else
                 clientMechanism = handleHandshakeRequest(requestContext, (SaslHandshakeRequest) requestAndSize.request);
         } catch (InvalidRequestException e) {
-            if (saslState == SaslState.GSSAPI_OR_HANDSHAKE_REQUEST) {
+            if (saslState == SaslState.INITIAL_REQUEST) {
                 // InvalidRequestException is thrown if the request is not in Kafka format or if the API key
                 // is invalid. For compatibility with 0.9.0.x where the first packet is a GSSAPI token
                 // starting with 0x60, revert to GSSAPI for both these exceptions.
@@ -382,22 +454,41 @@ public class SaslServerAuthenticator implements Authenticator {
 
     private String handleHandshakeRequest(RequestContext context, SaslHandshakeRequest handshakeRequest) throws IOException, UnsupportedSaslMechanismException {
         String clientMechanism = handshakeRequest.mechanism();
+        short version = context.header.apiVersion();
+        if (version >= 1)
+            this.enableKafkaSaslAuthenticateHeaders(true);
         if (enabledMechanisms.contains(clientMechanism)) {
             LOG.debug("Using SASL mechanism '{}' provided by client", clientMechanism);
             sendKafkaResponse(context, new SaslHandshakeResponse(Errors.NONE, enabledMechanisms));
             return clientMechanism;
         } else {
             LOG.debug("SASL mechanism '{}' requested by client is not supported", clientMechanism);
-            sendKafkaResponse(context, new SaslHandshakeResponse(Errors.UNSUPPORTED_SASL_MECHANISM, enabledMechanisms));
+            this.error = Errors.UNSUPPORTED_SASL_MECHANISM;
+            sendKafkaResponse(context, new SaslHandshakeResponse(this.error, enabledMechanisms));
             throw new UnsupportedSaslMechanismException("Unsupported SASL mechanism " + clientMechanism);
         }
     }
 
-    private void handleApiVersionsRequest(RequestContext context, ApiVersionsRequest apiVersionsRequest) throws IOException, UnsupportedSaslMechanismException {
+    // Visible to override for testing
+    protected ApiVersionsResponse apiVersionsResponse() {
+        return ApiVersionsResponse.API_VERSIONS_RESPONSE;
+    }
+
+    // Visible to override for testing
+    protected void enableKafkaSaslAuthenticateHeaders(boolean flag) {
+        this.enableKafkaSaslAuthenticateHeaders = flag;
+    }
+
+    private void handleApiVersionsRequest(RequestContext context, ApiVersionsRequest apiVersionsRequest) throws IOException {
+        if (saslState != SaslState.HANDSHAKE_OR_VERSIONS_REQUEST)
+            throw new IllegalStateException("Unexpected ApiVersions request received during SASL authentication state " + saslState);
+
         if (apiVersionsRequest.hasUnsupportedRequestVersion())
             sendKafkaResponse(context, apiVersionsRequest.getErrorResponse(0, Errors.UNSUPPORTED_VERSION.exception()));
-        else
-            sendKafkaResponse(context, ApiVersionsResponse.API_VERSIONS_RESPONSE);
+        else {
+            sendKafkaResponse(context, apiVersionsResponse());
+            setSaslState(SaslState.HANDSHAKE_REQUEST);
+        }
     }
 
     private void sendKafkaResponse(RequestContext context, AbstractResponse response) throws IOException {

http://git-wip-us.apache.org/repos/asf/kafka/blob/8fca4322/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java b/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java
index d0aa4c5..a4ce66c 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java
@@ -26,6 +26,7 @@ import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.security.authenticator.CredentialCache;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.test.TestUtils;
@@ -36,8 +37,8 @@ import org.apache.kafka.test.TestUtils;
 public class NetworkTestUtils {
 
     public static NioEchoServer createEchoServer(ListenerName listenerName, SecurityProtocol securityProtocol,
-                                                 AbstractConfig serverConfig) throws Exception {
-        NioEchoServer server = new NioEchoServer(listenerName, securityProtocol, serverConfig, "localhost", null);
+            AbstractConfig serverConfig, CredentialCache credentialCache) throws Exception {
+        NioEchoServer server = new NioEchoServer(listenerName, securityProtocol, serverConfig, "localhost", null, credentialCache);
         server.start();
         return server;
     }
@@ -78,16 +79,16 @@ public class NetworkTestUtils {
         assertTrue(selector.isChannelReady(node));
     }
 
-    public static void waitForChannelClose(Selector selector, String node, ChannelState channelState) throws IOException {
+    public static void waitForChannelClose(Selector selector, String node, ChannelState.State channelState) throws IOException {
         boolean closed = false;
         for (int i = 0; i < 30; i++) {
             selector.poll(1000L);
-            if (selector.channel(node) == null) {
+            if (selector.channel(node) == null && selector.closingChannel(node) == null) {
                 closed = true;
                 break;
             }
         }
         assertTrue("Channel was not closed by timeout", closed);
-        assertEquals(channelState, selector.disconnected().get(node));
+        assertEquals(channelState, selector.disconnected().get(node).state());
     }
 }
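
The test utility now asserts on ChannelState.State rather than on ChannelState itself, which reflects ChannelState becoming a small value object in this patch: a coarse state paired with the failure that produced it, so callers still get the broker's diagnostic. A rough sketch of that shape; the names and fields here are assumptions for illustration, not the actual class.

// Rough sketch of an assumed shape, not the actual class: ChannelState as a small
// value object that keeps a coarse state for assertions like the one above while
// still carrying the exception that explains an authentication failure.
public class ChannelStateSketch {

    enum State { READY, AUTHENTICATE, NOT_CONNECTED }  // illustrative subset of states

    private final State state;
    private final Exception exception;                 // null unless the channel failed

    ChannelStateSketch(State state, Exception exception) {
        this.state = state;
        this.exception = exception;
    }

    State state() {
        return state;
    }

    Exception exception() {
        return exception;
    }

    public static void main(String[] args) {
        ChannelStateSketch failed = new ChannelStateSketch(State.AUTHENTICATE,
                new SecurityException("Authentication failed due to invalid credentials"));
        System.out.println("state:  " + failed.state());                   // what the test asserts on
        System.out.println("reason: " + failed.exception().getMessage());  // what logs and clients can now show
    }
}
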

http://git-wip-us.apache.org/repos/asf/kafka/blob/8fca4322/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
index ed84958..e456d68 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
@@ -53,7 +53,7 @@ public class NioEchoServer extends Thread {
     private final CredentialCache credentialCache;
 
     public NioEchoServer(ListenerName listenerName, SecurityProtocol securityProtocol, AbstractConfig config,
-            String serverHost, ChannelBuilder channelBuilder) throws Exception {
+            String serverHost, ChannelBuilder channelBuilder, CredentialCache credentialCache) throws Exception {
         super("echoserver");
         setDaemon(true);
         serverSocketChannel = ServerSocketChannel.open();
@@ -62,7 +62,7 @@ public class NioEchoServer extends Thread {
         this.port = serverSocketChannel.socket().getLocalPort();
         this.socketChannels = Collections.synchronizedList(new ArrayList<SocketChannel>());
         this.newChannels = Collections.synchronizedList(new ArrayList<SocketChannel>());
-        this.credentialCache = new CredentialCache();
+        this.credentialCache = credentialCache;
         if (securityProtocol == SecurityProtocol.SASL_PLAINTEXT || securityProtocol == SecurityProtocol.SASL_SSL)
             ScramCredentialUtils.createCache(credentialCache, ScramMechanism.mechanismNames());
         if (channelBuilder == null)

http://git-wip-us.apache.org/repos/asf/kafka/blob/8fca4322/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
index 459a4af..cffcc89 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
@@ -159,7 +159,7 @@ public class SslTransportLayerTest {
         InetSocketAddress addr = new InetSocketAddress("127.0.0.1", server.port());
         selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
 
-        NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE);
+        NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE.state());
     }
 
     /**
@@ -202,7 +202,7 @@ public class SslTransportLayerTest {
         };
         serverChannelBuilder.configure(sslServerConfigs);
         server = new NioEchoServer(ListenerName.forSecurityProtocol(SecurityProtocol.SSL), SecurityProtocol.SSL,
-                new TestSecurityConfig(sslServerConfigs), "localhost", serverChannelBuilder);
+                new TestSecurityConfig(sslServerConfigs), "localhost", serverChannelBuilder, null);
         server.start();
 
         createSelector(sslClientConfigs);
@@ -230,7 +230,7 @@ public class SslTransportLayerTest {
         InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
         selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
 
-        NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE);
+        NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE.state());
     }
     
     /**
@@ -243,7 +243,7 @@ public class SslTransportLayerTest {
         String serverHost = InetAddress.getLocalHost().getHostAddress();
         SecurityProtocol securityProtocol = SecurityProtocol.SSL;
         server = new NioEchoServer(ListenerName.forSecurityProtocol(securityProtocol), securityProtocol,
-                new TestSecurityConfig(sslServerConfigs), serverHost, null);
+                new TestSecurityConfig(sslServerConfigs), serverHost, null, null);
         server.start();
         sslClientConfigs.remove(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG);
         createSelector(sslClientConfigs);
@@ -295,7 +295,7 @@ public class SslTransportLayerTest {
         sslClientConfigs.remove(SslConfigs.SSL_KEY_PASSWORD_CONFIG);
         createSelector(sslClientConfigs);
         selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
-        NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE);
+        NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE.state());
         selector.close();
         server.close();
 
@@ -323,7 +323,7 @@ public class SslTransportLayerTest {
         InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
         selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
 
-        NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE);
+        NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE.state());
     }
     
     /**
@@ -343,7 +343,7 @@ public class SslTransportLayerTest {
         InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
         selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
 
-        NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE);
+        NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE.state());
     }
     
     /**
@@ -495,7 +495,7 @@ public class SslTransportLayerTest {
         InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
         selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
 
-        NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE);
+        NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE.state());
     }
     
     /**
@@ -512,7 +512,7 @@ public class SslTransportLayerTest {
         InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
         selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
 
-        NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE);
+        NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE.state());
     }
     
     /**
@@ -530,7 +530,7 @@ public class SslTransportLayerTest {
         InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
         selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
 
-        NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE);
+        NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE.state());
     }
 
     /**
@@ -677,7 +677,7 @@ public class SslTransportLayerTest {
     }
 
     private NioEchoServer createEchoServer(ListenerName listenerName, SecurityProtocol securityProtocol) throws Exception {
-        return NetworkTestUtils.createEchoServer(listenerName, securityProtocol, new TestSecurityConfig(sslServerConfigs));
+        return NetworkTestUtils.createEchoServer(listenerName, securityProtocol, new TestSecurityConfig(sslServerConfigs), null);
     }
 
     private NioEchoServer createEchoServer(SecurityProtocol securityProtocol) throws Exception {

http://git-wip-us.apache.org/repos/asf/kafka/blob/8fca4322/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java b/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java
index 8410e6a..d68e676 100644
--- a/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java
@@ -48,14 +48,14 @@ public class ApiKeysTest {
      * 'throttle_time_ms' to return the throttle time to the client. Exclusions are
      * <ul>
      *   <li>Cluster actions used only for inter-broker are throttled only if unauthorized
-     *   <li> SASL_HANDSHAKE is not throttled when used for authentication when a connection
-     *        is established. At any other time, this request returns an error response that
-     *        may be throttled.
+     *   <li> SASL_HANDSHAKE and SASL_AUTHENTICATE are not throttled when used for authentication
+     *        while a connection is being established. At any other time, these requests return an
+     *        error response that may be throttled.
      * </ul>
      */
     @Test
     public void testResponseThrottleTime() {
-        List<ApiKeys> authenticationKeys = Arrays.asList(ApiKeys.SASL_HANDSHAKE);
+        List<ApiKeys> authenticationKeys = Arrays.asList(ApiKeys.SASL_HANDSHAKE, ApiKeys.SASL_AUTHENTICATE);
 
         for (ApiKeys apiKey: ApiKeys.values()) {
             Schema responseSchema = apiKey.responseSchema(apiKey.latestVersion());

http://git-wip-us.apache.org/repos/asf/kafka/blob/8fca4322/clients/src/test/java/org/apache/kafka/common/protocol/ProtoUtilsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/protocol/ProtoUtilsTest.java b/clients/src/test/java/org/apache/kafka/common/protocol/ProtoUtilsTest.java
index 2b2cc91..8cb6b80 100644
--- a/clients/src/test/java/org/apache/kafka/common/protocol/ProtoUtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/protocol/ProtoUtilsTest.java
@@ -24,7 +24,7 @@ public class ProtoUtilsTest {
     public void testDelayedAllocationSchemaDetection() throws Exception {
         //verifies that schemas known to retain a reference to the underlying byte buffer are correctly detected.
         for (ApiKeys key : ApiKeys.values()) {
-            if (key == ApiKeys.PRODUCE || key == ApiKeys.JOIN_GROUP || key == ApiKeys.SYNC_GROUP) {
+            if (key == ApiKeys.PRODUCE || key == ApiKeys.JOIN_GROUP || key == ApiKeys.SYNC_GROUP || key == ApiKeys.SASL_AUTHENTICATE) {
                 Assert.assertTrue(Protocol.requiresDelayedDeallocation(key.id));
             } else {
                 Assert.assertFalse(Protocol.requiresDelayedDeallocation(key.id));

