kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rsiva...@apache.org
Subject [1/2] kafka git commit: KAFKA-4764; Wrap SASL tokens in Kafka headers to improve diagnostics (KIP-152)
Date Fri, 15 Sep 2017 16:16:51 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 985cc534a -> 8fca43222


http://git-wip-us.apache.org/repos/asf/kafka/blob/8fca4322/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
index 2dd7db9..d41d61a 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
@@ -26,11 +26,14 @@ import org.apache.kafka.common.network.ChannelBuilder;
 import org.apache.kafka.common.network.ChannelBuilders;
 import org.apache.kafka.common.network.ChannelState;
 import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.network.Mode;
 import org.apache.kafka.common.network.NetworkSend;
 import org.apache.kafka.common.network.NetworkTestUtils;
 import org.apache.kafka.common.network.NioEchoServer;
+import org.apache.kafka.common.network.SaslChannelBuilder;
 import org.apache.kafka.common.network.Selector;
 import org.apache.kafka.common.network.Send;
+import org.apache.kafka.common.network.TransportLayer;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.SecurityProtocol;
@@ -38,15 +41,18 @@ import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.AbstractResponse;
 import org.apache.kafka.common.requests.ApiVersionsRequest;
 import org.apache.kafka.common.requests.ApiVersionsResponse;
+import org.apache.kafka.common.requests.ApiVersionsResponse.ApiVersion;
 import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.requests.RequestHeader;
 import org.apache.kafka.common.requests.ResponseHeader;
+import org.apache.kafka.common.requests.SaslAuthenticateRequest;
 import org.apache.kafka.common.requests.SaslHandshakeRequest;
 import org.apache.kafka.common.requests.SaslHandshakeResponse;
 import org.apache.kafka.common.security.JaasContext;
 import org.apache.kafka.common.security.TestSecurityConfig;
 import org.apache.kafka.common.security.plain.PlainLoginModule;
 import org.apache.kafka.common.security.scram.ScramCredential;
+import org.apache.kafka.common.security.scram.ScramCredentialUtils;
 import org.apache.kafka.common.security.scram.ScramFormatter;
 import org.apache.kafka.common.security.scram.ScramLoginModule;
 import org.apache.kafka.common.security.scram.ScramMechanism;
@@ -63,10 +69,12 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
 
+import javax.security.auth.Subject;
 import javax.security.auth.login.Configuration;
 
 import static org.junit.Assert.assertEquals;
