kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rsiva...@apache.org
Subject [kafka] branch 2.6 updated: KAFKA-9392; Clarify deleteAcls javadoc and add test for create/delete timing (#7956)
Date Mon, 01 Jun 2020 16:22:40 GMT
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new 171fe8d  KAFKA-9392; Clarify deleteAcls javadoc and add test for create/delete timing
(#7956)
171fe8d is described below

commit 171fe8d78e2fcd0ba091be5dd0df5506592138c6
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
AuthorDate: Mon Jun 1 16:38:21 2020 +0100

    KAFKA-9392; Clarify deleteAcls javadoc and add test for create/delete timing (#7956)
    
    Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
---
 .../apache/kafka/server/authorizer/Authorizer.java |  2 +
 .../kafka/security/authorizer/AclAuthorizer.scala  | 28 ++++++--
 .../security/authorizer/AclAuthorizerTest.scala    | 78 +++++++++++++++++++++-
 3 files changed, 103 insertions(+), 5 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java b/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
index 45bd6d9..1865e7e 100644
--- a/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
+++ b/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
@@ -114,6 +114,8 @@ public interface Authorizer extends Configurable, Closeable {
      * This is an asynchronous API that enables the caller to avoid blocking during the update.
Implementations of this
      * API can return completed futures using {@link java.util.concurrent.CompletableFuture#completedFuture(Object)}
      * to process the update synchronously on the request thread.
+     * <p>
+     * Refer to the authorizer implementation docs for details on concurrent update guarantees.
      *
      * @param requestContext Request context if the ACL is being deleted by a broker to handle
      *        a client request to delete ACLs. This may be null if ACLs are deleted directly
in ZooKeeper
diff --git a/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala b/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
index d9e89e1..9e23978 100644
--- a/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
@@ -223,6 +223,22 @@ class AclAuthorizer extends Authorizer with Logging {
     results.toList.map(CompletableFuture.completedFuture[AclCreateResult]).asJava
   }
 
+  /**
+   *
+   * <b>Concurrent updates:</b>
+   * <ul>
+   *   <li>If ACLs are created using [[kafka.security.authorizer.AclAuthorizer#createAcls]]
while a delete is in
+   *   progress, these ACLs may or may not be considered for deletion depending on the order
of updates.
+   *   The returned [[org.apache.kafka.server.authorizer.AclDeleteResult]] indicates which
ACLs were deleted.</li>
+   *   <li>If the provided filters use resource pattern type
+   *   [[org.apache.kafka.common.resource.PatternType#MATCH]] that needs to filter all resources
to determine
+   *   matching ACLs, only ACLs that have already been propagated to the broker processing
the ACL update will be
+   *   deleted. This may not include some ACLs that were persisted, but not yet propagated
to all brokers. The
+   *   returned [[org.apache.kafka.server.authorizer.AclDeleteResult]] indicates which ACLs
were deleted.</li>
+   *   <li>If the provided filters use other resource pattern types that perform a
direct match, all matching ACLs
+   *   from previously completed [[kafka.security.authorizer.AclAuthorizer#createAcls]] are
guaranteed to be deleted.</li>
+   * </ul>
+   */
   override def deleteAcls(requestContext: AuthorizableRequestContext,
                           aclBindingFilters: util.List[AclBindingFilter]): util.List[_ <:
CompletionStage[AclDeleteResult]] = {
     val deletedBindings = new mutable.HashMap[AclBinding, Int]()
@@ -553,12 +569,16 @@ class AclAuthorizer extends Authorizer with Logging {
     }
   }
 
+  private[authorizer] def processAclChangeNotification(resource: ResourcePattern): Unit =
{
+    lock synchronized {
+      val versionedAcls = getAclsFromZk(resource)
+      updateCache(resource, versionedAcls)
+    }
+  }
+
   object AclChangedNotificationHandler extends AclChangeNotificationHandler {
     override def processNotification(resource: ResourcePattern): Unit = {
-      lock synchronized {
-        val versionedAcls = getAclsFromZk(resource)
-        updateCache(resource, versionedAcls)
-      }
+      processAclChangeNotification(resource)
     }
   }
 }
diff --git a/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala
index 9403870..89d2095 100644
--- a/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala
@@ -20,7 +20,7 @@ import java.io.File
 import java.net.InetAddress
 import java.nio.charset.StandardCharsets.UTF_8
 import java.nio.file.Files
-import java.util.UUID
+import java.util.{Collections, UUID}
 import java.util.concurrent.{Executors, Semaphore, TimeUnit}
 
 import kafka.Kafka
@@ -912,6 +912,82 @@ class AclAuthorizerTest extends ZooKeeperTestHarness {
     })
   }
 
+  @Test
+  def testCreateDeleteTiming(): Unit = {
+    val literalResource = new ResourcePattern(TOPIC, "foo-" + UUID.randomUUID(), LITERAL)
+    val prefixedResource = new ResourcePattern(TOPIC, "bar-", PREFIXED)
+    val wildcardResource = new ResourcePattern(TOPIC, "*", LITERAL)
+    val ace = new AccessControlEntry(principal.toString, WildcardHost, READ, ALLOW)
+    val updateSemaphore = new Semaphore(1)
+
+    def createAcl(createAuthorizer: AclAuthorizer, resource: ResourcePattern): AclBinding
= {
+      val acl = new AclBinding(resource, ace)
+      createAuthorizer.createAcls(requestContext, Collections.singletonList(acl)).asScala
+        .foreach(_.toCompletableFuture.get(15, TimeUnit.SECONDS))
+      acl
+    }
+
+    def deleteAcl(deleteAuthorizer: AclAuthorizer,
+                  resource: ResourcePattern,
+                  deletePatternType: PatternType): List[AclBinding] = {
+
+      val filter = new AclBindingFilter(
+        new ResourcePatternFilter(resource.resourceType(), resource.name(), deletePatternType),
+        AccessControlEntryFilter.ANY)
+      deleteAuthorizer.deleteAcls(requestContext, Collections.singletonList(filter)).asScala
+        .map(_.toCompletableFuture.get(15, TimeUnit.SECONDS))
+        .flatMap(_.aclBindingDeleteResults.asScala)
+        .map(_.aclBinding)
+        .toList
+    }
+
+    def listAcls(authorizer: AclAuthorizer): List[AclBinding] = {
+      authorizer.acls(AclBindingFilter.ANY).asScala.toList
+    }
+
+    def verifyCreateDeleteAcl(deleteAuthorizer: AclAuthorizer,
+                              resource: ResourcePattern,
+                              deletePatternType: PatternType): Unit = {
+      updateSemaphore.acquire()
+      assertEquals(List.empty, listAcls(deleteAuthorizer))
+      val acl = createAcl(aclAuthorizer, resource)
+      val deleted = deleteAcl(deleteAuthorizer, resource, deletePatternType)
+      if (deletePatternType != PatternType.MATCH) {
+        assertEquals(List(acl), deleted)
+      } else {
+        assertEquals(List.empty[AclBinding], deleted)
+      }
+      updateSemaphore.release()
+      if (deletePatternType == PatternType.MATCH) {
+        TestUtils.waitUntilTrue(() => listAcls(deleteAuthorizer).nonEmpty, "ACL not propagated")
+        assertEquals(List(acl), deleteAcl(deleteAuthorizer, resource, deletePatternType))
+      }
+      TestUtils.waitUntilTrue(() => listAcls(deleteAuthorizer).isEmpty, "ACL delete not
propagated")
+    }
+
+    val deleteAuthorizer = new AclAuthorizer {
+      override def processAclChangeNotification(resource: ResourcePattern): Unit = {
+        updateSemaphore.acquire()
+        try {
+          super.processAclChangeNotification(resource)
+        } finally {
+          updateSemaphore.release()
+        }
+      }
+    }
+
+    try {
+      deleteAuthorizer.configure(config.originals)
+      List(literalResource, prefixedResource, wildcardResource).foreach { resource =>
+        verifyCreateDeleteAcl(deleteAuthorizer, resource, resource.patternType())
+        verifyCreateDeleteAcl(deleteAuthorizer, resource, PatternType.ANY)
+        verifyCreateDeleteAcl(deleteAuthorizer, resource, PatternType.MATCH)
+      }
+    } finally {
+      deleteAuthorizer.close()
+    }
+  }
+
   private def givenAuthorizerWithProtocolVersion(protocolVersion: Option[ApiVersion]): Unit
= {
     aclAuthorizer.close()
 


Mime
View raw message