kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject [2/2] kafka git commit: KAFKA-2945; CreateTopic - protocol and server side implementation
Date Tue, 12 Jul 2016 15:21:25 GMT
KAFKA-2945; CreateTopic - protocol and server side implementation

Author: Grant Henke <granthenke@gmail.com>

Reviewers: Jun Rao <junrao@gmail.com>

Closes #1489 from granthenke/create-wire-new


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fc47b9fa
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fc47b9fa
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fc47b9fa

Branch: refs/heads/trunk
Commit: fc47b9fa6b89a612cd0f79dfc689ee3813fd405a
Parents: 136a8fa
Author: Grant Henke <granthenke@gmail.com>
Authored: Tue Jul 12 08:21:19 2016 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Tue Jul 12 08:21:19 2016 -0700

----------------------------------------------------------------------
 .../errors/InvalidConfigurationException.java   |  31 +++
 .../errors/InvalidPartitionsException.java      |  31 +++
 .../InvalidReplicaAssignmentException.java      |  31 +++
 .../InvalidReplicationFactorException.java      |  31 +++
 .../common/errors/InvalidRequestException.java  |  36 +++
 .../common/errors/NotControllerException.java   |  28 +++
 .../common/errors/TopicExistsException.java     |  31 +++
 .../apache/kafka/common/protocol/ApiKeys.java   |   5 +-
 .../apache/kafka/common/protocol/Errors.java    |  24 +-
 .../apache/kafka/common/protocol/Protocol.java  |  46 ++++
 .../kafka/common/requests/AbstractRequest.java  |   4 +-
 .../common/requests/CreateTopicsRequest.java    | 231 ++++++++++++++++++
 .../common/requests/CreateTopicsResponse.java   |  96 ++++++++
 .../common/requests/RequestResponseTest.java    |  30 ++-
 .../src/main/scala/kafka/admin/AdminUtils.scala |  19 +-
 .../main/scala/kafka/admin/TopicCommand.scala   |   6 +-
 core/src/main/scala/kafka/cluster/Broker.scala  |   3 -
 .../main/scala/kafka/common/ErrorMapping.scala  |   9 +
 .../kafka/common/InvalidTopicException.scala    |  22 --
 .../kafka/common/TopicExistsException.scala     |  22 --
 core/src/main/scala/kafka/log/LogConfig.scala   |   7 +-
 .../kafka/network/InvalidRequestException.scala |  24 --
 .../scala/kafka/network/RequestChannel.scala    |   5 +-
 .../main/scala/kafka/network/SocketServer.scala |   1 +
 .../main/scala/kafka/server/AdminManager.scala  | 114 +++++++++
 .../kafka/server/DelayedCreateTopics.scala      |  91 ++++++++
 .../kafka/server/DelayedOperationKey.scala      |   6 +
 .../src/main/scala/kafka/server/KafkaApis.scala |  53 ++++-
 .../main/scala/kafka/server/KafkaServer.scala   |   5 +-
 .../kafka/api/AuthorizerIntegrationTest.scala   |  22 +-
 .../test/scala/unit/kafka/admin/AdminTest.scala |  12 +-
 .../unit/kafka/admin/TopicCommandTest.scala     |   4 +-
 .../kafka/server/ApiVersionsRequestTest.scala   |   4 +-
 .../unit/kafka/server/BaseRequestTest.scala     |  46 +++-
 .../kafka/server/CreateTopicsRequestTest.scala  | 233 +++++++++++++++++++
 .../unit/kafka/server/MetadataRequestTest.scala |   2 +-
 .../unit/kafka/server/ProduceRequestTest.scala  |   5 +-
 .../server/SaslApiVersionsRequestTest.scala     |   4 +-
 38 files changed, 1250 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/fc47b9fa/clients/src/main/java/org/apache/kafka/common/errors/InvalidConfigurationException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/InvalidConfigurationException.java b/clients/src/main/java/org/apache/kafka/common/errors/InvalidConfigurationException.java
