kafka-commits mailing list archives

From ij...@apache.org
Subject kafka git commit: KAFKA-5272; Policy for Alter Configs (KIP-133)
Date Sat, 03 Jun 2017 02:15:01 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.11.0 2223d1389 -> 8ebe4838a


KAFKA-5272; Policy for Alter Configs (KIP-133)

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>, Jason Gustafson <jason@confluent.io>, Ismael Juma <ismael@juma.me.uk>

Closes #3210 from ijuma/kafka-5272-improve-validation-for-describe-alter-configs

(cherry picked from commit eb5586102e5bb4b66f2cacafbe45f9a7bad8eb10)
Signed-off-by: Ismael Juma <ismael@juma.me.uk>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8ebe4838
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8ebe4838
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8ebe4838

Branch: refs/heads/0.11.0
Commit: 8ebe4838a94d25938ed86c79d555f54e4752e64b
Parents: 2223d13
Author: Ismael Juma <ismael@juma.me.uk>
Authored: Sat Jun 3 03:14:55 2017 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Sat Jun 3 03:15:12 2017 +0100

----------------------------------------------------------------------
 .../kafka/server/policy/AlterConfigPolicy.java  |  85 ++++
 .../clients/admin/KafkaAdminClientTest.java     |   2 +-
 .../main/scala/kafka/server/AdminManager.scala  |  34 +-
 .../main/scala/kafka/server/KafkaConfig.scala   |   5 +
 .../kafka/api/AdminClientIntegrationTest.scala  | 491 +++++++++++++++++++
 ...AdminClientWithPoliciesIntegrationTest.scala | 209 ++++++++
 .../api/KafkaAdminClientIntegrationTest.scala   | 475 ------------------
 .../api/SaslSslAdminClientIntegrationTest.scala |   2 +-
 8 files changed, 818 insertions(+), 485 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8ebe4838/clients/src/main/java/org/apache/kafka/server/policy/AlterConfigPolicy.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/server/policy/AlterConfigPolicy.java b/clients/src/main/java/org/apache/kafka/server/policy/AlterConfigPolicy.java
new file mode 100644
index 0000000..ca47efa
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/server/policy/AlterConfigPolicy.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.policy;
+
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.PolicyViolationException;
+
+import java.util.Map;
+
+/**
+ * An interface for enforcing a policy on alter configs requests.
+ *
+ * Common use cases are requiring that the replication factor, min.insync.replicas and/or retention settings for a
+ * topic remain within an allowable range.
+ *
+ * If <code>alter.config.policy.class.name</code> is defined, Kafka will create an instance of the specified class
+ * using the default constructor and will then pass the broker configs to its <code>configure()</code> method. During
+ * broker shutdown, the <code>close()</code> method will be invoked so that resources can be released (if necessary).
+ */
+public interface AlterConfigPolicy extends Configurable, AutoCloseable {
+
+    /**
+     * Class containing the alter configs request parameters.
+     */
+    class RequestMetadata {
+
+        private final ConfigResource resource;
+        private final Map<String, String> configs;
+
+        /**
+         * Create an instance of this class with the provided parameters.
+         *
+         * This constructor is public to make testing of <code>AlterConfigPolicy</code> implementations easier.
+         */
+        public RequestMetadata(ConfigResource resource, Map<String, String> configs) {
+            this.resource = resource;
+            this.configs = configs;
+        }
+
+        /**
+         * Return the configs in the request.
+         */
+        public Map<String, String> configs() {
+            return configs;
+        }
+
+        public ConfigResource resource() {
+            return resource;
+        }
+
+        @Override
+        public String toString() {
+            return "AlterConfigPolicy.RequestMetadata(resource=" + resource +
+                    ", configs=" + configs + ")";
+        }
+    }
+
+    /**
+     * Validate the request parameters and throw a <code>PolicyViolationException</code> with a suitable error
+     * message if the alter configs request parameters for the provided resource do not satisfy this policy.
+     *
+     * Clients will receive the POLICY_VIOLATION error code along with the exception's message. Note that validation
+     * failure only affects the relevant resource; other resources in the request will still be processed.
+     *
+     * @param requestMetadata the alter configs request parameters for the provided resource (topic is the only resource
+     *                        type whose configs can be updated currently).
+     * @throws PolicyViolationException if the request parameters do not satisfy this policy.
+     */
+    void validate(RequestMetadata requestMetadata) throws PolicyViolationException;
+}
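
As a sketch of how this interface can be implemented (the package, class name and the
retention bound below are illustrative only, not part of this commit):

    package com.example; // hypothetical package

    import java.util.Map;

    import org.apache.kafka.common.errors.PolicyViolationException;
    import org.apache.kafka.server.policy.AlterConfigPolicy;

    /** Hypothetical policy that rejects retention.ms values below one hour. */
    public class MinRetentionAlterConfigPolicy implements AlterConfigPolicy {

        @Override
        public void configure(Map<String, ?> configs) {
            // The broker configs are passed in here; this sketch does not use them.
        }

        @Override
        public void validate(RequestMetadata requestMetadata) throws PolicyViolationException {
            String retentionMs = requestMetadata.configs().get("retention.ms");
            if (retentionMs != null && Long.parseLong(retentionMs) < 60 * 60 * 1000L)
                throw new PolicyViolationException("retention.ms must be at least one hour");
        }

        @Override
        public void close() {
            // No resources to release in this sketch.
        }
    }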

http://git-wip-us.apache.org/repos/asf/kafka/blob/8ebe4838/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 6f9e6af..c0e86e9 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -67,7 +67,7 @@ import static org.junit.Assert.fail;
 /**
  * A unit test for KafkaAdminClient.
  *
- * See KafkaAdminClientIntegrationTest for an integration test of the KafkaAdminClient.
+ * See AdminClientIntegrationTest for an integration test.
  */
 public class KafkaAdminClientTest {
     @Rule

http://git-wip-us.apache.org/repos/asf/kafka/blob/8ebe4838/core/src/main/scala/kafka/server/AdminManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala
index c147593..33c6b77 100644
--- a/core/src/main/scala/kafka/server/AdminManager.scala
+++ b/core/src/main/scala/kafka/server/AdminManager.scala
@@ -23,14 +23,14 @@ import kafka.common.TopicAlreadyMarkedForDeletionException
 import kafka.log.LogConfig
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils._
-import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException}
+import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, ConfigResource}
 import org.apache.kafka.common.errors.{ApiException, InvalidRequestException, PolicyViolationException}
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.CreateTopicsRequest._
 import org.apache.kafka.common.requests.{AlterConfigsRequest, ApiError, DescribeConfigsResponse, Resource, ResourceType}
-import org.apache.kafka.server.policy.CreateTopicPolicy
+import org.apache.kafka.server.policy.{AlterConfigPolicy, CreateTopicPolicy}
 import org.apache.kafka.server.policy.CreateTopicPolicy.RequestMetadata
 
 import scala.collection._
@@ -47,6 +47,9 @@ class AdminManager(val config: KafkaConfig,
   private val createTopicPolicy =
     Option(config.getConfiguredInstance(KafkaConfig.CreateTopicPolicyClassNameProp, classOf[CreateTopicPolicy]))
 
+  private val alterConfigPolicy =
+    Option(config.getConfiguredInstance(KafkaConfig.AlterConfigPolicyClassNameProp, classOf[AlterConfigPolicy]))
+
   def hasDelayedTopicOperations = topicPurgatory.delayed != 0
 
   /**
@@ -255,14 +258,28 @@ class AdminManager(val config: KafkaConfig,
         resource.`type` match {
           case ResourceType.TOPIC =>
             val topic = resource.name
+
             val properties = new Properties
             config.entries.asScala.foreach { configEntry =>
               properties.setProperty(configEntry.name(), configEntry.value())
             }
-            if (validateOnly)
-              AdminUtils.validateTopicConfig(zkUtils, topic, properties)
-            else
-              AdminUtils.changeTopicConfig(zkUtils, topic, properties)
+
+            alterConfigPolicy match {
+              case Some(policy) =>
+                AdminUtils.validateTopicConfig(zkUtils, topic, properties)
+
+                val configEntriesMap = config.entries.asScala.map(entry => (entry.name, entry.value)).toMap
+                policy.validate(new AlterConfigPolicy.RequestMetadata(
+                  new ConfigResource(ConfigResource.Type.TOPIC, resource.name), configEntriesMap.asJava))
+
+                if (!validateOnly)
+                  AdminUtils.changeTopicConfig(zkUtils, topic, properties)
+              case None =>
+                if (validateOnly)
+                  AdminUtils.validateTopicConfig(zkUtils, topic, properties)
+                else
+                  AdminUtils.changeTopicConfig(zkUtils, topic, properties)
+            }
             resource -> new ApiError(Errors.NONE, null)
           case resourceType =>
             throw new InvalidRequestException(s"AlterConfigs is only supported for topics, but resource type is $resourceType")
@@ -274,8 +291,8 @@ class AdminManager(val config: KafkaConfig,
           resource -> ApiError.fromThrowable(new InvalidRequestException(message, e))
         case e: Throwable =>
           // Log client errors at a lower level than unexpected exceptions
-          val message = s"Error processing alter configs request for resource $resource"
-          if (e.isInstanceOf[ApiException])
+          val message = s"Error processing alter configs request for resource $resource, config $config"
+          if (e.isInstanceOf[ApiException] || e.isInstanceOf[PolicyViolationException])
             info(message, e)
           else
             error(message, e)
@@ -287,5 +304,6 @@ class AdminManager(val config: KafkaConfig,
   def shutdown() {
     topicPurgatory.shutdown()
     CoreUtils.swallow(createTopicPolicy.foreach(_.close()))
+    CoreUtils.swallow(alterConfigPolicy.foreach(_.close()))
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8ebe4838/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 6e94043..fe47fd0 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -301,6 +301,7 @@ object KafkaConfig {
   val AutoCreateTopicsEnableProp = "auto.create.topics.enable"
   val MinInSyncReplicasProp = "min.insync.replicas"
   val CreateTopicPolicyClassNameProp = "create.topic.policy.class.name"
+  val AlterConfigPolicyClassNameProp = "alter.config.policy.class.name"
   /** ********* Replication configuration ***********/
   val ControllerSocketTimeoutMsProp = "controller.socket.timeout.ms"
   val DefaultReplicationFactorProp = "default.replication.factor"
@@ -529,6 +530,9 @@ object KafkaConfig {
 
   val CreateTopicPolicyClassNameDoc = "The create topic policy class that should be used for validation. The class should " +
     "implement the <code>org.apache.kafka.server.policy.CreateTopicPolicy</code> interface."
+  val AlterConfigPolicyClassNameDoc = "The alter configs policy class that should be used for validation. The class should " +
+    "implement the <code>org.apache.kafka.server.policy.AlterConfigPolicy</code> interface."
+
   /** ********* Replication configuration ***********/
   val ControllerSocketTimeoutMsDoc = "The socket timeout for controller-to-broker channels"
   val ControllerMessageQueueSizeDoc = "The buffer size for controller-to-broker-channels"
@@ -748,6 +752,7 @@ object KafkaConfig {
       .define(LogMessageTimestampTypeProp, STRING, Defaults.LogMessageTimestampType, in("CreateTime", "LogAppendTime"), MEDIUM, LogMessageTimestampTypeDoc)
       .define(LogMessageTimestampDifferenceMaxMsProp, LONG, Defaults.LogMessageTimestampDifferenceMaxMs, MEDIUM, LogMessageTimestampDifferenceMaxMsDoc)
       .define(CreateTopicPolicyClassNameProp, CLASS, null, LOW, CreateTopicPolicyClassNameDoc)
+      .define(AlterConfigPolicyClassNameProp, CLASS, null, LOW, AlterConfigPolicyClassNameDoc)
 
       /** ********* Replication configuration ***********/
       .define(ControllerSocketTimeoutMsProp, INT, Defaults.ControllerSocketTimeoutMs, MEDIUM, ControllerSocketTimeoutMsDoc)
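
To enable a policy such as the one sketched above, a broker's server.properties names the
implementing class (the class name below is the hypothetical example from earlier, not part
of this commit):

    alter.config.policy.class.name=com.example.MinRetentionAlterConfigPolicy

Since the property is defined with a null default and LOW importance, no policy is applied
unless one is explicitly configured.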

http://git-wip-us.apache.org/repos/asf/kafka/blob/8ebe4838/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
new file mode 100644
index 0000000..0a1d229
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -0,0 +1,491 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.api
+
+import java.util
+import java.util.{Collections, Properties}
+import java.util.concurrent.{ExecutionException, TimeUnit}
+
+import org.apache.kafka.common.utils.{Time, Utils}
+import kafka.integration.KafkaServerTestHarness
+import kafka.log.LogConfig
+import kafka.server.{Defaults, KafkaConfig, KafkaServer}
+import org.apache.kafka.clients.admin._
+import kafka.utils.{Logging, TestUtils, ZkUtils}
+import org.apache.kafka.clients.admin.NewTopic
+import org.apache.kafka.common.KafkaFuture
+import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
+import org.apache.kafka.common.config.ConfigResource
+import org.apache.kafka.common.errors.{InvalidRequestException, SecurityDisabledException, TimeoutException, TopicExistsException, UnknownTopicOrPartitionException}
+import org.apache.kafka.common.protocol.ApiKeys
+import org.junit.{After, Before, Rule, Test}
+import org.apache.kafka.common.requests.MetadataResponse
+import org.apache.kafka.common.resource.{Resource, ResourceType}
+import org.junit.rules.Timeout
+import org.junit.Assert._
+
+import scala.collection.JavaConverters._
+
+/**
+ * An integration test of the KafkaAdminClient.
+ *
+ * Also see {@link org.apache.kafka.clients.admin.KafkaAdminClientTest} for a unit test of the admin client.
+ */
+class AdminClientIntegrationTest extends KafkaServerTestHarness with Logging {
+
+  import AdminClientIntegrationTest._
+
+  @Rule
+  def globalTimeout = Timeout.millis(120000)
+
+  var client: AdminClient = null
+
+  @Before
+  override def setUp(): Unit = {
+    super.setUp
+    TestUtils.waitUntilBrokerMetadataIsPropagated(servers)
+  }
+
+  @After
+  override def tearDown(): Unit = {
+    if (client != null)
+      Utils.closeQuietly(client, "AdminClient")
+    super.tearDown()
+  }
+
+  val brokerCount = 3
+  lazy val serverConfig = new Properties
+
+  def createConfig(): util.Map[String, Object] = {
+    val config = new util.HashMap[String, Object]
+    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    val securityProps: util.Map[Object, Object] =
+      TestUtils.adminClientSecurityConfigs(securityProtocol, trustStoreFile, clientSaslProperties)
+    securityProps.asScala.foreach { case (key, value) => config.put(key.asInstanceOf[String], value) }
+    config
+  }
+
+  def waitForTopics(client: AdminClient, expectedPresent: Seq[String], expectedMissing: Seq[String]): Unit = {
+    TestUtils.waitUntilTrue(() => {
+        val topics = client.listTopics.names.get()
+        expectedPresent.forall(topicName => topics.contains(topicName)) &&
+          expectedMissing.forall(topicName => !topics.contains(topicName))
+      }, "timed out waiting for topics")
+  }
+
+  def assertFutureExceptionTypeEquals(future: KafkaFuture[_], clazz: Class[_ <: Throwable]): Unit = {
+    try {
+      future.get()
+      fail("Expected CompletableFuture.get to return an exception")
+    } catch {
+      case e: ExecutionException =>
+        val cause = e.getCause()
+        assertTrue("Expected an exception of type " + clazz.getName + "; got type " +
+            cause.getClass().getName, clazz.isInstance(cause))
+    }
+  }
+
+  @Test
+  def testClose(): Unit = {
+    val client = AdminClient.create(createConfig())
+    client.close()
+    client.close() // double close has no effect
+  }
+
+  @Test
+  def testListNodes(): Unit = {
+    client = AdminClient.create(createConfig())
+    val brokerStrs = brokerList.split(",").toList.sorted
+    var nodeStrs: List[String] = null
+    do {
+      val nodes = client.describeCluster().nodes().get().asScala
+      nodeStrs = nodes.map ( node => s"${node.host}:${node.port}" ).toList.sorted
+    } while (nodeStrs.size < brokerStrs.size)
+    assertEquals(brokerStrs.mkString(","), nodeStrs.mkString(","))
+  }
+
+  @Test
+  def testCreateDeleteTopics(): Unit = {
+    client = AdminClient.create(createConfig())
+    val topics = Seq("mytopic", "mytopic2")
+    val newTopics = topics.map(new NewTopic(_, 1, 1))
+    client.createTopics(newTopics.asJava, new CreateTopicsOptions().validateOnly(true)).all.get()
+    waitForTopics(client, List(), topics)
+
+    client.createTopics(newTopics.asJava).all.get()
+    waitForTopics(client, topics, List())
+
+    val results = client.createTopics(newTopics.asJava).results()
+    assertTrue(results.containsKey("mytopic"))
+    assertFutureExceptionTypeEquals(results.get("mytopic"), classOf[TopicExistsException])
+    assertTrue(results.containsKey("mytopic2"))
+    assertFutureExceptionTypeEquals(results.get("mytopic2"), classOf[TopicExistsException])
+    val topicsFromDescribe = client.describeTopics(topics.asJava).all.get().asScala.keys
+    assertEquals(topics.toSet, topicsFromDescribe)
+
+    client.deleteTopics(topics.asJava).all.get()
+    waitForTopics(client, List(), topics)
+  }
+
+  /**
+    * describe should not auto create topics
+    */
+  @Test
+  def testDescribeNonExistingTopic(): Unit = {
+    client = AdminClient.create(createConfig())
+
+    val existingTopic = "existing-topic"
+    client.createTopics(Seq(existingTopic).map(new NewTopic(_, 1, 1)).asJava).all.get()
+    waitForTopics(client, Seq(existingTopic), List())
+
+    val nonExistingTopic = "non-existing"
+    val results = client.describeTopics(Seq(nonExistingTopic, existingTopic).asJava).results
+    assertEquals(existingTopic, results.get(existingTopic).get.name)
+    intercept[ExecutionException](results.get(nonExistingTopic).get).getCause.isInstanceOf[UnknownTopicOrPartitionException]
+    assertEquals(None, zkUtils.getTopicPartitionCount(nonExistingTopic))
+  }
+
+  @Test
+  def testGetAllBrokerVersionsAndDescribeCluster(): Unit = {
+    client = AdminClient.create(createConfig())
+    val nodes = client.describeCluster().nodes().get()
+    val clusterId = client.describeCluster().clusterId().get()
+    assertEquals(servers.head.apis.clusterId, clusterId)
+    val controller = client.describeCluster().controller().get()
+    assertEquals(servers.head.apis.metadataCache.getControllerId.
+      getOrElse(MetadataResponse.NO_CONTROLLER_ID), controller.id())
+    val nodesToVersions = client.apiVersions(nodes).all().get()
+    val brokers = brokerList.split(",")
+    assert(brokers.size == nodesToVersions.size())
+    for ((node, brokerVersionInfo) <- nodesToVersions.asScala) {
+      val hostStr = s"${node.host}:${node.port}"
+      assertTrue(s"Unknown host:port pair $hostStr in brokerVersionInfos", brokers.contains(hostStr))
+      assertEquals(1, brokerVersionInfo.usableVersion(ApiKeys.API_VERSIONS))
+    }
+  }
+
+  @Test
+  def testDescribeAndAlterConfigs(): Unit = {
+    client = AdminClient.create(createConfig)
+
+    // Create topics
+    val topic1 = "describe-alter-configs-topic-1"
+    val topicResource1 = new ConfigResource(ConfigResource.Type.TOPIC, topic1)
+    val topicConfig1 = new Properties
+    topicConfig1.setProperty(LogConfig.MaxMessageBytesProp, "500000")
+    topicConfig1.setProperty(LogConfig.RetentionMsProp, "60000000")
+    TestUtils.createTopic(zkUtils, topic1, 1, 1, servers, topicConfig1)
+
+    val topic2 = "describe-alter-configs-topic-2"
+    val topicResource2 = new ConfigResource(ConfigResource.Type.TOPIC, topic2)
+    TestUtils.createTopic(zkUtils, topic2, 1, 1, servers, new Properties)
+
+    // Describe topics and broker
+    val brokerResource1 = new ConfigResource(ConfigResource.Type.BROKER, servers(1).config.brokerId.toString)
+    val brokerResource2 = new ConfigResource(ConfigResource.Type.BROKER, servers(2).config.brokerId.toString)
+    val configResources = Seq(topicResource1, topicResource2, brokerResource1, brokerResource2)
+    var describeResult = client.describeConfigs(configResources.asJava)
+    var configs = describeResult.all.get
+
+    assertEquals(4, configs.size)
+
+    val maxMessageBytes1 = configs.get(topicResource1).get(LogConfig.MaxMessageBytesProp)
+    assertEquals(LogConfig.MaxMessageBytesProp, maxMessageBytes1.name)
+    assertEquals(topicConfig1.get(LogConfig.MaxMessageBytesProp), maxMessageBytes1.value)
+    assertFalse(maxMessageBytes1.isDefault)
+    assertFalse(maxMessageBytes1.isSensitive)
+    assertFalse(maxMessageBytes1.isReadOnly)
+
+    assertEquals(topicConfig1.get(LogConfig.RetentionMsProp),
+      configs.get(topicResource1).get(LogConfig.RetentionMsProp).value)
+
+    val maxMessageBytes2 = configs.get(topicResource2).get(LogConfig.MaxMessageBytesProp)
+    assertEquals(Defaults.MessageMaxBytes.toString, maxMessageBytes2.value)
+    assertEquals(LogConfig.MaxMessageBytesProp, maxMessageBytes2.name)
+    assertTrue(maxMessageBytes2.isDefault)
+    assertFalse(maxMessageBytes2.isSensitive)
+    assertFalse(maxMessageBytes2.isReadOnly)
+
+    assertEquals(servers(1).config.values.size, configs.get(brokerResource1).entries.size)
+    assertEquals(servers(1).config.brokerId.toString, configs.get(brokerResource1).get(KafkaConfig.BrokerIdProp).value)
+    val listenerSecurityProtocolMap = configs.get(brokerResource1).get(KafkaConfig.ListenerSecurityProtocolMapProp)
+    assertEquals(servers(1).config.getString(KafkaConfig.ListenerSecurityProtocolMapProp), listenerSecurityProtocolMap.value)
+    assertEquals(KafkaConfig.ListenerSecurityProtocolMapProp, listenerSecurityProtocolMap.name)
+    assertFalse(listenerSecurityProtocolMap.isDefault)
+    assertFalse(listenerSecurityProtocolMap.isSensitive)
+    assertTrue(listenerSecurityProtocolMap.isReadOnly)
+    val truststorePassword = configs.get(brokerResource1).get(KafkaConfig.SslTruststorePasswordProp)
+    assertEquals(KafkaConfig.SslTruststorePasswordProp, truststorePassword.name)
+    assertNull(truststorePassword.value)
+    assertFalse(truststorePassword.isDefault)
+    assertTrue(truststorePassword.isSensitive)
+    assertTrue(truststorePassword.isReadOnly)
+    val compressionType = configs.get(brokerResource1).get(KafkaConfig.CompressionTypeProp)
+    assertEquals(servers(1).config.compressionType.toString, compressionType.value)
+    assertEquals(KafkaConfig.CompressionTypeProp, compressionType.name)
+    assertTrue(compressionType.isDefault)
+    assertFalse(compressionType.isSensitive)
+    assertTrue(compressionType.isReadOnly)
+
+    assertEquals(servers(2).config.values.size, configs.get(brokerResource2).entries.size)
+    assertEquals(servers(2).config.brokerId.toString, configs.get(brokerResource2).get(KafkaConfig.BrokerIdProp).value)
+    assertEquals(servers(2).config.logCleanerThreads.toString,
+      configs.get(brokerResource2).get(KafkaConfig.LogCleanerThreadsProp).value)
+
+    checkValidAlterConfigs(zkUtils, servers, client, topicResource1, topicResource2)
+  }
+
+  @Test
+  def testInvalidAlterConfigs(): Unit = {
+    client = AdminClient.create(createConfig)
+    checkInvalidAlterConfigs(zkUtils, servers, client)
+  }
+
+  val ACL1 = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic3"),
+      new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW))
+
+  /**
+   * Test that ACL operations are not possible when the authorizer is disabled.
+   * Also see {@link kafka.api.SaslSslAdminClientIntegrationTest} for tests of ACL operations
+   * when the authorizer is enabled.
+   */
+  @Test
+  def testAclOperations(): Unit = {
+    client = AdminClient.create(createConfig())
+    assertFutureExceptionTypeEquals(client.describeAcls(AclBindingFilter.ANY).all(), classOf[SecurityDisabledException])
+    assertFutureExceptionTypeEquals(client.createAcls(Collections.singleton(ACL1)).all(),
+        classOf[SecurityDisabledException])
+    assertFutureExceptionTypeEquals(client.deleteAcls(Collections.singleton(ACL1.toFilter())).all(),
+      classOf[SecurityDisabledException])
+    client.close()
+  }
+
+  /**
+    * Test closing the AdminClient with a generous timeout.  Calls in progress should be completed,
+    * since they can be done within the timeout.  New calls should receive timeouts.
+    */
+  @Test
+  def testDelayedClose(): Unit = {
+    client = AdminClient.create(createConfig())
+    val topics = Seq("mytopic", "mytopic2")
+    val newTopics = topics.map(new NewTopic(_, 1, 1))
+    val future = client.createTopics(newTopics.asJava, new CreateTopicsOptions().validateOnly(true)).all()
+    client.close(2, TimeUnit.HOURS)
+    val future2 = client.createTopics(newTopics.asJava, new CreateTopicsOptions().validateOnly(true)).all()
+    assertFutureExceptionTypeEquals(future2, classOf[TimeoutException])
+    future.get
+    client.close(30, TimeUnit.MINUTES) // multiple close-with-timeout should have no effect
+  }
+
+  /**
+    * Test closing the AdminClient with a timeout of 0, when there are calls with extremely long
+    * timeouts in progress.  The calls should be aborted after the hard shutdown timeout elapses.
+    */
+  @Test
+  def testForceClose(): Unit = {
+    val config = createConfig()
+    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:22")
+    client = AdminClient.create(config)
+    // Because the bootstrap servers are set up incorrectly, this call will not complete, but must be
+    // cancelled by the close operation.
+    val future = client.createTopics(Seq("mytopic", "mytopic2").map(new NewTopic(_, 1, 1)).asJava,
+      new CreateTopicsOptions().timeoutMs(900000)).all()
+    client.close(0, TimeUnit.MILLISECONDS)
+    assertFutureExceptionTypeEquals(future, classOf[TimeoutException])
+  }
+
+  /**
+    * Check that a call with a timeout does not complete before the minimum timeout has elapsed,
+    * even when the default request timeout is shorter.
+    */
+  @Test
+  def testMinimumRequestTimeouts(): Unit = {
+    val config = createConfig()
+    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:22")
+    config.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "0")
+    client = AdminClient.create(config)
+    val startTimeMs = Time.SYSTEM.milliseconds()
+    val future = client.createTopics(Seq("mytopic", "mytopic2").map(new NewTopic(_, 1, 1)).asJava,
+      new CreateTopicsOptions().timeoutMs(2)).all()
+    assertFutureExceptionTypeEquals(future, classOf[TimeoutException])
+    val endTimeMs = Time.SYSTEM.milliseconds()
+    assertTrue("Expected the timeout to take at least one millisecond.", endTimeMs > startTimeMs);
+    client.close()
+  }
+
+  override def generateConfigs() = {
+    val cfgs = TestUtils.createBrokerConfigs(brokerCount, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol),
+      trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties)
+    cfgs.foreach { config =>
+      config.setProperty(KafkaConfig.ListenersProp, s"${listenerName.value}://localhost:${TestUtils.RandomPort}")
+      config.remove(KafkaConfig.InterBrokerSecurityProtocolProp)
+      config.setProperty(KafkaConfig.InterBrokerListenerNameProp, listenerName.value)
+      config.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, s"${listenerName.value}:${securityProtocol.name}")
+      config.setProperty(KafkaConfig.DeleteTopicEnableProp, "true")
+      // We set this in order to test that we don't expose sensitive data via describe configs. This will already be
+      // set for subclasses with security enabled and we don't want to overwrite it.
+      if (!config.containsKey(KafkaConfig.SslTruststorePasswordProp))
+        config.setProperty(KafkaConfig.SslTruststorePasswordProp, "some.invalid.pass")
+    }
+    cfgs.foreach(_.putAll(serverConfig))
+    cfgs.map(KafkaConfig.fromProps)
+  }
+}
+
+object AdminClientIntegrationTest {
+
+  import org.scalatest.Assertions._
+
+  def checkValidAlterConfigs(zkUtils: ZkUtils, servers: Seq[KafkaServer], client: AdminClient,
+                             topicResource1: ConfigResource, topicResource2: ConfigResource): Unit = {
+    // Alter topics
+    var topicConfigEntries1 = Seq(
+      new ConfigEntry(LogConfig.FlushMsProp, "1000")
+    ).asJava
+
+    var topicConfigEntries2 = Seq(
+      new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "0.9"),
+      new ConfigEntry(LogConfig.CompressionTypeProp, "lz4")
+    ).asJava
+
+    var alterResult = client.alterConfigs(Map(
+      topicResource1 -> new Config(topicConfigEntries1),
+      topicResource2 -> new Config(topicConfigEntries2)
+    ).asJava)
+
+    assertEquals(Set(topicResource1, topicResource2).asJava, alterResult.results.keySet)
+    alterResult.all.get
+
+    // Verify that topics were updated correctly
+    var describeResult = client.describeConfigs(Seq(topicResource1, topicResource2).asJava)
+    var configs = describeResult.all.get
+
+    assertEquals(2, configs.size)
+
+    assertEquals("1000", configs.get(topicResource1).get(LogConfig.FlushMsProp).value)
+    assertEquals(Defaults.MessageMaxBytes.toString,
+      configs.get(topicResource1).get(LogConfig.MaxMessageBytesProp).value)
+    assertEquals((Defaults.LogRetentionHours * 60 * 60 * 1000).toString,
+      configs.get(topicResource1).get(LogConfig.RetentionMsProp).value)
+
+    assertEquals("0.9", configs.get(topicResource2).get(LogConfig.MinCleanableDirtyRatioProp).value)
+    assertEquals("lz4", configs.get(topicResource2).get(LogConfig.CompressionTypeProp).value)
+
+    // Alter topics with validateOnly=true
+    topicConfigEntries1 = Seq(
+      new ConfigEntry(LogConfig.MaxMessageBytesProp, "10")
+    ).asJava
+
+    topicConfigEntries2 = Seq(
+      new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "0.3")
+    ).asJava
+
+    alterResult = client.alterConfigs(Map(
+      topicResource1 -> new Config(topicConfigEntries1),
+      topicResource2 -> new Config(topicConfigEntries2)
+    ).asJava, new AlterConfigsOptions().validateOnly(true))
+
+    assertEquals(Set(topicResource1, topicResource2).asJava, alterResult.results.keySet)
+    alterResult.all.get
+
+    // Verify that topics were not updated due to validateOnly = true
+    describeResult = client.describeConfigs(Seq(topicResource1, topicResource2).asJava)
+    configs = describeResult.all.get
+
+    assertEquals(2, configs.size)
+
+    assertEquals(Defaults.MessageMaxBytes.toString,
+      configs.get(topicResource1).get(LogConfig.MaxMessageBytesProp).value)
+    assertEquals("0.9", configs.get(topicResource2).get(LogConfig.MinCleanableDirtyRatioProp).value)
+  }
+
+  def checkInvalidAlterConfigs(zkUtils: ZkUtils, servers: Seq[KafkaServer], client: AdminClient): Unit = {
+    // Create topics
+    val topic1 = "invalid-alter-configs-topic-1"
+    val topicResource1 = new ConfigResource(ConfigResource.Type.TOPIC, topic1)
+    TestUtils.createTopic(zkUtils, topic1, 1, 1, servers, new Properties())
+
+    val topic2 = "invalid-alter-configs-topic-2"
+    val topicResource2 = new ConfigResource(ConfigResource.Type.TOPIC, topic2)
+    TestUtils.createTopic(zkUtils, topic2, 1, 1, servers, new Properties)
+
+    val topicConfigEntries1 = Seq(
+      new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "1.1"), // this value is invalid as it's above 1.0
+      new ConfigEntry(LogConfig.CompressionTypeProp, "lz4")
+    ).asJava
+
+    var topicConfigEntries2 = Seq(new ConfigEntry(LogConfig.CompressionTypeProp, "snappy")).asJava
+
+    val brokerResource = new ConfigResource(ConfigResource.Type.BROKER, servers.head.config.brokerId.toString)
+    val brokerConfigEntries = Seq(new ConfigEntry(KafkaConfig.CompressionTypeProp, "gzip")).asJava
+
+    // Alter configs: first and third are invalid, second is valid
+    var alterResult = client.alterConfigs(Map(
+      topicResource1 -> new Config(topicConfigEntries1),
+      topicResource2 -> new Config(topicConfigEntries2),
+      brokerResource -> new Config(brokerConfigEntries)
+    ).asJava)
+
+    assertEquals(Set(topicResource1, topicResource2, brokerResource).asJava, alterResult.results.keySet)
+    assertTrue(intercept[ExecutionException](alterResult.results.get(topicResource1).get).getCause.isInstanceOf[InvalidRequestException])
+    alterResult.results.get(topicResource2).get
+    assertTrue(intercept[ExecutionException](alterResult.results.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException])
+
+    // Verify that first and third resources were not updated and second was updated
+    var describeResult = client.describeConfigs(Seq(topicResource1, topicResource2, brokerResource).asJava)
+    var configs = describeResult.all.get
+    assertEquals(3, configs.size)
+
+    assertEquals(Defaults.LogCleanerMinCleanRatio.toString,
+      configs.get(topicResource1).get(LogConfig.MinCleanableDirtyRatioProp).value)
+    assertEquals(Defaults.CompressionType.toString,
+      configs.get(topicResource1).get(LogConfig.CompressionTypeProp).value)
+
+    assertEquals("snappy", configs.get(topicResource2).get(LogConfig.CompressionTypeProp).value)
+
+    assertEquals(Defaults.CompressionType.toString, configs.get(brokerResource).get(KafkaConfig.CompressionTypeProp).value)
+
+    // Alter configs with validateOnly = true: first and third are invalid, second is valid
+    topicConfigEntries2 = Seq(new ConfigEntry(LogConfig.CompressionTypeProp, "gzip")).asJava
+
+    alterResult = client.alterConfigs(Map(
+      topicResource1 -> new Config(topicConfigEntries1),
+      topicResource2 -> new Config(topicConfigEntries2),
+      brokerResource -> new Config(brokerConfigEntries)
+    ).asJava, new AlterConfigsOptions().validateOnly(true))
+
+    assertEquals(Set(topicResource1, topicResource2, brokerResource).asJava, alterResult.results.keySet)
+    assertTrue(intercept[ExecutionException](alterResult.results.get(topicResource1).get).getCause.isInstanceOf[InvalidRequestException])
+    alterResult.results.get(topicResource2).get
+    assertTrue(intercept[ExecutionException](alterResult.results.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException])
+
+    // Verify that no resources are updated since validate_only = true
+    describeResult = client.describeConfigs(Seq(topicResource1, topicResource2, brokerResource).asJava)
+    configs = describeResult.all.get
+    assertEquals(3, configs.size)
+
+    assertEquals(Defaults.LogCleanerMinCleanRatio.toString,
+      configs.get(topicResource1).get(LogConfig.MinCleanableDirtyRatioProp).value)
+    assertEquals(Defaults.CompressionType.toString,
+      configs.get(topicResource1).get(LogConfig.CompressionTypeProp).value)
+
+    assertEquals("snappy", configs.get(topicResource2).get(LogConfig.CompressionTypeProp).value)
+
+    assertEquals(Defaults.CompressionType.toString, configs.get(brokerResource).get(KafkaConfig.CompressionTypeProp).value)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/8ebe4838/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
new file mode 100644
index 0000000..7d3c54c
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
@@ -0,0 +1,209 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+  * License. You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+  * specific language governing permissions and limitations under the License.
+  */
+
+package kafka.api
+
+import java.util
+import java.util.Properties
+import java.util.concurrent.ExecutionException
+
+import kafka.integration.KafkaServerTestHarness
+import kafka.log.LogConfig
+import kafka.server.{Defaults, KafkaConfig}
+import kafka.utils.{Logging, TestUtils}
+import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, AlterConfigsOptions, Config, ConfigEntry}
+import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
+import org.apache.kafka.common.errors.{InvalidRequestException, PolicyViolationException}
+import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.server.policy.AlterConfigPolicy
+import org.junit.Assert.{assertEquals, assertNull, assertTrue}
+import org.junit.{After, Before, Rule, Test}
+import org.junit.rules.Timeout
+
+import scala.collection.JavaConverters._
+
+/**
+  * Tests AdminClient calls when the broker is configured with policies like AlterConfigPolicy, CreateTopicPolicy, etc.
+  */
+class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with Logging {
+
+  import AdminClientWithPoliciesIntegrationTest._
+
+  var client: AdminClient = null
+  val brokerCount = 3
+
+  @Rule
+  def globalTimeout = Timeout.millis(120000)
+
+  @Before
+  override def setUp(): Unit = {
+    super.setUp
+    TestUtils.waitUntilBrokerMetadataIsPropagated(servers)
+  }
+
+  @After
+  override def tearDown(): Unit = {
+    if (client != null)
+      Utils.closeQuietly(client, "AdminClient")
+    super.tearDown()
+  }
+
+  def createConfig: util.Map[String, Object] =
+    Map[String, Object](AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> brokerList).asJava
+
+  override def generateConfigs = {
+    val configs = TestUtils.createBrokerConfigs(brokerCount, zkConnect)
+    configs.foreach(props => props.put(KafkaConfig.AlterConfigPolicyClassNameProp, classOf[Policy]))
+    configs.map(KafkaConfig.fromProps)
+  }
+
+  @Test
+  def testValidAlterConfigs(): Unit = {
+    client = AdminClient.create(createConfig)
+    // Create topics
+    val topic1 = "describe-alter-configs-topic-1"
+    val topicResource1 = new ConfigResource(ConfigResource.Type.TOPIC, topic1)
+    val topicConfig1 = new Properties
+    topicConfig1.setProperty(LogConfig.MaxMessageBytesProp, "500000")
+    topicConfig1.setProperty(LogConfig.RetentionMsProp, "60000000")
+    TestUtils.createTopic(zkUtils, topic1, 1, 1, servers, topicConfig1)
+
+    val topic2 = "describe-alter-configs-topic-2"
+    val topicResource2 = new ConfigResource(ConfigResource.Type.TOPIC, topic2)
+    TestUtils.createTopic(zkUtils, topic2, 1, 1, servers, new Properties)
+
+    AdminClientIntegrationTest.checkValidAlterConfigs(zkUtils, servers, client, topicResource1, topicResource2)
+  }
+
+  @Test
+  def testInvalidAlterConfigs(): Unit = {
+    client = AdminClient.create(createConfig)
+    AdminClientIntegrationTest.checkInvalidAlterConfigs(zkUtils, servers, client)
+  }
+
+  @Test
+  def testInvalidAlterConfigsDueToPolicy(): Unit = {
+    client = AdminClient.create(createConfig)
+
+    // Create topics
+    val topic1 = "invalid-alter-configs-due-to-policy-topic-1"
+    val topicResource1 = new ConfigResource(ConfigResource.Type.TOPIC, topic1)
+    TestUtils.createTopic(zkUtils, topic1, 1, 1, servers, new Properties())
+
+    val topic2 = "invalid-alter-configs-due-to-policy-topic-2"
+    val topicResource2 = new ConfigResource(ConfigResource.Type.TOPIC, topic2)
+    TestUtils.createTopic(zkUtils, topic2, 1, 1, servers, new Properties)
+
+    val topic3 = "invalid-alter-configs-due-to-policy-topic-3"
+    val topicResource3 = new ConfigResource(ConfigResource.Type.TOPIC, topic3)
+    TestUtils.createTopic(zkUtils, topic3, 1, 1, servers, new Properties)
+
+    val topicConfigEntries1 = Seq(
+      new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "0.9"),
+      new ConfigEntry(LogConfig.MinInSyncReplicasProp, "2") // policy doesn't allow this
+    ).asJava
+
+    var topicConfigEntries2 = Seq(new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "0.8")).asJava
+
+    val topicConfigEntries3 = Seq(new ConfigEntry(LogConfig.MinInSyncReplicasProp, "-1")).asJava
+
+    val brokerResource = new ConfigResource(ConfigResource.Type.BROKER, servers.head.config.brokerId.toString)
+    val brokerConfigEntries = Seq(new ConfigEntry(KafkaConfig.SslTruststorePasswordProp, "12313")).asJava
+
+    // Alter configs: second is valid, the others are invalid
+    var alterResult = client.alterConfigs(Map(
+      topicResource1 -> new Config(topicConfigEntries1),
+      topicResource2 -> new Config(topicConfigEntries2),
+      topicResource3 -> new Config(topicConfigEntries3),
+      brokerResource -> new Config(brokerConfigEntries)
+    ).asJava)
+
+    assertEquals(Set(topicResource1, topicResource2, topicResource3, brokerResource).asJava, alterResult.results.keySet)
+    assertTrue(intercept[ExecutionException](alterResult.results.get(topicResource1).get).getCause.isInstanceOf[PolicyViolationException])
+    alterResult.results.get(topicResource2).get
+    assertTrue(intercept[ExecutionException](alterResult.results.get(topicResource3).get).getCause.isInstanceOf[InvalidRequestException])
+    assertTrue(intercept[ExecutionException](alterResult.results.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException])
+
+    // Verify that the second resource was updated and the others were not
+    var describeResult = client.describeConfigs(Seq(topicResource1, topicResource2, topicResource3, brokerResource).asJava)
+    var configs = describeResult.all.get
+    assertEquals(4, configs.size)
+
+    assertEquals(Defaults.LogCleanerMinCleanRatio.toString,
+      configs.get(topicResource1).get(LogConfig.MinCleanableDirtyRatioProp).value)
+    assertEquals(Defaults.MinInSyncReplicas.toString,
+      configs.get(topicResource1).get(LogConfig.MinInSyncReplicasProp).value)
+
+    assertEquals("0.8", configs.get(topicResource2).get(LogConfig.MinCleanableDirtyRatioProp).value)
+
+    assertNull(configs.get(brokerResource).get(KafkaConfig.SslTruststorePasswordProp).value)
+
+    // Alter configs with validateOnly = true: only second is valid
+    topicConfigEntries2 = Seq(new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "0.7")).asJava
+
+    alterResult = client.alterConfigs(Map(
+      topicResource1 -> new Config(topicConfigEntries1),
+      topicResource2 -> new Config(topicConfigEntries2),
+      brokerResource -> new Config(brokerConfigEntries),
+      topicResource3 -> new Config(topicConfigEntries3)
+    ).asJava, new AlterConfigsOptions().validateOnly(true))
+
+    assertEquals(Set(topicResource1, topicResource2, topicResource3, brokerResource).asJava, alterResult.results.keySet)
+    assertTrue(intercept[ExecutionException](alterResult.results.get(topicResource1).get).getCause.isInstanceOf[PolicyViolationException])
+    alterResult.results.get(topicResource2).get
+    assertTrue(intercept[ExecutionException](alterResult.results.get(topicResource3).get).getCause.isInstanceOf[InvalidRequestException])
+    assertTrue(intercept[ExecutionException](alterResult.results.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException])
+
+    // Verify that no resources are updated since validate_only = true
+    describeResult = client.describeConfigs(Seq(topicResource1, topicResource2, topicResource3, brokerResource).asJava)
+    configs = describeResult.all.get
+    assertEquals(4, configs.size)
+
+    assertEquals(Defaults.LogCleanerMinCleanRatio.toString,
+      configs.get(topicResource1).get(LogConfig.MinCleanableDirtyRatioProp).value)
+    assertEquals(Defaults.MinInSyncReplicas.toString,
+      configs.get(topicResource1).get(LogConfig.MinInSyncReplicasProp).value)
+
+    assertEquals("0.8", configs.get(topicResource2).get(LogConfig.MinCleanableDirtyRatioProp).value)
+
+    assertNull(configs.get(brokerResource).get(KafkaConfig.SslTruststorePasswordProp).value)
+  }
+
+
+}
+
+object AdminClientWithPoliciesIntegrationTest {
+
+  class Policy extends AlterConfigPolicy {
+
+    var configs: Map[String, _] = _
+    var closed = false
+
+    def configure(configs: util.Map[String, _]): Unit = {
+      this.configs = configs.asScala.toMap
+    }
+
+    def validate(requestMetadata: AlterConfigPolicy.RequestMetadata): Unit = {
+      require(!closed, "Policy should not be closed")
+      require(!configs.isEmpty, "configure should have been called with non empty configs")
+      require(!requestMetadata.configs.isEmpty, "request configs should not be empty")
+      require(requestMetadata.resource.name.nonEmpty, "resource name should not be empty")
+      require(requestMetadata.resource.name.contains("topic"))
+      if (requestMetadata.configs.containsKey(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG))
+        throw new PolicyViolationException("Min in sync replicas cannot be updated")
+    }
+
+    def close(): Unit = closed = true
+
+  }
+}
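
On the client side, a policy rejection surfaces per resource as a PolicyViolationException
wrapped in an ExecutionException, while valid resources in the same request are still
applied, as the test above exercises. A minimal sketch, assuming a broker at localhost:9092
with the hypothetical policy from earlier installed and a topic named "mytopic":

    import java.util.Collections;
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.Config;
    import org.apache.kafka.clients.admin.ConfigEntry;
    import org.apache.kafka.common.config.ConfigResource;
    import org.apache.kafka.common.errors.PolicyViolationException;

    public class AlterConfigsPolicyExample {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            AdminClient client = AdminClient.create(props);
            try {
                ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "mytopic");
                // A value the sketched policy rejects (retention below one hour)
                Config config = new Config(Collections.singleton(new ConfigEntry("retention.ms", "1000")));
                try {
                    client.alterConfigs(Collections.singletonMap(topic, config)).all().get();
                } catch (ExecutionException e) {
                    if (e.getCause() instanceof PolicyViolationException)
                        System.out.println("Rejected by policy: " + e.getCause().getMessage());
                    else
                        throw e;
                }
            } finally {
                client.close();
            }
        }
    }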

http://git-wip-us.apache.org/repos/asf/kafka/blob/8ebe4838/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala
deleted file mode 100644
index 0e21da7..0000000
--- a/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala
+++ /dev/null
@@ -1,475 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.api
-
-import java.util
-import java.util.{Collections, Properties}
-import java.util.concurrent.{ExecutionException, TimeUnit}
-
-import org.apache.kafka.common.utils.{Time, Utils}
-import kafka.integration.KafkaServerTestHarness
-import kafka.log.LogConfig
-import kafka.server.{Defaults, KafkaConfig}
-import org.apache.kafka.clients.admin._
-import kafka.utils.{Logging, TestUtils}
-import org.apache.kafka.clients.admin.NewTopic
-import org.apache.kafka.common.KafkaFuture
-import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
-import org.apache.kafka.common.config.ConfigResource
-import org.apache.kafka.common.errors.{InvalidRequestException, SecurityDisabledException, TimeoutException, TopicExistsException, UnknownTopicOrPartitionException}
-import org.apache.kafka.common.protocol.ApiKeys
-import org.junit.{After, Before, Rule, Test}
-import org.apache.kafka.common.requests.MetadataResponse
-import org.apache.kafka.common.resource.{Resource, ResourceType}
-import org.junit.rules.Timeout
-import org.junit.Assert._
-
-import scala.collection.JavaConverters._
-
-/**
- * An integration test of the KafkaAdminClient.
- *
- * Also see {@link org.apache.kafka.clients.admin.KafkaAdminClientTest} for a unit test of the admin client.
- */
-class KafkaAdminClientIntegrationTest extends KafkaServerTestHarness with Logging {
-
-  @Rule
-  def globalTimeout = Timeout.millis(120000)
-
-  var client: AdminClient = null
-
-  @Before
-  override def setUp(): Unit = {
-    super.setUp
-    TestUtils.waitUntilBrokerMetadataIsPropagated(servers)
-  }
-
-  @After
-  override def tearDown(): Unit = {
-    if (client != null)
-      Utils.closeQuietly(client, "AdminClient")
-    super.tearDown()
-  }
-
-  val brokerCount = 3
-  lazy val serverConfig = new Properties
-
-  def createConfig(): util.Map[String, Object] = {
-    val config = new util.HashMap[String, Object]
-    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
-    val securityProps: util.Map[Object, Object] =
-      TestUtils.adminClientSecurityConfigs(securityProtocol, trustStoreFile, clientSaslProperties)
-    securityProps.asScala.foreach { case (key, value) => config.put(key.asInstanceOf[String], value) }
-    config
-  }
-
-  def waitForTopics(client: AdminClient, expectedPresent: Seq[String], expectedMissing: Seq[String]): Unit = {
-    TestUtils.waitUntilTrue(() => {
-        val topics = client.listTopics.names.get()
-        expectedPresent.forall(topicName => topics.contains(topicName)) &&
-          expectedMissing.forall(topicName => !topics.contains(topicName))
-      }, "timed out waiting for topics")
-  }
-
-  def assertFutureExceptionTypeEquals(future: KafkaFuture[_], clazz: Class[_ <: Throwable]): Unit = {
-    try {
-      future.get()
-      fail("Expected CompletableFuture.get to return an exception")
-    } catch {
-      case e: ExecutionException =>
-        val cause = e.getCause()
-        assertTrue("Expected an exception of type " + clazz.getName + "; got type " +
-            cause.getClass().getName, clazz.isInstance(cause))
-    }
-  }
-
-  @Test
-  def testClose(): Unit = {
-    val client = AdminClient.create(createConfig())
-    client.close()
-    client.close() // double close has no effect
-  }
-
-  @Test
-  def testListNodes(): Unit = {
-    client = AdminClient.create(createConfig())
-    val brokerStrs = brokerList.split(",").toList.sorted
-    var nodeStrs: List[String] = null
-    do {
-      val nodes = client.describeCluster().nodes().get().asScala
-      nodeStrs = nodes.map ( node => s"${node.host}:${node.port}" ).toList.sorted
-    } while (nodeStrs.size < brokerStrs.size)
-    assertEquals(brokerStrs.mkString(","), nodeStrs.mkString(","))
-  }
-
-  @Test
-  def testCreateDeleteTopics(): Unit = {
-    client = AdminClient.create(createConfig())
-    val topics = Seq("mytopic", "mytopic2")
-    val newTopics = topics.map(new NewTopic(_, 1, 1))
-    client.createTopics(newTopics.asJava, new CreateTopicsOptions().validateOnly(true)).all.get()
-    waitForTopics(client, List(), topics)
-
-    client.createTopics(newTopics.asJava).all.get()
-    waitForTopics(client, topics, List())
-
-    val results = client.createTopics(newTopics.asJava).results()
-    assertTrue(results.containsKey("mytopic"))
-    assertFutureExceptionTypeEquals(results.get("mytopic"), classOf[TopicExistsException])
-    assertTrue(results.containsKey("mytopic2"))
-    assertFutureExceptionTypeEquals(results.get("mytopic2"), classOf[TopicExistsException])
-    val topicsFromDescribe = client.describeTopics(topics.asJava).all.get().asScala.keys
-    assertEquals(topics.toSet, topicsFromDescribe)
-
-    client.deleteTopics(topics.asJava).all.get()
-    waitForTopics(client, List(), topics)
-  }
-
-  /**
-    * describe should not auto create topics
-    */
-  @Test
-  def testDescribeNonExistingTopic(): Unit = {
-    client = AdminClient.create(createConfig())
-
-    val existingTopic = "existing-topic"
-    client.createTopics(Seq(existingTopic).map(new NewTopic(_, 1, 1)).asJava).all.get()
-    waitForTopics(client, Seq(existingTopic), List())
-
-    val nonExistingTopic = "non-existing"
-    val results = client.describeTopics(Seq(nonExistingTopic, existingTopic).asJava).results
-    assertEquals(existingTopic, results.get(existingTopic).get.name)
-    intercept[ExecutionException](results.get(nonExistingTopic).get).getCause.isInstanceOf[UnknownTopicOrPartitionException]
-    assertEquals(None, zkUtils.getTopicPartitionCount(nonExistingTopic))
-  }
-
-  @Test
-  def testGetAllBrokerVersionsAndDescribeCluster(): Unit = {
-    client = AdminClient.create(createConfig())
-    val nodes = client.describeCluster().nodes().get()
-    val clusterId = client.describeCluster().clusterId().get()
-    assertEquals(servers.head.apis.clusterId, clusterId)
-    val controller = client.describeCluster().controller().get()
-    assertEquals(servers.head.apis.metadataCache.getControllerId.
-      getOrElse(MetadataResponse.NO_CONTROLLER_ID), controller.id())
-    val nodesToVersions = client.apiVersions(nodes).all().get()
-    val brokers = brokerList.split(",")
-    assert(brokers.size == nodesToVersions.size())
-    for ((node, brokerVersionInfo) <- nodesToVersions.asScala) {
-      val hostStr = s"${node.host}:${node.port}"
-      assertTrue(s"Unknown host:port pair $hostStr in brokerVersionInfos", brokers.contains(hostStr))
-      assertEquals(1, brokerVersionInfo.usableVersion(ApiKeys.API_VERSIONS))
-    }
-  }
-
-  @Test
-  def testDescribeAndAlterConfigs(): Unit = {
-    client = AdminClient.create(createConfig)
-
-    // Create topics
-    val topic1 = "describe-alter-configs-topic-1"
-    val topicResource1 = new ConfigResource(ConfigResource.Type.TOPIC, topic1)
-    val topicConfig1 = new Properties
-    topicConfig1.setProperty(LogConfig.MaxMessageBytesProp, "500000")
-    topicConfig1.setProperty(LogConfig.RetentionMsProp, "60000000")
-    TestUtils.createTopic(zkUtils, topic1, 1, 1, servers, topicConfig1)
-
-    val topic2 = "describe-alter-configs-topic-2"
-    val topicResource2 = new ConfigResource(ConfigResource.Type.TOPIC, topic2)
-    TestUtils.createTopic(zkUtils, topic2, 1, 1, servers, new Properties)
-
-    // Describe topics and broker
-    val brokerResource1 = new ConfigResource(ConfigResource.Type.BROKER, servers(1).config.brokerId.toString)
-    val brokerResource2 = new ConfigResource(ConfigResource.Type.BROKER, servers(2).config.brokerId.toString)
-    val configResources = Seq(topicResource1, topicResource2, brokerResource1, brokerResource2)
-    var describeResult = client.describeConfigs(configResources.asJava)
-    var configs = describeResult.all.get
-
-    assertEquals(4, configs.size)
-
-    val maxMessageBytes1 = configs.get(topicResource1).get(LogConfig.MaxMessageBytesProp)
-    assertEquals(LogConfig.MaxMessageBytesProp, maxMessageBytes1.name)
-    assertEquals(topicConfig1.get(LogConfig.MaxMessageBytesProp), maxMessageBytes1.value)
-    assertFalse(maxMessageBytes1.isDefault)
-    assertFalse(maxMessageBytes1.isSensitive)
-    assertFalse(maxMessageBytes1.isReadOnly)
-
-    assertEquals(topicConfig1.get(LogConfig.RetentionMsProp),
-      configs.get(topicResource1).get(LogConfig.RetentionMsProp).value)
-
-    val maxMessageBytes2 = configs.get(topicResource2).get(LogConfig.MaxMessageBytesProp)
-    assertEquals(Defaults.MessageMaxBytes.toString, maxMessageBytes2.value)
-    assertEquals(LogConfig.MaxMessageBytesProp, maxMessageBytes2.name)
-    assertTrue(maxMessageBytes2.isDefault)
-    assertFalse(maxMessageBytes2.isSensitive)
-    assertFalse(maxMessageBytes2.isReadOnly)
-
-    assertEquals(servers(1).config.values.size, configs.get(brokerResource1).entries.size)
-    assertEquals(servers(1).config.brokerId.toString, configs.get(brokerResource1).get(KafkaConfig.BrokerIdProp).value)
-    val listenerSecurityProtocolMap = configs.get(brokerResource1).get(KafkaConfig.ListenerSecurityProtocolMapProp)
-    assertEquals(servers(1).config.getString(KafkaConfig.ListenerSecurityProtocolMapProp), listenerSecurityProtocolMap.value)
-    assertEquals(KafkaConfig.ListenerSecurityProtocolMapProp, listenerSecurityProtocolMap.name)
-    assertFalse(listenerSecurityProtocolMap.isDefault)
-    assertFalse(listenerSecurityProtocolMap.isSensitive)
-    assertTrue(listenerSecurityProtocolMap.isReadOnly)
-    val truststorePassword = configs.get(brokerResource1).get(KafkaConfig.SslTruststorePasswordProp)
-    assertEquals(KafkaConfig.SslTruststorePasswordProp, truststorePassword.name)
-    assertNull(truststorePassword.value)
-    assertFalse(truststorePassword.isDefault)
-    assertTrue(truststorePassword.isSensitive)
-    assertTrue(truststorePassword.isReadOnly)
-    val compressionType = configs.get(brokerResource1).get(KafkaConfig.CompressionTypeProp)
-    assertEquals(servers(1).config.compressionType.toString, compressionType.value)
-    assertEquals(KafkaConfig.CompressionTypeProp, compressionType.name)
-    assertTrue(compressionType.isDefault)
-    assertFalse(compressionType.isSensitive)
-    assertTrue(compressionType.isReadOnly)
-
-    assertEquals(servers(2).config.values.size, configs.get(brokerResource2).entries.size)
-    assertEquals(servers(2).config.brokerId.toString, configs.get(brokerResource2).get(KafkaConfig.BrokerIdProp).value)
-    assertEquals(servers(2).config.logCleanerThreads.toString,
-      configs.get(brokerResource2).get(KafkaConfig.LogCleanerThreadsProp).value)
-
-    // Alter topics
-    var topicConfigEntries1 = Seq(
-      new ConfigEntry(LogConfig.FlushMsProp, "1000")
-    ).asJava
-
-    var topicConfigEntries2 = Seq(
-      new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "0.9"),
-      new ConfigEntry(LogConfig.CompressionTypeProp, "lz4")
-    ).asJava
-
-    var alterResult = client.alterConfigs(Map(
-      topicResource1 -> new Config(topicConfigEntries1),
-      topicResource2 -> new Config(topicConfigEntries2)
-    ).asJava)
-
-    assertEquals(Set(topicResource1, topicResource2).asJava, alterResult.results.keySet)
-    alterResult.all.get
-
-    // Verify that topics were updated correctly
-    describeResult = client.describeConfigs(Seq(topicResource1, topicResource2).asJava)
-    configs = describeResult.all.get
-
-    assertEquals(2, configs.size)
-
-    assertEquals("1000", configs.get(topicResource1).get(LogConfig.FlushMsProp).value)
-    assertEquals(Defaults.MessageMaxBytes.toString,
-      configs.get(topicResource1).get(LogConfig.MaxMessageBytesProp).value)
-    assertEquals((Defaults.LogRetentionHours * 60 * 60 * 1000).toString,
-      configs.get(topicResource1).get(LogConfig.RetentionMsProp).value)
-
-    assertEquals("0.9", configs.get(topicResource2).get(LogConfig.MinCleanableDirtyRatioProp).value)
-    assertEquals("lz4", configs.get(topicResource2).get(LogConfig.CompressionTypeProp).value)
-
-    // Alter topics with validateOnly=true
-    topicConfigEntries1 = Seq(
-      new ConfigEntry(LogConfig.MaxMessageBytesProp, "10")
-    ).asJava
-
-    topicConfigEntries2 = Seq(
-      new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "0.3")
-    ).asJava
-
-    alterResult = client.alterConfigs(Map(
-      topicResource1 -> new Config(topicConfigEntries1),
-      topicResource2 -> new Config(topicConfigEntries2)
-    ).asJava, new AlterConfigsOptions().validateOnly(true))
-
-    assertEquals(Set(topicResource1, topicResource2).asJava, alterResult.results.keySet)
-    alterResult.all.get
-
-    // Verify that topics were not updated due to validateOnly = true
-    describeResult = client.describeConfigs(Seq(topicResource1, topicResource2).asJava)
-    configs = describeResult.all.get
-
-    assertEquals(2, configs.size)
-
-    assertEquals(Defaults.MessageMaxBytes.toString,
-      configs.get(topicResource1).get(LogConfig.MaxMessageBytesProp).value)
-    assertEquals("0.9", configs.get(topicResource2).get(LogConfig.MinCleanableDirtyRatioProp).value)
-  }
-
-  @Test
-  def testInvalidAlterConfigs(): Unit = {
-    client = AdminClient.create(createConfig)
-
-    // Create topics
-    val topic1 = "invalid-alter-configs-topic-1"
-    val topicResource1 = new ConfigResource(ConfigResource.Type.TOPIC, topic1)
-    TestUtils.createTopic(zkUtils, topic1, 1, 1, servers, new Properties())
-
-    val topic2 = "invalid-alter-configs-topic-2"
-    val topicResource2 = new ConfigResource(ConfigResource.Type.TOPIC, topic2)
-    TestUtils.createTopic(zkUtils, topic2, 1, 1, servers, new Properties)
-
-    val topicConfigEntries1 = Seq(
-      new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "1.1"), // this value is invalid as it's above 1.0
-      new ConfigEntry(LogConfig.CompressionTypeProp, "lz4")
-    ).asJava
-
-    var topicConfigEntries2 = Seq(new ConfigEntry(LogConfig.CompressionTypeProp, "snappy")).asJava
-
-    val brokerResource = new ConfigResource(ConfigResource.Type.BROKER, servers.head.config.brokerId.toString)
-    val brokerConfigEntries = Seq(new ConfigEntry(KafkaConfig.CompressionTypeProp, "gzip")).asJava
-
-    // Alter configs: first and third are invalid, second is valid
-    var alterResult = client.alterConfigs(Map(
-      topicResource1 -> new Config(topicConfigEntries1),
-      topicResource2 -> new Config(topicConfigEntries2),
-      brokerResource -> new Config(brokerConfigEntries)
-    ).asJava)
-
-    assertEquals(Set(topicResource1, topicResource2, brokerResource).asJava, alterResult.results.keySet)
-    assertTrue(intercept[ExecutionException](alterResult.results.get(topicResource1).get).getCause.isInstanceOf[InvalidRequestException])
-    alterResult.results.get(topicResource2).get
-    assertTrue(intercept[ExecutionException](alterResult.results.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException])
-
-    // Verify that first and third resources were not updated and second was updated
-    var describeResult = client.describeConfigs(Seq(topicResource1, topicResource2, brokerResource).asJava)
-    var configs = describeResult.all.get
-    assertEquals(3, configs.size)
-
-    assertEquals(Defaults.LogCleanerMinCleanRatio.toString,
-      configs.get(topicResource1).get(LogConfig.MinCleanableDirtyRatioProp).value)
-    assertEquals(Defaults.CompressionType.toString,
-      configs.get(topicResource1).get(LogConfig.CompressionTypeProp).value)
-
-    assertEquals("snappy", configs.get(topicResource2).get(LogConfig.CompressionTypeProp).value)
-
-    assertEquals(Defaults.CompressionType.toString, configs.get(brokerResource).get(LogConfig.CompressionTypeProp).value)
-
-    // Alter configs with validateOnly = true: first and third are invalid, second is valid
-    topicConfigEntries2 = Seq(new ConfigEntry(LogConfig.CompressionTypeProp, "gzip")).asJava
-
-    alterResult = client.alterConfigs(Map(
-      topicResource1 -> new Config(topicConfigEntries1),
-      topicResource2 -> new Config(topicConfigEntries2),
-      brokerResource -> new Config(brokerConfigEntries)
-    ).asJava, new AlterConfigsOptions().validateOnly(true))
-
-    assertEquals(Set(topicResource1, topicResource2, brokerResource).asJava, alterResult.results.keySet)
-    assertTrue(intercept[ExecutionException](alterResult.results.get(topicResource1).get).getCause.isInstanceOf[InvalidRequestException])
-    alterResult.results.get(topicResource2).get
-    assertTrue(intercept[ExecutionException](alterResult.results.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException])
-
-    // Verify that no resources are updated since validateOnly = true
-    describeResult = client.describeConfigs(Seq(topicResource1, topicResource2, brokerResource).asJava)
-    configs = describeResult.all.get
-    assertEquals(3, configs.size)
-
-    assertEquals(Defaults.LogCleanerMinCleanRatio.toString,
-      configs.get(topicResource1).get(LogConfig.MinCleanableDirtyRatioProp).value)
-    assertEquals(Defaults.CompressionType.toString,
-      configs.get(topicResource1).get(LogConfig.CompressionTypeProp).value)
-
-    assertEquals("snappy", configs.get(topicResource2).get(LogConfig.CompressionTypeProp).value)
-
-    assertEquals(Defaults.CompressionType.toString, configs.get(brokerResource).get(LogConfig.CompressionTypeProp).value)
-  }
-
-  val ACL1 = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic3"),
-      new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW))
-
-  /**
-   * Test that ACL operations are not possible when the authorizer is disabled.
-   * Also see {@link kafka.api.SaslSslAdminClientIntegrationTest} for tests of ACL operations
-   * when the authorizer is enabled.
-   */
-  @Test
-  def testAclOperations(): Unit = {
-    client = AdminClient.create(createConfig())
-    assertFutureExceptionTypeEquals(client.describeAcls(AclBindingFilter.ANY).all(), classOf[SecurityDisabledException])
-    assertFutureExceptionTypeEquals(client.createAcls(Collections.singleton(ACL1)).all(),
-        classOf[SecurityDisabledException])
-    assertFutureExceptionTypeEquals(client.deleteAcls(Collections.singleton(ACL1.toFilter())).all(),
-      classOf[SecurityDisabledException])
-    client.close()
-  }
-
-  /**
-    * Test closing the AdminClient with a generous timeout.  Calls in progress should be completed,
-    * since they can be done within the timeout.  New calls should receive timeouts.
-    */
-  @Test
-  def testDelayedClose(): Unit = {
-    client = AdminClient.create(createConfig())
-    val topics = Seq("mytopic", "mytopic2")
-    val newTopics = topics.map(new NewTopic(_, 1, 1))
-    val future = client.createTopics(newTopics.asJava, new CreateTopicsOptions().validateOnly(true)).all()
-    client.close(2, TimeUnit.HOURS)
-    val future2 = client.createTopics(newTopics.asJava, new CreateTopicsOptions().validateOnly(true)).all()
-    assertFutureExceptionTypeEquals(future2, classOf[TimeoutException])
-    future.get
-    client.close(30, TimeUnit.MINUTES) // multiple close-with-timeout should have no effect
-  }
-
-  /**
-    * Test closing the AdminClient with a timeout of 0, when there are calls with extremely long
-    * timeouts in progress.  The calls should be aborted after the hard shutdown timeout elapses.
-    */
-  @Test
-  def testForceClose(): Unit = {
-    val config = createConfig()
-    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:22")
-    client = AdminClient.create(config)
-    // Because the bootstrap servers are set up incorrectly, this call will not complete, but must be
-    // cancelled by the close operation.
-    val future = client.createTopics(Seq("mytopic", "mytopic2").map(new NewTopic(_, 1, 1)).asJava,
-      new CreateTopicsOptions().timeoutMs(900000)).all()
-    client.close(0, TimeUnit.MILLISECONDS)
-    assertFutureExceptionTypeEquals(future, classOf[TimeoutException])
-  }
-
-  /**
-    * Check that a call with a timeout does not complete before the minimum timeout has elapsed,
-    * even when the default request timeout is shorter.
-    */
-  @Test
-  def testMinimumRequestTimeouts(): Unit = {
-    val config = createConfig()
-    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:22")
-    config.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "0")
-    client = AdminClient.create(config)
-    val startTimeMs = Time.SYSTEM.milliseconds()
-    val future = client.createTopics(Seq("mytopic", "mytopic2").map(new NewTopic(_, 1, 1)).asJava,
-      new CreateTopicsOptions().timeoutMs(2)).all()
-    assertFutureExceptionTypeEquals(future, classOf[TimeoutException])
-    val endTimeMs = Time.SYSTEM.milliseconds()
-    assertTrue("Expected the timeout to take at least one millisecond.", endTimeMs > startTimeMs);
-    client.close()
-  }
-
-  override def generateConfigs() = {
-    val cfgs = TestUtils.createBrokerConfigs(brokerCount, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol),
-      trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties)
-    cfgs.foreach { config =>
-      config.setProperty(KafkaConfig.ListenersProp, s"${listenerName.value}://localhost:${TestUtils.RandomPort}")
-      config.remove(KafkaConfig.InterBrokerSecurityProtocolProp)
-      config.setProperty(KafkaConfig.InterBrokerListenerNameProp, listenerName.value)
-      config.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, s"${listenerName.value}:${securityProtocol.name}")
-      config.setProperty(KafkaConfig.DeleteTopicEnableProp, "true")
-      // We set this in order to test that we don't expose sensitive data via describe configs. This will already be
-      // set for subclasses with security enabled and we don't want to overwrite it.
-      if (!config.containsKey(KafkaConfig.SslTruststorePasswordProp))
-        config.setProperty(KafkaConfig.SslTruststorePasswordProp, "some.invalid.pass")
-    }
-    cfgs.foreach(_.putAll(serverConfig))
-    cfgs.map(KafkaConfig.fromProps)
-  }
-}

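For context on the headline change these tests accompany: this commit introduces AlterConfigPolicy (KIP-133), which the broker consults before applying any alterConfigs update. Below is a minimal, illustrative sketch of a custom policy -- the class name MinMessageSizePolicy and the 1024-byte floor are inventions for this example; the interface, its RequestMetadata argument, and the alter.config.policy.class.name broker property are the pieces defined by KIP-133:

    import java.util

    import org.apache.kafka.common.config.ConfigResource
    import org.apache.kafka.common.errors.PolicyViolationException
    import org.apache.kafka.server.policy.AlterConfigPolicy

    // Illustrative only: rejects attempts to set a topic's max.message.bytes
    // below an arbitrary 1024-byte floor.
    class MinMessageSizePolicy extends AlterConfigPolicy {

      override def configure(configs: util.Map[String, _]): Unit = ()

      override def validate(requestMetadata: AlterConfigPolicy.RequestMetadata): Unit = {
        if (requestMetadata.resource.`type` == ConfigResource.Type.TOPIC) {
          // configs() carries only the entries being altered, not the topic's full config.
          val value = requestMetadata.configs.get("max.message.bytes")
          if (value != null && value.toInt < 1024)
            throw new PolicyViolationException(s"max.message.bytes must be >= 1024, got $value")
        }
      }

      override def close(): Unit = ()
    }

A broker started with alter.config.policy.class.name set to such a class fails the offending alterConfigs futures with PolicyViolationException; that is the sort of interaction the new AdminClientWithPoliciesIntegrationTest exercises.
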
http://git-wip-us.apache.org/repos/asf/kafka/blob/8ebe4838/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
index d27b0bf..9cd86c3 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
@@ -27,7 +27,7 @@ import org.junit.{After, Assert, Before, Test}
 
 import scala.collection.JavaConverters._
 
-class SaslSslAdminClientIntegrationTest extends KafkaAdminClientIntegrationTest with SaslSetup {
+class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with SaslSetup {
   this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
   this.serverConfig.setProperty(KafkaConfig.AuthorizerClassNameProp, classOf[SimpleAclAuthorizer].getName())
   this.serverConfig.setProperty(SimpleAclAuthorizer.AllowEveryoneIfNoAclIsFoundProp, "true")


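And a hypothetical client-side view of a policy rejection (the bootstrap address and topic name are placeholders, and the broker is assumed to run a rejecting policy such as the sketch above): the per-resource future fails the same way as the invalid-config cases tested earlier, with the policy's exception as the cause.

    import java.util.{Collections, Properties}

    import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, Config, ConfigEntry}
    import org.apache.kafka.common.config.ConfigResource

    object PolicyRejectionExample extends App {
      val props = new Properties
      props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder
      val adminClient = AdminClient.create(props)
      try {
        val resource = new ConfigResource(ConfigResource.Type.TOPIC, "some-topic") // placeholder
        val update = new Config(Collections.singleton(new ConfigEntry("max.message.bytes", "100")))
        // With a rejecting policy installed on the broker, get throws an
        // ExecutionException whose cause is a PolicyViolationException.
        adminClient.alterConfigs(Collections.singletonMap(resource, update)).all.get
      } finally adminClient.close()
    }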