kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject [2/2] kafka git commit: KAFKA-5995; Rename AlterReplicaDir to AlterReplicaDirs
Date Tue, 03 Oct 2017 05:24:15 GMT
KAFKA-5995; Rename AlterReplicaDir to AlterReplicaDirs

Author: Dong Lin <lindong28@gmail.com>

Reviewers: Jun Rao <junrao@gmail.com>

Closes #3993 from lindong28/KAFKA-5995


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a1a5e93b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a1a5e93b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a1a5e93b

Branch: refs/heads/trunk
Commit: a1a5e93beb16e38cf997554a7819a0b92c6661e5
Parents: e110e1c
Author: Dong Lin <lindong28@gmail.com>
Authored: Mon Oct 2 22:24:09 2017 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Mon Oct 2 22:24:09 2017 -0700

----------------------------------------------------------------------
 .../apache/kafka/clients/admin/AdminClient.java |  24 +--
 .../clients/admin/AlterReplicaDirOptions.java   |  29 ----
 .../clients/admin/AlterReplicaDirResult.java    |  57 -------
 .../admin/AlterReplicaLogDirsOptions.java       |  29 ++++
 .../admin/AlterReplicaLogDirsResult.java        |  57 +++++++
 .../admin/DescribeReplicaLogDirOptions.java     |  31 ----
 .../admin/DescribeReplicaLogDirResult.java      | 132 ---------------
 .../admin/DescribeReplicaLogDirsOptions.java    |  31 ++++
 .../admin/DescribeReplicaLogDirsResult.java     | 132 +++++++++++++++
 .../kafka/clients/admin/KafkaAdminClient.java   |  26 +--
 .../apache/kafka/common/protocol/ApiKeys.java   |   8 +-
 .../kafka/common/requests/AbstractRequest.java  |   4 +-
 .../kafka/common/requests/AbstractResponse.java |   4 +-
 .../common/requests/AlterReplicaDirRequest.java | 165 -------------------
 .../requests/AlterReplicaDirResponse.java       | 135 ---------------
 .../requests/AlterReplicaLogDirsRequest.java    | 165 +++++++++++++++++++
 .../requests/AlterReplicaLogDirsResponse.java   | 135 +++++++++++++++
 .../requests/DescribeLogDirsResponse.java       |   2 +-
 .../kafka/admin/ReassignPartitionsCommand.scala |  14 +-
 .../src/main/scala/kafka/server/KafkaApis.scala |  12 +-
 .../scala/kafka/server/ReplicaManager.scala     |   4 +-
 .../kafka/api/AdminClientIntegrationTest.scala  |   8 +-
 .../kafka/api/AuthorizerIntegrationTest.scala   |  10 +-
 .../admin/ReassignPartitionsClusterTest.scala   |   6 +-
 .../server/AlterReplicaDirRequestTest.scala     |  83 ----------
 .../server/AlterReplicaLogDirsRequestTest.scala |  83 ++++++++++
 .../unit/kafka/server/RequestQuotaTest.scala    |   6 +-
 27 files changed, 696 insertions(+), 696 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a1a5e93b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
