kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6841: Support Prefixed ACLs (KIP-290) (#5117)
Date Wed, 06 Jun 2018 14:23:12 GMT
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b3aa655  KAFKA-6841: Support Prefixed ACLs (KIP-290) (#5117)
b3aa655 is described below

commit b3aa655a70d92fedfebd32cf469a87b45766fc59
Author: Andy Coates <8012398+big-andy-coates@users.noreply.github.com>
AuthorDate: Wed Jun 6 15:22:57 2018 +0100

    KAFKA-6841: Support Prefixed ACLs (KIP-290) (#5117)
    
    Reviewers: Colin Patrick McCabe <colin@cmccabe.xyz>, Jun Rao <junrao@gmail.com>
    
    Co-authored-by: Piyush Vijay <pvijay@apple.com>
    Co-authored-by: Andy Coates <big-andy-coates@users.noreply.github.com>
---
 checkstyle/import-control.xml                      |   3 +-
 .../apache/kafka/common/acl/AclBindingFilter.java  |   5 +-
 .../apache/kafka/common/protocol/CommonFields.java |   3 +
 .../apache/kafka/common/protocol/types/Field.java  |   3 +
 .../kafka/common/requests/CreateAclsRequest.java   |  33 +++-
 .../kafka/common/requests/CreateAclsResponse.java  |   4 +-
 .../kafka/common/requests/DeleteAclsRequest.java   |  32 +++-
 .../kafka/common/requests/DeleteAclsResponse.java  |  54 ++++++-
 .../kafka/common/requests/DescribeAclsRequest.java |  25 ++-
 .../common/requests/DescribeAclsResponse.java      |  49 +++++-
 .../apache/kafka/common/requests/RequestUtils.java |  21 ++-
 .../org/apache/kafka/common/resource/Resource.java |  57 +++++--
 .../kafka/common/resource/ResourceFilter.java      | 114 +++++++++++--
 .../kafka/common/resource/ResourceNameType.java    |  91 +++++++++++
 .../kafka/clients/admin/KafkaAdminClientTest.java  |   9 +-
 .../apache/kafka/common/acl/AclBindingTest.java    |  25 +--
 .../common/requests/CreateAclsRequestTest.java     |  91 +++++++++++
 .../common/requests/DeleteAclsRequestTest.java     |  87 ++++++++++
 .../common/requests/DeleteAclsResponseTest.java    | 107 +++++++++++++
 .../common/requests/DescribeAclsRequestTest.java   |  96 +++++++++++
 .../common/requests/DescribeAclsResponseTest.java  |  86 ++++++++++
 .../kafka/common/requests/RequestResponseTest.java |  20 +--
 .../kafka/common/resource/ResourceFilterTest.java  | 163 +++++++++++++++++++
 core/src/main/scala/kafka/admin/AclCommand.scala   | 141 +++++++++++------
 .../main/scala/kafka/security/SecurityUtils.scala  |   7 +-
 core/src/main/scala/kafka/security/auth/Acl.scala  |   2 +
 .../scala/kafka/security/auth/Authorizer.scala     |  65 ++++++--
 .../main/scala/kafka/security/auth/Resource.scala  |  41 +++--
 .../kafka/security/auth/ResourceNameType.scala     |  49 ++++++
 .../scala/kafka/security/auth/ResourceType.scala   |   4 +-
 .../kafka/security/auth/SimpleAclAuthorizer.scala  | 108 +++++++++----
 core/src/main/scala/kafka/server/KafkaApis.scala   |  70 ++++----
 core/src/main/scala/kafka/zk/KafkaZkClient.scala   |  54 ++++---
 core/src/main/scala/kafka/zk/ZkData.scala          |  90 +++++++----
 .../kafka/api/AdminClientIntegrationTest.scala     |   4 +-
 .../kafka/api/AuthorizerIntegrationTest.scala      |  34 ++--
 .../kafka/api/EndToEndAuthorizationTest.scala      |  55 +++++--
 .../api/SaslSslAdminClientIntegrationTest.scala    | 176 ++++++++++++++++++---
 .../scala/unit/kafka/admin/AclCommandTest.scala    |  77 ++++++---
 .../ZkNodeChangeNotificationListenerTest.scala     |  28 ++--
 .../security/auth/SimpleAclAuthorizerTest.scala    | 157 ++++++++++++++++--
 .../delegation/DelegationTokenManagerTest.scala    |   6 +-
 .../scala/unit/kafka/server/RequestQuotaTest.scala |   5 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala    |   5 +-
 .../scala/unit/kafka/zk/KafkaZkClientTest.scala    |  96 +++++------
 docs/security.html                                 |  25 +++
 docs/upgrade.html                                  |   8 +
 47 files changed, 2063 insertions(+), 422 deletions(-)

diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 5549205..106ad0a 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -111,6 +111,7 @@
       <allow pkg="org.apache.kafka.common.protocol.types" />
       <allow pkg="org.apache.kafka.common.record" />
       <allow pkg="org.apache.kafka.common.requests" />
+      <allow pkg="org.apache.kafka.common.resource" />
     </subpackage>
 
     <subpackage name="record">
@@ -146,7 +147,7 @@
     </subpackage>
 
     <subpackage name="utils">
-      <allow pkg="org.apache.kafka.common.metrics" />
+      <allow pkg="org.apache.kafka.common" />
     </subpackage>
   </subpackage>
 
diff --git a/clients/src/main/java/org/apache/kafka/common/acl/AclBindingFilter.java b/clients/src/main/java/org/apache/kafka/common/acl/AclBindingFilter.java
index 64f16cd..5841b5a 100644
--- a/clients/src/main/java/org/apache/kafka/common/acl/AclBindingFilter.java
+++ b/clients/src/main/java/org/apache/kafka/common/acl/AclBindingFilter.java
@@ -19,7 +19,6 @@ package org.apache.kafka.common.acl;
 
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.resource.ResourceFilter;
-import org.apache.kafka.common.resource.ResourceType;
 
 import java.util.Objects;
 
@@ -36,9 +35,7 @@ public class AclBindingFilter {
     /**
      * A filter which matches any ACL binding.
      */
-    public static final AclBindingFilter ANY = new AclBindingFilter(
-        new ResourceFilter(ResourceType.ANY, null),
-        new AccessControlEntryFilter(null, null, AclOperation.ANY, AclPermissionType.ANY));
+    public static final AclBindingFilter ANY = new AclBindingFilter(ResourceFilter.ANY, AccessControlEntryFilter.ANY);
 
     /**
      * Create an instance of this filter with the provided parameters.
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java b/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java
index 7f43caf..96fa136 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.common.protocol;
 
 import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.resource.ResourceNameType;
 
 public class CommonFields {
     public static final Field.Int32 THROTTLE_TIME_MS = new Field.Int32("throttle_time_ms",
@@ -45,6 +46,8 @@ public class CommonFields {
     public static final Field.Int8 RESOURCE_TYPE = new Field.Int8("resource_type", "The resource type");
     public static final Field.Str RESOURCE_NAME = new Field.Str("resource_name", "The resource name");
     public static final Field.NullableStr RESOURCE_NAME_FILTER = new Field.NullableStr("resource_name", "The resource name filter");
+    public static final Field.Int8 RESOURCE_NAME_TYPE = new Field.Int8("resource_name_type", "The resource name type", ResourceNameType.LITERAL.code());
+    public static final Field.Int8 RESOURCE_NAME_TYPE_FILTER = new Field.Int8("resource_name_type_filter", "The resource name type filter", ResourceNameType.LITERAL.code());
     public static final Field.Str PRINCIPAL = new Field.Str("principal", "The ACL principal");
     public static final Field.NullableStr PRINCIPAL_FILTER = new Field.NullableStr("principal", "The ACL principal filter");
     public static final Field.Str HOST = new Field.Str("host", "The ACL host");
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Field.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Field.java
index ec217f5..5c17001 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Field.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Field.java
@@ -50,6 +50,9 @@ public class Field {
         public Int8(String name, String docString) {
             super(name, Type.INT8, docString, false, null);
         }
+        public Int8(String name, String docString, byte defaultValue) {
+            super(name, Type.INT8, docString, true, defaultValue);
+        }
     }
 
     public static class Int32 extends Field {
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java
index ef4cba2..00f65c6 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java
@@ -19,12 +19,14 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.acl.AccessControlEntry;
 import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.types.ArrayOf;
 import org.apache.kafka.common.protocol.types.Field;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.resource.Resource;
+import org.apache.kafka.common.resource.ResourceNameType;
 import org.apache.kafka.common.utils.Utils;
 
 import java.nio.ByteBuffer;
@@ -36,6 +38,7 @@ import static org.apache.kafka.common.protocol.CommonFields.OPERATION;
 import static org.apache.kafka.common.protocol.CommonFields.PERMISSION_TYPE;
 import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL;
 import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_NAME;
+import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_NAME_TYPE;
 import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_TYPE;
 
 public class CreateAclsRequest extends AbstractRequest {
@@ -51,9 +54,20 @@ public class CreateAclsRequest extends AbstractRequest {
                     PERMISSION_TYPE))));
 
     /**
-     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     * Version 1 adds RESOURCE_NAME_TYPE.
+     * Also, when the quota is violated, brokers will respond to a version 1 or later request before throttling.
+     *
+     * For more info, see {@link org.apache.kafka.common.resource.ResourceNameType}.
      */
-    private static final Schema CREATE_ACLS_REQUEST_V1 = CREATE_ACLS_REQUEST_V0;
+    private static final Schema CREATE_ACLS_REQUEST_V1 = new Schema(
+            new Field(CREATIONS_KEY_NAME, new ArrayOf(new Schema(
+                    RESOURCE_TYPE,
+                    RESOURCE_NAME,
+                    RESOURCE_NAME_TYPE,
+                    PRINCIPAL,
+                    HOST,
+                    OPERATION,
+                    PERMISSION_TYPE))));
 
     public static Schema[] schemaVersions() {
         return new Schema[]{CREATE_ACLS_REQUEST_V0, CREATE_ACLS_REQUEST_V1};
@@ -111,6 +125,8 @@ public class CreateAclsRequest extends AbstractRequest {
     CreateAclsRequest(short version, List<AclCreation> aclCreations) {
         super(version);
         this.aclCreations = aclCreations;
+
+        validate(aclCreations);
     }
 
     public CreateAclsRequest(Struct struct, short version) {
@@ -158,4 +174,17 @@ public class CreateAclsRequest extends AbstractRequest {
     public static CreateAclsRequest parse(ByteBuffer buffer, short version) {
         return new CreateAclsRequest(ApiKeys.CREATE_ACLS.parseRequest(version, buffer), version);
     }
+
+    private void validate(List<AclCreation> aclCreations) {
+        if (version() == 0) {
+            final boolean unsupported = aclCreations.stream()
+                .map(AclCreation::acl)
+                .map(AclBinding::resource)
+                .map(Resource::nameType)
+                .anyMatch(nameType -> nameType != ResourceNameType.LITERAL);
+            if (unsupported) {
+                throw new UnsupportedVersionException("Version 0 only supports literal resource name types");
+            }
+        }
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java
index 787ad7a..d5f52dd 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java
@@ -17,10 +17,10 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.ArrayOf;
 import org.apache.kafka.common.protocol.types.Field;
 import org.apache.kafka.common.protocol.types.Schema;
-import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
@@ -43,7 +43,7 @@ public class CreateAclsResponse extends AbstractResponse {
                     ERROR_MESSAGE))));
 
     /**
-     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     * The version number is bumped to indicate that, on quota violation, brokers send out responses before throttling.
      */
     private static final Schema CREATE_ACLS_RESPONSE_V1 = CREATE_ACLS_RESPONSE_V0;
 
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java
index 7f53ab5..d896bb2 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java
@@ -18,12 +18,14 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.acl.AccessControlEntryFilter;
 import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.types.ArrayOf;
 import org.apache.kafka.common.protocol.types.Field;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.resource.ResourceFilter;
+import org.apache.kafka.common.resource.ResourceNameType;
 import org.apache.kafka.common.utils.Utils;
 
 import java.nio.ByteBuffer;
@@ -37,6 +39,7 @@ import static org.apache.kafka.common.protocol.CommonFields.OPERATION;
 import static org.apache.kafka.common.protocol.CommonFields.PERMISSION_TYPE;
 import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL_FILTER;
 import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_NAME_FILTER;
+import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_NAME_TYPE_FILTER;
 import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_TYPE;
 
 public class DeleteAclsRequest extends AbstractRequest {
@@ -52,9 +55,20 @@ public class DeleteAclsRequest extends AbstractRequest {
                     PERMISSION_TYPE))));
 
     /**
-     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     * V1 sees a new `RESOURCE_NAME_TYPE_FILTER` that controls how the filter handles different resource name types.
+     * Also, when the quota is violated, brokers will respond to a version 1 or later request before throttling.
+     *
+     * For more info, see {@link org.apache.kafka.common.resource.ResourceNameType}.
      */
-    private static final Schema DELETE_ACLS_REQUEST_V1 = DELETE_ACLS_REQUEST_V0;
+    private static final Schema DELETE_ACLS_REQUEST_V1 = new Schema(
+            new Field(FILTERS, new ArrayOf(new Schema(
+                    RESOURCE_TYPE,
+                    RESOURCE_NAME_FILTER,
+                    RESOURCE_NAME_TYPE_FILTER,
+                    PRINCIPAL_FILTER,
+                    HOST_FILTER,
+                    OPERATION,
+                    PERMISSION_TYPE))));
 
     public static Schema[] schemaVersions() {
         return new Schema[]{DELETE_ACLS_REQUEST_V0, DELETE_ACLS_REQUEST_V1};
@@ -84,6 +98,8 @@ public class DeleteAclsRequest extends AbstractRequest {
     DeleteAclsRequest(short version, List<AclBindingFilter> filters) {
         super(version);
         this.filters = filters;
+
+        validate(version, filters);
     }
 
     public DeleteAclsRequest(Struct struct, short version) {
@@ -136,4 +152,16 @@ public class DeleteAclsRequest extends AbstractRequest {
     public static DeleteAclsRequest parse(ByteBuffer buffer, short version) {
         return new DeleteAclsRequest(DELETE_ACLS.parseRequest(version, buffer), version);
     }
+
+    private void validate(short version, List<AclBindingFilter> filters) {
+        if (version == 0) {
+            final boolean unsupported = filters.stream()
+                .map(AclBindingFilter::resourceFilter)
+                .map(ResourceFilter::nameType)
+                .anyMatch(nameType -> nameType != ResourceNameType.LITERAL);
+            if (unsupported) {
+                throw new UnsupportedVersionException("Version 0 only supports literal resource name types");
+            }
+        }
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
index 58dbb93..1790457 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
@@ -18,13 +18,15 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.acl.AccessControlEntry;
 import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.ArrayOf;
 import org.apache.kafka.common.protocol.types.Field;
 import org.apache.kafka.common.protocol.types.Schema;
-import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.resource.Resource;
+import org.apache.kafka.common.resource.ResourceNameType;
 import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,6 +45,7 @@ import static org.apache.kafka.common.protocol.CommonFields.OPERATION;
 import static org.apache.kafka.common.protocol.CommonFields.PERMISSION_TYPE;
 import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL;
 import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_NAME;
+import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_NAME_TYPE;
 import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_TYPE;
 import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
 
@@ -51,7 +54,7 @@ public class DeleteAclsResponse extends AbstractResponse {
     private final static String FILTER_RESPONSES_KEY_NAME = "filter_responses";
     private final static String MATCHING_ACLS_KEY_NAME = "matching_acls";
 
-    private static final Schema MATCHING_ACL = new Schema(
+    private static final Schema MATCHING_ACL_V0 = new Schema(
             ERROR_CODE,
             ERROR_MESSAGE,
             RESOURCE_TYPE,
@@ -61,18 +64,43 @@ public class DeleteAclsResponse extends AbstractResponse {
             OPERATION,
             PERMISSION_TYPE);
 
+    /**
+     * V1 sees a new `RESOURCE_NAME_TYPE` that describes how the resource name is interpreted.
+     *
+     * For more info, see {@link org.apache.kafka.common.resource.ResourceNameType}.
+     */
+    private static final Schema MATCHING_ACL_V1 = new Schema(
+            ERROR_CODE,
+            ERROR_MESSAGE,
+            RESOURCE_TYPE,
+            RESOURCE_NAME,
+            RESOURCE_NAME_TYPE,
+            PRINCIPAL,
+            HOST,
+            OPERATION,
+            PERMISSION_TYPE);
+
     private static final Schema DELETE_ACLS_RESPONSE_V0 = new Schema(
             THROTTLE_TIME_MS,
             new Field(FILTER_RESPONSES_KEY_NAME,
                     new ArrayOf(new Schema(
                             ERROR_CODE,
                             ERROR_MESSAGE,
-                            new Field(MATCHING_ACLS_KEY_NAME, new ArrayOf(MATCHING_ACL), "The matching ACLs")))));
+                            new Field(MATCHING_ACLS_KEY_NAME, new ArrayOf(MATCHING_ACL_V0), "The matching ACLs")))));
 
     /**
-     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     * V1 sees a new `RESOURCE_NAME_TYPE` field added to MATCHING_ACL_V1, that describes how the resource name is interpreted
+     * and version was bumped to indicate that, on quota violation, brokers send out responses before throttling.
+     *
+     * For more info, see {@link org.apache.kafka.common.resource.ResourceNameType}.
      */
-    private static final Schema DELETE_ACLS_RESPONSE_V1 = DELETE_ACLS_RESPONSE_V0;
+    private static final Schema DELETE_ACLS_RESPONSE_V1 = new Schema(
+            THROTTLE_TIME_MS,
+            new Field(FILTER_RESPONSES_KEY_NAME,
+                    new ArrayOf(new Schema(
+                            ERROR_CODE,
+                            ERROR_MESSAGE,
+                            new Field(MATCHING_ACLS_KEY_NAME, new ArrayOf(MATCHING_ACL_V1), "The matching ACLs")))));
 
     public static Schema[] schemaVersions() {
         return new Schema[]{DELETE_ACLS_RESPONSE_V0, DELETE_ACLS_RESPONSE_V1};
@@ -161,6 +189,8 @@ public class DeleteAclsResponse extends AbstractResponse {
 
     @Override
     protected Struct toStruct(short version) {
+        validate(version);
+
         Struct struct = new Struct(ApiKeys.DELETE_ACLS.responseSchema(version));
         struct.set(THROTTLE_TIME_MS, throttleTimeMs);
         List<Struct> responseStructs = new ArrayList<>();
@@ -211,4 +241,18 @@ public class DeleteAclsResponse extends AbstractResponse {
     public boolean shouldClientThrottle(short version) {
         return version >= 1;
     }
+
+    private void validate(short version) {
+        if (version == 0) {
+            final boolean unsupported = responses.stream()
+                .flatMap(r -> r.deletions.stream())
+                .map(AclDeletionResult::acl)
+                .map(AclBinding::resource)
+                .map(Resource::nameType)
+                .anyMatch(nameType -> nameType != ResourceNameType.LITERAL);
+            if (unsupported) {
+                throw new UnsupportedVersionException("Version 0 only supports literal resource name types");
+            }
+        }
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java
index 4d6ec60..d3a04d0 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java
@@ -19,10 +19,12 @@ package org.apache.kafka.common.requests;
 import org.apache.kafka.common.acl.AccessControlEntryFilter;
 import org.apache.kafka.common.acl.AclBinding;
 import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.resource.ResourceFilter;
+import org.apache.kafka.common.resource.ResourceNameType;
 
 import java.nio.ByteBuffer;
 import java.util.Collections;
@@ -32,6 +34,7 @@ import static org.apache.kafka.common.protocol.CommonFields.OPERATION;
 import static org.apache.kafka.common.protocol.CommonFields.PERMISSION_TYPE;
 import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL_FILTER;
 import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_NAME_FILTER;
+import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_NAME_TYPE_FILTER;
 import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_TYPE;
 
 public class DescribeAclsRequest extends AbstractRequest {
@@ -44,9 +47,19 @@ public class DescribeAclsRequest extends AbstractRequest {
             PERMISSION_TYPE);
 
     /**
-     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     * V1 sees a new `RESOURCE_NAME_TYPE_FILTER` that controls how the filter handles different resource name types.
+     * Also, when the quota is violated, brokers will respond to a version 1 or later request before throttling.
+     *
+     * For more info, see {@link org.apache.kafka.common.resource.ResourceNameType}.
      */
-    private static final Schema DESCRIBE_ACLS_REQUEST_V1 = DESCRIBE_ACLS_REQUEST_V0;
+    private static final Schema DESCRIBE_ACLS_REQUEST_V1 = new Schema(
+            RESOURCE_TYPE,
+            RESOURCE_NAME_FILTER,
+            RESOURCE_NAME_TYPE_FILTER,
+            PRINCIPAL_FILTER,
+            HOST_FILTER,
+            OPERATION,
+            PERMISSION_TYPE);
 
     public static Schema[] schemaVersions() {
         return new Schema[]{DESCRIBE_ACLS_REQUEST_V0, DESCRIBE_ACLS_REQUEST_V1};
@@ -76,6 +89,8 @@ public class DescribeAclsRequest extends AbstractRequest {
     DescribeAclsRequest(AclBindingFilter filter, short version) {
         super(version);
         this.filter = filter;
+
+        validate(filter, version);
     }
 
     public DescribeAclsRequest(Struct struct, short version) {
@@ -114,4 +129,10 @@ public class DescribeAclsRequest extends AbstractRequest {
     public AclBindingFilter filter() {
         return filter;
     }
+
+    private void validate(AclBindingFilter filter, short version) {
+        if (version == 0 && filter.resourceFilter().nameType() != ResourceNameType.LITERAL) {
+            throw new UnsupportedVersionException("Version 0 only supports literal resource name types");
+        }
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java
index c705c71..b6673d9 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java
@@ -19,13 +19,15 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.acl.AccessControlEntry;
 import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.ArrayOf;
 import org.apache.kafka.common.protocol.types.Field;
 import org.apache.kafka.common.protocol.types.Schema;
-import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.resource.Resource;
+import org.apache.kafka.common.resource.ResourceNameType;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -41,6 +43,7 @@ import static org.apache.kafka.common.protocol.CommonFields.OPERATION;
 import static org.apache.kafka.common.protocol.CommonFields.PERMISSION_TYPE;
 import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL;
 import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_NAME;
+import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_NAME_TYPE;
 import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_TYPE;
 import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
 
@@ -48,9 +51,24 @@ public class DescribeAclsResponse extends AbstractResponse {
     private final static String RESOURCES_KEY_NAME = "resources";
     private final static String ACLS_KEY_NAME = "acls";
 
-    private static final Schema DESCRIBE_ACLS_RESOURCE = new Schema(
+    private static final Schema DESCRIBE_ACLS_RESOURCE_V0 = new Schema(
+            RESOURCE_TYPE,
+            RESOURCE_NAME,
+            new Field(ACLS_KEY_NAME, new ArrayOf(new Schema(
+                    PRINCIPAL,
+                    HOST,
+                    OPERATION,
+                    PERMISSION_TYPE))));
+
+    /**
+     * V1 sees a new `RESOURCE_NAME_TYPE` that describes how the resource name is interpreted.
+     *
+     * For more info, see {@link org.apache.kafka.common.resource.ResourceNameType}.
+     */
+    private static final Schema DESCRIBE_ACLS_RESOURCE_V1 = new Schema(
             RESOURCE_TYPE,
             RESOURCE_NAME,
+            RESOURCE_NAME_TYPE,
             new Field(ACLS_KEY_NAME, new ArrayOf(new Schema(
                     PRINCIPAL,
                     HOST,
@@ -61,12 +79,19 @@ public class DescribeAclsResponse extends AbstractResponse {
             THROTTLE_TIME_MS,
             ERROR_CODE,
             ERROR_MESSAGE,
-            new Field(RESOURCES_KEY_NAME, new ArrayOf(DESCRIBE_ACLS_RESOURCE), "The resources and their associated ACLs."));
+            new Field(RESOURCES_KEY_NAME, new ArrayOf(DESCRIBE_ACLS_RESOURCE_V0), "The resources and their associated ACLs."));
 
     /**
-     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+     * V1 sees a new `RESOURCE_NAME_TYPE` field added to DESCRIBE_ACLS_RESOURCE_V1, that describes how the resource name is interpreted
+     * and version was bumped to indicate that, on quota violation, brokers send out responses before throttling.
+     *
+     * For more info, see {@link org.apache.kafka.common.resource.ResourceNameType}.
      */
-    private static final Schema DESCRIBE_ACLS_RESPONSE_V1 = DESCRIBE_ACLS_RESPONSE_V0;
+    private static final Schema DESCRIBE_ACLS_RESPONSE_V1 = new Schema(
+            THROTTLE_TIME_MS,
+            ERROR_CODE,
+            ERROR_MESSAGE,
+            new Field(RESOURCES_KEY_NAME, new ArrayOf(DESCRIBE_ACLS_RESOURCE_V1), "The resources and their associated ACLs."));
 
     public static Schema[] schemaVersions() {
         return new Schema[]{DESCRIBE_ACLS_RESPONSE_V0, DESCRIBE_ACLS_RESPONSE_V1};
@@ -99,6 +124,8 @@ public class DescribeAclsResponse extends AbstractResponse {
 
     @Override
     protected Struct toStruct(short version) {
+        validate(version);
+
         Struct struct = new Struct(ApiKeys.DESCRIBE_ACLS.responseSchema(version));
         struct.set(THROTTLE_TIME_MS, throttleTimeMs);
         error.write(struct);
@@ -157,4 +184,16 @@ public class DescribeAclsResponse extends AbstractResponse {
     public boolean shouldClientThrottle(short version) {
         return version >= 1;
     }
+
+    private void validate(short version) {
+        if (version == 0) {
+            final boolean unsupported = acls.stream()
+                .map(AclBinding::resource)
+                .map(Resource::nameType)
+                .anyMatch(nameType -> nameType != ResourceNameType.LITERAL);
+            if (unsupported) {
+                throw new UnsupportedVersionException("Version 0 only supports literal resource name types");
+            }
+        }
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java b/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java
index a1c27b7..f4f00a8 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.resource.Resource;
 import org.apache.kafka.common.resource.ResourceFilter;
 import org.apache.kafka.common.resource.ResourceType;
+import org.apache.kafka.common.resource.ResourceNameType;
 
 import static org.apache.kafka.common.protocol.CommonFields.HOST;
 import static org.apache.kafka.common.protocol.CommonFields.HOST_FILTER;
@@ -33,6 +34,8 @@ import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL;
 import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL_FILTER;
 import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_NAME;
 import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_NAME_FILTER;
+import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_NAME_TYPE;
+import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_NAME_TYPE_FILTER;
 import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_TYPE;
 
 final class RequestUtils {
@@ -42,23 +45,37 @@ final class RequestUtils {
     static Resource resourceFromStructFields(Struct struct) {
         byte resourceType = struct.get(RESOURCE_TYPE);
         String name = struct.get(RESOURCE_NAME);
-        return new Resource(ResourceType.fromCode(resourceType), name);
+        ResourceNameType resourceNameType = ResourceNameType.LITERAL;
+        if (struct.hasField(RESOURCE_NAME_TYPE)) {
+            resourceNameType = ResourceNameType.fromCode(struct.get(RESOURCE_NAME_TYPE));
+        }
+        return new Resource(ResourceType.fromCode(resourceType), name, resourceNameType);
     }
 
     static void resourceSetStructFields(Resource resource, Struct struct) {
         struct.set(RESOURCE_TYPE, resource.resourceType().code());
         struct.set(RESOURCE_NAME, resource.name());
+        if (struct.hasField(RESOURCE_NAME_TYPE)) {
+            struct.set(RESOURCE_NAME_TYPE, resource.nameType().code());
+        }
     }
 
     static ResourceFilter resourceFilterFromStructFields(Struct struct) {
         byte resourceType = struct.get(RESOURCE_TYPE);
         String name = struct.get(RESOURCE_NAME_FILTER);
-        return new ResourceFilter(ResourceType.fromCode(resourceType), name);
+        ResourceNameType resourceNameType = ResourceNameType.LITERAL;
+        if (struct.hasField(RESOURCE_NAME_TYPE_FILTER)) {
+            resourceNameType = ResourceNameType.fromCode(struct.get(RESOURCE_NAME_TYPE_FILTER));
+        }
+        return new ResourceFilter(ResourceType.fromCode(resourceType), name, resourceNameType);
     }
 
     static void resourceFilterSetStructFields(ResourceFilter resourceFilter, Struct struct) {
         struct.set(RESOURCE_TYPE, resourceFilter.resourceType().code());
         struct.set(RESOURCE_NAME_FILTER, resourceFilter.name());
+        if (struct.hasField(RESOURCE_NAME_TYPE_FILTER)) {
+            struct.set(RESOURCE_NAME_TYPE_FILTER, resourceFilter.nameType().code());
+        }
     }
 
     static AccessControlEntry aceFromStructFields(Struct struct) {
diff --git a/clients/src/main/java/org/apache/kafka/common/resource/Resource.java b/clients/src/main/java/org/apache/kafka/common/resource/Resource.java
index f41f41a..a4810b2 100644
--- a/clients/src/main/java/org/apache/kafka/common/resource/Resource.java
+++ b/clients/src/main/java/org/apache/kafka/common/resource/Resource.java
@@ -22,14 +22,20 @@ import org.apache.kafka.common.annotation.InterfaceStability;
 import java.util.Objects;
 
 /**
- * Represents a cluster resource with a tuple of (type, name).
+ * Represents a cluster resource with a tuple of (type, name, nameType).
  *
  * The API for this class is still evolving and we may break compatibility in minor releases, if necessary.
  */
 @InterfaceStability.Evolving
 public class Resource {
+    /**
+     * A special literal resource name that corresponds to 'all resources of a certain type'.
+     */
+    public static final String WILDCARD_RESOURCE = "*";
+
     private final ResourceType resourceType;
     private final String name;
+    private final ResourceNameType nameType;
 
     /**
      * The name of the CLUSTER resource.
@@ -39,19 +45,32 @@ public class Resource {
     /**
      * A resource representing the whole cluster.
      */
-    public final static Resource CLUSTER = new Resource(ResourceType.CLUSTER, CLUSTER_NAME);
+    public final static Resource CLUSTER = new Resource(ResourceType.CLUSTER, CLUSTER_NAME, ResourceNameType.LITERAL);
+
+    /**
+     * Create an instance of this class with the provided parameters.
+     *
+     * @param resourceType non-null resource type
+     * @param name non-null resource name
+     * @param nameType non-null resource name type
+     */
+    public Resource(ResourceType resourceType, String name, ResourceNameType nameType) {
+        this.resourceType = Objects.requireNonNull(resourceType, "resourceType");
+        this.name = Objects.requireNonNull(name, "name");
+        this.nameType = Objects.requireNonNull(nameType, "nameType");
+    }
 
     /**
      * Create an instance of this class with the provided parameters.
+     * The resource name type defaults to {@link ResourceNameType#LITERAL}.
      *
      * @param resourceType non-null resource type
      * @param name non-null resource name
+     * @deprecated Since 2.0. Use {@link #Resource(ResourceType, String, ResourceNameType)}
      */
+    @Deprecated
     public Resource(ResourceType resourceType, String name) {
-        Objects.requireNonNull(resourceType);
-        this.resourceType = resourceType;
-        Objects.requireNonNull(name);
-        this.name = name;
+        this(resourceType, name, ResourceNameType.LITERAL);
     }
 
     /**
@@ -62,6 +81,13 @@ public class Resource {
     }
 
     /**
+     * Return the resource name type.
+     */
+    public ResourceNameType nameType() {
+        return nameType;
+    }
+
+    /**
      * Return the resource name.
      */
     public String name() {
@@ -72,31 +98,36 @@ public class Resource {
      * Create a filter which matches only this Resource.
      */
     public ResourceFilter toFilter() {
-        return new ResourceFilter(resourceType, name);
+        return new ResourceFilter(resourceType, name, nameType);
     }
 
     @Override
     public String toString() {
-        return "(resourceType=" + resourceType + ", name=" + ((name == null) ? "<any>" : name) + ")";
+        return "(resourceType=" + resourceType + ", name=" + ((name == null) ? "<any>" : name) + ", nameType=" + nameType + ")";
     }
 
     /**
      * Return true if this Resource has any UNKNOWN components.
      */
     public boolean isUnknown() {
-        return resourceType.isUnknown();
+        return resourceType.isUnknown() || nameType.isUnknown();
     }
 
     @Override
     public boolean equals(Object o) {
-        if (!(o instanceof Resource))
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
             return false;
-        Resource other = (Resource) o;
-        return resourceType.equals(other.resourceType) && Objects.equals(name, other.name);
+
+        final Resource resource = (Resource) o;
+        return resourceType == resource.resourceType &&
+            Objects.equals(name, resource.name) &&
+            nameType == resource.nameType;
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(resourceType, name);
+        return Objects.hash(resourceType, name, nameType);
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/resource/ResourceFilter.java b/clients/src/main/java/org/apache/kafka/common/resource/ResourceFilter.java
index 0a4611f..e197e91 100644
--- a/clients/src/main/java/org/apache/kafka/common/resource/ResourceFilter.java
+++ b/clients/src/main/java/org/apache/kafka/common/resource/ResourceFilter.java
@@ -21,6 +21,8 @@ import org.apache.kafka.common.annotation.InterfaceStability;
 
 import java.util.Objects;
 
+import static org.apache.kafka.common.resource.Resource.WILDCARD_RESOURCE;
+
 /**
  * A filter which matches Resource objects.
  *
@@ -30,22 +32,54 @@ import java.util.Objects;
 public class ResourceFilter {
     private final ResourceType resourceType;
     private final String name;
+    private final ResourceNameType nameType;
 
     /**
      * Matches any resource.
      */
-    public static final ResourceFilter ANY = new ResourceFilter(ResourceType.ANY, null);
+    public static final ResourceFilter ANY = new ResourceFilter(ResourceType.ANY, null, ResourceNameType.ANY);
 
     /**
-     * Create an instance of this class with the provided parameters.
+     * Create a filter that matches {@link ResourceNameType#LITERAL literal} resources of the
+     * supplied {@code resourceType} and {@code name}.
      *
      * @param resourceType non-null resource type
-     * @param name resource name or null
+     * @param name resource name or {@code null}.
+     *             If {@code null}, the filter will ignore the name of resources.
+     * @deprecated Since 2.0. Use {@link #ResourceFilter(ResourceType, String, ResourceNameType)}
      */
+    @Deprecated
     public ResourceFilter(ResourceType resourceType, String name) {
-        Objects.requireNonNull(resourceType);
-        this.resourceType = resourceType;
+        this(resourceType, name, ResourceNameType.LITERAL);
+    }
+
+    /**
+     * Create a filter that matches resources of the supplied {@code resourceType}, {@code name} and
+     * {@code nameType}.
+     * <p>
+     * If the filter has all three parameters fully supplied, then it will only match a resource that has exactly
+     * the same values, e.g. a filter of {@code new ResourceFilter(ResourceType.GROUP, "one", ResourceNameType.PREFIXED)}
+     * will only match the resource {@code new Resource(ResourceType.GROUP, "one", ResourceNameType.PREFIXED)}.
+     * <p>
+     * Any of the three parameters can be set to be ignored by the filter:
+     * <ul>
+     *     <li><b>{@code resourceType}</b> can be set to {@link ResourceType#ANY},
+     *     meaning it will match a resource of any resource type</li>
+     *     <li><b>{@code name}</b> can be set to {@code null}, meaning it will match a resource of any name.</li>
+     *     <li><b>{@code nameType}</b> can be set to {@link ResourceNameType#ANY},
+     *     meaning it will match a resource with any resource name type, including the
+     *     {@link Resource#WILDCARD_RESOURCE wildcard resource}</li>
+     * </ul>
+     *
+     * @param resourceType non-null resource type to filter by.
+     * @param name resource name to filter by, or {@code null}.
+     *             If {@code null}, the filter will ignore the name of resources.
+     * @param nameType non-null resource name type to filter by.
+     */
+    public ResourceFilter(ResourceType resourceType, String name, ResourceNameType nameType) {
+        this.resourceType = Objects.requireNonNull(resourceType, "resourceType");
         this.name = name;
+        this.nameType = Objects.requireNonNull(nameType, "nameType");
     }
 
     /**
@@ -62,40 +96,76 @@ public class ResourceFilter {
         return name;
     }
 
+    /**
+     * Return the resource name type.
+     */
+    public ResourceNameType nameType() {
+        return nameType;
+    }
+
     @Override
     public String toString() {
-        return "(resourceType=" + resourceType + ", name=" + ((name == null) ? "<any>" : name) + ")";
+        return "(resourceType=" + resourceType + ", name=" + ((name == null) ? "<any>" : name) + ", nameType=" + nameType + ")";
     }
 
     /**
      * Return true if this ResourceFilter has any UNKNOWN components.
      */
     public boolean isUnknown() {
-        return resourceType.isUnknown();
+        return resourceType.isUnknown() || nameType.isUnknown();
     }
 
     @Override
     public boolean equals(Object o) {
-        if (!(o instanceof ResourceFilter))
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
             return false;
-        ResourceFilter other = (ResourceFilter) o;
-        return resourceType.equals(other.resourceType) && Objects.equals(name, other.name);
+
+        final ResourceFilter that = (ResourceFilter) o;
+        return resourceType == that.resourceType &&
+            Objects.equals(name, that.name) &&
+            nameType == that.nameType;
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(resourceType, name);
+        return Objects.hash(resourceType, name, nameType);
     }
 
     /**
      * Return true if this filter matches the given Resource.
+     * @param other the resource to match against this filter; must not contain ANY or UNKNOWN components.
      */
-    public boolean matches(Resource other) {
-        if ((name != null) && (!name.equals(other.name())))
+    public boolean matches(final Resource other) {
+        throwOnInvalidParams(other);
+
+        if (!resourceType().equals(ResourceType.ANY) && !resourceType().equals(other.resourceType())) {
             return false;
-        if ((resourceType != ResourceType.ANY) && (!resourceType.equals(other.resourceType())))
+        }
+
+        if (!nameType().equals(ResourceNameType.ANY) && !nameType().equals(other.nameType())) {
             return false;
-        return true;
+        }
+
+        if (name() == null) {
+            return true;
+        }
+
+        if (nameType().equals(other.nameType())) {
+            return other.name().equals(name());
+        }
+
+        switch (other.nameType()) {
+            case LITERAL:
+                return other.name().equals(name()) || other.name().equals(WILDCARD_RESOURCE);
+
+            case PREFIXED:
+                return name().startsWith(other.name());
+
+            default:
+                throw new IllegalArgumentException("Unsupported ResourceNameType: " + other.nameType());
+        }
     }
 
     /**
@@ -115,6 +185,20 @@ public class ResourceFilter {
             return "Resource type is UNKNOWN.";
         if (name == null)
             return "Resource name is NULL.";
+        if (nameType == ResourceNameType.ANY)
+            return "Resource name type is ANY.";
+        if (nameType == ResourceNameType.UNKNOWN)
+            return "Resource name type is UNKNOWN.";
         return null;
     }
+
+    private static void throwOnInvalidParams(final Resource aclPath) {
+        if (aclPath.resourceType().equals(ResourceType.ANY) || aclPath.resourceType().equals(ResourceType.UNKNOWN)) {
+            throw new IllegalArgumentException("Only concrete resource types are supported. Got: " + aclPath.resourceType());
+        }
+
+        if (aclPath.nameType().equals(ResourceNameType.ANY) || aclPath.nameType().equals(ResourceNameType.UNKNOWN)) {
+            throw new IllegalArgumentException("Only concrete resource name types are supported. Got: " + aclPath.nameType());
+        }
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/resource/ResourceNameType.java b/clients/src/main/java/org/apache/kafka/common/resource/ResourceNameType.java
new file mode 100644
index 0000000..7aa7217
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/resource/ResourceNameType.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.resource;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Resource name type: how the name in a {@link Resource} or {@link ResourceFilter} is to be interpreted.
+ */
+@InterfaceStability.Evolving
+public enum ResourceNameType {
+    /**
+     * Represents any ResourceNameType which this client cannot understand, perhaps because this client is too old.
+     */
+    UNKNOWN((byte) 0),
+
+    /**
+     * In a filter, matches any resource name type.
+     */
+    ANY((byte) 1),
+
+    /**
+     * A literal resource name.
+     *
+     * A literal name defines the full name of a resource, e.g. topic with name 'foo', or group with name 'bob'.
+     *
+     * The special wildcard character {@code *} can be used to represent a resource with any name.
+     */
+    LITERAL((byte) 2),
+
+    /**
+     * A prefixed resource name.
+     *
+     * A prefixed name defines a prefix for a resource, e.g. topics with names that start with 'foo'.
+     */
+    PREFIXED((byte) 3);
+
+    // Reverse lookup table from wire-protocol code to enum constant, built once at class load.
+    private final static Map<Byte, ResourceNameType> CODE_TO_VALUE =
+        Collections.unmodifiableMap(
+            Arrays.stream(ResourceNameType.values())
+                .collect(Collectors.toMap(ResourceNameType::code, Function.identity()))
+        );
+
+    private final byte code;
+
+    ResourceNameType(byte code) {
+        this.code = code;
+    }
+
+    /**
+     * @return the wire-protocol code of this resource name type.
+     */
+    public byte code() {
+        return code;
+    }
+
+    /**
+     * Return whether this resource name type is UNKNOWN.
+     */
+    public boolean isUnknown() {
+        return this == UNKNOWN;
+    }
+
+    /**
+     * Return the ResourceNameType with the provided code or {@link #UNKNOWN} if one cannot be found.
+     */
+    public static ResourceNameType fromCode(byte code) {
+        return CODE_TO_VALUE.getOrDefault(code, UNKNOWN);
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index c4af2c7..3a300db 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -67,6 +67,7 @@ import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.requests.OffsetFetchResponse;
 import org.apache.kafka.common.resource.Resource;
 import org.apache.kafka.common.resource.ResourceFilter;
+import org.apache.kafka.common.resource.ResourceNameType;
 import org.apache.kafka.common.resource.ResourceType;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
@@ -536,13 +537,13 @@ public class KafkaAdminClientTest {
         }
     }
 
-    private static final AclBinding ACL1 = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic3"),
+    private static final AclBinding ACL1 = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic3", ResourceNameType.LITERAL),
         new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW));
-    private static final AclBinding ACL2 = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic4"),
+    private static final AclBinding ACL2 = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic4", ResourceNameType.LITERAL),
         new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.DENY));
-    private static final AclBindingFilter FILTER1 = new AclBindingFilter(new ResourceFilter(ResourceType.ANY, null),
+    private static final AclBindingFilter FILTER1 = new AclBindingFilter(new ResourceFilter(ResourceType.ANY, null, ResourceNameType.LITERAL),
         new AccessControlEntryFilter("User:ANONYMOUS", null, AclOperation.ANY, AclPermissionType.ANY));
-    private static final AclBindingFilter FILTER2 = new AclBindingFilter(new ResourceFilter(ResourceType.ANY, null),
+    private static final AclBindingFilter FILTER2 = new AclBindingFilter(new ResourceFilter(ResourceType.ANY, null, ResourceNameType.LITERAL),
         new AccessControlEntryFilter("User:bob", null, AclOperation.ANY, AclPermissionType.ANY));
 
     @Test
diff --git a/clients/src/test/java/org/apache/kafka/common/acl/AclBindingTest.java b/clients/src/test/java/org/apache/kafka/common/acl/AclBindingTest.java
index 0ebcdfe..6110c48 100644
--- a/clients/src/test/java/org/apache/kafka/common/acl/AclBindingTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/acl/AclBindingTest.java
@@ -18,47 +18,48 @@ package org.apache.kafka.common.acl;
 
 import org.apache.kafka.common.resource.Resource;
 import org.apache.kafka.common.resource.ResourceFilter;
+import org.apache.kafka.common.resource.ResourceNameType;
 import org.apache.kafka.common.resource.ResourceType;
 import org.junit.Test;
 
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 public class AclBindingTest {
     private static final AclBinding ACL1 = new AclBinding(
-        new Resource(ResourceType.TOPIC, "mytopic"),
+        new Resource(ResourceType.TOPIC, "mytopic", ResourceNameType.LITERAL),
         new AccessControlEntry("User:ANONYMOUS", "", AclOperation.ALL, AclPermissionType.ALLOW));
 
     private static final AclBinding ACL2 = new AclBinding(
-        new Resource(ResourceType.TOPIC, "mytopic"),
+        new Resource(ResourceType.TOPIC, "mytopic", ResourceNameType.LITERAL),
         new AccessControlEntry("User:*", "", AclOperation.READ, AclPermissionType.ALLOW));
 
     private static final AclBinding ACL3 = new AclBinding(
-        new Resource(ResourceType.TOPIC, "mytopic2"),
+        new Resource(ResourceType.TOPIC, "mytopic2", ResourceNameType.LITERAL),
         new AccessControlEntry("User:ANONYMOUS", "127.0.0.1", AclOperation.READ, AclPermissionType.DENY));
 
     private static final AclBinding UNKNOWN_ACL = new AclBinding(
-        new Resource(ResourceType.TOPIC, "mytopic2"),
+        new Resource(ResourceType.TOPIC, "mytopic2", ResourceNameType.LITERAL),
         new AccessControlEntry("User:ANONYMOUS", "127.0.0.1", AclOperation.UNKNOWN, AclPermissionType.DENY));
 
     private static final AclBindingFilter ANY_ANONYMOUS = new AclBindingFilter(
-        new ResourceFilter(ResourceType.ANY, null),
+        ResourceFilter.ANY,
         new AccessControlEntryFilter("User:ANONYMOUS", null, AclOperation.ANY, AclPermissionType.ANY));
 
     private static final AclBindingFilter ANY_DENY = new AclBindingFilter(
-        new ResourceFilter(ResourceType.ANY, null),
+        ResourceFilter.ANY,
         new AccessControlEntryFilter(null, null, AclOperation.ANY, AclPermissionType.DENY));
 
     private static final AclBindingFilter ANY_MYTOPIC = new AclBindingFilter(
-        new ResourceFilter(ResourceType.TOPIC, "mytopic"),
+        new ResourceFilter(ResourceType.TOPIC, "mytopic", ResourceNameType.LITERAL),
         new AccessControlEntryFilter(null, null, AclOperation.ANY, AclPermissionType.ANY));
 
     @Test
     public void testMatching() throws Exception {
         assertTrue(ACL1.equals(ACL1));
         final AclBinding acl1Copy = new AclBinding(
-            new Resource(ResourceType.TOPIC, "mytopic"),
+            new Resource(ResourceType.TOPIC, "mytopic", ResourceNameType.LITERAL),
             new AccessControlEntry("User:ANONYMOUS", "", AclOperation.ALL, AclPermissionType.ALLOW));
         assertTrue(ACL1.equals(acl1Copy));
         assertTrue(acl1Copy.equals(ACL1));
@@ -103,9 +104,9 @@ public class AclBindingTest {
 
     @Test
     public void testMatchesAtMostOne() throws Exception {
-        assertEquals(null, ACL1.toFilter().findIndefiniteField());
-        assertEquals(null, ACL2.toFilter().findIndefiniteField());
-        assertEquals(null, ACL3.toFilter().findIndefiniteField());
+        assertNull(ACL1.toFilter().findIndefiniteField());
+        assertNull(ACL2.toFilter().findIndefiniteField());
+        assertNull(ACL3.toFilter().findIndefiniteField());
         assertFalse(ANY_ANONYMOUS.matchesAtMostOne());
         assertFalse(ANY_DENY.matchesAtMostOne());
         assertFalse(ANY_MYTOPIC.matchesAtMostOne());
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/CreateAclsRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/CreateAclsRequestTest.java
new file mode 100644
index 0000000..748914b
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/requests/CreateAclsRequestTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.acl.AccessControlEntry;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation;
+import org.apache.kafka.common.resource.Resource;
+import org.apache.kafka.common.resource.ResourceNameType;
+import org.apache.kafka.common.resource.ResourceType;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit tests for {@link CreateAclsRequest} serialization across protocol versions.
+ *
+ * Version 0 of the protocol only understands literal resource names, so requests carrying
+ * prefixed bindings must be rejected up front rather than silently mis-serialized.
+ */
+public class CreateAclsRequestTest {
+    private static final short V0 = 0;
+    private static final short V1 = 1;
+
+    // Literal bindings: representable at every protocol version.
+    private static final AclBinding LITERAL_ACL1 = new AclBinding(new Resource(ResourceType.TOPIC, "foo", ResourceNameType.LITERAL),
+        new AccessControlEntry("User:ANONYMOUS", "127.0.0.1", AclOperation.READ, AclPermissionType.DENY));
+
+    private static final AclBinding LITERAL_ACL2 = new AclBinding(new Resource(ResourceType.GROUP, "group", ResourceNameType.LITERAL),
+        new AccessControlEntry("User:*", "127.0.0.1", AclOperation.WRITE, AclPermissionType.ALLOW));
+
+    // Prefixed binding: only representable from v1 onwards.
+    private static final AclBinding PREFIXED_ACL1 = new AclBinding(new Resource(ResourceType.GROUP, "prefix", ResourceNameType.PREFIXED),
+        new AccessControlEntry("User:*", "127.0.0.1", AclOperation.CREATE, AclPermissionType.ALLOW));
+
+    @Test(expected = UnsupportedVersionException.class)
+    public void shouldThrowOnV0IfNotLiteral() {
+        new CreateAclsRequest(V0, aclCreations(PREFIXED_ACL1));
+    }
+
+    @Test
+    public void shouldRoundTripV0() {
+        final CreateAclsRequest original = new CreateAclsRequest(V0, aclCreations(LITERAL_ACL1, LITERAL_ACL2));
+        final Struct struct = original.toStruct();
+
+        final CreateAclsRequest result = new CreateAclsRequest(struct, V0);
+
+        assertRequestEquals(original, result);
+    }
+
+    @Test
+    public void shouldRoundTripV1() {
+        final CreateAclsRequest original = new CreateAclsRequest(V1, aclCreations(LITERAL_ACL1, PREFIXED_ACL1));
+        final Struct struct = original.toStruct();
+
+        final CreateAclsRequest result = new CreateAclsRequest(struct, V1);
+
+        assertRequestEquals(original, result);
+    }
+
+    // Compares the creations pairwise; relies on AclBinding.equals for the deep comparison.
+    private static void assertRequestEquals(final CreateAclsRequest original, final CreateAclsRequest actual) {
+        assertEquals("Number of Acls wrong", original.aclCreations().size(), actual.aclCreations().size());
+
+        for (int idx = 0; idx != original.aclCreations().size(); ++idx) {
+            final AclBinding originalBinding = original.aclCreations().get(idx).acl();
+            final AclBinding actualBinding = actual.aclCreations().get(idx).acl();
+            assertEquals(originalBinding, actualBinding);
+        }
+    }
+
+    private static List<AclCreation> aclCreations(final AclBinding... acls) {
+        return Arrays.stream(acls)
+            .map(AclCreation::new)
+            .collect(Collectors.toList());
+    }
+}
\ No newline at end of file
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/DeleteAclsRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/DeleteAclsRequestTest.java
new file mode 100644
index 0000000..7761337
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/requests/DeleteAclsRequestTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.acl.AccessControlEntryFilter;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.resource.ResourceFilter;
+import org.apache.kafka.common.resource.ResourceNameType;
+import org.apache.kafka.common.resource.ResourceType;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit tests for {@link DeleteAclsRequest} serialization across protocol versions.
+ *
+ * Version 0 of the protocol only understands literal resource names; non-literal
+ * filters must be rejected at construction time for v0 requests.
+ */
+public class DeleteAclsRequestTest {
+    private static final short V0 = 0;
+    private static final short V1 = 1;
+
+    // Literal filter: representable at every protocol version.
+    private static final AclBindingFilter LITERAL_FILTER = new AclBindingFilter(new ResourceFilter(ResourceType.TOPIC, "foo", ResourceNameType.LITERAL),
+        new AccessControlEntryFilter("User:ANONYMOUS", "127.0.0.1", AclOperation.READ, AclPermissionType.DENY));
+
+    // Prefixed filter: only representable from v1 onwards.
+    private static final AclBindingFilter PREFIXED_FILTER = new AclBindingFilter(new ResourceFilter(ResourceType.GROUP, "prefix", ResourceNameType.PREFIXED),
+        new AccessControlEntryFilter("User:*", "127.0.0.1", AclOperation.CREATE, AclPermissionType.ALLOW));
+
+    // Filter that matches any resource name type. Was previously a copy-paste duplicate of
+    // PREFIXED_FILTER (it used ResourceNameType.PREFIXED), so the ANY case was never exercised.
+    private static final AclBindingFilter ANY_FILTER = new AclBindingFilter(new ResourceFilter(ResourceType.GROUP, "prefix", ResourceNameType.ANY),
+        new AccessControlEntryFilter("User:*", "127.0.0.1", AclOperation.CREATE, AclPermissionType.ALLOW));
+
+    @Test(expected = UnsupportedVersionException.class)
+    public void shouldThrowOnV0IfNotLiteral() {
+        new DeleteAclsRequest(V0, aclFilters(PREFIXED_FILTER));
+    }
+
+    @Test
+    public void shouldRoundTripV0() {
+        final DeleteAclsRequest original = new DeleteAclsRequest(V0, aclFilters(LITERAL_FILTER));
+        final Struct struct = original.toStruct();
+
+        final DeleteAclsRequest result = new DeleteAclsRequest(struct, V0);
+
+        assertRequestEquals(original, result);
+    }
+
+    @Test
+    public void shouldRoundTripV1() {
+        final DeleteAclsRequest original = new DeleteAclsRequest(V1, aclFilters(LITERAL_FILTER, PREFIXED_FILTER, ANY_FILTER));
+        final Struct struct = original.toStruct();
+
+        final DeleteAclsRequest result = new DeleteAclsRequest(struct, V1);
+
+        assertRequestEquals(original, result);
+    }
+
+    // Compares the filters pairwise; relies on AclBindingFilter.equals for the deep comparison.
+    private static void assertRequestEquals(final DeleteAclsRequest original, final DeleteAclsRequest actual) {
+        assertEquals("Number of filters wrong", original.filters().size(), actual.filters().size());
+
+        for (int idx = 0; idx != original.filters().size(); ++idx) {
+            final AclBindingFilter originalFilter = original.filters().get(idx);
+            final AclBindingFilter actualFilter = actual.filters().get(idx);
+            assertEquals(originalFilter, actualFilter);
+        }
+    }
+
+    private static List<AclBindingFilter> aclFilters(final AclBindingFilter... acls) {
+        return Arrays.asList(acls);
+    }
+}
\ No newline at end of file
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/DeleteAclsResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/DeleteAclsResponseTest.java
new file mode 100644
index 0000000..f8e9148
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/requests/DeleteAclsResponseTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.acl.AccessControlEntry;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.requests.DeleteAclsResponse.AclDeletionResult;
+import org.apache.kafka.common.requests.DeleteAclsResponse.AclFilterResponse;
+import org.apache.kafka.common.resource.Resource;
+import org.apache.kafka.common.resource.ResourceNameType;
+import org.apache.kafka.common.resource.ResourceType;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+
+public class DeleteAclsResponseTest {
+    private static final short V0 = 0;
+    private static final short V1 = 1;
+
+    // Use the imported Resource consistently; the original mixed the short name
+    // with fully-qualified org.apache.kafka.common.resource.Resource references.
+    private static final AclBinding LITERAL_ACL1 = new AclBinding(new Resource(ResourceType.TOPIC, "foo", ResourceNameType.LITERAL),
+        new AccessControlEntry("User:ANONYMOUS", "127.0.0.1", AclOperation.READ, AclPermissionType.DENY));
+
+    private static final AclBinding LITERAL_ACL2 = new AclBinding(new Resource(ResourceType.GROUP, "group", ResourceNameType.LITERAL),
+        new AccessControlEntry("User:*", "127.0.0.1", AclOperation.WRITE, AclPermissionType.ALLOW));
+
+    private static final AclBinding PREFIXED_ACL1 = new AclBinding(new Resource(ResourceType.GROUP, "prefix", ResourceNameType.PREFIXED),
+        new AccessControlEntry("User:*", "127.0.0.1", AclOperation.CREATE, AclPermissionType.ALLOW));
+
+    private static final AclFilterResponse LITERAL_RESPONSE = new AclFilterResponse(aclDeletions(LITERAL_ACL1, LITERAL_ACL2));
+
+    private static final AclFilterResponse PREFIXED_RESPONSE = new AclFilterResponse(aclDeletions(LITERAL_ACL1, PREFIXED_ACL1));
+
+    // V0 can only encode LITERAL resource names, so serialising a response that
+    // contains a PREFIXED binding must be rejected.
+    @Test(expected = UnsupportedVersionException.class)
+    public void shouldThrowOnV0IfNotLiteral() {
+        new DeleteAclsResponse(10, aclResponses(PREFIXED_RESPONSE)).toStruct(V0);
+    }
+
+    @Test
+    public void shouldRoundTripV0() {
+        final DeleteAclsResponse original = new DeleteAclsResponse(10, aclResponses(LITERAL_RESPONSE));
+        final Struct struct = original.toStruct(V0);
+
+        final DeleteAclsResponse result = new DeleteAclsResponse(struct);
+
+        assertResponseEquals(original, result);
+    }
+
+    @Test
+    public void shouldRoundTripV1() {
+        final DeleteAclsResponse original = new DeleteAclsResponse(100, aclResponses(LITERAL_RESPONSE, PREFIXED_RESPONSE));
+        final Struct struct = original.toStruct(V1);
+
+        final DeleteAclsResponse result = new DeleteAclsResponse(struct);
+
+        assertResponseEquals(original, result);
+    }
+
+    /**
+     * Compares the AclBindings held by each per-filter response. The bindings
+     * are extracted rather than comparing the result objects directly.
+     */
+    private static void assertResponseEquals(final DeleteAclsResponse original, final DeleteAclsResponse actual) {
+        assertEquals("Number of responses wrong", original.responses().size(), actual.responses().size());
+
+        for (int idx = 0; idx != original.responses().size(); ++idx) {
+            final List<AclBinding> originalBindings = original.responses().get(idx).deletions().stream()
+                .map(AclDeletionResult::acl)
+                .collect(Collectors.toList());
+
+            final List<AclBinding> actualBindings = actual.responses().get(idx).deletions().stream()
+                .map(AclDeletionResult::acl)
+                .collect(Collectors.toList());
+
+            assertEquals(originalBindings, actualBindings);
+        }
+    }
+
+    private static List<AclFilterResponse> aclResponses(final AclFilterResponse... responses) {
+        return Arrays.asList(responses);
+    }
+
+    private static Collection<AclDeletionResult> aclDeletions(final AclBinding... acls) {
+        return Arrays.stream(acls)
+            .map(AclDeletionResult::new)
+            .collect(Collectors.toList());
+    }
+}
\ No newline at end of file
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/DescribeAclsRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/DescribeAclsRequestTest.java
new file mode 100644
index 0000000..543cf37
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/requests/DescribeAclsRequestTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.acl.AccessControlEntryFilter;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.resource.ResourceFilter;
+import org.apache.kafka.common.resource.ResourceNameType;
+import org.apache.kafka.common.resource.ResourceType;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class DescribeAclsRequestTest {
+    private static final short V0 = 0;
+    private static final short V1 = 1;
+
+    private static final AclBindingFilter LITERAL_FILTER = new AclBindingFilter(new ResourceFilter(ResourceType.TOPIC, "foo", ResourceNameType.LITERAL),
+        new AccessControlEntryFilter("User:ANONYMOUS", "127.0.0.1", AclOperation.READ, AclPermissionType.DENY));
+
+    private static final AclBindingFilter PREFIXED_FILTER = new AclBindingFilter(new ResourceFilter(ResourceType.GROUP, "prefix", ResourceNameType.PREFIXED),
+        new AccessControlEntryFilter("User:*", "127.0.0.1", AclOperation.CREATE, AclPermissionType.ALLOW));
+
+    // Fixed: uses ResourceNameType.ANY so shouldRoundTripAnyV1 actually covers
+    // the ANY name type; previously this constant duplicated PREFIXED_FILTER.
+    private static final AclBindingFilter ANY_FILTER = new AclBindingFilter(new ResourceFilter(ResourceType.GROUP, "prefix", ResourceNameType.ANY),
+        new AccessControlEntryFilter("User:*", "127.0.0.1", AclOperation.CREATE, AclPermissionType.ALLOW));
+
+    // V0 of the wire protocol can only encode LITERAL name types.
+    @Test(expected = UnsupportedVersionException.class)
+    public void shouldThrowOnV0IfNotLiteral() {
+        new DescribeAclsRequest(PREFIXED_FILTER, V0);
+    }
+
+    @Test
+    public void shouldRoundTripV0() {
+        final DescribeAclsRequest original = new DescribeAclsRequest(LITERAL_FILTER, V0);
+        final Struct struct = original.toStruct();
+
+        final DescribeAclsRequest result = new DescribeAclsRequest(struct, V0);
+
+        assertRequestEquals(original, result);
+    }
+
+    @Test
+    public void shouldRoundTripLiteralV1() {
+        final DescribeAclsRequest original = new DescribeAclsRequest(LITERAL_FILTER, V1);
+        final Struct struct = original.toStruct();
+
+        final DescribeAclsRequest result = new DescribeAclsRequest(struct, V1);
+
+        assertRequestEquals(original, result);
+    }
+
+    @Test
+    public void shouldRoundTripPrefixedV1() {
+        final DescribeAclsRequest original = new DescribeAclsRequest(PREFIXED_FILTER, V1);
+        final Struct struct = original.toStruct();
+
+        final DescribeAclsRequest result = new DescribeAclsRequest(struct, V1);
+
+        assertRequestEquals(original, result);
+    }
+
+    @Test
+    public void shouldRoundTripAnyV1() {
+        final DescribeAclsRequest original = new DescribeAclsRequest(ANY_FILTER, V1);
+        final Struct struct = original.toStruct();
+
+        final DescribeAclsRequest result = new DescribeAclsRequest(struct, V1);
+
+        assertRequestEquals(original, result);
+    }
+
+    // Compares the single filter carried by each request (also fixes the
+    // "acttualFilter" typo by dropping the redundant locals).
+    private static void assertRequestEquals(final DescribeAclsRequest original, final DescribeAclsRequest actual) {
+        assertEquals(original.filter(), actual.filter());
+    }
+}
\ No newline at end of file
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/DescribeAclsResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/DescribeAclsResponseTest.java
new file mode 100644
index 0000000..81cf518
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/requests/DescribeAclsResponseTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.acl.AccessControlEntry;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.resource.Resource;
+import org.apache.kafka.common.resource.ResourceNameType;
+import org.apache.kafka.common.resource.ResourceType;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
+public class DescribeAclsResponseTest {
+    private static final short V0 = 0;
+    private static final short V1 = 1;
+
+    private static final AclBinding LITERAL_ACL1 = new AclBinding(new Resource(ResourceType.TOPIC, "foo", ResourceNameType.LITERAL),
+        new AccessControlEntry("User:ANONYMOUS", "127.0.0.1", AclOperation.READ, AclPermissionType.DENY));
+
+    private static final AclBinding LITERAL_ACL2 = new AclBinding(new Resource(ResourceType.GROUP, "group", ResourceNameType.LITERAL),
+        new AccessControlEntry("User:*", "127.0.0.1", AclOperation.WRITE, AclPermissionType.ALLOW));
+
+    private static final AclBinding PREFIXED_ACL1 = new AclBinding(new Resource(ResourceType.GROUP, "prefix", ResourceNameType.PREFIXED),
+        new AccessControlEntry("User:*", "127.0.0.1", AclOperation.CREATE, AclPermissionType.ALLOW));
+
+    // V0 can only encode LITERAL resource names; a PREFIXED binding must be rejected.
+    @Test(expected = UnsupportedVersionException.class)
+    public void shouldThrowOnV0IfNotLiteral() {
+        new DescribeAclsResponse(10, ApiError.NONE, aclBindings(PREFIXED_ACL1)).toStruct(V0);
+    }
+
+    @Test
+    public void shouldRoundTripV0() {
+        final DescribeAclsResponse original = new DescribeAclsResponse(10, ApiError.NONE, aclBindings(LITERAL_ACL1, LITERAL_ACL2));
+
+        final DescribeAclsResponse deserialized = new DescribeAclsResponse(original.toStruct(V0));
+
+        assertResponseEquals(original, deserialized);
+    }
+
+    @Test
+    public void shouldRoundTripV1() {
+        final DescribeAclsResponse original = new DescribeAclsResponse(100, ApiError.NONE, aclBindings(LITERAL_ACL1, PREFIXED_ACL1));
+
+        final DescribeAclsResponse deserialized = new DescribeAclsResponse(original.toStruct(V1));
+
+        assertResponseEquals(original, deserialized);
+    }
+
+    // Order-insensitive comparison of the bindings held by each response.
+    private static void assertResponseEquals(final DescribeAclsResponse original, final DescribeAclsResponse actual) {
+        assertEquals(new HashSet<>(original.acls()), new HashSet<>(actual.acls()));
+    }
+
+    private static List<AclBinding> aclBindings(final AclBinding... bindings) {
+        return Arrays.asList(bindings);
+    }
+}
\ No newline at end of file
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index ec6c5d5..e0c72a2 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -36,7 +36,6 @@ import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.network.Send;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.MemoryRecords;
@@ -48,8 +47,10 @@ import org.apache.kafka.common.requests.DeleteAclsResponse.AclDeletionResult;
 import org.apache.kafka.common.requests.DeleteAclsResponse.AclFilterResponse;
 import org.apache.kafka.common.resource.Resource;
 import org.apache.kafka.common.resource.ResourceFilter;
+import org.apache.kafka.common.resource.ResourceNameType;
 import org.apache.kafka.common.resource.ResourceType;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.security.token.delegation.DelegationToken;
 import org.apache.kafka.common.security.token.delegation.TokenInformation;
 import org.apache.kafka.common.utils.SecurityUtils;
@@ -72,7 +73,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-
 import static java.util.Arrays.asList;
 import static java.util.Collections.singletonList;
 import static org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID;
@@ -1087,23 +1087,23 @@ public class RequestResponseTest {
 
     private DescribeAclsRequest createListAclsRequest() {
         return new DescribeAclsRequest.Builder(new AclBindingFilter(
-                new ResourceFilter(ResourceType.TOPIC, "mytopic"),
+                new ResourceFilter(ResourceType.TOPIC, "mytopic", ResourceNameType.LITERAL),
                 new AccessControlEntryFilter(null, null, AclOperation.ANY, AclPermissionType.ANY))).build();
     }
 
     private DescribeAclsResponse createDescribeAclsResponse() {
         return new DescribeAclsResponse(0, ApiError.NONE, Collections.singleton(new AclBinding(
-            new Resource(ResourceType.TOPIC, "mytopic"),
+            new Resource(ResourceType.TOPIC, "mytopic", ResourceNameType.LITERAL),
             new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.WRITE, AclPermissionType.ALLOW))));
     }
 
     private CreateAclsRequest createCreateAclsRequest() {
         List<AclCreation> creations = new ArrayList<>();
         creations.add(new AclCreation(new AclBinding(
-            new Resource(ResourceType.TOPIC, "mytopic"),
+            new Resource(ResourceType.TOPIC, "mytopic", ResourceNameType.LITERAL),
             new AccessControlEntry("User:ANONYMOUS", "127.0.0.1", AclOperation.READ, AclPermissionType.ALLOW))));
         creations.add(new AclCreation(new AclBinding(
-            new Resource(ResourceType.GROUP, "mygroup"),
+            new Resource(ResourceType.GROUP, "mygroup", ResourceNameType.LITERAL),
             new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.WRITE, AclPermissionType.DENY))));
         return new CreateAclsRequest.Builder(creations).build();
     }
@@ -1116,10 +1116,10 @@ public class RequestResponseTest {
     private DeleteAclsRequest createDeleteAclsRequest() {
         List<AclBindingFilter> filters = new ArrayList<>();
         filters.add(new AclBindingFilter(
-            new ResourceFilter(ResourceType.ANY, null),
+            new ResourceFilter(ResourceType.ANY, null, ResourceNameType.LITERAL),
             new AccessControlEntryFilter("User:ANONYMOUS", null, AclOperation.ANY, AclPermissionType.ANY)));
         filters.add(new AclBindingFilter(
-            new ResourceFilter(ResourceType.ANY, null),
+            new ResourceFilter(ResourceType.ANY, null, ResourceNameType.LITERAL),
             new AccessControlEntryFilter("User:bob", null, AclOperation.ANY, AclPermissionType.ANY)));
         return new DeleteAclsRequest.Builder(filters).build();
     }
@@ -1128,10 +1128,10 @@ public class RequestResponseTest {
         List<AclFilterResponse> responses = new ArrayList<>();
         responses.add(new AclFilterResponse(Utils.mkSet(
                 new AclDeletionResult(new AclBinding(
-                        new Resource(ResourceType.TOPIC, "mytopic3"),
+                        new Resource(ResourceType.TOPIC, "mytopic3", ResourceNameType.LITERAL),
                         new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW))),
                 new AclDeletionResult(new AclBinding(
-                        new Resource(ResourceType.TOPIC, "mytopic4"),
+                        new Resource(ResourceType.TOPIC, "mytopic4", ResourceNameType.LITERAL),
                         new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.DENY))))));
         responses.add(new AclFilterResponse(new ApiError(Errors.SECURITY_DISABLED, "No security"),
             Collections.<AclDeletionResult>emptySet()));
diff --git a/clients/src/test/java/org/apache/kafka/common/resource/ResourceFilterTest.java b/clients/src/test/java/org/apache/kafka/common/resource/ResourceFilterTest.java
new file mode 100644
index 0000000..9b2d6d4
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/resource/ResourceFilterTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.resource;
+
+import org.junit.Test;
+
+import static org.apache.kafka.common.resource.ResourceNameType.LITERAL;
+import static org.apache.kafka.common.resource.ResourceNameType.PREFIXED;
+import static org.apache.kafka.common.resource.ResourceType.ANY;
+import static org.apache.kafka.common.resource.ResourceType.GROUP;
+import static org.apache.kafka.common.resource.ResourceType.TOPIC;
+import static org.apache.kafka.common.resource.ResourceType.UNKNOWN;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+// Exercises ResourceFilter.matches across combinations of resource type, name
+// and name type, including LITERAL wildcard ("*") and PREFIXED semantics.
+public class ResourceFilterTest {
+    // matches() rejects concrete resources whose type or name type is
+    // ANY/UNKNOWN: those values are only meaningful on filters.
+    @Test(expected = IllegalArgumentException.class)
+    public void shouldThrowIfResourceTypeIsAny() {
+        new ResourceFilter(ANY, null, ResourceNameType.ANY)
+            .matches(new Resource(ANY, "Name", PREFIXED));
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void shouldThrowIfResourceTypeIsUnknown() {
+        new ResourceFilter(ANY, null, ResourceNameType.ANY)
+            .matches(new Resource(UNKNOWN, "Name", LITERAL));
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void shouldThrowIfResourceNameTypeIsAny() {
+        new ResourceFilter(ANY, null, ResourceNameType.ANY)
+            .matches(new Resource(GROUP, "Name", ResourceNameType.ANY));
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void shouldThrowIfAclResourceNameTypeIsUnknown() {
+        new ResourceFilter(ANY, null, ResourceNameType.ANY)
+            .matches(new Resource(GROUP, "Name", ResourceNameType.UNKNOWN));
+    }
+
+    // A single differing component (type, name, name case or name type) on a
+    // fully-specified filter prevents a match.
+    @Test
+    public void shouldNotMatchIfDifferentResourceType() {
+        assertFalse(new ResourceFilter(TOPIC, "Name", LITERAL)
+            .matches(new Resource(GROUP, "Name", LITERAL)));
+    }
+
+    @Test
+    public void shouldNotMatchIfDifferentName() {
+        assertFalse(new ResourceFilter(TOPIC, "Different", PREFIXED)
+            .matches(new Resource(TOPIC, "Name", PREFIXED)));
+    }
+
+    @Test
+    public void shouldNotMatchIfDifferentNameCase() {
+        assertFalse(new ResourceFilter(TOPIC, "NAME", LITERAL)
+            .matches(new Resource(TOPIC, "Name", LITERAL)));
+    }
+
+    @Test
+    public void shouldNotMatchIfDifferentNameType() {
+        assertFalse(new ResourceFilter(TOPIC, "Name", LITERAL)
+            .matches(new Resource(TOPIC, "Name", PREFIXED)));
+    }
+
+    // ANY (or a null name) on the filter side acts as a wildcard component.
+    @Test
+    public void shouldMatchWhereResourceTypeIsAny() {
+        assertTrue(new ResourceFilter(ANY, "Name", PREFIXED)
+            .matches(new Resource(TOPIC, "Name", PREFIXED)));
+    }
+
+    @Test
+    public void shouldMatchWhereResourceNameIsAny() {
+        assertTrue(new ResourceFilter(TOPIC, null, PREFIXED)
+            .matches(new Resource(TOPIC, "Name", PREFIXED)));
+    }
+
+    @Test
+    public void shouldMatchWhereResourceNameTypeIsAny() {
+        assertTrue(new ResourceFilter(TOPIC, null, ResourceNameType.ANY)
+            .matches(new Resource(TOPIC, "Name", PREFIXED)));
+    }
+
+    // LITERAL names compare exactly; a name-type-ANY filter with a longer name
+    // does not "prefix match" a shorter LITERAL resource.
+    @Test
+    public void shouldMatchLiteralIfExactMatch() {
+        assertTrue(new ResourceFilter(TOPIC, "Name", LITERAL)
+            .matches(new Resource(TOPIC, "Name", LITERAL)));
+    }
+
+    @Test
+    public void shouldMatchLiteralIfNameMatchesAndFilterIsOnAnyNameType() {
+        assertTrue(new ResourceFilter(TOPIC, "Name", ResourceNameType.ANY)
+            .matches(new Resource(TOPIC, "Name", LITERAL)));
+    }
+
+    @Test
+    public void shouldNotMatchLiteralIfNamePrefixed() {
+        assertFalse(new ResourceFilter(TOPIC, "Name-something", ResourceNameType.ANY)
+            .matches(new Resource(TOPIC, "Name", LITERAL)));
+    }
+
+    // The LITERAL wildcard resource "*" only matches a filter that names "*"
+    // itself or one whose name type is ANY; "*" is not expanded either way.
+    @Test
+    public void shouldMatchLiteralWildcardIfExactMatch() {
+        assertTrue(new ResourceFilter(TOPIC, "*", LITERAL)
+            .matches(new Resource(TOPIC, "*", LITERAL)));
+    }
+
+    @Test
+    public void shouldNotMatchLiteralWildcardAgainstOtherName() {
+        assertFalse(new ResourceFilter(TOPIC, "Name", LITERAL)
+            .matches(new Resource(TOPIC, "*", LITERAL)));
+    }
+
+    @Test
+    public void shouldNotMatchLiteralWildcardTheWayAround() {
+        assertFalse(new ResourceFilter(TOPIC, "*", LITERAL)
+            .matches(new Resource(TOPIC, "Name", LITERAL)));
+    }
+
+    @Test
+    public void shouldMatchLiteralWildcardIfFilterHasNameTypeOfAny() {
+        assertTrue(new ResourceFilter(TOPIC, "Name", ResourceNameType.ANY)
+            .matches(new Resource(TOPIC, "*", LITERAL)));
+    }
+
+    // PREFIXED-vs-PREFIXED requires exact name equality; prefix semantics only
+    // apply when the filter's name type is ANY (see the last test).
+    @Test
+    public void shouldMatchPrefixedIfExactMatch() {
+        assertTrue(new ResourceFilter(TOPIC, "Name", PREFIXED)
+            .matches(new Resource(TOPIC, "Name", PREFIXED)));
+    }
+
+    @Test
+    public void shouldNotMatchIfBothPrefixedAndFilterIsPrefixOfResource() {
+        assertFalse(new ResourceFilter(TOPIC, "Name", PREFIXED)
+            .matches(new Resource(TOPIC, "Name-something", PREFIXED)));
+    }
+
+    @Test
+    public void shouldNotMatchIfBothPrefixedAndResourceIsPrefixOfFilter() {
+        assertFalse(new ResourceFilter(TOPIC, "Name-something", PREFIXED)
+            .matches(new Resource(TOPIC, "Name", PREFIXED)));
+    }
+
+    @Test
+    public void shouldMatchPrefixedIfNamePrefixedAnyFilterTypeIsAny() {
+        assertTrue(new ResourceFilter(TOPIC, "Name-something", ResourceNameType.ANY)
+            .matches(new Resource(TOPIC, "Name", PREFIXED)));
+    }
+}
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala b/core/src/main/scala/kafka/admin/AclCommand.scala
index 4409a18..25b630d 100644
--- a/core/src/main/scala/kafka/admin/AclCommand.scala
+++ b/core/src/main/scala/kafka/admin/AclCommand.scala
@@ -18,24 +18,29 @@
 package kafka.admin
 
 import joptsimple._
+import joptsimple.util.EnumConverter
 import kafka.security.auth._
 import kafka.server.KafkaConfig
 import kafka.utils._
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.resource.{ResourceFilter, ResourceNameType => JResourceNameType, ResourceType => JResourceType, Resource => JResource}
 
 import scala.collection.JavaConverters._
 
 object AclCommand extends Logging {
 
-  val Newline = scala.util.Properties.lineSeparator
-  val ResourceTypeToValidOperations = Map[ResourceType, Set[Operation]] (
-    Topic -> Set(Read, Write, Create, Describe, Delete, DescribeConfigs, AlterConfigs, All),
-    Group -> Set(Read, Describe, Delete, All),
-    Cluster -> Set(Create, ClusterAction, DescribeConfigs, AlterConfigs, IdempotentWrite, Alter, Describe, All),
-    TransactionalId -> Set(Describe, Write, All),
-    DelegationToken -> Set(Describe, All)
+  val ClusterResourceFilter = new ResourceFilter(JResourceType.CLUSTER, JResource.CLUSTER_NAME, JResourceNameType.LITERAL)
+
+  private val Newline = scala.util.Properties.lineSeparator
+
+  val ResourceTypeToValidOperations: Map[JResourceType, Set[Operation]] = Map[JResourceType, Set[Operation]](
+    JResourceType.TOPIC -> Set(Read, Write, Create, Describe, Delete, DescribeConfigs, AlterConfigs, All),
+    JResourceType.GROUP -> Set(Read, Describe, Delete, All),
+    JResourceType.CLUSTER -> Set(Create, ClusterAction, DescribeConfigs, AlterConfigs, IdempotentWrite, Alter, Describe, All),
+    JResourceType.TRANSACTIONAL_ID -> Set(Describe, Write, All),
+    JResourceType.DELEGATION_TOKEN -> Set(Describe, All)
   )
 
   def main(args: Array[String]) {
@@ -82,8 +87,14 @@ object AclCommand extends Logging {
   }
 
   private def addAcl(opts: AclCommandOptions) {
+    if (opts.options.valueOf(opts.resourceNameType) == JResourceNameType.ANY)
+      CommandLineUtils.printUsageAndDie(opts.parser, "A '--resource-name-type' value of 'Any' is not valid when adding acls.")
+
     withAuthorizer(opts) { authorizer =>
-      val resourceToAcl = getResourceToAcls(opts)
+      val resourceToAcl = getResourceFilterToAcls(opts).map {
+        case (filter, acls) =>
+          Resource(ResourceType.fromJava(filter.resourceType()), filter.name(), ResourceNameType.fromJava(filter.nameType())) -> acls
+      }
 
       if (resourceToAcl.values.exists(_.isEmpty))
         CommandLineUtils.printUsageAndDie(opts.parser, "You must specify one of: --allow-principal, --deny-principal when trying to add ACLs.")
@@ -99,15 +110,15 @@ object AclCommand extends Logging {
 
   private def removeAcl(opts: AclCommandOptions) {
     withAuthorizer(opts) { authorizer =>
-      val resourceToAcl = getResourceToAcls(opts)
+      val filterToAcl = getResourceFilterToAcls(opts)
 
-      for ((resource, acls) <- resourceToAcl) {
+      for ((filter, acls) <- filterToAcl) {
         if (acls.isEmpty) {
-          if (confirmAction(opts, s"Are you sure you want to delete all ACLs for resource `$resource`? (y/n)"))
-            authorizer.removeAcls(resource)
+          if (confirmAction(opts, s"Are you sure you want to delete all ACLs for resource filter `$filter`? (y/n)"))
+            removeAcls(authorizer, acls, filter)
         } else {
-          if (confirmAction(opts, s"Are you sure you want to remove ACLs: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline from resource `$resource`? (y/n)"))
-            authorizer.removeAcls(acls, resource)
+          if (confirmAction(opts, s"Are you sure you want to remove ACLs: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline from resource filter `$filter`? (y/n)"))
+            removeAcls(authorizer, acls, filter)
         }
       }
 
@@ -115,42 +126,57 @@ object AclCommand extends Logging {
     }
   }
 
+  private def removeAcls(authorizer: Authorizer, acls: Set[Acl], filter: ResourceFilter) {
+    getAcls(authorizer, filter)
+      .keys
+      .foreach(resource =>
+        if (acls.isEmpty) authorizer.removeAcls(resource)
+        else authorizer.removeAcls(acls, resource)
+      )
+  }
+
   private def listAcl(opts: AclCommandOptions) {
     withAuthorizer(opts) { authorizer =>
-      val resources = getResource(opts, dieIfNoResourceFound = false)
+      val filters = getResourceFilter(opts, dieIfNoResourceFound = false)
 
       val resourceToAcls: Iterable[(Resource, Set[Acl])] =
-        if (resources.isEmpty) authorizer.getAcls()
-        else resources.map(resource => resource -> authorizer.getAcls(resource))
+        if (filters.isEmpty) authorizer.getAcls()
+        else filters.flatMap(filter => getAcls(authorizer, filter))
 
       for ((resource, acls) <- resourceToAcls)
         println(s"Current ACLs for resource `$resource`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline")
     }
   }
 
-  private def getResourceToAcls(opts: AclCommandOptions): Map[Resource, Set[Acl]] = {
-    var resourceToAcls = Map.empty[Resource, Set[Acl]]
+  private def getAcls(authorizer: Authorizer, filter: ResourceFilter): Map[Resource, Set[Acl]] =
+    authorizer.getAcls()
+      .filter { case (resource, acl) => filter.matches(resource.toJava) }
+
+  private def getResourceFilterToAcls(opts: AclCommandOptions): Map[ResourceFilter, Set[Acl]] = {
+    var resourceToAcls = Map.empty[ResourceFilter, Set[Acl]]
 
     //if none of the --producer or --consumer options are specified , just construct ACLs from CLI options.
     if (!opts.options.has(opts.producerOpt) && !opts.options.has(opts.consumerOpt)) {
-      resourceToAcls ++= getCliResourceToAcls(opts)
+      resourceToAcls ++= getCliResourceFilterToAcls(opts)
     }
 
     //users are allowed to specify both --producer and --consumer options in a single command.
     if (opts.options.has(opts.producerOpt))
-      resourceToAcls ++= getProducerResourceToAcls(opts)
+      resourceToAcls ++= getProducerResourceFilterToAcls(opts)
 
     if (opts.options.has(opts.consumerOpt))
-      resourceToAcls ++= getConsumerResourceToAcls(opts).map { case (k, v) => k -> (v ++ resourceToAcls.getOrElse(k, Set.empty[Acl])) }
+      resourceToAcls ++= getConsumerResourceFilterToAcls(opts).map { case (k, v) => k -> (v ++ resourceToAcls.getOrElse(k, Set.empty[Acl])) }
 
     validateOperation(opts, resourceToAcls)
 
     resourceToAcls
   }
 
-  private def getProducerResourceToAcls(opts: AclCommandOptions): Map[Resource, Set[Acl]] = {
-    val topics: Set[Resource] = getResource(opts).filter(_.resourceType == Topic)
-    val transactionalIds: Set[Resource] = getResource(opts).filter(_.resourceType == TransactionalId)
+  private def getProducerResourceFilterToAcls(opts: AclCommandOptions): Map[ResourceFilter, Set[Acl]] = {
+    val filters = getResourceFilter(opts)
+
+    val topics: Set[ResourceFilter] = filters.filter(_.resourceType == JResourceType.TOPIC)
+    val transactionalIds: Set[ResourceFilter] = filters.filter(_.resourceType == JResourceType.TRANSACTIONAL_ID)
     val enableIdempotence = opts.options.has(opts.idempotentOpt)
 
     val topicAcls = getAcl(opts, Set(Write, Describe, Create))
@@ -160,29 +186,29 @@ object AclCommand extends Logging {
     topics.map(_ -> topicAcls).toMap ++
       transactionalIds.map(_ -> transactionalIdAcls).toMap ++
         (if (enableIdempotence) 
-          Map(Resource.ClusterResource -> getAcl(opts, Set(IdempotentWrite))) 
+          Map(ClusterResourceFilter -> getAcl(opts, Set(IdempotentWrite)))
         else
           Map.empty)
   }
 
-  private def getConsumerResourceToAcls(opts: AclCommandOptions): Map[Resource, Set[Acl]] = {
-    val resources = getResource(opts)
+  private def getConsumerResourceFilterToAcls(opts: AclCommandOptions): Map[ResourceFilter, Set[Acl]] = {
+    val filters = getResourceFilter(opts)
 
-    val topics: Set[Resource] = getResource(opts).filter(_.resourceType == Topic)
-    val groups: Set[Resource] = resources.filter(_.resourceType == Group)
+    val topics: Set[ResourceFilter] = filters.filter(_.resourceType == JResourceType.TOPIC)
+    val groups: Set[ResourceFilter] = filters.filter(_.resourceType == JResourceType.GROUP)
 
     //Read, Describe on topic, Read on consumerGroup
 
     val acls = getAcl(opts, Set(Read, Describe))
 
-    topics.map(_ -> acls).toMap ++
-      groups.map(_ -> getAcl(opts, Set(Read))).toMap
+    topics.map(_ -> acls).toMap[ResourceFilter, Set[Acl]] ++
+      groups.map(_ -> getAcl(opts, Set(Read))).toMap[ResourceFilter, Set[Acl]]
   }
 
-  private def getCliResourceToAcls(opts: AclCommandOptions): Map[Resource, Set[Acl]] = {
+  private def getCliResourceFilterToAcls(opts: AclCommandOptions): Map[ResourceFilter, Set[Acl]] = {
     val acls = getAcl(opts)
-    val resources = getResource(opts)
-    resources.map(_ -> acls).toMap
+    val filters = getResourceFilter(opts)
+    filters.map(_ -> acls).toMap
   }
 
   private def getAcl(opts: AclCommandOptions, operations: Set[Operation]): Set[Acl] = {
@@ -235,28 +261,30 @@ object AclCommand extends Logging {
       Set.empty[KafkaPrincipal]
   }
 
-  private def getResource(opts: AclCommandOptions, dieIfNoResourceFound: Boolean = true): Set[Resource] = {
-    var resources = Set.empty[Resource]
+  private def getResourceFilter(opts: AclCommandOptions, dieIfNoResourceFound: Boolean = true): Set[ResourceFilter] = {
+    val resourceNameType: JResourceNameType = opts.options.valueOf(opts.resourceNameType)
+
+    var resourceFilters = Set.empty[ResourceFilter]
     if (opts.options.has(opts.topicOpt))
-      opts.options.valuesOf(opts.topicOpt).asScala.foreach(topic => resources += new Resource(Topic, topic.trim))
+      opts.options.valuesOf(opts.topicOpt).asScala.foreach(topic => resourceFilters += new ResourceFilter(JResourceType.TOPIC, topic.trim, resourceNameType))
 
-    if (opts.options.has(opts.clusterOpt) || opts.options.has(opts.idempotentOpt))
-      resources += Resource.ClusterResource
+    if (resourceNameType == JResourceNameType.LITERAL && (opts.options.has(opts.clusterOpt) || opts.options.has(opts.idempotentOpt)))
+      resourceFilters += ClusterResourceFilter
 
     if (opts.options.has(opts.groupOpt))
-      opts.options.valuesOf(opts.groupOpt).asScala.foreach(group => resources += new Resource(Group, group.trim))
+      opts.options.valuesOf(opts.groupOpt).asScala.foreach(group => resourceFilters += new ResourceFilter(JResourceType.GROUP, group.trim, resourceNameType))
 
     if (opts.options.has(opts.transactionalIdOpt))
       opts.options.valuesOf(opts.transactionalIdOpt).asScala.foreach(transactionalId =>
-        resources += new Resource(TransactionalId, transactionalId))
+        resourceFilters += new ResourceFilter(JResourceType.TRANSACTIONAL_ID, transactionalId, resourceNameType))
 
     if (opts.options.has(opts.delegationTokenOpt))
-      opts.options.valuesOf(opts.delegationTokenOpt).asScala.foreach(token => resources += new Resource(DelegationToken, token.trim))
+      opts.options.valuesOf(opts.delegationTokenOpt).asScala.foreach(token => resourceFilters += new ResourceFilter(JResourceType.DELEGATION_TOKEN, token.trim, resourceNameType))
 
-    if (resources.isEmpty && dieIfNoResourceFound)
+    if (resourceFilters.isEmpty && dieIfNoResourceFound)
       CommandLineUtils.printUsageAndDie(opts.parser, "You must provide at least one resource: --topic <topic> or --cluster or --group <group> or --delegation-token <Delegation Token ID>")
 
-    resources
+    resourceFilters
   }
 
   private def confirmAction(opts: AclCommandOptions, msg: String): Boolean = {
@@ -266,7 +294,7 @@ object AclCommand extends Logging {
     Console.readLine().equalsIgnoreCase("y")
   }
 
-  private def validateOperation(opts: AclCommandOptions, resourceToAcls: Map[Resource, Set[Acl]]) = {
+  private def validateOperation(opts: AclCommandOptions, resourceToAcls: Map[ResourceFilter, Set[Acl]]): Unit = {
     for ((resource, acls) <- resourceToAcls) {
       val validOps = ResourceTypeToValidOperations(resource.resourceType)
       if ((acls.map(_.operation) -- validOps).nonEmpty)
@@ -317,6 +345,12 @@ object AclCommand extends Logging {
       .describedAs("delegation-token")
       .ofType(classOf[String])
 
+    val resourceNameType = parser.accepts("resource-name-type", "The type of the resource name, or any.")
+      .withRequiredArg()
+      .ofType(classOf[String])
+      .withValuesConvertedBy(new ResourceNameTypeConverter())
+      .defaultsTo(JResourceNameType.LITERAL)
+
     val addOpt = parser.accepts("add", "Indicates you are trying to add ACLs.")
     val removeOpt = parser.accepts("remove", "Indicates you are trying to remove ACLs.")
     val listOpt = parser.accepts("list", "List ACLs for the specified resource, use --topic <topic> or --group <group> or --cluster to specify a resource.")
@@ -394,3 +428,18 @@ object AclCommand extends Logging {
   }
 
 }
+
+class ResourceNameTypeConverter extends EnumConverter[JResourceNameType](classOf[JResourceNameType]) {
+
+  override def convert(value: String): JResourceNameType = {
+    val nameType = super.convert(value)
+    if (nameType.isUnknown)
+      throw new ValueConversionException("Unknown resourceNameType: " + value)
+
+    nameType
+  }
+
+  override def valuePattern: String = JResourceNameType.values
+    .filter(_ != JResourceNameType.UNKNOWN)
+    .mkString("|")
+}
diff --git a/core/src/main/scala/kafka/security/SecurityUtils.scala b/core/src/main/scala/kafka/security/SecurityUtils.scala
index 573a16b..8442ba0 100644
--- a/core/src/main/scala/kafka/security/SecurityUtils.scala
+++ b/core/src/main/scala/kafka/security/SecurityUtils.scala
@@ -17,7 +17,7 @@
 
 package kafka.security
 
-import kafka.security.auth.{Acl, Operation, PermissionType, Resource, ResourceType}
+import kafka.security.auth.{Acl, Operation, PermissionType, Resource, ResourceNameType, ResourceType}
 import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding, AclBindingFilter}
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.ApiError
@@ -32,10 +32,11 @@ object SecurityUtils {
   def convertToResourceAndAcl(filter: AclBindingFilter): Either[ApiError, (Resource, Acl)] = {
     (for {
       resourceType <- Try(ResourceType.fromJava(filter.resourceFilter.resourceType))
+      resourceNameType <- Try(ResourceNameType.fromJava(filter.resourceFilter.nameType))
       principal <- Try(KafkaPrincipal.fromString(filter.entryFilter.principal))
       operation <- Try(Operation.fromJava(filter.entryFilter.operation))
       permissionType <- Try(PermissionType.fromJava(filter.entryFilter.permissionType))
-      resource = Resource(resourceType, filter.resourceFilter.name)
+      resource = Resource(resourceType, filter.resourceFilter.name, resourceNameType)
       acl = Acl(principal, permissionType, filter.entryFilter.host, operation)
     } yield (resource, acl)) match {
       case Failure(throwable) => Left(new ApiError(Errors.INVALID_REQUEST, throwable.getMessage))
@@ -44,7 +45,7 @@ object SecurityUtils {
   }
 
   def convertToAclBinding(resource: Resource, acl: Acl): AclBinding = {
-    val adminResource = new AdminResource(resource.resourceType.toJava, resource.name)
+    val adminResource = new AdminResource(resource.resourceType.toJava, resource.name, resource.resourceNameType.toJava)
     val entry = new AccessControlEntry(acl.principal.toString, acl.host.toString,
       acl.operation.toJava, acl.permissionType.toJava)
     new AclBinding(adminResource, entry)
diff --git a/core/src/main/scala/kafka/security/auth/Acl.scala b/core/src/main/scala/kafka/security/auth/Acl.scala
index 67f3d95..7fa1638 100644
--- a/core/src/main/scala/kafka/security/auth/Acl.scala
+++ b/core/src/main/scala/kafka/security/auth/Acl.scala
@@ -20,11 +20,13 @@ package kafka.security.auth
 import kafka.utils.Json
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.SecurityUtils
+
 import scala.collection.JavaConverters._
 
 object Acl {
   val WildCardPrincipal: KafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "*")
   val WildCardHost: String = "*"
+  val WildCardResource: String = org.apache.kafka.common.resource.Resource.WILDCARD_RESOURCE
   val AllowAllAcl = new Acl(WildCardPrincipal, Allow, WildCardHost, All)
   val PrincipalKey = "principal"
   val PermissionTypeKey = "permissionType"
diff --git a/core/src/main/scala/kafka/security/auth/Authorizer.scala b/core/src/main/scala/kafka/security/auth/Authorizer.scala
index 6f4ca0e..4f4ddcf 100644
--- a/core/src/main/scala/kafka/security/auth/Authorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/Authorizer.scala
@@ -37,7 +37,7 @@ trait Authorizer extends Configurable {
   /**
    * @param session The session being authenticated.
    * @param operation Type of operation client is trying to perform on resource.
-   * @param resource Resource the client is trying to access.
+   * @param resource Resource the client is trying to access. Resource name type is always literal in input resource.
    * @return true if the operation should be permitted, false otherwise
    */
   def authorize(session: Session, operation: Operation, resource: Resource): Boolean
@@ -45,42 +45,90 @@ trait Authorizer extends Configurable {
   /**
    * add the acls to resource, this is an additive operation so existing acls will not be overwritten, instead these new
    * acls will be added to existing acls.
+   *
+   * {code}
+   * // The following will add ACLs to the literal resource path 'foo', which will only affect the topic named 'foo':
+   * authorizer.addAcls(Set(acl1, acl2), Resource(Topic, "foo", Literal))
+   *
+   * // The following will add ACLs to the special literal topic resource path '*', which affects all topics:
+   * authorizer.addAcls(Set(acl1, acl2), Resource(Topic, "*", Literal))
+   *
+   * // The following will add ACLs to the prefixed resource path 'foo', which affects all topics whose name begins with 'foo':
+   * authorizer.addAcls(Set(acl1, acl2), Resource(Topic, "foo", Prefixed))
+   * {code}
+   *
    * @param acls set of acls to add to existing acls
-   * @param resource the resource to which these acls should be attached.
+   * @param resource the resource path to which these acls should be attached
    */
   def addAcls(acls: Set[Acl], resource: Resource): Unit
 
   /**
    * remove these acls from the resource.
+   *
+   * {code}
+   * // The following will remove ACLs from the literal resource path 'foo', which will only affect the topic named 'foo':
+   * authorizer.removeAcls(Set(acl1, acl2), Resource(Topic, "foo", Literal))
+   *
+   * // The following will remove ACLs from the special literal topic resource path '*', which affects all topics:
+   * authorizer.removeAcls(Set(acl1, acl2), Resource(Topic, "*", Literal))
+   *
+   * // The following will remove ACLs from the prefixed resource path 'foo', which affects all topics whose name begins with 'foo':
+   * authorizer.removeAcls(Set(acl1, acl2), Resource(Topic, "foo", Prefixed))
+   * {code}
+   *
    * @param acls set of acls to be removed.
-   * @param resource resource from which the acls should be removed.
+   * @param resource resource path from which the acls should be removed.
    * @return true if some acl got removed, false if no acl was removed.
    */
   def removeAcls(acls: Set[Acl], resource: Resource): Boolean
 
   /**
    * remove a resource along with all of its acls from acl store.
-   * @param resource
+   *
+   * {code}
+   * // The following will remove all ACLs from the literal resource path 'foo', which will only affect the topic named 'foo':
+   * authorizer.removeAcls(Resource(Topic, "foo", Literal))
+   *
+   * // The following will remove all ACLs from the special literal topic resource path '*', which affects all topics:
+   * authorizer.removeAcls(Resource(Topic, "*", Literal))
+   *
+   * // The following will remove all ACLs from the prefixed resource path 'foo', which affects all topics whose name begins with 'foo':
+   * authorizer.removeAcls(Resource(Topic, "foo", Prefixed))
+   * {code}
+   *
+   * @param resource the resource path from which these acls should be removed.
    * @return
    */
   def removeAcls(resource: Resource): Boolean
 
   /**
-   * get set of acls for this resource
-   * @param resource
+   * get set of acls for the supplied resource
+   *
+   * {code}
+   * // The following will get all ACLs from the literal resource path 'foo', which will only affect the topic named 'foo':
+   * authorizer.getAcls(Resource(Topic, "foo", Literal))
+   *
+   * // The following will get all ACLs from the special literal topic resource path '*', which affects all topics:
+   * authorizer.getAcls(Resource(Topic, "*", Literal))
+   *
+   * // The following will get all ACLs from the prefixed resource path 'foo', which affects all topics whose name begins with 'foo':
+   * authorizer.getAcls(Resource(Topic, "foo", Prefixed))
+   * {code}
+   *
+   * @param resource the resource path to which the acls belong.
    * @return empty set if no acls are found, otherwise the acls for the resource.
    */
   def getAcls(resource: Resource): Set[Acl]
 
   /**
    * get the acls for this principal.
-   * @param principal
+   * @param principal principal name.
    * @return empty Map if no acls exist for this principal, otherwise a map of resource -> acls for the principal.
    */
   def getAcls(principal: KafkaPrincipal): Map[Resource, Set[Acl]]
 
   /**
-   * gets the map of resource to acls for all resources.
+   * gets the map of resource paths to acls for all resources.
    */
   def getAcls(): Map[Resource, Set[Acl]]
 
@@ -90,4 +138,3 @@ trait Authorizer extends Configurable {
   def close(): Unit
 
 }
-
diff --git a/core/src/main/scala/kafka/security/auth/Resource.scala b/core/src/main/scala/kafka/security/auth/Resource.scala
index 311f5b5..fa63fcc 100644
--- a/core/src/main/scala/kafka/security/auth/Resource.scala
+++ b/core/src/main/scala/kafka/security/auth/Resource.scala
@@ -16,31 +16,44 @@
  */
 package kafka.security.auth
 
+import java.util.Objects
+import org.apache.kafka.common.resource.{Resource => JResource}
+
 object Resource {
-  val Separator = ":"
   val ClusterResourceName = "kafka-cluster"
-  val ClusterResource = new Resource(Cluster, Resource.ClusterResourceName)
+  val ClusterResource = new Resource(Cluster, Resource.ClusterResourceName, Literal)
   val ProducerIdResourceName = "producer-id"
   val WildCardResource = "*"
-
-  def fromString(str: String): Resource = {
-    str.split(Separator, 2) match {
-      case Array(resourceType, name, _*) => new Resource(ResourceType.fromString(resourceType), name)
-      case _ => throw new IllegalArgumentException("expected a string in format ResourceType:ResourceName but got " + str)
-    }
-  }
 }
 
 /**
  *
- * @param resourceType type of resource.
- * @param name name of the resource, for topic this will be topic name , for group it will be group name. For cluster type
+ * @param resourceType non-null type of resource.
+ * @param name non-null name of the resource; for topic this will be the topic name, for group it will be the group name. For cluster type
  *             it will be a constant string kafka-cluster.
+ * @param resourceNameType non-null type of resource name: literal, prefixed, etc.
  */
-case class Resource(resourceType: ResourceType, name: String) {
+case class Resource(resourceType: ResourceType, name: String, resourceNameType: ResourceNameType) {
+
+  Objects.requireNonNull(resourceType, "resourceType")
+  Objects.requireNonNull(name, "name")
+  Objects.requireNonNull(resourceNameType, "resourceNameType")
+
+  /**
+    * Create an instance of this class with the provided parameters.
+    * Resource name type would default to ResourceNameType.LITERAL.
+    *
+    * @param resourceType non-null resource type
+    * @param name         non-null resource name
+    * @deprecated Since 2.0, use [[kafka.security.auth.Resource(ResourceType, String, ResourceNameType)]]
+    */
+  @deprecated("Use Resource(ResourceType, String, ResourceNameType)")
+  def this(resourceType: ResourceType, name: String) {
+    this(resourceType, name, Literal)
+  }
 
-  override def toString: String = {
-    resourceType.name + Resource.Separator + name
+  def toJava: JResource = {
+    new JResource(resourceType.toJava, name, resourceNameType.toJava)
   }
 }
 
diff --git a/core/src/main/scala/kafka/security/auth/ResourceNameType.scala b/core/src/main/scala/kafka/security/auth/ResourceNameType.scala
new file mode 100644
index 0000000..21b10a1
--- /dev/null
+++ b/core/src/main/scala/kafka/security/auth/ResourceNameType.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.security.auth
+
+import kafka.common.{BaseEnum, KafkaException}
+import org.apache.kafka.common.resource.{ResourceNameType => JResourceNameType}
+
+sealed trait ResourceNameType extends BaseEnum with Ordered[ResourceNameType] {
+  def toJava: JResourceNameType
+
+  override def compare(that: ResourceNameType): Int = this.name compare that.name
+}
+
+case object Literal extends ResourceNameType {
+  val name = "Literal"
+  val toJava = JResourceNameType.LITERAL
+}
+
+case object Prefixed extends ResourceNameType {
+  val name = "Prefixed"
+  val toJava = JResourceNameType.PREFIXED
+}
+
+object ResourceNameType {
+
+  def fromString(resourceNameType: String): ResourceNameType = {
+    val rType = values.find(rType => rType.name.equalsIgnoreCase(resourceNameType))
+    rType.getOrElse(throw new KafkaException(resourceNameType + " not a valid resourceNameType name. The valid names are " + values.mkString(",")))
+  }
+
+  def values: Seq[ResourceNameType] = List(Literal, Prefixed)
+
+  def fromJava(nameType: JResourceNameType): ResourceNameType = fromString(nameType.toString)
+}
diff --git a/core/src/main/scala/kafka/security/auth/ResourceType.scala b/core/src/main/scala/kafka/security/auth/ResourceType.scala
index 4ba5bcb..65a0373 100644
--- a/core/src/main/scala/kafka/security/auth/ResourceType.scala
+++ b/core/src/main/scala/kafka/security/auth/ResourceType.scala
@@ -20,9 +20,11 @@ import kafka.common.{BaseEnum, KafkaException}
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.resource.{ResourceType => JResourceType}
 
-sealed trait ResourceType extends BaseEnum {
+sealed trait ResourceType extends BaseEnum with Ordered[ResourceType] {
   def error: Errors
   def toJava: JResourceType
+
+  override def compare(that: ResourceType): Int = this.name compare that.name
 }
 
 case object Topic extends ResourceType {
diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
index c439f5e..c828970 100644
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -16,7 +16,6 @@
  */
 package kafka.security.auth
 
-import java.nio.charset.StandardCharsets
 import java.util
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
@@ -27,7 +26,8 @@ import kafka.security.auth.SimpleAclAuthorizer.VersionedAcls
 import kafka.server.KafkaConfig
 import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
 import kafka.utils._
-import kafka.zk.{AclChangeNotificationSequenceZNode, AclChangeNotificationZNode, KafkaZkClient}
+import kafka.zk.{AclChangeNotificationSequenceZNode, KafkaZkClient, ZkAclStore}
+import org.apache.kafka.common.resource.{ResourceFilter, ResourceNameType => JResourceNameType}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.{SecurityUtils, Time}
 
@@ -54,10 +54,11 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
   private val authorizerLogger = Logger("kafka.authorizer.logger")
   private var superUsers = Set.empty[KafkaPrincipal]
   private var shouldAllowEveryoneIfNoAclIsFound = false
-  private var zkClient: KafkaZkClient = null
-  private var aclChangeListener: ZkNodeChangeNotificationListener = null
+  private var zkClient: KafkaZkClient = _
+  private var aclChangeListeners: Seq[ZkNodeChangeNotificationListener] = List()
 
-  private val aclCache = new scala.collection.mutable.HashMap[Resource, VersionedAcls]
+  @volatile
+  private var aclCache = new scala.collection.immutable.TreeMap[Resource, VersionedAcls]()(ResourceOrdering)
   private val lock = new ReentrantReadWriteLock()
 
   // The maximum number of times we should try to update the resource acls in zookeeper before failing;
@@ -97,14 +98,17 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
 
     loadCache()
 
-    aclChangeListener = new ZkNodeChangeNotificationListener(zkClient, AclChangeNotificationZNode.path, AclChangeNotificationSequenceZNode.SequenceNumberPrefix, AclChangedNotificationHandler)
-    aclChangeListener.init()
+    startZkChangeListeners()
   }
 
   override def authorize(session: Session, operation: Operation, resource: Resource): Boolean = {
+    if (resource.resourceNameType != Literal) {
+      throw new IllegalArgumentException("Only literal resources are supported. Got: " + resource.resourceNameType)
+    }
+
     val principal = session.principal
     val host = session.clientAddress.getHostAddress
-    val acls = getAcls(resource) ++ getAcls(new Resource(resource.resourceType, Resource.WildCardResource))
+    val acls = getMatchingAcls(resource.resourceType, resource.name)
 
     // Check if there is any Deny acl match that would disallow this operation.
     val denyMatch = aclMatch(operation, resource, principal, host, Deny, acls)
@@ -143,14 +147,14 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
     } else false
   }
 
-  private def aclMatch(operations: Operation, resource: Resource, principal: KafkaPrincipal, host: String, permissionType: PermissionType, acls: Set[Acl]): Boolean = {
+  private def aclMatch(operation: Operation, resource: Resource, principal: KafkaPrincipal, host: String, permissionType: PermissionType, acls: Set[Acl]): Boolean = {
     acls.find { acl =>
       acl.permissionType == permissionType &&
         (acl.principal == principal || acl.principal == Acl.WildCardPrincipal) &&
-        (operations == acl.operation || acl.operation == All) &&
+        (operation == acl.operation || acl.operation == All) &&
         (acl.host == host || acl.host == Acl.WildCardHost)
     }.exists { acl =>
-      authorizerLogger.debug(s"operation = $operations on resource = $resource from host = $host is $permissionType based on acl = $acl")
+      authorizerLogger.debug(s"operation = $operation on resource = $resource from host = $host is $permissionType based on acl = $acl")
       true
     }
   }
@@ -194,7 +198,28 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
         versionedAcls.acls.filter(_.principal == principal)
       }.filter { case (_, acls) =>
         acls.nonEmpty
-      }.toMap
+      }
+    }
+  }
+
+  def getMatchingAcls(resourceType: ResourceType, resourceName: String): Set[Acl] = {
+    val filter = new ResourceFilter(resourceType.toJava, resourceName, JResourceNameType.ANY)
+
+    inReadLock(lock) {
+      val wildcard = aclCache.get(Resource(resourceType, Acl.WildCardResource, Literal))
+        .map(_.acls)
+        .getOrElse(Set.empty[Acl])
+
+      val literal = aclCache.get(Resource(resourceType, resourceName, Literal))
+        .map(_.acls)
+        .getOrElse(Set.empty[Acl])
+
+      val prefixed = aclCache.range(Resource(resourceType, resourceName, Prefixed), Resource(resourceType, resourceName.substring(0, 1), Prefixed))
+        .filterKeys(resource => resourceName.startsWith(resource.name))
+        .flatMap { case (resource, versionedAcls) => versionedAcls.acls }
+        .toSet
+
+      prefixed ++ wildcard ++ literal
     }
   }
 
@@ -205,24 +230,36 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
   }
 
   def close() {
-    if (aclChangeListener != null) aclChangeListener.close()
+    aclChangeListeners.foreach(listener => listener.close())
     if (zkClient != null) zkClient.close()
   }
 
-  private def loadCache()  {
+  private def loadCache() {
     inWriteLock(lock) {
-      val resourceTypes = zkClient.getResourceTypes()
-      for (rType <- resourceTypes) {
-        val resourceType = ResourceType.fromString(rType)
-        val resourceNames = zkClient.getResourceNames(resourceType.name)
-        for (resourceName <- resourceNames) {
-          val versionedAcls = getAclsFromZk(Resource(resourceType, resourceName))
-          updateCache(new Resource(resourceType, resourceName), versionedAcls)
+      ZkAclStore.stores.foreach(store => {
+        val resourceTypes = zkClient.getResourceTypes(store.nameType)
+        for (rType <- resourceTypes) {
+          val resourceType = ResourceType.fromString(rType)
+          val resourceNames = zkClient.getResourceNames(store.nameType, resourceType)
+          for (resourceName <- resourceNames) {
+            val versionedAcls = getAclsFromZk(new Resource(resourceType, resourceName, store.nameType))
+            updateCache(new Resource(resourceType, resourceName, store.nameType), versionedAcls)
+          }
         }
-      }
+      })
     }
   }
 
+  private def startZkChangeListeners(): Unit = {
+    aclChangeListeners = ZkAclStore.stores.map(store => {
+      val aclChangeListener = new ZkNodeChangeNotificationListener(
+        zkClient, store.aclChangePath, AclChangeNotificationSequenceZNode.SequenceNumberPrefix, new AclChangedNotificationHandler(store))
+
+      aclChangeListener.init()
+      aclChangeListener
+    })
+  }
+
   private def logAuditMessage(principal: KafkaPrincipal, authorized: Boolean, operation: Operation, resource: Resource, host: String) {
     def logMessage: String = {
       val authResult = if (authorized) "Allowed" else "Denied"
@@ -298,23 +335,24 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
 
   private def updateCache(resource: Resource, versionedAcls: VersionedAcls) {
     if (versionedAcls.acls.nonEmpty) {
-      aclCache.put(resource, versionedAcls)
+      aclCache = aclCache + (resource -> versionedAcls)
     } else {
-      aclCache.remove(resource)
+      aclCache = aclCache - resource
     }
   }
 
   private def updateAclChangedFlag(resource: Resource) {
-    zkClient.createAclChangeNotification(resource.toString)
+    zkClient.createAclChangeNotification(resource)
   }
 
   private def backoffTime = {
     retryBackoffMs + Random.nextInt(retryBackoffJitterMs)
   }
 
-  object AclChangedNotificationHandler extends NotificationHandler {
+  class AclChangedNotificationHandler(store: ZkAclStore) extends NotificationHandler {
     override def processNotification(notificationMessage: Array[Byte]) {
-      val resource: Resource = Resource.fromString(new String(notificationMessage, StandardCharsets.UTF_8))
+      val resource: Resource = store.decode(notificationMessage)
+
       inWriteLock(lock) {
         val versionedAcls = getAclsFromZk(resource)
         updateCache(resource, versionedAcls)
@@ -322,4 +360,20 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
     }
   }
 
+  // Orders by resource type, then resource name type and finally reverse ordering by name.
+  private object ResourceOrdering extends Ordering[Resource] {
+
+    def compare(a: Resource, b: Resource): Int = {
+      val rt = a.resourceType compare b.resourceType
+      if (rt != 0)
+        rt
+      else {
+        val rnt = a.resourceNameType compare b.resourceNameType
+        if (rnt != 0)
+          rnt
+        else
+          (a.name compare b.name) * -1
+      }
+    }
+  }
 }
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 98672c8..f4b8689 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -273,7 +273,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     val offsetCommitRequest = request.body[OffsetCommitRequest]
 
     // reject the request if not authorized to the group
-    if (!authorize(request.session, Read, new Resource(Group, offsetCommitRequest.groupId))) {
+    if (!authorize(request.session, Read, new Resource(Group, offsetCommitRequest.groupId, Literal))) {
       val error = Errors.GROUP_AUTHORIZATION_FAILED
       val results = offsetCommitRequest.offsetData.keySet.asScala.map { topicPartition =>
         (topicPartition, error)
@@ -286,7 +286,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       val authorizedTopicRequestInfoBldr = immutable.Map.newBuilder[TopicPartition, OffsetCommitRequest.PartitionData]
 
       for ((topicPartition, partitionData) <- offsetCommitRequest.offsetData.asScala) {
-        if (!authorize(request.session, Read, new Resource(Topic, topicPartition.topic)))
+        if (!authorize(request.session, Read, new Resource(Topic, topicPartition.topic, Literal)))
           unauthorizedTopicErrors += (topicPartition -> Errors.TOPIC_AUTHORIZATION_FAILED)
         else if (!metadataCache.contains(topicPartition))
           nonExistingTopicErrors += (topicPartition -> Errors.UNKNOWN_TOPIC_OR_PARTITION)
@@ -384,7 +384,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     val numBytesAppended = request.header.toStruct.sizeOf + request.sizeOfBodyInBytes
 
     if (produceRequest.isTransactional) {
-      if (!authorize(request.session, Write, new Resource(TransactionalId, produceRequest.transactionalId))) {
+      if (!authorize(request.session, Write, new Resource(TransactionalId, produceRequest.transactionalId, Literal))) {
         sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
         return
       }
@@ -400,7 +400,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     val authorizedRequestInfo = mutable.Map[TopicPartition, MemoryRecords]()
 
     for ((topicPartition, memoryRecords) <- produceRequest.partitionRecordsOrFail.asScala) {
-      if (!authorize(request.session, Write, new Resource(Topic, topicPartition.topic)))
+      if (!authorize(request.session, Write, new Resource(Topic, topicPartition.topic, Literal)))
         unauthorizedTopicResponses += topicPartition -> new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED)
       else if (!metadataCache.contains(topicPartition))
         nonExistingTopicResponses += topicPartition -> new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION)
@@ -529,7 +529,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     } else {
       // Regular Kafka consumers need READ permission on each partition they are fetching.
       fetchContext.foreachPartition((topicPartition, data) => {
-        if (!authorize(request.session, Read, new Resource(Topic, topicPartition.topic)))
+        if (!authorize(request.session, Read, new Resource(Topic, topicPartition.topic, Literal)))
           erroneous += topicPartition -> new FetchResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED,
             FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET,
             FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY)
@@ -741,7 +741,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     val offsetRequest = request.body[ListOffsetRequest]
 
     val (authorizedRequestInfo, unauthorizedRequestInfo) = offsetRequest.offsetData.asScala.partition {
-      case (topicPartition, _) => authorize(request.session, Describe, new Resource(Topic, topicPartition.topic))
+      case (topicPartition, _) => authorize(request.session, Describe, new Resource(Topic, topicPartition.topic, Literal))
     }
 
     val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ =>
@@ -794,7 +794,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     val offsetRequest = request.body[ListOffsetRequest]
 
     val (authorizedRequestInfo, unauthorizedRequestInfo) = offsetRequest.partitionTimestamps.asScala.partition {
-      case (topicPartition, _) => authorize(request.session, Describe, new Resource(Topic, topicPartition.topic))
+      case (topicPartition, _) => authorize(request.session, Describe, new Resource(Topic, topicPartition.topic, Literal))
     }
 
     val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ => {
@@ -1033,7 +1033,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
 
     var (authorizedTopics, unauthorizedForDescribeTopics) =
-      topics.partition(topic => authorize(request.session, Describe, new Resource(Topic, topic)))
+      topics.partition(topic => authorize(request.session, Describe, new Resource(Topic, topic, Literal)))
 
     var unauthorizedForCreateTopics = Set[String]()
 
@@ -1097,12 +1097,12 @@ class KafkaApis(val requestChannel: RequestChannel,
     val offsetFetchRequest = request.body[OffsetFetchRequest]
 
     def authorizeTopicDescribe(partition: TopicPartition) =
-      authorize(request.session, Describe, new Resource(Topic, partition.topic))
+      authorize(request.session, Describe, new Resource(Topic, partition.topic, Literal))
 
     def createResponse(requestThrottleMs: Int): AbstractResponse = {
       val offsetFetchResponse =
         // reject the request if not authorized to the group
-        if (!authorize(request.session, Describe, new Resource(Group, offsetFetchRequest.groupId)))
+        if (!authorize(request.session, Describe, new Resource(Group, offsetFetchRequest.groupId, Literal)))
           offsetFetchRequest.getErrorResponse(requestThrottleMs, Errors.GROUP_AUTHORIZATION_FAILED)
         else {
           if (header.apiVersion == 0) {
@@ -1170,10 +1170,10 @@ class KafkaApis(val requestChannel: RequestChannel,
     val findCoordinatorRequest = request.body[FindCoordinatorRequest]
 
     if (findCoordinatorRequest.coordinatorType == FindCoordinatorRequest.CoordinatorType.GROUP &&
-        !authorize(request.session, Describe, new Resource(Group, findCoordinatorRequest.coordinatorKey)))
+        !authorize(request.session, Describe, new Resource(Group, findCoordinatorRequest.coordinatorKey, Literal)))
       sendErrorResponseMaybeThrottle(request, Errors.GROUP_AUTHORIZATION_FAILED.exception)
     else if (findCoordinatorRequest.coordinatorType == FindCoordinatorRequest.CoordinatorType.TRANSACTION &&
-        !authorize(request.session, Describe, new Resource(TransactionalId, findCoordinatorRequest.coordinatorKey)))
+        !authorize(request.session, Describe, new Resource(TransactionalId, findCoordinatorRequest.coordinatorKey, Literal)))
       sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
     else {
       // get metadata (and create the topic if necessary)
@@ -1220,7 +1220,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     val describeRequest = request.body[DescribeGroupsRequest]
 
     val groups = describeRequest.groupIds.asScala.map { groupId =>
-      if (!authorize(request.session, Describe, new Resource(Group, groupId))) {
+      if (!authorize(request.session, Describe, new Resource(Group, groupId, Literal))) {
         groupId -> DescribeGroupsResponse.GroupMetadata.forError(Errors.GROUP_AUTHORIZATION_FAILED)
       } else {
         val (error, summary) = groupCoordinator.handleDescribeGroup(groupId)
@@ -1266,7 +1266,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       sendResponseMaybeThrottle(request, createResponse)
     }
 
-    if (!authorize(request.session, Read, new Resource(Group, joinGroupRequest.groupId()))) {
+    if (!authorize(request.session, Read, new Resource(Group, joinGroupRequest.groupId(), Literal))) {
       sendResponseMaybeThrottle(request, requestThrottleMs =>
         new JoinGroupResponse(
           requestThrottleMs,
@@ -1302,7 +1302,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         new SyncGroupResponse(requestThrottleMs, error, ByteBuffer.wrap(memberState)))
     }
 
-    if (!authorize(request.session, Read, new Resource(Group, syncGroupRequest.groupId()))) {
+    if (!authorize(request.session, Read, new Resource(Group, syncGroupRequest.groupId(), Literal))) {
       sendResponseCallback(Array[Byte](), Errors.GROUP_AUTHORIZATION_FAILED)
     } else {
       groupCoordinator.handleSyncGroup(
@@ -1320,7 +1320,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     var groups = deleteGroupsRequest.groups.asScala.toSet
 
     val (authorizedGroups, unauthorizedGroups) = groups.partition { group =>
-      authorize(request.session, Delete, new Resource(Group, group))
+      authorize(request.session, Delete, new Resource(Group, group, Literal))
     }
 
     val groupDeletionResult = groupCoordinator.handleDeleteGroups(authorizedGroups) ++
@@ -1344,7 +1344,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       sendResponseMaybeThrottle(request, createResponse)
     }
 
-    if (!authorize(request.session, Read, new Resource(Group, heartbeatRequest.groupId))) {
+    if (!authorize(request.session, Read, new Resource(Group, heartbeatRequest.groupId, Literal))) {
       sendResponseMaybeThrottle(request, requestThrottleMs =>
         new HeartbeatResponse(requestThrottleMs, Errors.GROUP_AUTHORIZATION_FAILED))
     } else {
@@ -1371,7 +1371,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       sendResponseMaybeThrottle(request, createResponse)
     }
 
-    if (!authorize(request.session, Read, new Resource(Group, leaveGroupRequest.groupId))) {
+    if (!authorize(request.session, Read, new Resource(Group, leaveGroupRequest.groupId, Literal))) {
       sendResponseMaybeThrottle(request, requestThrottleMs =>
         new LeaveGroupResponse(requestThrottleMs, Errors.GROUP_AUTHORIZATION_FAILED))
     } else {
@@ -1491,7 +1491,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       val dupes = createPartitionsRequest.duplicates.asScala
       val notDuped = createPartitionsRequest.newPartitions.asScala -- dupes
       val (authorized, unauthorized) = notDuped.partition { case (topic, _) =>
-        authorize(request.session, Alter, new Resource(Topic, topic))
+        authorize(request.session, Alter, new Resource(Topic, topic, Literal))
       }
 
       val (queuedForDeletion, valid) = authorized.partition { case (topic, _) =>
@@ -1515,7 +1515,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     val authorizedForDeleteTopics =  mutable.Set[String]()
 
     for (topic <- deleteTopicRequest.topics.asScala) {
-      if (!authorize(request.session, Delete, new Resource(Topic, topic)))
+      if (!authorize(request.session, Delete, new Resource(Topic, topic, Literal)))
         unauthorizedTopicErrors += topic -> Errors.TOPIC_AUTHORIZATION_FAILED
       else if (!metadataCache.contains(topic))
         nonExistingTopicErrors += topic -> Errors.UNKNOWN_TOPIC_OR_PARTITION
@@ -1560,7 +1560,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     val authorizedForDeleteTopicOffsets = mutable.Map[TopicPartition, Long]()
 
     for ((topicPartition, offset) <- deleteRecordsRequest.partitionOffsets.asScala) {
-      if (!authorize(request.session, Delete, new Resource(Topic, topicPartition.topic)))
+      if (!authorize(request.session, Delete, new Resource(Topic, topicPartition.topic, Literal)))
         unauthorizedTopicResponses += topicPartition -> new DeleteRecordsResponse.PartitionResponse(
           DeleteRecordsResponse.INVALID_LOW_WATERMARK, Errors.TOPIC_AUTHORIZATION_FAILED)
       else if (!metadataCache.contains(topicPartition))
@@ -1603,7 +1603,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     val transactionalId = initProducerIdRequest.transactionalId
 
     if (transactionalId != null) {
-      if (!authorize(request.session, Write, new Resource(TransactionalId, transactionalId))) {
+      if (!authorize(request.session, Write, new Resource(TransactionalId, transactionalId, Literal))) {
         sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
         return
       }
@@ -1628,7 +1628,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     val endTxnRequest = request.body[EndTxnRequest]
     val transactionalId = endTxnRequest.transactionalId
 
-    if (authorize(request.session, Write, new Resource(TransactionalId, transactionalId))) {
+    if (authorize(request.session, Write, new Resource(TransactionalId, transactionalId, Literal))) {
       def sendResponseCallback(error: Errors) {
         def createResponse(requestThrottleMs: Int): AbstractResponse = {
           val responseBody = new EndTxnResponse(requestThrottleMs, error)
@@ -1763,7 +1763,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     val addPartitionsToTxnRequest = request.body[AddPartitionsToTxnRequest]
     val transactionalId = addPartitionsToTxnRequest.transactionalId
     val partitionsToAdd = addPartitionsToTxnRequest.partitions.asScala
-    if (!authorize(request.session, Write, new Resource(TransactionalId, transactionalId)))
+    if (!authorize(request.session, Write, new Resource(TransactionalId, transactionalId, Literal)))
       sendResponseMaybeThrottle(request, requestThrottleMs =>
         addPartitionsToTxnRequest.getErrorResponse(requestThrottleMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception))
     else {
@@ -1773,7 +1773,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 
       for (topicPartition <- partitionsToAdd) {
         if (org.apache.kafka.common.internals.Topic.isInternal(topicPartition.topic) ||
-            !authorize(request.session, Write, new Resource(Topic, topicPartition.topic)))
+            !authorize(request.session, Write, new Resource(Topic, topicPartition.topic, Literal)))
           unauthorizedTopicErrors += topicPartition -> Errors.TOPIC_AUTHORIZATION_FAILED
         else if (!metadataCache.contains(topicPartition))
           nonExistingTopicErrors += topicPartition -> Errors.UNKNOWN_TOPIC_OR_PARTITION
@@ -1817,10 +1817,10 @@ class KafkaApis(val requestChannel: RequestChannel,
     val groupId = addOffsetsToTxnRequest.consumerGroupId
     val offsetTopicPartition = new TopicPartition(GROUP_METADATA_TOPIC_NAME, groupCoordinator.partitionFor(groupId))
 
-    if (!authorize(request.session, Write, new Resource(TransactionalId, transactionalId)))
+    if (!authorize(request.session, Write, new Resource(TransactionalId, transactionalId, Literal)))
       sendResponseMaybeThrottle(request, requestThrottleMs =>
         new AddOffsetsToTxnResponse(requestThrottleMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED))
-    else if (!authorize(request.session, Read, new Resource(Group, groupId)))
+    else if (!authorize(request.session, Read, new Resource(Group, groupId, Literal)))
       sendResponseMaybeThrottle(request, requestThrottleMs =>
         new AddOffsetsToTxnResponse(requestThrottleMs, Errors.GROUP_AUTHORIZATION_FAILED))
     else {
@@ -1849,9 +1849,9 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     // authorize for the transactionalId and the consumer group. Note that we skip producerId authorization
     // since it is implied by transactionalId authorization
-    if (!authorize(request.session, Write, new Resource(TransactionalId, txnOffsetCommitRequest.transactionalId)))
+    if (!authorize(request.session, Write, new Resource(TransactionalId, txnOffsetCommitRequest.transactionalId, Literal)))
       sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
-    else if (!authorize(request.session, Read, new Resource(Group, txnOffsetCommitRequest.consumerGroupId)))
+    else if (!authorize(request.session, Read, new Resource(Group, txnOffsetCommitRequest.consumerGroupId, Literal)))
       sendErrorResponseMaybeThrottle(request, Errors.GROUP_AUTHORIZATION_FAILED.exception)
     else {
       val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]()
@@ -1859,7 +1859,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       val authorizedTopicCommittedOffsets = mutable.Map[TopicPartition, TxnOffsetCommitRequest.CommittedOffset]()
 
       for ((topicPartition, commitedOffset) <- txnOffsetCommitRequest.offsets.asScala) {
-        if (!authorize(request.session, Read, new Resource(Topic, topicPartition.topic)))
+        if (!authorize(request.session, Read, new Resource(Topic, topicPartition.topic, Literal)))
           unauthorizedTopicErrors += topicPartition -> Errors.TOPIC_AUTHORIZATION_FAILED
         else if (!metadataCache.contains(topicPartition))
           nonExistingTopicErrors += topicPartition -> Errors.UNKNOWN_TOPIC_OR_PARTITION
@@ -1920,7 +1920,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         val filter = describeAclsRequest.filter()
         val returnedAcls = auth.getAcls.toSeq.flatMap { case (resource, acls) =>
           acls.flatMap { acl =>
-            val fixture = new AclBinding(new AdminResource(resource.resourceType.toJava, resource.name),
+            val fixture = new AclBinding(new AdminResource(resource.resourceType.toJava, resource.name, resource.resourceNameType.toJava),
                 new AccessControlEntry(acl.principal.toString, acl.host.toString, acl.operation.toJava, acl.permissionType.toJava))
             if (filter.matches(fixture)) Some(fixture)
             else None
@@ -1994,7 +1994,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           val filtersWithIndex = filters.zipWithIndex
           for ((resource, acls) <- aclMap; acl <- acls) {
             val binding = new AclBinding(
-              new AdminResource(resource.resourceType.toJava, resource.name),
+              new AdminResource(resource.resourceType.toJava, resource.name, resource.resourceNameType.toJava),
               new AccessControlEntry(acl.principal.toString, acl.host.toString, acl.operation.toJava,
                 acl.permissionType.toJava))
 
@@ -2042,7 +2042,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         case RResourceType.BROKER =>
           authorize(request.session, AlterConfigs, Resource.ClusterResource)
         case RResourceType.TOPIC =>
-          authorize(request.session, AlterConfigs, new Resource(Topic, resource.name))
+          authorize(request.session, AlterConfigs, new Resource(Topic, resource.name, Literal))
         case rt => throw new InvalidRequestException(s"Unexpected resource type $rt")
       }
     }
@@ -2069,7 +2069,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       resource.`type` match {
         case RResourceType.BROKER => authorize(request.session, DescribeConfigs, Resource.ClusterResource)
         case RResourceType.TOPIC =>
-          authorize(request.session, DescribeConfigs, new Resource(Topic, resource.name))
+          authorize(request.session, DescribeConfigs, new Resource(Topic, resource.name, Literal))
         case rt => throw new InvalidRequestException(s"Unexpected resource type $rt for resource ${resource.name}")
       }
     }
@@ -2216,7 +2216,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
       else {
         val owners = if (describeTokenRequest.owners == null) None else Some(describeTokenRequest.owners.asScala.toList)
-        def authorizeToken(tokenId: String) = authorize(request.session, Describe, new Resource(kafka.security.auth.DelegationToken, tokenId))
+        def authorizeToken(tokenId: String) = authorize(request.session, Describe, new Resource(kafka.security.auth.DelegationToken, tokenId, Literal))
         def eligible(token: TokenInformation) = DelegationTokenManager.filterToken(requestPrincipal, owners, token, authorizeToken)
         val tokens =  tokenManager.getTokens(eligible)
         sendResponseCallback(Errors.NONE, tokens)
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 0cf158e..90f71a1 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -26,7 +26,7 @@ import kafka.controller.LeaderIsrAndControllerEpoch
 import kafka.log.LogConfig
 import kafka.metrics.KafkaMetricsGroup
 import kafka.security.auth.SimpleAclAuthorizer.VersionedAcls
-import kafka.security.auth.{Acl, Resource, ResourceType}
+import kafka.security.auth.{Acl, Resource, ResourceNameType, ResourceType}
 import kafka.server.ConfigType
 import kafka.utils.Logging
 import kafka.zookeeper._
@@ -943,9 +943,11 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
    * Creates the required zk nodes for Acl storage
    */
   def createAclPaths(): Unit = {
-    createRecursive(AclZNode.path, throwIfPathExists = false)
-    createRecursive(AclChangeNotificationZNode.path, throwIfPathExists = false)
-    ResourceType.values.foreach(resource => createRecursive(ResourceTypeZNode.path(resource.name), throwIfPathExists = false))
+    ZkAclStore.stores.foreach(store => {
+      createRecursive(store.aclPath, throwIfPathExists = false)
+      createRecursive(store.aclChangePath, throwIfPathExists = false)
+      ResourceType.values.foreach(resourceType => createRecursive(store.path(resourceType), throwIfPathExists = false))
+    })
   }
 
   /**
@@ -1003,11 +1005,12 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
 
   /**
    * Creates Acl change notification message
-   * @param resourceName resource name
+   * @param resource the resource whose ACLs have changed
    */
-  def createAclChangeNotification(resourceName: String): Unit = {
-    val path = AclChangeNotificationSequenceZNode.createPath
-    val createRequest = CreateRequest(path, AclChangeNotificationSequenceZNode.encode(resourceName), acls(path), CreateMode.PERSISTENT_SEQUENTIAL)
+  def createAclChangeNotification(resource: Resource): Unit = {
+    val store = ZkAclStore(resource.resourceNameType)
+    val path = store.changeSequenceZNode.createPath
+    val createRequest = CreateRequest(path, AclChangeNotificationSequenceZNode.encode(resource), acls(path), CreateMode.PERSISTENT_SEQUENTIAL)
     val createResponse = retryRequestUntilConnected(createRequest)
     createResponse.maybeThrow
   }
@@ -1030,21 +1033,24 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
    * @throws KeeperException if there is an error while deleting Acl change notifications
    */
   def deleteAclChangeNotifications(): Unit = {
-    val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(AclChangeNotificationZNode.path))
-    if (getChildrenResponse.resultCode == Code.OK) {
-      deleteAclChangeNotifications(getChildrenResponse.children)
-    } else if (getChildrenResponse.resultCode != Code.NONODE) {
-      getChildrenResponse.maybeThrow
-    }
+    ZkAclStore.stores.foreach(store => {
+      val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(store.aclChangePath))
+      if (getChildrenResponse.resultCode == Code.OK) {
+        deleteAclChangeNotifications(store, getChildrenResponse.children)
+      } else if (getChildrenResponse.resultCode != Code.NONODE) {
+        getChildrenResponse.maybeThrow
+      }
+    })
   }
 
   /**
    * Deletes the Acl change notifications associated with the given sequence nodes
    * @param sequenceNodes
    */
-  private def deleteAclChangeNotifications(sequenceNodes: Seq[String]): Unit = {
+  private def deleteAclChangeNotifications(store: ZkAclStore, sequenceNodes: Seq[String]): Unit = {
+    val aclChangeNotificationSequenceZNode = store.changeSequenceZNode
     val deleteRequests = sequenceNodes.map { sequenceNode =>
-      DeleteRequest(AclChangeNotificationSequenceZNode.deletePath(sequenceNode), ZkVersion.NoVersion)
+      DeleteRequest(aclChangeNotificationSequenceZNode.deletePath(sequenceNode), ZkVersion.NoVersion)
     }
 
     val deleteResponses = retryRequestsUntilConnected(deleteRequests)
@@ -1056,20 +1062,22 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
   }
 
   /**
-   * Gets the resource types
+   * Gets the resource types, for which ACLs are stored, for the supplied resource name type.
+   * @param nameType The resource name type to retrieve the resource types for.
    * @return list of resource type names
    */
-  def getResourceTypes(): Seq[String] = {
-    getChildren(AclZNode.path)
+  def getResourceTypes(nameType: ResourceNameType): Seq[String] = {
+    getChildren(ZkAclStore(nameType).aclPath)
   }
 
   /**
-   * Gets the resource names for a give resource type
-   * @param resourceType
+   * Gets the resource names, for which ACLs are stored, for a given resource type and name type
+   * @param nameType The resource name type to retrieve the names for.
+   * @param resourceType Resource type to retrieve the names for.
    * @return list of resource names
    */
-  def getResourceNames(resourceType: String): Seq[String] = {
-    getChildren(ResourceTypeZNode.path(resourceType))
+  def getResourceNames(nameType: ResourceNameType, resourceType: ResourceType): Seq[String] = {
+    getChildren(ZkAclStore(nameType).path(resourceType))
   }
 
   /**
diff --git a/core/src/main/scala/kafka/zk/ZkData.scala b/core/src/main/scala/kafka/zk/ZkData.scala
index 64aed56..0524b45 100644
--- a/core/src/main/scala/kafka/zk/ZkData.scala
+++ b/core/src/main/scala/kafka/zk/ZkData.scala
@@ -26,7 +26,7 @@ import kafka.cluster.{Broker, EndPoint}
 import kafka.common.KafkaException
 import kafka.controller.{IsrChangeNotificationHandler, LeaderIsrAndControllerEpoch}
 import kafka.security.auth.SimpleAclAuthorizer.VersionedAcls
-import kafka.security.auth.{Acl, Resource}
+import kafka.security.auth.{Acl, Literal, Prefixed, Resource, ResourceNameType, ResourceType}
 import kafka.server.{ConfigType, DelegationTokenManager}
 import kafka.utils.Json
 import org.apache.kafka.common.TopicPartition
@@ -445,42 +445,74 @@ object StateChangeHandlers {
 }
 
 /**
- * The root acl storage node. Under this node there will be one child node per resource type (Topic, Cluster, Group).
- * under each resourceType there will be a unique child for each resource instance and the data for that child will contain
- * list of its acls as a json object. Following gives an example:
- *
- * <pre>
- * /kafka-acl/Topic/topic-1 => {"version": 1, "acls": [ { "host":"host1", "permissionType": "Allow","operation": "Read","principal": "User:alice"}]}
- * /kafka-acl/Cluster/kafka-cluster => {"version": 1, "acls": [ { "host":"host1", "permissionType": "Allow","operation": "Read","principal": "User:alice"}]}
- * /kafka-acl/Group/group-1 => {"version": 1, "acls": [ { "host":"host1", "permissionType": "Allow","operation": "Read","principal": "User:alice"}]}
- * </pre>
- */
-object AclZNode {
-  def path = "/kafka-acl"
+  * Acls for resources are stored in ZK under a root node that is determined by the [[ResourceNameType]].
+  * Under each [[ResourceNameType]] node there will be one child node per resource type (Topic, Cluster, Group, etc).
+  * Under each resourceType there will be a unique child for each resource path and the data for that child will contain
+  * list of its acls as a json object. Following gives an example:
+  *
+  * <pre>
+  * /kafka-acl/Topic/topic-1 => {"version": 1, "acls": [ { "host":"host1", "permissionType": "Allow","operation": "Read","principal": "User:alice"}]}
+  * /kafka-acl/Cluster/kafka-cluster => {"version": 1, "acls": [ { "host":"host1", "permissionType": "Allow","operation": "Read","principal": "User:alice"}]}
+  * /kafka-prefixed-acl/Group/group-1 => {"version": 1, "acls": [ { "host":"host1", "permissionType": "Allow","operation": "Read","principal": "User:alice"}]}
+  * </pre>
+  */
+case class ZkAclStore(nameType: ResourceNameType) {
+  val aclPath: String = nameType match {
+    case Literal => "/kafka-acl"
+    case Prefixed => "/kafka-prefixed-acl"
+    case _ => throw new IllegalArgumentException("Unknown name type:" + nameType)
+  }
+
+  val aclChangePath: String = nameType match {
+    case Literal => "/kafka-acl-changes"
+    case Prefixed => "/kafka-prefixed-acl-changes"
+    case _ => throw new IllegalArgumentException("Unknown name type:" + nameType)
+  }
+
+  def path(resourceType: ResourceType) = s"$aclPath/$resourceType"
+
+  def path(resourceType: ResourceType, resourceName: String): String = s"$aclPath/$resourceType/$resourceName"
+
+  def changeSequenceZNode: AclChangeNotificationSequenceZNode = AclChangeNotificationSequenceZNode(this)
+
+  def decode(notificationMessage: Array[Byte]): Resource = AclChangeNotificationSequenceZNode.decode(nameType, notificationMessage)
 }
 
-object ResourceTypeZNode {
-  def path(resourceType: String) = s"${AclZNode.path}/$resourceType"
+object ZkAclStore {
+  val stores: Seq[ZkAclStore] = ResourceNameType.values
+    .map(nameType => ZkAclStore(nameType))
+
+  val securePaths: Seq[String] = stores
+    .flatMap(store => List(store.aclPath, store.aclChangePath))
 }
 
 object ResourceZNode {
-  def path(resource: Resource) = s"${AclZNode.path}/${resource.resourceType}/${resource.name}"
-  def encode(acls: Set[Acl]): Array[Byte] = {
-    Json.encodeAsBytes(Acl.toJsonCompatibleMap(acls).asJava)
-  }
+  def path(resource: Resource): String = ZkAclStore(resource.resourceNameType).path(resource.resourceType, resource.name)
+
+  def encode(acls: Set[Acl]): Array[Byte] = Json.encodeAsBytes(Acl.toJsonCompatibleMap(acls).asJava)
   def decode(bytes: Array[Byte], stat: Stat): VersionedAcls = VersionedAcls(Acl.fromBytes(bytes), stat.getVersion)
 }
 
-object AclChangeNotificationZNode {
-  def path = "/kafka-acl-changes"
+object AclChangeNotificationSequenceZNode {
+  val Separator = ":"
+  def SequenceNumberPrefix = "acl_changes_"
+
+  def encode(resource: Resource): Array[Byte] = {
+    (resource.resourceType.name + Separator + resource.name).getBytes(UTF_8)
+  }
+
+  def decode(nameType: ResourceNameType, bytes: Array[Byte]): Resource = {
+    val str = new String(bytes, UTF_8)
+    str.split(Separator, 2) match {
+      case Array(resourceType, name, _*) => Resource(ResourceType.fromString(resourceType), name, nameType)
+      case _ => throw new IllegalArgumentException("expected a string in format ResourceType:ResourceName but got " + str)
+    }
+  }
 }
 
-object AclChangeNotificationSequenceZNode {
-  val SequenceNumberPrefix = "acl_changes_"
-  def createPath = s"${AclChangeNotificationZNode.path}/$SequenceNumberPrefix"
-  def deletePath(sequenceNode: String) = s"${AclChangeNotificationZNode.path}/${sequenceNode}"
-  def encode(resourceName : String): Array[Byte] = resourceName.getBytes(UTF_8)
-  def decode(bytes: Array[Byte]): String = new String(bytes, UTF_8)
+case class AclChangeNotificationSequenceZNode(store: ZkAclStore) {
+  def createPath = s"${store.aclChangePath}/${AclChangeNotificationSequenceZNode.SequenceNumberPrefix}"
+  def deletePath(sequenceNode: String) = s"${store.aclChangePath}/$sequenceNode"
 }
 
 object ClusterZNode {
@@ -545,11 +577,9 @@ object ZkData {
     ControllerZNode.path,
     ControllerEpochZNode.path,
     IsrChangeNotificationZNode.path,
-    AclZNode.path,
-    AclChangeNotificationZNode.path,
     ProducerIdBlockZNode.path,
     LogDirEventNotificationZNode.path,
-    DelegationTokenAuthZNode.path)
+    DelegationTokenAuthZNode.path) ++ ZkAclStore.securePaths
 
   // These are persistent ZK paths that should exist on kafka broker startup.
   val PersistentZkPaths = Seq(
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index 5e4b893..331a449 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -40,7 +40,7 @@ import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.errors._
 import org.junit.{After, Before, Rule, Test}
 import org.apache.kafka.common.requests.{DeleteRecordsRequest, MetadataResponse}
-import org.apache.kafka.common.resource.{Resource, ResourceType}
+import org.apache.kafka.common.resource.{Resource, ResourceNameType, ResourceType}
 import org.junit.rules.Timeout
 import org.junit.Assert._
 
@@ -933,7 +933,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
     checkInvalidAlterConfigs(zkClient, servers, client)
   }
 
-  val ACL1 = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic3"),
+  val ACL1 = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic3", ResourceNameType.LITERAL),
       new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW))
 
   /**
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index ea5a155..b40dab7 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -40,7 +40,7 @@ import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Records,
 import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation
 import org.apache.kafka.common.requests.CreateTopicsRequest.TopicDetails
 import org.apache.kafka.common.requests.{Resource => RResource, ResourceType => RResourceType, _}
-import org.apache.kafka.common.resource.{ResourceFilter, Resource => AdminResource, ResourceType => AdminResourceType}
+import org.apache.kafka.common.resource.{ResourceFilter, ResourceNameType, Resource => AdminResource, ResourceType => AdminResourceType}
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.common.{KafkaException, Node, TopicPartition, requests}
 import org.junit.Assert._
@@ -70,11 +70,11 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   val deleteRecordsPartition = new TopicPartition(deleteTopic, part)
   val topicAndPartition = TopicAndPartition(topic, part)
   val group = "my-group"
-  val topicResource = new Resource(Topic, topic)
-  val groupResource = new Resource(Group, group)
-  val deleteTopicResource = new Resource(Topic, deleteTopic)
-  val transactionalIdResource = new Resource(TransactionalId, transactionalId)
-  val createTopicResource = new Resource(Topic, createTopic)
+  val topicResource = new Resource(Topic, topic, Literal)
+  val groupResource = new Resource(Group, group, Literal)
+  val deleteTopicResource = new Resource(Topic, deleteTopic, Literal)
+  val transactionalIdResource = new Resource(TransactionalId, transactionalId, Literal)
+  val createTopicResource = new Resource(Topic, createTopic, Literal)
 
   val groupReadAcl = Map(groupResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)))
   val groupDescribeAcl = Map(groupResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)))
@@ -383,7 +383,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   private def deleteAclsRequest = new DeleteAclsRequest.Builder(
     Collections.singletonList(new AclBindingFilter(
-      new ResourceFilter(AdminResourceType.TOPIC, null),
+      new ResourceFilter(AdminResourceType.TOPIC, null, ResourceNameType.LITERAL),
       new AccessControlEntryFilter(userPrincipal.toString, "*", AclOperation.ANY, AclPermissionType.DENY)))).build()
 
   private def alterReplicaLogDirsRequest = new AlterReplicaLogDirsRequest.Builder(Collections.singletonMap(tp, logDir)).build()
@@ -577,7 +577,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   private def testCreatePermissionNeededToWriteToNonExistentTopic(resType: ResourceType) {
     val topicPartition = new TopicPartition(createTopic, 0)
-    val newTopicResource = new Resource(Topic, createTopic)
+    val newTopicResource = new Resource(Topic, createTopic, Literal)
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), newTopicResource)
     try {
       sendRecords(numRecords, topicPartition)
@@ -733,7 +733,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     // create an unmatched topic
     val unmatchedTopic = "unmatched"
     createTopic(unmatchedTopic)
-    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)),  new Resource(Topic, unmatchedTopic))
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)),  new Resource(Topic, unmatchedTopic, Literal))
     sendRecords(1, new TopicPartition(unmatchedTopic, part))
     removeAllAcls()
 
@@ -746,7 +746,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     // set the subscription pattern to an internal topic that the consumer has read permission to. Since
     // internal topics are not included, we should not be assigned any partitions from this topic
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)),  new Resource(Topic,
-      GROUP_METADATA_TOPIC_NAME))
+      GROUP_METADATA_TOPIC_NAME, Literal))
     consumer.subscribe(Pattern.compile(GROUP_METADATA_TOPIC_NAME))
     consumer.poll(0)
     assertTrue(consumer.subscription().isEmpty)
@@ -774,7 +774,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
       // now authorize the user for the internal topic and verify that we can subscribe
       addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), Resource(Topic,
-        GROUP_METADATA_TOPIC_NAME))
+        GROUP_METADATA_TOPIC_NAME, Literal))
       consumer.subscribe(Pattern.compile(GROUP_METADATA_TOPIC_NAME))
       consumer.poll(0)
       assertEquals(Set(GROUP_METADATA_TOPIC_NAME), consumer.subscription.asScala)
@@ -789,7 +789,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource)
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource)
-    val internalTopicResource = new Resource(Topic, GROUP_METADATA_TOPIC_NAME)
+    val internalTopicResource = new Resource(Topic, GROUP_METADATA_TOPIC_NAME, Literal)
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), internalTopicResource)
 
     val consumerConfig = new Properties
@@ -842,7 +842,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   private def testCreatePermissionNeededToReadFromNonExistentTopic(newTopic: String, acls: Set[Acl], resType: ResourceType) {
     val topicPartition = new TopicPartition(newTopic, 0)
-    val newTopicResource = new Resource(Topic, newTopic)
+    val newTopicResource = new Resource(Topic, newTopic, Literal)
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), newTopicResource)
     addAndVerifyAcls(groupReadAcl(groupResource), groupResource)
     this.consumers.head.assign(List(topicPartition).asJava)
@@ -1045,7 +1045,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def testDeleteTopicsWithWildCardAuth() {
-    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Delete)), new Resource(Topic, "*"))
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Delete)), new Resource(Topic, "*", Literal))
     val response = connectAndSend(deleteTopicsRequest, ApiKeys.DELETE_TOPICS)
     val version = ApiKeys.DELETE_TOPICS.latestVersion
     val deleteResponse = DeleteTopicsResponse.parse(response, version)
@@ -1072,7 +1072,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def testDeleteRecordsWithWildCardAuth() {
-    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Delete)), new Resource(Topic, "*"))
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Delete)), new Resource(Topic, "*", Literal))
     val response = connectAndSend(deleteRecordsRequest, ApiKeys.DELETE_RECORDS)
     val version = ApiKeys.DELETE_RECORDS.latestVersion
     val deleteRecordsResponse = DeleteRecordsResponse.parse(response, version)
@@ -1090,7 +1090,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def testCreatePartitionsWithWildCardAuth() {
-    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Alter)), new Resource(Topic, "*"))
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Alter)), new Resource(Topic, "*", Literal))
     val response = connectAndSend(createPartitionsRequest, ApiKeys.CREATE_PARTITIONS)
     val version = ApiKeys.CREATE_PARTITIONS.latestVersion
     val createPartitionsResponse = CreatePartitionsResponse.parse(response, version)
@@ -1283,7 +1283,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   def shouldSuccessfullyAbortTransactionAfterTopicAuthorizationException(): Unit = {
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), transactionalIdResource)
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource)
-    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), new Resource(Topic, deleteTopic))
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), new Resource(Topic, deleteTopic, Literal))
     val producer = buildTransactionalProducer()
     producer.initTransactions()
     producer.beginTransaction()
diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
index c81b32d..b809686 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
@@ -65,8 +65,10 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
   }
 
   val numRecords = 1
-  val group = "group"
-  val topic = "e2etopic"
+  val groupPrefix = "gr"
+  val group = s"${groupPrefix}oup"
+  val topicPrefix = "e2e"
+  val topic = s"${topicPrefix}topic"
   val wildcard = "*"
   val part = 0
   val tp = new TopicPartition(topic, part)
@@ -76,11 +78,13 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
 
   override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks"))
 
-  val topicResource = new Resource(Topic, topic)
-  val groupResource = new Resource(Group, group)
+  val topicResource = new Resource(Topic, topic, Literal)
+  val groupResource = new Resource(Group, group, Literal)
   val clusterResource = Resource.ClusterResource
-  val wildcardTopicResource = new Resource(Topic, wildcard)
-  val wildcardGroupResource = new Resource(Group, wildcard)
+  val prefixedTopicResource = new Resource(Topic, topicPrefix, Prefixed)
+  val prefixedGroupResource = new Resource(Group, groupPrefix, Prefixed)
+  val wildcardTopicResource = new Resource(Topic, wildcard, Literal)
+  val wildcardGroupResource = new Resource(Group, wildcard, Literal)
 
   // Arguments to AclCommand to set ACLs.
   def clusterActionArgs: Array[String] = Array("--authorizer-properties",
@@ -142,6 +146,15 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
                                           s"--consumer",
                                           s"--producer",
                                           s"--allow-principal=$kafkaPrincipalType:$clientPrincipal")
+  def produceConsumePrefixedAclsArgs: Array[String] = Array("--authorizer-properties",
+                                          s"zookeeper.connect=$zkConnect",
+                                          s"--add",
+                                          s"--topic=$topicPrefix",
+                                          s"--group=$groupPrefix",
+                                          s"--resource-name-type=prefixed",
+                                          s"--consumer",
+                                          s"--producer",
+                                          s"--allow-principal=$kafkaPrincipalType:$clientPrincipal")
 
   def ClusterActionAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, kafkaPrincipal), Allow, Acl.WildCardHost, ClusterAction))
   def TopicBrokerReadAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, kafkaPrincipal), Allow, Acl.WildCardHost, Read))
@@ -169,7 +182,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
     super.setUp()
     servers.foreach { s =>
       TestUtils.waitAndVerifyAcls(ClusterActionAcl, s.apis.authorizer.get, Resource.ClusterResource)
-      TestUtils.waitAndVerifyAcls(TopicBrokerReadAcl, s.apis.authorizer.get, new Resource(Topic, "*"))
+      TestUtils.waitAndVerifyAcls(TopicBrokerReadAcl, s.apis.authorizer.get, new Resource(Topic, "*", Literal))
     }
     // create the test topic with all the brokers as replicas
     createTopic(topic, 1, 3)
@@ -219,12 +232,12 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
     consumeRecords(this.consumers.head, numRecords)
   }
 
-  private def setWildcardResourceAcls() {
-    AclCommand.main(produceConsumeWildcardAclArgs)
-    servers.foreach { s =>
-      TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl ++ TopicBrokerReadAcl, s.apis.authorizer.get, wildcardTopicResource)
-      TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, wildcardGroupResource)
-    }
+  @Test
+  def testProduceConsumeWithPrefixedAcls(): Unit = {
+    setPrefixedResourceAcls()
+    sendRecords(numRecords, tp)
+    consumers.head.subscribe(List(topic).asJava)
+    consumeRecords(this.consumers.head, numRecords)
   }
 
   @Test
@@ -236,6 +249,22 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
     consumeRecords(this.consumers.head, numRecords, topic = tp2.topic)
   }
 
+  private def setWildcardResourceAcls() {
+    AclCommand.main(produceConsumeWildcardAclArgs)
+    servers.foreach { s =>
+      TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl ++ TopicBrokerReadAcl, s.apis.authorizer.get, wildcardTopicResource)
+      TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, wildcardGroupResource)
+    }
+  }
+
+  private def setPrefixedResourceAcls() {
+    AclCommand.main(produceConsumePrefixedAclsArgs)
+    servers.foreach { s =>
+      TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.apis.authorizer.get, prefixedTopicResource)
+      TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, prefixedGroupResource)
+    }
+  }
+
   protected def setAclsAndProduce(tp: TopicPartition) {
     AclCommand.main(produceAclArgs(tp.topic))
     AclCommand.main(consumeAclArgs(tp.topic))
diff --git a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
index 099af52..b3572c0 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
@@ -13,14 +13,15 @@
 package kafka.api
 
 import java.io.File
+import java.util
 
-import kafka.security.auth.{All, Allow, Alter, AlterConfigs, Authorizer, ClusterAction, Create, Delete, Deny, Describe, Group, Operation, PermissionType, SimpleAclAuthorizer, Topic, Acl => AuthAcl, Resource => AuthResource}
+import kafka.security.auth.{All, Allow, Alter, AlterConfigs, Authorizer, ClusterAction, Create, Delete, Deny, Describe, Group, Literal, Operation, PermissionType, SimpleAclAuthorizer, Topic, Prefixed, Acl => AuthAcl, Resource => AuthResource}
 import kafka.server.KafkaConfig
 import kafka.utils.{CoreUtils, JaasTestUtils, TestUtils}
 import org.apache.kafka.clients.admin.{AdminClient, CreateAclsOptions, DeleteAclsOptions}
-import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
+import org.apache.kafka.common.acl._
 import org.apache.kafka.common.errors.{ClusterAuthorizationException, InvalidRequestException}
-import org.apache.kafka.common.resource.{Resource, ResourceFilter, ResourceType}
+import org.apache.kafka.common.resource.{Resource, ResourceFilter, ResourceNameType, ResourceType}
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.junit.Assert.assertEquals
 import org.junit.{After, Assert, Before, Test}
@@ -88,25 +89,29 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with
     closeSasl()
   }
 
-  val acl2 = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic2"),
+  val anyAcl = new AclBinding(new Resource(ResourceType.TOPIC, "*", ResourceNameType.LITERAL),
+    new AccessControlEntry("User:*", "*", AclOperation.ALL, AclPermissionType.ALLOW))
+  val acl2 = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic2", ResourceNameType.LITERAL),
     new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.WRITE, AclPermissionType.ALLOW))
-  val acl3 = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic3"),
+  val acl3 = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic3", ResourceNameType.LITERAL),
+    new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.READ, AclPermissionType.ALLOW))
+  val fooAcl = new AclBinding(new Resource(ResourceType.TOPIC, "foobar", ResourceNameType.LITERAL),
     new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.READ, AclPermissionType.ALLOW))
-  val fooAcl = new AclBinding(new Resource(ResourceType.TOPIC, "foobar"),
+  val prefixAcl = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic", ResourceNameType.PREFIXED),
     new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.READ, AclPermissionType.ALLOW))
-  val transactionalIdAcl = new AclBinding(new Resource(ResourceType.TRANSACTIONAL_ID, "transactional_id"),
+  val transactionalIdAcl = new AclBinding(new Resource(ResourceType.TRANSACTIONAL_ID, "transactional_id", ResourceNameType.LITERAL),
     new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.WRITE, AclPermissionType.ALLOW))
-  val groupAcl = new AclBinding(new Resource(ResourceType.GROUP, "*"),
+  val groupAcl = new AclBinding(new Resource(ResourceType.GROUP, "*", ResourceNameType.LITERAL),
     new AccessControlEntry("User:*", "*", AclOperation.ALL, AclPermissionType.ALLOW))
 
   @Test
   override def testAclOperations(): Unit = {
     client = AdminClient.create(createConfig())
-    assertEquals(7, client.describeAcls(AclBindingFilter.ANY).values.get().size)
+    assertEquals(7, getAcls(AclBindingFilter.ANY).size)
     val results = client.createAcls(List(acl2, acl3).asJava)
     assertEquals(Set(acl2, acl3), results.values.keySet().asScala)
     results.values.values().asScala.foreach(value => value.get)
-    val aclUnknown = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic3"),
+    val aclUnknown = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic3", ResourceNameType.LITERAL),
       new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.UNKNOWN, AclPermissionType.ALLOW))
     val results2 = client.createAcls(List(aclUnknown).asJava)
     assertEquals(Set(aclUnknown), results2.values.keySet().asScala)
@@ -118,13 +123,6 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with
     assertEquals(Set(acl3), results3.get(acl3.toFilter).get.values.asScala.map(_.binding).toSet)
   }
 
-  def waitForDescribeAcls(client: AdminClient, filter: AclBindingFilter, acls: Set[AclBinding]): Unit = {
-    TestUtils.waitUntilTrue(() => {
-      val results = client.describeAcls(filter).values.get()
-      acls == results.asScala.toSet
-    }, s"timed out waiting for ACLs $acls")
-  }
-
   @Test
   def testAclOperations2(): Unit = {
     client = AdminClient.create(createConfig())
@@ -134,9 +132,9 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with
     waitForDescribeAcls(client, acl2.toFilter, Set(acl2))
     waitForDescribeAcls(client, transactionalIdAcl.toFilter, Set(transactionalIdAcl))
 
-    val filterA = new AclBindingFilter(new ResourceFilter(ResourceType.GROUP, null), AccessControlEntryFilter.ANY)
-    val filterB = new AclBindingFilter(new ResourceFilter(ResourceType.TOPIC, "mytopic2"), AccessControlEntryFilter.ANY)
-    val filterC = new AclBindingFilter(new ResourceFilter(ResourceType.TRANSACTIONAL_ID, null), AccessControlEntryFilter.ANY)
+    val filterA = new AclBindingFilter(new ResourceFilter(ResourceType.GROUP, null, ResourceNameType.LITERAL), AccessControlEntryFilter.ANY)
+    val filterB = new AclBindingFilter(new ResourceFilter(ResourceType.TOPIC, "mytopic2", ResourceNameType.LITERAL), AccessControlEntryFilter.ANY)
+    val filterC = new AclBindingFilter(new ResourceFilter(ResourceType.TRANSACTIONAL_ID, null, ResourceNameType.LITERAL), AccessControlEntryFilter.ANY)
 
     waitForDescribeAcls(client, filterA, Set(groupAcl))
     waitForDescribeAcls(client, filterC, Set(transactionalIdAcl))
@@ -152,11 +150,125 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with
   }
 
   @Test
+  def testAclDescribe(): Unit = {
+    client = AdminClient.create(createConfig())
+    ensureAcls(Set(anyAcl, acl2, fooAcl, prefixAcl))
+
+    val allTopicAcls = new AclBindingFilter(new ResourceFilter(ResourceType.TOPIC, null, ResourceNameType.ANY), AccessControlEntryFilter.ANY)
+    val allLiteralTopicAcls = new AclBindingFilter(new ResourceFilter(ResourceType.TOPIC, null, ResourceNameType.LITERAL), AccessControlEntryFilter.ANY)
+    val allPrefixedTopicAcls = new AclBindingFilter(new ResourceFilter(ResourceType.TOPIC, null, ResourceNameType.PREFIXED), AccessControlEntryFilter.ANY)
+    val literalMyTopic2Acls = new AclBindingFilter(new ResourceFilter(ResourceType.TOPIC, "mytopic2", ResourceNameType.LITERAL), AccessControlEntryFilter.ANY)
+    val prefixedMyTopicAcls = new AclBindingFilter(new ResourceFilter(ResourceType.TOPIC, "mytopic", ResourceNameType.PREFIXED), AccessControlEntryFilter.ANY)
+    val allMyTopic2Acls = new AclBindingFilter(new ResourceFilter(ResourceType.TOPIC, "mytopic2", ResourceNameType.ANY), AccessControlEntryFilter.ANY)
+    val allFooTopicAcls = new AclBindingFilter(new ResourceFilter(ResourceType.TOPIC, "foobar", ResourceNameType.ANY), AccessControlEntryFilter.ANY)
+
+    assertEquals(Set(anyAcl), getAcls(anyAcl.toFilter))
+    assertEquals(Set(prefixAcl), getAcls(prefixAcl.toFilter))
+    assertEquals(Set(acl2), getAcls(acl2.toFilter))
+    assertEquals(Set(fooAcl), getAcls(fooAcl.toFilter))
+
+    assertEquals(Set(acl2), getAcls(literalMyTopic2Acls))
+    assertEquals(Set(prefixAcl), getAcls(prefixedMyTopicAcls))
+    assertEquals(Set(anyAcl, acl2, fooAcl), getAcls(allLiteralTopicAcls))
+    assertEquals(Set(prefixAcl), getAcls(allPrefixedTopicAcls))
+    assertEquals(Set(anyAcl, acl2, prefixAcl), getAcls(allMyTopic2Acls))
+    assertEquals(Set(anyAcl, fooAcl), getAcls(allFooTopicAcls))
+    assertEquals(Set(anyAcl, acl2, fooAcl, prefixAcl), getAcls(allTopicAcls))
+  }
+
+  @Test
+  def testAclDelete(): Unit = {
+    client = AdminClient.create(createConfig())
+    ensureAcls(Set(anyAcl, acl2, fooAcl, prefixAcl))
+
+    val allTopicAcls = new AclBindingFilter(new ResourceFilter(ResourceType.TOPIC, null, ResourceNameType.ANY), AccessControlEntryFilter.ANY)
+    val allLiteralTopicAcls = new AclBindingFilter(new ResourceFilter(ResourceType.TOPIC, null, ResourceNameType.LITERAL), AccessControlEntryFilter.ANY)
+    val allPrefixedTopicAcls = new AclBindingFilter(new ResourceFilter(ResourceType.TOPIC, null, ResourceNameType.PREFIXED), AccessControlEntryFilter.ANY)
+
+    // Delete only ACLs on literal 'mytopic2' topic
+    var deleted = client.deleteAcls(List(acl2.toFilter).asJava).all().get().asScala.toSet
+    assertEquals(Set(acl2), deleted)
+    assertEquals(Set(anyAcl, fooAcl, prefixAcl), getAcls(allTopicAcls))
+
+    ensureAcls(deleted)
+
+    // Delete only ACLs on literal '*' topic
+    deleted = client.deleteAcls(List(anyAcl.toFilter).asJava).all().get().asScala.toSet
+    assertEquals(Set(anyAcl), deleted)
+    assertEquals(Set(acl2, fooAcl, prefixAcl), getAcls(allTopicAcls))
+
+    ensureAcls(deleted)
+
+    // Delete only ACLs on specific prefixed 'mytopic' topics:
+    deleted = client.deleteAcls(List(prefixAcl.toFilter).asJava).all().get().asScala.toSet
+    assertEquals(Set(prefixAcl), deleted)
+    assertEquals(Set(anyAcl, acl2, fooAcl), getAcls(allTopicAcls))
+
+    ensureAcls(deleted)
+
+    // Delete all literal ACLs:
+    deleted = client.deleteAcls(List(allLiteralTopicAcls).asJava).all().get().asScala.toSet
+    assertEquals(Set(anyAcl, acl2, fooAcl), deleted)
+    assertEquals(Set(prefixAcl), getAcls(allTopicAcls))
+
+    ensureAcls(deleted)
+
+    // Delete all prefixed ACLs:
+    deleted = client.deleteAcls(List(allPrefixedTopicAcls).asJava).all().get().asScala.toSet
+    assertEquals(Set(prefixAcl), deleted)
+    assertEquals(Set(anyAcl, acl2, fooAcl), getAcls(allTopicAcls))
+
+    ensureAcls(deleted)
+
+    // Delete all topic ACLs:
+    deleted = client.deleteAcls(List(allTopicAcls).asJava).all().get().asScala.toSet
+    assertEquals(Set(), getAcls(allTopicAcls))
+  }
+
+  //noinspection ScalaDeprecation - test explicitly covers clients using legacy / deprecated constructors
+  @Test
+  def testLegacyAclOpsNeverAffectOrReturnPrefixed(): Unit = {
+    client = AdminClient.create(createConfig())
+    ensureAcls(Set(anyAcl, acl2, fooAcl, prefixAcl))  // <-- prefixed exists, but should never be returned.
+
+    val allTopicAcls = new AclBindingFilter(new ResourceFilter(ResourceType.TOPIC, null, ResourceNameType.ANY), AccessControlEntryFilter.ANY)
+    val legacyAllTopicAcls = new AclBindingFilter(new ResourceFilter(ResourceType.TOPIC, null, ResourceNameType.LITERAL), AccessControlEntryFilter.ANY)
+    val legacyMyTopic2Acls = new AclBindingFilter(new ResourceFilter(ResourceType.TOPIC, "mytopic2", ResourceNameType.LITERAL), AccessControlEntryFilter.ANY)
+    val legacyAnyTopicAcls = new AclBindingFilter(new ResourceFilter(ResourceType.TOPIC, "*", ResourceNameType.LITERAL), AccessControlEntryFilter.ANY)
+    val legacyFooTopicAcls = new AclBindingFilter(new ResourceFilter(ResourceType.TOPIC, "foobar", ResourceNameType.LITERAL), AccessControlEntryFilter.ANY)
+
+    assertEquals(Set(anyAcl, acl2, fooAcl), getAcls(legacyAllTopicAcls))
+    assertEquals(Set(acl2), getAcls(legacyMyTopic2Acls))
+    assertEquals(Set(anyAcl), getAcls(legacyAnyTopicAcls))
+    assertEquals(Set(fooAcl), getAcls(legacyFooTopicAcls))
+
+    // Delete only (legacy) ACLs on 'mytopic2' topic
+    var deleted = client.deleteAcls(List(legacyMyTopic2Acls).asJava).all().get().asScala.toSet
+    assertEquals(Set(acl2), deleted)
+    assertEquals(Set(anyAcl, fooAcl, prefixAcl), getAcls(allTopicAcls))
+
+    ensureAcls(deleted)
+
+    // Delete only (legacy) ACLs on '*' topic
+    deleted = client.deleteAcls(List(legacyAnyTopicAcls).asJava).all().get().asScala.toSet
+    assertEquals(Set(anyAcl), deleted)
+    assertEquals(Set(acl2, fooAcl, prefixAcl), getAcls(allTopicAcls))
+
+    ensureAcls(deleted)
+
+    // Delete all (legacy) topic ACLs:
+    deleted = client.deleteAcls(List(legacyAllTopicAcls).asJava).all().get().asScala.toSet
+    assertEquals(Set(anyAcl, acl2, fooAcl), deleted)
+    assertEquals(Set(), getAcls(legacyAllTopicAcls))
+    assertEquals(Set(prefixAcl), getAcls(allTopicAcls))
+  }
+
+  @Test
   def testAttemptToCreateInvalidAcls(): Unit = {
     client = AdminClient.create(createConfig())
-    val clusterAcl = new AclBinding(new Resource(ResourceType.CLUSTER, "foobar"),
+    val clusterAcl = new AclBinding(new Resource(ResourceType.CLUSTER, "foobar", ResourceNameType.LITERAL),
       new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.READ, AclPermissionType.ALLOW))
-    val emptyResourceNameAcl = new AclBinding(new Resource(ResourceType.TOPIC, ""),
+    val emptyResourceNameAcl = new AclBinding(new Resource(ResourceType.TOPIC, "", ResourceNameType.LITERAL),
       new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.READ, AclPermissionType.ALLOW))
     val results = client.createAcls(List(clusterAcl, emptyResourceNameAcl).asJava, new CreateAclsOptions())
     assertEquals(Set(clusterAcl, emptyResourceNameAcl), results.values.keySet().asScala)
@@ -224,7 +336,7 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with
 
   private def testAclGet(expectAuth: Boolean): Unit = {
     TestUtils.waitUntilTrue(() => {
-      val userAcl = new AclBinding(new Resource(ResourceType.TOPIC, "*"),
+      val userAcl = new AclBinding(new Resource(ResourceType.TOPIC, "*", ResourceNameType.LITERAL),
         new AccessControlEntry("User:*", "*", AclOperation.ALL, AclPermissionType.ALLOW))
       val results = client.describeAcls(userAcl.toFilter)
       if (expectAuth) {
@@ -276,4 +388,22 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with
     testAclGet(expectAuth = true)
     testAclCreateGetDelete(expectAuth = false)
   }
+
+  private def waitForDescribeAcls(client: AdminClient, filter: AclBindingFilter, acls: Set[AclBinding]): Unit = {
+    var lastResults: util.Collection[AclBinding] = null
+    TestUtils.waitUntilTrue(() => {
+      lastResults = client.describeAcls(filter).values.get()
+      acls == lastResults.asScala.toSet
+    }, s"timed out waiting for ACLs $acls.\nActual $lastResults")
+  }
+
+  private def ensureAcls(bindings: Set[AclBinding]): Unit = {
+    client.createAcls(bindings.asJava).all().get()
+
+    bindings.foreach(binding => waitForDescribeAcls(client, binding.toFilter, Set(binding)))
+  }
+
+  private def getAcls(allTopicAcls: AclBindingFilter) = {
+    client.describeAcls(allTopicAcls).values.get().asScala.toSet
+  }
 }
diff --git a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
index 9197f79..71754ba 100644
--- a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
@@ -24,21 +24,22 @@ import kafka.server.KafkaConfig
 import kafka.utils.{Logging, TestUtils}
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.common.security.auth.KafkaPrincipal
-import org.junit.Test
+import org.junit.{After, Before, Test}
 
 class AclCommandTest extends ZooKeeperTestHarness with Logging {
 
+  private val principal: KafkaPrincipal = KafkaPrincipal.fromString("User:test2")
   private val Users = Set(KafkaPrincipal.fromString("User:CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown"),
-    KafkaPrincipal.fromString("User:test2"),
+    principal,
     KafkaPrincipal.fromString("""User:CN=\#User with special chars in CN : (\, \+ \" \\ \< \> \; ')"""))
   private val Hosts = Set("host1", "host2")
   private val AllowHostCommand = Array("--allow-host", "host1", "--allow-host", "host2")
   private val DenyHostCommand = Array("--deny-host", "host1", "--deny-host", "host2")
 
-  private val TopicResources = Set(new Resource(Topic, "test-1"), new Resource(Topic, "test-2"))
-  private val GroupResources = Set(new Resource(Group, "testGroup-1"), new Resource(Group, "testGroup-2"))
-  private val TransactionalIdResources = Set(new Resource(TransactionalId, "t0"), new Resource(TransactionalId, "t1"))
-  private val TokenResources = Set(new Resource(DelegationToken, "token1"), new Resource(DelegationToken, "token2"))
+  private val TopicResources = Set(Resource(Topic, "test-1", Literal), Resource(Topic, "test-2", Literal))
+  private val GroupResources = Set(Resource(Group, "testGroup-1", Literal), Resource(Group, "testGroup-2", Literal))
+  private val TransactionalIdResources = Set(Resource(TransactionalId, "t0", Literal), Resource(TransactionalId, "t1", Literal))
+  private val TokenResources = Set(Resource(DelegationToken, "token1", Literal), Resource(DelegationToken, "token2", Literal))
 
   private val ResourceToCommand = Map[Set[Resource], Array[String]](
     TopicResources -> Array("--topic", "test-1", "--topic", "test-2"),
@@ -82,45 +83,71 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging {
       ProducerResourceToAcls(enableIdempotence = true).getOrElse(k, Set.empty[Acl])) }
   )
 
-  @Test
-  def testAclCli() {
-    val brokerProps = TestUtils.createBrokerConfig(0, zkConnect)
+  private var brokerProps: Properties = _
+  private var zkArgs: Array[String] = _
+
+  @Before
+  override def setUp(): Unit = {
+    super.setUp()
+
+    brokerProps = TestUtils.createBrokerConfig(0, zkConnect)
     brokerProps.put(KafkaConfig.AuthorizerClassNameProp, "kafka.security.auth.SimpleAclAuthorizer")
-    val args = Array("--authorizer-properties", "zookeeper.connect=" + zkConnect)
 
+    zkArgs = Array("--authorizer-properties", "zookeeper.connect=" + zkConnect)
+  }
+
+  @Test
+  def testAclCli() {
     for ((resources, resourceCmd) <- ResourceToCommand) {
       for (permissionType <- PermissionType.values) {
         val operationToCmd = ResourceToOperations(resources)
         val (acls, cmd) = getAclToCommand(permissionType, operationToCmd._1)
-          AclCommand.main(args ++ cmd ++ resourceCmd ++ operationToCmd._2 :+ "--add")
+          AclCommand.main(zkArgs ++ cmd ++ resourceCmd ++ operationToCmd._2 :+ "--add")
           for (resource <- resources) {
-            withAuthorizer(brokerProps) { authorizer =>
+            withAuthorizer() { authorizer =>
               TestUtils.waitAndVerifyAcls(acls, authorizer, resource)
             }
           }
 
-          testRemove(resources, resourceCmd, args, brokerProps)
+          testRemove(resources, resourceCmd, brokerProps)
       }
     }
   }
 
   @Test
   def testProducerConsumerCli() {
-    val brokerProps = TestUtils.createBrokerConfig(0, zkConnect)
-    brokerProps.put(KafkaConfig.AuthorizerClassNameProp, "kafka.security.auth.SimpleAclAuthorizer")
-    val args = Array("--authorizer-properties", "zookeeper.connect=" + zkConnect)
-
     for ((cmd, resourcesToAcls) <- CmdToResourcesToAcl) {
       val resourceCommand: Array[String] = resourcesToAcls.keys.map(ResourceToCommand).foldLeft(Array[String]())(_ ++ _)
-      AclCommand.main(args ++ getCmd(Allow) ++ resourceCommand ++ cmd :+ "--add")
+      AclCommand.main(zkArgs ++ getCmd(Allow) ++ resourceCommand ++ cmd :+ "--add")
       for ((resources, acls) <- resourcesToAcls) {
         for (resource <- resources) {
-          withAuthorizer(brokerProps) { authorizer =>
+          withAuthorizer() { authorizer =>
             TestUtils.waitAndVerifyAcls(acls, authorizer, resource)
           }
         }
       }
-      testRemove(resourcesToAcls.keys.flatten.toSet, resourceCommand ++ cmd, args, brokerProps)
+      testRemove(resourcesToAcls.keys.flatten.toSet, resourceCommand ++ cmd, brokerProps)
+    }
+  }
+
+  @Test
+  def testAclsOnPrefixedResources(): Unit = {
+    val cmd = Array("--allow-principal", principal.toString, "--producer", "--topic", "Test-", "--resource-name-type", "Prefixed")
+
+    AclCommand.main(zkArgs ++ cmd :+ "--add")
+
+    withAuthorizer() { authorizer =>
+      val writeAcl = Acl(principal, Allow, Acl.WildCardHost, Write)
+      val describeAcl = Acl(principal, Allow, Acl.WildCardHost, Describe)
+      val createAcl = Acl(principal, Allow, Acl.WildCardHost, Create)
+      TestUtils.waitAndVerifyAcls(Set(writeAcl, describeAcl, createAcl), authorizer, Resource(Topic, "Test-", Prefixed))
+    }
+
+    AclCommand.main(zkArgs ++ cmd :+ "--remove" :+ "--force")
+
+    withAuthorizer() { authorizer =>
+      TestUtils.waitAndVerifyAcls(Set.empty[Acl], authorizer, Resource(Cluster, "kafka-cluster", Literal))
+      TestUtils.waitAndVerifyAcls(Set.empty[Acl], authorizer, Resource(Topic, "Test-", Prefixed))
     }
   }
 
@@ -130,10 +157,10 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging {
     AclCommand.withAuthorizer(new AclCommandOptions(args))(null)
   }
 
-  private def testRemove(resources: Set[Resource], resourceCmd: Array[String], args: Array[String], brokerProps: Properties) {
+  private def testRemove(resources: Set[Resource], resourceCmd: Array[String], brokerProps: Properties) {
     for (resource <- resources) {
-      AclCommand.main(args ++ resourceCmd :+ "--remove" :+ "--force")
-      withAuthorizer(brokerProps) { authorizer =>
+      AclCommand.main(zkArgs ++ resourceCmd :+ "--remove" :+ "--force")
+      withAuthorizer() { authorizer =>
         TestUtils.waitAndVerifyAcls(Set.empty[Acl], authorizer, resource)
       }
     }
@@ -150,8 +177,8 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging {
     Users.foldLeft(cmd) ((cmd, user) => cmd ++ Array(principalCmd, user.toString))
   }
 
-  def withAuthorizer(props: Properties)(f: Authorizer => Unit) {
-    val kafkaConfig = KafkaConfig.fromProps(props, doLog = false)
+  def withAuthorizer()(f: Authorizer => Unit) {
+    val kafkaConfig = KafkaConfig.fromProps(brokerProps, doLog = false)
     val authZ = new SimpleAclAuthorizer
     try {
       authZ.configure(kafkaConfig.originals)
diff --git a/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
index e46bd9b..cee0bd6 100644
--- a/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
+++ b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
@@ -16,31 +16,39 @@
  */
 package kafka.common
 
-import java.nio.charset.StandardCharsets
-
+import kafka.security.auth.{Group, Literal, Resource}
 import kafka.utils.TestUtils
-import kafka.zk.{AclChangeNotificationSequenceZNode, AclChangeNotificationZNode, ZooKeeperTestHarness}
-import org.junit.Test
+import kafka.zk.{AclChangeNotificationSequenceZNode, ZkAclStore, ZooKeeperTestHarness}
+import org.junit.{After, Test}
 
 class ZkNodeChangeNotificationListenerTest extends ZooKeeperTestHarness {
 
+  var notificationListener: ZkNodeChangeNotificationListener = _
+
+  @After
+  override def tearDown(): Unit = {
+    if (notificationListener != null) {
+      notificationListener.close()
+    }
+  }
+
   @Test
   def testProcessNotification() {
-    @volatile var notification: String = null
+    @volatile var notification: Resource = null
     @volatile var invocationCount = 0
     val notificationHandler = new NotificationHandler {
       override def processNotification(notificationMessage: Array[Byte]): Unit = {
-        notification = new String(notificationMessage, StandardCharsets.UTF_8)
+        notification = AclChangeNotificationSequenceZNode.decode(Literal, notificationMessage)
         invocationCount += 1
       }
     }
 
     zkClient.createAclPaths()
-    val notificationMessage1 = "message1"
-    val notificationMessage2 = "message2"
+    val notificationMessage1 = Resource(Group, "messageA", Literal)
+    val notificationMessage2 = Resource(Group, "messageB", Literal)
     val changeExpirationMs = 1000
 
-    val notificationListener = new ZkNodeChangeNotificationListener(zkClient,  AclChangeNotificationZNode.path,
+    notificationListener = new ZkNodeChangeNotificationListener(zkClient,  ZkAclStore(Literal).aclChangePath,
       AclChangeNotificationSequenceZNode.SequenceNumberPrefix, notificationHandler, changeExpirationMs)
     notificationListener.init()
 
@@ -60,7 +68,7 @@ class ZkNodeChangeNotificationListenerTest extends ZooKeeperTestHarness {
     TestUtils.waitUntilTrue(() => invocationCount == 2 && notification == notificationMessage2,
       "Failed to send/process notification message in the timeout period.")
 
-    (3 to 10).foreach(i => zkClient.createAclChangeNotification("message" + i))
+    (3 to 10).foreach(i => zkClient.createAclChangeNotification(Resource(Group, "message" + i, Literal)))
 
     TestUtils.waitUntilTrue(() => invocationCount == 10 ,
       s"Expected 10 invocations of processNotifications, but there were $invocationCount")
diff --git a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
index 1e18f1d..3e7f6a8 100644
--- a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
@@ -20,7 +20,7 @@ import java.net.InetAddress
 import java.util.UUID
 
 import kafka.network.RequestChannel.Session
-import kafka.security.auth.Acl.WildCardHost
+import kafka.security.auth.Acl.{WildCardHost, WildCardResource}
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
 import kafka.zk.ZooKeeperTestHarness
@@ -30,14 +30,22 @@ import org.junit.{After, Before, Test}
 
 class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
 
+  val allowReadAcl = Acl(Acl.WildCardPrincipal, Allow, WildCardHost, Read)
+  val allowWriteAcl = Acl(Acl.WildCardPrincipal, Allow, WildCardHost, Write)
+  val denyReadAcl = Acl(Acl.WildCardPrincipal, Deny, WildCardHost, Read)
+
+  val wildCardResource = Resource(Topic, WildCardResource, Literal)
+  val prefixedResource = Resource(Topic, "foo", Prefixed)
+
   val simpleAclAuthorizer = new SimpleAclAuthorizer
   val simpleAclAuthorizer2 = new SimpleAclAuthorizer
   val testPrincipal = Acl.WildCardPrincipal
   val testHostName = InetAddress.getByName("192.168.0.1")
-  val session = Session(testPrincipal, testHostName)
   var resource: Resource = null
   val superUsers = "User:superuser1; User:superuser2"
   val username = "alice"
+  val principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
+  val session = Session(principal, testHostName)
   var config: KafkaConfig = null
 
   @Before
@@ -54,7 +62,7 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
     config = KafkaConfig.fromProps(props)
     simpleAclAuthorizer.configure(config.originals)
     simpleAclAuthorizer2.configure(config.originals)
-    resource = new Resource(Topic, UUID.randomUUID().toString)
+    resource = new Resource(Topic, "foo-" + UUID.randomUUID(), Literal)
   }
 
   @After
@@ -64,6 +72,11 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
     super.tearDown()
   }
 
+  @Test(expected = classOf[IllegalArgumentException])
+  def testAuthorizeThrowsOnNoneLiteralResource() {
+    simpleAclAuthorizer.authorize(session, Read, Resource(Topic, "something", Prefixed))
+  }
+
   @Test
   def testTopicAcl() {
     val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
@@ -161,7 +174,6 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
     val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
     val host1 = InetAddress.getByName("192.168.3.1")
     val readAcl = new Acl(user1, Allow, host1.getHostAddress, Read)
-    val wildCardResource = new Resource(resource.resourceType, Resource.WildCardResource)
 
     val acls = changeAclAndVerify(Set.empty[Acl], Set[Acl](readAcl), Set.empty[Acl], wildCardResource)
 
@@ -222,10 +234,10 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
     TestUtils.waitUntilTrue(() => Map(resource -> Set(acl3, acl4, acl5)) == simpleAclAuthorizer.getAcls(user2), "changes not propagated in timeout period")
 
     val resourceToAcls = Map[Resource, Set[Acl]](
-      new Resource(Topic, Resource.WildCardResource) -> Set[Acl](new Acl(user2, Allow, WildCardHost, Read)),
-      new Resource(Cluster, Resource.WildCardResource) -> Set[Acl](new Acl(user2, Allow, host1, Read)),
-      new Resource(Group, Resource.WildCardResource) -> acls,
-      new Resource(Group, "test-ConsumerGroup") -> acls
+      new Resource(Topic, Resource.WildCardResource, Literal) -> Set[Acl](new Acl(user2, Allow, WildCardHost, Read)),
+      new Resource(Cluster, Resource.WildCardResource, Literal) -> Set[Acl](new Acl(user2, Allow, host1, Read)),
+      new Resource(Group, Resource.WildCardResource, Literal) -> acls,
+      new Resource(Group, "test-ConsumerGroup", Literal) -> acls
     )
 
     resourceToAcls foreach { case (key, value) => changeAclAndVerify(Set.empty[Acl], value, Set.empty[Acl], key) }
@@ -253,7 +265,7 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
     simpleAclAuthorizer.addAcls(acls, resource)
 
     val user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob")
-    val resource1 = new Resource(Topic, "test-2")
+    val resource1 = new Resource(Topic, "test-2", Literal)
     val acl2 = new Acl(user2, Deny, "host3", Read)
     val acls1 = Set[Acl](acl2)
     simpleAclAuthorizer.addAcls(acls1, resource1)
@@ -272,7 +284,7 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
 
   @Test
   def testLocalConcurrentModificationOfResourceAcls() {
-    val commonResource = new Resource(Topic, "test")
+    val commonResource = new Resource(Topic, "test", Literal)
 
     val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
     val acl1 = new Acl(user1, Allow, WildCardHost, Read)
@@ -288,7 +300,7 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
 
   @Test
   def testDistributedConcurrentModificationOfResourceAcls() {
-    val commonResource = new Resource(Topic, "test")
+    val commonResource = new Resource(Topic, "test", Literal)
 
     val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
     val acl1 = new Acl(user1, Allow, WildCardHost, Read)
@@ -318,7 +330,7 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
 
   @Test
   def testHighConcurrencyModificationOfResourceAcls() {
-    val commonResource = new Resource(Topic, "test")
+    val commonResource = new Resource(Topic, "test", Literal)
 
     val acls = (0 to 50).map { i =>
       val useri = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, i.toString)
@@ -419,6 +431,127 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
     TestUtils.waitAndVerifyAcls(Set.empty[Acl], simpleAclAuthorizer2, resource)
   }
 
+  @Test
+  def testAccessAllowedIfAllowAclExistsOnWildcardResource(): Unit = {
+    simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl), wildCardResource)
+
+    assertTrue(simpleAclAuthorizer.authorize(session, Read, resource))
+  }
+
+  @Test
+  def testDeleteAclOnWildcardResource(): Unit = {
+    simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl, allowWriteAcl), wildCardResource)
+
+    simpleAclAuthorizer.removeAcls(Set[Acl](allowReadAcl), wildCardResource)
+
+    assertEquals(Set(allowWriteAcl), simpleAclAuthorizer.getAcls(wildCardResource))
+  }
+
+  @Test
+  def testDeleteAllAclOnWildcardResource(): Unit = {
+    simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl), wildCardResource)
+
+    simpleAclAuthorizer.removeAcls(wildCardResource)
+
+    assertEquals(Map(), simpleAclAuthorizer.getAcls())
+  }
+
+  @Test
+  def testAccessAllowedIfAllowAclExistsOnPrefixedResource(): Unit = {
+    simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl), prefixedResource)
+
+    assertTrue(simpleAclAuthorizer.authorize(session, Read, resource))
+  }
+
+  @Test
+  def testDeleteAclOnPrefixedResource(): Unit = {
+    simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl, allowWriteAcl), prefixedResource)
+
+    simpleAclAuthorizer.removeAcls(Set[Acl](allowReadAcl), prefixedResource)
+
+    assertEquals(Set(allowWriteAcl), simpleAclAuthorizer.getAcls(prefixedResource))
+  }
+
+  @Test
+  def testDeleteAllAclOnPrefixedResource(): Unit = {
+    simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl, allowWriteAcl), prefixedResource)
+
+    simpleAclAuthorizer.removeAcls(prefixedResource)
+
+    assertEquals(Map(), simpleAclAuthorizer.getAcls())
+  }
+
+  @Test
+  def testAddAclsOnLiteralResource(): Unit = {
+    simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl, allowWriteAcl), resource)
+    simpleAclAuthorizer.addAcls(Set[Acl](allowWriteAcl, denyReadAcl), resource)
+
+    assertEquals(Set(allowReadAcl, allowWriteAcl, denyReadAcl), simpleAclAuthorizer.getAcls(resource))
+    assertEquals(Set(), simpleAclAuthorizer.getAcls(wildCardResource))
+    assertEquals(Set(), simpleAclAuthorizer.getAcls(prefixedResource))
+  }
+
+  @Test
+  def testAddAclsOnWildcardResource(): Unit = {
+    simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl, allowWriteAcl), wildCardResource)
+    simpleAclAuthorizer.addAcls(Set[Acl](allowWriteAcl, denyReadAcl), wildCardResource)
+
+    assertEquals(Set(allowReadAcl, allowWriteAcl, denyReadAcl), simpleAclAuthorizer.getAcls(wildCardResource))
+    assertEquals(Set(), simpleAclAuthorizer.getAcls(resource))
+    assertEquals(Set(), simpleAclAuthorizer.getAcls(prefixedResource))
+  }
+
+  @Test
+  def testAddAclsOnPrefiexedResource(): Unit = {
+    simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl, allowWriteAcl), prefixedResource)
+    simpleAclAuthorizer.addAcls(Set[Acl](allowWriteAcl, denyReadAcl), prefixedResource)
+
+    assertEquals(Set(allowReadAcl, allowWriteAcl, denyReadAcl), simpleAclAuthorizer.getAcls(prefixedResource))
+    assertEquals(Set(), simpleAclAuthorizer.getAcls(wildCardResource))
+    assertEquals(Set(), simpleAclAuthorizer.getAcls(resource))
+  }
+
+  @Test
+  def testAuthorizeWithPrefixedResource(): Unit = {
+    simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "a_other", Literal))
+    simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "a_other", Prefixed))
+    simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "foo-" + UUID.randomUUID(), Prefixed))
+    simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "foo-" + UUID.randomUUID(), Prefixed))
+    simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "foo-" + UUID.randomUUID() + "-zzz", Prefixed))
+    simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "fooo-" + UUID.randomUUID(), Prefixed))
+    simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "fo-" + UUID.randomUUID(), Prefixed))
+    simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "fop-" + UUID.randomUUID(), Prefixed))
+    simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "fon-" + UUID.randomUUID(), Prefixed))
+    simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "fon-", Prefixed))
+    simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "z_other", Prefixed))
+    simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "z_other", Literal))
+
+    simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl), prefixedResource)
+
+    assertTrue(simpleAclAuthorizer.authorize(session, Read, resource))
+  }
+
+  @Test
+  def testGetAclsPrincipal(): Unit = {
+    assertEquals(0, simpleAclAuthorizer.getAcls(principal).size)
+
+    val acl1 = new Acl(principal, Allow, WildCardHost, Write)
+    simpleAclAuthorizer.addAcls(Set[Acl](acl1), resource)
+    assertEquals(1, simpleAclAuthorizer.getAcls(principal).size)
+
+    simpleAclAuthorizer.addAcls(Set[Acl](acl1), new Resource(Topic, Acl.WildCardResource, Literal))
+    assertEquals(2, simpleAclAuthorizer.getAcls(principal).size)
+
+    val acl2 = new Acl(Acl.WildCardPrincipal, Allow, WildCardHost, Write)
+    simpleAclAuthorizer.addAcls(Set[Acl](acl1), new Resource(Group, "groupA", Literal))
+    assertEquals(3, simpleAclAuthorizer.getAcls(principal).size)
+
+    // add prefixed principal acl on wildcard group name
+    val acl3 = new Acl(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, principal.getName.charAt(0) + WildCardResource), Allow, WildCardHost, Write)
+    simpleAclAuthorizer.addAcls(Set[Acl](acl1), new Resource(Group, Acl.WildCardResource, Literal))
+    assertEquals(4, simpleAclAuthorizer.getAcls(principal).size)
+  }
+
   private def changeAclAndVerify(originalAcls: Set[Acl], addedAcls: Set[Acl], removedAcls: Set[Acl], resource: Resource = resource): Set[Acl] = {
     var acls = originalAcls
 
diff --git a/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala b/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala
index fe7aca2..eec7175 100644
--- a/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala
@@ -242,7 +242,7 @@ class DelegationTokenManagerTest extends ZooKeeperTestHarness  {
 
     //get all tokens for multiple owners (owner1, renewer4) and with permission
     var acl = new Acl(owner1, Allow, WildCardHost, Describe)
-    simpleAclAuthorizer.addAcls(Set(acl), new Resource(kafka.security.auth.DelegationToken, tokenId3))
+    simpleAclAuthorizer.addAcls(Set(acl), new Resource(kafka.security.auth.DelegationToken, tokenId3, Literal))
     tokens = getTokens(tokenManager, simpleAclAuthorizer, hostSession, owner1, List(owner1, renewer4))
     assert(tokens.size == 3)
 
@@ -257,7 +257,7 @@ class DelegationTokenManagerTest extends ZooKeeperTestHarness  {
     //get all tokens for multiple owners (renewer2, renewer3) which are token renewers principals and with permissions
     hostSession = new Session(renewer2, InetAddress.getByName("192.168.1.1"))
     acl = new Acl(renewer2, Allow, WildCardHost, Describe)
-    simpleAclAuthorizer.addAcls(Set(acl), new Resource(kafka.security.auth.DelegationToken, tokenId2))
+    simpleAclAuthorizer.addAcls(Set(acl), new Resource(kafka.security.auth.DelegationToken, tokenId2, Literal))
     tokens = getTokens(tokenManager, simpleAclAuthorizer, hostSession,  renewer2, List(renewer2, renewer3))
     assert(tokens.size == 2)
 
@@ -271,7 +271,7 @@ class DelegationTokenManagerTest extends ZooKeeperTestHarness  {
       List()
     }
     else {
-      def authorizeToken(tokenId: String) = simpleAclAuthorizer.authorize(hostSession, Describe, new Resource(kafka.security.auth.DelegationToken, tokenId))
+      def authorizeToken(tokenId: String) = simpleAclAuthorizer.authorize(hostSession, Describe, new Resource(kafka.security.auth.DelegationToken, tokenId, Literal))
       def eligible(token: TokenInformation) = DelegationTokenManager.filterToken(requestPrincipal, Option(requestedOwners), token, authorizeToken)
       tokenManager.getTokens(eligible)
     }
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 3f2f66c..59f543b 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -22,10 +22,9 @@ import kafka.log.LogConfig
 import kafka.network.RequestChannel.Session
 import kafka.security.auth._
 import kafka.utils.TestUtils
-
 import org.apache.kafka.clients.admin.NewPartitions
 import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
-import org.apache.kafka.common.resource.{ResourceFilter, Resource => AdminResource, ResourceType => AdminResourceType}
+import org.apache.kafka.common.resource.{ResourceFilter, ResourceNameType, Resource => AdminResource, ResourceType => AdminResourceType}
 import org.apache.kafka.common.{Node, TopicPartition}
 import org.apache.kafka.common.metrics.{KafkaMetric, Quota, Sensor}
 import org.apache.kafka.common.network.ListenerName
@@ -318,7 +317,7 @@ class RequestQuotaTest extends BaseRequestTest {
 
         case ApiKeys.DELETE_ACLS =>
           new DeleteAclsRequest.Builder(Collections.singletonList(new AclBindingFilter(
-            new ResourceFilter(AdminResourceType.TOPIC, null),
+            new ResourceFilter(AdminResourceType.TOPIC, null, ResourceNameType.LITERAL),
             new AccessControlEntryFilter("User:ANONYMOUS", "*", AclOperation.ANY, AclPermissionType.DENY))))
 
         case ApiKeys.DESCRIBE_CONFIGS =>
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index ec6c756..f50ef3a 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -1142,8 +1142,11 @@ object TestUtils extends Logging {
   }
 
   def waitAndVerifyAcls(expected: Set[Acl], authorizer: Authorizer, resource: Resource) = {
+    val newLine = scala.util.Properties.lineSeparator
+
     TestUtils.waitUntilTrue(() => authorizer.getAcls(resource) == expected,
-      s"expected acls $expected but got ${authorizer.getAcls(resource)}", waitTime = JTestUtils.DEFAULT_MAX_WAIT_MS)
+      s"expected acls:${expected.mkString(newLine + "\t", newLine + "\t", newLine)}" +
+        s"but got:${authorizer.getAcls(resource).mkString(newLine + "\t", newLine + "\t", newLine)}", waitTime = JTestUtils.DEFAULT_MAX_WAIT_MS)
   }
 
   /**
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index 1aeca22..cfaf731 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -427,72 +427,76 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   @Test
   def testAclManagementMethods() {
 
-    assertFalse(zkClient.pathExists(AclZNode.path))
-    assertFalse(zkClient.pathExists(AclChangeNotificationZNode.path))
-    ResourceType.values.foreach(resource => assertFalse(zkClient.pathExists(ResourceTypeZNode.path(resource.name))))
+    ZkAclStore.stores.foreach(store => {
+      assertFalse(zkClient.pathExists(store.aclPath))
+      assertFalse(zkClient.pathExists(store.aclChangePath))
+      ResourceType.values.foreach(resource => assertFalse(zkClient.pathExists(store.path(resource))))
+    })
 
     // create acl paths
     zkClient.createAclPaths
 
-    assertTrue(zkClient.pathExists(AclZNode.path))
-    assertTrue(zkClient.pathExists(AclChangeNotificationZNode.path))
-    ResourceType.values.foreach(resource => assertTrue(zkClient.pathExists(ResourceTypeZNode.path(resource.name))))
+    ZkAclStore.stores.foreach(store => {
+      assertTrue(zkClient.pathExists(store.aclPath))
+      assertTrue(zkClient.pathExists(store.aclChangePath))
+      ResourceType.values.foreach(resource => assertTrue(zkClient.pathExists(store.path(resource))))
 
-    val resource1 = new Resource(Topic, UUID.randomUUID().toString)
-    val resource2 = new Resource(Topic, UUID.randomUUID().toString)
+      val resource1 = new Resource(Topic, UUID.randomUUID().toString, store.nameType)
+      val resource2 = new Resource(Topic, UUID.randomUUID().toString, store.nameType)
 
-    // try getting acls for non-existing resource
-    var versionedAcls = zkClient.getVersionedAclsForResource(resource1)
-    assertTrue(versionedAcls.acls.isEmpty)
-    assertEquals(-1, versionedAcls.zkVersion)
-    assertFalse(zkClient.resourceExists(resource1))
+      // try getting acls for non-existing resource
+      var versionedAcls = zkClient.getVersionedAclsForResource(resource1)
+      assertTrue(versionedAcls.acls.isEmpty)
+      assertEquals(-1, versionedAcls.zkVersion)
+      assertFalse(zkClient.resourceExists(resource1))
 
 
-    val acl1 = new Acl(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "alice"), Deny, "host1" , Read)
-    val acl2 = new Acl(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob"), Allow, "*", Read)
-    val acl3 = new Acl(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob"), Deny, "host1", Read)
+      val acl1 = new Acl(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "alice"), Deny, "host1" , Read)
+      val acl2 = new Acl(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob"), Allow, "*", Read)
+      val acl3 = new Acl(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob"), Deny, "host1", Read)
 
-    //create acls for resources
-    zkClient.conditionalSetOrCreateAclsForResource(resource1, Set(acl1, acl2), 0)
-    zkClient.conditionalSetOrCreateAclsForResource(resource2, Set(acl1, acl3), 0)
+      //create acls for resources
+      zkClient.conditionalSetOrCreateAclsForResource(resource1, Set(acl1, acl2), 0)
+      zkClient.conditionalSetOrCreateAclsForResource(resource2, Set(acl1, acl3), 0)
 
-    versionedAcls = zkClient.getVersionedAclsForResource(resource1)
-    assertEquals(Set(acl1, acl2), versionedAcls.acls)
-    assertEquals(0, versionedAcls.zkVersion)
-    assertTrue(zkClient.resourceExists(resource1))
+      versionedAcls = zkClient.getVersionedAclsForResource(resource1)
+      assertEquals(Set(acl1, acl2), versionedAcls.acls)
+      assertEquals(0, versionedAcls.zkVersion)
+      assertTrue(zkClient.resourceExists(resource1))
 
-    //update acls for resource
-    zkClient.conditionalSetOrCreateAclsForResource(resource1, Set(acl1, acl3), 0)
+      //update acls for resource
+      zkClient.conditionalSetOrCreateAclsForResource(resource1, Set(acl1, acl3), 0)
 
-    versionedAcls = zkClient.getVersionedAclsForResource(resource1)
-    assertEquals(Set(acl1, acl3), versionedAcls.acls)
-    assertEquals(1, versionedAcls.zkVersion)
+      versionedAcls = zkClient.getVersionedAclsForResource(resource1)
+      assertEquals(Set(acl1, acl3), versionedAcls.acls)
+      assertEquals(1, versionedAcls.zkVersion)
 
-    //get resource Types
-    assertTrue(ResourceType.values.map( rt => rt.name).toSet == zkClient.getResourceTypes().toSet)
+      //get resource Types
+      assertTrue(ResourceType.values.map( rt => rt.name).toSet == zkClient.getResourceTypes(store.nameType).toSet)
 
-    //get resource name
-    val resourceNames = zkClient.getResourceNames(Topic.name)
-    assertEquals(2, resourceNames.size)
-    assertTrue(Set(resource1.name,resource2.name) == resourceNames.toSet)
+      //get resource name
+      val resourceNames = zkClient.getResourceNames(store.nameType, Topic)
+      assertEquals(2, resourceNames.size)
+      assertTrue(Set(resource1.name,resource2.name) == resourceNames.toSet)
 
-    //delete resource
-    assertTrue(zkClient.deleteResource(resource1))
-    assertFalse(zkClient.resourceExists(resource1))
+      //delete resource
+      assertTrue(zkClient.deleteResource(resource1))
+      assertFalse(zkClient.resourceExists(resource1))
 
-    //delete with invalid expected zk version
-    assertFalse(zkClient.conditionalDelete(resource2, 10))
-    //delete with valid expected zk version
-    assertTrue(zkClient.conditionalDelete(resource2, 0))
+      //delete with invalid expected zk version
+      assertFalse(zkClient.conditionalDelete(resource2, 10))
+      //delete with valid expected zk version
+      assertTrue(zkClient.conditionalDelete(resource2, 0))
 
 
-    zkClient.createAclChangeNotification("resource1")
-    zkClient.createAclChangeNotification("resource2")
+      zkClient.createAclChangeNotification(Resource(Group, "resource1", store.nameType))
+      zkClient.createAclChangeNotification(Resource(Topic, "resource2", store.nameType))
 
-    assertEquals(2, zkClient.getChildren(AclChangeNotificationZNode.path).size)
+      assertEquals(2, zkClient.getChildren(store.aclChangePath).size)
 
-    zkClient.deleteAclChangeNotifications()
-    assertTrue(zkClient.getChildren(AclChangeNotificationZNode.path).isEmpty)
+      zkClient.deleteAclChangeNotifications()
+      assertTrue(zkClient.getChildren(store.aclChangePath).isEmpty)
+    })
   }
 
   @Test
diff --git a/docs/security.html b/docs/security.html
index 0ef37d7..57bba47 100644
--- a/docs/security.html
+++ b/docs/security.html
@@ -1100,6 +1100,19 @@
             <td>Resource</td>
         </tr>
         <tr>
+            <td>--resource-name-type [name-type]</td>
+            <td>Specifies the resource name type to use.<br>
+                Valid values are:<br>
+                <ul>
+                    <li><b>Literal</b> Match resource names exactly or, in the case of the Wildcard name '*', match all resources.</li>
+                    <li><b>Prefixed</b> Match any resource whose name starts with the prefix.</li>
+                    <li><b>All</b> (list|remove only) Match any name type, including the Wildcard name.</li>
+                </ul>
+            </td>
+            <td>literal</td>
+            <td>Configuration</td>
+        </tr>
+        <tr>
             <td>--allow-principal</td>
             <td>Principal is in PrincipalType:name format that will be added to ACL with Allow permission. <br>You can specify multiple --allow-principal in a single command.</td>
             <td></td>
@@ -1161,14 +1174,26 @@
             <pre class="brush: bash;">bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:* --allow-host * --deny-principal User:BadBob --deny-host 198.51.100.3 --operation Read --topic Test-topic</pre>
             Note that ``--allow-host`` and ``deny-host`` only support IP addresses (hostnames are not supported).
             Above examples add acls to a topic by specifying --topic [topic-name] as the resource option. Similarly user can add acls to cluster by specifying --cluster and to a consumer group by specifying --group [group-name].</li>
+            You can add acls on any resource of a certain type, e.g. suppose you wanted to add an acl "Principal User:Peter is allowed to produce to any Topic from IP 198.51.200.1".
+            You can do that by using the wildcard resource '*', e.g. by executing the CLI with following options:
+            <pre class="brush: bash;">bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Peter --allow-host 198.51.200.1 --producer --topic *</pre>
+            You can add acls on resources matching a certain prefix, e.g. suppose you want to add an acl "Principal User:Jane is allowed to produce to any Topic whose name is prefixed with 'Test-' from any host".
+            You can do that by executing the CLI with following options:
+            <pre class="brush: bash;">bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Jane --producer --topic Test- --resource-name-type Prefixed</pre>
+            Note, --resource-name-type defaults to 'literal', which only affects resources with the exact same name. The exception to this is the wildcard resource name '*', which should also be added using 'literal'.
 
         <li><b>Removing Acls</b><br>
                 Removing acls is pretty much the same. The only difference is instead of --add option users will have to specify --remove option. To remove the acls added by the first example above we can execute the CLI with following options:
             <pre class="brush: bash;"> bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --remove --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic </pre></li>
+            If you want to remove the prefixed acl added above we can execute the CLI with following options:
+            <pre class="brush: bash;"> bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --remove --allow-principal User:Jane --producer --topic Test- --resource-name-type Prefixed</pre></li>
 
         <li><b>List Acls</b><br>
                 We can list acls for any resource by specifying the --list option with the resource. To list all acls for Test-topic we can execute the CLI with following options:
                 <pre class="brush: bash;">bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --topic Test-topic</pre></li>
+                However, this will only return the acls that have been added to this exact resource. Other acls can exist that affect access to the topic,
+                e.g. any acls on the topic wildcard '*', or any acls on resources matching a certain prefix. To list all acls affecting a topic we can use the '--resource-name-type any' option, e.g.
+                <pre class="brush: bash;">bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --topic Test-topic --resource-name-type any</pre></li>
 
         <li><b>Adding or removing a principal as producer or consumer</b><br>
                 The most common use case for acl management are adding/removing a principal as producer or consumer so we added convenience options to handle these cases. In order to add User:Bob as a producer of  Test-topic we can execute the following command:
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 4f1c5b3..0430b43 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -61,6 +61,11 @@
         Similarly for the message format version.</li>
     <li>If you are using Java8 method references in your Kafka Streams code you might need to update your code to resolve method ambiguities.
         Hot-swapping the jar-file only might not work.</li>
+    <li>ACLs should not be added to prefixed resources
+        (added in <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-290%3A+Support+for+Prefixed+ACLs">KIP-290</a>)
+        until all brokers in the cluster have been updated.
+        <p><b>NOTE:</b> any prefixed ACLs added to a cluster will be ignored should the cluster be downgraded again.
+    </li>
 </ol>
 
 <h5><a id="upgrade_200_notable" href="#upgrade_200_notable">Notable changes in 2.0.0</a></h5>
@@ -103,12 +108,15 @@
         the <code>--new-consumer</code> option for all consumer based tools. This option is redundant since the new consumer is automatically
         used if --bootstrap-server is defined.
     </li>
+    <li><a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-290%3A+Support+for+Prefixed+ACLs">KIP-290</a> adds the ability
+        to define ACLs on prefixed resources, e.g. any topic starting with 'foo'.</li>
 </ul>
 
 <h5><a id="upgrade_200_new_protocols" href="#upgrade_200_new_protocols">New Protocol Versions</a></h5>
 <ul>
     <li> <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-279%3A+Fix+log+divergence+between+leader+and+follower+after+fast+leader+fail+over">KIP-279</a>: OffsetsForLeaderEpochResponse v1 introduces a partition-level <code>leader_epoch</code> field. </li>
     <li> <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-219+-+Improve+quota+communication">KIP-219</a>: Bump up the protocol versions of non-cluster action requests and responses that are throttled on quota violation.</li>
+    <li> <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-290%3A+Support+for+Prefixed+ACLs">KIP-290</a>: Bump up the protocol versions of ACL create, describe and delete requests and responses.</li>
 </ul>
 
 

-- 
To stop receiving notification emails like this one, please contact
ijuma@apache.org.

Mime
View raw message