kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cmcc...@apache.org
Subject [kafka] branch trunk updated: KAFKA-8345: KIP-455 Protocol changes (part 1) (#7114)
Date Mon, 29 Jul 2019 21:36:29 GMT
This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 81900d0  KAFKA-8345: KIP-455 Protocol changes (part 1) (#7114)
81900d0 is described below

commit 81900d0ba0a5839a1e5dc876897bab1c24b3bd94
Author: Stanislav Kozlovski <familyguyuser192@windowslive.com>
AuthorDate: Mon Jul 29 22:35:55 2019 +0100

    KAFKA-8345: KIP-455 Protocol changes (part 1) (#7114)
    
    Add a new exception, NoReassignmentInProgressException.  Modify LeaderAndIsrRequest to include the AddingRepicas and RemovingReplicas fields.  Add the ListPartitionReassignments and AlterPartitionReassignments RPCs.
    
    Reviewers: Colin P. McCabe <cmccabe@apache.org>, Viktor Somogyi <viktorsomogyi@gmail.com>
---
 .../errors/NoReassignmentInProgressException.java  |  31 ++++++
 .../org/apache/kafka/common/protocol/ApiKeys.java  |  10 +-
 .../org/apache/kafka/common/protocol/Errors.java   |   5 +-
 .../kafka/common/requests/AbstractRequest.java     |   4 +
 .../kafka/common/requests/AbstractResponse.java    |   4 +
 .../AlterPartitionReassignmentsRequest.java        | 114 +++++++++++++++++++++
 .../AlterPartitionReassignmentsResponse.java       |  84 +++++++++++++++
 .../kafka/common/requests/LeaderAndIsrRequest.java |  86 +++++++++++++++-
 .../common/requests/LeaderAndIsrResponse.java      |   4 +-
 .../ListPartitionReassignmentsRequest.java         | 104 +++++++++++++++++++
 .../ListPartitionReassignmentsResponse.java        |  75 ++++++++++++++
 .../AlterPartitionReassignmentsRequest.json        |  37 +++++++
 .../AlterPartitionReassignmentsResponse.json       |  43 ++++++++
 .../common/message/LeaderAndIsrRequest.json        |   8 +-
 .../common/message/LeaderAndIsrResponse.json       |   4 +-
 .../message/ListPartitionReassignmentsRequest.json |  32 ++++++
 .../ListPartitionReassignmentsResponse.json        |  45 ++++++++
 .../apache/kafka/common/message/MessageTest.java   |  43 +++++++-
 .../kafka/common/requests/RequestResponseTest.java |  67 ++++++++++++
 core/src/main/scala/kafka/server/KafkaApis.scala   |  30 ++++++
 .../kafka/api/AuthorizerIntegrationTest.scala      |  38 ++++++-
 .../scala/unit/kafka/server/RequestQuotaTest.scala |  14 +++
 22 files changed, 867 insertions(+), 15 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/errors/NoReassignmentInProgressException.java b/clients/src/main/java/org/apache/kafka/common/errors/NoReassignmentInProgressException.java
new file mode 100644
index 0000000..9fd8a73
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/NoReassignmentInProgressException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.errors;
+
+/**
+ * Thrown if a reassignment cannot be cancelled because none is in progress.
+ */
+public class NoReassignmentInProgressException extends ApiException {
+    public NoReassignmentInProgressException(String message) {
+        super(message);
+    }
+
+    public NoReassignmentInProgressException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 171a3cd..18d8fd6 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -44,6 +44,10 @@ import org.apache.kafka.common.message.LeaveGroupRequestData;
 import org.apache.kafka.common.message.LeaveGroupResponseData;
 import org.apache.kafka.common.message.ListGroupsRequestData;
 import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
+import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
+import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
+import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
 import org.apache.kafka.common.message.MetadataRequestData;
 import org.apache.kafka.common.message.MetadataResponseData;
 import org.apache.kafka.common.message.OffsetCommitRequestData;
@@ -193,7 +197,11 @@ public enum ApiKeys {
     ELECT_LEADERS(43, "ElectLeaders", ElectLeadersRequestData.SCHEMAS,
             ElectLeadersResponseData.SCHEMAS),
     INCREMENTAL_ALTER_CONFIGS(44, "IncrementalAlterConfigs", IncrementalAlterConfigsRequestData.SCHEMAS,
-                              IncrementalAlterConfigsResponseData.SCHEMAS);
+                              IncrementalAlterConfigsResponseData.SCHEMAS),
+    ALTER_PARTITION_REASSIGNMENTS(45, "AlterPartitionReassignments", AlterPartitionReassignmentsRequestData.SCHEMAS,
+                                  AlterPartitionReassignmentsResponseData.SCHEMAS),
+    LIST_PARTITION_REASSIGNMENTS(46, "ListPartitionReassignments", ListPartitionReassignmentsRequestData.SCHEMAS,
+                                 ListPartitionReassignmentsResponseData.SCHEMAS);
 
     private static final ApiKeys[] ID_TO_TYPE;
     private static final int MIN_API_KEY = 0;
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 89bc051..9f11fc8 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -65,6 +65,7 @@ import org.apache.kafka.common.errors.MemberIdRequiredException;
 import org.apache.kafka.common.errors.ElectionNotNeededException;
 import org.apache.kafka.common.errors.EligibleLeadersNotAvailableException;
 import org.apache.kafka.common.errors.NetworkException;
+import org.apache.kafka.common.errors.NoReassignmentInProgressException;
 import org.apache.kafka.common.errors.NotControllerException;
 import org.apache.kafka.common.errors.NotCoordinatorException;
 import org.apache.kafka.common.errors.NotEnoughReplicasAfterAppendException;
@@ -309,7 +310,9 @@ public enum Errors {
             FencedInstanceIdException::new),
     ELIGIBLE_LEADERS_NOT_AVAILABLE(83, "Eligible topic partition leaders are not available",
             EligibleLeadersNotAvailableException::new),
-    ELECTION_NOT_NEEDED(84, "Leader election not needed for topic partition", ElectionNotNeededException::new);
+    ELECTION_NOT_NEEDED(84, "Leader election not needed for topic partition", ElectionNotNeededException::new),
+    NO_REASSIGNMENT_IN_PROGRESS(85, "No partition reassignment is in progress.",
+            NoReassignmentInProgressException::new);
 
     private static final Logger log = LoggerFactory.getLogger(Errors.class);
 
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index c8ff90d..58bf128 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -233,6 +233,10 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
                 return new ElectLeadersRequest(struct, apiVersion);
             case INCREMENTAL_ALTER_CONFIGS:
                 return new IncrementalAlterConfigsRequest(struct, apiVersion);
+            case ALTER_PARTITION_REASSIGNMENTS:
+                return new AlterPartitionReassignmentsRequest(struct, apiVersion);
+            case LIST_PARTITION_REASSIGNMENTS:
+                return new ListPartitionReassignmentsRequest(struct, apiVersion);
             default:
                 throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " +
                         "code should be updated to do so.", apiKey));
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index 9eddf66..2e433e8 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -160,6 +160,10 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
                 return new ElectLeadersResponse(struct, version);
             case INCREMENTAL_ALTER_CONFIGS:
                 return new IncrementalAlterConfigsResponse(struct, version);
+            case ALTER_PARTITION_REASSIGNMENTS:
+                return new AlterPartitionReassignmentsResponse(struct, version);
+            case LIST_PARTITION_REASSIGNMENTS:
+                return new ListPartitionReassignmentsResponse(struct, version);
             default:
                 throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " +
                         "code should be updated to do so.", apiKey));
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionReassignmentsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionReassignmentsRequest.java
new file mode 100644
index 0000000..7b2f848
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionReassignmentsRequest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
+import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData.ReassignableTopic;
+import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
+import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse;
+import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignableTopicResponse;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class AlterPartitionReassignmentsRequest extends AbstractRequest {
+
+    public static class Builder extends AbstractRequest.Builder<AlterPartitionReassignmentsRequest> {
+        private final AlterPartitionReassignmentsRequestData data;
+
+        public Builder(AlterPartitionReassignmentsRequestData data) {
+            super(ApiKeys.ALTER_PARTITION_REASSIGNMENTS);
+            this.data = data;
+        }
+
+        @Override
+        public AlterPartitionReassignmentsRequest build(short version) {
+            return new AlterPartitionReassignmentsRequest(data, version);
+        }
+
+        @Override
+        public String toString() {
+            return data.toString();
+        }
+    }
+
+    private final AlterPartitionReassignmentsRequestData data;
+    private final short version;
+
+    private AlterPartitionReassignmentsRequest(AlterPartitionReassignmentsRequestData data, short version) {
+        super(ApiKeys.ALTER_PARTITION_REASSIGNMENTS, version);
+        this.data = data;
+        this.version = version;
+    }
+
+    AlterPartitionReassignmentsRequest(Struct struct, short version) {
+        super(ApiKeys.ALTER_PARTITION_REASSIGNMENTS, version);
+        this.data = new AlterPartitionReassignmentsRequestData(struct, version);
+        this.version = version;
+    }
+
+    public static AlterPartitionReassignmentsRequest parse(ByteBuffer buffer, short version) {
+        return new AlterPartitionReassignmentsRequest(
+                ApiKeys.ALTER_PARTITION_REASSIGNMENTS.parseRequest(version, buffer),
+                version
+        );
+    }
+
+    public AlterPartitionReassignmentsRequestData data() {
+        return data;
+    }
+
+    /**
+     * Visible for testing.
+     */
+    @Override
+    public Struct toStruct() {
+        return data.toStruct(version);
+    }
+
+    @Override
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+        ApiError apiError = ApiError.fromThrowable(e);
+        List<ReassignableTopicResponse> topicResponses = new ArrayList<>();
+
+        for (ReassignableTopic topic : data.topics()) {
+            List<ReassignablePartitionResponse> partitionResponses = topic.partitions().stream().map(partition ->
+                    new ReassignablePartitionResponse()
+                            .setPartitionIndex(partition.partitionIndex())
+                            .setErrorCode(apiError.error().code())
+                            .setErrorMessage(apiError.message())
+            ).collect(Collectors.toList());
+            topicResponses.add(
+                    new ReassignableTopicResponse()
+                            .setName(topic.name())
+                            .setPartitions(partitionResponses)
+            );
+        }
+
+        AlterPartitionReassignmentsResponseData responseData = new AlterPartitionReassignmentsResponseData()
+                .setResponses(topicResponses)
+                .setErrorCode(apiError.error().code())
+                .setErrorMessage(apiError.message())
+                .setThrottleTimeMs(throttleTimeMs);
+        return new AlterPartitionReassignmentsResponse(responseData);
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionReassignmentsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionReassignmentsResponse.java
new file mode 100644
index 0000000..db1cfab
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionReassignmentsResponse.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
+import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignableTopicResponse;
+import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+public class AlterPartitionReassignmentsResponse extends AbstractResponse {
+
+    private final AlterPartitionReassignmentsResponseData data;
+
+    public AlterPartitionReassignmentsResponse(Struct struct) {
+        this(struct, ApiKeys.ALTER_PARTITION_REASSIGNMENTS.latestVersion());
+    }
+
+    AlterPartitionReassignmentsResponse(AlterPartitionReassignmentsResponseData data) {
+        this.data = data;
+    }
+
+    AlterPartitionReassignmentsResponse(Struct struct, short version) {
+        this.data = new AlterPartitionReassignmentsResponseData(struct, version);
+    }
+
+    public static AlterPartitionReassignmentsResponse parse(ByteBuffer buffer, short version) {
+        return new AlterPartitionReassignmentsResponse(ApiKeys.ALTER_PARTITION_REASSIGNMENTS.responseSchema(version).read(buffer), version);
+    }
+
+    public AlterPartitionReassignmentsResponseData data() {
+        return data;
+    }
+
+    @Override
+    public boolean shouldClientThrottle(short version) {
+        return true;
+    }
+
+    @Override
+    public int throttleTimeMs() {
+        return data.throttleTimeMs();
+    }
+
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        Map<Errors, Integer> counts = new HashMap<>();
+        Errors topLevelErr = Errors.forCode(data.errorCode());
+        counts.put(topLevelErr, counts.getOrDefault(topLevelErr, 0) + 1);
+
+        for (ReassignableTopicResponse topicResponse : data.responses()) {
+            for (ReassignablePartitionResponse partitionResponse : topicResponse.partitions()) {
+                Errors error = Errors.forCode(partitionResponse.errorCode());
+                counts.put(error, counts.getOrDefault(error, 0) + 1);
+            }
+        }
+        return counts;
+    }
+
+    @Override
+    protected Struct toStruct(short version) {
+        return data.toStruct(version);
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
index 230c807..02c14e3 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
@@ -33,6 +33,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.Collections;
 
 import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
 import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
@@ -49,6 +50,10 @@ public class LeaderAndIsrRequest extends AbstractControlRequest {
     private static final Field.Array ISR = new Field.Array("isr", INT32, "The in sync replica ids.");
     private static final Field.Int32 ZK_VERSION = new Field.Int32("zk_version", "The ZK version.");
     private static final Field.Array REPLICAS = new Field.Array("replicas", INT32, "The replica ids.");
+    private static final Field.Array ADDING_REPLICAS = new Field.Array("adding_replicas", INT32,
+            "The replica ids we are in the process of adding to the replica set during a reassignment.");
+    private static final Field.Array REMOVING_REPLICAS = new Field.Array("removing_replicas", INT32,
+            "The replica ids we are in the process of removing from the replica set during a reassignment.");
     private static final Field.Bool IS_NEW = new Field.Bool("is_new", "Whether the replica should have existed on the broker or not");
 
     // live_leaders fields
@@ -89,11 +94,28 @@ public class LeaderAndIsrRequest extends AbstractControlRequest {
             REPLICAS,
             IS_NEW);
 
+    private static final Field PARTITION_STATES_V3 = PARTITION_STATES.withFields(
+            PARTITION_ID,
+            CONTROLLER_EPOCH,
+            LEADER,
+            LEADER_EPOCH,
+            ISR,
+            ZK_VERSION,
+            REPLICAS,
+            ADDING_REPLICAS,
+            REMOVING_REPLICAS,
+            IS_NEW);
+
     // TOPIC_STATES_V2 normalizes TOPIC_STATES_V1 to make it more memory efficient
     private static final Field TOPIC_STATES_V2 = TOPIC_STATES.withFields(
             TOPIC_NAME,
             PARTITION_STATES_V2);
 
+    // TOPIC_STATES_V3 adds two new fields - adding_replicas and removing_replicas
+    private static final Field TOPIC_STATES_V3 = TOPIC_STATES.withFields(
+            TOPIC_NAME,
+            PARTITION_STATES_V3);
+
     private static final Field LIVE_LEADERS_V0 = LIVE_LEADERS.withFields(
             END_POINT_ID,
             HOST,
@@ -122,8 +144,17 @@ public class LeaderAndIsrRequest extends AbstractControlRequest {
             TOPIC_STATES_V2,
             LIVE_LEADERS_V0);
 
+    // LEADER_AND_ISR_REQUEST_V3 added two new fields - adding_replicas and removing_replicas.
+    // These fields respectively specify the replica IDs we want to add or remove as part of a reassignment
+    private static final Schema LEADER_AND_ISR_REQUEST_V3 = new Schema(
+            CONTROLLER_ID,
+            CONTROLLER_EPOCH,
+            BROKER_EPOCH,
+            TOPIC_STATES_V3,
+            LIVE_LEADERS_V0);
+
     public static Schema[] schemaVersions() {
-        return new Schema[]{LEADER_AND_ISR_REQUEST_V0, LEADER_AND_ISR_REQUEST_V1, LEADER_AND_ISR_REQUEST_V2};
+        return new Schema[]{LEADER_AND_ISR_REQUEST_V0, LEADER_AND_ISR_REQUEST_V1, LEADER_AND_ISR_REQUEST_V2, LEADER_AND_ISR_REQUEST_V3};
     }
 
     public static class Builder extends AbstractControlRequest.Builder<LeaderAndIsrRequest> {
@@ -223,7 +254,7 @@ public class LeaderAndIsrRequest extends AbstractControlRequest {
                 for (Map.Entry<Integer, PartitionState> partitionEntry : partitionMap.entrySet()) {
                     Struct partitionStateData = topicStateData.instance(PARTITION_STATES);
                     partitionStateData.set(PARTITION_ID, partitionEntry.getKey());
-                    partitionEntry.getValue().setStruct(partitionStateData);
+                    partitionEntry.getValue().setStruct(partitionStateData, version);
                     partitionStatesData.add(partitionStateData);
                 }
                 topicStateData.set(PARTITION_STATES, partitionStatesData.toArray());
@@ -237,7 +268,7 @@ public class LeaderAndIsrRequest extends AbstractControlRequest {
                 TopicPartition topicPartition = entry.getKey();
                 partitionStateData.set(TOPIC_NAME, topicPartition.topic());
                 partitionStateData.set(PARTITION_ID, topicPartition.partition());
-                entry.getValue().setStruct(partitionStateData);
+                entry.getValue().setStruct(partitionStateData, version);
                 partitionStatesData.add(partitionStateData);
             }
             struct.set(PARTITION_STATES, partitionStatesData.toArray());
@@ -269,6 +300,7 @@ public class LeaderAndIsrRequest extends AbstractControlRequest {
             case 0:
             case 1:
             case 2:
+            case 3:
                 return new LeaderAndIsrResponse(error, responses);
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
@@ -298,6 +330,8 @@ public class LeaderAndIsrRequest extends AbstractControlRequest {
 
     public static final class PartitionState {
         public final BasePartitionState basePartitionState;
+        public final List<Integer> addingReplicas;
+        public final List<Integer> removingReplicas;
         public final boolean isNew;
 
         public PartitionState(int controllerEpoch,
@@ -307,7 +341,29 @@ public class LeaderAndIsrRequest extends AbstractControlRequest {
                               int zkVersion,
                               List<Integer> replicas,
                               boolean isNew) {
+            this(controllerEpoch,
+                 leader,
+                 leaderEpoch,
+                 isr,
+                 zkVersion,
+                 replicas,
+                 Collections.emptyList(),
+                 Collections.emptyList(),
+                 isNew);
+        }
+
+        public PartitionState(int controllerEpoch,
+                              int leader,
+                              int leaderEpoch,
+                              List<Integer> isr,
+                              int zkVersion,
+                              List<Integer> replicas,
+                              List<Integer> addingReplicas,
+                              List<Integer> removingReplicas,
+                              boolean isNew) {
             this.basePartitionState = new BasePartitionState(controllerEpoch, leader, leaderEpoch, isr, zkVersion, replicas);
+            this.addingReplicas = addingReplicas;
+            this.removingReplicas = removingReplicas;
             this.isNew = isNew;
         }
 
@@ -329,6 +385,21 @@ public class LeaderAndIsrRequest extends AbstractControlRequest {
                 replicas.add((Integer) r);
 
             this.basePartitionState = new BasePartitionState(controllerEpoch, leader, leaderEpoch, isr, zkVersion, replicas);
+
+            List<Integer> addingReplicas = new ArrayList<>();
+            if (struct.hasField(ADDING_REPLICAS)) {
+                for (Object r : struct.get(ADDING_REPLICAS))
+                    addingReplicas.add((Integer) r);
+            }
+            this.addingReplicas = addingReplicas;
+
+            List<Integer> removingReplicas = new ArrayList<>();
+            if (struct.hasField(REMOVING_REPLICAS)) {
+                for (Object r : struct.get(REMOVING_REPLICAS))
+                    removingReplicas.add((Integer) r);
+            }
+            this.removingReplicas = removingReplicas;
+
             this.isNew = struct.getOrElse(IS_NEW, false);
         }
 
@@ -340,18 +411,23 @@ public class LeaderAndIsrRequest extends AbstractControlRequest {
                 ", isr=" + Utils.join(basePartitionState.isr, ",") +
                 ", zkVersion=" + basePartitionState.zkVersion +
                 ", replicas=" + Utils.join(basePartitionState.replicas, ",") +
+                ", addingReplicas=" + Utils.join(addingReplicas, ",") +
+                ", removingReplicas=" + Utils.join(removingReplicas, ",") +
                 ", isNew=" + isNew + ")";
         }
 
-        private void setStruct(Struct struct) {
+        private void setStruct(Struct struct, short version) {
             struct.set(CONTROLLER_EPOCH, basePartitionState.controllerEpoch);
             struct.set(LEADER, basePartitionState.leader);
             struct.set(LEADER_EPOCH, basePartitionState.leaderEpoch);
             struct.set(ISR, basePartitionState.isr.toArray());
             struct.set(ZK_VERSION, basePartitionState.zkVersion);
             struct.set(REPLICAS, basePartitionState.replicas.toArray());
+            if (version >= 3) {
+                struct.set(ADDING_REPLICAS, addingReplicas.toArray());
+                struct.set(REMOVING_REPLICAS, removingReplicas.toArray());
+            }
             struct.setIfExists(IS_NEW, isNew);
         }
     }
-
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
index 3ab9bf7..3b80222 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
@@ -50,8 +50,10 @@ public class LeaderAndIsrResponse extends AbstractResponse {
 
     private static final Schema LEADER_AND_ISR_RESPONSE_V2 = LEADER_AND_ISR_RESPONSE_V1;
 
+    private static final Schema LEADER_AND_ISR_RESPONSE_V3 = LEADER_AND_ISR_RESPONSE_V2;
+
     public static Schema[] schemaVersions() {
-        return new Schema[]{LEADER_AND_ISR_RESPONSE_V0, LEADER_AND_ISR_RESPONSE_V1, LEADER_AND_ISR_RESPONSE_V2};
+        return new Schema[]{LEADER_AND_ISR_RESPONSE_V0, LEADER_AND_ISR_RESPONSE_V1, LEADER_AND_ISR_RESPONSE_V2, LEADER_AND_ISR_RESPONSE_V3};
     }
 
     /**
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListPartitionReassignmentsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListPartitionReassignmentsRequest.java
new file mode 100644
index 0000000..471147b
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListPartitionReassignmentsRequest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
+import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
+import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingPartitionReassignment;
+import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingTopicReassignment;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.stream.Collectors;
+
+
+public class ListPartitionReassignmentsRequest extends AbstractRequest {
+
+    public static class Builder extends AbstractRequest.Builder<ListPartitionReassignmentsRequest> {
+        private final ListPartitionReassignmentsRequestData data;
+
+        public Builder(ListPartitionReassignmentsRequestData data) {
+            super(ApiKeys.LIST_PARTITION_REASSIGNMENTS);
+            this.data = data;
+        }
+
+        @Override
+        public ListPartitionReassignmentsRequest build(short version) {
+            return new ListPartitionReassignmentsRequest(data, version);
+        }
+
+        @Override
+        public String toString() {
+            return data.toString();
+        }
+    }
+
+    private ListPartitionReassignmentsRequestData data;
+    private final short version;
+
+    private ListPartitionReassignmentsRequest(ListPartitionReassignmentsRequestData data, short version) {
+        super(ApiKeys.LIST_PARTITION_REASSIGNMENTS, version);
+        this.data = data;
+        this.version = version;
+    }
+
+    ListPartitionReassignmentsRequest(Struct struct, short version) {
+        super(ApiKeys.LIST_PARTITION_REASSIGNMENTS, version);
+        this.data = new ListPartitionReassignmentsRequestData(struct, version);
+        this.version = version;
+    }
+
+    public static ListPartitionReassignmentsRequest parse(ByteBuffer buffer, short version) {
+        return new ListPartitionReassignmentsRequest(
+                ApiKeys.LIST_PARTITION_REASSIGNMENTS.parseRequest(version, buffer), version
+        );
+    }
+
+    public ListPartitionReassignmentsRequestData data() {
+        return data;
+    }
+
+    /**
+     * Visible for testing.
+     */
+    @Override
+    public Struct toStruct() {
+        return data.toStruct(version);
+    }
+
+    @Override
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+        ApiError apiError = ApiError.fromThrowable(e);
+
+        List<OngoingTopicReassignment> ongoingTopicReassignments = data.topics().stream().map(topic ->
+                new OngoingTopicReassignment()
+                        .setName(topic.name())
+                        .setPartitions(topic.partitionIndexes().stream().map(partitionIndex ->
+                                new OngoingPartitionReassignment().setPartitionIndex(partitionIndex)).collect(Collectors.toList())
+                        )
+        ).collect(Collectors.toList());
+
+        ListPartitionReassignmentsResponseData responseData = new ListPartitionReassignmentsResponseData()
+                .setTopics(ongoingTopicReassignments)
+                .setErrorCode(apiError.error().code())
+                .setErrorMessage(apiError.message())
+                .setThrottleTimeMs(throttleTimeMs);
+        return new ListPartitionReassignmentsResponse(responseData);
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListPartitionReassignmentsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListPartitionReassignmentsResponse.java
new file mode 100644
index 0000000..9513e88
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListPartitionReassignmentsResponse.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+public class ListPartitionReassignmentsResponse extends AbstractResponse {
+
+    private final ListPartitionReassignmentsResponseData data;
+
+    public ListPartitionReassignmentsResponse(Struct struct) {
+        this(struct, ApiKeys.LIST_PARTITION_REASSIGNMENTS.latestVersion());
+    }
+
+    ListPartitionReassignmentsResponse(ListPartitionReassignmentsResponseData responseData) {
+        this.data = responseData;
+    }
+
+    ListPartitionReassignmentsResponse(Struct struct, short version) {
+        this.data = new ListPartitionReassignmentsResponseData(struct, version);
+    }
+
+    public static ListPartitionReassignmentsResponse parse(ByteBuffer buffer, short version) {
+        return new ListPartitionReassignmentsResponse(ApiKeys.LIST_PARTITION_REASSIGNMENTS.responseSchema(version).read(buffer), version);
+    }
+
+    public ListPartitionReassignmentsResponseData data() {
+        return data;
+    }
+
+    @Override
+    public boolean shouldClientThrottle(short version) {
+        return true;
+    }
+
+    @Override
+    public int throttleTimeMs() {
+        return data.throttleTimeMs();
+    }
+
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        Map<Errors, Integer> counts = new HashMap<>();
+        Errors topLevelErr = Errors.forCode(data.errorCode());
+        counts.put(topLevelErr, 1);
+
+        return counts;
+    }
+
+    @Override
+    protected Struct toStruct(short version) {
+        return data.toStruct(version);
+    }
+}
diff --git a/clients/src/main/resources/common/message/AlterPartitionReassignmentsRequest.json b/clients/src/main/resources/common/message/AlterPartitionReassignmentsRequest.json
new file mode 100644
index 0000000..f962e1e
--- /dev/null
+++ b/clients/src/main/resources/common/message/AlterPartitionReassignmentsRequest.json
@@ -0,0 +1,37 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 45,
+  "type": "request",
+  "name": "AlterPartitionReassignmentsRequest",
+  "validVersions": "0",
+  "fields": [
+    { "name": "TimeoutMs", "type": "int32", "versions": "0+", "default": "60000",
+      "about": "The time in ms to wait for the request to complete." },
+    { "name": "Topics", "type": "[]ReassignableTopic", "versions": "0+",
+      "about": "The topics to reassign.", "fields": [
+      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
+        "about": "The topic name." },
+      { "name": "Partitions", "type": "[]ReassignablePartition", "versions": "0+",
+        "about": "The partitions to reassign.", "fields": [
+        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
+          "about": "The partition index." },
+        { "name": "Replicas", "type": "[]int32", "versions": "0+", "nullableVersions": "0+", "default": "null",
+          "about": "The replicas to place the partitions on, or null to cancel a pending reassignment for this partition." }
+      ]}
+    ]}
+  ]
+}
diff --git a/clients/src/main/resources/common/message/AlterPartitionReassignmentsResponse.json b/clients/src/main/resources/common/message/AlterPartitionReassignmentsResponse.json
new file mode 100644
index 0000000..d049596
--- /dev/null
+++ b/clients/src/main/resources/common/message/AlterPartitionReassignmentsResponse.json
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 45,
+  "type": "response",
+  "name": "AlterPartitionReassignmentsResponse",
+  "validVersions": "0",
+  "fields": [
+    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
+      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
+    { "name": "ErrorCode", "type": "int16", "versions": "0+",
+      "about": "The top-level error code, or 0 if there was no error." },
+    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+",
+      "about": "The top-level error message, or null if there was no error." },
+    { "name": "Responses", "type": "[]ReassignableTopicResponse", "versions": "0+",
+      "about": "The responses to topics to reassign.", "fields": [
+      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
+        "about": "The topic name" },
+      { "name": "Partitions", "type": "[]ReassignablePartitionResponse", "versions": "0+",
+        "about": "The responses to partitions to reassign", "fields": [
+        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
+          "about": "The partition index." },
+        { "name": "ErrorCode", "type": "int16", "versions": "0+",
+          "about": "The error code for this partition, or 0 if there was no error." },
+        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+",
+          "about": "The error message for this partition, or null if there was no error." }
+      ]}
+    ]}
+  ]
+}
diff --git a/clients/src/main/resources/common/message/LeaderAndIsrRequest.json b/clients/src/main/resources/common/message/LeaderAndIsrRequest.json
index a449b86..c43d2f4 100644
--- a/clients/src/main/resources/common/message/LeaderAndIsrRequest.json
+++ b/clients/src/main/resources/common/message/LeaderAndIsrRequest.json
@@ -20,7 +20,9 @@
   // Version 1 adds IsNew.
   //
   // Version 2 adds broker epoch and reorganizes the partitions by topic.
-  "validVersions": "0-2",
+  //
+  // Version 3 adds AddingReplicas and RemovingReplicas
+  "validVersions": "0-3",
   "fields": [
     { "name": "ControllerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
       "about": "The current controller ID." },
@@ -68,6 +70,10 @@
         "about": "The ZooKeeper version." },
       { "name": "Replicas", "type": "[]int32", "versions": "0+",
         "about": "The replica IDs." },
+      { "name": "AddingReplicas", "type": "[]int32", "versions": "3+", "ignorable": true,
+        "about": "The replica IDs that we are adding this partition to, or null if no replicas are being added." },
+      { "name": "RemovingReplicas", "type": "[]int32", "versions": "3+", "ignorable": true,
+        "about": "The replica IDs that we are removing this partition from, or null if no replicas are being removed." },
       { "name": "IsNew", "type": "bool", "versions": "1+", "default": "false", "ignorable": true,
         "about": "Whether the replica should have existed on the broker or not." }
     ]}
