kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject [1/3] kafka git commit: KAFKA-3266; Describe, Create and Delete ACLs Admin APIs (KIP-140)
Date Thu, 18 May 2017 02:31:02 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 249152062 -> 9815e18fe


http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index c3c37c1..bf7d4c1 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -43,15 +43,20 @@ import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME,
TRANS
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, Errors, Protocol}
-import org.apache.kafka.common.record.{ControlRecordType, EndTransactionMarker, MemoryRecords,
RecordBatch}
+import org.apache.kafka.common.record._
+import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse
+import org.apache.kafka.common.requests.DeleteAclsResponse.{AclDeletionResult, AclFilterResponse}
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.common.{Node, TopicPartition}
 import org.apache.kafka.common.requests.SaslHandshakeResponse
+import org.apache.kafka.common.security.auth.KafkaPrincipal
+import org.apache.kafka.clients.admin.{AccessControlEntry, AclBinding, AclBindingFilter,
AclOperation, AclPermissionType, Resource => AdminResource, ResourceType => AdminResourceType}
 
 import scala.collection._
 import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
 import scala.util.{Failure, Success, Try}
 
 /**
@@ -118,6 +123,9 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.END_TXN => handleEndTxnRequest(request)
         case ApiKeys.WRITE_TXN_MARKERS => handleWriteTxnMarkersRequest(request)
         case ApiKeys.TXN_OFFSET_COMMIT => handleTxnOffsetCommitRequest(request)
+        case ApiKeys.DESCRIBE_ACLS => handleDescribeAcls(request)
+        case ApiKeys.CREATE_ACLS => handleCreateAcls(request)
+        case ApiKeys.DELETE_ACLS => handleDeleteAcls(request)
       }
     } catch {
       case e: FatalExitError => throw e
@@ -1655,6 +1663,217 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
+  /**
+    * Handle a DescribeAcls request: reply with every ACL binding known to the authorizer that
+    * matches the filter carried by the request. When no authorizer is configured the reply
+    * carries a SecurityDisabledException and an empty set of bindings.
+    *
+    * @param request    The incoming request; its body is read as a DescribeAclsRequest.
+    */
+  def handleDescribeAcls(request: RequestChannel.Request): Unit = {
+    authorizeClusterAction(request)
+    val describeAclsRequest = request.body[DescribeAclsRequest]
+    authorizer match {
+      case Some(auth) =>
+        val bindingFilter = describeAclsRequest.filter()
+        val matchedBindings = new util.ArrayList[AclBinding]
+        val aclsByResource : Map[Resource, Set[Acl]] = auth.getAcls()
+        // Convert each stored (resource, acl) pair to its admin-client form and keep the matches.
+        for ((resource, acls) <- aclsByResource; acl <- acls) {
+          val adminResource = new AdminResource(
+            AdminResourceType.fromString(resource.resourceType.toString), resource.name)
+          val entry = new AccessControlEntry(acl.principal.toString, acl.host.toString,
+            acl.operation.toJava, acl.permissionType.toJava)
+          val binding = new AclBinding(adminResource, entry)
+          if (bindingFilter.matches(binding))
+            matchedBindings.add(binding)
+        }
+        def createResponse(throttleTimeMs: Int): AbstractResponse =
+          new DescribeAclsResponse(throttleTimeMs, null, matchedBindings)
+        sendResponseMaybeThrottle(request, createResponse)
+      case None =>
+        def createResponse(throttleTimeMs: Int): AbstractResponse = {
+          val disabled = new SecurityDisabledException("No Authorizer is configured on the broker.")
+          new DescribeAclsResponse(throttleTimeMs, disabled, Collections.emptySet[AclBinding])
+        }
+        sendResponseMaybeThrottle(request, createResponse)
+    }
+  }
+
+  /**
+    * Convert an ACL binding filter to a Scala (Resource, Acl) pair.
+    * The resource type, operation and permission type must each name a concrete value
+    * (UNKNOWN and ANY are rejected), and the resource type and principal must be convertible.
+    *
+    * @param filter     The binding filter as a Java object.
+    * @return           A Success holding the resource and ACL as Scala objects, or a Failure
+    *                   holding an InvalidRequestException that describes the field which could
+    *                   not be converted.
+    */
+  def toScala(filter: AclBindingFilter) : Try[(Resource, Acl)] = {
+    val resourceFilter = filter.resourceFilter()
+    val entryFilter = filter.entryFilter()
+    resourceFilter.resourceType() match {
+      case AdminResourceType.UNKNOWN => return Failure(new InvalidRequestException("Invalid UNKNOWN resource type"))
+      case AdminResourceType.ANY => return Failure(new InvalidRequestException("Invalid ANY resource type"))
+      case _ => {}
+    }
+    val resourceType: ResourceType = try {
+      ResourceType.fromString(resourceFilter.resourceType().toString)
+    } catch {
+      case _: Throwable => return Failure(new InvalidRequestException("Invalid resource type"))
+    }
+    val principal: KafkaPrincipal = try {
+      KafkaPrincipal.fromString(entryFilter.principal())
+    } catch {
+      case _: Throwable => return Failure(new InvalidRequestException("Invalid principal"))
+    }
+    entryFilter.operation() match {
+      case AclOperation.UNKNOWN => return Failure(new InvalidRequestException("Invalid UNKNOWN operation type"))
+      case AclOperation.ANY => return Failure(new InvalidRequestException("Invalid ANY operation type"))
+      case _ => {}
+    }
+    val operation = Operation.fromJava(entryFilter.operation()) match {
+      case Failure(throwable) => return Failure(new InvalidRequestException(throwable.getMessage))
+      case Success(op) => op
+    }
+    // These two cases must return the Failure; merely constructing the exception would let
+    // UNKNOWN/ANY fall through to the generic conversion below and lose the specific message.
+    entryFilter.permissionType() match {
+      case AclPermissionType.UNKNOWN => return Failure(new InvalidRequestException("Invalid UNKNOWN permission type"))
+      case AclPermissionType.ANY => return Failure(new InvalidRequestException("Invalid ANY permission type"))
+      case _ => {}
+    }
+    val permissionType = PermissionType.fromJava(entryFilter.permissionType) match {
+      case Failure(throwable) => return Failure(new InvalidRequestException(throwable.getMessage))
+      case Success(perm) => perm
+    }
+    Success((Resource(resourceType, resourceFilter.name()),
+             Acl(principal, permissionType, entryFilter.host(), operation)))
+  }
+
+  /**
+    * Build the admin-client (Java) representation of a Scala ACL binding.
+    *
+    * @param acl        The binding as a Scala (Resource, Acl) pair.
+    * @return           The equivalent AclBinding Java object.
+    */
+  def toJava(acl: (Resource, Acl)) : AclBinding = {
+    val (resource, scalaAcl) = acl
+    val adminResourceType = AdminResourceType.fromString(resource.resourceType.toString)
+    val adminResource = new AdminResource(adminResourceType, resource.name)
+    val entry = new AccessControlEntry(scalaAcl.principal.toString, scalaAcl.host.toString,
+                                       scalaAcl.operation.toJava, scalaAcl.permissionType.toJava)
+    new AclBinding(adminResource, entry)
+  }
+
+  /**
+    * Handle a CreateAcls request: try to add every requested ACL through the authorizer and
+    * reply with one AclCreationResponse per requested creation, in request order. A creation
+    * that fails validation or whose addAcls call throws is reported with the corresponding
+    * throwable; a successful creation is reported with a null error. When no authorizer is
+    * configured the reply is the request's error response for a SecurityDisabledException.
+    *
+    * @param request    The incoming request; its body is read as a CreateAclsRequest.
+    */
+  def handleCreateAcls(request: RequestChannel.Request): Unit = {
+    authorizeClusterAction(request)
+    val createAclsRequest = request.body[CreateAclsRequest]
+    authorizer match {
+      case None =>
+        def createResponse(throttleTimeMs: Int): AbstractResponse =
+          createAclsRequest.getErrorResponse(throttleTimeMs,
+            new SecurityDisabledException("No Authorizer is configured on the broker."))
+        sendResponseMaybeThrottle(request, createResponse)
+      case Some(auth) =>
+        val aclCreationResults = new java.util.ArrayList[AclCreationResponse]
+        createAclsRequest.aclCreations.asScala.foreach { creation =>
+          // None means the ACL was added; Some carries the reason this creation was rejected.
+          val failure: Option[Throwable] = toScala(creation.acl.toFilter) match {
+            case Failure(throwable) => Some(throwable)
+            case Success((resource, acl)) =>
+              try {
+                if (resource.resourceType.equals(Cluster) &&
+                    !resource.name.equals(Resource.ClusterResourceName))
+                  throw new InvalidRequestException("The only valid name for the CLUSTER resource is " +
+                      Resource.ClusterResourceName)
+                if (resource.name.isEmpty())
+                  throw new InvalidRequestException("Invalid empty resource name")
+                auth.addAcls(immutable.Set(acl), resource)
+                None
+              } catch {
+                case throwable : Throwable => Some(throwable)
+              }
+          }
+          // AclCreationResponse is given a null throwable to signal success.
+          aclCreationResults.add(new AclCreationResponse(failure.orNull))
+        }
+        def createResponse(throttleTimeMs: Int): AbstractResponse =
+          new CreateAclsResponse(throttleTimeMs, aclCreationResults)
+        sendResponseMaybeThrottle(request, createResponse)
+    }
+  }
+
+  /**
+    * Handle a DeleteAcls request: work out which ACLs each filter selects, remove them through
+    * the authorizer, and reply with one AclFilterResponse per filter, in request order.
+    *
+    * If any filter may match more than one ACL, every ACL known to the authorizer is tested
+    * against every filter. Otherwise each filter is converted directly to the single ACL it
+    * names; a filter that cannot be converted gets a response carrying the conversion error
+    * and no deletions. When no authorizer is configured the reply is the request's error
+    * response for a SecurityDisabledException.
+    *
+    * @param request    The incoming request; its body is read as a DeleteAclsRequest.
+    */
+  def handleDeleteAcls(request: RequestChannel.Request): Unit = {
+    authorizeClusterAction(request)
+    val deleteAclsRequest = request.body[DeleteAclsRequest]
+    authorizer match {
+      case None =>
+        def createResponse(throttleTimeMs: Int): AbstractResponse =
+          deleteAclsRequest.getErrorResponse(throttleTimeMs,
+            new SecurityDisabledException("No Authorizer is configured on the broker."))
+        sendResponseMaybeThrottle(request, createResponse)
+      case Some(auth) =>
+        val filters = deleteAclsRequest.filters()
+        val filterResponseMap = mutable.HashMap[Int, AclFilterResponse]()
+        val toDelete = mutable.HashMap[Int, ListBuffer[(Resource, Acl)]]()
+        for (i <- 0 until filters.size) {
+          toDelete.put(i, new ListBuffer[(Resource, Acl)]())
+        }
+        if (filters.asScala.exists { f => !f.matchesAtMostOne() }) {
+          // Delete based on filters that may match more than one ACL.
+          val aclMap : Map[Resource, Set[Acl]] = auth.getAcls()
+          for ((resource, acls) <- aclMap; acl <- acls) {
+            val binding = toJava((resource, acl))
+            for (i <- 0 until filters.size) {
+              if (filters.get(i).matches(binding)) {
+                toDelete(i) += ((resource, acl))
+              }
+            }
+          }
+        } else {
+          // Delete based on a list of ACL fixtures.
+          for (i <- 0 until filters.size) {
+            toScala(filters.get(i)) match {
+              case Failure(throwable) =>
+                // Drop the pre-seeded empty entry so the deletion pass below does not replace
+                // this error response with an empty success response.
+                toDelete.remove(i)
+                filterResponseMap.put(i,
+                  new AclFilterResponse(throwable, Collections.emptySet[AclDeletionResult]()))
+              case Success(fixture) => toDelete.put(i, ListBuffer(fixture))
+            }
+          }
+        }
+        // Process the filters in request order; indexes whose filter failed conversion have no
+        // entry in toDelete and keep the error response recorded above.
+        for (i <- 0 until filters.size) {
+          toDelete.get(i).foreach { acls =>
+            val deletionResults = new util.ArrayList[AclDeletionResult]()
+            for ((resource, acl) <- acls) {
+              try {
+                // Only ACLs that removeAcls reports as removed are listed in the response.
+                if (auth.removeAcls(immutable.Set(acl), resource)) {
+                  deletionResults.add(new AclDeletionResult(null, toJava((resource, acl))))
+                }
+              } catch {
+                case throwable: Throwable => deletionResults.add(new AclDeletionResult(
+                  new UnknownServerException("Failed to delete ACL: " + throwable.toString),
+                    toJava((resource, acl))))
+              }
+            }
+            filterResponseMap.put(i, new AclFilterResponse(null, deletionResults))
+          }
+        }
+        val filterResponses = new util.ArrayList[AclFilterResponse]
+        for (i <- 0 until filters.size) {
+          filterResponses.add(filterResponseMap.getOrElse(i,
+            new AclFilterResponse(null, new util.ArrayList[AclDeletionResult]())))
+        }
+        def createResponse(throttleTimeMs: Int): AbstractResponse =
+          new DeleteAclsResponse(throttleTimeMs, filterResponses)
+        sendResponseMaybeThrottle(request, createResponse)
+    }
+  }
+
   def handleOffsetForLeaderEpochRequest(request: RequestChannel.Request): Unit = {
     val offsetForLeaderEpoch = request.body[OffsetsForLeaderEpochRequest]
     val requestInfo = offsetForLeaderEpoch.epochsByTopicPartition()

http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala
index ed513ea..f381b15 100644
--- a/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala
@@ -17,7 +17,7 @@
 package kafka.api
 
 import java.util
-import java.util.Properties
+import java.util.{Collections, Properties}
 import java.util.concurrent.ExecutionException
 
 import org.apache.kafka.common.utils.Utils
@@ -27,7 +27,7 @@ import org.apache.kafka.clients.admin._
 import kafka.utils.{Logging, TestUtils}
 import org.apache.kafka.clients.admin.NewTopic
 import org.apache.kafka.common.KafkaFuture
-import org.apache.kafka.common.errors.TopicExistsException
+import org.apache.kafka.common.errors.{SecurityDisabledException, TopicExistsException}
 import org.apache.kafka.common.protocol.ApiKeys
 import org.junit.{After, Before, Rule, Test}
 import org.apache.kafka.common.requests.MetadataResponse
@@ -156,6 +156,25 @@ class KafkaAdminClientIntegrationTest extends KafkaServerTestHarness
with Loggin
     client.close()
   }
 
