kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: KAFKA-4259; Dynamic JAAS configuration for Kafka clients (KIP-85)
Date Sat, 24 Dec 2016 09:22:12 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 76169f9e2 -> 9eb665c39


KAFKA-4259; Dynamic JAAS configuration for Kafka clients (KIP-85)

Implementation of KIP-85: https://cwiki.apache.org/confluence/display/KAFKA/KIP-85%3A+Dynamic+JAAS+configuration+for+Kafka+clients

Author: Rajini Sivaram <rajinisivaram@googlemail.com>

Reviewers: Sriharsha Chintalapani <harsha@hortonworks.com>, Jun Rao <junrao@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #1979 from rajinisivaram/KAFKA-4259


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9eb665c3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9eb665c3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9eb665c3

Branch: refs/heads/trunk
Commit: 9eb665c39af1950548f903ad959449f2e69d7671
Parents: 76169f9
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
Authored: Sat Dec 24 09:21:53 2016 +0000
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Sat Dec 24 09:21:53 2016 +0000

----------------------------------------------------------------------
 .../apache/kafka/common/config/SaslConfigs.java |   8 +-
 .../common/network/SaslChannelBuilder.java      |   8 +-
 .../kafka/common/security/JaasConfig.java       | 123 +++++++++
 .../apache/kafka/common/security/JaasUtils.java |  64 ++++-
 .../kafka/common/security/auth/Login.java       |   3 +-
 .../security/authenticator/AbstractLogin.java   |  23 +-
 .../security/authenticator/LoginManager.java    |   9 +-
 .../authenticator/SaslServerAuthenticator.java  |   6 +-
 .../common/security/kerberos/KerberosLogin.java |  16 +-
 .../common/security/plain/PlainSaslServer.java  |   3 +-
 .../kafka/common/security/JaasUtilsTest.java    | 257 +++++++++++++++++++
 .../authenticator/SaslAuthenticatorTest.java    |  46 ++++
 .../security/authenticator/TestJaasConfig.java  |   5 +
 .../SaslPlainSslEndToEndAuthorizationTest.scala |  12 +
 .../scala/unit/kafka/utils/JaasTestUtils.scala  |  14 +-
 15 files changed, 551 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9eb665c3/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java b/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java
index d3aa0d6..9ae69e1 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java
@@ -32,6 +32,11 @@ public class SaslConfigs {
         + "Only GSSAPI is enabled by default.";
     public static final List<String> DEFAULT_SASL_ENABLED_MECHANISMS = Collections.singletonList(GSSAPI_MECHANISM);
 
+    public static final String SASL_JAAS_CONFIG = "sasl.jaas.config";
+    public static final String SASL_JAAS_CONFIG_DOC = "JAAS login context parameters for SASL connections in the format used by JAAS configuration files. "
+        + "JAAS configuration file format is described <a href=\"http://docs.oracle.com/javase/8/docs/technotes/guides/security/jgss/tutorials/LoginConfigFile.html\">here</a>. "
+        + "The format for the value is: '<loginModuleClass> <controlFlag> (<optionName>=<optionValue>)*;'";
+
     public static final String SASL_KERBEROS_SERVICE_NAME = "sasl.kerberos.service.name";
     public static final String SASL_KERBEROS_SERVICE_NAME_DOC = "The Kerberos principal name that Kafka runs as. "
         + "This can be defined either in Kafka's JAAS config or in Kafka's config.";
@@ -66,6 +71,7 @@ public class SaslConfigs {
                 .define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, ConfigDef.Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR_DOC)
                 .define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER, ConfigDef.Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_JITTER, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER_DOC)
                 .define(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, ConfigDef.Type.LONG, SaslConfigs.DEFAULT_KERBEROS_MIN_TIME_BEFORE_RELOGIN, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN_DOC)
-                .define(SaslConfigs.SASL_MECHANISM, ConfigDef.Type.STRING, SaslConfigs.DEFAULT_SASL_MECHANISM, ConfigDef.Importance.MEDIUM, SaslConfigs.SASL_MECHANISM_DOC);
+                .define(SaslConfigs.SASL_MECHANISM, ConfigDef.Type.STRING, SaslConfigs.DEFAULT_SASL_MECHANISM, ConfigDef.Importance.MEDIUM, SaslConfigs.SASL_MECHANISM_DOC)
+                .define(SaslConfigs.SASL_JAAS_CONFIG, ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.MEDIUM, SaslConfigs.SASL_JAAS_CONFIG_DOC);
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9eb665c3/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
index 5c907ed..ba2f7df 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
@@ -18,6 +18,8 @@ import java.nio.channels.SocketChannel;
 import java.util.List;
 import java.util.Map;
 
+import javax.security.auth.login.Configuration;
+
 import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.security.JaasUtils;
 import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
