kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From maniku...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7922: Return authorized operations in describe consumer group responses (KIP-430 Part-1)
Date Fri, 01 Mar 2019 15:54:21 GMT
This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f11fa5e  KAFKA-7922: Return authorized operations in describe consumer group responses (KIP-430 Part-1)
f11fa5e is described below

commit f11fa5ef402193e2da785466e698e11b56bd19c7
Author: Manikumar Reddy <manikumar.reddy@gmail.com>
AuthorDate: Fri Mar 1 21:23:52 2019 +0530

    KAFKA-7922: Return authorized operations in describe consumer group responses (KIP-430 Part-1)
    
    -  Use automatic RPC generation in DescribeGroups Request/Response classes
    
    Author: Manikumar Reddy <manikumar.reddy@gmail.com>
    
    Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
    
    Closes #6322 from omkreddy/KIP-430-Return-Ops
---
 .../clients/admin/ConsumerGroupDescription.java    |  31 ++-
 .../admin/DescribeConsumerGroupsOptions.java       |  10 +
 .../kafka/clients/admin/KafkaAdminClient.java      |  51 +++-
 .../org/apache/kafka/common/protocol/ApiKeys.java  |   8 +-
 .../kafka/common/requests/AbstractResponse.java    |   2 +-
 .../common/requests/DescribeGroupsRequest.java     |  74 ++----
 .../common/requests/DescribeGroupsResponse.java    | 290 +++++----------------
 .../java/org/apache/kafka/common/utils/Utils.java  |  24 ++
 .../common/message/DescribeGroupsRequest.json      |   7 +-
 .../common/message/DescribeGroupsResponse.json     |   7 +-
 .../kafka/clients/admin/KafkaAdminClientTest.java  |  30 ++-
 .../kafka/common/requests/RequestResponseTest.java |  19 +-
 .../org/apache/kafka/common/utils/UtilsTest.java   |  27 +-
 core/src/main/scala/kafka/admin/AclCommand.scala   |  10 +-
 core/src/main/scala/kafka/admin/AdminClient.scala  |  24 +-
 .../scala/kafka/security/auth/ResourceType.scala   |   7 +
 core/src/main/scala/kafka/server/KafkaApis.scala   |  64 ++++-
 .../kafka/api/AdminClientIntegrationTest.scala     |  13 +-
 .../kafka/api/AuthorizerIntegrationTest.scala      |  10 +-
 .../api/DescribeAuthorizedOperationsTest.scala     | 121 +++++++++
 .../scala/unit/kafka/server/RequestQuotaTest.scala |   8 +-
 21 files changed, 462 insertions(+), 375 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java
index 8947293..8dd6018 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java
@@ -19,12 +19,14 @@ package org.apache.kafka.clients.admin;
 
 import org.apache.kafka.common.ConsumerGroupState;
 import org.apache.kafka.common.Node;
