kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject [1/2] kafka git commit: KAFKA-5184 KAFKA-5173; Various improvements to SASL tests
Date Wed, 10 May 2017 21:38:54 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk a420d20c0 -> 0bede30ad


http://git-wip-us.apache.org/repos/asf/kafka/blob/0bede30a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
new file mode 100644
index 0000000..19c386f
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.server
+
+import java.io.File
+
+import org.apache.kafka.common.protocol.SecurityProtocol
+import org.junit.{After, Before, Test}
+import kafka.zk.ZooKeeperTestHarness
+import kafka.utils.TestUtils
+import TestUtils._
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.serialization.StringSerializer
+
+class ReplicaFetchTest extends ZooKeeperTestHarness  {
+  var brokers: Seq[KafkaServer] = null
+  val topic1 = "foo"
+  val topic2 = "bar"
+
+  @Before
+  override def setUp() {
+    super.setUp()
+    val props = createBrokerConfigs(2, zkConnect)
+    brokers = props.map(KafkaConfig.fromProps).map(TestUtils.createServer(_))
+  }
+
+  @After
+  override def tearDown() {
+    brokers.foreach(_.shutdown())
+    super.tearDown()
+  }
+
+  @Test
+  def testReplicaFetcherThread() {
+    val partition = 0
+    val testMessageList1 = List("test1", "test2", "test3", "test4")
+    val testMessageList2 = List("test5", "test6", "test7", "test8")
+
+    // create a topic and partition and await leadership
+    for (topic <- List(topic1,topic2)) {
+      createTopic(zkUtils, topic, numPartitions = 1, replicationFactor = 2, servers = brokers)
+    }
+
+    // send test messages to leader
+    val producer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(brokers),
+                                               retries = 5,
+                                               keySerializer = new StringSerializer,
+                                               valueSerializer = new StringSerializer)
+    val records = testMessageList1.map(m => new ProducerRecord(topic1, m, m)) ++
+      testMessageList2.map(m => new ProducerRecord(topic2, m, m))
+    records.map(producer.send).foreach(_.get)
+    producer.close()
+
+    def logsMatch(): Boolean = {
+      var result = true
+      for (topic <- List(topic1, topic2)) {
+        val tp = new TopicPartition(topic, partition)
+        val expectedOffset = brokers.head.getLogManager().getLog(tp).get.logEndOffset
+        result = result && expectedOffset > 0 && brokers.forall { item =>
+          expectedOffset == item.getLogManager().getLog(tp).get.logEndOffset
+        }
+      }
+      result
+    }
+    waitUntilTrue(logsMatch _, "Broker logs should be identical")
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/0bede30a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
index 92a518d..1ee4ac8 100644
--- a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
@@ -24,19 +24,31 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol}
 import org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse}
 import org.apache.kafka.common.requests.SaslHandshakeRequest
 import org.apache.kafka.common.requests.SaslHandshakeResponse
-import org.junit.Test
+import org.junit.{After, Before, Test}
 import org.junit.Assert._
-import kafka.api.SaslTestHarness
+import kafka.api.{KafkaSasl, SaslSetup}
+import kafka.utils.JaasTestUtils
 
-class SaslApiVersionsRequestTest extends BaseRequestTest with SaslTestHarness {
+class SaslApiVersionsRequestTest extends BaseRequestTest with SaslSetup {
   override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
-  override protected val kafkaClientSaslMechanism = "PLAIN"
-  override protected val kafkaServerSaslMechanisms = List("PLAIN")
-  override protected val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism))
-  override protected val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
-  override protected val zkSaslEnabled = false
+  private val kafkaClientSaslMechanism = "PLAIN"
+  private val kafkaServerSaslMechanisms = List("PLAIN")
+  protected override val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism))
+  protected override val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
   override def numBrokers = 1
 