diff --git a/clients/src/main/resources/common/message/LeaderAndIsrResponse.json b/clients/src/main/resources/common/message/LeaderAndIsrResponse.json
index 8f4bf63..06bb088 100644
--- a/clients/src/main/resources/common/message/LeaderAndIsrResponse.json
+++ b/clients/src/main/resources/common/message/LeaderAndIsrResponse.json
@@ -20,7 +20,9 @@
   // Version 1 adds KAFKA_STORAGE_ERROR as a valid error code.
   //
   // Version 2 is the same as version 1.
-  "validVersions": "0-2",
+  //
+  // Version 3 is the same as version 2.
+  "validVersions": "0-3",
   "fields": [
     { "name": "ErrorCode", "type": "int16", "versions": "0+",
       "about": "The error code, or 0 if there was no error." },
diff --git a/clients/src/main/resources/common/message/ListPartitionReassignmentsRequest.json b/clients/src/main/resources/common/message/ListPartitionReassignmentsRequest.json
new file mode 100644
index 0000000..d0ebf8b
--- /dev/null
+++ b/clients/src/main/resources/common/message/ListPartitionReassignmentsRequest.json
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 46,
+  "type": "request",
+  "name": "ListPartitionReassignmentsRequest",
+  "validVersions": "0",
+  "fields": [
+    { "name": "TimeoutMs", "type": "int32", "versions": "0+", "default": "60000",
+      "about": "The time in ms to wait for the request to complete." },
+    { "name": "Topics", "type": "[]ListPartitionReassignmentsTopics", "versions": "0+", "nullableVersions": "0+",
+      "about": "The topics to list partition reassignments for, or null to list everything.", "fields": [
+      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
+        "about": "The topic name" },
+      { "name": "PartitionIndexes", "type": "[]int32", "versions": "0+",
+        "about": "The partitions to list partition reassignments for." }
+    ]}
+  ]
+}
diff --git a/clients/src/main/resources/common/message/ListPartitionReassignmentsResponse.json b/clients/src/main/resources/common/message/ListPartitionReassignmentsResponse.json
new file mode 100644
index 0000000..b79e052
--- /dev/null
+++ b/clients/src/main/resources/common/message/ListPartitionReassignmentsResponse.json
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 46,
+  "type": "response",
+  "name": "ListPartitionReassignmentsResponse",
+  "validVersions": "0",
+  "fields": [
+    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
+      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
+    { "name": "ErrorCode", "type": "int16", "versions": "0+",
+      "about": "The top-level error code, or 0 if there was no error" },
+    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+",
+      "about": "The top-level error message, or null if there was no error." },
+    { "name": "Topics", "type": "[]OngoingTopicReassignment", "versions": "0+",
+      "about": "The ongoing reassignments for each topic.", "fields": [
+      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
+        "about": "The topic name." },
+      { "name": "Partitions", "type": "[]OngoingPartitionReassignment", "versions": "0+",
+        "about": "The ongoing reassignments for each partition.", "fields": [
+        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
+          "about": "The index of the partition." },
+        { "name": "Replicas", "type": "[]int32", "versions": "0+",
+          "about": "The current replica set." },
+        { "name": "AddingReplicas", "type": "[]int32", "versions": "0+",
+          "about": "The set of replicas we are currently adding." },
+        { "name": "RemovingReplicas", "type": "[]int32", "versions": "0+",
+          "about": "The set of replicas we are currently removing." }
+      ]}
+    ]}
+  ]
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
index 58c650a..bdfce3f 100644
--- a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
@@ -233,12 +233,53 @@ public final class MessageTest {
 
     }
 
