kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject kafka git commit: KAFKA-2456 KAFKA-2472; SSL clean-ups
Date Thu, 22 Oct 2015 00:10:43 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk d9b1dc708 -> 65922b538


KAFKA-2456 KAFKA-2472; SSL clean-ups

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Jun Rao <junrao@gmail.com>

Closes #342 from ijuma/kafka-2472-fix-kafka-ssl-config-warnings


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/65922b53
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/65922b53
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/65922b53

Branch: refs/heads/trunk
Commit: 65922b5388561e3ab830fd1f367faa289d205e2a
Parents: d9b1dc7
Author: Ismael Juma <ismael@juma.me.uk>
Authored: Wed Oct 21 17:10:36 2015 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Wed Oct 21 17:10:36 2015 -0700

----------------------------------------------------------------------
 .../kafka/common/config/AbstractConfig.java     | 30 ++++++++++---
 .../apache/kafka/common/config/SSLConfigs.java  | 13 +++---
 .../kafka/common/network/Authenticator.java     |  7 ++--
 .../kafka/common/security/ssl/SSLFactory.java   | 25 +++++------
 .../controller/ControllerChannelManager.scala   |  2 +-
 .../main/scala/kafka/network/SocketServer.scala |  5 +--
 .../main/scala/kafka/server/KafkaConfig.scala   | 44 ++++----------------
 .../main/scala/kafka/server/KafkaServer.scala   |  2 +-
 .../kafka/server/ReplicaFetcherThread.scala     |  2 +-
 .../kafka/security/auth/OperationTest.scala     |  2 +-
 .../security/auth/PermissionTypeTest.scala      |  2 +-
 .../kafka/security/auth/ResourceTypeTest.scala  |  2 +-
 .../unit/kafka/server/KafkaConfigTest.scala     |  1 +
 13 files changed, 63 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/65922b53/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index 1dae61c..327a9ed 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -100,7 +100,7 @@ public class AbstractConfig {
     }
 
     public Set<String> unused() {
-        Set<String> keys = new HashSet<String>(originals.keySet());
+        Set<String> keys = new HashSet<>(originals.keySet());
         keys.removeAll(used);
         return keys;
     }
@@ -109,12 +109,12 @@ public class AbstractConfig {
         Set<String> unusedKeys = this.unused();
         Properties unusedProps = new Properties();
         for (String key : unusedKeys)
-            unusedProps.put(key, this.originals().get(key));
+            unusedProps.put(key, this.originals.get(key));
         return unusedProps;
     }
 
     public Map<String, Object> originals() {
-        Map<String, Object> copy = new HashMap<String, Object>();
+        Map<String, Object> copy = new RecordingMap<>();
         copy.putAll(originals);
         return copy;
     }
