kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From srihar...@apache.org
Subject kafka git commit: KAFKA-3166; Disable SSL client authentication for SASL_SSL security protocol
Date Fri, 29 Jan 2016 10:33:38 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 962aec1a7 -> 91491986c


KAFKA-3166; Disable SSL client authentication for SASL_SSL security protocol

Also:

* Fixed a bug in `createSslConfig` where we were always generating a
keystore even if `useClientCert` was false and `mode` was `Mode.CLIENT`.
* Pass `numRecords` to `consumeRecords` and other clean-ups (formatting and scaladoc).

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Sriharsha Chintalapani <harsha@hortonworks.com>, Rajini Sivaram <rajinisivaram@googlemail.com>

Closes #827 from ijuma/kafka-3166-disable-ssl-auth-sasl-ssl and squashes the following commits:

8265221 [Ismael Juma] Pass `numRecords` to `consumeRecords` and clean-ups.
a73db89 [Ismael Juma] SSL client authentication should be disabled for SASL_SSL security protocol


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/91491986
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/91491986
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/91491986

Branch: refs/heads/trunk
Commit: 91491986c0a4c9c4cde2ab33be822852b76759e6
Parents: 962aec1
Author: Ismael Juma <ismael@juma.me.uk>
Authored: Fri Jan 29 16:03:10 2016 +0530
Committer: Sriharsha Chintalapani <harsha@hortonworks.com>
Committed: Fri Jan 29 16:03:10 2016 +0530

----------------------------------------------------------------------
 .../common/network/SaslChannelBuilder.java      |  5 ++--
 .../kafka/common/security/ssl/SslFactory.java   | 13 ++++++++--
 .../org/apache/kafka/test/TestSslUtils.java     |  6 ++---
 .../kafka/api/EndToEndAuthorizationTest.scala   | 27 +++++++++-----------
 .../api/SaslSslEndToEndAuthorizationTest.scala  |  7 +++++
 .../test/scala/unit/kafka/utils/TestUtils.scala |  2 +-
 6 files changed, 37 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/91491986/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