+  val ACL1 = new AclBinding(
+    new Resource(ResourceType.TOPIC, "mytopic3"),
+    new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW))
+
+  /**
+   * Verify that describing, creating and deleting ACLs each fail with a
+   * SecurityDisabledException, as expected when the authorizer is disabled.
+   * See {@link kafka.api.SaslSslAdminClientIntegrationTest} for the same operations
+   * exercised with the authorizer enabled.
+   */
+  @Test
+  def testAclOperations(): Unit = {
+    client = AdminClient.create(createConfig())
+    val expectedError = classOf[SecurityDisabledException]
+
+    val describeOutcome = client.describeAcls(AclBindingFilter.ANY).all()
+    assertFutureExceptionTypeEquals(describeOutcome, expectedError)
+
+    val createOutcome = client.createAcls(Collections.singleton(ACL1)).all()
+    assertFutureExceptionTypeEquals(createOutcome, expectedError)
+
+    val deleteOutcome = client.deleteAcls(Collections.singleton(ACL1.toFilter())).all()
+    assertFutureExceptionTypeEquals(deleteOutcome, expectedError)
+
+    client.close()
+  }
+
   override def generateConfigs() = {
     val cfgs = TestUtils.createBrokerConfigs(brokerCount, zkConnect, interBrokerSecurityProtocol
= Some(securityProtocol),
       trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties)

http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
index 1bfcdf2..cb43b09 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
@@ -14,13 +14,22 @@ package kafka.api
 
 import java.io.File
 
+import kafka.security.auth.SimpleAclAuthorizer
 import org.apache.kafka.common.protocol.SecurityProtocol
 import kafka.server.KafkaConfig
-import kafka.utils.JaasTestUtils
-import org.junit.{After, Before}
+import kafka.utils.{JaasTestUtils, TestUtils}
+import org.apache.kafka.clients.admin.{AccessControlEntry, AccessControlEntryFilter, AclBinding,
AclBindingFilter, AclOperation, AclPermissionType, AdminClient, CreateAclsOptions, DeleteAclsOptions,
Resource, ResourceFilter, ResourceType}
+import org.apache.kafka.common.errors.InvalidRequestException
+import org.junit.Assert.assertEquals
+import org.junit.{After, Assert, Before, Test}
+
+import scala.collection.JavaConverters._
 
 class SaslSslAdminClientIntegrationTest extends KafkaAdminClientIntegrationTest with SaslSetup
{
   this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
+  this.serverConfig.setProperty(KafkaConfig.AuthorizerClassNameProp, classOf[SimpleAclAuthorizer].getName())
+  this.serverConfig.setProperty(SimpleAclAuthorizer.AllowEveryoneIfNoAclIsFoundProp, "true")
+
   override protected def securityProtocol = SecurityProtocol.SASL_SSL
   override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks"))
 
@@ -35,5 +44,77 @@ class SaslSslAdminClientIntegrationTest extends KafkaAdminClientIntegrationTest
     super.tearDown()
     closeSasl()
   }
-  
+
+  val ACL2 = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic2"),
+    new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.WRITE, AclPermissionType.ALLOW))
+  val ACL3 = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic3"),
+    new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.READ, AclPermissionType.ALLOW))
+  val ACL_UNKNOWN = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic3"),
+    new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.UNKNOWN, AclPermissionType.ALLOW))
+
+  /**
+    * Test that ACL operations work when an authorizer is configured on the broker:
+    * describing with no ACLs present returns nothing, creating valid ACLs succeeds, creating
+    * an ACL with an UNKNOWN operation fails with InvalidRequestException, and deleting by
+    * exact filter reports only the ACLs that existed.
+    * Also see {@link kafka.api.KafkaAdminClientIntegrationTest} for the test of ACL operations
+    * when the authorizer is disabled.
+    */
+  @Test
+  override def testAclOperations(): Unit = {
+    client = AdminClient.create(createConfig())
+    assertEquals(0, client.describeAcls(AclBindingFilter.ANY).all().get().size())
+    val results = client.createAcls(List(ACL2, ACL3).asJava)
+    assertEquals(Set(ACL2, ACL3), results.results().keySet().asScala)
+    results.results().values().asScala.foreach(value => value.get)
+    val results2 = client.createAcls(List(ACL_UNKNOWN).asJava)
+    assertEquals(Set(ACL_UNKNOWN), results2.results().keySet().asScala)
+    assertFutureExceptionTypeEquals(results2.all(), classOf[InvalidRequestException])
+    val results3 = client.deleteAcls(List(ACL1.toFilter, ACL2.toFilter, ACL3.toFilter).asJava)
+    assertEquals(Set(ACL1.toFilter, ACL2.toFilter, ACL3.toFilter), results3.results().keySet().asScala)
+    // ACL1 was never created here, so its filter deletes nothing.
+    assertEquals(0, results3.results.get(ACL1.toFilter).get.acls.size())
+    assertEquals(Set(ACL2), results3.results.get(ACL2.toFilter).get.acls.asScala.map(result => result.acl()).toSet)
+    assertEquals(Set(ACL3), results3.results.get(ACL3.toFilter).get.acls.asScala.map(result => result.acl()).toSet)
+    client.close()
+  }
+
+  /**
+    * Repeatedly describe the ACLs matching `filter` until the returned set equals `acls`,
+    * relying on TestUtils.waitUntilTrue to fail with "timed out waiting for ACLs" otherwise.
+    *
+    * @param client     The admin client used to issue the describe calls.
+    * @param filter     The filter selecting which ACLs to describe.
+    * @param acls       The exact set of bindings expected to be returned.
+    */
+  def waitForDescribeAcls(client: AdminClient, filter: AclBindingFilter, acls: Set[AclBinding]): Unit = {
+    def describedAclsAsExpected(): Boolean = {
+      val described = client.describeAcls(filter).all().get().asScala.toSet
+      acls == described
+    }
+    TestUtils.waitUntilTrue(() => describedAclsAsExpected(), "timed out waiting for ACLs")
+  }
+
+  /**
+    * Create the same ACL twice, check it is described once, then delete with one filter that
+    * matches nothing (CLUSTER) and one that matches the created ACL (TOPIC "mytopic2").
+    */
+  @Test
+  def testAclOperations2(): Unit = {
+    client = AdminClient.create(createConfig())
+    val creation = client.createAcls(List(ACL2, ACL2).asJava)
+    assertEquals(Set(ACL2, ACL2), creation.results().keySet().asScala)
+    creation.all().get()
+    waitForDescribeAcls(client, AclBindingFilter.ANY, Set(ACL2))
+
+    val filterA = new AclBindingFilter(new ResourceFilter(ResourceType.CLUSTER, null), AccessControlEntryFilter.ANY)
+    val filterB = new AclBindingFilter(new ResourceFilter(ResourceType.TOPIC, "mytopic2"), AccessControlEntryFilter.ANY)
+
+    waitForDescribeAcls(client, filterA, Set())
+
+    val deletion = client.deleteAcls(List(filterA, filterB).asJava, new DeleteAclsOptions())
+    // The bindings reported as deleted for a single filter.
+    def deletedBy(filter: AclBindingFilter) =
+      deletion.results.get(filter).get.acls.asScala.map(result => result.acl()).toSet
+    assertEquals(Set(filterA, filterB), deletion.results().keySet().asScala)
+    assertEquals(Set(), deletedBy(filterA))
+    assertEquals(Set(ACL2), deletedBy(filterB))
+
+    waitForDescribeAcls(client, filterB, Set())
+
+    client.close()
+  }
+
+  /**
+    * Test that creating a CLUSTER ACL with a name other than the cluster resource name, or an
+    * ACL with an empty resource name, fails with InvalidRequestException for that binding.
+    */
+  @Test
+  def testAttemptToCreateInvalidAcls(): Unit = {
+    client = AdminClient.create(createConfig())
+    val clusterAcl = new AclBinding(new Resource(ResourceType.CLUSTER, "foobar"),
+      new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.READ, AclPermissionType.ALLOW))
+    val emptyResourceNameAcl = new AclBinding(new Resource(ResourceType.TOPIC, ""),
+      new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.READ, AclPermissionType.ALLOW))
+    val results = client.createAcls(List(clusterAcl, emptyResourceNameAcl).asJava, new CreateAclsOptions())
+    assertEquals(Set(clusterAcl, emptyResourceNameAcl), results.results().keySet().asScala)
+    assertFutureExceptionTypeEquals(results.results().get(clusterAcl), classOf[InvalidRequestException])
+    assertFutureExceptionTypeEquals(results.results().get(emptyResourceNameAcl), classOf[InvalidRequestException])
+    // Close the client like the other tests in this class do.
+    client.close()
+  }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 1c496cd..5cb6f71 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -16,12 +16,14 @@ package kafka.server
 
 import java.net.Socket
 import java.nio.ByteBuffer
