kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gwens...@apache.org
Subject kafka git commit: KAFKA-3194: Validate security.inter.broker.protocol against the adver…
Date Wed, 03 Feb 2016 23:45:55 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.9.0 d0e864795 -> 29ebb42fd


KAFKA-3194: Validate security.inter.broker.protocol against the adver…

…tised.listeners protocols

Author: Grant Henke <granthenke@gmail.com>

Reviewers: Ismael Juma

Closes #851 from granthenke/verify-protocol


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/29ebb42f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/29ebb42f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/29ebb42f

Branch: refs/heads/0.9.0
Commit: 29ebb42fdf8af4b5983f3b22f13697e0c73e0d5a
Parents: d0e8647
Author: Grant Henke <granthenke@gmail.com>
Authored: Wed Feb 3 15:42:22 2016 -0800
Committer: Gwen Shapira <cshapi@gmail.com>
Committed: Wed Feb 3 15:45:26 2016 -0800

----------------------------------------------------------------------
 .../main/scala/kafka/server/KafkaConfig.scala   |  7 +++++
 .../unit/kafka/network/SocketServerTest.scala   |  4 +--
 .../unit/kafka/server/KafkaConfigTest.scala     | 28 ++++++++++++++++++++
 3 files changed, 37 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/29ebb42f/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 5cc10b4..f7af38c 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -953,6 +953,13 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends
Abstra
       "offsets.commit.required.acks must be greater or equal -1 and less or equal to offsets.topic.replication.factor")
     require(BrokerCompressionCodec.isValid(compressionType), "compression.type : " + compressionType
+ " is not valid." +
       " Valid options are " + BrokerCompressionCodec.brokerCompressionOptions.mkString(","))
+    require(advertisedListeners.keySet.contains(interBrokerSecurityProtocol),
+      s"${KafkaConfig.InterBrokerSecurityProtocolProp} must be a protocol in the configured
set of ${KafkaConfig.AdvertisedListenersProp}. " +
+      s"The valid options based on currently configured protocols are ${advertisedListeners.keySet}")
+    require(advertisedListeners.keySet.subsetOf(listeners.keySet),
+      s"${KafkaConfig.AdvertisedListenersProp} protocols must be equal to or a subset of
${KafkaConfig.ListenersProp} protocols. " +
+      s"Found ${advertisedListeners.keySet}. The valid options based on currently configured
protocols are ${listeners.keySet}"
+    )
   }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/29ebb42f/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 01f198e..606d199 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -221,9 +221,9 @@ class SocketServerTest extends JUnitSuite {
   @Test
   def testSslSocketServer(): Unit = {
     val trustStoreFile = File.createTempFile("truststore", ".jks")
-    val overrideProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, enableSsl
= true,
+    val overrideProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, interBrokerSecurityProtocol
= Some(SecurityProtocol.SSL),
       trustStoreFile = Some(trustStoreFile))
-    overrideProps.put("listeners", "SSL://localhost:0")
+    overrideProps.put(KafkaConfig.ListenersProp, "SSL://localhost:0")
 
     val serverMetrics = new Metrics
     val overrideServer: SocketServer = new SocketServer(KafkaConfig.fromProps(overrideProps),
serverMetrics, new SystemTime)

http://git-wip-us.apache.org/repos/asf/kafka/blob/29ebb42f/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 9ddc2c1..ac4d6ca 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -383,6 +383,34 @@ class KafkaConfigTest {
   }
 
   @Test
+  def testInvalidInterBrokerSecurityProtocol() {
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
+    props.put(KafkaConfig.ListenersProp, "SSL://localhost:0")
+    props.put(KafkaConfig.InterBrokerSecurityProtocolProp, SecurityProtocol.PLAINTEXT.toString)
+    intercept[IllegalArgumentException] {
+      KafkaConfig.fromProps(props)
+    }
+  }
+
+  @Test
+  def testEqualAdvertisedListenersProtocol() {
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
+    props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:9093")
+    props.put(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:9093")
+    KafkaConfig.fromProps(props)
+  }
+
+  @Test
+  def testInvalidAdvertisedListenersProtocol() {
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
+    props.put(KafkaConfig.ListenersProp, "TRACE://localhost:9091,SSL://localhost:9093")
+    props.put(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://localhost:9092")
+    intercept[IllegalArgumentException] {
+      KafkaConfig.fromProps(props)
+    }
+  }
+
+  @Test
   def testFromPropsInvalid() {
     def getBaseProperties(): Properties = {
       val validRequiredProperties = new Properties()


Mime
View raw message