@@ -39,6 +41,7 @@ public class SaslChannelBuilder implements ChannelBuilder {
     private final LoginType loginType;
     private final boolean handshakeRequestEnable;
 
+    private Configuration jaasConfig;
     private LoginManager loginManager;
     private SslFactory sslFactory;
     private Map<String, ?> configs;
@@ -75,7 +78,8 @@ public class SaslChannelBuilder implements ChannelBuilder {
                 if (principalToLocalRules != null)
                     kerberosShortNamer = KerberosShortNamer.fromUnparsedRules(defaultRealm, principalToLocalRules);
             }
-            this.loginManager = LoginManager.acquireLoginManager(loginType, hasKerberos, configs);
+            this.jaasConfig = JaasUtils.jaasConfig(loginType, configs);
+            this.loginManager = LoginManager.acquireLoginManager(loginType, hasKerberos, configs, jaasConfig);
 
             if (this.securityProtocol == SecurityProtocol.SASL_SSL) {
                 // Disable SSL client authentication as we are using SASL authentication
@@ -93,7 +97,7 @@ public class SaslChannelBuilder implements ChannelBuilder {
             TransportLayer transportLayer = buildTransportLayer(id, key, socketChannel);
             Authenticator authenticator;
             if (mode == Mode.SERVER)
-                authenticator = new SaslServerAuthenticator(id, loginManager.subject(), kerberosShortNamer,
+                authenticator = new SaslServerAuthenticator(id, jaasConfig, loginManager.subject(), kerberosShortNamer,
                         socketChannel.socket().getLocalAddress().getHostName(), maxReceiveSize);
             else
                 authenticator = new SaslClientAuthenticator(id, loginManager.subject(), loginManager.serviceName(),

http://git-wip-us.apache.org/repos/asf/kafka/blob/9eb665c3/clients/src/main/java/org/apache/kafka/common/security/JaasConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/JaasConfig.java b/clients/src/main/java/org/apache/kafka/common/security/JaasConfig.java
new file mode 100644
index 0000000..2128c61
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/JaasConfig.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security;
+
+import java.io.IOException;
+import java.io.StreamTokenizer;
+import java.io.StringReader;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.network.LoginType;
+
+/**
+ * JAAS configuration parser that constructs a JAAS configuration object with a single
+ * login context from the Kafka configuration option {@link SaslConfigs#SASL_JAAS_CONFIG}.
+ * <p/>
+ * JAAS configuration file format is described <a href="http://docs.oracle.com/javase/8/docs/technotes/guides/security/jgss/tutorials/LoginConfigFile.html">here</a>.
+ * The format of the property value is:
+ * <pre>
+ * {@code
+ *   <loginModuleClass> <controlFlag> (<optionName>=<optionValue>)*;
+ * }
+ * </pre>
+ */
+class JaasConfig extends Configuration {
+
+    private final String loginContextName;
+    private final List<AppConfigurationEntry> configEntries;
+
+    public JaasConfig(LoginType loginType, String jaasConfigParams) {
+        StreamTokenizer tokenizer = new StreamTokenizer(new StringReader(jaasConfigParams));
+        tokenizer.slashSlashComments(true);
+        tokenizer.slashStarComments(true);
+        tokenizer.wordChars('-', '-');
+        tokenizer.wordChars('_', '_');
+        tokenizer.wordChars('$', '$');
+
+        try {
+            configEntries = new ArrayList<>();
+            while (tokenizer.nextToken() != StreamTokenizer.TT_EOF) {
+                configEntries.add(parseAppConfigurationEntry(tokenizer));
+            }
+            if (configEntries.isEmpty())
+                throw new IllegalArgumentException("Login module not specified in JAAS config");
+
+            this.loginContextName = loginType.contextName();
+
+        } catch (IOException e) {
+            throw new KafkaException("Unexpected exception while parsing JAAS config");
+        }
+    }
+
+    @Override
+    public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
+        if (this.loginContextName.equals(name))
+            return configEntries.toArray(new AppConfigurationEntry[0]);
+        else
+            return  null;
+    }
+
+    private LoginModuleControlFlag loginModuleControlFlag(String flag) {
+        LoginModuleControlFlag controlFlag;
+        switch (flag.toUpperCase(Locale.ROOT)) {
+            case "REQUIRED":
+                controlFlag = LoginModuleControlFlag.REQUIRED;
+                break;
+            case "REQUISITE":
+                controlFlag = LoginModuleControlFlag.REQUISITE;
+                break;
+            case "SUFFICIENT":
+                controlFlag = LoginModuleControlFlag.SUFFICIENT;
+                break;
+            case "OPTIONAL":
+                controlFlag = LoginModuleControlFlag.OPTIONAL;
+                break;
+            default:
+                throw new IllegalArgumentException("Invalid login module control flag '" + flag + "' in JAAS config");
+        }
+        return controlFlag;
+    }
+
+    private AppConfigurationEntry parseAppConfigurationEntry(StreamTokenizer tokenizer) throws IOException {
+        String loginModule = tokenizer.sval;
+        if (tokenizer.nextToken() == StreamTokenizer.TT_EOF)
+            throw new IllegalArgumentException("Login module control flag not specified in JAAS config");
+        LoginModuleControlFlag controlFlag = loginModuleControlFlag(tokenizer.sval);
+        Map<String, String> options = new HashMap<>();
+        while (tokenizer.nextToken() != StreamTokenizer.TT_EOF && tokenizer.ttype != ';') {
+            String key = tokenizer.sval;
+            if (tokenizer.nextToken() != '=' || tokenizer.nextToken() == StreamTokenizer.TT_EOF || tokenizer.sval == null)
+                throw new IllegalArgumentException("Value not specified for key '" + key + "' in JAAS config");
+            String value = tokenizer.sval;
+            options.put(key, value);
+        }
+        if (tokenizer.ttype != ';')
+            throw new IllegalArgumentException("JAAS config entry not terminated by semi-colon");
+        return new AppConfigurationEntry(loginModule, controlFlag, options);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/9eb665c3/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java b/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java
index c15d2e3..aa328d4 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java
@@ -20,9 +20,13 @@ import javax.security.auth.login.Configuration;
 import javax.security.auth.login.AppConfigurationEntry;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.util.Map;
 import java.io.IOException;
 
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.common.network.LoginType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,15 +42,61 @@ public class JaasUtils {
     public static final String ZK_LOGIN_CONTEXT_NAME_KEY = "zookeeper.sasl.clientconfig";
 
     /**
-     * Construct a JAAS configuration object per kafka jaas configuration file
-     * @param loginContextName
-     * @param key
-     * @return JAAS configuration object
+     * Returns a JAAS Configuration object. For loginType SERVER, default Configuration
+     * is returned. For loginType CLIENT, if JAAS configuration property
+     * {@link SaslConfigs#SASL_JAAS_CONFIG} is specified, the configuration object
+     * is created by parsing the property value. Otherwise, the default Configuration
+     * is returned.
+     * @throws IllegalArgumentException if JAAS configuration property is specified
+     * for loginType SERVER
      */
-    public static String jaasConfig(String loginContextName, String key) throws IOException {
-        AppConfigurationEntry[] configurationEntries = Configuration.getConfiguration().getAppConfigurationEntry(loginContextName);
+    public static Configuration jaasConfig(LoginType loginType, Map<String, ?> configs) {
+        Password jaasConfigArgs = (Password) configs.get(SaslConfigs.SASL_JAAS_CONFIG);
+        if (jaasConfigArgs != null) {
+            if (loginType == LoginType.SERVER)
+                throw new IllegalArgumentException("JAAS config property not supported for server");
+            else
+                return new JaasConfig(loginType, jaasConfigArgs.value());
+        } else
+            return defaultJaasConfig(loginType);
+    }
+
+    private static Configuration defaultJaasConfig(LoginType loginType) {
+        String jaasConfigFile = System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM);
+        if (jaasConfigFile == null) {
+            LOG.debug("System property '" + JaasUtils.JAVA_LOGIN_CONFIG_PARAM + "' and Kafka SASL property '" +
+                      SaslConfigs.SASL_JAAS_CONFIG + "' are not set, using default JAAS configuration.");
+        }
+
+        Configuration jaasConfig = Configuration.getConfiguration();
+
+        String loginContextName = loginType.contextName();
+        AppConfigurationEntry[] configEntries = jaasConfig.getAppConfigurationEntry(loginContextName);
+        if (configEntries == null) {
+            String errorMessage;
+            errorMessage = "Could not find a '" + loginContextName + "' entry in the JAAS configuration. System property '" +
+                    JaasUtils.JAVA_LOGIN_CONFIG_PARAM + "' is " + (jaasConfigFile == null ? "not set" : jaasConfigFile);
+            throw new IllegalArgumentException(errorMessage);
+        }
+        return jaasConfig;
+    }
+
+    /**
+     * Returns the configuration option for <code>key</code> from the server login context
+     * of the default JAAS configuration.
+     */
+    public static String defaultServerJaasConfigOption(String key) throws IOException {
+        return jaasConfigOption(Configuration.getConfiguration(), LoginType.SERVER.contextName(), key);
+    }
+
+    /**
+     * Returns the configuration option for <code>key</code> from the login context
+     * <code>loginContextName</code> of the specified JAAS configuration.
+     */
+    public static String jaasConfigOption(Configuration jaasConfig, String loginContextName, String key) throws IOException {
+        AppConfigurationEntry[] configurationEntries = jaasConfig.getAppConfigurationEntry(loginContextName);
         if (configurationEntries == null) {
-            String errorMessage = "Could not find a '" + loginContextName + "' entry in this configuration.";
+            String errorMessage = "Could not find a '" + loginContextName + "' entry in this JAAS configuration.";
             throw new IOException(errorMessage);
         }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9eb665c3/clients/src/main/java/org/apache/kafka/common/security/auth/Login.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/auth/Login.java b/clients/src/main/java/org/apache/kafka/common/security/auth/Login.java
index 1ac779d..2f831c0 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/auth/Login.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/auth/Login.java
@@ -21,6 +21,7 @@ package org.apache.kafka.common.security.auth;
 import java.util.Map;
 
 import javax.security.auth.Subject;
+import javax.security.auth.login.Configuration;
 import javax.security.auth.login.LoginContext;
 import javax.security.auth.login.LoginException;
 
@@ -32,7 +33,7 @@ public interface Login {
     /**
      * Configures this login instance.
      */
-    void configure(Map<String, ?> configs, String loginContextName);
+    void configure(Map<String, ?> configs, Configuration jaasConfig, String loginContextName);
 
     /**
      * Performs login for each login module specified for the login context of this instance.

http://git-wip-us.apache.org/repos/asf/kafka/blob/9eb665c3/clients/src/main/java/org/apache/kafka/common/security/authenticator/AbstractLogin.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/AbstractLogin.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/AbstractLogin.java
index 2fe43ab..e1bbbce 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/AbstractLogin.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/AbstractLogin.java
@@ -18,7 +18,6 @@
 
 package org.apache.kafka.common.security.authenticator;
 
-import javax.security.auth.login.AppConfigurationEntry;
 import javax.security.auth.login.Configuration;
 import javax.security.auth.login.LoginContext;
 import javax.security.auth.login.LoginException;
@@ -30,7 +29,6 @@ import javax.security.auth.callback.PasswordCallback;
 import javax.security.auth.callback.UnsupportedCallbackException;
 import javax.security.auth.Subject;
 
-import org.apache.kafka.common.security.JaasUtils;
 import org.apache.kafka.common.security.auth.Login;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,29 +41,20 @@ import java.util.Map;
 public abstract class AbstractLogin implements Login {
     private static final Logger log = LoggerFactory.getLogger(AbstractLogin.class);
 
+    private Configuration jaasConfig;
     private String loginContextName;
     private LoginContext loginContext;
 
 
     @Override
-    public void configure(Map<String, ?> configs, String loginContextName) {
+    public void configure(Map<String, ?> configs, Configuration jaasConfig, String loginContextName) {
+        this.jaasConfig = jaasConfig;
         this.loginContextName = loginContextName;
     }
 
     @Override
     public LoginContext login() throws LoginException {
-        String jaasConfigFile = System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM);
-        if (jaasConfigFile == null) {
-            log.debug("System property '" + JaasUtils.JAVA_LOGIN_CONFIG_PARAM + "' is not set, using default JAAS configuration.");
-        }
-        AppConfigurationEntry[] configEntries = Configuration.getConfiguration().getAppConfigurationEntry(loginContextName);
-        if (configEntries == null) {
-            String errorMessage = "Could not find a '" + loginContextName + "' entry in the JAAS configuration. System property '" +
-                JaasUtils.JAVA_LOGIN_CONFIG_PARAM + "' is " + (jaasConfigFile == null ? "not set" : jaasConfigFile);
-            throw new IllegalArgumentException(errorMessage);
-        }
-
-        loginContext = new LoginContext(loginContextName, new LoginCallbackHandler());
+        loginContext = new LoginContext(loginContextName, null, new LoginCallbackHandler(), jaasConfig);
         loginContext.login();
         log.info("Successfully logged in.");
         return loginContext;
@@ -76,6 +65,10 @@ public abstract class AbstractLogin implements Login {
         return loginContext.getSubject();
     }
 
+    protected Configuration jaasConfig() {
+        return jaasConfig;
+    }
+
     /**
      * Callback handler for creating login context. Login callback handlers
      * should support the callbacks required for the login modules used by

http://git-wip-us.apache.org/repos/asf/kafka/blob/9eb665c3/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java
index 9aec9a7..3b1af1c 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java
@@ -19,6 +19,7 @@
 package org.apache.kafka.common.security.authenticator;
 
 import javax.security.auth.Subject;
+import javax.security.auth.login.Configuration;
 import javax.security.auth.login.LoginException;
 
 import java.io.IOException;
@@ -38,11 +39,11 @@ public class LoginManager {
     private final LoginType loginType;
     private int refCount;
 
-    private LoginManager(LoginType loginType, boolean hasKerberos, Map<String, ?> configs) throws IOException, LoginException {
+    private LoginManager(LoginType loginType, boolean hasKerberos, Map<String, ?> configs, Configuration jaasConfig) throws IOException, LoginException {
         this.loginType = loginType;
         String loginContext = loginType.contextName();
         login = hasKerberos ? new KerberosLogin() : new DefaultLogin();
-        login.configure(configs, loginContext);
+        login.configure(configs, jaasConfig, loginContext);
         login.login();
     }
 
@@ -61,11 +62,11 @@ public class LoginManager {
      *                  (i.e. consumer and producer)
      * @param configs configuration as key/value pairs
      */
-    public static final LoginManager acquireLoginManager(LoginType loginType, boolean hasKerberos, Map<String, ?> configs) throws IOException, LoginException {
+    public static final LoginManager acquireLoginManager(LoginType loginType, boolean hasKerberos, Map<String, ?> configs, Configuration jaasConfig) throws IOException, LoginException {
         synchronized (LoginManager.class) {
             LoginManager loginManager = CACHED_INSTANCES.get(loginType);
             if (loginManager == null) {
-                loginManager = new LoginManager(loginType, hasKerberos, configs);
+                loginManager = new LoginManager(loginType, hasKerberos, configs, jaasConfig);
                 CACHED_INSTANCES.put(loginType, loginManager);
             }
             return loginManager.acquire();

http://git-wip-us.apache.org/repos/asf/kafka/blob/9eb665c3/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index 4d1cab7..223e798 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -78,6 +78,7 @@ public class SaslServerAuthenticator implements Authenticator {
     }
 
     private final String node;
+    private final Configuration jaasConfig;
     private final Subject subject;
     private final KerberosShortNamer kerberosNamer;
     private final int maxReceiveSize;
@@ -100,10 +101,11 @@ public class SaslServerAuthenticator implements Authenticator {
     private NetworkReceive netInBuffer;
     private Send netOutBuffer;
 
-    public SaslServerAuthenticator(String node, final Subject subject, KerberosShortNamer kerberosNameParser, String host, int maxReceiveSize) throws IOException {
+    public SaslServerAuthenticator(String node, Configuration jaasConfig, final Subject subject, KerberosShortNamer kerberosNameParser, String host, int maxReceiveSize) throws IOException {
         if (subject == null)
             throw new IllegalArgumentException("subject cannot be null");
         this.node = node;
+        this.jaasConfig = jaasConfig;
         this.subject = subject;
         this.kerberosNamer = kerberosNameParser;
         this.maxReceiveSize = maxReceiveSize;
@@ -121,7 +123,7 @@ public class SaslServerAuthenticator implements Authenticator {
 
     private void createSaslServer(String mechanism) throws IOException {
         this.saslMechanism = mechanism;
-        callbackHandler = new SaslServerCallbackHandler(Configuration.getConfiguration(), kerberosNamer);
+        callbackHandler = new SaslServerCallbackHandler(jaasConfig, kerberosNamer);
         callbackHandler.configure(configs, Mode.SERVER, subject, saslMechanism);
         if (mechanism.equals(SaslConfigs.GSSAPI_MECHANISM)) {
             if (subject.getPrincipals().isEmpty())

http://git-wip-us.apache.org/repos/asf/kafka/blob/9eb665c3/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java
index 63112e9..d8a040b 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java
@@ -92,14 +92,14 @@ public class KerberosLogin extends AbstractLogin {
      * @throws javax.security.auth.login.LoginException
      *               Thrown if authentication fails.
      */
-    public void configure(Map<String, ?> configs, final String loginContextName) {
-        super.configure(configs, loginContextName);
+    public void configure(Map<String, ?> configs, Configuration jaasConfig, final String loginContextName) {
+        super.configure(configs, jaasConfig, loginContextName);
         this.loginContextName = loginContextName;
         this.ticketRenewWindowFactor = (Double) configs.get(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR);
         this.ticketRenewJitter = (Double) configs.get(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER);
         this.minTimeBeforeRelogin = (Long) configs.get(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN);
         this.kinitCmd = (String) configs.get(SaslConfigs.SASL_KERBEROS_KINIT_CMD);
-        this.serviceName = getServiceName(configs, loginContextName);
+        this.serviceName = getServiceName(jaasConfig, configs, loginContextName);
     }
 
     @Override
@@ -110,7 +110,7 @@ public class KerberosLogin extends AbstractLogin {
         subject = loginContext.getSubject();
         isKrbTicket = !subject.getPrivateCredentials(KerberosTicket.class).isEmpty();
 
-        AppConfigurationEntry[] entries = Configuration.getConfiguration().getAppConfigurationEntry(loginContextName);
+        AppConfigurationEntry[] entries = jaasConfig().getAppConfigurationEntry(loginContextName);
         if (entries.length == 0) {
             isUsingTicketCache = false;
             principal = null;
@@ -291,12 +291,12 @@ public class KerberosLogin extends AbstractLogin {
         return serviceName;
     }
 
-    private String getServiceName(Map<String, ?> configs, String loginContext) {
+    private String getServiceName(Configuration jaasConfig, Map<String, ?> configs, String loginContext) {
         String jaasServiceName;
         try {
-            jaasServiceName = JaasUtils.jaasConfig(loginContext, JaasUtils.SERVICE_NAME);
+            jaasServiceName = JaasUtils.jaasConfigOption(jaasConfig, loginContext, JaasUtils.SERVICE_NAME);
         } catch (IOException e) {
-            throw new KafkaException("Jaas configuration not found", e);
+            throw new KafkaException("JAAS configuration entry not found", e);
         }
         String configServiceName = (String) configs.get(SaslConfigs.SASL_KERBEROS_SERVICE_NAME);
         if (jaasServiceName != null && configServiceName != null && !jaasServiceName.equals(configServiceName)) {
@@ -376,7 +376,7 @@ public class KerberosLogin extends AbstractLogin {
             loginContext.logout();
             //login and also update the subject field of this instance to
             //have the new credentials (pass it to the LoginContext constructor)
-            loginContext = new LoginContext(loginContextName, subject);
+            loginContext = new LoginContext(loginContextName, subject, null, jaasConfig());
             log.info("Initiating re-login for {}", principal);
             loginContext.login();
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9eb665c3/clients/src/main/java/org/apache/kafka/common/security/plain/PlainSaslServer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/plain/PlainSaslServer.java b/clients/src/main/java/org/apache/kafka/common/security/plain/PlainSaslServer.java
index 5c6fd78..ec7d696 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/plain/PlainSaslServer.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/plain/PlainSaslServer.java
@@ -29,7 +29,6 @@ import javax.security.sasl.SaslException;
 import javax.security.sasl.SaslServer;
 import javax.security.sasl.SaslServerFactory;
 
-import org.apache.kafka.common.network.LoginType;
 import org.apache.kafka.common.security.JaasUtils;
 
 /**
@@ -93,7 +92,7 @@ public class PlainSaslServer implements SaslServer {
             authorizationID = username;
 
         try {
-            String expectedPassword = JaasUtils.jaasConfig(LoginType.SERVER.contextName(), JAAS_USER_PREFIX + username);
+            String expectedPassword = JaasUtils.defaultServerJaasConfigOption(JAAS_USER_PREFIX + username);
             if (!password.equals(expectedPassword)) {
                 throw new SaslException("Authentication failed: Invalid username or password");
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9eb665c3/clients/src/test/java/org/apache/kafka/common/security/JaasUtilsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/security/JaasUtilsTest.java b/clients/src/test/java/org/apache/kafka/common/security/JaasUtilsTest.java
new file mode 100644
index 0000000..443393f
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/security/JaasUtilsTest.java
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;
+import javax.security.auth.login.Configuration;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.common.network.LoginType;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests parsing of {@link SaslConfigs#SASL_JAAS_CONFIG} property and verifies that the format
+ * and parsing are consistent with JAAS configuration files loaded by the JRE.
+ */
+public class JaasUtilsTest {
+
+    private File jaasConfigFile;
+
+    @Before
+    public void setUp() throws IOException {
+        jaasConfigFile = File.createTempFile("jaas", ".conf");
+        jaasConfigFile.deleteOnExit();
+        Configuration.setConfiguration(null);
+        System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, jaasConfigFile.toString());
+    }
+
+    @After
+    public void tearDown() {
+        jaasConfigFile.delete();
+    }
+
+    @Test
+    public void testConfigNoOptions() throws Exception {
+        checkConfiguration("test.testConfigNoOptions", LoginModuleControlFlag.REQUIRED, new HashMap<String, Object>());
+    }
+
+    @Test
+    public void testControlFlag() throws Exception {
+        LoginModuleControlFlag[] controlFlags = new LoginModuleControlFlag[] {
+            LoginModuleControlFlag.REQUIRED,
+            LoginModuleControlFlag.REQUISITE,
+            LoginModuleControlFlag.SUFFICIENT,
+            LoginModuleControlFlag.OPTIONAL
+        };
+        Map<String, Object> options = new HashMap<>();
+        options.put("propName", "propValue");
+        for (LoginModuleControlFlag controlFlag : controlFlags) {
+            checkConfiguration("test.testControlFlag", controlFlag, options);
+        }
+    }
+
+    @Test
+    public void testSingleOption() throws Exception {
+        Map<String, Object> options = new HashMap<>();
+        options.put("propName", "propValue");
+        checkConfiguration("test.testSingleOption", LoginModuleControlFlag.REQUISITE, options);
+    }
+
+    @Test
+    public void testMultipleOptions() throws Exception {
+        Map<String, Object> options = new HashMap<>();
+        for (int i = 0; i < 10; i++)
+            options.put("propName" + i, "propValue" + i);
+        checkConfiguration("test.testMultipleOptions", LoginModuleControlFlag.SUFFICIENT, options);
+    }
+
+    @Test
+    public void testQuotedOptionValue() throws Exception {
+        Map<String, Object> options = new HashMap<>();
+        options.put("propName", "prop value");
+        options.put("propName2", "value1 = 1, value2 = 2");
+        String config = String.format("test.testQuotedOptionValue required propName=\"%s\" propName2=\"%s\";", options.get("propName"), options.get("propName2"));
+        checkConfiguration(config, "test.testQuotedOptionValue", LoginModuleControlFlag.REQUIRED, options);
+    }
+
+    @Test
+    public void testQuotedOptionName() throws Exception {
+        Map<String, Object> options = new HashMap<>();
+        options.put("prop name", "propValue");
+        String config = "test.testQuotedOptionName required \"prop name\"=propValue;";
+        checkConfiguration(config, "test.testQuotedOptionName", LoginModuleControlFlag.REQUIRED, options);
+    }
+
+    @Test
+    public void testMultipleLoginModules() throws Exception {
+        StringBuilder builder = new StringBuilder();
+        int moduleCount = 3;
+        Map<Integer, Map<String, Object>> moduleOptions = new HashMap<>();
+        for (int i = 0; i < moduleCount; i++) {
+            Map<String, Object> options = new HashMap<>();
+            options.put("index", "Index" + i);
+            options.put("module", "Module" + i);
+            moduleOptions.put(i, options);
+            String module = jaasConfigProp("test.Module" + i, LoginModuleControlFlag.REQUIRED, options);
+            builder.append(' ');
+            builder.append(module);
+        }
+        String jaasConfigProp = builder.toString();
+
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(SaslConfigs.SASL_JAAS_CONFIG, new Password(jaasConfigProp));
+        Configuration configuration = JaasUtils.jaasConfig(LoginType.CLIENT, configs);
+        AppConfigurationEntry[] dynamicEntries = configuration.getAppConfigurationEntry(LoginType.CLIENT.contextName());
+        assertEquals(moduleCount, dynamicEntries.length);
+
+        for (int i = 0; i < moduleCount; i++) {
+            AppConfigurationEntry entry = dynamicEntries[i];
+            checkEntry(entry, "test.Module" + i, LoginModuleControlFlag.REQUIRED, moduleOptions.get(i));
+        }
+
+        writeConfiguration(LoginType.SERVER, jaasConfigProp);
+        AppConfigurationEntry[] staticEntries = Configuration.getConfiguration().getAppConfigurationEntry(LoginType.SERVER.contextName());
+        for (int i = 0; i < moduleCount; i++) {
+            AppConfigurationEntry staticEntry = staticEntries[i];
+            checkEntry(staticEntry, dynamicEntries[i].getLoginModuleName(), LoginModuleControlFlag.REQUIRED, dynamicEntries[i].getOptions());
+        }
+    }
+
+    @Test
+    public void testMissingLoginModule() throws Exception {
+        checkInvalidConfiguration("  required option1=value1;");
+    }
+
+    @Test
+    public void testMissingControlFlag() throws Exception {
+        checkInvalidConfiguration("test.loginModule option1=value1;");
+    }
+
+    @Test
+    public void testMissingOptionValue() throws Exception {
+        checkInvalidConfiguration("loginModule required option1;");
+    }
+
+    @Test
+    public void testMissingSemicolon() throws Exception {
+        checkInvalidConfiguration("test.testMissingSemicolon required option1=value1");
+    }
+
+    @Test
+    public void testNumericOptionWithoutQuotes() throws Exception {
+        checkInvalidConfiguration("test.testNumericOptionWithoutQuotes required option1=3;");
+    }
+
+    @Test
+    public void testNumericOptionWithQuotes() throws Exception {
+        Map<String, Object> options = new HashMap<>();
+        options.put("option1", "3");
+        String config = "test.testNumericOptionWithQuotes required option1=\"3\";";
+        checkConfiguration(config, "test.testNumericOptionWithQuotes", LoginModuleControlFlag.REQUIRED, options);
+    }
+
+    private AppConfigurationEntry configurationEntry(LoginType loginType, String jaasConfigProp) {
+        Map<String, Object> configs = new HashMap<>();
+        if (jaasConfigProp != null)
+            configs.put(SaslConfigs.SASL_JAAS_CONFIG, new Password(jaasConfigProp));
+        Configuration configuration = JaasUtils.jaasConfig(loginType, configs);
+        AppConfigurationEntry[] entry = configuration.getAppConfigurationEntry(loginType.contextName());
+        assertEquals(1, entry.length);
+        return entry[0];
+    }
+
+    private String controlFlag(LoginModuleControlFlag loginModuleControlFlag) {
+        // LoginModuleControlFlag.toString() has format "LoginModuleControlFlag: flag"
+        String[] tokens = loginModuleControlFlag.toString().split(" ");
+        return tokens[tokens.length - 1];
+    }
+
+    private String jaasConfigProp(String loginModule, LoginModuleControlFlag controlFlag, Map<String, Object> options) {
+        StringBuilder builder = new StringBuilder();
+        builder.append(loginModule);
+        builder.append(' ');
+        builder.append(controlFlag(controlFlag));
+        for (Map.Entry<String, Object> entry : options.entrySet()) {
+            builder.append(' ');
+            builder.append(entry.getKey());
+            builder.append('=');
+            builder.append(entry.getValue());
+        }
+        builder.append(';');
+        return builder.toString();
+    }
+
+    private void writeConfiguration(LoginType loginType, String jaasConfigProp) throws IOException {
+        List<String> lines = Arrays.asList(loginType.contextName() + " { ", jaasConfigProp, "};");
+        Files.write(jaasConfigFile.toPath(), lines, StandardCharsets.UTF_8);
+        Configuration.setConfiguration(null);
+    }
+
+    private void checkConfiguration(String loginModule, LoginModuleControlFlag controlFlag, Map<String, Object> options) throws Exception {
+        String jaasConfigProp = jaasConfigProp(loginModule, controlFlag, options);
+        checkConfiguration(jaasConfigProp, loginModule, controlFlag, options);
+    }
+
+    private void checkEntry(AppConfigurationEntry entry, String loginModule, LoginModuleControlFlag controlFlag, Map<String, ?> options) {
+        assertEquals(loginModule, entry.getLoginModuleName());
+        assertEquals(controlFlag, entry.getControlFlag());
+        assertEquals(options, entry.getOptions());
+    }
+
+    private void checkConfiguration(String jaasConfigProp, String loginModule, LoginModuleControlFlag controlFlag, Map<String, Object> options) throws Exception {
+        AppConfigurationEntry dynamicEntry = configurationEntry(LoginType.CLIENT, jaasConfigProp);
+        checkEntry(dynamicEntry, loginModule, controlFlag, options);
+        assertNull("Static configuration updated", Configuration.getConfiguration().getAppConfigurationEntry(LoginType.CLIENT.contextName()));
+
+        writeConfiguration(LoginType.SERVER, jaasConfigProp);
+        AppConfigurationEntry staticEntry = configurationEntry(LoginType.SERVER, null);
+        checkEntry(staticEntry, loginModule, controlFlag, options);
+    }
+
+    private void checkInvalidConfiguration(String jaasConfigProp) throws IOException {
+        try {
+            writeConfiguration(LoginType.SERVER, jaasConfigProp);
+            AppConfigurationEntry entry = configurationEntry(LoginType.SERVER, null);
+            fail("Invalid JAAS configuration file didn't throw exception, entry=" + entry);
+        } catch (SecurityException e) {
+            // Expected exception
+        }
+        try {
+            AppConfigurationEntry entry = configurationEntry(LoginType.CLIENT, jaasConfigProp);
+            fail("Invalid JAAS configuration property didn't throw exception, entry=" + entry);
+        } catch (IllegalArgumentException e) {
+            // Expected exception
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9eb665c3/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
index 27c5695..f75d5b7 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
@@ -39,6 +39,7 @@ import org.apache.kafka.common.requests.ResponseHeader;
 import org.apache.kafka.common.requests.SaslHandshakeRequest;
 import org.apache.kafka.common.requests.SaslHandshakeResponse;
 import org.apache.kafka.common.security.JaasUtils;
+import org.apache.kafka.common.security.plain.PlainLoginModule;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -48,10 +49,13 @@ import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
 
+import javax.security.auth.login.Configuration;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
@@ -493,6 +497,41 @@ public class SaslAuthenticatorTest {
     }
 
     /**
+     * Tests dynamic JAAS configuration property for SASL clients. Invalid client credentials
+     * are set in the static JVM-wide configuration instance to ensure that the dynamic
+     * property override is used during authentication.
+     */
+    @Test
+    public void testDynamicJaasConfiguration() throws Exception {
+        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
+        saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
+        saslServerConfigs.put(SaslConfigs.SASL_ENABLED_MECHANISMS, Arrays.asList("PLAIN"));
+        Map<String, Object> serverOptions = new HashMap<>();
+        serverOptions.put("user_user1", "user1-secret");
+        serverOptions.put("user_user2", "user2-secret");
+        TestJaasConfig staticJaasConfig = new TestJaasConfig();
+        staticJaasConfig.createOrUpdateEntry(JaasUtils.LOGIN_CONTEXT_SERVER, PlainLoginModule.class.getName(), serverOptions);
+        staticJaasConfig.setPlainClientOptions("user1", "invalidpassword");
+        Configuration.setConfiguration(staticJaasConfig);
+        server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs);
+
+        // Check that client using static Jaas config does not connect since password is invalid
+        createAndCheckClientConnectionFailure(securityProtocol, "1");
+
+        // Check that 'user1' can connect with a Jaas config property override
+        saslClientConfigs.put(SaslConfigs.SASL_JAAS_CONFIG, TestJaasConfig.jaasConfigProperty("PLAIN", "user1", "user1-secret"));
+        createAndCheckClientConnection(securityProtocol, "2");
+
+        // Check that invalid password specified as Jaas config property results in connection failure
+        saslClientConfigs.put(SaslConfigs.SASL_JAAS_CONFIG, TestJaasConfig.jaasConfigProperty("PLAIN", "user1", "user2-secret"));
+        createAndCheckClientConnectionFailure(securityProtocol, "3");
+
+        // Check that another user 'user2' can also connect with a Jaas config override without any changes to static configuration
+        saslClientConfigs.put(SaslConfigs.SASL_JAAS_CONFIG, TestJaasConfig.jaasConfigProperty("PLAIN", "user2", "user2-secret"));
+        createAndCheckClientConnection(securityProtocol, "4");
+    }
+
+    /**
      * Tests that Kafka ApiVersionsRequests are handled by the SASL server authenticator
      * prior to SASL handshake flow and that subsequent authentication succeeds
      * when transport layer is PLAINTEXT/SSL. This test uses a non-SASL client that simulates
@@ -584,6 +623,13 @@ public class SaslAuthenticatorTest {
         selector = null;
     }
 
+    private void createAndCheckClientConnectionFailure(SecurityProtocol securityProtocol, String node) throws Exception {
+        createClientConnection(securityProtocol, node);
+        NetworkTestUtils.waitForChannelClose(selector, node);
+        selector.close();
+        selector = null;
+    }
+
     private AbstractResponse sendKafkaRequestReceiveResponse(String node, ApiKeys apiKey, AbstractRequest request) throws IOException {
         RequestHeader header = new RequestHeader(apiKey.id, "someclient", 1);
         Send send = request.toSend(node, header);

http://git-wip-us.apache.org/repos/asf/kafka/blob/9eb665c3/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestJaasConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestJaasConfig.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestJaasConfig.java
index 2291cc1..22a3267 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestJaasConfig.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestJaasConfig.java
@@ -20,6 +20,7 @@ import javax.security.auth.login.AppConfigurationEntry;
 import javax.security.auth.login.Configuration;
 import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;
 
+import org.apache.kafka.common.config.types.Password;
 import org.apache.kafka.common.security.JaasUtils;
 import org.apache.kafka.common.security.plain.PlainLoginModule;
 
@@ -40,6 +41,10 @@ public class TestJaasConfig extends Configuration {
         return config;
     }
 
+    public static Password jaasConfigProperty(String mechanism, String username, String password) {
+        return new Password(loginModule(mechanism) + " required username=" + username + " password=" + password + ";");
+    }
+
     public void setPlainClientOptions(String clientUsername, String clientPassword) {
         Map<String, Object> options = new HashMap<>();
         if (clientUsername != null)

http://git-wip-us.apache.org/repos/asf/kafka/blob/9eb665c3/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
index 591479e..214cd0b 100644
--- a/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
@@ -16,7 +16,10 @@
   */
 package kafka.api
 
+import java.io.File
 import org.apache.kafka.common.protocol.SecurityProtocol
+import org.apache.kafka.common.config.SaslConfigs
+import kafka.utils.JaasTestUtils
 
 class SaslPlainSslEndToEndAuthorizationTest extends EndToEndAuthorizationTest {
   override protected def securityProtocol = SecurityProtocol.SASL_SSL
@@ -24,4 +27,13 @@ class SaslPlainSslEndToEndAuthorizationTest extends EndToEndAuthorizationTest {
   override protected def kafkaServerSaslMechanisms = List("PLAIN")
   override val clientPrincipal = "testuser"
   override val kafkaPrincipal = "admin"
+
+  // Use JAAS configuration properties for clients so that dynamic JAAS configuration is also tested by this set of tests
+  override protected def setJaasConfiguration(mode: SaslSetupMode, serverMechanisms: List[String], clientMechanisms: List[String],
+      serverKeytabFile: Option[File] = None, clientKeytabFile: Option[File] = None) {
+    super.setJaasConfiguration(mode, kafkaServerSaslMechanisms, List()) // create static config without client login contexts
+    val clientLoginModule = JaasTestUtils.clientLoginModule(kafkaClientSaslMechanism, None)
+    producerConfig.put(SaslConfigs.SASL_JAAS_CONFIG, clientLoginModule)
+    consumerConfig.put(SaslConfigs.SASL_JAAS_CONFIG, clientLoginModule)
+  }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9eb665c3/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
index f39fa6b..70b0b2f 100644
--- a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
@@ -63,7 +63,7 @@ object JaasTestUtils {
       s"""$moduleName required
           |  debug=$debug
           |  ${entries.map { case (k, v) => s"""$k="$v"""" }.mkString("", "\n|  ", ";")}
-          |"""
+          |""".stripMargin
     }
   }
 
@@ -114,6 +114,9 @@ object JaasTestUtils {
     jaasFile.getCanonicalPath
   }
 
+  def clientLoginModule(mechanism: String, keytabLocation: Option[File]): String =
+    kafkaClientModule(mechanism, keytabLocation).toString
+
   private def zkSections: Seq[JaasSection] = Seq(
     new JaasSection(ZkServerContextName, Seq(JaasModule(ZkModule, false, Map("user_super" -> ZkUserSuperPasswd, s"user_$ZkUser" -> ZkUserPassword)))),
     new JaasSection(ZkClientContextName, Seq(JaasModule(ZkModule, false, Map("username" -> ZkUser, "password" -> ZkUserPassword))))
@@ -140,8 +143,8 @@ object JaasTestUtils {
     new JaasSection(KafkaServerContextName, modules)
   }
 
-  private def kafkaClientSection(mechanisms: List[String], keytabLocation: Option[File]): JaasSection = {
-    val modules = mechanisms.map {
+  def kafkaClientModule(mechanism: String, keytabLocation: Option[File]): JaasModule = {
+    mechanism match {
       case "GSSAPI" =>
         Krb5LoginModule(
           useKeyTab = true,
@@ -158,7 +161,10 @@ object JaasTestUtils {
         ).toJaasModule
       case mechanism => throw new IllegalArgumentException("Unsupported client mechanism " + mechanism)
     }
-    new JaasSection(KafkaClientContextName, modules)
+  }
+
+  private def kafkaClientSection(mechanisms: List[String], keytabLocation: Option[File]): JaasSection = {
+    new JaasSection(KafkaClientContextName, mechanisms.map(m => kafkaClientModule(m, keytabLocation)))
   }
 
   private def jaasSectionsToString(jaasSections: Seq[JaasSection]): String =


Mime
View raw message