kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7005: Remove duplicate resource class. (#5184)
Date Mon, 11 Jun 2018 20:26:27 GMT
This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 49db5a6  KAFKA-7005: Remove duplicate resource class. (#5184)
49db5a6 is described below

commit 49db5a63c043b50c10c2dfd0648f8d74ee917b6a
Author: Andy Coates <8012398+big-andy-coates@users.noreply.github.com>
AuthorDate: Mon Jun 11 21:26:22 2018 +0100

    KAFKA-7005: Remove duplicate resource class. (#5184)
    
    This is a follow-on change requested as part of the initial PR for KIP-290 #5117.  @cmccabe requested that the `resource.Resource` class be factored out in favour of `ConfigResource` to avoid confusion between all the `Resource` implementations.
    
    Reviewers: Colin Patrick McCabe <colin@cmccabe.xyz>, Jun Rao <junrao@gmail.com>
---
 .../kafka/clients/admin/KafkaAdminClient.java      | 43 +++++-----------
 .../apache/kafka/common/config/ConfigResource.java | 25 ++++++++-
 .../apache/kafka/common/protocol/types/Struct.java |  6 +++
 .../kafka/common/requests/AlterConfigsRequest.java | 35 +++++++------
 .../common/requests/AlterConfigsResponse.java      | 21 ++++----
 .../common/requests/DescribeConfigsRequest.java    | 40 ++++++++-------
 .../common/requests/DescribeConfigsResponse.java   | 36 +++++++------
 .../apache/kafka/common/requests/RequestUtils.java | 20 +++-----
 .../org/apache/kafka/common/requests/Resource.java | 60 ----------------------
 .../apache/kafka/common/requests/ResourceType.java | 42 ---------------
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 15 +++---
 .../kafka/common/config/ConfigResourceTest.java    | 45 ++++++++++++++++
 .../kafka/common/requests/RequestResponseTest.java | 37 ++++++-------
 .../main/scala/kafka/security/auth/Resource.scala  |  2 +-
 .../src/main/scala/kafka/server/AdminManager.scala | 14 ++---
 core/src/main/scala/kafka/server/KafkaApis.scala   | 17 +++---
 .../kafka/api/AuthorizerIntegrationTest.scala      | 11 ++--
 .../scala/unit/kafka/server/RequestQuotaTest.scala |  7 +--
 18 files changed, 215 insertions(+), 261 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 450de06..495095a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -112,8 +112,6 @@ import org.apache.kafka.common.requests.OffsetFetchRequest;
 import org.apache.kafka.common.requests.OffsetFetchResponse;
 import org.apache.kafka.common.requests.RenewDelegationTokenRequest;
 import org.apache.kafka.common.requests.RenewDelegationTokenResponse;
-import org.apache.kafka.common.requests.Resource;
-import org.apache.kafka.common.requests.ResourceType;
 import org.apache.kafka.common.security.token.delegation.DelegationToken;
 import org.apache.kafka.common.security.token.delegation.TokenInformation;
 import org.apache.kafka.common.utils.AppInfoParser;
@@ -1683,19 +1681,19 @@ public class KafkaAdminClient extends AdminClient {
 
         // The BROKER resources which we want to describe.  We must make a separate DescribeConfigs
         // request for every BROKER resource we want to describe.
-        final Collection<Resource> brokerResources = new ArrayList<>();
+        final Collection<ConfigResource> brokerResources = new ArrayList<>();
 
         // The non-BROKER resources which we want to describe.  These resources can be described by a
         // single, unified DescribeConfigs request.
-        final Collection<Resource> unifiedRequestResources = new ArrayList<>(configResources.size());
+        final Collection<ConfigResource> unifiedRequestResources = new ArrayList<>(configResources.size());
 
         for (ConfigResource resource : configResources) {
             if (resource.type() == ConfigResource.Type.BROKER && !resource.isDefault()) {
-                brokerFutures.put(resource, new KafkaFutureImpl<Config>());
-                brokerResources.add(configResourceToResource(resource));
+                brokerFutures.put(resource, new KafkaFutureImpl<>());
+                brokerResources.add(resource);
             } else {
-                unifiedRequestFutures.put(resource, new KafkaFutureImpl<Config>());
-                unifiedRequestResources.add(configResourceToResource(resource));
+                unifiedRequestFutures.put(resource, new KafkaFutureImpl<>());
+                unifiedRequestResources.add(resource);
             }
         }
 
@@ -1716,7 +1714,7 @@ public class KafkaAdminClient extends AdminClient {
                     for (Map.Entry<ConfigResource, KafkaFutureImpl<Config>> entry : unifiedRequestFutures.entrySet()) {
                         ConfigResource configResource = entry.getKey();
                         KafkaFutureImpl<Config> future = entry.getValue();
-                        DescribeConfigsResponse.Config config = response.config(configResourceToResource(configResource));
+                        DescribeConfigsResponse.Config config = response.config(configResource);
                         if (config == null) {
                             future.completeExceptionally(new UnknownServerException(
                                 "Malformed broker response: missing config for " + configResource));
@@ -1746,7 +1744,7 @@ public class KafkaAdminClient extends AdminClient {
 
         for (Map.Entry<ConfigResource, KafkaFutureImpl<Config>> entry : brokerFutures.entrySet()) {
             final KafkaFutureImpl<Config> brokerFuture = entry.getValue();
-            final Resource resource = configResourceToResource(entry.getKey());
+            final ConfigResource resource = entry.getKey();
             final int nodeId = Integer.parseInt(resource.name());
             runnable.call(new Call("describeBrokerConfigs", calcDeadlineMs(now, options.timeoutMs()),
                     new ConstantNodeIdProvider(nodeId)) {
@@ -1792,21 +1790,6 @@ public class KafkaAdminClient extends AdminClient {
         return new DescribeConfigsResult(allFutures);
     }
 
-    private Resource configResourceToResource(ConfigResource configResource) {
-        ResourceType resourceType;
-        switch (configResource.type()) {
-            case TOPIC:
-                resourceType = ResourceType.TOPIC;
-                break;
-            case BROKER:
-                resourceType = ResourceType.BROKER;
-                break;
-            default:
-                throw new IllegalArgumentException("Unexpected resource type " + configResource.type());
-        }
-        return new Resource(resourceType, configResource.name());
-    }
-
     private List<ConfigEntry.ConfigSynonym> configSynonyms(DescribeConfigsResponse.ConfigEntry configEntry) {
         List<ConfigEntry.ConfigSynonym> synonyms = new ArrayList<>(configEntry.synonyms().size());
         for (DescribeConfigsResponse.ConfigSynonym synonym : configEntry.synonyms()) {
@@ -1856,7 +1839,7 @@ public class KafkaAdminClient extends AdminClient {
         }
         if (!unifiedRequestResources.isEmpty())
           allFutures.putAll(alterConfigs(configs, options, unifiedRequestResources, new LeastLoadedNodeProvider()));
-        return new AlterConfigsResult(new HashMap<ConfigResource, KafkaFuture<Void>>(allFutures));
+        return new AlterConfigsResult(new HashMap<>(allFutures));
     }
 
     private Map<ConfigResource, KafkaFutureImpl<Void>> alterConfigs(Map<ConfigResource, Config> configs,
@@ -1864,13 +1847,13 @@ public class KafkaAdminClient extends AdminClient {
                                                                     Collection<ConfigResource> resources,
                                                                     NodeProvider nodeProvider) {
         final Map<ConfigResource, KafkaFutureImpl<Void>> futures = new HashMap<>();
-        final Map<Resource, AlterConfigsRequest.Config> requestMap = new HashMap<>(resources.size());
+        final Map<ConfigResource, AlterConfigsRequest.Config> requestMap = new HashMap<>(resources.size());
         for (ConfigResource resource : resources) {
             List<AlterConfigsRequest.ConfigEntry> configEntries = new ArrayList<>();
             for (ConfigEntry configEntry: configs.get(resource).entries())
                 configEntries.add(new AlterConfigsRequest.ConfigEntry(configEntry.name(), configEntry.value()));
-            requestMap.put(configResourceToResource(resource), new AlterConfigsRequest.Config(configEntries));
-            futures.put(resource, new KafkaFutureImpl<Void>());
+            requestMap.put(resource, new AlterConfigsRequest.Config(configEntries));
+            futures.put(resource, new KafkaFutureImpl<>());
         }
 
         final long now = time.milliseconds();
@@ -1886,7 +1869,7 @@ public class KafkaAdminClient extends AdminClient {
                 AlterConfigsResponse response = (AlterConfigsResponse) abstractResponse;
                 for (Map.Entry<ConfigResource, KafkaFutureImpl<Void>> entry : futures.entrySet()) {
                     KafkaFutureImpl<Void> future = entry.getValue();
-                    ApiException exception = response.errors().get(configResourceToResource(entry.getKey())).exception();
+                    ApiException exception = response.errors().get(entry.getKey()).exception();
                     if (exception != null) {
                         future.completeExceptionally(exception);
                     } else {
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigResource.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigResource.java
index da718f5..d2ed4be 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigResource.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigResource.java
@@ -17,7 +17,12 @@
 
 package org.apache.kafka.common.config;
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
 import java.util.Objects;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 /**
  * A class representing resources that have configs.
@@ -28,7 +33,25 @@ public final class ConfigResource {
      * Type of resource.
      */
     public enum Type {
-        BROKER, TOPIC, UNKNOWN;
+        BROKER((byte) 3), TOPIC((byte) 2), UNKNOWN((byte) 0);
+
+        private static final Map<Byte, Type> TYPES = Collections.unmodifiableMap(
+            Arrays.stream(values()).collect(Collectors.toMap(Type::id, Function.identity()))
+        );
+
+        private final byte id;
+
+        Type(final byte id) {
+            this.id = id;
+        }
+
+        public byte id() {
+            return id;
+        }
+
+        public static Type forId(final byte id) {
+            return TYPES.getOrDefault(id, UNKNOWN);
+        }
     }
 
     private final Type type;
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
index 7dccc10..7183aed 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
@@ -111,6 +111,12 @@ public class Struct {
         return alternative;
     }
 
+    public Byte getOrElse(Field.Int8 field, byte alternative) {
+        if (hasField(field.name))
+            return getByte(field.name);
+        return alternative;
+    }
+
     public Integer getOrElse(Field.Int32 field, int alternative) {
         if (hasField(field.name))
             return getInt(field.name);
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java
index ed93b15..ff1d062 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.types.ArrayOf;
 import org.apache.kafka.common.protocol.types.Field;
@@ -29,6 +30,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 
 import static org.apache.kafka.common.protocol.types.Type.BOOLEAN;
 import static org.apache.kafka.common.protocol.types.Type.INT8;
@@ -73,7 +75,7 @@ public class AlterConfigsRequest extends AbstractRequest {
         private final Collection<ConfigEntry> entries;
 
         public Config(Collection<ConfigEntry> entries) {
-            this.entries = entries;
+            this.entries = Objects.requireNonNull(entries, "entries");
         }
 
         public Collection<ConfigEntry> entries() {
@@ -86,8 +88,8 @@ public class AlterConfigsRequest extends AbstractRequest {
         private final String value;
 
         public ConfigEntry(String name, String value) {
-            this.name = name;
-            this.value = value;
+            this.name = Objects.requireNonNull(name, "name");
+            this.value = Objects.requireNonNull(value, "value");
         }
 
         public String name() {
@@ -102,12 +104,12 @@ public class AlterConfigsRequest extends AbstractRequest {
 
     public static class Builder extends AbstractRequest.Builder {
 
-        private final Map<Resource, Config> configs;
+        private final Map<ConfigResource, Config> configs;
         private final boolean validateOnly;
 
-        public Builder(Map<Resource, Config> configs, boolean validateOnly) {
+        public Builder(Map<ConfigResource, Config> configs, boolean validateOnly) {
             super(ApiKeys.ALTER_CONFIGS);
-            this.configs = configs;
+            this.configs = Objects.requireNonNull(configs, "configs");
             this.validateOnly = validateOnly;
         }
 
@@ -117,12 +119,12 @@ public class AlterConfigsRequest extends AbstractRequest {
         }
     }
 
-    private final Map<Resource, Config> configs;
+    private final Map<ConfigResource, Config> configs;
     private final boolean validateOnly;
 
-    public AlterConfigsRequest(short version, Map<Resource, Config> configs, boolean validateOnly) {
+    public AlterConfigsRequest(short version, Map<ConfigResource, Config> configs, boolean validateOnly) {
         super(version);
-        this.configs = configs;
+        this.configs = Objects.requireNonNull(configs, "configs");
         this.validateOnly = validateOnly;
     }
 
@@ -134,9 +136,9 @@ public class AlterConfigsRequest extends AbstractRequest {
         for (Object resourcesObj : resourcesArray) {
             Struct resourcesStruct = (Struct) resourcesObj;
 
-            ResourceType resourceType = ResourceType.forId(resourcesStruct.getByte(RESOURCE_TYPE_KEY_NAME));
+            ConfigResource.Type resourceType = ConfigResource.Type.forId(resourcesStruct.getByte(RESOURCE_TYPE_KEY_NAME));
             String resourceName = resourcesStruct.getString(RESOURCE_NAME_KEY_NAME);
-            Resource resource = new Resource(resourceType, resourceName);
+            ConfigResource resource = new ConfigResource(resourceType, resourceName);
 
             Object[] configEntriesArray = resourcesStruct.getArray(CONFIG_ENTRIES_KEY_NAME);
             List<ConfigEntry> configEntries = new ArrayList<>(configEntriesArray.length);
@@ -151,7 +153,7 @@ public class AlterConfigsRequest extends AbstractRequest {
         }
     }
 
-    public Map<Resource, Config> configs() {
+    public Map<ConfigResource, Config> configs() {
         return configs;
     }
 
@@ -164,10 +166,10 @@ public class AlterConfigsRequest extends AbstractRequest {
         Struct struct = new Struct(ApiKeys.ALTER_CONFIGS.requestSchema(version()));
         struct.set(VALIDATE_ONLY_KEY_NAME, validateOnly);
         List<Struct> resourceStructs = new ArrayList<>(configs.size());
-        for (Map.Entry<Resource, Config> entry : configs.entrySet()) {
+        for (Map.Entry<ConfigResource, Config> entry : configs.entrySet()) {
             Struct resourceStruct = struct.instance(RESOURCES_KEY_NAME);
 
-            Resource resource = entry.getKey();
+            ConfigResource resource = entry.getKey();
             resourceStruct.set(RESOURCE_TYPE_KEY_NAME, resource.type().id());
             resourceStruct.set(RESOURCE_NAME_KEY_NAME, resource.name());
 
@@ -194,8 +196,8 @@ public class AlterConfigsRequest extends AbstractRequest {
             case 0:
             case 1:
                 ApiError error = ApiError.fromThrowable(e);
-                Map<Resource, ApiError> errors = new HashMap<>(configs.size());
-                for (Resource resource : configs.keySet())
+                Map<ConfigResource, ApiError> errors = new HashMap<>(configs.size());
+                for (ConfigResource resource : configs.keySet())
                     errors.put(resource, error);
                 return new AlterConfigsResponse(throttleTimeMs, errors);
             default:
@@ -207,5 +209,4 @@ public class AlterConfigsRequest extends AbstractRequest {
     public static AlterConfigsRequest parse(ByteBuffer buffer, short version) {
         return new AlterConfigsRequest(ApiKeys.ALTER_CONFIGS.parseRequest(version, buffer), version);
     }
-
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java
index feb694b..bf805df 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java
@@ -17,11 +17,12 @@
 
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.ArrayOf;
 import org.apache.kafka.common.protocol.types.Field;
 import org.apache.kafka.common.protocol.types.Schema;
-import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
@@ -29,6 +30,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 
 import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
 import static org.apache.kafka.common.protocol.CommonFields.ERROR_MESSAGE;
@@ -62,12 +64,11 @@ public class AlterConfigsResponse extends AbstractResponse {
     }
 
     private final int throttleTimeMs;
-    private final Map<Resource, ApiError> errors;
+    private final Map<ConfigResource, ApiError> errors;
 
-    public AlterConfigsResponse(int throttleTimeMs, Map<Resource, ApiError> errors) {
+    public AlterConfigsResponse(int throttleTimeMs, Map<ConfigResource, ApiError> errors) {
         this.throttleTimeMs = throttleTimeMs;
-        this.errors = errors;
-
+        this.errors = Objects.requireNonNull(errors, "errors");
     }
 
     public AlterConfigsResponse(Struct struct) {
@@ -77,13 +78,13 @@ public class AlterConfigsResponse extends AbstractResponse {
         for (Object resourceObj : resourcesArray) {
             Struct resourceStruct = (Struct) resourceObj;
             ApiError error = new ApiError(resourceStruct);
-            ResourceType resourceType = ResourceType.forId(resourceStruct.getByte(RESOURCE_TYPE_KEY_NAME));
+            ConfigResource.Type resourceType = ConfigResource.Type.forId(resourceStruct.getByte(RESOURCE_TYPE_KEY_NAME));
             String resourceName = resourceStruct.getString(RESOURCE_NAME_KEY_NAME);
-            errors.put(new Resource(resourceType, resourceName), error);
+            errors.put(new ConfigResource(resourceType, resourceName), error);
         }
     }
 
-    public Map<Resource, ApiError> errors() {
+    public Map<ConfigResource, ApiError> errors() {
         return errors;
     }
 
@@ -102,9 +103,9 @@ public class AlterConfigsResponse extends AbstractResponse {
         Struct struct = new Struct(ApiKeys.ALTER_CONFIGS.responseSchema(version));
         struct.set(THROTTLE_TIME_MS, throttleTimeMs);
         List<Struct> resourceStructs = new ArrayList<>(errors.size());
-        for (Map.Entry<Resource, ApiError> entry : errors.entrySet()) {
+        for (Map.Entry<ConfigResource, ApiError> entry : errors.entrySet()) {
             Struct resourceStruct = struct.instance(RESOURCES_KEY_NAME);
-            Resource resource = entry.getKey();
+            ConfigResource resource = entry.getKey();
             entry.getValue().write(resourceStruct);
             resourceStruct.set(RESOURCE_TYPE_KEY_NAME, resource.type().id());
             resourceStruct.set(RESOURCE_NAME_KEY_NAME, resource.name());
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java
index 72bb112..781cd45 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.types.ArrayOf;
 import org.apache.kafka.common.protocol.types.Field;
@@ -29,6 +30,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 
 import static org.apache.kafka.common.protocol.types.Type.BOOLEAN;
 import static org.apache.kafka.common.protocol.types.Type.INT8;
@@ -64,12 +66,12 @@ public class DescribeConfigsRequest extends AbstractRequest {
     }
 
     public static class Builder extends AbstractRequest.Builder {
-        private final Map<Resource, Collection<String>> resourceToConfigNames;
+        private final Map<ConfigResource, Collection<String>> resourceToConfigNames;
         private boolean includeSynonyms;
 
-        public Builder(Map<Resource, Collection<String>> resourceToConfigNames) {
+        public Builder(Map<ConfigResource, Collection<String>> resourceToConfigNames) {
             super(ApiKeys.DESCRIBE_CONFIGS);
-            this.resourceToConfigNames = resourceToConfigNames;
+            this.resourceToConfigNames = Objects.requireNonNull(resourceToConfigNames, "resourceToConfigNames");
         }
 
         public Builder includeSynonyms(boolean includeSynonyms) {
@@ -77,13 +79,13 @@ public class DescribeConfigsRequest extends AbstractRequest {
             return this;
         }
 
-        public Builder(Collection<Resource> resources) {
+        public Builder(Collection<ConfigResource> resources) {
             this(toResourceToConfigNames(resources));
         }
 
-        private static Map<Resource, Collection<String>> toResourceToConfigNames(Collection<Resource> resources) {
-            Map<Resource, Collection<String>> result = new HashMap<>(resources.size());
-            for (Resource resource : resources)
+        private static Map<ConfigResource, Collection<String>> toResourceToConfigNames(Collection<ConfigResource> resources) {
+            Map<ConfigResource, Collection<String>> result = new HashMap<>(resources.size());
+            for (ConfigResource resource : resources)
                 result.put(resource, null);
             return result;
         }
@@ -94,12 +96,12 @@ public class DescribeConfigsRequest extends AbstractRequest {
         }
     }
 
-    private final Map<Resource, Collection<String>> resourceToConfigNames;
+    private final Map<ConfigResource, Collection<String>> resourceToConfigNames;
     private final boolean includeSynonyms;
 
-    public DescribeConfigsRequest(short version, Map<Resource, Collection<String>> resourceToConfigNames, boolean includeSynonyms) {
+    public DescribeConfigsRequest(short version, Map<ConfigResource, Collection<String>> resourceToConfigNames, boolean includeSynonyms) {
         super(version);
-        this.resourceToConfigNames = resourceToConfigNames;
+        this.resourceToConfigNames = Objects.requireNonNull(resourceToConfigNames, "resourceToConfigNames");
         this.includeSynonyms = includeSynonyms;
     }
 
@@ -109,7 +111,7 @@ public class DescribeConfigsRequest extends AbstractRequest {
         resourceToConfigNames = new HashMap<>(resourcesArray.length);
         for (Object resourceObj : resourcesArray) {
             Struct resourceStruct = (Struct) resourceObj;
-            ResourceType resourceType = ResourceType.forId(resourceStruct.getByte(RESOURCE_TYPE_KEY_NAME));
+            ConfigResource.Type resourceType = ConfigResource.Type.forId(resourceStruct.getByte(RESOURCE_TYPE_KEY_NAME));
             String resourceName = resourceStruct.getString(RESOURCE_NAME_KEY_NAME);
 
             Object[] configNamesArray = resourceStruct.getArray(CONFIG_NAMES_KEY_NAME);
@@ -120,19 +122,19 @@ public class DescribeConfigsRequest extends AbstractRequest {
                     configNames.add((String) configNameObj);
             }
 
-            resourceToConfigNames.put(new Resource(resourceType, resourceName), configNames);
+            resourceToConfigNames.put(new ConfigResource(resourceType, resourceName), configNames);
         }
         this.includeSynonyms = struct.hasField(INCLUDE_SYNONYMS) ? struct.getBoolean(INCLUDE_SYNONYMS) : false;
     }
 
-    public Collection<Resource> resources() {
+    public Collection<ConfigResource> resources() {
         return resourceToConfigNames.keySet();
     }
 
     /**
      * Return null if all config names should be returned.
      */
-    public Collection<String> configNames(Resource resource) {
+    public Collection<String> configNames(ConfigResource resource) {
         return resourceToConfigNames.get(resource);
     }
 
@@ -144,8 +146,8 @@ public class DescribeConfigsRequest extends AbstractRequest {
     protected Struct toStruct() {
         Struct struct = new Struct(ApiKeys.DESCRIBE_CONFIGS.requestSchema(version()));
         List<Struct> resourceStructs = new ArrayList<>(resources().size());
-        for (Map.Entry<Resource, Collection<String>> entry : resourceToConfigNames.entrySet()) {
-            Resource resource = entry.getKey();
+        for (Map.Entry<ConfigResource, Collection<String>> entry : resourceToConfigNames.entrySet()) {
+            ConfigResource resource = entry.getKey();
             Struct resourceStruct = struct.instance(RESOURCES_KEY_NAME);
             resourceStruct.set(RESOURCE_TYPE_KEY_NAME, resource.type().id());
             resourceStruct.set(RESOURCE_NAME_KEY_NAME, resource.name());
@@ -168,10 +170,10 @@ public class DescribeConfigsRequest extends AbstractRequest {
             case 1:
             case 2:
                 ApiError error = ApiError.fromThrowable(e);
-                Map<Resource, DescribeConfigsResponse.Config> errors = new HashMap<>(resources().size());
+                Map<ConfigResource, DescribeConfigsResponse.Config> errors = new HashMap<>(resources().size());
                 DescribeConfigsResponse.Config config = new DescribeConfigsResponse.Config(error,
-                        Collections.<DescribeConfigsResponse.ConfigEntry>emptyList());
-                for (Resource resource : resources())
+                        Collections.emptyList());
+                for (ConfigResource resource : resources())
                     errors.put(resource, config);
                 return new DescribeConfigsResponse(throttleTimeMs, errors);
             default:
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
index 9ae1b5e..51c35d5 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
@@ -17,11 +17,12 @@
 
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.ArrayOf;
 import org.apache.kafka.common.protocol.types.Field;
 import org.apache.kafka.common.protocol.types.Schema;
-import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
@@ -31,6 +32,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 
 import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
 import static org.apache.kafka.common.protocol.CommonFields.ERROR_MESSAGE;
@@ -114,8 +116,8 @@ public class DescribeConfigsResponse extends AbstractResponse {
         private final Collection<ConfigEntry> entries;
 
         public Config(ApiError error, Collection<ConfigEntry> entries) {
-            this.error = error;
-            this.entries = entries;
+            this.error = Objects.requireNonNull(error, "error");
+            this.entries = Objects.requireNonNull(entries, "entries");
         }
 
         public ApiError error() {
@@ -138,12 +140,12 @@ public class DescribeConfigsResponse extends AbstractResponse {
         public ConfigEntry(String name, String value, ConfigSource source, boolean isSensitive, boolean readOnly,
                            Collection<ConfigSynonym> synonyms) {
 
-            this.name = name;
+            this.name = Objects.requireNonNull(name, "name");
             this.value = value;
-            this.source = source;
+            this.source = Objects.requireNonNull(source, "source");
             this.isSensitive = isSensitive;
             this.readOnly = readOnly;
-            this.synonyms = synonyms;
+            this.synonyms = Objects.requireNonNull(synonyms, "synonyms");
         }
 
         public String name() {
@@ -201,9 +203,9 @@ public class DescribeConfigsResponse extends AbstractResponse {
         private final ConfigSource source;
 
         public ConfigSynonym(String name, String value, ConfigSource source) {
-            this.name = name;
+            this.name = Objects.requireNonNull(name, "name");
             this.value = value;
-            this.source = source;
+            this.source = Objects.requireNonNull(source, "source");
         }
 
         public String name() {
@@ -219,11 +221,11 @@ public class DescribeConfigsResponse extends AbstractResponse {
 
 
     private final int throttleTimeMs;
-    private final Map<Resource, Config> configs;
+    private final Map<ConfigResource, Config> configs;
 
-    public DescribeConfigsResponse(int throttleTimeMs, Map<Resource, Config> configs) {
+    public DescribeConfigsResponse(int throttleTimeMs, Map<ConfigResource, Config> configs) {
         this.throttleTimeMs = throttleTimeMs;
-        this.configs = configs;
+        this.configs = Objects.requireNonNull(configs, "configs");
     }
 
     public DescribeConfigsResponse(Struct struct) {
@@ -234,9 +236,9 @@ public class DescribeConfigsResponse extends AbstractResponse {
             Struct resourceStruct = (Struct) resourceObj;
 
             ApiError error = new ApiError(resourceStruct);
-            ResourceType resourceType = ResourceType.forId(resourceStruct.getByte(RESOURCE_TYPE_KEY_NAME));
+            ConfigResource.Type resourceType = ConfigResource.Type.forId(resourceStruct.getByte(RESOURCE_TYPE_KEY_NAME));
             String resourceName = resourceStruct.getString(RESOURCE_NAME_KEY_NAME);
-            Resource resource = new Resource(resourceType, resourceName);
+            ConfigResource resource = new ConfigResource(resourceType, resourceName);
 
             Object[] configEntriesArray = resourceStruct.getArray(CONFIG_ENTRIES_KEY_NAME);
             List<ConfigEntry> configEntries = new ArrayList<>(configEntriesArray.length);
@@ -287,11 +289,11 @@ public class DescribeConfigsResponse extends AbstractResponse {
         }
     }
 
-    public Map<Resource, Config> configs() {
+    public Map<ConfigResource, Config> configs() {
         return configs;
     }
 
-    public Config config(Resource resource) {
+    public Config config(ConfigResource resource) {
         return configs.get(resource);
     }
 
@@ -313,10 +315,10 @@ public class DescribeConfigsResponse extends AbstractResponse {
         Struct struct = new Struct(ApiKeys.DESCRIBE_CONFIGS.responseSchema(version));
         struct.set(THROTTLE_TIME_MS, throttleTimeMs);
         List<Struct> resourceStructs = new ArrayList<>(configs.size());
-        for (Map.Entry<Resource, Config> entry : configs.entrySet()) {
+        for (Map.Entry<ConfigResource, Config> entry : configs.entrySet()) {
             Struct resourceStruct = struct.instance(RESOURCES_KEY_NAME);
 
-            Resource resource = entry.getKey();
+            ConfigResource resource = entry.getKey();
             resourceStruct.set(RESOURCE_TYPE_KEY_NAME, resource.type().id());
             resourceStruct.set(RESOURCE_NAME_KEY_NAME, resource.name());
 
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java b/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java
index 76b2707..275fb16 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java
@@ -45,37 +45,29 @@ final class RequestUtils {
     static ResourcePattern resourcePatternromStructFields(Struct struct) {
         byte resourceType = struct.get(RESOURCE_TYPE);
         String name = struct.get(RESOURCE_NAME);
-        ResourceNameType resourceNameType = ResourceNameType.LITERAL;
-        if (struct.hasField(RESOURCE_NAME_TYPE)) {
-            resourceNameType = ResourceNameType.fromCode(struct.get(RESOURCE_NAME_TYPE));
-        }
+        ResourceNameType resourceNameType = ResourceNameType.fromCode(
+            struct.getOrElse(RESOURCE_NAME_TYPE, ResourceNameType.LITERAL.code()));
         return new ResourcePattern(ResourceType.fromCode(resourceType), name, resourceNameType);
     }
 
     static void resourcePatternSetStructFields(ResourcePattern pattern, Struct struct) {
         struct.set(RESOURCE_TYPE, pattern.resourceType().code());
         struct.set(RESOURCE_NAME, pattern.name());
-        if (struct.hasField(RESOURCE_NAME_TYPE)) {
-            struct.set(RESOURCE_NAME_TYPE, pattern.nameType().code());
-        }
+        struct.setIfExists(RESOURCE_NAME_TYPE, pattern.nameType().code());
     }
 
     static ResourcePatternFilter resourcePatternFilterFromStructFields(Struct struct) {
         byte resourceType = struct.get(RESOURCE_TYPE);
         String name = struct.get(RESOURCE_NAME_FILTER);
-        ResourceNameType resourceNameType = ResourceNameType.LITERAL;
-        if (struct.hasField(RESOURCE_NAME_TYPE_FILTER)) {
-            resourceNameType = ResourceNameType.fromCode(struct.get(RESOURCE_NAME_TYPE_FILTER));
-        }
+        ResourceNameType resourceNameType = ResourceNameType.fromCode(
+            struct.getOrElse(RESOURCE_NAME_TYPE_FILTER, ResourceNameType.LITERAL.code()));
         return new ResourcePatternFilter(ResourceType.fromCode(resourceType), name, resourceNameType);
     }
 
     static void resourcePatternFilterSetStructFields(ResourcePatternFilter patternFilter, Struct struct) {
         struct.set(RESOURCE_TYPE, patternFilter.resourceType().code());
         struct.set(RESOURCE_NAME_FILTER, patternFilter.name());
-        if (struct.hasField(RESOURCE_NAME_TYPE_FILTER)) {
-            struct.set(RESOURCE_NAME_TYPE_FILTER, patternFilter.nameType().code());
-        }
+        struct.setIfExists(RESOURCE_NAME_TYPE_FILTER, patternFilter.nameType().code());
     }
 
     static AccessControlEntry aceFromStructFields(Struct struct) {
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/Resource.java b/clients/src/main/java/org/apache/kafka/common/requests/Resource.java
deleted file mode 100644
index bd81466..0000000
--- a/clients/src/main/java/org/apache/kafka/common/requests/Resource.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.common.requests;
-
-public final class Resource {
-    private final ResourceType type;
-    private final String name;
-
-    public Resource(ResourceType type, String name) {
-        this.type = type;
-        this.name = name;
-    }
-
-    public ResourceType type() {
-        return type;
-    }
-
-    public String name() {
-        return name;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o)
-            return true;
-        if (o == null || getClass() != o.getClass())
-            return false;
-
-        Resource resource = (Resource) o;
-
-        return type == resource.type && name.equals(resource.name);
-    }
-
-    @Override
-    public int hashCode() {
-        int result = type.hashCode();
-        result = 31 * result + name.hashCode();
-        return result;
-    }
-
-    @Override
-    public String toString() {
-        return "Resource(type=" + type + ", name='" + name + "')";
-    }
-}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ResourceType.java b/clients/src/main/java/org/apache/kafka/common/requests/ResourceType.java
deleted file mode 100644
index 2c11772..0000000
--- a/clients/src/main/java/org/apache/kafka/common/requests/ResourceType.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.common.requests;
-
-public enum ResourceType {
-    UNKNOWN((byte) 0), ANY((byte) 1), TOPIC((byte) 2), GROUP((byte) 3), BROKER((byte) 4);
-
-    private static final ResourceType[] VALUES = values();
-
-    private final byte id;
-
-    ResourceType(byte id) {
-        this.id = id;
-    }
-
-    public byte id() {
-        return id;
-    }
-
-    public static ResourceType forId(byte id) {
-        if (id < 0)
-            throw new IllegalArgumentException("id should be positive, id: " + id);
-        if (id >= VALUES.length)
-            return UNKNOWN;
-        return VALUES[id];
-    }
-}
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 5cb6bbc..d2f9887 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -34,8 +34,6 @@ import org.apache.kafka.common.acl.AclBinding;
 import org.apache.kafka.common.acl.AclBindingFilter;
 import org.apache.kafka.common.acl.AclOperation;
 import org.apache.kafka.common.acl.AclPermissionType;
-import org.apache.kafka.common.resource.ResourcePattern;
-import org.apache.kafka.common.resource.ResourcePatternFilter;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
@@ -68,6 +66,8 @@ import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.requests.OffsetFetchResponse;
 import org.apache.kafka.common.resource.ResourceNameType;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.resource.ResourcePatternFilter;
 import org.apache.kafka.common.resource.ResourceType;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
@@ -98,8 +98,6 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import static java.util.Arrays.asList;
 import static java.util.Collections.singletonList;
-import static org.apache.kafka.common.requests.ResourceType.BROKER;
-import static org.apache.kafka.common.requests.ResourceType.TOPIC;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
@@ -680,9 +678,9 @@ public class KafkaAdminClientTest {
             // The next request should succeed.
             time.sleep(5000);
             env.kafkaClient().prepareResponse(new DescribeConfigsResponse(0,
-                Collections.singletonMap(new org.apache.kafka.common.requests.Resource(TOPIC, "foo"),
+                Collections.singletonMap(new ConfigResource(ConfigResource.Type.TOPIC, "foo"),
                     new DescribeConfigsResponse.Config(ApiError.NONE,
-                        Collections.<DescribeConfigsResponse.ConfigEntry>emptySet()))));
+                        Collections.emptySet()))));
             DescribeConfigsResult result2 = env.adminClient().describeConfigs(Collections.singleton(
                 new ConfigResource(ConfigResource.Type.TOPIC, "foo")));
             time.sleep(5000);
@@ -696,9 +694,8 @@ public class KafkaAdminClientTest {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
             env.kafkaClient().setNode(env.cluster().controller());
             env.kafkaClient().prepareResponse(new DescribeConfigsResponse(0,
-                Collections.singletonMap(new org.apache.kafka.common.requests.Resource(BROKER, "0"),
-                    new DescribeConfigsResponse.Config(ApiError.NONE,
-                        Collections.<DescribeConfigsResponse.ConfigEntry>emptySet()))));
+                Collections.singletonMap(new ConfigResource(ConfigResource.Type.BROKER, "0"),
+                    new DescribeConfigsResponse.Config(ApiError.NONE, Collections.emptySet()))));
             DescribeConfigsResult result2 = env.adminClient().describeConfigs(Collections.singleton(
                 new ConfigResource(ConfigResource.Type.BROKER, "0")));
             result2.all().get();
diff --git a/clients/src/test/java/org/apache/kafka/common/config/ConfigResourceTest.java b/clients/src/test/java/org/apache/kafka/common/config/ConfigResourceTest.java
new file mode 100644
index 0000000..6324f0e
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/config/ConfigResourceTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.config;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+
+public class ConfigResourceTest {
+    @Test
+    public void shouldGetTypeFromId() {
+        assertEquals(ConfigResource.Type.TOPIC, ConfigResource.Type.forId((byte) 2));
+        assertEquals(ConfigResource.Type.BROKER, ConfigResource.Type.forId((byte) 3));
+    }
+
+    @Test
+    public void shouldReturnUnknownForUnknownCode() {
+        assertEquals(ConfigResource.Type.UNKNOWN, ConfigResource.Type.forId((byte) -1));
+        assertEquals(ConfigResource.Type.UNKNOWN, ConfigResource.Type.forId((byte) 0));
+        assertEquals(ConfigResource.Type.UNKNOWN, ConfigResource.Type.forId((byte) 1));
+    }
+
+    @Test
+    public void shouldRoundTripEveryType() {
+        Arrays.stream(ConfigResource.Type.values()).forEach(type ->
+            assertEquals(type.toString(), type, ConfigResource.Type.forId(type.id())));
+    }
+}
\ No newline at end of file
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index f537f48..e09cc9b 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -25,8 +25,7 @@ import org.apache.kafka.common.acl.AclBinding;
 import org.apache.kafka.common.acl.AclBindingFilter;
 import org.apache.kafka.common.acl.AclOperation;
 import org.apache.kafka.common.acl.AclPermissionType;
-import org.apache.kafka.common.resource.ResourcePattern;
-import org.apache.kafka.common.resource.ResourcePatternFilter;
+import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.NotCoordinatorException;
@@ -48,6 +47,8 @@ import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse;
 import org.apache.kafka.common.requests.DeleteAclsResponse.AclDeletionResult;
 import org.apache.kafka.common.requests.DeleteAclsResponse.AclFilterResponse;
 import org.apache.kafka.common.resource.ResourceNameType;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.resource.ResourcePatternFilter;
 import org.apache.kafka.common.resource.ResourceType;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
@@ -300,7 +301,7 @@ public class RequestResponseTest {
     }
 
     private void verifyDescribeConfigsResponse(DescribeConfigsResponse expected, DescribeConfigsResponse actual, int version) throws Exception {
-        for (org.apache.kafka.common.requests.Resource resource : expected.configs().keySet()) {
+        for (ConfigResource resource : expected.configs().keySet()) {
             Collection<DescribeConfigsResponse.ConfigEntry> deserializedEntries1 = actual.config(resource).entries();
             Iterator<DescribeConfigsResponse.ConfigEntry> expectedEntries = expected.config(resource).entries().iterator();
             for (DescribeConfigsResponse.ConfigEntry entry : deserializedEntries1) {
@@ -1140,21 +1141,21 @@ public class RequestResponseTest {
 
     private DescribeConfigsRequest createDescribeConfigsRequest(int version) {
         return new DescribeConfigsRequest.Builder(asList(
-                new org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.BROKER, "0"),
-                new org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.TOPIC, "topic")))
+                new ConfigResource(ConfigResource.Type.BROKER, "0"),
+                new ConfigResource(ConfigResource.Type.TOPIC, "topic")))
                 .build((short) version);
     }
 
     private DescribeConfigsRequest createDescribeConfigsRequestWithConfigEntries(int version) {
-        Map<org.apache.kafka.common.requests.Resource, Collection<String>> resources = new HashMap<>();
-        resources.put(new org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.BROKER, "0"), asList("foo", "bar"));
-        resources.put(new org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.TOPIC, "topic"), null);
-        resources.put(new org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.TOPIC, "topic a"), Collections.<String>emptyList());
+        Map<ConfigResource, Collection<String>> resources = new HashMap<>();
+        resources.put(new ConfigResource(ConfigResource.Type.BROKER, "0"), asList("foo", "bar"));
+        resources.put(new ConfigResource(ConfigResource.Type.TOPIC, "topic"), null);
+        resources.put(new ConfigResource(ConfigResource.Type.TOPIC, "topic a"), Collections.<String>emptyList());
         return new DescribeConfigsRequest.Builder(resources).build((short) version);
     }
 
     private DescribeConfigsResponse createDescribeConfigsResponse() {
-        Map<org.apache.kafka.common.requests.Resource, DescribeConfigsResponse.Config> configs = new HashMap<>();
+        Map<ConfigResource, DescribeConfigsResponse.Config> configs = new HashMap<>();
         List<DescribeConfigsResponse.ConfigSynonym> synonyms = Collections.emptyList();
         List<DescribeConfigsResponse.ConfigEntry> configEntries = asList(
                 new DescribeConfigsResponse.ConfigEntry("config_name", "config_value",
@@ -1162,29 +1163,29 @@ public class RequestResponseTest {
                 new DescribeConfigsResponse.ConfigEntry("another_name", "another value",
                         DescribeConfigsResponse.ConfigSource.DEFAULT_CONFIG, false, true, synonyms)
         );
-        configs.put(new org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.BROKER, "0"), new DescribeConfigsResponse.Config(
+        configs.put(new ConfigResource(ConfigResource.Type.BROKER, "0"), new DescribeConfigsResponse.Config(
                 ApiError.NONE, configEntries));
-        configs.put(new org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.TOPIC, "topic"), new DescribeConfigsResponse.Config(
+        configs.put(new ConfigResource(ConfigResource.Type.TOPIC, "topic"), new DescribeConfigsResponse.Config(
                 ApiError.NONE, Collections.<DescribeConfigsResponse.ConfigEntry>emptyList()));
         return new DescribeConfigsResponse(200, configs);
     }
 
     private AlterConfigsRequest createAlterConfigsRequest() {
-        Map<org.apache.kafka.common.requests.Resource, AlterConfigsRequest.Config> configs = new HashMap<>();
+        Map<ConfigResource, AlterConfigsRequest.Config> configs = new HashMap<>();
         List<AlterConfigsRequest.ConfigEntry> configEntries = asList(
                 new AlterConfigsRequest.ConfigEntry("config_name", "config_value"),
                 new AlterConfigsRequest.ConfigEntry("another_name", "another value")
         );
-        configs.put(new org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.BROKER, "0"), new AlterConfigsRequest.Config(configEntries));
-        configs.put(new org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.TOPIC, "topic"),
+        configs.put(new ConfigResource(ConfigResource.Type.BROKER, "0"), new AlterConfigsRequest.Config(configEntries));
+        configs.put(new ConfigResource(ConfigResource.Type.TOPIC, "topic"),
                 new AlterConfigsRequest.Config(Collections.<AlterConfigsRequest.ConfigEntry>emptyList()));
         return new AlterConfigsRequest((short) 0, configs, false);
     }
 
     private AlterConfigsResponse createAlterConfigsResponse() {
-        Map<org.apache.kafka.common.requests.Resource, ApiError> errors = new HashMap<>();
-        errors.put(new org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.BROKER, "0"), ApiError.NONE);
-        errors.put(new org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.TOPIC, "topic"), new ApiError(Errors.INVALID_REQUEST, "This request is invalid"));
+        Map<ConfigResource, ApiError> errors = new HashMap<>();
+        errors.put(new ConfigResource(ConfigResource.Type.BROKER, "0"), ApiError.NONE);
+        errors.put(new ConfigResource(ConfigResource.Type.TOPIC, "topic"), new ApiError(Errors.INVALID_REQUEST, "This request is invalid"));
         return new AlterConfigsResponse(20, errors);
     }
 
diff --git a/core/src/main/scala/kafka/security/auth/Resource.scala b/core/src/main/scala/kafka/security/auth/Resource.scala
index f07a11c..303c642 100644
--- a/core/src/main/scala/kafka/security/auth/Resource.scala
+++ b/core/src/main/scala/kafka/security/auth/Resource.scala
@@ -64,7 +64,7 @@ case class Resource(resourceType: ResourceType, name: String, nameType: Resource
     * @param name         non-null resource name
     * @deprecated Since 2.0, use [[kafka.security.auth.Resource(ResourceType, String, ResourceNameType)]]
     */
-  @deprecated("Use Resource(ResourceType, String, ResourceNameType")
+  @deprecated("Use Resource(ResourceType, String, ResourceNameType)", "Since 2.0")
   def this(resourceType: ResourceType, name: String) {
     this(resourceType, name, ResourceNameType.LITERAL)
   }
diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala
index 01457a1..e9598e3 100644
--- a/core/src/main/scala/kafka/server/AdminManager.scala
+++ b/core/src/main/scala/kafka/server/AdminManager.scala
@@ -33,7 +33,7 @@ import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.CreateTopicsRequest._
 import org.apache.kafka.common.requests.DescribeConfigsResponse.ConfigSource
-import org.apache.kafka.common.requests.{AlterConfigsRequest, ApiError, DescribeConfigsResponse, Resource, ResourceType}
+import org.apache.kafka.common.requests.{AlterConfigsRequest, ApiError, DescribeConfigsResponse}
 import org.apache.kafka.server.policy.{AlterConfigPolicy, CreateTopicPolicy}
 import org.apache.kafka.server.policy.CreateTopicPolicy.RequestMetadata
 
@@ -281,7 +281,7 @@ class AdminManager(val config: KafkaConfig,
     }
   }
 
-  def describeConfigs(resourceToConfigNames: Map[Resource, Option[Set[String]]], includeSynonyms: Boolean): Map[Resource, DescribeConfigsResponse.Config] = {
+  def describeConfigs(resourceToConfigNames: Map[ConfigResource, Option[Set[String]]], includeSynonyms: Boolean): Map[ConfigResource, DescribeConfigsResponse.Config] = {
     resourceToConfigNames.map { case (resource, configNames) =>
 
       def allConfigs(config: AbstractConfig) = {
@@ -301,7 +301,7 @@ class AdminManager(val config: KafkaConfig,
       try {
         val resourceConfig = resource.`type` match {
 
-          case ResourceType.TOPIC =>
+          case ConfigResource.Type.TOPIC =>
             val topic = resource.name
             Topic.validate(topic)
             if (metadataCache.contains(topic)) {
@@ -313,7 +313,7 @@ class AdminManager(val config: KafkaConfig,
               new DescribeConfigsResponse.Config(new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, null), Collections.emptyList[DescribeConfigsResponse.ConfigEntry])
             }
 
-          case ResourceType.BROKER =>
+          case ConfigResource.Type.BROKER =>
             if (resource.name == null || resource.name.isEmpty)
               createResponseConfig(config.dynamicConfig.currentDynamicDefaultConfigs,
                 createBrokerConfigEntry(perBrokerConfig = false, includeSynonyms))
@@ -339,7 +339,7 @@ class AdminManager(val config: KafkaConfig,
     }.toMap
   }
 
-  def alterConfigs(configs: Map[Resource, AlterConfigsRequest.Config], validateOnly: Boolean): Map[Resource, ApiError] = {
+  def alterConfigs(configs: Map[ConfigResource, AlterConfigsRequest.Config], validateOnly: Boolean): Map[ConfigResource, ApiError] = {
     configs.map { case (resource, config) =>
 
       def validateConfigPolicy(resourceType: ConfigResource.Type): Unit = {
@@ -353,7 +353,7 @@ class AdminManager(val config: KafkaConfig,
       }
       try {
         resource.`type` match {
-          case ResourceType.TOPIC =>
+          case ConfigResource.Type.TOPIC =>
             val topic = resource.name
 
             val properties = new Properties
@@ -368,7 +368,7 @@ class AdminManager(val config: KafkaConfig,
 
             resource -> ApiError.NONE
 
-          case ResourceType.BROKER =>
+          case ConfigResource.Type.BROKER =>
             val brokerId = if (resource.name == null || resource.name.isEmpty)
               None
             else {
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 874a209..ae7845b 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -39,6 +39,7 @@ import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
 import kafka.utils.{CoreUtils, Logging}
 import kafka.zk.{AdminZkClient, KafkaZkClient}
 import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding}
+import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.FatalExitError
 import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME, isInternal}
@@ -51,7 +52,7 @@ import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse
 import org.apache.kafka.common.requests.DeleteAclsResponse.{AclDeletionResult, AclFilterResponse}
 import org.apache.kafka.common.requests.DescribeLogDirsResponse.LogDirInfo
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
-import org.apache.kafka.common.requests.{Resource => RResource, ResourceType => RResourceType, _}
+import org.apache.kafka.common.requests._
 import org.apache.kafka.common.resource.ResourceNameType.LITERAL
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation}
@@ -2038,9 +2039,9 @@ class KafkaApis(val requestChannel: RequestChannel,
     val alterConfigsRequest = request.body[AlterConfigsRequest]
     val (authorizedResources, unauthorizedResources) = alterConfigsRequest.configs.asScala.partition { case (resource, _) =>
       resource.`type` match {
-        case RResourceType.BROKER =>
+        case ConfigResource.Type.BROKER =>
           authorize(request.session, AlterConfigs, Resource.ClusterResource)
-        case RResourceType.TOPIC =>
+        case ConfigResource.Type.TOPIC =>
           authorize(request.session, AlterConfigs, Resource(Topic, resource.name, LITERAL))
         case rt => throw new InvalidRequestException(s"Unexpected resource type $rt")
       }
@@ -2053,10 +2054,10 @@ class KafkaApis(val requestChannel: RequestChannel,
       new AlterConfigsResponse(requestThrottleMs, (authorizedResult ++ unauthorizedResult).asJava))
   }
 
-  private def configsAuthorizationApiError(session: RequestChannel.Session, resource: RResource): ApiError = {
+  private def configsAuthorizationApiError(session: RequestChannel.Session, resource: ConfigResource): ApiError = {
     val error = resource.`type` match {
-      case RResourceType.BROKER => Errors.CLUSTER_AUTHORIZATION_FAILED
-      case RResourceType.TOPIC => Errors.TOPIC_AUTHORIZATION_FAILED
+      case ConfigResource.Type.BROKER => Errors.CLUSTER_AUTHORIZATION_FAILED
+      case ConfigResource.Type.TOPIC => Errors.TOPIC_AUTHORIZATION_FAILED
       case rt => throw new InvalidRequestException(s"Unexpected resource type $rt for resource ${resource.name}")
     }
     new ApiError(error, null)
@@ -2066,8 +2067,8 @@ class KafkaApis(val requestChannel: RequestChannel,
     val describeConfigsRequest = request.body[DescribeConfigsRequest]
     val (authorizedResources, unauthorizedResources) = describeConfigsRequest.resources.asScala.partition { resource =>
       resource.`type` match {
-        case RResourceType.BROKER => authorize(request.session, DescribeConfigs, Resource.ClusterResource)
-        case RResourceType.TOPIC =>
+        case ConfigResource.Type.BROKER => authorize(request.session, DescribeConfigs, Resource.ClusterResource)
+        case ConfigResource.Type.TOPIC =>
           authorize(request.session, DescribeConfigs, Resource(Topic, resource.name, LITERAL))
         case rt => throw new InvalidRequestException(s"Unexpected resource type $rt for resource ${resource.name}")
       }
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index b48a349..c1923dc 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -32,6 +32,7 @@ import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
 import org.apache.kafka.clients.producer._
 import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
+import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.Topic.GROUP_METADATA_TOPIC_NAME
 import org.apache.kafka.common.network.ListenerName
@@ -39,7 +40,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Records, SimpleRecord}
 import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation
 import org.apache.kafka.common.requests.CreateTopicsRequest.TopicDetails
-import org.apache.kafka.common.requests.{Resource => RResource, ResourceType => RResourceType, _}
+import org.apache.kafka.common.requests._
 import org.apache.kafka.common.resource.ResourceNameType.LITERAL
 import org.apache.kafka.common.resource.{ResourcePattern, ResourcePatternFilter, ResourceType => AdminResourceType}
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
@@ -175,9 +176,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     ApiKeys.DELETE_RECORDS -> ((resp: requests.DeleteRecordsResponse) => resp.responses.get(deleteRecordsPartition).error),
     ApiKeys.OFFSET_FOR_LEADER_EPOCH -> ((resp: OffsetsForLeaderEpochResponse) => resp.responses.get(tp).error),
     ApiKeys.DESCRIBE_CONFIGS -> ((resp: DescribeConfigsResponse) =>
-      resp.configs.get(new RResource(RResourceType.TOPIC, tp.topic)).error.error),
+      resp.configs.get(new ConfigResource(ConfigResource.Type.TOPIC, tp.topic)).error.error),
     ApiKeys.ALTER_CONFIGS -> ((resp: AlterConfigsResponse) =>
-      resp.errors.get(new RResource(RResourceType.TOPIC, tp.topic)).error),
+      resp.errors.get(new ConfigResource(ConfigResource.Type.TOPIC, tp.topic)).error),
     ApiKeys.INIT_PRODUCER_ID -> ((resp: InitProducerIdResponse) => resp.error),
     ApiKeys.WRITE_TXN_MARKERS -> ((resp: WriteTxnMarkersResponse) => resp.errors(producerId).get(tp)),
     ApiKeys.ADD_PARTITIONS_TO_TXN -> ((resp: AddPartitionsToTxnResponse) => resp.errors.get(tp)),
@@ -366,11 +367,11 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   private def deleteRecordsRequest = new DeleteRecordsRequest.Builder(5000, Collections.singletonMap(deleteRecordsPartition, 0L)).build()
 
   private def describeConfigsRequest =
-    new DescribeConfigsRequest.Builder(Collections.singleton(new RResource(RResourceType.TOPIC, tp.topic))).build()
+    new DescribeConfigsRequest.Builder(Collections.singleton(new ConfigResource(ConfigResource.Type.TOPIC, tp.topic))).build()
 
   private def alterConfigsRequest =
     new AlterConfigsRequest.Builder(
-      Collections.singletonMap(new RResource(RResourceType.TOPIC, tp.topic),
+      Collections.singletonMap(new ConfigResource(ConfigResource.Type.TOPIC, tp.topic),
         new AlterConfigsRequest.Config(Collections.singleton(
           new AlterConfigsRequest.ConfigEntry(LogConfig.MaxMessageBytesProp, "1000000")
         ))), true).build()
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index f56b3b4..f503546 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -24,6 +24,7 @@ import kafka.security.auth._
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.admin.NewPartitions
 import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
+import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.resource.{ResourcePattern, ResourcePatternFilter, ResourceType => AdminResourceType}
 import org.apache.kafka.common.{Node, TopicPartition}
 import org.apache.kafka.common.metrics.{KafkaMetric, Quota, Sensor}
@@ -33,7 +34,7 @@ import org.apache.kafka.common.protocol.types.Struct
 import org.apache.kafka.common.resource.ResourceNameType
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation
-import org.apache.kafka.common.requests.{Resource => RResource, ResourceType => RResourceType, _}
+import org.apache.kafka.common.requests._
 import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal, KafkaPrincipalBuilder, SecurityProtocol}
 import org.apache.kafka.common.utils.Sanitizer
 import org.apache.kafka.common.utils.SecurityUtils
@@ -322,11 +323,11 @@ class RequestQuotaTest extends BaseRequestTest {
             new AccessControlEntryFilter("User:ANONYMOUS", "*", AclOperation.ANY, AclPermissionType.DENY))))
 
         case ApiKeys.DESCRIBE_CONFIGS =>
-          new DescribeConfigsRequest.Builder(Collections.singleton(new RResource(RResourceType.TOPIC, tp.topic)))
+          new DescribeConfigsRequest.Builder(Collections.singleton(new ConfigResource(ConfigResource.Type.TOPIC, tp.topic)))
 
         case ApiKeys.ALTER_CONFIGS =>
           new AlterConfigsRequest.Builder(
-            Collections.singletonMap(new RResource(RResourceType.TOPIC, tp.topic),
+            Collections.singletonMap(new ConfigResource(ConfigResource.Type.TOPIC, tp.topic),
               new AlterConfigsRequest.Config(Collections.singleton(
                 new AlterConfigsRequest.ConfigEntry(LogConfig.MaxMessageBytesProp, "1000000")
               ))), true)

-- 
To stop receiving notification emails like this one, please contact
junrao@apache.org.

Mime
View raw message