@@ -88,6 +96,8 @@ public class SaslAuthenticatorTest {
     private CertStores clientCertStores;
     private Map<String, Object> saslClientConfigs;
     private Map<String, Object> saslServerConfigs;
+    private CredentialCache credentialCache;
+    private int nextCorrelationId;
 
     @Before
     public void setup() throws Exception {
@@ -95,6 +105,7 @@ public class SaslAuthenticatorTest {
         clientCertStores = new CertStores(false, "localhost");
         saslServerConfigs = serverCertStores.getTrustingConfig(clientCertStores);
         saslClientConfigs = clientCertStores.getTrustingConfig(serverCertStores);
+        credentialCache = new CredentialCache();
     }
 
     @After
@@ -139,7 +150,7 @@ public class SaslAuthenticatorTest {
         String node = "0";
         SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
         TestJaasConfig jaasConfig = configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
-        jaasConfig.setPlainClientOptions(TestJaasConfig.USERNAME, "invalidpassword");
+        jaasConfig.setClientOptions("PLAIN", TestJaasConfig.USERNAME, "invalidpassword");
 
         server = createEchoServer(securityProtocol);
         createAndCheckClientConnectionFailure(securityProtocol, node);
@@ -153,7 +164,7 @@ public class SaslAuthenticatorTest {
         String node = "0";
         SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
         TestJaasConfig jaasConfig = configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
-        jaasConfig.setPlainClientOptions("invaliduser", TestJaasConfig.PASSWORD);
+        jaasConfig.setClientOptions("PLAIN", "invaliduser", TestJaasConfig.PASSWORD);
 
         server = createEchoServer(securityProtocol);
         createAndCheckClientConnectionFailure(securityProtocol, node);
@@ -166,7 +177,7 @@ public class SaslAuthenticatorTest {
     public void testMissingUsernameSaslPlain() throws Exception {
         String node = "0";
         TestJaasConfig jaasConfig = configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
-        jaasConfig.setPlainClientOptions(null, "mypassword");
+        jaasConfig.setClientOptions("PLAIN", null, "mypassword");
 
         SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
         server = createEchoServer(securityProtocol);
@@ -190,7 +201,7 @@ public class SaslAuthenticatorTest {
     public void testMissingPasswordSaslPlain() throws Exception {
         String node = "0";
         TestJaasConfig jaasConfig = configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
-        jaasConfig.setPlainClientOptions("myuser", null);
+        jaasConfig.setClientOptions("PLAIN", "myuser", null);
 
         SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
         server = createEchoServer(securityProtocol);
@@ -359,10 +370,20 @@ public class SaslAuthenticatorTest {
      * when transport layer is PLAINTEXT. This test simulates SASL authentication using a
      * (non-SASL) PLAINTEXT client and sends ApiVersionsRequest straight after
      * connection to the server is established, before any SASL-related packets are sent.
+     * This test is run with SaslHandshake version 0 and no SaslAuthenticate headers.
      */
     @Test
-    public void testUnauthenticatedApiVersionsRequestOverPlaintext() throws Exception {
-        testUnauthenticatedApiVersionsRequest(SecurityProtocol.SASL_PLAINTEXT);
+    public void testUnauthenticatedApiVersionsRequestOverPlaintextHandshakeVersion0() throws
Exception {
+        testUnauthenticatedApiVersionsRequest(SecurityProtocol.SASL_PLAINTEXT, (short) 0);
+    }
+
+    /**
+     * See {@link #testUnauthenticatedApiVersionsRequestOverPlaintextHandshakeVersion0()} for test scenario.
+     * This test is run with SaslHandshake version 1 and SaslAuthenticate headers.
+     */
+    @Test
+    public void testUnauthenticatedApiVersionsRequestOverPlaintextHandshakeVersion1() throws
Exception {
+        testUnauthenticatedApiVersionsRequest(SecurityProtocol.SASL_PLAINTEXT, (short) 1);
     }
 
     /**
@@ -371,21 +392,32 @@ public class SaslAuthenticatorTest {
      * when transport layer is SSL. This test simulates SASL authentication using a
      * (non-SASL) SSL client and sends ApiVersionsRequest straight after
      * SSL handshake, before any SASL-related packets are sent.
+     * This test is run with SaslHandshake version 0 and no SaslAuthenticate headers.
      */
     @Test
-    public void testUnauthenticatedApiVersionsRequestOverSsl() throws Exception {
-        testUnauthenticatedApiVersionsRequest(SecurityProtocol.SASL_SSL);
+    public void testUnauthenticatedApiVersionsRequestOverSslHandshakeVersion0() throws Exception
{
+        testUnauthenticatedApiVersionsRequest(SecurityProtocol.SASL_SSL, (short) 0);
+    }
+
+    /**
+     * See {@link #testUnauthenticatedApiVersionsRequestOverSslHandshakeVersion0()} for test scenario.
+     * This test is run over SASL_SSL with SaslHandshake version 1 and SaslAuthenticate headers.
+     */
+    @Test
+    public void testUnauthenticatedApiVersionsRequestOverSslHandshakeVersion1() throws Exception {
+        testUnauthenticatedApiVersionsRequest(SecurityProtocol.SASL_SSL, (short) 1);
     }
 
     /**
      * Tests that unsupported version of ApiVersionsRequest before SASL handshake request
      * returns error response and does not result in authentication failure. This test
-     * is similar to {@link #testUnauthenticatedApiVersionsRequest(SecurityProtocol)}
+     * is similar to {@link #testUnauthenticatedApiVersionsRequest(SecurityProtocol, short)}
      * where a non-SASL client is used to send requests that are processed by
      * {@link SaslServerAuthenticator} of the server prior to client authentication.
      */
     @Test
     public void testApiVersionsRequestWithUnsupportedVersion() throws Exception {
+        short handshakeVersion = ApiKeys.SASL_HANDSHAKE.latestVersion();
         SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
         configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
         server = createEchoServer(securityProtocol);
@@ -405,14 +437,14 @@ public class SaslAuthenticatorTest {
         sendVersionRequestReceiveResponse(node);
 
         // Test that client can authenticate successfully
-        sendHandshakeRequestReceiveResponse(node);
-        authenticateUsingSaslPlainAndCheckConnection(node);
+        sendHandshakeRequestReceiveResponse(node, handshakeVersion);
+        authenticateUsingSaslPlainAndCheckConnection(node, handshakeVersion > 0);
     }
 
     /**
      * Tests that unsupported version of SASL handshake request returns error
      * response and fails authentication. This test is similar to
-     * {@link #testUnauthenticatedApiVersionsRequest(SecurityProtocol)}
+     * {@link #testUnauthenticatedApiVersionsRequest(SecurityProtocol, short)}
      * where a non-SASL client is used to send requests that are processed by
      * {@link SaslServerAuthenticator} of the server prior to client authentication.
      */
@@ -422,13 +454,15 @@ public class SaslAuthenticatorTest {
         configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
         server = createEchoServer(securityProtocol);
 
-        // Send ApiVersionsRequest and validate error response.
+        // Send SaslHandshakeRequest and validate that connection is closed by server.
         String node1 = "invalid1";
         createClientConnection(SecurityProtocol.PLAINTEXT, node1);
         SaslHandshakeRequest request = new SaslHandshakeRequest("PLAIN");
         RequestHeader header = new RequestHeader(ApiKeys.SASL_HANDSHAKE, Short.MAX_VALUE,
"someclient", 2);
         selector.send(request.toSend(node1, header));
-        NetworkTestUtils.waitForChannelClose(selector, node1, ChannelState.READY);
+        // This test uses a non-SASL PLAINTEXT client in order to do manual handshake.
+        // So the channel is in READY state.
+        NetworkTestUtils.waitForChannelClose(selector, node1, ChannelState.READY.state());
         selector.close();
 
         // Test good connection still works
@@ -449,12 +483,12 @@ public class SaslAuthenticatorTest {
         // Send invalid SASL packet after valid handshake request
         String node1 = "invalid1";
         createClientConnection(SecurityProtocol.PLAINTEXT, node1);
-        sendHandshakeRequestReceiveResponse(node1);
+        sendHandshakeRequestReceiveResponse(node1, (short) 1);
         Random random = new Random();
         byte[] bytes = new byte[1024];
         random.nextBytes(bytes);
         selector.send(new NetworkSend(node1, ByteBuffer.wrap(bytes)));
-        NetworkTestUtils.waitForChannelClose(selector, node1, ChannelState.READY);
+        NetworkTestUtils.waitForChannelClose(selector, node1, ChannelState.READY.state());
         selector.close();
 
         // Test good connection still works
@@ -465,7 +499,7 @@ public class SaslAuthenticatorTest {
         createClientConnection(SecurityProtocol.PLAINTEXT, node2);
         random.nextBytes(bytes);
         selector.send(new NetworkSend(node2, ByteBuffer.wrap(bytes)));
-        NetworkTestUtils.waitForChannelClose(selector, node2, ChannelState.READY);
+        NetworkTestUtils.waitForChannelClose(selector, node2, ChannelState.READY.state());
         selector.close();
 
         // Test good connection still works
@@ -475,7 +509,7 @@ public class SaslAuthenticatorTest {
     /**
      * Tests that ApiVersionsRequest after Kafka SASL handshake request flow,
      * but prior to actual SASL authentication, results in authentication failure.
-     * This is similar to {@link #testUnauthenticatedApiVersionsRequest(SecurityProtocol)}
+     * This is similar to {@link #testUnauthenticatedApiVersionsRequest(SecurityProtocol,
short)}
      * where a non-SASL client is used to send requests that are processed by
      * {@link SaslServerAuthenticator} of the server prior to client authentication.
      */
@@ -488,12 +522,12 @@ public class SaslAuthenticatorTest {
         // Send handshake request followed by ApiVersionsRequest
         String node1 = "invalid1";
         createClientConnection(SecurityProtocol.PLAINTEXT, node1);
-        sendHandshakeRequestReceiveResponse(node1);
+        sendHandshakeRequestReceiveResponse(node1, (short) 1);
 
-        ApiVersionsRequest request = new ApiVersionsRequest.Builder().build();
+        ApiVersionsRequest request = createApiVersionsRequestV0();
         RequestHeader versionsHeader = new RequestHeader(ApiKeys.API_VERSIONS, request.version(),
"someclient", 2);
         selector.send(request.toSend(node1, versionsHeader));
-        NetworkTestUtils.waitForChannelClose(selector, node1, ChannelState.READY);
+        NetworkTestUtils.waitForChannelClose(selector, node1, ChannelState.READY.state());
         selector.close();
 
         // Test good connection still works
@@ -514,13 +548,13 @@ public class SaslAuthenticatorTest {
         // Send SASL packet with large size after valid handshake request
         String node1 = "invalid1";
         createClientConnection(SecurityProtocol.PLAINTEXT, node1);
-        sendHandshakeRequestReceiveResponse(node1);
+        sendHandshakeRequestReceiveResponse(node1, (short) 1);
         ByteBuffer buffer = ByteBuffer.allocate(1024);
         buffer.putInt(Integer.MAX_VALUE);
         buffer.put(new byte[buffer.capacity() - 4]);
         buffer.rewind();
         selector.send(new NetworkSend(node1, buffer));
-        NetworkTestUtils.waitForChannelClose(selector, node1, ChannelState.READY);
+        NetworkTestUtils.waitForChannelClose(selector, node1, ChannelState.READY.state());
         selector.close();
 
         // Test good connection still works
@@ -534,7 +568,7 @@ public class SaslAuthenticatorTest {
         buffer.put(new byte[buffer.capacity() - 4]);
         buffer.rewind();
         selector.send(new NetworkSend(node2, buffer));
-        NetworkTestUtils.waitForChannelClose(selector, node2, ChannelState.READY);
+        NetworkTestUtils.waitForChannelClose(selector, node2, ChannelState.READY.state());
         selector.close();
 
         // Test good connection still works
@@ -559,7 +593,7 @@ public class SaslAuthenticatorTest {
         RequestHeader metadataRequestHeader1 = new RequestHeader(ApiKeys.METADATA, metadataRequest1.version(),
                 "someclient", 1);
         selector.send(metadataRequest1.toSend(node1, metadataRequestHeader1));
-        NetworkTestUtils.waitForChannelClose(selector, node1, ChannelState.READY);
+        NetworkTestUtils.waitForChannelClose(selector, node1, ChannelState.READY.state());
         selector.close();
 
         // Test good connection still works
@@ -568,12 +602,12 @@ public class SaslAuthenticatorTest {
         // Send metadata request after Kafka SASL handshake request
         String node2 = "invalid2";
         createClientConnection(SecurityProtocol.PLAINTEXT, node2);
-        sendHandshakeRequestReceiveResponse(node2);
+        sendHandshakeRequestReceiveResponse(node2, (short) 1);
         MetadataRequest metadataRequest2 = new MetadataRequest.Builder(Collections.singletonList("sometopic"),
true).build();
         RequestHeader metadataRequestHeader2 = new RequestHeader(ApiKeys.METADATA,
                 metadataRequest2.version(), "someclient", 2);
         selector.send(metadataRequest2.toSend(node2, metadataRequestHeader2));
-        NetworkTestUtils.waitForChannelClose(selector, node2, ChannelState.READY);
+        NetworkTestUtils.waitForChannelClose(selector, node2, ChannelState.READY.state());
         selector.close();
 
         // Test good connection still works
@@ -642,7 +676,7 @@ public class SaslAuthenticatorTest {
         TestJaasConfig staticJaasConfig = new TestJaasConfig();
         staticJaasConfig.createOrUpdateEntry(TestJaasConfig.LOGIN_CONTEXT_SERVER, PlainLoginModule.class.getName(),
                 serverOptions);
-        staticJaasConfig.setPlainClientOptions("user1", "invalidpassword");
+        staticJaasConfig.setClientOptions("PLAIN", "user1", "invalidpassword");
         Configuration.setConfiguration(staticJaasConfig);
         server = createEchoServer(securityProtocol);
 
@@ -715,6 +749,269 @@ public class SaslAuthenticatorTest {
     }
 
     /**
+     * Tests good path SASL/PLAIN authentication over PLAINTEXT with old version of server
+     * that does not support SASL_AUTHENTICATE headers and new version of client.
+     */
+    @Test
+    public void oldSaslPlainPlaintextServerWithoutSaslAuthenticateHeader() throws Exception
{
+        verifySaslAuthenticateHeaderInterop(false, true, SecurityProtocol.SASL_PLAINTEXT,
"PLAIN");
+    }
+
+    /**
+     * Tests good path SASL/PLAIN authentication over PLAINTEXT with old version of client
+     * that does not support SASL_AUTHENTICATE headers and new version of server.
+     */
+    @Test
+    public void oldSaslPlainPlaintextClientWithoutSaslAuthenticateHeader() throws Exception
{
+        verifySaslAuthenticateHeaderInterop(true, false, SecurityProtocol.SASL_PLAINTEXT,
"PLAIN");
+    }
+
+    /**
+     * Tests good path SASL/SCRAM authentication over PLAINTEXT with old version of server
+     * that does not support SASL_AUTHENTICATE headers and new version of client.
+     */
+    @Test
+    public void oldSaslScramPlaintextServerWithoutSaslAuthenticateHeader() throws Exception
{
+        verifySaslAuthenticateHeaderInterop(false, true, SecurityProtocol.SASL_PLAINTEXT,
"SCRAM-SHA-256");
+    }
+
+    /**
+     * Tests good path SASL/SCRAM authentication over PLAINTEXT with old version of client
+     * that does not support SASL_AUTHENTICATE headers and new version of server.
+     */
+    @Test
+    public void oldSaslScramPlaintextClientWithoutSaslAuthenticateHeader() throws Exception
{
+        verifySaslAuthenticateHeaderInterop(true, false, SecurityProtocol.SASL_PLAINTEXT,
"SCRAM-SHA-256");
+    }
+
+    /**
+     * Tests good path SASL/PLAIN authentication over SSL with old version of server
+     * that does not support SASL_AUTHENTICATE headers and new version of client.
+     */
+    @Test
+    public void oldSaslPlainSslServerWithoutSaslAuthenticateHeader() throws Exception {
+        verifySaslAuthenticateHeaderInterop(false, true, SecurityProtocol.SASL_SSL, "PLAIN");
+    }
+
+    /**
+     * Tests good path SASL/PLAIN authentication over SSL with old version of client
+     * that does not support SASL_AUTHENTICATE headers and new version of server.
+     */
+    @Test
+    public void oldSaslPlainSslClientWithoutSaslAuthenticateHeader() throws Exception {
+        verifySaslAuthenticateHeaderInterop(true, false, SecurityProtocol.SASL_SSL, "PLAIN");
+    }
+
+    /**
+     * Tests good path SASL/SCRAM authentication over SSL with old version of server
+     * that does not support SASL_AUTHENTICATE headers and new version of client.
+     */
+    @Test
+    public void oldSaslScramSslServerWithoutSaslAuthenticateHeader() throws Exception {
+        verifySaslAuthenticateHeaderInterop(false, true, SecurityProtocol.SASL_SSL, "SCRAM-SHA-512");
+    }
+
+    /**
+     * Tests good path SASL/SCRAM authentication over SSL with old version of client
+     * that does not support SASL_AUTHENTICATE headers and new version of server.
+     */
+    @Test
+    public void oldSaslScramSslClientWithoutSaslAuthenticateHeader() throws Exception {
+        verifySaslAuthenticateHeaderInterop(true, false, SecurityProtocol.SASL_SSL, "SCRAM-SHA-512");
+    }
+
+    /**
+     * Tests SASL/PLAIN authentication failure over PLAINTEXT with old version of server
+     * that does not support SASL_AUTHENTICATE headers and new version of client.
+     */
+    @Test
+    public void oldSaslPlainPlaintextServerWithoutSaslAuthenticateHeaderFailure() throws
Exception {
+        verifySaslAuthenticateHeaderInteropWithFailure(false, true, SecurityProtocol.SASL_PLAINTEXT,
"PLAIN");
+    }
+
+    /**
+     * Tests SASL/PLAIN authentication failure over PLAINTEXT with old version of client
+     * that does not support SASL_AUTHENTICATE headers and new version of server.
+     */
+    @Test
+    public void oldSaslPlainPlaintextClientWithoutSaslAuthenticateHeaderFailure() throws
Exception {
+        verifySaslAuthenticateHeaderInteropWithFailure(true, false, SecurityProtocol.SASL_PLAINTEXT,
"PLAIN");
+    }
+
+    /**
+     * Tests SASL/SCRAM authentication failure over PLAINTEXT with old version of server
+     * that does not support SASL_AUTHENTICATE headers and new version of client.
+     */
+    @Test
+    public void oldSaslScramPlaintextServerWithoutSaslAuthenticateHeaderFailure() throws
Exception {
+        verifySaslAuthenticateHeaderInteropWithFailure(false, true, SecurityProtocol.SASL_PLAINTEXT,
"SCRAM-SHA-256");
+    }
+
+    /**
+     * Tests SASL/SCRAM authentication failure over PLAINTEXT with old version of client
+     * that does not support SASL_AUTHENTICATE headers and new version of server.
+     */
+    @Test
+    public void oldSaslScramPlaintextClientWithoutSaslAuthenticateHeaderFailure() throws
Exception {
+        verifySaslAuthenticateHeaderInteropWithFailure(true, false, SecurityProtocol.SASL_PLAINTEXT,
"SCRAM-SHA-256");
+    }
+
+    /**
+     * Tests SASL/PLAIN authentication failure over SSL with old version of server
+     * that does not support SASL_AUTHENTICATE headers and new version of client.
+     */
+    @Test
+    public void oldSaslPlainSslServerWithoutSaslAuthenticateHeaderFailure() throws Exception
{
+        verifySaslAuthenticateHeaderInteropWithFailure(false, true, SecurityProtocol.SASL_SSL,
"PLAIN");
+    }
+
+    /**
+     * Tests SASL/PLAIN authentication failure over SSL with old version of client
+     * that does not support SASL_AUTHENTICATE headers and new version of server.
+     */
+    @Test
+    public void oldSaslPlainSslClientWithoutSaslAuthenticateHeaderFailure() throws Exception
{
+        verifySaslAuthenticateHeaderInteropWithFailure(true, false, SecurityProtocol.SASL_SSL,
"PLAIN");
+    }
+
+    /**
+     * Tests SASL/SCRAM authentication failure over SSL with old version of server
+     * that does not support SASL_AUTHENTICATE headers and new version of client.
+     */
+    @Test
+    public void oldSaslScramSslServerWithoutSaslAuthenticateHeaderFailure() throws Exception
{
+        verifySaslAuthenticateHeaderInteropWithFailure(false, true, SecurityProtocol.SASL_SSL,
"SCRAM-SHA-512");
+    }
+
+    /**
+     * Tests SASL/SCRAM authentication failure over SSL with old version of client
+     * that does not support SASL_AUTHENTICATE headers and new version of server.
+     */
+    @Test
+    public void oldSaslScramSslClientWithoutSaslAuthenticateHeaderFailure() throws Exception
{
+        verifySaslAuthenticateHeaderInteropWithFailure(true, false, SecurityProtocol.SASL_SSL,
"SCRAM-SHA-512");
+    }
+
+    private void verifySaslAuthenticateHeaderInterop(boolean enableHeaderOnServer, boolean
enableHeaderOnClient,
+            SecurityProtocol securityProtocol, String saslMechanism) throws Exception {
+        configureMechanisms(saslMechanism, Arrays.asList(saslMechanism));
+        createServer(securityProtocol, saslMechanism, enableHeaderOnServer);
+
+        String node = "0";
+        createClientConnection(securityProtocol, saslMechanism, node, enableHeaderOnClient);
+        NetworkTestUtils.checkClientConnection(selector, "0", 100, 10);
+    }
+
+    private void verifySaslAuthenticateHeaderInteropWithFailure(boolean enableHeaderOnServer,
boolean enableHeaderOnClient,
+            SecurityProtocol securityProtocol, String saslMechanism) throws Exception {
+        TestJaasConfig jaasConfig = configureMechanisms(saslMechanism, Arrays.asList(saslMechanism));
+        jaasConfig.setClientOptions(saslMechanism, TestJaasConfig.USERNAME, "invalidpassword");
+        createServer(securityProtocol, saslMechanism, enableHeaderOnServer);
+
+        String node = "0";
+        createClientConnection(securityProtocol, saslMechanism, node, enableHeaderOnClient);
+        // Without SASL_AUTHENTICATE headers, disconnect state is ChannelState.AUTHENTICATE
which is
+        // a hint that channel was closed during authentication, unlike ChannelState.AUTHENTICATE_FAILED
+        // which is an actual authentication failure reported by the broker.
+        NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE.state());
+    }
+
+    private void createServer(SecurityProtocol securityProtocol, String saslMechanism,
+            boolean enableSaslAuthenticateHeader) throws Exception {
+        if (enableSaslAuthenticateHeader)
+            server = createEchoServer(securityProtocol);
+        else
+            server = startServerWithoutSaslAuthenticateHeader(securityProtocol, saslMechanism);
+        updateScramCredentialCache(TestJaasConfig.USERNAME, TestJaasConfig.PASSWORD);
+    }
+
+    private void createClientConnection(SecurityProtocol securityProtocol, String saslMechanism,
String node,
+            boolean enableSaslAuthenticateHeader) throws Exception {
+        if (enableSaslAuthenticateHeader)
+            createClientConnection(securityProtocol, node);
+        else
+            createClientConnectionWithoutSaslAuthenticateHeader(securityProtocol, saslMechanism,
node);
+    }
+
+    private NioEchoServer startServerWithoutSaslAuthenticateHeader(final SecurityProtocol
securityProtocol, String saslMechanism)
+            throws Exception {
+        final ListenerName listenerName = ListenerName.forSecurityProtocol(securityProtocol);
+        final Map<String, ?> configs = Collections.emptyMap();
+        final JaasContext jaasContext = JaasContext.load(JaasContext.Type.SERVER, listenerName,
configs);
+
+        boolean isScram = ScramMechanism.isScram(saslMechanism);
+        if (isScram)
+            ScramCredentialUtils.createCache(credentialCache, Arrays.asList(saslMechanism));
+        SaslChannelBuilder serverChannelBuilder = new SaslChannelBuilder(Mode.SERVER, jaasContext,
+                securityProtocol, listenerName, saslMechanism, true, credentialCache) {
+
+            @Override
+            protected SaslServerAuthenticator buildServerAuthenticator(Map<String, ?>
configs, String id,
+                            TransportLayer transportLayer, Subject subject) throws IOException
{
+                return new SaslServerAuthenticator(configs, id, jaasContext, subject, null,
+                                credentialCache, listenerName, securityProtocol, transportLayer)
{
+
+                    @Override
+                    protected ApiVersionsResponse apiVersionsResponse() {
+                        List<ApiVersion> apiVersions = new ArrayList<>(ApiVersionsResponse.API_VERSIONS_RESPONSE.apiVersions());
+                        for (Iterator<ApiVersion> it = apiVersions.iterator(); it.hasNext();
) {
+                            ApiVersion apiVersion = it.next();
+                            if (apiVersion.apiKey == ApiKeys.SASL_AUTHENTICATE.id) {
+                                it.remove();
+                                break;
+                            }
+                        }
+                        return new ApiVersionsResponse(0, Errors.NONE, apiVersions);
+                    }
+
+                    @Override
+                    protected void enableKafkaSaslAuthenticateHeaders(boolean flag) {
+                        // Don't enable Kafka SASL_AUTHENTICATE headers
+                    }
+                };
+            }
+        };
+        serverChannelBuilder.configure(saslServerConfigs);
+        server = new NioEchoServer(listenerName, securityProtocol, new TestSecurityConfig(saslServerConfigs),
+                "localhost", serverChannelBuilder, credentialCache);
+        server.start();
+        return server;
+    }
+
+    private void createClientConnectionWithoutSaslAuthenticateHeader(final SecurityProtocol
securityProtocol,
+            final String saslMechanism, String node) throws Exception {
+
+        final ListenerName listenerName = ListenerName.forSecurityProtocol(securityProtocol);
+        final Map<String, ?> configs = Collections.emptyMap();
+        final JaasContext jaasContext = JaasContext.load(JaasContext.Type.CLIENT, null, configs);
+        SaslChannelBuilder clientChannelBuilder = new SaslChannelBuilder(Mode.CLIENT, jaasContext,
+                securityProtocol, listenerName, saslMechanism, true, null) {
+
+            @Override
+            protected SaslClientAuthenticator buildClientAuthenticator(Map<String, ?>
configs, String id,
+                    String serverHost, String servicePrincipal,
+                    TransportLayer transportLayer, Subject subject) throws IOException {
+
+                return new SaslClientAuthenticator(configs, id, subject,
+                        servicePrincipal, serverHost, saslMechanism, true, transportLayer)
{
+                    @Override
+                    protected SaslHandshakeRequest createSaslHandshakeRequest(short version)
{
+                        return new SaslHandshakeRequest.Builder(saslMechanism).build((short)
0);
+                    }
+                    @Override
+                    protected void saslAuthenticateVersion(short version) {
+                        // Don't set version so that headers are disabled
+                    }
+                };
+            }
+        };
+        clientChannelBuilder.configure(saslClientConfigs);
+        this.selector = NetworkTestUtils.createSelector(clientChannelBuilder);
+        InetSocketAddress addr = new InetSocketAddress("127.0.0.1", server.port());
+        selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
+    }
+
+    /**
      * Tests that Kafka ApiVersionsRequests are handled by the SASL server authenticator
      * prior to SASL handshake flow and that subsequent authentication succeeds
      * when transport layer is PLAINTEXT/SSL. This test uses a non-SASL client that simulates
@@ -738,7 +1035,7 @@ public class SaslAuthenticatorTest {
      *       behaves exactly as a regular SASL_PLAINTEXT client that has completed authentication.</li>
      * </ol>
      */
-    private void testUnauthenticatedApiVersionsRequest(SecurityProtocol securityProtocol)
throws Exception {
+    private void testUnauthenticatedApiVersionsRequest(SecurityProtocol securityProtocol,
short saslHandshakeVersion) throws Exception {
         configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
         server = createEchoServer(securityProtocol);
 
@@ -762,20 +1059,28 @@ public class SaslAuthenticatorTest {
         ApiVersionsResponse versionsResponse = sendVersionRequestReceiveResponse(node);
         assertEquals(ApiKeys.SASL_HANDSHAKE.oldestVersion(), versionsResponse.apiVersion(ApiKeys.SASL_HANDSHAKE.id).minVersion);
         assertEquals(ApiKeys.SASL_HANDSHAKE.latestVersion(), versionsResponse.apiVersion(ApiKeys.SASL_HANDSHAKE.id).maxVersion);
+        assertEquals(ApiKeys.SASL_AUTHENTICATE.oldestVersion(), versionsResponse.apiVersion(ApiKeys.SASL_AUTHENTICATE.id).minVersion);
+        assertEquals(ApiKeys.SASL_AUTHENTICATE.latestVersion(), versionsResponse.apiVersion(ApiKeys.SASL_AUTHENTICATE.id).maxVersion);
 
         // Send SaslHandshakeRequest and check response
-        SaslHandshakeResponse handshakeResponse = sendHandshakeRequestReceiveResponse(node);
+        SaslHandshakeResponse handshakeResponse = sendHandshakeRequestReceiveResponse(node, saslHandshakeVersion);
         assertEquals(Collections.singletonList("PLAIN"), handshakeResponse.enabledMechanisms());
 
         // Complete manual authentication and check send/receive succeed
-        authenticateUsingSaslPlainAndCheckConnection(node);
+        authenticateUsingSaslPlainAndCheckConnection(node, saslHandshakeVersion > 0);
     }
 
-    private void authenticateUsingSaslPlainAndCheckConnection(String node) throws Exception {
+    private void authenticateUsingSaslPlainAndCheckConnection(String node, boolean enableSaslAuthenticateHeader) throws Exception {
         // Authenticate using PLAIN username/password
         String authString = "\u0000" + TestJaasConfig.USERNAME + "\u0000" + TestJaasConfig.PASSWORD;
-        selector.send(new NetworkSend(node, ByteBuffer.wrap(authString.getBytes("UTF-8"))));
-        waitForResponse();
+        ByteBuffer authBuf = ByteBuffer.wrap(authString.getBytes("UTF-8"));
+        if (enableSaslAuthenticateHeader) {
+            SaslAuthenticateRequest request = new SaslAuthenticateRequest.Builder(authBuf).build();
+            sendKafkaRequestReceiveResponse(node, ApiKeys.SASL_AUTHENTICATE, request);
+        } else {
+            selector.send(new NetworkSend(node, authBuf));
+            waitForResponse();
+        }
 
         // Check send/receive on the manually authenticated connection
         NetworkTestUtils.checkClientConnection(selector, node, 100, 10);
@@ -805,7 +1110,7 @@ public class SaslAuthenticatorTest {
 
     private NioEchoServer createEchoServer(ListenerName listenerName, SecurityProtocol securityProtocol) throws Exception {
         return NetworkTestUtils.createEchoServer(listenerName, securityProtocol,
-                new TestSecurityConfig(saslServerConfigs));
+                new TestSecurityConfig(saslServerConfigs), credentialCache);
     }
 
     private void createClientConnection(SecurityProtocol securityProtocol, String node) throws Exception {
@@ -823,28 +1128,28 @@ public class SaslAuthenticatorTest {
 
     private void createAndCheckClientConnectionFailure(SecurityProtocol securityProtocol, String node) throws Exception {
         createClientConnection(securityProtocol, node);
-        NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE);
+        NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.State.AUTHENTICATION_FAILED);
         selector.close();
         selector = null;
     }
 
     private AbstractResponse sendKafkaRequestReceiveResponse(String node, ApiKeys apiKey, AbstractRequest request) throws IOException {
-        RequestHeader header = new RequestHeader(apiKey, request.version(), "someclient", 1);
+        RequestHeader header = new RequestHeader(apiKey, request.version(), "someclient", nextCorrelationId++);
         Send send = request.toSend(node, header);
         selector.send(send);
         ByteBuffer responseBuffer = waitForResponse();
         return NetworkClient.parseResponse(responseBuffer, header);
     }
 
-    private SaslHandshakeResponse sendHandshakeRequestReceiveResponse(String node) throws Exception {
-        SaslHandshakeRequest handshakeRequest = new SaslHandshakeRequest("PLAIN");
+    private SaslHandshakeResponse sendHandshakeRequestReceiveResponse(String node, short version) throws Exception {
+        SaslHandshakeRequest handshakeRequest = new SaslHandshakeRequest.Builder("PLAIN").build(version);
         SaslHandshakeResponse response = (SaslHandshakeResponse) sendKafkaRequestReceiveResponse(node, ApiKeys.SASL_HANDSHAKE, handshakeRequest);
         assertEquals(Errors.NONE, response.error());
         return response;
     }
 
     private ApiVersionsResponse sendVersionRequestReceiveResponse(String node) throws Exception {
-        ApiVersionsRequest handshakeRequest = new ApiVersionsRequest.Builder().build();
+        ApiVersionsRequest handshakeRequest = createApiVersionsRequestV0();
         ApiVersionsResponse response =  (ApiVersionsResponse) sendKafkaRequestReceiveResponse(node, ApiKeys.API_VERSIONS, handshakeRequest);
         assertEquals(Errors.NONE, response.error());
         return response;
@@ -866,8 +1171,14 @@ public class SaslAuthenticatorTest {
             if (scramMechanism != null) {
                 ScramFormatter formatter = new ScramFormatter(scramMechanism);
                 ScramCredential credential = formatter.generateCredential(password, 4096);
-                server.credentialCache().cache(scramMechanism.mechanismName(), ScramCredential.class).put(username, credential);
+                credentialCache.cache(scramMechanism.mechanismName(), ScramCredential.class).put(username, credential);
             }
         }
     }
+
+    // Creates an ApiVersionsRequest with version 0. Using v0 in tests since
+    // SaslClientAuthenticator always uses version 0
+    private ApiVersionsRequest createApiVersionsRequestV0() {
+        return new ApiVersionsRequest.Builder((short) 0).build();
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8fca4322/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestJaasConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestJaasConfig.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestJaasConfig.java
index bdef6ef..5336fd7 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestJaasConfig.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestJaasConfig.java
@@ -54,13 +54,14 @@ public class TestJaasConfig extends Configuration {
         return new Password(loginModule(mechanism) + " required username=" + username + " password=" + password + ";");
     }
 
-    public void setPlainClientOptions(String clientUsername, String clientPassword) {
+    public void setClientOptions(String saslMechanism, String clientUsername, String clientPassword) {
         Map<String, Object> options = new HashMap<>();
         if (clientUsername != null)
             options.put("username", clientUsername);
         if (clientPassword != null)
             options.put("password", clientPassword);
-        createOrUpdateEntry(LOGIN_CONTEXT_CLIENT, PlainLoginModule.class.getName(), options);
+        Class<?> loginModuleClass = ScramMechanism.isScram(saslMechanism) ? ScramLoginModule.class : PlainLoginModule.class;
+        createOrUpdateEntry(LOGIN_CONTEXT_CLIENT, loginModuleClass.getName(), options);
     }
 
     public void createOrUpdateEntry(String name, String loginModule, Map<String, Object> options) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/8fca4322/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 2c5517f..62e8abf 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -51,7 +51,7 @@ import org.apache.kafka.common.requests.{Resource => RResource, ResourceType =>
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.common.{Node, TopicPartition}
-import org.apache.kafka.common.requests.SaslHandshakeResponse
+import org.apache.kafka.common.requests.{SaslAuthenticateResponse, SaslHandshakeResponse}
 import org.apache.kafka.common.resource.{Resource => AdminResource}
 import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding}
 import DescribeLogDirsResponse.LogDirInfo
@@ -132,6 +132,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.DESCRIBE_CONFIGS => handleDescribeConfigsRequest(request)
         case ApiKeys.ALTER_REPLICA_DIR => handleAlterReplicaDirRequest(request)
         case ApiKeys.DESCRIBE_LOG_DIRS => handleDescribeLogDirsRequest(request)
+        case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request)
       }
     } catch {
       case e: FatalExitError => throw e
@@ -1257,6 +1258,11 @@ class KafkaApis(val requestChannel: RequestChannel,
     sendResponseMaybeThrottle(request, _ => new SaslHandshakeResponse(Errors.ILLEGAL_SASL_STATE, config.saslEnabledMechanisms))
   }
 
+  def handleSaslAuthenticateRequest(request: RequestChannel.Request) {
+    sendResponseMaybeThrottle(request, _ => new SaslAuthenticateResponse(Errors.ILLEGAL_SASL_STATE,
+        "SaslAuthenticate request received after successful authentication"))
+  }
+
   def handleApiVersionsRequest(request: RequestChannel.Request) {
     // Note that broker returns its full list of supported ApiKeys and versions regardless of current
     // authentication state (e.g., before SASL authentication on an SASL listener, do note that no

http://git-wip-us.apache.org/repos/asf/kafka/blob/8fca4322/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 3ec03c3..bb9f82e 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -233,6 +233,9 @@ class RequestQuotaTest extends BaseRequestTest {
         case ApiKeys.SASL_HANDSHAKE =>
           new SaslHandshakeRequest.Builder("PLAIN")
 
+        case ApiKeys.SASL_AUTHENTICATE =>
+          new SaslAuthenticateRequest.Builder(ByteBuffer.wrap(new Array[Byte](0)))
+
         case ApiKeys.API_VERSIONS =>
           new ApiVersionsRequest.Builder
 
@@ -433,7 +436,8 @@ class RequestQuotaTest extends BaseRequestTest {
 
 object RequestQuotaTest {
   val ClusterActions = ApiKeys.values.toSet.filter(apiKey => apiKey.clusterAction)
-  val ClientActions = ApiKeys.values.toSet -- ClusterActions - ApiKeys.SASL_HANDSHAKE
+  val SaslActions = Set(ApiKeys.SASL_HANDSHAKE, ApiKeys.SASL_AUTHENTICATE)
+  val ClientActions = ApiKeys.values.toSet -- ClusterActions -- SaslActions
 
   val UnauthorizedPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "Unauthorized")
   // Principal used for all client connections. This is modified by tests which

http://git-wip-us.apache.org/repos/asf/kafka/blob/8fca4322/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
index 1ee4ac8..01d3a83 100644
--- a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
@@ -16,7 +16,6 @@
   */
 package kafka.server
 
-import java.io.IOException
 import java.net.Socket
 import java.util.Collections
 
@@ -66,12 +65,8 @@ class SaslApiVersionsRequestTest extends BaseRequestTest with SaslSetup {
     val plaintextSocket = connect(protocol = securityProtocol)
     try {
       sendSaslHandshakeRequestValidateResponse(plaintextSocket)
-      try {
-        sendApiVersionsRequest(plaintextSocket, new ApiVersionsRequest.Builder().build(0))
-        fail("Versions Request during Sasl handshake did not fail")
-      } catch {
-        case _: IOException => // expected exception
-      }
+      val response = sendApiVersionsRequest(plaintextSocket, new ApiVersionsRequest.Builder().build(0))
+      assertEquals(Errors.ILLEGAL_SASL_STATE, response.error)
     } finally {
       plaintextSocket.close()
     }


Mime
View raw message