kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject [1/4] kafka git commit: KAFKA-1686; Implement SASL/Kerberos
Date Tue, 20 Oct 2015 21:13:41 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 8f32617e6 -> 403158b54


http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/core/src/test/scala/integration/kafka/api/SSLConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SSLConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SSLConsumerTest.scala
deleted file mode 100644
index 5dc4cbc..0000000
--- a/core/src/test/scala/integration/kafka/api/SSLConsumerTest.scala
+++ /dev/null
@@ -1,231 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
-  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
-  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
-  * License. You may obtain a copy of the License at
-  *
-  * http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
-  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
-  * specific language governing permissions and limitations under the License.
-  */
-package kafka.api
-
-import java.util.Properties
-import java.io.File
-
-import org.apache.kafka.clients.producer.ProducerConfig
-import org.apache.kafka.clients.producer.ProducerRecord
-import org.apache.kafka.clients.producer.KafkaProducer
-import org.apache.kafka.clients.consumer.Consumer
-import org.apache.kafka.clients.consumer.KafkaConsumer
-import org.apache.kafka.clients.consumer.ConsumerRecord
-import org.apache.kafka.clients.consumer.ConsumerConfig
-import org.apache.kafka.common.TopicPartition
-import kafka.integration.KafkaServerTestHarness
-
-import kafka.utils.{TestUtils, Logging}
-import kafka.server.KafkaConfig
-
-import java.util.ArrayList
-import org.junit.{Test, Before, After}
-import org.junit.Assert._
-
-import scala.collection.mutable.Buffer
-import scala.collection.JavaConversions._
-import kafka.coordinator.ConsumerCoordinator
-
-
-/**
-  * Integration tests for the new consumer that cover basic usage as well as server failures
-  */
-class SSLConsumerTest extends KafkaServerTestHarness with Logging {
-
-  val trustStoreFile = File.createTempFile("truststore", ".jks")
-  val numServers = 3
-  val producerCount = 1
-  val consumerCount = 2
-  val producerConfig = new Properties
-  val consumerConfig = new Properties
-
-  val overridingProps = new Properties()
-  overridingProps.put(KafkaConfig.NumPartitionsProp, 4.toString)
-  overridingProps.put(KafkaConfig.ControlledShutdownEnableProp, "false") // speed up shutdown
-  overridingProps.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") // don't want to lose offset
-  overridingProps.put(KafkaConfig.OffsetsTopicPartitionsProp, "1")
-  overridingProps.put(KafkaConfig.ConsumerMinSessionTimeoutMsProp, "100") // set small enough session timeout
-
-  val consumers = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
-  val producers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]()
-
-  def generateConfigs() =
-    TestUtils.createBrokerConfigs(numServers, zkConnect, false, enableSSL=true, trustStoreFile=Some(trustStoreFile)).map(KafkaConfig.fromProps(_, overridingProps))
-
-  val topic = "topic"
-  val part = 0
-  val tp = new TopicPartition(topic, part)
-
-  // configure the servers and clients
-  this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all")
-  this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-test")
-  this.consumerConfig.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 4096.toString)
-  this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
-
-  @Before
-  override def setUp() {
-    super.setUp()
-    producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.getSSLBrokerListStrFromServers(servers))
-    producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArraySerializer])
-    producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArraySerializer])
-    consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.getSSLBrokerListStrFromServers(servers))
-    consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer])
-    consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer])
-    consumerConfig.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "range")
-
-    for (i <- 0 until producerCount)
-      producers += TestUtils.createNewProducer(TestUtils.getSSLBrokerListStrFromServers(servers),
-                                               acks = 1,
-                                               enableSSL=true,
-                                               trustStoreFile=Some(trustStoreFile))
-    for (i <- 0 until consumerCount)
-      consumers += TestUtils.createNewConsumer(TestUtils.getSSLBrokerListStrFromServers(servers),
-                                               groupId = "my-test",
-                                               partitionAssignmentStrategy= "range",
-                                               enableSSL=true,
-                                               trustStoreFile=Some(trustStoreFile))
-
-
-    // create the consumer offset topic
-    TestUtils.createTopic(zkUtils, ConsumerCoordinator.OffsetsTopicName,
-      overridingProps.getProperty(KafkaConfig.OffsetsTopicPartitionsProp).toInt,
-      overridingProps.getProperty(KafkaConfig.OffsetsTopicReplicationFactorProp).toInt,
-      servers,
-      servers(0).consumerCoordinator.offsetsTopicConfigs)
-
-    // create the test topic with all the brokers as replicas
-    TestUtils.createTopic(zkUtils, topic, 1, numServers, this.servers)
-  }
-
-  @After
-  override def tearDown() {
-    producers.foreach(_.close())
-    consumers.foreach(_.close())
-    super.tearDown()
-  }
-
-  @Test
-  def testSimpleConsumption() {
-    val numRecords = 10000
-    sendRecords(numRecords)
-    assertEquals(0, this.consumers(0).assignment.size)
-    this.consumers(0).assign(List(tp))
-    assertEquals(1, this.consumers(0).assignment.size)
-    this.consumers(0).seek(tp, 0)
-    consumeRecords(this.consumers(0), numRecords = numRecords, startingOffset = 0)
-  }
-
-  @Test
-  def testAutoOffsetReset() {
-    sendRecords(1)
-    this.consumers(0).assign(List(tp))
-    consumeRecords(this.consumers(0), numRecords = 1, startingOffset = 0)
-  }
-
-  @Test
-  def testSeek() {
-    val consumer = this.consumers(0)
-    val totalRecords = 50L
-    sendRecords(totalRecords.toInt)
-    consumer.assign(List(tp))
-
-    consumer.seekToEnd(tp)
-    assertEquals(totalRecords, consumer.position(tp))
-    assertFalse(consumer.poll(totalRecords).iterator().hasNext)
-
-    consumer.seekToBeginning(tp)
-    assertEquals(0, consumer.position(tp), 0)
-    consumeRecords(consumer, numRecords = 1, startingOffset = 0)
-
-    val mid = totalRecords / 2
-    consumer.seek(tp, mid)
-    assertEquals(mid, consumer.position(tp))
-    consumeRecords(consumer, numRecords = 1, startingOffset = mid.toInt)
-  }
-
-  @Test
-  def testGroupConsumption() {
-    sendRecords(10)
-    this.consumers(0).subscribe(List(topic))
-    consumeRecords(this.consumers(0), numRecords = 1, startingOffset = 0)
-  }
-
-  @Test
-  def testPositionAndCommit() {
-    sendRecords(5)
-
-    // committed() on a partition with no committed offset returns null
-    assertNull(this.consumers(0).committed(new TopicPartition(topic, 15)))
-
-    // position() on a partition that we aren't subscribed to throws an exception
-    intercept[IllegalArgumentException] {
-      this.consumers(0).position(new TopicPartition(topic, 15))
-    }
-
-    this.consumers(0).assign(List(tp))
-
-    assertEquals("position() on a partition that we are subscribed to should reset the offset", 0L, this.consumers(0).position(tp))
-    this.consumers(0).commitSync()
-    assertEquals(0L, this.consumers(0).committed(tp).offset)
-
-    consumeRecords(this.consumers(0), 5, 0)
-    assertEquals("After consuming 5 records, position should be 5", 5L, this.consumers(0).position(tp))
-    this.consumers(0).commitSync()
-    assertEquals("Committed offset should be returned", 5L, this.consumers(0).committed(tp).offset)
-
-    sendRecords(1)
-
-    // another consumer in the same group should get the same position
-    this.consumers(1).assign(List(tp))
-    consumeRecords(this.consumers(1), 1, 5)
-  }
-
-  @Test
-  def testPartitionsFor() {
-    val numParts = 2
-    TestUtils.createTopic(zkUtils, "part-test", numParts, 1, this.servers)
-    val parts = this.consumers(0).partitionsFor("part-test")
-    assertNotNull(parts)
-    assertEquals(2, parts.length)
-    assertNull(this.consumers(0).partitionsFor("non-exist-topic"))
-  }
-
-  private def sendRecords(numRecords: Int) {
-    val futures = (0 until numRecords).map { i =>
-      this.producers(0).send(new ProducerRecord(topic, part, i.toString.getBytes, i.toString.getBytes))
-    }
-    futures.map(_.get)
-  }
-
-  private def consumeRecords(consumer: Consumer[Array[Byte], Array[Byte]], numRecords: Int, startingOffset: Int) {
-    val records = new ArrayList[ConsumerRecord[Array[Byte], Array[Byte]]]()
-    val maxIters = numRecords * 300
-    var iters = 0
-    while (records.size < numRecords) {
-      for (record <- consumer.poll(50)) {
-        records.add(record)
-      }
-      if (iters > maxIters)
-        throw new IllegalStateException("Failed to consume the expected records after " + iters + " iterations.")
-      iters += 1
-    }
-    for (i <- 0 until numRecords) {
-      val record = records.get(i)
-      val offset = startingOffset + i
-      assertEquals(topic, record.topic())
-      assertEquals(part, record.partition())
-      assertEquals(offset.toLong, record.offset())
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/core/src/test/scala/integration/kafka/api/SSLProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SSLProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/SSLProducerSendTest.scala
deleted file mode 100644
index c22e57a..0000000
--- a/core/src/test/scala/integration/kafka/api/SSLProducerSendTest.scala
+++ /dev/null
@@ -1,240 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one or more
-  * contributor license agreements.  See the NOTICE file distributed with
-  * this work for additional information regarding copyright ownership.
-  * The ASF licenses this file to You under the Apache License, Version 2.0
-  * (the "License"); you may not use this file except in compliance with
-  * the License.  You may obtain a copy of the License at
-  *
-  *    http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package kafka.api
-
-import java.util.Properties
-import java.util.concurrent.TimeUnit
-import java.io.File
-
-import kafka.consumer.SimpleConsumer
-import kafka.integration.KafkaServerTestHarness
-import kafka.message.Message
-import kafka.server.KafkaConfig
-import kafka.utils.TestUtils
-import org.apache.kafka.clients.producer._
-import org.apache.kafka.common.config.ConfigException
-import org.apache.kafka.common.errors.SerializationException
-import org.apache.kafka.common.serialization.ByteArraySerializer
-import org.junit.Assert._
-import org.junit.{After, Before, Test}
-
-
-class SSLProducerSendTest extends KafkaServerTestHarness {
-  val numServers = 2
-  val trustStoreFile = File.createTempFile("truststore", ".jks")
-
-  val overridingProps = new Properties()
-  overridingProps.put(KafkaConfig.NumPartitionsProp, 4.toString)
-
-  def generateConfigs() =
-    TestUtils.createBrokerConfigs(numServers, zkConnect, false, enableSSL=true, trustStoreFile=Some(trustStoreFile)).map(KafkaConfig.fromProps(_, overridingProps))
-
-  private var consumer1: SimpleConsumer = null
-  private var consumer2: SimpleConsumer = null
-
-  private val topic = "topic"
-  private val numRecords = 100
-
-  @Before
-  override def setUp() {
-    super.setUp()
-
-    // TODO: we need to migrate to new consumers when 0.9 is final
-    consumer1 = new SimpleConsumer("localhost", servers(0).boundPort(), 100, 1024*1024, "")
-    consumer2 = new SimpleConsumer("localhost", servers(1).boundPort(), 100, 1024*1024, "")
-
-  }
-
-  @After
-  override def tearDown() {
-    consumer1.close()
-    consumer2.close()
-    super.tearDown()
-  }
-
-  /**
-    * testSendOffset checks the basic send API behavior
-    *
-    * 1. Send with null key/value/partition-id should be accepted; send with null topic should be rejected.
-    * 2. Last message of the non-blocking send should return the correct offset metadata
-    */
-  @Test
-  def testSendOffset() {
-    var sslProducer = TestUtils.createNewProducer(TestUtils.getSSLBrokerListStrFromServers(servers), enableSSL=true, trustStoreFile=Some(trustStoreFile))
-    var producer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(servers))
-    val partition = new Integer(0)
-
-    object callback extends Callback {
-      var offset = 0L
-      def onCompletion(metadata: RecordMetadata, exception: Exception) {
-        if (exception == null) {
-          assertEquals(offset, metadata.offset())
-          assertEquals(topic, metadata.topic())
-          assertEquals(partition, metadata.partition())
-          offset += 1
-        } else {
-          fail("Send callback returns the following exception", exception)
-        }
-      }
-    }
-
-    try {
-      // create topic
-      TestUtils.createTopic(zkUtils, topic, 1, 2, servers)
-
-      // send a normal record
-      val record0 = new ProducerRecord[Array[Byte],Array[Byte]](topic, partition, "key".getBytes, "value".getBytes)
-      assertEquals("Should have offset 0", 0L, sslProducer.send(record0, callback).get.offset)
-
-      // send a record with null value should be ok
-      val record1 = new ProducerRecord[Array[Byte],Array[Byte]](topic, partition, "key".getBytes, null)
-      assertEquals("Should have offset 1", 1L, sslProducer.send(record1, callback).get.offset)
-
-      // send a record with null key should be ok
-      val record2 = new ProducerRecord[Array[Byte],Array[Byte]](topic, partition, null, "value".getBytes)
-      assertEquals("Should have offset 2", 2L, sslProducer.send(record2, callback).get.offset)
-
-      // send a record with null part id should be ok
-      val record3 = new ProducerRecord[Array[Byte],Array[Byte]](topic, null, "key".getBytes, "value".getBytes)
-      assertEquals("Should have offset 3", 3L, sslProducer.send(record3, callback).get.offset)
-
-      // send a record with null topic should fail
-      try {
-        val record4 = new ProducerRecord[Array[Byte],Array[Byte]](null, partition, "key".getBytes, "value".getBytes)
-        sslProducer.send(record4, callback)
-        fail("Should not allow sending a record without topic")
-      } catch {
-        case iae: IllegalArgumentException => // this is ok
-        case e: Throwable => fail("Only expecting IllegalArgumentException", e)
-      }
-
-      // non-blocking send a list of records with sslProducer
-      for (i <- 1 to numRecords)
-        sslProducer.send(record0, callback)
-      // check that all messages have been acked via offset
-      assertEquals("Should have offset " + numRecords + 4L, numRecords + 4L, sslProducer.send(record0, callback).get.offset)
-
-      //non-blocking send a list of records with plaintext producer
-      for (i <- 1 to numRecords)
-        producer.send(record0, callback)
-
-      // check that all messages have been acked via offset
-      assertEquals("Should have offset " + (numRecords * 2 + 5L), numRecords * 2 + 5L, producer.send(record0, callback).get.offset)
-
-    } finally {
-      if (sslProducer != null) {
-        sslProducer.close()
-        sslProducer = null
-      }
-      if (producer != null) {
-        producer.close()
-        producer = null
-      }
-
-    }
-  }
-
-  /**
-    * testClose checks the closing behavior
-    *
-    * After close() returns, all messages should be sent with correct returned offset metadata
-    */
-  @Test
-  def testClose() {
-    var producer = TestUtils.createNewProducer(TestUtils.getSSLBrokerListStrFromServers(servers), enableSSL=true, trustStoreFile=Some(trustStoreFile))
-    try {
-      // create topic
-      TestUtils.createTopic(zkUtils, topic, 1, 2, servers)
-
-      // non-blocking send a list of records
-      val record0 = new ProducerRecord[Array[Byte],Array[Byte]](topic, null, "key".getBytes, "value".getBytes)
-      for (i <- 1 to numRecords)
-        producer.send(record0)
-      val response0 = producer.send(record0)
-
-      // close the producer
-      producer.close()
-      producer = null
-
-      // check that all messages have been acked via offset,
-      // this also checks that messages with same key go to the same partition
-      assertTrue("The last message should be acked before producer is shutdown", response0.isDone)
-      assertEquals("Should have offset " + numRecords, numRecords.toLong, response0.get.offset)
-
-    } finally {
-      if (producer != null) {
-        producer.close()
-        producer = null
-      }
-    }
-  }
-
-  /**
-    * testSendToPartition checks the partitioning behavior
-    *
-    * The specified partition-id should be respected
-    */
-  @Test
-  def testSendToPartition() {
-    var producer = TestUtils.createNewProducer(TestUtils.getSSLBrokerListStrFromServers(servers), enableSSL=true, trustStoreFile=Some(trustStoreFile))
-    try {
-      // create topic
-      val leaders = TestUtils.createTopic(zkUtils, topic, 2, 2, servers)
-      val partition = 1
-
-      // make sure leaders exist
-      val leader1 = leaders(partition)
-      assertTrue("Leader for topic \"topic\" partition 1 should exist", leader1.isDefined)
-
-      val responses =
-        for (i <- 1 to numRecords)
-        yield producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic, partition, null, ("value" + i).getBytes))
-      val futures = responses.toList
-      futures.map(_.get)
-      for (future <- futures)
-        assertTrue("Request should have completed", future.isDone)
-
-      // make sure all of them end up in the same partition with increasing offset values
-      for ((future, offset) <- futures zip (0 until numRecords)) {
-        assertEquals(offset.toLong, future.get.offset)
-        assertEquals(topic, future.get.topic)
-        assertEquals(partition, future.get.partition)
-      }
-
-      // make sure the fetched messages also respect the partitioning and ordering
-      val fetchResponse1 = if (leader1.get == configs(0).brokerId) {
-        consumer1.fetch(new FetchRequestBuilder().addFetch(topic, partition, 0, Int.MaxValue).build())
-      } else {
-        consumer2.fetch(new FetchRequestBuilder().addFetch(topic, partition, 0, Int.MaxValue).build())
-      }
-      val messageSet1 = fetchResponse1.messageSet(topic, partition).iterator.toBuffer
-      assertEquals("Should have fetched " + numRecords + " messages", numRecords, messageSet1.size)
-
-      // TODO: also check topic and partition after they are added in the return messageSet
-      for (i <- 0 to numRecords - 1) {
-        assertEquals(new Message(bytes = ("value" + (i + 1)).getBytes), messageSet1(i).message)
-        assertEquals(i.toLong, messageSet1(i).offset)
-      }
-    } finally {
-      if (producer != null) {
-        producer.close()
-        producer = null
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/core/src/test/scala/integration/kafka/api/SaslPlaintextConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SaslPlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SaslPlaintextConsumerTest.scala
new file mode 100644
index 0000000..e6f0c2b
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/SaslPlaintextConsumerTest.scala
@@ -0,0 +1,19 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+  * License. You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+  * specific language governing permissions and limitations under the License.
+  */
+package kafka.api
+
+import org.apache.kafka.common.protocol.SecurityProtocol
+
+class SaslPlaintextConsumerTest extends BaseConsumerTest with SaslTestHarness {
+  override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/core/src/test/scala/integration/kafka/api/SaslSslConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SaslSslConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslConsumerTest.scala
new file mode 100644
index 0000000..4f8512a
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/SaslSslConsumerTest.scala
@@ -0,0 +1,22 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+  * License. You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+  * specific language governing permissions and limitations under the License.
+  */
+package kafka.api
+
+import java.io.File
+
+import org.apache.kafka.common.protocol.SecurityProtocol
+
+class SaslSslConsumerTest extends BaseConsumerTest with SaslTestHarness {
+  override protected def securityProtocol = SecurityProtocol.SASL_SSL
+  override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks"))
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala b/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala
new file mode 100644
index 0000000..9575fda
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala
@@ -0,0 +1,63 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+  * License. You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+  * specific language governing permissions and limitations under the License.
+  */
+package kafka.api
+
+import java.io.{BufferedReader, FileWriter, BufferedWriter, File}
+import javax.security.auth.login.Configuration
+
+import kafka.utils.TestUtils
+import kafka.zk.ZooKeeperTestHarness
+import org.apache.hadoop.minikdc.MiniKdc
+import org.apache.kafka.common.security.JaasUtils
+import org.junit.{After, Before}
+
+trait SaslTestHarness extends ZooKeeperTestHarness {
+  val workDir = new File(System.getProperty("test.dir", "target"))
+  val kdcConf = MiniKdc.createConf()
+  val kdc = new MiniKdc(kdcConf, workDir)
+
+  @Before
+  override def setUp() {
+    // Clean-up global configuration set by other tests
+    Configuration.setConfiguration(null)
+    val keytabFile = TestUtils.tempFile()
+    val jaasFile = TestUtils.tempFile()
+
+    val writer = new BufferedWriter(new FileWriter(jaasFile))
+    val source = io.Source.fromInputStream(
+      Thread.currentThread().getContextClassLoader.getResourceAsStream("kafka_jaas.conf"), "UTF-8")
+    if (source == null)
+      throw new IllegalStateException("Could not load `kafka_jaas.conf`, make sure it is in the classpath")
+
+    for (line <- source.getLines) {
+      val replaced = line.replaceAll("\\$keytab-location", keytabFile.getAbsolutePath)
+      writer.write(replaced)
+      writer.newLine()
+    }
+    writer.close()
+    source.close()
+
+    kdc.start()
+    kdc.createPrincipal(keytabFile, "client", "kafka/localhost")
+    System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, jaasFile.getAbsolutePath)
+    super.setUp
+  }
+
+  @After
+  override def tearDown() {
+    super.tearDown
+    kdc.stop()
+    System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
+    Configuration.setConfiguration(null)
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/core/src/test/scala/integration/kafka/api/SslConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SslConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SslConsumerTest.scala
new file mode 100644
index 0000000..1d13d88
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/SslConsumerTest.scala
@@ -0,0 +1,22 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+  * License. You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+  * specific language governing permissions and limitations under the License.
+  */
+package kafka.api
+
+import java.io.File
+
+import org.apache.kafka.common.protocol.SecurityProtocol
+
+class SslConsumerTest extends BaseConsumerTest {
+  override protected def securityProtocol = SecurityProtocol.SSL
+  override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks"))
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/core/src/test/scala/integration/kafka/api/SslProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SslProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/SslProducerSendTest.scala
new file mode 100644
index 0000000..4d9189c
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/SslProducerSendTest.scala
@@ -0,0 +1,27 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.api
+
+import java.io.File
+
+import org.apache.kafka.common.protocol.SecurityProtocol
+
+class SslProducerSendTest extends BaseProducerSendTest {
+  override protected def securityProtocol = SecurityProtocol.SSL
+  override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks"))
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala
index 3cf4dae..05dc0bc 100644
--- a/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala
@@ -39,17 +39,23 @@ abstract class BaseTopicMetadataTest extends ZooKeeperTestHarness {
   var adHocConfigs: Seq[KafkaConfig] = null
   val numConfigs: Int = 4
 
-  /* If this is `Some`, SSL will be enabled */
+  // This should be defined if `securityProtocol` uses SSL (eg SSL, SASL_SSL)
   protected def trustStoreFile: Option[File]
+  protected def securityProtocol: SecurityProtocol
 
   @Before
   override def setUp() {
     super.setUp()
-    val props = createBrokerConfigs(numConfigs, zkConnect, enableSSL = trustStoreFile.isDefined, trustStoreFile = trustStoreFile)
+    val props = createBrokerConfigs(numConfigs, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol),
+      trustStoreFile = trustStoreFile)
     val configs: Seq[KafkaConfig] = props.map(KafkaConfig.fromProps)
     adHocConfigs = configs.takeRight(configs.size - 1) // Started and stopped by individual test cases
     server1 = TestUtils.createServer(configs.head)
-    brokerEndPoints = Seq(new Broker(server1.config.brokerId, server1.config.hostName, server1.boundPort()).getBrokerEndPoint(SecurityProtocol.PLAINTEXT))
+    brokerEndPoints = Seq(
+      // We are using the Scala clients and they don't support SSL. Once we move to the Java ones, we should use
+      // `securityProtocol` instead of PLAINTEXT below
+      new BrokerEndPoint(server1.config.brokerId, server1.config.hostName, server1.boundPort(SecurityProtocol.PLAINTEXT))
+    )
   }
 
   @After

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index bca0dcc..26b86f7 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -17,12 +17,14 @@
 
 package kafka.integration
 
+import java.io.File
 import java.util.Arrays
 
 import kafka.common.KafkaException
 import kafka.server._
 import kafka.utils.{CoreUtils, TestUtils}
 import kafka.zk.ZooKeeperTestHarness
+import org.apache.kafka.common.protocol.SecurityProtocol
 import org.junit.{After, Before}
 
 import scala.collection.mutable.Buffer
@@ -52,13 +54,16 @@ trait KafkaServerTestHarness extends ZooKeeperTestHarness {
 
   def bootstrapUrl = servers.map(s => s.config.hostName + ":" + s.boundPort()).mkString(",")
 
+  protected def securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT
+  protected def trustStoreFile: Option[File] = None
+
   @Before
   override def setUp() {
     super.setUp
     if(configs.size <= 0)
       throw new KafkaException("Must supply at least one server config.")
     servers = configs.map(TestUtils.createServer(_)).toBuffer
-    brokerList = TestUtils.getBrokerListStrFromServers(servers)
+    brokerList = TestUtils.getBrokerListStrFromServers(servers, securityProtocol)
     alive = new Array[Boolean](servers.length)
     Arrays.fill(alive, true)
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/core/src/test/scala/unit/kafka/integration/PlaintextTopicMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/PlaintextTopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/PlaintextTopicMetadataTest.scala
index 176d251..55c12b5 100644
--- a/core/src/test/scala/unit/kafka/integration/PlaintextTopicMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/PlaintextTopicMetadataTest.scala
@@ -17,7 +17,10 @@
 
 package kafka.integration
 
+import org.apache.kafka.common.protocol.SecurityProtocol
+
 class PlaintextTopicMetadataTest extends BaseTopicMetadataTest {
+  protected def securityProtocol = SecurityProtocol.PLAINTEXT
   protected def trustStoreFile = None
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/core/src/test/scala/unit/kafka/integration/SaslPlaintextTopicMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/SaslPlaintextTopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/SaslPlaintextTopicMetadataTest.scala
new file mode 100644
index 0000000..11d6da4
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/integration/SaslPlaintextTopicMetadataTest.scala
@@ -0,0 +1,26 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.integration
+
+import kafka.api.SaslTestHarness
+import org.apache.kafka.common.protocol.SecurityProtocol
+
+class SaslPlaintextTopicMetadataTest extends BaseTopicMetadataTest with SaslTestHarness {
+  protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
+  protected def trustStoreFile = None
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/core/src/test/scala/unit/kafka/integration/SaslSslTopicMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/SaslSslTopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/SaslSslTopicMetadataTest.scala
new file mode 100644
index 0000000..ea15419
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/integration/SaslSslTopicMetadataTest.scala
@@ -0,0 +1,28 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.integration
+
+import java.io.File
+
+import kafka.api.SaslTestHarness
+import org.apache.kafka.common.protocol.SecurityProtocol
+
+class SaslSslTopicMetadataTest extends BaseTopicMetadataTest with SaslTestHarness {
+  protected def securityProtocol = SecurityProtocol.SASL_SSL
+  protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks"))
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/core/src/test/scala/unit/kafka/integration/SslTopicMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/SslTopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/SslTopicMetadataTest.scala
index 5ff9f35..ee73457 100644
--- a/core/src/test/scala/unit/kafka/integration/SslTopicMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/SslTopicMetadataTest.scala
@@ -19,6 +19,9 @@ package kafka.integration
 
 import java.io.File
 
+import org.apache.kafka.common.protocol.SecurityProtocol
+
 class SslTopicMetadataTest extends BaseTopicMetadataTest {
-  protected def trustStoreFile = Some(File.createTempFile("truststore", ".jks"))
+  protected def securityProtocol = SecurityProtocol.SSL
+  protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks"))
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 6f07a7a..b0cb97e 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -221,7 +221,8 @@ class SocketServerTest extends JUnitSuite {
   @Test
   def testSSLSocketServer(): Unit = {
     val trustStoreFile = File.createTempFile("truststore", ".jks")
-    val overrideProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 0, enableSSL = true, trustStoreFile = Some(trustStoreFile))
+    val overrideProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, enableSsl = true,
+      trustStoreFile = Some(trustStoreFile))
     overrideProps.put("listeners", "SSL://localhost:0")
 
     val serverMetrics = new Metrics

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/core/src/test/scala/unit/kafka/server/BaseReplicaFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/BaseReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/BaseReplicaFetchTest.scala
index ade110d..5ecc2c0 100644
--- a/core/src/test/scala/unit/kafka/server/BaseReplicaFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BaseReplicaFetchTest.scala
@@ -19,6 +19,7 @@ package kafka.server
 
 import java.io.File
 
+import org.apache.kafka.common.protocol.SecurityProtocol
 import org.junit.{Test, After, Before}
 import kafka.zk.ZooKeeperTestHarness
 import kafka.utils.TestUtils._
@@ -32,15 +33,16 @@ abstract class BaseReplicaFetchTest extends ZooKeeperTestHarness  {
   val topic1 = "foo"
   val topic2 = "bar"
 
-  /* If this is `Some`, SSL will be enabled */
+  // This should be defined if `securityProtocol` uses SSL (e.g. SSL, SASL_SSL)
   protected def trustStoreFile: Option[File]
+  protected def securityProtocol: SecurityProtocol
 
   @Before
   override def setUp() {
     super.setUp()
-    brokers = createBrokerConfigs(2, zkConnect, enableControlledShutdown = false, enableSSL = trustStoreFile.isDefined, trustStoreFile = trustStoreFile)
-      .map(KafkaConfig.fromProps)
-      .map(TestUtils.createServer(_))
+    val props = createBrokerConfigs(2, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol),
+      trustStoreFile = trustStoreFile)
+    brokers = props.map(KafkaConfig.fromProps).map(TestUtils.createServer(_))
   }
 
   @After

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 42c1199..c9f2540 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -507,6 +507,14 @@ class KafkaConfigTest {
         case KafkaConfig.SSLClientAuthProp => // ignore string
         case KafkaConfig.SSLCipherSuitesProp => // ignore string
 
+        // SASL configs
+        case KafkaConfig.SaslKerberosServiceNameProp => // ignore string
+        case KafkaConfig.SaslKerberosKinitCmdProp =>
+        case KafkaConfig.SaslKerberosTicketRenewWindowFactorProp =>
+        case KafkaConfig.SaslKerberosTicketRenewJitterProp =>
+        case KafkaConfig.SaslKerberosMinTimeBeforeReloginProp =>
+        case KafkaConfig.AuthToLocalProp => // ignore string
+
         case nonNegativeIntProperty => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-1")
       }
     })

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/core/src/test/scala/unit/kafka/server/PlaintextReplicaFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/PlaintextReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/PlaintextReplicaFetchTest.scala
index 871e49b..b160481 100644
--- a/core/src/test/scala/unit/kafka/server/PlaintextReplicaFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/PlaintextReplicaFetchTest.scala
@@ -17,6 +17,9 @@
 
 package kafka.server
 
+import org.apache.kafka.common.protocol.SecurityProtocol
+
 class PlaintextReplicaFetchTest extends BaseReplicaFetchTest {
+  protected def securityProtocol = SecurityProtocol.PLAINTEXT
   protected def trustStoreFile = None
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/core/src/test/scala/unit/kafka/server/SaslPlaintextReplicaFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SaslPlaintextReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/SaslPlaintextReplicaFetchTest.scala
new file mode 100644
index 0000000..740db37
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/SaslPlaintextReplicaFetchTest.scala
@@ -0,0 +1,26 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.server
+
+import kafka.api.SaslTestHarness
+import org.apache.kafka.common.protocol.SecurityProtocol
+
+class SaslPlaintextReplicaFetchTest extends BaseReplicaFetchTest with SaslTestHarness {
+  protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
+  protected def trustStoreFile = None
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/core/src/test/scala/unit/kafka/server/SaslSslReplicaFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SaslSslReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/SaslSslReplicaFetchTest.scala
new file mode 100644
index 0000000..1bcf8ac
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/SaslSslReplicaFetchTest.scala
@@ -0,0 +1,28 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.server
+
+import java.io.File
+
+import kafka.api.SaslTestHarness
+import org.apache.kafka.common.protocol.SecurityProtocol
+
+class SaslSslReplicaFetchTest extends BaseReplicaFetchTest with SaslTestHarness {
+  protected def securityProtocol = SecurityProtocol.SASL_SSL
+  protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks"))
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/core/src/test/scala/unit/kafka/server/SslReplicaFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SslReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/SslReplicaFetchTest.scala
index 9858052..dad2285 100644
--- a/core/src/test/scala/unit/kafka/server/SslReplicaFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SslReplicaFetchTest.scala
@@ -19,6 +19,9 @@ package kafka.server
 
 import java.io.File
 
+import org.apache.kafka.common.protocol.SecurityProtocol
+
 class SslReplicaFetchTest extends BaseReplicaFetchTest {
-  protected def trustStoreFile = Some(File.createTempFile("truststore", ".jks"))
+  protected def securityProtocol = SecurityProtocol.SSL
+  protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks"))
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 1a0a7dc..46c88a3 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -30,7 +30,7 @@ import kafka.security.auth.{Resource, Authorizer, Acl}
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.utils.Utils._
 
-import collection.mutable.ListBuffer
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 
 import org.I0Itec.zkclient.{ZkClient, ZkConnection}
 
@@ -52,6 +52,7 @@ import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer}
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.network.Mode
 import org.apache.kafka.common.security.ssl.SSLFactory
 import org.apache.kafka.common.config.SSLConfigs
 import org.apache.kafka.test.TestSSLUtils
@@ -137,37 +138,66 @@ object TestUtils extends Logging {
   }
 
   /**
-   * Create a test config for the given node id
+   * Create a test config for the provided parameters.
+   *
+   * Note that if `interBrokerSecurityProtocol` is defined, the listener for the `SecurityProtocol` will be enabled.
    */
   def createBrokerConfigs(numConfigs: Int,
     zkConnect: String,
     enableControlledShutdown: Boolean = true,
     enableDeleteTopic: Boolean = false,
-    enableSSL: Boolean = false,
-    trustStoreFile: Option[File] = None): Seq[Properties] = {
-    (0 until numConfigs).map(node => createBrokerConfig(node, zkConnect, enableControlledShutdown, enableDeleteTopic, enableSSL = enableSSL, trustStoreFile = trustStoreFile))
-  }
-
-  def getBrokerListStrFromServers(servers: Seq[KafkaServer]): String = {
-    servers.map(s => formatAddress(s.config.hostName, s.boundPort())).mkString(",")
+    interBrokerSecurityProtocol: Option[SecurityProtocol] = None,
+    trustStoreFile: Option[File] = None,
+    enablePlaintext: Boolean = true,
+    enableSsl: Boolean = false,
+    enableSaslPlaintext: Boolean = false,
+    enableSaslSsl: Boolean = false): Seq[Properties] = {
+    (0 until numConfigs).map { node =>
+      createBrokerConfig(node, zkConnect, enableControlledShutdown, enableDeleteTopic, RandomPort,
+        interBrokerSecurityProtocol, trustStoreFile, enablePlaintext = enablePlaintext, enableSsl = enableSsl,
+        enableSaslPlaintext = enableSaslPlaintext, enableSaslSsl = enableSaslSsl)
+    }
   }
 
-  def getSSLBrokerListStrFromServers(servers: Seq[KafkaServer]): String = {
-    servers.map(s => formatAddress(s.config.hostName, s.boundPort(SecurityProtocol.SSL))).mkString(",")
+  def getBrokerListStrFromServers(servers: Seq[KafkaServer], protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT): String = {
+    servers.map(s => formatAddress(s.config.hostName, s.boundPort(protocol))).mkString(",")
   }
 
   /**
-   * Create a test config for the given node id
-   */
+    * Create a test config for the provided parameters.
+    *
+    * Note that if `interBrokerSecurityProtocol` is defined, the listener for the `SecurityProtocol` will be enabled.
+    */
   def createBrokerConfig(nodeId: Int, zkConnect: String,
     enableControlledShutdown: Boolean = true,
     enableDeleteTopic: Boolean = false,
-    port: Int = RandomPort, enableSSL: Boolean = false, sslPort: Int = RandomPort, trustStoreFile: Option[File] = None): Properties = {
+    port: Int = RandomPort,
+    interBrokerSecurityProtocol: Option[SecurityProtocol] = None,
+    trustStoreFile: Option[File] = None,
+    enablePlaintext: Boolean = true,
+    enableSaslPlaintext: Boolean = false, saslPlaintextPort: Int = RandomPort,
+    enableSsl: Boolean = false, sslPort: Int = RandomPort,
+    enableSaslSsl: Boolean = false, saslSslPort: Int = RandomPort)
+  : Properties = {
+
+    def shouldEnable(protocol: SecurityProtocol) = interBrokerSecurityProtocol.fold(false)(_ == protocol)
+
+    val protocolAndPorts = ArrayBuffer[(SecurityProtocol, Int)]()
+    if (enablePlaintext || shouldEnable(SecurityProtocol.PLAINTEXT))
+      protocolAndPorts += SecurityProtocol.PLAINTEXT -> port
+    if (enableSsl || shouldEnable(SecurityProtocol.SSL))
+      protocolAndPorts += SecurityProtocol.SSL -> sslPort
+    if (enableSaslPlaintext || shouldEnable(SecurityProtocol.SASL_PLAINTEXT))
+      protocolAndPorts += SecurityProtocol.SASL_PLAINTEXT -> saslPlaintextPort
+    if (enableSaslSsl || shouldEnable(SecurityProtocol.SASL_SSL))
+      protocolAndPorts += SecurityProtocol.SASL_SSL -> saslSslPort
+
+    val listeners = protocolAndPorts.map { case (protocol, port) =>
+      s"${protocol.name}://localhost:$port"
+    }.mkString(",")
+
     val props = new Properties
-    var listeners: String = "PLAINTEXT://localhost:"+port.toString
     if (nodeId >= 0) props.put("broker.id", nodeId.toString)
-    if (enableSSL)
-      listeners = listeners + "," + "SSL://localhost:"+sslPort.toString
     props.put("listeners", listeners)
     props.put("log.dir", TestUtils.tempDir().getAbsolutePath)
     props.put("zookeeper.connect", zkConnect)
@@ -176,9 +206,14 @@ object TestUtils extends Logging {
     props.put("controlled.shutdown.enable", enableControlledShutdown.toString)
     props.put("delete.topic.enable", enableDeleteTopic.toString)
     props.put("controlled.shutdown.retry.backoff.ms", "100")
-    if (enableSSL) {
-      props.putAll(addSSLConfigs(SSLFactory.Mode.SERVER, true, trustStoreFile, "server"+nodeId))
+
+    if (protocolAndPorts.exists { case (protocol, _) => usesSslTransportLayer(protocol) })
+      props.putAll(sslConfigs(Mode.SERVER, true, trustStoreFile, s"server$nodeId"))
+
+    interBrokerSecurityProtocol.foreach { protocol =>
+      props.put(KafkaConfig.InterBrokerSecurityProtocolProp, protocol.name)
     }
+
     props.put("port", port.toString)
     props
   }
@@ -404,28 +439,41 @@ object TestUtils extends Logging {
                         bufferSize: Long = 1024L * 1024L,
                         retries: Int = 0,
                         lingerMs: Long = 0,
-                        enableSSL: Boolean = false,
-                        trustStoreFile: Option[File] = None) : KafkaProducer[Array[Byte],Array[Byte]] = {
+                        securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT,
+                        trustStoreFile: Option[File] = None,
+                        props: Option[Properties] = None) : KafkaProducer[Array[Byte],Array[Byte]] = {
     import org.apache.kafka.clients.producer.ProducerConfig
 
-    val producerProps = new Properties()
+    val producerProps = props.getOrElse(new Properties)
     producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
     producerProps.put(ProducerConfig.ACKS_CONFIG, acks.toString)
     producerProps.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, metadataFetchTimeout.toString)
     producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferSize.toString)
     producerProps.put(ProducerConfig.RETRIES_CONFIG, retries.toString)
-    producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100")
-    producerProps.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "200")
-    producerProps.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs.toString)
-    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
-    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
-    if (enableSSL) {
-      producerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL")
-      producerProps.putAll(addSSLConfigs(SSLFactory.Mode.CLIENT, false, trustStoreFile, "producer"))
+
+    /* Only use these if not already set */
+    val defaultProps = Map(
+      ProducerConfig.RETRY_BACKOFF_MS_CONFIG -> "100",
+      ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG -> "200",
+      ProducerConfig.LINGER_MS_CONFIG -> lingerMs.toString,
+      ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.ByteArraySerializer",
+      ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.ByteArraySerializer"
+    )
+    defaultProps.foreach { case (key, value) =>
+      if (!producerProps.containsKey(key)) producerProps.put(key, value)
     }
+
+    if (usesSslTransportLayer(securityProtocol))
+      producerProps.putAll(sslConfigs(Mode.CLIENT, false, trustStoreFile, "producer"))
+    producerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol.name)
     new KafkaProducer[Array[Byte],Array[Byte]](producerProps)
   }
 
+  private def usesSslTransportLayer(securityProtocol: SecurityProtocol): Boolean = securityProtocol match {
+    case SecurityProtocol.SSL | SecurityProtocol.SASL_SSL => true
+    case _ => false
+  }
+
   /**
    * Create a new consumer with a few pre-configured properties.
    */
@@ -435,7 +483,7 @@ object TestUtils extends Logging {
                         partitionFetchSize: Long = 4096L,
                         partitionAssignmentStrategy: String = "blah",
                         sessionTimeout: Int = 30000,
-                        enableSSL: Boolean = false,
+                        securityProtocol: SecurityProtocol,
                         trustStoreFile: Option[File] = None) : KafkaConsumer[Array[Byte],Array[Byte]] = {
     import org.apache.kafka.clients.consumer.ConsumerConfig
 
@@ -450,10 +498,9 @@ object TestUtils extends Logging {
     consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
     consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, partitionAssignmentStrategy)
     consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout.toString)
-    if (enableSSL) {
-      consumerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL")
-      consumerProps.putAll(addSSLConfigs(SSLFactory.Mode.CLIENT, false, trustStoreFile, "consumer"))
-    }
+    if (usesSslTransportLayer(securityProtocol))
+      consumerProps.putAll(sslConfigs(Mode.CLIENT, false, trustStoreFile, "consumer"))
+    consumerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol.name)
     new KafkaConsumer[Array[Byte],Array[Byte]](consumerProps)
   }
 
@@ -910,19 +957,18 @@ object TestUtils extends Logging {
     new String(bytes, encoding)
   }
 
-  def addSSLConfigs(mode: SSLFactory.Mode, clientCert: Boolean, trustStoreFile: Option[File],  certAlias: String): Properties = {
-    if (!trustStoreFile.isDefined) {
-      throw new Exception("enableSSL set to true but no trustStoreFile provided")
+  def sslConfigs(mode: Mode, clientCert: Boolean, trustStoreFile: Option[File], certAlias: String): Properties = {
+
+    val trustStore = trustStoreFile.getOrElse {
+      throw new Exception("SSL enabled but no trustStoreFile provided")
     }
 
+
     val sslConfigs = {
-      if (mode == SSLFactory.Mode.SERVER) {
-        val sslConfigs = TestSSLUtils.createSSLConfig(true, true, mode, trustStoreFile.get, certAlias)
-        sslConfigs.put(KafkaConfig.InterBrokerSecurityProtocolProp, SecurityProtocol.SSL.name)
-        sslConfigs
-      }
+      if (mode == Mode.SERVER)
+        TestSSLUtils.createSSLConfig(true, true, mode, trustStore, certAlias)
       else
-        TestSSLUtils.createSSLConfig(clientCert, false, mode, trustStoreFile.get, certAlias)
+        TestSSLUtils.createSSLConfig(clientCert, false, mode, trustStore, certAlias)
     }
 
     val sslProps = new Properties()


Mime
View raw message