new file mode 100644
index 0000000..25cbc7a
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/InvalidConfigurationException.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class InvalidConfigurationException extends ApiException {
+
+    private static final long serialVersionUID = 1L;
+
+    public InvalidConfigurationException(String message) {
+        super(message);
+    }
+
+    public InvalidConfigurationException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc47b9fa/clients/src/main/java/org/apache/kafka/common/errors/InvalidPartitionsException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/InvalidPartitionsException.java b/clients/src/main/java/org/apache/kafka/common/errors/InvalidPartitionsException.java
new file mode 100644
index 0000000..c7ea668
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/InvalidPartitionsException.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class InvalidPartitionsException extends ApiException {
+
+    private static final long serialVersionUID = 1L;
+
+    public InvalidPartitionsException(String message) {
+        super(message);
+    }
+
+    public InvalidPartitionsException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc47b9fa/clients/src/main/java/org/apache/kafka/common/errors/InvalidReplicaAssignmentException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/InvalidReplicaAssignmentException.java b/clients/src/main/java/org/apache/kafka/common/errors/InvalidReplicaAssignmentException.java
new file mode 100644
index 0000000..765f0f6
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/InvalidReplicaAssignmentException.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class InvalidReplicaAssignmentException extends ApiException {
+
+    private static final long serialVersionUID = 1L;
+
+    public InvalidReplicaAssignmentException(String message) {
+        super(message);
+    }
+
+    public InvalidReplicaAssignmentException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc47b9fa/clients/src/main/java/org/apache/kafka/common/errors/InvalidReplicationFactorException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/InvalidReplicationFactorException.java b/clients/src/main/java/org/apache/kafka/common/errors/InvalidReplicationFactorException.java
new file mode 100644
index 0000000..33f048a
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/InvalidReplicationFactorException.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class InvalidReplicationFactorException extends ApiException {
+
+    private static final long serialVersionUID = 1L;
+
+    public InvalidReplicationFactorException(String message) {
+        super(message);
+    }
+
+    public InvalidReplicationFactorException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc47b9fa/clients/src/main/java/org/apache/kafka/common/errors/InvalidRequestException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/InvalidRequestException.java b/clients/src/main/java/org/apache/kafka/common/errors/InvalidRequestException.java
new file mode 100644
index 0000000..8299da4
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/InvalidRequestException.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * Thrown when a request breaks basic wire protocol rules.
+ * This most likely occurs because of a request being malformed by the client library or
+ * the message was sent to an incompatible broker.
+ */
+public class InvalidRequestException extends ApiException {
+
+    private static final long serialVersionUID = 1L;
+
+    public InvalidRequestException(String message) {
+        super(message);
+    }
+
+    public InvalidRequestException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc47b9fa/clients/src/main/java/org/apache/kafka/common/errors/NotControllerException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/NotControllerException.java b/clients/src/main/java/org/apache/kafka/common/errors/NotControllerException.java
new file mode 100644
index 0000000..c2784a8
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/NotControllerException.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.common.errors;
+
+public class NotControllerException extends RetriableException {
+
+    private static final long serialVersionUID = 1L;
+
+    public NotControllerException(String message) {
+        super(message);
+    }
+
+    public NotControllerException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc47b9fa/clients/src/main/java/org/apache/kafka/common/errors/TopicExistsException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/TopicExistsException.java b/clients/src/main/java/org/apache/kafka/common/errors/TopicExistsException.java
new file mode 100644
index 0000000..0fc0683
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/TopicExistsException.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class TopicExistsException extends ApiException {
+
+    private static final long serialVersionUID = 1L;
+
+    public TopicExistsException(String message) {
+        super(message);
+    }
+
+    public TopicExistsException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc47b9fa/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index aeb0b45..bd00b97 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -38,7 +38,8 @@ public enum ApiKeys {
     DESCRIBE_GROUPS(15, "DescribeGroups"),
     LIST_GROUPS(16, "ListGroups"),
     SASL_HANDSHAKE(17, "SaslHandshake"),
-    API_VERSIONS(18, "ApiVersions");
+    API_VERSIONS(18, "ApiVersions"),
+    CREATE_TOPICS(19, "CreateTopics");
 
     private static final ApiKeys[] ID_TO_TYPE;
     private static final int MIN_API_KEY = 0;
@@ -98,4 +99,4 @@ public enum ApiKeys {
         System.out.println(toHtml());
     }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc47b9fa/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index bd7310b..90aa8f8 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -31,14 +31,20 @@ import org.apache.kafka.common.errors.IllegalGenerationException;
 import org.apache.kafka.common.errors.IllegalSaslStateException;
 import org.apache.kafka.common.errors.InconsistentGroupProtocolException;
 import org.apache.kafka.common.errors.InvalidCommitOffsetSizeException;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
 import org.apache.kafka.common.errors.InvalidFetchSizeException;
 import org.apache.kafka.common.errors.InvalidGroupIdException;
+import org.apache.kafka.common.errors.InvalidPartitionsException;
+import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
+import org.apache.kafka.common.errors.InvalidReplicationFactorException;
+import org.apache.kafka.common.errors.InvalidRequestException;
 import org.apache.kafka.common.errors.InvalidRequiredAcksException;
 import org.apache.kafka.common.errors.InvalidSessionTimeoutException;
 import org.apache.kafka.common.errors.InvalidTimestampException;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.LeaderNotAvailableException;
 import org.apache.kafka.common.errors.NetworkException;
+import org.apache.kafka.common.errors.NotControllerException;
 import org.apache.kafka.common.errors.NotCoordinatorForGroupException;
 import org.apache.kafka.common.errors.NotEnoughReplicasAfterAppendException;
 import org.apache.kafka.common.errors.NotEnoughReplicasException;
@@ -50,6 +56,7 @@ import org.apache.kafka.common.errors.RecordBatchTooLargeException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
 import org.apache.kafka.common.errors.ReplicaNotAvailableException;
 import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.TopicExistsException;
 import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
@@ -139,7 +146,22 @@ public enum Errors {
     ILLEGAL_SASL_STATE(34,
             new IllegalSaslStateException("Request is not valid given the current SASL state.")),
     UNSUPPORTED_VERSION(35,
-            new UnsupportedVersionException("The version of API is not supported."));
+            new UnsupportedVersionException("The version of API is not supported.")),
+    TOPIC_ALREADY_EXISTS(36,
+            new TopicExistsException("Topic with this name already exists.")),
+    INVALID_PARTITIONS(37,
+            new InvalidPartitionsException("Number of partitions is invalid.")),
+    INVALID_REPLICATION_FACTOR(38,
+            new InvalidReplicationFactorException("Replication-factor is invalid.")),
+    INVALID_REPLICA_ASSIGNMENT(39,
+            new InvalidReplicaAssignmentException("Replica assignment is invalid.")),
+    INVALID_CONFIG(40,
+            new InvalidConfigurationException("Configuration is invalid.")),
+    NOT_CONTROLLER(41,
+        new NotControllerException("This is not the correct controller for this cluster.")),
+    INVALID_REQUEST(42,
+        new InvalidRequestException("This most likely occurs because of a request being malformed by the client library or" +
+            " the message was sent to an incompatible broker. See the broker logs for more details."));
 
     private static final Logger log = LoggerFactory.getLogger(Errors.class);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc47b9fa/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index ec74427..2610e04 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -770,6 +770,50 @@ public class Protocol {
     public static final Schema[] API_VERSIONS_REQUEST = new Schema[]{API_VERSIONS_REQUEST_V0};
     public static final Schema[] API_VERSIONS_RESPONSE = new Schema[]{API_VERSIONS_RESPONSE_V0};
 
+    /* Admin requests common */
+    public static final Schema CONFIG_ENTRY = new Schema(new Field("config_key", STRING, "Configuration key name"),
+        new Field("config_value", STRING, "Configuration value"));
+
+    public static final Schema PARTITION_REPLICA_ASSIGNMENT_ENTRY = new Schema(
+        new Field("partition_id", INT32),
+        new Field("replicas", new ArrayOf(INT32), "The set of all nodes that should host this partition. The first replica in the list is the preferred leader."));
+
+    public static final Schema TOPIC_ERROR_CODE = new Schema(new Field("topic", STRING), new Field("error_code", INT16));
+
+    /* CreateTopic api */
+    public static final Schema SINGLE_CREATE_TOPIC_REQUEST_V0 = new Schema(
+        new Field("topic",
+            STRING,
+            "Name for newly created topic."),
+        new Field("num_partitions",
+            INT32,
+            "Number of partitions to be created. -1 indicates unset."),
+        new Field("replication_factor",
+            INT16,
+            "Replication factor for the topic. -1 indicates unset."),
+        new Field("replica_assignment",
+            new ArrayOf(PARTITION_REPLICA_ASSIGNMENT_ENTRY),
+            "Replica assignment among kafka brokers for this topic partitions. If this is set num_partitions and replication_factor must be unset."),
+        new Field("configs",
+            new ArrayOf(CONFIG_ENTRY),
+            "Topic level configuration for topic to be set."));
+
+    public static final Schema CREATE_TOPICS_REQUEST_V0 = new Schema(
+        new Field("create_topic_requests",
+            new ArrayOf(SINGLE_CREATE_TOPIC_REQUEST_V0),
+            "An array of single topic creation requests. Can not have multiple entries for the same topic."),
+        new Field("timeout",
+            INT32,
+            "The time in ms to wait for a topic to be completely created on the controller node. Values <= 0 will trigger topic creation and return immediately"));
+
+    public static final Schema CREATE_TOPICS_RESPONSE_V0 = new Schema(
+        new Field("topic_error_codes",
+            new ArrayOf(TOPIC_ERROR_CODE),
+            "An array of per topic error codes."));
+
+    public static final Schema[] CREATE_TOPICS_REQUEST = new Schema[] {CREATE_TOPICS_REQUEST_V0};
+    public static final Schema[] CREATE_TOPICS_RESPONSE = new Schema[] {CREATE_TOPICS_RESPONSE_V0};
+
     /* an array of all requests and responses with all schema versions; a null value in the inner array means that the
      * particular version is not supported */
     public static final Schema[][] REQUESTS = new Schema[ApiKeys.MAX_API_KEY + 1][];
@@ -799,6 +843,7 @@ public class Protocol {
         REQUESTS[ApiKeys.LIST_GROUPS.id] = LIST_GROUPS_REQUEST;
         REQUESTS[ApiKeys.SASL_HANDSHAKE.id] = SASL_HANDSHAKE_REQUEST;
         REQUESTS[ApiKeys.API_VERSIONS.id] = API_VERSIONS_REQUEST;
+        REQUESTS[ApiKeys.CREATE_TOPICS.id] = CREATE_TOPICS_REQUEST;
 
         RESPONSES[ApiKeys.PRODUCE.id] = PRODUCE_RESPONSE;
         RESPONSES[ApiKeys.FETCH.id] = FETCH_RESPONSE;
@@ -819,6 +864,7 @@ public class Protocol {
         RESPONSES[ApiKeys.LIST_GROUPS.id] = LIST_GROUPS_RESPONSE;
         RESPONSES[ApiKeys.SASL_HANDSHAKE.id] = SASL_HANDSHAKE_RESPONSE;
         RESPONSES[ApiKeys.API_VERSIONS.id] = API_VERSIONS_RESPONSE;
+        RESPONSES[ApiKeys.CREATE_TOPICS.id] = CREATE_TOPICS_RESPONSE;
 
         /* set the minimum and maximum version of each api */
         for (ApiKeys api : ApiKeys.values()) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc47b9fa/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index ab61c66..6a91825 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -76,9 +76,11 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
                 return SaslHandshakeRequest.parse(buffer, versionId);
             case API_VERSIONS:
                 return ApiVersionsRequest.parse(buffer, versionId);
+            case CREATE_TOPICS:
+                return CreateTopicsRequest.parse(buffer, versionId);
             default:
                 throw new AssertionError(String.format("ApiKey %s is not currently handled in `getRequest`, the " +
                         "code should be updated to do so.", apiKey));
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc47b9fa/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
new file mode 100644
index 0000000..3977835
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class CreateTopicsRequest extends AbstractRequest {
+    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.CREATE_TOPICS.id);
+
+    private static final String REQUESTS_KEY_NAME = "create_topic_requests";
+
+    private static final String TIMEOUT_KEY_NAME = "timeout";
+    private static final String TOPIC_KEY_NAME = "topic";
+    private static final String NUM_PARTITIONS_KEY_NAME = "num_partitions";
+    private static final String REPLICATION_FACTOR_KEY_NAME = "replication_factor";
+    private static final String REPLICA_ASSIGNMENT_KEY_NAME = "replica_assignment";
+    private static final String REPLICA_ASSIGNMENT_PARTITION_ID_KEY_NAME = "partition_id";
+    private static final String REPLICA_ASSIGNMENT_REPLICAS_KEY_NAME = "replicas";
+
+    private static final String CONFIG_KEY_KEY_NAME = "config_key";
+    private static final String CONFIG_VALUE_KEY_NAME = "config_value";
+    private static final String CONFIGS_KEY_NAME = "configs";
+
+    public static final class TopicDetails {
+        public final int numPartitions;
+        public final short replicationFactor;
+        public final Map<Integer, List<Integer>> replicasAssignments;
+        public final Map<String, String> configs;
+
+        private TopicDetails(int numPartitions,
+                             short replicationFactor,
+                             Map<Integer, List<Integer>> replicasAssignments,
+                             Map<String, String> configs) {
+            this.numPartitions = numPartitions;
+            this.replicationFactor = replicationFactor;
+            this.replicasAssignments = replicasAssignments;
+            this.configs = configs;
+        }
+
+        public TopicDetails(int partitions,
+                            short replicationFactor,
+                            Map<String, String> configs) {
+            this(partitions, replicationFactor, Collections.<Integer, List<Integer>>emptyMap(), configs);
+        }
+
+        public TopicDetails(int partitions,
+                            short replicationFactor) {
+            this(partitions, replicationFactor, Collections.<String, String>emptyMap());
+        }
+
+        public TopicDetails(Map<Integer, List<Integer>> replicasAssignments,
+                            Map<String, String> configs) {
+            this(NO_NUM_PARTITIONS, NO_REPLICATION_FACTOR, replicasAssignments, configs);
+        }
+
+        public TopicDetails(Map<Integer, List<Integer>> replicasAssignments) {
+            this(replicasAssignments, Collections.<String, String>emptyMap());
+        }
+    }
+
+    private final Map<String, TopicDetails> topics;
+    private final Integer timeout;
+
+    // Set to handle special case where 2 requests for the same topic exist on the wire.
+    // This allows the broker to return an error code for these topics.
+    private final Set<String> duplicateTopics;
+
+    public static final int NO_NUM_PARTITIONS = -1;
+    public static final short NO_REPLICATION_FACTOR = -1;
+
+    public CreateTopicsRequest(Map<String, TopicDetails> topics, Integer timeout) {
+        super(new Struct(CURRENT_SCHEMA));
+
+        List<Struct> createTopicRequestStructs = new ArrayList<>(topics.size());
+        for (Map.Entry<String, TopicDetails> entry : topics.entrySet()) {
+
+            Struct singleRequestStruct = struct.instance(REQUESTS_KEY_NAME);
+            String topic = entry.getKey();
+            TopicDetails args = entry.getValue();
+
+            singleRequestStruct.set(TOPIC_KEY_NAME, topic);
+            singleRequestStruct.set(NUM_PARTITIONS_KEY_NAME, args.numPartitions);
+            singleRequestStruct.set(REPLICATION_FACTOR_KEY_NAME, args.replicationFactor);
+
+            // replica assignment
+            List<Struct> replicaAssignmentsStructs = new ArrayList<>(args.replicasAssignments.size());
+            for (Map.Entry<Integer, List<Integer>> partitionReplicaAssignment : args.replicasAssignments.entrySet()) {
+                Struct replicaAssignmentStruct = singleRequestStruct.instance(REPLICA_ASSIGNMENT_KEY_NAME);
+                replicaAssignmentStruct.set(REPLICA_ASSIGNMENT_PARTITION_ID_KEY_NAME, partitionReplicaAssignment.getKey());
+                replicaAssignmentStruct.set(REPLICA_ASSIGNMENT_REPLICAS_KEY_NAME, partitionReplicaAssignment.getValue().toArray());
+                replicaAssignmentsStructs.add(replicaAssignmentStruct);
+            }
+            singleRequestStruct.set(REPLICA_ASSIGNMENT_KEY_NAME, replicaAssignmentsStructs.toArray());
+
+            // configs
+            List<Struct> configsStructs = new ArrayList<>(args.configs.size());
+            for (Map.Entry<String, String> configEntry : args.configs.entrySet()) {
+                Struct configStruct = singleRequestStruct.instance(CONFIGS_KEY_NAME);
+                configStruct.set(CONFIG_KEY_KEY_NAME, configEntry.getKey());
+                configStruct.set(CONFIG_VALUE_KEY_NAME, configEntry.getValue());
+                configsStructs.add(configStruct);
+            }
+            singleRequestStruct.set(CONFIGS_KEY_NAME, configsStructs.toArray());
+            createTopicRequestStructs.add(singleRequestStruct);
+        }
+        struct.set(REQUESTS_KEY_NAME, createTopicRequestStructs.toArray());
+        struct.set(TIMEOUT_KEY_NAME, timeout);
+
+        this.topics = topics;
+        this.timeout = timeout;
+        this.duplicateTopics = Collections.emptySet();
+    }
+
+    public CreateTopicsRequest(Struct struct) {
+        super(struct);
+
+        Object[] requestStructs = struct.getArray(REQUESTS_KEY_NAME);
+        Map<String, TopicDetails> topics = new HashMap<>();
+        Set<String> duplicateTopics = new HashSet<>();
+
+        for (Object requestStructObj : requestStructs) {
+            Struct singleRequestStruct = (Struct) requestStructObj;
+            String topic = singleRequestStruct.getString(TOPIC_KEY_NAME);
+
+            if (topics.containsKey(topic))
+                duplicateTopics.add(topic);
+
+            int numPartitions = singleRequestStruct.getInt(NUM_PARTITIONS_KEY_NAME);
+            short replicationFactor = singleRequestStruct.getShort(REPLICATION_FACTOR_KEY_NAME);
+
+            //replica assignment
+            Object[] assignmentsArray = singleRequestStruct.getArray(REPLICA_ASSIGNMENT_KEY_NAME);
+            Map<Integer, List<Integer>> partitionReplicaAssignments = new HashMap<>(assignmentsArray.length);
+            for (Object assignmentStructObj : assignmentsArray) {
+                Struct assignmentStruct = (Struct) assignmentStructObj;
+
+                Integer partitionId = assignmentStruct.getInt(REPLICA_ASSIGNMENT_PARTITION_ID_KEY_NAME);
+
+                Object[] replicasArray = assignmentStruct.getArray(REPLICA_ASSIGNMENT_REPLICAS_KEY_NAME);
+                List<Integer> replicas = new ArrayList<>(replicasArray.length);
+                for (Object replica : replicasArray) {
+                    replicas.add((Integer) replica);
+                }
+
+                partitionReplicaAssignments.put(partitionId, replicas);
+            }
+
+            Object[] configArray = singleRequestStruct.getArray(CONFIGS_KEY_NAME);
+            Map<String, String> configs = new HashMap<>(configArray.length);
+            for (Object configStructObj : configArray) {
+                Struct configStruct = (Struct) configStructObj;
+
+                String key = configStruct.getString(CONFIG_KEY_KEY_NAME);
+                String value = configStruct.getString(CONFIG_VALUE_KEY_NAME);
+
+                configs.put(key, value);
+            }
+
+            TopicDetails args = new TopicDetails(numPartitions, replicationFactor, partitionReplicaAssignments, configs);
+
+            topics.put(topic, args);
+        }
+
+        this.topics = topics;
+        this.timeout = struct.getInt(TIMEOUT_KEY_NAME);
+        this.duplicateTopics = duplicateTopics;
+    }
+
+    @Override
+    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+        Map<String, Errors> topicErrors = new HashMap<>();
+        for (String topic : topics.keySet()) {
+            topicErrors.put(topic, Errors.forException(e));
+        }
+
+        switch (versionId) {
+            case 0:
+                return new CreateTopicsResponse(topicErrors);
+            default:
+                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+                    versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.CREATE_TOPICS.id)));
+        }
+    }
+
+    public Map<String, TopicDetails> topics() {
+        return this.topics;
+    }
+
+    public Integer timeout() {
+        return this.timeout;
+    }
+
+    public Set<String> duplicateTopics() {
+        return this.duplicateTopics;
+    }
+
+    public static CreateTopicsRequest parse(ByteBuffer buffer, int versionId) {
+        return new CreateTopicsRequest(ProtoUtils.parseRequest(ApiKeys.CREATE_TOPICS.id, versionId, buffer));
+    }
+
+    public static CreateTopicsRequest parse(ByteBuffer buffer) {
+        return new CreateTopicsRequest(CURRENT_SCHEMA.read(buffer));
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc47b9fa/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
new file mode 100644
index 0000000..1e6d11e
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class CreateTopicsResponse extends AbstractRequestResponse {
+    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.CREATE_TOPICS.id);
+
+    private static final String TOPIC_ERROR_CODES_KEY_NAME = "topic_error_codes";
+    private static final String TOPIC_KEY_NAME = "topic";
+    private static final String ERROR_CODE_KEY_NAME = "error_code";
+
+    /**
+     * Possible error codes:
+     *
+     * INVALID_TOPIC_EXCEPTION(17)
+     * CLUSTER_AUTHORIZATION_FAILED(31)
+     * TOPIC_ALREADY_EXISTS(36)
+     * INVALID_PARTITIONS(37)
+     * INVALID_REPLICATION_FACTOR(38)
+     * INVALID_REPLICA_ASSIGNMENT(39)
+     * INVALID_CONFIG(40)
+     * NOT_CONTROLLER(41)
+     * INVALID_REQUEST(42)
+     */
+
+    private final Map<String, Errors> errors;
+
+    public CreateTopicsResponse(Map<String, Errors> errors) {
+        super(new Struct(CURRENT_SCHEMA));
+
+        List<Struct> topicErrorCodeStructs = new ArrayList<>(errors.size());
+        for (Map.Entry<String, Errors> topicError : errors.entrySet()) {
+            Struct topicErrorCodeStruct = struct.instance(TOPIC_ERROR_CODES_KEY_NAME);
+            topicErrorCodeStruct.set(TOPIC_KEY_NAME, topicError.getKey());
+            topicErrorCodeStruct.set(ERROR_CODE_KEY_NAME, topicError.getValue().code());
+            topicErrorCodeStructs.add(topicErrorCodeStruct);
+        }
+        struct.set(TOPIC_ERROR_CODES_KEY_NAME, topicErrorCodeStructs.toArray());
+
+        this.errors = errors;
+    }
+
+    public CreateTopicsResponse(Struct struct) {
+        super(struct);
+
+        Object[] topicErrorCodesStructs = struct.getArray(TOPIC_ERROR_CODES_KEY_NAME);
+        Map<String, Errors> errors = new HashMap<>();
+        for (Object topicErrorCodeStructObj : topicErrorCodesStructs) {
+            Struct topicErrorCodeStruct = (Struct) topicErrorCodeStructObj;
+            String topic = topicErrorCodeStruct.getString(TOPIC_KEY_NAME);
+            short errorCode = topicErrorCodeStruct.getShort(ERROR_CODE_KEY_NAME);
+            errors.put(topic, Errors.forCode(errorCode));
+        }
+
+        this.errors = errors;
+    }
+
+    public Map<String, Errors> errors() {
+        return errors;
+    }
+
+    public static CreateTopicsResponse parse(ByteBuffer buffer) {
+        return new CreateTopicsResponse(CURRENT_SCHEMA.read(buffer));
+    }
+
+    public static CreateTopicsResponse parse(ByteBuffer buffer, int version) {
+        return new CreateTopicsResponse(ProtoUtils.responseSchema(ApiKeys.CREATE_TOPICS.id, version).read(buffer));
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc47b9fa/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 043582d..2f53a3c 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -99,7 +99,10 @@ public class RequestResponseTest {
                 createSaslHandshakeResponse(),
                 createApiVersionRequest(),
                 createApiVersionRequest().getErrorResponse(0, new UnknownServerException()),
-                createApiVersionResponse()
+                createApiVersionResponse(),
+                createCreateTopicRequest(),
+                createCreateTopicRequest().getErrorResponse(0, new UnknownServerException()),
+                createCreateTopicResponse()
         );
 
         for (AbstractRequestResponse req : requestResponseList)
@@ -451,4 +454,29 @@ public class RequestResponseTest {
         List<ApiVersionsResponse.ApiVersion> apiVersions = Arrays.asList(new ApiVersionsResponse.ApiVersion((short) 0, (short) 0, (short) 2));
         return new ApiVersionsResponse(Errors.NONE.code(), apiVersions);
     }
+
+    private AbstractRequest createCreateTopicRequest() {
+        CreateTopicsRequest.TopicDetails request1 = new CreateTopicsRequest.TopicDetails(3, (short) 5);
+
+        Map<Integer, List<Integer>> replicaAssignments = new HashMap<>();
+        replicaAssignments.put(1, Arrays.asList(1, 2, 3));
+        replicaAssignments.put(2, Arrays.asList(2, 3, 4));
+
+        Map<String, String> configs = new HashMap<>();
+        configs.put("config1", "value1");
+
+        CreateTopicsRequest.TopicDetails request2 = new CreateTopicsRequest.TopicDetails(replicaAssignments, configs);
+
+        Map<String, CreateTopicsRequest.TopicDetails> request = new HashMap<>();
+        request.put("my_t1", request1);
+        request.put("my_t2", request2);
+        return new CreateTopicsRequest(request, 0);
+    }
+
+    private AbstractRequestResponse createCreateTopicResponse() {
+        Map<String, Errors> errors = new HashMap<>();
+        errors.put("t1", Errors.INVALID_TOPIC_EXCEPTION);
+        errors.put("t2", Errors.LEADER_NOT_AVAILABLE);
+        return new CreateTopicsResponse(errors);
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc47b9fa/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index 53b6dd7..83c2f6c 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -23,11 +23,10 @@ import kafka.log.LogConfig
 import kafka.server.ConfigType
 import kafka.utils._
 import kafka.utils.ZkUtils._
-
 import java.util.Random
 import java.util.Properties
 import org.apache.kafka.common.Node
-import org.apache.kafka.common.errors.{ReplicaNotAvailableException, InvalidTopicException, LeaderNotAvailableException}
+import org.apache.kafka.common.errors.{ReplicaNotAvailableException, InvalidTopicException, LeaderNotAvailableException, InvalidPartitionsException, InvalidReplicationFactorException, TopicExistsException, InvalidReplicaAssignmentException}
 import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
 import org.apache.kafka.common.requests.MetadataResponse
 
@@ -110,11 +109,11 @@ object AdminUtils extends Logging {
                               fixedStartIndex: Int = -1,
                               startPartitionId: Int = -1): Map[Int, Seq[Int]] = {
     if (nPartitions <= 0)
-      throw new AdminOperationException("number of partitions must be larger than 0")
+      throw new InvalidPartitionsException("number of partitions must be larger than 0")
     if (replicationFactor <= 0)
-      throw new AdminOperationException("replication factor must be larger than 0")
+      throw new InvalidReplicationFactorException("replication factor must be larger than 0")
     if (replicationFactor > brokerMetadatas.size)
-      throw new AdminOperationException(s"replication factor: $replicationFactor larger than available brokers: ${brokerMetadatas.size}")
+      throw new InvalidReplicationFactorException(s"replication factor: $replicationFactor larger than available brokers: ${brokerMetadatas.size}")
     if (brokerMetadatas.forall(_.rack.isEmpty))
       assignReplicasToBrokersRackUnaware(nPartitions, replicationFactor, brokerMetadatas.map(_.id), fixedStartIndex,
         startPartitionId)
@@ -411,7 +410,6 @@ object AdminUtils extends Logging {
                                                      update: Boolean = false) {
     // validate arguments
     Topic.validate(topic)
-    require(partitionReplicaAssignment.values.map(_.size).toSet.size == 1, "All partitions should have the same number of replicas.")
 
     val topicPath = getTopicPath(topic)
 
@@ -427,7 +425,14 @@ object AdminUtils extends Logging {
       }
     }
 
-    partitionReplicaAssignment.values.foreach(reps => require(reps.size == reps.toSet.size, "Duplicate replica assignment found: "  + partitionReplicaAssignment))
+    if (partitionReplicaAssignment.values.map(_.size).toSet.size != 1)
+      throw new InvalidReplicaAssignmentException("All partitions should have the same number of replicas")
+
+    partitionReplicaAssignment.values.foreach(reps =>
+      if (reps.size != reps.toSet.size)
+        throw new InvalidReplicaAssignmentException("Duplicate replica assignment found: " + partitionReplicaAssignment)
+    )
+
 
     // Configs only matter if a topic is being created. Changing configs via AlterTopic is not supported
     if (!update) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc47b9fa/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 39bfe62..57a5458 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -19,14 +19,14 @@ package kafka.admin
 
 import java.util.Properties
 import joptsimple._
-import kafka.common.{AdminCommandFailedException, Topic, TopicExistsException}
+import kafka.common.{AdminCommandFailedException, Topic}
 import kafka.consumer.{ConsumerConfig => OldConsumerConfig, Whitelist}
-import kafka.coordinator.GroupCoordinator
 import kafka.log.{Defaults, LogConfig}
 import kafka.server.ConfigType
 import kafka.utils.ZkUtils._
 import kafka.utils._
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
+import org.apache.kafka.common.errors.TopicExistsException
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.utils.Utils
 import scala.collection.JavaConversions._
@@ -386,7 +386,7 @@ object TopicCommand extends Logging {
       "*****************************************************************************************************\n" +
       "*** WARNING: you are creating a topic where the max.message.bytes is greater than the broker's    ***\n" +
       "*** default max.message.bytes. This operation is potentially dangerous. Consumers will get        ***\n" +
-      s"*** failures if their fetch.message.max.bytes (old consumer) or ${NewConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG}         ***\n"+ 
+      s"*** failures if their fetch.message.max.bytes (old consumer) or ${NewConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG}         ***\n"+
       "*** (new consumer) < the value you are using.                                                     ***\n" +
       "*****************************************************************************************************\n" +
       s"- value set here: $maxMessageBytes\n" +

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc47b9fa/core/src/main/scala/kafka/cluster/Broker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Broker.scala b/core/src/main/scala/kafka/cluster/Broker.scala
index 61290c1..7116722 100755
--- a/core/src/main/scala/kafka/cluster/Broker.scala
+++ b/core/src/main/scala/kafka/cluster/Broker.scala
@@ -17,9 +17,6 @@
 
 package kafka.cluster
 
-import java.nio.ByteBuffer
-
-import kafka.api.ApiUtils._
 import kafka.common.{BrokerEndPointNotAvailableException, BrokerNotAvailableException, KafkaException}
 import kafka.utils.Json
 import org.apache.kafka.common.Node

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc47b9fa/core/src/main/scala/kafka/common/ErrorMapping.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/ErrorMapping.scala b/core/src/main/scala/kafka/common/ErrorMapping.scala
index 91a1d75..be5fed2 100644
--- a/core/src/main/scala/kafka/common/ErrorMapping.scala
+++ b/core/src/main/scala/kafka/common/ErrorMapping.scala
@@ -20,6 +20,7 @@ package kafka.common
 import java.nio.ByteBuffer
 
 import kafka.message.InvalidMessageException
+import org.apache.kafka.common.errors.InvalidTopicException
 
 import scala.Predef._
 
@@ -65,6 +66,14 @@ object ErrorMapping {
   // 32: INVALID_TIMESTAMP
   // 33: UNSUPPORTED_SASL_MECHANISM
   // 34: ILLEGAL_SASL_STATE
+  // 35: UNSUPPORTED_VERSION
+  // 36: TOPIC_ALREADY_EXISTS
+  // 37: INVALID_PARTITIONS
+  // 38: INVALID_REPLICATION_FACTOR
+  // 39: INVALID_REPLICA_ASSIGNMENT
+  // 40: INVALID_CONFIG
+  // 41: NOT_CONTROLLER
+  // 42: INVALID_REQUEST
 
   private val exceptionToCode =
     Map[Class[Throwable], Short](

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc47b9fa/core/src/main/scala/kafka/common/InvalidTopicException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/InvalidTopicException.scala b/core/src/main/scala/kafka/common/InvalidTopicException.scala
deleted file mode 100644
index 59f8874..0000000
--- a/core/src/main/scala/kafka/common/InvalidTopicException.scala
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.common
-
-class InvalidTopicException(message: String) extends RuntimeException(message) {
-  def this() = this(null) 
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc47b9fa/core/src/main/scala/kafka/common/TopicExistsException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/TopicExistsException.scala b/core/src/main/scala/kafka/common/TopicExistsException.scala
deleted file mode 100644
index 88a084e..0000000
--- a/core/src/main/scala/kafka/common/TopicExistsException.scala
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.common
-
-class TopicExistsException(message: String) extends RuntimeException(message) {
-  def this() = this(null) 
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc47b9fa/core/src/main/scala/kafka/log/LogConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index d5e06fa..31e62b4 100755
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -24,6 +24,7 @@ import scala.collection.JavaConverters._
 import kafka.api.ApiVersion
 import kafka.message.{BrokerCompressionCodec, Message}
 import kafka.server.KafkaConfig
+import org.apache.kafka.common.errors.InvalidConfigurationException
 import org.apache.kafka.common.config.{AbstractConfig, ConfigDef}
 import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.utils.Utils
@@ -181,7 +182,7 @@ object LogConfig {
   def apply(): LogConfig = LogConfig(new Properties())
 
   def configNames: Seq[String] = configDef.names.asScala.toSeq.sorted
-  
+
   /**
    * Create a log config instance using the given properties and defaults
    */
@@ -197,7 +198,9 @@ object LogConfig {
    */
   def validateNames(props: Properties) {
     val names = configNames
-    for (name <- props.keys.asScala) require(names.contains(name), s"Unknown configuration `$name`.")
+    for(name <- props.asScala.keys)
+      if (!names.contains(name))
+        throw new InvalidConfigurationException(s"Unknown configuration $name.")
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc47b9fa/core/src/main/scala/kafka/network/InvalidRequestException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/InvalidRequestException.scala b/core/src/main/scala/kafka/network/InvalidRequestException.scala
deleted file mode 100644
index 47dba6c..0000000
--- a/core/src/main/scala/kafka/network/InvalidRequestException.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.network
-
-class InvalidRequestException(val message: String, cause: Throwable) extends RuntimeException(message, cause) {
-
-  def this() = this("", null)
-  def this(message: String) = this(message, null)
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc47b9fa/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 53a2346..cff7b1a 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -27,9 +27,10 @@ import kafka.api._
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils.{Logging, SystemTime}
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.InvalidRequestException
 import org.apache.kafka.common.network.Send
-import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol, Protocol}
-import org.apache.kafka.common.requests.{RequestSend, ProduceRequest, AbstractRequest, RequestHeader, ApiVersionsRequest}
+import org.apache.kafka.common.protocol.{ApiKeys, Protocol, SecurityProtocol}
+import org.apache.kafka.common.requests.{AbstractRequest, ApiVersionsRequest, ProduceRequest, RequestHeader, RequestSend}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.log4j.Logger
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc47b9fa/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 1ab8149..ad9ac8d 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -31,6 +31,7 @@ import kafka.common.KafkaException
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.KafkaConfig
 import kafka.utils._
+import org.apache.kafka.common.errors.InvalidRequestException
 import org.apache.kafka.common.metrics._
 import org.apache.kafka.common.network.{ChannelBuilders, KafkaChannel, LoginType, Mode, Selectable, Selector => KSelector}
 import org.apache.kafka.common.security.auth.KafkaPrincipal

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc47b9fa/core/src/main/scala/kafka/server/AdminManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala
new file mode 100644
index 0000000..fc3a7f0
--- /dev/null
+++ b/core/src/main/scala/kafka/server/AdminManager.scala
@@ -0,0 +1,114 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package kafka.server
+
+import java.util.Properties
+
+import kafka.admin.AdminUtils
+import kafka.log.LogConfig
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils._
+import org.apache.kafka.common.errors.InvalidRequestException
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.CreateTopicsRequest._
+
+import scala.collection._
+import scala.collection.JavaConverters._
+
+/**
+  * Broker-side manager for administrative operations, currently topic creation
+  * driven by the CreateTopics protocol request. Creations that cannot finish
+  * immediately are parked in a purgatory keyed by topic name and completed once
+  * every partition has a leader (or the request timeout fires).
+  */
+class AdminManager(val config: KafkaConfig,
+                   val metrics: Metrics,
+                   val metadataCache: MetadataCache,
+                   val zkUtils: ZkUtils) extends Logging with KafkaMetricsGroup {
+  this.logIdent = "[Admin Manager on Broker " + config.brokerId + "]: "
+
+  // Purgatory holding delayed create-topic operations until they complete or time out
+  val topicPurgatory = DelayedOperationPurgatory[DelayedOperation]("topic", config.brokerId)
+
+  // True while at least one delayed topic operation is still waiting in the purgatory
+  def hasDelayedTopicOperations = topicPurgatory.delayed() != 0
+
+  /**
+    * Try to complete delayed topic operations with the request key
+    */
+  def tryCompleteDelayedTopicOperations(topic: String) {
+    val key = TopicKey(topic)
+    val completed = topicPurgatory.checkAndComplete(key)
+    debug(s"Request key ${key.keyLabel} unblocked $completed topic requests.")
+  }
+
+  /**
+    * Create topics and wait until the topics have been completely created.
+    * The responseCallback is invoked exactly once: when all topics have been
+    * created, when every topic already has an error, or when the timeout
+    * expires -- whichever happens first.
+    */
+  def createTopics(timeout: Int,
+                   createInfo: Map[String, TopicDetails],
+                   responseCallback: Map[String, Errors] => Unit) {
+
+    // 1. map over topics creating assignment and calling zookeeper
+    val brokers = metadataCache.getAliveBrokers.map { b => kafka.admin.BrokerMetadata(b.id, b.rack) }
+    val metadata = createInfo.map { case (topic, arguments) =>
+      try {
+        val configs = new Properties()
+        arguments.configs.asScala.foreach { case (key, value) =>
+          configs.setProperty(key, value)
+        }
+        LogConfig.validate(configs)
+
+        val assignments = {
+          // numPartitions/replicationFactor and an explicit replica assignment are mutually exclusive
+          if ((arguments.numPartitions != NO_NUM_PARTITIONS || arguments.replicationFactor != NO_REPLICATION_FACTOR)
+            && !arguments.replicasAssignments.isEmpty)
+            throw new InvalidRequestException("Both numPartitions or replicationFactor and replicasAssignments were set. " +
+              "Both cannot be used at the same time.")
+          else if (!arguments.replicasAssignments.isEmpty) {
+            // Note: we don't check that replicaAssignment doesn't contain unknown brokers - unlike in add-partitions case,
+            // this follows the existing logic in TopicCommand
+            arguments.replicasAssignments.asScala.map { case (partitionId, replicas) =>
+              (partitionId.intValue, replicas.asScala.map(_.intValue))
+            }
+          } else {
+            AdminUtils.assignReplicasToBrokers(brokers, arguments.numPartitions, arguments.replicationFactor)
+          }
+        }
+        trace(s"Assignments for topic $topic are $assignments ")
+        AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, assignments, configs, update = false)
+        CreateTopicMetadata(topic, assignments, Errors.NONE)
+      } catch {
+        // NOTE(review): Throwable (not NonFatal) is caught so every failure maps to a
+        // per-topic error code; fatal JVM errors are swallowed here too -- confirm intended
+        case e: Throwable =>
+          warn(s"Error processing create topic request for topic $topic with arguments $arguments", e)
+          CreateTopicMetadata(topic, Map(), Errors.forException(e))
+      }
+    }
+
+    // 2. if timeout <= 0 or no topics can proceed return immediately
+    if (timeout <= 0 || !metadata.exists(_.error == Errors.NONE)) {
+      val results = metadata.map { createTopicMetadata =>
+        // ignore topics that already have errors
+        if (createTopicMetadata.error == Errors.NONE) {
+          (createTopicMetadata.topic, Errors.REQUEST_TIMED_OUT)
+        } else {
+          (createTopicMetadata.topic, createTopicMetadata.error)
+        }
+      }.toMap
+      responseCallback(results)
+    } else {
+      // 3. else pass the assignments and errors to the delayed operation and set the keys
+      val delayedCreate = new DelayedCreateTopics(timeout, metadata.toSeq, this, responseCallback)
+      val delayedCreateKeys = createInfo.keys.map(new TopicKey(_)).toSeq
+      // try to complete the request immediately, otherwise put it into the purgatory
+      topicPurgatory.tryCompleteElseWatch(delayedCreate, delayedCreateKeys)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc47b9fa/core/src/main/scala/kafka/server/DelayedCreateTopics.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedCreateTopics.scala b/core/src/main/scala/kafka/server/DelayedCreateTopics.scala
new file mode 100644
index 0000000..b74b596
--- /dev/null
+++ b/core/src/main/scala/kafka/server/DelayedCreateTopics.scala
@@ -0,0 +1,91 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.server
+
+import kafka.api.LeaderAndIsr
+import org.apache.kafka.common.protocol.Errors
+
+import scala.collection._
+
+/**
+  * Per-topic state carried by the delayed create operation: the topic name, the
+  * replica assignment written to ZooKeeper, and any error hit during creation.
+  *
+  * TODO: local state doesn't count, need to know state of all relevant brokers
+  *
+  */
+case class CreateTopicMetadata(topic: String, replicaAssignments: Map[Int, Seq[Int]], error: Errors)
+
+/**
+  * A delayed create topics operation that can be created by the admin manager and watched
+  * in the topic purgatory
+  */
+class DelayedCreateTopics(delayMs: Long,
+                          createMetadata: Seq[CreateTopicMetadata],
+                          adminManager: AdminManager,
+                          responseCallback: Map[String, Errors] => Unit)
+  extends DelayedOperation(delayMs) {
+
+  /**
+    * The operation can be completed if all of the topics that do not have an error exist and every partition has a leader in the controller.
+    * See KafkaController.onNewTopicCreation
+    */
+  override def tryComplete() : Boolean = {
+    trace(s"Trying to complete operation for $createMetadata")
+
+    // Count partitions, across all error-free topics, that still lack a leader in the metadata cache
+    val leaderlessPartitionCount = createMetadata.filter(_.error == Errors.NONE)
+      .foldLeft(0) { case (topicCounter, metadata) =>
+        topicCounter + missingLeaderCount(metadata.topic, metadata.replicaAssignments.keySet)
+      }
+
+    if (leaderlessPartitionCount == 0) {
+      trace("All partitions have a leader, completing the delayed operation")
+      forceComplete()
+    } else {
+      trace(s"$leaderlessPartitionCount partitions do not have a leader, not completing the delayed operation")
+      false
+    }
+  }
+
+  /**
+    * Check for partitions that are still missing a leader, update their error code and call the responseCallback
+    */
+  override def onComplete() {
+    trace(s"Completing operation for $createMetadata")
+    val results = createMetadata.map { metadata =>
+      // ignore topics that already have errors
+      if (metadata.error == Errors.NONE && missingLeaderCount(metadata.topic, metadata.replicaAssignments.keySet) > 0)
+        (metadata.topic, Errors.REQUEST_TIMED_OUT)
+      else
+        (metadata.topic, metadata.error)
+    }.toMap
+    responseCallback(results)
+  }
+
+  // No extra work on expiration; onComplete already reports REQUEST_TIMED_OUT for unfinished topics
+  override def onExpiration(): Unit = { }
+
+  // Number of partitions in the given set that currently have no leader for this topic
+  private def missingLeaderCount(topic: String, partitions: Set[Int]): Int = {
+    partitions.foldLeft(0) { case (counter, partition) =>
+      if (isMissingLeader(topic, partition)) counter + 1 else counter
+    }
+  }
+
+  // A partition is "missing a leader" if it is absent from the cache or its leader is NoLeader
+  private def isMissingLeader(topic: String, partition: Int): Boolean = {
+    val partitionInfo = adminManager.metadataCache.getPartitionInfo(topic, partition)
+    partitionInfo.isEmpty || partitionInfo.get.leaderIsrAndControllerEpoch.leaderAndIsr.leader == LeaderAndIsr.NoLeader
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc47b9fa/core/src/main/scala/kafka/server/DelayedOperationKey.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedOperationKey.scala b/core/src/main/scala/kafka/server/DelayedOperationKey.scala
index 072a658..0e05cce 100644
--- a/core/src/main/scala/kafka/server/DelayedOperationKey.scala
+++ b/core/src/main/scala/kafka/server/DelayedOperationKey.scala
@@ -52,3 +52,9 @@ case class GroupKey(groupId: String) extends DelayedOperationKey {
 
   override def keyLabel = groupId
 }
+
+/* used by delayed-topic operations: purgatory watch key, one per topic name */
+case class TopicKey(topic: String) extends DelayedOperationKey {
+
+  override def keyLabel = topic
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc47b9fa/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index ebd1732..5cadb8b 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -34,10 +34,10 @@ import kafka.network._
 import kafka.network.RequestChannel.{Response, Session}
 import kafka.security.auth.{Authorizer, ClusterAction, Create, Describe, Group, Operation, Read, Resource, Topic, Write}
 import kafka.utils.{Logging, SystemTime, ZKGroupTopicDirs, ZkUtils}
-import org.apache.kafka.common.errors.{ClusterAuthorizationException, InvalidTopicException, NotLeaderForPartitionException, UnknownTopicOrPartitionException}
+import org.apache.kafka.common.errors.{ClusterAuthorizationException, InvalidTopicException, NotLeaderForPartitionException, UnknownTopicOrPartitionException, TopicExistsException}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.{ApiKeys, Errors, Protocol, SecurityProtocol}
-import org.apache.kafka.common.requests.{ApiVersionsResponse, DescribeGroupsRequest, DescribeGroupsResponse, GroupCoordinatorRequest, GroupCoordinatorResponse, HeartbeatRequest, HeartbeatResponse, JoinGroupRequest, JoinGroupResponse, LeaderAndIsrRequest, LeaderAndIsrResponse, LeaveGroupRequest, LeaveGroupResponse, ListGroupsResponse, ListOffsetRequest, ListOffsetResponse, MetadataRequest, MetadataResponse, OffsetCommitRequest, OffsetCommitResponse, OffsetFetchRequest, OffsetFetchResponse, ProduceRequest, ProduceResponse, ResponseHeader, ResponseSend, StopReplicaRequest, StopReplicaResponse, SyncGroupRequest, SyncGroupResponse, UpdateMetadataRequest, UpdateMetadataResponse}
+import org.apache.kafka.common.requests.{ApiVersionsResponse, DescribeGroupsRequest, DescribeGroupsResponse, GroupCoordinatorRequest, GroupCoordinatorResponse, HeartbeatRequest, HeartbeatResponse, JoinGroupRequest, JoinGroupResponse, LeaderAndIsrRequest, LeaderAndIsrResponse, LeaveGroupRequest, LeaveGroupResponse, ListGroupsResponse, ListOffsetRequest, ListOffsetResponse, MetadataRequest, MetadataResponse, OffsetCommitRequest, OffsetCommitResponse, OffsetFetchRequest, OffsetFetchResponse, ProduceRequest, ProduceResponse, ResponseHeader, ResponseSend, StopReplicaRequest, StopReplicaResponse, SyncGroupRequest, SyncGroupResponse, UpdateMetadataRequest, UpdateMetadataResponse, CreateTopicsRequest, CreateTopicsResponse}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.{Node, TopicPartition}
@@ -52,6 +52,7 @@ import org.apache.kafka.common.requests.SaslHandshakeResponse
  */
 class KafkaApis(val requestChannel: RequestChannel,
                 val replicaManager: ReplicaManager,
+                val adminManager: AdminManager,
                 val coordinator: GroupCoordinator,
                 val controller: KafkaController,
                 val zkUtils: ZkUtils,
@@ -92,6 +93,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
         case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
         case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
+        case ApiKeys.CREATE_TOPICS => handleCreateTopicsRequest(request)
         case requestId => throw new KafkaException("Unknown api code " + requestId)
       }
     } catch {
@@ -183,6 +185,11 @@ class KafkaApis(val requestChannel: RequestChannel,
     val updateMetadataResponse =
       if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
         replicaManager.maybeUpdateMetadataCache(correlationId, updateMetadataRequest, metadataCache)
+        if (adminManager.hasDelayedTopicOperations) {
+          updateMetadataRequest.partitionStates.keySet.asScala.map(_.topic).foreach { topic =>
+            adminManager.tryCompleteDelayedTopicOperations(topic)
+          }
+        }
         new UpdateMetadataResponse(Errors.NONE.code)
       } else {
         new UpdateMetadataResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.code)
@@ -1043,6 +1050,48 @@ class KafkaApis(val requestChannel: RequestChannel,
     info("Shutdown complete.")
   }
 
+  /**
+   * Handle a CreateTopics request: check controllership and cluster Create
+   * authorization, then delegate creation to the AdminManager and send the
+   * per-topic error map back on the request channel.
+   */
+  def handleCreateTopicsRequest(request: RequestChannel.Request) {
+    val createTopicsRequest = request.body.asInstanceOf[CreateTopicsRequest]
+
+    // Serializes the per-topic error map into a CreateTopicsResponse and queues it for sending
+    def sendResponseCallback(results: Map[String, Errors]): Unit = {
+      val respHeader = new ResponseHeader(request.header.correlationId)
+      val responseBody = new CreateTopicsResponse(results.asJava)
+      trace(s"Sending create topics response $responseBody for correlation id ${request.header.correlationId} to client ${request.header.clientId}.")
+      requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, respHeader, responseBody)))
+    }
+
+    if (!controller.isActive()) {
+      // Only the active controller may create topics; tell the client to retry elsewhere
+      val results = createTopicsRequest.topics.asScala.map { case (topic, _) =>
+        (topic, Errors.NOT_CONTROLLER)
+      }
+      sendResponseCallback(results)
+    } else if (!authorize(request.session, Create, Resource.ClusterResource)) {
+      // Creation requires the Create operation on the cluster resource
+      val results = createTopicsRequest.topics.asScala.map { case (topic, _) =>
+        (topic, Errors.CLUSTER_AUTHORIZATION_FAILED)
+      }
+      sendResponseCallback(results)
+    }
+    else {
+      // Topics listed more than once in the request are split out and answered with INVALID_REQUEST
+      val (validTopics, duplicateTopics) = createTopicsRequest.topics.asScala.partition { case (topic, _) =>
+        !createTopicsRequest.duplicateTopics.contains(topic)
+      }
+
+      // Special handling to add duplicate topics to the response
+      def sendResponseWithDuplicatesCallback(results: Map[String, Errors]): Unit = {
+        if (duplicateTopics.nonEmpty)
+          warn(s"Create topics request from client ${request.header.clientId} contains multiple entries for the following topics: ${duplicateTopics.keySet.mkString(",")}")
+        val completeResults = results ++ duplicateTopics.keySet.map((_, Errors.INVALID_REQUEST)).toMap
+        sendResponseCallback(completeResults)
+      }
+
+      adminManager.createTopics(
+        createTopicsRequest.timeout.toInt,
+        validTopics,
+        sendResponseWithDuplicatesCallback
+      )
+    }
+  }
+
   def authorizeClusterAction(request: RequestChannel.Request): Unit = {
     if (!authorize(request.session, ClusterAction, Resource.ClusterResource))
       throw new ClusterAuthorizationException(s"Request $request is not authorized.")

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc47b9fa/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 994e28e..78c6606 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -119,6 +119,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
   var logManager: LogManager = null
 
   var replicaManager: ReplicaManager = null
+  var adminManager: AdminManager = null
 
   var dynamicConfigHandlers: Map[String, ConfigHandler] = null
   var dynamicConfigManager: DynamicConfigManager = null
@@ -199,6 +200,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
         kafkaController = new KafkaController(config, zkUtils, brokerState, kafkaMetricsTime, metrics, threadNamePrefix)
         kafkaController.startup()
 
+        adminManager = new AdminManager(config, metrics, metadataCache, zkUtils)
+
         /* start group coordinator */
         groupCoordinator = GroupCoordinator(config, zkUtils, replicaManager, kafkaMetricsTime)
         groupCoordinator.startup()
@@ -211,7 +214,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
         }
 
         /* start processing requests */
-        apis = new KafkaApis(socketServer.requestChannel, replicaManager, groupCoordinator,
+        apis = new KafkaApis(socketServer.requestChannel, replicaManager, adminManager, groupCoordinator,
           kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer)
         requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc47b9fa/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 10e0bae..b1f0283 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -20,7 +20,6 @@ import java.util.{ArrayList, Collections, Properties}
 
 import kafka.cluster.EndPoint
 import kafka.common.TopicAndPartition
-import kafka.coordinator.GroupCoordinator
 import kafka.integration.KafkaServerTestHarness
 import kafka.security.auth._
 import kafka.server.KafkaConfig
@@ -30,6 +29,7 @@ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol}
 import org.apache.kafka.common.requests._
+import CreateTopicsRequest.TopicDetails
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.{Node, TopicPartition, requests}
 import org.junit.Assert._
@@ -42,6 +42,7 @@ import org.apache.kafka.common.internals.TopicConstants
 
 class AuthorizerIntegrationTest extends KafkaServerTestHarness {
   val topic = "topic"
+  val createTopic = "topic-new"
   val part = 0
   val brokerId: Integer = 0
   val correlationId = 0
@@ -54,6 +55,7 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
 
   val GroupReadAcl = Map(groupResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)))
   val ClusterAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, ClusterAction)))
+  val ClusterCreateAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Create)))
   val TopicReadAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)))
   val TopicWriteAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)))
   val TopicDescribeAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)))
@@ -89,8 +91,9 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
       ApiKeys.LEAVE_GROUP.id -> classOf[LeaveGroupResponse],
       ApiKeys.LEADER_AND_ISR.id -> classOf[requests.LeaderAndIsrResponse],
       ApiKeys.STOP_REPLICA.id -> classOf[requests.StopReplicaResponse],
-      ApiKeys.CONTROLLED_SHUTDOWN_KEY.id -> classOf[requests.ControlledShutdownResponse]
-    )
+      ApiKeys.CONTROLLED_SHUTDOWN_KEY.id -> classOf[requests.ControlledShutdownResponse],
+      ApiKeys.CREATE_TOPICS.id -> classOf[CreateTopicsResponse]
+  )
 
   val RequestKeyToErrorCode = Map[Short, (Nothing) => Short](
     ApiKeys.METADATA.id -> ((resp: requests.MetadataResponse) => resp.errors().asScala.find(_._1 == topic).getOrElse(("test", Errors.NONE))._2.code()),
@@ -107,7 +110,8 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
     ApiKeys.LEAVE_GROUP.id -> ((resp: LeaveGroupResponse) => resp.errorCode()),
     ApiKeys.LEADER_AND_ISR.id -> ((resp: requests.LeaderAndIsrResponse) => resp.responses().asScala.find(_._1 == tp).get._2),
     ApiKeys.STOP_REPLICA.id -> ((resp: requests.StopReplicaResponse) => resp.responses().asScala.find(_._1 == tp).get._2),
-    ApiKeys.CONTROLLED_SHUTDOWN_KEY.id -> ((resp: requests.ControlledShutdownResponse) => resp.errorCode())
+    ApiKeys.CONTROLLED_SHUTDOWN_KEY.id -> ((resp: requests.ControlledShutdownResponse) => resp.errorCode()),
+    ApiKeys.CREATE_TOPICS.id -> ((resp: CreateTopicsResponse) => resp.errors().asScala.find(_._1 == createTopic).get._2.code)
   )
 
   val RequestKeysToAcls = Map[Short, Map[Resource, Set[Acl]]](
@@ -125,7 +129,8 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
     ApiKeys.LEAVE_GROUP.id -> GroupReadAcl,
     ApiKeys.LEADER_AND_ISR.id -> ClusterAcl,
     ApiKeys.STOP_REPLICA.id -> ClusterAcl,
-    ApiKeys.CONTROLLED_SHUTDOWN_KEY.id -> ClusterAcl
+    ApiKeys.CONTROLLED_SHUTDOWN_KEY.id -> ClusterAcl,
+    ApiKeys.CREATE_TOPICS.id -> ClusterCreateAcl
   )
 
   // configure the servers and clients
@@ -227,6 +232,10 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
     new requests.ControlledShutdownRequest(brokerId)
   }
 
+  // Minimal CreateTopics request (1 partition, replication factor 1, timeout 0) used for the ACL check
+  private def createTopicsRequest = {
+    new CreateTopicsRequest(Map(createTopic -> new TopicDetails(1, 1.toShort)).asJava, 0)
+  }
+
   @Test
   def testAuthorization() {
     val requestKeyToRequest = mutable.LinkedHashMap[Short, AbstractRequest](
@@ -244,7 +253,8 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
       ApiKeys.LEAVE_GROUP.id -> createLeaveGroupRequest,
       ApiKeys.LEADER_AND_ISR.id -> createLeaderAndIsrRequest,
       ApiKeys.STOP_REPLICA.id -> createStopReplicaRequest,
-      ApiKeys.CONTROLLED_SHUTDOWN_KEY.id -> createControlledShutdownRequest
+      ApiKeys.CONTROLLED_SHUTDOWN_KEY.id -> createControlledShutdownRequest,
+      ApiKeys.CREATE_TOPICS.id -> createTopicsRequest
     )
 
     val socket = new Socket("localhost", servers.head.boundPort())

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc47b9fa/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 7df1411..238cbad 100755
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -16,7 +16,7 @@
  */
 package kafka.admin
 
-import org.apache.kafka.common.errors.InvalidTopicException
+import org.apache.kafka.common.errors.{InvalidReplicaAssignmentException, InvalidReplicationFactorException, InvalidTopicException, TopicExistsException}
 import org.apache.kafka.common.metrics.Quota
 import org.apache.kafka.common.protocol.ApiKeys
 import org.junit.Assert._
@@ -27,7 +27,7 @@ import kafka.utils._
 import kafka.log._
 import kafka.zk.ZooKeeperTestHarness
 import kafka.utils.{Logging, TestUtils, ZkUtils}
-import kafka.common.{TopicAndPartition, TopicExistsException}
+import kafka.common.TopicAndPartition
 import kafka.server.{ConfigType, KafkaConfig, KafkaServer}
 import java.io.File
 
@@ -42,12 +42,12 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
     val brokerMetadatas = (0 to 4).map(new BrokerMetadata(_, None))
 
     // test 0 replication factor
-    intercept[AdminOperationException] {
+    intercept[InvalidReplicationFactorException] {
       AdminUtils.assignReplicasToBrokers(brokerMetadatas, 10, 0)
     }
 
     // test wrong replication factor
-    intercept[AdminOperationException] {
+    intercept[InvalidReplicationFactorException] {
       AdminUtils.assignReplicasToBrokers(brokerMetadatas, 10, 6)
     }
 
@@ -74,12 +74,12 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
     TestUtils.createBrokersInZk(zkUtils, brokers)
 
     // duplicate brokers
-    intercept[IllegalArgumentException] {
+    intercept[InvalidReplicaAssignmentException] {
       AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, "test", Map(0->Seq(0,0)))
     }
 
     // inconsistent replication factor
-    intercept[IllegalArgumentException] {
+    intercept[InvalidReplicaAssignmentException] {
       AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, "test", Map(0->Seq(0,1), 1->Seq(0)))
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc47b9fa/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
index e0107da..11dc36e 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
@@ -16,7 +16,6 @@
  */
 package kafka.admin
 
-import kafka.common.TopicExistsException
 import org.junit.Assert._
 import org.junit.Test
 import kafka.utils.Logging
@@ -25,8 +24,9 @@ import kafka.zk.ZooKeeperTestHarness
 import kafka.server.ConfigType
 import kafka.admin.TopicCommand.TopicCommandOptions
 import kafka.utils.ZkUtils._
-import kafka.coordinator.GroupCoordinator
 import org.apache.kafka.common.internals.TopicConstants
+import org.apache.kafka.common.errors.TopicExistsException
+
 
 class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc47b9fa/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
index f2dd60f..fdaf2f5 100644
--- a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
@@ -55,7 +55,7 @@ class ApiVersionsRequestTest extends BaseRequestTest {
   }
 
   private def sendApiVersionsRequest(request: ApiVersionsRequest, version: Short): ApiVersionsResponse = {
-    val response = send(request, ApiKeys.API_VERSIONS, version)
+    val response = send(request, ApiKeys.API_VERSIONS, Some(version))
     ApiVersionsResponse.parse(response)
   }
-}
\ No newline at end of file
+}


Mime
View raw message