kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject [2/3] kafka git commit: KAFKA-3266; Describe, Create and Delete ACLs Admin APIs (KIP-140)
Date Thu, 18 May 2017 02:31:03 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java
new file mode 100644
index 0000000..f792bbd
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.clients.admin.AccessControlEntry;
+import org.apache.kafka.clients.admin.AclBinding;
+import org.apache.kafka.clients.admin.Resource;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.Utils;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+public class CreateAclsRequest extends AbstractRequest {
+    private final static String CREATIONS = "creations";
+
+    public static class AclCreation {
+        private final AclBinding acl;
+
+        public AclCreation(AclBinding acl) {
+            this.acl = acl;
+        }
+
+        static AclCreation fromStruct(Struct struct) {
+            Resource resource = RequestUtils.resourceFromStructFields(struct);
+            AccessControlEntry entry = RequestUtils.aceFromStructFields(struct);
+            return new AclCreation(new AclBinding(resource, entry));
+        }
+
+        public AclBinding acl() {
+            return acl;
+        }
+
+        void setStructFields(Struct struct) {
+            RequestUtils.resourceSetStructFields(acl.resource(), struct);
+            RequestUtils.aceSetStructFields(acl.entry(), struct);
+        }
+
+        @Override
+        public String toString() {
+            return "(acl=" + acl + ")";
+        }
+    }
+
+    public static class Builder extends AbstractRequest.Builder<CreateAclsRequest> {
+        private final List<AclCreation> creations;
+
+        public Builder(List<AclCreation> creations) {
+            super(ApiKeys.CREATE_ACLS);
+            this.creations = creations;
+        }
+
+        @Override
+        public CreateAclsRequest build(short version) {
+            return new CreateAclsRequest(version, creations);
+        }
+
+        @Override
+        public String toString() {
+            return "(type=CreateAclsRequest, creations=" + Utils.join(creations, ", ") + ")";
+        }
+    }
+
+    private final List<AclCreation> aclCreations;
+
+    CreateAclsRequest(short version, List<AclCreation> aclCreations) {
+        super(version);
+        this.aclCreations = aclCreations;
+    }
+
+    public CreateAclsRequest(Struct struct, short version) {
+        super(version);
+        this.aclCreations = new ArrayList<>();
+        for (Object creationStructObj : struct.getArray(CREATIONS)) {
+            Struct creationStruct = (Struct) creationStructObj;
+            aclCreations.add(AclCreation.fromStruct(creationStruct));
+        }
+    }
+
+    @Override
+    protected Struct toStruct() {
+        Struct struct = new Struct(ApiKeys.CREATE_ACLS.requestSchema(version()));
+        List<Struct> requests = new ArrayList<>();
+        for (AclCreation creation : aclCreations) {
+            Struct creationStruct = struct.instance(CREATIONS);
+            creation.setStructFields(creationStruct);
+            requests.add(creationStruct);
+        }
+        struct.set(CREATIONS, requests.toArray());
+        return struct;
+    }
+
+    public List<AclCreation> aclCreations() {
+        return aclCreations;
+    }
+
+    @Override
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable throwable) {
+        short versionId = version();
+        switch (versionId) {
+            case 0:
+                List<CreateAclsResponse.AclCreationResponse> responses = new ArrayList<>();
+                for (int i = 0; i < aclCreations.size(); i++) {
+                    responses.add(new CreateAclsResponse.AclCreationResponse(throwable));
+                }
+                return new CreateAclsResponse(throttleTimeMs, responses);
+            default:
+                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+                        versionId, this.getClass().getSimpleName(), ApiKeys.CREATE_ACLS.latestVersion()));
+        }
+    }
+
+    public static CreateAclsRequest parse(ByteBuffer buffer, short version) {
+        return new CreateAclsRequest(ApiKeys.CREATE_ACLS.parseRequest(version, buffer), version);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java
new file mode 100644
index 0000000..885981a
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+public class CreateAclsResponse extends AbstractResponse {
+    private final static String THROTTLE_TIME_MS = "throttle_time_ms";
+    private final static String CREATION_RESPONSES = "creation_responses";
+    private final static String ERROR_CODE = "error_code";
+    private final static String ERROR_MESSAGE = "error_message";
+
+    public static class AclCreationResponse {
+        private final Throwable throwable;
+
+        public AclCreationResponse(Throwable throwable) {
+            this.throwable = throwable;
+        }
+
+        public Throwable throwable() {
+            return throwable;
+        }
+
+        @Override
+        public String toString() {
+            return "(" + throwable + ")";
+        }
+    }
+
+    private final int throttleTimeMs;
+
+    private final List<AclCreationResponse> aclCreationResponses;
+
+    public CreateAclsResponse(int throttleTimeMs, List<AclCreationResponse> aclCreationResponses) {
+        this.throttleTimeMs = throttleTimeMs;
+        this.aclCreationResponses = aclCreationResponses;
+    }
+
+    public CreateAclsResponse(Struct struct) {
+        this.throttleTimeMs = struct.getInt(THROTTLE_TIME_MS);
+        this.aclCreationResponses = new ArrayList<>();
+        for (Object responseStructObj : struct.getArray(CREATION_RESPONSES)) {
+            Struct responseStruct = (Struct) responseStructObj;
+            short errorCode = responseStruct.getShort(ERROR_CODE);
+            String errorMessage = responseStruct.getString(ERROR_MESSAGE);
+            if (errorCode != 0) {
+                this.aclCreationResponses.add(new AclCreationResponse(
+                        Errors.forCode(errorCode).exception(errorMessage)));
+            } else {
+                this.aclCreationResponses.add(new AclCreationResponse(null));
+            }
+        }
+    }
+
+    @Override
+    protected Struct toStruct(short version) {
+        Struct struct = new Struct(ApiKeys.CREATE_ACLS.responseSchema(version));
+        struct.set(THROTTLE_TIME_MS, throttleTimeMs);
+        List<Struct> responseStructs = new ArrayList<>();
+        for (AclCreationResponse response : aclCreationResponses) {
+            Struct responseStruct = struct.instance(CREATION_RESPONSES);
+            if (response.throwable() == null) {
+                responseStruct.set(ERROR_CODE, (short) 0);
+            } else {
+                Errors errors = Errors.forException(response.throwable());
+                responseStruct.set(ERROR_CODE, errors.code());
+                responseStruct.set(ERROR_MESSAGE, response.throwable().getMessage());
+            }
+            responseStructs.add(responseStruct);
+        }
+        struct.set(CREATION_RESPONSES, responseStructs.toArray());
+        return struct;
+    }
+
+    public int throttleTimeMs() {
+        return throttleTimeMs;
+    }
+
+    public List<AclCreationResponse> aclCreationResponses() {
+        return aclCreationResponses;
+    }
+
+    public static CreateAclsResponse parse(ByteBuffer buffer, short version) {
+        return new CreateAclsResponse(ApiKeys.CREATE_ACLS.responseSchema(version).read(buffer));
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java
new file mode 100644
index 0000000..8a9ee19
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.clients.admin.AccessControlEntryFilter;
+import org.apache.kafka.clients.admin.AclBindingFilter;
+import org.apache.kafka.clients.admin.ResourceFilter;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.Utils;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.kafka.common.protocol.ApiKeys.DELETE_ACLS;
+
+public class DeleteAclsRequest extends AbstractRequest {
+    private final static String FILTERS = "filters";
+
+    public static class Builder extends AbstractRequest.Builder<DeleteAclsRequest> {
+        private final List<AclBindingFilter> filters;
+
+        public Builder(List<AclBindingFilter> filters) {
+            super(DELETE_ACLS);
+            this.filters = filters;
+        }
+
+        @Override
+        public DeleteAclsRequest build(short version) {
+            return new DeleteAclsRequest(version, filters);
+        }
+
+        @Override
+        public String toString() {
+            return "(type=DeleteAclsRequest, filters=" + Utils.join(filters, ", ") + ")";
+        }
+    }
+
+    private final List<AclBindingFilter> filters;
+
+    DeleteAclsRequest(short version, List<AclBindingFilter> filters) {
+        super(version);
+        this.filters = filters;
+    }
+
+    public DeleteAclsRequest(Struct struct, short version) {
+        super(version);
+        this.filters = new ArrayList<>();
+        for (Object filterStructObj : struct.getArray(FILTERS)) {
+            Struct filterStruct = (Struct) filterStructObj;
+            ResourceFilter resourceFilter = RequestUtils.resourceFilterFromStructFields(filterStruct);
+            AccessControlEntryFilter aceFilter = RequestUtils.aceFilterFromStructFields(filterStruct);
+            this.filters.add(new AclBindingFilter(resourceFilter, aceFilter));
+        }
+    }
+
+    public List<AclBindingFilter> filters() {
+        return filters;
+    }
+
+    @Override
+    protected Struct toStruct() {
+        Struct struct = new Struct(DELETE_ACLS.requestSchema(version()));
+        List<Struct> filterStructs = new ArrayList<>();
+        for (AclBindingFilter filter : filters) {
+            Struct filterStruct = struct.instance(FILTERS);
+            RequestUtils.resourceFilterSetStructFields(filter.resourceFilter(), filterStruct);
+            RequestUtils.aceFilterSetStructFields(filter.entryFilter(), filterStruct);
+            filterStructs.add(filterStruct);
+        }
+        struct.set(FILTERS, filterStructs.toArray());
+        return struct;
+    }
+
+    @Override
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable throwable) {
+        short versionId = version();
+        switch (versionId) {
+            case 0:
+                List<DeleteAclsResponse.AclFilterResponse> responses = new ArrayList<>();
+                for (int i = 0; i < filters.size(); i++) {
+                    responses.add(new DeleteAclsResponse.AclFilterResponse(
+                        throwable, Collections.<DeleteAclsResponse.AclDeletionResult>emptySet()));
+                }
+                return new DeleteAclsResponse(throttleTimeMs, responses);
+            default:
+                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+                    versionId, this.getClass().getSimpleName(), ApiKeys.DELETE_ACLS.latestVersion()));
+        }
+    }
+
+    public static DeleteAclsRequest parse(ByteBuffer buffer, short version) {
+        return new DeleteAclsRequest(DELETE_ACLS.parseRequest(version, buffer), version);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
new file mode 100644
index 0000000..6fffc0f
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.clients.admin.AccessControlEntry;
+import org.apache.kafka.clients.admin.AclBinding;
+import org.apache.kafka.clients.admin.Resource;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+public class DeleteAclsResponse extends AbstractResponse {
+    public static final Logger log = LoggerFactory.getLogger(DeleteAclsResponse.class);
+    private final static String THROTTLE_TIME_MS = "throttle_time_ms";
+    private final static String FILTER_RESPONSES = "filter_responses";
+    private final static String ERROR_CODE = "error_code";
+    private final static String ERROR_MESSAGE = "error_message";
+    private final static String MATCHING_ACLS = "matching_acls";
+
+    public static class AclDeletionResult {
+        private final ApiException exception;
+        private final AclBinding acl;
+
+        public AclDeletionResult(ApiException exception, AclBinding acl) {
+            this.exception = exception;
+            this.acl = acl;
+        }
+
+        public ApiException exception() {
+            return exception;
+        }
+
+        public AclBinding acl() {
+            return acl;
+        }
+
+        @Override
+        public String toString() {
+            return "(apiException=" + exception + ", acl=" + acl + ")";
+        }
+    }
+
+    public static class AclFilterResponse {
+        private final Throwable throwable;
+        private final Collection<AclDeletionResult> deletions;
+
+        public AclFilterResponse(Throwable throwable, Collection<AclDeletionResult> deletions) {
+            this.throwable = throwable;
+            this.deletions = deletions;
+        }
+
+        public Throwable throwable() {
+            return throwable;
+        }
+
+        public Collection<AclDeletionResult> deletions() {
+            return deletions;
+        }
+
+        @Override
+        public String toString() {
+            return "(throwable=" + throwable + ", deletions=" + Utils.join(deletions, ",") + ")";
+        }
+    }
+
+    private final int throttleTimeMs;
+
+    private final List<AclFilterResponse> responses;
+
+    public DeleteAclsResponse(int throttleTimeMs, List<AclFilterResponse> responses) {
+        this.throttleTimeMs = throttleTimeMs;
+        this.responses = responses;
+    }
+
+    public DeleteAclsResponse(Struct struct) {
+        this.throttleTimeMs = struct.getInt(THROTTLE_TIME_MS);
+        this.responses = new ArrayList<>();
+        for (Object responseStructObj : struct.getArray(FILTER_RESPONSES)) {
+            Struct responseStruct = (Struct) responseStructObj;
+            short responseErrorCode = responseStruct.getShort(ERROR_CODE);
+            String responseErrorMessage = responseStruct.getString(ERROR_MESSAGE);
+            if (responseErrorCode != 0) {
+                this.responses.add(new AclFilterResponse(
+                    Errors.forCode(responseErrorCode).exception(responseErrorMessage),
+                    Collections.<AclDeletionResult>emptySet()));
+            } else {
+                List<AclDeletionResult> deletions = new ArrayList<>();
+                for (Object matchingAclStructObj : responseStruct.getArray(MATCHING_ACLS)) {
+                    Struct matchingAclStruct = (Struct) matchingAclStructObj;
+                    short matchErrorCode = matchingAclStruct.getShort(ERROR_CODE);
+                    ApiException exception = null;
+                    if (matchErrorCode != 0) {
+                        Errors errors = Errors.forCode(matchErrorCode);
+                        String matchErrorMessage = matchingAclStruct.getString(ERROR_MESSAGE);
+                        exception = errors.exception(matchErrorMessage);
+                    }
+                    AccessControlEntry entry = RequestUtils.aceFromStructFields(matchingAclStruct);
+                    Resource resource = RequestUtils.resourceFromStructFields(matchingAclStruct);
+                    deletions.add(new AclDeletionResult(exception, new AclBinding(resource, entry)));
+                }
+                this.responses.add(new AclFilterResponse(null, deletions));
+            }
+        }
+    }
+
+    @Override
+    protected Struct toStruct(short version) {
+        Struct struct = new Struct(ApiKeys.DELETE_ACLS.responseSchema(version));
+        struct.set(THROTTLE_TIME_MS, throttleTimeMs);
+        List<Struct> responseStructs = new ArrayList<>();
+        for (AclFilterResponse response : responses) {
+            Struct responseStruct = struct.instance(FILTER_RESPONSES);
+            if (response.throwable() != null) {
+                Errors error = Errors.forException(response.throwable());
+                responseStruct.set(ERROR_CODE, error.code());
+                responseStruct.set(ERROR_MESSAGE, response.throwable().getMessage());
+                responseStruct.set(MATCHING_ACLS, new Struct[0]);
+            } else {
+                responseStruct.set(ERROR_CODE, (short) 0);
+                List<Struct> deletionStructs = new ArrayList<>();
+                for (AclDeletionResult deletion : response.deletions()) {
+                    Struct deletionStruct = responseStruct.instance(MATCHING_ACLS);
+                    if (deletion.exception() != null) {
+                        Errors error = Errors.forException(deletion.exception);
+                        deletionStruct.set(ERROR_CODE, error.code());
+                        deletionStruct.set(ERROR_MESSAGE, deletion.exception.getMessage());
+                    } else {
+                        deletionStruct.set(ERROR_CODE, (short) 0);
+                    }
+                    RequestUtils.resourceSetStructFields(deletion.acl().resource(), deletionStruct);
+                    RequestUtils.aceSetStructFields(deletion.acl().entry(), deletionStruct);
+                    deletionStructs.add(deletionStruct);
+                }
+                responseStruct.set(MATCHING_ACLS, deletionStructs.toArray(new Struct[0]));
+            }
+            responseStructs.add(responseStruct);
+        }
+        struct.set(FILTER_RESPONSES, responseStructs.toArray());
+        return struct;
+    }
+
+    public int throttleTimeMs() {
+        return throttleTimeMs;
+    }
+
+    public List<AclFilterResponse> responses() {
+        return responses;
+    }
+
+    public static DeleteAclsResponse parse(ByteBuffer buffer, short version) {
+        return new DeleteAclsResponse(ApiKeys.DELETE_ACLS.responseSchema(version).read(buffer));
+    }
+
+    public String toString() {
+        return "(responses=" + Utils.join(responses, ",") + ")";
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java
new file mode 100644
index 0000000..8d4eba6
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.clients.admin.AccessControlEntryFilter;
+import org.apache.kafka.clients.admin.AclBinding;
+import org.apache.kafka.clients.admin.AclBindingFilter;
+import org.apache.kafka.clients.admin.ResourceFilter;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+
+public class DescribeAclsRequest extends AbstractRequest {
+    public static class Builder extends AbstractRequest.Builder<DescribeAclsRequest> {
+        private final AclBindingFilter filter;
+
+        public Builder(AclBindingFilter filter) {
+            super(ApiKeys.DESCRIBE_ACLS);
+            this.filter = filter;
+        }
+
+        @Override
+        public DescribeAclsRequest build(short version) {
+            return new DescribeAclsRequest(filter, version);
+        }
+
+        @Override
+        public String toString() {
+            return "(type=DescribeAclsRequest, filter=" + filter + ")";
+        }
+    }
+
+    private final AclBindingFilter filter;
+
+    DescribeAclsRequest(AclBindingFilter filter, short version) {
+        super(version);
+        this.filter = filter;
+    }
+
+    public DescribeAclsRequest(Struct struct, short version) {
+        super(version);
+        ResourceFilter resourceFilter = RequestUtils.resourceFilterFromStructFields(struct);
+        AccessControlEntryFilter entryFilter = RequestUtils.aceFilterFromStructFields(struct);
+        this.filter = new AclBindingFilter(resourceFilter, entryFilter);
+    }
+
+    @Override
+    protected Struct toStruct() {
+        Struct struct = new Struct(ApiKeys.DESCRIBE_ACLS.requestSchema(version()));
+        RequestUtils.resourceFilterSetStructFields(filter.resourceFilter(), struct);
+        RequestUtils.aceFilterSetStructFields(filter.entryFilter(), struct);
+        return struct;
+    }
+
+    @Override
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable throwable) {
+        short versionId = version();
+        switch (versionId) {
+            case 0:
+                return new DescribeAclsResponse(throttleTimeMs, throwable, Collections.<AclBinding>emptySet());
+            default:
+                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+                        versionId, this.getClass().getSimpleName(), ApiKeys.DESCRIBE_ACLS.latestVersion()));
+        }
+    }
+
+    public static DescribeAclsRequest parse(ByteBuffer buffer, short version) {
+        return new DescribeAclsRequest(ApiKeys.DESCRIBE_ACLS.parseRequest(version, buffer), version);
+    }
+
+    public AclBindingFilter filter() {
+        return filter;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java
new file mode 100644
index 0000000..0de4865
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.clients.admin.AccessControlEntry;
+import org.apache.kafka.clients.admin.AclBinding;
+import org.apache.kafka.clients.admin.Resource;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class DescribeAclsResponse extends AbstractResponse {
+    private final static String THROTTLE_TIME_MS = "throttle_time_ms";
+    private final static String ERROR_CODE = "error_code";
+    private final static String ERROR_MESSAGE = "error_message";
+    private final static String RESOURCES = "resources";
+    private final static String ACLS = "acls";
+
+    private final int throttleTimeMs;
+    private final Throwable throwable;
+    private final Collection<AclBinding> acls;
+
+    public DescribeAclsResponse(int throttleTimeMs, Throwable throwable, Collection<AclBinding> acls) {
+        this.throttleTimeMs = throttleTimeMs;
+        this.throwable = throwable;
+        this.acls = acls;
+    }
+
+    public DescribeAclsResponse(Struct struct) {
+        this.throttleTimeMs = struct.getInt(THROTTLE_TIME_MS);
+        Errors error = Errors.forCode(struct.getShort(ERROR_CODE));
+        if (error != Errors.NONE) {
+            this.throwable = error.exception(struct.getString(ERROR_MESSAGE));
+            this.acls = Collections.emptySet();
+        } else {
+            this.throwable = null;
+            this.acls = new ArrayList<>();
+            for (Object resourceStructObj : struct.getArray(RESOURCES)) {
+                Struct resourceStruct = (Struct) resourceStructObj;
+                Resource resource = RequestUtils.resourceFromStructFields(resourceStruct);
+                for (Object aclDataStructObj : resourceStruct.getArray(ACLS)) {
+                    Struct aclDataStruct = (Struct) aclDataStructObj;
+                    AccessControlEntry entry = RequestUtils.aceFromStructFields(aclDataStruct);
+                    this.acls.add(new AclBinding(resource, entry));
+                }
+            }
+        }
+    }
+
+    @Override
+    protected Struct toStruct(short version) {
+        Struct struct = new Struct(ApiKeys.DESCRIBE_ACLS.responseSchema(version));
+        struct.set(THROTTLE_TIME_MS, throttleTimeMs);
+        if (throwable != null) {
+            Errors errors = Errors.forException(throwable);
+            struct.set(ERROR_CODE, errors.code());
+            struct.set(ERROR_MESSAGE, throwable.getMessage());
+            struct.set(RESOURCES, new Struct[0]);
+            return struct;
+        }
+        struct.set(ERROR_CODE, (short) 0);
+        struct.set(ERROR_MESSAGE, null);
+        Map<Resource, List<AccessControlEntry>> resourceToData = new HashMap<>();
+        for (AclBinding acl : acls) {
+            List<AccessControlEntry> entry = resourceToData.get(acl.resource());
+            if (entry == null) {
+                entry = new ArrayList<>();
+                resourceToData.put(acl.resource(), entry);
+            }
+            entry.add(acl.entry());
+        }
+        List<Struct> resourceStructs = new ArrayList<>();
+        for (Map.Entry<Resource, List<AccessControlEntry>> tuple : resourceToData.entrySet()) {
+            Resource resource = tuple.getKey();
+            Struct resourceStruct = struct.instance(RESOURCES);
+            RequestUtils.resourceSetStructFields(resource, resourceStruct);
+            List<Struct> dataStructs = new ArrayList<>();
+            for (AccessControlEntry entry : tuple.getValue()) {
+                Struct dataStruct = resourceStruct.instance(ACLS);
+                RequestUtils.aceSetStructFields(entry, dataStruct);
+                dataStructs.add(dataStruct);
+            }
+            resourceStruct.set(ACLS, dataStructs.toArray());
+            resourceStructs.add(resourceStruct);
+        }
+        struct.set(RESOURCES, resourceStructs.toArray());
+        return struct;
+    }
+
+    public int throttleTimeMs() {
+        return throttleTimeMs;
+    }
+
+    public Throwable throwable() {
+        return throwable;
+    }
+
+    public Collection<AclBinding> acls() {
+        return acls;
+    }
+
+    public static DescribeAclsResponse parse(ByteBuffer buffer, short version) {
+        return new DescribeAclsResponse(ApiKeys.DESCRIBE_ACLS.responseSchema(version).read(buffer));
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java b/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java
new file mode 100644
index 0000000..f2ce55f
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.clients.admin.AccessControlEntry;
+import org.apache.kafka.clients.admin.AccessControlEntryFilter;
+import org.apache.kafka.clients.admin.AclOperation;
+import org.apache.kafka.clients.admin.AclPermissionType;
+import org.apache.kafka.clients.admin.Resource;
+import org.apache.kafka.clients.admin.ResourceFilter;
+import org.apache.kafka.clients.admin.ResourceType;
+import org.apache.kafka.common.protocol.types.Struct;
+
+class RequestUtils {
+    static Resource resourceFromStructFields(Struct struct) {
+        byte resourceType = struct.getByte("resource_type");
+        String name = struct.getString("resource_name");
+        return new Resource(ResourceType.fromCode(resourceType), name);
+    }
+
+    static void resourceSetStructFields(Resource resource, Struct struct) {
+        struct.set("resource_type", resource.resourceType().code());
+        struct.set("resource_name", resource.name());
+    }
+
+    static ResourceFilter resourceFilterFromStructFields(Struct struct) {
+        byte resourceType = struct.getByte("resource_type");
+        String name = struct.getString("resource_name");
+        return new ResourceFilter(ResourceType.fromCode(resourceType), name);
+    }
+
+    static void resourceFilterSetStructFields(ResourceFilter resourceFilter, Struct struct) {
+        struct.set("resource_type", resourceFilter.resourceType().code());
+        struct.set("resource_name", resourceFilter.name());
+    }
+
+    static AccessControlEntry aceFromStructFields(Struct struct) {
+        String principal = struct.getString("principal");
+        String host = struct.getString("host");
+        byte operation = struct.getByte("operation");
+        byte permissionType = struct.getByte("permission_type");
+        return new AccessControlEntry(principal, host, AclOperation.fromCode(operation),
+            AclPermissionType.fromCode(permissionType));
+    }
+
+    static void aceSetStructFields(AccessControlEntry data, Struct struct) {
+        struct.set("principal", data.principal());
+        struct.set("host", data.host());
+        struct.set("operation", data.operation().code());
+        struct.set("permission_type", data.permissionType().code());
+    }
+
+    static AccessControlEntryFilter aceFilterFromStructFields(Struct struct) {
+        String principal = struct.getString("principal");
+        String host = struct.getString("host");
+        byte operation = struct.getByte("operation");
+        byte permissionType = struct.getByte("permission_type");
+        return new AccessControlEntryFilter(principal, host, AclOperation.fromCode(operation),
+            AclPermissionType.fromCode(permissionType));
+    }
+
+    static void aceFilterSetStructFields(AccessControlEntryFilter filter, Struct struct) {
+        struct.set("principal", filter.principal());
+        struct.set("host", filter.host());
+        struct.set("operation", filter.operation().code());
+        struct.set("permission_type", filter.permissionType().code());
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/test/java/org/apache/kafka/clients/admin/AclBindingTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/AclBindingTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/AclBindingTest.java
new file mode 100644
index 0000000..34cedb6
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/AclBindingTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class AclBindingTest {
+    private static final AclBinding ACL1 = new AclBinding(
+        new Resource(ResourceType.TOPIC, "mytopic"),
+        new AccessControlEntry("User:ANONYMOUS", "", AclOperation.ALL, AclPermissionType.ALLOW));
+
+    private static final AclBinding ACL2 = new AclBinding(
+        new Resource(ResourceType.TOPIC, "mytopic"),
+        new AccessControlEntry("User:*", "", AclOperation.READ, AclPermissionType.ALLOW));
+
+    private static final AclBinding ACL3 = new AclBinding(
+        new Resource(ResourceType.TOPIC, "mytopic2"),
+        new AccessControlEntry("User:ANONYMOUS", "127.0.0.1", AclOperation.READ, AclPermissionType.DENY));
+
+    private static final AclBinding UNKNOWN_ACL = new AclBinding(
+        new Resource(ResourceType.TOPIC, "mytopic2"),
+        new AccessControlEntry("User:ANONYMOUS", "127.0.0.1", AclOperation.UNKNOWN, AclPermissionType.DENY));
+
+    private static final AclBindingFilter ANY_ANONYMOUS = new AclBindingFilter(
+        new ResourceFilter(ResourceType.ANY, null),
+        new AccessControlEntryFilter("User:ANONYMOUS", null, AclOperation.ANY, AclPermissionType.ANY));
+
+    private static final AclBindingFilter ANY_DENY = new AclBindingFilter(
+        new ResourceFilter(ResourceType.ANY, null),
+        new AccessControlEntryFilter(null, null, AclOperation.ANY, AclPermissionType.DENY));
+
+    private static final AclBindingFilter ANY_MYTOPIC = new AclBindingFilter(
+        new ResourceFilter(ResourceType.TOPIC, "mytopic"),
+        new AccessControlEntryFilter(null, null, AclOperation.ANY, AclPermissionType.ANY));
+
+    @Test
+    public void testMatching() throws Exception {
+        assertTrue(ACL1.equals(ACL1));
+        final AclBinding acl1Copy = new AclBinding(
+            new Resource(ResourceType.TOPIC, "mytopic"),
+            new AccessControlEntry("User:ANONYMOUS", "", AclOperation.ALL, AclPermissionType.ALLOW));
+        assertTrue(ACL1.equals(acl1Copy));
+        assertTrue(acl1Copy.equals(ACL1));
+        assertTrue(ACL2.equals(ACL2));
+        assertFalse(ACL1.equals(ACL2));
+        assertFalse(ACL2.equals(ACL1));
+        assertTrue(AclBindingFilter.ANY.matches(ACL1));
+        assertFalse(AclBindingFilter.ANY.equals(ACL1));
+        assertTrue(AclBindingFilter.ANY.matches(ACL2));
+        assertFalse(AclBindingFilter.ANY.equals(ACL2));
+        assertTrue(AclBindingFilter.ANY.matches(ACL3));
+        assertFalse(AclBindingFilter.ANY.equals(ACL3));
+        assertTrue(AclBindingFilter.ANY.equals(AclBindingFilter.ANY));
+        assertTrue(ANY_ANONYMOUS.matches(ACL1));
+        assertFalse(ANY_ANONYMOUS.equals(ACL1));
+        assertFalse(ANY_ANONYMOUS.matches(ACL2));
+        assertFalse(ANY_ANONYMOUS.equals(ACL2));
+        assertTrue(ANY_ANONYMOUS.matches(ACL3));
+        assertFalse(ANY_ANONYMOUS.equals(ACL3));
+        assertFalse(ANY_DENY.matches(ACL1));
+        assertFalse(ANY_DENY.matches(ACL2));
+        assertTrue(ANY_DENY.matches(ACL3));
+        assertTrue(ANY_MYTOPIC.matches(ACL1));
+        assertTrue(ANY_MYTOPIC.matches(ACL2));
+        assertFalse(ANY_MYTOPIC.matches(ACL3));
+        assertTrue(ANY_ANONYMOUS.matches(UNKNOWN_ACL));
+        assertTrue(ANY_DENY.matches(UNKNOWN_ACL));
+        assertTrue(UNKNOWN_ACL.equals(UNKNOWN_ACL));
+        assertFalse(ANY_MYTOPIC.matches(UNKNOWN_ACL));
+    }
+
+    @Test
+    public void testUnknowns() throws Exception {
+        assertFalse(ACL1.unknown());
+        assertFalse(ACL2.unknown());
+        assertFalse(ACL3.unknown());
+        assertFalse(ANY_ANONYMOUS.unknown());
+        assertFalse(ANY_DENY.unknown());
+        assertFalse(ANY_MYTOPIC.unknown());
+        assertTrue(UNKNOWN_ACL.unknown());
+    }
+
+    @Test
+    public void testMatchesAtMostOne() throws Exception {
+        assertEquals(null, ACL1.toFilter().findIndefiniteField());
+        assertEquals(null, ACL2.toFilter().findIndefiniteField());
+        assertEquals(null, ACL3.toFilter().findIndefiniteField());
+        assertFalse(ANY_ANONYMOUS.matchesAtMostOne());
+        assertFalse(ANY_DENY.matchesAtMostOne());
+        assertFalse(ANY_MYTOPIC.matchesAtMostOne());
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/test/java/org/apache/kafka/clients/admin/AclOperationTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/AclOperationTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/AclOperationTest.java
new file mode 100644
index 0000000..2d7c546
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/AclOperationTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class AclOperationTest {
+    private static class AclOperationTestInfo {
+        private final AclOperation operation;
+        private final int code;
+        private final String name;
+        private final boolean unknown;
+
+        AclOperationTestInfo(AclOperation operation, int code, String name, boolean unknown) {
+            this.operation = operation;
+            this.code = code;
+            this.name = name;
+            this.unknown = unknown;
+        }
+    }
+
+    private static final AclOperationTestInfo[] INFOS = {
+        new AclOperationTestInfo(AclOperation.UNKNOWN, 0, "unknown", true),
+        new AclOperationTestInfo(AclOperation.ANY, 1, "any", false),
+        new AclOperationTestInfo(AclOperation.ALL, 2, "all", false),
+        new AclOperationTestInfo(AclOperation.READ, 3, "read", false),
+        new AclOperationTestInfo(AclOperation.WRITE, 4, "write", false),
+        new AclOperationTestInfo(AclOperation.CREATE, 5, "create", false),
+        new AclOperationTestInfo(AclOperation.DELETE, 6, "delete", false),
+        new AclOperationTestInfo(AclOperation.ALTER, 7, "alter", false),
+        new AclOperationTestInfo(AclOperation.DESCRIBE, 8, "describe", false),
+        new AclOperationTestInfo(AclOperation.CLUSTER_ACTION, 9, "cluster_action", false)
+    };
+
+    @Test
+    public void testIsUnknown() throws Exception {
+        for (AclOperationTestInfo info : INFOS) {
+            assertEquals(info.operation + " was supposed to have unknown == " + info.unknown,
+                info.unknown, info.operation.unknown());
+        }
+    }
+
+    @Test
+    public void testCode() throws Exception {
+        for (AclOperationTestInfo info : INFOS) {
+            assertEquals(info.operation + " was supposed to have code == " + info.code,
+                info.code, info.operation.code());
+            assertEquals("AclOperation.fromCode(" + info.code + ") was supposed to be " +  info.operation,
+                info.operation, AclOperation.fromCode((byte) info.code));
+        }
+        assertEquals(AclOperation.UNKNOWN, AclOperation.fromCode((byte) 120));
+    }
+
+    @Test
+    public void testName() throws Exception {
+        for (AclOperationTestInfo info : INFOS) {
+            assertEquals("AclOperation.fromString(" + info.name + ") was supposed to be " +  info.operation,
+                info.operation, AclOperation.fromString(info.name));
+        }
+        assertEquals(AclOperation.UNKNOWN, AclOperation.fromString("something"));
+    }
+
+    @Test
+    public void testExhaustive() throws Exception {
+        assertEquals(INFOS.length, AclOperation.values().length);
+        for (int i = 0; i < INFOS.length; i++) {
+            assertEquals(INFOS[i].operation, AclOperation.values()[i]);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/test/java/org/apache/kafka/clients/admin/AclPermissionTypeTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/AclPermissionTypeTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/AclPermissionTypeTest.java
new file mode 100644
index 0000000..aa6deca
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/AclPermissionTypeTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class AclPermissionTypeTest {
+    private static class AclPermissionTypeTestInfo {
+        private final AclPermissionType ty;
+        private final int code;
+        private final String name;
+        private final boolean unknown;
+
+        AclPermissionTypeTestInfo(AclPermissionType ty, int code, String name, boolean unknown) {
+            this.ty = ty;
+            this.code = code;
+            this.name = name;
+            this.unknown = unknown;
+        }
+    }
+
+    private static final AclPermissionTypeTestInfo[] INFOS = {
+        new AclPermissionTypeTestInfo(AclPermissionType.UNKNOWN, 0, "unknown", true),
+        new AclPermissionTypeTestInfo(AclPermissionType.ANY, 1, "any", false),
+        new AclPermissionTypeTestInfo(AclPermissionType.DENY, 2, "deny", false),
+        new AclPermissionTypeTestInfo(AclPermissionType.ALLOW, 3, "allow", false)
+    };
+
+    @Test
+    public void testIsUnknown() throws Exception {
+        for (AclPermissionTypeTestInfo info : INFOS) {
+            assertEquals(info.ty + " was supposed to have unknown == " + info.unknown,
+                info.unknown, info.ty.unknown());
+        }
+    }
+
+    @Test
+    public void testCode() throws Exception {
+        for (AclPermissionTypeTestInfo info : INFOS) {
+            assertEquals(info.ty + " was supposed to have code == " + info.code,
+                info.code, info.ty.code());
+            assertEquals("AclPermissionType.fromCode(" + info.code + ") was supposed to be " +  info.ty,
+                info.ty, AclPermissionType.fromCode((byte) info.code));
+        }
+        assertEquals(AclPermissionType.UNKNOWN, AclPermissionType.fromCode((byte) 120));
+    }
+
+    @Test
+    public void testName() throws Exception {
+        for (AclPermissionTypeTestInfo info : INFOS) {
+            assertEquals("AclPermissionType.fromString(" + info.name + ") was supposed to be " +  info.ty,
+                info.ty, AclPermissionType.fromString(info.name));
+        }
+        assertEquals(AclPermissionType.UNKNOWN, AclPermissionType.fromString("something"));
+    }
+
+    @Test
+    public void testExhaustive() throws Exception {
+        assertEquals(INFOS.length, AclPermissionType.values().length);
+        for (int i = 0; i < INFOS.length; i++) {
+            assertEquals(INFOS[i].ty, AclPermissionType.values()[i]);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 36cb8e8..6d01b0a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -19,17 +19,27 @@ package org.apache.kafka.clients.admin;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.clients.admin.DeleteAclsResults.FilterResults;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.errors.SecurityDisabledException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.CreateAclsResponse;
+import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse;
 import org.apache.kafka.common.requests.CreateTopicsResponse.Error;
 import org.apache.kafka.common.requests.CreateTopicsResponse;
+import org.apache.kafka.common.requests.DeleteAclsResponse;
+import org.apache.kafka.common.requests.DeleteAclsResponse.AclDeletionResult;
+import org.apache.kafka.common.requests.DeleteAclsResponse.AclFilterResponse;
+import org.apache.kafka.common.requests.DescribeAclsResponse;
 import org.apache.kafka.common.utils.Time;
 
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -203,4 +213,157 @@ public class KafkaAdminClientTest {
             future.get();
         }
     }
+
+    private static final AclBinding ACL1 = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic3"),
+        new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW));
+    private static final AclBinding ACL2 = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic4"),
+        new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.DENY));
+    private static final AclBindingFilter FILTER1 = new AclBindingFilter(new ResourceFilter(ResourceType.ANY, null),
+        new AccessControlEntryFilter("User:ANONYMOUS", null, AclOperation.ANY, AclPermissionType.ANY));
+    private static final AclBindingFilter FILTER2 = new AclBindingFilter(new ResourceFilter(ResourceType.ANY, null),
+        new AccessControlEntryFilter("User:bob", null, AclOperation.ANY, AclPermissionType.ANY));
+
+    @Test
+    public void testDescribeAcls() throws Exception {
+        try (MockKafkaAdminClientContext ctx = new MockKafkaAdminClientContext(newStrMap())) {
+            ctx.mockClient.setNodeApiVersions(NodeApiVersions.create());
+            ctx.mockClient.prepareMetadataUpdate(ctx.cluster, Collections.<String>emptySet());
+            ctx.mockClient.setNode(ctx.nodes.get(0));
+
+            // Test a call where we get back ACL1 and ACL2.
+            ctx.mockClient.prepareResponse(new DescribeAclsResponse(0, null,
+                new ArrayList<AclBinding>() {{
+                        add(ACL1);
+                        add(ACL2);
+                    }}));
+            assertCollectionIs(ctx.client.describeAcls(FILTER1).all().get(), ACL1, ACL2);
+
+            // Test a call where we get back no results.
+            ctx.mockClient.prepareResponse(new DescribeAclsResponse(0, null,
+                Collections.<AclBinding>emptySet()));
+            assertTrue(ctx.client.describeAcls(FILTER2).all().get().isEmpty());
+
+            // Test a call where we get back an error.
+            ctx.mockClient.prepareResponse(new DescribeAclsResponse(0,
+                new SecurityDisabledException("Security is disabled"), Collections.<AclBinding>emptySet()));
+            assertFutureError(ctx.client.describeAcls(FILTER2).all(), SecurityDisabledException.class);
+        }
+    }
+
+    @Test
+    public void testCreateAcls() throws Exception {
+        try (MockKafkaAdminClientContext ctx = new MockKafkaAdminClientContext(newStrMap())) {
+            ctx.mockClient.setNodeApiVersions(NodeApiVersions.create());
+            ctx.mockClient.prepareMetadataUpdate(ctx.cluster, Collections.<String>emptySet());
+            ctx.mockClient.setNode(ctx.nodes.get(0));
+
+            // Test a call where we successfully create two ACLs.
+            ctx.mockClient.prepareResponse(new CreateAclsResponse(0,
+                new ArrayList<AclCreationResponse>() {{
+                        add(new AclCreationResponse(null));
+                        add(new AclCreationResponse(null));
+                    }}));
+            CreateAclsResults results = ctx.client.createAcls(new ArrayList<AclBinding>() {{
+                        add(ACL1);
+                        add(ACL2);
+                    }});
+            assertCollectionIs(results.results().keySet(), ACL1, ACL2);
+            for (KafkaFuture<Void> future : results.results().values()) {
+                future.get();
+            }
+            results.all().get();
+
+            // Test a call where we fail to create one ACL.
+            ctx.mockClient.prepareResponse(new CreateAclsResponse(0,
+                    new ArrayList<AclCreationResponse>() {{
+                        add(new AclCreationResponse(new SecurityDisabledException("Security is disabled")));
+                        add(new AclCreationResponse(null));
+                    }}));
+            results = ctx.client.createAcls(new ArrayList<AclBinding>() {{
+                    add(ACL1);
+                    add(ACL2);
+                }});
+            assertCollectionIs(results.results().keySet(), ACL1, ACL2);
+            assertFutureError(results.results().get(ACL1), SecurityDisabledException.class);
+            results.results().get(ACL2).get();
+            assertFutureError(results.all(), SecurityDisabledException.class);
+        }
+    }
+
+    @Test
+    public void testDeleteAcls() throws Exception {
+        try (MockKafkaAdminClientContext ctx = new MockKafkaAdminClientContext(newStrMap())) {
+            ctx.mockClient.setNodeApiVersions(NodeApiVersions.create());
+            ctx.mockClient.prepareMetadataUpdate(ctx.cluster, Collections.<String>emptySet());
+            ctx.mockClient.setNode(ctx.nodes.get(0));
+
+            // Test a call where one filter has an error.
+            ctx.mockClient.prepareResponse(new DeleteAclsResponse(0, new ArrayList<AclFilterResponse>() {{
+                    add(new AclFilterResponse(null,
+                            new ArrayList<AclDeletionResult>() {{
+                                add(new AclDeletionResult(null, ACL1));
+                                add(new AclDeletionResult(null, ACL2));
+                            }}));
+                    add(new AclFilterResponse(new SecurityDisabledException("No security"),
+                        Collections.<AclDeletionResult>emptySet()));
+                }}));
+            DeleteAclsResults results = ctx.client.deleteAcls(new ArrayList<AclBindingFilter>() {{
+                        add(FILTER1);
+                        add(FILTER2);
+                    }});
+            Map<AclBindingFilter, KafkaFuture<FilterResults>> filterResults = results.results();
+            FilterResults filter1Results = filterResults.get(FILTER1).get();
+            assertEquals(null, filter1Results.acls().get(0).exception());
+            assertEquals(ACL1, filter1Results.acls().get(0).acl());
+            assertEquals(null, filter1Results.acls().get(1).exception());
+            assertEquals(ACL2, filter1Results.acls().get(1).acl());
+            assertTrue(filterResults.get(FILTER2).isCompletedExceptionally());
+            assertFutureError(filterResults.get(FILTER2), SecurityDisabledException.class);
+            assertFutureError(results.all(), SecurityDisabledException.class);
+
+            // Test a call where one deletion result has an error.
+            ctx.mockClient.prepareResponse(new DeleteAclsResponse(0, new ArrayList<AclFilterResponse>() {{
+                    add(new AclFilterResponse(null,
+                        new ArrayList<AclDeletionResult>() {{
+                                add(new AclDeletionResult(null, ACL1));
+                                add(new AclDeletionResult(new SecurityDisabledException("No security"), ACL2));
+                            }}));
+                    add(new AclFilterResponse(null, Collections.<AclDeletionResult>emptySet()));
+                }}));
+            results = ctx.client.deleteAcls(
+                    new ArrayList<AclBindingFilter>() {{
+                            add(FILTER1);
+                            add(FILTER2);
+                        }});
+            assertTrue(results.results().get(FILTER2).get().acls().isEmpty());
+            assertFutureError(results.all(), SecurityDisabledException.class);
+
+            // Test a call where there are no errors.
+            ctx.mockClient.prepareResponse(new DeleteAclsResponse(0, new ArrayList<AclFilterResponse>() {{
+                    add(new AclFilterResponse(null,
+                        new ArrayList<AclDeletionResult>() {{
+                                add(new AclDeletionResult(null, ACL1));
+                            }}));
+                    add(new AclFilterResponse(null,
+                        new ArrayList<AclDeletionResult>() {{
+                                add(new AclDeletionResult(null, ACL2));
+                            }}));
+                }}));
+            results = ctx.client.deleteAcls(
+                    new ArrayList<AclBindingFilter>() {{
+                        add(FILTER1);
+                        add(FILTER2);
+                    }});
+            Collection<AclBinding> deleted = results.all().get();
+            assertCollectionIs(deleted, ACL1, ACL2);
+        }
+    }
+
+    private static <T> void assertCollectionIs(Collection<T> collection, T... elements) {
+        for (T element : elements) {
+            assertTrue("Did not find " + element, collection.contains(element));
+        }
+        assertEquals("There are unexpected extra elements in the collection.",
+            elements.length, collection.size());
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/test/java/org/apache/kafka/clients/admin/ResourceTypeTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/ResourceTypeTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/ResourceTypeTest.java
new file mode 100644
index 0000000..8f6f670
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/ResourceTypeTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class ResourceTypeTest {
+    private static class AclResourceTypeTestInfo {
+        private final ResourceType resourceType;
+        private final int code;
+        private final String name;
+        private final boolean unknown;
+
+        AclResourceTypeTestInfo(ResourceType resourceType, int code, String name, boolean unknown) {
+            this.resourceType = resourceType;
+            this.code = code;
+            this.name = name;
+            this.unknown = unknown;
+        }
+    }
+
+    private static final AclResourceTypeTestInfo[] INFOS = {
+        new AclResourceTypeTestInfo(ResourceType.UNKNOWN, 0, "unknown", true),
+        new AclResourceTypeTestInfo(ResourceType.ANY, 1, "any", false),
+        new AclResourceTypeTestInfo(ResourceType.TOPIC, 2, "topic", false),
+        new AclResourceTypeTestInfo(ResourceType.GROUP, 3, "group", false),
+        new AclResourceTypeTestInfo(ResourceType.CLUSTER, 4, "cluster", false)
+    };
+
+    @Test
+    public void testIsUnknown() throws Exception {
+        for (AclResourceTypeTestInfo info : INFOS) {
+            assertEquals(info.resourceType + " was supposed to have unknown == " + info.unknown,
+                info.unknown, info.resourceType.unknown());
+        }
+    }
+
+    @Test
+    public void testCode() throws Exception {
+        for (AclResourceTypeTestInfo info : INFOS) {
+            assertEquals(info.resourceType + " was supposed to have code == " + info.code,
+                info.code, info.resourceType.code());
+            assertEquals("AclResourceType.fromCode(" + info.code + ") was supposed to be " +
+                info.resourceType, info.resourceType, ResourceType.fromCode((byte) info.code));
+        }
+        assertEquals(ResourceType.UNKNOWN, ResourceType.fromCode((byte) 120));
+    }
+
+    @Test
+    public void testName() throws Exception {
+        for (AclResourceTypeTestInfo info : INFOS) {
+            assertEquals("ResourceType.fromString(" + info.name + ") was supposed to be " +
+                info.resourceType, info.resourceType, ResourceType.fromString(info.name));
+        }
+        assertEquals(ResourceType.UNKNOWN, ResourceType.fromString("something"));
+    }
+
+    @Test
+    public void testExhaustive() throws Exception {
+        assertEquals(INFOS.length, ResourceType.values().length);
+        for (int i = 0; i < INFOS.length; i++) {
+            assertEquals(INFOS[i].resourceType, ResourceType.values()[i]);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 4946246..ede55a5 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -16,10 +16,21 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.clients.admin.AccessControlEntry;
+import org.apache.kafka.clients.admin.AccessControlEntryFilter;
+import org.apache.kafka.clients.admin.AclBinding;
+import org.apache.kafka.clients.admin.AclBindingFilter;
+import org.apache.kafka.clients.admin.AclOperation;
+import org.apache.kafka.clients.admin.AclPermissionType;
+import org.apache.kafka.clients.admin.Resource;
+import org.apache.kafka.clients.admin.ResourceFilter;
+import org.apache.kafka.clients.admin.ResourceType;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InvalidRequestException;
 import org.apache.kafka.common.errors.NotCoordinatorException;
 import org.apache.kafka.common.errors.NotEnoughReplicasException;
+import org.apache.kafka.common.errors.SecurityDisabledException;
 import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.network.ListenerName;
@@ -35,6 +46,10 @@ import org.apache.kafka.common.record.MemoryRecordsBuilder;
 import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.SimpleRecord;
 import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation;
+import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse;
+import org.apache.kafka.common.requests.DeleteAclsResponse.AclDeletionResult;
+import org.apache.kafka.common.requests.DeleteAclsResponse.AclFilterResponse;
 import org.apache.kafka.common.utils.Utils;
 import org.junit.Test;
 
@@ -44,6 +59,7 @@ import java.lang.reflect.Method;
 import java.nio.ByteBuffer;
 import java.nio.channels.GatheringByteChannel;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -205,6 +221,15 @@ public class RequestResponseTest {
         checkRequest(createTxnOffsetCommitRequest());
         checkErrorResponse(createTxnOffsetCommitRequest(), new UnknownServerException());
         checkResponse(createTxnOffsetCommitResponse(), 0);
+        checkRequest(createListAclsRequest());
+        checkErrorResponse(createListAclsRequest(), new SecurityDisabledException("Security is not enabled."));
+        checkResponse(createListAclsResponse(), ApiKeys.DESCRIBE_ACLS.latestVersion());
+        checkRequest(createCreateAclsRequest());
+        checkErrorResponse(createCreateAclsRequest(), new SecurityDisabledException("Security is not enabled."));
+        checkResponse(createCreateAclsResponse(), ApiKeys.CREATE_ACLS.latestVersion());
+        checkRequest(createDeleteAclsRequest());
+        checkErrorResponse(createDeleteAclsRequest(), new SecurityDisabledException("Security is not enabled."));
+        checkResponse(createDeleteAclsResponse(), ApiKeys.DELETE_ACLS.latestVersion());
     }
 
     @Test
@@ -960,6 +985,61 @@ public class RequestResponseTest {
         return new TxnOffsetCommitResponse(0, errorPerPartitions);
     }
 
+    private DescribeAclsRequest createListAclsRequest() {
+        return new DescribeAclsRequest.Builder(new AclBindingFilter(
+                new ResourceFilter(ResourceType.TOPIC, "mytopic"),
+                new AccessControlEntryFilter(null, null, AclOperation.ANY, AclPermissionType.ANY))).build();
+    }
+
+    private DescribeAclsResponse createListAclsResponse() {
+        return new DescribeAclsResponse(0, null, Collections.singleton(new AclBinding(
+            new Resource(ResourceType.TOPIC, "mytopic"),
+            new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.WRITE, AclPermissionType.ALLOW))));
+    }
+
+    private CreateAclsRequest createCreateAclsRequest() {
+        List<AclCreation> creations = new ArrayList<>();
+        creations.add(new AclCreation(new AclBinding(
+            new Resource(ResourceType.TOPIC, "mytopic"),
+            new AccessControlEntry("User:ANONYMOUS", "127.0.0.1", AclOperation.READ, AclPermissionType.ALLOW))));
+        creations.add(new AclCreation(new AclBinding(
+            new Resource(ResourceType.GROUP, "mygroup"),
+            new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.WRITE, AclPermissionType.DENY))));
+        return new CreateAclsRequest.Builder(creations).build();
+    }
+
+    private CreateAclsResponse createCreateAclsResponse() {
+        return new CreateAclsResponse(0, Arrays.asList(new AclCreationResponse(null),
+            new AclCreationResponse(new InvalidRequestException("Foo bar"))));
+    }
+
+    private DeleteAclsRequest createDeleteAclsRequest() {
+        List<AclBindingFilter> filters = new ArrayList<>();
+        filters.add(new AclBindingFilter(
+            new ResourceFilter(ResourceType.ANY, null),
+            new AccessControlEntryFilter("User:ANONYMOUS", null, AclOperation.ANY, AclPermissionType.ANY)));
+        filters.add(new AclBindingFilter(
+            new ResourceFilter(ResourceType.ANY, null),
+            new AccessControlEntryFilter("User:bob", null, AclOperation.ANY, AclPermissionType.ANY)));
+        return new DeleteAclsRequest.Builder(filters).build();
+    }
+
+    private DeleteAclsResponse createDeleteAclsResponse() {
+        List<AclFilterResponse> responses = new ArrayList<>();
+        responses.add(new AclFilterResponse(null,
+            new HashSet<AclDeletionResult>() {{
+                    add(new AclDeletionResult(null, new AclBinding(
+                        new Resource(ResourceType.TOPIC, "mytopic3"),
+                        new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW))));
+                    add(new AclDeletionResult(null, new AclBinding(
+                        new Resource(ResourceType.TOPIC, "mytopic4"),
+                        new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.DENY))));
+                }}));
+        responses.add(new AclFilterResponse(new SecurityDisabledException("No security"),
+            Collections.<AclDeletionResult>emptySet()));
+        return new DeleteAclsResponse(0, responses);
+    }
+
     private static class ByteBufferChannel implements GatheringByteChannel {
         private final ByteBuffer buf;
         private boolean closed = false;

http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/core/src/main/scala/kafka/security/auth/Operation.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/Operation.scala b/core/src/main/scala/kafka/security/auth/Operation.scala
index 5d31c62..7d292d2 100644
--- a/core/src/main/scala/kafka/security/auth/Operation.scala
+++ b/core/src/main/scala/kafka/security/auth/Operation.scala
@@ -17,20 +17,50 @@
 package kafka.security.auth
 
 import kafka.common.{BaseEnum, KafkaException}
+import org.apache.kafka.clients.admin.AclOperation
+
+import scala.util.{Failure, Success, Try}
 
 /**
  * Different operations a client may perform on kafka resources.
  */
 
-sealed trait Operation extends BaseEnum
-case object Read extends Operation { val name = "Read" }
-case object Write extends Operation { val name = "Write" }
-case object Create extends Operation { val name = "Create" }
-case object Delete extends Operation { val name = "Delete" }
-case object Alter extends Operation { val name = "Alter" }
-case object Describe extends Operation { val name = "Describe" }
-case object ClusterAction extends Operation { val name = "ClusterAction" }
-case object All extends Operation { val name = "All" }
+sealed trait Operation extends BaseEnum {
+  def toJava : AclOperation
+}
+
+case object Read extends Operation {
+  val name = "Read"
+  val toJava = AclOperation.READ
+}
+case object Write extends Operation {
+  val name = "Write"
+  val toJava = AclOperation.WRITE
+}
+case object Create extends Operation {
+  val name = "Create"
+  val toJava = AclOperation.CREATE
+}
+case object Delete extends Operation {
+  val name = "Delete"
+  val toJava = AclOperation.DELETE
+}
+case object Alter extends Operation {
+  val name = "Alter"
+  val toJava = AclOperation.ALTER
+}
+case object Describe extends Operation {
+  val name = "Describe"
+  val toJava = AclOperation.DESCRIBE
+}
+case object ClusterAction extends Operation {
+  val name = "ClusterAction"
+  val toJava = AclOperation.CLUSTER_ACTION
+}
+case object All extends Operation {
+  val name = "All"
+  val toJava = AclOperation.ALL
+}
 
 object Operation {
    def fromString(operation: String): Operation = {
@@ -38,5 +68,13 @@ object Operation {
       op.getOrElse(throw new KafkaException(operation + " not a valid operation name. The valid names are " + values.mkString(",")))
    }
 
+  def fromJava(operation: AclOperation): Try[Operation] = {
+    try {
+      Success(fromString(operation.toString))
+    } catch {
+      case throwable: Throwable => Failure(throwable)
+    }
+  }
+
    def values: Seq[Operation] = List(Read, Write, Create, Delete, Alter, Describe, ClusterAction, All)
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/core/src/main/scala/kafka/security/auth/PermissionType.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/PermissionType.scala b/core/src/main/scala/kafka/security/auth/PermissionType.scala
index fd2a0fe..c4209e5 100644
--- a/core/src/main/scala/kafka/security/auth/PermissionType.scala
+++ b/core/src/main/scala/kafka/security/auth/PermissionType.scala
@@ -17,20 +17,27 @@
 package kafka.security.auth
 
 import kafka.common.{BaseEnum, KafkaException}
+import org.apache.kafka.clients.admin.AclPermissionType
+
+import scala.util.{Failure, Success, Try}
 
 /**
  * PermissionType.
  */
 
 
-sealed trait PermissionType extends BaseEnum
+sealed trait PermissionType extends BaseEnum {
+  val toJava: AclPermissionType
+}
 
 case object Allow extends PermissionType {
   val name = "Allow"
+  val toJava = AclPermissionType.ALLOW
 }
 
 case object Deny extends PermissionType {
   val name = "Deny"
+  val toJava = AclPermissionType.DENY
 }
 
 object PermissionType {
@@ -39,6 +46,14 @@ object PermissionType {
     pType.getOrElse(throw new KafkaException(permissionType + " not a valid permissionType name. The valid names are " + values.mkString(",")))
   }
 
+  def fromJava(permissionType: AclPermissionType): Try[PermissionType] = {
+    try {
+      Success(fromString(permissionType.toString))
+    } catch {
+      case throwable: Throwable => Failure(throwable)
+    }
+  }
+
   def values: Seq[PermissionType] = List(Allow, Deny)
 }
 


Mime
View raw message