kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rsiva...@apache.org
Subject [kafka] branch 0.11.0 updated: MINOR: Close timing window in SimpleAclAuthorizer startup (#5318)
Date Mon, 02 Jul 2018 21:36:17 GMT
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 0.11.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/0.11.0 by this push:
     new 36bb0ea  MINOR: Close timing window in SimpleAclAuthorizer startup (#5318)
36bb0ea is described below

commit 36bb0eaecbe4c3e33a9cc57c321d8cc32707dbdc
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
AuthorDate: Mon Jul 2 22:11:05 2018 +0100

    MINOR: Close timing window in SimpleAclAuthorizer startup (#5318)
    
    The ZooKeeper listener for change notifications should be created before loading the ACL cache, to avoid a timing window if ACLs are modified while the broker is starting up.
    
    Reviewers: Jun Rao <junrao@gmail.com>, Ismael Juma <ismael@confluent.io>
---
 .../kafka/security/auth/SimpleAclAuthorizer.scala  | 12 +++++--
 .../security/auth/SimpleAclAuthorizerTest.scala    | 37 +++++++++++++++++++++-
 2 files changed, 45 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
index 03eb9e3..c947a2e 100644
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -112,11 +112,11 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
                       kafkaConfig.zkEnableSecureAcls)
     zkUtils.makeSurePersistentPathExists(SimpleAclAuthorizer.AclZkPath)
 
+    // Start change listeners first and then populate the cache so that there is no timing window
+    // between loading cache and processing change notifications.
+    startZkChangeListeners()
     loadCache()
 
-    zkUtils.makeSurePersistentPathExists(SimpleAclAuthorizer.AclChangedZkPath)
-    aclChangeListener = new ZkNodeChangeNotificationListener(zkUtils, SimpleAclAuthorizer.AclChangedZkPath,
SimpleAclAuthorizer.AclChangedPrefix, AclChangedNotificationHandler)
-    aclChangeListener.init()
   }
 
   override def authorize(session: Session, operation: Operation, resource: Resource): Boolean
= {
@@ -246,6 +246,12 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
     SimpleAclAuthorizer.AclZkPath + "/" + resource.resourceType + "/" + resource.name
   }
 
+  private[auth] def startZkChangeListeners(): Unit = {
+    zkUtils.makeSurePersistentPathExists(SimpleAclAuthorizer.AclChangedZkPath)
+    aclChangeListener = new ZkNodeChangeNotificationListener(zkUtils, SimpleAclAuthorizer.AclChangedZkPath,
SimpleAclAuthorizer.AclChangedPrefix, AclChangedNotificationHandler)
+    aclChangeListener.init()
+  }
+
   private def logAuditMessage(principal: KafkaPrincipal, authorized: Boolean, operation:
Operation, resource: Resource, host: String) {
     val permissionType = if (authorized) "Allowed" else "Denied"
     authorizerLogger.debug(s"Principal = $principal is $permissionType Operation = $operation
from host = $host on resource = $resource")
diff --git a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
index 5dbd1a8..b3cc6a4 100644
--- a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
@@ -18,11 +18,12 @@ package kafka.security.auth
 
 import java.net.InetAddress
 import java.util.UUID
+import java.util.concurrent.{Executors, Semaphore, TimeUnit}
 
 import kafka.network.RequestChannel.Session
 import kafka.security.auth.Acl.WildCardHost
 import kafka.server.KafkaConfig
-import kafka.utils.TestUtils
+import kafka.utils.{CoreUtils, TestUtils}
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.junit.Assert._
@@ -270,6 +271,40 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
     }
   }
 
+  /**
+   * Verify that there is no timing window between loading ACL cache and setting
+   * up ZK change listener. Change listener must be created before loading the cache
+   * in the authorizer to avoid the timing window.
+   */
+  @Test
+  def testChangeListenerTiming() {
+    val configureSemaphore = new Semaphore(0)
+    val listenerSemaphore = new Semaphore(0)
+    val executor = Executors.newSingleThreadExecutor
+    val simpleAclAuthorizer3 = new SimpleAclAuthorizer {
+      override private[auth] def startZkChangeListeners(): Unit = {
+        configureSemaphore.release()
+        listenerSemaphore.acquireUninterruptibly()
+        super.startZkChangeListeners()
+      }
+    }
+    try {
+      val future = executor.submit(CoreUtils.runnable(simpleAclAuthorizer3.configure(config.originals)))
+      configureSemaphore.acquire()
+      val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
+      val acls = Set(new Acl(user1, Deny, "host-1", Read))
+      simpleAclAuthorizer.addAcls(acls, resource)
+
+      listenerSemaphore.release()
+      future.get(10, TimeUnit.SECONDS)
+
+      assertEquals(acls, simpleAclAuthorizer3.getAcls(resource))
+    } finally {
+      simpleAclAuthorizer3.close()
+      executor.shutdownNow()
+    }
+  }
+
   @Test
   def testLocalConcurrentModificationOfResourceAcls() {
     val commonResource = new Resource(Topic, "test")


Mime
View raw message