kafka-commits mailing list archives

From ij...@apache.org
Subject [2/2] kafka git commit: KAFKA-5184 KAFKA-5173; Various improvements to SASL tests
Date Wed, 10 May 2017 21:38:55 GMT
KAFKA-5184 KAFKA-5173; Various improvements to SASL tests

1. Call `closeSasl` in `MultipleListenersWithSameSecurityProtocolBaseTest`
2. Refactor the SASL setup code so that JAAS sections are passed
explicitly to `startSasl`, making it easier to reason about
3. Add an assert that may help us narrow down how KAFKA-5184
can happen (it seems impossible).
4. Remove `SaslTestHarness` to make it easier to reason about `setUp`
and `tearDown` methods.
5. Fix *AdminClientIntegrationTest to have a single `tearDown`
6. Remove the *ReplicaFetchTest and *TopicMetadataTest secure variants.
They are redundant from a security perspective given the consumer and
producer tests.
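
For reference, after this change each test owns its SASL lifecycle
explicitly instead of inheriting it from `SaslTestHarness`. A minimal
sketch of the resulting pattern, mirroring `SaslPlaintextConsumerTest`
from the diff below (the class name `ExampleSaslConsumerTest` is
hypothetical):

    package kafka.api

    import kafka.utils.JaasTestUtils
    import org.apache.kafka.common.protocol.SecurityProtocol
    import org.junit.{After, Before}

    class ExampleSaslConsumerTest extends BaseConsumerTest with SaslSetup {
      override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT

      @Before
      override def setUp(): Unit = {
        // Write the JAAS config (starting MiniKdc if a GSSAPI module is present)
        // before super.setUp() brings up ZooKeeper and the brokers
        startSasl(jaasSections(Seq("GSSAPI"), Some("GSSAPI"), KafkaSasl,
          JaasTestUtils.KafkaServerContextName))
        super.setUp()
      }

      @After
      override def tearDown(): Unit = {
        // Shut the brokers down first, then dispose of the SASL/KDC state
        super.tearDown()
        closeSasl()
      }
    }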

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>

Closes #3010 from ijuma/kafka-5184-kafka-5173-sasl-issues


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0bede30a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0bede30a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0bede30a

Branch: refs/heads/trunk
Commit: 0bede30ada6d5719950ad35a69c586f0ba9c4f7e
Parents: a420d20
Author: Ismael Juma <ismael@juma.me.uk>
Authored: Wed May 10 22:38:30 2017 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Wed May 10 22:38:30 2017 +0100

----------------------------------------------------------------------
 core/src/main/scala/kafka/cluster/Broker.scala  |   4 +-
 .../integration/kafka/api/BaseQuotaTest.scala   |   8 +-
 .../kafka/api/IntegrationTestHarness.scala      |   1 -
 .../api/KafkaAdminClientIntegrationTest.scala   |   3 +-
 .../api/SaslEndToEndAuthorizationTest.scala     |  15 +-
 .../api/SaslMultiMechanismConsumerTest.scala    |  26 +-
 .../api/SaslPlainPlaintextConsumerTest.scala    |  24 +-
 .../kafka/api/SaslPlaintextConsumerTest.scala   |  18 +-
 .../scala/integration/kafka/api/SaslSetup.scala |  70 +++--
 .../api/SaslSslAdminClientIntegrationTest.scala |  17 +-
 .../kafka/api/SaslSslConsumerTest.scala         |  17 +-
 .../integration/kafka/api/SaslTestHarness.scala |  39 ---
 .../api/SslEndToEndAuthorizationTest.scala      |   2 +-
 .../integration/kafka/api/UserQuotaTest.scala   |  18 +-
 ...ListenersWithAdditionalJaasContextTest.scala |  28 +-
 ...pleListenersWithDefaultJaasContextTest.scala |  14 +-
 ...tenersWithSameSecurityProtocolBaseTest.scala |  40 +--
 .../scala/kafka/security/minikdc/MiniKdc.scala  |  20 +-
 .../TransactionCoordinatorIntegrationTest.scala |   4 +-
 .../integration/BaseTopicMetadataTest.scala     | 304 -------------------
 .../PlaintextTopicMetadataTest.scala            |  26 --
 .../SaslPlaintextTopicMetadataTest.scala        |  27 --
 .../integration/SaslSslTopicMetadataTest.scala  |  29 --
 .../integration/SslTopicMetadataTest.scala      |  27 --
 .../kafka/integration/TopicMetadataTest.scala   | 298 ++++++++++++++++++
 .../security/auth/ZkAuthorizationTest.scala     |   2 +-
 .../kafka/server/BaseReplicaFetchTest.scala     |  88 ------
 .../server/PlaintextReplicaFetchTest.scala      |  25 --
 .../unit/kafka/server/ReplicaFetchTest.scala    |  83 +++++
 .../server/SaslApiVersionsRequestTest.scala     |  28 +-
 .../server/SaslPlaintextReplicaFetchTest.scala  |  27 --
 .../kafka/server/SaslSslReplicaFetchTest.scala  |  29 --
 .../unit/kafka/server/SslReplicaFetchTest.scala |  27 --
 .../scala/unit/kafka/utils/JaasTestUtils.scala  | 114 +++----
 .../test/scala/unit/kafka/utils/TestUtils.scala |   8 +-
 .../scala/unit/kafka/zk/ZKEphemeralTest.scala   |   7 +-
 36 files changed, 670 insertions(+), 847 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/0bede30a/core/src/main/scala/kafka/cluster/Broker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Broker.scala b/core/src/main/scala/kafka/cluster/Broker.scala
index 00b4078..184a750 100755
--- a/core/src/main/scala/kafka/cluster/Broker.scala
+++ b/core/src/main/scala/kafka/cluster/Broker.scala
@@ -169,13 +169,13 @@ case class Broker(id: Int, endPoints: Seq[EndPoint], rack: Option[String]) {
 
   def getNode(listenerName: ListenerName): Node = {
     val endpoint = endPointsMap.getOrElse(listenerName,
-      throw new BrokerEndPointNotAvailableException(s"End point with protocol label $listenerName not found for broker $id"))
+      throw new BrokerEndPointNotAvailableException(s"End point with listener name ${listenerName.value} not found for broker $id"))
     new Node(id, endpoint.host, endpoint.port, rack.orNull)
   }
 
   def getBrokerEndPoint(listenerName: ListenerName): BrokerEndPoint = {
     val endpoint = endPointsMap.getOrElse(listenerName,
-      throw new BrokerEndPointNotAvailableException(s"End point with security protocol $listenerName not found for broker $id"))
+      throw new BrokerEndPointNotAvailableException(s"End point with listener name ${listenerName.value} not found for broker $id"))
     new BrokerEndPoint(id, endpoint.host, endpoint.port)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0bede30a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
index 9fcdd9b..8d879a2 100644
--- a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
@@ -23,9 +23,8 @@ import org.apache.kafka.clients.producer._
 import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
 import org.apache.kafka.common.{MetricName, TopicPartition}
 import org.apache.kafka.common.metrics.Quota
-import org.apache.kafka.common.protocol.ApiKeys
 import org.junit.Assert._
-import org.junit.{After, Before, Test}
+import org.junit.{Before, Test}
 import kafka.server.QuotaType
 import org.apache.kafka.common.metrics.KafkaMetric
 
@@ -81,11 +80,6 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
     assertTrue("Leader of all partitions of the topic should exist", leaders.values.forall(leader => leader.isDefined))
   }
 
-  @After
-  override def tearDown() {
-    super.tearDown()
-  }
-
   @Test
   def testThrottledProducerConsumer() {
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0bede30a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index ef113fb..921c2b4 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -22,7 +22,6 @@ import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
 import kafka.utils.TestUtils
 import java.util.Properties
 
-import kafka.common.Topic
 import org.apache.kafka.clients.producer.KafkaProducer
 import kafka.server.KafkaConfig
 import kafka.integration.KafkaServerTestHarness

http://git-wip-us.apache.org/repos/asf/kafka/blob/0bede30a/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala
index 07eb673..ed513ea 100644
--- a/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala
@@ -55,9 +55,10 @@ class KafkaAdminClientIntegrationTest extends KafkaServerTestHarness with Loggin
   }
 
   @After
-  def closeClient(): Unit = {
+  override def tearDown(): Unit = {
     if (client != null)
       Utils.closeQuietly(client, "AdminClient")
+    super.tearDown()
   }
 
   val brokerCount = 3

http://git-wip-us.apache.org/repos/asf/kafka/blob/0bede30a/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
index d4c417c..7e549c8 100644
--- a/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
@@ -37,19 +37,14 @@ abstract class SaslEndToEndAuthorizationTest extends EndToEndAuthorizationTest {
   
   @Before
   override def setUp {
-    startSasl(kafkaServerSaslMechanisms, Option(kafkaClientSaslMechanism), Both)
-    super.setUp
-  }
-
-  // Use JAAS configuration properties for clients so that dynamic JAAS configuration is also tested by this set of tests
-  override protected def setJaasConfiguration(mode: SaslSetupMode, serverEntryName: String,
-                                              serverMechanisms: List[String], clientMechanism: Option[String]) {
-    // create static config with client login context with credentials for JaasTestUtils 'client2'
-    super.setJaasConfiguration(mode, serverEntryName, kafkaServerSaslMechanisms, clientMechanism)
-    // set dynamic properties with credentials for JaasTestUtils 'client1'
+    // create static config including client login context with credentials for JaasTestUtils 'client2'
+    startSasl(jaasSections(kafkaServerSaslMechanisms, Option(kafkaClientSaslMechanism), Both))
+    // set dynamic properties with credentials for JaasTestUtils 'client1' so that dynamic JAAS configuration is also
+    // tested by this set of tests
     val clientLoginContext = jaasClientLoginModule(kafkaClientSaslMechanism)
     producerConfig.put(SaslConfigs.SASL_JAAS_CONFIG, clientLoginContext)
     consumerConfig.put(SaslConfigs.SASL_JAAS_CONFIG, clientLoginContext)
+    super.setUp
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/0bede30a/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala
index 3ff133f..4206616 100644
--- a/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala
@@ -13,22 +13,36 @@
 package kafka.api
 
 import java.io.File
+
 import org.apache.kafka.common.protocol.SecurityProtocol
 import kafka.server.KafkaConfig
-import org.junit.Test
-import kafka.utils.TestUtils
+import org.junit.{After, Before, Test}
+import kafka.utils.{JaasTestUtils, TestUtils}
+
 import scala.collection.JavaConverters._
 
-class SaslMultiMechanismConsumerTest extends BaseConsumerTest with SaslTestHarness {
-  override protected val zkSaslEnabled = true
-  override protected val kafkaClientSaslMechanism = "PLAIN"
-  override protected val kafkaServerSaslMechanisms = List("GSSAPI", "PLAIN")
+class SaslMultiMechanismConsumerTest extends BaseConsumerTest with SaslSetup {
+  private val kafkaClientSaslMechanism = "PLAIN"
+  private val kafkaServerSaslMechanisms = List("GSSAPI", "PLAIN")
   this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
   override protected def securityProtocol = SecurityProtocol.SASL_SSL
   override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks"))
   override protected val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism))
   override protected val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
 
+  @Before
+  override def setUp(): Unit = {
+    startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), Both,
+      JaasTestUtils.KafkaServerContextName))
+    super.setUp()
+  }
+
+  @After
+  override def tearDown(): Unit = {
+    super.tearDown()
+    closeSasl()
+  }
+
   @Test
   def testMultipleBrokerMechanisms() {
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0bede30a/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala
index b1f378b..34d0ebd 100644
--- a/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala
@@ -19,15 +19,13 @@ import org.apache.kafka.common.protocol.SecurityProtocol
 import kafka.server.KafkaConfig
 import kafka.utils.{JaasTestUtils, TestUtils}
 import org.apache.kafka.common.network.ListenerName
-import org.junit.Test
+import org.junit.{After, Before, Test}
 
-class SaslPlainPlaintextConsumerTest extends BaseConsumerTest with SaslTestHarness {
-  override protected val zkSaslEnabled = true
-  override protected val zkAclsEnabled = Some(false)
+class SaslPlainPlaintextConsumerTest extends BaseConsumerTest with SaslSetup {
   override protected def listenerName = new ListenerName("CLIENT")
-  override protected val kafkaClientSaslMechanism = "PLAIN"
-  override protected val kafkaServerSaslMechanisms = List(kafkaClientSaslMechanism)
-  override protected val kafkaServerJaasEntryName =
+  private val kafkaClientSaslMechanism = "PLAIN"
+  private val kafkaServerSaslMechanisms = List(kafkaClientSaslMechanism)
+  private val kafkaServerJaasEntryName =
     s"${listenerName.value.toLowerCase(Locale.ROOT)}.${JaasTestUtils.KafkaServerContextName}"
   this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "false")
   override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
@@ -35,6 +33,18 @@ class SaslPlainPlaintextConsumerTest extends BaseConsumerTest with SaslTestHarne
   override protected val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism))
   override protected val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
 
+  @Before
+  override def setUp(): Unit = {
+    startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), KafkaSasl, kafkaServerJaasEntryName))
+    super.setUp()
+  }
+
+  @After
+  override def tearDown(): Unit = {
+    super.tearDown()
+    closeSasl()
+  }
+
   /**
    * Checks that everyone can access ZkUtils.SecureZkRootPaths and ZkUtils.SensitiveZkRootPaths
    * when zookeeper.set.acl=false, even if Zookeeper is SASL-enabled.

http://git-wip-us.apache.org/repos/asf/kafka/blob/0bede30a/core/src/test/scala/integration/kafka/api/SaslPlaintextConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SaslPlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SaslPlaintextConsumerTest.scala
index 0d11dc1..5eca9c8 100644
--- a/core/src/test/scala/integration/kafka/api/SaslPlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslPlaintextConsumerTest.scala
@@ -12,9 +12,23 @@
   */
 package kafka.api
 
+import kafka.utils.JaasTestUtils
 import org.apache.kafka.common.protocol.SecurityProtocol
+import org.junit.{After, Before}
 
-class SaslPlaintextConsumerTest extends BaseConsumerTest with SaslTestHarness {
-  override protected val zkSaslEnabled = false
+class SaslPlaintextConsumerTest extends BaseConsumerTest with SaslSetup {
   override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
+
+  @Before
+  override def setUp(): Unit = {
+    startSasl(jaasSections(Seq("GSSAPI"), Some("GSSAPI"), KafkaSasl, JaasTestUtils.KafkaServerContextName))
+    super.setUp()
+  }
+
+  @After
+  override def tearDown(): Unit = {
+    super.tearDown()
+    closeSasl()
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0bede30a/core/src/test/scala/integration/kafka/api/SaslSetup.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SaslSetup.scala b/core/src/test/scala/integration/kafka/api/SaslSetup.scala
index 13ed2e2..e349fd4 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSetup.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSetup.scala
@@ -23,7 +23,7 @@ import javax.security.auth.login.Configuration
 
 import kafka.security.minikdc.MiniKdc
 import kafka.server.KafkaConfig
-import kafka.utils.JaasTestUtils.JaasSection
+import kafka.utils.JaasTestUtils.{JaasSection, Krb5LoginModule, ZkDigestModule}
 import kafka.utils.{JaasTestUtils, TestUtils}
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.security.authenticator.LoginManager
@@ -37,7 +37,6 @@ sealed trait SaslSetupMode
 case object ZkSasl extends SaslSetupMode
 case object KafkaSasl extends SaslSetupMode
 case object Both extends SaslSetupMode
-case object CustomKafkaServerSasl extends SaslSetupMode
 
 /*
  * Trait used in SaslTestHarness and EndToEndAuthorizationTest to setup keytab and jaas files.
@@ -48,56 +47,63 @@ trait SaslSetup {
   private var kdc: MiniKdc = null
   private var serverKeytabFile: Option[File] = None
   private var clientKeytabFile: Option[File] = None
-  private var jaasContext: Seq[JaasSection] = Seq()
 
-  def startSasl(kafkaServerSaslMechanisms: List[String], kafkaClientSaslMechanism: Option[String],
-                mode: SaslSetupMode = Both, kafkaServerJaasEntryName: String = JaasTestUtils.KafkaServerContextName,
-                withDefaultJaasContext: Boolean = true) {
+  def startSasl(jaasSections: Seq[JaasSection]) {
     // Important if tests leak consumers, producers or brokers
     LoginManager.closeAll()
-    val hasKerberos = mode != ZkSasl && (kafkaClientSaslMechanism == Some("GSSAPI") || kafkaServerSaslMechanisms.contains("GSSAPI"))
+    val hasKerberos = jaasSections.exists(_.modules.exists {
+      case _: Krb5LoginModule => true
+      case _ => false
+    })
     if (hasKerberos) {
-      val serverKeytabFile = TestUtils.tempFile()
-      val clientKeytabFile = TestUtils.tempFile()
-      this.clientKeytabFile = Some(clientKeytabFile)
-      this.serverKeytabFile = Some(serverKeytabFile)
+      val (serverKeytabFile, clientKeytabFile) = maybeCreateEmptyKeytabFiles()
       kdc = new MiniKdc(kdcConf, workDir)
       kdc.start()
       kdc.createPrincipal(serverKeytabFile, JaasTestUtils.KafkaServerPrincipalUnqualifiedName + "/localhost")
-      kdc.createPrincipal(clientKeytabFile, JaasTestUtils.KafkaClientPrincipalUnqualifiedName, JaasTestUtils.KafkaClientPrincipalUnqualifiedName2)
+      kdc.createPrincipal(clientKeytabFile,
+        JaasTestUtils.KafkaClientPrincipalUnqualifiedName, JaasTestUtils.KafkaClientPrincipalUnqualifiedName2)
     }
-    if (withDefaultJaasContext) {
-      setJaasConfiguration(mode, kafkaServerJaasEntryName, kafkaServerSaslMechanisms, kafkaClientSaslMechanism)
-      writeJaasConfigurationToFile()
-    } else
-        setJaasConfiguration(mode, kafkaServerJaasEntryName, kafkaServerSaslMechanisms, kafkaClientSaslMechanism)
-    if (mode == Both || mode == ZkSasl)
+    writeJaasConfigurationToFile(jaasSections)
+    val hasZk = jaasSections.exists(_.modules.exists {
+      case _: ZkDigestModule => true
+      case _ => false
+    })
+    if (hasZk)
       System.setProperty("zookeeper.authProvider.1", "org.apache.zookeeper.server.auth.SASLAuthenticationProvider")
   }
 
-  protected def setJaasConfiguration(mode: SaslSetupMode, kafkaServerEntryName: String,
-                                     kafkaServerSaslMechanisms: List[String], kafkaClientSaslMechanism: Option[String]) {
-    val jaasSection = mode match {
+  /** Return a tuple with the path to the server keytab file and client keytab file */
+  protected def maybeCreateEmptyKeytabFiles(): (File, File) = {
+    if (serverKeytabFile.isEmpty)
+      serverKeytabFile = Some(TestUtils.tempFile())
+    if (clientKeytabFile.isEmpty)
+      clientKeytabFile = Some(TestUtils.tempFile())
+    (serverKeytabFile.get, clientKeytabFile.get)
+  }
+
+  protected def jaasSections(kafkaServerSaslMechanisms: Seq[String],
+                             kafkaClientSaslMechanism: Option[String],
+                             mode: SaslSetupMode = Both,
+                             kafkaServerEntryName: String = JaasTestUtils.KafkaServerContextName): Seq[JaasSection] = {
+    val hasKerberos = mode != ZkSasl &&
+      (kafkaServerSaslMechanisms.contains("GSSAPI") || kafkaClientSaslMechanism.exists(_ == "GSSAPI"))
+    if (hasKerberos)
+      maybeCreateEmptyKeytabFiles()
+    mode match {
       case ZkSasl => JaasTestUtils.zkSections
       case KafkaSasl =>
         Seq(JaasTestUtils.kafkaServerSection(kafkaServerEntryName, kafkaServerSaslMechanisms, serverKeytabFile),
           JaasTestUtils.kafkaClientSection(kafkaClientSaslMechanism, clientKeytabFile))
-      case CustomKafkaServerSasl => Seq(JaasTestUtils.kafkaServerSection(kafkaServerEntryName,
-        kafkaServerSaslMechanisms, serverKeytabFile))
       case Both => Seq(JaasTestUtils.kafkaServerSection(kafkaServerEntryName, kafkaServerSaslMechanisms, serverKeytabFile),
         JaasTestUtils.kafkaClientSection(kafkaClientSaslMechanism, clientKeytabFile)) ++ JaasTestUtils.zkSections
     }
-    jaasContext = jaasContext ++ jaasSection
   }
 
-  protected def writeJaasConfigurationToFile() {
+  private def writeJaasConfigurationToFile(jaasSections: Seq[JaasSection]) {
     // This will cause a reload of the Configuration singleton when `getConfiguration` is called
     Configuration.setConfiguration(null)
-    System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, JaasTestUtils.writeJaasContextsToFile(jaasContext))
-  }
-
-  protected def removeJaasSection(context: String) {
-    jaasContext = jaasContext.filter(_.contextName != context)
+    val file = JaasTestUtils.writeJaasContextsToFile(jaasSections)
+    System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, file.getAbsolutePath)
   }
 
   def closeSasl() {
@@ -110,14 +116,14 @@ trait SaslSetup {
     Configuration.setConfiguration(null)
   }
 
-  def kafkaServerSaslProperties(serverSaslMechanisms: Seq[String], interBrokerSaslMechanism: String) = {
+  def kafkaServerSaslProperties(serverSaslMechanisms: Seq[String], interBrokerSaslMechanism: String): Properties = {
     val props = new Properties
     props.put(KafkaConfig.SaslMechanismInterBrokerProtocolProp, interBrokerSaslMechanism)
     props.put(SaslConfigs.SASL_ENABLED_MECHANISMS, serverSaslMechanisms.mkString(","))
     props
   }
 
-  def kafkaClientSaslProperties(clientSaslMechanism: String, dynamicJaasConfig: Boolean = false) = {
+  def kafkaClientSaslProperties(clientSaslMechanism: String, dynamicJaasConfig: Boolean = false): Properties = {
     val props = new Properties
     props.put(SaslConfigs.SASL_MECHANISM, clientSaslMechanism)
     if (dynamicJaasConfig)

http://git-wip-us.apache.org/repos/asf/kafka/blob/0bede30a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
index f20ed0f..1bfcdf2 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
@@ -16,11 +16,24 @@ import java.io.File
 
 import org.apache.kafka.common.protocol.SecurityProtocol
 import kafka.server.KafkaConfig
+import kafka.utils.JaasTestUtils
+import org.junit.{After, Before}
 
-class SaslSslAdminClientIntegrationTest extends KafkaAdminClientIntegrationTest with SaslTestHarness {
-  override protected val zkSaslEnabled = true
+class SaslSslAdminClientIntegrationTest extends KafkaAdminClientIntegrationTest with SaslSetup {
   this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
   override protected def securityProtocol = SecurityProtocol.SASL_SSL
   override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks"))
+
+  @Before
+  override def setUp(): Unit = {
+    startSasl(jaasSections(Seq("GSSAPI"), Some("GSSAPI"), Both, JaasTestUtils.KafkaServerContextName))
+    super.setUp()
+  }
+
+  @After
+  override def tearDown(): Unit = {
+    super.tearDown()
+    closeSasl()
+  }
   
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0bede30a/core/src/test/scala/integration/kafka/api/SaslSslConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SaslSslConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslConsumerTest.scala
index 388ddaf..450ea3e 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSslConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSslConsumerTest.scala
@@ -16,11 +16,24 @@ import java.io.File
 
 import org.apache.kafka.common.protocol.SecurityProtocol
 import kafka.server.KafkaConfig
+import kafka.utils.JaasTestUtils
+import org.junit.{After, Before}
 
-class SaslSslConsumerTest extends BaseConsumerTest with SaslTestHarness {
-  override protected val zkSaslEnabled = true
+class SaslSslConsumerTest extends BaseConsumerTest with SaslSetup {
   this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
   override protected def securityProtocol = SecurityProtocol.SASL_SSL
   override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks"))
+
+  @Before
+  override def setUp(): Unit = {
+    startSasl(jaasSections(Seq("GSSAPI"), Some("GSSAPI"), Both, JaasTestUtils.KafkaServerContextName))
+    super.setUp()
+  }
+
+  @After
+  override def tearDown(): Unit = {
+    super.tearDown()
+    closeSasl()
+  }
   
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0bede30a/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala b/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala
deleted file mode 100644
index 445a59c..0000000
--- a/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
-  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
-  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
-  * License. You may obtain a copy of the License at
-  *
-  * http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
-  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
-  * specific language governing permissions and limitations under the License.
-  */
-package kafka.api
-
-import kafka.utils.JaasTestUtils
-import kafka.zk.ZooKeeperTestHarness
-import org.junit.{After, Before}
-
-trait SaslTestHarness extends ZooKeeperTestHarness with SaslSetup {
-  protected val zkSaslEnabled: Boolean
-  protected val kafkaServerJaasEntryName = JaasTestUtils.KafkaServerContextName
-  protected val kafkaClientSaslMechanism = "GSSAPI"
-  protected val kafkaServerSaslMechanisms = List(kafkaClientSaslMechanism)
-
-  @Before
-  override def setUp() {
-    if (zkSaslEnabled)
-      startSasl(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), Both, kafkaServerJaasEntryName)
-    else
-      startSasl(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), KafkaSasl, kafkaServerJaasEntryName)
-    super.setUp
-  }
-
-  @After
-  override def tearDown() {
-    super.tearDown
-    closeSasl()
-  }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/0bede30a/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala
index 4eca6e2..caa988d 100644
--- a/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala
@@ -29,7 +29,7 @@ class SslEndToEndAuthorizationTest extends EndToEndAuthorizationTest {
 
   @Before
   override def setUp {
-    startSasl(List.empty, None, ZkSasl)
+    startSasl(jaasSections(List.empty, None, ZkSasl))
     super.setUp
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0bede30a/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala b/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala
index d0e514c..4ad6265 100644
--- a/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala
@@ -18,17 +18,17 @@ import java.io.File
 import java.util.Properties
 
 import kafka.admin.AdminUtils
-import kafka.server.{KafkaConfig, ConfigEntityName, QuotaId}
+import kafka.server.{ConfigEntityName, KafkaConfig, QuotaId}
 import kafka.utils.JaasTestUtils
-
 import org.apache.kafka.common.protocol.SecurityProtocol
-import org.junit.Before
+import org.junit.{After, Before}
 
-class UserQuotaTest extends BaseQuotaTest with SaslTestHarness {
+class UserQuotaTest extends BaseQuotaTest with SaslSetup {
 
   override protected def securityProtocol = SecurityProtocol.SASL_SSL
   override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks"))
-  override protected val zkSaslEnabled = false
+  private val kafkaServerSaslMechanisms = Seq("GSSAPI")
+  private val kafkaClientSaslMechanism = "GSSAPI"
   override protected val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism))
   override protected val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
 
@@ -36,8 +36,10 @@ class UserQuotaTest extends BaseQuotaTest with SaslTestHarness {
   override val producerQuotaId = QuotaId(Some(userPrincipal), None)
   override val consumerQuotaId = QuotaId(Some(userPrincipal), None)
 
+
   @Before
   override def setUp() {
+    startSasl(jaasSections(kafkaServerSaslMechanisms, Some("GSSAPI"), KafkaSasl, JaasTestUtils.KafkaServerContextName))
     this.serverConfig.setProperty(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp, Long.MaxValue.toString)
     this.serverConfig.setProperty(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp, Long.MaxValue.toString)
     super.setUp()
@@ -46,6 +48,12 @@ class UserQuotaTest extends BaseQuotaTest with SaslTestHarness {
     waitForQuotaUpdate(defaultProducerQuota, defaultConsumerQuota, defaultRequestQuota)
   }
 
+  @After
+  override def tearDown(): Unit = {
+    super.tearDown()
+    closeSasl()
+  }
+
   override def overrideQuotas(producerQuota: Long, consumerQuota: Long, requestQuota: Double) {
     val props = quotaProperties(producerQuota, consumerQuota, requestQuota)
     updateQuotaOverride(props)

http://git-wip-us.apache.org/repos/asf/kafka/blob/0bede30a/core/src/test/scala/integration/kafka/server/MultipleListenersWithAdditionalJaasContextTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/server/MultipleListenersWithAdditionalJaasContextTest.scala b/core/src/test/scala/integration/kafka/server/MultipleListenersWithAdditionalJaasContextTest.scala
index 3251be0..666ba4b 100644
--- a/core/src/test/scala/integration/kafka/server/MultipleListenersWithAdditionalJaasContextTest.scala
+++ b/core/src/test/scala/integration/kafka/server/MultipleListenersWithAdditionalJaasContextTest.scala
@@ -19,29 +19,27 @@ package kafka.server
 
 import java.util.Properties
 
-import kafka.api.CustomKafkaServerSasl
+import kafka.utils.JaasTestUtils
+import kafka.utils.JaasTestUtils.JaasSection
 import org.apache.kafka.common.network.ListenerName
 
-
-class MultipleListenersWithAdditionalJaasContextTest extends MultipleListenersWithSameSecurityProtocolBaseTest{
+class MultipleListenersWithAdditionalJaasContextTest extends MultipleListenersWithSameSecurityProtocolBaseTest {
 
   import MultipleListenersWithSameSecurityProtocolBaseTest._
 
-  override def setSaslProperties(listenerName: ListenerName): Option[Properties] = {
-
-    val gssapiSaslProperties = kafkaClientSaslProperties(GssApi, dynamicJaasConfig = true)
-    val plainSaslProperties = kafkaClientSaslProperties(Plain, dynamicJaasConfig = true)
-
+  override def saslProperties(listenerName: ListenerName): Properties = {
     listenerName.value match {
-      case SecureInternal => Some(plainSaslProperties)
-      case SecureExternal => Some(gssapiSaslProperties)
-      case _ => None
+      case SecureInternal => kafkaClientSaslProperties(Plain, dynamicJaasConfig = true)
+      case SecureExternal => kafkaClientSaslProperties(GssApi, dynamicJaasConfig = true)
+      case _ => throw new IllegalArgumentException(s"Unexpected listener name $listenerName")
     }
   }
 
-  override def addJaasSection(): Unit = {
-    setJaasConfiguration(CustomKafkaServerSasl, "secure_external.KafkaServer", List(GssApi), None)
-    setJaasConfiguration(CustomKafkaServerSasl, "secure_internal.KafkaServer", List(Plain), None)
-    removeJaasSection("KafkaServer")
+  override def jaasSections: Seq[JaasSection] = {
+    val (serverKeytabFile, _) = maybeCreateEmptyKeytabFiles()
+    JaasTestUtils.zkSections ++ Seq(
+      JaasTestUtils.kafkaServerSection("secure_external.KafkaServer", Seq(GssApi), Some(serverKeytabFile)),
+      JaasTestUtils.kafkaServerSection("secure_internal.KafkaServer", Seq(Plain), None)
+    )
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0bede30a/core/src/test/scala/integration/kafka/server/MultipleListenersWithDefaultJaasContextTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/server/MultipleListenersWithDefaultJaasContextTest.scala b/core/src/test/scala/integration/kafka/server/MultipleListenersWithDefaultJaasContextTest.scala
index 8291d82..f3e1b8b 100644
--- a/core/src/test/scala/integration/kafka/server/MultipleListenersWithDefaultJaasContextTest.scala
+++ b/core/src/test/scala/integration/kafka/server/MultipleListenersWithDefaultJaasContextTest.scala
@@ -19,6 +19,8 @@ package kafka.server
 
 import java.util.Properties
 
+import kafka.api.Both
+import kafka.utils.JaasTestUtils.JaasSection
 import org.apache.kafka.common.network.ListenerName
 
 
@@ -26,12 +28,10 @@ class MultipleListenersWithDefaultJaasContextTest extends MultipleListenersWithS
 
   import MultipleListenersWithSameSecurityProtocolBaseTest._
 
-  override def setSaslProperties(listenerName: ListenerName): Option[Properties] = {
-    val plainSaslProperties = kafkaClientSaslProperties(Plain, dynamicJaasConfig = true)
+  override def saslProperties(listenerName: ListenerName): Properties =
+    kafkaClientSaslProperties(Plain, dynamicJaasConfig = true)
+
+  override def jaasSections: Seq[JaasSection] =
+    jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), Both)
 
-    listenerName.value match {
-      case SecureExternal | SecureInternal => Some(plainSaslProperties)
-      case _ => None
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0bede30a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
index 9765279..88b314f 100644
--- a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
+++ b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
@@ -22,16 +22,16 @@ import java.io.File
 import java.util.{Collections, Properties}
 import java.util.concurrent.TimeUnit
 
-import kafka.api.{Both, SaslSetup}
+import kafka.api.SaslSetup
 import kafka.common.Topic
 import kafka.coordinator.group.OffsetConfig
+import kafka.utils.JaasTestUtils.JaasSection
 import kafka.utils.{CoreUtils, TestUtils}
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.config.SslConfigs
 import org.apache.kafka.common.network.{ListenerName, Mode}
-import org.apache.kafka.common.protocol.SecurityProtocol
 import org.junit.Assert.assertEquals
 import org.junit.{After, Before, Test}
 
@@ -48,7 +48,7 @@ object MultipleListenersWithSameSecurityProtocolBaseTest {
   val Plain = "PLAIN"
 }
 
-abstract class MultipleListenersWithSameSecurityProtocolBaseTest extends ZooKeeperTestHarness with SaslSetup{
+abstract class MultipleListenersWithSameSecurityProtocolBaseTest extends ZooKeeperTestHarness with SaslSetup {
 
   import MultipleListenersWithSameSecurityProtocolBaseTest._
 
@@ -56,17 +56,16 @@ abstract class MultipleListenersWithSameSecurityProtocolBaseTest extends ZooKeep
   private val servers = new ArrayBuffer[KafkaServer]
   private val producers = mutable.Map[ListenerName, KafkaProducer[Array[Byte], Array[Byte]]]()
   private val consumers = mutable.Map[ListenerName, KafkaConsumer[Array[Byte], Array[Byte]]]()
-  private val kafkaClientSaslMechanism = Plain
-  private val kafkaServerSaslMechanisms = List(GssApi, Plain)
 
-  protected def setSaslProperties(listenerName: ListenerName): Option[Properties]
-  protected def addJaasSection(): Unit = {}
+  protected val kafkaClientSaslMechanism = Plain
+  protected val kafkaServerSaslMechanisms = List(GssApi, Plain)
+
+  protected def saslProperties(listenerName: ListenerName): Properties
+  protected def jaasSections: Seq[JaasSection]
 
   @Before
   override def setUp(): Unit = {
-    startSasl(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), Both, withDefaultJaasContext = false)
-    addJaasSection()
-    writeJaasConfigurationToFile()
+    startSasl(jaasSections)
     super.setUp()
     // 2 brokers so that we can test that the data propagates correctly via UpdateMetadadaRequest
     val numServers = 2
@@ -97,30 +96,36 @@ abstract class MultipleListenersWithSameSecurityProtocolBaseTest extends ZooKeep
       servers += TestUtils.createServer(KafkaConfig.fromProps(props))
     }
 
-    val serverConfig = servers.head.config
-    assertEquals(4, serverConfig.listeners.size)
+    servers.map(_.config).foreach { config =>
+      assertEquals(s"Unexpected listener count for broker ${config.brokerId}", 4, config.listeners.size)
+      // KAFKA-5184 seems to show that this value can sometimes be PLAINTEXT, so verify it here
+      assertEquals(s"Unexpected ${KafkaConfig.InterBrokerListenerNameProp} for broker ${config.brokerId}",
+        Internal, config.interBrokerListenerName.value)
+    }
 
     TestUtils.createTopic(zkUtils, Topic.GroupMetadataTopicName, OffsetConfig.DefaultOffsetsTopicNumPartitions,
       replicationFactor = 2, servers, servers.head.groupCoordinator.offsetsTopicConfigs)
 
-    serverConfig.listeners.foreach { endPoint =>
+    servers.head.config.listeners.foreach { endPoint =>
       val listenerName = endPoint.listenerName
 
       TestUtils.createTopic(zkUtils, listenerName.value, 2, 2, servers)
 
       val trustStoreFile =
-        if (endPoint.securityProtocol == SecurityProtocol.SASL_SSL) Some(this.trustStoreFile)
+        if (TestUtils.usesSslTransportLayer(endPoint.securityProtocol)) Some(this.trustStoreFile)
         else None
 
-      val saslProperties = setSaslProperties(listenerName)
+      val saslProps =
+        if (TestUtils.usesSaslAuthentication(endPoint.securityProtocol)) Some(saslProperties(listenerName))
+        else None
 
       val bootstrapServers = TestUtils.bootstrapServers(servers, listenerName)
 
       producers(listenerName) = TestUtils.createNewProducer(bootstrapServers, acks = -1,
-        securityProtocol = endPoint.securityProtocol, trustStoreFile = trustStoreFile, saslProperties = saslProperties)
+        securityProtocol = endPoint.securityProtocol, trustStoreFile = trustStoreFile, saslProperties = saslProps)
 
       consumers(listenerName) = TestUtils.createNewConsumer(bootstrapServers, groupId = listenerName.value,
-        securityProtocol = endPoint.securityProtocol, trustStoreFile = trustStoreFile, saslProperties = saslProperties)
+        securityProtocol = endPoint.securityProtocol, trustStoreFile = trustStoreFile, saslProperties = saslProps)
     }
   }
 
@@ -133,6 +138,7 @@ abstract class MultipleListenersWithSameSecurityProtocolBaseTest extends ZooKeep
       CoreUtils.delete(s.config.logDirs)
     }
     super.tearDown()
+    closeSasl()
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/0bede30a/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala b/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala
index 0894b34..c08dd99 100644
--- a/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala
+++ b/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala
@@ -108,6 +108,7 @@ class MiniKdc(config: Properties, workDir: File) extends Logging {
   private var _port = config.getProperty(MiniKdc.KdcPort).toInt
   private var ds: DirectoryService = null
   private var kdc: KdcServer = null
+  private var closed = false
 
   def port: Int = _port
 
@@ -116,6 +117,8 @@ class MiniKdc(config: Properties, workDir: File) extends Logging {
   def start() {
     if (kdc != null)
       throw new RuntimeException("KDC already started")
+    if (closed)
+      throw new RuntimeException("KDC is closed")
     initDirectoryService()
     initKdcServer()
     initJvmKerberosConfig()
@@ -261,13 +264,16 @@ class MiniKdc(config: Properties, workDir: File) extends Logging {
   }
 
   def stop() {
-    if (kdc != null) {
-      System.clearProperty(MiniKdc.JavaSecurityKrb5Conf)
-      System.clearProperty(MiniKdc.SunSecurityKrb5Debug)
-      kdc.stop()
-      try ds.shutdown()
-      catch {
-        case ex: Exception => error("Could not shutdown ApacheDS properly", ex)
+    if (!closed) {
+      closed = true
+      if (kdc != null) {
+        System.clearProperty(MiniKdc.JavaSecurityKrb5Conf)
+        System.clearProperty(MiniKdc.SunSecurityKrb5Debug)
+        kdc.stop()
+        try ds.shutdown()
+        catch {
+          case ex: Exception => error("Could not shutdown ApacheDS properly", ex)
+        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0bede30a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorIntegrationTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorIntegrationTest.scala
index 80ef687..20d1161 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorIntegrationTest.scala
@@ -14,19 +14,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package unit.kafka.coordinator.transaction
+package kafka.coordinator.transaction
 
 import java.util.Properties
 
 import kafka.common.Topic
-import kafka.coordinator.transaction.InitPidResult
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.CompressionType
-import org.apache.kafka.common.requests.TransactionResult
 import org.apache.kafka.common.utils.Utils
 import org.junit.{Assert, Test}
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0bede30a/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala
deleted file mode 100644
index e53d348..0000000
--- a/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala
+++ /dev/null
@@ -1,304 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package kafka.integration
-
-import java.io.File
-
-import kafka.admin.AdminUtils
-import kafka.api.TopicMetadataResponse
-import kafka.client.ClientUtils
-import kafka.cluster.BrokerEndPoint
-import kafka.server.{KafkaConfig, KafkaServer, NotRunning}
-import kafka.utils.TestUtils
-import kafka.utils.TestUtils._
-import kafka.zk.ZooKeeperTestHarness
-import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
-import org.junit.Assert._
-import org.junit.{Test, After, Before}
-
-abstract class BaseTopicMetadataTest extends ZooKeeperTestHarness {
-  private var server1: KafkaServer = null
-  var brokerEndPoints: Seq[BrokerEndPoint] = null
-  var adHocConfigs: Seq[KafkaConfig] = null
-  val numConfigs: Int = 4
-
-  // This should be defined if `securityProtocol` uses SSL (eg SSL, SASL_SSL)
-  protected def trustStoreFile: Option[File]
-  protected def securityProtocol: SecurityProtocol
-
-  @Before
-  override def setUp() {
-    super.setUp()
-    val props = createBrokerConfigs(numConfigs, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol),
-      trustStoreFile = trustStoreFile)
-    val configs: Seq[KafkaConfig] = props.map(KafkaConfig.fromProps)
-    adHocConfigs = configs.takeRight(configs.size - 1) // Started and stopped by individual test cases
-    server1 = TestUtils.createServer(configs.head)
-    brokerEndPoints = Seq(
-      // We are using the Scala clients and they don't support SSL. Once we move to the Java ones, we should use
-      // `securityProtocol` instead of PLAINTEXT below
-      new BrokerEndPoint(server1.config.brokerId, server1.config.hostName, TestUtils.boundPort(server1))
-    )
-  }
-
-  @After
-  override def tearDown() {
-    server1.shutdown()
-    super.tearDown()
-  }
-
-  @Test
-  def testBasicTopicMetadata {
-    // create topic
-    val topic = "test"
-    createTopic(zkUtils, topic, numPartitions = 1, replicationFactor = 1, servers = Seq(server1))
-
-    val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), brokerEndPoints, "TopicMetadataTest-testBasicTopicMetadata",
-      2000, 0).topicsMetadata
-    assertEquals(Errors.NONE, topicsMetadata.head.error)
-    assertEquals(Errors.NONE, topicsMetadata.head.partitionsMetadata.head.error)
-    assertEquals("Expecting metadata only for 1 topic", 1, topicsMetadata.size)
-    assertEquals("Expecting metadata for the test topic", "test", topicsMetadata.head.topic)
-    val partitionMetadata = topicsMetadata.head.partitionsMetadata
-    assertEquals("Expecting metadata for 1 partition", 1, partitionMetadata.size)
-    assertEquals("Expecting partition id to be 0", 0, partitionMetadata.head.partitionId)
-    assertEquals(1, partitionMetadata.head.replicas.size)
-  }
-
-  @Test
-  def testGetAllTopicMetadata {
-    // create topic
-    val topic1 = "testGetAllTopicMetadata1"
-    val topic2 = "testGetAllTopicMetadata2"
-    createTopic(zkUtils, topic1, numPartitions = 1, replicationFactor = 1, servers = Seq(server1))
-    createTopic(zkUtils, topic2, numPartitions = 1, replicationFactor = 1, servers = Seq(server1))
-
-    // issue metadata request with empty list of topics
-    val topicsMetadata = ClientUtils.fetchTopicMetadata(Set.empty, brokerEndPoints, "TopicMetadataTest-testGetAllTopicMetadata",
-      2000, 0).topicsMetadata
-    assertEquals(Errors.NONE, topicsMetadata.head.error)
-    assertEquals(2, topicsMetadata.size)
-    assertEquals(Errors.NONE, topicsMetadata.head.partitionsMetadata.head.error)
-    assertEquals(Errors.NONE, topicsMetadata.last.partitionsMetadata.head.error)
-    val partitionMetadataTopic1 = topicsMetadata.head.partitionsMetadata
-    val partitionMetadataTopic2 = topicsMetadata.last.partitionsMetadata
-    assertEquals("Expecting metadata for 1 partition", 1, partitionMetadataTopic1.size)
-    assertEquals("Expecting partition id to be 0", 0, partitionMetadataTopic1.head.partitionId)
-    assertEquals(1, partitionMetadataTopic1.head.replicas.size)
-    assertEquals("Expecting metadata for 1 partition", 1, partitionMetadataTopic2.size)
-    assertEquals("Expecting partition id to be 0", 0, partitionMetadataTopic2.head.partitionId)
-    assertEquals(1, partitionMetadataTopic2.head.replicas.size)
-  }
-
-  @Test
-  def testAutoCreateTopic {
-    // auto create topic
-    val topic = "testAutoCreateTopic"
-    var topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), brokerEndPoints, "TopicMetadataTest-testAutoCreateTopic",
-      2000,0).topicsMetadata
-    assertEquals(Errors.LEADER_NOT_AVAILABLE, topicsMetadata.head.error)
-    assertEquals("Expecting metadata only for 1 topic", 1, topicsMetadata.size)
-    assertEquals("Expecting metadata for the test topic", topic, topicsMetadata.head.topic)
-    assertEquals(0, topicsMetadata.head.partitionsMetadata.size)
-
-    // wait for leader to be elected
-    TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0)
-    TestUtils.waitUntilMetadataIsPropagated(Seq(server1), topic, 0)
-
-    // retry the metadata for the auto created topic
-    topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), brokerEndPoints, "TopicMetadataTest-testBasicTopicMetadata",
-      2000,0).topicsMetadata
-    assertEquals(Errors.NONE, topicsMetadata.head.error)
-    assertEquals(Errors.NONE, topicsMetadata.head.partitionsMetadata.head.error)
-    val partitionMetadata = topicsMetadata.head.partitionsMetadata
-    assertEquals("Expecting metadata for 1 partition", 1, partitionMetadata.size)
-    assertEquals("Expecting partition id to be 0", 0, partitionMetadata.head.partitionId)
-    assertEquals(1, partitionMetadata.head.replicas.size)
-    assertTrue(partitionMetadata.head.leader.isDefined)
-  }
-
-  @Test
-  def testAutoCreateTopicWithInvalidReplication {
-    val adHocProps = createBrokerConfig(2, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol),
-      trustStoreFile = trustStoreFile)
-    // Set default replication higher than the number of live brokers
-    adHocProps.setProperty(KafkaConfig.DefaultReplicationFactorProp, "3")
-    // start adHoc brokers with replication factor too high
-    val adHocServer = createServer(new KafkaConfig(adHocProps))
-    // We are using the Scala clients and they don't support SSL. Once we move to the Java ones, we should use
-    // `securityProtocol` instead of PLAINTEXT below
-    val adHocEndpoint = new BrokerEndPoint(adHocServer.config.brokerId, adHocServer.config.hostName,
-      TestUtils.boundPort(adHocServer))
-
-    // auto create topic on "bad" endpoint
-    val topic = "testAutoCreateTopic"
-    val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), Seq(adHocEndpoint), "TopicMetadataTest-testAutoCreateTopic",
-      2000, 0).topicsMetadata
-    assertEquals(Errors.INVALID_REPLICATION_FACTOR, topicsMetadata.head.error)
-    assertEquals("Expecting metadata only for 1 topic", 1, topicsMetadata.size)
-    assertEquals("Expecting metadata for the test topic", topic, topicsMetadata.head.topic)
-    assertEquals(0, topicsMetadata.head.partitionsMetadata.size)
-
-    adHocServer.shutdown()
-  }
-
-  @Test
-  def testAutoCreateTopicWithCollision {
-    // auto create topic
-    val topic1 = "testAutoCreate_Topic"
-    val topic2 = "testAutoCreate.Topic"
-    var topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic1, topic2), brokerEndPoints, "TopicMetadataTest-testAutoCreateTopic",
-      2000, 0).topicsMetadata
-    assertEquals("Expecting metadata for 2 topics", 2, topicsMetadata.size)
-    assertEquals("Expecting metadata for topic1", topic1, topicsMetadata.head.topic)
-    assertEquals(Errors.LEADER_NOT_AVAILABLE, topicsMetadata.head.error)
-    assertEquals("Expecting metadata for topic2", topic2, topicsMetadata(1).topic)
-    assertEquals("Expecting InvalidTopicCode for topic2 metadata", Errors.INVALID_TOPIC_EXCEPTION, topicsMetadata(1).error)
-
-    // wait for leader to be elected
-    TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic1, 0)
-    TestUtils.waitUntilMetadataIsPropagated(Seq(server1), topic1, 0)
-
-    // retry the metadata for the first auto created topic
-    topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic1), brokerEndPoints, "TopicMetadataTest-testBasicTopicMetadata",
-      2000, 0).topicsMetadata
-    assertEquals(Errors.NONE, topicsMetadata.head.error)
-    assertEquals(Errors.NONE, topicsMetadata.head.partitionsMetadata.head.error)
-    var partitionMetadata = topicsMetadata.head.partitionsMetadata
-    assertEquals("Expecting metadata for 1 partition", 1, partitionMetadata.size)
-    assertEquals("Expecting partition id to be 0", 0, partitionMetadata.head.partitionId)
-    assertEquals(1, partitionMetadata.head.replicas.size)
-    assertTrue(partitionMetadata.head.leader.isDefined)
-  }
-
-  private def checkIsr(servers: Seq[KafkaServer]): Unit = {
-    val activeBrokers: Seq[KafkaServer] = servers.filter(x => x.brokerState.currentState != NotRunning.state)
-    val expectedIsr: Seq[BrokerEndPoint] = activeBrokers.map { x =>
-      new BrokerEndPoint(x.config.brokerId,
-        if (x.config.hostName.nonEmpty) x.config.hostName else "localhost",
-        TestUtils.boundPort(x))
-    }
-
-    // Assert that topic metadata at new brokers is updated correctly
-    activeBrokers.foreach(x => {
-      var metadata: TopicMetadataResponse = new TopicMetadataResponse(Seq(), Seq(), -1)
-      waitUntilTrue(() => {
-        metadata = ClientUtils.fetchTopicMetadata(Set.empty,
-                                Seq(new BrokerEndPoint(x.config.brokerId,
-                                                       if (x.config.hostName.nonEmpty) x.config.hostName else "localhost",
-                                                       TestUtils.boundPort(x))),
-                                "TopicMetadataTest-testBasicTopicMetadata", 2000, 0)
-        metadata.topicsMetadata.nonEmpty &&
-          metadata.topicsMetadata.head.partitionsMetadata.nonEmpty &&
-          expectedIsr.sortBy(_.id) == metadata.topicsMetadata.head.partitionsMetadata.head.isr.sortBy(_.id)
-      },
-        "Topic metadata is not correctly updated for broker " + x + ".\n" +
-        "Expected ISR: " + expectedIsr + "\n" +
-        "Actual ISR  : " + (if (metadata.topicsMetadata.nonEmpty &&
-                                metadata.topicsMetadata.head.partitionsMetadata.nonEmpty)
-                              metadata.topicsMetadata.head.partitionsMetadata.head.isr
-                            else
-                              ""), 8000L)
-    })
-  }
-
-  @Test
-  def testIsrAfterBrokerShutDownAndJoinsBack {
-    val numBrokers = 2 //just 2 brokers are enough for the test
-
-    // start adHoc brokers
-    val adHocServers = adHocConfigs.take(numBrokers - 1).map(p => createServer(p))
-    val allServers: Seq[KafkaServer] = Seq(server1) ++ adHocServers
-
-    // create topic
-    val topic: String = "test"
-    AdminUtils.createTopic(zkUtils, topic, 1, numBrokers)
-
-    // shutdown a broker
-    adHocServers.last.shutdown()
-    adHocServers.last.awaitShutdown()
-
-    // startup a broker
-    adHocServers.last.startup()
-
-    // check metadata is still correct and updated at all brokers
-    checkIsr(allServers)
-
-    // shutdown adHoc brokers
-    adHocServers.map(p => p.shutdown())
-  }
-
-  private def checkMetadata(servers: Seq[KafkaServer], expectedBrokersCount: Int): Unit = {
-    var topicMetadata: TopicMetadataResponse = new TopicMetadataResponse(Seq(), Seq(), -1)
-
-    // Get topic metadata from old broker
-    // Wait for metadata to get updated by checking metadata from a new broker
-    waitUntilTrue(() => {
-    topicMetadata = ClientUtils.fetchTopicMetadata(
-      Set.empty, brokerEndPoints, "TopicMetadataTest-testBasicTopicMetadata", 2000, 0)
-    topicMetadata.brokers.size == expectedBrokersCount},
-      "Alive brokers list is not correctly propagated by coordinator to brokers"
-    )
-
-    // Assert that topic metadata at new brokers is updated correctly
-    servers.filter(x => x.brokerState.currentState != NotRunning.state).foreach(x =>
-      waitUntilTrue(() => {
-          val foundMetadata = ClientUtils.fetchTopicMetadata(
-            Set.empty,
-            Seq(new BrokerEndPoint(x.config.brokerId, x.config.hostName, TestUtils.boundPort(x))),
-            "TopicMetadataTest-testBasicTopicMetadata", 2000, 0)
-          topicMetadata.brokers.sortBy(_.id) == foundMetadata.brokers.sortBy(_.id) &&
-            topicMetadata.topicsMetadata.sortBy(_.topic) == foundMetadata.topicsMetadata.sortBy(_.topic)
-        },
-        s"Topic metadata is not correctly updated"))
-  }
-
-  @Test
-  def testAliveBrokerListWithNoTopics {
-    checkMetadata(Seq(server1), 1)
-  }
-
-  @Test
-  def testAliveBrokersListWithNoTopicsAfterNewBrokerStartup {
-    var adHocServers = adHocConfigs.takeRight(adHocConfigs.size - 1).map(p => createServer(p))
-
-    checkMetadata(adHocServers, numConfigs - 1)
-
-    // Add a broker
-    adHocServers = adHocServers ++ Seq(createServer(adHocConfigs.head))
-
-    checkMetadata(adHocServers, numConfigs)
-    adHocServers.map(p => p.shutdown())
-  }
-
-
-  @Test
-  def testAliveBrokersListWithNoTopicsAfterABrokerShutdown {
-    val adHocServers = adHocConfigs.map(p => createServer(p))
-
-    checkMetadata(adHocServers, numConfigs)
-
-    // Shutdown a broker
-    adHocServers.last.shutdown()
-    adHocServers.last.awaitShutdown()
-
-    checkMetadata(adHocServers, numConfigs - 1)
-
-    adHocServers.map(p => p.shutdown())
-  }
-}
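
A note for readers on the `ClientUtils.fetchTopicMetadata` calls that recur throughout the deleted tests above (and in their replacement further down): the two trailing numeric arguments are easy to misread. A minimal sketch with each position labelled, assuming the legacy Scala client's parameter order of topics, broker endpoints, client id, socket timeout and correlation id; the client id string below is hypothetical and `brokerEndPoints` refers to the test's own field:

    // Sketch only: same call shape as the tests, with each position annotated.
    val response = ClientUtils.fetchTopicMetadata(
      Set("test"),                  // topics to describe; Set.empty requests all topics
      brokerEndPoints,              // bootstrap endpoints to query
      "TopicMetadataTest-example",  // client id reported to the broker (hypothetical)
      2000,                         // socket timeout in milliseconds
      0)                            // correlation id echoed back in the response header
    val topicsMetadata = response.topicsMetadata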

http://git-wip-us.apache.org/repos/asf/kafka/blob/0bede30a/core/src/test/scala/unit/kafka/integration/PlaintextTopicMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/PlaintextTopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/PlaintextTopicMetadataTest.scala
deleted file mode 100644
index 55c12b5..0000000
--- a/core/src/test/scala/unit/kafka/integration/PlaintextTopicMetadataTest.scala
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.integration
-
-import org.apache.kafka.common.protocol.SecurityProtocol
-
-class PlaintextTopicMetadataTest extends BaseTopicMetadataTest {
-  protected def securityProtocol = SecurityProtocol.PLAINTEXT
-  protected def trustStoreFile = None
-}
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/0bede30a/core/src/test/scala/unit/kafka/integration/SaslPlaintextTopicMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/SaslPlaintextTopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/SaslPlaintextTopicMetadataTest.scala
deleted file mode 100644
index 888207c..0000000
--- a/core/src/test/scala/unit/kafka/integration/SaslPlaintextTopicMetadataTest.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one or more
-  * contributor license agreements.  See the NOTICE file distributed with
-  * this work for additional information regarding copyright ownership.
-  * The ASF licenses this file to You under the Apache License, Version 2.0
-  * (the "License"); you may not use this file except in compliance with
-  * the License.  You may obtain a copy of the License at
-  *
-  *    http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package kafka.integration
-
-import kafka.api.SaslTestHarness
-import org.apache.kafka.common.protocol.SecurityProtocol
-
-class SaslPlaintextTopicMetadataTest extends BaseTopicMetadataTest with SaslTestHarness {
-  override protected val zkSaslEnabled = false
-  protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
-  protected def trustStoreFile = None
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/0bede30a/core/src/test/scala/unit/kafka/integration/SaslSslTopicMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/SaslSslTopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/SaslSslTopicMetadataTest.scala
deleted file mode 100644
index f313280..0000000
--- a/core/src/test/scala/unit/kafka/integration/SaslSslTopicMetadataTest.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one or more
-  * contributor license agreements.  See the NOTICE file distributed with
-  * this work for additional information regarding copyright ownership.
-  * The ASF licenses this file to You under the Apache License, Version 2.0
-  * (the "License"); you may not use this file except in compliance with
-  * the License.  You may obtain a copy of the License at
-  *
-  *    http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package kafka.integration
-
-import java.io.File
-
-import kafka.api.SaslTestHarness
-import org.apache.kafka.common.protocol.SecurityProtocol
-
-class SaslSslTopicMetadataTest extends BaseTopicMetadataTest with SaslTestHarness {
-  override protected val zkSaslEnabled = false
-  protected def securityProtocol = SecurityProtocol.SASL_SSL
-  protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks"))
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/0bede30a/core/src/test/scala/unit/kafka/integration/SslTopicMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/SslTopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/SslTopicMetadataTest.scala
deleted file mode 100644
index ee73457..0000000
--- a/core/src/test/scala/unit/kafka/integration/SslTopicMetadataTest.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.integration
-
-import java.io.File
-
-import org.apache.kafka.common.protocol.SecurityProtocol
-
-class SslTopicMetadataTest extends BaseTopicMetadataTest {
-  protected def securityProtocol = SecurityProtocol.SSL
-  protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks"))
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/0bede30a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
new file mode 100644
index 0000000..d63d5b2
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
@@ -0,0 +1,298 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.integration
+
+import java.io.File
+
+import kafka.admin.AdminUtils
+import kafka.api.TopicMetadataResponse
+import kafka.client.ClientUtils
+import kafka.cluster.BrokerEndPoint
+import kafka.server.{KafkaConfig, KafkaServer, NotRunning}
+import kafka.utils.TestUtils
+import kafka.utils.TestUtils._
+import kafka.zk.ZooKeeperTestHarness
+import org.apache.kafka.common.protocol.Errors
+import org.junit.Assert._
+import org.junit.{Test, After, Before}
+
+class TopicMetadataTest extends ZooKeeperTestHarness {
+  private var server1: KafkaServer = null
+  var brokerEndPoints: Seq[BrokerEndPoint] = null
+  var adHocConfigs: Seq[KafkaConfig] = null
+  val numConfigs: Int = 4
+
+  @Before
+  override def setUp() {
+    super.setUp()
+    val props = createBrokerConfigs(numConfigs, zkConnect)
+    val configs: Seq[KafkaConfig] = props.map(KafkaConfig.fromProps)
+    adHocConfigs = configs.takeRight(configs.size - 1) // Started and stopped by individual test cases
+    server1 = TestUtils.createServer(configs.head)
+    brokerEndPoints = Seq(
+      // We are using the Scala clients and they don't support SSL. Once we move to the Java ones, we should use
+      // `securityProtocol` instead of PLAINTEXT below
+      new BrokerEndPoint(server1.config.brokerId, server1.config.hostName, TestUtils.boundPort(server1))
+    )
+  }
+
+  @After
+  override def tearDown() {
+    server1.shutdown()
+    super.tearDown()
+  }
+
+  @Test
+  def testBasicTopicMetadata {
+    // create topic
+    val topic = "test"
+    createTopic(zkUtils, topic, numPartitions = 1, replicationFactor = 1, servers = Seq(server1))
+
+    val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), brokerEndPoints, "TopicMetadataTest-testBasicTopicMetadata",
+      2000, 0).topicsMetadata
+    assertEquals(Errors.NONE, topicsMetadata.head.error)
+    assertEquals(Errors.NONE, topicsMetadata.head.partitionsMetadata.head.error)
+    assertEquals("Expecting metadata only for 1 topic", 1, topicsMetadata.size)
+    assertEquals("Expecting metadata for the test topic", "test", topicsMetadata.head.topic)
+    val partitionMetadata = topicsMetadata.head.partitionsMetadata
+    assertEquals("Expecting metadata for 1 partition", 1, partitionMetadata.size)
+    assertEquals("Expecting partition id to be 0", 0, partitionMetadata.head.partitionId)
+    assertEquals(1, partitionMetadata.head.replicas.size)
+  }
+
+  @Test
+  def testGetAllTopicMetadata {
+    // create topic
+    val topic1 = "testGetAllTopicMetadata1"
+    val topic2 = "testGetAllTopicMetadata2"
+    createTopic(zkUtils, topic1, numPartitions = 1, replicationFactor = 1, servers = Seq(server1))
+    createTopic(zkUtils, topic2, numPartitions = 1, replicationFactor = 1, servers = Seq(server1))
+
+    // issue metadata request with empty list of topics
+    val topicsMetadata = ClientUtils.fetchTopicMetadata(Set.empty, brokerEndPoints, "TopicMetadataTest-testGetAllTopicMetadata",
+      2000, 0).topicsMetadata
+    assertEquals(Errors.NONE, topicsMetadata.head.error)
+    assertEquals(2, topicsMetadata.size)
+    assertEquals(Errors.NONE, topicsMetadata.head.partitionsMetadata.head.error)
+    assertEquals(Errors.NONE, topicsMetadata.last.partitionsMetadata.head.error)
+    val partitionMetadataTopic1 = topicsMetadata.head.partitionsMetadata
+    val partitionMetadataTopic2 = topicsMetadata.last.partitionsMetadata
+    assertEquals("Expecting metadata for 1 partition", 1, partitionMetadataTopic1.size)
+    assertEquals("Expecting partition id to be 0", 0, partitionMetadataTopic1.head.partitionId)
+    assertEquals(1, partitionMetadataTopic1.head.replicas.size)
+    assertEquals("Expecting metadata for 1 partition", 1, partitionMetadataTopic2.size)
+    assertEquals("Expecting partition id to be 0", 0, partitionMetadataTopic2.head.partitionId)
+    assertEquals(1, partitionMetadataTopic2.head.replicas.size)
+  }
+
+  @Test
+  def testAutoCreateTopic {
+    // auto create topic
+    val topic = "testAutoCreateTopic"
+    var topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), brokerEndPoints, "TopicMetadataTest-testAutoCreateTopic",
+      2000, 0).topicsMetadata
+    assertEquals(Errors.LEADER_NOT_AVAILABLE, topicsMetadata.head.error)
+    assertEquals("Expecting metadata only for 1 topic", 1, topicsMetadata.size)
+    assertEquals("Expecting metadata for the test topic", topic, topicsMetadata.head.topic)
+    assertEquals(0, topicsMetadata.head.partitionsMetadata.size)
+
+    // wait for leader to be elected
+    TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0)
+    TestUtils.waitUntilMetadataIsPropagated(Seq(server1), topic, 0)
+
+    // retry the metadata for the auto created topic
+    topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), brokerEndPoints, "TopicMetadataTest-testAutoCreateTopic",
+      2000, 0).topicsMetadata
+    assertEquals(Errors.NONE, topicsMetadata.head.error)
+    assertEquals(Errors.NONE, topicsMetadata.head.partitionsMetadata.head.error)
+    val partitionMetadata = topicsMetadata.head.partitionsMetadata
+    assertEquals("Expecting metadata for 1 partition", 1, partitionMetadata.size)
+    assertEquals("Expecting partition id to be 0", 0, partitionMetadata.head.partitionId)
+    assertEquals(1, partitionMetadata.head.replicas.size)
+    assertTrue(partitionMetadata.head.leader.isDefined)
+  }
+
+  @Test
+  def testAutoCreateTopicWithInvalidReplication {
+    val adHocProps = createBrokerConfig(2, zkConnect)
+    // Set default replication higher than the number of live brokers
+    adHocProps.setProperty(KafkaConfig.DefaultReplicationFactorProp, "3")
+    // start adHoc brokers with replication factor too high
+    val adHocServer = createServer(new KafkaConfig(adHocProps))
+    // We are using the Scala clients and they don't support SSL. Once we move to the Java ones, we should use
+    // `securityProtocol` instead of PLAINTEXT below
+    val adHocEndpoint = new BrokerEndPoint(adHocServer.config.brokerId, adHocServer.config.hostName,
+      TestUtils.boundPort(adHocServer))
+
+    // auto create topic on "bad" endpoint
+    val topic = "testAutoCreateTopic"
+    val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), Seq(adHocEndpoint), "TopicMetadataTest-testAutoCreateTopic",
+      2000, 0).topicsMetadata
+    assertEquals(Errors.INVALID_REPLICATION_FACTOR, topicsMetadata.head.error)
+    assertEquals("Expecting metadata only for 1 topic", 1, topicsMetadata.size)
+    assertEquals("Expecting metadata for the test topic", topic, topicsMetadata.head.topic)
+    assertEquals(0, topicsMetadata.head.partitionsMetadata.size)
+
+    adHocServer.shutdown()
+  }
+
+  @Test
+  def testAutoCreateTopicWithCollision {
+    // auto create topic
+    val topic1 = "testAutoCreate_Topic"
+    val topic2 = "testAutoCreate.Topic"
+    var topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic1, topic2), brokerEndPoints, "TopicMetadataTest-testAutoCreateTopicWithCollision",
+      2000, 0).topicsMetadata
+    assertEquals("Expecting metadata for 2 topics", 2, topicsMetadata.size)
+    assertEquals("Expecting metadata for topic1", topic1, topicsMetadata.head.topic)
+    assertEquals(Errors.LEADER_NOT_AVAILABLE, topicsMetadata.head.error)
+    assertEquals("Expecting metadata for topic2", topic2, topicsMetadata(1).topic)
+    assertEquals("Expecting InvalidTopicCode for topic2 metadata", Errors.INVALID_TOPIC_EXCEPTION, topicsMetadata(1).error)
+
+    // wait for leader to be elected
+    TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic1, 0)
+    TestUtils.waitUntilMetadataIsPropagated(Seq(server1), topic1, 0)
+
+    // retry the metadata for the first auto created topic
+    topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic1), brokerEndPoints, "TopicMetadataTest-testAutoCreateTopicWithCollision",
+      2000, 0).topicsMetadata
+    assertEquals(Errors.NONE, topicsMetadata.head.error)
+    assertEquals(Errors.NONE, topicsMetadata.head.partitionsMetadata.head.error)
+    val partitionMetadata = topicsMetadata.head.partitionsMetadata
+    assertEquals("Expecting metadata for 1 partition", 1, partitionMetadata.size)
+    assertEquals("Expecting partition id to be 0", 0, partitionMetadata.head.partitionId)
+    assertEquals(1, partitionMetadata.head.replicas.size)
+    assertTrue(partitionMetadata.head.leader.isDefined)
+  }
+
+  private def checkIsr(servers: Seq[KafkaServer]): Unit = {
+    val activeBrokers: Seq[KafkaServer] = servers.filter(x => x.brokerState.currentState != NotRunning.state)
+    val expectedIsr: Seq[BrokerEndPoint] = activeBrokers.map { x =>
+      new BrokerEndPoint(x.config.brokerId,
+        if (x.config.hostName.nonEmpty) x.config.hostName else "localhost",
+        TestUtils.boundPort(x))
+    }
+
+    // Assert that topic metadata at new brokers is updated correctly
+    activeBrokers.foreach(x => {
+      var metadata: TopicMetadataResponse = new TopicMetadataResponse(Seq(), Seq(), -1)
+      waitUntilTrue(() => {
+        metadata = ClientUtils.fetchTopicMetadata(Set.empty,
+                                Seq(new BrokerEndPoint(x.config.brokerId,
+                                                       if (x.config.hostName.nonEmpty) x.config.hostName else "localhost",
+                                                       TestUtils.boundPort(x))),
+                                "TopicMetadataTest-testBasicTopicMetadata", 2000, 0)
+        metadata.topicsMetadata.nonEmpty &&
+          metadata.topicsMetadata.head.partitionsMetadata.nonEmpty &&
+          expectedIsr.sortBy(_.id) == metadata.topicsMetadata.head.partitionsMetadata.head.isr.sortBy(_.id)
+      },
+        "Topic metadata is not correctly updated for broker " + x + ".\n" +
+        "Expected ISR: " + expectedIsr + "\n" +
+        "Actual ISR  : " + (if (metadata.topicsMetadata.nonEmpty &&
+                                metadata.topicsMetadata.head.partitionsMetadata.nonEmpty)
+                              metadata.topicsMetadata.head.partitionsMetadata.head.isr
+                            else
+                              ""), 8000L)
+    })
+  }
+
+  @Test
+  def testIsrAfterBrokerShutDownAndJoinsBack {
+    val numBrokers = 2 // just 2 brokers are enough for the test
+
+    // start adHoc brokers
+    val adHocServers = adHocConfigs.take(numBrokers - 1).map(p => createServer(p))
+    val allServers: Seq[KafkaServer] = Seq(server1) ++ adHocServers
+
+    // create topic
+    val topic: String = "test"
+    AdminUtils.createTopic(zkUtils, topic, 1, numBrokers)
+
+    // shutdown a broker
+    adHocServers.last.shutdown()
+    adHocServers.last.awaitShutdown()
+
+    // startup a broker
+    adHocServers.last.startup()
+
+    // check metadata is still correct and updated at all brokers
+    checkIsr(allServers)
+
+    // shutdown adHoc brokers
+    adHocServers.foreach(_.shutdown())
+  }
+
+  private def checkMetadata(servers: Seq[KafkaServer], expectedBrokersCount: Int): Unit = {
+    var topicMetadata: TopicMetadataResponse = new TopicMetadataResponse(Seq(), Seq(), -1)
+
+    // Get topic metadata from old broker
+    // Wait for metadata to get updated by checking metadata from a new broker
+    waitUntilTrue(() => {
+      topicMetadata = ClientUtils.fetchTopicMetadata(
+        Set.empty, brokerEndPoints, "TopicMetadataTest-checkMetadata", 2000, 0)
+      topicMetadata.brokers.size == expectedBrokersCount},
+      "Alive brokers list is not correctly propagated by coordinator to brokers"
+    )
+
+    // Assert that topic metadata at new brokers is updated correctly
+    servers.filter(x => x.brokerState.currentState != NotRunning.state).foreach(x =>
+      waitUntilTrue(() => {
+          val foundMetadata = ClientUtils.fetchTopicMetadata(
+            Set.empty,
+            Seq(new BrokerEndPoint(x.config.brokerId, x.config.hostName, TestUtils.boundPort(x))),
+            "TopicMetadataTest-testBasicTopicMetadata", 2000, 0)
+          topicMetadata.brokers.sortBy(_.id) == foundMetadata.brokers.sortBy(_.id) &&
+            topicMetadata.topicsMetadata.sortBy(_.topic) == foundMetadata.topicsMetadata.sortBy(_.topic)
+        },
+        s"Topic metadata is not correctly updated"))
+  }
+
+  @Test
+  def testAliveBrokerListWithNoTopics {
+    checkMetadata(Seq(server1), 1)
+  }
+
+  @Test
+  def testAliveBrokersListWithNoTopicsAfterNewBrokerStartup {
+    var adHocServers = adHocConfigs.takeRight(adHocConfigs.size - 1).map(p => createServer(p))
+
+    checkMetadata(adHocServers, numConfigs - 1)
+
+    // Add a broker
+    adHocServers = adHocServers ++ Seq(createServer(adHocConfigs.head))
+
+    checkMetadata(adHocServers, numConfigs)
+    adHocServers.foreach(_.shutdown())
+  }
+
+
+  @Test
+  def testAliveBrokersListWithNoTopicsAfterABrokerShutdown {
+    val adHocServers = adHocConfigs.map(p => createServer(p))
+
+    checkMetadata(adHocServers, numConfigs)
+
+    // Shutdown a broker
+    adHocServers.last.shutdown()
+    adHocServers.last.awaitShutdown()
+
+    checkMetadata(adHocServers, numConfigs - 1)
+
+    adHocServers.foreach(_.shutdown())
+  }
+}
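
A note on testAutoCreateTopicWithCollision above: it expects INVALID_TOPIC_EXCEPTION for the second topic because Kafka treats '.' and '_' as the same character when it builds metric names, so two topic names that differ only in those characters would collide on metrics, and the broker rejects creation of the second. A sketch of the rule (the helper below is illustrative; the broker performs an equivalent check during topic creation):

    // '.' and '_' map to the same character in metric names, so these collide.
    def collides(a: String, b: String): Boolean =
      a.replace('.', '_') == b.replace('.', '_')

    assert(collides("testAutoCreate_Topic", "testAutoCreate.Topic"))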

http://git-wip-us.apache.org/repos/asf/kafka/blob/0bede30a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
index 5ea92aa..8cae885 100644
--- a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
@@ -36,7 +36,7 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
   @Before
   override def setUp() {
     Configuration.setConfiguration(null)
-    System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, jaasFile)
+    System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, jaasFile.getAbsolutePath)
     System.setProperty(authProvider, "org.apache.zookeeper.server.auth.SASLAuthenticationProvider")
     super.setUp()
   }
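
The one-line change above follows from `jaasFile` becoming a `java.io.File` rather than a path string elsewhere in this patch: `System.setProperty` takes `String` arguments, so the conversion to a path has to be explicit. A minimal sketch of the fixed pattern (the temp-file creation below is illustrative; the test obtains its JAAS file elsewhere):

    import java.io.File
    import org.apache.kafka.common.security.JaasUtils

    // Hypothetical JAAS file; only its absolute path is handed to the JVM.
    val jaasFile: File = File.createTempFile("example_jaas", ".conf")
    System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, jaasFile.getAbsolutePath)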

http://git-wip-us.apache.org/repos/asf/kafka/blob/0bede30a/core/src/test/scala/unit/kafka/server/BaseReplicaFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/BaseReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/BaseReplicaFetchTest.scala
deleted file mode 100644
index 7e9404e..0000000
--- a/core/src/test/scala/unit/kafka/server/BaseReplicaFetchTest.scala
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package kafka.server
-
-import java.io.File
-
-import org.apache.kafka.common.protocol.SecurityProtocol
-import org.junit.{After, Before, Test}
-import kafka.zk.ZooKeeperTestHarness
-import kafka.utils.TestUtils
-import TestUtils._
-import org.apache.kafka.clients.producer.ProducerRecord
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.serialization.StringSerializer
-
-abstract class BaseReplicaFetchTest extends ZooKeeperTestHarness  {
-  var brokers: Seq[KafkaServer] = null
-  val topic1 = "foo"
-  val topic2 = "bar"
-
-  // This should be defined if `securityProtocol` uses SSL (eg SSL, SASL_SSL)
-  protected def trustStoreFile: Option[File]
-  protected def securityProtocol: SecurityProtocol
-
-  @Before
-  override def setUp() {
-    super.setUp()
-    val props = createBrokerConfigs(2, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol),
-      trustStoreFile = trustStoreFile)
-    brokers = props.map(KafkaConfig.fromProps).map(TestUtils.createServer(_))
-  }
-
-  @After
-  override def tearDown() {
-    brokers.foreach(_.shutdown())
-    super.tearDown()
-  }
-
-  @Test
-  def testReplicaFetcherThread() {
-    val partition = 0
-    val testMessageList1 = List("test1", "test2", "test3", "test4")
-    val testMessageList2 = List("test5", "test6", "test7", "test8")
-
-    // create a topic and partition and await leadership
-    for (topic <- List(topic1,topic2)) {
-      createTopic(zkUtils, topic, numPartitions = 1, replicationFactor = 2, servers = brokers)
-    }
-
-    // send test messages to leader
-    val producer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(brokers),
-                                               retries = 5,
-                                               keySerializer = new StringSerializer,
-                                               valueSerializer = new StringSerializer)
-    val records = testMessageList1.map(m => new ProducerRecord(topic1, m, m)) ++
-      testMessageList2.map(m => new ProducerRecord(topic2, m, m))
-    records.map(producer.send).foreach(_.get)
-    producer.close()
-
-    def logsMatch(): Boolean = {
-      var result = true
-      for (topic <- List(topic1, topic2)) {
-        val tp = new TopicPartition(topic, partition)
-        val expectedOffset = brokers.head.getLogManager().getLog(tp).get.logEndOffset
-        result = result && expectedOffset > 0 && brokers.forall { item =>
-          expectedOffset == item.getLogManager().getLog(tp).get.logEndOffset
-        }
-      }
-      result
-    }
-    waitUntilTrue(logsMatch _, "Broker logs should be identical")
-  }
-}
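
The heart of the deleted test above is the `logsMatch` predicate: replication is deemed complete once every broker reports the same non-zero log end offset as the first broker for each topic-partition. The same check, condensed into a standalone sketch using the removed test's own API calls:

    import kafka.server.KafkaServer
    import org.apache.kafka.common.TopicPartition

    def logsMatch(brokers: Seq[KafkaServer], topics: Seq[String], partition: Int): Boolean =
      topics.forall { topic =>
        val tp = new TopicPartition(topic, partition)
        // The first broker's log end offset is the reference value.
        val expected = brokers.head.getLogManager().getLog(tp).get.logEndOffset
        // Replicas are caught up once data exists and every broker matches it.
        expected > 0 && brokers.forall(_.getLogManager().getLog(tp).get.logEndOffset == expected)
      }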

http://git-wip-us.apache.org/repos/asf/kafka/blob/0bede30a/core/src/test/scala/unit/kafka/server/PlaintextReplicaFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/PlaintextReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/PlaintextReplicaFetchTest.scala
deleted file mode 100644
index b160481..0000000
--- a/core/src/test/scala/unit/kafka/server/PlaintextReplicaFetchTest.scala
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import org.apache.kafka.common.protocol.SecurityProtocol
-
-class PlaintextReplicaFetchTest extends BaseReplicaFetchTest {
-  protected def securityProtocol = SecurityProtocol.PLAINTEXT
-  protected def trustStoreFile = None
-}
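
Every per-protocol subclass deleted in this commit, the topic metadata variants earlier and the replica fetch variants here, follows the same two-member template: pick a SecurityProtocol and, for TLS-backed protocols, supply a trust store. A condensed sketch of that pattern (class names below are hypothetical):

    import java.io.File
    import org.apache.kafka.common.protocol.SecurityProtocol

    abstract class ProtocolVariantTest {          // stand-in for the deleted base classes
      protected def securityProtocol: SecurityProtocol
      protected def trustStoreFile: Option[File]  // defined when the protocol uses SSL
    }

    class PlaintextVariant extends ProtocolVariantTest {
      protected def securityProtocol = SecurityProtocol.PLAINTEXT
      protected def trustStoreFile = None
    }

    class SslVariant extends ProtocolVariantTest {
      protected def securityProtocol = SecurityProtocol.SSL
      protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks"))
    }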

