kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cmcc...@apache.org
Subject [kafka] branch 2.8 updated: MINOR: Rename DecommissionBrokers to UnregisterBrokers (#10084)
Date Wed, 10 Feb 2021 20:51:22 GMT
This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.8 by this push:
     new 88265b3  MINOR: Rename DecommissionBrokers to UnregisterBrokers (#10084)
88265b3 is described below

commit 88265b37cb68ae8f262542195927577d9ce7e814
Author: Colin Patrick McCabe <cmccabe@confluent.io>
AuthorDate: Wed Feb 10 12:44:47 2021 -0800

    MINOR: Rename DecommissionBrokers to UnregisterBrokers (#10084)
    
    Rename DecommissionBrokers to UnregisterBrokers. Fix an incorrect JavaDoc comment
    for the Admin API. Make sure that UNREGISTER_BROKER is marked as forwardable and
    not as a controller-only API (since it can be received by brokers).
    
    Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Ismael Juma <ismael@juma.me.uk>
---
 .../java/org/apache/kafka/clients/admin/Admin.java | 31 +++++----
 .../kafka/clients/admin/KafkaAdminClient.java      | 27 ++++----
 ...erOptions.java => UnregisterBrokerOptions.java} |  4 +-
 ...okerResult.java => UnregisterBrokerResult.java} |  6 +-
 .../org/apache/kafka/common/protocol/ApiKeys.java  |  5 +-
 .../org/apache/kafka/common/protocol/Errors.java   |  2 +-
 .../kafka/common/requests/AbstractRequest.java     |  4 +-
 .../kafka/common/requests/AbstractResponse.java    |  4 +-
 ...erRequest.java => UnregisterBrokerRequest.java} | 34 +++++-----
 ...Response.java => UnregisterBrokerResponse.java} | 16 ++---
 ...erRequest.json => UnregisterBrokerRequest.json} |  4 +-
 ...Response.json => UnregisterBrokerResponse.json} |  2 +-
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 74 ++++++++++------------
 .../kafka/clients/admin/MockAdminClient.java       |  6 +-
 .../kafka/common/requests/RequestResponseTest.java | 24 +++----
 .../scala/kafka/network/RequestConvertToJson.scala |  4 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   | 10 ++-
 core/src/main/scala/kafka/tools/ClusterTool.scala  | 23 ++++---
 .../scala/unit/kafka/server/RequestQuotaTest.scala |  4 +-
 .../scala/unit/kafka/tools/ClusterToolTest.scala   | 12 ++--
 20 files changed, 151 insertions(+), 145 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
