kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [2/3] kafka git commit: KAFKA-5694; Add AlterReplicaDirRequest and DescribeReplicaDirRequest (KIP-113 part-1)
Date Sun, 03 Sep 2017 06:21:04 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirResponse.java
new file mode 100644
index 0000000..f97f9a0
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirResponse.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.CollectionUtils;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * Response for AlterReplicaDirRequest (KIP-113 part 1). Carries, for every topic-partition
+ * named in the request, the error code (or NONE) resulting from the attempt to move that
+ * replica to the requested log directory, plus the broker-applied throttle time.
+ */
+public class AlterReplicaDirResponse extends AbstractResponse {
+
+    // request level key names
+    private static final String TOPICS_KEY_NAME = "topics";
+
+    // topic level key names
+    private static final String TOPIC_KEY_NAME = "topic";
+    private static final String PARTITIONS_KEY_NAME = "partitions";
+
+    // partition level key names
+    private static final String PARTITION_KEY_NAME = "partition";
+    private static final String ERROR_CODE_KEY_NAME = "error_code";
+
+    /**
+     * Possible error code:
+     *
+     * LOG_DIR_NOT_FOUND (57)
+     * KAFKA_STORAGE_ERROR (56)
+     * REPLICA_NOT_AVAILABLE (9)
+     * UNKNOWN (-1)
+     */
+    private final Map<TopicPartition, Errors> responses;
+    private final int throttleTimeMs;
+
+    /**
+     * Deserializing constructor: rebuilds the per-partition error map from the wire-format
+     * {@code Struct} (throttle time plus a nested topics/partitions array).
+     */
+    public AlterReplicaDirResponse(Struct struct) {
+        throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME);
+        responses = new HashMap<>();
+        for (Object topicStructObj : struct.getArray(TOPICS_KEY_NAME)) {
+            Struct topicStruct = (Struct) topicStructObj;
+            String topic = topicStruct.getString(TOPIC_KEY_NAME);
+            for (Object partitionStructObj : topicStruct.getArray(PARTITIONS_KEY_NAME)) {
+                Struct partitionStruct = (Struct) partitionStructObj;
+                int partition = partitionStruct.getInt(PARTITION_KEY_NAME);
+                Errors error = Errors.forCode(partitionStruct.getShort(ERROR_CODE_KEY_NAME));
+                responses.put(new TopicPartition(topic, partition), error);
+            }
+        }
+    }
+
+    /**
+     * Constructor for version 0.
+     */
+    public AlterReplicaDirResponse(int throttleTimeMs, Map<TopicPartition, Errors> responses) {
+        this.throttleTimeMs = throttleTimeMs;
+        this.responses = responses;
+    }
+
+    /**
+     * Serializes this response: partitions are regrouped by topic before being written to
+     * the nested topics/partitions arrays of the response schema.
+     */
+    @Override
+    protected Struct toStruct(short version) {
+        Struct struct = new Struct(ApiKeys.ALTER_REPLICA_DIR.responseSchema(version));
+        struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
+        Map<String, Map<Integer, Errors>> responsesByTopic = CollectionUtils.groupDataByTopic(responses);
+        List<Struct> topicStructArray = new ArrayList<>();
+        for (Map.Entry<String, Map<Integer, Errors>> responsesByTopicEntry : responsesByTopic.entrySet()) {
+            Struct topicStruct = struct.instance(TOPICS_KEY_NAME);
+            topicStruct.set(TOPIC_KEY_NAME, responsesByTopicEntry.getKey());
+            List<Struct> partitionStructArray = new ArrayList<>();
+            for (Map.Entry<Integer, Errors> responsesByPartitionEntry : responsesByTopicEntry.getValue().entrySet()) {
+                Struct partitionStruct = topicStruct.instance(PARTITIONS_KEY_NAME);
+                Errors response = responsesByPartitionEntry.getValue();
+                partitionStruct.set(PARTITION_KEY_NAME, responsesByPartitionEntry.getKey());
+                partitionStruct.set(ERROR_CODE_KEY_NAME, response.code());
+                partitionStructArray.add(partitionStruct);
+            }
+            topicStruct.set(PARTITIONS_KEY_NAME, partitionStructArray.toArray());
+            topicStructArray.add(topicStruct);
+        }
+        struct.set(TOPICS_KEY_NAME, topicStructArray.toArray());
+        return struct;
+    }
+
+    public int throttleTimeMs() {
+        return throttleTimeMs;
+    }
+
+    // NOTE(review): exposes the internal map directly; callers could mutate it. Consider an
+    // unmodifiable view if this ever becomes a problem — confirm against existing callers.
+    public Map<TopicPartition, Errors> responses() {
+        return this.responses;
+    }
+
+    public static AlterReplicaDirResponse parse(ByteBuffer buffer, short version) {
+        return new AlterReplicaDirResponse(ApiKeys.ALTER_REPLICA_DIR.responseSchema(version).read(buffer));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsRequest.java
new file mode 100644
index 0000000..338d684
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsRequest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.requests.DescribeLogDirsResponse.LogDirInfo;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+
+/**
+ * Request to describe log directories on a broker (KIP-113). A {@code null} topic-partition
+ * set means "describe every partition the broker hosts"; an empty set means "describe none".
+ */
+public class DescribeLogDirsRequest extends AbstractRequest {
+
+    // request level key names
+    private static final String TOPICS_KEY_NAME = "topics";
+
+    // topic level key names
+    private static final String TOPIC_KEY_NAME = "topic";
+    private static final String PARTITIONS_KEY_NAME = "partitions";
+
+    private final Set<TopicPartition> topicPartitions;
+
+    public static class Builder extends AbstractRequest.Builder<DescribeLogDirsRequest> {
+        private final Set<TopicPartition> topicPartitions;
+
+        // topicPartitions == null indicates requesting all partitions, and an empty list indicates requesting no partitions.
+        public Builder(Set<TopicPartition> partitions) {
+            super(ApiKeys.DESCRIBE_LOG_DIRS);
+            this.topicPartitions = partitions;
+        }
+
+        @Override
+        public DescribeLogDirsRequest build(short version) {
+            return new DescribeLogDirsRequest(topicPartitions, version);
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder builder = new StringBuilder();
+            builder.append("(type=DescribeLogDirsRequest")
+                .append(", topicPartitions=")
+                .append(topicPartitions)
+                .append(")");
+            return builder.toString();
+        }
+    }
+
+    /**
+     * Deserializing constructor: a null "topics" array on the wire maps to a null set
+     * (i.e. the all-partitions request).
+     */
+    public DescribeLogDirsRequest(Struct struct, short version) {
+        super(version);
+
+        if (struct.getArray(TOPICS_KEY_NAME) == null) {
+            topicPartitions = null;
+        } else {
+            topicPartitions = new HashSet<>();
+            for (Object topicStructObj : struct.getArray(TOPICS_KEY_NAME)) {
+                Struct topicStruct = (Struct) topicStructObj;
+                String topic = topicStruct.getString(TOPIC_KEY_NAME);
+                for (Object partitionObj : topicStruct.getArray(PARTITIONS_KEY_NAME)) {
+                    int partition = (Integer) partitionObj;
+                    topicPartitions.add(new TopicPartition(topic, partition));
+                }
+            }
+        }
+    }
+
+    // topicPartitions == null indicates requesting all partitions, and an empty list indicates requesting no partitions.
+    public DescribeLogDirsRequest(Set<TopicPartition> topicPartitions, short version) {
+        super(version);
+        this.topicPartitions = topicPartitions;
+    }
+
+    /**
+     * Serializes this request. A null partition set is written as a null "topics" array;
+     * otherwise partitions are regrouped by topic before serialization.
+     */
+    @Override
+    protected Struct toStruct() {
+        Struct struct = new Struct(ApiKeys.DESCRIBE_LOG_DIRS.requestSchema(version()));
+        if (topicPartitions == null) {
+            struct.set(TOPICS_KEY_NAME, null);
+            return struct;
+        }
+
+        Map<String, List<Integer>> partitionsByTopic = new HashMap<>();
+        for (TopicPartition tp : topicPartitions) {
+            if (!partitionsByTopic.containsKey(tp.topic())) {
+                partitionsByTopic.put(tp.topic(), new ArrayList<Integer>());
+            }
+            partitionsByTopic.get(tp.topic()).add(tp.partition());
+        }
+
+        List<Struct> topicStructArray = new ArrayList<>();
+        for (Map.Entry<String, List<Integer>> partitionsByTopicEntry : partitionsByTopic.entrySet()) {
+            Struct topicStruct = struct.instance(TOPICS_KEY_NAME);
+            topicStruct.set(TOPIC_KEY_NAME, partitionsByTopicEntry.getKey());
+            topicStruct.set(PARTITIONS_KEY_NAME, partitionsByTopicEntry.getValue().toArray());
+            topicStructArray.add(topicStruct);
+        }
+        struct.set(TOPICS_KEY_NAME, topicStructArray.toArray());
+
+        return struct;
+    }
+
+    // NOTE(review): the throwable `e` is not encoded anywhere in the returned response —
+    // version 0 answers with an empty log-dir map. Confirm that callers surface the error
+    // through another path before relying on this response to carry it.
+    @Override
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+        short versionId = version();
+        switch (versionId) {
+            case 0:
+                return new DescribeLogDirsResponse(throttleTimeMs, new HashMap<String, LogDirInfo>());
+            default:
+                throw new IllegalArgumentException(
+                    String.format("Version %d is not valid. Valid versions for %s are 0 to %d", versionId,
+                        this.getClass().getSimpleName(), ApiKeys.DESCRIBE_LOG_DIRS.latestVersion()));
+        }
+    }
+
+    public boolean isAllTopicPartitions() {
+        return topicPartitions == null;
+    }
+
+    public Set<TopicPartition> topicPartitions() {
+        return topicPartitions;
+    }
+
+    public static DescribeLogDirsRequest parse(ByteBuffer buffer, short version) {
+        return new DescribeLogDirsRequest(ApiKeys.DESCRIBE_LOG_DIRS.parseRequest(version, buffer), version);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java
new file mode 100644
index 0000000..f6b31ae
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.CollectionUtils;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * Response for DescribeLogDirsRequest (KIP-113). Maps each queried log directory on the
+ * broker to a {@link LogDirInfo}: a dir-level error plus per-replica size / offset-lag /
+ * is-future details.
+ */
+public class DescribeLogDirsResponse extends AbstractResponse {
+
+    public static final long INVALID_OFFSET_LAG = -1L;
+
+    // request level key names
+    private static final String LOG_DIRS_KEY_NAME = "log_dirs";
+
+    // dir level key names
+    private static final String ERROR_CODE_KEY_NAME = "error_code";
+    private static final String LOG_DIR_KEY_NAME = "log_dir";
+    private static final String TOPICS_KEY_NAME = "topics";
+
+    // topic level key names
+    private static final String TOPIC_KEY_NAME = "topic";
+    private static final String PARTITIONS_KEY_NAME = "partitions";
+
+    // partition level key names
+    private static final String PARTITION_KEY_NAME = "partition";
+    private static final String SIZE_KEY_NAME = "size";
+    private static final String OFFSET_LAG_KEY_NAME = "offset_lag";
+    private static final String IS_FUTURE_KEY_NAME = "is_future";
+
+    private final int throttleTimeMs;
+    private final Map<String, LogDirInfo> logDirInfos;
+
+    /**
+     * Deserializing constructor: walks the nested log_dirs/topics/partitions arrays and
+     * rebuilds the per-directory {@link LogDirInfo} map.
+     */
+    public DescribeLogDirsResponse(Struct struct) {
+        throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME);
+        logDirInfos = new HashMap<>();
+
+        for (Object logDirStructObj : struct.getArray(LOG_DIRS_KEY_NAME)) {
+            Struct logDirStruct = (Struct) logDirStructObj;
+            Errors error = Errors.forCode(logDirStruct.getShort(ERROR_CODE_KEY_NAME));
+            String logDir = logDirStruct.getString(LOG_DIR_KEY_NAME);
+            Map<TopicPartition, ReplicaInfo> replicaInfos = new HashMap<>();
+
+            for (Object topicStructObj : logDirStruct.getArray(TOPICS_KEY_NAME)) {
+                Struct topicStruct = (Struct) topicStructObj;
+                String topic = topicStruct.getString(TOPIC_KEY_NAME);
+
+                for (Object partitionStructObj : topicStruct.getArray(PARTITIONS_KEY_NAME)) {
+                    Struct partitionStruct = (Struct) partitionStructObj;
+                    int partition = partitionStruct.getInt(PARTITION_KEY_NAME);
+                    long size = partitionStruct.getLong(SIZE_KEY_NAME);
+                    long offsetLag = partitionStruct.getLong(OFFSET_LAG_KEY_NAME);
+                    boolean isFuture = partitionStruct.getBoolean(IS_FUTURE_KEY_NAME);
+                    ReplicaInfo replicaInfo = new ReplicaInfo(size, offsetLag, isFuture);
+                    replicaInfos.put(new TopicPartition(topic, partition), replicaInfo);
+                }
+            }
+
+            logDirInfos.put(logDir, new LogDirInfo(error, replicaInfos));
+        }
+    }
+
+    /**
+     * Constructor for version 0.
+     */
+    public DescribeLogDirsResponse(int throttleTimeMs, Map<String, LogDirInfo> logDirInfos) {
+        this.throttleTimeMs = throttleTimeMs;
+        this.logDirInfos = logDirInfos;
+    }
+
+    /**
+     * Serializes this response: replicas are regrouped by topic within each log directory
+     * before being written to the nested response schema.
+     */
+    @Override
+    protected Struct toStruct(short version) {
+        Struct struct = new Struct(ApiKeys.DESCRIBE_LOG_DIRS.responseSchema(version));
+        struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
+        List<Struct> logDirStructArray = new ArrayList<>();
+        for (Map.Entry<String, LogDirInfo> logDirInfosEntry : logDirInfos.entrySet()) {
+            LogDirInfo logDirInfo = logDirInfosEntry.getValue();
+            Struct logDirStruct = struct.instance(LOG_DIRS_KEY_NAME);
+            logDirStruct.set(ERROR_CODE_KEY_NAME, logDirInfo.error.code());
+            logDirStruct.set(LOG_DIR_KEY_NAME, logDirInfosEntry.getKey());
+
+            Map<String, Map<Integer, ReplicaInfo>> replicaInfosByTopic = CollectionUtils.groupDataByTopic(logDirInfo.replicaInfos);
+            List<Struct> topicStructArray = new ArrayList<>();
+            for (Map.Entry<String, Map<Integer, ReplicaInfo>> replicaInfosByTopicEntry : replicaInfosByTopic.entrySet()) {
+                Struct topicStruct = logDirStruct.instance(TOPICS_KEY_NAME);
+                topicStruct.set(TOPIC_KEY_NAME, replicaInfosByTopicEntry.getKey());
+                List<Struct> partitionStructArray = new ArrayList<>();
+
+                for (Map.Entry<Integer, ReplicaInfo> replicaInfosByPartitionEntry : replicaInfosByTopicEntry.getValue().entrySet()) {
+                    Struct partitionStruct = topicStruct.instance(PARTITIONS_KEY_NAME);
+                    ReplicaInfo replicaInfo = replicaInfosByPartitionEntry.getValue();
+                    partitionStruct.set(PARTITION_KEY_NAME, replicaInfosByPartitionEntry.getKey());
+                    partitionStruct.set(SIZE_KEY_NAME, replicaInfo.size);
+                    partitionStruct.set(OFFSET_LAG_KEY_NAME, replicaInfo.offsetLag);
+                    partitionStruct.set(IS_FUTURE_KEY_NAME, replicaInfo.isFuture);
+                    partitionStructArray.add(partitionStruct);
+                }
+                topicStruct.set(PARTITIONS_KEY_NAME, partitionStructArray.toArray());
+                topicStructArray.add(topicStruct);
+            }
+            logDirStruct.set(TOPICS_KEY_NAME, topicStructArray.toArray());
+            logDirStructArray.add(logDirStruct);
+        }
+        struct.set(LOG_DIRS_KEY_NAME, logDirStructArray.toArray());
+        return struct;
+    }
+
+    public int throttleTimeMs() {
+        return throttleTimeMs;
+    }
+
+    public Map<String, LogDirInfo> logDirInfos() {
+        return logDirInfos;
+    }
+
+    public static DescribeLogDirsResponse parse(ByteBuffer buffer, short version) {
+        return new DescribeLogDirsResponse(ApiKeys.DESCRIBE_LOG_DIRS.responseSchema(version).read(buffer));
+    }
+
+    /**
+     * Possible error code:
+     *
+     * KAFKA_STORAGE_ERROR (56)
+     * UNKNOWN (-1)
+     */
+    static public class LogDirInfo {
+        public final Errors error;
+        public final Map<TopicPartition, ReplicaInfo> replicaInfos;
+
+        public LogDirInfo(Errors error, Map<TopicPartition, ReplicaInfo> replicaInfos) {
+            this.error = error;
+            this.replicaInfos = replicaInfos;
+        }
+    }
+
+    /**
+     * Per-replica details reported for a log directory: on-disk size, lag behind the log
+     * end offset, and whether this is a "future" replica being moved into the directory.
+     */
+    static public class ReplicaInfo {
+
+        public final long size;
+        public final long offsetLag;
+        public final boolean isFuture;
+
+        public ReplicaInfo(long size, long offsetLag, boolean isFuture) {
+            this.size = size;
+            this.offsetLag = offsetLag;
+            this.isFuture = isFuture;
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder builder = new StringBuilder();
+            builder.append("(size=")
+                .append(size)
+                .append(", offsetLag=")
+                .append(offsetLag)
+                .append(", isFuture=")
+                .append(isFuture)
+                .append(")");
+            return builder.toString();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/core/src/main/scala/kafka/admin/LogDirsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/LogDirsCommand.scala b/core/src/main/scala/kafka/admin/LogDirsCommand.scala
new file mode 100644
index 0000000..6a167a2
--- /dev/null
+++ b/core/src/main/scala/kafka/admin/LogDirsCommand.scala
@@ -0,0 +1,114 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.admin
+
+import java.io.PrintStream
+import java.util.Properties
+
+import org.apache.kafka.clients.admin.{AdminClientConfig, DescribeLogDirsResult, AdminClient => JAdminClient}
+import org.apache.kafka.common.requests.DescribeLogDirsResponse.LogDirInfo
+
+import scala.collection.JavaConverters._
+import scala.collection.Map
+import kafka.utils.{CommandLineUtils, Json}
+import joptsimple._
+
+/**
+  * A command for querying log directory usage on the specified brokers
+  */
+object LogDirsCommand {
+
+    def main(args: Array[String]): Unit = {
+        describe(args, System.out)
+    }
+
+    // Queries the specified (or all) brokers via the admin client and prints the log
+    // directory information as a single JSON document on `out`.
+    // NOTE(review): `adminClient.close()` is only reached on success — if any of the
+    // preceding calls throws, the client leaks. Consider wrapping in try/finally.
+    // NOTE(review): `_.toInt` on the broker list will throw NumberFormatException for a
+    // malformed id; a friendlier error message may be worth adding.
+    def describe(args: Array[String], out: PrintStream): Unit = {
+        val opts = new LogDirsCommandOptions(args)
+        val adminClient = createAdminClient(opts)
+        val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(!_.isEmpty)
+        val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) match {
+            case Some(brokerListStr) => brokerListStr.split(',').filter(!_.isEmpty).map(_.toInt)
+            case None => adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray
+        }
+
+        out.println("Querying brokers for log directories information")
+        val describeLogDirsResult: DescribeLogDirsResult = adminClient.describeLogDirs(brokerList.map(Integer.valueOf).toSeq.asJava)
+        val logDirInfosByBroker = describeLogDirsResult.all.get().asScala.mapValues(_.asScala)
+
+        out.println(s"Received log directory information from brokers ${brokerList.mkString(",")}")
+        out.println(formatAsJson(logDirInfosByBroker, topicList.toSet))
+        adminClient.close()
+    }
+
+    // Renders the broker -> log dir -> partition hierarchy as JSON. An empty `topicSet`
+    // means "include every topic"; otherwise partitions of other topics are filtered out.
+    private def formatAsJson(logDirInfosByBroker: Map[Integer, Map[String, LogDirInfo]], topicSet: Set[String]): String = {
+        Json.encode(Map(
+            "version" -> 1,
+            "brokers" -> logDirInfosByBroker.map { case (broker, logDirInfos) =>
+                Map(
+                    "broker" -> broker,
+                    "logDirs" -> logDirInfos.map { case (logDir, logDirInfo) =>
+                        Map(
+                            "logDir" -> logDir,
+                            "error" -> logDirInfo.error.exceptionName(),
+                            "partitions" -> logDirInfo.replicaInfos.asScala.filter { case (topicPartition, replicaInfo) =>
+                                topicSet.isEmpty || topicSet.contains(topicPartition.topic)
+                            }.map { case (topicPartition, replicaInfo) =>
+                                Map(
+                                    "partition" -> topicPartition.toString,
+                                    "size" -> replicaInfo.size,
+                                    "offsetLag" -> replicaInfo.offsetLag,
+                                    "isFuture" -> replicaInfo.isFuture
+                                )
+                            }
+                        )
+                    }
+                )
+            }
+        ))
+    }
+
+    // Builds a Java AdminClient from the --bootstrap-server option with a fixed client id.
+    private def createAdminClient(opts: LogDirsCommandOptions): JAdminClient = {
+        val props = new Properties()
+        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt))
+        props.put(AdminClientConfig.CLIENT_ID_CONFIG, "log-dirs-tool")
+        JAdminClient.create(props)
+    }
+
+    // Command-line options: --bootstrap-server and --describe are required;
+    // --topic-list defaults to "" (all topics) and --broker-list to all brokers.
+    class LogDirsCommandOptions(args: Array[String]) {
+        val parser = new OptionParser(false)
+        val bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED: the server(s) to use for bootstrapping")
+          .withRequiredArg
+          .describedAs("The server(s) to use for bootstrapping")
+          .ofType(classOf[String])
+        val describeOpt = parser.accepts("describe", "Describe the specified log directories on the specified brokers.")
+        val topicListOpt = parser.accepts("topic-list", "The list of topics to be queried in the form \"topic1,topic2,topic3\". " +
+          "All topics will be queried if no topic list is specified")
+          .withRequiredArg
+          .describedAs("Topic list")
+          .defaultsTo("")
+          .ofType(classOf[String])
+        val brokerListOpt = parser.accepts("broker-list", "The list of brokers to be queried in the form \"0,1,2\". " +
+          "All brokers in the cluster will be queried if no broker list is specified")
+          .withRequiredArg
+          .describedAs("Broker list")
+          .ofType(classOf[String])
+
+        val options = parser.parse(args : _*)
+        CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt, describeOpt)
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index cae14b1..e27e239 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -16,23 +16,31 @@
  */
 package kafka.admin
 
-import joptsimple.OptionParser
-import kafka.server.{ConfigType, DynamicConfig}
-import kafka.utils._
+import java.util.Properties
+import java.util.concurrent.ExecutionException
 
 import scala.collection._
-import org.I0Itec.zkclient.exception.ZkNodeExistsException
+import scala.collection.JavaConverters._
+import kafka.server.{ConfigType, DynamicConfig}
+import kafka.utils._
 import kafka.common.{AdminCommandFailedException, TopicAndPartition}
 import kafka.log.LogConfig
-import LogConfig._
+import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.security.JaasUtils
+import org.apache.kafka.common.TopicPartitionReplica
+import org.apache.kafka.common.errors.{LogDirNotFoundException, ReplicaNotAvailableException}
+import org.apache.kafka.clients.admin.{AdminClientConfig, AlterReplicaDirOptions, AdminClient => JAdminClient}
+import LogConfig._
+import joptsimple.OptionParser
+import org.apache.kafka.clients.admin.DescribeReplicaLogDirResult.ReplicaLogDirInfo
 
 object ReassignPartitionsCommand extends Logging {
 
   case class Throttle(value: Long, postUpdateAction: () => Unit = () => ())
 
   private[admin] val NoThrottle = Throttle(-1)
+  private[admin] val AnyLogDir = "any"
 
   def main(args: Array[String]): Unit = {
 
@@ -42,13 +50,15 @@ object ReassignPartitionsCommand extends Logging {
                           30000,
                           30000,
                           JaasUtils.isZkSecurityEnabled())
+    val adminClientOpt = createAdminClient(opts)
+
     try {
       if(opts.options.has(opts.verifyOpt))
-        verifyAssignment(zkUtils, opts)
+        verifyAssignment(zkUtils, adminClientOpt, opts)
       else if(opts.options.has(opts.generateOpt))
         generateAssignment(zkUtils, opts)
       else if (opts.options.has(opts.executeOpt))
-        executeAssignment(zkUtils, opts)
+        executeAssignment(zkUtils, adminClientOpt, opts)
     } catch {
       case e: Throwable =>
         println("Partitions reassignment failed due to " + e.getMessage)
@@ -56,16 +66,29 @@ object ReassignPartitionsCommand extends Logging {
     } finally zkUtils.close()
   }
 
-  def verifyAssignment(zkUtils: ZkUtils, opts: ReassignPartitionsCommandOptions) {
+  private def createAdminClient(opts: ReassignPartitionsCommandOptions): Option[JAdminClient] = {
+    if (opts.options.has(opts.bootstrapServerOpt)) {
+      val props = new Properties()
+      props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt))
+      props.put(AdminClientConfig.CLIENT_ID_CONFIG, "reassign-partitions-tool")
+      Some(JAdminClient.create(props))
+    } else {
+      None
+    }
+  }
+
+  def verifyAssignment(zkUtils: ZkUtils, adminClientOpt: Option[JAdminClient], opts: ReassignPartitionsCommandOptions) {
     val jsonFile = opts.options.valueOf(opts.reassignmentJsonFileOpt)
     val jsonString = Utils.readFileAsString(jsonFile)
-    verifyAssignment(zkUtils, jsonString)
+    verifyAssignment(zkUtils, adminClientOpt, jsonString)
   }
 
-  def verifyAssignment(zkUtils: ZkUtils, jsonString: String): Unit = {
+  def verifyAssignment(zkUtils: ZkUtils, adminClientOpt: Option[JAdminClient], jsonString: String): Unit = {
     println("Status of partition reassignment: ")
-    val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentData(jsonString)
-    val reassignedPartitionsStatus = checkIfReassignmentSucceeded(zkUtils, partitionsToBeReassigned)
+    val (partitionsToBeReassigned, replicaAssignment) = parsePartitionReassignmentData(jsonString)
+    val reassignedPartitionsStatus = checkIfPartitionReassignmentSucceeded(zkUtils, partitionsToBeReassigned.toMap)
+    val replicaReassignmentStatus = checkIfReplicaReassignmentSucceeded(adminClientOpt, replicaAssignment)
+
     reassignedPartitionsStatus.foreach { case (topicPartition, status) =>
       status match {
         case ReassignmentCompleted =>
@@ -76,7 +99,19 @@ object ReassignPartitionsCommand extends Logging {
           println("Reassignment of partition %s is still in progress".format(topicPartition))
       }
     }
-    removeThrottle(zkUtils, partitionsToBeReassigned, reassignedPartitionsStatus)
+
+    replicaReassignmentStatus.foreach { case (replica, status) =>
+      status match {
+        case ReassignmentCompleted =>
+          println("Reassignment of replica %s completed successfully".format(replica))
+        case ReassignmentFailed =>
+          println("Reassignment of replica %s failed".format(replica))
+        case ReassignmentInProgress =>
+          println("Reassignment of replica %s is still in progress".format(replica))
+      }
+    }
+
+    removeThrottle(zkUtils, partitionsToBeReassigned.toMap, reassignedPartitionsStatus)
   }
 
   private[admin] def removeThrottle(zkUtils: ZkUtils, partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]], reassignedPartitionsStatus: Map[TopicAndPartition, ReassignmentStatus], admin: AdminUtilities = AdminUtils): Unit = {
@@ -121,8 +156,8 @@ object ReassignPartitionsCommand extends Logging {
     val topicsToMoveJsonString = Utils.readFileAsString(topicsToMoveJsonFile)
     val disableRackAware = opts.options.has(opts.disableRackAware)
     val (proposedAssignments, currentAssignments) = generateAssignment(zkUtils, brokerListToReassign, topicsToMoveJsonString, disableRackAware)
-    println("Current partition replica assignment\n%s\n".format(ZkUtils.formatAsReassignmentJson(currentAssignments)))
-    println("Proposed partition reassignment configuration\n%s".format(ZkUtils.formatAsReassignmentJson(proposedAssignments)))
+    println("Current partition replica assignment\n%s\n".format(formatAsReassignmentJson(currentAssignments, Map.empty)))
+    println("Proposed partition reassignment configuration\n%s".format(formatAsReassignmentJson(proposedAssignments, Map.empty)))
   }
 
   def generateAssignment(zkUtils: ZkUtils, brokerListToReassign: Seq[Int], topicsToMoveJsonString: String, disableRackAware: Boolean): (Map[TopicAndPartition, Seq[Int]], Map[TopicAndPartition, Seq[Int]]) = {
@@ -147,42 +182,85 @@ object ReassignPartitionsCommand extends Logging {
     (partitionsToBeReassigned, currentAssignment)
   }
 
-  def executeAssignment(zkUtils: ZkUtils, opts: ReassignPartitionsCommandOptions) {
+  def executeAssignment(zkUtils: ZkUtils, adminClientOpt: Option[JAdminClient], opts: ReassignPartitionsCommandOptions) {
     val reassignmentJsonFile =  opts.options.valueOf(opts.reassignmentJsonFileOpt)
     val reassignmentJsonString = Utils.readFileAsString(reassignmentJsonFile)
-    val throttle = if (opts.options.has(opts.throttleOpt)) opts.options.valueOf(opts.throttleOpt) else -1
-    executeAssignment(zkUtils, reassignmentJsonString, Throttle(throttle))
+    val throttle = opts.options.valueOf(opts.throttleOpt)
+    val timeoutMs = opts.options.valueOf(opts.timeoutOpt)
+    executeAssignment(zkUtils, adminClientOpt, reassignmentJsonString, Throttle(throttle), timeoutMs)
   }
 
-  def executeAssignment(zkUtils: ZkUtils, reassignmentJsonString: String, throttle: Throttle) {
-    val partitionsToBeReassigned = parseAndValidate(zkUtils, reassignmentJsonString)
-    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, partitionsToBeReassigned.toMap)
+  def executeAssignment(zkUtils: ZkUtils, adminClientOpt: Option[JAdminClient], reassignmentJsonString: String, throttle: Throttle, timeoutMs: Long = 10000L) {
+    val (partitionAssignment, replicaAssignment) = parseAndValidate(zkUtils, reassignmentJsonString)
+    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, adminClientOpt, partitionAssignment.toMap, replicaAssignment)
 
     // If there is an existing rebalance running, attempt to change its throttle
     if (zkUtils.pathExists(ZkUtils.ReassignPartitionsPath)) {
       println("There is an existing assignment running.")
       reassignPartitionsCommand.maybeLimit(throttle)
-    }
-    else {
-      printCurrentAssignment(zkUtils, partitionsToBeReassigned)
+    } else {
+      printCurrentAssignment(zkUtils, partitionAssignment.map(_._1.topic))
       if (throttle.value >= 0)
         println(String.format("Warning: You must run Verify periodically, until the reassignment completes, to ensure the throttle is removed. You can also alter the throttle by rerunning the Execute command passing a new value."))
-      if (reassignPartitionsCommand.reassignPartitions(throttle)) {
+      if (reassignPartitionsCommand.reassignPartitions(throttle, timeoutMs)) {
         println("Successfully started reassignment of partitions.")
       } else
-        println("Failed to reassign partitions %s".format(partitionsToBeReassigned))
+        println("Failed to reassign partitions %s".format(partitionAssignment))
     }
   }
 
-  def printCurrentAssignment(zkUtils: ZkUtils, partitionsToBeReassigned: Seq[(TopicAndPartition, Seq[Int])]): Unit = {
+  def printCurrentAssignment(zkUtils: ZkUtils, topics: Seq[String]): Unit = {
     // before starting assignment, output the current replica assignment to facilitate rollback
-    val currentPartitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(partitionsToBeReassigned.map(_._1.topic))
+    val currentPartitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(topics)
     println("Current partition replica assignment\n\n%s\n\nSave this to use as the --reassignment-json-file option during rollback"
-      .format(ZkUtils.formatAsReassignmentJson(currentPartitionReplicaAssignment)))
+      .format(formatAsReassignmentJson(currentPartitionReplicaAssignment, Map.empty)))
+  }
+
+  def formatAsReassignmentJson(partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]],
+                               replicaLogDirAssignment: Map[TopicPartitionReplica, String]): String = {
+    Json.encode(Map(
+      "version" -> 1,
+      "partitions" -> partitionsToBeReassigned.map { case (TopicAndPartition(topic, partition), replicas) =>
+        Map(
+          "topic" -> topic,
+          "partition" -> partition,
+          "replicas" -> replicas,
+          "log_dirs" -> replicas.map(r => replicaLogDirAssignment.getOrElse(new TopicPartitionReplica(topic, partition, r), AnyLogDir))
+        )
+      }
+    ))
   }
 
-  def parseAndValidate(zkUtils: ZkUtils, reassignmentJsonString: String): Seq[(TopicAndPartition, Seq[Int])] = {
-    val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentDataWithoutDedup(reassignmentJsonString)
+  // Parses without deduplicating keys so the data can be checked before allowing reassignment to proceed
+  def parsePartitionReassignmentData(jsonData: String): (Seq[(TopicAndPartition, Seq[Int])], Map[TopicPartitionReplica, String]) = {
+    val partitionAssignment = mutable.ListBuffer.empty[(TopicAndPartition, Seq[Int])]
+    val replicaAssignment = mutable.Map.empty[TopicPartitionReplica, String]
+    for {
+      js <- Json.parseFull(jsonData).toSeq
+      partitionsSeq <- js.asJsonObject.get("partitions").toSeq
+      p <- partitionsSeq.asJsonArray.iterator
+    } {
+      val partitionFields = p.asJsonObject
+      val topic = partitionFields("topic").to[String]
+      val partition = partitionFields("partition").to[Int]
+      val newReplicas = partitionFields("replicas").to[Seq[Int]]
+      val newLogDirs = partitionFields.get("log_dirs") match {
+        case Some(jsonValue) => jsonValue.to[Seq[String]]
+        case None => newReplicas.map(r => AnyLogDir)
+      }
+      if (newReplicas.size != newLogDirs.size)
+        throw new AdminCommandFailedException(s"Size of replicas list $newReplicas is different from " +
+          s"size of log dirs list $newLogDirs for partition ${TopicAndPartition(topic, partition)}")
+      partitionAssignment += (TopicAndPartition(topic, partition) -> newReplicas)
+      replicaAssignment ++= newReplicas.zip(newLogDirs).map { case (replica, logDir) =>
+        new TopicPartitionReplica(topic, partition, replica) -> logDir
+      }.filter(_._2 != AnyLogDir)
+    }
+    (partitionAssignment, replicaAssignment)
+  }
+
+  def parseAndValidate(zkUtils: ZkUtils, reassignmentJsonString: String): (Seq[(TopicAndPartition, Seq[Int])], Map[TopicPartitionReplica, String]) = {
+    val (partitionsToBeReassigned, replicaAssignment) = parsePartitionReassignmentData(reassignmentJsonString)
 
     if (partitionsToBeReassigned.isEmpty)
       throw new AdminCommandFailedException("Partition reassignment data file is empty")
@@ -215,10 +293,18 @@ object ReassignPartitionsCommand extends Logging {
     if (nonExistingBrokerIDs.nonEmpty)
       throw new AdminCommandFailedException("The proposed assignment contains non-existent brokerIDs: " + nonExistingBrokerIDs.mkString(","))
 
-    partitionsToBeReassigned
+    // Check that a replica will always be moved to another broker if a particular log directory is specified for it.
+    // We will support moving replicas within a broker after KIP-113 is implemented.
+    replicaAssignment.foreach { case (replica, logDir) =>
+      if (existingAssignment.getOrElse(TopicAndPartition(replica.topic(), replica.partition()), Seq.empty).contains(replica.brokerId()))
+        throw new AdminCommandFailedException(s"The proposed assignment intends to move an existing replica $replica to " +
+          s"another log directory $logDir on the same broker. This is not currently supported")
+    }
+
+    (partitionsToBeReassigned, replicaAssignment)
   }
 
-  private def checkIfReassignmentSucceeded(zkUtils: ZkUtils, partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]])
+  private def checkIfPartitionReassignmentSucceeded(zkUtils: ZkUtils, partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]])
   :Map[TopicAndPartition, ReassignmentStatus] = {
     val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned().mapValues(_.newReplicas)
     partitionsToBeReassigned.keys.map { topicAndPartition =>
@@ -227,6 +313,47 @@ object ReassignPartitionsCommand extends Logging {
     }.toMap
   }
 
+  private def checkIfReplicaReassignmentSucceeded(adminClientOpt: Option[JAdminClient], replicaAssignment: Map[TopicPartitionReplica, String])
+  :Map[TopicPartitionReplica, ReassignmentStatus] = {
+
+    val replicaLogDirInfos = {
+      if (replicaAssignment.nonEmpty) {
+        val adminClient = adminClientOpt.getOrElse(
+          throw new AdminCommandFailedException("bootstrap-server needs to be provided in order to reassign replica to the specified log directory"))
+        adminClient.describeReplicaLogDir(replicaAssignment.keySet.asJava).all().get().asScala
+      } else {
+        Map.empty[TopicPartitionReplica, ReplicaLogDirInfo]
+      }
+    }
+
+    replicaAssignment.map { case (replica, newLogDir) =>
+      val status: ReassignmentStatus = replicaLogDirInfos.get(replica) match {
+        case Some(replicaLogDirInfo) =>
+          if (replicaLogDirInfo.getCurrentReplicaLogDir == null) {
+            println(s"Partition ${replica.topic()}-${replica.partition()} is not found in any live log dir on " +
+              s"broker ${replica.brokerId()}. There is likely offline log directory on the broker.")
+            ReassignmentFailed
+          } else if (replicaLogDirInfo.getFutureReplicaLogDir == newLogDir) {
+            ReassignmentInProgress
+          } else if (replicaLogDirInfo.getFutureReplicaLogDir != null) {
+            println(s"Partition ${replica.topic()}-${replica.partition()} on broker ${replica.brokerId()} " +
+              s"is being moved to log dir ${replicaLogDirInfo.getFutureReplicaLogDir} instead of $newLogDir")
+            ReassignmentFailed
+          } else if (replicaLogDirInfo.getCurrentReplicaLogDir == newLogDir) {
+            ReassignmentCompleted
+          } else {
+            println(s"Partition ${replica.topic()}-${replica.partition()} on broker ${replica.brokerId()} " +
+              s"is not being moved from log dir ${replicaLogDirInfo.getCurrentReplicaLogDir} to $newLogDir")
+            ReassignmentFailed
+          }
+        case None =>
+          println(s"Partition ${replica.topic()}-${replica.partition()} is not found in any live log dir on broker ${replica.brokerId()}.")
+          ReassignmentFailed
+      }
+      (replica, status)
+    }
+  }
+
   def checkIfPartitionReassignmentSucceeded(zkUtils: ZkUtils, topicAndPartition: TopicAndPartition,
                                             partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]],
                                             partitionsBeingReassigned: Map[TopicAndPartition, Seq[Int]]): ReassignmentStatus = {
@@ -280,19 +407,26 @@ object ReassignPartitionsCommand extends Logging {
 
   class ReassignPartitionsCommandOptions(args: Array[String]) {
     val parser = new OptionParser(false)
-
+    val bootstrapServerOpt = parser.accepts("bootstrap-server", "the server(s) to use for bootstrapping. REQUIRED if " +
+                      "an absolute path of the log directory is specified for any replica in the reassignment json file")
+                      .withRequiredArg
+                      .describedAs("Server(s) to use for bootstrapping")
+                      .ofType(classOf[String])
     val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the " +
                       "form host:port. Multiple URLS can be given to allow fail-over.")
                       .withRequiredArg
                       .describedAs("urls")
                       .ofType(classOf[String])
     val generateOpt = parser.accepts("generate", "Generate a candidate partition reassignment configuration." +
-      " Note that this only generates a candidate assignment, it does not execute it.")
+                      " Note that this only generates a candidate assignment, it does not execute it.")
     val executeOpt = parser.accepts("execute", "Kick off the reassignment as specified by the --reassignment-json-file option.")
     val verifyOpt = parser.accepts("verify", "Verify if the reassignment completed as specified by the --reassignment-json-file option. If there is a throttle engaged for the replicas specified, and the rebalance has completed, the throttle will be removed")
     val reassignmentJsonFileOpt = parser.accepts("reassignment-json-file", "The JSON file with the partition reassignment configuration" +
                       "The format to use is - \n" +
-                      "{\"partitions\":\n\t[{\"topic\": \"foo\",\n\t  \"partition\": 1,\n\t  \"replicas\": [1,2,3] }],\n\"version\":1\n}")
+                      "{\"partitions\":\n\t[{\"topic\": \"foo\",\n\t  \"partition\": 1,\n\t  \"replicas\": [1,2,3],\n\t  \"log_dirs\": [\"dir1\",\"dir2\",\"dir3\"] }],\n\"version\":1\n}\n" +
+                      "Note that \"log_dirs\" is optional. When it is specified, its length must equal the length of the replicas list. The value in this list " +
+                      "can be either \"any\" or the absolute path of the log directory on the broker. If absolute log directory path is specified, it is currently required that " +
+                      "the replica has not already been created on that broker. The replica will then be created in the specified log directory on the broker later.")
                       .withRequiredArg
                       .describedAs("manual assignment json file path")
                       .ofType(classOf[String])
@@ -313,23 +447,32 @@ object ReassignPartitionsCommand extends Logging {
                       .describedAs("throttle")
                       .defaultsTo("-1")
                       .ofType(classOf[Long])
+    val timeoutOpt = parser.accepts("timeout", "The maximum time in ms allowed to wait for partition reassignment execution to be successfully initiated")
+                      .withRequiredArg()
+                      .describedAs("timeout")
+                      .defaultsTo("10000")
+                      .ofType(classOf[Long])
     val options = parser.parse(args : _*)
   }
 }
 
