kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rsiva...@apache.org
Subject [kafka] branch 1.1 updated: MINOR: Close timing window in SimpleAclAuthorizer startup (#5318)
Date Mon, 02 Jul 2018 21:25:10 GMT
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new 4102073  MINOR: Close timing window in SimpleAclAuthorizer startup (#5318)
4102073 is described below

commit 4102073add48787e7caf6a2afdc3e6278014c243
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
AuthorDate: Mon Jul 2 22:11:05 2018 +0100

    MINOR: Close timing window in SimpleAclAuthorizer startup (#5318)
    
    ZooKeeper listener for change notifications should be created before loading the ACL cache
to avoid timing window if acls are modified when broker is starting up.
    
    Reviewers: Jun Rao <junrao@gmail.com>, Ismael Juma <ismael@confluent.io>
---
 .../kafka/security/auth/SimpleAclAuthorizer.scala  | 11 +++++--
 .../security/auth/SimpleAclAuthorizerTest.scala    | 37 +++++++++++++++++++++-
 2 files changed, 44 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
index c439f5e..588cbdc 100644
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -95,10 +95,10 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
       zkMaxInFlightRequests, time, "kafka.security", "SimpleAclAuthorizer")
     zkClient.createAclPaths()
 
+    // Start change listeners first and then populate the cache so that there is no timing
window
+    // between loading cache and processing change notifications.
+    startZkChangeListeners()
     loadCache()
-
-    aclChangeListener = new ZkNodeChangeNotificationListener(zkClient, AclChangeNotificationZNode.path,
AclChangeNotificationSequenceZNode.SequenceNumberPrefix, AclChangedNotificationHandler)
-    aclChangeListener.init()
   }
 
   override def authorize(session: Session, operation: Operation, resource: Resource): Boolean
= {
@@ -223,6 +223,11 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
     }
   }
 
+  private[auth] def startZkChangeListeners(): Unit = {
+    aclChangeListener = new ZkNodeChangeNotificationListener(zkClient, AclChangeNotificationZNode.path,
AclChangeNotificationSequenceZNode.SequenceNumberPrefix, AclChangedNotificationHandler)
+    aclChangeListener.init()
+  }
+
   private def logAuditMessage(principal: KafkaPrincipal, authorized: Boolean, operation:
Operation, resource: Resource, host: String) {
     def logMessage: String = {
       val authResult = if (authorized) "Allowed" else "Denied"
diff --git a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
index 1e18f1d..ac68f70 100644
--- a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
@@ -18,11 +18,12 @@ package kafka.security.auth
 
 import java.net.InetAddress
 import java.util.UUID
+import java.util.concurrent.{Executors, Semaphore, TimeUnit}
 
 import kafka.network.RequestChannel.Session
 import kafka.security.auth.Acl.WildCardHost
 import kafka.server.KafkaConfig
-import kafka.utils.TestUtils
+import kafka.utils.{CoreUtils, TestUtils}
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.junit.Assert._
@@ -270,6 +271,40 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
     }
   }
 
+  /**
+   * Verify that there is no timing window between loading ACL cache and setting
+   * up ZK change listener. Cache must be loaded before creating change listener
+   * in the authorizer to avoid the timing window.
+   */
+  @Test
+  def testChangeListenerTiming() {
+    val configureSemaphore = new Semaphore(0)
+    val listenerSemaphore = new Semaphore(0)
+    val executor = Executors.newSingleThreadExecutor
+    val simpleAclAuthorizer3 = new SimpleAclAuthorizer {
+      override private[auth] def startZkChangeListeners(): Unit = {
+        configureSemaphore.release()
+        listenerSemaphore.acquireUninterruptibly()
+        super.startZkChangeListeners()
+      }
+    }
+    try {
+      val future = executor.submit(CoreUtils.runnable(simpleAclAuthorizer3.configure(config.originals)))
+      configureSemaphore.acquire()
+      val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
+      val acls = Set(new Acl(user1, Deny, "host-1", Read))
+      simpleAclAuthorizer.addAcls(acls, resource)
+
+      listenerSemaphore.release()
+      future.get(10, TimeUnit.SECONDS)
+
+      assertEquals(acls, simpleAclAuthorizer3.getAcls(resource))
+    } finally {
+      simpleAclAuthorizer3.close()
+      executor.shutdownNow()
+    }
+  }
+
   @Test
   def testLocalConcurrentModificationOfResourceAcls() {
     val commonResource = new Resource(Topic, "test")


Mime
View raw message