+  @Before
+  override def setUp(): Unit = {
+    startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), KafkaSasl, JaasTestUtils.KafkaServerContextName))
+    super.setUp()
+  }
+
+  @After
+  override def tearDown(): Unit = {
+    super.tearDown()
+    closeSasl()
+  }
+
   @Test
   def testApiVersionsRequestBeforeSaslHandshakeRequest() {
     val plaintextSocket = connect(protocol = securityProtocol)

http://git-wip-us.apache.org/repos/asf/kafka/blob/0bede30a/core/src/test/scala/unit/kafka/server/SaslPlaintextReplicaFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SaslPlaintextReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/SaslPlaintextReplicaFetchTest.scala
deleted file mode 100644
index 435a0f0..0000000
--- a/core/src/test/scala/unit/kafka/server/SaslPlaintextReplicaFetchTest.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one or more
-  * contributor license agreements.  See the NOTICE file distributed with
-  * this work for additional information regarding copyright ownership.
-  * The ASF licenses this file to You under the Apache License, Version 2.0
-  * (the "License"); you may not use this file except in compliance with
-  * the License.  You may obtain a copy of the License at
-  *
-  *    http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package kafka.server
-
-import kafka.api.SaslTestHarness
-import org.apache.kafka.common.protocol.SecurityProtocol
-
-class SaslPlaintextReplicaFetchTest extends BaseReplicaFetchTest with SaslTestHarness {
-  override protected val zkSaslEnabled = false
-  protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
-  protected def trustStoreFile = None
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/0bede30a/core/src/test/scala/unit/kafka/server/SaslSslReplicaFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SaslSslReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/SaslSslReplicaFetchTest.scala
deleted file mode 100644
index 3af8485..0000000
--- a/core/src/test/scala/unit/kafka/server/SaslSslReplicaFetchTest.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one or more
-  * contributor license agreements.  See the NOTICE file distributed with
-  * this work for additional information regarding copyright ownership.
-  * The ASF licenses this file to You under the Apache License, Version 2.0
-  * (the "License"); you may not use this file except in compliance with
-  * the License.  You may obtain a copy of the License at
-  *
-  *    http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package kafka.server
-
-import java.io.File
-
-import kafka.api.SaslTestHarness
-import org.apache.kafka.common.protocol.SecurityProtocol
-
-class SaslSslReplicaFetchTest extends BaseReplicaFetchTest with SaslTestHarness {
-  override protected val zkSaslEnabled = false
-  protected def securityProtocol = SecurityProtocol.SASL_SSL
-  protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks"))
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/0bede30a/core/src/test/scala/unit/kafka/server/SslReplicaFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SslReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/SslReplicaFetchTest.scala
deleted file mode 100644
index dad2285..0000000
--- a/core/src/test/scala/unit/kafka/server/SslReplicaFetchTest.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import java.io.File
-
-import org.apache.kafka.common.protocol.SecurityProtocol
-
-class SslReplicaFetchTest extends BaseReplicaFetchTest {
-  protected def securityProtocol = SecurityProtocol.SSL
-  protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks"))
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/0bede30a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
index 3ae680c..d10e861 100644
--- a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
@@ -25,68 +25,67 @@ object JaasTestUtils {
                              keyTab: String,
                              principal: String,
                              debug: Boolean,
-                             serviceName: Option[String]) {
-    def toJaasModule: JaasModule = {
-      JaasModule(
-        "com.sun.security.auth.module.Krb5LoginModule",
-        debug = debug,
-        entries = Map(
-          "useKeyTab" -> useKeyTab.toString,
-          "storeKey" -> storeKey.toString,
-          "keyTab" -> keyTab,
-          "principal" -> principal
-        ) ++ serviceName.map(s => Map("serviceName" -> s)).getOrElse(Map.empty)
-      )
-    }
+                             serviceName: Option[String]) extends JaasModule {
+
+    def name = "com.sun.security.auth.module.Krb5LoginModule"
+
+    def entries: Map[String, String] = Map(
+      "useKeyTab" -> useKeyTab.toString,
+      "storeKey" -> storeKey.toString,
+      "keyTab" -> keyTab,
+      "principal" -> principal
+    ) ++ serviceName.map(s => Map("serviceName" -> s)).getOrElse(Map.empty)
+
   }
 
   case class PlainLoginModule(username: String,
                               password: String,
                               debug: Boolean = false,
-                              validUsers: Map[String, String] = Map.empty) {
-    def toJaasModule: JaasModule = {
-      JaasModule(
-        "org.apache.kafka.common.security.plain.PlainLoginModule",
-        debug = debug,
-        entries = Map(
-          "username" -> username,
-          "password" -> password
-        ) ++ validUsers.map { case (user, pass) => s"user_$user" -> pass }
-      )
-    }
+                              validUsers: Map[String, String] = Map.empty) extends JaasModule {
+
+    def name = "org.apache.kafka.common.security.plain.PlainLoginModule"
+
+    def entries: Map[String, String] = Map(
+      "username" -> username,
+      "password" -> password
+    ) ++ validUsers.map { case (user, pass) => s"user_$user" -> pass }
+
+  }
+
+  case class ZkDigestModule(debug: Boolean = false,
+                            entries: Map[String, String] = Map.empty) extends JaasModule {
+    def name = "org.apache.zookeeper.server.auth.DigestLoginModule"
   }
 
   case class ScramLoginModule(username: String,
                               password: String,
-                              debug: Boolean = false) {
-    def toJaasModule: JaasModule = {
-      JaasModule(
-        "org.apache.kafka.common.security.scram.ScramLoginModule",
-        debug = debug,
-        entries = Map(
-          "username" -> username,
-          "password" -> password
-        )
-      )
-    }
+                              debug: Boolean = false) extends JaasModule {
+
+    def name = "org.apache.kafka.common.security.scram.ScramLoginModule"
+
+    def entries: Map[String, String] = Map(
+      "username" -> username,
+      "password" -> password
+    )
   }
 
-  case class JaasModule(moduleName: String,
-                        debug: Boolean,
-                        entries: Map[String, String]) {
+  sealed trait JaasModule {
+    def name: String
+    def debug: Boolean
+    def entries: Map[String, String]
+
     override def toString: String = {
-      s"""$moduleName required
+      s"""$name required
           |  debug=$debug
           |  ${entries.map { case (k, v) => s"""$k="$v"""" }.mkString("", "\n|  ", ";")}
           |""".stripMargin
     }
   }
 
-  case class JaasSection(contextName: String,
-                    jaasModule: Seq[JaasModule]) {
+  case class JaasSection(contextName: String, modules: Seq[JaasModule]) {
     override def toString: String = {
       s"""|$contextName {
-          |  ${jaasModule.mkString("\n  ")}
+          |  ${modules.mkString("\n  ")}
           |};
           |""".stripMargin
     }
@@ -97,7 +96,6 @@ object JaasTestUtils {
   private val ZkUserSuperPasswd = "adminpasswd"
   private val ZkUser = "fpj"
   private val ZkUserPassword = "fpjsecret"
-  private val ZkModule = "org.apache.zookeeper.server.auth.DigestLoginModule"
 
   val KafkaServerContextName = "KafkaServer"
   val KafkaServerPrincipalUnqualifiedName = "kafka"
@@ -122,10 +120,10 @@ object JaasTestUtils {
   val KafkaScramAdmin = "scram-admin"
   val KafkaScramAdminPassword = "scram-admin-secret"
 
-  def writeJaasContextsToFile(jaasContexts: Seq[JaasSection]): String = {
+  def writeJaasContextsToFile(jaasSections: Seq[JaasSection]): File = {
     val jaasFile = TestUtils.tempFile()
-    writeToFile(jaasFile,jaasContexts)
-    jaasFile.getCanonicalPath
+    writeToFile(jaasFile, jaasSections)
+    jaasFile
   }
 
   // Returns the dynamic configuration, using credentials for user #1
@@ -133,11 +131,13 @@ object JaasTestUtils {
     kafkaClientModule(mechanism, keytabLocation, KafkaClientPrincipal, KafkaPlainUser, KafkaPlainPassword, KafkaScramUser, KafkaScramPassword).toString
 
   def zkSections: Seq[JaasSection] = Seq(
-    new JaasSection(ZkServerContextName, Seq(JaasModule(ZkModule, false, Map("user_super" -> ZkUserSuperPasswd, s"user_$ZkUser" -> ZkUserPassword)))),
-    new JaasSection(ZkClientContextName, Seq(JaasModule(ZkModule, false, Map("username" -> ZkUser, "password" -> ZkUserPassword))))
+    new JaasSection(ZkServerContextName, Seq(ZkDigestModule(debug = false,
+      Map("user_super" -> ZkUserSuperPasswd, s"user_$ZkUser" -> ZkUserPassword)))),
+    new JaasSection(ZkClientContextName, Seq(ZkDigestModule(debug = false,
+      Map("username" -> ZkUser, "password" -> ZkUserPassword))))
   )
 
-  def kafkaServerSection(contextName: String, mechanisms: List[String], keytabLocation: Option[File]): JaasSection = {
+  def kafkaServerSection(contextName: String, mechanisms: Seq[String], keytabLocation: Option[File]): JaasSection = {
     val modules = mechanisms.map {
       case "GSSAPI" =>
         Krb5LoginModule(
@@ -146,18 +146,22 @@ object JaasTestUtils {
           keyTab = keytabLocation.getOrElse(throw new IllegalArgumentException("Keytab location not specified for GSSAPI")).getAbsolutePath,
           principal = KafkaServerPrincipal,
           debug = true,
-          serviceName = Some("kafka")).toJaasModule
+          serviceName = Some("kafka"))
       case "PLAIN" =>
         PlainLoginModule(
           KafkaPlainAdmin,
           KafkaPlainAdminPassword,
           debug = false,
-          Map(KafkaPlainAdmin -> KafkaPlainAdminPassword, KafkaPlainUser -> KafkaPlainPassword, KafkaPlainUser2 -> KafkaPlainPassword2)).toJaasModule
+          Map(
+            KafkaPlainAdmin -> KafkaPlainAdminPassword,
+            KafkaPlainUser -> KafkaPlainPassword,
+            KafkaPlainUser2 -> KafkaPlainPassword2
+          ))
       case "SCRAM-SHA-256" | "SCRAM-SHA-512" =>
         ScramLoginModule(
           KafkaScramAdmin,
           KafkaScramAdminPassword,
-          debug = false).toJaasModule
+          debug = false)
       case mechanism => throw new IllegalArgumentException("Unsupported server mechanism " + mechanism)
     }
     new JaasSection(contextName, modules)
@@ -177,17 +181,17 @@ object JaasTestUtils {
           principal = clientPrincipal,
           debug = true,
           serviceName = Some("kafka")
-        ).toJaasModule
+        )
       case "PLAIN" =>
         PlainLoginModule(
           plainUser,
           plainPassword
-        ).toJaasModule
+        )
       case "SCRAM-SHA-256" | "SCRAM-SHA-512" =>
         ScramLoginModule(
           scramUser,
           scramPassword
-        ).toJaasModule
+        )
       case mechanism => throw new IllegalArgumentException("Unsupported client mechanism " + mechanism)
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0bede30a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 05d9686..01ff83d 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -227,7 +227,7 @@ object TestUtils extends Logging {
     if (protocolAndPorts.exists { case (protocol, _) => usesSslTransportLayer(protocol) })
       props.putAll(sslConfigs(Mode.SERVER, false, trustStoreFile, s"server$nodeId"))
 
-    if (protocolAndPorts.exists { case (protocol, _) => usesSaslTransportLayer(protocol) })
+    if (protocolAndPorts.exists { case (protocol, _) => usesSaslAuthentication(protocol) })
       props.putAll(saslConfigs(saslProperties))
 
     interBrokerSecurityProtocol.foreach { protocol =>
@@ -492,7 +492,7 @@ object TestUtils extends Logging {
     val props = new Properties
     if (usesSslTransportLayer(securityProtocol))
       props.putAll(sslConfigs(mode, securityProtocol == SecurityProtocol.SSL, trustStoreFile, certAlias))
-    if (usesSaslTransportLayer(securityProtocol))
+    if (usesSaslAuthentication(securityProtocol))
       props.putAll(saslConfigs(saslProperties))
     props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol.name)
     props
@@ -549,12 +549,12 @@ object TestUtils extends Logging {
     new KafkaProducer[K, V](producerProps, keySerializer, valueSerializer)
   }
 
-  private def usesSslTransportLayer(securityProtocol: SecurityProtocol): Boolean = securityProtocol match {
+  def usesSslTransportLayer(securityProtocol: SecurityProtocol): Boolean = securityProtocol match {
     case SecurityProtocol.SSL | SecurityProtocol.SASL_SSL => true
     case _ => false
   }
 
-  private def usesSaslTransportLayer(securityProtocol: SecurityProtocol): Boolean = securityProtocol match {
+  def usesSaslAuthentication(securityProtocol: SecurityProtocol): Boolean = securityProtocol match {
     case SecurityProtocol.SASL_PLAINTEXT | SecurityProtocol.SASL_SSL => true
     case _ => false
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0bede30a/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala b/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
index c9076b5..75625cd 100644
--- a/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
@@ -53,13 +53,12 @@ class ZKEphemeralTest(val secure: Boolean) extends ZooKeeperTestHarness {
   
   @Before
   override def setUp() {
-    if(secure) {
+    if (secure) {
       Configuration.setConfiguration(null)
-      System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, jaasFile)
+      System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, jaasFile.getAbsolutePath)
       System.setProperty(authProvider, "org.apache.zookeeper.server.auth.SASLAuthenticationProvider")
-      if(!JaasUtils.isZkSecurityEnabled()) {
+      if (!JaasUtils.isZkSecurityEnabled)
         fail("Secure access not enabled")
-     }
     }
     super.setUp
   }


Mime
View raw message