+import org.apache.kafka.common.acl.AclOperation;
 import org.apache.kafka.common.utils.Utils;
 
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Objects;
+import java.util.Set;
 
 /**
  * A detailed description of a single consumer group in the cluster.
@@ -36,13 +38,15 @@ public class ConsumerGroupDescription {
     private final String partitionAssignor;
     private final ConsumerGroupState state;
     private final Node coordinator;
+    private Set<AclOperation> authorizedOperations;
 
     public ConsumerGroupDescription(String groupId,
                                     boolean isSimpleConsumerGroup,
                                     Collection<MemberDescription> members,
                                     String partitionAssignor,
                                     ConsumerGroupState state,
-                                    Node coordinator) {
+                                    Node coordinator,
+                                    Set<AclOperation> authorizedOperations) {
         this.groupId = groupId == null ? "" : groupId;
         this.isSimpleConsumerGroup = isSimpleConsumerGroup;
         this.members = members == null ? Collections.emptyList() :
@@ -50,23 +54,26 @@ public class ConsumerGroupDescription {
         this.partitionAssignor = partitionAssignor == null ? "" : partitionAssignor;
         this.state = state;
         this.coordinator = coordinator;
+        this.authorizedOperations = authorizedOperations;
     }
 
     @Override
-    public boolean equals(Object o) {
+    public boolean equals(final Object o) {
         if (this == o) return true;
         if (o == null || getClass() != o.getClass()) return false;
-        ConsumerGroupDescription that = (ConsumerGroupDescription) o;
+        final ConsumerGroupDescription that = (ConsumerGroupDescription) o;
         return isSimpleConsumerGroup == that.isSimpleConsumerGroup &&
-            groupId.equals(that.groupId) &&
-            members.equals(that.members) &&
-            partitionAssignor.equals(that.partitionAssignor) &&
-            state.equals(that.state);
+            Objects.equals(groupId, that.groupId) &&
+            Objects.equals(members, that.members) &&
+            Objects.equals(partitionAssignor, that.partitionAssignor) &&
+            state == that.state &&
+            Objects.equals(coordinator, that.coordinator) &&
+            Objects.equals(authorizedOperations, that.authorizedOperations);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(isSimpleConsumerGroup, groupId, members, partitionAssignor, state);
+        return Objects.hash(groupId, isSimpleConsumerGroup, members, partitionAssignor, state, coordinator, authorizedOperations);
     }
 
     /**
@@ -111,6 +118,13 @@ public class ConsumerGroupDescription {
         return coordinator;
     }
 
+    /**
+     * authorizedOperations for this group
+     */
+    public  Set<AclOperation> authorizedOperations() {
+        return authorizedOperations;
+    }
+
     @Override
     public String toString() {
         return "(groupId=" + groupId +
@@ -119,6 +133,7 @@ public class ConsumerGroupDescription {
             ", partitionAssignor=" + partitionAssignor +
             ", state=" + state +
             ", coordinator=" + coordinator +
+            ", authorizedOperations=" + authorizedOperations +
             ")";
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsOptions.java
index 7daff1a..8f05f61 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsOptions.java
@@ -28,4 +28,14 @@ import java.util.Collection;
  */
 @InterfaceStability.Evolving
 public class DescribeConsumerGroupsOptions extends AbstractOptions<DescribeConsumerGroupsOptions> {
+    private boolean includeAuthorizedOperations;
+
+    public DescribeConsumerGroupsOptions includeAuthorizedOperations(boolean includeAuthorizedOperations) {
+        this.includeAuthorizedOperations = includeAuthorizedOperations;
+        return this;
+    }
+
+    public boolean includeAuthorizedOperations() {
+        return includeAuthorizedOperations;
+    }
 }
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 1567d90..95337d0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -45,6 +45,7 @@ import org.apache.kafka.common.TopicPartitionInfo;
 import org.apache.kafka.common.TopicPartitionReplica;
 import org.apache.kafka.common.acl.AclBinding;
 import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.acl.AclOperation;
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.errors.ApiException;
@@ -62,6 +63,9 @@ import org.apache.kafka.common.internals.KafkaFutureImpl;
 import org.apache.kafka.common.message.CreateTopicsRequestData;
 import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicSet;
 import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
+import org.apache.kafka.common.message.DescribeGroupsRequestData;
+import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroup;
+import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroupMember;
 import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
@@ -128,9 +132,11 @@ import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.common.utils.KafkaThread;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 
 import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -2430,7 +2436,10 @@ public class KafkaAdminClient extends AdminClient {
                     runnable.call(new Call("describeConsumerGroups", deadline, new ConstantNodeIdProvider(nodeId)) {
                         @Override
                         AbstractRequest.Builder createRequest(int timeoutMs) {
-                            return new DescribeGroupsRequest.Builder(Collections.singletonList(groupId));
+                            return new DescribeGroupsRequest.Builder(
+                                new DescribeGroupsRequestData()
+                                    .setGroups(Collections.singletonList(groupId))
+                                    .setIncludeAuthorizedOperations(options.includeAuthorizedOperations()));
                         }
 
                         @Override
@@ -2438,23 +2447,27 @@ public class KafkaAdminClient extends AdminClient {
                             final DescribeGroupsResponse response = (DescribeGroupsResponse) abstractResponse;
 
                             KafkaFutureImpl<ConsumerGroupDescription> future = futures.get(groupId);
-                            final DescribeGroupsResponse.GroupMetadata groupMetadata = response.groups().get(groupId);
+                            final DescribedGroup describedGroup = response.data()
+                                .groups()
+                                .stream()
+                                .filter(group -> groupId.equals(group.groupId()))
+                                .findFirst().get();
 
-                            final Errors groupError = groupMetadata.error();
+                            final Errors groupError = Errors.forCode(describedGroup.errorCode());
                             if (groupError != Errors.NONE) {
                                 // TODO: KAFKA-6789, we can retry based on the error code
                                 future.completeExceptionally(groupError.exception());
                             } else {
-                                final String protocolType = groupMetadata.protocolType();
+                                final String protocolType = describedGroup.protocolType();
                                 if (protocolType.equals(ConsumerProtocol.PROTOCOL_TYPE) || protocolType.isEmpty()) {
-                                    final List<DescribeGroupsResponse.GroupMember> members = groupMetadata.members();
+                                    final List<DescribedGroupMember> members = describedGroup.members();
                                     final List<MemberDescription> memberDescriptions = new ArrayList<>(members.size());
-
-                                    for (DescribeGroupsResponse.GroupMember groupMember : members) {
+                                    final Set<AclOperation> authorizedOperations = validAclOperations(describedGroup.authorizedOperations());
+                                    for (DescribedGroupMember groupMember : members) {
                                         Set<TopicPartition> partitions = Collections.emptySet();
-                                        if (groupMember.memberAssignment().remaining() > 0) {
+                                        if (groupMember.memberAssignment().length > 0) {
                                             final PartitionAssignor.Assignment assignment = ConsumerProtocol.
-                                                deserializeAssignment(groupMember.memberAssignment().duplicate());
+                                                deserializeAssignment(ByteBuffer.wrap(groupMember.memberAssignment()));
                                             partitions = new HashSet<>(assignment.partitions());
                                         }
                                         final MemberDescription memberDescription =
@@ -2465,12 +2478,12 @@ public class KafkaAdminClient extends AdminClient {
                                         memberDescriptions.add(memberDescription);
                                     }
                                     final ConsumerGroupDescription consumerGroupDescription =
-                                            new ConsumerGroupDescription(groupId,
-                                                protocolType.isEmpty(),
+                                            new ConsumerGroupDescription(groupId, protocolType.isEmpty(),
                                                 memberDescriptions,
-                                                groupMetadata.protocol(),
-                                                ConsumerGroupState.parse(groupMetadata.state()),
-                                                fcResponse.node());
+                                                describedGroup.protocolData(),
+                                                ConsumerGroupState.parse(describedGroup.groupState()),
+                                                fcResponse.node(),
+                                                authorizedOperations);
                                     future.complete(consumerGroupDescription);
                                 }
                             }
@@ -2495,6 +2508,16 @@ public class KafkaAdminClient extends AdminClient {
         return new DescribeConsumerGroupsResult(new HashMap<>(futures));
     }
 
+    private Set<AclOperation> validAclOperations(final int authorizedOperations) {
+        return Utils.from32BitField(authorizedOperations)
+            .stream()
+            .map(AclOperation::fromCode)
+            .filter(operation -> operation != AclOperation.UNKNOWN
+                && operation != AclOperation.ALL
+                && operation != AclOperation.ANY)
+            .collect(Collectors.toSet());
+    }
+
     private boolean handleFindCoordinatorError(FindCoordinatorResponse response, KafkaFutureImpl<?> future) {
         Errors error = response.error();
         if (error.exception() instanceof RetriableException) {
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 937b044..19bf6f0 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -18,6 +18,8 @@ package org.apache.kafka.common.protocol;
 
 import org.apache.kafka.common.message.CreateTopicsRequestData;
 import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.DescribeGroupsRequestData;
+import org.apache.kafka.common.message.DescribeGroupsResponseData;
 import org.apache.kafka.common.message.ElectPreferredLeadersRequestData;
 import org.apache.kafka.common.message.ElectPreferredLeadersResponseData;
 import org.apache.kafka.common.message.LeaveGroupRequestData;
@@ -61,8 +63,6 @@ import org.apache.kafka.common.requests.DescribeConfigsRequest;
 import org.apache.kafka.common.requests.DescribeConfigsResponse;
 import org.apache.kafka.common.requests.DescribeDelegationTokenRequest;
 import org.apache.kafka.common.requests.DescribeDelegationTokenResponse;
-import org.apache.kafka.common.requests.DescribeGroupsRequest;
-import org.apache.kafka.common.requests.DescribeGroupsResponse;
 import org.apache.kafka.common.requests.DescribeLogDirsRequest;
 import org.apache.kafka.common.requests.DescribeLogDirsResponse;
 import org.apache.kafka.common.requests.EndTxnRequest;
@@ -139,8 +139,8 @@ public enum ApiKeys {
     HEARTBEAT(12, "Heartbeat", HeartbeatRequest.schemaVersions(), HeartbeatResponse.schemaVersions()),
     LEAVE_GROUP(13, "LeaveGroup", LeaveGroupRequestData.SCHEMAS, LeaveGroupResponseData.SCHEMAS),
     SYNC_GROUP(14, "SyncGroup", SyncGroupRequest.schemaVersions(), SyncGroupResponse.schemaVersions()),
-    DESCRIBE_GROUPS(15, "DescribeGroups", DescribeGroupsRequest.schemaVersions(),
-            DescribeGroupsResponse.schemaVersions()),
+    DESCRIBE_GROUPS(15, "DescribeGroups", DescribeGroupsRequestData.SCHEMAS,
+            DescribeGroupsResponseData.SCHEMAS),
     LIST_GROUPS(16, "ListGroups", ListGroupsRequest.schemaVersions(), ListGroupsResponse.schemaVersions()),
     SASL_HANDSHAKE(17, "SaslHandshake", SaslHandshakeRequestData.SCHEMAS, SaslHandshakeResponseData.SCHEMAS),
     API_VERSIONS(18, "ApiVersions", ApiVersionsRequest.schemaVersions(), ApiVersionsResponse.schemaVersions()) {
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index 68b505b..959379c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -101,7 +101,7 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
             case LEADER_AND_ISR:
                 return new LeaderAndIsrResponse(struct);
             case DESCRIBE_GROUPS:
-                return new DescribeGroupsResponse(struct);
+                return new DescribeGroupsResponse(struct, version);
             case LIST_GROUPS:
                 return new ListGroupsResponse(struct);
             case SASL_HANDSHAKE:
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
index 006af4f..8f003ac 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
@@ -16,97 +16,65 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.message.DescribeGroupsRequestData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.ArrayOf;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.utils.Utils;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
 
-import static org.apache.kafka.common.protocol.types.Type.STRING;
+import static org.apache.kafka.common.requests.AbstractResponse.DEFAULT_THROTTLE_TIME;
 
 public class DescribeGroupsRequest extends AbstractRequest {
-    private static final String GROUP_IDS_KEY_NAME = "group_ids";
-
-    /* Describe group api */
-    private static final Schema DESCRIBE_GROUPS_REQUEST_V0 = new Schema(
-            new Field(GROUP_IDS_KEY_NAME, new ArrayOf(STRING), "List of groupIds to request metadata for (an " +
-                    "empty groupId array will return empty group metadata)."));
-
-    /* v1 request is the same as v0. Throttle time has been added to response */
-    private static final Schema DESCRIBE_GROUPS_REQUEST_V1 = DESCRIBE_GROUPS_REQUEST_V0;
-
-    /**
-     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
-     */
-    private static final Schema DESCRIBE_GROUPS_REQUEST_V2 = DESCRIBE_GROUPS_REQUEST_V1;
-
-    public static Schema[] schemaVersions() {
-        return new Schema[]{DESCRIBE_GROUPS_REQUEST_V0, DESCRIBE_GROUPS_REQUEST_V1, DESCRIBE_GROUPS_REQUEST_V2};
-    }
-
     public static class Builder extends AbstractRequest.Builder<DescribeGroupsRequest> {
-        private final List<String> groupIds;
+        private final DescribeGroupsRequestData data;
 
-        public Builder(List<String> groupIds) {
+        public Builder(DescribeGroupsRequestData data) {
             super(ApiKeys.DESCRIBE_GROUPS);
-            this.groupIds = groupIds;
+            this.data = data;
         }
 
         @Override
         public DescribeGroupsRequest build(short version) {
-            return new DescribeGroupsRequest(this.groupIds, version);
+            return new DescribeGroupsRequest(data, version);
         }
 
         @Override
         public String toString() {
-            return "(type=DescribeGroupsRequest, groupIds=(" + Utils.join(groupIds, ",") + "))";
+            return data.toString();
         }
     }
 
-    private final List<String> groupIds;
+    private final DescribeGroupsRequestData data;
+    private final short version;
 
-    private DescribeGroupsRequest(List<String> groupIds, short version) {
+    private DescribeGroupsRequest(DescribeGroupsRequestData data, short version) {
         super(ApiKeys.DESCRIBE_GROUPS, version);
-        this.groupIds = groupIds;
+        this.data = data;
+        this.version = version;
     }
 
     public DescribeGroupsRequest(Struct struct, short version) {
         super(ApiKeys.DESCRIBE_GROUPS, version);
-        this.groupIds = new ArrayList<>();
-        for (Object groupId : struct.getArray(GROUP_IDS_KEY_NAME))
-            this.groupIds.add((String) groupId);
+        this.data = new DescribeGroupsRequestData(struct, version);
+        this.version = version;
     }
 
-    public List<String> groupIds() {
-        return groupIds;
+    public DescribeGroupsRequestData data() {
+        return data;
     }
 
     @Override
     protected Struct toStruct() {
-        Struct struct = new Struct(ApiKeys.DESCRIBE_GROUPS.requestSchema(version()));
-        struct.set(GROUP_IDS_KEY_NAME, groupIds.toArray());
-        return struct;
+        return data.toStruct(version);
     }
 
     @Override
     public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
-        short version = version();
-        switch (version) {
-            case 0:
-                return DescribeGroupsResponse.fromError(Errors.forException(e), groupIds);
-            case 1:
-            case 2:
-                return DescribeGroupsResponse.fromError(throttleTimeMs, Errors.forException(e), groupIds);
-
-            default:
-                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
-                        version, this.getClass().getSimpleName(), ApiKeys.DESCRIBE_GROUPS.latestVersion()));
+        if (version == 0) {
+            return DescribeGroupsResponse.fromError(DEFAULT_THROTTLE_TIME, Errors.forException(e), data.groups());
+        } else {
+            return DescribeGroupsResponse.fromError(throttleTimeMs, Errors.forException(e), data.groups());
         }
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
index 06c2471..61b9ea2 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
@@ -16,80 +16,23 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.message.DescribeGroupsResponseData;
+import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroup;
+import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroupMember;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.ArrayOf;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.Utils;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
-import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
-import static org.apache.kafka.common.protocol.CommonFields.GROUP_ID;
-import static org.apache.kafka.common.protocol.CommonFields.MEMBER_ID;
-import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
-import static org.apache.kafka.common.protocol.types.Type.BYTES;
-import static org.apache.kafka.common.protocol.types.Type.STRING;
+import java.util.Set;
 
 public class DescribeGroupsResponse extends AbstractResponse {
 
-    private static final String GROUPS_KEY_NAME = "groups";
-
-    private static final String GROUP_STATE_KEY_NAME = "state";
-    private static final String PROTOCOL_TYPE_KEY_NAME = "protocol_type";
-    private static final String PROTOCOL_KEY_NAME = "protocol";
-
-    private static final String MEMBERS_KEY_NAME = "members";
-    private static final String CLIENT_ID_KEY_NAME = "client_id";
-    private static final String CLIENT_HOST_KEY_NAME = "client_host";
-    private static final String MEMBER_METADATA_KEY_NAME = "member_metadata";
-    private static final String MEMBER_ASSIGNMENT_KEY_NAME = "member_assignment";
-
-    private static final Schema DESCRIBE_GROUPS_RESPONSE_MEMBER_V0 = new Schema(
-            MEMBER_ID,
-            new Field(CLIENT_ID_KEY_NAME, STRING, "The client id used in the member's latest join group request"),
-            new Field(CLIENT_HOST_KEY_NAME, STRING, "The client host used in the request session corresponding to the " +
-                    "member's join group."),
-            new Field(MEMBER_METADATA_KEY_NAME, BYTES, "The metadata corresponding to the current group protocol in " +
-                    "use (will only be present if the group is stable)."),
-            new Field(MEMBER_ASSIGNMENT_KEY_NAME, BYTES, "The current assignment provided by the group leader " +
-                    "(will only be present if the group is stable)."));
-
-    private static final Schema DESCRIBE_GROUPS_RESPONSE_GROUP_METADATA_V0 = new Schema(
-            ERROR_CODE,
-            GROUP_ID,
-            new Field(GROUP_STATE_KEY_NAME, STRING, "The current state of the group (one of: Dead, Stable, CompletingRebalance, " +
-                    "PreparingRebalance, or empty if there is no active group)"),
-            new Field(PROTOCOL_TYPE_KEY_NAME, STRING, "The current group protocol type (will be empty if there is no active group)"),
-            new Field(PROTOCOL_KEY_NAME, STRING, "The current group protocol (only provided if the group is Stable)"),
-            new Field(MEMBERS_KEY_NAME, new ArrayOf(DESCRIBE_GROUPS_RESPONSE_MEMBER_V0), "Current group members " +
-                    "(only provided if the group is not Dead)"));
-
-    private static final Schema DESCRIBE_GROUPS_RESPONSE_V0 = new Schema(
-            new Field(GROUPS_KEY_NAME, new ArrayOf(DESCRIBE_GROUPS_RESPONSE_GROUP_METADATA_V0)));
-    private static final Schema DESCRIBE_GROUPS_RESPONSE_V1 = new Schema(
-            THROTTLE_TIME_MS,
-            new Field(GROUPS_KEY_NAME, new ArrayOf(DESCRIBE_GROUPS_RESPONSE_GROUP_METADATA_V0)));
-
-    /**
-     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
-     */
-    private static final Schema DESCRIBE_GROUPS_RESPONSE_V2 = DESCRIBE_GROUPS_RESPONSE_V1;
-
-    public static Schema[] schemaVersions() {
-        return new Schema[] {DESCRIBE_GROUPS_RESPONSE_V0, DESCRIBE_GROUPS_RESPONSE_V1, DESCRIBE_GROUPS_RESPONSE_V2};
-    }
-
-    public static final String UNKNOWN_STATE = "";
-    public static final String UNKNOWN_PROTOCOL_TYPE = "";
-    public static final String UNKNOWN_PROTOCOL = "";
-
     /**
      * Possible per-group error codes:
      *
@@ -99,197 +42,92 @@ public class DescribeGroupsResponse extends AbstractResponse {
      * AUTHORIZATION_FAILED (29)
      */
 
-    private final Map<String, GroupMetadata> groups;
-    private final int throttleTimeMs;
+    private DescribeGroupsResponseData data;
 
-    public DescribeGroupsResponse(Map<String, GroupMetadata> groups) {
-        this(DEFAULT_THROTTLE_TIME, groups);
+    public DescribeGroupsResponse(DescribeGroupsResponseData data) {
+        this.data = data;
     }
 
-    public DescribeGroupsResponse(int throttleTimeMs, Map<String, GroupMetadata> groups) {
-        this.throttleTimeMs = throttleTimeMs;
-        this.groups = groups;
+    public DescribeGroupsResponse(Struct struct, short version) {
+        this.data = new DescribeGroupsResponseData(struct, version);
     }
 
-    public DescribeGroupsResponse(Struct struct) {
-        this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
-        this.groups = new HashMap<>();
-        for (Object groupObj : struct.getArray(GROUPS_KEY_NAME)) {
-            Struct groupStruct = (Struct) groupObj;
+    public static DescribedGroupMember groupMember(
+        final String memberId,
+        final String clientId,
+        final String clientHost,
+        final byte[] assignment,
+        final byte[] metadata) {
+        return new DescribedGroupMember()
+            .setMemberId(memberId)
+            .setClientId(clientId)
+            .setClientHost(clientHost)
+            .setMemberAssignment(assignment)
+            .setMemberMetadata(metadata);
+    }
 
-            String groupId = groupStruct.get(GROUP_ID);
-            Errors error = Errors.forCode(groupStruct.get(ERROR_CODE));
-            String state = groupStruct.getString(GROUP_STATE_KEY_NAME);
-            String protocolType = groupStruct.getString(PROTOCOL_TYPE_KEY_NAME);
-            String protocol = groupStruct.getString(PROTOCOL_KEY_NAME);
+    public static DescribedGroup groupMetadata(
+        final String groupId,
+        final Errors error,
+        final String state,
+        final String protocolType,
+        final String protocol,
+        final List<DescribedGroupMember> members,
+        final Set<Byte> authorizedOperations) {
+        DescribedGroup groupMetada = new DescribedGroup();
+        groupMetada.setGroupId(groupId)
+            .setErrorCode(error.code())
+            .setGroupState(state)
+            .setProtocolType(protocolType)
+            .setProtocolData(protocol)
+            .setMembers(members)
+            .setAuthorizedOperations(Utils.to32BitField(authorizedOperations));
+        return  groupMetada;
+    }
 
-            List<GroupMember> members = new ArrayList<>();
-            for (Object memberObj : groupStruct.getArray(MEMBERS_KEY_NAME)) {
-                Struct memberStruct = (Struct) memberObj;
-                String memberId = memberStruct.get(MEMBER_ID);
-                String clientId = memberStruct.getString(CLIENT_ID_KEY_NAME);
-                String clientHost = memberStruct.getString(CLIENT_HOST_KEY_NAME);
-                ByteBuffer memberMetadata = memberStruct.getBytes(MEMBER_METADATA_KEY_NAME);
-                ByteBuffer memberAssignment = memberStruct.getBytes(MEMBER_ASSIGNMENT_KEY_NAME);
-                members.add(new GroupMember(memberId, clientId, clientHost,
-                        memberMetadata, memberAssignment));
-            }
-            this.groups.put(groupId, new GroupMetadata(error, state, protocolType, protocol, members));
-        }
+    public DescribeGroupsResponseData data() {
+        return data;
     }
 
     @Override
-    public int throttleTimeMs() {
-        return throttleTimeMs;
+    protected Struct toStruct(short version) {
+        return data.toStruct(version);
     }
 
-    public Map<String, GroupMetadata> groups() {
-        return groups;
+    @Override
+    public int throttleTimeMs() {
+        return data.throttleTimeMs();
     }
 
+    public static final String UNKNOWN_STATE = "";
+    public static final String UNKNOWN_PROTOCOL_TYPE = "";
+    public static final String UNKNOWN_PROTOCOL = "";
+
     @Override
     public Map<Errors, Integer> errorCounts() {
         Map<Errors, Integer> errorCounts = new HashMap<>();
-        for (GroupMetadata response : groups.values())
-            updateErrorCounts(errorCounts, response.error);
+        data.groups().forEach(describedGroup -> {
+            updateErrorCounts(errorCounts, Errors.forCode(describedGroup.errorCode()));
+        });
         return errorCounts;
     }
 
-    public static class GroupMetadata {
-        private final Errors error;
-        private final String state;
-        private final String protocolType;
-        private final String protocol;
-        private final List<GroupMember> members;
-
-        public GroupMetadata(Errors error,
-                             String state,
-                             String protocolType,
-                             String protocol,
-                             List<GroupMember> members) {
-            this.error = error;
-            this.state = state;
-            this.protocolType = protocolType;
-            this.protocol = protocol;
-            this.members = members;
-        }
-
-        public Errors error() {
-            return error;
-        }
-
-        public String state() {
-            return state;
-        }
-
-        public String protocolType() {
-            return protocolType;
-        }
-
-        public String protocol() {
-            return protocol;
-        }
-
-        public List<GroupMember> members() {
-            return members;
-        }
-
-        public static GroupMetadata forError(Errors error) {
-            return new DescribeGroupsResponse.GroupMetadata(
-                error,
-                DescribeGroupsResponse.UNKNOWN_STATE,
-                DescribeGroupsResponse.UNKNOWN_PROTOCOL_TYPE,
-                DescribeGroupsResponse.UNKNOWN_PROTOCOL,
-                Collections.emptyList());
-        }
-    }
-
-    public static class GroupMember {
-        private final String memberId;
-        private final String clientId;
-        private final String clientHost;
-        private final ByteBuffer memberMetadata;
-        private final ByteBuffer memberAssignment;
-
-        public GroupMember(String memberId,
-                           String clientId,
-                           String clientHost,
-                           ByteBuffer memberMetadata,
-                           ByteBuffer memberAssignment) {
-            this.memberId = memberId;
-            this.clientId = clientId;
-            this.clientHost = clientHost;
-            this.memberMetadata = memberMetadata;
-            this.memberAssignment = memberAssignment;
-        }
-
-        public String memberId() {
-            return memberId;
-        }
-
-        public String clientId() {
-            return clientId;
-        }
-
-        public String clientHost() {
-            return clientHost;
-        }
-
-        public ByteBuffer memberMetadata() {
-            return memberMetadata;
-        }
-
-        public ByteBuffer memberAssignment() {
-            return memberAssignment;
-        }
-    }
-
-    public static DescribeGroupsResponse fromError(Errors error, List<String> groupIds) {
-        return fromError(DEFAULT_THROTTLE_TIME, error, groupIds);
+    public static DescribedGroup forError(String groupId, Errors error) {
+        return groupMetadata(groupId, error, DescribeGroupsResponse.UNKNOWN_STATE, DescribeGroupsResponse.UNKNOWN_PROTOCOL_TYPE,
+                DescribeGroupsResponse.UNKNOWN_PROTOCOL, Collections.emptyList(), Collections.emptySet());
     }
 
     public static DescribeGroupsResponse fromError(int throttleTimeMs, Errors error, List<String> groupIds) {
-        GroupMetadata errorMetadata = GroupMetadata.forError(error);
-        Map<String, GroupMetadata> groups = new HashMap<>();
+        DescribeGroupsResponseData describeGroupsResponseData = new DescribeGroupsResponseData();
+        describeGroupsResponseData.setThrottleTimeMs(throttleTimeMs);
         for (String groupId : groupIds)
-            groups.put(groupId, errorMetadata);
-        return new DescribeGroupsResponse(throttleTimeMs, groups);
-    }
-
-    @Override
-    protected Struct toStruct(short version) {
-        Struct struct = new Struct(ApiKeys.DESCRIBE_GROUPS.responseSchema(version));
-        struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);
-
-        List<Struct> groupStructs = new ArrayList<>();
-        for (Map.Entry<String, GroupMetadata> groupEntry : groups.entrySet()) {
-            Struct groupStruct = struct.instance(GROUPS_KEY_NAME);
-            GroupMetadata group = groupEntry.getValue();
-            groupStruct.set(GROUP_ID, groupEntry.getKey());
-            groupStruct.set(ERROR_CODE, group.error.code());
-            groupStruct.set(GROUP_STATE_KEY_NAME, group.state);
-            groupStruct.set(PROTOCOL_TYPE_KEY_NAME, group.protocolType);
-            groupStruct.set(PROTOCOL_KEY_NAME, group.protocol);
-            List<Struct> membersList = new ArrayList<>();
-            for (GroupMember member : group.members) {
-                Struct memberStruct = groupStruct.instance(MEMBERS_KEY_NAME);
-                memberStruct.set(MEMBER_ID, member.memberId);
-                memberStruct.set(CLIENT_ID_KEY_NAME, member.clientId);
-                memberStruct.set(CLIENT_HOST_KEY_NAME, member.clientHost);
-                memberStruct.set(MEMBER_METADATA_KEY_NAME, member.memberMetadata);
-                memberStruct.set(MEMBER_ASSIGNMENT_KEY_NAME, member.memberAssignment);
-                membersList.add(memberStruct);
-            }
-            groupStruct.set(MEMBERS_KEY_NAME, membersList.toArray());
-            groupStructs.add(groupStruct);
-        }
-        struct.set(GROUPS_KEY_NAME, groupStructs.toArray());
-
-        return struct;
+            describeGroupsResponseData.groups().add(DescribeGroupsResponse.forError(groupId, error));
+        return new DescribeGroupsResponse(describeGroupsResponseData);
     }
 
     public static DescribeGroupsResponse parse(ByteBuffer buffer, short version) {
-        return new DescribeGroupsResponse(ApiKeys.DESCRIBE_GROUPS.parseResponse(version, buffer));
+        return new DescribeGroupsResponse(
+                ApiKeys.DESCRIBE_GROUPS.responseSchema(version).read(buffer), version);
     }
 
     @Override
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 8c9d162..53417d2 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -1005,4 +1005,28 @@ public final class Utils {
                 .collect(Collectors.collectingAndThen(Collectors.toList(), finisher));
     }
 
+    public static int to32BitField(final Set<Byte> bytes) {
+        int value = 0;
+        for (final byte b : bytes)
+            value |= 1 << checkRange(b);
+        return value;
+    }
+
+    private static byte checkRange(final byte i) {
+        if (i > 31)
+            throw new IllegalArgumentException("out of range: i>31, i = " + i);
+        if (i < 0)
+            throw new IllegalArgumentException("out of range: i<0, i = " + i);
+        return i;
+    }
+
+    public static Set<Byte> from32BitField(final int intValue) {
+        Set<Byte> result = new HashSet<>();
+        for (int itr = intValue, count = 0; itr != 0; itr >>>= 1) {
+            if ((itr & 1) != 0)
+                result.add((byte) count);
+            count++;
+        }
+        return result;
+    }
 }
diff --git a/clients/src/main/resources/common/message/DescribeGroupsRequest.json b/clients/src/main/resources/common/message/DescribeGroupsRequest.json
index 8039a7e..3557bae 100644
--- a/clients/src/main/resources/common/message/DescribeGroupsRequest.json
+++ b/clients/src/main/resources/common/message/DescribeGroupsRequest.json
@@ -18,9 +18,12 @@
   "type": "request",
   "name": "DescribeGroupsRequest",
   // Versions 1 and 2 are the same as version 0.
-  "validVersions": "0-2",
+  // Starting in version 3, authorized operations can be requested.
+  "validVersions": "0-3",
   "fields": [
     { "name": "Groups", "type": "[]string", "versions": "0+",
-      "about": "The names of the groups to describe" }
+      "about": "The names of the groups to describe" },
+    { "name": "IncludeAuthorizedOperations", "type": "bool", "versions": "3+",
+      "about": "Whether to include authorized operations." }
   ]
 }
diff --git a/clients/src/main/resources/common/message/DescribeGroupsResponse.json b/clients/src/main/resources/common/message/DescribeGroupsResponse.json
index c5fcd82..f4677a7 100644
--- a/clients/src/main/resources/common/message/DescribeGroupsResponse.json
+++ b/clients/src/main/resources/common/message/DescribeGroupsResponse.json
@@ -19,7 +19,8 @@
   "name": "DescribeGroupsResponse",
   // Version 1 added throttle time.
   // Starting in version 2, on quota violation, brokers send out responses before throttling.
-  "validVersions": "0-2",
+  // Starting in version 3, brokers can send authorized operations.
+  "validVersions": "0-3",
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
       "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
@@ -51,7 +52,9 @@
         // This is currently only provided if the group is in the Stable state.
         { "name": "MemberAssignment", "type": "bytes", "versions": "0+",
           "about": "The current assignment provided by the group leader." }
-      ]}
+      ]},
+      { "name": "AuthorizedOperations", "type": "int32", "versions": "3+",
+        "about": "32-bit bitfield to represent authorized operations for this group." }
     ]}
   ]
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index debb3a8..782dc16 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -53,6 +53,7 @@ import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.message.CreateTopicsResponseData;
 import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
+import org.apache.kafka.common.message.DescribeGroupsResponseData;
 import org.apache.kafka.common.message.ElectPreferredLeadersResponseData;
 import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.PartitionResult;
 import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.ReplicaElectionResult;
@@ -1055,7 +1056,7 @@ public class KafkaAdminClientTest {
 
             env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
 
-            final Map<String, DescribeGroupsResponse.GroupMetadata> groupMetadataMap = new HashMap<>();
+            DescribeGroupsResponseData data = new DescribeGroupsResponseData();
             TopicPartition myTopicPartition0 = new TopicPartition("my_topic", 0);
             TopicPartition myTopicPartition1 = new TopicPartition("my_topic", 1);
             TopicPartition myTopicPartition2 = new TopicPartition("my_topic", 2);
@@ -1066,29 +1067,34 @@ public class KafkaAdminClientTest {
             topicPartitions.add(2, myTopicPartition2);
 
             final ByteBuffer memberAssignment = ConsumerProtocol.serializeAssignment(new PartitionAssignor.Assignment(topicPartitions));
+            byte[] memberAssignmentBytes = new byte[memberAssignment.remaining()];
+            memberAssignment.get(memberAssignmentBytes);
 
-            groupMetadataMap.put(
-                "group-0",
-                new DescribeGroupsResponse.GroupMetadata(
+            data.groups().add(DescribeGroupsResponse.groupMetadata(
+                    "group-0",
                     Errors.NONE,
                     "",
                     ConsumerProtocol.PROTOCOL_TYPE,
                     "",
                     asList(
-                        new DescribeGroupsResponse.GroupMember("0", "clientId0", "clientHost", null, memberAssignment),
-                        new DescribeGroupsResponse.GroupMember("1", "clientId1", "clientHost", null, memberAssignment))));
-            groupMetadataMap.put(
-                "group-connect-0",
-                new DescribeGroupsResponse.GroupMetadata(
+                        DescribeGroupsResponse.groupMember("0", "clientId0", "clientHost", memberAssignmentBytes, null),
+                        DescribeGroupsResponse.groupMember("1", "clientId1", "clientHost", memberAssignmentBytes, null)
+                    ),
+                    Collections.emptySet()));
+
+            data.groups().add(DescribeGroupsResponse.groupMetadata(
+                    "group-connect-0",
                     Errors.NONE,
                     "",
                     "connect",
                     "",
                     asList(
-                        new DescribeGroupsResponse.GroupMember("0", "clientId0", "clientHost", null, memberAssignment),
-                        new DescribeGroupsResponse.GroupMember("1", "clientId1", "clientHost", null, memberAssignment))));
+                        DescribeGroupsResponse.groupMember("0", "clientId0", "clientHost", memberAssignmentBytes, null),
+                        DescribeGroupsResponse.groupMember("1", "clientId1", "clientHost", memberAssignmentBytes, null)
+                    ),
+                    Collections.emptySet()));
 
-            env.kafkaClient().prepareResponse(new DescribeGroupsResponse(groupMetadataMap));
+            env.kafkaClient().prepareResponse(new DescribeGroupsResponse(data));
 
             final DescribeConsumerGroupsResult result = env.adminClient().describeConsumerGroups(singletonList("group-0"));
             final ConsumerGroupDescription groupDescription = result.describedGroups().get("group-0").get();
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index c105d3b..f1e2063 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -38,6 +38,8 @@ import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
 import org.apache.kafka.common.message.CreateTopicsRequestData.CreateableTopicConfig;
 import org.apache.kafka.common.message.CreateTopicsResponseData;
 import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
+import org.apache.kafka.common.message.DescribeGroupsRequestData;
+import org.apache.kafka.common.message.DescribeGroupsResponseData;
 import org.apache.kafka.common.message.ElectPreferredLeadersRequestData;
 import org.apache.kafka.common.message.ElectPreferredLeadersRequestData.TopicPartitions;
 import org.apache.kafka.common.message.ElectPreferredLeadersResponseData;
@@ -767,18 +769,21 @@ public class RequestResponseTest {
     }
 
     private DescribeGroupsRequest createDescribeGroupRequest() {
-        return new DescribeGroupsRequest.Builder(singletonList("test-group")).build();
+        return new DescribeGroupsRequest.Builder(
+            new DescribeGroupsRequestData().
+                setGroups(Collections.singletonList("test-group"))).build();
     }
 
     private DescribeGroupsResponse createDescribeGroupResponse() {
         String clientId = "consumer-1";
         String clientHost = "localhost";
-        ByteBuffer empty = ByteBuffer.allocate(0);
-        DescribeGroupsResponse.GroupMember member = new DescribeGroupsResponse.GroupMember("memberId",
-                clientId, clientHost, empty, empty);
-        DescribeGroupsResponse.GroupMetadata metadata = new DescribeGroupsResponse.GroupMetadata(Errors.NONE,
-                "STABLE", "consumer", "roundrobin", asList(member));
-        return new DescribeGroupsResponse(Collections.singletonMap("test-group", metadata));
+        DescribeGroupsResponseData describeGroupsResponseData = new DescribeGroupsResponseData();
+        DescribeGroupsResponseData.DescribedGroupMember member = DescribeGroupsResponse.groupMember("memberId",
+                clientId, clientHost, new byte[0], new byte[0]);
+        DescribeGroupsResponseData.DescribedGroup metadata = DescribeGroupsResponse.groupMetadata("test-group", Errors.NONE,
+                "STABLE", "consumer", "roundrobin", asList(member), Collections.emptySet());
+        describeGroupsResponseData.groups().add(metadata);
+        return new DescribeGroupsResponse(describeGroupsResponseData);
     }
 
     private LeaveGroupRequest createLeaveGroupRequest() {
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
index d5029b6..172a992 100755
--- a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
@@ -30,16 +30,19 @@ import java.nio.channels.FileChannel;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.StandardOpenOption;
-import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.Random;
+import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static java.util.Arrays.asList;
 import static org.apache.kafka.common.utils.Utils.formatAddress;
 import static org.apache.kafka.common.utils.Utils.formatBytes;
 import static org.apache.kafka.common.utils.Utils.getHost;
 import static org.apache.kafka.common.utils.Utils.getPort;
+import static org.apache.kafka.common.utils.Utils.mkSet;
 import static org.apache.kafka.common.utils.Utils.validHostPattern;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
@@ -110,8 +113,8 @@ public class UtilsTest {
     @Test
     public void testJoin() {
         assertEquals("", Utils.join(Collections.emptyList(), ","));
-        assertEquals("1", Utils.join(Arrays.asList("1"), ","));
-        assertEquals("1,2,3", Utils.join(Arrays.asList(1, 2, 3), ","));
+        assertEquals("1", Utils.join(asList("1"), ","));
+        assertEquals("1,2,3", Utils.join(asList(1, 2, 3), ","));
     }
 
     @Test
@@ -466,4 +469,22 @@ public class UtilsTest {
         Utils.delete(tempDir);
         assertFalse(Files.exists(tempDir.toPath()));
     }
+
+    @Test
+    public void testConvertTo32BitField() {
+        Set<Byte> bytes = mkSet((byte) 0, (byte) 1, (byte) 5, (byte) 10, (byte) 31);
+        int bitField = Utils.to32BitField(bytes);
+        assertEquals(bytes, Utils.from32BitField(bitField));
+
+        bytes = new HashSet<>();
+        bitField = Utils.to32BitField(bytes);
+        assertEquals(bytes, Utils.from32BitField(bitField));
+
+        bytes = mkSet((byte) 0, (byte) 11, (byte) 32);
+        try {
+            Utils.to32BitField(bytes);
+            fail("Expected exception not thrown");
+        } catch (IllegalArgumentException e) {
+        }
+    }
 }
diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala b/core/src/main/scala/kafka/admin/AclCommand.scala
index 51177b2..f945b25 100644
--- a/core/src/main/scala/kafka/admin/AclCommand.scala
+++ b/core/src/main/scala/kafka/admin/AclCommand.scala
@@ -41,14 +41,6 @@ object AclCommand extends Logging {
 
   private val Newline = scala.util.Properties.lineSeparator
 
-  val ResourceTypeToValidOperations: Map[JResourceType, Set[Operation]] = Map[JResourceType, Set[Operation]](
-    JResourceType.TOPIC -> Set(Read, Write, Create, Describe, Delete, Alter, DescribeConfigs, AlterConfigs, All),
-    JResourceType.GROUP -> Set(Read, Describe, Delete, All),
-    JResourceType.CLUSTER -> Set(Create, ClusterAction, DescribeConfigs, AlterConfigs, IdempotentWrite, Alter, Describe, All),
-    JResourceType.TRANSACTIONAL_ID -> Set(Describe, Write, All),
-    JResourceType.DELEGATION_TOKEN -> Set(Describe, All)
-  )
-
   def main(args: Array[String]) {
 
     val opts = new AclCommandOptions(args)
@@ -454,7 +446,7 @@ object AclCommand extends Logging {
 
   private def validateOperation(opts: AclCommandOptions, resourceToAcls: Map[ResourcePatternFilter, Set[Acl]]): Unit = {
     for ((resource, acls) <- resourceToAcls) {
-      val validOps = ResourceTypeToValidOperations(resource.resourceType)
+      val validOps = ResourceType.fromJava(resource.resourceType).supportedOperations + All
       if ((acls.map(_.operation) -- validOps).nonEmpty)
         CommandLineUtils.printUsageAndDie(opts.parser, s"ResourceType ${resource.resourceType} only supports operations ${validOps.mkString(",")}")
     }
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index aa5dfec..bd09db4 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -27,15 +27,15 @@ import org.apache.kafka.common.config.ConfigDef.ValidString._
 import org.apache.kafka.common.config.ConfigDef.{Importance, Type}
 import org.apache.kafka.common.config.{AbstractConfig, ConfigDef}
 import org.apache.kafka.common.errors.{AuthenticationException, TimeoutException}
+import org.apache.kafka.common.message.{DescribeGroupsRequestData, DescribeGroupsResponseData}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.Selector
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.requests.ApiVersionsResponse.ApiVersion
-import org.apache.kafka.common.requests.DescribeGroupsResponse.GroupMetadata
 import org.apache.kafka.common.requests.OffsetFetchResponse
 import org.apache.kafka.common.utils.LogContext
-import org.apache.kafka.common.utils.{KafkaThread, Time, Utils}
+import org.apache.kafka.common.utils.{KafkaThread, Time}
 import org.apache.kafka.common.{Node, TopicPartition}
 
 import scala.collection.JavaConverters._
@@ -233,20 +233,20 @@ class AdminClient(val time: Time,
                                   consumers: Option[List[ConsumerSummary]],
                                   coordinator: Node)
 
-  def describeConsumerGroupHandler(coordinator: Node, groupId: String): GroupMetadata = {
+  def describeConsumerGroupHandler(coordinator: Node, groupId: String): DescribeGroupsResponseData.DescribedGroup = {
     val responseBody = send(coordinator, ApiKeys.DESCRIBE_GROUPS,
-        new DescribeGroupsRequest.Builder(Collections.singletonList(groupId)))
+      new DescribeGroupsRequest.Builder(new DescribeGroupsRequestData().setGroups(Collections.singletonList(groupId))))
     val response = responseBody.asInstanceOf[DescribeGroupsResponse]
-    val metadata = response.groups.get(groupId)
-    if (metadata == null)
-      throw new KafkaException(s"Response from broker contained no metadata for group $groupId")
+    val metadata = response.data().groups().asScala.find(group => groupId.equals(group.groupId()))
+      .getOrElse(throw new KafkaException(s"Response from broker contained no metadata for group $groupId"))
     metadata
   }
 
   def describeConsumerGroup(groupId: String, timeoutMs: Long = 0): ConsumerGroupSummary = {
 
-    def isValidConsumerGroupResponse(metadata: DescribeGroupsResponse.GroupMetadata): Boolean =
-      metadata.error == Errors.NONE && (metadata.state == "Dead" || metadata.state == "Empty" || metadata.protocolType == ConsumerProtocol.PROTOCOL_TYPE)
+    def isValidConsumerGroupResponse(metadata: DescribeGroupsResponseData.DescribedGroup): Boolean =
+      metadata.errorCode() == Errors.NONE.code() && (metadata.groupState() == "Dead" ||
+        metadata.groupState() == "Empty" || metadata.protocolType == ConsumerProtocol.PROTOCOL_TYPE)
 
     val startTime = time.milliseconds
     val coordinator = findCoordinator(groupId, timeoutMs)
@@ -262,16 +262,16 @@ class AdminClient(val time: Time,
       throw new TimeoutException("The consumer group command timed out while waiting for group to initialize")
 
     val consumers = metadata.members.asScala.map { consumer =>
-      ConsumerSummary(consumer.memberId, consumer.clientId, consumer.clientHost, metadata.state match {
+      ConsumerSummary(consumer.memberId, consumer.clientId, consumer.clientHost, metadata.groupState() match {
         case "Stable" =>
-          val assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(Utils.readBytes(consumer.memberAssignment)))
+          val assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(consumer.memberAssignment))
           assignment.partitions.asScala.toList
         case _ =>
           List()
       })
     }.toList
 
-    ConsumerGroupSummary(metadata.state, metadata.protocol, Some(consumers), coordinator)
+    ConsumerGroupSummary(metadata.groupState(), metadata.protocolData(), Some(consumers), coordinator)
   }
 
   def deleteConsumerGroups(groups: List[String]): Map[String, Errors] = {
diff --git a/core/src/main/scala/kafka/security/auth/ResourceType.scala b/core/src/main/scala/kafka/security/auth/ResourceType.scala
index 65a0373..ae55c47 100644
--- a/core/src/main/scala/kafka/security/auth/ResourceType.scala
+++ b/core/src/main/scala/kafka/security/auth/ResourceType.scala
@@ -23,6 +23,8 @@ import org.apache.kafka.common.resource.{ResourceType => JResourceType}
 sealed trait ResourceType extends BaseEnum with Ordered[ ResourceType ] {
   def error: Errors
   def toJava: JResourceType
+  // The returned set of supported operations does not include the "All" operation type.
+  def supportedOperations: Set[Operation]
 
   override def compare(that: ResourceType): Int = this.name compare that.name
 }
@@ -31,30 +33,35 @@ case object Topic extends ResourceType {
   val name = "Topic"
   val error = Errors.TOPIC_AUTHORIZATION_FAILED
   val toJava = JResourceType.TOPIC
+  val supportedOperations = Set(Read, Write, Create, Describe, Delete, Alter, DescribeConfigs, AlterConfigs)
 }
 
 case object Group extends ResourceType {
   val name = "Group"
   val error = Errors.GROUP_AUTHORIZATION_FAILED
   val toJava = JResourceType.GROUP
+  val supportedOperations = Set(Read, Describe, Delete)
 }
 
 case object Cluster extends ResourceType {
   val name = "Cluster"
   val error = Errors.CLUSTER_AUTHORIZATION_FAILED
   val toJava = JResourceType.CLUSTER
+  val supportedOperations = Set(Create, ClusterAction, DescribeConfigs, AlterConfigs, IdempotentWrite, Alter, Describe)
 }
 
 case object TransactionalId extends ResourceType {
   val name = "TransactionalId"
   val error = Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED
   val toJava = JResourceType.TRANSACTIONAL_ID
+  val supportedOperations = Set(Describe, Write)
 }
 
 case object DelegationToken extends ResourceType {
   val name = "DelegationToken"
   val error = Errors.DELEGATION_TOKEN_AUTHORIZATION_FAILED
   val toJava = JResourceType.DELEGATION_TOKEN
+  val supportedOperations : Set[Operation] = Set(Describe)
 }
 
 object ResourceType {
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index bdd794c..2361ee5 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -18,6 +18,7 @@
 package kafka.server
 
 import java.lang.{Long => JLong}
+import java.lang.{Byte => JByte}
 import java.nio.ByteBuffer
 import java.util
 import java.util.concurrent.ConcurrentHashMap
@@ -44,7 +45,7 @@ import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.FatalExitError
 import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME, isInternal}
 import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
-import org.apache.kafka.common.message.CreateTopicsResponseData
+import org.apache.kafka.common.message.{CreateTopicsResponseData, DescribeGroupsResponseData}
 import org.apache.kafka.common.message.CreateTopicsResponseData.{CreatableTopicResult, CreatableTopicResultSet}
 import org.apache.kafka.common.message.ElectPreferredLeadersResponseData
 import org.apache.kafka.common.message.LeaveGroupResponseData
@@ -1186,24 +1187,65 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def handleDescribeGroupRequest(request: RequestChannel.Request) {
+
+    def sendResponseCallback(describeGroupsResponseData: DescribeGroupsResponseData): Unit = {
+      def createResponse(requestThrottleMs: Int): AbstractResponse = {
+        describeGroupsResponseData.setThrottleTimeMs(requestThrottleMs)
+        new DescribeGroupsResponse(describeGroupsResponseData)
+      }
+      sendResponseMaybeThrottle(request, createResponse)
+    }
+
     val describeRequest = request.body[DescribeGroupsRequest]
+    val describeGroupsResponseData = new DescribeGroupsResponseData()
 
-    val groups = describeRequest.groupIds.asScala.map { groupId =>
-      if (!authorize(request.session, Describe, Resource(Group, groupId, LITERAL))) {
-        groupId -> DescribeGroupsResponse.GroupMetadata.forError(Errors.GROUP_AUTHORIZATION_FAILED)
+    describeRequest.data().groups().asScala.foreach { groupId =>
+      val resource = Resource(Group, groupId, LITERAL)
+      if (!authorize(request.session, Describe, resource)) {
+        describeGroupsResponseData.groups().add(DescribeGroupsResponse.forError(groupId, Errors.GROUP_AUTHORIZATION_FAILED))
       } else {
         val (error, summary) = groupCoordinator.handleDescribeGroup(groupId)
         val members = summary.members.map { member =>
-          val metadata = ByteBuffer.wrap(member.metadata)
-          val assignment = ByteBuffer.wrap(member.assignment)
-          new DescribeGroupsResponse.GroupMember(member.memberId, member.clientId, member.clientHost, metadata, assignment)
+          new DescribeGroupsResponseData.DescribedGroupMember()
+            .setMemberId(member.memberId)
+            .setClientId(member.clientId)
+            .setClientHost(member.clientHost)
+            .setMemberAssignment(member.assignment)
+            .setMemberMetadata(member.metadata)
         }
-        groupId -> new DescribeGroupsResponse.GroupMetadata(error, summary.state, summary.protocolType,
-          summary.protocol, members.asJava)
+
+        val describedGroup = new DescribeGroupsResponseData.DescribedGroup()
+          .setErrorCode(error.code())
+          .setGroupId(groupId)
+          .setGroupState(summary.state)
+          .setProtocolType(summary.protocolType)
+          .setProtocolData(summary.protocol)
+          .setMembers(members.asJava)
+
+        if (request.header.apiVersion >= 3) { // authorized operations are only populated for request versions 3 and above
+          if (error == Errors.NONE && describeRequest.data().includeAuthorizedOperations()) {
+            describedGroup.setAuthorizedOperations(authorizedOperations(request.session, resource))
+          } else {
+            describedGroup.setAuthorizedOperations(0) // not requested, or the describe failed: report no operations
+          }
+        }
+
+        describeGroupsResponseData.groups().add(describedGroup)
       }
-    }.toMap
+    }
+
+    sendResponseCallback(describeGroupsResponseData)
+  }
+
+
+  private def authorizedOperations(session: RequestChannel.Session, resource: Resource): Int = {
+    val authorizedOps = authorizer match {
+      case None => resource.resourceType.supportedOperations
+      case Some(auth) => resource.resourceType.supportedOperations
+        .filter(operation => authorize(session, operation, resource))
+    }
 
-    sendResponseMaybeThrottle(request, requestThrottleMs => new DescribeGroupsResponse(requestThrottleMs, groups.asJava))
+    Utils.to32BitField(authorizedOps.map(operation => operation.toJava.code().asInstanceOf[JByte]).asJava)
   }
 
   def handleListGroupsRequest(request: RequestChannel.Request) {
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index 96de860..eed8004 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -47,14 +47,14 @@ import org.junit.Assert._
 
 import scala.util.Random
 import scala.collection.JavaConverters._
-
 import kafka.zk.KafkaZkClient
 
 import scala.concurrent.duration.Duration
 import scala.concurrent.{Await, Future}
-
 import java.lang.{Long => JLong}
 
+import kafka.security.auth.Group
+
 /**
  * An integration test of the KafkaAdminClient.
  *
@@ -1142,12 +1142,14 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
             !matching.isEmpty
           }, s"Expected to be able to list $testGroupId")
 
-          val result = client.describeConsumerGroups(Seq(testGroupId, fakeGroupId).asJava)
+          val result = client.describeConsumerGroups(Seq(testGroupId, fakeGroupId).asJava,
+            new DescribeConsumerGroupsOptions().includeAuthorizedOperations(true))
           assertEquals(2, result.describedGroups().size())
 
           // Test that we can get information about the test consumer group.
           assertTrue(result.describedGroups().containsKey(testGroupId))
           val testGroupDescription = result.describedGroups().get(testGroupId).get()
+
           assertEquals(testGroupId, testGroupDescription.groupId())
           assertFalse(testGroupDescription.isSimpleConsumerGroup())
           assertEquals(1, testGroupDescription.members().size())
@@ -1157,14 +1159,19 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
           assertEquals(testNumPartitions, topicPartitions.size())
           assertEquals(testNumPartitions, topicPartitions.asScala.
             count(tp => tp.topic().equals(testTopicName)))
+          val expectedOperations = Group.supportedOperations
+            .map(operation => operation.toJava).asJava
+          assertEquals(expectedOperations, testGroupDescription.authorizedOperations())
 
           // Test that the fake group is listed as dead.
           assertTrue(result.describedGroups().containsKey(fakeGroupId))
           val fakeGroupDescription = result.describedGroups().get(fakeGroupId).get()
+
           assertEquals(fakeGroupId, fakeGroupDescription.groupId())
           assertEquals(0, fakeGroupDescription.members().size())
           assertEquals("", fakeGroupDescription.partitionAssignor())
           assertEquals(ConsumerGroupState.DEAD, fakeGroupDescription.state())
+          assertEquals(expectedOperations, fakeGroupDescription.authorizedOperations())
 
           // Test that all() returns 2 results
           assertEquals(2, result.all().get().size())
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index e0c2832..044f595 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -33,9 +33,8 @@ import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.Topic.GROUP_METADATA_TOPIC_NAME
-import org.apache.kafka.common.message.CreateTopicsRequestData
+import org.apache.kafka.common.message.{CreateTopicsRequestData, DescribeGroupsRequestData, LeaveGroupRequestData}
 import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicSet}
-import org.apache.kafka.common.message.LeaveGroupRequestData
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Records, SimpleRecord}
@@ -163,7 +162,10 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     ApiKeys.UPDATE_METADATA -> ((resp: requests.UpdateMetadataResponse) => resp.error),
     ApiKeys.JOIN_GROUP -> ((resp: JoinGroupResponse) => resp.error),
     ApiKeys.SYNC_GROUP -> ((resp: SyncGroupResponse) => resp.error),
-    ApiKeys.DESCRIBE_GROUPS -> ((resp: DescribeGroupsResponse) => resp.groups.get(group).error),
+    ApiKeys.DESCRIBE_GROUPS -> ((resp: DescribeGroupsResponse) => {
+      val errorCode = resp.data().groups().asScala.find(g => group.equals(g.groupId())).head.errorCode()
+      Errors.forCode(errorCode)
+    }),
     ApiKeys.HEARTBEAT -> ((resp: HeartbeatResponse) => resp.error),
     ApiKeys.LEAVE_GROUP -> ((resp: LeaveGroupResponse) => resp.error),
     ApiKeys.DELETE_GROUPS -> ((resp: DeleteGroupsResponse) => resp.get(group)),
@@ -318,7 +320,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   }
 
   private def createDescribeGroupsRequest = {
-    new DescribeGroupsRequest.Builder(List(group).asJava).build()
+    new DescribeGroupsRequest.Builder(new DescribeGroupsRequestData().setGroups(List(group).asJava)).build()
   }
 
   private def createOffsetCommitRequest = {
diff --git a/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala b/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
new file mode 100644
index 0000000..5e53592
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
@@ -0,0 +1,121 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+  * License. You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+  * specific language governing permissions and limitations under the License.
+  */
+package kafka.api
+
+import java.io.File
+import java.util
+import java.util.Properties
+
+import kafka.security.auth.{Allow, Alter, Authorizer, ClusterAction, Group, Operation, PermissionType, SimpleAclAuthorizer, Acl => AuthAcl, Resource => AuthResource}
+import kafka.server.KafkaConfig
+import kafka.utils.{CoreUtils, JaasTestUtils, TestUtils}
+import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, DescribeConsumerGroupsOptions}
+import org.apache.kafka.common.acl._
+import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceType}
+import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
+import org.apache.kafka.common.utils.Utils
+import org.junit.Assert.assertEquals
+import org.junit.{After, Before, Test}
+
+import scala.collection.JavaConverters._
+
+class DescribeAuthorizedOperationsTest extends IntegrationTestHarness with SaslSetup {
+  override val serverCount = 1
+  this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
+  this.serverConfig.setProperty(KafkaConfig.AuthorizerClassNameProp, classOf[SimpleAclAuthorizer].getName)
+
+  var client: AdminClient = _
+  val group1 = "group1"
+  val group2 = "group2"
+  val group3 = "group3"
+
+  override protected def securityProtocol = SecurityProtocol.SASL_SSL
+
+  override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks"))
+
+  override def configureSecurityBeforeServersStart() {
+    val authorizer = CoreUtils.createObject[Authorizer](classOf[SimpleAclAuthorizer].getName)
+    try {
+      authorizer.configure(this.configs.head.originals())
+      authorizer.addAcls(Set(clusterAcl(JaasTestUtils.KafkaServerPrincipalUnqualifiedName, Allow, ClusterAction),
+        clusterAcl(JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, Allow, Alter)),
+        AuthResource.ClusterResource)
+    } finally {
+      authorizer.close()
+    }
+  }
+
+  @Before
+  override def setUp(): Unit = {
+    startSasl(jaasSections(Seq("GSSAPI"), Some("GSSAPI"), Both, JaasTestUtils.KafkaServerContextName))
+    super.setUp()
+    TestUtils.waitUntilBrokerMetadataIsPropagated(servers)
+  }
+
+  private def clusterAcl(userName: String, permissionType: PermissionType, operation: Operation): AuthAcl = {
+    new AuthAcl(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, userName), permissionType,
+      AuthAcl.WildCardHost, operation)
+  }
+
+  @After
+  override def tearDown(): Unit = {
+    if (client != null)
+      Utils.closeQuietly(client, "AdminClient")
+    super.tearDown()
+    closeSasl()
+  }
+
+  val group1Acl = new AclBinding(new ResourcePattern(ResourceType.GROUP, group1, PatternType.LITERAL),
+    new AccessControlEntry("User:" + JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, "*", AclOperation.ALL, AclPermissionType.ALLOW))
+
+  val group2Acl = new AclBinding(new ResourcePattern(ResourceType.GROUP, group2, PatternType.LITERAL),
+    new AccessControlEntry("User:" + JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW))
+
+  val group3Acl = new AclBinding(new ResourcePattern(ResourceType.GROUP, group3, PatternType.LITERAL),
+    new AccessControlEntry("User:" + JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, "*", AclOperation.DELETE, AclPermissionType.ALLOW))
+
+  def createConfig(): Properties = {
+    val adminClientConfig = new Properties()
+    adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    adminClientConfig.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "20000")
+    val securityProps: util.Map[Object, Object] =
+      TestUtils.adminClientSecurityConfigs(securityProtocol, trustStoreFile, clientSaslProperties)
+    securityProps.asScala.foreach { case (key, value) => adminClientConfig.put(key.asInstanceOf[String], value) }
+    adminClientConfig
+  }
+
+  @Test
+  def testConsumerGroupAuthorizedOperations(): Unit = {
+    client = AdminClient.create(createConfig())
+
+    val results = client.createAcls(List(group1Acl, group2Acl, group3Acl).asJava)
+    assertEquals(Set(group1Acl, group2Acl, group3Acl), results.values.keySet.asScala)
+    results.all.get
+
+    val describeConsumerGroupsResult = client.describeConsumerGroups(Seq(group1, group2, group3).asJava,
+      new DescribeConsumerGroupsOptions().includeAuthorizedOperations(true))
+    assertEquals(3, describeConsumerGroupsResult.describedGroups().size())
+    val expectedOperations =  Group.supportedOperations
+      .map(operation => operation.toJava).asJava
+
+    val group1Description = describeConsumerGroupsResult.describedGroups().get(group1).get
+    assertEquals(expectedOperations, group1Description.authorizedOperations())
+
+    val group2Description = describeConsumerGroupsResult.describedGroups().get(group2).get
+    assertEquals(Set(AclOperation.DESCRIBE), group2Description.authorizedOperations().asScala.toSet)
+
+    val group3Description = describeConsumerGroupsResult.describedGroups().get(group3).get
+    assertEquals(Set(AclOperation.DESCRIBE, AclOperation.DELETE), group3Description.authorizedOperations().asScala.toSet)
+  }
+
+}
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 2d82e7f..b0eb7cb 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -24,10 +24,9 @@ import kafka.security.auth._
 import kafka.utils.TestUtils
 import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
 import org.apache.kafka.common.config.ConfigResource
-import org.apache.kafka.common.message.{ElectPreferredLeadersRequestData, LeaveGroupRequestData}
+import org.apache.kafka.common.message.{CreateTopicsRequestData, DescribeGroupsRequestData, ElectPreferredLeadersRequestData, LeaveGroupRequestData}
 import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, ResourceType => AdminResourceType}
 import org.apache.kafka.common.{Node, TopicPartition}
-import org.apache.kafka.common.message.CreateTopicsRequestData
 import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicSet}
 import org.apache.kafka.common.message.SaslHandshakeRequestData
 import org.apache.kafka.common.metrics.{KafkaMetric, Quota, Sensor}
@@ -268,7 +267,7 @@ class RequestQuotaTest extends BaseRequestTest {
           new SyncGroupRequest.Builder("test-sync-group", 1, "", Map[String, ByteBuffer]().asJava)
 
         case ApiKeys.DESCRIBE_GROUPS =>
-          new DescribeGroupsRequest.Builder(List("test-group").asJava)
+          new DescribeGroupsRequest.Builder(new DescribeGroupsRequestData().setGroups(List("test-group").asJava))
 
         case ApiKeys.LIST_GROUPS =>
           new ListGroupsRequest.Builder()
@@ -443,7 +442,8 @@ class RequestQuotaTest extends BaseRequestTest {
       case ApiKeys.HEARTBEAT => new HeartbeatResponse(response).throttleTimeMs
       case ApiKeys.LEAVE_GROUP => new LeaveGroupResponse(response).throttleTimeMs
       case ApiKeys.SYNC_GROUP => new SyncGroupResponse(response).throttleTimeMs
-      case ApiKeys.DESCRIBE_GROUPS => new DescribeGroupsResponse(response).throttleTimeMs
+      case ApiKeys.DESCRIBE_GROUPS =>
+        new DescribeGroupsResponse(response, ApiKeys.DESCRIBE_GROUPS.latestVersion()).throttleTimeMs
       case ApiKeys.LIST_GROUPS => new ListGroupsResponse(response).throttleTimeMs
       case ApiKeys.API_VERSIONS => new ApiVersionsResponse(response).throttleTimeMs
       case ApiKeys.CREATE_TOPICS =>


Mime
View raw message