kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject [1/3] kafka git commit: KAFKA-5265; Move ACLs, Config, Topic classes into org.apache.kafka.common
Date Wed, 31 May 2017 16:38:01 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.11.0 f925627e2 -> 1b64a4e63


http://git-wip-us.apache.org/repos/asf/kafka/blob/1b64a4e6/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
index 341021b..796e200 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
@@ -16,13 +16,13 @@
  */
 package org.apache.kafka.common.requests;
 
-import org.apache.kafka.clients.admin.AccessControlEntry;
-import org.apache.kafka.clients.admin.AclBinding;
-import org.apache.kafka.clients.admin.Resource;
+import org.apache.kafka.common.acl.AccessControlEntry;
+import org.apache.kafka.common.acl.AclBinding;
 import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.resource.Resource;
 import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/kafka/blob/1b64a4e6/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java
index 8d4eba6..6573b6e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java
@@ -16,12 +16,12 @@
  */
 package org.apache.kafka.common.requests;
 
-import org.apache.kafka.clients.admin.AccessControlEntryFilter;
-import org.apache.kafka.clients.admin.AclBinding;
-import org.apache.kafka.clients.admin.AclBindingFilter;
-import org.apache.kafka.clients.admin.ResourceFilter;
+import org.apache.kafka.common.acl.AccessControlEntryFilter;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.resource.ResourceFilter;
 
 import java.nio.ByteBuffer;
 import java.util.Collections;

http://git-wip-us.apache.org/repos/asf/kafka/blob/1b64a4e6/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java
index 127493b..cf21aa6 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java
@@ -17,12 +17,12 @@
 
 package org.apache.kafka.common.requests;
 
-import org.apache.kafka.clients.admin.AccessControlEntry;
-import org.apache.kafka.clients.admin.AclBinding;
-import org.apache.kafka.clients.admin.Resource;
+import org.apache.kafka.common.acl.AccessControlEntry;
+import org.apache.kafka.common.acl.AclBinding;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.resource.Resource;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;

http://git-wip-us.apache.org/repos/asf/kafka/blob/1b64a4e6/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java b/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java
index f2ce55f..fa23559 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java
@@ -16,14 +16,14 @@
  */
 package org.apache.kafka.common.requests;
 
-import org.apache.kafka.clients.admin.AccessControlEntry;
-import org.apache.kafka.clients.admin.AccessControlEntryFilter;
-import org.apache.kafka.clients.admin.AclOperation;
-import org.apache.kafka.clients.admin.AclPermissionType;
-import org.apache.kafka.clients.admin.Resource;
-import org.apache.kafka.clients.admin.ResourceFilter;
-import org.apache.kafka.clients.admin.ResourceType;
+import org.apache.kafka.common.acl.AccessControlEntry;
+import org.apache.kafka.common.acl.AccessControlEntryFilter;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
 import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.resource.Resource;
