kafka-commits mailing list archives

From ij...@apache.org
Subject [1/3] kafka git commit: KAFKA-3267; Describe and Alter Configs Admin APIs (KIP-133)
Date Thu, 18 May 2017 05:51:22 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk e1abf1770 -> 972b75453
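
For orientation, the client-side API this commit introduces (KIP-133) can be used roughly as follows. This is a minimal sketch rather than part of the patch: it assumes a broker at localhost:9092 and a topic named "my-topic", and the imports mirror the integration tests below (ConfigResource and related classes have moved packages in later Kafka versions).

import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.admin._

object DescribeAlterConfigsSketch extends App {
  val props = new Properties
  props.put("bootstrap.servers", "localhost:9092") // assumption: a local broker
  val client = AdminClient.create(props)
  try {
    val topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic")
    // DescribeConfigs: effective config per resource, with isDefault/isSensitive/isReadOnly flags
    val configs = client.describeConfigs(Seq(topic).asJava).all.get
    configs.get(topic).entries.asScala.foreach(e => println(s"${e.name}=${e.value} (default=${e.isDefault})"))
    // AlterConfigs with validateOnly(true): the broker validates the update without applying it
    val update = new Config(Seq(new ConfigEntry("flush.ms", "1000")).asJava)
    client.alterConfigs(Map(topic -> update).asJava, new AlterConfigsOptions().validateOnly(true)).all.get
  } finally client.close()
}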


http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 9eb1275..a362577 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -28,7 +28,7 @@ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, Produce
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.Topic.GROUP_METADATA_TOPIC_NAME
 import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol}
-import org.apache.kafka.common.requests._
+import org.apache.kafka.common.requests.{Resource => RResource, ResourceType => RResourceType, _}
 import CreateTopicsRequest.TopicDetails
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.{Node, TopicPartition, requests}
@@ -40,6 +40,7 @@ import scala.collection.mutable
 import scala.collection.mutable.Buffer
 import org.apache.kafka.common.KafkaException
 import kafka.admin.AdminUtils
+import kafka.log.LogConfig
 import kafka.network.SocketServer
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch, SimpleRecord}
@@ -65,13 +66,15 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   val deleteTopicResource = new Resource(Topic, deleteTopic)
   val producerTransactionalIdResource = new Resource(ProducerTransactionalId, transactionalId)
 
-  val GroupReadAcl = Map(groupResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)))
-  val ClusterAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, ClusterAction)))
-  val ClusterCreateAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Create)))
-  val TopicReadAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)))
-  val TopicWriteAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)))
-  val TopicDescribeAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)))
-  val TopicDeleteAcl = Map(deleteTopicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Delete)))
+  val groupReadAcl = Map(groupResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)))
+  val clusterAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, ClusterAction)))
+  val clusterCreateAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Create)))
+  val topicReadAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)))
+  val topicWriteAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)))
+  val topicDescribeAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)))
+  val topicDeleteAcl = Map(deleteTopicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Delete)))
+  val topicDescribeConfigsAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, DescribeConfigs)))
+  val topicAlterConfigsAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, AlterConfigs)))
   val producerTransactionalIdWriteAcl = Map(producerTransactionalIdResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)))
 
   val consumers = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
@@ -92,7 +95,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     properties.put(KafkaConfig.TransactionsTopicMinISRProp, "1")
   }
 
-  val RequestKeyToResponseDeserializer: Map[ApiKeys, Class[_ <: Any]] =
+  val requestKeyToResponseDeserializer: Map[ApiKeys, Class[_ <: Any]] =
     Map(ApiKeys.METADATA -> classOf[requests.MetadataResponse],
       ApiKeys.PRODUCE -> classOf[requests.ProduceResponse],
       ApiKeys.FETCH -> classOf[requests.FetchResponse],
@@ -110,15 +113,17 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       ApiKeys.CONTROLLED_SHUTDOWN_KEY -> classOf[requests.ControlledShutdownResponse],
       ApiKeys.CREATE_TOPICS -> classOf[CreateTopicsResponse],
       ApiKeys.DELETE_TOPICS -> classOf[requests.DeleteTopicsResponse],
-      ApiKeys.OFFSET_FOR_LEADER_EPOCH -> classOf[OffsetsForLeaderEpochResponse]
+      ApiKeys.OFFSET_FOR_LEADER_EPOCH -> classOf[OffsetsForLeaderEpochResponse],
+      ApiKeys.DESCRIBE_CONFIGS -> classOf[DescribeConfigsResponse],
+      ApiKeys.ALTER_CONFIGS -> classOf[AlterConfigsResponse]
   )
 
-  val RequestKeyToError = Map[ApiKeys, (Nothing) => Errors](
-    ApiKeys.METADATA -> ((resp: requests.MetadataResponse) => resp.errors().asScala.find(_._1 == topic).getOrElse(("test", Errors.NONE))._2),
-    ApiKeys.PRODUCE -> ((resp: requests.ProduceResponse) => resp.responses().asScala.find(_._1 == tp).get._2.error),
-    ApiKeys.FETCH -> ((resp: requests.FetchResponse) => resp.responseData().asScala.find(_._1 == tp).get._2.error),
-    ApiKeys.LIST_OFFSETS -> ((resp: requests.ListOffsetResponse) => resp.responseData().asScala.find(_._1 == tp).get._2.error),
-    ApiKeys.OFFSET_COMMIT -> ((resp: requests.OffsetCommitResponse) => resp.responseData().asScala.find(_._1 == tp).get._2),
+  val requestKeyToError = Map[ApiKeys, Nothing => Errors](
+    ApiKeys.METADATA -> ((resp: requests.MetadataResponse) => resp.errors.asScala.find(_._1 == topic).getOrElse(("test", Errors.NONE))._2),
+    ApiKeys.PRODUCE -> ((resp: requests.ProduceResponse) => resp.responses.asScala.find(_._1 == tp).get._2.error),
+    ApiKeys.FETCH -> ((resp: requests.FetchResponse) => resp.responseData.asScala.find(_._1 == tp).get._2.error),
+    ApiKeys.LIST_OFFSETS -> ((resp: requests.ListOffsetResponse) => resp.responseData.asScala.find(_._1 == tp).get._2.error),
+    ApiKeys.OFFSET_COMMIT -> ((resp: requests.OffsetCommitResponse) => resp.responseData.asScala.find(_._1 == tp).get._2),
     ApiKeys.OFFSET_FETCH -> ((resp: requests.OffsetFetchResponse) => resp.error),
     ApiKeys.FIND_COORDINATOR -> ((resp: FindCoordinatorResponse) => resp.error),
     ApiKeys.UPDATE_METADATA_KEY -> ((resp: requests.UpdateMetadataResponse) => resp.error),
@@ -126,33 +131,39 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     ApiKeys.SYNC_GROUP -> ((resp: SyncGroupResponse) => resp.error),
     ApiKeys.HEARTBEAT -> ((resp: HeartbeatResponse) => resp.error),
     ApiKeys.LEAVE_GROUP -> ((resp: LeaveGroupResponse) => resp.error),
-    ApiKeys.LEADER_AND_ISR -> ((resp: requests.LeaderAndIsrResponse) => resp.responses().asScala.find(_._1 == tp).get._2),
-    ApiKeys.STOP_REPLICA -> ((resp: requests.StopReplicaResponse) => resp.responses().asScala.find(_._1 == tp).get._2),
+    ApiKeys.LEADER_AND_ISR -> ((resp: requests.LeaderAndIsrResponse) => resp.responses.asScala.find(_._1 == tp).get._2),
+    ApiKeys.STOP_REPLICA -> ((resp: requests.StopReplicaResponse) => resp.responses.asScala.find(_._1 == tp).get._2),
     ApiKeys.CONTROLLED_SHUTDOWN_KEY -> ((resp: requests.ControlledShutdownResponse) => resp.error),
-    ApiKeys.CREATE_TOPICS -> ((resp: CreateTopicsResponse) => resp.errors().asScala.find(_._1 == createTopic).get._2.error),
-    ApiKeys.DELETE_TOPICS -> ((resp: requests.DeleteTopicsResponse) => resp.errors().asScala.find(_._1 == deleteTopic).get._2),
-    ApiKeys.OFFSET_FOR_LEADER_EPOCH -> ((resp: OffsetsForLeaderEpochResponse) => resp.responses.asScala.get(tp).get.error())
+    ApiKeys.CREATE_TOPICS -> ((resp: CreateTopicsResponse) => resp.errors.asScala.find(_._1 == createTopic).get._2.error),
+    ApiKeys.DELETE_TOPICS -> ((resp: requests.DeleteTopicsResponse) => resp.errors.asScala.find(_._1 == deleteTopic).get._2),
+    ApiKeys.OFFSET_FOR_LEADER_EPOCH -> ((resp: OffsetsForLeaderEpochResponse) => resp.responses.get(tp).error),
+    ApiKeys.DESCRIBE_CONFIGS -> ((resp: DescribeConfigsResponse) =>
+      resp.configs.get(new RResource(RResourceType.TOPIC, tp.topic)).error.error),
+    ApiKeys.ALTER_CONFIGS -> ((resp: AlterConfigsResponse) =>
+      resp.errors.get(new RResource(RResourceType.TOPIC, tp.topic)).error)
   )
 
-  val RequestKeysToAcls = Map[ApiKeys, Map[Resource, Set[Acl]]](
-    ApiKeys.METADATA -> TopicDescribeAcl,
-    ApiKeys.PRODUCE -> TopicWriteAcl,
-    ApiKeys.FETCH -> TopicReadAcl,
-    ApiKeys.LIST_OFFSETS -> TopicDescribeAcl,
-    ApiKeys.OFFSET_COMMIT -> (TopicReadAcl ++ GroupReadAcl),
-    ApiKeys.OFFSET_FETCH -> (TopicReadAcl ++ GroupReadAcl),
-    ApiKeys.FIND_COORDINATOR -> (TopicReadAcl ++ GroupReadAcl),
-    ApiKeys.UPDATE_METADATA_KEY -> ClusterAcl,
-    ApiKeys.JOIN_GROUP -> GroupReadAcl,
-    ApiKeys.SYNC_GROUP -> GroupReadAcl,
-    ApiKeys.HEARTBEAT -> GroupReadAcl,
-    ApiKeys.LEAVE_GROUP -> GroupReadAcl,
-    ApiKeys.LEADER_AND_ISR -> ClusterAcl,
-    ApiKeys.STOP_REPLICA -> ClusterAcl,
-    ApiKeys.CONTROLLED_SHUTDOWN_KEY -> ClusterAcl,
-    ApiKeys.CREATE_TOPICS -> ClusterCreateAcl,
-    ApiKeys.DELETE_TOPICS -> TopicDeleteAcl,
-    ApiKeys.OFFSET_FOR_LEADER_EPOCH -> ClusterAcl
+  val requestKeysToAcls = Map[ApiKeys, Map[Resource, Set[Acl]]](
+    ApiKeys.METADATA -> topicDescribeAcl,
+    ApiKeys.PRODUCE -> topicWriteAcl,
+    ApiKeys.FETCH -> topicReadAcl,
+    ApiKeys.LIST_OFFSETS -> topicDescribeAcl,
+    ApiKeys.OFFSET_COMMIT -> (topicReadAcl ++ groupReadAcl),
+    ApiKeys.OFFSET_FETCH -> (topicReadAcl ++ groupReadAcl),
+    ApiKeys.FIND_COORDINATOR -> (topicReadAcl ++ groupReadAcl),
+    ApiKeys.UPDATE_METADATA_KEY -> clusterAcl,
+    ApiKeys.JOIN_GROUP -> groupReadAcl,
+    ApiKeys.SYNC_GROUP -> groupReadAcl,
+    ApiKeys.HEARTBEAT -> groupReadAcl,
+    ApiKeys.LEAVE_GROUP -> groupReadAcl,
+    ApiKeys.LEADER_AND_ISR -> clusterAcl,
+    ApiKeys.STOP_REPLICA -> clusterAcl,
+    ApiKeys.CONTROLLED_SHUTDOWN_KEY -> clusterAcl,
+    ApiKeys.CREATE_TOPICS -> clusterCreateAcl,
+    ApiKeys.DELETE_TOPICS -> topicDeleteAcl,
+    ApiKeys.OFFSET_FOR_LEADER_EPOCH -> clusterAcl,
+    ApiKeys.DESCRIBE_CONFIGS -> topicDescribeConfigsAcl,
+    ApiKeys.ALTER_CONFIGS -> topicAlterConfigsAcl
   )
 
   @Before
@@ -221,8 +232,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   }
 
   private def offsetsForLeaderEpochRequest = {
-  new OffsetsForLeaderEpochRequest.Builder().add(tp, 7).build()
-}
+    new OffsetsForLeaderEpochRequest.Builder().add(tp, 7).build()
+  }
 
   private def createOffsetFetchRequest = {
     new requests.OffsetFetchRequest.Builder(group, List(tp).asJava).build()
@@ -289,6 +300,17 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     new DeleteTopicsRequest.Builder(Set(deleteTopic).asJava, 5000).build()
   }
 
+  private def createDescribeConfigsRequest =
+    new DescribeConfigsRequest.Builder(Collections.singleton(new RResource(RResourceType.TOPIC, tp.topic))).build()
+
+  private def createAlterConfigsRequest =
+    new AlterConfigsRequest.Builder(
+      Collections.singletonMap(new RResource(RResourceType.TOPIC, tp.topic),
+        new AlterConfigsRequest.Config(Collections.singleton(
+          new AlterConfigsRequest.ConfigEntry(LogConfig.MaxMessageBytesProp, "1000000")
+        ))), true).build()
+
+
   @Test
   def testAuthorizationWithTopicExisting() {
     val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest](
@@ -309,17 +331,19 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       ApiKeys.CONTROLLED_SHUTDOWN_KEY -> createControlledShutdownRequest,
       ApiKeys.CREATE_TOPICS -> createTopicsRequest,
       ApiKeys.DELETE_TOPICS -> deleteTopicsRequest,
-      ApiKeys.OFFSET_FOR_LEADER_EPOCH -> offsetsForLeaderEpochRequest
+      ApiKeys.OFFSET_FOR_LEADER_EPOCH -> offsetsForLeaderEpochRequest,
+      ApiKeys.DESCRIBE_CONFIGS -> createDescribeConfigsRequest,
+      ApiKeys.ALTER_CONFIGS -> createAlterConfigsRequest
     )
 
     for ((key, request) <- requestKeyToRequest) {
       removeAllAcls
-      val resources = RequestKeysToAcls(key).map(_._1.resourceType).toSet
+      val resources = requestKeysToAcls(key).map(_._1.resourceType).toSet
       sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = false, isAuthorizedTopicDescribe = false)
 
-      val resourceToAcls = RequestKeysToAcls(key)
+      val resourceToAcls = requestKeysToAcls(key)
       resourceToAcls.get(topicResource).map { acls =>
-        val describeAcls = TopicDescribeAcl(topicResource)
+        val describeAcls = topicDescribeAcl(topicResource)
         val isAuthorized =  describeAcls == acls
         addAndVerifyAcls(describeAcls, topicResource)
         sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = isAuthorized, isAuthorizedTopicDescribe = true)
@@ -353,12 +377,12 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
     for ((key, request) <- requestKeyToRequest) {
       removeAllAcls
-      val resources = RequestKeysToAcls(key).map(_._1.resourceType).toSet
+      val resources = requestKeysToAcls(key).map(_._1.resourceType).toSet
       sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = false, isAuthorizedTopicDescribe = false, topicExists = false)
 
-      val resourceToAcls = RequestKeysToAcls(key)
+      val resourceToAcls = requestKeysToAcls(key)
       resourceToAcls.get(topicResource).map { acls =>
-        val describeAcls = TopicDescribeAcl(topicResource)
+        val describeAcls = topicDescribeAcl(topicResource)
         val isAuthorized =  describeAcls == acls
         addAndVerifyAcls(describeAcls, topicResource)
         sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = isAuthorized, isAuthorizedTopicDescribe = true, topicExists = false)
@@ -427,7 +451,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     sendRecords(numRecords, topicPartition)
   }
 
-  @Test(expected = classOf[AuthorizationException])
+  @Test(expected = classOf[GroupAuthorizationException])
   def testConsumeWithNoAccess(): Unit = {
     addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
     sendRecords(1, tp)
@@ -666,8 +690,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     val topicPartition = new TopicPartition(newTopic, 0)
     val newTopicResource = new Resource(Topic, newTopic)
     addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), newTopicResource)
-    addAndVerifyAcls(GroupReadAcl(groupResource), groupResource)
-    addAndVerifyAcls(ClusterAcl(Resource.ClusterResource), Resource.ClusterResource)
+    addAndVerifyAcls(groupReadAcl(groupResource), groupResource)
+    addAndVerifyAcls(clusterAcl(Resource.ClusterResource), Resource.ClusterResource)
     try {
       this.consumers.head.assign(List(topicPartition).asJava)
       consumeRecords(this.consumers.head)
@@ -933,9 +957,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
                                         isAuthorizedTopicDescribe: Boolean,
                                         topicExists: Boolean = true): AbstractResponse = {
     val resp = connectAndSend(request, apiKey)
-    val response = RequestKeyToResponseDeserializer(apiKey).getMethod("parse", classOf[ByteBuffer], classOf[Short]).invoke(
+    val response = requestKeyToResponseDeserializer(apiKey).getMethod("parse", classOf[ByteBuffer], classOf[Short]).invoke(
       null, resp, request.version: java.lang.Short).asInstanceOf[AbstractResponse]
-    val error = RequestKeyToError(apiKey).asInstanceOf[(AbstractResponse) => Errors](response)
+    val error = requestKeyToError(apiKey).asInstanceOf[(AbstractResponse) => Errors](response)
 
     val authorizationErrors = resources.flatMap { resourceType =>
       if (resourceType == Topic) {
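
The new request types exercised by the test above are built directly at the wire-protocol level. A distilled sketch of that construction, using the same renamed imports as the patch ({Resource => RResource, ResourceType => RResourceType}); the topic name and object name are illustrative, and "max.message.bytes" is LogConfig.MaxMessageBytesProp:

import java.util.Collections
import org.apache.kafka.common.requests.{AlterConfigsRequest, DescribeConfigsRequest, Resource => RResource, ResourceType => RResourceType}

object ConfigsRequestSketch {
  // DescribeConfigs for a single topic resource
  val describeRequest = new DescribeConfigsRequest.Builder(
    Collections.singleton(new RResource(RResourceType.TOPIC, "my-topic"))).build()

  // AlterConfigs for the same resource; the trailing boolean is validateOnly
  val alterRequest = new AlterConfigsRequest.Builder(
    Collections.singletonMap(new RResource(RResourceType.TOPIC, "my-topic"),
      new AlterConfigsRequest.Config(Collections.singleton(
        new AlterConfigsRequest.ConfigEntry("max.message.bytes", "1000000")))),
    true).build()
}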

http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala
index f381b15..81f5c27 100644
--- a/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala
@@ -22,12 +22,13 @@ import java.util.concurrent.ExecutionException
 
 import org.apache.kafka.common.utils.Utils
 import kafka.integration.KafkaServerTestHarness
-import kafka.server.KafkaConfig
+import kafka.log.LogConfig
+import kafka.server.{Defaults, KafkaConfig}
 import org.apache.kafka.clients.admin._
 import kafka.utils.{Logging, TestUtils}
 import org.apache.kafka.clients.admin.NewTopic
 import org.apache.kafka.common.KafkaFuture
-import org.apache.kafka.common.errors.{SecurityDisabledException, TopicExistsException}
+import org.apache.kafka.common.errors.{InvalidRequestException, SecurityDisabledException, TopicExistsException}
 import org.apache.kafka.common.protocol.ApiKeys
 import org.junit.{After, Before, Rule, Test}
 import org.apache.kafka.common.requests.MetadataResponse
@@ -104,13 +105,12 @@ class KafkaAdminClientIntegrationTest extends KafkaServerTestHarness with Loggin
   def testListNodes(): Unit = {
     client = AdminClient.create(createConfig())
     val brokerStrs = brokerList.split(",").toList.sorted
-    var nodeStrs : List[String] = null
+    var nodeStrs: List[String] = null
     do {
-      var nodes = client.describeCluster().nodes().get().asScala
+      val nodes = client.describeCluster().nodes().get().asScala
       nodeStrs = nodes.map ( node => s"${node.host}:${node.port}" ).toList.sorted
     } while (nodeStrs.size < brokerStrs.size)
     assertEquals(brokerStrs.mkString(","), nodeStrs.mkString(","))
-    client.close()
   }
 
   @Test
@@ -153,7 +153,213 @@ class KafkaAdminClientIntegrationTest extends KafkaServerTestHarness with Loggin
       assertTrue(s"Unknown host:port pair $hostStr in brokerVersionInfos", brokers.contains(hostStr))
       assertEquals(1, brokerVersionInfo.usableVersion(ApiKeys.API_VERSIONS))
     }
-    client.close()
+  }
+
+  @Test
+  def testDescribeAndAlterConfigs(): Unit = {
+    client = AdminClient.create(createConfig)
+
+    // Create topics
+    val topic1 = "describe-alter-configs-topic-1"
+    val topicResource1 = new ConfigResource(ConfigResource.Type.TOPIC, topic1)
+    val topicConfig1 = new Properties
+    topicConfig1.setProperty(LogConfig.MaxMessageBytesProp, "500000")
+    topicConfig1.setProperty(LogConfig.RetentionMsProp, "60000000")
+    TestUtils.createTopic(zkUtils, topic1, 1, 1, servers, topicConfig1)
+
+    val topic2 = "describe-alter-configs-topic-2"
+    val topicResource2 = new ConfigResource(ConfigResource.Type.TOPIC, topic2)
+    TestUtils.createTopic(zkUtils, topic2, 1, 1, servers, new Properties)
+
+    // Describe topics and broker
+    val brokerResource1 = new ConfigResource(ConfigResource.Type.BROKER, servers(1).config.brokerId.toString)
+    val brokerResource2 = new ConfigResource(ConfigResource.Type.BROKER, servers(2).config.brokerId.toString)
+    val configResources = Seq(topicResource1, topicResource2, brokerResource1, brokerResource2)
+    var describeResult = client.describeConfigs(configResources.asJava)
+    var configs = describeResult.all.get
+
+    assertEquals(4, configs.size)
+
+    val maxMessageBytes1 = configs.get(topicResource1).get(LogConfig.MaxMessageBytesProp)
+    assertEquals(LogConfig.MaxMessageBytesProp, maxMessageBytes1.name)
+    assertEquals(topicConfig1.get(LogConfig.MaxMessageBytesProp), maxMessageBytes1.value)
+    assertFalse(maxMessageBytes1.isDefault)
+    assertFalse(maxMessageBytes1.isSensitive)
+    assertFalse(maxMessageBytes1.isReadOnly)
+
+    assertEquals(topicConfig1.get(LogConfig.RetentionMsProp),
+      configs.get(topicResource1).get(LogConfig.RetentionMsProp).value)
+
+    val maxMessageBytes2 = configs.get(topicResource2).get(LogConfig.MaxMessageBytesProp)
+    assertEquals(Defaults.MessageMaxBytes.toString, maxMessageBytes2.value)
+    assertEquals(LogConfig.MaxMessageBytesProp, maxMessageBytes2.name)
+    assertTrue(maxMessageBytes2.isDefault)
+    assertFalse(maxMessageBytes2.isSensitive)
+    assertFalse(maxMessageBytes2.isReadOnly)
+
+    assertEquals(servers(1).config.values.size, configs.get(brokerResource1).entries.size)
+    assertEquals(servers(1).config.brokerId.toString, configs.get(brokerResource1).get(KafkaConfig.BrokerIdProp).value)
+    val listenerSecurityProtocolMap = configs.get(brokerResource1).get(KafkaConfig.ListenerSecurityProtocolMapProp)
+    assertEquals(servers(1).config.getString(KafkaConfig.ListenerSecurityProtocolMapProp), listenerSecurityProtocolMap.value)
+    assertEquals(KafkaConfig.ListenerSecurityProtocolMapProp, listenerSecurityProtocolMap.name)
+    assertFalse(listenerSecurityProtocolMap.isDefault)
+    assertFalse(listenerSecurityProtocolMap.isSensitive)
+    assertTrue(listenerSecurityProtocolMap.isReadOnly)
+    val truststorePassword = configs.get(brokerResource1).get(KafkaConfig.SslTruststorePasswordProp)
+    assertEquals(KafkaConfig.SslTruststorePasswordProp, truststorePassword.name)
+    assertNull(truststorePassword.value)
+    assertFalse(truststorePassword.isDefault)
+    assertTrue(truststorePassword.isSensitive)
+    assertTrue(truststorePassword.isReadOnly)
+    val compressionType = configs.get(brokerResource1).get(KafkaConfig.CompressionTypeProp)
+    assertEquals(servers(1).config.compressionType.toString, compressionType.value)
+    assertEquals(KafkaConfig.CompressionTypeProp, compressionType.name)
+    assertTrue(compressionType.isDefault)
+    assertFalse(compressionType.isSensitive)
+    assertTrue(compressionType.isReadOnly)
+
+    assertEquals(servers(2).config.values.size, configs.get(brokerResource2).entries.size)
+    assertEquals(servers(2).config.brokerId.toString, configs.get(brokerResource2).get(KafkaConfig.BrokerIdProp).value)
+    assertEquals(servers(2).config.logCleanerThreads.toString,
+      configs.get(brokerResource2).get(KafkaConfig.LogCleanerThreadsProp).value)
+
+    // Alter topics
+    var topicConfigEntries1 = Seq(
+      new ConfigEntry(LogConfig.FlushMsProp, "1000")
+    ).asJava
+
+    var topicConfigEntries2 = Seq(
+      new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "0.9"),
+      new ConfigEntry(LogConfig.CompressionTypeProp, "lz4")
+    ).asJava
+
+    var alterResult = client.alterConfigs(Map(
+      topicResource1 -> new Config(topicConfigEntries1),
+      topicResource2 -> new Config(topicConfigEntries2)
+    ).asJava)
+
+    assertEquals(Set(topicResource1, topicResource2).asJava, alterResult.results.keySet)
+    alterResult.all.get
+
+    // Verify that topics were updated correctly
+    describeResult = client.describeConfigs(Seq(topicResource1, topicResource2).asJava)
+    configs = describeResult.all.get
+
+    assertEquals(2, configs.size)
+
+    assertEquals("1000", configs.get(topicResource1).get(LogConfig.FlushMsProp).value)
+    assertEquals(Defaults.MessageMaxBytes.toString,
+      configs.get(topicResource1).get(LogConfig.MaxMessageBytesProp).value)
+    assertEquals((Defaults.LogRetentionHours * 60 * 60 * 1000).toString,
+      configs.get(topicResource1).get(LogConfig.RetentionMsProp).value)
+
+    assertEquals("0.9", configs.get(topicResource2).get(LogConfig.MinCleanableDirtyRatioProp).value)
+    assertEquals("lz4", configs.get(topicResource2).get(LogConfig.CompressionTypeProp).value)
+
+    // Alter topics with validateOnly=true
+    topicConfigEntries1 = Seq(
+      new ConfigEntry(LogConfig.MaxMessageBytesProp, "10")
+    ).asJava
+
+    topicConfigEntries2 = Seq(
+      new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "0.3")
+    ).asJava
+
+    alterResult = client.alterConfigs(Map(
+      topicResource1 -> new Config(topicConfigEntries1),
+      topicResource2 -> new Config(topicConfigEntries2)
+    ).asJava, new AlterConfigsOptions().validateOnly(true))
+
+    assertEquals(Set(topicResource1, topicResource2).asJava, alterResult.results.keySet)
+    alterResult.all.get
+
+    // Verify that topics were not updated due to validateOnly = true
+    describeResult = client.describeConfigs(Seq(topicResource1, topicResource2).asJava)
+    configs = describeResult.all.get
+
+    assertEquals(2, configs.size)
+
+    assertEquals(Defaults.MessageMaxBytes.toString,
+      configs.get(topicResource1).get(LogConfig.MaxMessageBytesProp).value)
+    assertEquals("0.9", configs.get(topicResource2).get(LogConfig.MinCleanableDirtyRatioProp).value)
+  }
+
+  @Test
+  def testInvalidAlterConfigs(): Unit = {
+    client = AdminClient.create(createConfig)
+
+    // Create topics
+    val topic1 = "invalid-alter-configs-topic-1"
+    val topicResource1 = new ConfigResource(ConfigResource.Type.TOPIC, topic1)
+    TestUtils.createTopic(zkUtils, topic1, 1, 1, servers, new Properties())
+
+    val topic2 = "invalid-alter-configs-topic-2"
+    val topicResource2 = new ConfigResource(ConfigResource.Type.TOPIC, topic2)
+    TestUtils.createTopic(zkUtils, topic2, 1, 1, servers, new Properties)
+
+    val topicConfigEntries1 = Seq(
+      new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "1.1"), // this value is invalid as it's above 1.0
+      new ConfigEntry(LogConfig.CompressionTypeProp, "lz4")
+    ).asJava
+
+    var topicConfigEntries2 = Seq(new ConfigEntry(LogConfig.CompressionTypeProp, "snappy")).asJava
+
+    val brokerResource = new ConfigResource(ConfigResource.Type.BROKER, servers.head.config.brokerId.toString)
+    val brokerConfigEntries = Seq(new ConfigEntry(KafkaConfig.CompressionTypeProp, "gzip")).asJava
+
+    // Alter configs: first and third are invalid, second is valid
+    var alterResult = client.alterConfigs(Map(
+      topicResource1 -> new Config(topicConfigEntries1),
+      topicResource2 -> new Config(topicConfigEntries2),
+      brokerResource -> new Config(brokerConfigEntries)
+    ).asJava)
+
+    assertEquals(Set(topicResource1, topicResource2, brokerResource).asJava, alterResult.results.keySet)
+    assertTrue(intercept[ExecutionException](alterResult.results.get(topicResource1).get).getCause.isInstanceOf[InvalidRequestException])
+    alterResult.results.get(topicResource2).get
+    assertTrue(intercept[ExecutionException](alterResult.results.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException])
+
+    // Verify that first and third resources were not updated and second was updated
+    var describeResult = client.describeConfigs(Seq(topicResource1, topicResource2, brokerResource).asJava)
+    var configs = describeResult.all.get
+    assertEquals(3, configs.size)
+
+    assertEquals(Defaults.LogCleanerMinCleanRatio.toString,
+      configs.get(topicResource1).get(LogConfig.MinCleanableDirtyRatioProp).value)
+    assertEquals(Defaults.CompressionType.toString,
+      configs.get(topicResource1).get(LogConfig.CompressionTypeProp).value)
+
+    assertEquals("snappy", configs.get(topicResource2).get(LogConfig.CompressionTypeProp).value)
+
+    assertEquals(Defaults.CompressionType.toString, configs.get(brokerResource).get(LogConfig.CompressionTypeProp).value)
+
+    // Alter configs with validateOnly = true: first and third are invalid, second is valid
+    topicConfigEntries2 = Seq(new ConfigEntry(LogConfig.CompressionTypeProp, "gzip")).asJava
+
+    alterResult = client.alterConfigs(Map(
+      topicResource1 -> new Config(topicConfigEntries1),
+      topicResource2 -> new Config(topicConfigEntries2),
+      brokerResource -> new Config(brokerConfigEntries)
+    ).asJava, new AlterConfigsOptions().validateOnly(true))
+
+    assertEquals(Set(topicResource1, topicResource2, brokerResource).asJava, alterResult.results.keySet)
+    assertTrue(intercept[ExecutionException](alterResult.results.get(topicResource1).get).getCause.isInstanceOf[InvalidRequestException])
+    alterResult.results.get(topicResource2).get
+    assertTrue(intercept[ExecutionException](alterResult.results.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException])
+
+    // Verify that no resources are updated since validate_only = true
+    describeResult = client.describeConfigs(Seq(topicResource1, topicResource2, brokerResource).asJava)
+    configs = describeResult.all.get
+    assertEquals(3, configs.size)
+
+    assertEquals(Defaults.LogCleanerMinCleanRatio.toString,
+      configs.get(topicResource1).get(LogConfig.MinCleanableDirtyRatioProp).value)
+    assertEquals(Defaults.CompressionType.toString,
+      configs.get(topicResource1).get(LogConfig.CompressionTypeProp).value)
+
+    assertEquals("snappy", configs.get(topicResource2).get(LogConfig.CompressionTypeProp).value)
+
+    assertEquals(Defaults.CompressionType.toString, configs.get(brokerResource).get(LogConfig.CompressionTypeProp).value)
   }
 
   val ACL1 = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic3"),
@@ -183,7 +389,11 @@ class KafkaAdminClientIntegrationTest extends KafkaServerTestHarness with Loggin
       config.remove(KafkaConfig.InterBrokerSecurityProtocolProp)
       config.setProperty(KafkaConfig.InterBrokerListenerNameProp, listenerName.value)
       config.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, s"${listenerName.value}:${securityProtocol.name}")
-      config.setProperty(KafkaConfig.DeleteTopicEnableProp, "true");
+      config.setProperty(KafkaConfig.DeleteTopicEnableProp, "true")
+      // We set this in order to test that we don't expose sensitive data via describe configs. This will already be
+      // set for subclasses with security enabled and we don't want to overwrite it.
+      if (!config.containsKey(KafkaConfig.SslTruststorePasswordProp))
+        config.setProperty(KafkaConfig.SslTruststorePasswordProp, "some.invalid.pass")
     }
     cfgs.foreach(_.putAll(serverConfig))
     cfgs.map(KafkaConfig.fromProps)
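
Two behaviours pinned down by the new tests above are worth noting: alterConfigs returns an independent future per resource, so an invalid update fails only that resource's future, and validateOnly(true) validates on the broker without applying anything. A condensed sketch of the per-resource error handling, assuming an AdminClient and two ConfigResources supplied by a caller (imports follow the test file above):

import java.util.concurrent.ExecutionException
import scala.collection.JavaConverters._
import org.apache.kafka.clients.admin._
import org.apache.kafka.common.errors.InvalidRequestException

object PartialFailureSketch {
  def alterWithPartialFailure(client: AdminClient, good: ConfigResource, bad: ConfigResource): Unit = {
    val result = client.alterConfigs(Map(
      bad -> new Config(Seq(new ConfigEntry("min.cleanable.dirty.ratio", "1.1")).asJava), // invalid: above 1.0
      good -> new Config(Seq(new ConfigEntry("compression.type", "snappy")).asJava)
    ).asJava)
    // The invalid resource's future fails with InvalidRequestException...
    try result.results.get(bad).get
    catch { case e: ExecutionException => assert(e.getCause.isInstanceOf[InvalidRequestException]) }
    // ...while the valid resource's future completes normally.
    result.results.get(good).get
  }
}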

http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
index 6d89d4f..f5b0a06 100644
--- a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
@@ -35,17 +35,22 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging {
 
   private val TopicResources = Set(new Resource(Topic, "test-1"), new Resource(Topic, "test-2"))
   private val GroupResources = Set(new Resource(Group, "testGroup-1"), new Resource(Group, "testGroup-2"))
+  private val BrokerResources = Set(new Resource(Broker, "0"), new Resource(Broker, "1"))
 
   private val ResourceToCommand = Map[Set[Resource], Array[String]](
     TopicResources -> Array("--topic", "test-1", "--topic", "test-2"),
     Set(Resource.ClusterResource) -> Array("--cluster"),
-    GroupResources -> Array("--group", "testGroup-1", "--group", "testGroup-2")
+    GroupResources -> Array("--group", "testGroup-1", "--group", "testGroup-2"),
+    BrokerResources -> Array("--broker", "0", "--broker", "1")
   )
 
   private val ResourceToOperations = Map[Set[Resource], (Set[Operation], Array[String])](
-    TopicResources -> (Set(Read, Write, Describe, Delete), Array("--operation", "Read" , "--operation", "Write", "--operation", "Describe", "--operation", "Delete")),
+    TopicResources -> (Set(Read, Write, Describe, Delete, DescribeConfigs, AlterConfigs),
+      Array("--operation", "Read" , "--operation", "Write", "--operation", "Describe", "--operation", "Delete",
+        "--operation", "DescribeConfigs", "--operation", "AlterConfigs")),
     Set(Resource.ClusterResource) -> (Set(Create, ClusterAction), Array("--operation", "Create", "--operation", "ClusterAction")),
-    GroupResources -> (Set(Read, Describe), Array("--operation", "Read", "--operation", "Describe"))
+    GroupResources -> (Set(Read, Describe), Array("--operation", "Read", "--operation", "Describe")),
+    BrokerResources -> (Set(DescribeConfigs), Array("--operation", "DescribeConfigs"))
   )
 
   private val ProducerResourceToAcls = Map[Set[Resource], Set[Acl]](
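
The DescribeConfigs and AlterConfigs operations granted via the new --operation values above are ordinary kafka.security.auth ACLs; a small sketch mirroring the constructions in AuthorizerIntegrationTest earlier in this patch (the resource name and anonymous principal are illustrative):

import kafka.security.auth.{Acl, Allow, AlterConfigs, DescribeConfigs, Resource, Topic}
import org.apache.kafka.common.security.auth.KafkaPrincipal

object ConfigsAclSketch {
  val topicResource = new Resource(Topic, "test-1")
  // Allow anonymous users on any host to describe and alter this topic's configs
  val describeConfigsAcl = new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, DescribeConfigs)
  val alterConfigsAcl = new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, AlterConfigs)
}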

http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
index b82ddf9..8e6b11d 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
@@ -23,7 +23,7 @@ import kafka.network.SocketServer
 import kafka.utils.TestUtils
 import org.apache.kafka.common.protocol.types.Struct
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.requests.{CreateTopicsRequest, CreateTopicsResponse, MetadataRequest, MetadataResponse}
+import org.apache.kafka.common.requests.{ApiError, CreateTopicsRequest, CreateTopicsResponse, MetadataRequest, MetadataResponse}
 import org.junit.Assert.{assertEquals, assertFalse, assertNotNull, assertTrue}
 
 import scala.collection.JavaConverters._
@@ -79,8 +79,8 @@ class AbstractCreateTopicsRequestTest extends BaseRequestTest {
     }
   }
 
-  protected def error(error: Errors, errorMessage: Option[String] = None): CreateTopicsResponse.Error =
-    new CreateTopicsResponse.Error(error, errorMessage.orNull)
+  protected def error(error: Errors, errorMessage: Option[String] = None): ApiError =
+    new ApiError(error, errorMessage.orNull)
 
   protected def toStructWithDuplicateFirstTopic(request: CreateTopicsRequest): Struct = {
     val struct = request.toStruct
@@ -101,7 +101,7 @@ class AbstractCreateTopicsRequestTest extends BaseRequestTest {
   }
 
   protected def validateErrorCreateTopicsRequests(request: CreateTopicsRequest,
-                                                  expectedResponse: Map[String, CreateTopicsResponse.Error],
+                                                  expectedResponse: Map[String, ApiError],
                                                   checkErrorMessage: Boolean = true,
                                                  requestStruct: Option[Struct] = None): Unit = {
     val response = requestStruct.map(sendCreateTopicRequestStruct(_, request.version)).getOrElse(

http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 5cb6f71..7e50049 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -20,6 +20,7 @@ import java.util.{Collections, LinkedHashMap, Properties}
 import java.util.concurrent.{Executors, Future, TimeUnit}
 
 import kafka.admin.AdminUtils
+import kafka.log.LogConfig
 import kafka.network.RequestChannel.Session
 import kafka.security.auth._
 import kafka.utils.TestUtils
@@ -30,9 +31,8 @@ import org.apache.kafka.common.network.{Authenticator, ListenerName, TransportLa
 import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
 import org.apache.kafka.common.protocol.types.Struct
 import org.apache.kafka.common.record._
-import org.apache.kafka.common.requests
 import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation
-import org.apache.kafka.common.requests._
+import org.apache.kafka.common.requests.{Resource => RResource, ResourceType => RResourceType, _}
 import org.apache.kafka.common.security.auth.{DefaultPrincipalBuilder, KafkaPrincipal}
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
@@ -277,7 +277,17 @@ class RequestQuotaTest extends BaseRequestTest {
             new ResourceFilter(AdminResourceType.TOPIC, null),
             new AccessControlEntryFilter("User:ANONYMOUS", "*", AclOperation.ANY, AclPermissionType.DENY))))
 
-        case key =>
+        case ApiKeys.DESCRIBE_CONFIGS =>
+          new DescribeConfigsRequest.Builder(Collections.singleton(new RResource(RResourceType.TOPIC, tp.topic)))
+
+        case ApiKeys.ALTER_CONFIGS =>
+          new AlterConfigsRequest.Builder(
+            Collections.singletonMap(new RResource(RResourceType.TOPIC, tp.topic),
+              new AlterConfigsRequest.Config(Collections.singleton(
+                new AlterConfigsRequest.ConfigEntry(LogConfig.MaxMessageBytesProp, "1000000")
+              ))), true)
+
+        case _ =>
           throw new IllegalArgumentException("Unsupported API key " + apiKey)
     }
   }
@@ -366,6 +376,8 @@ class RequestQuotaTest extends BaseRequestTest {
       case ApiKeys.DESCRIBE_ACLS => new DescribeAclsResponse(response).throttleTimeMs
       case ApiKeys.CREATE_ACLS => new CreateAclsResponse(response).throttleTimeMs
       case ApiKeys.DELETE_ACLS => new DeleteAclsResponse(response).throttleTimeMs
+      case ApiKeys.DESCRIBE_CONFIGS => new DescribeConfigsResponse(response).throttleTimeMs
+      case ApiKeys.ALTER_CONFIGS => new AlterConfigsResponse(response).throttleTimeMs
       case requestId => throw new IllegalArgumentException(s"No throttle time for $requestId")
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
index 96a56b4..47ca8ae 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
@@ -38,6 +38,7 @@ import org.apache.kafka.common.network.ChannelBuilder;
 import org.apache.kafka.common.network.Selector;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ApiError;
 import org.apache.kafka.common.requests.ApiVersionsRequest;
 import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.apache.kafka.common.requests.CreateTopicsRequest;
@@ -182,7 +183,7 @@ public class StreamsKafkaClient {
         final CreateTopicsResponse createTopicsResponse =  (CreateTopicsResponse) clientResponse.responseBody();
 
         for (InternalTopicConfig internalTopicConfig : topicsMap.keySet()) {
-            CreateTopicsResponse.Error error = createTopicsResponse.errors().get(internalTopicConfig.name());
+            ApiError error = createTopicsResponse.errors().get(internalTopicConfig.name());
             if (!error.is(Errors.NONE) && !error.is(Errors.TOPIC_ALREADY_EXISTS)) {
                 throw new StreamsException("Could not create topic: " + internalTopicConfig.name() + " due to " + error.messageWithFallback());
             }

