kafka-commits mailing list archives

From ij...@apache.org
Subject [2/2] kafka git commit: KAFKA-5856; Add AdminClient.createPartitions() (KIP-195)
Date Thu, 21 Sep 2017 04:09:24 GMT
KAFKA-5856; Add AdminClient.createPartitions() (KIP-195)

The contribution is my original work and I license the work to the project under the project's open source license.

This patch adds AdminClient.createPartitions() and the network protocol it
uses. The broker-side algorithm is as follows:

1. KafkaApis makes some initial checks on the request, then delegates to the
   new AdminManager.createPartitions() method.
2. AdminManager.createPartitions() performs some validation then delegates to
   AdminUtils.addPartitions().

Aside: I felt it was safer to add the extra validation in
AdminManager.createPartitions() than in AdminUtils.addPartitions() since the
latter is used on other code paths which might fail differently with the
introduction of extra checks.

3. AdminUtils.addPartitions() does its own checks and adds the partitions.
4. AdminManager then uses the existing topic purgatory to wait for the
   PartitionInfo available from the metadata cache to become consistent with
   the new total number of partitions.

The messages of exceptions thrown in AdminUtils that affect this new API now
consistently begin with a capital letter and end with a period.
A few have been reworded for clarity.

Author: Tom Bentley <tbentley@redhat.com>

Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #3870 from tombentley/KAFKA-5856-AdminClient.createPartitions


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5f6393f9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5f6393f9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5f6393f9

Branch: refs/heads/trunk
Commit: 5f6393f9b17cce17ded7a00e439599dfa77deb2d
Parents: 9469228
Author: Tom Bentley <tbentley@redhat.com>
Authored: Thu Sep 21 05:06:15 2017 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Thu Sep 21 05:06:36 2017 +0100

----------------------------------------------------------------------
 .../apache/kafka/clients/admin/AdminClient.java |  45 +++-
 .../clients/admin/CreatePartitionsOptions.java  |  51 +++++
 .../clients/admin/CreatePartitionsResult.java   |  53 +++++
 .../kafka/clients/admin/KafkaAdminClient.java   |  42 +++-
 .../kafka/clients/admin/NewPartitions.java      | 100 +++++++++
 .../errors/ReassignmentInProgressException.java |  32 +++
 .../apache/kafka/common/protocol/ApiKeys.java   |   7 +-
 .../apache/kafka/common/protocol/Errors.java    |   8 +
 .../kafka/common/requests/AbstractRequest.java  |   2 +
 .../kafka/common/requests/AbstractResponse.java |   2 +
 .../requests/CreatePartitionsRequest.java       | 213 +++++++++++++++++++
 .../requests/CreatePartitionsResponse.java      | 105 +++++++++
 .../clients/admin/KafkaAdminClientTest.java     |  36 ++++
 .../common/requests/RequestResponseTest.java    |  30 +++
 .../src/main/scala/kafka/admin/AdminUtils.scala | 130 ++++++-----
 .../main/scala/kafka/admin/TopicCommand.scala   |  17 +-
 .../main/scala/kafka/server/AdminManager.scala  |  82 ++++++-
 .../kafka/server/DelayedCreatePartitions.scala  |  95 +++++++++
 .../src/main/scala/kafka/server/KafkaApis.scala |  40 +++-
 .../kafka/api/AdminClientIntegrationTest.scala  | 187 ++++++++++++++++
 .../kafka/api/BaseProducerSendTest.scala        |   5 +-
 .../unit/kafka/admin/AddPartitionsTest.scala    |  40 ++--
 .../unit/kafka/admin/DeleteTopicTest.scala      |   7 +-
 .../CreateTopicsRequestWithPolicyTest.scala     |   4 +-
 .../unit/kafka/server/RequestQuotaTest.scala    |   7 +
 25 files changed, 1258 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5f6393f9/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
index 3f4a07c..02c2d0a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
@@ -87,7 +87,7 @@ public abstract class AdminClient implements AutoCloseable {
     /**
      * Create a batch of new topics with the default options.
      *
-     * This operation is supported by brokers with version 0.10.1.0 or higher.
+     * This is a convenience method for {@link #createTopics(Collection, CreateTopicsOptions)} with default options. See the overload for more details.
      *
      * @param newTopics         The new topics to create.
      * @return                  The CreateTopicsResult.
@@ -99,9 +99,11 @@ public abstract class AdminClient implements AutoCloseable {
     /**
      * Create a batch of new topics.
      *
-     * It may take several seconds after AdminClient#createTopics returns
+     * This operation is not transactional so it may succeed for some topics while failing for others.
+     *
+     * It may take several seconds after this method returns
      * success for all the brokers to become aware that the topics have been created.
-     * During this time, AdminClient#listTopics and AdminClient#describeTopics
+     * During this time, {@link AdminClient#listTopics()} and {@link AdminClient#describeTopics(Collection)}
      * may not return information about the new topics.
      *
      * This operation is supported by brokers with version 0.10.1.0 or higher. The validateOnly option is supported
@@ -417,4 +419,41 @@ public abstract class AdminClient implements AutoCloseable {
      * @return              The DescribeReplicaLogDirResult
      */
     public abstract DescribeReplicaLogDirResult describeReplicaLogDir(Collection<TopicPartitionReplica> replicas, DescribeReplicaLogDirOptions options);
+
+    /**
+     * Increase the number of partitions of the topics given as the keys of {@code newPartitions}
+     * according to the corresponding values.
+     *
+     * This is a convenience method for {@link #createPartitions(Map, CreatePartitionsOptions)} with default options.
+     * See the overload for more details.
+     *
+     * @param newPartitions The topics which should have new partitions created, and corresponding parameters
+     *                      for the created partitions.
+     * @return              The CreatePartitionsResult.
+     */
+    public CreatePartitionsResult createPartitions(Map<String, NewPartitions> newPartitions) {
+        return createPartitions(newPartitions, new CreatePartitionsOptions());
+    }
+
+    /**
+     * Increase the number of partitions of the topics given as the keys of {@code newPartitions}
+     * according to the corresponding values.
+     *
+     * This operation is not transactional so it may succeed for some topics while failing for others.
+     *
+     * It may take several seconds after this method returns
+     * success for all the brokers to become aware that the partitions have been created.
+     * During this time, {@link AdminClient#describeTopics(Collection)}
+     * may not return information about the new partitions.
+     *
+     * This operation is supported by brokers with version 1.0.0 or higher.
+     *
+     * @param newPartitions The topics which should have new partitions created, and corresponding parameters
+     *                      for the created partitions.
+     * @param options       The options to use when creating the new partitions.
+     * @return              The CreatePartitionsResult.
+     */
+    public abstract CreatePartitionsResult createPartitions(Map<String, NewPartitions> newPartitions,
+                                                            CreatePartitionsOptions options);
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5f6393f9/clients/src/main/java/org/apache/kafka/clients/admin/CreatePartitionsOptions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/CreatePartitionsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/CreatePartitionsOptions.java
new file mode 100644
index 0000000..aafc207
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/CreatePartitionsOptions.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Map;
+
+/**
+ * Options for {@link AdminClient#createPartitions(Map)}.
+ *
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class CreatePartitionsOptions extends AbstractOptions<CreatePartitionsOptions> {
+
+    private boolean validateOnly = false;
+
+    public CreatePartitionsOptions() {
+    }
+
+    /**
+     * Return true if the request should be validated without creating new partitions.
+     */
+    public boolean validateOnly() {
+        return validateOnly;
+    }
+
+    /**
+     * Set to true if the request should be validated without creating new partitions.
+     */
+    public CreatePartitionsOptions validateOnly(boolean validateOnly) {
+        this.validateOnly = validateOnly;
+        return this;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/5f6393f9/clients/src/main/java/org/apache/kafka/clients/admin/CreatePartitionsResult.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/CreatePartitionsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/CreatePartitionsResult.java
new file mode 100644
index 0000000..b27abd6
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/CreatePartitionsResult.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Map;
+
+/**
+ * The result of the {@link AdminClient#createPartitions(Map)} call.
+ *
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class CreatePartitionsResult {
+
+    private final Map<String, KafkaFuture<Void>> values;
+
+    CreatePartitionsResult(Map<String, KafkaFuture<Void>> values) {
+        this.values = values;
+    }
+
+    /**
+     * Return a map from topic names to futures, which can be used to check the status of individual
+     * partition creations.
+     */
+    public Map<String, KafkaFuture<Void>> values() {
+        return values;
+    }
+
+    /**
+     * Return a future which succeeds if all the partition creations succeed.
+     */
+    public KafkaFuture<Void> all() {
+        return KafkaFuture.allOf(values.values().toArray(new KafkaFuture[0]));
+    }
+}
\ No newline at end of file
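
Editorial sketch (not part of the patch): the Javadoc above says the operation is not transactional, and CreatePartitionsResult exposes one future per topic plus an all() future. The block below illustrates that behaviour with the JDK's CompletableFuture standing in for KafkaFuture. The class name PerTopicResultSketch and the use of a null error message to mean success are assumptions made for illustration only.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Illustrative stand-in only: CompletableFuture plays the role of KafkaFuture,
// and PerTopicResultSketch is a hypothetical name that does not exist in Kafka.
final class PerTopicResultSketch {

    // Builds one future per topic from a map of topic -> error message.
    // A null message is treated as success (an assumption of this sketch).
    // Each topic's future is completed independently of the others.
    static Map<String, CompletableFuture<Void>> complete(Map<String, String> errorsByTopic) {
        Map<String, CompletableFuture<Void>> futures = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : errorsByTopic.entrySet()) {
            CompletableFuture<Void> future = new CompletableFuture<>();
            if (entry.getValue() == null) {
                future.complete(null);
            } else {
                future.completeExceptionally(new IllegalStateException(entry.getValue()));
            }
            futures.put(entry.getKey(), future);
        }
        return futures;
    }

    // Analogue of all(): the combined future fails if any per-topic future fails.
    static CompletableFuture<Void> all(Map<String, CompletableFuture<Void>> futures) {
        return CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[0]));
    }
}
```

A caller that needs to know which topics failed would inspect the per-topic map rather than rely on the combined future alone, since the combined future only reports that at least one topic failed.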

http://git-wip-us.apache.org/repos/asf/kafka/blob/5f6393f9/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 20db6b7..d857ae3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -64,6 +64,9 @@ import org.apache.kafka.common.requests.AlterConfigsRequest;
 import org.apache.kafka.common.requests.AlterConfigsResponse;
 import org.apache.kafka.common.requests.AlterReplicaDirRequest;
 import org.apache.kafka.common.requests.AlterReplicaDirResponse;
+import org.apache.kafka.common.requests.CreatePartitionsRequest;
+import org.apache.kafka.common.requests.CreatePartitionsResponse;
+import org.apache.kafka.common.requests.ApiError;
 import org.apache.kafka.common.requests.CreateAclsRequest;
 import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation;
 import org.apache.kafka.common.requests.CreateAclsResponse;
@@ -78,7 +81,6 @@ import org.apache.kafka.common.requests.DeleteTopicsRequest;
 import org.apache.kafka.common.requests.DeleteTopicsResponse;
 import org.apache.kafka.common.requests.DescribeAclsRequest;
 import org.apache.kafka.common.requests.DescribeAclsResponse;
-import org.apache.kafka.common.requests.ApiError;
 import org.apache.kafka.common.requests.DescribeConfigsRequest;
 import org.apache.kafka.common.requests.DescribeConfigsResponse;
 import org.apache.kafka.common.requests.DescribeLogDirsRequest;
@@ -1799,4 +1801,42 @@ public class KafkaAdminClient extends AdminClient {
 
         return new DescribeReplicaLogDirResult(new HashMap<TopicPartitionReplica, KafkaFuture<ReplicaLogDirInfo>>(futures));
     }
+
+    public CreatePartitionsResult createPartitions(Map<String, NewPartitions> newPartitions, final CreatePartitionsOptions options) {
+        final Map<String, KafkaFutureImpl<Void>> futures = new HashMap<>(newPartitions.size());
+        for (String topic : newPartitions.keySet()) {
+            futures.put(topic, new KafkaFutureImpl<Void>());
+        }
+        final Map<String, NewPartitions> requestMap = new HashMap<>(newPartitions);
+
+        final long now = time.milliseconds();
+        runnable.call(new Call("createPartitions", calcDeadlineMs(now, options.timeoutMs()),
+                new ControllerNodeProvider()) {
+
+            @Override
+            public AbstractRequest.Builder createRequest(int timeoutMs) {
+                return new CreatePartitionsRequest.Builder(requestMap, timeoutMs, options.validateOnly());
+            }
+
+            @Override
+            public void handleResponse(AbstractResponse abstractResponse) {
+                CreatePartitionsResponse response = (CreatePartitionsResponse) abstractResponse;
+                for (Map.Entry<String, ApiError> result : response.errors().entrySet()) {
+                    KafkaFutureImpl<Void> future = futures.get(result.getKey());
+                    if (result.getValue().isSuccess()) {
+                        future.complete(null);
+                    } else {
+                        future.completeExceptionally(result.getValue().exception());
+                    }
+                }
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                completeAllExceptionally(futures.values(), throwable);
+            }
+        }, now);
+        return new CreatePartitionsResult((Map) futures);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5f6393f9/clients/src/main/java/org/apache/kafka/clients/admin/NewPartitions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/NewPartitions.java b/clients/src/main/java/org/apache/kafka/clients/admin/NewPartitions.java
new file mode 100644
index 0000000..5aaaeac
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/NewPartitions.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Describes new partitions for a particular topic in a call to {@link AdminClient#createPartitions(Map)}.
+ *
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class NewPartitions {
+
+    private int totalCount;
+
+    private List<List<Integer>> newAssignments;
+
+    private NewPartitions(int totalCount, List<List<Integer>> newAssignments) {
+        this.totalCount = totalCount;
+        this.newAssignments = newAssignments;
+    }
+
+    /**
+     * Increase the partition count for a topic to the given {@code totalCount}.
+     * The assignment of new replicas to brokers will be decided by the broker.
+     *
+     * @param totalCount The total partitions count after the operation succeeds.
+     */
+    public static NewPartitions increaseTo(int totalCount) {
+        return new NewPartitions(totalCount, null);
+    }
+
+    /**
+     * <p>Increase the partition count for a topic to the given {@code totalCount}
+     * assigning the new partitions according to the given {@code newAssignments}.
+     * The length of the given {@code newAssignments} should equal {@code totalCount - oldCount}, since
+     * the assignments of existing partitions are not changed.
+     * Each inner list of {@code newAssignments} should have a length equal to
+     * the topic's replication factor.
+     * The first broker id in each inner list is the "preferred replica".</p>
+     *
+     * <p>For example, suppose a topic currently has a replication factor of 2, and
+     * has 3 partitions. The number of partitions can be increased to 6 using a
+     * {@code NewPartitions} constructed like this:</p>
+     *
+     * <pre><code>
+     * NewPartitions.increaseTo(6, asList(asList(1, 2),
+     *                                    asList(2, 3),
+     *                                    asList(3, 1)))
+     * </code></pre>
+     * <p>In this example partition 3's preferred leader will be broker 1, partition 4's preferred leader will be
+     * broker 2 and partition 5's preferred leader will be broker 3.</p>
+     *
+     * @param totalCount The total partitions count after the operation succeeds.
+     * @param newAssignments The replica assignments for the new partitions.
+     */
+    public static NewPartitions increaseTo(int totalCount, List<List<Integer>> newAssignments) {
+        return new NewPartitions(totalCount, newAssignments);
+    }
+
+    /**
+     * The new total partition count (not the number of new partitions).
+     */
+    public int totalCount() {
+        return totalCount;
+    }
+
+    /**
+     * The replica assignments for the new partitions, or null if the assignment of
+     * replicas to brokers will be done by the controller.
+     */
+    public List<List<Integer>> assignments() {
+        return newAssignments;
+    }
+
+    @Override
+    public String toString() {
+        return "(totalCount=" + totalCount() + ", newAssignments=" + assignments() + ")";
+    }
+
+}
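
Editorial sketch (not part of the patch): the Javadoc for increaseTo(int, List) states that newAssignments should contain totalCount - oldCount inner lists, each as long as the topic's replication factor, with the first broker id being the preferred replica. The helper below checks those documented rules in plain Java. The class NewAssignmentsCheck, its method names, and its messages are hypothetical, and the requirement that totalCount exceed the current count is an assumption drawn from the word "increase". The patch's actual broker-side validation may differ.

```java
import java.util.List;

// Hypothetical helper for illustration; neither the name nor the messages come from the patch.
final class NewAssignmentsCheck {

    // Returns null when the arguments satisfy the rules stated in the Javadoc,
    // otherwise a short description of the first problem found.
    static String validate(int oldCount, int totalCount, int replicationFactor,
                           List<List<Integer>> newAssignments) {
        if (totalCount <= oldCount) {
            // Assumption: the count can only grow.
            return "totalCount must be greater than the current partition count";
        }
        if (newAssignments == null) {
            return null; // no explicit assignment: placement is left to the broker
        }
        int added = totalCount - oldCount;
        if (newAssignments.size() != added) {
            return "expected " + added + " assignments but got " + newAssignments.size();
        }
        for (int i = 0; i < newAssignments.size(); i++) {
            if (newAssignments.get(i).size() != replicationFactor) {
                return "partition " + (oldCount + i) + " should have " + replicationFactor + " replicas";
            }
        }
        return null;
    }

    // Preferred replica of the i-th new partition: the first broker id in its inner list.
    static int preferredLeader(List<List<Integer>> newAssignments, int i) {
        return newAssignments.get(i).get(0);
    }
}
```

With the Javadoc's own example (3 existing partitions, replication factor 2, increase to 6), the three inner lists pass the check, and the preferred replicas of partitions 3, 4 and 5 are brokers 1, 2 and 3 respectively.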

http://git-wip-us.apache.org/repos/asf/kafka/blob/5f6393f9/clients/src/main/java/org/apache/kafka/common/errors/ReassignmentInProgressException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/ReassignmentInProgressException.java b/clients/src/main/java/org/apache/kafka/common/errors/ReassignmentInProgressException.java
new file mode 100644
index 0000000..abd624b
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/ReassignmentInProgressException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.errors;
+
+/**
+ * Thrown if a request cannot be completed because a partition reassignment is in progress.
+ */
+public class ReassignmentInProgressException extends ApiException {
+
+    public ReassignmentInProgressException(String msg) {
+        super(msg);
+    }
+
+    public ReassignmentInProgressException(String msg, Throwable cause) {
+        super(msg, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5f6393f9/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 62dce79..d094134 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -35,6 +35,8 @@ import org.apache.kafka.common.requests.ControlledShutdownRequest;
 import org.apache.kafka.common.requests.ControlledShutdownResponse;
 import org.apache.kafka.common.requests.CreateAclsRequest;
 import org.apache.kafka.common.requests.CreateAclsResponse;
+import org.apache.kafka.common.requests.CreatePartitionsRequest;
+import org.apache.kafka.common.requests.CreatePartitionsResponse;
 import org.apache.kafka.common.requests.CreateTopicsRequest;
 import org.apache.kafka.common.requests.CreateTopicsResponse;
 import org.apache.kafka.common.requests.DeleteAclsRequest;
@@ -167,8 +169,9 @@ public enum ApiKeys {
     DESCRIBE_LOG_DIRS(35, "DescribeLogDirs", DescribeLogDirsRequest.schemaVersions(),
             DescribeLogDirsResponse.schemaVersions()),
     SASL_AUTHENTICATE(36, "SaslAuthenticate", SaslAuthenticateRequest.schemaVersions(),
-            SaslAuthenticateResponse.schemaVersions());
-
+            SaslAuthenticateResponse.schemaVersions()),
+    CREATE_PARTITIONS(37, "CreatePartitions", CreatePartitionsRequest.schemaVersions(),
+            CreatePartitionsResponse.schemaVersions());
 
     private static final ApiKeys[] ID_TO_TYPE;
     private static final int MIN_API_KEY = 0;

http://git-wip-us.apache.org/repos/asf/kafka/blob/5f6393f9/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index d9a95d4..1584238 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -60,6 +60,7 @@ import org.apache.kafka.common.errors.OperationNotAttemptedException;
 import org.apache.kafka.common.errors.OutOfOrderSequenceException;
 import org.apache.kafka.common.errors.PolicyViolationException;
 import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.ReassignmentInProgressException;
 import org.apache.kafka.common.errors.RebalanceInProgressException;
 import org.apache.kafka.common.errors.RecordBatchTooLargeException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
@@ -536,6 +537,13 @@ public enum Errors {
             public ApiException build(String message) {
                 return new UnknownProducerIdException(message);
             }
+    }),
+    REASSIGNMENT_IN_PROGRESS(59, "A partition reassignment is in progress",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new ReassignmentInProgressException(message);
+            }
     });
 
     private interface ApiExceptionBuilder {

http://git-wip-us.apache.org/repos/asf/kafka/blob/5f6393f9/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index 1f1418f..e093f77 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -183,6 +183,8 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
                 return new DescribeLogDirsRequest(struct, apiVersion);
             case SASL_AUTHENTICATE:
                 return new SaslAuthenticateRequest(struct, apiVersion);
+            case CREATE_PARTITIONS:
+                return new CreatePartitionsRequest(struct, apiVersion);
             default:
                 throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " +
                         "code should be updated to do so.", apiKey));

http://git-wip-us.apache.org/repos/asf/kafka/blob/5f6393f9/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index b6cb8fb..4cff798 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -115,6 +115,8 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
                 return new DescribeLogDirsResponse(struct);
             case SASL_AUTHENTICATE:
                 return new SaslAuthenticateResponse(struct);
+            case CREATE_PARTITIONS:
+                return new CreatePartitionsResponse(struct);
             default:
                 throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " +
                         "code should be updated to do so.", apiKey));

http://git-wip-us.apache.org/repos/asf/kafka/blob/5f6393f9/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsRequest.java
new file mode 100644
index 0000000..81d26a0
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsRequest.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.clients.admin.NewPartitions;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
+import static org.apache.kafka.common.protocol.types.Type.BOOLEAN;
+import static org.apache.kafka.common.protocol.types.Type.INT32;
+
+public class CreatePartitionsRequest extends AbstractRequest {
+
+    private static final String TOPIC_PARTITION_COUNT_KEY_NAME = "topic_partitions";
+    private static final String NEW_PARTITIONS_KEY_NAME = "new_partitions";
+    private static final String COUNT_KEY_NAME = "count";
+    private static final String ASSIGNMENT_KEY_NAME = "assignment";
+    private static final String TIMEOUT_KEY_NAME = "timeout";
+    private static final String VALIDATE_ONLY_KEY_NAME = "validate_only";
+
+    private static final Schema CREATE_PARTITIONS_REQUEST_V0 = new Schema(
+            new Field(TOPIC_PARTITION_COUNT_KEY_NAME, new ArrayOf(
+                    new Schema(
+                            TOPIC_NAME,
+                            new Field(NEW_PARTITIONS_KEY_NAME, new Schema(
+                                    new Field(COUNT_KEY_NAME, INT32, "The new partition count."),
+                                    new Field(ASSIGNMENT_KEY_NAME, ArrayOf.nullable(new ArrayOf(INT32)), "The assigned brokers.")
+                            )))),
+                    "List of topic and the corresponding new partitions."),
+            new Field(TIMEOUT_KEY_NAME, INT32, "The time in ms to wait for the partitions to be created."),
+            new Field(VALIDATE_ONLY_KEY_NAME, BOOLEAN,
+                    "If true then validate the request, but don't actually increase the partition count."));
+
+    public static Schema[] schemaVersions() {
+        return new Schema[]{CREATE_PARTITIONS_REQUEST_V0};
+    }
+
+    // It is an error for duplicate topics to be present in the request,
+    // so track duplicates here to allow KafkaApis to report detailed per-topic errors.
+    private final Set<String> duplicates;
+    private final Map<String, NewPartitions> newPartitions;
+    private final int timeout;
+    private final boolean validateOnly;
+
+    public static class Builder extends AbstractRequest.Builder<CreatePartitionsRequest> {
+
+        private final Map<String, NewPartitions> newPartitions;
+        private final int timeout;
+        private final boolean validateOnly;
+
+        public Builder(Map<String, NewPartitions> newPartitions, int timeout, boolean validateOnly) {
+            super(ApiKeys.CREATE_PARTITIONS);
+            this.newPartitions = newPartitions;
+            this.timeout = timeout;
+            this.validateOnly = validateOnly;
+        }
+
+        @Override
+        public CreatePartitionsRequest build(short version) {
+            return new CreatePartitionsRequest(newPartitions, timeout, validateOnly, version);
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder bld = new StringBuilder();
+            bld.append("(type=CreatePartitionsRequest").
+                    append(", newPartitions=").append(newPartitions).
+                    append(", timeout=").append(timeout).
+                    append(", validateOnly=").append(validateOnly).
+                    append(")");
+            return bld.toString();
+        }
+    }
+
+    CreatePartitionsRequest(Map<String, NewPartitions> newPartitions, int timeout, boolean validateOnly, short apiVersion) {
+        super(apiVersion);
+        this.newPartitions = newPartitions;
+        this.duplicates = Collections.emptySet();
+        this.timeout = timeout;
+        this.validateOnly = validateOnly;
+    }
+
+    public CreatePartitionsRequest(Struct struct, short apiVersion) {
+        super(apiVersion);
+        Object[] topicCountArray = struct.getArray(TOPIC_PARTITION_COUNT_KEY_NAME);
+        Map<String, NewPartitions> counts = new HashMap<>(topicCountArray.length);
+        Set<String> dupes = new HashSet<>();
+        for (Object topicPartitionCountObj : topicCountArray) {
+            Struct topicPartitionCountStruct = (Struct) topicPartitionCountObj;
+            String topic = topicPartitionCountStruct.get(TOPIC_NAME);
+            Struct partitionCountStruct = topicPartitionCountStruct.getStruct(NEW_PARTITIONS_KEY_NAME);
+            int count = partitionCountStruct.getInt(COUNT_KEY_NAME);
+            Object[] assignmentsArray = partitionCountStruct.getArray(ASSIGNMENT_KEY_NAME);
+            NewPartitions newPartition;
+            if (assignmentsArray != null) {
+                List<List<Integer>> assignments = new ArrayList<>(assignmentsArray.length);
+                for (Object replicas : assignmentsArray) {
+                    Object[] replicasArray = (Object[]) replicas;
+                    List<Integer> replicasList = new ArrayList<>(replicasArray.length);
+                    assignments.add(replicasList);
+                    for (Object broker : replicasArray) {
+                        replicasList.add((Integer) broker);
+                    }
+                }
+                newPartition = NewPartitions.increaseTo(count, assignments);
+            } else {
+                newPartition = NewPartitions.increaseTo(count);
+            }
+            NewPartitions dupe = counts.put(topic, newPartition);
+            if (dupe != null) {
+                dupes.add(topic);
+            }
+        }
+        this.newPartitions = counts;
+        this.duplicates = dupes;
+        this.timeout = struct.getInt(TIMEOUT_KEY_NAME);
+        this.validateOnly = struct.getBoolean(VALIDATE_ONLY_KEY_NAME);
+    }
+
+    public Set<String> duplicates() {
+        return duplicates;
+    }
+
+    public Map<String, NewPartitions> newPartitions() {
+        return newPartitions;
+    }
+
+    public int timeout() {
+        return timeout;
+    }
+
+    public boolean validateOnly() {
+        return validateOnly;
+    }
+
+    @Override
+    protected Struct toStruct() {
+        Struct struct = new Struct(ApiKeys.CREATE_PARTITIONS.requestSchema(version()));
+        List<Struct> topicPartitionsList = new ArrayList<>();
+        for (Map.Entry<String, NewPartitions> topicPartitionCount : this.newPartitions.entrySet()) {
+            Struct topicPartitionCountStruct = struct.instance(TOPIC_PARTITION_COUNT_KEY_NAME);
+            topicPartitionCountStruct.set(TOPIC_NAME, topicPartitionCount.getKey());
+            NewPartitions count = topicPartitionCount.getValue();
+            Struct partitionCountStruct = topicPartitionCountStruct.instance(NEW_PARTITIONS_KEY_NAME);
+            partitionCountStruct.set(COUNT_KEY_NAME, count.totalCount());
+            Object[][] assignments = null;
+            if (count.assignments() != null) {
+                assignments = new Object[count.assignments().size()][];
+                int i = 0;
+                for (List<Integer> partitionAssignment : count.assignments()) {
+                    assignments[i] = partitionAssignment.toArray(new Object[0]);
+                    i++;
+                }
+            }
+            partitionCountStruct.set(ASSIGNMENT_KEY_NAME, assignments);
+            topicPartitionCountStruct.set(NEW_PARTITIONS_KEY_NAME, partitionCountStruct);
+            topicPartitionsList.add(topicPartitionCountStruct);
+        }
+        struct.set(TOPIC_PARTITION_COUNT_KEY_NAME, topicPartitionsList.toArray(new Object[0]));
+        struct.set(TIMEOUT_KEY_NAME, this.timeout);
+        struct.set(VALIDATE_ONLY_KEY_NAME, this.validateOnly);
+        return struct;
+    }
+
+    @Override
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+        Map<String, ApiError> topicErrors = new HashMap<>();
+        for (String topic : newPartitions.keySet()) {
+            topicErrors.put(topic, ApiError.fromThrowable(e));
+        }
+
+        short versionId = version();
+        switch (versionId) {
+            case 0:
+                return new CreatePartitionsResponse(throttleTimeMs, topicErrors);
+            default:
+                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+                        versionId, this.getClass().getSimpleName(), ApiKeys.CREATE_PARTITIONS.latestVersion()));
+        }
+    }
+
+    public static CreatePartitionsRequest parse(ByteBuffer buffer, short version) {
+        return new CreatePartitionsRequest(ApiKeys.CREATE_PARTITIONS.parseRequest(version, buffer), version);
+    }
+}
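
The Struct-based constructor above finds duplicate topics by checking the value returned from Map.put. A minimal stand-alone sketch of that idiom follows. It uses plain String/Integer pairs in place of Struct and NewPartitions, and the class and method names are hypothetical, not part of this patch.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration only; not part of the patch.
class DuplicateTopicSketch {

    // Copies the (topic, count) entries into countsOut and returns the topics
    // that occurred more than once. Map.put returns the previous value for the
    // key, which is non-null here exactly when the topic was already seen.
    static Set<String> collect(List<Map.Entry<String, Integer>> entries,
                               Map<String, Integer> countsOut) {
        Set<String> dupes = new HashSet<>();
        for (Map.Entry<String, Integer> entry : entries) {
            Integer previous = countsOut.put(entry.getKey(), entry.getValue());
            if (previous != null) {
                dupes.add(entry.getKey());
            }
        }
        return dupes;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> entries = new ArrayList<>();
        entries.add(new AbstractMap.SimpleEntry<>("a", 3));
        entries.add(new AbstractMap.SimpleEntry<>("b", 4));
        entries.add(new AbstractMap.SimpleEntry<>("a", 5));

        Map<String, Integer> counts = new HashMap<>();
        Set<String> dupes = collect(entries, counts);
        System.out.println("duplicates: " + dupes);
        System.out.println("counts: " + counts);
    }
}
```

As in the constructor, the last entry for a duplicated topic overwrites the earlier one in the map, so the duplicate set is the only record that the request was malformed.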

http://git-wip-us.apache.org/repos/asf/kafka/blob/5f6393f9/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsResponse.java
new file mode 100644
index 0000000..390221f
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsResponse.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_MESSAGE;
+import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
+import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
+
+
+public class CreatePartitionsResponse extends AbstractResponse {
+
+    private static final String TOPIC_ERRORS_KEY_NAME = "topic_errors";
+
+    private static final Schema CREATE_PARTITIONS_RESPONSE_V0 = new Schema(
+            THROTTLE_TIME_MS,
+            new Field(TOPIC_ERRORS_KEY_NAME, new ArrayOf(
+                    new Schema(
+                            TOPIC_NAME,
+                            ERROR_CODE,
+                            ERROR_MESSAGE
+                    )), "Per topic results for the create partitions request")
+    );
+
+    public static Schema[] schemaVersions() {
+        return new Schema[]{CREATE_PARTITIONS_RESPONSE_V0};
+    }
+
+    private final int throttleTimeMs;
+    private final Map<String, ApiError> errors;
+
+    public CreatePartitionsResponse(int throttleTimeMs, Map<String, ApiError> errors) {
+        this.throttleTimeMs = throttleTimeMs;
+        this.errors = errors;
+    }
+
+    public CreatePartitionsResponse(Struct struct) {
+        super();
+        Object[] topicErrorsArray = struct.getArray(TOPIC_ERRORS_KEY_NAME);
+        Map<String, ApiError> errors = new HashMap<>(topicErrorsArray.length);
+        for (Object topicErrorObj : topicErrorsArray) {
+            Struct topicErrorStruct = (Struct) topicErrorObj;
+            String topic = topicErrorStruct.get(TOPIC_NAME);
+            ApiError error = new ApiError(topicErrorStruct);
+            errors.put(topic, error);
+        }
+        this.throttleTimeMs = struct.get(THROTTLE_TIME_MS);
+        this.errors = errors;
+    }
+
+    @Override
+    protected Struct toStruct(short version) {
+        Struct struct = new Struct(ApiKeys.CREATE_PARTITIONS.responseSchema(version));
+        List<Struct> topicErrors = new ArrayList<>(errors.size());
+        for (Map.Entry<String, ApiError> error : errors.entrySet()) {
+            Struct errorStruct = struct.instance(TOPIC_ERRORS_KEY_NAME);
+            errorStruct.set(TOPIC_NAME, error.getKey());
+            error.getValue().write(errorStruct);
+            topicErrors.add(errorStruct);
+        }
+        struct.set(THROTTLE_TIME_MS, throttleTimeMs);
+        struct.set(TOPIC_ERRORS_KEY_NAME, topicErrors.toArray(new Object[topicErrors.size()]));
+        return struct;
+    }
+
+    public Map<String, ApiError> errors() {
+        return errors;
+    }
+
+    public int throttleTimeMs() {
+        return throttleTimeMs;
+    }
+
+    public static CreatePartitionsResponse parse(ByteBuffer buffer, short version) {
+        return new CreatePartitionsResponse(ApiKeys.CREATE_PARTITIONS.parseResponse(version, buffer));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5f6393f9/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 4e3d70c..2412d03 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -29,9 +29,11 @@ import org.apache.kafka.common.acl.AclBindingFilter;
 import org.apache.kafka.common.acl.AclOperation;
 import org.apache.kafka.common.acl.AclPermissionType;
 import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.SecurityDisabledException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.CreatePartitionsResponse;
 import org.apache.kafka.common.requests.ApiError;
 import org.apache.kafka.common.requests.CreateAclsResponse;
 import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse;
@@ -378,6 +380,40 @@ public class KafkaAdminClientTest {
         }
     }
 
+    @Test
+    public void testCreatePartitions() throws Exception {
+        try (MockKafkaAdminClientEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet());
+            env.kafkaClient().setNode(env.cluster().controller());
+
+            Map<String, ApiError> m = new HashMap<>();
+            m.put("my_topic", ApiError.NONE);
+            m.put("other_topic", ApiError.fromThrowable(new InvalidTopicException("some detailed reason")));
+
+            // Test a call where one topic has an error.
+            env.kafkaClient().prepareResponse(new CreatePartitionsResponse(0, m));
+
+            Map<String, NewPartitions> counts = new HashMap<>();
+            counts.put("my_topic", NewPartitions.increaseTo(3));
+            counts.put("other_topic", NewPartitions.increaseTo(3, asList(asList(2), asList(3))));
+
+            CreatePartitionsResult results = env.adminClient().createPartitions(counts);
+            Map<String, KafkaFuture<Void>> values = results.values();
+            KafkaFuture<Void> myTopicResult = values.get("my_topic");
+            myTopicResult.get();
+            KafkaFuture<Void> otherTopicResult = values.get("other_topic");
+            try {
+                otherTopicResult.get();
+                fail("get() should throw ExecutionException");
+            } catch (ExecutionException e0) {
+                assertTrue(e0.getCause() instanceof InvalidTopicException);
+                InvalidTopicException e = (InvalidTopicException) e0.getCause();
+                assertEquals("some detailed reason", e.getMessage());
+            }
+        }
+    }
+
     private static <T> void assertCollectionIs(Collection<T> collection, T... elements) {
         for (T element : elements) {
             assertTrue("Did not find " + element, collection.contains(element));
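
The test above relies on a failed per-topic future surfacing its error as the cause of an ExecutionException. A minimal sketch of that pattern is shown below, assuming java.util.concurrent.CompletableFuture as a stand-in for KafkaFuture and IllegalArgumentException as a stand-in for InvalidTopicException; the class and method names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical illustration only; CompletableFuture stands in for KafkaFuture.
class FutureFailureSketch {

    // Returns the cause of a failed future, or null if it completed normally.
    static Throwable failureCause(CompletableFuture<Void> future) {
        try {
            future.get();
            return null;
        } catch (ExecutionException e) {
            // get() wraps the original failure; the cause is the real error.
            return e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Void> ok = CompletableFuture.completedFuture(null);
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalArgumentException("some detailed reason"));

        System.out.println("ok cause: " + failureCause(ok));
        System.out.println("failed cause: " + failureCause(failed).getMessage());
    }
}
```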

http://git-wip-us.apache.org/repos/asf/kafka/blob/5f6393f9/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index e2e458d1..0ef0acd 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.clients.admin.NewPartitions;
 import org.apache.kafka.common.acl.AccessControlEntry;
 import org.apache.kafka.common.acl.AccessControlEntryFilter;
 import org.apache.kafka.common.acl.AclBinding;
@@ -24,6 +25,8 @@ import org.apache.kafka.common.acl.AclOperation;
 import org.apache.kafka.common.acl.AclPermissionType;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
+import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.NotCoordinatorException;
 import org.apache.kafka.common.errors.NotEnoughReplicasException;
 import org.apache.kafka.common.errors.SecurityDisabledException;
@@ -241,6 +244,10 @@ public class RequestResponseTest {
         checkRequest(createDescribeConfigsRequestWithConfigEntries());
         checkErrorResponse(createDescribeConfigsRequest(), new UnknownServerException());
         checkResponse(createDescribeConfigsResponse(), 0);
+        checkRequest(createCreatePartitionsRequest());
+        checkRequest(createCreatePartitionsRequestWithAssignments());
+        checkErrorResponse(createCreatePartitionsRequest(), new InvalidTopicException());
+        checkResponse(createCreatePartitionsErrorResponse(), 0);
     }
 
     @Test
@@ -1104,4 +1111,27 @@ public class RequestResponseTest {
         errors.put(new org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.TOPIC, "topic"), new ApiError(Errors.INVALID_REQUEST, "This request is invalid"));
         return new AlterConfigsResponse(20, errors);
     }
+
+    private CreatePartitionsRequest createCreatePartitionsRequest() {
+        Map<String, NewPartitions> assignments = new HashMap<>();
+        assignments.put("my_topic", NewPartitions.increaseTo(3));
+        assignments.put("my_other_topic", NewPartitions.increaseTo(3));
+        return new CreatePartitionsRequest(assignments, 0, false, (short) 0);
+    }
+
+    private CreatePartitionsRequest createCreatePartitionsRequestWithAssignments() {
+        Map<String, NewPartitions> assignments = new HashMap<>();
+        assignments.put("my_topic", NewPartitions.increaseTo(3, asList(asList(2))));
+        assignments.put("my_other_topic", NewPartitions.increaseTo(3, asList(asList(2, 3), asList(3, 1))));
+        return new CreatePartitionsRequest(assignments, 0, false, (short) 0);
+    }
+
+    private CreatePartitionsResponse createCreatePartitionsErrorResponse() {
+        Map<String, ApiError> results = new HashMap<>();
+        results.put("my_topic", ApiError.fromThrowable(
+                new InvalidReplicaAssignmentException("The assigned brokers included an unknown broker")));
+        results.put("my_other_topic", ApiError.NONE);
+        return new CreatePartitionsResponse(42, results);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5f6393f9/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index d077296..2e0fddf 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -24,8 +24,8 @@ import kafka.utils.ZkUtils._
 import java.util.Random
 import java.util.Properties
 
-import kafka.common.TopicAlreadyMarkedForDeletionException
-import org.apache.kafka.common.errors.{InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidReplicationFactorException, InvalidTopicException, TopicExistsException, UnknownTopicOrPartitionException}
+import kafka.common.{TopicAlreadyMarkedForDeletionException, TopicAndPartition}
+import org.apache.kafka.common.errors.{BrokerNotAvailableException, InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidReplicationFactorException, InvalidTopicException, ReassignmentInProgressException, TopicExistsException, UnknownTopicOrPartitionException}
 
 import scala.collection._
 import scala.collection.JavaConverters._
@@ -134,17 +134,17 @@ object AdminUtils extends Logging with AdminUtilities {
                               fixedStartIndex: Int = -1,
                               startPartitionId: Int = -1): Map[Int, Seq[Int]] = {
     if (nPartitions <= 0)
-      throw new InvalidPartitionsException("number of partitions must be larger than 0")
+      throw new InvalidPartitionsException("Number of partitions must be larger than 0.")
     if (replicationFactor <= 0)
-      throw new InvalidReplicationFactorException("replication factor must be larger than 0")
+      throw new InvalidReplicationFactorException("Replication factor must be larger than 0.")
     if (replicationFactor > brokerMetadatas.size)
-      throw new InvalidReplicationFactorException(s"replication factor: $replicationFactor larger than available brokers: ${brokerMetadatas.size}")
+      throw new InvalidReplicationFactorException(s"Replication factor: $replicationFactor larger than available brokers: ${brokerMetadatas.size}.")
     if (brokerMetadatas.forall(_.rack.isEmpty))
       assignReplicasToBrokersRackUnaware(nPartitions, replicationFactor, brokerMetadatas.map(_.id), fixedStartIndex,
         startPartitionId)
     else {
       if (brokerMetadatas.exists(_.rack.isEmpty))
-        throw new AdminOperationException("Not all brokers have rack information for replica rack aware assignment")
+        throw new AdminOperationException("Not all brokers have rack information for replica rack aware assignment.")
       assignReplicasToBrokersRackAware(nPartitions, replicationFactor, brokerMetadatas, fixedStartIndex,
         startPartitionId)
     }
@@ -264,74 +264,106 @@ object AdminUtils extends Logging with AdminUtilities {
   * @param zkUtils Zookeeper utilities
   * @param topic Topic for adding partitions to
   * @param numPartitions Number of partitions to be set
-  * @param replicaAssignmentStr Manual replica assignment
+  * @param replicaAssignment Manual replica assignment, or none
   * @param checkBrokerAvailable Ignore checking if assigned replica broker is available. Only used for testing
   */
   def addPartitions(zkUtils: ZkUtils,
                     topic: String,
+                    existingAssignment: Map[Int, Seq[Int]],
                     numPartitions: Int = 1,
-                    replicaAssignmentStr: String = "",
+                    replicaAssignment: Option[Map[Int, Seq[Int]]] = None,
                     checkBrokerAvailable: Boolean = true,
-                    rackAwareMode: RackAwareMode = RackAwareMode.Enforced) {
-    val existingPartitionsReplicaList = zkUtils.getReplicaAssignmentForTopics(List(topic))
-    if (existingPartitionsReplicaList.isEmpty)
-      throw new AdminOperationException("The topic %s does not exist".format(topic))
-
-    val existingReplicaListForPartitionZero = existingPartitionsReplicaList.find(p => p._1.partition == 0) match {
-      case None => throw new AdminOperationException("the topic does not have partition with id 0, it should never happen")
-      case Some(headPartitionReplica) => headPartitionReplica._2
+                    rackAwareMode: RackAwareMode = RackAwareMode.Enforced,
+                    validateOnly: Boolean = false): Map[Int, Seq[Int]] = {
+    if (zkUtils.pathExists(ZkUtils.ReassignPartitionsPath)) {
+      // We prevent adding partitions while a reassignment is in progress, since
+      // during reassignment there is no meaningful notion of replication factor
+      throw new ReassignmentInProgressException("A partition reassignment is in progress.")
     }
-    val partitionsToAdd = numPartitions - existingPartitionsReplicaList.size
+
+    val existingAssignmentPartition0 = existingAssignment.getOrElse(0,
+      throw new AdminOperationException(
+        s"Unexpected existing replica assignment for topic '$topic', partition id 0 is missing. Assignment: $existingAssignment"))
+
+    val partitionsToAdd = numPartitions - existingAssignment.size
     if (partitionsToAdd <= 0)
-      throw new AdminOperationException("The number of partitions for a topic can only be increased")
+      throw new InvalidPartitionsException(
+        s"The number of partitions for a topic can only be increased. " +
+          s"Topic $topic currently has ${existingAssignment.size} partitions, " +
+          s"$numPartitions would not be an increase.")
 
     // create the new partition replication list
     val brokerMetadatas = getBrokerMetadatas(zkUtils, rackAwareMode)
-    val newPartitionReplicaList =
-      if (replicaAssignmentStr == null || replicaAssignmentStr == "") {
-        val startIndex = math.max(0, brokerMetadatas.indexWhere(_.id >= existingReplicaListForPartitionZero.head))
-        AdminUtils.assignReplicasToBrokers(brokerMetadatas, partitionsToAdd, existingReplicaListForPartitionZero.size,
-          startIndex, existingPartitionsReplicaList.size)
-      }
-      else
-        getManualReplicaAssignment(replicaAssignmentStr, brokerMetadatas.map(_.id).toSet,
-          existingPartitionsReplicaList.size, checkBrokerAvailable)
+    val newPartitionReplicaList = replicaAssignment.getOrElse{
+      val startIndex = math.max(0, brokerMetadatas.indexWhere(_.id >= existingAssignmentPartition0.head))
+      AdminUtils.assignReplicasToBrokers(brokerMetadatas, partitionsToAdd, existingAssignmentPartition0.size,
+        startIndex, existingAssignment.size)
+    }
+    validateReplicaAssignment(newPartitionReplicaList, existingAssignmentPartition0, brokerMetadatas.map(_.id).toSet, checkBrokerAvailable)
 
     // check if manual assignment has the right replication factor
-    val unmatchedRepFactorList = newPartitionReplicaList.values.filter(p => p.size != existingReplicaListForPartitionZero.size)
+    val unmatchedRepFactorList = newPartitionReplicaList.values.filter(p => p.size != existingAssignmentPartition0.size)
     if (unmatchedRepFactorList.nonEmpty)
-      throw new AdminOperationException("The replication factor in manual replication assignment " +
-        " is not equal to the existing replication factor for the topic " + existingReplicaListForPartitionZero.size)
-
-    info("Add partition list for %s is %s".format(topic, newPartitionReplicaList))
-    val partitionReplicaList = existingPartitionsReplicaList.map(p => p._1.partition -> p._2)
-    // add the new list
-    partitionReplicaList ++= newPartitionReplicaList
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, partitionReplicaList, update = true)
+      throw new InvalidReplicaAssignmentException(s"Existing topic replication factor of ${existingAssignmentPartition0.size}, but manual replication assignment would imply replication factor(s) of ${unmatchedRepFactorList.map(_.size).mkString(",")}.")
+    if (!validateOnly) {
+      info(s"Add partition list for $topic is $newPartitionReplicaList.")
+      val partitionReplicaList = existingAssignment.map{ case (partitionId, replicas) => (partitionId, replicas) }
+      // add the combined new list
+      AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, partitionReplicaList ++ newPartitionReplicaList, update = true)
+    }
+    newPartitionReplicaList
   }
 
-  def getManualReplicaAssignment(replicaAssignmentList: String, availableBrokerList: Set[Int], startPartitionId: Int, checkBrokerAvailable: Boolean = true): Map[Int, List[Int]] = {
+  /**
+    * Parse a replica assignment string of the form
+    * <pre><code>
+    * broker_id_for_part1_replica1:broker_id_for_part1_replica2,
+    * broker_id_for_part2_replica1:broker_id_for_part2_replica2,
+    * ...
+    * </code></pre>
+    * @param replicaAssignmentList The string to parse
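+    * @param startPartitionId The partition id to assign to the first entry in the string
+    * @return A map from partition id to the list of broker ids parsed for that partition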
+    */
+  def parseManualReplicaAssignment(replicaAssignmentList: String, startPartitionId: Int): Map[Int, List[Int]] = {
     var partitionList = replicaAssignmentList.split(",")
     val ret = new mutable.HashMap[Int, List[Int]]()
     var partitionId = startPartitionId
-    partitionList = partitionList.takeRight(partitionList.size - partitionId)
+    //partitionList = partitionList.takeRight(partitionList.size - partitionId)
     for (i <- partitionList.indices) {
-      val brokerList = partitionList(i).split(":").map(s => s.trim().toInt)
-      if (brokerList.isEmpty)
-        throw new AdminOperationException("replication factor must be larger than 0")
-      if (brokerList.size != brokerList.toSet.size)
-        throw new AdminOperationException("duplicate brokers in replica assignment: " + brokerList)
-      if (checkBrokerAvailable && !brokerList.toSet.subsetOf(availableBrokerList))
-        throw new AdminOperationException("some specified brokers not available. specified brokers: " + brokerList +
-          "available broker:" + availableBrokerList)
-      ret.put(partitionId, brokerList.toList)
-      if (ret(partitionId).size != ret(startPartitionId).size)
-        throw new AdminOperationException("partition " + i + " has different replication factor: " + brokerList)
+      val brokerList = partitionList(i).split(":").map(s => s.trim().toInt).toList
+      ret.put(partitionId, brokerList)
       partitionId = partitionId + 1
     }
     ret.toMap
   }
 
+  def validateReplicaAssignment(replicaAssignment: Map[Int, Seq[Int]],
+                                existingAssignmentPartition0: Seq[Int],
+                                availableBrokerList: Set[Int], checkBrokerAvailable: Boolean = true): Unit = {
+
+    val badRepFactor = replicaAssignment.toSeq.sortBy(_._1).reverse.map { case (partitionId, brokerList) =>
+      if (brokerList.isEmpty)
+        throw new InvalidReplicaAssignmentException(
+          s"Cannot have replication factor of 0 for partition id $partitionId.")
+      if (brokerList.size != brokerList.toSet.size)
+        throw new InvalidReplicaAssignmentException(
+          s"Duplicate brokers not allowed in replica assignment: " +
+            s"${brokerList.mkString(", ")} for partition id $partitionId.")
+      if (checkBrokerAvailable && !brokerList.toSet.subsetOf(availableBrokerList))
+        throw new BrokerNotAvailableException(
+          s"Some brokers specified for partition id $partitionId are not available. " +
+            s"Specified brokers: ${brokerList.mkString(", ")}, " +
+            s"available brokers: ${availableBrokerList.mkString(", ")}.")
+      partitionId -> brokerList.size
+
+    }.filter{ case (partitionId, repFactor) => repFactor != existingAssignmentPartition0.size}
+    if (badRepFactor.nonEmpty)
+      throw new InvalidReplicaAssignmentException(s"Inconsistent replication factor between partitions, " +
+        s"partition 0 has ${existingAssignmentPartition0.size} " +
+        s"while partitions [${badRepFactor.map(_._1).mkString(", ")}] have replication factors [${badRepFactor.map(_._2).mkString(", ")}], respectively.")
+  }
+
   def deleteTopic(zkUtils: ZkUtils, topic: String) {
       if (topicExists(zkUtils, topic)) {
         try {
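
The checks made by validateReplicaAssignment() above can be sketched in Java as follows. This is an illustrative rendering, not the Scala code in the patch: it throws IllegalArgumentException where the patch uses InvalidReplicaAssignmentException and BrokerNotAvailableException, it walks partitions in ascending rather than descending id order, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical illustration only; not part of the patch.
class ReplicaAssignmentValidationSketch {

    // Rejects empty replica lists, duplicate brokers, unknown brokers, and
    // partitions whose replication factor differs from the expected one.
    static void validate(Map<Integer, List<Integer>> assignment,
                         int expectedReplicationFactor,
                         Set<Integer> availableBrokers) {
        List<Integer> badPartitions = new ArrayList<>();
        for (Map.Entry<Integer, List<Integer>> entry : new TreeMap<>(assignment).entrySet()) {
            int partitionId = entry.getKey();
            List<Integer> brokers = entry.getValue();
            if (brokers.isEmpty())
                throw new IllegalArgumentException(
                    "Cannot have replication factor of 0 for partition id " + partitionId + ".");
            if (new HashSet<>(brokers).size() != brokers.size())
                throw new IllegalArgumentException(
                    "Duplicate brokers not allowed for partition id " + partitionId + ".");
            if (!availableBrokers.containsAll(brokers))
                throw new IllegalArgumentException(
                    "Some brokers specified for partition id " + partitionId + " are not available.");
            if (brokers.size() != expectedReplicationFactor)
                badPartitions.add(partitionId);
        }
        if (!badPartitions.isEmpty())
            throw new IllegalArgumentException(
                "Inconsistent replication factor for partitions " + badPartitions + ".");
    }

    public static void main(String[] args) {
        Map<Integer, List<Integer>> assignment = new HashMap<>();
        assignment.put(3, Arrays.asList(1, 2));
        assignment.put(4, Arrays.asList(2, 3));
        validate(assignment, 2, new HashSet<>(Arrays.asList(1, 2, 3)));
        System.out.println("assignment accepted");
    }
}
```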

http://git-wip-us.apache.org/repos/asf/kafka/blob/5f6393f9/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 2d3a76c..f22c5d4 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -28,7 +28,7 @@ import kafka.server.ConfigType
 import kafka.utils.ZkUtils._
 import kafka.utils._
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
-import org.apache.kafka.common.errors.TopicExistsException
+import org.apache.kafka.common.errors.{InvalidTopicException, TopicExistsException}
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.utils.Utils
@@ -144,8 +144,21 @@ object TopicCommand extends Logging {
         println("WARNING: If partitions are increased for a topic that has a key, the partition " +
           "logic or ordering of the messages will be affected")
         val nPartitions = opts.options.valueOf(opts.partitionsOpt).intValue
+        val existingAssignment = zkUtils.getReplicaAssignmentForTopics(List(topic)).map {
+          case (topicPartition, replicas) => topicPartition.partition -> replicas
+        }
+        if (existingAssignment.isEmpty)
+          throw new InvalidTopicException(s"The topic $topic does not exist")
         val replicaAssignmentStr = opts.options.valueOf(opts.replicaAssignmentOpt)
-        AdminUtils.addPartitions(zkUtils, topic, nPartitions, replicaAssignmentStr)
+        val newAssignment = if (replicaAssignmentStr == null || replicaAssignmentStr.isEmpty)
+          None
+        else {
+          var partitionList = replicaAssignmentStr.split(",")
+          val startPartitionId = existingAssignment.size;
+          partitionList = partitionList.takeRight(partitionList.size - startPartitionId)
+          Some(AdminUtils.parseManualReplicaAssignment(partitionList.mkString(","), startPartitionId))
+        }
+        AdminUtils.addPartitions(zkUtils, topic, existingAssignment, nPartitions, newAssignment)
         println("Adding partitions succeeded!")
       }
     }
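
Note on the TopicCommand change above: the --replica-assignment string still lists every partition, so the entries for existing partitions are dropped before parsing. Below is a minimal sketch of that trimming step, assuming the usual "b1:b2,b3:b4" format (comma between partitions, colon between replicas). parseAssignment is a hypothetical stand-in for AdminUtils.parseManualReplicaAssignment, and returning None when nothing is left after trimming is a choice made for this sketch, not behaviour taken from the patch.

```scala
object ManualAssignmentSketch {
  // Hypothetical stand-in for AdminUtils.parseManualReplicaAssignment:
  // "1:2,2:0" with startPartitionId = 1 becomes Map(1 -> List(1, 2), 2 -> List(2, 0)).
  def parseAssignment(assignment: String, startPartitionId: Int): Map[Int, List[Int]] =
    assignment.split(",").zipWithIndex.map { case (replicas, index) =>
      (startPartitionId + index) -> replicas.split(":").map(_.trim.toInt).toList
    }.toMap

  // Keeps only the entries that describe partitions which do not exist yet.
  def newPartitionAssignment(assignment: String,
                             existingPartitions: Int): Option[Map[Int, List[Int]]] =
    if (assignment == null || assignment.isEmpty) None
    else {
      // Skip one entry per existing partition; the rest describe new partitions.
      val fresh = assignment.split(",").drop(existingPartitions)
      // Sketch-only choice: treat "nothing left" as "no manual assignment".
      if (fresh.isEmpty) None
      else Some(parseAssignment(fresh.mkString(","), existingPartitions))
    }
}
```

For a topic with one existing partition, "0:1,1:2,2:0" yields assignments for partitions 1 and 2 only.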

http://git-wip-us.apache.org/repos/asf/kafka/blob/5f6393f9/core/src/main/scala/kafka/server/AdminManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala
index 84972f3..58bb094 100644
--- a/core/src/main/scala/kafka/server/AdminManager.scala
+++ b/core/src/main/scala/kafka/server/AdminManager.scala
@@ -18,15 +18,17 @@ package kafka.server
 
 import java.util.{Collections, Properties}
 
-import kafka.admin.AdminUtils
-import kafka.common.TopicAlreadyMarkedForDeletionException
+import kafka.admin.{AdminOperationException, AdminUtils}
+import kafka.common.{TopicAlreadyMarkedForDeletionException}
 import kafka.log.LogConfig
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils._
+import org.apache.kafka.clients.admin.NewPartitions
 import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, ConfigResource}
-import org.apache.kafka.common.errors.{ApiException, InvalidRequestException, PolicyViolationException}
+import org.apache.kafka.common.errors.{ApiException, InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidRequestException, InvalidTopicException, PolicyViolationException}
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.CreateTopicsRequest._
 import org.apache.kafka.common.requests.{AlterConfigsRequest, ApiError, DescribeConfigsResponse, Resource, ResourceType}
@@ -40,6 +42,7 @@ class AdminManager(val config: KafkaConfig,
                    val metrics: Metrics,
                    val metadataCache: MetadataCache,
                    val zkUtils: ZkUtils) extends Logging with KafkaMetricsGroup {
+
   this.logIdent = "[Admin Manager on Broker " + config.brokerId + "]: "
 
   private val topicPurgatory = DelayedOperationPurgatory[DelayedOperation]("topic", config.brokerId)
@@ -194,6 +197,79 @@ class AdminManager(val config: KafkaConfig,
     }
   }
 
+  def createPartitions(timeout: Int,
+                       newPartitions: Map[String, NewPartitions],
+                       validateOnly: Boolean,
+                       listenerName: ListenerName,
+                       callback: Map[String, ApiError] => Unit): Unit = {
+
+    // 1. map over topics creating assignment and calling AdminUtils
+    val metadata = newPartitions.map{ case (topic, newPartition) =>
+      try {
+        val oldNumPartitions = zkUtils.getTopicPartitionCount(topic).getOrElse(throw new InvalidTopicException())
+        val newNumPartitions = newPartition.totalCount
+        val numPartitionsIncrement = newNumPartitions - oldNumPartitions
+        if (numPartitionsIncrement < 0) {
+          throw new InvalidPartitionsException(
+            s"Topic currently has $oldNumPartitions partitions, which is higher than the requested $newNumPartitions.")
+        } else if (numPartitionsIncrement == 0) {
+          throw new InvalidPartitionsException(s"Topic already has $oldNumPartitions partitions.")
+        }
+        val existingAssignment = zkUtils.getReplicaAssignmentForTopics(List(topic)).map {
+          case (topicPartition, replicas) => topicPartition.partition -> replicas
+        }
+        if (existingAssignment.isEmpty)
+          throw new InvalidTopicException(s"The topic $topic does not exist")
+        val reassignment = if (newPartition.assignments == null) {
+          None
+        } else {
+          val assignments = newPartition.assignments.asScala.map(inner => inner.asScala.toList)
+          // check each broker exists
+          val unknownBrokers = assignments.flatten.toSet -- zkUtils.getAllBrokersInCluster.map(broker => broker.id)
+          if (unknownBrokers.nonEmpty) {
+            throw new InvalidReplicaAssignmentException(
+              s"Unknown broker(s) in replica assignment: ${unknownBrokers.mkString(", ")}.")
+          }
+          if (assignments.size != numPartitionsIncrement) {
+            throw new InvalidRequestException(
+              s"Increasing the number of partitions by $numPartitionsIncrement " +
+                s"but ${assignments.size} assignments provided.")
+          }
+          Some(newPartition.assignments.asScala.toList.zipWithIndex.map{ case (replicas, index) =>
+            existingAssignment.size + index -> replicas.asScala.map(_.toInt)
+          }.toMap)
+        }
+
+        val added = AdminUtils.addPartitions(zkUtils, topic, existingAssignment, newPartition.totalCount, reassignment, validateOnly = validateOnly)
+        CreatePartitionMetadata(topic, added.keySet, ApiError.NONE)
+      } catch {
+        case e: AdminOperationException =>
+          CreatePartitionMetadata(topic, null, ApiError.fromThrowable(e))
+        case e: ApiException =>
+          CreatePartitionMetadata(topic, null, ApiError.fromThrowable(e))
+      }
+    }
+
+    // 2. if timeout <= 0, validateOnly or no topics can proceed return immediately
+    if (timeout <= 0 || validateOnly || !metadata.exists(_.error.is(Errors.NONE))) {
+      val results = metadata.map { createPartitionMetadata =>
+        // ignore topics that already have errors
+        if (createPartitionMetadata.error.isSuccess() && !validateOnly) {
+          (createPartitionMetadata.topic, new ApiError(Errors.REQUEST_TIMED_OUT, null))
+        } else {
+          (createPartitionMetadata.topic, createPartitionMetadata.error)
+        }
+      }.toMap
+      callback(results)
+    } else {
+      // 3. else pass the assignments and errors to the delayed operation and set the keys
+      val delayedCreate = new DelayedCreatePartitions(timeout, metadata.toSeq, this, callback)
+      val delayedCreateKeys = newPartitions.keySet.map(new TopicKey(_)).toSeq
+      // try to complete the request immediately, otherwise put it into the purgatory
+      topicPurgatory.tryCompleteElseWatch(delayedCreate, delayedCreateKeys)
+    }
+  }
+
   def describeConfigs(resourceToConfigNames: Map[Resource, Option[Set[String]]]): Map[Resource, DescribeConfigsResponse.Config] = {
     resourceToConfigNames.map { case (resource, configNames) =>
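
Note on AdminManager.createPartitions() in the hunk above: its per-topic validation can be read as a pure function from the old count, the requested count and the optional assignments to either an error or an assignment for the new partitions. The sketch below is only an illustration under these assumptions: it uses plain Scala types instead of NewPartitions and ZkUtils, returns messages in an Either rather than throwing, and numbers new partitions from oldCount (the patch uses existingAssignment.size). The message texts are copied from the patch.

```scala
object CreatePartitionsValidationSketch {
  // Left(message) for a rejected request; Right(assignment for the new
  // partitions) otherwise, where None means "let the broker choose replicas".
  def validate(oldCount: Int,
               newCount: Int,
               assignments: Option[List[List[Int]]],
               liveBrokers: Set[Int]): Either[String, Option[Map[Int, List[Int]]]] = {
    val increment = newCount - oldCount
    if (increment < 0)
      Left(s"Topic currently has $oldCount partitions, which is higher than the requested $newCount.")
    else if (increment == 0)
      Left(s"Topic already has $oldCount partitions.")
    else assignments match {
      case None => Right(None)
      case Some(lists) =>
        // Every broker named in the assignment must be known.
        val unknown = lists.flatten.toSet -- liveBrokers
        if (unknown.nonEmpty)
          Left(s"Unknown broker(s) in replica assignment: ${unknown.mkString(", ")}.")
        else if (lists.size != increment)
          Left(s"Increasing the number of partitions by $increment but ${lists.size} assignments provided.")
        else
          // New partition ids continue after the existing ones.
          Right(Some(lists.zipWithIndex.map { case (replicas, index) =>
            (oldCount + index) -> replicas
          }.toMap))
    }
  }
}
```

The replication-factor and duplicate-broker checks are not part of this sketch; in the patch they are made by AdminUtils.addPartitions().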
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/5f6393f9/core/src/main/scala/kafka/server/DelayedCreatePartitions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedCreatePartitions.scala b/core/src/main/scala/kafka/server/DelayedCreatePartitions.scala
new file mode 100644
index 0000000..f7bf32a
--- /dev/null
+++ b/core/src/main/scala/kafka/server/DelayedCreatePartitions.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.api.LeaderAndIsr
+import org.apache.kafka.clients.admin.NewPartitions
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.ApiError
+
+import scala.collection.{Map, Seq, Set}
+import scala.collection.JavaConverters._
+
+case class CreatePartitionMetadata(topic: String, addedPartitions: Set[Int], error: ApiError)
+
+/**
+  * A delayed create partitions operation that can be created by the admin manager and watched
+  * in the topic purgatory
+  */
+class DelayedCreatePartitions(delayMs: Long,
+                              createMetadata: Seq[CreatePartitionMetadata],
+                              adminManager: AdminManager,
+                              responseCallback: Map[String, ApiError] => Unit)
+  extends DelayedOperation(delayMs) {
+
+  /**
+    * The operation can be completed if all of the topics that do not have an error exist and every partition has a leader in the controller.
+    * See KafkaController.onNewTopicCreation
+    */
+  override def tryComplete() : Boolean = {
+    trace(s"Trying to complete operation for $createMetadata")
+
+    val leaderlessPartitionCount = createMetadata.filter(_.error.isSuccess)
+      .foldLeft(0) { case (topicCounter, metadata) =>
+        topicCounter + missingLeaderCount(metadata.topic, metadata.addedPartitions)
+      }
+
+    if (leaderlessPartitionCount == 0) {
+      trace("All partitions have a leader, completing the delayed operation")
+      forceComplete()
+    } else {
+      trace(s"$leaderlessPartitionCount partitions do not have a leader, not completing the delayed operation")
+      false
+    }
+  }
+
+  /**
+    * Check for partitions that are still missing a leader, update their error code and call the responseCallback
+    */
+  override def onComplete() {
+    trace(s"Completing operation for $createMetadata")
+    val results = createMetadata.map { metadata =>
+      // ignore topics that already have errors
+      if (metadata.error.isSuccess && missingLeaderCount(metadata.topic, metadata.addedPartitions) > 0)
+        (metadata.topic, new ApiError(Errors.REQUEST_TIMED_OUT, null))
+      else
+        (metadata.topic, metadata.error)
+    }.toMap
+    responseCallback(results)
+  }
+
+  override def onExpiration(): Unit = { }
+
+  private def missingLeaderCount(topic: String, partitions: Set[Int]): Int = {
+    partitions.foldLeft(0) { case (counter, partition) =>
+      if (isMissingLeader(topic, partition)) {
+        trace(s"topic $topic, partition $partition is missing its leader")
+        counter + 1
+      } else {
+        trace(s"topic $topic, partition $partition has its leader")
+        counter
+      }
+    }
+  }
+
+  private def isMissingLeader(topic: String, partition: Int): Boolean = {
+    val partitionInfo = adminManager.metadataCache.getPartitionInfo(topic, partition)
+    trace(s"PartitionState topic $topic, partition $partition: $partitionInfo")
+    partitionInfo.isEmpty || partitionInfo.get.basePartitionState.leader == LeaderAndIsr.NoLeader
+  }
+}
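
Note on DelayedCreatePartitions above: the completion test reduces to counting added partitions that are either absent from the metadata cache or have no leader. Below is a minimal sketch of that count, with a plain Map standing in for MetadataCache. LeaderCache, canComplete and the NoLeader value of -1 are assumptions made for illustration, not taken from this patch.

```scala
object LeaderCheckSketch {
  // Assumed sentinel for "no leader"; stands in for LeaderAndIsr.NoLeader.
  val NoLeader: Int = -1

  // Hypothetical metadata cache: (topic, partition) -> leader broker id.
  type LeaderCache = Map[(String, Int), Int]

  // A partition is missing its leader if it is unknown to the cache
  // or the cached leader is the NoLeader sentinel.
  def isMissingLeader(cache: LeaderCache, topic: String, partition: Int): Boolean =
    cache.get((topic, partition)).forall(_ == NoLeader)

  def missingLeaderCount(cache: LeaderCache, topic: String, partitions: Set[Int]): Int =
    partitions.count(isMissingLeader(cache, topic, _))

  // True when every added partition of every topic has a leader.
  def canComplete(cache: LeaderCache, added: Map[String, Set[Int]]): Boolean =
    added.forall { case (topic, partitions) =>
      missingLeaderCount(cache, topic, partitions) == 0
    }
}
```

In the patch, a topic that still has leaderless partitions when the operation completes is reported with REQUEST_TIMED_OUT.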

http://git-wip-us.apache.org/repos/asf/kafka/blob/5f6393f9/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index e07e689..6d82892 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -55,6 +55,7 @@ import org.apache.kafka.common.requests.{SaslAuthenticateResponse, SaslHandshake
 import org.apache.kafka.common.resource.{Resource => AdminResource}
 import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding}
 import DescribeLogDirsResponse.LogDirInfo
+import org.apache.kafka.clients.admin.NewPartitions
 
 import scala.collection.{mutable, _}
 import scala.collection.JavaConverters._
@@ -133,6 +134,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.ALTER_REPLICA_DIR => handleAlterReplicaDirRequest(request)
         case ApiKeys.DESCRIBE_LOG_DIRS => handleDescribeLogDirsRequest(request)
         case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request)
+        case ApiKeys.CREATE_PARTITIONS => handleCreatePartitionsRequest(request)
       }
     } catch {
       case e: FatalExitError => throw e
@@ -219,7 +221,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 
       if (adminManager.hasDelayedTopicOperations) {
         updateMetadataRequest.partitionStates.keySet.asScala.map(_.topic).foreach { topic =>
-        adminManager.tryCompleteDelayedTopicOperations(topic)
+          adminManager.tryCompleteDelayedTopicOperations(topic)
         }
       }
       sendResponseExemptThrottle(request, new UpdateMetadataResponse(Errors.NONE))
@@ -1333,6 +1335,42 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
+  def handleCreatePartitionsRequest(request: RequestChannel.Request): Unit = {
+    val alterPartitionCountsRequest = request.body[CreatePartitionsRequest]
+    val (valid, errors) =
+    if (!controller.isActive) {
+      (Map.empty[String, NewPartitions],
+      alterPartitionCountsRequest.newPartitions.asScala.map { case (topic, _) =>
+        (topic, new ApiError(Errors.NOT_CONTROLLER, null))
+      })
+    } else {
+      // Special handling to add duplicate topics to the response
+      val dupes = alterPartitionCountsRequest.duplicates.asScala
+      val notDuped = alterPartitionCountsRequest.newPartitions.asScala.retain((topic, count) => !dupes.contains(topic))
+      val (authorized, unauthorized) = notDuped.partition { case (topic, _) =>
+        authorize(request.session, Alter, new Resource(Topic, topic))
+      }
+      val (queuedForDeletion, live) = authorized.partition{ case (topic, _) => controller.topicDeletionManager.isTopicQueuedUpForDeletion(topic) }
+      (live,
+      dupes.map(_ -> new ApiError(Errors.INVALID_REQUEST, "Duplicate topic in request")).toMap ++
+        unauthorized.map{ case (topic, np) =>
+          topic -> new ApiError(Errors.TOPIC_AUTHORIZATION_FAILED, Errors.TOPIC_AUTHORIZATION_FAILED.message())
+        } ++ queuedForDeletion.map{ case (topic, np) =>
+          topic -> new ApiError(Errors.INVALID_TOPIC_EXCEPTION, "The topic is queued for deletion.")
+        })
+    }
+
+    def sendResponseCallback(results: Map[String, ApiError]): Unit = {
+      def createResponse(requestThrottleMs: Int): AbstractResponse = {
+        val responseBody = new CreatePartitionsResponse(requestThrottleMs, (results ++ errors).asJava)
+        trace(s"Sending alter partition counts response $responseBody for correlation id ${request.header.correlationId} to client ${request.header.clientId}.")
+        responseBody
+      }
+      sendResponseMaybeThrottle(request, createResponse)
+    }
+    adminManager.createPartitions(alterPartitionCountsRequest.timeout(), valid, alterPartitionCountsRequest.validateOnly, request.context.listenerName, sendResponseCallback)
+  }
+
   def handleDeleteTopicsRequest(request: RequestChannel.Request) {
     val deleteTopicRequest = request.body[DeleteTopicsRequest]
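
Note on handleCreatePartitionsRequest() above: before delegating to AdminManager, the handler splits the requested topics into those that may proceed and those that already have an error (duplicate, unauthorized, queued for deletion). A minimal sketch of that triage follows, assuming plain types: the Int values stand in for NewPartitions, the two predicates stand in for authorize() and the TopicDeletionManager lookup, and the authorization message is a placeholder rather than the text of Errors.TOPIC_AUTHORIZATION_FAILED.

```scala
object RequestTriageSketch {
  // Returns (topics that may proceed, per-topic error messages).
  def triage(requested: Map[String, Int],
             duplicates: Set[String],
             isAuthorized: String => Boolean,
             isQueuedForDeletion: String => Boolean): (Map[String, Int], Map[String, String]) = {
    // Duplicated topics are rejected outright.
    val notDuped = requested.filter { case (topic, _) => !duplicates.contains(topic) }
    val (authorized, unauthorized) = notDuped.partition { case (topic, _) => isAuthorized(topic) }
    val (queued, live) = authorized.partition { case (topic, _) => isQueuedForDeletion(topic) }
    val errors =
      duplicates.map(_ -> "Duplicate topic in request").toMap ++
        unauthorized.keys.map(_ -> "Topic authorization failed (placeholder).") ++
        queued.keys.map(_ -> "The topic is queued for deletion.")
    (live, errors)
  }
}
```

The errors collected here are merged with the results reported by AdminManager when the response is built, as sendResponseCallback does with results ++ errors.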
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/5f6393f9/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index 49a75b9..823e44d 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -18,6 +18,7 @@ package kafka.api
 
 import java.util
 import java.util.{Collections, Properties}
+import java.util.Arrays.asList
 import java.util.concurrent.{ExecutionException, TimeUnit}
 import java.io.File
 
@@ -360,6 +361,192 @@ class AdminClientIntegrationTest extends KafkaServerTestHarness with Logging {
   }
 
   @Test
+  def testCreatePartitions(): Unit = {
+    client = AdminClient.create(createConfig)
+
+    // Create topics
+    val topic1 = "create-partitions-topic-1"
+    TestUtils.createTopic(zkUtils, topic1, 1, 1, servers, new Properties)
+
+    val topic2 = "create-partitions-topic-2"
+    TestUtils.createTopic(zkUtils, topic2, 1, 2, servers, new Properties)
+
+    // assert that both the topics have 1 partition
+    assertEquals(1, client.describeTopics(Set(topic1).asJava).values.get(topic1).get.partitions.size)
+    assertEquals(1, client.describeTopics(Set(topic2).asJava).values.get(topic2).get.partitions.size)
+
+    val validateOnly = new CreatePartitionsOptions().validateOnly(true)
+
+    // assert that a validateOnly request doesn't increase the number of partitions
+    var alterResult = client.createPartitions(Map(topic1 ->
+      NewPartitions.increaseTo(2)).asJava, validateOnly)
+    var altered = alterResult.values.get(topic1).get
+    // assert that the topic still has 1 partition
+    assertEquals(1, client.describeTopics(Set(topic1).asJavaCollection).values.get(topic1).get.partitions.size)
+
+    // try creating new partitions (no assignments), to bring the total to 3 partitions
+    alterResult = client.createPartitions(Map(topic1 ->
+      NewPartitions.increaseTo(3)).asJava)
+    altered = alterResult.values.get(topic1).get
+    // assert that the topic now has 3 partitions
+    var actualPartitions = client.describeTopics(Set(topic1).asJavaCollection).values.get(topic1).get.partitions
+    assertEquals(3, actualPartitions.size)
+
+    // now try creating new partitions (with assignments), to bring the total to 3 partitions
+    alterResult = client.createPartitions(Map(topic2 ->
+      NewPartitions.increaseTo(3, asList(asList(0, 1), asList(1, 2)))).asJava)
+    altered = alterResult.values.get(topic2).get
+    // assert that the topic now has 3 partitions
+    actualPartitions = client.describeTopics(Set(topic2).asJavaCollection).values.get(topic2).get.partitions
+    assertEquals(3, actualPartitions.size)
+    assertEquals(List[Integer](0, 1), actualPartitions.get(1).replicas.asScala.map(_.id).toList)
+    assertEquals(List[Integer](1, 2), actualPartitions.get(2).replicas.asScala.map(_.id).toList)
+
+    // try a newCount which would be a decrease
+    alterResult = client.createPartitions(Map(topic1 ->
+      NewPartitions.increaseTo(1)).asJava, validateOnly)
+    try {
+      alterResult.values.get(topic1).get
+      fail("Expect InvalidPartitionsException when newCount is a decrease")
+    } catch {
+      case e: ExecutionException =>
+        assertTrue(e.getCause.isInstanceOf[InvalidPartitionsException])
+        assertEquals("Topic currently has 3 partitions, which is higher than the requested 1.", e.getCause.getMessage)
+    }
+
+    // try a newCount which would be a noop
+    alterResult = client.createPartitions(Map(topic1 ->
+      NewPartitions.increaseTo(3)).asJava, validateOnly)
+    try {
+      alterResult.values.get(topic1).get
+      fail("Expect InvalidPartitionsException when newCount == oldCount")
+    } catch {
+      case e: ExecutionException =>
+        assertTrue(e.getCause.isInstanceOf[InvalidPartitionsException])
+        assertEquals("Topic already has 3 partitions.", e.getCause.getMessage)
+    }
+
+    // try a bad topic name
+    val unknownTopic = "an-unknown-topic"
+    alterResult = client.createPartitions(Map(unknownTopic ->
+      NewPartitions.increaseTo(2)).asJava, validateOnly)
+    try {
+      alterResult.values.get(unknownTopic).get
+      fail("Expect InvalidTopicException when using an unknown topic")
+    } catch {
+      case e: ExecutionException =>
+        assertTrue(e.getCause.isInstanceOf[InvalidTopicException])
+        assertEquals("The request attempted to perform an operation on an invalid topic.", e.getCause.getMessage)
+    }
+
+    // try an invalid newCount
+    alterResult = client.createPartitions(Map(topic1 ->
+      NewPartitions.increaseTo(-22)).asJava, validateOnly)
+    try {
+      altered = alterResult.values.get(topic1).get
+      fail("Expect InvalidPartitionsException when newCount is invalid")
+    } catch {
+      case e: ExecutionException =>
+        assertTrue(e.getCause.isInstanceOf[InvalidPartitionsException])
+        assertEquals("Topic currently has 3 partitions, which is higher than the requested -22.",
+          e.getCause.getMessage)
+    }
+
+    // try assignments where the number of brokers != replication factor
+    alterResult = client.createPartitions(Map(topic1 ->
+      NewPartitions.increaseTo(4, asList(asList(1, 2)))).asJava, validateOnly)
+    try {
+      altered = alterResult.values.get(topic1).get
+      fail("Expect InvalidReplicaAssignmentException when #brokers != replication factor")
+    } catch {
+      case e: ExecutionException =>
+        assertTrue(e.getCause.isInstanceOf[InvalidReplicaAssignmentException])
+        assertEquals("Inconsistent replication factor between partitions, partition 0 has 1 " +
+          "while partitions [3] have replication factors [2], respectively",
+          e.getCause.getMessage)
+    }
+
+    // try #assignments incompatible with the increase
+    alterResult = client.createPartitions(Map(topic1 ->
+      NewPartitions.increaseTo(4, asList(asList(1), asList(2)))).asJava, validateOnly)
+    try {
+      altered = alterResult.values.get(topic1).get
+      fail("Expect InvalidRequestException when #assignments != newCount - oldCount")
+    } catch {
+      case e: ExecutionException =>
+        assertTrue(e.getCause.isInstanceOf[InvalidRequestException])
+        assertEquals("Increasing the number of partitions by 1 but 2 assignments provided.", e.getCause.getMessage)
+    }
+
+    // try with duplicate brokers in assignments
+    alterResult = client.createPartitions(Map(topic1 ->
+      NewPartitions.increaseTo(4, asList(asList(1, 1)))).asJava, validateOnly)
+    try {
+      altered = alterResult.values.get(topic1).get
+      fail("Expect InvalidReplicaAssignmentException when assignments has duplicate brokers")
+    } catch {
+      case e: ExecutionException =>
+        assertTrue(e.getCause.isInstanceOf[InvalidReplicaAssignmentException])
+        assertEquals("Duplicate brokers not allowed in replica assignment: 1, 1 for partition id 3.",
+          e.getCause.getMessage)
+    }
+
+    // try assignments with differently sized inner lists
+    alterResult = client.createPartitions(Map(topic1 ->
+      NewPartitions.increaseTo(5, asList(asList(1), asList(1, 0)))).asJava, validateOnly)
+    try {
+      altered = alterResult.values.get(topic1).get
+      fail("Expect InvalidReplicaAssignmentException when assignments have differently sized inner lists")
+    } catch {
+      case e: ExecutionException =>
+        e.printStackTrace()
+        assertTrue(e.getCause.isInstanceOf[InvalidReplicaAssignmentException])
+        assertEquals("Inconsistent replication factor between partitions, partition 0 has 1 " +
+          "while partitions [4] have replication factors [2], respectively", e.getCause.getMessage)
+    }
+
+    // try assignments with unknown brokers
+    alterResult = client.createPartitions(Map(topic1 ->
+      NewPartitions.increaseTo(4, asList(asList(12)))).asJava, validateOnly)
+    try {
+      altered = alterResult.values.get(topic1).get
+      fail("Expect InvalidReplicaAssignmentException when assignments contains an unknown broker")
+    } catch {
+      case e: ExecutionException =>
+        e.printStackTrace()
+        assertTrue(e.getCause.isInstanceOf[InvalidReplicaAssignmentException])
+        assertEquals("Unknown broker(s) in replica assignment: 12.", e.getCause.getMessage)
+    }
+
+    // try with empty assignments
+    alterResult = client.createPartitions(Map(topic1 ->
+      NewPartitions.increaseTo(4, Collections.emptyList())).asJava, validateOnly)
+    try {
+      altered = alterResult.values.get(topic1).get
+      fail("Expect InvalidRequestException when assignments is empty")
+    } catch {
+      case e: ExecutionException =>
+        assertTrue(e.getCause.isInstanceOf[InvalidRequestException])
+        assertEquals("Increasing the number of partitions by 1 but 0 assignments provided.", e.getCause.getMessage)
+    }
+
+    // finally, try to add partitions to a topic queued for deletion
+    val deleteResult = client.deleteTopics(asList(topic1))
+    deleteResult.values.get(topic1).get
+    alterResult = client.createPartitions(Map(topic1 ->
+      NewPartitions.increaseTo(4)).asJava, validateOnly)
+    try {
+      altered = alterResult.values.get(topic1).get
+      fail("Expect InvalidTopicException when the topic is queued for deletion")
+    } catch {
+      case e: ExecutionException =>
+        assertTrue(e.getCause.isInstanceOf[InvalidTopicException])
+        assertEquals("The topic is queued for deletion.", e.getCause.getMessage)
+    }
+
+  }
+
+  @Test
   def testInvalidAlterConfigs(): Unit = {
     client = AdminClient.create(createConfig)
     checkInvalidAlterConfigs(zkUtils, servers, client)