+import org.apache.kafka.common.resource.ResourceFilter;
+import org.apache.kafka.common.resource.ResourceType;
 
 class RequestUtils {
     static Resource resourceFromStructFields(Struct struct) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/1b64a4e6/clients/src/main/java/org/apache/kafka/common/resource/Resource.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/resource/Resource.java b/clients/src/main/java/org/apache/kafka/common/resource/Resource.java
new file mode 100644
index 0000000..2883a03
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/resource/Resource.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.resource;
+
+import java.util.Objects;
+
+/**
+ * Represents a cluster resource with a tuple of (type, name).
+ */
+public class Resource {
+    private final ResourceType resourceType;
+    private final String name;
+
+    public Resource(ResourceType resourceType, String name) {
+        Objects.requireNonNull(resourceType);
+        this.resourceType = resourceType;
+        Objects.requireNonNull(name);
+        this.name = name;
+    }
+
+    public ResourceType resourceType() {
+        return resourceType;
+    }
+
+    public String name() {
+        return name;
+    }
+
+    /**
+     * Create a filter which matches only this Resource.
+     */
+    public ResourceFilter toFilter() {
+        return new ResourceFilter(resourceType, name);
+    }
+
+    @Override
+    public String toString() {
+        return "(resourceType=" + resourceType + ", name=" + ((name == null) ? "<any>" : name) + ")";
+    }
+
+    /**
+     * Return true if this Resource has any UNKNOWN components.
+     */
+    public boolean unknown() {
+        return resourceType.unknown();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof Resource))
+            return false;
+        Resource other = (Resource) o;
+        return resourceType.equals(other.resourceType) && Objects.equals(name, other.name);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(resourceType, name);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/1b64a4e6/clients/src/main/java/org/apache/kafka/common/resource/ResourceFilter.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/resource/ResourceFilter.java b/clients/src/main/java/org/apache/kafka/common/resource/ResourceFilter.java
new file mode 100644
index 0000000..572b7dc
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/resource/ResourceFilter.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.resource;
+
+import java.util.Objects;
+
+/**
+ * A filter which matches Resource objects.
+ */
+public class ResourceFilter {
+    private final ResourceType resourceType;
+    private final String name;
+
+    public static final ResourceFilter ANY = new ResourceFilter(ResourceType.ANY, null);
+
+    public ResourceFilter(ResourceType resourceType, String name) {
+        Objects.requireNonNull(resourceType);
+        this.resourceType = resourceType;
+        this.name = name;
+    }
+
+    public ResourceType resourceType() {
+        return resourceType;
+    }
+
+    public String name() {
+        return name;
+    }
+
+    @Override
+    public String toString() {
+        return "(resourceType=" + resourceType + ", name=" + ((name == null) ? "<any>" : name) + ")";
+    }
+
+    /**
+     * Return true if this ResourceFilter has any UNKNOWN components.
+     */
+    public boolean unknown() {
+        return resourceType.unknown();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof ResourceFilter))
+            return false;
+        ResourceFilter other = (ResourceFilter) o;
+        return resourceType.equals(other.resourceType) && Objects.equals(name, other.name);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(resourceType, name);
+    }
+
+    public boolean matches(Resource other) {
+        if ((name != null) && (!name.equals(other.name())))
+            return false;
+        if ((resourceType != ResourceType.ANY) && (!resourceType.equals(other.resourceType())))
+            return false;
+        return true;
+    }
+
+    public boolean matchesAtMostOne() {
+        return findIndefiniteField() == null;
+    }
+
+    public String findIndefiniteField() {
+        if (resourceType == ResourceType.ANY)
+            return "Resource type is ANY.";
+        if (resourceType == ResourceType.UNKNOWN)
+            return "Resource type is UNKNOWN.";
+        if (name == null)
+            return "Resource name is NULL.";
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/1b64a4e6/clients/src/main/java/org/apache/kafka/common/resource/ResourceType.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/resource/ResourceType.java b/clients/src/main/java/org/apache/kafka/common/resource/ResourceType.java
new file mode 100644
index 0000000..a1b7b2b
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/resource/ResourceType.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.resource;
+
+import java.util.HashMap;
+import java.util.Locale;
+
+/**
+ * Represents a type of resource which an ACL can be applied to.
+ */
+public enum ResourceType {
+    /**
+     * Represents any ResourceType which this client cannot understand,
+     * perhaps because this client is too old.
+     */
+    UNKNOWN((byte) 0),
+
+    /**
+     * In a filter, matches any ResourceType.
+     */
+    ANY((byte) 1),
+
+    /**
+     * A Kafka topic.
+     */
+    TOPIC((byte) 2),
+
+    /**
+     * A consumer group.
+     */
+    GROUP((byte) 3),
+
+    /**
+     * The cluster as a whole.
+     */
+    CLUSTER((byte) 4),
+
+    /**
+     * A broker.
+     */
+    BROKER((byte) 5);
+
+    private final static HashMap<Byte, ResourceType> CODE_TO_VALUE = new HashMap<>();
+
+    static {
+        for (ResourceType resourceType : ResourceType.values()) {
+            CODE_TO_VALUE.put(resourceType.code, resourceType);
+        }
+    }
+
+    /**
+     * Parse the given string as an ACL resource type.
+     *
+     * @param str    The string to parse.
+     *
+     * @return       The ResourceType, or UNKNOWN if the string could not be matched.
+     */
+    public static ResourceType fromString(String str) throws IllegalArgumentException {
+        try {
+            return ResourceType.valueOf(str.toUpperCase(Locale.ROOT));
+        } catch (IllegalArgumentException e) {
+            return UNKNOWN;
+        }
+    }
+
+    public static ResourceType fromCode(byte code) {
+        ResourceType resourceType = CODE_TO_VALUE.get(code);
+        if (resourceType == null) {
+            return UNKNOWN;
+        }
+        return resourceType;
+    }
+
+    private final byte code;
+
+    ResourceType(byte code) {
+        this.code = code;
+    }
+
+    public byte code() {
+        return code;
+    }
+
+    public boolean unknown() {
+        return this == UNKNOWN;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/1b64a4e6/clients/src/test/java/org/apache/kafka/clients/admin/AclBindingTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/AclBindingTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/AclBindingTest.java
deleted file mode 100644
index 34cedb6..0000000
--- a/clients/src/test/java/org/apache/kafka/clients/admin/AclBindingTest.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.clients.admin;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-public class AclBindingTest {
-    private static final AclBinding ACL1 = new AclBinding(
-        new Resource(ResourceType.TOPIC, "mytopic"),
-        new AccessControlEntry("User:ANONYMOUS", "", AclOperation.ALL, AclPermissionType.ALLOW));
-
-    private static final AclBinding ACL2 = new AclBinding(
-        new Resource(ResourceType.TOPIC, "mytopic"),
-        new AccessControlEntry("User:*", "", AclOperation.READ, AclPermissionType.ALLOW));
-
-    private static final AclBinding ACL3 = new AclBinding(
-        new Resource(ResourceType.TOPIC, "mytopic2"),
-        new AccessControlEntry("User:ANONYMOUS", "127.0.0.1", AclOperation.READ, AclPermissionType.DENY));
-
-    private static final AclBinding UNKNOWN_ACL = new AclBinding(
-        new Resource(ResourceType.TOPIC, "mytopic2"),
-        new AccessControlEntry("User:ANONYMOUS", "127.0.0.1", AclOperation.UNKNOWN, AclPermissionType.DENY));
-
-    private static final AclBindingFilter ANY_ANONYMOUS = new AclBindingFilter(
-        new ResourceFilter(ResourceType.ANY, null),
-        new AccessControlEntryFilter("User:ANONYMOUS", null, AclOperation.ANY, AclPermissionType.ANY));
-
-    private static final AclBindingFilter ANY_DENY = new AclBindingFilter(
-        new ResourceFilter(ResourceType.ANY, null),
-        new AccessControlEntryFilter(null, null, AclOperation.ANY, AclPermissionType.DENY));
-
-    private static final AclBindingFilter ANY_MYTOPIC = new AclBindingFilter(
-        new ResourceFilter(ResourceType.TOPIC, "mytopic"),
-        new AccessControlEntryFilter(null, null, AclOperation.ANY, AclPermissionType.ANY));
-
-    @Test
-    public void testMatching() throws Exception {
-        assertTrue(ACL1.equals(ACL1));
-        final AclBinding acl1Copy = new AclBinding(
-            new Resource(ResourceType.TOPIC, "mytopic"),
-            new AccessControlEntry("User:ANONYMOUS", "", AclOperation.ALL, AclPermissionType.ALLOW));
-        assertTrue(ACL1.equals(acl1Copy));
-        assertTrue(acl1Copy.equals(ACL1));
-        assertTrue(ACL2.equals(ACL2));
-        assertFalse(ACL1.equals(ACL2));
-        assertFalse(ACL2.equals(ACL1));
-        assertTrue(AclBindingFilter.ANY.matches(ACL1));
-        assertFalse(AclBindingFilter.ANY.equals(ACL1));
-        assertTrue(AclBindingFilter.ANY.matches(ACL2));
-        assertFalse(AclBindingFilter.ANY.equals(ACL2));
-        assertTrue(AclBindingFilter.ANY.matches(ACL3));
-        assertFalse(AclBindingFilter.ANY.equals(ACL3));
-        assertTrue(AclBindingFilter.ANY.equals(AclBindingFilter.ANY));
-        assertTrue(ANY_ANONYMOUS.matches(ACL1));
-        assertFalse(ANY_ANONYMOUS.equals(ACL1));
-        assertFalse(ANY_ANONYMOUS.matches(ACL2));
-        assertFalse(ANY_ANONYMOUS.equals(ACL2));
-        assertTrue(ANY_ANONYMOUS.matches(ACL3));
-        assertFalse(ANY_ANONYMOUS.equals(ACL3));
-        assertFalse(ANY_DENY.matches(ACL1));
-        assertFalse(ANY_DENY.matches(ACL2));
-        assertTrue(ANY_DENY.matches(ACL3));
-        assertTrue(ANY_MYTOPIC.matches(ACL1));
-        assertTrue(ANY_MYTOPIC.matches(ACL2));
-        assertFalse(ANY_MYTOPIC.matches(ACL3));
-        assertTrue(ANY_ANONYMOUS.matches(UNKNOWN_ACL));
-        assertTrue(ANY_DENY.matches(UNKNOWN_ACL));
-        assertTrue(UNKNOWN_ACL.equals(UNKNOWN_ACL));
-        assertFalse(ANY_MYTOPIC.matches(UNKNOWN_ACL));
-    }
-
-    @Test
-    public void testUnknowns() throws Exception {
-        assertFalse(ACL1.unknown());
-        assertFalse(ACL2.unknown());
-        assertFalse(ACL3.unknown());
-        assertFalse(ANY_ANONYMOUS.unknown());
-        assertFalse(ANY_DENY.unknown());
-        assertFalse(ANY_MYTOPIC.unknown());
-        assertTrue(UNKNOWN_ACL.unknown());
-    }
-
-    @Test
-    public void testMatchesAtMostOne() throws Exception {
-        assertEquals(null, ACL1.toFilter().findIndefiniteField());
-        assertEquals(null, ACL2.toFilter().findIndefiniteField());
-        assertEquals(null, ACL3.toFilter().findIndefiniteField());
-        assertFalse(ANY_ANONYMOUS.matchesAtMostOne());
-        assertFalse(ANY_DENY.matchesAtMostOne());
-        assertFalse(ANY_MYTOPIC.matchesAtMostOne());
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/1b64a4e6/clients/src/test/java/org/apache/kafka/clients/admin/AclOperationTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/AclOperationTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/AclOperationTest.java
deleted file mode 100644
index 0e3441f..0000000
--- a/clients/src/test/java/org/apache/kafka/clients/admin/AclOperationTest.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.clients.admin;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-public class AclOperationTest {
-    private static class AclOperationTestInfo {
-        private final AclOperation operation;
-        private final int code;
-        private final String name;
-        private final boolean unknown;
-
-        AclOperationTestInfo(AclOperation operation, int code, String name, boolean unknown) {
-            this.operation = operation;
-            this.code = code;
-            this.name = name;
-            this.unknown = unknown;
-        }
-    }
-
-    private static final AclOperationTestInfo[] INFOS = {
-        new AclOperationTestInfo(AclOperation.UNKNOWN, 0, "unknown", true),
-        new AclOperationTestInfo(AclOperation.ANY, 1, "any", false),
-        new AclOperationTestInfo(AclOperation.ALL, 2, "all", false),
-        new AclOperationTestInfo(AclOperation.READ, 3, "read", false),
-        new AclOperationTestInfo(AclOperation.WRITE, 4, "write", false),
-        new AclOperationTestInfo(AclOperation.CREATE, 5, "create", false),
-        new AclOperationTestInfo(AclOperation.DELETE, 6, "delete", false),
-        new AclOperationTestInfo(AclOperation.ALTER, 7, "alter", false),
-        new AclOperationTestInfo(AclOperation.DESCRIBE, 8, "describe", false),
-        new AclOperationTestInfo(AclOperation.CLUSTER_ACTION, 9, "cluster_action", false),
-        new AclOperationTestInfo(AclOperation.DESCRIBE_CONFIGS, 10, "describe_configs", false),
-        new AclOperationTestInfo(AclOperation.ALTER_CONFIGS, 11, "alter_configs", false),
-        new AclOperationTestInfo(AclOperation.IDEMPOTENT_WRITE, 12, "idempotent_write", false)
-    };
-
-    @Test
-    public void testIsUnknown() throws Exception {
-        for (AclOperationTestInfo info : INFOS) {
-            assertEquals(info.operation + " was supposed to have unknown == " + info.unknown,
-                info.unknown, info.operation.unknown());
-        }
-    }
-
-    @Test
-    public void testCode() throws Exception {
-        for (AclOperationTestInfo info : INFOS) {
-            assertEquals(info.operation + " was supposed to have code == " + info.code,
-                info.code, info.operation.code());
-            assertEquals("AclOperation.fromCode(" + info.code + ") was supposed to be " +  info.operation,
-                info.operation, AclOperation.fromCode((byte) info.code));
-        }
-        assertEquals(AclOperation.UNKNOWN, AclOperation.fromCode((byte) 120));
-    }
-
-    @Test
-    public void testName() throws Exception {
-        for (AclOperationTestInfo info : INFOS) {
-            assertEquals("AclOperation.fromString(" + info.name + ") was supposed to be " +  info.operation,
-                info.operation, AclOperation.fromString(info.name));
-        }
-        assertEquals(AclOperation.UNKNOWN, AclOperation.fromString("something"));
-    }
-
-    @Test
-    public void testExhaustive() throws Exception {
-        assertEquals(INFOS.length, AclOperation.values().length);
-        for (int i = 0; i < INFOS.length; i++) {
-            assertEquals(INFOS[i].operation, AclOperation.values()[i]);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/1b64a4e6/clients/src/test/java/org/apache/kafka/clients/admin/AclPermissionTypeTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/AclPermissionTypeTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/AclPermissionTypeTest.java
deleted file mode 100644
index aa6deca..0000000
--- a/clients/src/test/java/org/apache/kafka/clients/admin/AclPermissionTypeTest.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.clients.admin;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-public class AclPermissionTypeTest {
-    private static class AclPermissionTypeTestInfo {
-        private final AclPermissionType ty;
-        private final int code;
-        private final String name;
-        private final boolean unknown;
-
-        AclPermissionTypeTestInfo(AclPermissionType ty, int code, String name, boolean unknown) {
-            this.ty = ty;
-            this.code = code;
-            this.name = name;
-            this.unknown = unknown;
-        }
-    }
-
-    private static final AclPermissionTypeTestInfo[] INFOS = {
-        new AclPermissionTypeTestInfo(AclPermissionType.UNKNOWN, 0, "unknown", true),
-        new AclPermissionTypeTestInfo(AclPermissionType.ANY, 1, "any", false),
-        new AclPermissionTypeTestInfo(AclPermissionType.DENY, 2, "deny", false),
-        new AclPermissionTypeTestInfo(AclPermissionType.ALLOW, 3, "allow", false)
-    };
-
-    @Test
-    public void testIsUnknown() throws Exception {
-        for (AclPermissionTypeTestInfo info : INFOS) {
-            assertEquals(info.ty + " was supposed to have unknown == " + info.unknown,
-                info.unknown, info.ty.unknown());
-        }
-    }
-
-    @Test
-    public void testCode() throws Exception {
-        for (AclPermissionTypeTestInfo info : INFOS) {
-            assertEquals(info.ty + " was supposed to have code == " + info.code,
-                info.code, info.ty.code());
-            assertEquals("AclPermissionType.fromCode(" + info.code + ") was supposed to be " +  info.ty,
-                info.ty, AclPermissionType.fromCode((byte) info.code));
-        }
-        assertEquals(AclPermissionType.UNKNOWN, AclPermissionType.fromCode((byte) 120));
-    }
-
-    @Test
-    public void testName() throws Exception {
-        for (AclPermissionTypeTestInfo info : INFOS) {
-            assertEquals("AclPermissionType.fromString(" + info.name + ") was supposed to be " +  info.ty,
-                info.ty, AclPermissionType.fromString(info.name));
-        }
-        assertEquals(AclPermissionType.UNKNOWN, AclPermissionType.fromString("something"));
-    }
-
-    @Test
-    public void testExhaustive() throws Exception {
-        assertEquals(INFOS.length, AclPermissionType.values().length);
-        for (int i = 0; i < INFOS.length; i++) {
-            assertEquals(INFOS[i].ty, AclPermissionType.values()[i]);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/1b64a4e6/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 2ef654d..6f9e6af 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -17,11 +17,17 @@
 package org.apache.kafka.clients.admin;
 
 import org.apache.kafka.clients.NodeApiVersions;
-import org.apache.kafka.clients.admin.DeleteAclsResults.FilterResults;
+import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResults;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.acl.AccessControlEntry;
+import org.apache.kafka.common.acl.AccessControlEntryFilter;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
 import org.apache.kafka.common.errors.SecurityDisabledException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.protocol.Errors;
@@ -33,6 +39,9 @@ import org.apache.kafka.common.requests.DeleteAclsResponse;
 import org.apache.kafka.common.requests.DeleteAclsResponse.AclDeletionResult;
 import org.apache.kafka.common.requests.DeleteAclsResponse.AclFilterResponse;
 import org.apache.kafka.common.requests.DescribeAclsResponse;
+import org.apache.kafka.common.resource.Resource;
+import org.apache.kafka.common.resource.ResourceFilter;
+import org.apache.kafka.common.resource.ResourceType;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.Timeout;
@@ -234,7 +243,7 @@ public class KafkaAdminClientTest {
                         add(new AclCreationResponse(null));
                         add(new AclCreationResponse(null));
                     }}));
-            CreateAclsResults results = env.adminClient().createAcls(new ArrayList<AclBinding>() {{
+            CreateAclsResult results = env.adminClient().createAcls(new ArrayList<AclBinding>() {{
                         add(ACL1);
                         add(ACL2);
                     }});
@@ -278,7 +287,7 @@ public class KafkaAdminClientTest {
                     add(new AclFilterResponse(new SecurityDisabledException("No security"),
                         Collections.<AclDeletionResult>emptySet()));
                 }}));
-            DeleteAclsResults results = env.adminClient().deleteAcls(new ArrayList<AclBindingFilter>() {{
+            DeleteAclsResult results = env.adminClient().deleteAcls(new ArrayList<AclBindingFilter>() {{
                         add(FILTER1);
                         add(FILTER2);
                     }});

http://git-wip-us.apache.org/repos/asf/kafka/blob/1b64a4e6/clients/src/test/java/org/apache/kafka/clients/admin/ResourceTypeTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/ResourceTypeTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/ResourceTypeTest.java
deleted file mode 100644
index af72de2..0000000
--- a/clients/src/test/java/org/apache/kafka/clients/admin/ResourceTypeTest.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.clients.admin;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-public class ResourceTypeTest {
-    private static class AclResourceTypeTestInfo {
-        private final ResourceType resourceType;
-        private final int code;
-        private final String name;
-        private final boolean unknown;
-
-        AclResourceTypeTestInfo(ResourceType resourceType, int code, String name, boolean unknown) {
-            this.resourceType = resourceType;
-            this.code = code;
-            this.name = name;
-            this.unknown = unknown;
-        }
-    }
-
-    private static final AclResourceTypeTestInfo[] INFOS = {
-        new AclResourceTypeTestInfo(ResourceType.UNKNOWN, 0, "unknown", true),
-        new AclResourceTypeTestInfo(ResourceType.ANY, 1, "any", false),
-        new AclResourceTypeTestInfo(ResourceType.TOPIC, 2, "topic", false),
-        new AclResourceTypeTestInfo(ResourceType.GROUP, 3, "group", false),
-        new AclResourceTypeTestInfo(ResourceType.CLUSTER, 4, "cluster", false),
-        new AclResourceTypeTestInfo(ResourceType.BROKER, 5, "broker", false)
-    };
-
-    @Test
-    public void testIsUnknown() throws Exception {
-        for (AclResourceTypeTestInfo info : INFOS) {
-            assertEquals(info.resourceType + " was supposed to have unknown == " + info.unknown,
-                info.unknown, info.resourceType.unknown());
-        }
-    }
-
-    @Test
-    public void testCode() throws Exception {
-        for (AclResourceTypeTestInfo info : INFOS) {
-            assertEquals(info.resourceType + " was supposed to have code == " + info.code,
-                info.code, info.resourceType.code());
-            assertEquals("AclResourceType.fromCode(" + info.code + ") was supposed to be " +
-                info.resourceType, info.resourceType, ResourceType.fromCode((byte) info.code));
-        }
-        assertEquals(ResourceType.UNKNOWN, ResourceType.fromCode((byte) 120));
-    }
-
-    @Test
-    public void testName() throws Exception {
-        for (AclResourceTypeTestInfo info : INFOS) {
-            assertEquals("ResourceType.fromString(" + info.name + ") was supposed to be " +
-                info.resourceType, info.resourceType, ResourceType.fromString(info.name));
-        }
-        assertEquals(ResourceType.UNKNOWN, ResourceType.fromString("something"));
-    }
-
-    @Test
-    public void testExhaustive() throws Exception {
-        assertEquals(INFOS.length, ResourceType.values().length);
-        for (int i = 0; i < INFOS.length; i++) {
-            assertEquals(INFOS[i].resourceType, ResourceType.values()[i]);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/1b64a4e6/clients/src/test/java/org/apache/kafka/common/acl/AclBindingTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/acl/AclBindingTest.java b/clients/src/test/java/org/apache/kafka/common/acl/AclBindingTest.java
new file mode 100644
index 0000000..e0a0598
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/acl/AclBindingTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.acl;
+
+import org.apache.kafka.common.resource.Resource;
+import org.apache.kafka.common.resource.ResourceFilter;
+import org.apache.kafka.common.resource.ResourceType;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class AclBindingTest {
+    private static final AclBinding ACL1 = new AclBinding(
+        new Resource(ResourceType.TOPIC, "mytopic"),
+        new AccessControlEntry("User:ANONYMOUS", "", AclOperation.ALL, AclPermissionType.ALLOW));
+
+    private static final AclBinding ACL2 = new AclBinding(
+        new Resource(ResourceType.TOPIC, "mytopic"),
+        new AccessControlEntry("User:*", "", AclOperation.READ, AclPermissionType.ALLOW));
+
+    private static final AclBinding ACL3 = new AclBinding(
+        new Resource(ResourceType.TOPIC, "mytopic2"),
+        new AccessControlEntry("User:ANONYMOUS", "127.0.0.1", AclOperation.READ, AclPermissionType.DENY));
+
+    private static final AclBinding UNKNOWN_ACL = new AclBinding(
+        new Resource(ResourceType.TOPIC, "mytopic2"),
+        new AccessControlEntry("User:ANONYMOUS", "127.0.0.1", AclOperation.UNKNOWN, AclPermissionType.DENY));
+
+    private static final AclBindingFilter ANY_ANONYMOUS = new AclBindingFilter(
+        new ResourceFilter(ResourceType.ANY, null),
+        new AccessControlEntryFilter("User:ANONYMOUS", null, AclOperation.ANY, AclPermissionType.ANY));
+
+    private static final AclBindingFilter ANY_DENY = new AclBindingFilter(
+        new ResourceFilter(ResourceType.ANY, null),
+        new AccessControlEntryFilter(null, null, AclOperation.ANY, AclPermissionType.DENY));
+
+    private static final AclBindingFilter ANY_MYTOPIC = new AclBindingFilter(
+        new ResourceFilter(ResourceType.TOPIC, "mytopic"),
+        new AccessControlEntryFilter(null, null, AclOperation.ANY, AclPermissionType.ANY));
+
+    @Test
+    public void testMatching() throws Exception {
+        assertTrue(ACL1.equals(ACL1));
+        final AclBinding acl1Copy = new AclBinding(
+            new Resource(ResourceType.TOPIC, "mytopic"),
+            new AccessControlEntry("User:ANONYMOUS", "", AclOperation.ALL, AclPermissionType.ALLOW));
+        assertTrue(ACL1.equals(acl1Copy));
+        assertTrue(acl1Copy.equals(ACL1));
+        assertTrue(ACL2.equals(ACL2));
+        assertFalse(ACL1.equals(ACL2));
+        assertFalse(ACL2.equals(ACL1));
+        assertTrue(AclBindingFilter.ANY.matches(ACL1));
+        assertFalse(AclBindingFilter.ANY.equals(ACL1));
+        assertTrue(AclBindingFilter.ANY.matches(ACL2));
+        assertFalse(AclBindingFilter.ANY.equals(ACL2));
+        assertTrue(AclBindingFilter.ANY.matches(ACL3));
+        assertFalse(AclBindingFilter.ANY.equals(ACL3));
+        assertTrue(AclBindingFilter.ANY.equals(AclBindingFilter.ANY));
+        assertTrue(ANY_ANONYMOUS.matches(ACL1));
+        assertFalse(ANY_ANONYMOUS.equals(ACL1));
+        assertFalse(ANY_ANONYMOUS.matches(ACL2));
+        assertFalse(ANY_ANONYMOUS.equals(ACL2));
+        assertTrue(ANY_ANONYMOUS.matches(ACL3));
+        assertFalse(ANY_ANONYMOUS.equals(ACL3));
+        assertFalse(ANY_DENY.matches(ACL1));
+        assertFalse(ANY_DENY.matches(ACL2));
+        assertTrue(ANY_DENY.matches(ACL3));
+        assertTrue(ANY_MYTOPIC.matches(ACL1));
+        assertTrue(ANY_MYTOPIC.matches(ACL2));
+        assertFalse(ANY_MYTOPIC.matches(ACL3));
+        assertTrue(ANY_ANONYMOUS.matches(UNKNOWN_ACL));
+        assertTrue(ANY_DENY.matches(UNKNOWN_ACL));
+        assertTrue(UNKNOWN_ACL.equals(UNKNOWN_ACL));
+        assertFalse(ANY_MYTOPIC.matches(UNKNOWN_ACL));
+    }
+
+    @Test
+    public void testUnknowns() throws Exception {
+        assertFalse(ACL1.unknown());
+        assertFalse(ACL2.unknown());
+        assertFalse(ACL3.unknown());
+        assertFalse(ANY_ANONYMOUS.unknown());
+        assertFalse(ANY_DENY.unknown());
+        assertFalse(ANY_MYTOPIC.unknown());
+        assertTrue(UNKNOWN_ACL.unknown());
+    }
+
+    @Test
+    public void testMatchesAtMostOne() throws Exception {
+        assertEquals(null, ACL1.toFilter().findIndefiniteField());
+        assertEquals(null, ACL2.toFilter().findIndefiniteField());
+        assertEquals(null, ACL3.toFilter().findIndefiniteField());
+        assertFalse(ANY_ANONYMOUS.matchesAtMostOne());
+        assertFalse(ANY_DENY.matchesAtMostOne());
+        assertFalse(ANY_MYTOPIC.matchesAtMostOne());
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/1b64a4e6/clients/src/test/java/org/apache/kafka/common/acl/AclOperationTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/acl/AclOperationTest.java b/clients/src/test/java/org/apache/kafka/common/acl/AclOperationTest.java
new file mode 100644
index 0000000..5f5a87c
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/acl/AclOperationTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.acl;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class AclOperationTest {
+    private static class AclOperationTestInfo {
+        private final AclOperation operation;
+        private final int code;
+        private final String name;
+        private final boolean unknown;
+
+        AclOperationTestInfo(AclOperation operation, int code, String name, boolean unknown) {
+            this.operation = operation;
+            this.code = code;
+            this.name = name;
+            this.unknown = unknown;
+        }
+    }
+
+    private static final AclOperationTestInfo[] INFOS = {
+        new AclOperationTestInfo(AclOperation.UNKNOWN, 0, "unknown", true),
+        new AclOperationTestInfo(AclOperation.ANY, 1, "any", false),
+        new AclOperationTestInfo(AclOperation.ALL, 2, "all", false),
+        new AclOperationTestInfo(AclOperation.READ, 3, "read", false),
+        new AclOperationTestInfo(AclOperation.WRITE, 4, "write", false),
+        new AclOperationTestInfo(AclOperation.CREATE, 5, "create", false),
+        new AclOperationTestInfo(AclOperation.DELETE, 6, "delete", false),
+        new AclOperationTestInfo(AclOperation.ALTER, 7, "alter", false),
+        new AclOperationTestInfo(AclOperation.DESCRIBE, 8, "describe", false),
+        new AclOperationTestInfo(AclOperation.CLUSTER_ACTION, 9, "cluster_action", false),
+        new AclOperationTestInfo(AclOperation.DESCRIBE_CONFIGS, 10, "describe_configs", false),
+        new AclOperationTestInfo(AclOperation.ALTER_CONFIGS, 11, "alter_configs", false),
+        new AclOperationTestInfo(AclOperation.IDEMPOTENT_WRITE, 12, "idempotent_write", false)
+    };
+
+    @Test
+    public void testIsUnknown() throws Exception {
+        for (AclOperationTestInfo info : INFOS) {
+            assertEquals(info.operation + " was supposed to have unknown == " + info.unknown,
+                info.unknown, info.operation.unknown());
+        }
+    }
+
+    @Test
+    public void testCode() throws Exception {
+        for (AclOperationTestInfo info : INFOS) {
+            assertEquals(info.operation + " was supposed to have code == " + info.code,
+                info.code, info.operation.code());
+            assertEquals("AclOperation.fromCode(" + info.code + ") was supposed to be " +  info.operation,
+                info.operation, AclOperation.fromCode((byte) info.code));
+        }
+        assertEquals(AclOperation.UNKNOWN, AclOperation.fromCode((byte) 120));
+    }
+
+    @Test
+    public void testName() throws Exception {
+        for (AclOperationTestInfo info : INFOS) {
+            assertEquals("AclOperation.fromString(" + info.name + ") was supposed to be " +  info.operation,
+                info.operation, AclOperation.fromString(info.name));
+        }
+        assertEquals(AclOperation.UNKNOWN, AclOperation.fromString("something"));
+    }
+
+    @Test
+    public void testExhaustive() throws Exception {
+        assertEquals(INFOS.length, AclOperation.values().length);
+        for (int i = 0; i < INFOS.length; i++) {
+            assertEquals(INFOS[i].operation, AclOperation.values()[i]);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/1b64a4e6/clients/src/test/java/org/apache/kafka/common/acl/AclPermissionTypeTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/acl/AclPermissionTypeTest.java b/clients/src/test/java/org/apache/kafka/common/acl/AclPermissionTypeTest.java
new file mode 100644
index 0000000..8e7fdc7
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/acl/AclPermissionTypeTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.acl;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class AclPermissionTypeTest {
+    private static class AclPermissionTypeTestInfo {
+        private final AclPermissionType ty;
+        private final int code;
+        private final String name;
+        private final boolean unknown;
+
+        AclPermissionTypeTestInfo(AclPermissionType ty, int code, String name, boolean unknown) {
+            this.ty = ty;
+            this.code = code;
+            this.name = name;
+            this.unknown = unknown;
+        }
+    }
+
+    private static final AclPermissionTypeTestInfo[] INFOS = {
+        new AclPermissionTypeTestInfo(AclPermissionType.UNKNOWN, 0, "unknown", true),
+        new AclPermissionTypeTestInfo(AclPermissionType.ANY, 1, "any", false),
+        new AclPermissionTypeTestInfo(AclPermissionType.DENY, 2, "deny", false),
+        new AclPermissionTypeTestInfo(AclPermissionType.ALLOW, 3, "allow", false)
+    };
+
+    @Test
+    public void testIsUnknown() throws Exception {
+        for (AclPermissionTypeTestInfo info : INFOS) {
+            assertEquals(info.ty + " was supposed to have unknown == " + info.unknown,
+                info.unknown, info.ty.unknown());
+        }
+    }
+
+    @Test
+    public void testCode() throws Exception {
+        for (AclPermissionTypeTestInfo info : INFOS) {
+            assertEquals(info.ty + " was supposed to have code == " + info.code,
+                info.code, info.ty.code());
+            assertEquals("AclPermissionType.fromCode(" + info.code + ") was supposed to be " +  info.ty,
+                info.ty, AclPermissionType.fromCode((byte) info.code));
+        }
+        assertEquals(AclPermissionType.UNKNOWN, AclPermissionType.fromCode((byte) 120));
+    }
+
+    @Test
+    public void testName() throws Exception {
+        for (AclPermissionTypeTestInfo info : INFOS) {
+            assertEquals("AclPermissionType.fromString(" + info.name + ") was supposed to be " +  info.ty,
+                info.ty, AclPermissionType.fromString(info.name));
+        }
+        assertEquals(AclPermissionType.UNKNOWN, AclPermissionType.fromString("something"));
+    }
+
+    @Test
+    public void testExhaustive() throws Exception {
+        assertEquals(INFOS.length, AclPermissionType.values().length);
+        for (int i = 0; i < INFOS.length; i++) {
+            assertEquals(INFOS[i].ty, AclPermissionType.values()[i]);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/1b64a4e6/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index e0f48bf..56f0215 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -16,15 +16,12 @@
  */
 package org.apache.kafka.common.requests;
 
-import org.apache.kafka.clients.admin.AccessControlEntry;
-import org.apache.kafka.clients.admin.AccessControlEntryFilter;
-import org.apache.kafka.clients.admin.AclBinding;
-import org.apache.kafka.clients.admin.AclBindingFilter;
-import org.apache.kafka.clients.admin.AclOperation;
-import org.apache.kafka.clients.admin.AclPermissionType;
-import org.apache.kafka.clients.admin.Resource;
-import org.apache.kafka.clients.admin.ResourceFilter;
-import org.apache.kafka.clients.admin.ResourceType;
+import org.apache.kafka.common.acl.AccessControlEntry;
+import org.apache.kafka.common.acl.AccessControlEntryFilter;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.InvalidRequestException;
@@ -50,6 +47,9 @@ import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation;
 import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse;
 import org.apache.kafka.common.requests.DeleteAclsResponse.AclDeletionResult;
 import org.apache.kafka.common.requests.DeleteAclsResponse.AclFilterResponse;
+import org.apache.kafka.common.resource.Resource;
+import org.apache.kafka.common.resource.ResourceFilter;
+import org.apache.kafka.common.resource.ResourceType;
 import org.apache.kafka.common.utils.Utils;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/1b64a4e6/clients/src/test/java/org/apache/kafka/common/resource/ResourceTypeTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/resource/ResourceTypeTest.java b/clients/src/test/java/org/apache/kafka/common/resource/ResourceTypeTest.java
new file mode 100644
index 0000000..4dc4cac
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/resource/ResourceTypeTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.resource;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class ResourceTypeTest {
+    private static class AclResourceTypeTestInfo {
+        private final ResourceType resourceType;
+        private final int code;
+        private final String name;
+        private final boolean unknown;
+
+        AclResourceTypeTestInfo(ResourceType resourceType, int code, String name, boolean unknown) {
+            this.resourceType = resourceType;
+            this.code = code;
+            this.name = name;
+            this.unknown = unknown;
+        }
+    }
+
+    private static final AclResourceTypeTestInfo[] INFOS = {
+        new AclResourceTypeTestInfo(ResourceType.UNKNOWN, 0, "unknown", true),
+        new AclResourceTypeTestInfo(ResourceType.ANY, 1, "any", false),
+        new AclResourceTypeTestInfo(ResourceType.TOPIC, 2, "topic", false),
+        new AclResourceTypeTestInfo(ResourceType.GROUP, 3, "group", false),
+        new AclResourceTypeTestInfo(ResourceType.CLUSTER, 4, "cluster", false),
+        new AclResourceTypeTestInfo(ResourceType.BROKER, 5, "broker", false)
+    };
+
+    @Test
+    public void testIsUnknown() throws Exception {
+        for (AclResourceTypeTestInfo info : INFOS) {
+            assertEquals(info.resourceType + " was supposed to have unknown == " + info.unknown,
+                info.unknown, info.resourceType.unknown());
+        }
+    }
+
+    @Test
+    public void testCode() throws Exception {
+        for (AclResourceTypeTestInfo info : INFOS) {
+            assertEquals(info.resourceType + " was supposed to have code == " + info.code,
+                info.code, info.resourceType.code());
+            assertEquals("AclResourceType.fromCode(" + info.code + ") was supposed to be " +
+                info.resourceType, info.resourceType, ResourceType.fromCode((byte) info.code));
+        }
+        assertEquals(ResourceType.UNKNOWN, ResourceType.fromCode((byte) 120));
+    }
+
+    @Test
+    public void testName() throws Exception {
+        for (AclResourceTypeTestInfo info : INFOS) {
+            assertEquals("ResourceType.fromString(" + info.name + ") was supposed to be " +
+                info.resourceType, info.resourceType, ResourceType.fromString(info.name));
+        }
+        assertEquals(ResourceType.UNKNOWN, ResourceType.fromString("something"));
+    }
+
+    @Test
+    public void testExhaustive() throws Exception {
+        assertEquals(INFOS.length, ResourceType.values().length);
+        for (int i = 0; i < INFOS.length; i++) {
+            assertEquals(INFOS[i].resourceType, ResourceType.values()[i]);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/1b64a4e6/core/src/main/scala/kafka/log/LogConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index 6a329d8..ad50aab 100755
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -24,7 +24,7 @@ import kafka.api.ApiVersion
 import kafka.message.{BrokerCompressionCodec, Message}
 import kafka.server.{KafkaConfig, ThrottledReplicaListValidator}
 import org.apache.kafka.common.errors.InvalidConfigurationException
-import org.apache.kafka.common.config.{AbstractConfig, ConfigDef}
+import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, TopicConfig}
 import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.utils.Utils
 
@@ -100,107 +100,66 @@ object LogConfig {
     println(configDef.toHtmlTable)
   }
 
-  val Delete = "delete"
-  val Compact = "compact"
+  val SegmentBytesProp = TopicConfig.SEGMENT_BYTES_CONFIG
+  val SegmentMsProp = TopicConfig.SEGMENT_MS_CONFIG
+  val SegmentJitterMsProp = TopicConfig.SEGMENT_JITTER_MS_CONFIG
+  val SegmentIndexBytesProp = TopicConfig.SEGMENT_INDEX_BYTES_CONFIG
+  val FlushMessagesProp = TopicConfig.FLUSH_MESSAGES_INTERVAL_CONFIG
+  val FlushMsProp = TopicConfig.FLUSH_MS_CONFIG
+  val RetentionBytesProp = TopicConfig.RETENTION_BYTES_CONFIG
+  val RetentionMsProp = TopicConfig.RETENTION_MS_CONFIG
+  val MaxMessageBytesProp = TopicConfig.MAX_MESSAGE_BYTES_CONFIG
+  val IndexIntervalBytesProp = TopicConfig.INDEX_INTERVAL_BYTES_CONFIG
+  val DeleteRetentionMsProp = TopicConfig.DELETE_RETENTION_MS_CONFIG
+  val MinCompactionLagMsProp = TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG
+  val FileDeleteDelayMsProp = TopicConfig.FILE_DELETE_DELAY_MS_CONFIG
+  val MinCleanableDirtyRatioProp = TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG
+  val CleanupPolicyProp = TopicConfig.CLEANUP_POLICY_CONFIG
+  val Delete = TopicConfig.CLEANUP_POLICY_DELETE
+  val Compact = TopicConfig.CLEANUP_POLICY_COMPACT
+  val UncleanLeaderElectionEnableProp = TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG
+  val MinInSyncReplicasProp = TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG
+  val CompressionTypeProp = TopicConfig.COMPRESSION_TYPE_CONFIG
+  val PreAllocateEnableProp = TopicConfig.PREALLOCATE_CONFIG
+  val MessageFormatVersionProp = TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG
+  val MessageTimestampTypeProp = TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG
+  val MessageTimestampDifferenceMaxMsProp = TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG
 
-  val SegmentBytesProp = "segment.bytes"
-  val SegmentMsProp = "segment.ms"
-  val SegmentJitterMsProp = "segment.jitter.ms"
-  val SegmentIndexBytesProp = "segment.index.bytes"
-  val FlushMessagesProp = "flush.messages"
-  val FlushMsProp = "flush.ms"
-  val RetentionBytesProp = "retention.bytes"
-  val RetentionMsProp = "retention.ms"
-  val MaxMessageBytesProp = "max.message.bytes"
-  val IndexIntervalBytesProp = "index.interval.bytes"
-  val DeleteRetentionMsProp = "delete.retention.ms"
-  val MinCompactionLagMsProp = "min.compaction.lag.ms"
-  val FileDeleteDelayMsProp = "file.delete.delay.ms"
-  val MinCleanableDirtyRatioProp = "min.cleanable.dirty.ratio"
-  val CleanupPolicyProp = "cleanup.policy"
-  val UncleanLeaderElectionEnableProp = "unclean.leader.election.enable"
-  val MinInSyncReplicasProp = "min.insync.replicas"
-  val CompressionTypeProp = "compression.type"
-  val PreAllocateEnableProp = "preallocate"
-  val MessageFormatVersionProp = "message.format.version"
-  val MessageTimestampTypeProp = "message.timestamp.type"
-  val MessageTimestampDifferenceMaxMsProp = "message.timestamp.difference.max.ms"
+  // Leave these out of TopicConfig for now as they are replication quota configs
   val LeaderReplicationThrottledReplicasProp = "leader.replication.throttled.replicas"
   val FollowerReplicationThrottledReplicasProp = "follower.replication.throttled.replicas"
 
-  val SegmentSizeDoc = "This configuration controls the segment file size for " +
-    "the log. Retention and cleaning is always done a file at a time so a larger " +
-    "segment size means fewer files but less granular control over retention."
-  val SegmentMsDoc = "This configuration controls the period of time after " +
-    "which Kafka will force the log to roll even if the segment file isn't full " +
-    "to ensure that retention can delete or compact old data."
-  val SegmentJitterMsDoc = "The maximum random jitter subtracted from the scheduled segment roll time to avoid" +
-    " thundering herds of segment rolling"
-  val FlushIntervalDoc = "This setting allows specifying an interval at which we " +
-    "will force an fsync of data written to the log. For example if this was set to 1 " +
-    "we would fsync after every message; if it were 5 we would fsync after every five " +
-    "messages. In general we recommend you not set this and use replication for " +
-    "durability and allow the operating system's background flush capabilities as it " +
-    "is more efficient. This setting can be overridden on a per-topic basis (see <a " +
-    "href=\"#topic-config\">the per-topic configuration section</a>)."
-  val FlushMsDoc = "This setting allows specifying a time interval at which we will " +
-    "force an fsync of data written to the log. For example if this was set to 1000 " +
-    "we would fsync after 1000 ms had passed. In general we recommend you not set " +
-    "this and use replication for durability and allow the operating system's background " +
-    "flush capabilities as it is more efficient."
-  val RetentionSizeDoc = "This configuration controls the maximum size a log can grow " +
-    "to before we will discard old log segments to free up space if we are using the " +
-    "\"delete\" retention policy. By default there is no size limit only a time limit."
-  val RetentionMsDoc = "This configuration controls the maximum time we will retain a " +
-    "log before we will discard old log segments to free up space if we are using the " +
-    "\"delete\" retention policy. This represents an SLA on how soon consumers must read " +
-    "their data."
-  val MaxIndexSizeDoc = "This configuration controls the size of the index that maps " +
-    "offsets to file positions. We preallocate this index file and shrink it only after log " +
-    "rolls. You generally should not need to change this setting."
-  val MaxMessageSizeDoc = "This is largest message size Kafka will allow to be appended. Note that if you increase" +
-    " this size you must also increase your consumer's fetch size so they can fetch messages this large."
-  val IndexIntervalDoc = "This setting controls how frequently Kafka adds an index " +
-    "entry to it's offset index. The default setting ensures that we index a message " +
-    "roughly every 4096 bytes. More indexing allows reads to jump closer to the exact " +
-    "position in the log but makes the index larger. You probably don't need to change " +
-    "this."
-  val FileDeleteDelayMsDoc = "The time to wait before deleting a file from the filesystem"
-  val DeleteRetentionMsDoc = "The amount of time to retain delete tombstone markers " +
-    "for <a href=\"#compaction\">log compacted</a> topics. This setting also gives a bound " +
-    "on the time in which a consumer must complete a read if they begin from offset 0 " +
-    "to ensure that they get a valid snapshot of the final stage (otherwise delete " +
-    "tombstones may be collected before they complete their scan)."
-  val MinCompactionLagMsDoc = "The minimum time a message will remain uncompacted in the log. " +
-    "Only applicable for logs that are being compacted."
-  val MinCleanableRatioDoc = "This configuration controls how frequently the log " +
-    "compactor will attempt to clean the log (assuming <a href=\"#compaction\">log " +
-    "compaction</a> is enabled). By default we will avoid cleaning a log where more than " +
-    "50% of the log has been compacted. This ratio bounds the maximum space wasted in " +
-    "the log by duplicates (at 50% at most 50% of the log could be duplicates). A " +
-    "higher ratio will mean fewer, more efficient cleanings but will mean more wasted " +
-    "space in the log."
-  val CompactDoc = "A string that is either \"delete\" or \"compact\". This string " +
-    "designates the retention policy to use on old log segments. The default policy " +
-    "(\"delete\") will discard old segments when their retention time or size limit has " +
-    "been reached. The \"compact\" setting will enable <a href=\"#compaction\">log " +
-    "compaction</a> on the topic."
-  val UncleanLeaderElectionEnableDoc = "Indicates whether to enable replicas not in the ISR set to be elected as" +
-    " leader as a last resort, even though doing so may result in data loss"
-  val MinInSyncReplicasDoc = KafkaConfig.MinInSyncReplicasDoc
-  val CompressionTypeDoc = "Specify the final compression type for a given topic. This configuration accepts the " +
-    "standard compression codecs ('gzip', 'snappy', lz4). It additionally accepts 'uncompressed' which is equivalent to " +
-    "no compression; and 'producer' which means retain the original compression codec set by the producer."
-  val PreAllocateEnableDoc ="Should pre allocate file when create new segment?"
-  val MessageFormatVersionDoc = KafkaConfig.LogMessageFormatVersionDoc
-  val MessageTimestampTypeDoc = KafkaConfig.LogMessageTimestampTypeDoc
-  val MessageTimestampDifferenceMaxMsDoc = "The maximum difference allowed between the timestamp when a broker receives " +
-    "a message and the timestamp specified in the message. If message.timestamp.type=CreateTime, a message will be rejected " +
-    "if the difference in timestamp exceeds this threshold. This configuration is ignored if message.timestamp.type=LogAppendTime."
-  val LeaderReplicationThrottledReplicasDoc = "A list of replicas for which log replication should be throttled on the leader side. The list should describe a set of " +
-    "replicas in the form [PartitionId]:[BrokerId],[PartitionId]:[BrokerId]:... or alternatively the wildcard '*' can be used to throttle all replicas for this topic."
-  val FollowerReplicationThrottledReplicasDoc = "A list of replicas for which log replication should be throttled on the follower side. The list should describe a set of " +
-    "replicas in the form [PartitionId]:[BrokerId],[PartitionId]:[BrokerId]:... or alternatively the wildcard '*' can be used to throttle all replicas for this topic."
+  val SegmentSizeDoc = TopicConfig.SEGMENT_BYTES_DOC
+  val SegmentMsDoc = TopicConfig.SEGMENT_MS_DOC
+  val SegmentJitterMsDoc = TopicConfig.SEGMENT_JITTER_MS_DOC
+  val MaxIndexSizeDoc = TopicConfig.SEGMENT_INDEX_BYTES_DOC
+  val FlushIntervalDoc = TopicConfig.FLUSH_MESSAGES_INTERVAL_DOC
+  val FlushMsDoc = TopicConfig.FLUSH_MS_DOC
+  val RetentionSizeDoc = TopicConfig.RETENTION_BYTES_DOC
+  val RetentionMsDoc = TopicConfig.RETENTION_MS_DOC
+  val MaxMessageSizeDoc = TopicConfig.MAX_MESSAGE_BYTES_DOC
+  val IndexIntervalDoc = TopicConfig.INDEX_INTERVAL_BYTES_DOCS
+  val FileDeleteDelayMsDoc = TopicConfig.FILE_DELETE_DELAY_MS_DOC
+  val DeleteRetentionMsDoc = TopicConfig.DELETE_RETENTION_MS_DOC
+  val MinCompactionLagMsDoc = TopicConfig.MIN_COMPACTION_LAG_MS_DOC
+  val MinCleanableRatioDoc = TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_DOC
+  val CompactDoc = TopicConfig.CLEANUP_POLICY_DOC
+  val UncleanLeaderElectionEnableDoc = TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_DOC
+  val MinInSyncReplicasDoc = TopicConfig.MIN_IN_SYNC_REPLICAS_DOC
+  val CompressionTypeDoc = TopicConfig.COMPRESSION_TYPE_DOC
+  val PreAllocateEnableDoc = TopicConfig.PREALLOCATE_DOC
+  val MessageFormatVersionDoc = TopicConfig.MESSAGE_FORMAT_VERSION_DOC
+  val MessageTimestampTypeDoc = TopicConfig.MESSAGE_TIMESTAMP_TYPE_DOC
+  val MessageTimestampDifferenceMaxMsDoc = TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_DOC
+
+  val LeaderReplicationThrottledReplicasDoc = "A list of replicas for which log replication should be throttled on " +
+    "the leader side. The list should describe a set of replicas in the form " +
+    "[PartitionId]:[BrokerId],[PartitionId]:[BrokerId]:... or alternatively the wildcard '*' can be used to throttle " +
+    "all replicas for this topic."
+  val FollowerReplicationThrottledReplicasDoc = "A list of replicas for which log replication should be throttled on " +
+    "the follower side. The list should describe a set of " + "replicas in the form " +
+    "[PartitionId]:[BrokerId],[PartitionId]:[BrokerId]:... or alternatively the wildcard '*' can be used to throttle " +
+    "all replicas for this topic."
 
   private class LogConfigDef extends ConfigDef {
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/1b64a4e6/core/src/main/scala/kafka/security/auth/Operation.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/Operation.scala b/core/src/main/scala/kafka/security/auth/Operation.scala
index 420c3eb..d3a25b5 100644
--- a/core/src/main/scala/kafka/security/auth/Operation.scala
+++ b/core/src/main/scala/kafka/security/auth/Operation.scala
@@ -17,7 +17,7 @@
 package kafka.security.auth
 
 import kafka.common.{BaseEnum, KafkaException}
-import org.apache.kafka.clients.admin.AclOperation
+import org.apache.kafka.common.acl.AclOperation
 
 import scala.util.{Failure, Success, Try}
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/1b64a4e6/core/src/main/scala/kafka/security/auth/PermissionType.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/PermissionType.scala b/core/src/main/scala/kafka/security/auth/PermissionType.scala
index c4209e5..ec99ae4 100644
--- a/core/src/main/scala/kafka/security/auth/PermissionType.scala
+++ b/core/src/main/scala/kafka/security/auth/PermissionType.scala
@@ -17,7 +17,7 @@
 package kafka.security.auth
 
 import kafka.common.{BaseEnum, KafkaException}
-import org.apache.kafka.clients.admin.AclPermissionType
+import org.apache.kafka.common.acl.AclPermissionType
 
 import scala.util.{Failure, Success, Try}
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/1b64a4e6/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index d7f6773..b780823 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -51,7 +51,8 @@ import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.common.{Node, TopicPartition}
 import org.apache.kafka.common.requests.SaslHandshakeResponse
 import org.apache.kafka.common.security.auth.KafkaPrincipal
-import org.apache.kafka.clients.admin.{AccessControlEntry, AclBinding, AclBindingFilter, AclOperation, AclPermissionType, Resource => AdminResource, ResourceType => AdminResourceType}
+import org.apache.kafka.common.resource.{Resource => AdminResource, ResourceType => AdminResourceType}
+import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
 
 import scala.collection._
 import scala.collection.JavaConverters._

http://git-wip-us.apache.org/repos/asf/kafka/blob/1b64a4e6/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala
index c52594b..065759f 100644
--- a/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala
@@ -28,10 +28,14 @@ import org.apache.kafka.clients.admin._
 import kafka.utils.{Logging, TestUtils}
 import org.apache.kafka.clients.admin.NewTopic
 import org.apache.kafka.common.KafkaFuture
+import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
+import org.apache.kafka.common.config.ConfigResource
+import org.apache.kafka.common.errors.{InvalidRequestException, SecurityDisabledException, TopicExistsException}
 import org.apache.kafka.common.errors.{InvalidRequestException, SecurityDisabledException, TimeoutException, TopicExistsException}
 import org.apache.kafka.common.protocol.ApiKeys
 import org.junit.{After, Before, Rule, Test}
 import org.apache.kafka.common.requests.MetadataResponse
+import org.apache.kafka.common.resource.{Resource, ResourceType}
 import org.junit.rules.Timeout
 import org.junit.Assert._
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/1b64a4e6/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
index cb43b09..d27b0bf 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
@@ -18,8 +18,10 @@ import kafka.security.auth.SimpleAclAuthorizer
 import org.apache.kafka.common.protocol.SecurityProtocol
 import kafka.server.KafkaConfig
 import kafka.utils.{JaasTestUtils, TestUtils}
-import org.apache.kafka.clients.admin.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, AclPermissionType, AdminClient, CreateAclsOptions, DeleteAclsOptions, Resource, ResourceFilter, ResourceType}
+import org.apache.kafka.clients.admin.{AdminClient, CreateAclsOptions, DeleteAclsOptions}
+import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
 import org.apache.kafka.common.errors.InvalidRequestException
+import org.apache.kafka.common.resource.{Resource, ResourceFilter, ResourceType}
 import org.junit.Assert.assertEquals
 import org.junit.{After, Assert, Before, Test}
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/1b64a4e6/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index fa2e55b..b261cb2 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -24,7 +24,8 @@ import kafka.log.LogConfig
 import kafka.network.RequestChannel.Session
 import kafka.security.auth._
 import kafka.utils.TestUtils
-import org.apache.kafka.clients.admin.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, AclPermissionType, ResourceFilter, Resource => AdminResource, ResourceType => AdminResourceType}
+import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
+import org.apache.kafka.common.resource.{ResourceFilter, Resource => AdminResource, ResourceType => AdminResourceType}
 import org.apache.kafka.common.{Node, TopicPartition}
 import org.apache.kafka.common.metrics.{KafkaMetric, Quota, Sensor}
 import org.apache.kafka.common.network.{Authenticator, ListenerName, TransportLayer}


Mime
View raw message