+    @Test
+    public void testLeaderAndIsrVersions() throws Exception {
+        // Version 3 adds two new fields - AddingReplicas and RemovingReplicas
+        LeaderAndIsrRequestData.LeaderAndIsrRequestTopicState partitionStateNoAddingRemovingReplicas =
+                new LeaderAndIsrRequestData.LeaderAndIsrRequestTopicState()
+                        .setName("topic")
+                        .setPartitionStatesV0(
+                                Collections.singletonList(
+                                        new LeaderAndIsrRequestData.LeaderAndIsrRequestPartition()
+                                                .setPartitionIndex(0)
+                                                .setReplicas(Collections.singletonList(0))
+                                )
+                        );
+        LeaderAndIsrRequestData.LeaderAndIsrRequestTopicState partitionStateWithAddingRemovingReplicas =
+                new LeaderAndIsrRequestData.LeaderAndIsrRequestTopicState()
+                        .setName("topic")
+                        .setPartitionStatesV0(
+                                Collections.singletonList(
+                                        new LeaderAndIsrRequestData.LeaderAndIsrRequestPartition()
+                                                .setPartitionIndex(0)
+                                                .setReplicas(Collections.singletonList(0))
+                                                .setAddingReplicas(Collections.singletonList(1))
+                                                .setRemovingReplicas(Collections.singletonList(1))
+                                )
+                        );
+        testAllMessageRoundTripsBetweenVersions(
+                (short) 2,
+                (short) 3,
+                new LeaderAndIsrRequestData().setTopicStates(Collections.singletonList(partitionStateWithAddingRemovingReplicas)),
+                new LeaderAndIsrRequestData().setTopicStates(Collections.singletonList(partitionStateNoAddingRemovingReplicas)));
+        testAllMessageRoundTripsFromVersion((short) 3, new LeaderAndIsrRequestData().setTopicStates(Collections.singletonList(partitionStateWithAddingRemovingReplicas)));
+    }
+
     private void testAllMessageRoundTrips(Message message) throws Exception {
         testAllMessageRoundTripsFromVersion(message.lowestSupportedVersion(), message);
     }
 
     private void testAllMessageRoundTripsBeforeVersion(short beforeVersion, Message message, Message expected) throws Exception {
-        for (short version = 0; version < beforeVersion; version++) {
+        testAllMessageRoundTripsBetweenVersions((short) 0, beforeVersion, message, expected);
+    }
+
+    /**
+     * @param startVersion - the version we want to start at, inclusive
+     * @param endVersion - the version we want to end at, exclusive
+     */
+    private void testAllMessageRoundTripsBetweenVersions(short startVersion, short endVersion, Message message, Message expected) throws Exception {
+        for (short version = startVersion; version < endVersion; version++) {
             testMessageRoundTrip(version, message, expected);
         }
     }
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 9855210..70adcca 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -66,6 +66,10 @@ import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.Altera
 import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData;
 import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse;
 import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData;
+import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
+import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
+import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
+import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
 import org.apache.kafka.common.message.InitProducerIdRequestData;
 import org.apache.kafka.common.message.InitProducerIdResponseData;
 import org.apache.kafka.common.message.JoinGroupRequestData;
@@ -362,6 +366,12 @@ public class RequestResponseTest {
         checkRequest(createIncrementalAlterConfigsRequest(), true);
         checkErrorResponse(createIncrementalAlterConfigsRequest(), new UnknownServerException(), true);
         checkResponse(createIncrementalAlterConfigsResponse(), 0, true);
+        checkRequest(createAlterPartitionReassignmentsRequest(), true);
+        checkErrorResponse(createAlterPartitionReassignmentsRequest(), new UnknownServerException(), true);
+        checkResponse(createAlterPartitionReassignmentsResponse(), 0, true);
+        checkRequest(createListPartitionReassignmentsRequest(), true);
+        checkErrorResponse(createListPartitionReassignmentsRequest(), new UnknownServerException(), true);
+        checkResponse(createListPartitionReassignmentsResponse(), 0, true);
     }
 
     @Test
@@ -1630,4 +1640,61 @@ public class RequestResponseTest {
                 .setErrorMessage("Duplicate Keys"));
         return new IncrementalAlterConfigsResponse(data);
     }