-class ReassignPartitionsCommand(zkUtils: ZkUtils, proposedAssignment: Map[TopicAndPartition, Seq[Int]], admin: AdminUtilities = AdminUtils)
+class ReassignPartitionsCommand(zkUtils: ZkUtils,
+                                adminClientOpt: Option[JAdminClient],
+                                proposedPartitionAssignment: Map[TopicAndPartition, Seq[Int]],
+                                proposedReplicaAssignment: Map[TopicPartitionReplica, String] = Map.empty,
+                                admin: AdminUtilities = AdminUtils)
   extends Logging {
 
   import ReassignPartitionsCommand._
 
   def existingAssignment(): Map[TopicAndPartition, Seq[Int]] = {
-    val proposedTopics = proposedAssignment.keySet.map(_.topic).toSeq
+    val proposedTopics = proposedPartitionAssignment.keySet.map(_.topic).toSeq
     zkUtils.getReplicaAssignmentForTopics(proposedTopics)
   }
 
   private def maybeThrottle(throttle: Throttle): Unit = {
     if (throttle.value >= 0) {
-      assignThrottledReplicas(existingAssignment(), proposedAssignment)
+      assignThrottledReplicas(existingAssignment(), proposedPartitionAssignment)
       maybeLimit(throttle)
       throttle.postUpdateAction()
       println(s"The throttle limit was set to ${throttle.value} B/s")
@@ -343,7 +486,7 @@ class ReassignPartitionsCommand(zkUtils: ZkUtils, proposedAssignment: Map[TopicA
   def maybeLimit(throttle: Throttle) {
     if (throttle.value >= 0) {
       val existingBrokers = existingAssignment().values.flatten.toSeq
-      val proposedBrokers = proposedAssignment.values.flatten.toSeq
+      val proposedBrokers = proposedPartitionAssignment.values.flatten.toSeq
       val brokers = (existingBrokers ++ proposedBrokers).distinct
 
       for (id <- brokers) {
@@ -401,12 +544,45 @@ class ReassignPartitionsCommand(zkUtils: ZkUtils, proposedAssignment: Map[TopicA
       allProposed.filter { case (tp, _) => tp.topic == topic })
   }
 
-  def reassignPartitions(throttle: Throttle = NoThrottle): Boolean = {
+  def reassignPartitions(throttle: Throttle = NoThrottle, timeoutMs: Long = 10000L): Boolean = {
     maybeThrottle(throttle)
     try {
-      val validPartitions = proposedAssignment.filter { case (p, _) => validatePartition(zkUtils, p.topic, p.partition) }
+      val validPartitions = proposedPartitionAssignment.filter { case (p, _) => validatePartition(zkUtils, p.topic, p.partition) }
       if (validPartitions.isEmpty) false
       else {
+        if (proposedReplicaAssignment.nonEmpty) {
+          // Send AlterReplicaDirRequest to allow broker to create replica in the right log dir later if the replica
+          // has not been created yet. This allows us to rebalance load across log directories in the cluster even if
+          // we cannot move replicas between log directories on the same broker. We will be able to move replicas
+          // between log directories on the same broker after KIP-113 is implemented.
+          val adminClient = adminClientOpt.getOrElse(
+            throw new AdminCommandFailedException("bootstrap-server needs to be provided in order to reassign replica to the specified log directory"))
+          val alterReplicaDirResult = adminClient.alterReplicaDir(
+            proposedReplicaAssignment.asJava, new AlterReplicaDirOptions().timeoutMs(timeoutMs.toInt))
+          alterReplicaDirResult.values().asScala.foreach { case (replica, future) => {
+              try {
+                /*
+                 * Before KIP-113 is fully implemented, a user can only specify the destination log directory of a replica
+                 * if the replica has not already been created on the broker; otherwise the log directory specified in the
+                 * json file will not be enforced. Therefore we want to verify that broker will return ReplicaNotAvailableException
+                 * for this replica.
+                 *
+                 * After KIP-113 is fully implemented, we will not need to verify that the broker returns this ReplicaNotAvailableException
+                 * in this step. And after the reassignment znode is created, we will need to re-send AlterReplicaDirRequest to broker
+                 * if broker returns ReplicaNotAvailableException for any replica in the request.
+                 */
+                future.get()
+                throw new AdminCommandFailedException(s"Partition ${replica.topic()}-${replica.partition()} already exists on broker ${replica.brokerId()}." +
+                  s" Reassign replica to another log directory on the same broker is currently not supported.")
+              } catch {
+                case t: ExecutionException =>
+                  t.getCause match {
+                    case e: ReplicaNotAvailableException => // It is OK if the replica is not available
+                    case e: Throwable => throw e
+                  }
+              }
+          }}
+        }
         val jsonReassignmentData = ZkUtils.formatAsReassignmentJson(validPartitions)
         zkUtils.createPersistentPath(ZkUtils.ReassignPartitionsPath, jsonReassignmentData)
         true
@@ -415,8 +591,14 @@ class ReassignPartitionsCommand(zkUtils: ZkUtils, proposedAssignment: Map[TopicA
       case _: ZkNodeExistsException =>
         val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned()
         throw new AdminCommandFailedException("Partition reassignment currently in " +
-        "progress for %s. Aborting operation".format(partitionsBeingReassigned))
-      case e: Throwable => error("Admin command failed", e); false
+          "progress for %s. Aborting operation".format(partitionsBeingReassigned))
+      case e: LogDirNotFoundException =>
+        throw new AdminCommandFailedException(s"The proposed replica assignment $proposedReplicaAssignment contains " +
+          s"invalid log directory. Aborting operation", e)
+      case e: AdminCommandFailedException => throw e
+      case e: Throwable =>
+        error("Admin command failed", e)
+        false
     }
   }
 
@@ -439,8 +621,8 @@ class ReassignPartitionsCommand(zkUtils: ZkUtils, proposedAssignment: Map[TopicA
   }
 }
 
-
 sealed trait ReassignmentStatus { def status: Int }
 case object ReassignmentCompleted extends ReassignmentStatus { val status = 1 }
 case object ReassignmentInProgress extends ReassignmentStatus { val status = 0 }
 case object ReassignmentFailed extends ReassignmentStatus { val status = -1 }
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 2377497..690f52a 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -30,7 +30,8 @@ import kafka.server.{BrokerState, RecoveringFromUncleanShutdown, _}
 import kafka.utils._
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.utils.Time
-import org.apache.kafka.common.errors.KafkaStorageException
+import org.apache.kafka.common.errors.{LogDirNotFoundException, KafkaStorageException}
+
 import scala.collection.JavaConverters._
 import scala.collection._
 import scala.collection.mutable.ArrayBuffer
@@ -87,6 +88,7 @@ class LogManager(logDirs: Array[File],
     (dir, new OffsetCheckpointFile(new File(dir, LogStartOffsetCheckpointFile), logDirFailureChannel))).toMap
 
   private def offlineLogDirs = logDirs.filterNot(_liveLogDirs.contains)
+  private val preferredLogDirs = new ConcurrentHashMap[TopicPartition, String]()
 
   loadLogs()
 
@@ -525,6 +527,11 @@ class LogManager(logDirs: Array[File],
     }
   }
 
+  def updatePreferredLogDir(topicPartition: TopicPartition, logDir: String): Unit = {
+    // The logDir should be an absolute path
+    preferredLogDirs.put(topicPartition, logDir)
+  }
+
   /**
    * Get the log if it exists, otherwise return None
    */
@@ -545,9 +552,18 @@ class LogManager(logDirs: Array[File],
         if (!isNew && offlineLogDirs.nonEmpty)
           throw new KafkaStorageException(s"Can not create log for $topicPartition because log directories ${offlineLogDirs.mkString(",")} are offline")
 
-        val dataDir = nextLogDir()
+        val logDir = {
+          val preferredLogDir = preferredLogDirs.get(topicPartition)
+          if (preferredLogDir != null)
+            preferredLogDir
+          else
+            nextLogDir().getAbsolutePath
+        }
+        if (!isLogDirOnline(logDir))
+          throw new KafkaStorageException(s"Can not create log for $topicPartition because log directory $logDir is offline")
+
         try {
-          val dir = new File(dataDir, topicPartition.topic + "-" + topicPartition.partition)
+          val dir = new File(logDir, topicPartition.topic + "-" + topicPartition.partition)
           Files.createDirectories(dir.toPath)
 
           val log = Log(
@@ -567,13 +583,16 @@ class LogManager(logDirs: Array[File],
           info("Created log for partition [%s,%d] in %s with properties {%s}."
             .format(topicPartition.topic,
               topicPartition.partition,
-              dataDir.getAbsolutePath,
+              logDir,
               config.originals.asScala.mkString(", ")))
+          // Remove the preferred log dir since it has already been satisfied
+          preferredLogDirs.remove(topicPartition)
+
           log
         } catch {
           case e: IOException =>
-            val msg = s"Error while creating log for $topicPartition in dir ${dataDir.getAbsolutePath}"
-            logDirFailureChannel.maybeAddOfflineLogDir(dataDir.getAbsolutePath, msg, e)
+            val msg = s"Error while creating log for $topicPartition in dir ${logDir}"
+            logDirFailureChannel.maybeAddOfflineLogDir(logDir, msg, e)
             throw new KafkaStorageException(msg, e)
         }
       }
@@ -606,6 +625,7 @@ class LogManager(logDirs: Array[File],
   /**
     * Rename the directory of the given topic-partition "logdir" as "logdir.uuid.delete" and
     * add it in the queue for deletion.
+    *
     * @param topicPartition TopicPartition that needs to be deleted
     * @return the removed log
     */
@@ -704,8 +724,9 @@ class LogManager(logDirs: Array[File],
 
   // logDir should be an absolute path
   def isLogDirOnline(logDir: String): Boolean = {
+    // The logDir should be an absolute path
     if (!logDirs.exists(_.getAbsolutePath == logDir))
-      throw new RuntimeException(s"Log dir $logDir is not found in the config.")
+      throw new LogDirNotFoundException(s"Log dir $logDir is not found in the config.")
 
     _liveLogDirs.contains(new File(logDir))
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 1a85222..17c3f2d 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -36,7 +36,7 @@ import kafka.log.{Log, LogManager, TimestampOffset}
 import kafka.network.RequestChannel
 import kafka.network.RequestChannel.{CloseConnectionAction, NoOpAction, SendAction}
 import kafka.security.SecurityUtils
-import kafka.security.auth._
+import kafka.security.auth.{Resource, _}
 import kafka.utils.{CoreUtils, Logging, ZKGroupTopicDirs, ZkUtils}
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.FatalExitError
@@ -54,6 +54,7 @@ import org.apache.kafka.common.{Node, TopicPartition}
 import org.apache.kafka.common.requests.SaslHandshakeResponse
 import org.apache.kafka.common.resource.{Resource => AdminResource}
 import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding}
+import DescribeLogDirsResponse.LogDirInfo
 
 import scala.collection.{mutable, _}
 import scala.collection.JavaConverters._
@@ -129,6 +130,8 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.DELETE_ACLS => handleDeleteAcls(request)
         case ApiKeys.ALTER_CONFIGS => handleAlterConfigsRequest(request)
         case ApiKeys.DESCRIBE_CONFIGS => handleDescribeConfigsRequest(request)
+        case ApiKeys.ALTER_REPLICA_DIR => handleAlterReplicaDirRequest(request)
+        case ApiKeys.DESCRIBE_LOG_DIRS => handleDescribeLogDirsRequest(request)
       }
     } catch {
       case e: FatalExitError => throw e
@@ -1910,6 +1913,35 @@ class KafkaApis(val requestChannel: RequestChannel,
       new DescribeConfigsResponse(requestThrottleMs, (authorizedConfigs ++ unauthorizedConfigs).asJava))
   }
 
+  def handleAlterReplicaDirRequest(request: RequestChannel.Request): Unit = {
+    val alterReplicaDirRequest = request.body[AlterReplicaDirRequest]
+    val responseMap = {
+      if (authorize(request.session, Alter, Resource.ClusterResource))
+        replicaManager.alterReplicaDir(alterReplicaDirRequest.partitionDirs.asScala)
+      else
+        alterReplicaDirRequest.partitionDirs.asScala.keys.map((_, Errors.CLUSTER_AUTHORIZATION_FAILED)).toMap
+    }
+    sendResponseMaybeThrottle(request, requestThrottleMs => new AlterReplicaDirResponse(requestThrottleMs, responseMap.asJava))
+  }
+
+  def handleDescribeLogDirsRequest(request: RequestChannel.Request): Unit = {
+    val describeLogDirsDirRequest = request.body[DescribeLogDirsRequest]
+    val logDirInfos = {
+      if (authorize(request.session, Describe, Resource.ClusterResource)) {
+        val partitions =
+          if (describeLogDirsDirRequest.isAllTopicPartitions)
+            replicaManager.logManager.allLogs().map(_.topicPartition).toSet
+          else
+            describeLogDirsDirRequest.topicPartitions().asScala
+
+        replicaManager.describeLogDirs(partitions)
+      } else {
+        Map.empty[String, LogDirInfo]
+      }
+    }
+    sendResponseMaybeThrottle(request, throttleTimeMs => new DescribeLogDirsResponse(throttleTimeMs, logDirInfos.asJava))
+  }
+
   def authorizeClusterAction(request: RequestChannel.Request): Unit = {
     if (!authorize(request.session, ClusterAction, Resource.ClusterResource))
       throw new ClusterAuthorizationException(s"Request $request is not authorized.")

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index ec3abff..2689980 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -32,7 +32,7 @@ import kafka.coordinator.group.GroupCoordinator
 import kafka.coordinator.transaction.TransactionCoordinator
 import kafka.log.{LogConfig, LogManager}
 import kafka.metrics.{KafkaMetricsGroup, KafkaMetricsReporter}
-import kafka.network.{BlockingChannel, SocketServer}
+import kafka.network.SocketServer
 import kafka.security.CredentialProvider
 import kafka.security.auth.Authorizer
 import kafka.utils._

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 4a415e9..7920efe 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -30,13 +30,14 @@ import kafka.server.QuotaFactory.UnboundedQuota
 import kafka.server.checkpoints.OffsetCheckpointFile
 import kafka.utils._
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.{KafkaStorageException, ControllerMovedException, CorruptRecordException, InvalidTimestampException, InvalidTopicException, NotEnoughReplicasException, NotLeaderForPartitionException, OffsetOutOfRangeException, PolicyViolationException, _}
+import org.apache.kafka.common.errors.{ControllerMovedException, CorruptRecordException, LogDirNotFoundException, InvalidTimestampException, InvalidTopicException, KafkaStorageException, NotEnoughReplicasException, NotLeaderForPartitionException, OffsetOutOfRangeException, PolicyViolationException, _}
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_OR_PARTITION
 import org.apache.kafka.common.protocol.Errors.KAFKA_STORAGE_ERROR
 import org.apache.kafka.common.record._
+import org.apache.kafka.common.requests.DescribeLogDirsResponse.{LogDirInfo, ReplicaInfo}
 import org.apache.kafka.common.requests.EpochEndOffset._
 import org.apache.kafka.common.requests.FetchRequest.PartitionData
 import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
@@ -538,6 +539,92 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
+  /*
+   * For each pair of partition and log directory specified in the map, record the pair in memory so that the partition
+   * will be created in the specified log directory when the broker receives a LeaderAndIsrRequest for the partition later.
+   *
+   * This API is currently only useful if the replica has not been created yet. We will be able to move replicas
+   * that are already created to the user-specified log directory after KIP-113 is fully implemented.
+   *
+   */
+  def alterReplicaDir(partitionDirs: Map[TopicPartition, String]): Map[TopicPartition, Errors] = {
+    partitionDirs.map { case (topicPartition, destinationDir) =>
+      try {
+        if (!logManager.isLogDirOnline(destinationDir))
+          throw new KafkaStorageException(s"Log directory $destinationDir is offline")
+
+        // If the log for this partition has not been created yet:
+        // 1) Respond with ReplicaNotAvailableException for this partition in the AlterReplicaDirResponse
+        // 2) Record the destination log directory in memory so that the partition will be created in this log directory
+        //    when the broker receives a LeaderAndIsrRequest for this partition later.
+        getReplica(topicPartition) match {
+          case Some(_) => // The support for moving replica between log directories on the same broker is not available yet.
+          case None =>
+            logManager.updatePreferredLogDir(topicPartition, destinationDir)
+            throw new ReplicaNotAvailableException(s"Replica $localBrokerId is not available for partition $topicPartition")
+        }
+
+        (topicPartition, Errors.NONE)
+      } catch {
+        case e@(_: LogDirNotFoundException |
+                _: ReplicaNotAvailableException |
+                _: KafkaStorageException) =>
+          (topicPartition, Errors.forException(e))
+        case t: Throwable =>
+          error("Error while changing replica dir for partition %s".format(topicPartition), t)
+          (topicPartition, Errors.forException(t))
+      }
+    }
+  }
+
+  /*
+   * Get the LogDirInfo for the specified list of partitions.
+   *
+   * Each LogDirInfo specifies the following information for a given log directory:
+   * 1) Error of the log directory, e.g. whether the log is online or offline
+   * 2) Size and lag of current and future logs for each partition in the given log directory. Only logs of the queried partitions
+   *    are included. There may be future logs (which will replace the current logs of the partition in the future) on the broker after KIP-113 is implemented.
+   */
+  def describeLogDirs(partitions: Set[TopicPartition]): Map[String, LogDirInfo] = {
+    val logsByDir = logManager.allLogs().groupBy(log => log.dir.getParent)
+
+    config.logDirs.toSet.map { logDir: String =>
+      val absolutePath = new File(logDir).getAbsolutePath
+      try {
+        if (!logManager.isLogDirOnline(absolutePath))
+          throw new KafkaStorageException(s"Log directory $absolutePath is offline")
+
+        logsByDir.get(absolutePath) match {
+          case Some(logs) =>
+            val replicaInfos = logs.filter(log =>
+              partitions.contains(log.topicPartition)
+            ).map(log => log.topicPartition -> new ReplicaInfo(log.size, getLogEndOffsetLag(log.topicPartition), false)).toMap
+
+            (absolutePath, new LogDirInfo(Errors.NONE, replicaInfos.asJava))
+          case None =>
+            (absolutePath, new LogDirInfo(Errors.NONE, Map.empty[TopicPartition, ReplicaInfo].asJava))
+        }
+
+      } catch {
+        case e: KafkaStorageException =>
+          (absolutePath, new LogDirInfo(Errors.KAFKA_STORAGE_ERROR, Map.empty[TopicPartition, ReplicaInfo].asJava))
+        case t: Throwable =>
+          error(s"Error while describing replica in dir $absolutePath", t)
+          (absolutePath, new LogDirInfo(Errors.forException(t), Map.empty[TopicPartition, ReplicaInfo].asJava))
+      }
+    }.toMap
+  }
+
+  def getLogEndOffsetLag(topicPartition: TopicPartition): Long = {
+    getReplica(topicPartition) match {
+      case Some(replica) =>
+          math.max(replica.highWatermark.messageOffset - replica.log.get.logEndOffset, 0)
+      case None =>
+        // return -1L to indicate that the LEO lag is not available if the broker is neither follower nor leader of this partition
+        DescribeLogDirsResponse.INVALID_OFFSET_LAG
+    }
+  }
+
   def deleteRecords(timeout: Long,
                     offsetPerPartition: Map[TopicPartition, Long],
                     responseCallback: Map[TopicPartition, DeleteRecordsResponse.PartitionResponse] => Unit) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index cb20b31..9582c50 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -166,8 +166,8 @@ object ZkUtils {
     DeleteTopicsPath + "/" + topic
 
   // Parses without deduplicating keys so the data can be checked before allowing reassignment to proceed
-  def parsePartitionReassignmentDataWithoutDedup(jsonData: String): Seq[(TopicAndPartition, Seq[Int])] = {
-    for {
+  def parsePartitionReassignmentData(jsonData: String): Map[TopicAndPartition, Seq[Int]] = {
+    val seq = for {
       js <- Json.parseFull(jsonData).toSeq
       partitionsSeq <- js.asJsonObject.get("partitions").toSeq
       p <- partitionsSeq.asJsonArray.iterator
@@ -178,11 +178,9 @@ object ZkUtils {
       val newReplicas = partitionFields("replicas").to[Seq[Int]]
       TopicAndPartition(topic, partition) -> newReplicas
     }
+    seq.toMap
   }
 
-  def parsePartitionReassignmentData(jsonData: String): Map[TopicAndPartition, Seq[Int]] =
-    parsePartitionReassignmentDataWithoutDedup(jsonData).toMap
-
   def parseTopicsData(jsonData: String): Seq[String] = {
     for {
       js <- Json.parseFull(jsonData).toSeq

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index 8bd379a..49a75b9 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -19,6 +19,7 @@ package kafka.api
 import java.util
 import java.util.{Collections, Properties}
 import java.util.concurrent.{ExecutionException, TimeUnit}
+import java.io.File
 
 import org.apache.kafka.clients.admin.KafkaAdminClientTest
 import org.apache.kafka.common.utils.{Time, Utils}
@@ -29,16 +30,17 @@ import org.apache.kafka.clients.admin._
 import kafka.utils.{Logging, TestUtils, ZkUtils}
 import kafka.utils.Implicits._
 import org.apache.kafka.clients.admin.NewTopic
-import org.apache.kafka.common.KafkaFuture
-import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
+import org.apache.kafka.common.{KafkaFuture, TopicPartition, TopicPartitionReplica}
+import org.apache.kafka.common.acl._
 import org.apache.kafka.common.config.ConfigResource
-import org.apache.kafka.common.errors.{InvalidRequestException, SecurityDisabledException, TimeoutException, TopicExistsException, UnknownTopicOrPartitionException}
+import org.apache.kafka.common.errors._
 import org.junit.{After, Before, Rule, Test}
 import org.apache.kafka.common.requests.MetadataResponse
 import org.apache.kafka.common.resource.{Resource, ResourceType}
 import org.junit.rules.Timeout
 import org.junit.Assert._
 
+import scala.util.Random
 import scala.collection.JavaConverters._
 
 /**
@@ -216,6 +218,77 @@ class AdminClientIntegrationTest extends KafkaServerTestHarness with Logging {
   }
 
   @Test
+  def testDescribeLogDirs(): Unit = {
+    client = AdminClient.create(createConfig())
+    val topic = "topic"
+    val leaderByPartition = TestUtils.createTopic(zkUtils, topic, 10, 1, servers, new Properties())
+    val partitionsByBroker = leaderByPartition.groupBy { case (partitionId, leaderId) => leaderId }.mapValues(_.keys.toSeq)
+    val brokers = (0 until brokerCount).map(Integer.valueOf)
+    val logDirInfosByBroker = client.describeLogDirs(brokers.asJava).all.get
+
+    (0 until brokerCount).foreach { brokerId =>
+      val server = servers.find(_.config.brokerId == brokerId).get
+      val expectedPartitions = partitionsByBroker(brokerId)
+      val logDirInfos = logDirInfosByBroker.get(brokerId)
+      val replicaInfos = logDirInfos.asScala.flatMap { case (logDir, logDirInfo) => logDirInfo.replicaInfos.asScala }.filterKeys(_.topic == topic)
+
+      assertEquals(expectedPartitions.toSet, replicaInfos.keys.map(_.partition).toSet)
+      logDirInfos.asScala.foreach { case (logDir, logDirInfo) =>
+        logDirInfo.replicaInfos.asScala.keys.foreach(tp =>
+          assertEquals(server.logManager.getLog(tp).get.dir.getParent, logDir)
+        )
+      }
+    }
+
+    client.close()
+  }
+
+  @Test
+  def testDescribeReplicaLogDir(): Unit = {
+    client = AdminClient.create(createConfig())
+    val topic = "topic"
+    val leaderByPartition = TestUtils.createTopic(zkUtils, topic, 10, 1, servers, new Properties())
+    val replicas = leaderByPartition.map { case (partition, brokerId) => new TopicPartitionReplica(topic, partition, brokerId) }.toSeq
+
+    val replicaDirInfos = client.describeReplicaLogDir(replicas.asJavaCollection).all.get
+    replicaDirInfos.asScala.foreach { case (topicPartitionReplica, replicaDirInfo) =>
+      val server = servers.find(_.config.brokerId == topicPartitionReplica.brokerId()).get
+      val tp = new TopicPartition(topicPartitionReplica.topic(), topicPartitionReplica.partition())
+      assertEquals(server.logManager.getLog(tp).get.dir.getParent, replicaDirInfo.getCurrentReplicaLogDir)
+    }
+
+    client.close()
+  }
+
+  @Test
+  def testAlterReplicaLogDirBeforeTopicCreation(): Unit = {
+    val adminClient = AdminClient.create(createConfig())
+    val topic = "topic"
+    val tp = new TopicPartition(topic, 0)
+
+    val replicaAssignment = servers.map { server =>
+      val logDir = new File(server.config.logDirs(Random.nextInt(2))).getAbsolutePath
+      new TopicPartitionReplica(topic, 0, server.config.brokerId) -> logDir
+    }.toMap
+
+    adminClient.alterReplicaDir(replicaAssignment.asJava, new AlterReplicaDirOptions()).values().asScala.values.foreach { future =>
+      try {
+        future.get()
+        fail("Future should fail with ReplicaNotAvailableException")
+      } catch {
+        case e: ExecutionException => assertTrue(e.getCause.isInstanceOf[ReplicaNotAvailableException])
+      }
+    }
+
+    TestUtils.createTopic(zkUtils, topic, 1, brokerCount, servers, new Properties())
+    servers.foreach { server =>
+      val logDir = server.logManager.getLog(tp).get.dir.getParent
+      assertEquals(replicaAssignment(new TopicPartitionReplica(topic, 0, server.config.brokerId)), logDir)
+    }
+    adminClient.close()
+  }
+
+  @Test
   def testDescribeAndAlterConfigs(): Unit = {
     client = AdminClient.create(createConfig)
 
@@ -366,7 +439,7 @@ class AdminClientIntegrationTest extends KafkaServerTestHarness with Logging {
 
   override def generateConfigs = {
     val cfgs = TestUtils.createBrokerConfigs(brokerCount, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol),
-      trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties)
+      trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties, logDirCount = 2)
     cfgs.foreach { config =>
       config.setProperty(KafkaConfig.ListenersProp, s"${listenerName.value}://localhost:${TestUtils.RandomPort}")
       config.remove(KafkaConfig.InterBrokerSecurityProtocolProp)

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 1a4de99..67d15b3 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -64,6 +64,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   val correlationId = 0
   val clientId = "client-Id"
   val tp = new TopicPartition(topic, part)
+  val logDir = "logDir"
   val topicAndPartition = TopicAndPartition(topic, part)
   val group = "my-group"
   val topicResource = new Resource(Topic, topic)
@@ -133,7 +134,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       ApiKeys.TXN_OFFSET_COMMIT -> classOf[TxnOffsetCommitResponse],
       ApiKeys.CREATE_ACLS -> classOf[CreateAclsResponse],
       ApiKeys.DELETE_ACLS -> classOf[DeleteAclsResponse],
-      ApiKeys.DESCRIBE_ACLS -> classOf[DescribeAclsResponse]
+      ApiKeys.DESCRIBE_ACLS -> classOf[DescribeAclsResponse],
+      ApiKeys.ALTER_REPLICA_DIR -> classOf[AlterReplicaDirResponse],
+      ApiKeys.DESCRIBE_LOG_DIRS -> classOf[DescribeLogDirsResponse]
   )
 
   val requestKeyToError = Map[ApiKeys, Nothing => Errors](
@@ -167,7 +170,10 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     ApiKeys.TXN_OFFSET_COMMIT -> ((resp: TxnOffsetCommitResponse) => resp.errors.get(tp)),
     ApiKeys.CREATE_ACLS -> ((resp: CreateAclsResponse) => resp.aclCreationResponses.asScala.head.error.error),
     ApiKeys.DESCRIBE_ACLS -> ((resp: DescribeAclsResponse) => resp.error.error),
-    ApiKeys.DELETE_ACLS -> ((resp: DeleteAclsResponse) => resp.responses.asScala.head.error.error)
+    ApiKeys.DELETE_ACLS -> ((resp: DeleteAclsResponse) => resp.responses.asScala.head.error.error),
+    ApiKeys.ALTER_REPLICA_DIR -> ((resp: AlterReplicaDirResponse) => resp.responses.get(tp)),
+    ApiKeys.DESCRIBE_LOG_DIRS -> ((resp: DescribeLogDirsResponse) =>
+      if (resp.logDirInfos.size() > 0) resp.logDirInfos.asScala.head._2.error else Errors.CLUSTER_AUTHORIZATION_FAILED)
   )
 
   val requestKeysToAcls = Map[ApiKeys, Map[Resource, Set[Acl]]](
@@ -199,7 +205,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     ApiKeys.TXN_OFFSET_COMMIT -> (groupReadAcl ++ transactionIdWriteAcl),
     ApiKeys.CREATE_ACLS -> clusterAlterAcl,
     ApiKeys.DESCRIBE_ACLS -> clusterDescribeAcl,
-    ApiKeys.DELETE_ACLS -> clusterAlterAcl
+    ApiKeys.DELETE_ACLS -> clusterAlterAcl,
+    ApiKeys.ALTER_REPLICA_DIR -> clusterAlterAcl,
+    ApiKeys.DESCRIBE_LOG_DIRS -> clusterDescribeAcl
   )
 
   @Before
@@ -340,6 +348,10 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       new ResourceFilter(AdminResourceType.TOPIC, null),
       new AccessControlEntryFilter("User:ANONYMOUS", "*", AclOperation.ANY, AclPermissionType.DENY)))).build()
 
+  private def alterReplicaDirRequest = new AlterReplicaDirRequest.Builder(Collections.singletonMap(tp, logDir)).build()
+
+  private def describeLogDirsRequest = new DescribeLogDirsRequest.Builder(Collections.singleton(tp)).build()
+
 
   @Test
   def testAuthorizationWithTopicExisting() {
@@ -366,7 +378,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       ApiKeys.ALTER_CONFIGS -> alterConfigsRequest,
       ApiKeys.CREATE_ACLS -> createAclsRequest,
       ApiKeys.DELETE_ACLS -> deleteAclsRequest,
-      ApiKeys.DESCRIBE_ACLS -> describeAclsRequest
+      ApiKeys.DESCRIBE_ACLS -> describeAclsRequest,
+      ApiKeys.ALTER_REPLICA_DIR -> alterReplicaDirRequest,
+      ApiKeys.DESCRIBE_LOG_DIRS -> describeLogDirsRequest
     )
 
     for ((key, request) <- requestKeyToRequest) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala b/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
index dd6c951..389cb8f 100644
--- a/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
+++ b/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
@@ -139,7 +139,7 @@ object ReplicationQuotasTestRig {
       val newAssignment = ReassignPartitionsCommand.generateAssignment(zkUtils, brokers, json(topicName), true)._1
 
       val start = System.currentTimeMillis()
-      ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment), Throttle(config.throttle))
+      ReassignPartitionsCommand.executeAssignment(zkUtils, None, ZkUtils.formatAsReassignmentJson(newAssignment), Throttle(config.throttle))
 
       //Await completion
       waitForReassignmentToComplete()

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 68e2c48..7f4eed7 100755
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -203,7 +203,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
     val newReplicas = Seq(0, 2, 3)
     val partitionToBeReassigned = 0
     val topicAndPartition = TopicAndPartition(topic, partitionToBeReassigned)
-    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, Map(topicAndPartition -> newReplicas))
+    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, None, Map(topicAndPartition -> newReplicas))
     assertTrue("Partition reassignment attempt failed for [test, 0]", reassignPartitionsCommand.reassignPartitions())
     // wait until reassignment is completed
     TestUtils.waitUntilTrue(() => {
@@ -233,7 +233,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
     val newReplicas = Seq(1, 2, 3)
     val partitionToBeReassigned = 0
     val topicAndPartition = TopicAndPartition(topic, partitionToBeReassigned)
-    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, Map(topicAndPartition -> newReplicas))
+    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, None, Map(topicAndPartition -> newReplicas))
     assertTrue("Partition reassignment failed for test, 0", reassignPartitionsCommand.reassignPartitions())
     // wait until reassignment is completed
     TestUtils.waitUntilTrue(() => {
@@ -262,7 +262,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
     val newReplicas = Seq(2, 3)
     val partitionToBeReassigned = 0
     val topicAndPartition = TopicAndPartition(topic, partitionToBeReassigned)
-    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, Map(topicAndPartition -> newReplicas))
+    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, None, Map(topicAndPartition -> newReplicas))
     assertTrue("Partition reassignment failed for test, 0", reassignPartitionsCommand.reassignPartitions())
     // wait until reassignment is completed
     TestUtils.waitUntilTrue(() => {
@@ -288,7 +288,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
     val newReplicas = Seq(2, 3)
     val partitionToBeReassigned = 0
     val topicAndPartition = TopicAndPartition(topic, partitionToBeReassigned)
-    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, Map(topicAndPartition -> newReplicas))
+    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, None, Map(topicAndPartition -> newReplicas))
     assertFalse("Partition reassignment failed for test, 0", reassignPartitionsCommand.reassignPartitions())
     val reassignedPartitions = zkUtils.getPartitionsBeingReassigned()
     assertFalse("Partition should not be reassigned", reassignedPartitions.contains(topicAndPartition))
@@ -305,7 +305,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
     val newReplicas = Seq(0, 1)
     val partitionToBeReassigned = 0
     val topicAndPartition = TopicAndPartition(topic, partitionToBeReassigned)
-    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, Map(topicAndPartition -> newReplicas))
+    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, None, Map(topicAndPartition -> newReplicas))
     reassignPartitionsCommand.reassignPartitions()
     // create brokers
     servers = TestUtils.createBrokerConfigs(2, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index 7df3693..5f76aa7 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -126,7 +126,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     // reassign partition 0
     val oldAssignedReplicas = zkUtils.getReplicasForPartition(topic, 0)
     val newReplicas = Seq(1, 2, 3)
-    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, Map(new TopicAndPartition(topicPartition) -> newReplicas))
+    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, None, Map(new TopicAndPartition(topicPartition) -> newReplicas))
     assertTrue("Partition reassignment should fail for [test,0]", reassignPartitionsCommand.reassignPartitions())
     // wait until reassignment is completed
     TestUtils.waitUntilTrue(() => {


Mime
View raw message