-import java.util.{LinkedHashMap, Properties}
+import java.util.{Collections, LinkedHashMap, Properties}
 import java.util.concurrent.{Executors, Future, TimeUnit}
+
 import kafka.admin.AdminUtils
 import kafka.network.RequestChannel.Session
 import kafka.security.auth._
 import kafka.utils.TestUtils
+import org.apache.kafka.clients.admin.{AccessControlEntry, AccessControlEntryFilter, AclBinding,
AclBindingFilter, AclOperation, AclPermissionType, ResourceFilter, Resource => AdminResource,
ResourceType => AdminResourceType}
 import org.apache.kafka.common.{Node, TopicPartition}
 import org.apache.kafka.common.metrics.{KafkaMetric, Quota, Sensor}
 import org.apache.kafka.common.network.{Authenticator, ListenerName, TransportLayer}
@@ -29,10 +31,12 @@ import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
 import org.apache.kafka.common.protocol.types.Struct
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests
+import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.security.auth.{DefaultPrincipalBuilder, KafkaPrincipal}
 import org.junit.Assert._
-import org.junit.{Before, After, Test}
+import org.junit.{After, Before, Test}
+
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
 
@@ -260,6 +264,19 @@ class RequestQuotaTest extends BaseRequestTest {
         case ApiKeys.TXN_OFFSET_COMMIT =>
           new TxnOffsetCommitRequest.Builder("test-txn-group", 2, 0, Map.empty.asJava)
 
+        case ApiKeys.DESCRIBE_ACLS =>
+          new DescribeAclsRequest.Builder(AclBindingFilter.ANY)
+
+        case ApiKeys.CREATE_ACLS =>
+          new CreateAclsRequest.Builder(Collections.singletonList(new AclCreation(new AclBinding(
+            new AdminResource(AdminResourceType.TOPIC, "mytopic"),
+            new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.WRITE, AclPermissionType.DENY)))))
+
+        case ApiKeys.DELETE_ACLS =>
+          new DeleteAclsRequest.Builder(Collections.singletonList(new AclBindingFilter(
+            new ResourceFilter(AdminResourceType.TOPIC, null),
+            new AccessControlEntryFilter("User:ANONYMOUS", "*", AclOperation.ANY, AclPermissionType.DENY))))
+
         case key =>
           throw new IllegalArgumentException("Unsupported API key " + apiKey)
     }
@@ -346,6 +363,9 @@ class RequestQuotaTest extends BaseRequestTest {
       case ApiKeys.ADD_OFFSETS_TO_TXN => new AddOffsetsToTxnResponse(response).throttleTimeMs
       case ApiKeys.END_TXN => new EndTxnResponse(response).throttleTimeMs
       case ApiKeys.TXN_OFFSET_COMMIT => new TxnOffsetCommitResponse(response).throttleTimeMs
+      case ApiKeys.DESCRIBE_ACLS => new DescribeAclsResponse(response).throttleTimeMs
+      case ApiKeys.CREATE_ACLS => new CreateAclsResponse(response).throttleTimeMs
+      case ApiKeys.DELETE_ACLS => new DeleteAclsResponse(response).throttleTimeMs
       case requestId => throw new IllegalArgumentException(s"No throttle time for $requestId")
     }
   }


Mime
View raw message