+
+    private AlterPartitionReassignmentsRequest createAlterPartitionReassignmentsRequest() {
+        AlterPartitionReassignmentsRequestData data = new AlterPartitionReassignmentsRequestData();
+        data.topics().add(
+                new AlterPartitionReassignmentsRequestData.ReassignableTopic().setName("topic").setPartitions(
+                        Collections.singletonList(
+                                new AlterPartitionReassignmentsRequestData.ReassignablePartition().setPartitionIndex(0).setReplicas(null)
+                        )
+                )
+        );
+        return new AlterPartitionReassignmentsRequest.Builder(data).build((short) 0);
+    }
+
+    private AlterPartitionReassignmentsResponse createAlterPartitionReassignmentsResponse() {
+        AlterPartitionReassignmentsResponseData data = new AlterPartitionReassignmentsResponseData();
+        data.responses().add(
+                new AlterPartitionReassignmentsResponseData.ReassignableTopicResponse()
+                        .setName("topic")
+                        .setPartitions(Collections.singletonList(
+                                new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse()
+                                        .setPartitionIndex(0)
+                                        .setErrorCode(Errors.NO_REASSIGNMENT_IN_PROGRESS.code())
+                                        .setErrorMessage("No reassignment is in progress for topic topic partition 0")
+                                )
+                        )
+        );
+        return new AlterPartitionReassignmentsResponse(data);
+    }
+
+    private ListPartitionReassignmentsRequest createListPartitionReassignmentsRequest() {
+        ListPartitionReassignmentsRequestData data = new ListPartitionReassignmentsRequestData();
+        data.setTopics(
+            Collections.singletonList(
+                new ListPartitionReassignmentsRequestData.ListPartitionReassignmentsTopics()
+                    .setName("topic")
+                    .setPartitionIndexes(Collections.singletonList(1))
+            )
+        );
+        return new ListPartitionReassignmentsRequest.Builder(data).build((short) 0);
+    }
+
+    private ListPartitionReassignmentsResponse createListPartitionReassignmentsResponse() {
+        ListPartitionReassignmentsResponseData data = new ListPartitionReassignmentsResponseData();
+        data.topics().add(
+                new ListPartitionReassignmentsResponseData.OngoingTopicReassignment()
+                        .setName("topic")
+                        .setPartitions(Collections.singletonList(
+                                new ListPartitionReassignmentsResponseData.OngoingPartitionReassignment()
+                                        .setPartitionIndex(0)
+                                        .setReplicas(Arrays.asList(1, 2))
+                                        .setAddingReplicas(Collections.singletonList(2))
+                                        .setRemovingReplicas(Collections.singletonList(1))
+                                )
+                        )
+        );
+        return new ListPartitionReassignmentsResponse(data);
+    }
 }
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 2a0fbd3..3ec6b23 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -52,6 +52,8 @@ import org.apache.kafka.common.message.CreateTopicsResponseData
 import org.apache.kafka.common.message.CreateTopicsResponseData.{CreatableTopicResult, CreatableTopicResultCollection}
 import org.apache.kafka.common.message.DeleteGroupsResponseData
 import org.apache.kafka.common.message.DeleteGroupsResponseData.{DeletableGroupResult, DeletableGroupResultCollection}
