kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rsiva...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7028: Properly authorize custom principal objects (#5311)
Date Fri, 29 Jun 2018 13:13:51 GMT
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b0d2ddb  KAFKA-7028: Properly authorize custom principal objects (#5311)
b0d2ddb is described below

commit b0d2ddb330a24971c53ba96129a3c66312b30c5c
Author: Stanislav Kozlovski <familyguyuser192@windowslive.com>
AuthorDate: Fri Jun 29 14:13:45 2018 +0100

    KAFKA-7028: Properly authorize custom principal objects (#5311)
    
    Use KafkaPrincipal objects for authorization in `SimpleAclAuthorizer` so that comparisons
    with super.users and with ACLs instantiated from Strings work. Previously, the authorizer
    would compare instances of two different classes, `KafkaPrincipal` and the custom class,
    which would always return false because of the implementation of `KafkaPrincipal#equals`.
---
 .../kafka/security/auth/SimpleAclAuthorizer.scala  |  8 ++++-
 .../security/auth/SimpleAclAuthorizerTest.scala    | 40 ++++++++++++++++++++++
 2 files changed, 47 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
index 1dc3a1f..c5bbfdf 100644
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -110,7 +110,13 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
       throw new IllegalArgumentException("Only literal resources are supported. Got: " +
resource.patternType)
     }
 
-    val principal = session.principal
+    // ensure we compare identical classes
+    val sessionPrincipal = session.principal
+    val principal = if (classOf[KafkaPrincipal] != sessionPrincipal.getClass)
+      new KafkaPrincipal(sessionPrincipal.getPrincipalType, sessionPrincipal.getName)
+    else
+      sessionPrincipal
+
     val host = session.clientAddress.getHostAddress
 
     def isEmptyAclAndAuthorized(acls: Set[Acl]): Boolean = {
diff --git a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
index 7ab3c0a..3d1ceb6 100644
--- a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
@@ -54,6 +54,10 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
   private var config: KafkaConfig = _
   private var zooKeeperClient: ZooKeeperClient = _
 
+  class CustomPrincipal(principalType: String, name: String) extends KafkaPrincipal(principalType,
name) {
+    override def equals(o: scala.Any): Boolean = false
+  }
+
   @Before
   override def setUp() {
     super.setUp()
@@ -139,6 +143,29 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
     assertTrue("User3 should have WRITE access from host2", simpleAclAuthorizer.authorize(user3Session,
Write, resource))
   }
 
+  /**
+    CustomPrincipals should be compared with their principal type and name
+   */
+  @Test
+  def testAllowAccessWithCustomPrincipal() {
+    val user = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
+    val customUserPrincipal = new CustomPrincipal(KafkaPrincipal.USER_TYPE, username)
+    val host1 = InetAddress.getByName("192.168.1.1")
+    val host2 = InetAddress.getByName("192.168.1.2")
+
+    // user has READ access from host2 but not from host1
+    val acl1 = new Acl(user, Deny, host1.getHostAddress, Read)
+    val acl2 = new Acl(user, Allow, host2.getHostAddress, Read)
+    val acls = Set[Acl](acl1, acl2)
+    changeAclAndVerify(Set.empty[Acl], acls, Set.empty[Acl])
+
+    val host1Session = Session(customUserPrincipal, host1)
+    val host2Session = Session(customUserPrincipal, host2)
+
+    assertTrue("User1 should have READ access from host2", simpleAclAuthorizer.authorize(host2Session,
Read, resource))
+    assertFalse("User1 should not have READ access from host1 due to denyAcl", simpleAclAuthorizer.authorize(host1Session,
Read, resource))
+  }
+
   @Test
   def testDenyTakesPrecedence() {
     val user = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
@@ -177,6 +204,19 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
     assertTrue("superuser always has access, no matter what acls.", simpleAclAuthorizer.authorize(session2,
Read, resource))
   }
 
+  /**
+    CustomPrincipals should be compared with their principal type and name
+   */
+  @Test
+  def testSuperUserWithCustomPrincipalHasAccess(): Unit = {
+    val denyAllAcl = new Acl(Acl.WildCardPrincipal, Deny, WildCardHost, All)
+    changeAclAndVerify(Set.empty[Acl], Set[Acl](denyAllAcl), Set.empty[Acl])
+
+    val session = Session(new CustomPrincipal(KafkaPrincipal.USER_TYPE, "superuser1"), InetAddress.getByName("192.0.4.4"))
+
+    assertTrue("superuser with custom principal always has access, no matter what acls.",
simpleAclAuthorizer.authorize(session, Read, resource))
+  }
+
   @Test
   def testWildCardAcls(): Unit = {
     assertFalse("when acls = [],  authorizer should fail close.", simpleAclAuthorizer.authorize(session,
Read, resource))


Mime
View raw message