index 86ac779..34a87c9 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
@@ -66,8 +66,9 @@ public class SaslChannelBuilder implements ChannelBuilder {
                 kerberosShortNamer = KerberosShortNamer.fromUnparsedRules(defaultRealm, principalToLocalRules);
 
             if (this.securityProtocol == SecurityProtocol.SASL_SSL) {
-                this.sslFactory = new SslFactory(mode);
-                this.sslFactory.configure(this.configs);
+                // Disable SSL client authentication as we are using SASL authentication
+                this.sslFactory = new SslFactory(mode, "none");
+                this.sslFactory.configure(configs);
             }
         } catch (Exception e) {
             throw new KafkaException(e);

http://git-wip-us.apache.org/repos/asf/kafka/blob/91491986/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
index cb14c64..0d4d2ce 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
@@ -38,6 +38,9 @@ import javax.net.ssl.TrustManagerFactory;
 
 public class SslFactory implements Configurable {
 
+    private final Mode mode;
+    private final String clientAuthConfigOverride;
+
     private String protocol;
     private String provider;
     private String kmfAlgorithm;
@@ -51,10 +54,14 @@ public class SslFactory implements Configurable {
     private SSLContext sslContext;
     private boolean needClientAuth;
     private boolean wantClientAuth;
-    private final Mode mode;
 
     public SslFactory(Mode mode) {
+        this(mode, null);
+    }
+
+    public SslFactory(Mode mode, String clientAuthConfigOverride) {
         this.mode = mode;
+        this.clientAuthConfigOverride = clientAuthConfigOverride;
     }
 
     @Override
@@ -75,7 +82,9 @@ public class SslFactory implements Configurable {
         if (endpointIdentification != null)
             this.endpointIdentification = endpointIdentification;
 
-        String clientAuthConfig = (String) configs.get(SslConfigs.SSL_CLIENT_AUTH_CONFIG);
+        String clientAuthConfig = clientAuthConfigOverride;
+        if (clientAuthConfig == null)
+            clientAuthConfig = (String) configs.get(SslConfigs.SSL_CLIENT_AUTH_CONFIG);
         if (clientAuthConfig != null) {
             if (clientAuthConfig.equals("required"))
                 this.needClientAuth = true;

http://git-wip-us.apache.org/repos/asf/kafka/blob/91491986/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java b/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java
index 71713af..fbe2630 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java
@@ -199,18 +199,18 @@ public class TestSslUtils {
     public static  Map<String, Object> createSslConfig(boolean useClientCert, boolean
trustStore, Mode mode, File trustStoreFile, String certAlias, String host)
         throws IOException, GeneralSecurityException {
         Map<String, X509Certificate> certs = new HashMap<>();
-        File keyStoreFile;
+        File keyStoreFile = null;
         Password password = mode == Mode.SERVER ? new Password("ServerPassword") : new Password("ClientPassword");
 
         Password trustStorePassword = new Password("TrustStorePassword");
 
-        if (useClientCert) {
+        if (mode == Mode.CLIENT && useClientCert) {
             keyStoreFile = File.createTempFile("clientKS", ".jks");
             KeyPair cKP = generateKeyPair("RSA");
             X509Certificate cCert = generateCertificate("CN=" + host + ", O=A client", cKP,
30, "SHA1withRSA");
             createKeyStore(keyStoreFile.getPath(), password, "client", cKP.getPrivate(),
cCert);
             certs.put(certAlias, cCert);
-        } else {
+        } else if (mode == Mode.SERVER) {
             keyStoreFile = File.createTempFile("serverKS", ".jks");
             KeyPair sKP = generateKeyPair("RSA");
             X509Certificate sCert = generateCertificate("CN=" + host + ", O=A server", sKP,
30,

http://git-wip-us.apache.org/repos/asf/kafka/blob/91491986/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
index f14149f..e2314b3 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
@@ -50,10 +50,8 @@ import scala.collection.JavaConverters._
   * brokers, but first it initializes a ZooKeeper server and client, which happens
   * in ZooKeeperTestHarness.
   *
-  * To start brokers when the security protocol is SASL_SSL, we need to set a cluster
-  * ACL, which happens optionally in KafkaServerTestHarness. If the security protocol
-  * is SSL or PLAINTEXT, then the ACL isn't set. The remaining ACLs to enable access
-  * to producers and consumers are set here. To set ACLs, we use AclCommand directly.
+  * To start brokers we need to set a cluster ACL, which happens optionally in KafkaServerTestHarness.
+  * The remaining ACLs to enable access to producers and consumers are set here. To set ACLs, we use AclCommand directly.
   *
   * Finally, we rely on SaslSetup to bootstrap and setup Kerberos. We don't use
   * SaslTestHarness here directly because it extends ZooKeeperTestHarness, and we
@@ -63,13 +61,12 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup
{
   override val producerCount = 1
   override val consumerCount = 2
   override val serverCount = 3
-  override val setClusterAcl = Some(() =>
-    { AclCommand.main(clusterAclArgs)
-      servers.foreach( s =>
-        TestUtils.waitAndVerifyAcls(ClusterActionAcl, s.apis.authorizer.get, clusterResource)
-      )
-    } : Unit
-  )
+  override val setClusterAcl = Some { () =>
+    AclCommand.main(clusterAclArgs)
+    servers.foreach(s =>
+      TestUtils.waitAndVerifyAcls(ClusterActionAcl, s.apis.authorizer.get, clusterResource)
+    )
+  }
   val numRecords = 1
   val group = "group"
   val topic = "e2etopic"
@@ -183,7 +180,7 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup
{
     //Consume records
     debug("Finished sending and starting to consume records")
     consumers.head.assign(List(tp).asJava)
-    consumeRecords(this.consumers.head)
+    consumeRecords(this.consumers.head, numRecords)
     debug("Finished consuming")
   }
 
@@ -221,7 +218,7 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup
{
     //Consume records
     debug("Finished sending and starting to consume records")
     consumers.head.assign(List(tp).asJava)
-    try{
+    try {
       consumeRecords(this.consumers.head)
       fail("Topic authorization exception expected")
     } catch {
@@ -245,7 +242,7 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup
{
     //Consume records
     debug("Finished sending and starting to consume records")
     consumers.head.assign(List(tp).asJava)
-    try{
+    try {
       consumeRecords(this.consumers.head)
       fail("Topic authorization exception expected")
     } catch {
@@ -290,4 +287,4 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup
{
       assertEquals(offset.toLong, record.offset())
     } 
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/91491986/core/src/test/scala/integration/kafka/api/SaslSslEndToEndAuthorizationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SaslSslEndToEndAuthorizationTest.scala
b/core/src/test/scala/integration/kafka/api/SaslSslEndToEndAuthorizationTest.scala
index 0ca0904..470ec84 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSslEndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSslEndToEndAuthorizationTest.scala
@@ -16,10 +16,17 @@
   */
 package kafka.api
 
+import kafka.server.KafkaConfig
 import org.apache.kafka.common.protocol.SecurityProtocol
 
 class SaslSslEndToEndAuthorizationTest extends EndToEndAuthorizationTest {
   override protected def securityProtocol = SecurityProtocol.SASL_SSL
   override val clientPrincipal = "client"
   override val kafkaPrincipal = "kafka"
+
+  // Configure brokers to require SSL client authentication in order to verify that SASL_SSL works correctly even if the
+  // client doesn't have a keystore. We want to cover the scenario where a broker requires either SSL client
+  // authentication or SASL authentication with SSL as the transport layer (but not both).
+  serverConfig.put(KafkaConfig.SslClientAuthProp, "required")
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/91491986/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index ac05314..93a181d 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -443,7 +443,7 @@ object TestUtils extends Logging {
                               certAlias: String): Properties = {
     val props = new Properties
     if (usesSslTransportLayer(securityProtocol))
-      props.putAll(sslConfigs(mode, true, trustStoreFile, certAlias))
+      props.putAll(sslConfigs(mode, securityProtocol == SecurityProtocol.SSL, trustStoreFile,
certAlias))
     props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol.name)
     props
   }


Mime
View raw message