index c39462a..d732cd2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
@@ -1486,41 +1486,46 @@ public interface Admin extends AutoCloseable {
     UpdateFeaturesResult updateFeatures(Map<String, FeatureUpdate> featureUpdates, UpdateFeaturesOptions options);
 
     /**
-     * Permanently remove a broker and reassign any partitions on the broker.
+     * Unregister a broker.
      * <p>
-     * This operation is supported only on self-managed Kafka clusters (i.e. brokers which do not rely on Zookeeper).
+     * This operation does not have any effect on partition assignments. It is supported
+     * only on Kafka clusters which use Raft to store metadata, rather than ZooKeeper.
      *
-     * @param brokerId  the broker id to unregister.
+     * This is a convenience method for {@link #unregisterBroker(int, UnregisterBrokerOptions)}
      *
-     * <p>This is a convenience method for {@link #decommissionBroker(int, DecommissionBrokerOptions)}
+     * @param brokerId  the broker id to unregister.
      *
-     * @return the {@link DecommissionBrokerResult} containing the result
+     * @return the {@link UnregisterBrokerResult} containing the result
      */
-    default DecommissionBrokerResult decommissionBroker(int brokerId) {
-        return decommissionBroker(brokerId, new DecommissionBrokerOptions());
+    @InterfaceStability.Unstable
+    default UnregisterBrokerResult unregisterBroker(int brokerId) {
+        return unregisterBroker(brokerId, new UnregisterBrokerOptions());
     }
 
     /**
-     * Permanently remove a broker and reassign any partitions on the broker.
+     * Unregister a broker.
      * <p>
-     * This operation is supported only on self-managed Kafka clusters (i.e. brokers which do not rely on Zookeeper).
+     * This operation does not have any effect on partition assignments. It is supported
+     * only on Kafka clusters which use Raft to store metadata, rather than ZooKeeper.
      *
      * The following exceptions can be anticipated when calling {@code get()} on the future from the
-     * returned {@link DescribeFeaturesResult}:
+     * returned {@link UnregisterBrokerResult}:
      * <ul>
      *   <li>{@link org.apache.kafka.common.errors.TimeoutException}
      *   If the request timed out before the unregister operation could finish.</li>
      *   <li>{@link org.apache.kafka.common.errors.UnsupportedVersionException}
-     *   If the software is too old to support decommissioning.
+     *   If the software is too old to support the unregistration API, or if the
+     *   cluster is not using Raft to store metadata.</li>
      * </ul>
      * <p>
      *
      * @param brokerId  the broker id to unregister.
      * @param options   the options to use.
      *
-     * @return the {@link DecommissionBrokerResult} containing the result
+     * @return the {@link UnregisterBrokerResult} containing the result
      */
-    DecommissionBrokerResult decommissionBroker(int brokerId, DecommissionBrokerOptions options);
+    @InterfaceStability.Unstable
+    UnregisterBrokerResult unregisterBroker(int brokerId, UnregisterBrokerOptions options);
 
     /**
      * Get the metrics kept by the adminClient
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 54603a5..ab40470 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -98,7 +98,6 @@ import org.apache.kafka.common.message.CreateTopicsRequestData;
 import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection;
 import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicConfigs;
 import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
-import org.apache.kafka.common.message.DecommissionBrokerRequestData;
 import org.apache.kafka.common.message.DeleteAclsRequestData;
 import org.apache.kafka.common.message.DeleteAclsRequestData.DeleteAclsFilter;
 import org.apache.kafka.common.message.DeleteAclsResponseData;
@@ -147,6 +146,7 @@ import org.apache.kafka.common.message.OffsetDeleteRequestData.OffsetDeleteReque
 import org.apache.kafka.common.message.OffsetDeleteRequestData.OffsetDeleteRequestTopic;
 import org.apache.kafka.common.message.OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection;
 import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
+import org.apache.kafka.common.message.UnregisterBrokerRequestData;
 import org.apache.kafka.common.message.UpdateFeaturesRequestData;
 import org.apache.kafka.common.message.UpdateFeaturesResponseData.UpdatableFeatureResult;
 import org.apache.kafka.common.metrics.JmxReporter;
@@ -185,8 +185,6 @@ import org.apache.kafka.common.requests.CreatePartitionsRequest;
 import org.apache.kafka.common.requests.CreatePartitionsResponse;
 import org.apache.kafka.common.requests.CreateTopicsRequest;
 import org.apache.kafka.common.requests.CreateTopicsResponse;
-import org.apache.kafka.common.requests.DecommissionBrokerRequest;
-import org.apache.kafka.common.requests.DecommissionBrokerResponse;
 import org.apache.kafka.common.requests.DeleteAclsRequest;
 import org.apache.kafka.common.requests.DeleteAclsResponse;
 import org.apache.kafka.common.requests.DeleteGroupsRequest;
@@ -238,6 +236,8 @@ import org.apache.kafka.common.requests.OffsetFetchRequest;
 import org.apache.kafka.common.requests.OffsetFetchResponse;
 import org.apache.kafka.common.requests.RenewDelegationTokenRequest;
 import org.apache.kafka.common.requests.RenewDelegationTokenResponse;
+import org.apache.kafka.common.requests.UnregisterBrokerRequest;
+import org.apache.kafka.common.requests.UnregisterBrokerResponse;
 import org.apache.kafka.common.requests.UpdateFeaturesRequest;
 import org.apache.kafka.common.requests.UpdateFeaturesResponse;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
@@ -4608,23 +4608,23 @@ public class KafkaAdminClient extends AdminClient {
     }
 
     @Override
-    public DecommissionBrokerResult decommissionBroker(int brokerId, DecommissionBrokerOptions options) {
+    public UnregisterBrokerResult unregisterBroker(int brokerId, UnregisterBrokerOptions options) {
         final KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
         final long now = time.milliseconds();
-        final Call call = new Call("decommissionBroker", calcDeadlineMs(now, options.timeoutMs()),
+        final Call call = new Call("unregisterBroker", calcDeadlineMs(now, options.timeoutMs()),
                 new LeastLoadedNodeProvider()) {
 
             @Override
-            DecommissionBrokerRequest.Builder createRequest(int timeoutMs) {
-                DecommissionBrokerRequestData data =
-                        new DecommissionBrokerRequestData().setBrokerId(brokerId);
-                return new DecommissionBrokerRequest.Builder(data);
+            UnregisterBrokerRequest.Builder createRequest(int timeoutMs) {
+                UnregisterBrokerRequestData data =
+                        new UnregisterBrokerRequestData().setBrokerId(brokerId);
+                return new UnregisterBrokerRequest.Builder(data);
             }
 
             @Override
             void handleResponse(AbstractResponse abstractResponse) {
-                final DecommissionBrokerResponse response =
-                        (DecommissionBrokerResponse) abstractResponse;
+                final UnregisterBrokerResponse response =
+                        (UnregisterBrokerResponse) abstractResponse;
                 Errors error = Errors.forCode(response.data().errorCode());
                 switch (error) {
                     case NONE:
@@ -4633,7 +4633,8 @@ public class KafkaAdminClient extends AdminClient {
                     case REQUEST_TIMED_OUT:
                         throw error.exception();
                     default:
-                        log.error("Decommission Broker request for broker ID {} failed: {}", brokerId, error.message());
+                        log.error("Unregister broker request for broker ID {} failed: {}",
+                            brokerId, error.message());
                         future.completeExceptionally(error.exception());
                         break;
                 }
@@ -4645,7 +4646,7 @@ public class KafkaAdminClient extends AdminClient {
             }
         };
         runnable.call(call, now);
-        return new DecommissionBrokerResult(future);
+        return new UnregisterBrokerResult(future);
     }
 
     /**
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DecommissionBrokerOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/UnregisterBrokerOptions.java
similarity index 85%
rename from clients/src/main/java/org/apache/kafka/clients/admin/DecommissionBrokerOptions.java
rename to clients/src/main/java/org/apache/kafka/clients/admin/UnregisterBrokerOptions.java
index ed0d698..1935b79 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DecommissionBrokerOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/UnregisterBrokerOptions.java
@@ -20,10 +20,10 @@ package org.apache.kafka.clients.admin;
 import org.apache.kafka.common.annotation.InterfaceStability;
 
 /**
- * Options for {@link Admin#decommissionBroker(int, DecommissionBrokerOptions)}.
+ * Options for {@link Admin#unregisterBroker(int, UnregisterBrokerOptions)}.
  *
  * The API of this class is evolving. See {@link Admin} for details.
  */
 @InterfaceStability.Evolving
-public class DecommissionBrokerOptions extends AbstractOptions<UpdateFeaturesOptions> {
+public class UnregisterBrokerOptions extends AbstractOptions<UpdateFeaturesOptions> {
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DecommissionBrokerResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/UnregisterBrokerResult.java
similarity index 86%
rename from clients/src/main/java/org/apache/kafka/clients/admin/DecommissionBrokerResult.java
rename to clients/src/main/java/org/apache/kafka/clients/admin/UnregisterBrokerResult.java
index b5e2f52..b44c7e0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DecommissionBrokerResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/UnregisterBrokerResult.java
@@ -20,14 +20,14 @@ package org.apache.kafka.clients.admin;
 import org.apache.kafka.common.KafkaFuture;
 
 /**
- * The result of the {@link Admin#decommissionBroker(int, DecommissionBrokerOptions)} call.
+ * The result of the {@link Admin#unregisterBroker(int, UnregisterBrokerOptions)} call.
  *
  * The API of this class is evolving, see {@link Admin} for details.
  */
-public class DecommissionBrokerResult {
+public class UnregisterBrokerResult {
     private final KafkaFuture<Void> future;
 
-    DecommissionBrokerResult(final KafkaFuture<Void> future) {
+    UnregisterBrokerResult(final KafkaFuture<Void> future) {
         this.future = future;
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 9b8ef7e..49a6130 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -102,10 +102,7 @@ public enum ApiKeys {
     DESCRIBE_PRODUCERS(ApiMessageType.DESCRIBE_PRODUCERS),
     BROKER_REGISTRATION(ApiMessageType.BROKER_REGISTRATION, true, RecordBatch.MAGIC_VALUE_V0, false, true),
     BROKER_HEARTBEAT(ApiMessageType.BROKER_HEARTBEAT, true, RecordBatch.MAGIC_VALUE_V0, false, true),
-
-    // Once we have the controller integration for supporting broker decommissioning, we will support forwarding from the broker
-    // This is a short-term workaround to avoid advertizing the API on Zookeeper-based brokers
-    DECOMMISSION_BROKER(ApiMessageType.DECOMMISSION_BROKER, false, RecordBatch.MAGIC_VALUE_V0, true, true);
+    UNREGISTER_BROKER(ApiMessageType.UNREGISTER_BROKER, false, RecordBatch.MAGIC_VALUE_V0, true, false);
 
     // The generator ensures every `ApiMessageType` has a unique id
     private static final Map<Integer, ApiKeys> ID_TO_TYPE = Arrays.stream(ApiKeys.values())
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 00a3c65..03c1248 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -354,7 +354,7 @@ public enum Errors {
         "Requested position is not greater than or equal to zero, and less than the size of the snapshot.",
         PositionOutOfRangeException::new),
     UNKNOWN_TOPIC_ID(100, "This server does not host this topic ID.", UnknownTopicIdException::new),
-    DUPLICATE_BROKER_REGISTRATION_EXCEPTION(101, "This broker ID is already in use.", DuplicateBrokerRegistrationException::new);
+    DUPLICATE_BROKER_REGISTRATION(101, "This broker ID is already in use.", DuplicateBrokerRegistrationException::new);
 
     private static final Logger log = LoggerFactory.getLogger(Errors.class);
 
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index 2e4cf30..64befeb 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -282,8 +282,8 @@ public abstract class AbstractRequest implements AbstractRequestResponse {
                 return BrokerRegistrationRequest.parse(buffer, apiVersion);
             case BROKER_HEARTBEAT:
                 return BrokerHeartbeatRequest.parse(buffer, apiVersion);
-            case DECOMMISSION_BROKER:
-                return DecommissionBrokerRequest.parse(buffer, apiVersion);
+            case UNREGISTER_BROKER:
+                return UnregisterBrokerRequest.parse(buffer, apiVersion);
             default:
                 throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " +
                         "code should be updated to do so.", apiKey));
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index 410d1cd..c4dd7d8 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -239,8 +239,8 @@ public abstract class AbstractResponse implements AbstractRequestResponse {
                 return BrokerRegistrationResponse.parse(responseBuffer, version);
             case BROKER_HEARTBEAT:
                 return BrokerHeartbeatResponse.parse(responseBuffer, version);
-            case DECOMMISSION_BROKER:
-                return DecommissionBrokerResponse.parse(responseBuffer, version);
+            case UNREGISTER_BROKER:
+                return UnregisterBrokerResponse.parse(responseBuffer, version);
             default:
                 throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " +
                         "code should be updated to do so.", apiKey));
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DecommissionBrokerRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/UnregisterBrokerRequest.java
similarity index 55%
rename from clients/src/main/java/org/apache/kafka/common/requests/DecommissionBrokerRequest.java
rename to clients/src/main/java/org/apache/kafka/common/requests/UnregisterBrokerRequest.java
index cf4bb4d..253499f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DecommissionBrokerRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/UnregisterBrokerRequest.java
@@ -16,52 +16,52 @@
  */
 package org.apache.kafka.common.requests;
 
-import org.apache.kafka.common.message.DecommissionBrokerRequestData;
-import org.apache.kafka.common.message.DecommissionBrokerResponseData;
+import org.apache.kafka.common.message.UnregisterBrokerRequestData;
+import org.apache.kafka.common.message.UnregisterBrokerResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
 import org.apache.kafka.common.protocol.Errors;
 
 import java.nio.ByteBuffer;
 
-public class DecommissionBrokerRequest extends AbstractRequest {
+public class UnregisterBrokerRequest extends AbstractRequest {
 
-    public static class Builder extends AbstractRequest.Builder<DecommissionBrokerRequest> {
-        private final DecommissionBrokerRequestData data;
+    public static class Builder extends AbstractRequest.Builder<UnregisterBrokerRequest> {
+        private final UnregisterBrokerRequestData data;
 
-        public Builder(DecommissionBrokerRequestData data) {
-            super(ApiKeys.DECOMMISSION_BROKER);
+        public Builder(UnregisterBrokerRequestData data) {
+            super(ApiKeys.UNREGISTER_BROKER);
             this.data = data;
         }
 
         @Override
-        public DecommissionBrokerRequest build(short version) {
-            return new DecommissionBrokerRequest(data, version);
+        public UnregisterBrokerRequest build(short version) {
+            return new UnregisterBrokerRequest(data, version);
         }
     }
 
-    private final DecommissionBrokerRequestData data;
+    private final UnregisterBrokerRequestData data;
 
-    public DecommissionBrokerRequest(DecommissionBrokerRequestData data, short version) {
-        super(ApiKeys.DECOMMISSION_BROKER, version);
+    public UnregisterBrokerRequest(UnregisterBrokerRequestData data, short version) {
+        super(ApiKeys.UNREGISTER_BROKER, version);
         this.data = data;
     }
 
     @Override
-    public DecommissionBrokerRequestData data() {
+    public UnregisterBrokerRequestData data() {
         return data;
     }
 
     @Override
-    public DecommissionBrokerResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+    public UnregisterBrokerResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         Errors error = Errors.forException(e);
-        return new DecommissionBrokerResponse(new DecommissionBrokerResponseData()
+        return new UnregisterBrokerResponse(new UnregisterBrokerResponseData()
                 .setThrottleTimeMs(throttleTimeMs)
                 .setErrorCode(error.code()));
     }
 
-    public static DecommissionBrokerRequest parse(ByteBuffer buffer, short version) {
-        return new DecommissionBrokerRequest(new DecommissionBrokerRequestData(new ByteBufferAccessor(buffer), version),
+    public static UnregisterBrokerRequest parse(ByteBuffer buffer, short version) {
+        return new UnregisterBrokerRequest(new UnregisterBrokerRequestData(new ByteBufferAccessor(buffer), version),
                 version);
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DecommissionBrokerResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/UnregisterBrokerResponse.java
similarity index 73%
rename from clients/src/main/java/org/apache/kafka/common/requests/DecommissionBrokerResponse.java
rename to clients/src/main/java/org/apache/kafka/common/requests/UnregisterBrokerResponse.java
index 3e08cbd..b508ac3 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DecommissionBrokerResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/UnregisterBrokerResponse.java
@@ -17,7 +17,7 @@
 
 package org.apache.kafka.common.requests;
 
-import org.apache.kafka.common.message.DecommissionBrokerResponseData;
+import org.apache.kafka.common.message.UnregisterBrokerResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
 import org.apache.kafka.common.protocol.Errors;
@@ -26,16 +26,16 @@ import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
 
-public class DecommissionBrokerResponse extends AbstractResponse {
-    private final DecommissionBrokerResponseData data;
+public class UnregisterBrokerResponse extends AbstractResponse {
+    private final UnregisterBrokerResponseData data;
 
-    public DecommissionBrokerResponse(DecommissionBrokerResponseData data) {
-        super(ApiKeys.DECOMMISSION_BROKER);
+    public UnregisterBrokerResponse(UnregisterBrokerResponseData data) {
+        super(ApiKeys.UNREGISTER_BROKER);
         this.data = data;
     }
 
     @Override
-    public DecommissionBrokerResponseData data() {
+    public UnregisterBrokerResponseData data() {
         return data;
     }
 
@@ -53,8 +53,8 @@ public class DecommissionBrokerResponse extends AbstractResponse {
         return errorCounts;
     }
 
-    public static DecommissionBrokerResponse parse(ByteBuffer buffer, short version) {
-        return new DecommissionBrokerResponse(new DecommissionBrokerResponseData(new ByteBufferAccessor(buffer), version));
+    public static UnregisterBrokerResponse parse(ByteBuffer buffer, short version) {
+        return new UnregisterBrokerResponse(new UnregisterBrokerResponseData(new ByteBufferAccessor(buffer), version));
     }
 
     @Override
diff --git a/clients/src/main/resources/common/message/DecommissionBrokerRequest.json b/clients/src/main/resources/common/message/UnregisterBrokerRequest.json
similarity index 91%
rename from clients/src/main/resources/common/message/DecommissionBrokerRequest.json
rename to clients/src/main/resources/common/message/UnregisterBrokerRequest.json
index fcf9848..3c43b16 100644
--- a/clients/src/main/resources/common/message/DecommissionBrokerRequest.json
+++ b/clients/src/main/resources/common/message/UnregisterBrokerRequest.json
@@ -16,11 +16,11 @@
 {
   "apiKey": 64,
   "type": "request",
-  "name": "DecommissionBrokerRequest",
+  "name": "UnregisterBrokerRequest",
   "validVersions": "0",
   "flexibleVersions": "0+",
   "fields": [
     { "name": "BrokerId", "type": "int32", "versions": "0+",
-      "about": "The broker ID to decommission" }
+      "about": "The broker ID to unregister." }
   ]
 }
diff --git a/clients/src/main/resources/common/message/DecommissionBrokerResponse.json b/clients/src/main/resources/common/message/UnregisterBrokerResponse.json
similarity index 97%
rename from clients/src/main/resources/common/message/DecommissionBrokerResponse.json
rename to clients/src/main/resources/common/message/UnregisterBrokerResponse.json
index 82afa2e..3a11c1a 100644
--- a/clients/src/main/resources/common/message/DecommissionBrokerResponse.json
+++ b/clients/src/main/resources/common/message/UnregisterBrokerResponse.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 64,
   "type": "response",
-  "name": "DecommissionBrokerResponse",
+  "name": "UnregisterBrokerResponse",
   "validVersions": "0",
   "flexibleVersions": "0+",
   "fields": [
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 7db77b1..a5296da 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -82,7 +82,6 @@ import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartit
 import org.apache.kafka.common.message.CreateTopicsResponseData;
 import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
 import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResultCollection;
-import org.apache.kafka.common.message.DecommissionBrokerResponseData;
 import org.apache.kafka.common.message.DeleteAclsResponseData;
 import org.apache.kafka.common.message.DeleteGroupsResponseData;
 import org.apache.kafka.common.message.DeleteGroupsResponseData.DeletableGroupResult;
@@ -119,6 +118,7 @@ import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResp
 import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection;
 import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponseTopic;
 import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection;
+import org.apache.kafka.common.message.UnregisterBrokerResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ApiMessage;
 import org.apache.kafka.common.protocol.Errors;
@@ -139,7 +139,6 @@ import org.apache.kafka.common.requests.CreatePartitionsRequest;
 import org.apache.kafka.common.requests.CreatePartitionsResponse;
 import org.apache.kafka.common.requests.CreateTopicsRequest;
 import org.apache.kafka.common.requests.CreateTopicsResponse;
-import org.apache.kafka.common.requests.DecommissionBrokerResponse;
 import org.apache.kafka.common.requests.DeleteAclsResponse;
 import org.apache.kafka.common.requests.DeleteGroupsResponse;
 import org.apache.kafka.common.requests.DeleteRecordsResponse;
@@ -168,6 +167,7 @@ import org.apache.kafka.common.requests.OffsetCommitResponse;
 import org.apache.kafka.common.requests.OffsetDeleteResponse;
 import org.apache.kafka.common.requests.OffsetFetchResponse;
 import org.apache.kafka.common.requests.RequestTestUtils;
+import org.apache.kafka.common.requests.UnregisterBrokerResponse;
 import org.apache.kafka.common.requests.UpdateFeaturesRequest;
 import org.apache.kafka.common.requests.UpdateFeaturesResponse;
 import org.apache.kafka.common.resource.PatternType;
@@ -5172,15 +5172,13 @@ public class KafkaAdminClientTest {
     }
 
     @Test
-    public void testDecommissionBrokerSuccess() throws InterruptedException, ExecutionException {
-        int decommissionedBrokerNode = 1;
+    public void testUnregisterBrokerSuccess() throws InterruptedException, ExecutionException {
+        int nodeId = 1;
         try (final AdminClientUnitTestEnv env = mockClientEnv()) {
             env.kafkaClient().setNodeApiVersions(
-                    NodeApiVersions.create(ApiKeys.DECOMMISSION_BROKER.id, (short) 0, (short) 0));
-            env.kafkaClient().prepareResponse(prepareDecommissionBrokerResponse(Errors.NONE, 0));
-
-            DecommissionBrokerResult result = env.adminClient().decommissionBroker(decommissionedBrokerNode);
-
+                    NodeApiVersions.create(ApiKeys.UNREGISTER_BROKER.id, (short) 0, (short) 0));
+            env.kafkaClient().prepareResponse(prepareUnregisterBrokerResponse(Errors.NONE, 0));
+            UnregisterBrokerResult result = env.adminClient().unregisterBroker(nodeId);
             // Validate response
             assertNotNull(result.all());
             result.all().get();
@@ -5188,15 +5186,13 @@ public class KafkaAdminClientTest {
     }
 
     @Test
-    public void testDecommissionBrokerFailure() {
-        int decommissionedBrokerNode = 1;
+    public void testUnregisterBrokerFailure() {
+        int nodeId = 1;
         try (final AdminClientUnitTestEnv env = mockClientEnv()) {
             env.kafkaClient().setNodeApiVersions(
-                    NodeApiVersions.create(ApiKeys.DECOMMISSION_BROKER.id, (short) 0, (short) 0));
-            env.kafkaClient().prepareResponse(prepareDecommissionBrokerResponse(Errors.UNKNOWN_SERVER_ERROR, 0));
-
-            DecommissionBrokerResult result = env.adminClient().decommissionBroker(decommissionedBrokerNode);
-
+                    NodeApiVersions.create(ApiKeys.UNREGISTER_BROKER.id, (short) 0, (short) 0));
+            env.kafkaClient().prepareResponse(prepareUnregisterBrokerResponse(Errors.UNKNOWN_SERVER_ERROR, 0));
+            UnregisterBrokerResult result = env.adminClient().unregisterBroker(nodeId);
             // Validate response
             assertNotNull(result.all());
             TestUtils.assertFutureThrows(result.all(), Errors.UNKNOWN_SERVER_ERROR.exception().getClass());
@@ -5204,15 +5200,15 @@ public class KafkaAdminClientTest {
     }
 
     @Test
-    public void testDecommissionBrokerTimeoutAndSuccessRetry() throws ExecutionException, InterruptedException {
-        int decommissionedBrokerNode = 1;
+    public void testUnregisterBrokerTimeoutAndSuccessRetry() throws ExecutionException, InterruptedException {
+        int nodeId = 1;
         try (final AdminClientUnitTestEnv env = mockClientEnv()) {
             env.kafkaClient().setNodeApiVersions(
-                    NodeApiVersions.create(ApiKeys.DECOMMISSION_BROKER.id, (short) 0, (short) 0));
-            env.kafkaClient().prepareResponse(prepareDecommissionBrokerResponse(Errors.REQUEST_TIMED_OUT, 0));
-            env.kafkaClient().prepareResponse(prepareDecommissionBrokerResponse(Errors.NONE, 0));
+                    NodeApiVersions.create(ApiKeys.UNREGISTER_BROKER.id, (short) 0, (short) 0));
+            env.kafkaClient().prepareResponse(prepareUnregisterBrokerResponse(Errors.REQUEST_TIMED_OUT, 0));
+            env.kafkaClient().prepareResponse(prepareUnregisterBrokerResponse(Errors.NONE, 0));
 
-            DecommissionBrokerResult result = env.adminClient().decommissionBroker(decommissionedBrokerNode);
+            UnregisterBrokerResult result = env.adminClient().unregisterBroker(nodeId);
 
             // Validate response
             assertNotNull(result.all());
@@ -5221,15 +5217,15 @@ public class KafkaAdminClientTest {
     }
 
     @Test
-    public void testDecommissionBrokerTimeoutAndFailureRetry() {
-        int decommissionedBrokerNode = 1;
+    public void testUnregisterBrokerTimeoutAndFailureRetry() {
+        int nodeId = 1;
         try (final AdminClientUnitTestEnv env = mockClientEnv()) {
             env.kafkaClient().setNodeApiVersions(
-                    NodeApiVersions.create(ApiKeys.DECOMMISSION_BROKER.id, (short) 0, (short) 0));
-            env.kafkaClient().prepareResponse(prepareDecommissionBrokerResponse(Errors.REQUEST_TIMED_OUT, 0));
-            env.kafkaClient().prepareResponse(prepareDecommissionBrokerResponse(Errors.UNKNOWN_SERVER_ERROR, 0));
+                    NodeApiVersions.create(ApiKeys.UNREGISTER_BROKER.id, (short) 0, (short) 0));
+            env.kafkaClient().prepareResponse(prepareUnregisterBrokerResponse(Errors.REQUEST_TIMED_OUT, 0));
+            env.kafkaClient().prepareResponse(prepareUnregisterBrokerResponse(Errors.UNKNOWN_SERVER_ERROR, 0));
 
-            DecommissionBrokerResult result = env.adminClient().decommissionBroker(decommissionedBrokerNode);
+            UnregisterBrokerResult result = env.adminClient().unregisterBroker(nodeId);
 
             // Validate response
             assertNotNull(result.all());
@@ -5239,14 +5235,14 @@ public class KafkaAdminClientTest {
 
     @Test
     public void testDecommissionBrokerTimeoutMaxRetry() {
-        int decommissionedBrokerNode = 1;
+        int nodeId = 1;
         try (final AdminClientUnitTestEnv env = mockClientEnv(Time.SYSTEM, AdminClientConfig.RETRIES_CONFIG, "1")) {
             env.kafkaClient().setNodeApiVersions(
-                    NodeApiVersions.create(ApiKeys.DECOMMISSION_BROKER.id, (short) 0, (short) 0));
-            env.kafkaClient().prepareResponse(prepareDecommissionBrokerResponse(Errors.REQUEST_TIMED_OUT, 0));
-            env.kafkaClient().prepareResponse(prepareDecommissionBrokerResponse(Errors.REQUEST_TIMED_OUT, 0));
+                    NodeApiVersions.create(ApiKeys.UNREGISTER_BROKER.id, (short) 0, (short) 0));
+            env.kafkaClient().prepareResponse(prepareUnregisterBrokerResponse(Errors.REQUEST_TIMED_OUT, 0));
+            env.kafkaClient().prepareResponse(prepareUnregisterBrokerResponse(Errors.REQUEST_TIMED_OUT, 0));
 
-            DecommissionBrokerResult result = env.adminClient().decommissionBroker(decommissionedBrokerNode);
+            UnregisterBrokerResult result = env.adminClient().unregisterBroker(nodeId);
 
             // Validate response
             assertNotNull(result.all());
@@ -5256,14 +5252,14 @@ public class KafkaAdminClientTest {
 
     @Test
     public void testDecommissionBrokerTimeoutMaxWait() {
-        int decommissionedBrokerNode = 1;
+        int nodeId = 1;
         try (final AdminClientUnitTestEnv env = mockClientEnv()) {
             env.kafkaClient().setNodeApiVersions(
-                    NodeApiVersions.create(ApiKeys.DECOMMISSION_BROKER.id, (short) 0, (short) 0));
+                    NodeApiVersions.create(ApiKeys.UNREGISTER_BROKER.id, (short) 0, (short) 0));
 
-            DecommissionBrokerOptions options = new DecommissionBrokerOptions();
+            UnregisterBrokerOptions options = new UnregisterBrokerOptions();
             options.timeoutMs = 10;
-            DecommissionBrokerResult result = env.adminClient().decommissionBroker(decommissionedBrokerNode, options);
+            UnregisterBrokerResult result = env.adminClient().unregisterBroker(nodeId, options);
 
             // Validate response
             assertNotNull(result.all());
@@ -5271,8 +5267,8 @@ public class KafkaAdminClientTest {
         }
     }
 
-    private DecommissionBrokerResponse prepareDecommissionBrokerResponse(Errors error, int throttleTimeMs) {
-        return new DecommissionBrokerResponse(new DecommissionBrokerResponseData()
+    private UnregisterBrokerResponse prepareUnregisterBrokerResponse(Errors error, int throttleTimeMs) {
+        return new UnregisterBrokerResponse(new UnregisterBrokerResponseData()
                 .setErrorCode(error.code())
                 .setErrorMessage(error.message())
                 .setThrottleTimeMs(throttleTimeMs));
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index c647e9f..c2b9cff 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -899,13 +899,13 @@ public class MockAdminClient extends AdminClient {
     }
 
     @Override
-    public DecommissionBrokerResult decommissionBroker(int brokerId, DecommissionBrokerOptions options) {
+    public UnregisterBrokerResult unregisterBroker(int brokerId, UnregisterBrokerOptions options) {
         if (usingRaftController) {
-            return new DecommissionBrokerResult(KafkaFuture.completedFuture(null));
+            return new UnregisterBrokerResult(KafkaFuture.completedFuture(null));
         } else {
             KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
             future.completeExceptionally(new UnsupportedVersionException(""));
-            return new DecommissionBrokerResult(future);
+            return new UnregisterBrokerResult(future);
         }
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 26399c4..f87d9f9 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -74,8 +74,6 @@ import org.apache.kafka.common.message.CreateTopicsRequestData.CreateableTopicCo
 import org.apache.kafka.common.message.CreateTopicsResponseData;
 import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicConfigs;
 import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
-import org.apache.kafka.common.message.DecommissionBrokerRequestData;
-import org.apache.kafka.common.message.DecommissionBrokerResponseData;
 import org.apache.kafka.common.message.DeleteAclsRequestData;
 import org.apache.kafka.common.message.DeleteAclsResponseData;
 import org.apache.kafka.common.message.DeleteGroupsRequestData;
@@ -166,6 +164,8 @@ import org.apache.kafka.common.message.StopReplicaResponseData;
 import org.apache.kafka.common.message.SyncGroupRequestData;
 import org.apache.kafka.common.message.SyncGroupRequestData.SyncGroupRequestAssignment;
 import org.apache.kafka.common.message.SyncGroupResponseData;
+import org.apache.kafka.common.message.UnregisterBrokerRequestData;
+import org.apache.kafka.common.message.UnregisterBrokerResponseData;
 import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataBroker;
 import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataEndpoint;
 import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataPartitionState;
@@ -558,11 +558,11 @@ public class RequestResponseTest {
     }
 
     @Test
-    public void testDecommissionBrokerSerialization() {
-        for (short v = ApiKeys.DECOMMISSION_BROKER.oldestVersion(); v <= ApiKeys.DECOMMISSION_BROKER.latestVersion(); v++) {
-            checkRequest(createDecommissionBrokerRequest(v), true);
-            checkErrorResponse(createDecommissionBrokerRequest(v), unknownServerException, true);
-            checkResponse(createDecommissionBrokerResponse(), v, true);
+    public void testUnregisterBrokerSerialization() {
+        for (short v = ApiKeys.UNREGISTER_BROKER.oldestVersion(); v <= ApiKeys.UNREGISTER_BROKER.latestVersion(); v++) {
+            checkRequest(createUnregisterBrokerRequest(v), true);
+            checkErrorResponse(createUnregisterBrokerRequest(v), unknownServerException, true);
+            checkResponse(createUnregisterBrokerResponse(), v, true);
         }
     }
 
@@ -2680,13 +2680,13 @@ public class RequestResponseTest {
         return new BrokerRegistrationResponse(data);
     }
 
-    private DecommissionBrokerRequest createDecommissionBrokerRequest(short version) {
-        DecommissionBrokerRequestData data = new DecommissionBrokerRequestData().setBrokerId(1);
-        return new DecommissionBrokerRequest.Builder(data).build(version);
+    private UnregisterBrokerRequest createUnregisterBrokerRequest(short version) {
+        UnregisterBrokerRequestData data = new UnregisterBrokerRequestData().setBrokerId(1);
+        return new UnregisterBrokerRequest.Builder(data).build(version);
     }
 
-    private DecommissionBrokerResponse createDecommissionBrokerResponse() {
-        return new DecommissionBrokerResponse(new DecommissionBrokerResponseData());
+    private UnregisterBrokerResponse createUnregisterBrokerResponse() {
+        return new UnregisterBrokerResponse(new UnregisterBrokerResponseData());
     }
 
     /**
diff --git a/core/src/main/scala/kafka/network/RequestConvertToJson.scala b/core/src/main/scala/kafka/network/RequestConvertToJson.scala
index 814d7ee..66ece99 100644
--- a/core/src/main/scala/kafka/network/RequestConvertToJson.scala
+++ b/core/src/main/scala/kafka/network/RequestConvertToJson.scala
@@ -44,7 +44,6 @@ object RequestConvertToJson {
       case req: CreateDelegationTokenRequest => CreateDelegationTokenRequestDataJsonConverter.write(req.data, request.version)
       case req: CreatePartitionsRequest => CreatePartitionsRequestDataJsonConverter.write(req.data, request.version)
       case req: CreateTopicsRequest => CreateTopicsRequestDataJsonConverter.write(req.data, request.version)
-      case req: DecommissionBrokerRequest => DecommissionBrokerRequestDataJsonConverter.write(req.data, request.version)
       case req: DeleteAclsRequest => DeleteAclsRequestDataJsonConverter.write(req.data, request.version)
       case req: DeleteGroupsRequest => DeleteGroupsRequestDataJsonConverter.write(req.data, request.version)
       case req: DeleteRecordsRequest => DeleteRecordsRequestDataJsonConverter.write(req.data, request.version)
@@ -85,6 +84,7 @@ object RequestConvertToJson {
       case req: StopReplicaRequest => StopReplicaRequestDataJsonConverter.write(req.data, request.version)
       case req: SyncGroupRequest => SyncGroupRequestDataJsonConverter.write(req.data, request.version)
       case req: TxnOffsetCommitRequest => TxnOffsetCommitRequestDataJsonConverter.write(req.data, request.version)
+      case req: UnregisterBrokerRequest => UnregisterBrokerRequestDataJsonConverter.write(req.data, request.version)
       case req: UpdateFeaturesRequest => UpdateFeaturesRequestDataJsonConverter.write(req.data, request.version)
       case req: UpdateMetadataRequest => UpdateMetadataRequestDataJsonConverter.write(req.data, request.version)
       case req: VoteRequest => VoteRequestDataJsonConverter.write(req.data, request.version)
@@ -116,7 +116,6 @@ object RequestConvertToJson {
       case res: CreateDelegationTokenResponse => CreateDelegationTokenResponseDataJsonConverter.write(res.data, version)
       case res: CreatePartitionsResponse => CreatePartitionsResponseDataJsonConverter.write(res.data, version)
       case res: CreateTopicsResponse => CreateTopicsResponseDataJsonConverter.write(res.data, version)
-      case res: DecommissionBrokerResponse => DecommissionBrokerResponseDataJsonConverter.write(res.data, version)
       case res: DeleteAclsResponse => DeleteAclsResponseDataJsonConverter.write(res.data, version)
       case res: DeleteGroupsResponse => DeleteGroupsResponseDataJsonConverter.write(res.data, version)
       case res: DeleteRecordsResponse => DeleteRecordsResponseDataJsonConverter.write(res.data, version)
@@ -157,6 +156,7 @@ object RequestConvertToJson {
       case res: StopReplicaResponse => StopReplicaResponseDataJsonConverter.write(res.data, version)
       case res: SyncGroupResponse => SyncGroupResponseDataJsonConverter.write(res.data, version)
       case res: TxnOffsetCommitResponse => TxnOffsetCommitResponseDataJsonConverter.write(res.data, version)
+      case res: UnregisterBrokerResponse => UnregisterBrokerResponseDataJsonConverter.write(res.data, version)
       case res: UpdateFeaturesResponse => UpdateFeaturesResponseDataJsonConverter.write(res.data, version)
       case res: UpdateMetadataResponse => UpdateMetadataResponseDataJsonConverter.write(res.data, version)
       case res: WriteTxnMarkersResponse => WriteTxnMarkersResponseDataJsonConverter.write(res.data, version)
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 938c401..c3195be 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -217,6 +217,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.ENVELOPE => handleEnvelope(request)
         case ApiKeys.DESCRIBE_CLUSTER => handleDescribeCluster(request)
         case ApiKeys.DESCRIBE_PRODUCERS => handleDescribeProducersRequest(request)
+        case ApiKeys.UNREGISTER_BROKER => maybeForwardToController(request, handleUnregisterBrokerRequest)
 
         // Handle requests that should have been sent to the KIP-500 controller.
         // Until we are ready to integrate the Raft layer, these APIs are treated as
@@ -228,7 +229,6 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.FETCH_SNAPSHOT => requestHelper.closeConnection(request, util.Collections.emptyMap())
         case ApiKeys.BROKER_REGISTRATION => requestHelper.closeConnection(request, util.Collections.emptyMap())
         case ApiKeys.BROKER_HEARTBEAT => requestHelper.closeConnection(request, util.Collections.emptyMap())
-        case ApiKeys.DECOMMISSION_BROKER => requestHelper.closeConnection(request, util.Collections.emptyMap())
       }
     } catch {
       case e: FatalExitError => throw e
@@ -3390,6 +3390,14 @@ class KafkaApis(val requestChannel: RequestChannel,
       new DescribeProducersResponse(response.setThrottleTimeMs(requestThrottleMs)))
   }
 
+  def handleUnregisterBrokerRequest(request: RequestChannel.Request): Unit = {
+    // This function will not be called when in self-managed quorum mode, since the
+    // UNREGISTER_BROKER API is marked as forwardable and we will always have a forwarding
+    // manager.
+    throw new UnsupportedVersionException("The broker unregistration API is not available when using " +
+      "Apache ZooKeeper mode.")
+  }
+
   private def updateRecordConversionStats(request: RequestChannel.Request,
                                           tp: TopicPartition,
                                           conversionStats: RecordConversionStats): Unit = {
diff --git a/core/src/main/scala/kafka/tools/ClusterTool.scala b/core/src/main/scala/kafka/tools/ClusterTool.scala
index f0d3d90..a5e2a82 100644
--- a/core/src/main/scala/kafka/tools/ClusterTool.scala
+++ b/core/src/main/scala/kafka/tools/ClusterTool.scala
@@ -39,9 +39,9 @@ object ClusterTool extends Logging {
 
       val clusterIdParser = subparsers.addParser("cluster-id").
         help("Get information about the ID of a cluster.")
-      val decommissionParser = subparsers.addParser("decommission").
-        help("Decommission a broker..")
-      List(clusterIdParser, decommissionParser).foreach(parser => {
+      val unregisterParser = subparsers.addParser("unregister").
+        help("Unregister a broker..")
+      List(clusterIdParser, unregisterParser).foreach(parser => {
         parser.addArgument("--bootstrap-server", "-b").
           action(store()).
           help("A list of host/port pairs to use for establishing the connection to the kafka cluster.")
@@ -49,10 +49,10 @@ object ClusterTool extends Logging {
           action(store()).
           help("A property file containing configs to passed to AdminClient.")
       })
-      decommissionParser.addArgument("--id", "-i").
+      unregisterParser.addArgument("--id", "-i").
         `type`(classOf[Integer]).
         action(store()).
-        help("The ID of the broker to decommission.")
+        help("The ID of the broker to unregister.")
 
       val namespace = parser.parseArgsOrFail(args)
       val command = namespace.getString("command")
@@ -77,10 +77,10 @@ object ClusterTool extends Logging {
             adminClient.close()
           }
           Exit.exit(0)
-        case "decommission" =>
+        case "unregister" =>
           val adminClient = Admin.create(properties)
           try {
-            decommissionCommand(System.out, adminClient, namespace.getInt("id"))
+            unregisterCommand(System.out, adminClient, namespace.getInt("id"))
           } finally {
             adminClient.close()
           }
@@ -104,18 +104,17 @@ object ClusterTool extends Logging {
     }
   }
 
-  def decommissionCommand(stream: PrintStream,
+  def unregisterCommand(stream: PrintStream,
                           adminClient: Admin,
                           id: Int): Unit = {
     try {
-      Option(adminClient.decommissionBroker(id).all().get())
-      stream.println(s"Broker ${id} is no longer registered. Note that if the broker " +
-        "is still running, or is restarted, it will re-register.")
+      Option(adminClient.unregisterBroker(id).all().get())
+      stream.println(s"Broker ${id} is no longer registered.")
     } catch {
       case e: ExecutionException => {
         val cause = e.getCause()
         if (cause.isInstanceOf[UnsupportedVersionException]) {
-          stream.println(s"The target cluster does not support broker decommissioning.")
+          stream.println(s"The target cluster does not support the broker unregistration API.")
         } else {
           throw e
         }
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index d326dba..5b1363f 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -632,8 +632,8 @@ class RequestQuotaTest extends BaseRequestTest {
         case ApiKeys.BROKER_HEARTBEAT =>
           new BrokerHeartbeatRequest.Builder(new BrokerHeartbeatRequestData())
 
-        case ApiKeys.DECOMMISSION_BROKER =>
-          new DecommissionBrokerRequest.Builder(new DecommissionBrokerRequestData())
+        case ApiKeys.UNREGISTER_BROKER =>
+          new UnregisterBrokerRequest.Builder(new UnregisterBrokerRequestData())
 
         case _ =>
           throw new IllegalArgumentException("Unsupported API key " + apiKey)
diff --git a/core/src/test/scala/unit/kafka/tools/ClusterToolTest.scala b/core/src/test/scala/unit/kafka/tools/ClusterToolTest.scala
index 0ce100c..b98cd8e 100644
--- a/core/src/test/scala/unit/kafka/tools/ClusterToolTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/ClusterToolTest.scala
@@ -49,26 +49,26 @@ class ClusterToolTest {
   }
 
   @Test
-  def testDecommissionBroker(): Unit = {
+  def testUnregisterBroker(): Unit = {
     val adminClient = new MockAdminClient.Builder().numBrokers(3).
       usingRaftController(true).
       build()
     val stream = new ByteArrayOutputStream()
-    ClusterTool.decommissionCommand(new PrintStream(stream), adminClient, 0)
+    ClusterTool.unregisterCommand(new PrintStream(stream), adminClient, 0)
     assertEquals(
-      s"""Broker 0 is no longer registered. Note that if the broker is still running, or is restarted, it will re-register.
+      s"""Broker 0 is no longer registered.
 """, stream.toString())
   }
 
   @Test
-  def testLegacyModeClusterCannotDecommissionBroker(): Unit = {
+  def testLegacyModeClusterCannotUnregisterBroker(): Unit = {
     val adminClient = new MockAdminClient.Builder().numBrokers(3).
       usingRaftController(false).
       build()
     val stream = new ByteArrayOutputStream()
-    ClusterTool.decommissionCommand(new PrintStream(stream), adminClient, 0)
+    ClusterTool.unregisterCommand(new PrintStream(stream), adminClient, 0)
     assertEquals(
-      s"""The target cluster does not support broker decommissioning.
+      s"""The target cluster does not support the broker unregistration API.
 """, stream.toString())
   }
 }


Mime
View raw message