+import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData
+import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData
 import org.apache.kafka.common.message.DeleteTopicsResponseData
 import org.apache.kafka.common.message.DeleteTopicsResponseData.{DeletableTopicResult, DeletableTopicResultCollection}
 import org.apache.kafka.common.message.DescribeGroupsResponseData
@@ -178,6 +180,8 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.DELETE_GROUPS => handleDeleteGroupsRequest(request)
         case ApiKeys.ELECT_LEADERS => handleElectReplicaLeader(request)
         case ApiKeys.INCREMENTAL_ALTER_CONFIGS => handleIncrementalAlterConfigsRequest(request)
+        case ApiKeys.ALTER_PARTITION_REASSIGNMENTS => handleAlterPartitionReassignmentsRequest(request)
+        case ApiKeys.LIST_PARTITION_REASSIGNMENTS => handleListPartitionReassignmentsRequest(request)
       }
     } catch {
       case e: FatalExitError => throw e
@@ -2299,6 +2303,32 @@ class KafkaApis(val requestChannel: RequestChannel,
       new AlterConfigsResponse(requestThrottleMs, (authorizedResult ++ unauthorizedResult).asJava))
   }
 
+  def handleAlterPartitionReassignmentsRequest(request: RequestChannel.Request): Unit = {
+    authorizeClusterAlter(request)
+    val alterPartitionReassignmentsRequest = request.body[AlterPartitionReassignmentsRequest]
+
+    sendResponseMaybeThrottle(request, requestThrottleMs =>
+      new AlterPartitionReassignmentsResponse(
+        new AlterPartitionReassignmentsResponseData().setThrottleTimeMs(requestThrottleMs)
+          .setErrorCode(Errors.UNSUPPORTED_VERSION.code()).setErrorMessage(Errors.UNSUPPORTED_VERSION.message())
+          .toStruct(0)
+      )
+    )
+  }
+
+  def handleListPartitionReassignmentsRequest(request: RequestChannel.Request): Unit = {
+    authorizeClusterDescribe(request)
+    val listPartitionReassignmentsRequest = request.body[ListPartitionReassignmentsRequest]
+
+    sendResponseMaybeThrottle(request, requestThrottleMs =>
+      new ListPartitionReassignmentsResponse(
+        new ListPartitionReassignmentsResponseData().setThrottleTimeMs(requestThrottleMs)
+          .setErrorCode(Errors.UNSUPPORTED_VERSION.code()).setErrorMessage(Errors.UNSUPPORTED_VERSION.message())
+          .toStruct(0)
+      )
+    )
+  }
+
   private def configsAuthorizationApiError(session: RequestChannel.Session, resource: ConfigResource): ApiError = {
     val error = resource.`type` match {
       case ConfigResource.Type.BROKER => Errors.CLUSTER_AUTHORIZATION_FAILED
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index cb5f1aa..ef2e6fd 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -44,6 +44,8 @@ import org.apache.kafka.common.message.FindCoordinatorRequestData
 import org.apache.kafka.common.message.HeartbeatRequestData
 import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData
 import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResource, AlterableConfig, AlterableConfigCollection}
+import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData
+import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData
 import org.apache.kafka.common.message.JoinGroupRequestData
 import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection
 import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
@@ -164,7 +166,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       ApiKeys.DESCRIBE_LOG_DIRS -> classOf[DescribeLogDirsResponse],
       ApiKeys.CREATE_PARTITIONS -> classOf[CreatePartitionsResponse],
       ApiKeys.ELECT_LEADERS -> classOf[ElectLeadersResponse],
-      ApiKeys.INCREMENTAL_ALTER_CONFIGS -> classOf[IncrementalAlterConfigsResponse]
+      ApiKeys.INCREMENTAL_ALTER_CONFIGS -> classOf[IncrementalAlterConfigsResponse],
+      ApiKeys.ALTER_PARTITION_REASSIGNMENTS -> classOf[AlterPartitionReassignmentsResponse],
+      ApiKeys.LIST_PARTITION_REASSIGNMENTS -> classOf[ListPartitionReassignmentsResponse]
     )
 
   val requestKeyToError = Map[ApiKeys, Nothing => Errors](
@@ -212,7 +216,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     ApiKeys.CREATE_PARTITIONS -> ((resp: CreatePartitionsResponse) => resp.errors.asScala.find(_._1 == topic).get._2.error),
     ApiKeys.ELECT_LEADERS -> ((resp: ElectLeadersResponse) => Errors.forCode(resp.data().errorCode())),
     ApiKeys.INCREMENTAL_ALTER_CONFIGS -> ((resp: IncrementalAlterConfigsResponse) =>
-      IncrementalAlterConfigsResponse.fromResponseData(resp.data()).get(new ConfigResource(ConfigResource.Type.TOPIC, tp.topic)).error)
+      IncrementalAlterConfigsResponse.fromResponseData(resp.data()).get(new ConfigResource(ConfigResource.Type.TOPIC, tp.topic)).error),
+    ApiKeys.ALTER_PARTITION_REASSIGNMENTS -> ((resp: AlterPartitionReassignmentsResponse) => Errors.forCode(resp.data().errorCode())),
+    ApiKeys.LIST_PARTITION_REASSIGNMENTS -> ((resp: ListPartitionReassignmentsResponse) => Errors.forCode(resp.data().errorCode()))
   )
 
   val requestKeysToAcls = Map[ApiKeys, Map[Resource, Set[Acl]]](
@@ -252,7 +258,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     ApiKeys.DESCRIBE_LOG_DIRS -> clusterDescribeAcl,
     ApiKeys.CREATE_PARTITIONS -> topicAlterAcl,
     ApiKeys.ELECT_LEADERS -> clusterAlterAcl,
-    ApiKeys.INCREMENTAL_ALTER_CONFIGS -> topicAlterConfigsAcl
+    ApiKeys.INCREMENTAL_ALTER_CONFIGS -> topicAlterConfigsAcl,
+    ApiKeys.ALTER_PARTITION_REASSIGNMENTS -> clusterAlterAcl,
+    ApiKeys.LIST_PARTITION_REASSIGNMENTS -> clusterDescribeAcl
   )
 
   @Before