index 61d6db0..636317c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
@@ -373,16 +373,16 @@ public abstract class AdminClient implements AutoCloseable {
      * before the replica has been created on the broker. It will support moving replicas that have already been created after
      * KIP-113 is fully implemented.
      *
-     * This is a convenience method for #{@link AdminClient#alterReplicaDir(Map, AlterReplicaDirOptions)} with default options.
+     * This is a convenience method for {@link AdminClient#alterReplicaLogDirs(Map, AlterReplicaLogDirsOptions)} with default options.
      * See the overload for more details.
      *
      * This operation is supported by brokers with version 1.0.0 or higher.
      *
      * @param replicaAssignment  The replicas with their log directory absolute path
-     * @return                   The AlterReplicaDirResult
+     * @return                   The AlterReplicaLogDirsResult
      */
-    public AlterReplicaDirResult alterReplicaDir(Map<TopicPartitionReplica, String> replicaAssignment) {
-        return alterReplicaDir(replicaAssignment, new AlterReplicaDirOptions());
+    public AlterReplicaLogDirsResult alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment) {
+        return alterReplicaLogDirs(replicaAssignment, new AlterReplicaLogDirsOptions());
     }
 
     /**
@@ -396,9 +396,9 @@ public abstract class AdminClient implements AutoCloseable {
      *
      * @param replicaAssignment  The replicas with their log directory absolute path
      * @param options            The options to use when changing replica dir
-     * @return                   The AlterReplicaDirResult
+     * @return                   The AlterReplicaLogDirsResult
      */
-    public abstract AlterReplicaDirResult alterReplicaDir(Map<TopicPartitionReplica, String> replicaAssignment, AlterReplicaDirOptions options);
+    public abstract AlterReplicaLogDirsResult alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment, AlterReplicaLogDirsOptions options);
 
     /**
      * Query the information of all log directories on the given set of brokers
@@ -429,16 +429,16 @@ public abstract class AdminClient implements AutoCloseable {
     /**
      * Query the replica log directory information for the specified replicas.
      *
-     * This is a convenience method for #{@link AdminClient#describeReplicaLogDir(Collection, DescribeReplicaLogDirOptions)}
+     * This is a convenience method for {@link AdminClient#describeReplicaLogDirs(Collection, DescribeReplicaLogDirsOptions)}
      * with default options. See the overload for more details.
      *
      * This operation is supported by brokers with version 1.0.0 or higher.
      *
      * @param replicas      The replicas to query
-     * @return              The DescribeReplicaLogDirResult
+     * @return              The DescribeReplicaLogDirsResult
      */
-    public DescribeReplicaLogDirResult describeReplicaLogDir(Collection<TopicPartitionReplica> replicas) {
-        return describeReplicaLogDir(replicas, new DescribeReplicaLogDirOptions());
+    public DescribeReplicaLogDirsResult describeReplicaLogDirs(Collection<TopicPartitionReplica> replicas) {
+        return describeReplicaLogDirs(replicas, new DescribeReplicaLogDirsOptions());
     }
 
     /**
@@ -448,9 +448,9 @@ public abstract class AdminClient implements AutoCloseable {
      *
      * @param replicas      The replicas to query
      * @param options       The options to use when querying replica log dir info
-     * @return              The DescribeReplicaLogDirResult
+     * @return              The DescribeReplicaLogDirsResult
      */
-    public abstract DescribeReplicaLogDirResult describeReplicaLogDir(Collection<TopicPartitionReplica> replicas, DescribeReplicaLogDirOptions options);
+    public abstract DescribeReplicaLogDirsResult describeReplicaLogDirs(Collection<TopicPartitionReplica> replicas, DescribeReplicaLogDirsOptions options);
 
     /**
      * Increase the number of partitions of the topics given as the keys of {@code newPartitions}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1a5e93b/clients/src/main/java/org/apache/kafka/clients/admin/AlterReplicaDirOptions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AlterReplicaDirOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/AlterReplicaDirOptions.java
deleted file mode 100644
index 68d2ab6..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AlterReplicaDirOptions.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.clients.admin;
-
-import org.apache.kafka.common.annotation.InterfaceStability;
-import java.util.Map;
-
-/**
- * Options for {@link AdminClient#alterReplicaDir(Map, AlterReplicaDirOptions)}.
- */
-@InterfaceStability.Evolving
-public class AlterReplicaDirOptions extends AbstractOptions<AlterReplicaDirOptions> {
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1a5e93b/clients/src/main/java/org/apache/kafka/clients/admin/AlterReplicaDirResult.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AlterReplicaDirResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/AlterReplicaDirResult.java
deleted file mode 100644
index 55bf85b..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AlterReplicaDirResult.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.clients.admin;
-
-import org.apache.kafka.common.KafkaFuture;
-import org.apache.kafka.common.TopicPartitionReplica;
-import org.apache.kafka.common.annotation.InterfaceStability;
-import java.util.Map;
-
-
-/**
- * The result of {@link AdminClient#alterReplicaDir(Map, AlterReplicaDirOptions)}.
- */
-@InterfaceStability.Evolving
-public class AlterReplicaDirResult {
-    private final Map<TopicPartitionReplica, KafkaFuture<Void>> futures;
-
-    AlterReplicaDirResult(Map<TopicPartitionReplica, KafkaFuture<Void>> futures) {
-        this.futures = futures;
-    }
-
-    /**
-     *
-     * Return a map from replica to future which can be used to check the status of individual replica movement.
-     *
-     * Possible error code:
-     *
-     * LOG_DIR_NOT_FOUND (57)
-     * KAFKA_STORAGE_ERROR (56)
-     * REPLICA_NOT_AVAILABLE (9)
-     * UNKNOWN (-1)
-     */
-    public Map<TopicPartitionReplica, KafkaFuture<Void>> values() {
-        return futures;
-    }
-
-    /**
-     * Return a future which succeeds if all the replica movement have succeeded
-     */
-    public KafkaFuture<Void> all() {
-        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]));
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1a5e93b/clients/src/main/java/org/apache/kafka/clients/admin/AlterReplicaLogDirsOptions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AlterReplicaLogDirsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/AlterReplicaLogDirsOptions.java
new file mode 100644
index 0000000..d6892ef
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AlterReplicaLogDirsOptions.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+import java.util.Map;
+
+/**
+ * Options for {@link AdminClient#alterReplicaLogDirs(Map, AlterReplicaLogDirsOptions)}.
+ */
+@InterfaceStability.Evolving
+public class AlterReplicaLogDirsOptions extends AbstractOptions<AlterReplicaLogDirsOptions> {
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1a5e93b/clients/src/main/java/org/apache/kafka/clients/admin/AlterReplicaLogDirsResult.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AlterReplicaLogDirsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/AlterReplicaLogDirsResult.java
new file mode 100644
index 0000000..a3da216
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AlterReplicaLogDirsResult.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.TopicPartitionReplica;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import java.util.Map;
+
+
+/**
+ * The result of {@link AdminClient#alterReplicaLogDirs(Map, AlterReplicaLogDirsOptions)}.
+ */
+@InterfaceStability.Evolving
+public class AlterReplicaLogDirsResult {
+    private final Map<TopicPartitionReplica, KafkaFuture<Void>> futures;
+
+    AlterReplicaLogDirsResult(Map<TopicPartitionReplica, KafkaFuture<Void>> futures) {
+        this.futures = futures;
+    }
+
+    /**
+     *
+     * Return a map from replica to future which can be used to check the status of individual replica movement.
+     *
+     * Possible error code:
+     *
+     * LOG_DIR_NOT_FOUND (57)
+     * KAFKA_STORAGE_ERROR (56)
+     * REPLICA_NOT_AVAILABLE (9)
+     * UNKNOWN (-1)
+     */
+    public Map<TopicPartitionReplica, KafkaFuture<Void>> values() {
+        return futures;
+    }
+
+    /**
+     * Return a future which succeeds if all the replica movements have succeeded
+     */
+    public KafkaFuture<Void> all() {
+        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]));
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1a5e93b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirOptions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirOptions.java
deleted file mode 100644
index 72d9643..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirOptions.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.clients.admin;
-
-import org.apache.kafka.common.annotation.InterfaceStability;
-import java.util.Collection;
-
-/**
- * Options for {@link AdminClient#describeReplicaLogDir(Collection<org.apache.kafka.common.TopicPartitionReplica>)}.
- *
- * The API of this class is evolving, see {@link AdminClient} for details.
- */
-@InterfaceStability.Evolving
-public class DescribeReplicaLogDirOptions extends AbstractOptions<DescribeReplicaLogDirOptions> {
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1a5e93b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirResult.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirResult.java
deleted file mode 100644
index 6139cc7..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirResult.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.clients.admin;
-
-import org.apache.kafka.common.KafkaFuture;
-import org.apache.kafka.common.TopicPartitionReplica;
-import org.apache.kafka.common.annotation.InterfaceStability;
-import org.apache.kafka.common.requests.DescribeLogDirsResponse;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Collection;
-import java.util.concurrent.ExecutionException;
-
-
-/**
- * The result of {@link AdminClient#describeReplicaLogDir(Collection)}.
- *
- * The API of this class is evolving, see {@link AdminClient} for details.
- */
-@InterfaceStability.Evolving
-public class DescribeReplicaLogDirResult {
-    private final Map<TopicPartitionReplica, KafkaFuture<ReplicaLogDirInfo>> futures;
-
-    DescribeReplicaLogDirResult(Map<TopicPartitionReplica, KafkaFuture<ReplicaLogDirInfo>> futures) {
-        this.futures = futures;
-    }
-
-    /**
-     * Return a map from replica to future which can be used to check the log directory information of individual replicas
-     */
-    public Map<TopicPartitionReplica, KafkaFuture<ReplicaLogDirInfo>> values() {
-        return futures;
-    }
-
-    /**
-     * Return a future which succeeds if log directory information of all replicas are available
-     */
-    public KafkaFuture<Map<TopicPartitionReplica, ReplicaLogDirInfo>> all() {
-        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).
-            thenApply(new KafkaFuture.Function<Void, Map<TopicPartitionReplica, ReplicaLogDirInfo>>() {
-                @Override
-                public Map<TopicPartitionReplica, ReplicaLogDirInfo> apply(Void v) {
-                    Map<TopicPartitionReplica, ReplicaLogDirInfo> replicaLogDirInfos = new HashMap<>();
-                    for (Map.Entry<TopicPartitionReplica, KafkaFuture<ReplicaLogDirInfo>> entry : futures.entrySet()) {
-                        try {
-                            replicaLogDirInfos.put(entry.getKey(), entry.getValue().get());
-                        } catch (InterruptedException | ExecutionException e) {
-                            // This should be unreachable, because allOf ensured that all the futures completed successfully.
-                            throw new RuntimeException(e);
-                        }
-                    }
-                    return replicaLogDirInfos;
-                }
-            });
-    }
-
-    static public class ReplicaLogDirInfo {
-        // The current log directory of the replica of this partition on the given broker.
-        // Null if no replica is not found for this partition on the given broker.
-        private final String currentReplicaLogDir;
-        // Defined as max(HW of partition - LEO of the replica, 0).
-        private final long currentReplicaOffsetLag;
-        // The future log directory of the replica of this partition on the given broker.
-        // Null if the replica of this partition is not being moved to another log directory on the given broker.
-        private final String futureReplicaLogDir;
-        // The LEO of the replica - LEO of the future log of this replica in the destination log directory.
-        // -1 if either there is not replica for this partition or the replica of this partition is not being moved to another log directory on the given broker.
-        private final long futureReplicaOffsetLag;
-
-        ReplicaLogDirInfo() {
-            this(null, DescribeLogDirsResponse.INVALID_OFFSET_LAG, null, DescribeLogDirsResponse.INVALID_OFFSET_LAG);
-        }
-
-        ReplicaLogDirInfo(String currentReplicaLogDir,
-                          long currentReplicaOffsetLag,
-                          String futureReplicaLogDir,
-                          long futureReplicaOffsetLag) {
-            this.currentReplicaLogDir = currentReplicaLogDir;
-            this.currentReplicaOffsetLag = currentReplicaOffsetLag;
-            this.futureReplicaLogDir = futureReplicaLogDir;
-            this.futureReplicaOffsetLag = futureReplicaOffsetLag;
-        }
-
-        public String getCurrentReplicaLogDir() {
-            return currentReplicaLogDir;
-        }
-
-        public long getCurrentReplicaOffsetLag() {
-            return currentReplicaOffsetLag;
-        }
-
-        public String getFutureReplicaLogDir() {
-            return futureReplicaLogDir;
-        }
-
-        public long getFutureReplicaOffsetLag() {
-            return futureReplicaOffsetLag;
-        }
-
-        @Override
-        public String toString() {
-            StringBuilder builder = new StringBuilder();
-            if (futureReplicaLogDir != null) {
-                builder.append("(currentReplicaLogDir=")
-                    .append(currentReplicaLogDir)
-                    .append(", futureReplicaLogDir=")
-                    .append(futureReplicaLogDir)
-                    .append(", futureReplicaOffsetLag=")
-                    .append(futureReplicaOffsetLag)
-                    .append(")");
-            } else {
-                builder.append("ReplicaLogDirInfo(currentReplicaLogDir=").append(currentReplicaLogDir).append(")");
-            }
-            return builder.toString();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1a5e93b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirsOptions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirsOptions.java
new file mode 100644
index 0000000..943795c
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirsOptions.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+import java.util.Collection;
+
+/**
+ * Options for {@link AdminClient#describeReplicaLogDirs(Collection, DescribeReplicaLogDirsOptions)}.
+ *
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class DescribeReplicaLogDirsOptions extends AbstractOptions<DescribeReplicaLogDirsOptions> {
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1a5e93b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirsResult.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirsResult.java
new file mode 100644
index 0000000..401b4aa
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirsResult.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.TopicPartitionReplica;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.requests.DescribeLogDirsResponse;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Collection;
+import java.util.concurrent.ExecutionException;
+
+
+/**
+ * The result of {@link AdminClient#describeReplicaLogDirs(Collection)}.
+ *
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class DescribeReplicaLogDirsResult {
+    private final Map<TopicPartitionReplica, KafkaFuture<ReplicaLogDirInfo>> futures;
+
+    DescribeReplicaLogDirsResult(Map<TopicPartitionReplica, KafkaFuture<ReplicaLogDirInfo>> futures) {
+        this.futures = futures;
+    }
+
+    /**
+     * Return a map from replica to future which can be used to check the log directory information of individual replicas
+     */
+    public Map<TopicPartitionReplica, KafkaFuture<ReplicaLogDirInfo>> values() {
+        return futures;
+    }
+
+    /**
+     * Return a future which succeeds if log directory information of all replicas is available
+     */
+    public KafkaFuture<Map<TopicPartitionReplica, ReplicaLogDirInfo>> all() {
+        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).
+            thenApply(new KafkaFuture.Function<Void, Map<TopicPartitionReplica, ReplicaLogDirInfo>>() {
+                @Override
+                public Map<TopicPartitionReplica, ReplicaLogDirInfo> apply(Void v) {
+                    Map<TopicPartitionReplica, ReplicaLogDirInfo> replicaLogDirInfos = new HashMap<>();
+                    for (Map.Entry<TopicPartitionReplica, KafkaFuture<ReplicaLogDirInfo>> entry : futures.entrySet()) {
+                        try {
+                            replicaLogDirInfos.put(entry.getKey(), entry.getValue().get());
+                        } catch (InterruptedException | ExecutionException e) {
+                            // This should be unreachable, because allOf ensured that all the futures completed successfully.
+                            throw new RuntimeException(e);
+                        }
+                    }
+                    return replicaLogDirInfos;
+                }
+            });
+    }
+
+    static public class ReplicaLogDirInfo {
+        // The current log directory of the replica of this partition on the given broker.
+        // Null if no replica is found for this partition on the given broker.
+        private final String currentReplicaLogDir;
+        // Defined as max(HW of partition - LEO of the replica, 0).
+        private final long currentReplicaOffsetLag;
+        // The future log directory of the replica of this partition on the given broker.
+        // Null if the replica of this partition is not being moved to another log directory on the given broker.
+        private final String futureReplicaLogDir;
+        // The LEO of the replica - LEO of the future log of this replica in the destination log directory.
+        // -1 if either there is no replica for this partition or the replica of this partition is not being moved to another log directory on the given broker.
+        private final long futureReplicaOffsetLag;
+
+        ReplicaLogDirInfo() {
+            this(null, DescribeLogDirsResponse.INVALID_OFFSET_LAG, null, DescribeLogDirsResponse.INVALID_OFFSET_LAG);
+        }
+
+        ReplicaLogDirInfo(String currentReplicaLogDir,
+                          long currentReplicaOffsetLag,
+                          String futureReplicaLogDir,
+                          long futureReplicaOffsetLag) {
+            this.currentReplicaLogDir = currentReplicaLogDir;
+            this.currentReplicaOffsetLag = currentReplicaOffsetLag;
+            this.futureReplicaLogDir = futureReplicaLogDir;
+            this.futureReplicaOffsetLag = futureReplicaOffsetLag;
+        }
+
+        public String getCurrentReplicaLogDir() {
+            return currentReplicaLogDir;
+        }
+
+        public long getCurrentReplicaOffsetLag() {
+            return currentReplicaOffsetLag;
+        }
+
+        public String getFutureReplicaLogDir() {
+            return futureReplicaLogDir;
+        }
+
+        public long getFutureReplicaOffsetLag() {
+            return futureReplicaOffsetLag;
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder builder = new StringBuilder();
+            if (futureReplicaLogDir != null) {
+                builder.append("(currentReplicaLogDir=")
+                    .append(currentReplicaLogDir)
+                    .append(", futureReplicaLogDir=")
+                    .append(futureReplicaLogDir)
+                    .append(", futureReplicaOffsetLag=")
+                    .append(futureReplicaOffsetLag)
+                    .append(")");
+            } else {
+                builder.append("ReplicaLogDirInfo(currentReplicaLogDir=").append(currentReplicaLogDir).append(")");
+            }
+            return builder.toString();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1a5e93b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 5f37b8e..1a66371 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -27,7 +27,7 @@ import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResult;
 import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResults;
-import org.apache.kafka.clients.admin.DescribeReplicaLogDirResult.ReplicaLogDirInfo;
+import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult.ReplicaLogDirInfo;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.KafkaFuture;
@@ -64,8 +64,8 @@ import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.AbstractResponse;
 import org.apache.kafka.common.requests.AlterConfigsRequest;
 import org.apache.kafka.common.requests.AlterConfigsResponse;
-import org.apache.kafka.common.requests.AlterReplicaDirRequest;
-import org.apache.kafka.common.requests.AlterReplicaDirResponse;
+import org.apache.kafka.common.requests.AlterReplicaLogDirsRequest;
+import org.apache.kafka.common.requests.AlterReplicaLogDirsResponse;
 import org.apache.kafka.common.requests.CreatePartitionsRequest;
 import org.apache.kafka.common.requests.CreatePartitionsResponse;
 import org.apache.kafka.common.requests.ApiError;
@@ -1653,7 +1653,7 @@ public class KafkaAdminClient extends AdminClient {
     }
 
     @Override
-    public AlterReplicaDirResult alterReplicaDir(Map<TopicPartitionReplica, String> replicaAssignment, final AlterReplicaDirOptions options) {
+    public AlterReplicaLogDirsResult alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment, final AlterReplicaLogDirsOptions options) {
         final Map<TopicPartitionReplica, KafkaFutureImpl<Void>> futures = new HashMap<>(replicaAssignment.size());
 
         for (TopicPartitionReplica replica : replicaAssignment.keySet()) {
@@ -1677,17 +1677,17 @@ public class KafkaAdminClient extends AdminClient {
             final int brokerId = entry.getKey();
             final Map<TopicPartition, String> assignment = entry.getValue();
 
-            runnable.call(new Call("alterReplicaDir", calcDeadlineMs(now, options.timeoutMs()),
+            runnable.call(new Call("alterReplicaLogDirs", calcDeadlineMs(now, options.timeoutMs()),
                 new ConstantNodeIdProvider(brokerId)) {
 
                 @Override
                 public AbstractRequest.Builder createRequest(int timeoutMs) {
-                    return new AlterReplicaDirRequest.Builder(assignment);
+                    return new AlterReplicaLogDirsRequest.Builder(assignment);
                 }
 
                 @Override
                 public void handleResponse(AbstractResponse abstractResponse) {
-                    AlterReplicaDirResponse response = (AlterReplicaDirResponse) abstractResponse;
+                    AlterReplicaLogDirsResponse response = (AlterReplicaLogDirsResponse) abstractResponse;
                     for (Map.Entry<TopicPartition, Errors> responseEntry: response.responses().entrySet()) {
                         TopicPartition tp = responseEntry.getKey();
                         Errors error = responseEntry.getValue();
@@ -1710,7 +1710,7 @@ public class KafkaAdminClient extends AdminClient {
             }, now);
         }
 
-        return new AlterReplicaDirResult(new HashMap<TopicPartitionReplica, KafkaFuture<Void>>(futures));
+        return new AlterReplicaLogDirsResult(new HashMap<TopicPartitionReplica, KafkaFuture<Void>>(futures));
     }
 
     @Override
@@ -1754,11 +1754,11 @@ public class KafkaAdminClient extends AdminClient {
     }
 
     @Override
-    public DescribeReplicaLogDirResult describeReplicaLogDir(Collection<TopicPartitionReplica> replicas, DescribeReplicaLogDirOptions options) {
-        final Map<TopicPartitionReplica, KafkaFutureImpl<DescribeReplicaLogDirResult.ReplicaLogDirInfo>> futures = new HashMap<>(replicas.size());
+    public DescribeReplicaLogDirsResult describeReplicaLogDirs(Collection<TopicPartitionReplica> replicas, DescribeReplicaLogDirsOptions options) {
+        final Map<TopicPartitionReplica, KafkaFutureImpl<DescribeReplicaLogDirsResult.ReplicaLogDirInfo>> futures = new HashMap<>(replicas.size());
 
         for (TopicPartitionReplica replica : replicas) {
-            futures.put(replica, new KafkaFutureImpl<DescribeReplicaLogDirResult.ReplicaLogDirInfo>());
+            futures.put(replica, new KafkaFutureImpl<DescribeReplicaLogDirsResult.ReplicaLogDirInfo>());
         }
 
         Map<Integer, Set<TopicPartition>> partitionsByBroker = new HashMap<>();
@@ -1777,7 +1777,7 @@ public class KafkaAdminClient extends AdminClient {
             for (TopicPartition topicPartition: topicPartitions)
                 replicaDirInfoByPartition.put(topicPartition, new ReplicaLogDirInfo());
 
-            runnable.call(new Call("describeReplicaLogDir", calcDeadlineMs(now, options.timeoutMs()),
+            runnable.call(new Call("describeReplicaLogDirs", calcDeadlineMs(now, options.timeoutMs()),
                 new ConstantNodeIdProvider(brokerId)) {
 
                 @Override
@@ -1834,7 +1834,7 @@ public class KafkaAdminClient extends AdminClient {
             }, now);
         }
 
-        return new DescribeReplicaLogDirResult(new HashMap<TopicPartitionReplica, KafkaFuture<ReplicaLogDirInfo>>(futures));
+        return new DescribeReplicaLogDirsResult(new HashMap<TopicPartitionReplica, KafkaFuture<ReplicaLogDirInfo>>(futures));
     }
 
     public CreatePartitionsResult createPartitions(Map<String, NewPartitions> newPartitions,

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1a5e93b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index d094134..cf1bff5 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -27,8 +27,8 @@ import org.apache.kafka.common.requests.AddPartitionsToTxnRequest;
 import org.apache.kafka.common.requests.AddPartitionsToTxnResponse;
 import org.apache.kafka.common.requests.AlterConfigsRequest;
 import org.apache.kafka.common.requests.AlterConfigsResponse;
-import org.apache.kafka.common.requests.AlterReplicaDirRequest;
-import org.apache.kafka.common.requests.AlterReplicaDirResponse;
+import org.apache.kafka.common.requests.AlterReplicaLogDirsRequest;
+import org.apache.kafka.common.requests.AlterReplicaLogDirsResponse;
 import org.apache.kafka.common.requests.ApiVersionsRequest;
 import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.apache.kafka.common.requests.ControlledShutdownRequest;
@@ -164,8 +164,8 @@ public enum ApiKeys {
             DescribeConfigsResponse.schemaVersions()),
     ALTER_CONFIGS(33, "AlterConfigs", AlterConfigsRequest.schemaVersions(),
             AlterConfigsResponse.schemaVersions()),
-    ALTER_REPLICA_DIR(34, "AlterReplicaDir", AlterReplicaDirRequest.schemaVersions(),
-            AlterReplicaDirResponse.schemaVersions()),
+    ALTER_REPLICA_LOG_DIRS(34, "AlterReplicaLogDirs", AlterReplicaLogDirsRequest.schemaVersions(),
+            AlterReplicaLogDirsResponse.schemaVersions()),
     DESCRIBE_LOG_DIRS(35, "DescribeLogDirs", DescribeLogDirsRequest.schemaVersions(),
             DescribeLogDirsResponse.schemaVersions()),
     SASL_AUTHENTICATE(36, "SaslAuthenticate", SaslAuthenticateRequest.schemaVersions(),

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1a5e93b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index e6dd6da..5a1c4f4 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -206,8 +206,8 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
                 return new DescribeConfigsRequest(struct, apiVersion);
             case ALTER_CONFIGS:
                 return new AlterConfigsRequest(struct, apiVersion);
-            case ALTER_REPLICA_DIR:
-                return new AlterReplicaDirRequest(struct, apiVersion);
+            case ALTER_REPLICA_LOG_DIRS:
+                return new AlterReplicaLogDirsRequest(struct, apiVersion);
             case DESCRIBE_LOG_DIRS:
                 return new DescribeLogDirsRequest(struct, apiVersion);
             case SASL_AUTHENTICATE:

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1a5e93b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index 12fe3c8..6294af4 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -138,8 +138,8 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
                 return new DescribeConfigsResponse(struct);
             case ALTER_CONFIGS:
                 return new AlterConfigsResponse(struct);
-            case ALTER_REPLICA_DIR:
-                return new AlterReplicaDirResponse(struct);
+            case ALTER_REPLICA_LOG_DIRS:
+                return new AlterReplicaLogDirsResponse(struct);
             case DESCRIBE_LOG_DIRS:
                 return new DescribeLogDirsResponse(struct);
             case SASL_AUTHENTICATE:

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1a5e93b/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirRequest.java
deleted file mode 100644
index 7e58fd6..0000000
--- a/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirRequest.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.common.requests;
-
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.ArrayOf;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
-import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.utils.CollectionUtils;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
-import static org.apache.kafka.common.protocol.types.Type.INT32;
-import static org.apache.kafka.common.protocol.types.Type.STRING;
-
-public class AlterReplicaDirRequest extends AbstractRequest {
-
-    // request level key names
-    private static final String LOG_DIRS_KEY_NAME = "log_dirs";
-
-    // log dir level key names
-    private static final String LOG_DIR_KEY_NAME = "log_dir";
-    private static final String TOPICS_KEY_NAME = "topics";
-
-    // topic level key names
-    private static final String PARTITIONS_KEY_NAME = "partitions";
-
-    private static final Schema ALTER_REPLICA_DIR_REQUEST_V0 = new Schema(
-            new Field("log_dirs", new ArrayOf(new Schema(
-                    new Field("log_dir", STRING, "The absolute log directory path."),
-                    new Field("topics", new ArrayOf(new Schema(
-                            TOPIC_NAME,
-                            new Field("partitions", new ArrayOf(INT32), "List of partition ids of the topic."))))))));
-
-    public static Schema[] schemaVersions() {
-        return new Schema[]{ALTER_REPLICA_DIR_REQUEST_V0};
-    }
-
-    private final Map<TopicPartition, String> partitionDirs;
-
-    public static class Builder extends AbstractRequest.Builder<AlterReplicaDirRequest> {
-        private final Map<TopicPartition, String> partitionDirs;
-
-        public Builder(Map<TopicPartition, String> partitionDirs) {
-            super(ApiKeys.ALTER_REPLICA_DIR);
-            this.partitionDirs = partitionDirs;
-        }
-
-        @Override
-        public AlterReplicaDirRequest build(short version) {
-            return new AlterReplicaDirRequest(partitionDirs, version);
-        }
-
-        @Override
-        public String toString() {
-            StringBuilder builder = new StringBuilder();
-            builder.append("(type=AlterReplicaDirRequest")
-                .append(", partitionDirs=")
-                .append(partitionDirs)
-                .append(")");
-            return builder.toString();
-        }
-    }
-
-    public AlterReplicaDirRequest(Struct struct, short version) {
-        super(version);
-        partitionDirs = new HashMap<>();
-        for (Object logDirStructObj : struct.getArray(LOG_DIRS_KEY_NAME)) {
-            Struct logDirStruct = (Struct) logDirStructObj;
-            String logDir = logDirStruct.getString(LOG_DIR_KEY_NAME);
-            for (Object topicStructObj : logDirStruct.getArray(TOPICS_KEY_NAME)) {
-                Struct topicStruct = (Struct) topicStructObj;
-                String topic = topicStruct.get(TOPIC_NAME);
-                for (Object partitionObj : topicStruct.getArray(PARTITIONS_KEY_NAME)) {
-                    int partition = (Integer) partitionObj;
-                    partitionDirs.put(new TopicPartition(topic, partition), logDir);
-                }
-            }
-        }
-    }
-
-    public AlterReplicaDirRequest(Map<TopicPartition, String> partitionDirs, short version) {
-        super(version);
-        this.partitionDirs = partitionDirs;
-    }
-
-    @Override
-    protected Struct toStruct() {
-        Map<String, List<TopicPartition>> dirPartitions = new HashMap<>();
-        for (Map.Entry<TopicPartition, String> entry: partitionDirs.entrySet()) {
-            if (!dirPartitions.containsKey(entry.getValue()))
-                dirPartitions.put(entry.getValue(), new ArrayList<TopicPartition>());
-            dirPartitions.get(entry.getValue()).add(entry.getKey());
-        }
-
-        Struct struct = new Struct(ApiKeys.ALTER_REPLICA_DIR.requestSchema(version()));
-        List<Struct> logDirStructArray = new ArrayList<>();
-        for (Map.Entry<String, List<TopicPartition>> logDirEntry: dirPartitions.entrySet()) {
-            Struct logDirStruct = struct.instance(LOG_DIRS_KEY_NAME);
-            logDirStruct.set(LOG_DIR_KEY_NAME, logDirEntry.getKey());
-
-            List<Struct> topicStructArray = new ArrayList<>();
-            for (Map.Entry<String, List<Integer>> topicEntry: CollectionUtils.groupDataByTopic(logDirEntry.getValue()).entrySet()) {
-                Struct topicStruct = logDirStruct.instance(TOPICS_KEY_NAME);
-                topicStruct.set(TOPIC_NAME, topicEntry.getKey());
-                topicStruct.set(PARTITIONS_KEY_NAME, topicEntry.getValue().toArray());
-                topicStructArray.add(topicStruct);
-            }
-            logDirStruct.set(TOPICS_KEY_NAME, topicStructArray.toArray());
-            logDirStructArray.add(logDirStruct);
-        }
-        struct.set(LOG_DIRS_KEY_NAME, logDirStructArray.toArray());
-        return struct;
-    }
-
-    @Override
-    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
-        Map<TopicPartition, Errors> responseMap = new HashMap<>();
-
-        for (Map.Entry<TopicPartition, String> entry : partitionDirs.entrySet()) {
-            responseMap.put(entry.getKey(), Errors.forException(e));
-        }
-
-        short versionId = version();
-        switch (versionId) {
-            case 0:
-                return new AlterReplicaDirResponse(throttleTimeMs, responseMap);
-            default:
-                throw new IllegalArgumentException(
-                    String.format("Version %d is not valid. Valid versions for %s are 0 to %d", versionId,
-                        this.getClass().getSimpleName(), ApiKeys.ALTER_REPLICA_DIR.latestVersion()));
-        }
-    }
-
-    public Map<TopicPartition, String> partitionDirs() {
-        return partitionDirs;
-    }
-
-    public static AlterReplicaDirRequest parse(ByteBuffer buffer, short version) {
-        return new AlterReplicaDirRequest(ApiKeys.ALTER_REPLICA_DIR.parseRequest(version, buffer), version);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1a5e93b/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirResponse.java
deleted file mode 100644
index b875104..0000000
--- a/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirResponse.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.common.requests;
-
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.ArrayOf;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
-import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.utils.CollectionUtils;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
-import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
-import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
-import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
-
-
-public class AlterReplicaDirResponse extends AbstractResponse {
-
-    // request level key names
-    private static final String TOPICS_KEY_NAME = "topics";
-
-    // topic level key names
-    private static final String PARTITIONS_KEY_NAME = "partitions";
-
-    private static final Schema ALTER_REPLICA_DIR_RESPONSE_V0 = new Schema(
-            THROTTLE_TIME_MS,
-            new Field(TOPICS_KEY_NAME, new ArrayOf(new Schema(
-                    TOPIC_NAME,
-                    new Field(PARTITIONS_KEY_NAME, new ArrayOf(new Schema(
-                            PARTITION_ID,
-                            ERROR_CODE)))))));
-
-    public static Schema[] schemaVersions() {
-        return new Schema[]{ALTER_REPLICA_DIR_RESPONSE_V0};
-    }
-
-    /**
-     * Possible error code:
-     *
-     * LOG_DIR_NOT_FOUND (57)
-     * KAFKA_STORAGE_ERROR (56)
-     * REPLICA_NOT_AVAILABLE (9)
-     * UNKNOWN (-1)
-     */
-    private final Map<TopicPartition, Errors> responses;
-    private final int throttleTimeMs;
-
-    public AlterReplicaDirResponse(Struct struct) {
-        throttleTimeMs = struct.get(THROTTLE_TIME_MS);
-        responses = new HashMap<>();
-        for (Object topicStructObj : struct.getArray(TOPICS_KEY_NAME)) {
-            Struct topicStruct = (Struct) topicStructObj;
-            String topic = topicStruct.get(TOPIC_NAME);
-            for (Object partitionStructObj : topicStruct.getArray(PARTITIONS_KEY_NAME)) {
-                Struct partitionStruct = (Struct) partitionStructObj;
-                int partition = partitionStruct.get(PARTITION_ID);
-                Errors error = Errors.forCode(partitionStruct.get(ERROR_CODE));
-                responses.put(new TopicPartition(topic, partition), error);
-            }
-        }
-    }
-
-    /**
-     * Constructor for version 0.
-     */
-    public AlterReplicaDirResponse(int throttleTimeMs, Map<TopicPartition, Errors> responses) {
-        this.throttleTimeMs = throttleTimeMs;
-        this.responses = responses;
-    }
-
-    @Override
-    protected Struct toStruct(short version) {
-        Struct struct = new Struct(ApiKeys.ALTER_REPLICA_DIR.responseSchema(version));
-        struct.set(THROTTLE_TIME_MS, throttleTimeMs);
-        Map<String, Map<Integer, Errors>> responsesByTopic = CollectionUtils.groupDataByTopic(responses);
-        List<Struct> topicStructArray = new ArrayList<>();
-        for (Map.Entry<String, Map<Integer, Errors>> responsesByTopicEntry : responsesByTopic.entrySet()) {
-            Struct topicStruct = struct.instance(TOPICS_KEY_NAME);
-            topicStruct.set(TOPIC_NAME, responsesByTopicEntry.getKey());
-            List<Struct> partitionStructArray = new ArrayList<>();
-            for (Map.Entry<Integer, Errors> responsesByPartitionEntry : responsesByTopicEntry.getValue().entrySet()) {
-                Struct partitionStruct = topicStruct.instance(PARTITIONS_KEY_NAME);
-                Errors response = responsesByPartitionEntry.getValue();
-                partitionStruct.set(PARTITION_ID, responsesByPartitionEntry.getKey());
-                partitionStruct.set(ERROR_CODE, response.code());
-                partitionStructArray.add(partitionStruct);
-            }
-            topicStruct.set(PARTITIONS_KEY_NAME, partitionStructArray.toArray());
-            topicStructArray.add(topicStruct);
-        }
-        struct.set(TOPICS_KEY_NAME, topicStructArray.toArray());
-        return struct;
-    }
-
-    public int throttleTimeMs() {
-        return throttleTimeMs;
-    }
-
-    public Map<TopicPartition, Errors> responses() {
-        return this.responses;
-    }
-
-    @Override
-    public Map<Errors, Integer> errorCounts() {
-        return errorCounts(responses);
-    }
-
-    public static AlterReplicaDirResponse parse(ByteBuffer buffer, short version) {
-        return new AlterReplicaDirResponse(ApiKeys.ALTER_REPLICA_DIR.responseSchema(version).read(buffer));
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1a5e93b/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaLogDirsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaLogDirsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaLogDirsRequest.java
new file mode 100644
index 0000000..ba21759
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaLogDirsRequest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.CollectionUtils;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
+import static org.apache.kafka.common.protocol.types.Type.INT32;
+import static org.apache.kafka.common.protocol.types.Type.STRING;
+
+public class AlterReplicaLogDirsRequest extends AbstractRequest {
+
+    // request level key names
+    private static final String LOG_DIRS_KEY_NAME = "log_dirs";
+
+    // log dir level key names
+    private static final String LOG_DIR_KEY_NAME = "log_dir";
+    private static final String TOPICS_KEY_NAME = "topics";
+
+    // topic level key names
+    private static final String PARTITIONS_KEY_NAME = "partitions";
+
+    private static final Schema ALTER_REPLICA_LOG_DIRS_REQUEST_V0 = new Schema(
+            new Field("log_dirs", new ArrayOf(new Schema(
+                    new Field("log_dir", STRING, "The absolute log directory path."),
+                    new Field("topics", new ArrayOf(new Schema(
+                            TOPIC_NAME,
+                            new Field("partitions", new ArrayOf(INT32), "List of partition ids of the topic."))))))));
+
+    public static Schema[] schemaVersions() {
+        return new Schema[]{ALTER_REPLICA_LOG_DIRS_REQUEST_V0};
+    }
+
+    private final Map<TopicPartition, String> partitionDirs;
+
+    public static class Builder extends AbstractRequest.Builder<AlterReplicaLogDirsRequest> {
+        private final Map<TopicPartition, String> partitionDirs;
+
+        public Builder(Map<TopicPartition, String> partitionDirs) {
+            super(ApiKeys.ALTER_REPLICA_LOG_DIRS);
+            this.partitionDirs = partitionDirs;
+        }
+
+        @Override
+        public AlterReplicaLogDirsRequest build(short version) {
+            return new AlterReplicaLogDirsRequest(partitionDirs, version);
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder builder = new StringBuilder();
+            builder.append("(type=AlterReplicaLogDirsRequest")
+                .append(", partitionDirs=")
+                .append(partitionDirs)
+                .append(")");
+            return builder.toString();
+        }
+    }
+
+    public AlterReplicaLogDirsRequest(Struct struct, short version) {
+        super(version);
+        partitionDirs = new HashMap<>();
+        for (Object logDirStructObj : struct.getArray(LOG_DIRS_KEY_NAME)) {
+            Struct logDirStruct = (Struct) logDirStructObj;
+            String logDir = logDirStruct.getString(LOG_DIR_KEY_NAME);
+            for (Object topicStructObj : logDirStruct.getArray(TOPICS_KEY_NAME)) {
+                Struct topicStruct = (Struct) topicStructObj;
+                String topic = topicStruct.get(TOPIC_NAME);
+                for (Object partitionObj : topicStruct.getArray(PARTITIONS_KEY_NAME)) {
+                    int partition = (Integer) partitionObj;
+                    partitionDirs.put(new TopicPartition(topic, partition), logDir);
+                }
+            }
+        }
+    }
+
+    public AlterReplicaLogDirsRequest(Map<TopicPartition, String> partitionDirs, short version) {
+        super(version);
+        this.partitionDirs = partitionDirs;
+    }
+
+    @Override
+    protected Struct toStruct() {
+        Map<String, List<TopicPartition>> dirPartitions = new HashMap<>();
+        for (Map.Entry<TopicPartition, String> entry: partitionDirs.entrySet()) {
+            if (!dirPartitions.containsKey(entry.getValue()))
+                dirPartitions.put(entry.getValue(), new ArrayList<TopicPartition>());
+            dirPartitions.get(entry.getValue()).add(entry.getKey());
+        }
+
+        Struct struct = new Struct(ApiKeys.ALTER_REPLICA_LOG_DIRS.requestSchema(version()));
+        List<Struct> logDirStructArray = new ArrayList<>();
+        for (Map.Entry<String, List<TopicPartition>> logDirEntry: dirPartitions.entrySet()) {
+            Struct logDirStruct = struct.instance(LOG_DIRS_KEY_NAME);
+            logDirStruct.set(LOG_DIR_KEY_NAME, logDirEntry.getKey());
+
+            List<Struct> topicStructArray = new ArrayList<>();
+            for (Map.Entry<String, List<Integer>> topicEntry: CollectionUtils.groupDataByTopic(logDirEntry.getValue()).entrySet()) {
+                Struct topicStruct = logDirStruct.instance(TOPICS_KEY_NAME);
+                topicStruct.set(TOPIC_NAME, topicEntry.getKey());
+                topicStruct.set(PARTITIONS_KEY_NAME, topicEntry.getValue().toArray());
+                topicStructArray.add(topicStruct);
+            }
+            logDirStruct.set(TOPICS_KEY_NAME, topicStructArray.toArray());
+            logDirStructArray.add(logDirStruct);
+        }
+        struct.set(LOG_DIRS_KEY_NAME, logDirStructArray.toArray());
+        return struct;
+    }
+
+    @Override
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+        Map<TopicPartition, Errors> responseMap = new HashMap<>();
+
+        for (Map.Entry<TopicPartition, String> entry : partitionDirs.entrySet()) {
+            responseMap.put(entry.getKey(), Errors.forException(e));
+        }
+
+        short versionId = version();
+        switch (versionId) {
+            case 0:
+                return new AlterReplicaLogDirsResponse(throttleTimeMs, responseMap);
+            default:
+                throw new IllegalArgumentException(
+                    String.format("Version %d is not valid. Valid versions for %s are 0 to %d", versionId,
+                        this.getClass().getSimpleName(), ApiKeys.ALTER_REPLICA_LOG_DIRS.latestVersion()));
+        }
+    }
+
+    public Map<TopicPartition, String> partitionDirs() {
+        return partitionDirs;
+    }
+
+    public static AlterReplicaLogDirsRequest parse(ByteBuffer buffer, short version) {
+        return new AlterReplicaLogDirsRequest(ApiKeys.ALTER_REPLICA_LOG_DIRS.parseRequest(version, buffer), version);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1a5e93b/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaLogDirsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaLogDirsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaLogDirsResponse.java
new file mode 100644
index 0000000..f8d1546
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaLogDirsResponse.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.CollectionUtils;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
+import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
+import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
+import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
+
+
+public class AlterReplicaLogDirsResponse extends AbstractResponse {
+
+    // response level key names
+    private static final String TOPICS_KEY_NAME = "topics";
+
+    // topic level key names
+    private static final String PARTITIONS_KEY_NAME = "partitions";
+
+    private static final Schema ALTER_REPLICA_LOG_DIRS_RESPONSE_V0 = new Schema( // v0 layout: throttle time, then topics -> partitions -> (partition id, error code)
+            THROTTLE_TIME_MS,
+            new Field(TOPICS_KEY_NAME, new ArrayOf(new Schema(
+                    TOPIC_NAME,
+                    new Field(PARTITIONS_KEY_NAME, new ArrayOf(new Schema(
+                            PARTITION_ID,
+                            ERROR_CODE)))))));
+
+    public static Schema[] schemaVersions() {
+        return new Schema[]{ALTER_REPLICA_LOG_DIRS_RESPONSE_V0}; // only v0 is defined in this class
+    }
+
+    /**
+     * Per-partition result. Possible error codes (the KafkaApis handler may also return CLUSTER_AUTHORIZATION_FAILED):
+     *
+     * LOG_DIR_NOT_FOUND (57)
+     * KAFKA_STORAGE_ERROR (56)
+     * REPLICA_NOT_AVAILABLE (9)
+     * UNKNOWN (-1)
+     */
+    private final Map<TopicPartition, Errors> responses;
+    private final int throttleTimeMs;
+
+    public AlterReplicaLogDirsResponse(Struct struct) { // rebuilds the throttle time and the per-partition error map from a struct in the v0 layout
+        throttleTimeMs = struct.get(THROTTLE_TIME_MS);
+        responses = new HashMap<>();
+        for (Object topicStructObj : struct.getArray(TOPICS_KEY_NAME)) {
+            Struct topicStruct = (Struct) topicStructObj;
+            String topic = topicStruct.get(TOPIC_NAME);
+            for (Object partitionStructObj : topicStruct.getArray(PARTITIONS_KEY_NAME)) {
+                Struct partitionStruct = (Struct) partitionStructObj;
+                int partition = partitionStruct.get(PARTITION_ID);
+                Errors error = Errors.forCode(partitionStruct.get(ERROR_CODE)); // stored error code -> Errors value
+                responses.put(new TopicPartition(topic, partition), error); // flatten (topic, partition) into a single map key
+            }
+        }
+    }
+
+    /**
+     * Constructor for version 0. The given map is stored as-is; no copy is made.
+     */
+    public AlterReplicaLogDirsResponse(int throttleTimeMs, Map<TopicPartition, Errors> responses) {
+        this.throttleTimeMs = throttleTimeMs;
+        this.responses = responses;
+    }
+
+    @Override
+    protected Struct toStruct(short version) { // inverse of the Struct constructor: writes throttle time and the nested topics/partitions arrays
+        Struct struct = new Struct(ApiKeys.ALTER_REPLICA_LOG_DIRS.responseSchema(version));
+        struct.set(THROTTLE_TIME_MS, throttleTimeMs);
+        Map<String, Map<Integer, Errors>> responsesByTopic = CollectionUtils.groupDataByTopic(responses); // regroup as topic -> (partition -> error) to match the nested schema
+        List<Struct> topicStructArray = new ArrayList<>();
+        for (Map.Entry<String, Map<Integer, Errors>> responsesByTopicEntry : responsesByTopic.entrySet()) {
+            Struct topicStruct = struct.instance(TOPICS_KEY_NAME);
+            topicStruct.set(TOPIC_NAME, responsesByTopicEntry.getKey());
+            List<Struct> partitionStructArray = new ArrayList<>();
+            for (Map.Entry<Integer, Errors> responsesByPartitionEntry : responsesByTopicEntry.getValue().entrySet()) {
+                Struct partitionStruct = topicStruct.instance(PARTITIONS_KEY_NAME);
+                Errors response = responsesByPartitionEntry.getValue();
+                partitionStruct.set(PARTITION_ID, responsesByPartitionEntry.getKey());
+                partitionStruct.set(ERROR_CODE, response.code()); // only the error's code is written to the struct
+                partitionStructArray.add(partitionStruct);
+            }
+            topicStruct.set(PARTITIONS_KEY_NAME, partitionStructArray.toArray());
+            topicStructArray.add(topicStruct);
+        }
+        struct.set(TOPICS_KEY_NAME, topicStructArray.toArray());
+        return struct;
+    }
+
+    public int throttleTimeMs() {
+        return throttleTimeMs;
+    }
+
+    public Map<TopicPartition, Errors> responses() {
+        return this.responses; // the internal map itself, not a copy
+    }
+
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        return errorCounts(responses); // delegates to an errorCounts(Map) overload, presumably inherited from AbstractResponse - confirm
+    }
+
+    public static AlterReplicaLogDirsResponse parse(ByteBuffer buffer, short version) {
+        return new AlterReplicaLogDirsResponse(ApiKeys.ALTER_REPLICA_LOG_DIRS.responseSchema(version).read(buffer)); // read a struct with the schema for this version, then decode it
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1a5e93b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java
index 6613dfe..a242240 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java
@@ -74,7 +74,7 @@ public class DescribeLogDirsResponse extends AbstractResponse {
                                             "(if it is the current log for the partition) or current replica's LEO " +
                                             "(if it is the future log for the partition)"),
                                     new Field(IS_FUTURE_KEY_NAME, BOOLEAN, "True if this log is created by " +
-                                            "AlterReplicaDirRequest and will replace the current log of the replica " +
+                                            "AlterReplicaLogDirsRequest and will replace the current log of the replica " +
                                             "in the future.")))))))))));
 
     public static Schema[] schemaVersions() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1a5e93b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index 5dbcfcf..af81697 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -30,10 +30,10 @@ import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.TopicPartitionReplica
 import org.apache.kafka.common.errors.{LogDirNotFoundException, ReplicaNotAvailableException}
-import org.apache.kafka.clients.admin.{AdminClientConfig, AlterReplicaDirOptions, AdminClient => JAdminClient}
+import org.apache.kafka.clients.admin.{AdminClientConfig, AlterReplicaLogDirsOptions, DescribeReplicaLogDirsResult, AdminClient => JAdminClient}
 import LogConfig._
 import joptsimple.OptionParser
-import org.apache.kafka.clients.admin.DescribeReplicaLogDirResult.ReplicaLogDirInfo
+import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult.ReplicaLogDirInfo
 
 object ReassignPartitionsCommand extends Logging {
 
@@ -320,7 +320,7 @@ object ReassignPartitionsCommand extends Logging {
       if (replicaAssignment.nonEmpty) {
         val adminClient = adminClientOpt.getOrElse(
           throw new AdminCommandFailedException("bootstrap-server needs to be provided in order to reassign replica to the specified log directory"))
-        adminClient.describeReplicaLogDir(replicaAssignment.keySet.asJava).all().get().asScala
+        adminClient.describeReplicaLogDirs(replicaAssignment.keySet.asJava).all().get().asScala
       } else {
         Map.empty[TopicPartitionReplica, ReplicaLogDirInfo]
       }
@@ -551,14 +551,14 @@ class ReassignPartitionsCommand(zkUtils: ZkUtils,
       if (validPartitions.isEmpty) false
       else {
         if (proposedReplicaAssignment.nonEmpty) {
-          // Send AlterReplicaDirRequest to allow broker to create replica in the right log dir later if the replica
+          // Send AlterReplicaLogDirsRequest to allow broker to create replica in the right log dir later if the replica
           // has not been created it. This allows us to rebalance load across log directories in the cluster even if
           // we can not move replicas between log directories on the same broker. We will be able to move replicas
           // between log directories on the same broker after KIP-113 is implemented.
           val adminClient = adminClientOpt.getOrElse(
             throw new AdminCommandFailedException("bootstrap-server needs to be provided in order to reassign replica to the specified log directory"))
-          val alterReplicaDirResult = adminClient.alterReplicaDir(
-            proposedReplicaAssignment.asJava, new AlterReplicaDirOptions().timeoutMs(timeoutMs.toInt))
+          val alterReplicaDirResult = adminClient.alterReplicaLogDirs(
+            proposedReplicaAssignment.asJava, new AlterReplicaLogDirsOptions().timeoutMs(timeoutMs.toInt))
           alterReplicaDirResult.values().asScala.foreach { case (replica, future) => {
               try {
                 /*
@@ -568,7 +568,7 @@ class ReassignPartitionsCommand(zkUtils: ZkUtils,
                  * for this replica.
                  *
                  * After KIP-113 is fully implemented, we will not need to verify that the broker returns this ReplicaNotAvailableException
-                 * in this step. And after the reassignment znode is created, we will need to re-send AlterReplicaDirRequest to broker
+                 * in this step. And after the reassignment znode is created, we will need to re-send AlterReplicaLogDirsRequest to broker
                  * if broker returns ReplicaNotAvailableException for any replica in the request.
                  */
                 future.get()

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1a5e93b/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 6e02164..13959b8 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -130,7 +130,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.DELETE_ACLS => handleDeleteAcls(request)
         case ApiKeys.ALTER_CONFIGS => handleAlterConfigsRequest(request)
         case ApiKeys.DESCRIBE_CONFIGS => handleDescribeConfigsRequest(request)
-        case ApiKeys.ALTER_REPLICA_DIR => handleAlterReplicaDirRequest(request)
+        case ApiKeys.ALTER_REPLICA_LOG_DIRS => handleAlterReplicaLogDirsRequest(request)
         case ApiKeys.DESCRIBE_LOG_DIRS => handleDescribeLogDirsRequest(request)
         case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request)
         case ApiKeys.CREATE_PARTITIONS => handleCreatePartitionsRequest(request)
@@ -1966,15 +1966,15 @@ class KafkaApis(val requestChannel: RequestChannel,
       new DescribeConfigsResponse(requestThrottleMs, (authorizedConfigs ++ unauthorizedConfigs).asJava))
   }
 
-  def handleAlterReplicaDirRequest(request: RequestChannel.Request): Unit = {
-    val alterReplicaDirRequest = request.body[AlterReplicaDirRequest]
+  def handleAlterReplicaLogDirsRequest(request: RequestChannel.Request): Unit = {
+    val alterReplicaDirsRequest = request.body[AlterReplicaLogDirsRequest]
     val responseMap = {
       if (authorize(request.session, Alter, Resource.ClusterResource))
-        replicaManager.alterReplicaDir(alterReplicaDirRequest.partitionDirs.asScala)
+        replicaManager.alterReplicaLogDirs(alterReplicaDirsRequest.partitionDirs.asScala)
       else
-        alterReplicaDirRequest.partitionDirs.asScala.keys.map((_, Errors.CLUSTER_AUTHORIZATION_FAILED)).toMap
+        alterReplicaDirsRequest.partitionDirs.asScala.keys.map((_, Errors.CLUSTER_AUTHORIZATION_FAILED)).toMap
     }
-    sendResponseMaybeThrottle(request, requestThrottleMs => new AlterReplicaDirResponse(requestThrottleMs, responseMap.asJava))
+    sendResponseMaybeThrottle(request, requestThrottleMs => new AlterReplicaLogDirsResponse(requestThrottleMs, responseMap.asJava))
   }
 
   def handleDescribeLogDirsRequest(request: RequestChannel.Request): Unit = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1a5e93b/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index b5a93b0..a361e16 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -569,14 +569,14 @@ class ReplicaManager(val config: KafkaConfig,
    * that are already created to the user-specified log directory after KIP-113 is fully implemented
    *
    */
-  def alterReplicaDir(partitionDirs: Map[TopicPartition, String]): Map[TopicPartition, Errors] = {
+  def alterReplicaLogDirs(partitionDirs: Map[TopicPartition, String]): Map[TopicPartition, Errors] = {
     partitionDirs.map { case (topicPartition, destinationDir) =>
       try {
         if (!logManager.isLogDirOnline(destinationDir))
           throw new KafkaStorageException(s"Log directory $destinationDir is offline")
 
         // If the log for this partition has not been created yet:
-        // 1) Respond with ReplicaNotAvailableException for this partition in the AlterReplicaDirResponse
+        // 1) Respond with ReplicaNotAvailableException for this partition in the AlterReplicaLogDirsResponse
         // 2) Record the destination log directory in the memory so that the partition will be created in this log directory
         //    when broker receives LeaderAndIsrRequest for this partition later.
         getReplica(topicPartition) match {

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1a5e93b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index da818d6..e916efa 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -245,13 +245,13 @@ class AdminClientIntegrationTest extends KafkaServerTestHarness with Logging {
   }
 
   @Test
-  def testDescribeReplicaLogDir(): Unit = {
+  def testDescribeReplicaLogDirs(): Unit = {
     client = AdminClient.create(createConfig())
     val topic = "topic"
     val leaderByPartition = TestUtils.createTopic(zkUtils, topic, 10, 1, servers, new Properties())
     val replicas = leaderByPartition.map { case (partition, brokerId) => new TopicPartitionReplica(topic, partition, brokerId) }.toSeq
 
-    val replicaDirInfos = client.describeReplicaLogDir(replicas.asJavaCollection).all.get
+    val replicaDirInfos = client.describeReplicaLogDirs(replicas.asJavaCollection).all.get
     replicaDirInfos.asScala.foreach { case (topicPartitionReplica, replicaDirInfo) =>
       val server = servers.find(_.config.brokerId == topicPartitionReplica.brokerId()).get
       val tp = new TopicPartition(topicPartitionReplica.topic(), topicPartitionReplica.partition())
@@ -262,7 +262,7 @@ class AdminClientIntegrationTest extends KafkaServerTestHarness with Logging {
   }
 
   @Test
-  def testAlterReplicaLogDirBeforeTopicCreation(): Unit = {
+  def testAlterReplicaLogDirsBeforeTopicCreation(): Unit = {
     val adminClient = AdminClient.create(createConfig())
     val topic = "topic"
     val tp = new TopicPartition(topic, 0)
@@ -272,7 +272,7 @@ class AdminClientIntegrationTest extends KafkaServerTestHarness with Logging {
       new TopicPartitionReplica(topic, 0, server.config.brokerId) -> logDir
     }.toMap
 
-    adminClient.alterReplicaDir(replicaAssignment.asJava, new AlterReplicaDirOptions()).values().asScala.values.foreach { future =>
+    adminClient.alterReplicaLogDirs(replicaAssignment.asJava, new AlterReplicaLogDirsOptions()).values().asScala.values.foreach { future =>
       try {
         future.get()
         fail("Future should fail with ReplicaNotAvailableException")

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1a5e93b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 013315a..18a73b9 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -142,7 +142,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       ApiKeys.CREATE_ACLS -> classOf[CreateAclsResponse],
       ApiKeys.DELETE_ACLS -> classOf[DeleteAclsResponse],
       ApiKeys.DESCRIBE_ACLS -> classOf[DescribeAclsResponse],
-      ApiKeys.ALTER_REPLICA_DIR -> classOf[AlterReplicaDirResponse],
+      ApiKeys.ALTER_REPLICA_LOG_DIRS -> classOf[AlterReplicaLogDirsResponse],
       ApiKeys.DESCRIBE_LOG_DIRS -> classOf[DescribeLogDirsResponse]
   )
 
@@ -180,7 +180,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     ApiKeys.CREATE_ACLS -> ((resp: CreateAclsResponse) => resp.aclCreationResponses.asScala.head.error.error),
     ApiKeys.DESCRIBE_ACLS -> ((resp: DescribeAclsResponse) => resp.error.error),
     ApiKeys.DELETE_ACLS -> ((resp: DeleteAclsResponse) => resp.responses.asScala.head.error.error),
-    ApiKeys.ALTER_REPLICA_DIR -> ((resp: AlterReplicaDirResponse) => resp.responses.get(tp)),
+    ApiKeys.ALTER_REPLICA_LOG_DIRS -> ((resp: AlterReplicaLogDirsResponse) => resp.responses.get(tp)),
     ApiKeys.DESCRIBE_LOG_DIRS -> ((resp: DescribeLogDirsResponse) =>
       if (resp.logDirInfos.size() > 0) resp.logDirInfos.asScala.head._2.error else Errors.CLUSTER_AUTHORIZATION_FAILED)
   )
@@ -217,7 +217,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     ApiKeys.CREATE_ACLS -> clusterAlterAcl,
     ApiKeys.DESCRIBE_ACLS -> clusterDescribeAcl,
     ApiKeys.DELETE_ACLS -> clusterAlterAcl,
-    ApiKeys.ALTER_REPLICA_DIR -> clusterAlterAcl,
+    ApiKeys.ALTER_REPLICA_LOG_DIRS -> clusterAlterAcl,
     ApiKeys.DESCRIBE_LOG_DIRS -> clusterDescribeAcl
   )
 
@@ -366,7 +366,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       new ResourceFilter(AdminResourceType.TOPIC, null),
       new AccessControlEntryFilter("User:ANONYMOUS", "*", AclOperation.ANY, AclPermissionType.DENY)))).build()
 
-  private def alterReplicaDirRequest = new AlterReplicaDirRequest.Builder(Collections.singletonMap(tp, logDir)).build()
+  private def alterReplicaLogDirsRequest = new AlterReplicaLogDirsRequest.Builder(Collections.singletonMap(tp, logDir)).build()
 
   private def describeLogDirsRequest = new DescribeLogDirsRequest.Builder(Collections.singleton(tp)).build()
 
@@ -399,7 +399,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       ApiKeys.CREATE_ACLS -> createAclsRequest,
       ApiKeys.DELETE_ACLS -> deleteAclsRequest,
       ApiKeys.DESCRIBE_ACLS -> describeAclsRequest,
-      ApiKeys.ALTER_REPLICA_DIR -> alterReplicaDirRequest,
+      ApiKeys.ALTER_REPLICA_LOG_DIRS -> alterReplicaLogDirsRequest,
       ApiKeys.DESCRIBE_LOG_DIRS -> describeLogDirsRequest
     )
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1a5e93b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
index ce16971..7002e84 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
@@ -97,7 +97,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
     assertEquals(Seq(101), zkUtils.getPartitionAssignmentForTopics(Seq(topicName)).get(topicName).get(partition))
     // The replica should be in the expected log directory on broker 101
     val replica = new TopicPartitionReplica(topicName, 0, 101)
-    assertEquals(expectedLogDir, adminClient.describeReplicaLogDir(Collections.singleton(replica)).all().get.get(replica).getCurrentReplicaLogDir)
+    assertEquals(expectedLogDir, adminClient.describeReplicaLogDirs(Collections.singleton(replica)).all().get.get(replica).getCurrentReplicaLogDir)
   }
 
   @Test
@@ -128,7 +128,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
     val actual = zkUtils.getPartitionAssignmentForTopics(Seq(topicName))(topicName)
     assertEquals(Seq(100, 101, 102), actual.values.flatten.toSeq.distinct.sorted)
     // The replica should be in the expected log directory on broker 102
-    assertEquals(expectedLogDir, adminClient.describeReplicaLogDir(Collections.singleton(replica)).all().get.get(replica).getCurrentReplicaLogDir)
+    assertEquals(expectedLogDir, adminClient.describeReplicaLogDirs(Collections.singleton(replica)).all().get.get(replica).getCurrentReplicaLogDir)
   }
 
   @Test
@@ -199,7 +199,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
     assertEquals(Seq(100, 102), actual("topic2")(2))//changed
 
     // The replicas should be in the expected log directories
-    val replicaDirs = adminClient.describeReplicaLogDir(List(replica1, replica2).asJavaCollection).all().get()
+    val replicaDirs = adminClient.describeReplicaLogDirs(List(replica1, replica2).asJavaCollection).all().get()
     assertEquals(proposedReplicaAssignment(replica1), replicaDirs.get(replica1).getCurrentReplicaLogDir)
     assertEquals(proposedReplicaAssignment(replica2), replicaDirs.get(replica2).getCurrentReplicaLogDir)
   }


Mime
View raw message