@@ -126,7 +126,7 @@ public class AbstractConfig {
      * @return a Map containing the settings with the prefix
      */
     public Map<String, Object> originalsWithPrefix(String prefix) {
-        Map<String, Object> result = new HashMap<String, Object>();
+        Map<String, Object> result = new RecordingMap<>();
         for (Map.Entry<String, ?> entry : originals.entrySet()) {
             if (entry.getKey().startsWith(prefix) && entry.getKey().length() > prefix.length())
                 result.put(entry.getKey().substring(prefix.length()), entry.getValue());
@@ -135,7 +135,7 @@ public class AbstractConfig {
     }
 
     public Map<String, ?> values() {
-        return new HashMap<String, Object>(values);
+        return new RecordingMap<>(values);
     }
 
     private void logAll() {
@@ -214,4 +214,24 @@ public class AbstractConfig {
     public int hashCode() {
         return originals.hashCode();
     }
+
+    /**
+     * Marks keys retrieved via `get` as used. This is needed because `Configurable.configure` takes a `Map` instead
+     * of an `AbstractConfig` and we can't change that without breaking public API like `Partitioner`.
+     */
+    private class RecordingMap<V> extends HashMap<String, V> {
+
+        RecordingMap() {}
+
+        RecordingMap(Map<String, ? extends V> m) {
+            super(m);
+        }
+
+        @Override
+        public V get(Object key) {
+            if (key instanceof String)
+                ignore((String) key);
+            return super.get(key);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/65922b53/clients/src/main/java/org/apache/kafka/common/config/SSLConfigs.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/SSLConfigs.java b/clients/src/main/java/org/apache/kafka/common/config/SSLConfigs.java
index 0fed961..207a349 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/SSLConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/SSLConfigs.java
@@ -26,8 +26,11 @@ public class SSLConfigs {
     public static final String DEFAULT_PRINCIPAL_BUILDER_CLASS = "org.apache.kafka.common.security.auth.DefaultPrincipalBuilder";
 
     public static final String SSL_PROTOCOL_CONFIG = "ssl.protocol";
-    public static final String SSL_PROTOCOL_DOC = "The ssl protocol used to generate SSLContext."
-            + "Default setting is TLS. Allowed values are SSL, SSLv2, SSLv3, TLS, TLSv1.1, TLSv1.2";
+    public static final String SSL_PROTOCOL_DOC = "The SSL protocol used to generate the SSLContext. "
+            + "Default setting is TLS, which is fine for most cases. "
+            + "Allowed values in recent JVMs are TLS, TLSv1.1 and TLSv1.2. SSL, SSLv2 and SSLv3 "
+            + "may be supported in older JVMs, but their usage is discouraged due to known security vulnerabilities.";
+
     public static final String DEFAULT_SSL_PROTOCOL = "TLS";
 
     public static final String SSL_PROVIDER_CONFIG = "ssl.provider";
@@ -39,7 +42,7 @@ public class SSLConfigs {
 
     public static final String SSL_ENABLED_PROTOCOLS_CONFIG = "ssl.enabled.protocols";
     public static final String SSL_ENABLED_PROTOCOLS_DOC = "The list of protocols enabled for SSL connections. "
-            + "All versions of TLS is enabled by default.";
+            + "TLSv1.2, TLSv1.1 and TLSv1 are enabled by default.";
     public static final String DEFAULT_ENABLED_PROTOCOLS = "TLSv1.2,TLSv1.1,TLSv1";
 
     public static final String SSL_KEYSTORE_TYPE_CONFIG = "ssl.keystore.type";
@@ -49,11 +52,11 @@ public class SSLConfigs {
 
     public static final String SSL_KEYSTORE_LOCATION_CONFIG = "ssl.keystore.location";
     public static final String SSL_KEYSTORE_LOCATION_DOC = "The location of the key store file. "
-        + "This is optional for Client and can be used for two-way authentication for client.";
+        + "This is optional for client and can be used for two-way authentication for client.";
 
     public static final String SSL_KEYSTORE_PASSWORD_CONFIG = "ssl.keystore.password";
     public static final String SSL_KEYSTORE_PASSWORD_DOC = "The store password for the key store file."
-        + "This is optional for client and only needed if the ssl.keystore.location configured. ";
+        + "This is optional for client and only needed if ssl.keystore.location is configured. ";
 
     public static final String SSL_KEY_PASSWORD_CONFIG = "ssl.key.password";
     public static final String SSL_KEY_PASSWORD_DOC = "The password of the private key in the key store file. "

http://git-wip-us.apache.org/repos/asf/kafka/blob/65922b53/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java b/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java
index 7f6eb8c..6f01fe5 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java
@@ -17,10 +17,6 @@
 
 package org.apache.kafka.common.network;
 
-/**
- * Authentication for Channel
- */
-
 import java.io.IOException;
 import java.util.Map;
 import java.security.Principal;
@@ -28,6 +24,9 @@ import java.security.Principal;
 import org.apache.kafka.common.security.auth.PrincipalBuilder;
 import org.apache.kafka.common.KafkaException;
 
+/**
+ * Authentication for Channel
+ */
 public interface Authenticator {
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/65922b53/clients/src/main/java/org/apache/kafka/common/security/ssl/SSLFactory.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/ssl/SSLFactory.java b/clients/src/main/java/org/apache/kafka/common/security/ssl/SSLFactory.java
index 163b8c6..c25993e 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/ssl/SSLFactory.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/SSLFactory.java
@@ -47,7 +47,6 @@ public class SSLFactory implements Configurable {
     private boolean wantClientAuth;
     private final Mode mode;
 
-
     public SSLFactory(Mode mode) {
         this.mode = mode;
     }
@@ -57,23 +56,21 @@ public class SSLFactory implements Configurable {
         this.protocol =  (String) configs.get(SSLConfigs.SSL_PROTOCOL_CONFIG);
         this.provider = (String) configs.get(SSLConfigs.SSL_PROVIDER_CONFIG);
 
-        if (configs.get(SSLConfigs.SSL_CIPHER_SUITES_CONFIG) != null) {
-            List<String> cipherSuitesList = (List<String>) configs.get(SSLConfigs.SSL_CIPHER_SUITES_CONFIG);
-            if (!cipherSuitesList.isEmpty())
-                this.cipherSuites = cipherSuitesList.toArray(new String[cipherSuitesList.size()]);
-        }
 
-        if (configs.get(SSLConfigs.SSL_ENABLED_PROTOCOLS_CONFIG) != null) {
-            List<String> enabledProtocolsList = (List<String>) configs.get(SSLConfigs.SSL_ENABLED_PROTOCOLS_CONFIG);
+        List<String> cipherSuitesList = (List<String>) configs.get(SSLConfigs.SSL_CIPHER_SUITES_CONFIG);
+        if (cipherSuitesList != null)
+            this.cipherSuites = cipherSuitesList.toArray(new String[cipherSuitesList.size()]);
+
+        List<String> enabledProtocolsList = (List<String>) configs.get(SSLConfigs.SSL_ENABLED_PROTOCOLS_CONFIG);
+        if (enabledProtocolsList != null)
             this.enabledProtocols = enabledProtocolsList.toArray(new String[enabledProtocolsList.size()]);
-        }
 
-        if (configs.containsKey(SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG)) {
-            this.endpointIdentification = (String) configs.get(SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG);
-        }
+        String endpointIdentification = (String) configs.get(SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG);
+        if (endpointIdentification != null)
+            this.endpointIdentification = endpointIdentification;
 
-        if (configs.containsKey(SSLConfigs.SSL_CLIENT_AUTH_CONFIG)) {
-            String clientAuthConfig = (String) configs.get(SSLConfigs.SSL_CLIENT_AUTH_CONFIG);
+        String clientAuthConfig = (String) configs.get(SSLConfigs.SSL_CLIENT_AUTH_CONFIG);
+        if (clientAuthConfig != null) {
             if (clientAuthConfig.equals("required"))
                 this.needClientAuth = true;
             else if (clientAuthConfig.equals("requested"))

http://git-wip-us.apache.org/repos/asf/kafka/blob/65922b53/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index d86c8ce..6b24c29 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -96,7 +96,7 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
         "controller-channel",
         Map("broker-id" -> broker.id.toString).asJava,
         false,
-        ChannelBuilders.create(config.interBrokerSecurityProtocol, Mode.CLIENT, LoginType.SERVER, config.channelConfigs)
+        ChannelBuilders.create(config.interBrokerSecurityProtocol, Mode.CLIENT, LoginType.SERVER, config.values)
       )
       new NetworkClient(
         selector,

http://git-wip-us.apache.org/repos/asf/kafka/blob/65922b53/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 1066fbe..1b94c86 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -81,7 +81,6 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
 
       connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)
 
-      val channelConfigs = config.channelConfigs
       val sendBufferSize = config.socketSendBufferBytes
       val recvBufferSize = config.socketReceiveBufferBytes
       val maxRequestSize = config.socketRequestMaxBytes
@@ -101,7 +100,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
             connectionQuotas,
             connectionsMaxIdleMs,
             protocol,
-            channelConfigs,
+            config.values,
             metrics
           )
         }
@@ -357,7 +356,7 @@ private[kafka] class Processor(val id: Int,
                                connectionQuotas: ConnectionQuotas,
                                connectionsMaxIdleMs: Long,
                                protocol: SecurityProtocol,
-                               channelConfigs: java.util.Map[String, Object],
+                               channelConfigs: java.util.Map[String, _],
                                metrics: Metrics) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
 
   private object ConnectionId {

http://git-wip-us.apache.org/repos/asf/kafka/blob/65922b53/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 5b311e2..52182b8 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -166,7 +166,6 @@ object Defaults {
   val SSLClientAuthRequested = "requested"
   val SSLClientAuthNone = "none"
   val SSLClientAuth = SSLClientAuthNone
-  val SSLCipherSuites = ""
 
   /** ********* Sasl configuration ***********/
   val SaslKerberosKinitCmd = SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD
@@ -641,7 +640,7 @@ object KafkaConfig {
 
 
       /** ********* SSL Configuration ****************/
-      .define(PrincipalBuilderClassProp, STRING, Defaults.PrincipalBuilderClass, MEDIUM, PrincipalBuilderClassDoc)
+      .define(PrincipalBuilderClassProp, CLASS, Defaults.PrincipalBuilderClass, MEDIUM, PrincipalBuilderClassDoc)
       .define(SSLProtocolProp, STRING, Defaults.SSLProtocol, MEDIUM, SSLProtocolDoc)
       .define(SSLProviderProp, STRING, MEDIUM, SSLProviderDoc, false)
       .define(SSLEnabledProtocolsProp, LIST, Defaults.SSLEnabledProtocols, MEDIUM, SSLEnabledProtocolsDoc)
@@ -654,8 +653,9 @@ object KafkaConfig {
       .define(SSLTruststorePasswordProp, STRING, MEDIUM, SSLTruststorePasswordDoc, false)
       .define(SSLKeyManagerAlgorithmProp, STRING, Defaults.SSLKeyManagerAlgorithm, MEDIUM, SSLKeyManagerAlgorithmDoc)
       .define(SSLTrustManagerAlgorithmProp, STRING, Defaults.SSLTrustManagerAlgorithm, MEDIUM, SSLTrustManagerAlgorithmDoc)
+      .define(SSLEndpointIdentificationAlgorithmProp, STRING, LOW, SSLEndpointIdentificationAlgorithmDoc, false)
       .define(SSLClientAuthProp, STRING, Defaults.SSLClientAuth, in(Defaults.SSLClientAuthRequired, Defaults.SSLClientAuthRequested, Defaults.SSLClientAuthNone), MEDIUM, SSLClientAuthDoc)
-      .define(SSLCipherSuitesProp, LIST, Defaults.SSLCipherSuites, MEDIUM, SSLCipherSuitesDoc)
+      .define(SSLCipherSuitesProp, LIST, MEDIUM, SSLCipherSuitesDoc, false)
 
       /** ********* Sasl Configuration ****************/
       .define(SaslKerberosServiceNameProp, STRING, MEDIUM, SaslKerberosServiceNameDoc, false)
@@ -699,8 +699,8 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(Kafka
   /** ********* Zookeeper Configuration ***********/
   val zkConnect: String = getString(KafkaConfig.ZkConnectProp)
   val zkSessionTimeoutMs: Int = getInt(KafkaConfig.ZkSessionTimeoutMsProp)
-  val zkConnectionTimeoutMs: java.lang.Integer =
-    Option(getInt(KafkaConfig.ZkConnectionTimeoutMsProp)).getOrElse(getInt(KafkaConfig.ZkSessionTimeoutMsProp))
+  val zkConnectionTimeoutMs: Int =
+    Option(getInt(KafkaConfig.ZkConnectionTimeoutMsProp)).map(_.toInt).getOrElse(getInt(KafkaConfig.ZkSessionTimeoutMsProp))
   val zkSyncTimeMs: Int = getInt(KafkaConfig.ZkSyncTimeMsProp)
 
   /** ********* General Configuration ***********/
@@ -809,7 +809,7 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(Kafka
   val metricReporterClasses: java.util.List[MetricsReporter] = getConfiguredInstances(KafkaConfig.MetricReporterClassesProp, classOf[MetricsReporter])
 
   /** ********* SSL Configuration **************/
-  val principalBuilderClass = getString(KafkaConfig.PrincipalBuilderClassProp)
+  val principalBuilderClass = getClass(KafkaConfig.PrincipalBuilderClassProp)
   val sslProtocol = getString(KafkaConfig.SSLProtocolProp)
   val sslProvider = getString(KafkaConfig.SSLProviderProp)
   val sslEnabledProtocols = getList(KafkaConfig.SSLEnabledProtocolsProp)
@@ -912,7 +912,7 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(Kafka
 
   private def getMetricClasses(metricClasses: java.util.List[String]): java.util.List[MetricsReporter] = {
 
-    val reporterList = new util.ArrayList[MetricsReporter]();
+    val reporterList = new util.ArrayList[MetricsReporter]()
     val iterator = metricClasses.iterator()
 
     while (iterator.hasNext) {
@@ -953,34 +953,4 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(Kafka
       " Valid options are " + BrokerCompressionCodec.brokerCompressionOptions.mkString(","))
   }
 
-  def channelConfigs: java.util.Map[String, Object] = {
-    val channelConfigs = new java.util.HashMap[String, Object]()
-    import kafka.server.KafkaConfig._
-    Seq(
-      (PrincipalBuilderClassProp, Class.forName(principalBuilderClass)),
-      (SSLProtocolProp, sslProtocol),
-      (SSLEnabledProtocolsProp, sslEnabledProtocols),
-      (SSLKeystoreTypeProp, sslKeystoreType),
-      (SSLKeystoreLocationProp, sslKeystoreLocation),
-      (SSLKeystorePasswordProp, sslKeystorePassword),
-      (SSLKeyPasswordProp, sslKeyPassword),
-      (SSLTruststoreTypeProp, sslTruststoreType),
-      (SSLTruststoreLocationProp, sslTruststoreLocation),
-      (SSLTruststorePasswordProp, sslTruststorePassword),
-      (SSLKeyManagerAlgorithmProp, sslKeyManagerAlgorithm),
-      (SSLTrustManagerAlgorithmProp, sslTrustManagerAlgorithm),
-      (SSLClientAuthProp, sslClientAuth),
-      (SSLCipherSuitesProp, sslCipher),
-      (SaslKerberosServiceNameProp, saslKerberosServiceName),
-      (SaslKerberosKinitCmdProp, saslKerberosKinitCmd),
-      (SaslKerberosTicketRenewWindowFactorProp, saslKerberosTicketRenewWindowFactor),
-      (SaslKerberosTicketRenewJitterProp, saslKerberosTicketRenewJitter),
-      (SaslKerberosMinTimeBeforeReloginProp, saslKerberosMinTimeBeforeRelogin),
-      (AuthToLocalProp, authToLocal)
-    ).foreach { case (key, value) =>
-      if (value != null) channelConfigs.put(key, value)
-    }
-    channelConfigs
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/65922b53/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index d2a1e61..617f807 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -315,7 +315,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
           "kafka-server-controlled-shutdown",
           Map.empty.asJava,
           false,
-          ChannelBuilders.create(config.interBrokerSecurityProtocol, Mode.CLIENT, LoginType.SERVER, config.channelConfigs)
+          ChannelBuilders.create(config.interBrokerSecurityProtocol, Mode.CLIENT, LoginType.SERVER, config.values)
         )
         new NetworkClient(
           selector,

http://git-wip-us.apache.org/repos/asf/kafka/blob/65922b53/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 4affd89..06be5c2 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -74,7 +74,7 @@ class ReplicaFetcherThread(name: String,
       "replica-fetcher",
       Map("broker-id" -> sourceBroker.id.toString).asJava,
       false,
-      ChannelBuilders.create(brokerConfig.interBrokerSecurityProtocol, Mode.CLIENT, LoginType.SERVER, brokerConfig.channelConfigs)
+      ChannelBuilders.create(brokerConfig.interBrokerSecurityProtocol, Mode.CLIENT, LoginType.SERVER, brokerConfig.values)
     )
     new NetworkClient(
       selector,

http://git-wip-us.apache.org/repos/asf/kafka/blob/65922b53/core/src/test/scala/unit/kafka/security/auth/OperationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/security/auth/OperationTest.scala b/core/src/test/scala/unit/kafka/security/auth/OperationTest.scala
index 2f15f9f..371de38 100644
--- a/core/src/test/scala/unit/kafka/security/auth/OperationTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/OperationTest.scala
@@ -32,7 +32,7 @@ class OperationTest extends JUnitSuite {
       Operation.fromString("badName")
       fail("Expected exception on invalid operation name.")
     } catch {
-      case e: KafkaException => "Expected."
+      case e: KafkaException => // expected
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/65922b53/core/src/test/scala/unit/kafka/security/auth/PermissionTypeTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/security/auth/PermissionTypeTest.scala b/core/src/test/scala/unit/kafka/security/auth/PermissionTypeTest.scala
index 0518985..46ac87e 100644
--- a/core/src/test/scala/unit/kafka/security/auth/PermissionTypeTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/PermissionTypeTest.scala
@@ -32,7 +32,7 @@ class PermissionTypeTest extends JUnitSuite {
       PermissionType.fromString("badName")
       fail("Expected exception on invalid PermissionType name.")
     } catch {
-      case e: KafkaException => "Expected."
+      case e: KafkaException => // expected
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/65922b53/core/src/test/scala/unit/kafka/security/auth/ResourceTypeTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/security/auth/ResourceTypeTest.scala b/core/src/test/scala/unit/kafka/security/auth/ResourceTypeTest.scala
index a632d37..938b201 100644
--- a/core/src/test/scala/unit/kafka/security/auth/ResourceTypeTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/ResourceTypeTest.scala
@@ -32,7 +32,7 @@ class ResourceTypeTest extends JUnitSuite {
       ResourceType.fromString("badName")
       fail("Expected exception on invalid ResourceType name.")
     } catch {
-      case e: KafkaException => "Expected."
+      case e: KafkaException => // expected
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/65922b53/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 3e277fa..b1a7f21 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -503,6 +503,7 @@ class KafkaConfigTest {
         case KafkaConfig.SSLKeyManagerAlgorithmProp =>
         case KafkaConfig.SSLTrustManagerAlgorithmProp =>
         case KafkaConfig.SSLClientAuthProp => // ignore string
+        case KafkaConfig.SSLEndpointIdentificationAlgorithmProp => // ignore string
         case KafkaConfig.SSLCipherSuitesProp => // ignore string
 
         //Sasl Configs


Mime
View raw message