@@ -485,6 +493,26 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     10000
   ).build()
 
+  private def alterPartitionReassignmentsRequest = new AlterPartitionReassignmentsRequest.Builder(
+    new AlterPartitionReassignmentsRequestData().setTopics(
+      List(new AlterPartitionReassignmentsRequestData.ReassignableTopic()
+        .setName(topic)
+        .setPartitions(
+          List(new AlterPartitionReassignmentsRequestData.ReassignablePartition().setPartitionIndex(tp.partition())).asJava
+        )).asJava
+    )
+  ).build()
+
+  private def listPartitionReassignmentsRequest = new ListPartitionReassignmentsRequest.Builder(
+    new ListPartitionReassignmentsRequestData().setTopics(
+      List(new ListPartitionReassignmentsRequestData.ListPartitionReassignmentsTopics()
+        .setName(topic)
+        .setPartitionIndexes(
+          List(new Integer(tp.partition)).asJava
+        )).asJava
+    )
+  ).build()
+
   @Test
   def testAuthorizationWithTopicExisting() {
     val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest](
@@ -520,7 +548,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       // Check StopReplica last since some APIs depend on replica availability
       ApiKeys.STOP_REPLICA -> stopReplicaRequest,
       ApiKeys.ELECT_LEADERS -> electLeadersRequest,
-      ApiKeys.INCREMENTAL_ALTER_CONFIGS -> incrementalAlterConfigsRequest
+      ApiKeys.INCREMENTAL_ALTER_CONFIGS -> incrementalAlterConfigsRequest,
+      ApiKeys.ALTER_PARTITION_REASSIGNMENTS -> alterPartitionReassignmentsRequest,
+      ApiKeys.LIST_PARTITION_REASSIGNMENTS -> listPartitionReassignmentsRequest
     )
 
     for ((key, request) <- requestKeyToRequest) {
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 242ab21..b09fc02 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -41,6 +41,8 @@ import org.apache.kafka.common.message.JoinGroupRequestData
 import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection
 import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
 import org.apache.kafka.common.message.ListGroupsRequestData
+import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData
+import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData
 import org.apache.kafka.common.message.OffsetCommitRequestData
 import org.apache.kafka.common.message.SaslAuthenticateRequestData
 import org.apache.kafka.common.message.SaslHandshakeRequestData
@@ -465,6 +467,16 @@ class RequestQuotaTest extends BaseRequestTest {
           new IncrementalAlterConfigsRequest.Builder(
             new IncrementalAlterConfigsRequestData())
 
+        case ApiKeys.ALTER_PARTITION_REASSIGNMENTS =>
+          new AlterPartitionReassignmentsRequest.Builder(
+            new AlterPartitionReassignmentsRequestData()
+          )
+
+        case ApiKeys.LIST_PARTITION_REASSIGNMENTS =>
+          new ListPartitionReassignmentsRequest.Builder(
+            new ListPartitionReassignmentsRequestData()
+          )
+
         case _ =>
           throw new IllegalArgumentException("Unsupported API key " + apiKey)
     }
@@ -568,6 +580,8 @@ class RequestQuotaTest extends BaseRequestTest {
       case ApiKeys.ELECT_LEADERS => new ElectLeadersResponse(response).throttleTimeMs
       case ApiKeys.INCREMENTAL_ALTER_CONFIGS =>
         new IncrementalAlterConfigsResponse(response, ApiKeys.INCREMENTAL_ALTER_CONFIGS.latestVersion()).throttleTimeMs
+      case ApiKeys.ALTER_PARTITION_REASSIGNMENTS => new AlterPartitionReassignmentsResponse(response).throttleTimeMs
+      case ApiKeys.LIST_PARTITION_REASSIGNMENTS => new ListPartitionReassignmentsResponse(response).throttleTimeMs
       case requestId => throw new IllegalArgumentException(s"No throttle time for $requestId")
     }
   }


Mime
View raw message