kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From srihar...@apache.org
Subject [1/4] kafka git commit: KAFKA-2460; Fix capitalisation in SSL classes
Date Sat, 24 Oct 2015 16:45:04 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 6f2f1f984 -> 16f194b20


http://git-wip-us.apache.org/repos/asf/kafka/blob/16f194b2/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java b/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java
new file mode 100644
index 0000000..30bdb6d
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.test;
+
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.network.Mode;
+import org.apache.kafka.clients.CommonClientConfigs;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.EOFException;
+import java.math.BigInteger;
+import javax.net.ssl.TrustManagerFactory;
+import java.security.*;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+
+import org.bouncycastle.asn1.x500.X500Name;
+import org.bouncycastle.asn1.x509.AlgorithmIdentifier;
+import org.bouncycastle.asn1.x509.SubjectPublicKeyInfo;
+import org.bouncycastle.cert.X509CertificateHolder;
+import org.bouncycastle.cert.X509v1CertificateBuilder;
+import org.bouncycastle.cert.jcajce.JcaX509CertificateConverter;
+import org.bouncycastle.crypto.params.AsymmetricKeyParameter;
+import org.bouncycastle.crypto.util.PrivateKeyFactory;
+import org.bouncycastle.jce.provider.BouncyCastleProvider;
+import org.bouncycastle.operator.ContentSigner;
+import org.bouncycastle.operator.DefaultDigestAlgorithmIdentifierFinder;
+import org.bouncycastle.operator.DefaultSignatureAlgorithmIdentifierFinder;
+import org.bouncycastle.operator.bc.BcRSAContentSignerBuilder;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.List;
+import java.util.ArrayList;
+
+public class TestSslUtils {
+
+    /**
+     * Create a self-signed X.509 Certificate.
+     * From http://bfo.com/blog/2011/03/08/odds_and_ends_creating_a_new_x_509_certificate.html.
+     *
+     * @param dn the X.509 Distinguished Name, eg "CN=Test, L=London, C=GB"
+     * @param pair the KeyPair
+     * @param days how many days from now the Certificate is valid for
+     * @param algorithm the signing algorithm, eg "SHA1withRSA"
+     * @return the self-signed certificate
+     * @throws CertificateException thrown if a security error or an IO error ocurred.
+     */
+    public static X509Certificate generateCertificate(String dn, KeyPair pair,
+                                                      int days, String algorithm)
+        throws  CertificateException {
+
+        try {
+            Security.addProvider(new BouncyCastleProvider());
+            AlgorithmIdentifier sigAlgId = new DefaultSignatureAlgorithmIdentifierFinder().find(algorithm);
+            AlgorithmIdentifier digAlgId = new DefaultDigestAlgorithmIdentifierFinder().find(sigAlgId);
+            AsymmetricKeyParameter privateKeyAsymKeyParam = PrivateKeyFactory.createKey(pair.getPrivate().getEncoded());
+            SubjectPublicKeyInfo subPubKeyInfo = SubjectPublicKeyInfo.getInstance(pair.getPublic().getEncoded());
+            ContentSigner sigGen = new BcRSAContentSignerBuilder(sigAlgId, digAlgId).build(privateKeyAsymKeyParam);
+            X500Name name = new X500Name(dn);
+            Date from = new Date();
+            Date to = new Date(from.getTime() + days * 86400000L);
+            BigInteger sn = new BigInteger(64, new SecureRandom());
+
+            X509v1CertificateBuilder v1CertGen = new X509v1CertificateBuilder(name, sn, from,
to, name, subPubKeyInfo);
+            X509CertificateHolder certificateHolder = v1CertGen.build(sigGen);
+            return new JcaX509CertificateConverter().setProvider("BC").getCertificate(certificateHolder);
+        } catch (CertificateException ce) {
+            throw ce;
+        } catch (Exception e) {
+            throw new CertificateException(e);
+        }
+    }
+
+    public static KeyPair generateKeyPair(String algorithm) throws NoSuchAlgorithmException
{
+        KeyPairGenerator keyGen = KeyPairGenerator.getInstance(algorithm);
+        keyGen.initialize(1024);
+        return keyGen.genKeyPair();
+    }
+
+    private static KeyStore createEmptyKeyStore() throws GeneralSecurityException, IOException
{
+        KeyStore ks = KeyStore.getInstance("JKS");
+        ks.load(null, null); // initialize
+        return ks;
+    }
+
+    private static void saveKeyStore(KeyStore ks, String filename,
+                                     String password) throws GeneralSecurityException, IOException
{
+        FileOutputStream out = new FileOutputStream(filename);
+        try {
+            ks.store(out, password.toCharArray());
+        } finally {
+            out.close();
+        }
+    }
+
+    public static void createKeyStore(String filename,
+                                      String password, String alias,
+                                      Key privateKey, Certificate cert) throws GeneralSecurityException,
IOException {
+        KeyStore ks = createEmptyKeyStore();
+        ks.setKeyEntry(alias, privateKey, password.toCharArray(),
+                new Certificate[]{cert});
+        saveKeyStore(ks, filename, password);
+    }
+
+    /**
+     * Creates a keystore with a single key and saves it to a file.
+     *
+     * @param filename String file to save
+     * @param password String store password to set on keystore
+     * @param keyPassword String key password to set on key
+     * @param alias String alias to use for the key
+     * @param privateKey Key to save in keystore
+     * @param cert Certificate to use as certificate chain associated to key
+     * @throws GeneralSecurityException for any error with the security APIs
+     * @throws IOException if there is an I/O error saving the file
+     */
+    public static void createKeyStore(String filename,
+                                      String password, String keyPassword, String alias,
+                                      Key privateKey, Certificate cert) throws GeneralSecurityException,
IOException {
+        KeyStore ks = createEmptyKeyStore();
+        ks.setKeyEntry(alias, privateKey, keyPassword.toCharArray(),
+                new Certificate[]{cert});
+        saveKeyStore(ks, filename, password);
+    }
+
+    public static void createTrustStore(String filename,
+                                        String password, String alias,
+                                        Certificate cert) throws GeneralSecurityException,
IOException {
+        KeyStore ks = createEmptyKeyStore();
+        ks.setCertificateEntry(alias, cert);
+        saveKeyStore(ks, filename, password);
+    }
+
+    public static <T extends Certificate> void createTrustStore(
+            String filename, String password, Map<String, T> certs) throws GeneralSecurityException,
IOException {
+        KeyStore ks = KeyStore.getInstance("JKS");
+        try {
+            FileInputStream in = new FileInputStream(filename);
+            ks.load(in, password.toCharArray());
+            in.close();
+        } catch (EOFException e) {
+            ks = createEmptyKeyStore();
+        }
+        for (Map.Entry<String, T> cert : certs.entrySet()) {
+            ks.setCertificateEntry(cert.getKey(), cert.getValue());
+        }
+        saveKeyStore(ks, filename, password);
+    }
+
+    public static Map<String, X509Certificate> createX509Certificates(KeyPair keyPair)
+        throws GeneralSecurityException {
+        Map<String, X509Certificate> certs = new HashMap<String, X509Certificate>();
+        X509Certificate cert = generateCertificate("CN=localhost, O=localhost", keyPair,
30, "SHA1withRSA");
+        certs.put("localhost", cert);
+        return certs;
+    }
+
+    public static Map<String, Object> createSslConfig(Mode mode, File keyStoreFile,
String password, String keyPassword,
+                                                      File trustStoreFile, String trustStorePassword)
{
+        Map<String, Object> sslConfigs = new HashMap<>();
+        sslConfigs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL"); // kafka security
protocol
+        sslConfigs.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2"); // protocol to create
SSLContext
+
+        if (mode == Mode.SERVER || (mode == Mode.CLIENT && keyStoreFile != null))
{
+            sslConfigs.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keyStoreFile.getPath());
+            sslConfigs.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS");
+            sslConfigs.put(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, TrustManagerFactory.getDefaultAlgorithm());
+            sslConfigs.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, password);
+            sslConfigs.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, keyPassword);
+        }
+
+        sslConfigs.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStoreFile.getPath());
+        sslConfigs.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, trustStorePassword);
+        sslConfigs.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "JKS");
+        sslConfigs.put(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, TrustManagerFactory.getDefaultAlgorithm());
+
+        List<String> enabledProtocols  = new ArrayList<>();
+        enabledProtocols.add("TLSv1.2");
+        sslConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, enabledProtocols);
+
+        return sslConfigs;
+    }
+
+    public static  Map<String, Object> createSslConfig(boolean useClientCert, boolean
trustStore, Mode mode, File trustStoreFile, String certAlias)
+        throws IOException, GeneralSecurityException {
+        Map<String, X509Certificate> certs = new HashMap<String, X509Certificate>();
+        File keyStoreFile;
+        String password;
+
+        if (mode == Mode.SERVER)
+            password = "ServerPassword";
+        else
+            password = "ClientPassword";
+
+        String trustStorePassword = "TrustStorePassword";
+
+        if (useClientCert) {
+            keyStoreFile = File.createTempFile("clientKS", ".jks");
+            KeyPair cKP = generateKeyPair("RSA");
+            X509Certificate cCert = generateCertificate("CN=localhost, O=client", cKP, 30,
"SHA1withRSA");
+            createKeyStore(keyStoreFile.getPath(), password, "client", cKP.getPrivate(),
cCert);
+            certs.put(certAlias, cCert);
+        } else {
+            keyStoreFile = File.createTempFile("serverKS", ".jks");
+            KeyPair sKP = generateKeyPair("RSA");
+            X509Certificate sCert = generateCertificate("CN=localhost, O=server", sKP, 30,
+                                                        "SHA1withRSA");
+            createKeyStore(keyStoreFile.getPath(), password, password, "server", sKP.getPrivate(),
sCert);
+            certs.put(certAlias, sCert);
+        }
+
+        if (trustStore) {
+            createTrustStore(trustStoreFile.getPath(), trustStorePassword, certs);
+        }
+
+        Map<String, Object> sslConfig = createSslConfig(mode, keyStoreFile, password,
+                                                        password, trustStoreFile, trustStorePassword);
+        return sslConfig;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/16f194b2/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderConfig.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderConfig.java
b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderConfig.java
index bd2ba56..1e09e8b 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderConfig.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderConfig.java
@@ -20,7 +20,7 @@ package org.apache.kafka.copycat.runtime.distributed;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
-import org.apache.kafka.common.config.SSLConfigs;
+import org.apache.kafka.common.config.SslConfigs;
 import org.apache.kafka.common.config.SaslConfigs;
 
 import java.util.Map;
@@ -139,21 +139,21 @@ public class DistributedHerderConfig extends AbstractConfig {
                         ConfigDef.Importance.LOW,
                         CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
                 .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, ConfigDef.Type.STRING,
CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL, ConfigDef.Importance.MEDIUM, CommonClientConfigs.SECURITY_PROTOCOL_DOC)
-                .define(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, ConfigDef.Type.CLASS,
SSLConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS, ConfigDef.Importance.LOW, SSLConfigs.PRINCIPAL_BUILDER_CLASS_DOC)
-                .define(SSLConfigs.SSL_PROTOCOL_CONFIG, ConfigDef.Type.STRING, SSLConfigs.DEFAULT_SSL_PROTOCOL,
ConfigDef.Importance.MEDIUM, SSLConfigs.SSL_PROTOCOL_DOC)
-                .define(SSLConfigs.SSL_PROVIDER_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.MEDIUM,
SSLConfigs.SSL_PROVIDER_DOC, false)
-                .define(SSLConfigs.SSL_CIPHER_SUITES_CONFIG, ConfigDef.Type.LIST, ConfigDef.Importance.LOW,
SSLConfigs.SSL_CIPHER_SUITES_DOC, false)
-                .define(SSLConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, ConfigDef.Type.LIST, SSLConfigs.DEFAULT_ENABLED_PROTOCOLS,
ConfigDef.Importance.MEDIUM, SSLConfigs.SSL_ENABLED_PROTOCOLS_DOC)
-                .define(SSLConfigs.SSL_KEYSTORE_TYPE_CONFIG, ConfigDef.Type.STRING, SSLConfigs.DEFAULT_SSL_KEYSTORE_TYPE,
ConfigDef.Importance.MEDIUM, SSLConfigs.SSL_KEYSTORE_TYPE_DOC)
-                .define(SSLConfigs.SSL_KEYSTORE_LOCATION_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
SSLConfigs.SSL_KEYSTORE_LOCATION_DOC, false)
-                .define(SSLConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
SSLConfigs.SSL_KEYSTORE_PASSWORD_DOC, false)
-                .define(SSLConfigs.SSL_KEY_PASSWORD_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
SSLConfigs.SSL_KEY_PASSWORD_DOC, false)
-                .define(SSLConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, ConfigDef.Type.STRING, SSLConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE,
ConfigDef.Importance.MEDIUM, SSLConfigs.SSL_TRUSTSTORE_TYPE_DOC)
-                .define(SSLConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, ConfigDef.Type.STRING,
ConfigDef.Importance.HIGH, SSLConfigs.SSL_TRUSTSTORE_LOCATION_DOC, false)
-                .define(SSLConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, ConfigDef.Type.STRING,
ConfigDef.Importance.HIGH, SSLConfigs.SSL_TRUSTSTORE_PASSWORD_DOC, false)
-                .define(SSLConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, ConfigDef.Type.STRING,
SSLConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM, ConfigDef.Importance.LOW, SSLConfigs.SSL_KEYMANAGER_ALGORITHM_DOC)
-                .define(SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, ConfigDef.Type.STRING,
SSLConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, ConfigDef.Importance.LOW, SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC)
-                .define(SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ConfigDef.Type.STRING,
ConfigDef.Importance.LOW, SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC, false)
+                .define(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, ConfigDef.Type.CLASS,
SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS, ConfigDef.Importance.LOW, SslConfigs.PRINCIPAL_BUILDER_CLASS_DOC)
+                .define(SslConfigs.SSL_PROTOCOL_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_PROTOCOL,
ConfigDef.Importance.MEDIUM, SslConfigs.SSL_PROTOCOL_DOC)
+                .define(SslConfigs.SSL_PROVIDER_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.MEDIUM,
SslConfigs.SSL_PROVIDER_DOC, false)
+                .define(SslConfigs.SSL_CIPHER_SUITES_CONFIG, ConfigDef.Type.LIST, ConfigDef.Importance.LOW,
SslConfigs.SSL_CIPHER_SUITES_DOC, false)
+                .define(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, ConfigDef.Type.LIST, SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS,
ConfigDef.Importance.MEDIUM, SslConfigs.SSL_ENABLED_PROTOCOLS_DOC)
+                .define(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE,
ConfigDef.Importance.MEDIUM, SslConfigs.SSL_KEYSTORE_TYPE_DOC)
+                .define(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
SslConfigs.SSL_KEYSTORE_LOCATION_DOC, false)
+                .define(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
SslConfigs.SSL_KEYSTORE_PASSWORD_DOC, false)
+                .define(SslConfigs.SSL_KEY_PASSWORD_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
SslConfigs.SSL_KEY_PASSWORD_DOC, false)
+                .define(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE,
ConfigDef.Importance.MEDIUM, SslConfigs.SSL_TRUSTSTORE_TYPE_DOC)
+                .define(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, ConfigDef.Type.STRING,
ConfigDef.Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_LOCATION_DOC, false)
+                .define(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, ConfigDef.Type.STRING,
ConfigDef.Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_PASSWORD_DOC, false)
+                .define(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, ConfigDef.Type.STRING,
SslConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM, ConfigDef.Importance.LOW, SslConfigs.SSL_KEYMANAGER_ALGORITHM_DOC)
+                .define(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, ConfigDef.Type.STRING,
SslConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, ConfigDef.Importance.LOW, SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC)
+                .define(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ConfigDef.Type.STRING,
ConfigDef.Importance.LOW, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC, false)
                 .define(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, ConfigDef.Type.STRING, ConfigDef.Importance.MEDIUM,
SaslConfigs.SASL_KERBEROS_SERVICE_NAME_DOC, false)
                 .define(SaslConfigs.SASL_KERBEROS_KINIT_CMD, ConfigDef.Type.STRING, SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD,
ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_KINIT_CMD_DOC)
                 .define(SaslConfigs.SASL_KAFKA_SERVER_REALM, ConfigDef.Type.STRING, ConfigDef.Importance.LOW,
SaslConfigs.SASL_KAFKA_SERVER_DOC, false)

http://git-wip-us.apache.org/repos/asf/kafka/blob/16f194b2/core/src/main/scala/kafka/api/FetchResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchResponse.scala b/core/src/main/scala/kafka/api/FetchResponse.scala
index aa15612..43ae38e 100644
--- a/core/src/main/scala/kafka/api/FetchResponse.scala
+++ b/core/src/main/scala/kafka/api/FetchResponse.scala
@@ -24,7 +24,7 @@ import kafka.common.{TopicAndPartition, ErrorMapping}
 import kafka.message.{MessageSet, ByteBufferMessageSet}
 import kafka.api.ApiUtils._
 import org.apache.kafka.common.KafkaException
-import org.apache.kafka.common.network.{SSLTransportLayer, TransportLayer, Send, MultiSend}
+import org.apache.kafka.common.network.{Send, MultiSend}
 
 import scala.collection._
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/16f194b2/core/src/main/scala/kafka/cluster/Broker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Broker.scala b/core/src/main/scala/kafka/cluster/Broker.scala
index 79e16c1..42b76cd 100755
--- a/core/src/main/scala/kafka/cluster/Broker.scala
+++ b/core/src/main/scala/kafka/cluster/Broker.scala
@@ -17,9 +17,6 @@
 
 package kafka.cluster
 
-import kafka.utils.CoreUtils._
-import kafka.utils.Json
-import kafka.api.ApiUtils._
 import java.nio.ByteBuffer
 
 import kafka.common.{BrokerEndPointNotAvailableException, BrokerNotAvailableException, KafkaException}
@@ -29,8 +26,7 @@ import org.apache.kafka.common.protocol.SecurityProtocol
 /**
  * A Kafka broker.
  * A broker has an id and a collection of end-points.
- * Each end-point is (host, port,protocolType).
- * Currently the only protocol type is PlainText but we will add SSL and Kerberos in the
future.
+ * Each end-point is (host, port, protocolType).
  */
 object Broker {
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/16f194b2/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index d52b5c0..7aba1c9 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -28,7 +28,7 @@ import kafka.utils.CoreUtils
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.common.config.SaslConfigs
 
-import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, SSLConfigs}
+import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, SslConfigs}
 import org.apache.kafka.common.metrics.MetricsReporter
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.security.auth.PrincipalBuilder
@@ -156,17 +156,17 @@ object Defaults {
   val MetricReporterClasses = ""
 
   /** ********* SSL configuration ***********/
-  val PrincipalBuilderClass = SSLConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS
-  val SSLProtocol = SSLConfigs.DEFAULT_SSL_PROTOCOL
-  val SSLEnabledProtocols = SSLConfigs.DEFAULT_ENABLED_PROTOCOLS
-  val SSLKeystoreType = SSLConfigs.DEFAULT_SSL_KEYSTORE_TYPE
-  val SSLTruststoreType = SSLConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE
-  val SSLKeyManagerAlgorithm = SSLConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM
-  val SSLTrustManagerAlgorithm = SSLConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM
-  val SSLClientAuthRequired = "required"
-  val SSLClientAuthRequested = "requested"
-  val SSLClientAuthNone = "none"
-  val SSLClientAuth = SSLClientAuthNone
+  val PrincipalBuilderClass = SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS
+  val SslProtocol = SslConfigs.DEFAULT_SSL_PROTOCOL
+  val SslEnabledProtocols = SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS
+  val SslKeystoreType = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE
+  val SslTruststoreType = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE
+  val SslKeyManagerAlgorithm = SslConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM
+  val SslTrustManagerAlgorithm = SslConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM
+  val SslClientAuthRequired = "required"
+  val SslClientAuthRequested = "requested"
+  val SslClientAuthNone = "none"
+  val SslClientAuth = SslClientAuthNone
 
   /** ********* Sasl configuration ***********/
   val SaslKerberosKinitCmd = SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD
@@ -305,22 +305,22 @@ object KafkaConfig {
   val MetricReporterClassesProp: String = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG
 
   /** ********* SSL Configuration ****************/
-  val PrincipalBuilderClassProp = SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG
-  val SSLProtocolProp = SSLConfigs.SSL_PROTOCOL_CONFIG
-  val SSLProviderProp = SSLConfigs.SSL_PROVIDER_CONFIG
-  val SSLCipherSuitesProp = SSLConfigs.SSL_CIPHER_SUITES_CONFIG
-  val SSLEnabledProtocolsProp = SSLConfigs.SSL_ENABLED_PROTOCOLS_CONFIG
-  val SSLKeystoreTypeProp = SSLConfigs.SSL_KEYSTORE_TYPE_CONFIG
-  val SSLKeystoreLocationProp = SSLConfigs.SSL_KEYSTORE_LOCATION_CONFIG
-  val SSLKeystorePasswordProp = SSLConfigs.SSL_KEYSTORE_PASSWORD_CONFIG
-  val SSLKeyPasswordProp = SSLConfigs.SSL_KEY_PASSWORD_CONFIG
-  val SSLTruststoreTypeProp = SSLConfigs.SSL_TRUSTSTORE_TYPE_CONFIG
-  val SSLTruststoreLocationProp = SSLConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG
-  val SSLTruststorePasswordProp = SSLConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG
-  val SSLKeyManagerAlgorithmProp = SSLConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG
-  val SSLTrustManagerAlgorithmProp = SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG
-  val SSLEndpointIdentificationAlgorithmProp = SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG
-  val SSLClientAuthProp = SSLConfigs.SSL_CLIENT_AUTH_CONFIG
+  val PrincipalBuilderClassProp = SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG
+  val SslProtocolProp = SslConfigs.SSL_PROTOCOL_CONFIG
+  val SslProviderProp = SslConfigs.SSL_PROVIDER_CONFIG
+  val SslCipherSuitesProp = SslConfigs.SSL_CIPHER_SUITES_CONFIG
+  val SslEnabledProtocolsProp = SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG
+  val SslKeystoreTypeProp = SslConfigs.SSL_KEYSTORE_TYPE_CONFIG
+  val SslKeystoreLocationProp = SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG
+  val SslKeystorePasswordProp = SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG
+  val SslKeyPasswordProp = SslConfigs.SSL_KEY_PASSWORD_CONFIG
+  val SslTruststoreTypeProp = SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG
+  val SslTruststoreLocationProp = SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG
+  val SslTruststorePasswordProp = SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG
+  val SslKeyManagerAlgorithmProp = SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG
+  val SslTrustManagerAlgorithmProp = SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG
+  val SslEndpointIdentificationAlgorithmProp = SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG
+  val SslClientAuthProp = SslConfigs.SSL_CLIENT_AUTH_CONFIG
 
   /** ********* SASL Configuration ****************/
   val SaslKerberosServiceNameProp = SaslConfigs.SASL_KERBEROS_SERVICE_NAME
@@ -480,22 +480,22 @@ object KafkaConfig {
   val MetricReporterClassesDoc = CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC
 
   /** ********* SSL Configuration ****************/
-  val PrincipalBuilderClassDoc = SSLConfigs.PRINCIPAL_BUILDER_CLASS_DOC
-  val SSLProtocolDoc = SSLConfigs.SSL_PROTOCOL_DOC
-  val SSLProviderDoc = SSLConfigs.SSL_PROVIDER_DOC
-  val SSLCipherSuitesDoc = SSLConfigs.SSL_CIPHER_SUITES_DOC
-  val SSLEnabledProtocolsDoc = SSLConfigs.SSL_ENABLED_PROTOCOLS_DOC
-  val SSLKeystoreTypeDoc = SSLConfigs.SSL_KEYSTORE_TYPE_DOC
-  val SSLKeystoreLocationDoc = SSLConfigs.SSL_KEYSTORE_LOCATION_DOC
-  val SSLKeystorePasswordDoc = SSLConfigs.SSL_KEYSTORE_PASSWORD_DOC
-  val SSLKeyPasswordDoc = SSLConfigs.SSL_KEY_PASSWORD_DOC
-  val SSLTruststoreTypeDoc = SSLConfigs.SSL_TRUSTSTORE_TYPE_DOC
-  val SSLTruststorePasswordDoc = SSLConfigs.SSL_TRUSTSTORE_PASSWORD_DOC
-  val SSLTruststoreLocationDoc = SSLConfigs.SSL_TRUSTSTORE_LOCATION_DOC
-  val SSLKeyManagerAlgorithmDoc = SSLConfigs.SSL_KEYMANAGER_ALGORITHM_DOC
-  val SSLTrustManagerAlgorithmDoc = SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC
-  val SSLEndpointIdentificationAlgorithmDoc = SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC
-  val SSLClientAuthDoc = SSLConfigs.SSL_CLIENT_AUTH_DOC
+  val PrincipalBuilderClassDoc = SslConfigs.PRINCIPAL_BUILDER_CLASS_DOC
+  val SslProtocolDoc = SslConfigs.SSL_PROTOCOL_DOC
+  val SslProviderDoc = SslConfigs.SSL_PROVIDER_DOC
+  val SslCipherSuitesDoc = SslConfigs.SSL_CIPHER_SUITES_DOC
+  val SslEnabledProtocolsDoc = SslConfigs.SSL_ENABLED_PROTOCOLS_DOC
+  val SslKeystoreTypeDoc = SslConfigs.SSL_KEYSTORE_TYPE_DOC
+  val SslKeystoreLocationDoc = SslConfigs.SSL_KEYSTORE_LOCATION_DOC
+  val SslKeystorePasswordDoc = SslConfigs.SSL_KEYSTORE_PASSWORD_DOC
+  val SslKeyPasswordDoc = SslConfigs.SSL_KEY_PASSWORD_DOC
+  val SslTruststoreTypeDoc = SslConfigs.SSL_TRUSTSTORE_TYPE_DOC
+  val SslTruststorePasswordDoc = SslConfigs.SSL_TRUSTSTORE_PASSWORD_DOC
+  val SslTruststoreLocationDoc = SslConfigs.SSL_TRUSTSTORE_LOCATION_DOC
+  val SslKeyManagerAlgorithmDoc = SslConfigs.SSL_KEYMANAGER_ALGORITHM_DOC
+  val SslTrustManagerAlgorithmDoc = SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC
+  val SslEndpointIdentificationAlgorithmDoc = SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC
+  val SslClientAuthDoc = SslConfigs.SSL_CLIENT_AUTH_DOC
 
   /** ********* Sasl Configuration ****************/
   val SaslKerberosServiceNameDoc = SaslConfigs.SASL_KERBEROS_SERVICE_NAME_DOC
@@ -645,21 +645,21 @@ object KafkaConfig {
 
       /** ********* SSL Configuration ****************/
       .define(PrincipalBuilderClassProp, CLASS, Defaults.PrincipalBuilderClass, MEDIUM, PrincipalBuilderClassDoc)
-      .define(SSLProtocolProp, STRING, Defaults.SSLProtocol, MEDIUM, SSLProtocolDoc)
-      .define(SSLProviderProp, STRING, MEDIUM, SSLProviderDoc, false)
-      .define(SSLEnabledProtocolsProp, LIST, Defaults.SSLEnabledProtocols, MEDIUM, SSLEnabledProtocolsDoc)
-      .define(SSLKeystoreTypeProp, STRING, Defaults.SSLKeystoreType, MEDIUM, SSLKeystoreTypeDoc)
-      .define(SSLKeystoreLocationProp, STRING, MEDIUM, SSLKeystoreLocationDoc, false)
-      .define(SSLKeystorePasswordProp, STRING, MEDIUM, SSLKeystorePasswordDoc, false)
-      .define(SSLKeyPasswordProp, STRING, MEDIUM, SSLKeyPasswordDoc, false)
-      .define(SSLTruststoreTypeProp, STRING, Defaults.SSLTruststoreType, MEDIUM, SSLTruststoreTypeDoc)
-      .define(SSLTruststoreLocationProp, STRING, MEDIUM, SSLTruststoreLocationDoc, false)
-      .define(SSLTruststorePasswordProp, STRING, MEDIUM, SSLTruststorePasswordDoc, false)
-      .define(SSLKeyManagerAlgorithmProp, STRING, Defaults.SSLKeyManagerAlgorithm, MEDIUM,
SSLKeyManagerAlgorithmDoc)
-      .define(SSLTrustManagerAlgorithmProp, STRING, Defaults.SSLTrustManagerAlgorithm, MEDIUM,
SSLTrustManagerAlgorithmDoc)
-      .define(SSLEndpointIdentificationAlgorithmProp, STRING, LOW, SSLEndpointIdentificationAlgorithmDoc,
false)
-      .define(SSLClientAuthProp, STRING, Defaults.SSLClientAuth, in(Defaults.SSLClientAuthRequired,
Defaults.SSLClientAuthRequested, Defaults.SSLClientAuthNone), MEDIUM, SSLClientAuthDoc)
-      .define(SSLCipherSuitesProp, LIST, MEDIUM, SSLCipherSuitesDoc, false)
+      .define(SslProtocolProp, STRING, Defaults.SslProtocol, MEDIUM, SslProtocolDoc)
+      .define(SslProviderProp, STRING, MEDIUM, SslProviderDoc, false)
+      .define(SslEnabledProtocolsProp, LIST, Defaults.SslEnabledProtocols, MEDIUM, SslEnabledProtocolsDoc)
+      .define(SslKeystoreTypeProp, STRING, Defaults.SslKeystoreType, MEDIUM, SslKeystoreTypeDoc)
+      .define(SslKeystoreLocationProp, STRING, MEDIUM, SslKeystoreLocationDoc, false)
+      .define(SslKeystorePasswordProp, STRING, MEDIUM, SslKeystorePasswordDoc, false)
+      .define(SslKeyPasswordProp, STRING, MEDIUM, SslKeyPasswordDoc, false)
+      .define(SslTruststoreTypeProp, STRING, Defaults.SslTruststoreType, MEDIUM, SslTruststoreTypeDoc)
+      .define(SslTruststoreLocationProp, STRING, MEDIUM, SslTruststoreLocationDoc, false)
+      .define(SslTruststorePasswordProp, STRING, MEDIUM, SslTruststorePasswordDoc, false)
+      .define(SslKeyManagerAlgorithmProp, STRING, Defaults.SslKeyManagerAlgorithm, MEDIUM,
SslKeyManagerAlgorithmDoc)
+      .define(SslTrustManagerAlgorithmProp, STRING, Defaults.SslTrustManagerAlgorithm, MEDIUM,
SslTrustManagerAlgorithmDoc)
+      .define(SslEndpointIdentificationAlgorithmProp, STRING, LOW, SslEndpointIdentificationAlgorithmDoc,
false)
+      .define(SslClientAuthProp, STRING, Defaults.SslClientAuth, in(Defaults.SslClientAuthRequired,
Defaults.SslClientAuthRequested, Defaults.SslClientAuthNone), MEDIUM, SslClientAuthDoc)
+      .define(SslCipherSuitesProp, LIST, MEDIUM, SslCipherSuitesDoc, false)
 
       /** ********* Sasl Configuration ****************/
       .define(SaslKerberosServiceNameProp, STRING, MEDIUM, SaslKerberosServiceNameDoc, false)
@@ -815,20 +815,20 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(Kafka
 
   /** ********* SSL Configuration **************/
   val principalBuilderClass = getClass(KafkaConfig.PrincipalBuilderClassProp)
-  val sslProtocol = getString(KafkaConfig.SSLProtocolProp)
-  val sslProvider = getString(KafkaConfig.SSLProviderProp)
-  val sslEnabledProtocols = getList(KafkaConfig.SSLEnabledProtocolsProp)
-  val sslKeystoreType = getString(KafkaConfig.SSLKeystoreTypeProp)
-  val sslKeystoreLocation = getString(KafkaConfig.SSLKeystoreLocationProp)
-  val sslKeystorePassword = getString(KafkaConfig.SSLKeystorePasswordProp)
-  val sslKeyPassword = getString(KafkaConfig.SSLKeyPasswordProp)
-  val sslTruststoreType = getString(KafkaConfig.SSLTruststoreTypeProp)
-  val sslTruststoreLocation = getString(KafkaConfig.SSLTruststoreLocationProp)
-  val sslTruststorePassword = getString(KafkaConfig.SSLTruststorePasswordProp)
-  val sslKeyManagerAlgorithm = getString(KafkaConfig.SSLKeyManagerAlgorithmProp)
-  val sslTrustManagerAlgorithm = getString(KafkaConfig.SSLTrustManagerAlgorithmProp)
-  val sslClientAuth = getString(KafkaConfig.SSLClientAuthProp)
-  val sslCipher = getList(KafkaConfig.SSLCipherSuitesProp)
+  val sslProtocol = getString(KafkaConfig.SslProtocolProp)
+  val sslProvider = getString(KafkaConfig.SslProviderProp)
+  val sslEnabledProtocols = getList(KafkaConfig.SslEnabledProtocolsProp)
+  val sslKeystoreType = getString(KafkaConfig.SslKeystoreTypeProp)
+  val sslKeystoreLocation = getString(KafkaConfig.SslKeystoreLocationProp)
+  val sslKeystorePassword = getString(KafkaConfig.SslKeystorePasswordProp)
+  val sslKeyPassword = getString(KafkaConfig.SslKeyPasswordProp)
+  val sslTruststoreType = getString(KafkaConfig.SslTruststoreTypeProp)
+  val sslTruststoreLocation = getString(KafkaConfig.SslTruststoreLocationProp)
+  val sslTruststorePassword = getString(KafkaConfig.SslTruststorePasswordProp)
+  val sslKeyManagerAlgorithm = getString(KafkaConfig.SslKeyManagerAlgorithmProp)
+  val sslTrustManagerAlgorithm = getString(KafkaConfig.SslTrustManagerAlgorithmProp)
+  val sslClientAuth = getString(KafkaConfig.SslClientAuthProp)
+  val sslCipher = getList(KafkaConfig.SslCipherSuitesProp)
 
   /** ********* Sasl Configuration **************/
   val saslKerberosServiceName = getString(KafkaConfig.SaslKerberosServiceNameProp)

http://git-wip-us.apache.org/repos/asf/kafka/blob/16f194b2/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 06be5c2..62c02dd 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -30,10 +30,10 @@ import org.apache.kafka.clients.{ManualMetadataUpdater, NetworkClient,
ClientReq
 import org.apache.kafka.common.network.{LoginType, Selectable, ChannelBuilders, NetworkReceive,
Selector, Mode}
 import org.apache.kafka.common.requests.{ListOffsetResponse, FetchResponse, RequestSend,
AbstractRequest, ListOffsetRequest}
 import org.apache.kafka.common.requests.{FetchRequest => JFetchRequest}
+import org.apache.kafka.common.security.ssl.SslFactory
 import org.apache.kafka.common.{Node, TopicPartition}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.{Errors, ApiKeys}
-import org.apache.kafka.common.security.ssl.SSLFactory
 import org.apache.kafka.common.utils.Time
 
 import scala.collection.{JavaConverters, Map, mutable}

http://git-wip-us.apache.org/repos/asf/kafka/blob/16f194b2/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index b0cb97e..01f198e 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -219,7 +219,7 @@ class SocketServerTest extends JUnitSuite {
   }
 
   @Test
-  def testSSLSocketServer(): Unit = {
+  def testSslSocketServer(): Unit = {
     val trustStoreFile = File.createTempFile("truststore", ".jks")
     val overrideProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, enableSsl
= true,
       trustStoreFile = Some(trustStoreFile))

http://git-wip-us.apache.org/repos/asf/kafka/blob/16f194b2/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 4059dc2..d5ab262 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -491,21 +491,21 @@ class KafkaConfigTest {
 
         //SSL Configs
         case KafkaConfig.PrincipalBuilderClassProp =>
-        case KafkaConfig.SSLProtocolProp => // ignore string
-        case KafkaConfig.SSLProviderProp => // ignore string
-        case KafkaConfig.SSLEnabledProtocolsProp =>
-        case KafkaConfig.SSLKeystoreTypeProp => // ignore string
-        case KafkaConfig.SSLKeystoreLocationProp => // ignore string
-        case KafkaConfig.SSLKeystorePasswordProp => // ignore string
-        case KafkaConfig.SSLKeyPasswordProp => // ignore string
-        case KafkaConfig.SSLTruststoreTypeProp => // ignore string
-        case KafkaConfig.SSLTruststorePasswordProp => // ignore string
-        case KafkaConfig.SSLTruststoreLocationProp => // ignore string
-        case KafkaConfig.SSLKeyManagerAlgorithmProp =>
-        case KafkaConfig.SSLTrustManagerAlgorithmProp =>
-        case KafkaConfig.SSLClientAuthProp => // ignore string
-        case KafkaConfig.SSLEndpointIdentificationAlgorithmProp => // ignore string
-        case KafkaConfig.SSLCipherSuitesProp => // ignore string
+        case KafkaConfig.SslProtocolProp => // ignore string
+        case KafkaConfig.SslProviderProp => // ignore string
+        case KafkaConfig.SslEnabledProtocolsProp =>
+        case KafkaConfig.SslKeystoreTypeProp => // ignore string
+        case KafkaConfig.SslKeystoreLocationProp => // ignore string
+        case KafkaConfig.SslKeystorePasswordProp => // ignore string
+        case KafkaConfig.SslKeyPasswordProp => // ignore string
+        case KafkaConfig.SslTruststoreTypeProp => // ignore string
+        case KafkaConfig.SslTruststorePasswordProp => // ignore string
+        case KafkaConfig.SslTruststoreLocationProp => // ignore string
+        case KafkaConfig.SslKeyManagerAlgorithmProp =>
+        case KafkaConfig.SslTrustManagerAlgorithmProp =>
+        case KafkaConfig.SslClientAuthProp => // ignore string
+        case KafkaConfig.SslEndpointIdentificationAlgorithmProp => // ignore string
+        case KafkaConfig.SslCipherSuitesProp => // ignore string
 
         //Sasl Configs
         case KafkaConfig.SaslKerberosServiceNameProp => // ignore string

http://git-wip-us.apache.org/repos/asf/kafka/blob/16f194b2/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index ca17c6b..5ad548d 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -28,7 +28,9 @@ import charset.Charset
 
 import kafka.security.auth.{Resource, Authorizer, Acl}
 import org.apache.kafka.common.protocol.SecurityProtocol
+import org.apache.kafka.common.security.ssl.SslFactory
 import org.apache.kafka.common.utils.Utils._
+import org.apache.kafka.test.TestSslUtils
 
 import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 
@@ -52,8 +54,6 @@ import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.kafka.clients.consumer.{RangeAssignor, KafkaConsumer}
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.common.network.Mode
-import org.apache.kafka.common.security.ssl.SSLFactory
-import org.apache.kafka.test.TestSSLUtils
 
 import scala.collection.Map
 import scala.collection.JavaConversions._
@@ -964,9 +964,9 @@ object TestUtils extends Logging {
 
     val sslConfigs = {
       if (mode == Mode.SERVER)
-        TestSSLUtils.createSSLConfig(true, true, mode, trustStore, certAlias)
+        TestSslUtils.createSslConfig(true, true, mode, trustStore, certAlias)
       else
-        TestSSLUtils.createSSLConfig(clientCert, false, mode, trustStore, certAlias)
+        TestSslUtils.createSslConfig(clientCert, false, mode, trustStore, certAlias)
     }
 
     val sslProps = new Properties()


Mime
View raw message