kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch trunk updated: KAFKA-5638; Improve the Required ACL of ListGroups API (KIP-231) (#5352)
Date Mon, 13 Aug 2018 06:59:36 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e6fd99d  KAFKA-5638; Improve the Required ACL of ListGroups API (KIP-231) (#5352)
e6fd99d is described below

commit e6fd99dc6253333c5aaa9931d8ae319e637e9a11
Author: Vahid Hashemian <vahid.hashemian@gmail.com>
AuthorDate: Sun Aug 12 23:59:31 2018 -0700

    KAFKA-5638; Improve the Required ACL of ListGroups API (KIP-231) (#5352)
    
    Reviewers: Manikumar Reddy O <manikumar.reddy@gmail.com>, Jason Gustafson <jason@confluent.io>
---
 core/src/main/scala/kafka/server/KafkaApis.scala   | 15 ++++----
 .../kafka/api/AuthorizerIntegrationTest.scala      | 42 ++++++++++++++++++++++
 docs/upgrade.html                                  |  8 ++---
 3 files changed, 54 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index de9e0bf..0273e3d 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -53,6 +53,7 @@ import org.apache.kafka.common.requests.DescribeLogDirsResponse.LogDirInfo
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.resource.PatternType.LITERAL
+import org.apache.kafka.common.resource.ResourcePattern
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation}
 import org.apache.kafka.common.utils.{Time, Utils}
@@ -63,7 +64,6 @@ import scala.collection._
 import scala.collection.mutable.ArrayBuffer
 import scala.util.{Failure, Success, Try}
 import org.apache.kafka.common.requests.CreateTopicsRequest.TopicDetails
-import org.apache.kafka.common.resource.ResourcePattern
 
 /**
  * Logic to handle the various Kafka requests
@@ -1238,14 +1238,15 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def handleListGroupsRequest(request: RequestChannel.Request) {
-    if (!authorize(request.session, Describe, Resource.ClusterResource)) {
+    val (error, groups) = groupCoordinator.handleListGroups()
+    if (authorize(request.session, Describe, Resource.ClusterResource))
+      // With describe cluster access all groups are returned. We keep this alternative for backward compatibility.
       sendResponseMaybeThrottle(request, requestThrottleMs =>
-        request.body[ListGroupsRequest].getErrorResponse(requestThrottleMs, Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
-    } else {
-      val (error, groups) = groupCoordinator.handleListGroups()
-      val allGroups = groups.map { group => new ListGroupsResponse.Group(group.groupId,
group.protocolType) }
+        new ListGroupsResponse(requestThrottleMs, error, groups.map { group => new ListGroupsResponse.Group(group.groupId,
group.protocolType) }.asJava))
+    else {
+      val filteredGroups = groups.filter(group => authorize(request.session, Describe,
new Resource(Group, group.groupId, LITERAL)))
       sendResponseMaybeThrottle(request, requestThrottleMs =>
-        new ListGroupsResponse(requestThrottleMs, error, allGroups.asJava))
+        new ListGroupsResponse(requestThrottleMs, error, filteredGroups.map { group =>
new ListGroupsResponse.Group(group.groupId, group.protocolType) }.asJava))
     }
   }
 
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index dc9ca85..faad071 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -1028,6 +1028,48 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   }
 
   @Test
+  def testListGroupApiWithAndWithoutListGroupAcls() {
+    // write some record to the topic
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource)
+    sendRecords(1, tp)
+
+    // use two consumers, each belonging to a different group
+    val group2 = "other group"
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), Resource(Group,
group2, LITERAL))
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource)
+    this.consumers.head.subscribe(Collections.singleton(topic))
+    consumeRecords(this.consumers.head)
+
+    val otherConsumer = TestUtils.createConsumer(TestUtils.getBrokerListStrFromServers(servers),
groupId = group2, securityProtocol = SecurityProtocol.PLAINTEXT)
+    otherConsumer.subscribe(Collections.singleton(topic))
+    consumeRecords(otherConsumer)
+
+    val adminClient = createAdminClient()
+
+    // first use cluster describe permission
+    removeAllAcls()
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), Resource.ClusterResource)
+    // it should list both groups (due to cluster describe permission)
+    assertEquals(Set(group, group2), adminClient.listConsumerGroups().all().get().asScala.map(_.groupId()).toSet)
+
+    // now replace cluster describe with group read permission
+    removeAllAcls()
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource)
+    // it should list only one group now
+    val groupList = adminClient.listConsumerGroups().all().get().asScala.toList
+    assertEquals(1, groupList.length)
+    assertEquals(group, groupList.head.groupId)
+
+    // now remove all acls and verify describe group access is required to list any group
+    removeAllAcls()
+    val listGroupResult = adminClient.listConsumerGroups()
+    assertEquals(List(), listGroupResult.errors().get().asScala.toList)
+    assertEquals(List(), listGroupResult.all().get().asScala.toList)
+    otherConsumer.close()
+  }
+
+  @Test
   def testDeleteGroupApiWithDeleteGroupAcl() {
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource)
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource)
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 264e26d..979190d 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -33,12 +33,12 @@
     <li>The default value for the producer's <code>retries</code> config
was changed to <code>Integer.MAX_VALUE</code>, as we introduced <code>delivery.timeout.ms</code>
         in <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-91+Provide+Intuitive+User+Timeouts+in+The+Producer">KIP-91</a>,
         which sets an upper bound on the total time between sending a record and receiving
acknowledgement from the broker. By default,
-        the delivery timeout is set to 2 minutes.
-    </li>
+        the delivery timeout is set to 2 minutes.</li>
     <li>By default, MirrorMaker now overrides <code>delivery.timeout.ms</code>
to <code>Integer.MAX_VALUE</code> when
         configuring the producer. If you have overridden the value of <code>retries</code>
in order to fail faster,
-        you will instead need to override <code>delivery.timeout.ms</code>.
-    </li>
+        you will instead need to override <code>delivery.timeout.ms</code>.</li>
+    <li>The <code>ListGroups</code> API now expects, as a recommended alternative, <code>Describe Group</code> access to the groups a user should be able to list.
+        Even though the old <code>Describe Cluster</code> access is still supported for backward compatibility, using it for this API is not advised.</li>
 </ol>
 
 


Mime
View raw message