kafka-commits mailing list archives

From cmcc...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7832: Use automatic RPC generation in CreateTopics (#5972)
Date Mon, 04 Feb 2019 18:40:01 GMT
This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e2e8bdb  KAFKA-7832: Use automatic RPC generation in CreateTopics (#5972)
e2e8bdb is described below

commit e2e8bdbd8cb6ca2ac962c72147d21a9e8b9ba2c0
Author: Colin Patrick McCabe <colin@cmccabe.xyz>
AuthorDate: Mon Feb 4 10:39:43 2019 -0800

    KAFKA-7832: Use automatic RPC generation in CreateTopics (#5972)
    
    Reviewers: Jun Rao <junrao@gmail.com>, Tom Bentley <tbentley@redhat.com>, Boyang Chen <bchen11@outlook.com>
---
 .../kafka/clients/admin/KafkaAdminClient.java      |  29 +-
 .../org/apache/kafka/clients/admin/NewTopic.java   |  33 ++-
 .../org/apache/kafka/common/protocol/ApiKeys.java  |   6 +-
 .../kafka/common/requests/AbstractResponse.java    |   2 +-
 .../kafka/common/requests/CreateTopicsRequest.java | 315 ++-------------------
 .../common/requests/CreateTopicsResponse.java      | 100 ++-----
 .../common/message/CreateTopicsRequest.json        |   4 +-
 .../kafka/clients/admin/KafkaAdminClientTest.java  |  29 +-
 .../apache/kafka/common/message/MessageTest.java   |   3 +-
 .../kafka/common/requests/RequestContextTest.java  |   2 +-
 .../kafka/common/requests/RequestResponseTest.java |  54 ++--
 .../apache/kafka/connect/util/TopicAdminTest.java  |  12 +-
 .../src/main/scala/kafka/server/AdminManager.scala |  97 ++++---
 core/src/main/scala/kafka/server/KafkaApis.scala   |  89 +++---
 .../kafka/api/AuthorizerIntegrationTest.scala      |  10 +-
 .../server/AbstractCreateTopicsRequestTest.scala   | 116 ++++++--
 .../kafka/server/CreateTopicsRequestTest.scala     | 142 ++++------
 .../server/CreateTopicsRequestWithPolicyTest.scala |  78 ++---
 .../scala/unit/kafka/server/RequestQuotaTest.scala |  14 +-
 19 files changed, 461 insertions(+), 674 deletions(-)
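
Before the diff itself: this commit swaps the hand-written CreateTopicsRequest/Response Struct code for classes generated from CreateTopicsRequest.json. The following sketch shows how a caller assembles the new generated request data; the class and method names (CreateTopicsRequestData, CreatableTopic, CreatableReplicaAssignment, CreateableTopicConfig, CreateTopicsRequest.Builder) are taken from the patch below, but the example class itself is illustrative and not part of the commit.

```java
import java.util.Arrays;

import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreateableTopicConfig;
import org.apache.kafka.common.requests.CreateTopicsRequest;

// Hypothetical example class; mirrors the usage in KafkaAdminClient and RequestResponseTest below.
public class CreateTopicsExample {
    public static CreateTopicsRequest buildExampleRequest() {
        // Request-level fields (timeout, validateOnly) live on the generated data object.
        CreateTopicsRequestData data = new CreateTopicsRequestData().
            setTimeoutMs(60000).
            setValidateOnly(false);

        // One topic with an explicit replica assignment and a config override;
        // -1 marks numPartitions/replicationFactor as unset when assignments are given.
        CreatableTopic topic = new CreatableTopic().
            setName("my-topic").
            setNumPartitions(-1).
            setReplicationFactor((short) -1);
        topic.assignments().add(new CreatableReplicaAssignment().
            setPartitionIndex(0).
            setBrokerIds(Arrays.asList(1, 2, 3)));
        topic.configs().add(new CreateableTopicConfig().
            setName("cleanup.policy").
            setValue("compact"));
        data.topics().add(topic);

        // The request Builder now wraps the generated data object directly.
        return new CreateTopicsRequest.Builder(data).build((short) 3);
    }
}
```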

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 58baab7..1567d90 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -59,6 +59,9 @@ import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicSet;
+import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
 import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
@@ -1252,7 +1255,7 @@ public class KafkaAdminClient extends AdminClient {
     public CreateTopicsResult createTopics(final Collection<NewTopic> newTopics,
                                            final CreateTopicsOptions options) {
         final Map<String, KafkaFutureImpl<Void>> topicFutures = new HashMap<>(newTopics.size());
-        final Map<String, CreateTopicsRequest.TopicDetails> topicsMap = new HashMap<>(newTopics.size());
+        final CreatableTopicSet topics = new CreatableTopicSet();
         for (NewTopic newTopic : newTopics) {
             if (topicNameIsUnrepresentable(newTopic.name())) {
                 KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
@@ -1261,7 +1264,7 @@ public class KafkaAdminClient extends AdminClient {
                 topicFutures.put(newTopic.name(), future);
             } else if (!topicFutures.containsKey(newTopic.name())) {
                 topicFutures.put(newTopic.name(), new KafkaFutureImpl<>());
-                topicsMap.put(newTopic.name(), newTopic.convertToTopicDetails());
+                topics.add(newTopic.convertToCreatableTopic());
             }
         }
         final long now = time.milliseconds();
@@ -1270,27 +1273,33 @@ public class KafkaAdminClient extends AdminClient {
 
             @Override
             public AbstractRequest.Builder createRequest(int timeoutMs) {
-                return new CreateTopicsRequest.Builder(topicsMap, timeoutMs, options.shouldValidateOnly());
+                return new CreateTopicsRequest.Builder(
+                    new CreateTopicsRequestData().
+                        setTopics(topics).
+                        setTimeoutMs(timeoutMs).
+                        setValidateOnly(options.shouldValidateOnly()));
             }
 
             @Override
             public void handleResponse(AbstractResponse abstractResponse) {
                 CreateTopicsResponse response = (CreateTopicsResponse) abstractResponse;
                 // Check for controller change
-                for (ApiError error : response.errors().values()) {
-                    if (error.error() == Errors.NOT_CONTROLLER) {
+                for (Errors error : response.errorCounts().keySet()) {
+                    if (error == Errors.NOT_CONTROLLER) {
                         metadataManager.clearController();
                         metadataManager.requestUpdate();
                         throw error.exception();
                     }
                 }
                 // Handle server responses for particular topics.
-                for (Map.Entry<String, ApiError> entry : response.errors().entrySet()) {
-                    KafkaFutureImpl<Void> future = topicFutures.get(entry.getKey());
+                for (CreatableTopicResult result : response.data().topics()) {
+                    KafkaFutureImpl<Void> future = topicFutures.get(result.name());
                     if (future == null) {
-                        log.warn("Server response mentioned unknown topic {}", entry.getKey());
+                        log.warn("Server response mentioned unknown topic {}", result.name());
                     } else {
-                        ApiException exception = entry.getValue().exception();
+                        ApiError error = new ApiError(
+                            Errors.forCode(result.errorCode()), result.errorMessage());
+                        ApiException exception = error.exception();
                         if (exception != null) {
                             future.completeExceptionally(exception);
                         } else {
@@ -1313,7 +1322,7 @@ public class KafkaAdminClient extends AdminClient {
                 completeAllExceptionally(topicFutures.values(), throwable);
             }
         };
-        if (!topicsMap.isEmpty()) {
+        if (!topics.isEmpty()) {
             runnable.call(call, now);
         }
         return new CreateTopicsResult(new HashMap<>(topicFutures));
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/NewTopic.java b/clients/src/main/java/org/apache/kafka/clients/admin/NewTopic.java
index c4bc218..5b1bd32 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/NewTopic.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/NewTopic.java
@@ -17,12 +17,15 @@
 
 package org.apache.kafka.clients.admin;
 
-import org.apache.kafka.common.requests.CreateTopicsRequest.TopicDetails;
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment;
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreateableTopicConfig;
 
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 
 /**
  * A new topic to be created via {@link AdminClient#createTopics(Collection)}.
@@ -105,20 +108,28 @@ public class NewTopic {
         return configs;
     }
 
-    TopicDetails convertToTopicDetails() {
+    CreatableTopic convertToCreatableTopic() {
+        CreatableTopic creatableTopic = new CreatableTopic().
+            setName(name).
+            setNumPartitions(numPartitions).
+            setReplicationFactor(replicationFactor);
         if (replicasAssignments != null) {
-            if (configs != null) {
-                return new TopicDetails(replicasAssignments, configs);
-            } else {
-                return new TopicDetails(replicasAssignments);
+            for (Entry<Integer, List<Integer>> entry : replicasAssignments.entrySet()) {
+                creatableTopic.assignments().add(
+                    new CreatableReplicaAssignment().
+                        setPartitionIndex(entry.getKey()).
+                        setBrokerIds(entry.getValue()));
             }
-        } else {
-            if (configs != null) {
-                return new TopicDetails(numPartitions, replicationFactor, configs);
-            } else {
-                return new TopicDetails(numPartitions, replicationFactor);
+        }
+        if (configs != null) {
+            for (Entry<String, String> entry : configs.entrySet()) {
+                creatableTopic.configs().add(
+                    new CreateableTopicConfig().
+                        setName(entry.getKey()).
+                        setValue(entry.getValue()));
             }
         }
+        return creatableTopic;
     }
 
     @Override
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 13e1483..1acd4e1 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.common.protocol;
 
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import org.apache.kafka.common.message.CreateTopicsResponseData;
 import org.apache.kafka.common.message.ElectPreferredLeadersRequestData;
 import org.apache.kafka.common.message.ElectPreferredLeadersResponseData;
 import org.apache.kafka.common.message.LeaveGroupRequestData;
@@ -43,8 +45,6 @@ import org.apache.kafka.common.requests.CreateDelegationTokenRequest;
 import org.apache.kafka.common.requests.CreateDelegationTokenResponse;
 import org.apache.kafka.common.requests.CreatePartitionsRequest;
 import org.apache.kafka.common.requests.CreatePartitionsResponse;
-import org.apache.kafka.common.requests.CreateTopicsRequest;
-import org.apache.kafka.common.requests.CreateTopicsResponse;
 import org.apache.kafka.common.requests.DeleteAclsRequest;
 import org.apache.kafka.common.requests.DeleteAclsResponse;
 import org.apache.kafka.common.requests.DeleteGroupsRequest;
@@ -152,7 +152,7 @@ public enum ApiKeys {
             return parseResponse(version, buffer, (short) 0);
         }
     },
-    CREATE_TOPICS(19, "CreateTopics", CreateTopicsRequest.schemaVersions(), CreateTopicsResponse.schemaVersions()),
+    CREATE_TOPICS(19, "CreateTopics", CreateTopicsRequestData.SCHEMAS, CreateTopicsResponseData.SCHEMAS),
     DELETE_TOPICS(20, "DeleteTopics", DeleteTopicsRequest.schemaVersions(), DeleteTopicsResponse.schemaVersions()),
     DELETE_RECORDS(21, "DeleteRecords", DeleteRecordsRequest.schemaVersions(), DeleteRecordsResponse.schemaVersions()),
     INIT_PRODUCER_ID(22, "InitProducerId", InitProducerIdRequest.schemaVersions(),
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index 00a044b..d9bb69e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -109,7 +109,7 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
             case API_VERSIONS:
                 return new ApiVersionsResponse(struct);
             case CREATE_TOPICS:
-                return new CreateTopicsResponse(struct);
+                return new CreateTopicsResponse(struct, version);
             case DELETE_TOPICS:
                 return new DeleteTopicsResponse(struct);
             case DELETE_RECORDS:
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
index 16d4c97..93f7ab2 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
@@ -18,286 +18,73 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.types.ArrayOf;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
+import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
-import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
-import static org.apache.kafka.common.protocol.types.Type.BOOLEAN;
-import static org.apache.kafka.common.protocol.types.Type.INT16;
-import static org.apache.kafka.common.protocol.types.Type.INT32;
-import static org.apache.kafka.common.protocol.types.Type.NULLABLE_STRING;
-import static org.apache.kafka.common.protocol.types.Type.STRING;
 
 public class CreateTopicsRequest extends AbstractRequest {
-    private static final String REQUESTS_KEY_NAME = "create_topic_requests";
-
-    private static final String TIMEOUT_KEY_NAME = "timeout";
-    private static final String VALIDATE_ONLY_KEY_NAME = "validate_only";
-    private static final String NUM_PARTITIONS_KEY_NAME = "num_partitions";
-    private static final String REPLICATION_FACTOR_KEY_NAME = "replication_factor";
-    private static final String REPLICA_ASSIGNMENT_KEY_NAME = "replica_assignment";
-    private static final String REPLICA_ASSIGNMENT_REPLICAS_KEY_NAME = "replicas";
-
-    private static final String CONFIG_NAME_KEY_NAME = "config_name";
-    private static final String CONFIG_VALUE_KEY_NAME = "config_value";
-    private static final String CONFIG_ENTRIES_KEY_NAME = "config_entries";
-
-    private static final Schema CONFIG_ENTRY = new Schema(
-            new Field(CONFIG_NAME_KEY_NAME, STRING, "Configuration name"),
-            new Field(CONFIG_VALUE_KEY_NAME, NULLABLE_STRING, "Configuration value"));
-
-    private static final Schema PARTITION_REPLICA_ASSIGNMENT_ENTRY = new Schema(
-            PARTITION_ID,
-            new Field(REPLICA_ASSIGNMENT_REPLICAS_KEY_NAME, new ArrayOf(INT32), "The set of all nodes that should " +
-                    "host this partition. The first replica in the list is the preferred leader."));
-
-    private static final Schema SINGLE_CREATE_TOPIC_REQUEST_V0 = new Schema(
-            TOPIC_NAME,
-            new Field(NUM_PARTITIONS_KEY_NAME, INT32, "Number of partitions to be created. -1 indicates unset."),
-            new Field(REPLICATION_FACTOR_KEY_NAME, INT16, "Replication factor for the topic. -1 indicates unset."),
-            new Field(REPLICA_ASSIGNMENT_KEY_NAME, new ArrayOf(PARTITION_REPLICA_ASSIGNMENT_ENTRY),
-                    "Replica assignment among kafka brokers for this topic partitions. If this is set num_partitions " +
-                            "and replication_factor must be unset."),
-            new Field(CONFIG_ENTRIES_KEY_NAME, new ArrayOf(CONFIG_ENTRY), "Topic level configuration for topic to be set."));
-
-    private static final Schema SINGLE_CREATE_TOPIC_REQUEST_V1 = SINGLE_CREATE_TOPIC_REQUEST_V0;
-
-    private static final Schema CREATE_TOPICS_REQUEST_V0 = new Schema(
-            new Field(REQUESTS_KEY_NAME, new ArrayOf(SINGLE_CREATE_TOPIC_REQUEST_V0),
-                    "An array of single topic creation requests. Can not have multiple entries for the same topic."),
-            new Field(TIMEOUT_KEY_NAME, INT32, "The time in ms to wait for a topic to be completely created on the " +
-                    "controller node. Values <= 0 will trigger topic creation and return immediately"));
-
-    private static final Schema CREATE_TOPICS_REQUEST_V1 = new Schema(
-            new Field(REQUESTS_KEY_NAME, new ArrayOf(SINGLE_CREATE_TOPIC_REQUEST_V1), "An array of single " +
-                    "topic creation requests. Can not have multiple entries for the same topic."),
-            new Field(TIMEOUT_KEY_NAME, INT32, "The time in ms to wait for a topic to be completely created on the " +
-                    "controller node. Values <= 0 will trigger topic creation and return immediately"),
-            new Field(VALIDATE_ONLY_KEY_NAME, BOOLEAN, "If this is true, the request will be validated, but the " +
-                    "topic won't be created."));
-
-    /* v2 request is the same as v1. Throttle time has been added to the response */
-    private static final Schema CREATE_TOPICS_REQUEST_V2 = CREATE_TOPICS_REQUEST_V1;
-
-    /**
-     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
-     */
-    private static final Schema CREATE_TOPICS_REQUEST_V3 = CREATE_TOPICS_REQUEST_V2;
-
-    public static Schema[] schemaVersions() {
-        return new Schema[]{CREATE_TOPICS_REQUEST_V0, CREATE_TOPICS_REQUEST_V1, CREATE_TOPICS_REQUEST_V2,
-            CREATE_TOPICS_REQUEST_V3};
-    }
-
-    public static final class TopicDetails {
-        public final int numPartitions;
-        public final short replicationFactor;
-        public final Map<Integer, List<Integer>> replicasAssignments;
-        public final Map<String, String> configs;
-
-        private TopicDetails(int numPartitions,
-                             short replicationFactor,
-                             Map<Integer, List<Integer>> replicasAssignments,
-                             Map<String, String> configs) {
-            this.numPartitions = numPartitions;
-            this.replicationFactor = replicationFactor;
-            this.replicasAssignments = replicasAssignments;
-            this.configs = configs;
-        }
-
-        public TopicDetails(int partitions,
-                            short replicationFactor,
-                            Map<String, String> configs) {
-            this(partitions, replicationFactor, Collections.emptyMap(), configs);
-        }
-
-        public TopicDetails(int partitions,
-                            short replicationFactor) {
-            this(partitions, replicationFactor, Collections.emptyMap());
-        }
-
-        public TopicDetails(Map<Integer, List<Integer>> replicasAssignments,
-                            Map<String, String> configs) {
-            this(NO_NUM_PARTITIONS, NO_REPLICATION_FACTOR, replicasAssignments, configs);
-        }
-
-        public TopicDetails(Map<Integer, List<Integer>> replicasAssignments) {
-            this(replicasAssignments, Collections.emptyMap());
-        }
-
-        @Override
-        public String toString() {
-            StringBuilder bld = new StringBuilder();
-            bld.append("(numPartitions=").append(numPartitions).
-                    append(", replicationFactor=").append(replicationFactor).
-                    append(", replicasAssignments=").append(replicasAssignments).
-                    append(", configs=").append(configs).
-                    append(")");
-            return bld.toString();
-        }
-    }
-
     public static class Builder extends AbstractRequest.Builder<CreateTopicsRequest> {
-        private final Map<String, TopicDetails> topics;
-        private final int timeout;
-        private final boolean validateOnly; // introduced in V1
-
-        public Builder(Map<String, TopicDetails> topics, int timeout) {
-            this(topics, timeout, false);
-        }
+        private final CreateTopicsRequestData data;
 
-        public Builder(Map<String, TopicDetails> topics, int timeout, boolean validateOnly) {
+        public Builder(CreateTopicsRequestData data) {
             super(ApiKeys.CREATE_TOPICS);
-            this.topics = topics;
-            this.timeout = timeout;
-            this.validateOnly = validateOnly;
+            this.data = data;
         }
 
         @Override
         public CreateTopicsRequest build(short version) {
-            if (validateOnly && version == 0)
+            if (data.validateOnly() && version == 0)
                 throw new UnsupportedVersionException("validateOnly is not supported in version 0 of " +
                         "CreateTopicsRequest");
-            return new CreateTopicsRequest(topics, timeout, validateOnly, version);
+            return new CreateTopicsRequest(data, version);
         }
 
         @Override
         public String toString() {
-            StringBuilder bld = new StringBuilder();
-            bld.append("(type=CreateTopicsRequest").
-                append(", topics=").append(topics).
-                append(", timeout=").append(timeout).
-                append(", validateOnly=").append(validateOnly).
-                append(")");
-            return bld.toString();
+            return data.toString();
         }
     }
 
-    private final Map<String, TopicDetails> topics;
-    private final Integer timeout;
-    private final boolean validateOnly; // introduced in V1
-
-    // Set to handle special case where 2 requests for the same topic exist on the wire.
-    // This allows the broker to return an error code for these topics.
-    private final Set<String> duplicateTopics;
+    private final CreateTopicsRequestData data;
+    private final short version;
 
     public static final int NO_NUM_PARTITIONS = -1;
     public static final short NO_REPLICATION_FACTOR = -1;
 
-    private CreateTopicsRequest(Map<String, TopicDetails> topics, Integer timeout, boolean validateOnly, short version) {
+    private CreateTopicsRequest(CreateTopicsRequestData data, short version) {
         super(ApiKeys.CREATE_TOPICS, version);
-        this.topics = topics;
-        this.timeout = timeout;
-        this.validateOnly = validateOnly;
-        this.duplicateTopics = Collections.emptySet();
+        this.data = data;
+        this.version = version;
     }
 
     public CreateTopicsRequest(Struct struct, short version) {
         super(ApiKeys.CREATE_TOPICS, version);
+        this.data = new CreateTopicsRequestData(struct, version);
+        this.version = version;
+    }
 
-        Object[] requestStructs = struct.getArray(REQUESTS_KEY_NAME);
-        Map<String, TopicDetails> topics = new HashMap<>();
-        Set<String> duplicateTopics = new HashSet<>();
-
-        for (Object requestStructObj : requestStructs) {
-            Struct singleRequestStruct = (Struct) requestStructObj;
-            String topic = singleRequestStruct.get(TOPIC_NAME);
-
-            if (topics.containsKey(topic))
-                duplicateTopics.add(topic);
-
-            int numPartitions = singleRequestStruct.getInt(NUM_PARTITIONS_KEY_NAME);
-            short replicationFactor = singleRequestStruct.getShort(REPLICATION_FACTOR_KEY_NAME);
-
-            //replica assignment
-            Object[] assignmentsArray = singleRequestStruct.getArray(REPLICA_ASSIGNMENT_KEY_NAME);
-            Map<Integer, List<Integer>> partitionReplicaAssignments = new HashMap<>(assignmentsArray.length);
-            for (Object assignmentStructObj : assignmentsArray) {
-                Struct assignmentStruct = (Struct) assignmentStructObj;
-
-                Integer partitionId = assignmentStruct.get(PARTITION_ID);
-
-                Object[] replicasArray = assignmentStruct.getArray(REPLICA_ASSIGNMENT_REPLICAS_KEY_NAME);
-                List<Integer> replicas = new ArrayList<>(replicasArray.length);
-                for (Object replica : replicasArray) {
-                    replicas.add((Integer) replica);
-                }
-
-                partitionReplicaAssignments.put(partitionId, replicas);
-            }
-
-            Object[] configArray = singleRequestStruct.getArray(CONFIG_ENTRIES_KEY_NAME);
-            Map<String, String> configs = new HashMap<>(configArray.length);
-            for (Object configStructObj : configArray) {
-                Struct configStruct = (Struct) configStructObj;
-
-                String key = configStruct.getString(CONFIG_NAME_KEY_NAME);
-                String value = configStruct.getString(CONFIG_VALUE_KEY_NAME);
-
-                configs.put(key, value);
-            }
-
-            TopicDetails args = new TopicDetails(numPartitions, replicationFactor, partitionReplicaAssignments, configs);
-
-            topics.put(topic, args);
-        }
-
-        this.topics = topics;
-        this.timeout = struct.getInt(TIMEOUT_KEY_NAME);
-        if (struct.hasField(VALIDATE_ONLY_KEY_NAME))
-            this.validateOnly = struct.getBoolean(VALIDATE_ONLY_KEY_NAME);
-        else
-            this.validateOnly = false;
-        this.duplicateTopics = duplicateTopics;
+    public CreateTopicsRequestData data() {
+        return data;
     }
 
     @Override
     public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
-        Map<String, ApiError> topicErrors = new HashMap<>();
-        for (String topic : topics.keySet()) {
-            topicErrors.put(topic, ApiError.fromThrowable(e));
+        CreateTopicsResponseData response = new CreateTopicsResponseData();
+        if (version() >= 2) {
+            response.setThrottleTimeMs(throttleTimeMs);
         }
-
-        short versionId = version();
-        switch (versionId) {
-            case 0:
-            case 1:
-                return new CreateTopicsResponse(topicErrors);
-            case 2:
-            case 3:
-                return new CreateTopicsResponse(throttleTimeMs, topicErrors);
-            default:
-                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
-                    versionId, this.getClass().getSimpleName(), ApiKeys.CREATE_TOPICS.latestVersion()));
+        ApiError apiError = ApiError.fromThrowable(e);
+        for (CreatableTopic topic : data.topics()) {
+            response.topics().add(new CreatableTopicResult().
+                setName(topic.name()).
+                setErrorCode(apiError.error().code()).
+                setErrorMessage(apiError.message()));
         }
-    }
-
-    public Map<String, TopicDetails> topics() {
-        return this.topics;
-    }
-
-    public int timeout() {
-        return this.timeout;
-    }
-
-    public boolean validateOnly() {
-        return validateOnly;
-    }
-
-    public Set<String> duplicateTopics() {
-        return this.duplicateTopics;
+        return new CreateTopicsResponse(response);
     }
 
     public static CreateTopicsRequest parse(ByteBuffer buffer, short version) {
@@ -309,46 +96,6 @@ public class CreateTopicsRequest extends AbstractRequest {
      */
     @Override
     public Struct toStruct() {
-        short version = version();
-        Struct struct = new Struct(ApiKeys.CREATE_TOPICS.requestSchema(version));
-
-        List<Struct> createTopicRequestStructs = new ArrayList<>(topics.size());
-        for (Map.Entry<String, TopicDetails> entry : topics.entrySet()) {
-
-            Struct singleRequestStruct = struct.instance(REQUESTS_KEY_NAME);
-            String topic = entry.getKey();
-            TopicDetails args = entry.getValue();
-
-            singleRequestStruct.set(TOPIC_NAME, topic);
-            singleRequestStruct.set(NUM_PARTITIONS_KEY_NAME, args.numPartitions);
-            singleRequestStruct.set(REPLICATION_FACTOR_KEY_NAME, args.replicationFactor);
-
-            // replica assignment
-            List<Struct> replicaAssignmentsStructs = new ArrayList<>(args.replicasAssignments.size());
-            for (Map.Entry<Integer, List<Integer>> partitionReplicaAssignment : args.replicasAssignments.entrySet()) {
-                Struct replicaAssignmentStruct = singleRequestStruct.instance(REPLICA_ASSIGNMENT_KEY_NAME);
-                replicaAssignmentStruct.set(PARTITION_ID, partitionReplicaAssignment.getKey());
-                replicaAssignmentStruct.set(REPLICA_ASSIGNMENT_REPLICAS_KEY_NAME, partitionReplicaAssignment.getValue().toArray());
-                replicaAssignmentsStructs.add(replicaAssignmentStruct);
-            }
-            singleRequestStruct.set(REPLICA_ASSIGNMENT_KEY_NAME, replicaAssignmentsStructs.toArray());
-
-            // configs
-            List<Struct> configsStructs = new ArrayList<>(args.configs.size());
-            for (Map.Entry<String, String> configEntry : args.configs.entrySet()) {
-                Struct configStruct = singleRequestStruct.instance(CONFIG_ENTRIES_KEY_NAME);
-                configStruct.set(CONFIG_NAME_KEY_NAME, configEntry.getKey());
-                configStruct.set(CONFIG_VALUE_KEY_NAME, configEntry.getValue());
-                configsStructs.add(configStruct);
-            }
-            singleRequestStruct.set(CONFIG_ENTRIES_KEY_NAME, configsStructs.toArray());
-            createTopicRequestStructs.add(singleRequestStruct);
-        }
-        struct.set(REQUESTS_KEY_NAME, createTopicRequestStructs.toArray());
-        struct.set(TIMEOUT_KEY_NAME, timeout);
-        if (version >= 1)
-            struct.set(VALIDATE_ONLY_KEY_NAME, validateOnly);
-        return struct;
-
+        return data.toStruct(version);
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
index 54d33a5..3cfc60c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
@@ -14,60 +14,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.common.requests;
 
+package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
 import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.types.ArrayOf;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
-import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
-import static org.apache.kafka.common.protocol.CommonFields.ERROR_MESSAGE;
-import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
-import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
-
 public class CreateTopicsResponse extends AbstractResponse {
-    private static final String TOPIC_ERRORS_KEY_NAME = "topic_errors";
-
-    private static final Schema TOPIC_ERROR_CODE = new Schema(
-            TOPIC_NAME,
-            ERROR_CODE);
-
-    // Improves on TOPIC_ERROR_CODE by adding an error_message to complement the error_code
-    private static final Schema TOPIC_ERROR = new Schema(
-            TOPIC_NAME,
-            ERROR_CODE,
-            ERROR_MESSAGE);
-
-    private static final Schema CREATE_TOPICS_RESPONSE_V0 = new Schema(
-            new Field(TOPIC_ERRORS_KEY_NAME, new ArrayOf(TOPIC_ERROR_CODE), "An array of per topic error codes."));
-
-    private static final Schema CREATE_TOPICS_RESPONSE_V1 = new Schema(
-            new Field(TOPIC_ERRORS_KEY_NAME, new ArrayOf(TOPIC_ERROR), "An array of per topic errors."));
-
-    private static final Schema CREATE_TOPICS_RESPONSE_V2 = new Schema(
-            THROTTLE_TIME_MS,
-            new Field(TOPIC_ERRORS_KEY_NAME, new ArrayOf(TOPIC_ERROR), "An array of per topic errors."));
-
-    /**
-     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
-     */
-    private static final Schema CREATE_TOPICS_RESPONSE_V3 = CREATE_TOPICS_RESPONSE_V2;
-
-    public static Schema[] schemaVersions() {
-        return new Schema[]{CREATE_TOPICS_RESPONSE_V0, CREATE_TOPICS_RESPONSE_V1, CREATE_TOPICS_RESPONSE_V2,
-            CREATE_TOPICS_RESPONSE_V3};
-    }
-
     /**
      * Possible error codes:
      *
@@ -84,63 +44,43 @@ public class CreateTopicsResponse extends AbstractResponse {
      * POLICY_VIOLATION(44)
      */
 
-    private final Map<String, ApiError> errors;
-    private final int throttleTimeMs;
+    private final CreateTopicsResponseData data;
 
-    public CreateTopicsResponse(Map<String, ApiError> errors) {
-        this(DEFAULT_THROTTLE_TIME, errors);
+    public CreateTopicsResponse(CreateTopicsResponseData data) {
+        this.data = data;
     }
 
-    public CreateTopicsResponse(int throttleTimeMs, Map<String, ApiError> errors) {
-        this.throttleTimeMs = throttleTimeMs;
-        this.errors = errors;
+    public CreateTopicsResponse(Struct struct, short version) {
+        this.data = new CreateTopicsResponseData(struct, version);
     }
 
-    public CreateTopicsResponse(Struct struct) {
-        Object[] topicErrorStructs = struct.getArray(TOPIC_ERRORS_KEY_NAME);
-        Map<String, ApiError> errors = new HashMap<>();
-        for (Object topicErrorStructObj : topicErrorStructs) {
-            Struct topicErrorStruct = (Struct) topicErrorStructObj;
-            String topic = topicErrorStruct.get(TOPIC_NAME);
-            errors.put(topic, new ApiError(topicErrorStruct));
-        }
-
-        this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
-        this.errors = errors;
+    public CreateTopicsResponseData data() {
+        return data;
     }
 
     @Override
     protected Struct toStruct(short version) {
-        Struct struct = new Struct(ApiKeys.CREATE_TOPICS.responseSchema(version));
-        struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);
-
-        List<Struct> topicErrorsStructs = new ArrayList<>(errors.size());
-        for (Map.Entry<String, ApiError> topicError : errors.entrySet()) {
-            Struct topicErrorsStruct = struct.instance(TOPIC_ERRORS_KEY_NAME);
-            topicErrorsStruct.set(TOPIC_NAME, topicError.getKey());
-            topicError.getValue().write(topicErrorsStruct);
-            topicErrorsStructs.add(topicErrorsStruct);
-        }
-        struct.set(TOPIC_ERRORS_KEY_NAME, topicErrorsStructs.toArray());
-        return struct;
+        return data.toStruct(version);
     }
 
     @Override
     public int throttleTimeMs() {
-        return throttleTimeMs;
-    }
-
-    public Map<String, ApiError> errors() {
-        return errors;
+        return data.throttleTimeMs();
     }
 
     @Override
     public Map<Errors, Integer> errorCounts() {
-        return apiErrorCounts(errors);
+        HashMap<Errors, Integer> counts = new HashMap<>();
+        for (CreatableTopicResult result : data.topics()) {
+            Errors error = Errors.forCode(result.errorCode());
+            counts.put(error, counts.getOrDefault(error, 0) + 1);
+        }
+        return counts;
     }
 
     public static CreateTopicsResponse parse(ByteBuffer buffer, short version) {
-        return new CreateTopicsResponse(ApiKeys.CREATE_TOPICS.responseSchema(version).read(buffer));
+        return new CreateTopicsResponse(
+            ApiKeys.CREATE_TOPICS.responseSchema(version).read(buffer), version);
     }
 
     @Override
diff --git a/clients/src/main/resources/common/message/CreateTopicsRequest.json b/clients/src/main/resources/common/message/CreateTopicsRequest.json
index b76d13d..da6e6c2 100644
--- a/clients/src/main/resources/common/message/CreateTopicsRequest.json
+++ b/clients/src/main/resources/common/message/CreateTopicsRequest.json
@@ -22,7 +22,7 @@
   "fields": [
     { "name": "Topics", "type": "[]CreatableTopic", "versions": "0+",
       "about": "The topics to create.", "fields": [
-      { "name": "Name", "type": "string", "versions": "0+",
+      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
         "about": "The topic name." },
       { "name": "NumPartitions", "type": "int32", "versions": "0+",
         "about": "The number of partitions to create in the topic, or -1 if we are specifying a manual partition assignment." },
@@ -43,7 +43,7 @@
           "about": "The configuration value." }
       ]}
     ]},
-    { "name": "timeoutMs", "type": "int32", "versions": "0+",
+    { "name": "timeoutMs", "type": "int32", "versions": "0+", "default": "60000",
       "about": "How long to wait in milliseconds before timing out the request." },
     { "name": "validateOnly", "type": "bool", "versions": "1+", "default": "false", "ignorable": false,
       "about": "If true, check that the topics can be created as specified, but don't create anything." }
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 12b076d..debb3a8 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -51,6 +51,8 @@ import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicDeletionDisabledException;
 import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
 import org.apache.kafka.common.message.ElectPreferredLeadersResponseData;
 import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.PartitionResult;
 import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.ReplicaElectionResult;
@@ -213,6 +215,13 @@ public class KafkaAdminClientTest {
         }
     }
 
+    private static CreateTopicsResponse prepareCreateTopicsResponse(String topicName, Errors error) {
+        CreateTopicsResponseData data = new CreateTopicsResponseData();
+        data.topics().add(new CreatableTopicResult().
+            setName(topicName).setErrorCode(error.code()));
+        return new CreateTopicsResponse(data);
+    }
+
     /**
      * Test that the client properly times out when we don't receive any metadata.
      */
@@ -221,7 +230,7 @@ public class KafkaAdminClientTest {
         try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(Time.SYSTEM, mockBootstrapCluster(),
                 newStrMap(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10"))) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().prepareResponse(new CreateTopicsResponse(Collections.singletonMap("myTopic", new ApiError(Errors.NONE, ""))));
+            env.kafkaClient().prepareResponse(prepareCreateTopicsResponse("myTopic", Errors.NONE));
             KafkaFuture<Void> future = env.adminClient().createTopics(
                     Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))),
                     new CreateTopicsOptions().timeoutMs(1000)).all();
@@ -243,7 +252,7 @@ public class KafkaAdminClientTest {
                     new MetadataResponse(discoveredCluster.nodes(), discoveredCluster.clusterResource().clusterId(),
                             1, Collections.emptyList()));
             env.kafkaClient().prepareResponse(body -> body instanceof CreateTopicsRequest,
-                    new CreateTopicsResponse(Collections.singletonMap("myTopic", new ApiError(Errors.NONE, ""))));
+                    prepareCreateTopicsResponse("myTopic", Errors.NONE));
 
             KafkaFuture<Void> future = env.adminClient().createTopics(
                     Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))),
@@ -267,7 +276,7 @@ public class KafkaAdminClientTest {
                     new  MetadataResponse(discoveredCluster.nodes(), discoveredCluster.clusterResource().clusterId(),
                             1, Collections.emptyList()));
             env.kafkaClient().prepareResponse(body -> body instanceof CreateTopicsRequest,
-                    new CreateTopicsResponse(Collections.singletonMap("myTopic", new ApiError(Errors.NONE, ""))));
+                prepareCreateTopicsResponse("myTopic", Errors.NONE));
 
             KafkaFuture<Void> future = env.adminClient().createTopics(
                     Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))),
@@ -289,7 +298,7 @@ public class KafkaAdminClientTest {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
             env.kafkaClient().createPendingAuthenticationError(cluster.nodeById(0),
                     TimeUnit.DAYS.toMillis(1));
-            env.kafkaClient().prepareResponse(new CreateTopicsResponse(Collections.singletonMap("myTopic", new ApiError(Errors.NONE, ""))));
+            env.kafkaClient().prepareResponse(prepareCreateTopicsResponse("myTopic", Errors.NONE));
             KafkaFuture<Void> future = env.adminClient().createTopics(
                 Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))),
                 new CreateTopicsOptions().timeoutMs(1000)).all();
@@ -302,7 +311,7 @@ public class KafkaAdminClientTest {
         try (AdminClientUnitTestEnv env = mockClientEnv()) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
             env.kafkaClient().prepareResponse(body -> body instanceof CreateTopicsRequest,
-                    new CreateTopicsResponse(Collections.singletonMap("myTopic", new ApiError(Errors.NONE, ""))));
+                    prepareCreateTopicsResponse("myTopic", Errors.NONE));
             KafkaFuture<Void> future = env.adminClient().createTopics(
                     Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))),
                     new CreateTopicsOptions().timeoutMs(10000)).all();
@@ -333,7 +342,7 @@ public class KafkaAdminClientTest {
             mockClient.prepareResponse(body -> {
                 secondAttemptTime.set(time.milliseconds());
                 return body instanceof CreateTopicsRequest;
-            }, new CreateTopicsResponse(Collections.singletonMap("myTopic", new ApiError(Errors.NONE, ""))));
+            }, prepareCreateTopicsResponse("myTopic", Errors.NONE));
 
             KafkaFuture<Void> future = env.adminClient().createTopics(
                     Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))),
@@ -356,15 +365,15 @@ public class KafkaAdminClientTest {
     public void testCreateTopicsHandleNotControllerException() throws Exception {
         try (AdminClientUnitTestEnv env = mockClientEnv()) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().prepareResponseFrom(new CreateTopicsResponse(
-                Collections.singletonMap("myTopic", new ApiError(Errors.NOT_CONTROLLER, ""))),
+            env.kafkaClient().prepareResponseFrom(
+                prepareCreateTopicsResponse("myTopic", Errors.NOT_CONTROLLER),
                 env.cluster().nodeById(0));
             env.kafkaClient().prepareResponse(new MetadataResponse(env.cluster().nodes(),
                 env.cluster().clusterResource().clusterId(),
                 1,
                 Collections.<MetadataResponse.TopicMetadata>emptyList()));
-            env.kafkaClient().prepareResponseFrom(new CreateTopicsResponse(
-                    Collections.singletonMap("myTopic", new ApiError(Errors.NONE, ""))),
+            env.kafkaClient().prepareResponseFrom(
+                prepareCreateTopicsResponse("myTopic", Errors.NONE),
                 env.cluster().nodeById(1));
             KafkaFuture<Void> future = env.adminClient().createTopics(
                     Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))),
diff --git a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
index 26ace89..b725e70 100644
--- a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.common.message;
 
 import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicSet;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
 import org.apache.kafka.common.protocol.Message;
@@ -75,7 +76,7 @@ public final class MessageTest {
                     setName("Topic").
                     setPartitions(Collections.singletonList(1))).iterator())));
         testMessageRoundTrips(new CreateTopicsRequestData().
-            setTimeoutMs(1000).setTopics(Collections.emptyList()));
+            setTimeoutMs(1000).setTopics(new CreatableTopicSet()));
         testMessageRoundTrips(new DescribeAclsRequestData().
             setResourceType((byte) 42).
             setResourceNameFilter(null).
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestContextTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestContextTest.java
index 857869f..7c24d1f 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestContextTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestContextTest.java
@@ -69,7 +69,7 @@ public class RequestContextTest {
 
         Struct struct = ApiKeys.API_VERSIONS.parseResponse((short) 0, responseBuffer);
         ApiVersionsResponse response = (ApiVersionsResponse)
-                AbstractResponse.parseResponse(ApiKeys.API_VERSIONS, struct, (short) 0);
+            AbstractResponse.parseResponse(ApiKeys.API_VERSIONS, struct, (short) 0);
         assertEquals(Errors.UNSUPPORTED_VERSION, response.error());
         assertTrue(response.apiVersions().isEmpty());
     }
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 142321b..953b2bd 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -32,6 +32,12 @@ import org.apache.kafka.common.errors.NotEnoughReplicasException;
 import org.apache.kafka.common.errors.SecurityDisabledException;
 import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment;
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreateableTopicConfig;
+import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
 import org.apache.kafka.common.message.ElectPreferredLeadersRequestData;
 import org.apache.kafka.common.message.ElectPreferredLeadersRequestData.TopicPartitions;
 import org.apache.kafka.common.message.ElectPreferredLeadersResponseData;
@@ -1034,28 +1040,38 @@ public class RequestResponseTest {
     }
 
     private CreateTopicsRequest createCreateTopicRequest(int version, boolean validateOnly) {
-        CreateTopicsRequest.TopicDetails request1 = new CreateTopicsRequest.TopicDetails(3, (short) 5);
-
-        Map<Integer, List<Integer>> replicaAssignments = new HashMap<>();
-        replicaAssignments.put(1, asList(1, 2, 3));
-        replicaAssignments.put(2, asList(2, 3, 4));
-
-        Map<String, String> configs = new HashMap<>();
-        configs.put("config1", "value1");
-
-        CreateTopicsRequest.TopicDetails request2 = new CreateTopicsRequest.TopicDetails(replicaAssignments, configs);
-
-        Map<String, CreateTopicsRequest.TopicDetails> request = new HashMap<>();
-        request.put("my_t1", request1);
-        request.put("my_t2", request2);
-        return new CreateTopicsRequest.Builder(request, 0, validateOnly).build((short) version);
+        CreateTopicsRequestData data = new CreateTopicsRequestData().
+            setTimeoutMs(123).
+            setValidateOnly(validateOnly);
+        data.topics().add(new CreatableTopic().
+            setNumPartitions(3).
+            setReplicationFactor((short) 5));
+
+        CreatableTopic topic2 = new CreatableTopic();
+        data.topics().add(topic2);
+        topic2.assignments().add(new CreatableReplicaAssignment().
+            setPartitionIndex(0).
+            setBrokerIds(Arrays.asList(1, 2, 3)));
+        topic2.assignments().add(new CreatableReplicaAssignment().
+            setPartitionIndex(1).
+            setBrokerIds(Arrays.asList(2, 3, 4)));
+        topic2.configs().add(new CreateableTopicConfig().
+            setName("config1").setValue("value1"));
+
+        return new CreateTopicsRequest.Builder(data).build((short) version);
     }
 
     private CreateTopicsResponse createCreateTopicResponse() {
-        Map<String, ApiError> errors = new HashMap<>();
-        errors.put("t1", new ApiError(Errors.INVALID_TOPIC_EXCEPTION, null));
-        errors.put("t2", new ApiError(Errors.LEADER_NOT_AVAILABLE, "Leader with id 5 is not available."));
-        return new CreateTopicsResponse(errors);
+        CreateTopicsResponseData data = new CreateTopicsResponseData();
+        data.topics().add(new CreatableTopicResult().
+            setName("t1").
+            setErrorCode(Errors.INVALID_TOPIC_EXCEPTION.code()).
+            setErrorMessage(null));
+        data.topics().add(new CreatableTopicResult().
+            setName("t2").
+            setErrorCode(Errors.LEADER_NOT_AVAILABLE.code()).
+            setErrorMessage("Leader with id 5 is not available."));
+        return new CreateTopicsResponse(data);
     }
 
     private DeleteTopicsRequest createDeleteTopicsRequest() {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
index 26c8e71..64ddfeb 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
@@ -20,6 +20,8 @@ import org.apache.kafka.clients.NodeApiVersions;
 import org.apache.kafka.clients.admin.MockAdminClient;
 import org.apache.kafka.clients.admin.AdminClientUnitTestEnv;
 import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
@@ -32,7 +34,6 @@ import org.junit.Test;
 
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.Map;
 import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
@@ -137,10 +138,13 @@ public class TopicAdminTest {
 
     private CreateTopicsResponse createTopicResponse(ApiError error, NewTopic... topics) {
         if (error == null) error = new ApiError(Errors.NONE, "");
-        Map<String, ApiError> topicResults = new HashMap<>();
+        CreateTopicsResponseData response = new CreateTopicsResponseData();
         for (NewTopic topic : topics) {
-            topicResults.put(topic.name(), error);
+            response.topics().add(new CreatableTopicResult().
+                setName(topic.name()).
+                setErrorCode(error.error().code()).
+                setErrorMessage(error.message()));
         }
-        return new CreateTopicsResponse(topicResults);
+        return new CreateTopicsResponse(response);
     }
 }
diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala
index 4850eb4..0cdaad6 100644
--- a/core/src/main/scala/kafka/server/AdminManager.scala
+++ b/core/src/main/scala/kafka/server/AdminManager.scala
@@ -25,8 +25,9 @@ import kafka.metrics.KafkaMetricsGroup
 import kafka.utils._
 import kafka.zk.{AdminZkClient, KafkaZkClient}
 import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, ConfigResource}
-import org.apache.kafka.common.errors.{ApiException, InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidRequestException, ReassignmentInProgressException, UnknownTopicOrPartitionException, InvalidConfigurationException}
+import org.apache.kafka.common.errors.{ApiException, InvalidConfigurationException, InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidRequestException, ReassignmentInProgressException, UnknownTopicOrPartitionException}
 import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.Errors
@@ -37,7 +38,7 @@ import org.apache.kafka.common.requests.{AlterConfigsRequest, ApiError, Describe
 import org.apache.kafka.server.policy.{AlterConfigPolicy, CreateTopicPolicy}
 import org.apache.kafka.server.policy.CreateTopicPolicy.RequestMetadata
 
-import scala.collection._
+import scala.collection.{mutable, _}
 import scala.collection.JavaConverters._
 
 class AdminManager(val config: KafkaConfig,
@@ -73,72 +74,89 @@ class AdminManager(val config: KafkaConfig,
     */
   def createTopics(timeout: Int,
                    validateOnly: Boolean,
-                   createInfo: Map[String, TopicDetails],
+                   toCreate: Map[String, CreatableTopic],
                    responseCallback: Map[String, ApiError] => Unit) {
 
     // 1. map over topics creating assignment and calling zookeeper
     val brokers = metadataCache.getAliveBrokers.map { b => kafka.admin.BrokerMetadata(b.id, b.rack) }
-    val metadata = createInfo.map { case (topic, arguments) =>
+    val metadata = toCreate.values.map(topic =>
       try {
         val configs = new Properties()
-        arguments.configs.asScala.foreach { case (key, value) =>
-          configs.setProperty(key, value)
+        topic.configs().asScala.foreach { case entry =>
+          configs.setProperty(entry.name(), entry.value())
         }
         LogConfig.validate(configs)
 
-        val assignments = {
-          if ((arguments.numPartitions != NO_NUM_PARTITIONS || arguments.replicationFactor != NO_REPLICATION_FACTOR)
-            && !arguments.replicasAssignments.isEmpty)
-            throw new InvalidRequestException("Both numPartitions or replicationFactor and replicasAssignments were set. " +
-              "Both cannot be used at the same time.")
-          else if (!arguments.replicasAssignments.isEmpty) {
-            // Note: we don't check that replicaAssignment contains unknown brokers - unlike in add-partitions case,
-            // this follows the existing logic in TopicCommand
-            arguments.replicasAssignments.asScala.map { case (partitionId, replicas) =>
-              (partitionId.intValue, replicas.asScala.map(_.intValue))
-            }
-          } else
-            AdminUtils.assignReplicasToBrokers(brokers, arguments.numPartitions, arguments.replicationFactor)
+        if ((topic.numPartitions != NO_NUM_PARTITIONS || topic.replicationFactor != NO_REPLICATION_FACTOR)
+            && !topic.assignments().isEmpty) {
+          throw new InvalidRequestException("Both numPartitions or replicationFactor and replicasAssignments were set. " +
+            "Both cannot be used at the same time.")
+        }
+        val assignments = if (topic.assignments().isEmpty) {
+          AdminUtils.assignReplicasToBrokers(brokers, topic.numPartitions, topic.replicationFactor)
+        } else {
+          val assignments = new mutable.HashMap[Int, Seq[Int]]
+          // Note: we don't check that replicaAssignment contains unknown brokers - unlike in add-partitions case,
+          // this follows the existing logic in TopicCommand
+          topic.assignments.asScala.foreach {
+            case assignment => assignments(assignment.partitionIndex()) =
+              assignment.brokerIds().asScala.map(a => a: Int)
+          }
+          assignments
         }
         trace(s"Assignments for topic $topic are $assignments ")
 
         createTopicPolicy match {
           case Some(policy) =>
-            adminZkClient.validateTopicCreate(topic, assignments, configs)
+            adminZkClient.validateTopicCreate(topic.name(), assignments, configs)
 
             // Use `null` for unset fields in the public API
             val numPartitions: java.lang.Integer =
-              if (arguments.numPartitions == NO_NUM_PARTITIONS) null else arguments.numPartitions
+              if (topic.numPartitions == NO_NUM_PARTITIONS) null else topic.numPartitions
             val replicationFactor: java.lang.Short =
-              if (arguments.replicationFactor == NO_REPLICATION_FACTOR) null else arguments.replicationFactor
-            val replicaAssignments = if (arguments.replicasAssignments.isEmpty) null else arguments.replicasAssignments
-
-            policy.validate(new RequestMetadata(topic, numPartitions, replicationFactor, replicaAssignments,
-              arguments.configs))
+              if (topic.replicationFactor == NO_REPLICATION_FACTOR) null else topic.replicationFactor
+            val javaAssignments = if (topic.assignments().isEmpty) {
+              null
+            } else {
+              val map = new java.util.HashMap[Integer, java.util.List[Integer]]
+              assignments.foreach {
+                case (k, v) => {
+                  val list = new java.util.ArrayList[Integer]
+                  v.foreach {
+                    case i => list.add(Integer.valueOf(i))
+                  }
+                  map.put(k, list)
+                }
+              }
+              map
+            }
+            val javaConfigs = new java.util.HashMap[String, String]
+            topic.configs().asScala.foreach(config => javaConfigs.put(config.name(), config.value()))
+            policy.validate(new RequestMetadata(topic.name, numPartitions, replicationFactor,
+              javaAssignments, javaConfigs))
 
             if (!validateOnly)
-              adminZkClient.createTopicWithAssignment(topic, configs, assignments)
+              adminZkClient.createTopicWithAssignment(topic.name, configs, assignments)
 
           case None =>
             if (validateOnly)
-              adminZkClient.validateTopicCreate(topic, assignments, configs)
+              adminZkClient.validateTopicCreate(topic.name, assignments, configs)
             else
-              adminZkClient.createTopicWithAssignment(topic, configs, assignments)
+              adminZkClient.createTopicWithAssignment(topic.name, configs, assignments)
         }
-        CreatePartitionsMetadata(topic, assignments, ApiError.NONE)
+        CreatePartitionsMetadata(topic.name, assignments, ApiError.NONE)
       } catch {
         // Log client errors at a lower level than unexpected exceptions
         case e: ApiException =>
-          info(s"Error processing create topic request for topic $topic with arguments $arguments", e)
-          CreatePartitionsMetadata(topic, Map(), ApiError.fromThrowable(e))
+          info(s"Error processing create topic request $topic", e)
+          CreatePartitionsMetadata(topic.name, Map(), ApiError.fromThrowable(e))
         case e: ConfigException =>
-          info(s"Error processing create topic request for topic $topic with arguments $arguments", e)
-          CreatePartitionsMetadata(topic, Map(), ApiError.fromThrowable(new InvalidConfigurationException(e.getMessage, e.getCause)))
+          info(s"Error processing create topic request $topic", e)
+          CreatePartitionsMetadata(topic.name, Map(), ApiError.fromThrowable(new InvalidConfigurationException(e.getMessage, e.getCause)))
         case e: Throwable =>
-          error(s"Error processing create topic request for topic $topic with arguments $arguments", e)
-          CreatePartitionsMetadata(topic, Map(), ApiError.fromThrowable(e))
-      }
-    }
+          error(s"Error processing create topic request $topic", e)
+          CreatePartitionsMetadata(topic.name, Map(), ApiError.fromThrowable(e))
+      })
 
     // 2. if timeout <= 0, validateOnly or no topics can proceed return immediately
     if (timeout <= 0 || validateOnly || !metadata.exists(_.error.is(Errors.NONE))) {
@@ -154,7 +172,8 @@ class AdminManager(val config: KafkaConfig,
     } else {
       // 3. else pass the assignments and errors to the delayed operation and set the keys
       val delayedCreate = new DelayedCreatePartitions(timeout, metadata.toSeq, this, responseCallback)
-      val delayedCreateKeys = createInfo.keys.map(new TopicKey(_)).toSeq
+      val delayedCreateKeys = toCreate.values.map(
+        topic => new TopicKey(topic.name())).toSeq
       // try to complete the request immediately, otherwise put it into the purgatory
       topicPurgatory.tryCompleteElseWatch(delayedCreate, delayedCreateKeys)
     }
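
A minimal sketch of the CreatableTopic shape that the new AdminManager.createTopics signature consumes (illustrative only, not part of this commit; it mirrors the topicReq helper added to AbstractCreateTopicsRequestTest further down, including that helper's -1 "unset" sentinels for topics with manual assignments):

    import java.util
    import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableReplicaAssignment, CreatableReplicaAssignmentSet, CreatableTopic}

    // One partition, explicitly assigned to brokers 0 and 1.
    val assignment = new CreatableReplicaAssignment()
    assignment.setPartitionIndex(0)
    val brokerIds = new util.ArrayList[java.lang.Integer]()
    brokerIds.add(0)
    brokerIds.add(1)
    assignment.setBrokerIds(brokerIds)

    val assignments = new CreatableReplicaAssignmentSet()
    assignments.add(assignment)

    // numPartitions and replicationFactor stay at -1 ("unset") because manual
    // assignments and the two counters cannot be combined in createTopics.
    val topic = new CreatableTopic()
      .setName("example-topic")
      .setNumPartitions(-1)
      .setReplicationFactor((-1).toShort)
    topic.setAssignments(assignments)
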
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 040ff0e..75027f9 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -43,6 +43,9 @@ import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.FatalExitError
 import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME, isInternal}
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
+import org.apache.kafka.common.message.CreateTopicsResponseData
+import org.apache.kafka.common.message.CreateTopicsResponseData.{CreatableTopicResult, CreatableTopicResultSet}
 import org.apache.kafka.common.message.ElectPreferredLeadersResponseData
 import org.apache.kafka.common.message.LeaveGroupResponseData
 import org.apache.kafka.common.metrics.Metrics
@@ -50,7 +53,6 @@ import org.apache.kafka.common.network.{ListenerName, Send}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse
-import org.apache.kafka.common.requests.CreateTopicsRequest.TopicDetails
 import org.apache.kafka.common.requests.DeleteAclsResponse.{AclDeletionResult, AclFilterResponse}
 import org.apache.kafka.common.requests.DescribeLogDirsResponse.LogDirInfo
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
@@ -1393,60 +1395,61 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def handleCreateTopicsRequest(request: RequestChannel.Request) {
-    val createTopicsRequest = request.body[CreateTopicsRequest]
-
-    def sendResponseCallback(results: Map[String, ApiError]): Unit = {
+    def sendResponseCallback(results: CreatableTopicResultSet): Unit = {
       def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        val responseBody = new CreateTopicsResponse(requestThrottleMs, results.asJava)
-        trace(s"Sending create topics response $responseBody for correlation id ${request.header.correlationId} to client ${request.header.clientId}.")
+        val responseData = new CreateTopicsResponseData().
+          setThrottleTimeMs(requestThrottleMs).
+          setTopics(results)
+        val responseBody = new CreateTopicsResponse(responseData)
+        trace(s"Sending create topics response $responseData for correlation id " +
+          "${request.header.correlationId} to client ${request.header.clientId}.")
         responseBody
       }
       sendResponseMaybeThrottle(request, createResponse)
     }
 
+    val createTopicsRequest = request.body[CreateTopicsRequest]
+    val results = new CreatableTopicResultSet(createTopicsRequest.data().topics().size())
     if (!controller.isActive) {
-      val results = createTopicsRequest.topics.asScala.map { case (topic, _) =>
-        (topic, new ApiError(Errors.NOT_CONTROLLER, null))
+      createTopicsRequest.data.topics.asScala.foreach { case topic =>
+        results.add(new CreatableTopicResult().setName(topic.name()).
+          setErrorCode(Errors.NOT_CONTROLLER.code()))
       }
       sendResponseCallback(results)
     } else {
-      val (validTopics, duplicateTopics) = createTopicsRequest.topics.asScala.partition { case (topic, _) =>
-        !createTopicsRequest.duplicateTopics.contains(topic)
+      createTopicsRequest.data.topics.asScala.foreach { case topic =>
+        results.add(new CreatableTopicResult().setName(topic.name()))
+      }
+      val hasClusterAuthorization = authorize(request.session, Create, Resource.ClusterResource)
+      results.asScala.foreach(topic => {
+        if (results.findAll(topic.name()).size() > 1) {
+          topic.setErrorCode(Errors.INVALID_REQUEST.code())
+          topic.setErrorMessage("Found multiple entries for this topic.")
+        } else if ((!hasClusterAuthorization) && (!authorize(request.session, Create,
+              new Resource(Topic, topic.name(), PatternType.LITERAL)))) {
+          topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code())
+          topic.setErrorMessage("Authorization failed.")
+        }
+      })
+      val toCreate = mutable.Map[String, CreatableTopic]()
+      createTopicsRequest.data.topics.asScala.foreach { case topic =>
+        if (results.find(topic.name()).errorCode() == 0) {
+          toCreate += topic.name() -> topic
+        }
       }
-
-      val (authorizedTopics, unauthorizedTopics) =
-        if (authorize(request.session, Create, Resource.ClusterResource)) {
-          (validTopics, Map[String, TopicDetails]())
-        } else {
-          validTopics.partition { case (topic, _) =>
-            authorize(request.session, Create, new Resource(Topic, topic, PatternType.LITERAL))
-          }
+      def handleCreateTopicsResults(errors: Map[String, ApiError]): Unit = {
+        errors.foreach {
+          case (topicName, error) =>
+            results.find(topicName).
+              setErrorCode(error.error().code()).
+              setErrorMessage(error.message())
         }
-
-      // Special handling to add duplicate topics to the response
-      def sendResponseWithDuplicatesCallback(results: Map[String, ApiError]): Unit = {
-
-        val duplicatedTopicsResults =
-          if (duplicateTopics.nonEmpty) {
-            val errorMessage = s"Create topics request from client `${request.header.clientId}` contains multiple entries " +
-              s"for the following topics: ${duplicateTopics.keySet.mkString(",")}"
-            // We can send the error message in the response for version 1, so we don't have to log it any more
-            if (request.header.apiVersion == 0)
-              warn(errorMessage)
-            duplicateTopics.keySet.map((_, new ApiError(Errors.INVALID_REQUEST, errorMessage))).toMap
-          } else Map.empty
-
-        val unauthorizedTopicsResults = unauthorizedTopics.keySet.map(_ -> new ApiError(Errors.TOPIC_AUTHORIZATION_FAILED, null))
-        val completeResults = results ++ duplicatedTopicsResults ++ unauthorizedTopicsResults
-        sendResponseCallback(completeResults)
-      }
-
-      adminManager.createTopics(
-        createTopicsRequest.timeout,
-        createTopicsRequest.validateOnly,
-        authorizedTopics,
-        sendResponseWithDuplicatesCallback
-      )
+        sendResponseCallback(results)
+      }
+      adminManager.createTopics(createTopicsRequest.data.timeoutMs(),
+          createTopicsRequest.data.validateOnly(),
+          toCreate,
+          handleCreateTopicsResults)
     }
   }
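
A quick sketch of the new duplicate handling (illustrative only, not part of this commit): the handler above pre-populates one CreatableTopicResult per requested topic, so a topic name that appears twice in the request yields two entries in the result set, and findAll() lets both be flagged:

    import org.apache.kafka.common.message.CreateTopicsResponseData.{CreatableTopicResult, CreatableTopicResultSet}
    import org.apache.kafka.common.protocol.Errors
    import scala.collection.JavaConverters._

    val results = new CreatableTopicResultSet(2)
    results.add(new CreatableTopicResult().setName("dup-topic"))
    results.add(new CreatableTopicResult().setName("dup-topic"))

    // Mirrors the check in handleCreateTopicsRequest: every entry whose name
    // occurs more than once is marked INVALID_REQUEST.
    results.asScala.foreach { topic =>
      if (results.findAll(topic.name()).size() > 1) {
        topic.setErrorCode(Errors.INVALID_REQUEST.code())
        topic.setErrorMessage("Found multiple entries for this topic.")
      }
    }
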
 
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 31e01cc..f570a44 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -33,12 +33,13 @@ import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.Topic.GROUP_METADATA_TOPIC_NAME
+import org.apache.kafka.common.message.CreateTopicsRequestData
+import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicSet}
 import org.apache.kafka.common.message.LeaveGroupRequestData
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Records, SimpleRecord}
 import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation
-import org.apache.kafka.common.requests.CreateTopicsRequest.TopicDetails
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.resource.PatternType.LITERAL
 import org.apache.kafka.common.resource.{ResourcePattern, ResourcePatternFilter, ResourceType => AdminResourceType}
@@ -169,7 +170,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     ApiKeys.LEADER_AND_ISR -> ((resp: requests.LeaderAndIsrResponse) => resp.responses.asScala.find(_._1 == tp).get._2),
     ApiKeys.STOP_REPLICA -> ((resp: requests.StopReplicaResponse) => resp.responses.asScala.find(_._1 == tp).get._2),
     ApiKeys.CONTROLLED_SHUTDOWN -> ((resp: requests.ControlledShutdownResponse) => resp.error),
-    ApiKeys.CREATE_TOPICS -> ((resp: CreateTopicsResponse) => resp.errors.asScala.find(_._1 == createTopic).get._2.error),
+    ApiKeys.CREATE_TOPICS -> ((resp: CreateTopicsResponse) => Errors.forCode(resp.data().topics().find(createTopic).errorCode())),
     ApiKeys.DELETE_TOPICS -> ((resp: requests.DeleteTopicsResponse) => resp.errors.asScala.find(_._1 == deleteTopic).get._2),
     ApiKeys.DELETE_RECORDS -> ((resp: requests.DeleteRecordsResponse) => resp.responses.get(deleteRecordsPartition).error),
     ApiKeys.OFFSET_FOR_LEADER_EPOCH -> ((resp: OffsetsForLeaderEpochResponse) => resp.responses.get(tp).error),
@@ -351,7 +352,10 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     ApiKeys.CONTROLLED_SHUTDOWN.latestVersion).build()
 
   private def createTopicsRequest =
-    new CreateTopicsRequest.Builder(Map(createTopic -> new TopicDetails(1, 1.toShort)).asJava, 0).build()
+    new CreateTopicsRequest.Builder(new CreateTopicsRequestData().setTopics(
+      new CreatableTopicSet(Collections.singleton(new CreatableTopic().
+        setName(createTopic).setNumPartitions(1).
+          setReplicationFactor(1.toShort)).iterator))).build()
 
   private def deleteTopicsRequest = new DeleteTopicsRequest.Builder(Set(deleteTopic).asJava, 5000).build()
 
diff --git a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
index d89a9df..bef894a 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
@@ -17,15 +17,19 @@
 
 package kafka.server
 
+import java.util
 import java.util.Properties
 
 import kafka.network.SocketServer
 import kafka.utils.TestUtils
+import org.apache.kafka.common.message.CreateTopicsRequestData
+import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableReplicaAssignment, CreatableReplicaAssignmentSet, CreatableTopic, CreatableTopicSet, CreateableTopicConfig, CreateableTopicConfigSet}
 import org.apache.kafka.common.protocol.types.Struct
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.{ApiError, CreateTopicsRequest, CreateTopicsResponse, MetadataRequest, MetadataResponse}
 import org.junit.Assert.{assertEquals, assertFalse, assertNotNull, assertTrue}
 
+import scala.collection.JavaConverters
 import scala.collection.JavaConverters._
 
 class AbstractCreateTopicsRequestTest extends BaseRequestTest {
@@ -33,30 +37,87 @@ class AbstractCreateTopicsRequestTest extends BaseRequestTest {
   override def propertyOverrides(properties: Properties): Unit =
     properties.put(KafkaConfig.AutoCreateTopicsEnableProp, false.toString)
 
+  def topicsReq(topics: Seq[CreatableTopic],
+                timeout: Integer = 10000,
+                validateOnly: Boolean = false) = {
+    val req = new CreateTopicsRequestData()
+    req.setTimeoutMs(timeout)
+    req.setTopics(new CreatableTopicSet(topics.asJava.iterator()))
+    req.setValidateOnly(validateOnly)
+    new CreateTopicsRequest.Builder(req).build()
+  }
+
+  def topicReq(name: String,
+               numPartitions: Integer = null,
+               replicationFactor: Integer = null,
+               config: Map[String, String] = null,
+               assignment: Map[Int, Seq[Int]] = null): CreatableTopic = {
+    val topic = new CreatableTopic()
+    topic.setName(name)
+    if (numPartitions != null) {
+      topic.setNumPartitions(numPartitions)
+    } else if (assignment != null) {
+      topic.setNumPartitions(-1)
+    } else {
+      topic.setNumPartitions(1)
+    }
+    if (replicationFactor != null) {
+      topic.setReplicationFactor(replicationFactor.toShort)
+    } else if (assignment != null) {
+      topic.setReplicationFactor((-1).toShort)
+    } else {
+      topic.setReplicationFactor(1.toShort)
+    }
+    if (config != null) {
+      val effectiveConfigs = new CreateableTopicConfigSet()
+      config.foreach {
+        case (name, value) => {
+          effectiveConfigs.add(new CreateableTopicConfig().setName(name).setValue(value))
+        }
+      }
+      topic.setConfigs(effectiveConfigs)
+    }
+    if (assignment != null) {
+      val effectiveAssignments = new CreatableReplicaAssignmentSet()
+      assignment.foreach {
+        case (partitionIndex, brokerIdList) => {
+          val effectiveAssignment = new CreatableReplicaAssignment()
+          effectiveAssignment.setPartitionIndex(partitionIndex)
+          val brokerIds = new util.ArrayList[java.lang.Integer]()
+          brokerIdList.foreach(brokerId => brokerIds.add(brokerId))
+          effectiveAssignment.setBrokerIds(brokerIds)
+          effectiveAssignments.add(effectiveAssignment)
+        }
+      }
+      topic.setAssignments(effectiveAssignments)
+    }
+    topic
+  }
+
   protected def validateValidCreateTopicsRequests(request: CreateTopicsRequest): Unit = {
     val response = sendCreateTopicRequest(request)
 
-    val error = response.errors.values.asScala.find(_.isFailure)
-    assertTrue(s"There should be no errors, found ${response.errors.asScala}", error.isEmpty)
-
-    request.topics.asScala.foreach { case (topic, details) =>
+    assertTrue(s"There should be no errors, found " +
+      s"${response.errorCounts().keySet().asScala.mkString(", ")},",
+      response.errorCounts().keySet().asScala.find(_.code() > 0).isEmpty)
 
+    request.data().topics().asScala.foreach { case topic =>
       def verifyMetadata(socketServer: SocketServer) = {
         val metadata = sendMetadataRequest(
-          new MetadataRequest.Builder(List(topic).asJava, true).build()).topicMetadata.asScala
-        val metadataForTopic = metadata.filter(_.topic == topic).head
+          new MetadataRequest.Builder(List(topic.name()).asJava, true).build()).topicMetadata.asScala
+        val metadataForTopic = metadata.filter(_.topic == topic.name()).head
 
-        val partitions = if (!details.replicasAssignments.isEmpty)
-          details.replicasAssignments.size
+        val partitions = if (!topic.assignments().isEmpty)
+          topic.assignments().size
         else
-          details.numPartitions
+          topic.numPartitions
 
-        val replication = if (!details.replicasAssignments.isEmpty)
-          details.replicasAssignments.asScala.head._2.size
+        val replication = if (!topic.assignments().isEmpty)
+          topic.assignments().iterator().next().brokerIds().size()
         else
-          details.replicationFactor
+          topic.replicationFactor
 
-        if (request.validateOnly) {
+        if (request.data.validateOnly) {
           assertNotNull(s"Topic $topic should be created", metadataForTopic)
           assertFalse(s"Error ${metadataForTopic.error} for topic $topic", metadataForTopic.error == Errors.NONE)
           assertTrue("The topic should have no partitions", metadataForTopic.partitionMetadata.isEmpty)
@@ -71,9 +132,9 @@ class AbstractCreateTopicsRequestTest extends BaseRequestTest {
 
       // Verify controller broker has the correct metadata
       verifyMetadata(controllerSocketServer)
-      if (!request.validateOnly) {
+      if (!request.data.validateOnly) {
         // Wait until metadata is propagated and validate non-controller broker has the correct metadata
-        TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0)
+        TestUtils.waitUntilMetadataIsPropagated(servers, topic.name(), 0)
       }
       verifyMetadata(notControllerSocketServer)
     }
@@ -106,20 +167,21 @@ class AbstractCreateTopicsRequestTest extends BaseRequestTest {
                                                   requestStruct: Option[Struct] = None): Unit = {
     val response = requestStruct.map(sendCreateTopicRequestStruct(_, request.version)).getOrElse(
       sendCreateTopicRequest(request))
-    val errors = response.errors.asScala
-    assertEquals("The response size should match", expectedResponse.size, response.errors.size)
+    assertEquals("The response size should match", expectedResponse.size, response.data().topics().size)
 
-    expectedResponse.foreach { case (topic, expectedError) =>
-      val expected = expectedResponse(topic)
-      val actual = errors(topic)
-      assertEquals("The response error should match", expected.error, actual.error)
+    expectedResponse.foreach { case (topicName, expectedError) =>
+      val expected = expectedResponse(topicName)
+      val actual = response.data().topics().find(topicName)
+      if (actual == null) {
+        throw new RuntimeException(s"No response data found for topic ${topicName}")
+      }
+      assertEquals("The response error should match", expected.error.code(), actual.errorCode())
       if (checkErrorMessage) {
-        assertEquals(expected.message, actual.message)
-        assertEquals(expected.messageWithFallback, actual.messageWithFallback)
+        assertEquals(expected.message, actual.errorMessage())
       }
       // If no error validate topic exists
-      if (expectedError.isSuccess && !request.validateOnly) {
-        validateTopicExists(topic)
+      if (expectedError.isSuccess && !request.data.validateOnly) {
+        validateTopicExists(topicName)
       }
     }
   }
@@ -131,10 +193,6 @@ class AbstractCreateTopicsRequestTest extends BaseRequestTest {
     assertTrue("The topic should be created", metadata.exists(p => p.topic.equals(topic) && p.error == Errors.NONE))
   }
 
-  protected def replicaAssignmentToJava(assignments: Map[Int, List[Int]]) = {
-    assignments.map { case (k, v) => (k: Integer, v.map { i => i: Integer }.asJava) }.asJava
-  }
-
   protected def sendCreateTopicRequestStruct(requestStruct: Struct, apiVersion: Short,
                                              socketServer: SocketServer = controllerSocketServer): CreateTopicsResponse = {
     val response = connectAndSendStruct(requestStruct, ApiKeys.CREATE_TOPICS, apiVersion, socketServer)
diff --git a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala
index 47c1765..db2028c 100644
--- a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala
@@ -19,76 +19,61 @@ package kafka.server
 
 import kafka.utils._
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.CreateTopicsRequest
 import org.junit.Assert._
 import org.junit.Test
 
-import scala.collection.JavaConverters._
-
 class CreateTopicsRequestTest extends AbstractCreateTopicsRequestTest {
-
   @Test
   def testValidCreateTopicsRequests() {
-    val timeout = 10000
     // Generated assignments
-    validateValidCreateTopicsRequests(new CreateTopicsRequest.Builder(Map("topic1" -> new CreateTopicsRequest.TopicDetails(1, 1.toShort)).asJava, timeout).build())
-    validateValidCreateTopicsRequests(new CreateTopicsRequest.Builder(Map("topic2" -> new CreateTopicsRequest.TopicDetails(1, 3.toShort)).asJava, timeout).build())
-    val config3 = Map("min.insync.replicas" -> "2").asJava
-    validateValidCreateTopicsRequests(new CreateTopicsRequest.Builder(Map("topic3" -> new CreateTopicsRequest.TopicDetails(5, 2.toShort, config3)).asJava, timeout).build())
+    validateValidCreateTopicsRequests(topicsReq(Seq(topicReq("topic1"))))
+    validateValidCreateTopicsRequests(topicsReq(Seq(topicReq("topic2", replicationFactor = 3))))
+    validateValidCreateTopicsRequests(topicsReq(Seq(topicReq("topic3",
+      numPartitions = 5, replicationFactor = 2, config = Map("min.insync.replicas" -> "2")))))
     // Manual assignments
-    val assignments4 = replicaAssignmentToJava(Map(0 -> List(0)))
-    validateValidCreateTopicsRequests(new CreateTopicsRequest.Builder(Map("topic4" -> new CreateTopicsRequest.TopicDetails(assignments4)).asJava, timeout).build())
-    val assignments5 = replicaAssignmentToJava(Map(0 -> List(0, 1), 1 -> List(1, 0), 2 -> List(1, 2)))
-    val config5 = Map("min.insync.replicas" -> "2").asJava
-    validateValidCreateTopicsRequests(new CreateTopicsRequest.Builder(Map("topic5" -> new CreateTopicsRequest.TopicDetails(assignments5, config5)).asJava, timeout).build())
+    validateValidCreateTopicsRequests(topicsReq(Seq(topicReq("topic4", assignment = Map(0 -> List(0))))))
+    validateValidCreateTopicsRequests(topicsReq(Seq(topicReq("topic5",
+      assignment = Map(0 -> List(0, 1), 1 -> List(1, 0), 2 -> List(1, 2)),
+      config = Map("min.insync.replicas" -> "2")))))
     // Mixed
-    val assignments8 = replicaAssignmentToJava(Map(0 -> List(0, 1), 1 -> List(1, 0), 2 -> List(1, 2)))
-    validateValidCreateTopicsRequests(new CreateTopicsRequest.Builder(Map(
-      "topic6" -> new CreateTopicsRequest.TopicDetails(1, 1.toShort),
-      "topic7" -> new CreateTopicsRequest.TopicDetails(5, 2.toShort),
-      "topic8" -> new CreateTopicsRequest.TopicDetails(assignments8)).asJava, timeout).build()
-    )
-    validateValidCreateTopicsRequests(new CreateTopicsRequest.Builder(Map(
-      "topic9" -> new CreateTopicsRequest.TopicDetails(1, 1.toShort),
-      "topic10" -> new CreateTopicsRequest.TopicDetails(5, 2.toShort),
-      "topic11" -> new CreateTopicsRequest.TopicDetails(assignments8)).asJava, timeout, true).build()
-    )
+    validateValidCreateTopicsRequests(topicsReq(Seq(topicReq("topic6"),
+      topicReq("topic7", numPartitions = 5, replicationFactor = 2),
+      topicReq("topic8", assignment = Map(0 -> List(0, 1), 1 -> List(1, 0), 2 -> List(1, 2))))))
+    validateValidCreateTopicsRequests(topicsReq(Seq(topicReq("topic9"),
+      topicReq("topic10", numPartitions = 5, replicationFactor = 2),
+      topicReq("topic11", assignment = Map(0 -> List(0, 1), 1 -> List(1, 0), 2 -> List(1, 2)))),
+      validateOnly = true))
   }
 
   @Test
   def testErrorCreateTopicsRequests() {
-    val timeout = 10000
     val existingTopic = "existing-topic"
     createTopic(existingTopic, 1, 1)
-
     // Basic
-    validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(Map(existingTopic -> new CreateTopicsRequest.TopicDetails(1, 1.toShort)).asJava, timeout).build(),
+    validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq(existingTopic))),
       Map(existingTopic -> error(Errors.TOPIC_ALREADY_EXISTS, Some("Topic 'existing-topic' already exists."))))
-    validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(Map("error-partitions" -> new CreateTopicsRequest.TopicDetails(-1, 1.toShort)).asJava, timeout).build(),
+    validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("error-partitions", numPartitions = -1))),
       Map("error-partitions" -> error(Errors.INVALID_PARTITIONS)), checkErrorMessage = false)
-    validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(Map("error-replication" -> new CreateTopicsRequest.TopicDetails(1, (numBrokers + 1).toShort)).asJava, timeout).build(),
+    validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("error-replication",
+      replicationFactor = numBrokers + 1))),
       Map("error-replication" -> error(Errors.INVALID_REPLICATION_FACTOR)), checkErrorMessage = false)
-    val invalidConfig = Map("not.a.property" -> "error").asJava
-    validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(Map("error-config" -> new CreateTopicsRequest.TopicDetails(1, 1.toShort, invalidConfig)).asJava, timeout).build(),
+    validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("error-config",
+      config=Map("not.a.property" -> "error")))),
       Map("error-config" -> error(Errors.INVALID_CONFIG)), checkErrorMessage = false)
-
-    val config = Map("message.format.version" -> "invalid-value").asJava
-    validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(
-      Map("error-config-value" -> new CreateTopicsRequest.TopicDetails(1, 1.toShort, config)).asJava, timeout).build(),
+    validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("error-config-value",
+      config=Map("message.format.version" -> "invalid-value")))),
       Map("error-config-value" -> error(Errors.INVALID_CONFIG)), checkErrorMessage = false)
-
-    val invalidAssignments = replicaAssignmentToJava(Map(0 -> List(0, 1), 1 -> List(0)))
-    validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(Map("error-assignment" -> new CreateTopicsRequest.TopicDetails(invalidAssignments)).asJava, timeout).build(),
+    validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("error-assignment",
+      assignment=Map(0 -> List(0, 1), 1 -> List(0))))),
       Map("error-assignment" -> error(Errors.INVALID_REPLICA_ASSIGNMENT)), checkErrorMessage = false)
 
     // Partial
-    validateErrorCreateTopicsRequests(
-      new CreateTopicsRequest.Builder(Map(
-        existingTopic -> new CreateTopicsRequest.TopicDetails(1, 1.toShort),
-        "partial-partitions" -> new CreateTopicsRequest.TopicDetails(-1, 1.toShort),
-        "partial-replication" -> new CreateTopicsRequest.TopicDetails(1, (numBrokers + 1).toShort),
-        "partial-assignment" -> new CreateTopicsRequest.TopicDetails(invalidAssignments),
-        "partial-none" -> new CreateTopicsRequest.TopicDetails(1, 1.toShort)).asJava, timeout).build(),
+    validateErrorCreateTopicsRequests(topicsReq(Seq(
+      topicReq(existingTopic),
+      topicReq("partial-partitions", numPartitions = -1),
+      topicReq("partial-replication", replicationFactor=numBrokers + 1),
+      topicReq("partial-assignment", assignment=Map(0 -> List(0, 1), 1 -> List(0))),
+      topicReq("partial-none"))),
       Map(
         existingTopic -> error(Errors.TOPIC_ALREADY_EXISTS),
         "partial-partitions" -> error(Errors.INVALID_PARTITIONS),
@@ -101,12 +86,15 @@ class CreateTopicsRequestTest extends AbstractCreateTopicsRequestTest {
 
     // Timeout
     // We don't expect a request to ever complete within 1ms. A timeout of 1 ms allows us to test the purgatory timeout logic.
-    validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(Map("error-timeout" -> new CreateTopicsRequest.TopicDetails(10, 3.toShort)).asJava, 1).build(),
+    validateErrorCreateTopicsRequests(topicsReq(Seq(
+      topicReq("error-timeout", numPartitions = 10, replicationFactor = 3)), timeout = 1),
       Map("error-timeout" -> error(Errors.REQUEST_TIMED_OUT)), checkErrorMessage = false)
-    validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(Map("error-timeout-zero" -> new CreateTopicsRequest.TopicDetails(10, 3.toShort)).asJava, 0).build(),
+    validateErrorCreateTopicsRequests(topicsReq(Seq(
+      topicReq("error-timeout-zero", numPartitions = 10, replicationFactor = 3)), timeout = 0),
       Map("error-timeout-zero" -> error(Errors.REQUEST_TIMED_OUT)), checkErrorMessage = false)
     // Negative timeouts are treated the same as 0
-    validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(Map("error-timeout-negative" -> new CreateTopicsRequest.TopicDetails(10, 3.toShort)).asJava, -1).build(),
+    validateErrorCreateTopicsRequests(topicsReq(Seq(
+      topicReq("error-timeout-negative", numPartitions = 10, replicationFactor = 3)), timeout = -1),
       Map("error-timeout-negative" -> error(Errors.REQUEST_TIMED_OUT)), checkErrorMessage = false)
     // The topics should still get created eventually
     TestUtils.waitUntilMetadataIsPropagated(servers, "error-timeout", 0)
@@ -119,54 +107,22 @@ class CreateTopicsRequestTest extends AbstractCreateTopicsRequestTest {
 
   @Test
   def testInvalidCreateTopicsRequests() {
-    // Duplicate
-    val singleRequest = new CreateTopicsRequest.Builder(Map("duplicate-topic" ->
-        new CreateTopicsRequest.TopicDetails(1, 1.toShort)).asJava, 1000).build()
-    validateErrorCreateTopicsRequests(singleRequest, Map("duplicate-topic" -> error(Errors.INVALID_REQUEST,
-      Some("""Create topics request from client `client-id` contains multiple entries for the following topics: duplicate-topic"""))),
-      requestStruct = Some(toStructWithDuplicateFirstTopic(singleRequest)))
-
-    // Duplicate Partial with validateOnly
-    val doubleRequestValidateOnly = new CreateTopicsRequest.Builder(Map(
-      "duplicate-topic" -> new CreateTopicsRequest.TopicDetails(1, 1.toShort),
-      "other-topic" -> new CreateTopicsRequest.TopicDetails(1, 1.toShort)).asJava, 1000, true).build()
-    validateErrorCreateTopicsRequests(doubleRequestValidateOnly, Map(
-      "duplicate-topic" -> error(Errors.INVALID_REQUEST),
-      "other-topic" -> error(Errors.NONE)), checkErrorMessage = false,
-      requestStruct = Some(toStructWithDuplicateFirstTopic(doubleRequestValidateOnly)))
-
-    // Duplicate Partial
-    val doubleRequest = new CreateTopicsRequest.Builder(Map(
-      "duplicate-topic" -> new CreateTopicsRequest.TopicDetails(1, 1.toShort),
-      "other-topic" -> new CreateTopicsRequest.TopicDetails(1, 1.toShort)).asJava, 1000).build()
-    validateErrorCreateTopicsRequests(doubleRequest, Map(
-      "duplicate-topic" -> error(Errors.INVALID_REQUEST),
-      "other-topic" -> error(Errors.NONE)), checkErrorMessage = false,
-      requestStruct = Some(toStructWithDuplicateFirstTopic(doubleRequest)))
-
     // Partitions/ReplicationFactor and ReplicaAssignment
-    val assignments = replicaAssignmentToJava(Map(0 -> List(0)))
-    val assignmentRequest = new CreateTopicsRequest.Builder(Map("bad-args-topic" ->
-        new CreateTopicsRequest.TopicDetails(assignments)).asJava, 1000).build()
-    val badArgumentsRequest = addPartitionsAndReplicationFactorToFirstTopic(assignmentRequest)
-    validateErrorCreateTopicsRequests(badArgumentsRequest, Map("bad-args-topic" -> error(Errors.INVALID_REQUEST)),
-      checkErrorMessage = false)
-
-    // Partitions/ReplicationFactor and ReplicaAssignment with validateOnly
-    val assignmentRequestValidateOnly = new CreateTopicsRequest.Builder(Map("bad-args-topic" ->
-      new CreateTopicsRequest.TopicDetails(assignments)).asJava, 1000, true).build()
-    val badArgumentsRequestValidateOnly = addPartitionsAndReplicationFactorToFirstTopic(assignmentRequestValidateOnly)
-    validateErrorCreateTopicsRequests(badArgumentsRequestValidateOnly, Map("bad-args-topic" -> error(Errors.INVALID_REQUEST)),
-      checkErrorMessage = false)
+    validateErrorCreateTopicsRequests(topicsReq(Seq(
+      topicReq("bad-args-topic", numPartitions = 10, replicationFactor = 3,
+        assignment = Map(0 -> List(0))))),
+      Map("bad-args-topic" -> error(Errors.INVALID_REQUEST)), checkErrorMessage = false)
+
+    validateErrorCreateTopicsRequests(topicsReq(Seq(
+      topicReq("bad-args-topic", numPartitions = 10, replicationFactor = 3,
+        assignment = Map(0 -> List(0)))), validateOnly = true),
+      Map("bad-args-topic" -> error(Errors.INVALID_REQUEST)), checkErrorMessage = false)
   }
 
   @Test
   def testNotController() {
-    val request = new CreateTopicsRequest.Builder(Map("topic1" -> new CreateTopicsRequest.TopicDetails(1, 1.toShort)).asJava, 1000).build()
-    val response = sendCreateTopicRequest(request, notControllerSocketServer)
-
-    val error = response.errors.asScala.head._2.error
-    assertEquals("Expected controller error when routed incorrectly", Errors.NOT_CONTROLLER, error)
+    val req = topicsReq(Seq(topicReq("topic1")))
+    val response = sendCreateTopicRequest(req, notControllerSocketServer)
+    assertEquals(1, response.errorCounts().get(Errors.NOT_CONTROLLER))
   }
-
 }
diff --git a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala
index 42e9ff8..4fc3244 100644
--- a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala
+++ b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala
@@ -23,7 +23,6 @@ import java.util.Properties
 import kafka.log.LogConfig
 import org.apache.kafka.common.errors.PolicyViolationException
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.CreateTopicsRequest
 import org.apache.kafka.server.policy.CreateTopicPolicy
 import org.apache.kafka.server.policy.CreateTopicPolicy.RequestMetadata
 import org.junit.Test
@@ -40,66 +39,69 @@ class CreateTopicsRequestWithPolicyTest extends AbstractCreateTopicsRequestTest
 
   @Test
   def testValidCreateTopicsRequests() {
-    val timeout = 10000
+    validateValidCreateTopicsRequests(topicsReq(Seq(topicReq("topic1",
+      numPartitions = 5))))
 
-    validateValidCreateTopicsRequests(new CreateTopicsRequest.Builder(
-      Map("topic1" -> new CreateTopicsRequest.TopicDetails(5, 1.toShort)).asJava, timeout).build())
+    validateValidCreateTopicsRequests(topicsReq(Seq(topicReq("topic2",
+      numPartitions = 5, replicationFactor = 3)),
+      validateOnly = true))
 
-    validateValidCreateTopicsRequests(new CreateTopicsRequest.Builder(
-      Map("topic2" -> new CreateTopicsRequest.TopicDetails(5, 3.toShort)).asJava, timeout, true).build())
+    validateValidCreateTopicsRequests(topicsReq(Seq(topicReq("topic3",
+      numPartitions = 11, replicationFactor = 2,
+      config = Map(LogConfig.RetentionMsProp -> 4999.toString))),
+      validateOnly = true))
 
-    val configs = Map(LogConfig.RetentionMsProp -> 4999.toString)
-    validateValidCreateTopicsRequests(new CreateTopicsRequest.Builder(
-      Map("topic3" -> new CreateTopicsRequest.TopicDetails(11, 2.toShort, configs.asJava)).asJava, timeout, true).build())
-
-    val assignments = replicaAssignmentToJava(Map(0 -> List(1, 0), 1 -> List(0, 1)))
-    validateValidCreateTopicsRequests(new CreateTopicsRequest.Builder(
-      Map("topic4" -> new CreateTopicsRequest.TopicDetails(assignments)).asJava, timeout).build())
+    validateValidCreateTopicsRequests(topicsReq(Seq(topicReq("topic4",
+      assignment = Map(0 -> List(1, 0), 1 -> List(0, 1))))))
   }
 
   @Test
   def testErrorCreateTopicsRequests() {
-    val timeout = 10000
     val existingTopic = "existing-topic"
     createTopic(existingTopic, 1, 1)
 
     // Policy violations
-    validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(
-      Map("policy-topic1" -> new CreateTopicsRequest.TopicDetails(4, 1.toShort)).asJava, timeout).build(),
+    validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("policy-topic1",
+      numPartitions = 4, replicationFactor = 1))),
       Map("policy-topic1" -> error(Errors.POLICY_VIOLATION, Some("Topics should have at least 5 partitions, received 4"))))
 
-    validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(
-      Map("policy-topic2" -> new CreateTopicsRequest.TopicDetails(4, 3.toShort)).asJava, timeout, true).build(),
+    validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("policy-topic2",
+      numPartitions = 4, replicationFactor = 3)), validateOnly = true),
       Map("policy-topic2" -> error(Errors.POLICY_VIOLATION, Some("Topics should have at least 5 partitions, received 4"))))
 
-    val configs = Map(LogConfig.RetentionMsProp -> 5001.toString)
-    validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(
-      Map("policy-topic3" -> new CreateTopicsRequest.TopicDetails(11, 2.toShort, configs.asJava)).asJava, timeout, true).build(),
-      Map("policy-topic3" -> error(Errors.POLICY_VIOLATION, Some("RetentionMs should be less than 5000ms if replicationFactor > 5"))))
-
-    validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(
-      Map("policy-topic4" -> new CreateTopicsRequest.TopicDetails(11, 3.toShort, Map.empty.asJava)).asJava, timeout, true).build(),
-      Map("policy-topic4" -> error(Errors.POLICY_VIOLATION, Some("RetentionMs should be less than 5000ms if replicationFactor > 5"))))
-
-    val assignments = replicaAssignmentToJava(Map(0 -> List(1), 1 -> List(0)))
-    validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(
-      Map("policy-topic5" -> new CreateTopicsRequest.TopicDetails(assignments)).asJava, timeout).build(),
+    validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("policy-topic3",
+      numPartitions = 11, replicationFactor = 2,
+      config = Map(LogConfig.RetentionMsProp -> 5001.toString))), validateOnly = true),
+      Map("policy-topic3" -> error(Errors.POLICY_VIOLATION,
+        Some("RetentionMs should be less than 5000ms if replicationFactor > 5"))))
+
+    validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("policy-topic4",
+      numPartitions = 11, replicationFactor = 3,
+      config = Map(LogConfig.RetentionMsProp -> 5001.toString))), validateOnly = true),
+      Map("policy-topic4" -> error(Errors.POLICY_VIOLATION,
+        Some("RetentionMs should be less than 5000ms if replicationFactor > 5"))))
+
+    validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("policy-topic5",
+      assignment = Map(0 -> List(1), 1 -> List(0)),
+      config = Map(LogConfig.RetentionMsProp -> 5001.toString))), validateOnly = true),
       Map("policy-topic5" -> error(Errors.POLICY_VIOLATION,
         Some("Topic partitions should have at least 2 partitions, received 1 for partition 0"))))
 
     // Check that basic errors still work
-    validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(
-      Map(existingTopic -> new CreateTopicsRequest.TopicDetails(5, 1.toShort)).asJava, timeout).build(),
-      Map(existingTopic -> error(Errors.TOPIC_ALREADY_EXISTS, Some("Topic 'existing-topic' already exists."))))
+    validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq(existingTopic,
+      numPartitions = 5, replicationFactor = 1))),
+      Map(existingTopic -> error(Errors.TOPIC_ALREADY_EXISTS,
+        Some("Topic 'existing-topic' already exists."))))
 
-    validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(
-      Map("error-replication" -> new CreateTopicsRequest.TopicDetails(10, (numBrokers + 1).toShort)).asJava, timeout, true).build(),
+    validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("error-replication",
+      numPartitions = 10, replicationFactor = numBrokers + 1)), validateOnly = true),
       Map("error-replication" -> error(Errors.INVALID_REPLICATION_FACTOR,
         Some("Replication factor: 4 larger than available brokers: 3."))))
 
-    validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(
-      Map("error-replication2" -> new CreateTopicsRequest.TopicDetails(10, -1: Short)).asJava, timeout, true).build(),
-      Map("error-replication2" -> error(Errors.INVALID_REPLICATION_FACTOR, Some("Replication factor must be larger than 0."))))
+    validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("error-replication2",
+      numPartitions = 10, replicationFactor = -1)), validateOnly = true),
+      Map("error-replication2" -> error(Errors.INVALID_REPLICATION_FACTOR,
+        Some("Replication factor must be larger than 0."))))
   }
 
 }
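
For context, a minimal sketch of a CreateTopicPolicy resembling the Policy class these tests exercise (the real class lives elsewhere in this test file and is unchanged by this commit; the threshold below is only illustrative). The relevant point is that AdminManager still passes null numPartitions/replicationFactor to the policy when the request used manual assignments, now derived from the generated CreatableTopic:

    import org.apache.kafka.common.errors.PolicyViolationException
    import org.apache.kafka.server.policy.CreateTopicPolicy
    import org.apache.kafka.server.policy.CreateTopicPolicy.RequestMetadata

    class ExamplePolicy extends CreateTopicPolicy {
      override def configure(configs: java.util.Map[String, _]): Unit = {}
      override def close(): Unit = {}

      override def validate(metadata: RequestMetadata): Unit = {
        // numPartitions() is null when the client supplied replicasAssignments,
        // so fall back to counting the assignment map.
        val partitions: Int =
          if (metadata.numPartitions != null) metadata.numPartitions.intValue
          else metadata.replicasAssignments.size
        if (partitions < 5)
          throw new PolicyViolationException(
            s"Topics should have at least 5 partitions, received $partitions")
      }
    }
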
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index ecfb5f5..b5531fe 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -27,6 +27,8 @@ import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.message.{ElectPreferredLeadersRequestData, LeaveGroupRequestData}
 import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, ResourceType => AdminResourceType}
 import org.apache.kafka.common.{Node, TopicPartition}
+import org.apache.kafka.common.message.CreateTopicsRequestData
+import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicSet}
 import org.apache.kafka.common.metrics.{KafkaMetric, Quota, Sensor}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.ApiKeys
@@ -279,8 +281,13 @@ class RequestQuotaTest extends BaseRequestTest {
         case ApiKeys.API_VERSIONS =>
           new ApiVersionsRequest.Builder
 
-        case ApiKeys.CREATE_TOPICS =>
-          new CreateTopicsRequest.Builder(Map("topic-2" -> new CreateTopicsRequest.TopicDetails(1, 1.toShort)).asJava, 0)
+        case ApiKeys.CREATE_TOPICS => {
+          new CreateTopicsRequest.Builder(
+            new CreateTopicsRequestData().setTopics(
+              new CreatableTopicSet(Collections.singleton(
+                new CreatableTopic().setName("topic-2").setNumPartitions(1).
+                  setReplicationFactor(1.toShort)).iterator())))
+        }
 
         case ApiKeys.DELETE_TOPICS =>
           new DeleteTopicsRequest.Builder(Set("topic-2").asJava, 5000)
@@ -438,7 +445,8 @@ class RequestQuotaTest extends BaseRequestTest {
       case ApiKeys.DESCRIBE_GROUPS => new DescribeGroupsResponse(response).throttleTimeMs
       case ApiKeys.LIST_GROUPS => new ListGroupsResponse(response).throttleTimeMs
       case ApiKeys.API_VERSIONS => new ApiVersionsResponse(response).throttleTimeMs
-      case ApiKeys.CREATE_TOPICS => new CreateTopicsResponse(response).throttleTimeMs
+      case ApiKeys.CREATE_TOPICS =>
+        new CreateTopicsResponse(response, ApiKeys.CREATE_TOPICS.latestVersion()).throttleTimeMs
       case ApiKeys.DELETE_TOPICS => new DeleteTopicsResponse(response).throttleTimeMs
       case ApiKeys.DELETE_RECORDS => new DeleteRecordsResponse(response).throttleTimeMs
       case ApiKeys.INIT_PRODUCER_ID => new InitProducerIdResponse(response).throttleTimeMs

