kafka-commits mailing list archives

From j...@apache.org
Subject [kafka] branch trunk updated: KAFKA-8454; Add Java AdminClient Interface (KIP-476) (#7087)
Date Mon, 22 Jul 2019 22:48:05 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2a133ba  KAFKA-8454; Add Java AdminClient Interface (KIP-476) (#7087)
2a133ba is described below

commit 2a133ba656ae550cc9ad4bcda0a9afb27b14f4e0
Author: Andy Coates <8012398+big-andy-coates@users.noreply.github.com>
AuthorDate: Mon Jul 22 23:47:34 2019 +0100

    KAFKA-8454; Add Java AdminClient Interface (KIP-476) (#7087)
    
    Adds an `Admin` interface as specified in [KIP-476](https://cwiki.apache.org/confluence/display/KAFKA/KIP-476%3A+Add+Java+AdminClient+Interface).
    
    Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>
---
 checkstyle/suppressions.xml                        |   4 +-
 .../clients/admin/{AdminClient.java => Admin.java} | 687 ++++++++--------
 .../apache/kafka/clients/admin/AdminClient.java    | 908 +--------------------
 .../apache/kafka/clients/admin/AlterConfigOp.java  |   2 +-
 .../kafka/clients/admin/AlterConfigsOptions.java   |   4 +-
 .../kafka/clients/admin/AlterConfigsResult.java    |   4 +-
 .../clients/admin/AlterReplicaLogDirsOptions.java  |   2 +-
 .../clients/admin/AlterReplicaLogDirsResult.java   |   2 +-
 .../org/apache/kafka/clients/admin/Config.java     |   2 +-
 .../apache/kafka/clients/admin/ConfigEntry.java    |   2 +-
 .../kafka/clients/admin/CreateAclsOptions.java     |   4 +-
 .../kafka/clients/admin/CreateAclsResult.java      |   4 +-
 .../admin/CreateDelegationTokenOptions.java        |   4 +-
 .../clients/admin/CreateDelegationTokenResult.java |   2 +-
 .../clients/admin/CreatePartitionsOptions.java     |   4 +-
 .../clients/admin/CreatePartitionsResult.java      |   4 +-
 .../kafka/clients/admin/CreateTopicsOptions.java   |   4 +-
 .../kafka/clients/admin/CreateTopicsResult.java    |   4 +-
 .../kafka/clients/admin/DeleteAclsOptions.java     |   4 +-
 .../kafka/clients/admin/DeleteAclsResult.java      |   4 +-
 .../clients/admin/DeleteConsumerGroupsOptions.java |   4 +-
 .../clients/admin/DeleteConsumerGroupsResult.java  |   4 +-
 .../kafka/clients/admin/DeleteRecordsOptions.java  |   4 +-
 .../kafka/clients/admin/DeleteRecordsResult.java   |   4 +-
 .../kafka/clients/admin/DeleteTopicsOptions.java   |   4 +-
 .../kafka/clients/admin/DeleteTopicsResult.java    |   4 +-
 .../kafka/clients/admin/DescribeAclsOptions.java   |   4 +-
 .../kafka/clients/admin/DescribeAclsResult.java    |   2 +-
 .../clients/admin/DescribeClusterOptions.java      |   4 +-
 .../kafka/clients/admin/DescribeClusterResult.java |   2 +-
 .../clients/admin/DescribeConfigsOptions.java      |   4 +-
 .../kafka/clients/admin/DescribeConfigsResult.java |   2 +-
 .../admin/DescribeConsumerGroupsOptions.java       |   4 +-
 .../admin/DescribeConsumerGroupsResult.java        |   2 +-
 .../admin/DescribeDelegationTokenOptions.java      |   4 +-
 .../admin/DescribeDelegationTokenResult.java       |   2 +-
 .../clients/admin/DescribeLogDirsOptions.java      |   4 +-
 .../kafka/clients/admin/DescribeLogDirsResult.java |   4 +-
 .../admin/DescribeReplicaLogDirsOptions.java       |   4 +-
 .../admin/DescribeReplicaLogDirsResult.java        |   4 +-
 .../kafka/clients/admin/DescribeTopicsOptions.java |   4 +-
 .../kafka/clients/admin/DescribeTopicsResult.java  |   2 +-
 .../kafka/clients/admin/ElectLeadersOptions.java   |   4 +-
 .../kafka/clients/admin/ElectLeadersResult.java    |   4 +-
 .../admin/ElectPreferredLeadersOptions.java        |   6 +-
 .../clients/admin/ElectPreferredLeadersResult.java |   8 +-
 .../admin/ExpireDelegationTokenOptions.java        |   4 +-
 .../clients/admin/ExpireDelegationTokenResult.java |   2 +-
 .../kafka/clients/admin/KafkaAdminClient.java      |   4 +-
 .../admin/ListConsumerGroupOffsetsOptions.java     |   4 +-
 .../admin/ListConsumerGroupOffsetsResult.java      |   4 +-
 .../clients/admin/ListConsumerGroupsOptions.java   |   4 +-
 .../clients/admin/ListConsumerGroupsResult.java    |   4 +-
 .../kafka/clients/admin/ListTopicsOptions.java     |   4 +-
 .../kafka/clients/admin/ListTopicsResult.java      |   4 +-
 .../apache/kafka/clients/admin/NewPartitions.java  |   4 +-
 .../org/apache/kafka/clients/admin/NewTopic.java   |   2 +-
 .../kafka/clients/admin/RecordsToDelete.java       |   4 +-
 .../clients/admin/RenewDelegationTokenOptions.java |   4 +-
 .../clients/admin/RenewDelegationTokenResult.java  |   2 +-
 .../java/org/apache/kafka/common/ElectionType.java |   4 +-
 .../org/apache/kafka/clients/ClientUtilsTest.java  |  11 +-
 .../clients/admin/AdminClientUnitTestEnv.java      |   4 +-
 .../ClientAuthenticationFailureTest.java           |   4 +-
 .../runtime/errors/DeadLetterQueueReporter.java    |   4 +-
 .../apache/kafka/connect/util/ConnectUtils.java    |   6 +-
 .../org/apache/kafka/connect/util/TopicAdmin.java  |  12 +-
 .../util/clusters/EmbeddedKafkaCluster.java        |   8 +-
 core/src/main/scala/kafka/admin/AclCommand.scala   |   8 +-
 .../src/main/scala/kafka/admin/ConfigCommand.scala |   8 +-
 .../scala/kafka/admin/ConsumerGroupCommand.scala   |   2 +-
 .../scala/kafka/admin/DelegationTokenCommand.scala |  12 +-
 .../scala/kafka/admin/DeleteRecordsCommand.scala   |   2 +-
 .../scala/kafka/admin/LeaderElectionCommand.scala  |   5 +-
 .../main/scala/kafka/admin/LogDirsCommand.scala    |   4 +-
 .../kafka/admin/ReassignPartitionsCommand.scala    |  18 +-
 core/src/main/scala/kafka/admin/TopicCommand.scala |   6 +-
 .../kafka/tools/ReplicaVerificationTool.scala      |   8 +-
 .../main/scala/kafka/tools/StreamsResetter.java    |   8 +-
 .../kafka/api/AdminClientIntegrationTest.scala     |   8 +-
 .../AdminClientWithPoliciesIntegrationTest.scala   |   4 +-
 .../kafka/api/AuthorizerIntegrationTest.scala      |   6 +-
 .../kafka/api/CustomQuotaCallbackTest.scala        |   6 +-
 .../DelegationTokenEndToEndAuthorizationTest.scala |   2 +-
 .../api/DescribeAuthorizedOperationsTest.scala     |   2 +-
 .../kafka/api/IntegrationTestHarness.scala         |   6 +-
 .../SaslClientsWithInvalidCredentialsTest.scala    |   2 +-
 .../api/SaslSslAdminClientIntegrationTest.scala    |   2 +-
 .../kafka/network/DynamicConnectionQuotaTest.scala |   4 +-
 .../server/DynamicBrokerReconfigurationTest.scala  |  12 +-
 .../kafka/admin/DelegationTokenCommandTest.scala   |   6 +-
 .../kafka/admin/LeaderElectionCommandTest.scala    |  13 +-
 .../admin/ReassignPartitionsClusterTest.scala      |   9 +-
 .../admin/TopicCommandWithAdminClientTest.scala    |   6 +-
 .../integration/UncleanLeaderElectionTest.scala    |   4 +-
 .../DelegationTokenRequestsOnPlainTextTest.scala   |   4 +-
 .../kafka/server/DelegationTokenRequestsTest.scala |   4 +-
 ...nTokenRequestsWithDisableTokenFeatureTest.scala |   4 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala    |  16 +-
 docs/api.html                                      |  10 +-
 docs/configuration.html                            |   2 +-
 docs/security.html                                 |   8 +-
 docs/toc.html                                      |   2 +-
 .../apache/kafka/streams/KafkaClientSupplier.java  |   8 +-
 .../org/apache/kafka/streams/KafkaStreams.java     |   4 +-
 .../org/apache/kafka/streams/StreamsConfig.java    |   8 +-
 .../internals/DefaultKafkaClientSupplier.java      |   6 +-
 .../processor/internals/InternalTopicManager.java  |   6 +-
 .../streams/processor/internals/StreamThread.java  |   4 +-
 .../streams/processor/internals/TaskManager.java   |   8 +-
 .../integration/AbstractResetIntegrationTest.java  |   6 +-
 .../integration/InternalTopicIntegrationTest.java  |   8 +-
 .../PurgeRepartitionTopicIntegrationTest.java      |   6 +-
 .../streams/integration/utils/KafkaEmbedded.java   |  10 +-
 .../processor/internals/TaskManagerTest.java       |   4 +-
 .../apache/kafka/streams/tests/EosTestDriver.java  |  11 +-
 .../org/apache/kafka/test/MockClientSupplier.java  |   4 +-
 .../kafka/test/MockInternalTopicManager.java       |   5 +-
 .../kafka/tools/ClientCompatibilityTest.java       |   6 +-
 .../apache/kafka/trogdor/common/WorkerUtils.java   |  18 +-
 .../trogdor/workload/ConnectionStressWorker.java   |   4 +-
 121 files changed, 653 insertions(+), 1552 deletions(-)
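
[Editor's note] For context on the API change in this patch, below is a minimal sketch of how client code can target the new `Admin` interface instead of the `AdminClient` abstract class. The bootstrap address, topic name, partition count, and replication factor are illustrative assumptions, not part of the commit; only `Admin.create`, `createTopics`, `listTopics`, and `close` from the patched API are used.

    import java.util.Collections;
    import java.util.Properties;
    import java.util.Set;

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.NewTopic;

    public class AdminInterfaceExample {
        public static void main(String[] args) throws Exception {
            // Assumed broker address, for illustration only.
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            // Admin.create(...) still returns a KafkaAdminClient internally,
            // but callers now depend on the interface rather than the abstract class.
            try (Admin admin = Admin.create(props)) {
                // Hypothetical topic: 3 partitions, replication factor 1.
                NewTopic topic = new NewTopic("example-topic", 3, (short) 1);
                admin.createTopics(Collections.singleton(topic)).all().get();

                // List topic names to confirm the broker has observed the creation.
                Set<String> names = admin.listTopics().names().get();
                System.out.println("Topics: " + names);
            }
        }
    }

As the diff below shows, the convenience overloads become default methods on the interface, so existing code compiled against `AdminClient` keeps working while new code can depend on (and mock) `Admin` directly.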

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 63dc730..03321f1 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -10,7 +10,7 @@
 
     <!-- Clients -->
     <suppress checks="ClassFanOutComplexity"
-              files="(Fetcher|Sender|SenderTest|ConsumerCoordinator|KafkaConsumer|KafkaProducer|Utils|TransactionManagerTest|KafkaAdminClient|NetworkClient|AdminClient).java"/>
+              files="(Fetcher|Sender|SenderTest|ConsumerCoordinator|KafkaConsumer|KafkaProducer|Utils|TransactionManagerTest|KafkaAdminClient|NetworkClient|Admin).java"/>
     <suppress checks="ClassFanOutComplexity"
               files="(SaslServerAuthenticator|SaslAuthenticatorTest).java"/>
     <suppress checks="ClassFanOutComplexity"
@@ -41,7 +41,7 @@
               files="Sender.java"/>
 
     <suppress checks="ClassDataAbstractionCoupling"
-              files="(KafkaConsumer|ConsumerCoordinator|Fetcher|KafkaProducer|AbstractRequest|AbstractResponse|TransactionManager|AdminClient|KafkaAdminClient).java"/>
+              files="(KafkaConsumer|ConsumerCoordinator|Fetcher|KafkaProducer|AbstractRequest|AbstractResponse|TransactionManager|Admin|KafkaAdminClient).java"/>
     <suppress checks="ClassDataAbstractionCoupling"
               files="(Errors|SaslAuthenticatorTest|AgentTest|CoordinatorTest).java"/>
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
similarity index 50%
copy from clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
copy to clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
index ad9409a..c1be7a7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
@@ -36,423 +36,418 @@ import org.apache.kafka.common.config.ConfigResource;
 
 /**
  * The administrative client for Kafka, which supports managing and inspecting topics, brokers, configurations and ACLs.
- *
+ * <p>
  * The minimum broker version required is 0.10.0.0. Methods with stricter requirements will specify the minimum broker
  * version required.
- *
+ * <p>
  * This client was introduced in 0.11.0.0 and the API is still evolving. We will try to evolve the API in a compatible
  * manner, but we reserve the right to make breaking changes in minor releases, if necessary. We will update the
  * {@code InterfaceStability} annotation and this notice once the API is considered stable.
  */
 @InterfaceStability.Evolving
-public abstract class AdminClient implements AutoCloseable {
+public interface Admin extends AutoCloseable {
 
     /**
-     * Create a new AdminClient with the given configuration.
+     * Create a new Admin with the given configuration.
      *
      * @param props The configuration.
      * @return The new KafkaAdminClient.
      */
-    public static AdminClient create(Properties props) {
+    static Admin create(Properties props) {
         return KafkaAdminClient.createInternal(new AdminClientConfig(props, true), null);
     }
 
     /**
-     * Create a new AdminClient with the given configuration.
+     * Create a new Admin with the given configuration.
      *
      * @param conf The configuration.
      * @return The new KafkaAdminClient.
      */
-    public static AdminClient create(Map<String, Object> conf) {
+    static Admin create(Map<String, Object> conf) {
         return KafkaAdminClient.createInternal(new AdminClientConfig(conf, true), null);
     }
 
     /**
-     * Close the AdminClient and release all associated resources.
-     *
-     * See {@link AdminClient#close(long, TimeUnit)}
+     * Close the Admin and release all associated resources.
+     * <p>
+     * See {@link Admin#close(long, TimeUnit)}
      */
     @Override
-    public void close() {
+    default void close() {
         close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
     }
 
     /**
-     * Close the AdminClient and release all associated resources.
-     *
+     * Close the Admin and release all associated resources.
+     * <p>
      * The close operation has a grace period during which current operations will be allowed to
      * complete, specified by the given duration and time unit.
      * New operations will not be accepted during the grace period.  Once the grace period is over,
      * all operations that have not yet been completed will be aborted with a TimeoutException.
      *
-     * @param duration  The duration to use for the wait time.
-     * @param unit      The time unit to use for the wait time.
+     * @param duration The duration to use for the wait time.
+     * @param unit     The time unit to use for the wait time.
      * @deprecated Since 2.2. Use {@link #close(Duration)} or {@link #close()}.
      */
     @Deprecated
-    public void close(long duration, TimeUnit unit) {
+    default void close(long duration, TimeUnit unit) {
         close(Duration.ofMillis(unit.toMillis(duration)));
     }
 
     /**
-     * Close the AdminClient and release all associated resources.
-     *
+     * Close the Admin client and release all associated resources.
+     * <p>
      * The close operation has a grace period during which current operations will be allowed to
      * complete, specified by the given duration.
      * New operations will not be accepted during the grace period.  Once the grace period is over,
      * all operations that have not yet been completed will be aborted with a TimeoutException.
      *
-     * @param timeout  The time to use for the wait time.
+     * @param timeout The time to use for the wait time.
      */
-    public abstract void close(Duration timeout);
+    void close(Duration timeout);
 
     /**
      * Create a batch of new topics with the default options.
-     *
+     * <p>
      * This is a convenience method for #{@link #createTopics(Collection, CreateTopicsOptions)} with default options.
      * See the overload for more details.
-     *
+     * <p>
      * This operation is supported by brokers with version 0.10.1.0 or higher.
      *
-     * @param newTopics         The new topics to create.
-     * @return                  The CreateTopicsResult.
+     * @param newTopics The new topics to create.
+     * @return The CreateTopicsResult.
      */
-    public CreateTopicsResult createTopics(Collection<NewTopic> newTopics) {
+    default CreateTopicsResult createTopics(Collection<NewTopic> newTopics) {
         return createTopics(newTopics, new CreateTopicsOptions());
     }
 
     /**
      * Create a batch of new topics.
-     *
+     * <p>
      * This operation is not transactional so it may succeed for some topics while fail for others.
-     *
+     * <p>
      * It may take several seconds after {@code CreateTopicsResult} returns
      * success for all the brokers to become aware that the topics have been created.
-     * During this time, {@link AdminClient#listTopics()} and {@link AdminClient#describeTopics(Collection)}
+     * During this time, {@link Admin#listTopics()} and {@link Admin#describeTopics(Collection)}
      * may not return information about the new topics.
-     *
+     * <p>
      * This operation is supported by brokers with version 0.10.1.0 or higher. The validateOnly option is supported
      * from version 0.10.2.0.
      *
-     * @param newTopics         The new topics to create.
-     * @param options           The options to use when creating the new topics.
-     * @return                  The CreateTopicsResult.
+     * @param newTopics The new topics to create.
+     * @param options   The options to use when creating the new topics.
+     * @return The CreateTopicsResult.
      */
-    public abstract CreateTopicsResult createTopics(Collection<NewTopic> newTopics,
-                                                    CreateTopicsOptions options);
+    CreateTopicsResult createTopics(Collection<NewTopic> newTopics, CreateTopicsOptions options);
 
     /**
-     * This is a convenience method for #{@link AdminClient#deleteTopics(Collection, DeleteTopicsOptions)}
+     * This is a convenience method for #{@link Admin#deleteTopics(Collection, DeleteTopicsOptions)}
      * with default options. See the overload for more details.
-     *
+     * <p>
      * This operation is supported by brokers with version 0.10.1.0 or higher.
      *
-     * @param topics            The topic names to delete.
-     * @return                  The DeleteTopicsResult.
+     * @param topics The topic names to delete.
+     * @return The DeleteTopicsResult.
      */
-    public DeleteTopicsResult deleteTopics(Collection<String> topics) {
+    default DeleteTopicsResult deleteTopics(Collection<String> topics) {
         return deleteTopics(topics, new DeleteTopicsOptions());
     }
 
     /**
      * Delete a batch of topics.
-     *
+     * <p>
      * This operation is not transactional so it may succeed for some topics while fail for others.
-     *
+     * <p>
      * It may take several seconds after the {@code DeleteTopicsResult} returns
      * success for all the brokers to become aware that the topics are gone.
-     * During this time, AdminClient#listTopics and AdminClient#describeTopics
+     * During this time, Admin#listTopics and Admin#describeTopics
      * may continue to return information about the deleted topics.
-     *
+     * <p>
      * If delete.topic.enable is false on the brokers, deleteTopics will mark
      * the topics for deletion, but not actually delete them.  The futures will
      * return successfully in this case.
-     *
+     * <p>
      * This operation is supported by brokers with version 0.10.1.0 or higher.
      *
-     * @param topics            The topic names to delete.
-     * @param options           The options to use when deleting the topics.
-     * @return                  The DeleteTopicsResult.
+     * @param topics  The topic names to delete.
+     * @param options The options to use when deleting the topics.
+     * @return The DeleteTopicsResult.
      */
-    public abstract DeleteTopicsResult deleteTopics(Collection<String> topics, DeleteTopicsOptions options);
+    DeleteTopicsResult deleteTopics(Collection<String> topics, DeleteTopicsOptions options);
 
     /**
      * List the topics available in the cluster with the default options.
-     *
-     * This is a convenience method for #{@link AdminClient#listTopics(ListTopicsOptions)} with default options.
+     * <p>
+     * This is a convenience method for #{@link Admin#listTopics(ListTopicsOptions)} with default options.
      * See the overload for more details.
      *
-     * @return                  The ListTopicsResult.
+     * @return The ListTopicsResult.
      */
-    public ListTopicsResult listTopics() {
+    default ListTopicsResult listTopics() {
         return listTopics(new ListTopicsOptions());
     }
 
     /**
      * List the topics available in the cluster.
      *
-     * @param options           The options to use when listing the topics.
-     * @return                  The ListTopicsResult.
+     * @param options The options to use when listing the topics.
+     * @return The ListTopicsResult.
      */
-    public abstract ListTopicsResult listTopics(ListTopicsOptions options);
+    ListTopicsResult listTopics(ListTopicsOptions options);
 
     /**
      * Describe some topics in the cluster, with the default options.
-     *
-     * This is a convenience method for #{@link AdminClient#describeTopics(Collection, DescribeTopicsOptions)} with
+     * <p>
+     * This is a convenience method for #{@link Admin#describeTopics(Collection, DescribeTopicsOptions)} with
      * default options. See the overload for more details.
      *
-     * @param topicNames        The names of the topics to describe.
-     *
-     * @return                  The DescribeTopicsResult.
+     * @param topicNames The names of the topics to describe.
+     * @return The DescribeTopicsResult.
      */
-    public DescribeTopicsResult describeTopics(Collection<String> topicNames) {
+    default DescribeTopicsResult describeTopics(Collection<String> topicNames) {
         return describeTopics(topicNames, new DescribeTopicsOptions());
     }
 
     /**
      * Describe some topics in the cluster.
      *
-     * @param topicNames        The names of the topics to describe.
-     * @param options           The options to use when describing the topic.
-     *
-     * @return                  The DescribeTopicsResult.
+     * @param topicNames The names of the topics to describe.
+     * @param options    The options to use when describing the topic.
+     * @return The DescribeTopicsResult.
      */
-    public abstract DescribeTopicsResult describeTopics(Collection<String> topicNames,
-                                                         DescribeTopicsOptions options);
+    DescribeTopicsResult describeTopics(Collection<String> topicNames, DescribeTopicsOptions options);
 
     /**
      * Get information about the nodes in the cluster, using the default options.
-     *
-     * This is a convenience method for #{@link AdminClient#describeCluster(DescribeClusterOptions)} with default options.
+     * <p>
+     * This is a convenience method for #{@link Admin#describeCluster(DescribeClusterOptions)} with default options.
      * See the overload for more details.
      *
-     * @return                  The DescribeClusterResult.
+     * @return The DescribeClusterResult.
      */
-    public DescribeClusterResult describeCluster() {
+    default DescribeClusterResult describeCluster() {
         return describeCluster(new DescribeClusterOptions());
     }
 
     /**
      * Get information about the nodes in the cluster.
      *
-     * @param options           The options to use when getting information about the cluster.
-     * @return                  The DescribeClusterResult.
+     * @param options The options to use when getting information about the cluster.
+     * @return The DescribeClusterResult.
      */
-    public abstract DescribeClusterResult describeCluster(DescribeClusterOptions options);
+    DescribeClusterResult describeCluster(DescribeClusterOptions options);
 
     /**
-     * This is a convenience method for #{@link AdminClient#describeAcls(AclBindingFilter, DescribeAclsOptions)} with
+     * This is a convenience method for #{@link Admin#describeAcls(AclBindingFilter, DescribeAclsOptions)} with
      * default options. See the overload for more details.
-     *
+     * <p>
      * This operation is supported by brokers with version 0.11.0.0 or higher.
      *
-     * @param filter            The filter to use.
-     * @return                  The DeleteAclsResult.
+     * @param filter The filter to use.
+     * @return The DeleteAclsResult.
      */
-    public DescribeAclsResult describeAcls(AclBindingFilter filter) {
+    default DescribeAclsResult describeAcls(AclBindingFilter filter) {
         return describeAcls(filter, new DescribeAclsOptions());
     }
 
     /**
      * Lists access control lists (ACLs) according to the supplied filter.
-     *
+     * <p>
      * Note: it may take some time for changes made by createAcls or deleteAcls to be reflected
      * in the output of describeAcls.
-     *
+     * <p>
      * This operation is supported by brokers with version 0.11.0.0 or higher.
      *
-     * @param filter            The filter to use.
-     * @param options           The options to use when listing the ACLs.
-     * @return                  The DeleteAclsResult.
+     * @param filter  The filter to use.
+     * @param options The options to use when listing the ACLs.
+     * @return The DeleteAclsResult.
      */
-    public abstract DescribeAclsResult describeAcls(AclBindingFilter filter, DescribeAclsOptions options);
+    DescribeAclsResult describeAcls(AclBindingFilter filter, DescribeAclsOptions options);
 
     /**
-     * This is a convenience method for #{@link AdminClient#createAcls(Collection, CreateAclsOptions)} with
+     * This is a convenience method for #{@link Admin#createAcls(Collection, CreateAclsOptions)} with
      * default options. See the overload for more details.
-     *
+     * <p>
      * This operation is supported by brokers with version 0.11.0.0 or higher.
      *
-     * @param acls              The ACLs to create
-     * @return                  The CreateAclsResult.
+     * @param acls The ACLs to create
+     * @return The CreateAclsResult.
      */
-    public CreateAclsResult createAcls(Collection<AclBinding> acls) {
+    default CreateAclsResult createAcls(Collection<AclBinding> acls) {
         return createAcls(acls, new CreateAclsOptions());
     }
 
     /**
      * Creates access control lists (ACLs) which are bound to specific resources.
-     *
+     * <p>
      * This operation is not transactional so it may succeed for some ACLs while fail for others.
-     *
+     * <p>
      * If you attempt to add an ACL that duplicates an existing ACL, no error will be raised, but
      * no changes will be made.
-     *
+     * <p>
      * This operation is supported by brokers with version 0.11.0.0 or higher.
      *
-     * @param acls              The ACLs to create
-     * @param options           The options to use when creating the ACLs.
-     * @return                  The CreateAclsResult.
+     * @param acls    The ACLs to create
+     * @param options The options to use when creating the ACLs.
+     * @return The CreateAclsResult.
      */
-    public abstract CreateAclsResult createAcls(Collection<AclBinding> acls, CreateAclsOptions options);
+    CreateAclsResult createAcls(Collection<AclBinding> acls, CreateAclsOptions options);
 
     /**
-     * This is a convenience method for #{@link AdminClient#deleteAcls(Collection, DeleteAclsOptions)} with default options.
+     * This is a convenience method for #{@link Admin#deleteAcls(Collection, DeleteAclsOptions)} with default options.
      * See the overload for more details.
-     *
+     * <p>
      * This operation is supported by brokers with version 0.11.0.0 or higher.
      *
-     * @param filters           The filters to use.
-     * @return                  The DeleteAclsResult.
+     * @param filters The filters to use.
+     * @return The DeleteAclsResult.
      */
-    public DeleteAclsResult deleteAcls(Collection<AclBindingFilter> filters) {
+    default DeleteAclsResult deleteAcls(Collection<AclBindingFilter> filters) {
         return deleteAcls(filters, new DeleteAclsOptions());
     }
 
     /**
      * Deletes access control lists (ACLs) according to the supplied filters.
-     *
+     * <p>
      * This operation is not transactional so it may succeed for some ACLs while fail for others.
-     *
+     * <p>
      * This operation is supported by brokers with version 0.11.0.0 or higher.
      *
-     * @param filters           The filters to use.
-     * @param options           The options to use when deleting the ACLs.
-     * @return                  The DeleteAclsResult.
+     * @param filters The filters to use.
+     * @param options The options to use when deleting the ACLs.
+     * @return The DeleteAclsResult.
      */
-    public abstract DeleteAclsResult deleteAcls(Collection<AclBindingFilter> filters, DeleteAclsOptions options);
+    DeleteAclsResult deleteAcls(Collection<AclBindingFilter> filters, DeleteAclsOptions options);
 
 
     /**
      * Get the configuration for the specified resources with the default options.
-     *
-     * This is a convenience method for #{@link AdminClient#describeConfigs(Collection, DescribeConfigsOptions)} with default options.
+     * <p>
+     * This is a convenience method for #{@link Admin#describeConfigs(Collection, DescribeConfigsOptions)} with default options.
      * See the overload for more details.
-     *
+     * <p>
      * This operation is supported by brokers with version 0.11.0.0 or higher.
      *
-     * @param resources         The resources (topic and broker resource types are currently supported)
-     * @return                  The DescribeConfigsResult
+     * @param resources The resources (topic and broker resource types are currently supported)
+     * @return The DescribeConfigsResult
      */
-    public DescribeConfigsResult describeConfigs(Collection<ConfigResource> resources) {
+    default DescribeConfigsResult describeConfigs(Collection<ConfigResource> resources) {
         return describeConfigs(resources, new DescribeConfigsOptions());
     }
 
     /**
      * Get the configuration for the specified resources.
-     *
+     * <p>
      * The returned configuration includes default values and the isDefault() method can be used to distinguish them
      * from user supplied values.
-     *
+     * <p>
      * The value of config entries where isSensitive() is true is always {@code null} so that sensitive information
      * is not disclosed.
-     *
+     * <p>
      * Config entries where isReadOnly() is true cannot be updated.
-     *
+     * <p>
      * This operation is supported by brokers with version 0.11.0.0 or higher.
      *
-     * @param resources         The resources (topic and broker resource types are currently supported)
-     * @param options           The options to use when describing configs
-     * @return                  The DescribeConfigsResult
+     * @param resources The resources (topic and broker resource types are currently supported)
+     * @param options   The options to use when describing configs
+     * @return The DescribeConfigsResult
      */
-    public abstract DescribeConfigsResult describeConfigs(Collection<ConfigResource> resources,
-                                                           DescribeConfigsOptions options);
+    DescribeConfigsResult describeConfigs(Collection<ConfigResource> resources, DescribeConfigsOptions options);
 
     /**
      * Update the configuration for the specified resources with the default options.
-     *
-     * This is a convenience method for #{@link AdminClient#alterConfigs(Map, AlterConfigsOptions)} with default options.
+     * <p>
+     * This is a convenience method for #{@link Admin#alterConfigs(Map, AlterConfigsOptions)} with default options.
      * See the overload for more details.
-     *
+     * <p>
      * This operation is supported by brokers with version 0.11.0.0 or higher.
      *
-     * @param configs         The resources with their configs (topic is the only resource type with configs that can
-     *                        be updated currently)
-     * @return                The AlterConfigsResult
+     * @param configs The resources with their configs (topic is the only resource type with configs that can
+     *                be updated currently)
+     * @return The AlterConfigsResult
      * @deprecated Since 2.3. Use {@link #incrementalAlterConfigs(Map)}.
      */
     @Deprecated
-    public AlterConfigsResult alterConfigs(Map<ConfigResource, Config> configs) {
+    default AlterConfigsResult alterConfigs(Map<ConfigResource, Config> configs) {
         return alterConfigs(configs, new AlterConfigsOptions());
     }
 
     /**
      * Update the configuration for the specified resources with the default options.
-     *
+     * <p>
      * Updates are not transactional so they may succeed for some resources while fail for others. The configs for
      * a particular resource are updated atomically.
-     *
+     * <p>
      * This operation is supported by brokers with version 0.11.0.0 or higher.
      *
-     * @param configs         The resources with their configs (topic is the only resource type with configs that can
-     *                        be updated currently)
-     * @param options         The options to use when describing configs
-     * @return                The AlterConfigsResult
+     * @param configs The resources with their configs (topic is the only resource type with configs that can
+     *                be updated currently)
+     * @param options The options to use when describing configs
+     * @return The AlterConfigsResult
      * @deprecated Since 2.3. Use {@link #incrementalAlterConfigs(Map, AlterConfigsOptions)}.
      */
     @Deprecated
-    public abstract AlterConfigsResult alterConfigs(Map<ConfigResource, Config> configs, AlterConfigsOptions options);
+    AlterConfigsResult alterConfigs(Map<ConfigResource, Config> configs, AlterConfigsOptions options);
 
     /**
      * Incrementally updates the configuration for the specified resources with default options.
-     *
-     * This is a convenience method for #{@link AdminClient#incrementalAlterConfigs(Map, AlterConfigsOptions)} with default options.
+     * <p>
+     * This is a convenience method for #{@link Admin#incrementalAlterConfigs(Map, AlterConfigsOptions)} with default options.
      * See the overload for more details.*
-     *
+     * <p>
      * This operation is supported by brokers with version 2.3.0 or higher.
      *
-     * @param configs         The resources with their configs
-     * @return                The IncrementalAlterConfigsResult
+     * @param configs The resources with their configs
+     * @return The IncrementalAlterConfigsResult
      */
-    public AlterConfigsResult incrementalAlterConfigs(Map<ConfigResource, Collection<AlterConfigOp>> configs) {
+    default AlterConfigsResult incrementalAlterConfigs(Map<ConfigResource, Collection<AlterConfigOp>> configs) {
         return incrementalAlterConfigs(configs, new AlterConfigsOptions());
     }
 
 
     /**
      * Incrementally update the configuration for the specified resources.
-     *
+     * <p>
      * Updates are not transactional so they may succeed for some resources while fail for others. The configs for
      * a particular resource are updated atomically.
      *
      * <p>The following exceptions can be anticipated when calling {@code get()} on the futures obtained from
      * the returned {@code IncrementalAlterConfigsResult}:</p>
      * <ul>
-     *   <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException}
-     *   if the authenticated user didn't have alter access to the cluster.</li>
-     *   <li>{@link org.apache.kafka.common.errors.TopicAuthorizationException}
-     *   if the authenticated user didn't have alter access to the Topic.</li>
-     *   <li>{@link org.apache.kafka.common.errors.InvalidRequestException}
-     *   if the request details are invalid. e.g., a configuration key was specified more than once for a resource</li>
+     * <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException}
+     * if the authenticated user didn't have alter access to the cluster.</li>
+     * <li>{@link org.apache.kafka.common.errors.TopicAuthorizationException}
+     * if the authenticated user didn't have alter access to the Topic.</li>
+     * <li>{@link org.apache.kafka.common.errors.InvalidRequestException}
+     * if the request details are invalid. e.g., a configuration key was specified more than once for a resource</li>
      * </ul>*
-     *
+     * <p>
      * This operation is supported by brokers with version 2.3.0 or higher.
      *
-     * @param configs         The resources with their configs
-     * @param options         The options to use when altering configs
-     * @return                The IncrementalAlterConfigsResult
+     * @param configs The resources with their configs
+     * @param options The options to use when altering configs
+     * @return The IncrementalAlterConfigsResult
      */
-    public abstract AlterConfigsResult incrementalAlterConfigs(Map<ConfigResource,
-            Collection<AlterConfigOp>> configs, AlterConfigsOptions options);
+    AlterConfigsResult incrementalAlterConfigs(Map<ConfigResource,
+        Collection<AlterConfigOp>> configs, AlterConfigsOptions options);
 
     /**
      * Change the log directory for the specified replicas. If the replica does not exist on the broker, the result
      * shows REPLICA_NOT_AVAILABLE for the given replica and the replica will be created in the given log directory on the
      * broker when it is created later. If the replica already exists on the broker, the replica will be moved to the given
      * log directory if it is not already there.
-     *
+     * <p>
      * This operation is not transactional so it may succeed for some replicas while fail for others.
-     *
-     * This is a convenience method for #{@link AdminClient#alterReplicaLogDirs(Map, AlterReplicaLogDirsOptions)} with default options.
+     * <p>
+     * This is a convenience method for #{@link Admin#alterReplicaLogDirs(Map, AlterReplicaLogDirsOptions)} with default options.
      * See the overload for more details.
-     *
+     * <p>
      * This operation is supported by brokers with version 1.1.0 or higher.
      *
-     * @param replicaAssignment  The replicas with their log directory absolute path
-     * @return                   The AlterReplicaLogDirsResult
+     * @param replicaAssignment The replicas with their log directory absolute path
+     * @return The AlterReplicaLogDirsResult
      */
-    public AlterReplicaLogDirsResult alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment) {
+    default AlterReplicaLogDirsResult alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment) {
         return alterReplicaLogDirs(replicaAssignment, new AlterReplicaLogDirsOptions());
     }
 
@@ -461,68 +456,69 @@ public abstract class AdminClient implements AutoCloseable {
      * shows REPLICA_NOT_AVAILABLE for the given replica and the replica will be created in the given log directory on the
      * broker when it is created later. If the replica already exists on the broker, the replica will be moved to the given
      * log directory if it is not already there.
-     *
+     * <p>
      * This operation is not transactional so it may succeed for some replicas while fail for others.
-     *
+     * <p>
      * This operation is supported by brokers with version 1.1.0 or higher.
      *
-     * @param replicaAssignment  The replicas with their log directory absolute path
-     * @param options            The options to use when changing replica dir
-     * @return                   The AlterReplicaLogDirsResult
+     * @param replicaAssignment The replicas with their log directory absolute path
+     * @param options           The options to use when changing replica dir
+     * @return The AlterReplicaLogDirsResult
      */
-    public abstract AlterReplicaLogDirsResult alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment, AlterReplicaLogDirsOptions options);
+    AlterReplicaLogDirsResult alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment,
+                                                  AlterReplicaLogDirsOptions options);
 
     /**
      * Query the information of all log directories on the given set of brokers
-     *
-     * This is a convenience method for #{@link AdminClient#describeLogDirs(Collection, DescribeLogDirsOptions)} with default options.
+     * <p>
+     * This is a convenience method for #{@link Admin#describeLogDirs(Collection, DescribeLogDirsOptions)} with default options.
      * See the overload for more details.
-     *
+     * <p>
      * This operation is supported by brokers with version 1.0.0 or higher.
      *
-     * @param brokers     A list of brokers
-     * @return            The DescribeLogDirsResult
+     * @param brokers A list of brokers
+     * @return The DescribeLogDirsResult
      */
-    public DescribeLogDirsResult describeLogDirs(Collection<Integer> brokers) {
+    default DescribeLogDirsResult describeLogDirs(Collection<Integer> brokers) {
         return describeLogDirs(brokers, new DescribeLogDirsOptions());
     }
 
     /**
      * Query the information of all log directories on the given set of brokers
-     *
+     * <p>
      * This operation is supported by brokers with version 1.0.0 or higher.
      *
-     * @param brokers     A list of brokers
-     * @param options     The options to use when querying log dir info
-     * @return            The DescribeLogDirsResult
+     * @param brokers A list of brokers
+     * @param options The options to use when querying log dir info
+     * @return The DescribeLogDirsResult
      */
-    public abstract DescribeLogDirsResult describeLogDirs(Collection<Integer> brokers, DescribeLogDirsOptions options);
+    DescribeLogDirsResult describeLogDirs(Collection<Integer> brokers, DescribeLogDirsOptions options);
 
     /**
      * Query the replica log directory information for the specified replicas.
-     *
-     * This is a convenience method for #{@link AdminClient#describeReplicaLogDirs(Collection, DescribeReplicaLogDirsOptions)}
+     * <p>
+     * This is a convenience method for #{@link Admin#describeReplicaLogDirs(Collection, DescribeReplicaLogDirsOptions)}
      * with default options. See the overload for more details.
-     *
+     * <p>
      * This operation is supported by brokers with version 1.0.0 or higher.
      *
-     * @param replicas      The replicas to query
-     * @return              The DescribeReplicaLogDirsResult
+     * @param replicas The replicas to query
+     * @return The DescribeReplicaLogDirsResult
      */
-    public DescribeReplicaLogDirsResult describeReplicaLogDirs(Collection<TopicPartitionReplica> replicas) {
+    default DescribeReplicaLogDirsResult describeReplicaLogDirs(Collection<TopicPartitionReplica> replicas) {
         return describeReplicaLogDirs(replicas, new DescribeReplicaLogDirsOptions());
     }
 
     /**
      * Query the replica log directory information for the specified replicas.
-     *
+     * <p>
      * This operation is supported by brokers with version 1.0.0 or higher.
      *
-     * @param replicas      The replicas to query
-     * @param options       The options to use when querying replica log dir info
-     * @return              The DescribeReplicaLogDirsResult
+     * @param replicas The replicas to query
+     * @param options  The options to use when querying replica log dir info
+     * @return The DescribeReplicaLogDirsResult
      */
-    public abstract DescribeReplicaLogDirsResult describeReplicaLogDirs(Collection<TopicPartitionReplica> replicas, DescribeReplicaLogDirsOptions options);
+    DescribeReplicaLogDirsResult describeReplicaLogDirs(Collection<TopicPartitionReplica> replicas, DescribeReplicaLogDirsOptions options);
 
     /**
      * <p>Increase the number of partitions of the topics given as the keys of {@code newPartitions}
@@ -534,9 +530,9 @@ public abstract class AdminClient implements AutoCloseable {
      *
      * @param newPartitions The topics which should have new partitions created, and corresponding parameters
      *                      for the created partitions.
-     * @return              The CreatePartitionsResult.
+     * @return The CreatePartitionsResult.
      */
-    public CreatePartitionsResult createPartitions(Map<String, NewPartitions> newPartitions) {
+    default CreatePartitionsResult createPartitions(Map<String, NewPartitions> newPartitions) {
         return createPartitions(newPartitions, new CreatePartitionsOptions());
     }
 
@@ -549,7 +545,7 @@ public abstract class AdminClient implements AutoCloseable {
      *
      * <p>It may take several seconds after this method returns
      * success for all the brokers to become aware that the partitions have been created.
-     * During this time, {@link AdminClient#describeTopics(Collection)}
+     * During this time, {@link Admin#describeTopics(Collection)}
      * may not return information about the new partitions.</p>
      *
      * <p>This operation is supported by brokers with version 1.0.0 or higher.</p>
@@ -557,55 +553,55 @@ public abstract class AdminClient implements AutoCloseable {
      * <p>The following exceptions can be anticipated when calling {@code get()} on the futures obtained from the
      * {@link CreatePartitionsResult#values() values()} method of the returned {@code CreatePartitionsResult}</p>
      * <ul>
-     *     <li>{@link org.apache.kafka.common.errors.AuthorizationException}
-     *     if the authenticated user is not authorized to alter the topic</li>
-     *     <li>{@link org.apache.kafka.common.errors.TimeoutException}
-     *     if the request was not completed in within the given {@link CreatePartitionsOptions#timeoutMs()}.</li>
-     *     <li>{@link org.apache.kafka.common.errors.ReassignmentInProgressException}
-     *     if a partition reassignment is currently in progress</li>
-     *     <li>{@link org.apache.kafka.common.errors.BrokerNotAvailableException}
-     *     if the requested {@link NewPartitions#assignments()} contain a broker that is currently unavailable.</li>
-     *     <li>{@link org.apache.kafka.common.errors.InvalidReplicationFactorException}
-     *     if no {@link NewPartitions#assignments()} are given and it is impossible for the broker to assign
-     *     replicas with the topics replication factor.</li>
-     *     <li>Subclasses of {@link org.apache.kafka.common.KafkaException}
-     *     if the request is invalid in some way.</li>
+     * <li>{@link org.apache.kafka.common.errors.AuthorizationException}
+     * if the authenticated user is not authorized to alter the topic</li>
+     * <li>{@link org.apache.kafka.common.errors.TimeoutException}
+     * if the request was not completed in within the given {@link CreatePartitionsOptions#timeoutMs()}.</li>
+     * <li>{@link org.apache.kafka.common.errors.ReassignmentInProgressException}
+     * if a partition reassignment is currently in progress</li>
+     * <li>{@link org.apache.kafka.common.errors.BrokerNotAvailableException}
+     * if the requested {@link NewPartitions#assignments()} contain a broker that is currently unavailable.</li>
+     * <li>{@link org.apache.kafka.common.errors.InvalidReplicationFactorException}
+     * if no {@link NewPartitions#assignments()} are given and it is impossible for the broker to assign
+     * replicas with the topics replication factor.</li>
+     * <li>Subclasses of {@link org.apache.kafka.common.KafkaException}
+     * if the request is invalid in some way.</li>
      * </ul>
      *
      * @param newPartitions The topics which should have new partitions created, and corresponding parameters
      *                      for the created partitions.
      * @param options       The options to use when creating the new paritions.
-     * @return              The CreatePartitionsResult.
+     * @return The CreatePartitionsResult.
      */
-    public abstract CreatePartitionsResult createPartitions(Map<String, NewPartitions> newPartitions,
-                                                            CreatePartitionsOptions options);
+    CreatePartitionsResult createPartitions(Map<String, NewPartitions> newPartitions,
+                                            CreatePartitionsOptions options);
 
     /**
      * Delete records whose offset is smaller than the given offset of the corresponding partition.
-     *
+     * <p>
      * This is a convenience method for {@link #deleteRecords(Map, DeleteRecordsOptions)} with default options.
      * See the overload for more details.
-     *
+     * <p>
      * This operation is supported by brokers with version 0.11.0.0 or higher.
      *
-     * @param recordsToDelete       The topic partitions and related offsets from which records deletion starts.
-     * @return                      The DeleteRecordsResult.
+     * @param recordsToDelete The topic partitions and related offsets from which records deletion starts.
+     * @return The DeleteRecordsResult.
      */
-    public DeleteRecordsResult deleteRecords(Map<TopicPartition, RecordsToDelete> recordsToDelete) {
+    default DeleteRecordsResult deleteRecords(Map<TopicPartition, RecordsToDelete> recordsToDelete) {
         return deleteRecords(recordsToDelete, new DeleteRecordsOptions());
     }
 
     /**
      * Delete records whose offset is smaller than the given offset of the corresponding partition.
-     *
+     * <p>
      * This operation is supported by brokers with version 0.11.0.0 or higher.
      *
-     * @param recordsToDelete       The topic partitions and related offsets from which records deletion starts.
-     * @param options               The options to use when deleting records.
-     * @return                      The DeleteRecordsResult.
+     * @param recordsToDelete The topic partitions and related offsets from which records deletion starts.
+     * @param options         The options to use when deleting records.
+     * @return The DeleteRecordsResult.
      */
-    public abstract DeleteRecordsResult deleteRecords(Map<TopicPartition, RecordsToDelete> recordsToDelete,
-                                                      DeleteRecordsOptions options);
+    DeleteRecordsResult deleteRecords(Map<TopicPartition, RecordsToDelete> recordsToDelete,
+                                      DeleteRecordsOptions options);
 
     /**
      * <p>Create a Delegation Token.</p>
@@ -613,9 +609,9 @@ public abstract class AdminClient implements AutoCloseable {
      * <p>This is a convenience method for {@link #createDelegationToken(CreateDelegationTokenOptions)} with default options.
      * See the overload for more details.</p>
      *
-     * @return                      The CreateDelegationTokenResult.
+     * @return The CreateDelegationTokenResult.
      */
-    public CreateDelegationTokenResult createDelegationToken() {
+    default CreateDelegationTokenResult createDelegationToken() {
         return createDelegationToken(new CreateDelegationTokenOptions());
     }
 
@@ -628,20 +624,20 @@ public abstract class AdminClient implements AutoCloseable {
      * <p>The following exceptions can be anticipated when calling {@code get()} on the futures obtained from the
      * {@link CreateDelegationTokenResult#delegationToken() delegationToken()} method of the returned {@code CreateDelegationTokenResult}</p>
      * <ul>
-     *     <li>{@link org.apache.kafka.common.errors.UnsupportedByAuthenticationException}
-     *     If the request sent on PLAINTEXT/1-way SSL channels or delegation token authenticated channels.</li>
-     *     <li>{@link org.apache.kafka.common.errors.InvalidPrincipalTypeException}
-     *     if the renewers principal type is not supported.</li>
-     *     <li>{@link org.apache.kafka.common.errors.DelegationTokenDisabledException}
-     *     if the delegation token feature is disabled.</li>
-     *     <li>{@link org.apache.kafka.common.errors.TimeoutException}
-     *     if the request was not completed in within the given {@link CreateDelegationTokenOptions#timeoutMs()}.</li>
+     * <li>{@link org.apache.kafka.common.errors.UnsupportedByAuthenticationException}
+     * If the request sent on PLAINTEXT/1-way SSL channels or delegation token authenticated channels.</li>
+     * <li>{@link org.apache.kafka.common.errors.InvalidPrincipalTypeException}
+     * if the renewers principal type is not supported.</li>
+     * <li>{@link org.apache.kafka.common.errors.DelegationTokenDisabledException}
+     * if the delegation token feature is disabled.</li>
+     * <li>{@link org.apache.kafka.common.errors.TimeoutException}
+     * if the request was not completed in within the given {@link CreateDelegationTokenOptions#timeoutMs()}.</li>
      * </ul>
      *
-     * @param options               The options to use when creating delegation token.
-     * @return                      The DeleteRecordsResult.
+     * @param options The options to use when creating delegation token.
+     * @return The DeleteRecordsResult.
      */
-    public abstract CreateDelegationTokenResult createDelegationToken(CreateDelegationTokenOptions options);
+    CreateDelegationTokenResult createDelegationToken(CreateDelegationTokenOptions options);
 
 
     /**
@@ -650,11 +646,10 @@ public abstract class AdminClient implements AutoCloseable {
      * <p>This is a convenience method for {@link #renewDelegationToken(byte[], RenewDelegationTokenOptions)} with default options.
      * See the overload for more details.</p>
      *
-     *
-     * @param hmac                  HMAC of the Delegation token
-     * @return                      The RenewDelegationTokenResult.
+     * @param hmac HMAC of the Delegation token
+     * @return The RenewDelegationTokenResult.
      */
-    public RenewDelegationTokenResult renewDelegationToken(byte[] hmac) {
+    default RenewDelegationTokenResult renewDelegationToken(byte[] hmac) {
         return renewDelegationToken(hmac, new RenewDelegationTokenOptions());
     }
 
@@ -666,25 +661,25 @@ public abstract class AdminClient implements AutoCloseable {
      * <p>The following exceptions can be anticipated when calling {@code get()} on the futures obtained from the
      * {@link RenewDelegationTokenResult#expiryTimestamp() expiryTimestamp()} method of the returned {@code RenewDelegationTokenResult}</p>
      * <ul>
-     *     <li>{@link org.apache.kafka.common.errors.UnsupportedByAuthenticationException}
-     *     If the request sent on PLAINTEXT/1-way SSL channels or delegation token authenticated channels.</li>
-     *     <li>{@link org.apache.kafka.common.errors.DelegationTokenDisabledException}
-     *     if the delegation token feature is disabled.</li>
-     *     <li>{@link org.apache.kafka.common.errors.DelegationTokenNotFoundException}
-     *     if the delegation token is not found on server.</li>
-     *     <li>{@link org.apache.kafka.common.errors.DelegationTokenOwnerMismatchException}
-     *     if the authenticated user is not owner/renewer of the token.</li>
-     *     <li>{@link org.apache.kafka.common.errors.DelegationTokenExpiredException}
-     *     if the delegation token is expired.</li>
-     *     <li>{@link org.apache.kafka.common.errors.TimeoutException}
-     *     if the request was not completed in within the given {@link RenewDelegationTokenOptions#timeoutMs()}.</li>
+     * <li>{@link org.apache.kafka.common.errors.UnsupportedByAuthenticationException}
+     * If the request sent on PLAINTEXT/1-way SSL channels or delegation token authenticated channels.</li>
+     * <li>{@link org.apache.kafka.common.errors.DelegationTokenDisabledException}
+     * if the delegation token feature is disabled.</li>
+     * <li>{@link org.apache.kafka.common.errors.DelegationTokenNotFoundException}
+     * if the delegation token is not found on server.</li>
+     * <li>{@link org.apache.kafka.common.errors.DelegationTokenOwnerMismatchException}
+     * if the authenticated user is not owner/renewer of the token.</li>
+     * <li>{@link org.apache.kafka.common.errors.DelegationTokenExpiredException}
+     * if the delegation token is expired.</li>
+     * <li>{@link org.apache.kafka.common.errors.TimeoutException}
+     * if the request was not completed in within the given {@link RenewDelegationTokenOptions#timeoutMs()}.</li>
      * </ul>
      *
-     * @param hmac                  HMAC of the Delegation token
-     * @param options               The options to use when renewing delegation token.
-     * @return                      The RenewDelegationTokenResult.
+     * @param hmac    HMAC of the Delegation token
+     * @param options The options to use when renewing delegation token.
+     * @return The RenewDelegationTokenResult.
      */
-    public abstract RenewDelegationTokenResult renewDelegationToken(byte[] hmac, RenewDelegationTokenOptions options);
+    RenewDelegationTokenResult renewDelegationToken(byte[] hmac, RenewDelegationTokenOptions options);
 
     /**
      * <p>Expire a Delegation Token.</p>
@@ -692,10 +687,10 @@ public abstract class AdminClient implements AutoCloseable {
      * <p>This is a convenience method for {@link #expireDelegationToken(byte[], ExpireDelegationTokenOptions)} with default options.
      * This will expire the token immediately. See the overload for more details.</p>
      *
-     * @param hmac                  HMAC of the Delegation token
-     * @return                      The ExpireDelegationTokenResult.
+     * @param hmac HMAC of the Delegation token
+     * @return The ExpireDelegationTokenResult.
      */
-    public ExpireDelegationTokenResult expireDelegationToken(byte[] hmac) {
+    default ExpireDelegationTokenResult expireDelegationToken(byte[] hmac) {
         return expireDelegationToken(hmac, new ExpireDelegationTokenOptions());
     }
 
@@ -707,35 +702,35 @@ public abstract class AdminClient implements AutoCloseable {
      * <p>The following exceptions can be anticipated when calling {@code get()} on the futures obtained from the
      * {@link ExpireDelegationTokenResult#expiryTimestamp() expiryTimestamp()} method of the returned {@code ExpireDelegationTokenResult}</p>
      * <ul>
-     *     <li>{@link org.apache.kafka.common.errors.UnsupportedByAuthenticationException}
-     *     If the request sent on PLAINTEXT/1-way SSL channels or delegation token authenticated channels.</li>
-     *     <li>{@link org.apache.kafka.common.errors.DelegationTokenDisabledException}
-     *     if the delegation token feature is disabled.</li>
-     *     <li>{@link org.apache.kafka.common.errors.DelegationTokenNotFoundException}
-     *     if the delegation token is not found on server.</li>
-     *     <li>{@link org.apache.kafka.common.errors.DelegationTokenOwnerMismatchException}
-     *     if the authenticated user is not owner/renewer of the requested token.</li>
-     *     <li>{@link org.apache.kafka.common.errors.DelegationTokenExpiredException}
-     *     if the delegation token is expired.</li>
-     *     <li>{@link org.apache.kafka.common.errors.TimeoutException}
-     *     if the request was not completed in within the given {@link ExpireDelegationTokenOptions#timeoutMs()}.</li>
+     * <li>{@link org.apache.kafka.common.errors.UnsupportedByAuthenticationException}
+     * If the request is sent on PLAINTEXT/1-way SSL channels or delegation token authenticated channels.</li>
+     * <li>{@link org.apache.kafka.common.errors.DelegationTokenDisabledException}
+     * if the delegation token feature is disabled.</li>
+     * <li>{@link org.apache.kafka.common.errors.DelegationTokenNotFoundException}
+     * if the delegation token is not found on server.</li>
+     * <li>{@link org.apache.kafka.common.errors.DelegationTokenOwnerMismatchException}
+     * if the authenticated user is not owner/renewer of the requested token.</li>
+     * <li>{@link org.apache.kafka.common.errors.DelegationTokenExpiredException}
+     * if the delegation token is expired.</li>
+     * <li>{@link org.apache.kafka.common.errors.TimeoutException}
+     * if the request was not completed within the given {@link ExpireDelegationTokenOptions#timeoutMs()}.</li>
      * </ul>
      *
-     * @param hmac                  HMAC of the Delegation token
-     * @param options               The options to use when expiring delegation token.
-     * @return                      The ExpireDelegationTokenResult.
+     * @param hmac    HMAC of the Delegation token
+     * @param options The options to use when expiring delegation token.
+     * @return The ExpireDelegationTokenResult.
      */
-    public abstract ExpireDelegationTokenResult expireDelegationToken(byte[] hmac, ExpireDelegationTokenOptions options);
+    ExpireDelegationTokenResult expireDelegationToken(byte[] hmac, ExpireDelegationTokenOptions options);
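A correspondingly small sketch of immediate expiry via the method above; the Admin instance and the HMAC (obtained when the token was created) are assumptions supplied by the caller.

    import org.apache.kafka.clients.admin.Admin;

    class ExpireTokenSketch {
        // Expire a delegation token immediately, given its HMAC, and return the reported expiry timestamp.
        static long expireNow(Admin admin, byte[] hmac) throws Exception {
            return admin.expireDelegationToken(hmac).expiryTimestamp().get();
        }
    }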
 
     /**
-     *<p>Describe the Delegation Tokens.</p>
+     * <p>Describe the Delegation Tokens.</p>
      *
      * <p>This is a convenience method for {@link #describeDelegationToken(DescribeDelegationTokenOptions)} with default options.
      * This will return all the tokens owned by the user and the tokens for which the user has Describe permission. See the overload for more details.</p>
      *
-     * @return                      The DescribeDelegationTokenResult.
+     * @return The DescribeDelegationTokenResult.
      */
-    public DescribeDelegationTokenResult describeDelegationToken() {
+    default DescribeDelegationTokenResult describeDelegationToken() {
         return describeDelegationToken(new DescribeDelegationTokenOptions());
     }
 
@@ -747,18 +742,18 @@ public abstract class AdminClient implements AutoCloseable {
      * <p>The following exceptions can be anticipated when calling {@code get()} on the futures obtained from the
      * {@link DescribeDelegationTokenResult#delegationTokens() delegationTokens()} method of the returned {@code DescribeDelegationTokenResult}</p>
      * <ul>
-     *     <li>{@link org.apache.kafka.common.errors.UnsupportedByAuthenticationException}
-     *     If the request sent on PLAINTEXT/1-way SSL channels or delegation token authenticated channels.</li>
-     *     <li>{@link org.apache.kafka.common.errors.DelegationTokenDisabledException}
-     *     if the delegation token feature is disabled.</li>
-     *     <li>{@link org.apache.kafka.common.errors.TimeoutException}
-     *     if the request was not completed in within the given {@link DescribeDelegationTokenOptions#timeoutMs()}.</li>
+     * <li>{@link org.apache.kafka.common.errors.UnsupportedByAuthenticationException}
+     * If the request is sent on PLAINTEXT/1-way SSL channels or delegation token authenticated channels.</li>
+     * <li>{@link org.apache.kafka.common.errors.DelegationTokenDisabledException}
+     * if the delegation token feature is disabled.</li>
+     * <li>{@link org.apache.kafka.common.errors.TimeoutException}
+     * if the request was not completed within the given {@link DescribeDelegationTokenOptions#timeoutMs()}.</li>
      * </ul>
      *
-     * @param options               The options to use when describing delegation tokens.
-     * @return                      The DescribeDelegationTokenResult.
+     * @param options The options to use when describing delegation tokens.
+     * @return The DescribeDelegationTokenResult.
      */
-    public abstract DescribeDelegationTokenResult describeDelegationToken(DescribeDelegationTokenOptions options);
+    DescribeDelegationTokenResult describeDelegationToken(DescribeDelegationTokenOptions options);
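As a hedged illustration of the describe call above, assuming a cluster with delegation tokens enabled and an existing Admin instance:

    import java.util.List;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.common.security.token.delegation.DelegationToken;

    class DescribeTokensSketch {
        // Print the ID and owner of every token visible to the authenticated user.
        static void printTokens(Admin admin) throws Exception {
            List<DelegationToken> tokens = admin.describeDelegationToken().delegationTokens().get();
            for (DelegationToken token : tokens) {
                System.out.println(token.tokenInfo().tokenId() + " owned by " + token.tokenInfo().owner());
            }
        }
    }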
 
     /**
      * Describe some group IDs in the cluster.
@@ -767,112 +762,112 @@ public abstract class AdminClient implements AutoCloseable {
      * @param options  The options to use when describing the groups.
      * @return The DescribeConsumerGroupsResult.
      */
-    public abstract DescribeConsumerGroupsResult describeConsumerGroups(Collection<String> groupIds,
-                                                                        DescribeConsumerGroupsOptions options);
+    DescribeConsumerGroupsResult describeConsumerGroups(Collection<String> groupIds,
+                                                        DescribeConsumerGroupsOptions options);
 
     /**
      * Describe some group IDs in the cluster, with the default options.
      * <p>
      * This is a convenience method for
-     * #{@link AdminClient#describeConsumerGroups(Collection, DescribeConsumerGroupsOptions)} with
+     * #{@link Admin#describeConsumerGroups(Collection, DescribeConsumerGroupsOptions)} with
      * default options. See the overload for more details.
      *
      * @param groupIds The IDs of the groups to describe.
      * @return The DescribeConsumerGroupsResult.
      */
-    public DescribeConsumerGroupsResult describeConsumerGroups(Collection<String> groupIds) {
+    default DescribeConsumerGroupsResult describeConsumerGroups(Collection<String> groupIds) {
         return describeConsumerGroups(groupIds, new DescribeConsumerGroupsOptions());
     }
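A minimal sketch of describing a single group through the two overloads above; the group id and the Admin instance are assumptions supplied by the caller.

    import java.util.Collections;
    import java.util.Map;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.ConsumerGroupDescription;

    class DescribeGroupSketch {
        // Describe one group with default options and report how many members it currently has.
        static int memberCount(Admin admin, String groupId) throws Exception {
            Map<String, ConsumerGroupDescription> groups =
                admin.describeConsumerGroups(Collections.singleton(groupId)).all().get();
            return groups.get(groupId).members().size();
        }
    }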
 
     /**
      * List the consumer groups available in the cluster.
      *
-     * @param options           The options to use when listing the consumer groups.
+     * @param options The options to use when listing the consumer groups.
      * @return The ListConsumerGroupsResult.
      */
-    public abstract ListConsumerGroupsResult listConsumerGroups(ListConsumerGroupsOptions options);
+    ListConsumerGroupsResult listConsumerGroups(ListConsumerGroupsOptions options);
 
     /**
      * List the consumer groups available in the cluster with the default options.
-     *
-     * This is a convenience method for #{@link AdminClient#listConsumerGroups(ListConsumerGroupsOptions)} with default options.
+     * <p>
+     * This is a convenience method for #{@link Admin#listConsumerGroups(ListConsumerGroupsOptions)} with default options.
      * See the overload for more details.
      *
      * @return The ListConsumerGroupsResult.
      */
-    public ListConsumerGroupsResult listConsumerGroups() {
+    default ListConsumerGroupsResult listConsumerGroups() {
         return listConsumerGroups(new ListConsumerGroupsOptions());
     }
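For illustration, listing group ids with the default options shown above (the Admin instance is assumed to exist):

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.ConsumerGroupListing;

    class ListGroupsSketch {
        // Print the id of every consumer group known to the cluster.
        static void printGroupIds(Admin admin) throws Exception {
            for (ConsumerGroupListing listing : admin.listConsumerGroups().all().get()) {
                System.out.println(listing.groupId());
            }
        }
    }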
 
     /**
      * List the consumer group offsets available in the cluster.
      *
-     * @param options           The options to use when listing the consumer group offsets.
+     * @param options The options to use when listing the consumer group offsets.
      * @return The ListConsumerGroupOffsetsResult.
      */
-    public abstract ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId, ListConsumerGroupOffsetsOptions options);
+    ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId, ListConsumerGroupOffsetsOptions options);
 
     /**
      * List the consumer group offsets available in the cluster with the default options.
-     *
-     * This is a convenience method for #{@link AdminClient#listConsumerGroupOffsets(String, ListConsumerGroupOffsetsOptions)} with default options.
+     * <p>
+     * This is a convenience method for #{@link Admin#listConsumerGroupOffsets(String, ListConsumerGroupOffsetsOptions)} with default options.
      *
      * @return The ListConsumerGroupOffsetsResult.
      */
-    public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId) {
+    default ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId) {
         return listConsumerGroupOffsets(groupId, new ListConsumerGroupOffsetsOptions());
     }
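A small sketch of reading a group's committed offsets via the overloads above; the group id and Admin instance are assumptions:

    import java.util.Map;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    class GroupOffsetsSketch {
        // Print the committed offset of every partition the group has committed so far.
        static void printCommittedOffsets(Admin admin, String groupId) throws Exception {
            Map<TopicPartition, OffsetAndMetadata> offsets =
                admin.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get();
            offsets.forEach((tp, om) -> System.out.println(tp + " -> " + om.offset()));
        }
    }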
 
     /**
      * Delete consumer groups from the cluster.
      *
-     * @param options           The options to use when deleting a consumer group.
+     * @param options The options to use when deleting a consumer group.
      * @return The DeleteConsumerGroupsResult.
      */
-    public abstract DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> groupIds, DeleteConsumerGroupsOptions options);
+    DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> groupIds, DeleteConsumerGroupsOptions options);
 
     /**
      * Delete consumer groups from the cluster with the default options.
      *
      * @return The DeleteConsumerGroupsResult.
      */
-    public DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> groupIds) {
+    default DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> groupIds) {
         return deleteConsumerGroups(groupIds, new DeleteConsumerGroupsOptions());
     }
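And a sketch of deleting a group with the default options above; the group id is assumed to belong to a group with no live members:

    import java.util.Collections;
    import org.apache.kafka.clients.admin.Admin;

    class DeleteGroupSketch {
        // Delete one consumer group and block until the brokers confirm the deletion.
        static void deleteGroup(Admin admin, String groupId) throws Exception {
            admin.deleteConsumerGroups(Collections.singleton(groupId)).all().get();
        }
    }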
 
     /**
      * Elect the preferred replica as leader for topic partitions.
-     *
+     * <p>
      * This is a convenience method for {@link #electLeaders(ElectionType, Set, ElectLeadersOptions)}
      * with preferred election type and default options.
-     *
+     * <p>
      * This operation is supported by brokers with version 2.2.0 or higher.
      *
-     * @param partitions      The partitions for which the preferred leader should be elected.
-     * @return                The ElectPreferredLeadersResult.
-     * @deprecated            Since 2.4.0. Use {@link #electLeaders(ElectionType, Set)}.
+     * @param partitions The partitions for which the preferred leader should be elected.
+     * @return The ElectPreferredLeadersResult.
+     * @deprecated Since 2.4.0. Use {@link #electLeaders(ElectionType, Set)}.
      */
     @Deprecated
-    public ElectPreferredLeadersResult electPreferredLeaders(Collection<TopicPartition> partitions) {
+    default ElectPreferredLeadersResult electPreferredLeaders(Collection<TopicPartition> partitions) {
         return electPreferredLeaders(partitions, new ElectPreferredLeadersOptions());
     }
 
     /**
      * Elect the preferred replica as leader for topic partitions.
-     *
+     * <p>
      * This is a convenience method for {@link #electLeaders(ElectionType, Set, ElectLeadersOptions)}
      * with preferred election type.
-     *
+     * <p>
      * This operation is supported by brokers with version 2.2.0 or higher.
      *
-     * @param partitions      The partitions for which the preferred leader should be elected.
-     * @param options         The options to use when electing the preferred leaders.
-     * @return                The ElectPreferredLeadersResult.
-     * @deprecated            Since 2.4.0. Use {@link #electLeaders(ElectionType, Set, ElectLeadersOptions)}.
+     * @param partitions The partitions for which the preferred leader should be elected.
+     * @param options    The options to use when electing the preferred leaders.
+     * @return The ElectPreferredLeadersResult.
+     * @deprecated Since 2.4.0. Use {@link #electLeaders(ElectionType, Set, ElectLeadersOptions)}.
      */
     @Deprecated
-    public ElectPreferredLeadersResult electPreferredLeaders(Collection<TopicPartition> partitions,
-                                                             ElectPreferredLeadersOptions options) {
+    default ElectPreferredLeadersResult electPreferredLeaders(Collection<TopicPartition> partitions,
+                                                              ElectPreferredLeadersOptions options) {
         final ElectLeadersOptions newOptions = new ElectLeadersOptions();
         newOptions.timeoutMs(options.timeoutMs());
         final Set<TopicPartition> topicPartitions = partitions == null ? null : new HashSet<>(partitions);
@@ -882,61 +877,61 @@ public abstract class AdminClient implements AutoCloseable {
 
     /**
      * Elect a replica as leader for topic partitions.
-     *
+     * <p>
      * This is a convenience method for {@link #electLeaders(ElectionType, Set, ElectLeadersOptions)}
      * with default options.
      *
-     * @param electionType            The type of election to conduct.
-     * @param partitions              The topics and partitions for which to conduct elections.
-     * @return                        The ElectLeadersResult.
+     * @param electionType The type of election to conduct.
+     * @param partitions   The topics and partitions for which to conduct elections.
+     * @return The ElectLeadersResult.
      */
-    public ElectLeadersResult electLeaders(ElectionType electionType, Set<TopicPartition> partitions) {
+    default ElectLeadersResult electLeaders(ElectionType electionType, Set<TopicPartition> partitions) {
         return electLeaders(electionType, partitions, new ElectLeadersOptions());
     }
 
     /**
      * Elect a replica as leader for the given {@code partitions}, or for all partitions if the argument
      * to {@code partitions} is null.
-     *
+     * <p>
      * This operation is not transactional so it may succeed for some partitions while fail for others.
-     *
+     * <p>
      * It may take several seconds after this method returns success for all the brokers in the cluster
      * to become aware that the partitions have new leaders. During this time,
-     * {@link AdminClient#describeTopics(Collection)} may not return information about the partitions'
+     * {@link Admin#describeTopics(Collection)} may not return information about the partitions'
      * new leaders.
-     *
+     * <p>
+     * This operation is supported by brokers with version 2.2.0 or later if preferred election is used;
      * otherwise the brokers must be 2.4.0 or higher.
      *
      * <p>The following exceptions can be anticipated when calling {@code get()} on the future obtained
      * from the returned {@code ElectLeadersResult}:</p>
      * <ul>
-     *   <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException}
-     *   if the authenticated user didn't have alter access to the cluster.</li>
-     *   <li>{@link org.apache.kafka.common.errors.UnknownTopicOrPartitionException}
-     *   if the topic or partition did not exist within the cluster.</li>
-     *   <li>{@link org.apache.kafka.common.errors.InvalidTopicException}
-     *   if the topic was already queued for deletion.</li>
-     *   <li>{@link org.apache.kafka.common.errors.NotControllerException}
-     *   if the request was sent to a broker that was not the controller for the cluster.</li>
-     *   <li>{@link org.apache.kafka.common.errors.TimeoutException}
-     *   if the request timed out before the election was complete.</li>
-     *   <li>{@link org.apache.kafka.common.errors.LeaderNotAvailableException}
-     *   if the preferred leader was not alive or not in the ISR.</li>
+     * <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException}
+     * if the authenticated user didn't have alter access to the cluster.</li>
+     * <li>{@link org.apache.kafka.common.errors.UnknownTopicOrPartitionException}
+     * if the topic or partition did not exist within the cluster.</li>
+     * <li>{@link org.apache.kafka.common.errors.InvalidTopicException}
+     * if the topic was already queued for deletion.</li>
+     * <li>{@link org.apache.kafka.common.errors.NotControllerException}
+     * if the request was sent to a broker that was not the controller for the cluster.</li>
+     * <li>{@link org.apache.kafka.common.errors.TimeoutException}
+     * if the request timed out before the election was complete.</li>
+     * <li>{@link org.apache.kafka.common.errors.LeaderNotAvailableException}
+     * if the preferred leader was not alive or not in the ISR.</li>
      * </ul>
      *
-     * @param electionType            The type of election to conduct.
-     * @param partitions              The topics and partitions for which to conduct elections.
-     * @param options                 The options to use when electing the leaders.
-     * @return                        The ElectLeadersResult.
+     * @param electionType The type of election to conduct.
+     * @param partitions   The topics and partitions for which to conduct elections.
+     * @param options      The options to use when electing the leaders.
+     * @return The ElectLeadersResult.
      */
-    public abstract ElectLeadersResult electLeaders(
-            ElectionType electionType,
-            Set<TopicPartition> partitions,
-            ElectLeadersOptions options);
+    ElectLeadersResult electLeaders(
+        ElectionType electionType,
+        Set<TopicPartition> partitions,
+        ElectLeadersOptions options);
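A hedged sketch of triggering a preferred-leader election for one partition through the method above; the topic, partition and Admin instance are caller-supplied assumptions:

    import java.util.Collections;
    import java.util.Set;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.common.ElectionType;
    import org.apache.kafka.common.TopicPartition;

    class ElectLeadersSketch {
        // Ask the controller to elect the preferred replica as leader for a single partition.
        static void electPreferred(Admin admin, String topic, int partition) throws Exception {
            Set<TopicPartition> partitions = Collections.singleton(new TopicPartition(topic, partition));
            admin.electLeaders(ElectionType.PREFERRED, partitions).all().get();
        }
    }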
 
     /**
      * Get the metrics kept by the admin client.
      */
-    public abstract Map<MetricName, ? extends Metric> metrics();
+    Map<MetricName, ? extends Metric> metrics();
 }
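Rounding off the new interface, a minimal sketch that creates an Admin and dumps its client metrics; the bootstrap address is a placeholder:

    import java.util.Properties;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;

    public class AdminMetricsSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address
            try (Admin admin = Admin.create(props)) {
                admin.metrics().forEach((name, metric) ->
                    System.out.println(name.group() + ":" + name.name() + " = " + metric.metricValue()));
            }
        }
    }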
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
index ad9409a..75f1c5f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
@@ -17,926 +17,36 @@
 
 package org.apache.kafka.clients.admin;
 
-import java.time.Duration;
-import java.util.Collection;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import org.apache.kafka.common.ElectionType;
-import org.apache.kafka.common.Metric;
-import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.TopicPartitionReplica;
-import org.apache.kafka.common.acl.AclBinding;
-import org.apache.kafka.common.acl.AclBindingFilter;
-import org.apache.kafka.common.annotation.InterfaceStability;
-import org.apache.kafka.common.config.ConfigResource;
 
 /**
- * The administrative client for Kafka, which supports managing and inspecting topics, brokers, configurations and ACLs.
+ * The base class for in-built admin clients.
  *
- * The minimum broker version required is 0.10.0.0. Methods with stricter requirements will specify the minimum broker
- * version required.
+ * Client code should use the newer {@link Admin} interface in preference to this class.
  *
- * This client was introduced in 0.11.0.0 and the API is still evolving. We will try to evolve the API in a compatible
- * manner, but we reserve the right to make breaking changes in minor releases, if necessary. We will update the
- * {@code InterfaceStability} annotation and this notice once the API is considered stable.
+ * This class may be removed in a later release, but has not been marked as deprecated to avoid unnecessary noise.
  */
-@InterfaceStability.Evolving
-public abstract class AdminClient implements AutoCloseable {
+public abstract class AdminClient implements Admin {
 
     /**
-     * Create a new AdminClient with the given configuration.
+     * Create a new Admin with the given configuration.
      *
      * @param props The configuration.
      * @return The new KafkaAdminClient.
      */
     public static AdminClient create(Properties props) {
-        return KafkaAdminClient.createInternal(new AdminClientConfig(props, true), null);
+        return (AdminClient) Admin.create(props);
     }
 
     /**
-     * Create a new AdminClient with the given configuration.
+     * Create a new Admin with the given configuration.
      *
      * @param conf The configuration.
      * @return The new KafkaAdminClient.
      */
     public static AdminClient create(Map<String, Object> conf) {
-        return KafkaAdminClient.createInternal(new AdminClientConfig(conf, true), null);
+        return (AdminClient) Admin.create(conf);
     }
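The net effect of the two rewritten factories is that existing callers keep compiling while new code can target the interface; a migration sketch under the usual placeholder bootstrap address:

    import java.util.Properties;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;

    public class FactorySketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address

            // Existing code is untouched: the old factory still hands back an AdminClient.
            try (AdminClient legacy = AdminClient.create(props)) {
                legacy.describeCluster();
            }

            // New code can depend on the narrower Admin interface instead.
            try (Admin admin = Admin.create(props)) {
                admin.describeCluster();
            }
        }
    }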
-
-    /**
-     * Close the AdminClient and release all associated resources.
-     *
-     * See {@link AdminClient#close(long, TimeUnit)}
-     */
-    @Override
-    public void close() {
-        close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
-    }
-
-    /**
-     * Close the AdminClient and release all associated resources.
-     *
-     * The close operation has a grace period during which current operations will be allowed to
-     * complete, specified by the given duration and time unit.
-     * New operations will not be accepted during the grace period.  Once the grace period is over,
-     * all operations that have not yet been completed will be aborted with a TimeoutException.
-     *
-     * @param duration  The duration to use for the wait time.
-     * @param unit      The time unit to use for the wait time.
-     * @deprecated Since 2.2. Use {@link #close(Duration)} or {@link #close()}.
-     */
-    @Deprecated
-    public void close(long duration, TimeUnit unit) {
-        close(Duration.ofMillis(unit.toMillis(duration)));
-    }
-
-    /**
-     * Close the AdminClient and release all associated resources.
-     *
-     * The close operation has a grace period during which current operations will be allowed to
-     * complete, specified by the given duration.
-     * New operations will not be accepted during the grace period.  Once the grace period is over,
-     * all operations that have not yet been completed will be aborted with a TimeoutException.
-     *
-     * @param timeout  The time to use for the wait time.
-     */
-    public abstract void close(Duration timeout);
-
-    /**
-     * Create a batch of new topics with the default options.
-     *
-     * This is a convenience method for #{@link #createTopics(Collection, CreateTopicsOptions)} with default options.
-     * See the overload for more details.
-     *
-     * This operation is supported by brokers with version 0.10.1.0 or higher.
-     *
-     * @param newTopics         The new topics to create.
-     * @return                  The CreateTopicsResult.
-     */
-    public CreateTopicsResult createTopics(Collection<NewTopic> newTopics) {
-        return createTopics(newTopics, new CreateTopicsOptions());
-    }
-
-    /**
-     * Create a batch of new topics.
-     *
-     * This operation is not transactional so it may succeed for some topics while fail for others.
-     *
-     * It may take several seconds after {@code CreateTopicsResult} returns
-     * success for all the brokers to become aware that the topics have been created.
-     * During this time, {@link AdminClient#listTopics()} and {@link AdminClient#describeTopics(Collection)}
-     * may not return information about the new topics.
-     *
-     * This operation is supported by brokers with version 0.10.1.0 or higher. The validateOnly option is supported
-     * from version 0.10.2.0.
-     *
-     * @param newTopics         The new topics to create.
-     * @param options           The options to use when creating the new topics.
-     * @return                  The CreateTopicsResult.
-     */
-    public abstract CreateTopicsResult createTopics(Collection<NewTopic> newTopics,
-                                                    CreateTopicsOptions options);
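The createTopics contract removed here carries over verbatim to the new Admin interface; as a rough usage sketch against that interface (the topic name and sizing below are invented):

    import java.util.Collections;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.NewTopic;

    class CreateTopicSketch {
        // Create one topic with 3 partitions and replication factor 1, then wait for the brokers to acknowledge it.
        static void createTopic(Admin admin) throws Exception {
            NewTopic topic = new NewTopic("example-topic", 3, (short) 1); // invented name and sizing
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }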
-
-    /**
-     * This is a convenience method for #{@link AdminClient#deleteTopics(Collection, DeleteTopicsOptions)}
-     * with default options. See the overload for more details.
-     *
-     * This operation is supported by brokers with version 0.10.1.0 or higher.
-     *
-     * @param topics            The topic names to delete.
-     * @return                  The DeleteTopicsResult.
-     */
-    public DeleteTopicsResult deleteTopics(Collection<String> topics) {
-        return deleteTopics(topics, new DeleteTopicsOptions());
-    }
-
-    /**
-     * Delete a batch of topics.
-     *
-     * This operation is not transactional so it may succeed for some topics while fail for others.
-     *
-     * It may take several seconds after the {@code DeleteTopicsResult} returns
-     * success for all the brokers to become aware that the topics are gone.
-     * During this time, AdminClient#listTopics and AdminClient#describeTopics
-     * may continue to return information about the deleted topics.
-     *
-     * If delete.topic.enable is false on the brokers, deleteTopics will mark
-     * the topics for deletion, but not actually delete them.  The futures will
-     * return successfully in this case.
-     *
-     * This operation is supported by brokers with version 0.10.1.0 or higher.
-     *
-     * @param topics            The topic names to delete.
-     * @param options           The options to use when deleting the topics.
-     * @return                  The DeleteTopicsResult.
-     */
-    public abstract DeleteTopicsResult deleteTopics(Collection<String> topics, DeleteTopicsOptions options);
-
-    /**
-     * List the topics available in the cluster with the default options.
-     *
-     * This is a convenience method for #{@link AdminClient#listTopics(ListTopicsOptions)} with default options.
-     * See the overload for more details.
-     *
-     * @return                  The ListTopicsResult.
-     */
-    public ListTopicsResult listTopics() {
-        return listTopics(new ListTopicsOptions());
-    }
-
-    /**
-     * List the topics available in the cluster.
-     *
-     * @param options           The options to use when listing the topics.
-     * @return                  The ListTopicsResult.
-     */
-    public abstract ListTopicsResult listTopics(ListTopicsOptions options);
-
-    /**
-     * Describe some topics in the cluster, with the default options.
-     *
-     * This is a convenience method for #{@link AdminClient#describeTopics(Collection, DescribeTopicsOptions)} with
-     * default options. See the overload for more details.
-     *
-     * @param topicNames        The names of the topics to describe.
-     *
-     * @return                  The DescribeTopicsResult.
-     */
-    public DescribeTopicsResult describeTopics(Collection<String> topicNames) {
-        return describeTopics(topicNames, new DescribeTopicsOptions());
-    }
-
-    /**
-     * Describe some topics in the cluster.
-     *
-     * @param topicNames        The names of the topics to describe.
-     * @param options           The options to use when describing the topic.
-     *
-     * @return                  The DescribeTopicsResult.
-     */
-    public abstract DescribeTopicsResult describeTopics(Collection<String> topicNames,
-                                                         DescribeTopicsOptions options);
-
-    /**
-     * Get information about the nodes in the cluster, using the default options.
-     *
-     * This is a convenience method for #{@link AdminClient#describeCluster(DescribeClusterOptions)} with default options.
-     * See the overload for more details.
-     *
-     * @return                  The DescribeClusterResult.
-     */
-    public DescribeClusterResult describeCluster() {
-        return describeCluster(new DescribeClusterOptions());
-    }
-
-    /**
-     * Get information about the nodes in the cluster.
-     *
-     * @param options           The options to use when getting information about the cluster.
-     * @return                  The DescribeClusterResult.
-     */
-    public abstract DescribeClusterResult describeCluster(DescribeClusterOptions options);
-
-    /**
-     * This is a convenience method for #{@link AdminClient#describeAcls(AclBindingFilter, DescribeAclsOptions)} with
-     * default options. See the overload for more details.
-     *
-     * This operation is supported by brokers with version 0.11.0.0 or higher.
-     *
-     * @param filter            The filter to use.
-     * @return                  The DeleteAclsResult.
-     */
-    public DescribeAclsResult describeAcls(AclBindingFilter filter) {
-        return describeAcls(filter, new DescribeAclsOptions());
-    }
-
-    /**
-     * Lists access control lists (ACLs) according to the supplied filter.
-     *
-     * Note: it may take some time for changes made by createAcls or deleteAcls to be reflected
-     * in the output of describeAcls.
-     *
-     * This operation is supported by brokers with version 0.11.0.0 or higher.
-     *
-     * @param filter            The filter to use.
-     * @param options           The options to use when listing the ACLs.
-     * @return                  The DeleteAclsResult.
-     */
-    public abstract DescribeAclsResult describeAcls(AclBindingFilter filter, DescribeAclsOptions options);
-
-    /**
-     * This is a convenience method for #{@link AdminClient#createAcls(Collection, CreateAclsOptions)} with
-     * default options. See the overload for more details.
-     *
-     * This operation is supported by brokers with version 0.11.0.0 or higher.
-     *
-     * @param acls              The ACLs to create
-     * @return                  The CreateAclsResult.
-     */
-    public CreateAclsResult createAcls(Collection<AclBinding> acls) {
-        return createAcls(acls, new CreateAclsOptions());
-    }
-
-    /**
-     * Creates access control lists (ACLs) which are bound to specific resources.
-     *
-     * This operation is not transactional so it may succeed for some ACLs while fail for others.
-     *
-     * If you attempt to add an ACL that duplicates an existing ACL, no error will be raised, but
-     * no changes will be made.
-     *
-     * This operation is supported by brokers with version 0.11.0.0 or higher.
-     *
-     * @param acls              The ACLs to create
-     * @param options           The options to use when creating the ACLs.
-     * @return                  The CreateAclsResult.
-     */
-    public abstract CreateAclsResult createAcls(Collection<AclBinding> acls, CreateAclsOptions options);
-
-    /**
-     * This is a convenience method for #{@link AdminClient#deleteAcls(Collection, DeleteAclsOptions)} with default options.
-     * See the overload for more details.
-     *
-     * This operation is supported by brokers with version 0.11.0.0 or higher.
-     *
-     * @param filters           The filters to use.
-     * @return                  The DeleteAclsResult.
-     */
-    public DeleteAclsResult deleteAcls(Collection<AclBindingFilter> filters) {
-        return deleteAcls(filters, new DeleteAclsOptions());
-    }
-
-    /**
-     * Deletes access control lists (ACLs) according to the supplied filters.
-     *
-     * This operation is not transactional so it may succeed for some ACLs while fail for others.
-     *
-     * This operation is supported by brokers with version 0.11.0.0 or higher.
-     *
-     * @param filters           The filters to use.
-     * @param options           The options to use when deleting the ACLs.
-     * @return                  The DeleteAclsResult.
-     */
-    public abstract DeleteAclsResult deleteAcls(Collection<AclBindingFilter> filters, DeleteAclsOptions options);
-
-
-    /**
-     * Get the configuration for the specified resources with the default options.
-     *
-     * This is a convenience method for #{@link AdminClient#describeConfigs(Collection, DescribeConfigsOptions)} with default options.
-     * See the overload for more details.
-     *
-     * This operation is supported by brokers with version 0.11.0.0 or higher.
-     *
-     * @param resources         The resources (topic and broker resource types are currently supported)
-     * @return                  The DescribeConfigsResult
-     */
-    public DescribeConfigsResult describeConfigs(Collection<ConfigResource> resources) {
-        return describeConfigs(resources, new DescribeConfigsOptions());
-    }
-
-    /**
-     * Get the configuration for the specified resources.
-     *
-     * The returned configuration includes default values and the isDefault() method can be used to distinguish them
-     * from user supplied values.
-     *
-     * The value of config entries where isSensitive() is true is always {@code null} so that sensitive information
-     * is not disclosed.
-     *
-     * Config entries where isReadOnly() is true cannot be updated.
-     *
-     * This operation is supported by brokers with version 0.11.0.0 or higher.
-     *
-     * @param resources         The resources (topic and broker resource types are currently supported)
-     * @param options           The options to use when describing configs
-     * @return                  The DescribeConfigsResult
-     */
-    public abstract DescribeConfigsResult describeConfigs(Collection<ConfigResource> resources,
-                                                           DescribeConfigsOptions options);
-
-    /**
-     * Update the configuration for the specified resources with the default options.
-     *
-     * This is a convenience method for #{@link AdminClient#alterConfigs(Map, AlterConfigsOptions)} with default options.
-     * See the overload for more details.
-     *
-     * This operation is supported by brokers with version 0.11.0.0 or higher.
-     *
-     * @param configs         The resources with their configs (topic is the only resource type with configs that can
-     *                        be updated currently)
-     * @return                The AlterConfigsResult
-     * @deprecated Since 2.3. Use {@link #incrementalAlterConfigs(Map)}.
-     */
-    @Deprecated
-    public AlterConfigsResult alterConfigs(Map<ConfigResource, Config> configs) {
-        return alterConfigs(configs, new AlterConfigsOptions());
-    }
-
-    /**
-     * Update the configuration for the specified resources with the default options.
-     *
-     * Updates are not transactional so they may succeed for some resources while fail for others. The configs for
-     * a particular resource are updated atomically.
-     *
-     * This operation is supported by brokers with version 0.11.0.0 or higher.
-     *
-     * @param configs         The resources with their configs (topic is the only resource type with configs that can
-     *                        be updated currently)
-     * @param options         The options to use when describing configs
-     * @return                The AlterConfigsResult
-     * @deprecated Since 2.3. Use {@link #incrementalAlterConfigs(Map, AlterConfigsOptions)}.
-     */
-    @Deprecated
-    public abstract AlterConfigsResult alterConfigs(Map<ConfigResource, Config> configs, AlterConfigsOptions options);
-
-    /**
-     * Incrementally updates the configuration for the specified resources with default options.
-     *
-     * This is a convenience method for #{@link AdminClient#incrementalAlterConfigs(Map, AlterConfigsOptions)} with default options.
-     * See the overload for more details.*
-     *
-     * This operation is supported by brokers with version 2.3.0 or higher.
-     *
-     * @param configs         The resources with their configs
-     * @return                The IncrementalAlterConfigsResult
-     */
-    public AlterConfigsResult incrementalAlterConfigs(Map<ConfigResource, Collection<AlterConfigOp>> configs) {
-        return incrementalAlterConfigs(configs, new AlterConfigsOptions());
-    }
-
-
-    /**
-     * Incrementally update the configuration for the specified resources.
-     *
-     * Updates are not transactional so they may succeed for some resources while fail for others. The configs for
-     * a particular resource are updated atomically.
-     *
-     * <p>The following exceptions can be anticipated when calling {@code get()} on the futures obtained from
-     * the returned {@code IncrementalAlterConfigsResult}:</p>
-     * <ul>
-     *   <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException}
-     *   if the authenticated user didn't have alter access to the cluster.</li>
-     *   <li>{@link org.apache.kafka.common.errors.TopicAuthorizationException}
-     *   if the authenticated user didn't have alter access to the Topic.</li>
-     *   <li>{@link org.apache.kafka.common.errors.InvalidRequestException}
-     *   if the request details are invalid. e.g., a configuration key was specified more than once for a resource</li>
-     * </ul>*
-     *
-     * This operation is supported by brokers with version 2.3.0 or higher.
-     *
-     * @param configs         The resources with their configs
-     * @param options         The options to use when altering configs
-     * @return                The IncrementalAlterConfigsResult
-     */
-    public abstract AlterConfigsResult incrementalAlterConfigs(Map<ConfigResource,
-            Collection<AlterConfigOp>> configs, AlterConfigsOptions options);
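The incrementalAlterConfigs contract removed here likewise moves unchanged onto Admin; a hedged sketch that sets a single topic config (the config key and value are examples only):

    import java.util.Collection;
    import java.util.Collections;
    import java.util.Map;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AlterConfigOp;
    import org.apache.kafka.clients.admin.ConfigEntry;
    import org.apache.kafka.common.config.ConfigResource;

    class IncrementalAlterSketch {
        // SET retention.ms on one topic without touching any other config entry.
        static void setRetention(Admin admin, String topic, long retentionMs) throws Exception {
            ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
            Collection<AlterConfigOp> ops = Collections.singleton(
                new AlterConfigOp(new ConfigEntry("retention.ms", Long.toString(retentionMs)), AlterConfigOp.OpType.SET));
            Map<ConfigResource, Collection<AlterConfigOp>> configs = Collections.singletonMap(resource, ops);
            admin.incrementalAlterConfigs(configs).all().get();
        }
    }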
-
-    /**
-     * Change the log directory for the specified replicas. If the replica does not exist on the broker, the result
-     * shows REPLICA_NOT_AVAILABLE for the given replica and the replica will be created in the given log directory on the
-     * broker when it is created later. If the replica already exists on the broker, the replica will be moved to the given
-     * log directory if it is not already there.
-     *
-     * This operation is not transactional so it may succeed for some replicas while fail for others.
-     *
-     * This is a convenience method for #{@link AdminClient#alterReplicaLogDirs(Map, AlterReplicaLogDirsOptions)} with default options.
-     * See the overload for more details.
-     *
-     * This operation is supported by brokers with version 1.1.0 or higher.
-     *
-     * @param replicaAssignment  The replicas with their log directory absolute path
-     * @return                   The AlterReplicaLogDirsResult
-     */
-    public AlterReplicaLogDirsResult alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment) {
-        return alterReplicaLogDirs(replicaAssignment, new AlterReplicaLogDirsOptions());
-    }
-
-    /**
-     * Change the log directory for the specified replicas. If the replica does not exist on the broker, the result
-     * shows REPLICA_NOT_AVAILABLE for the given replica and the replica will be created in the given log directory on the
-     * broker when it is created later. If the replica already exists on the broker, the replica will be moved to the given
-     * log directory if it is not already there.
-     *
-     * This operation is not transactional so it may succeed for some replicas while fail for others.
-     *
-     * This operation is supported by brokers with version 1.1.0 or higher.
-     *
-     * @param replicaAssignment  The replicas with their log directory absolute path
-     * @param options            The options to use when changing replica dir
-     * @return                   The AlterReplicaLogDirsResult
-     */
-    public abstract AlterReplicaLogDirsResult alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment, AlterReplicaLogDirsOptions options);
-
-    /**
-     * Query the information of all log directories on the given set of brokers
-     *
-     * This is a convenience method for #{@link AdminClient#describeLogDirs(Collection, DescribeLogDirsOptions)} with default options.
-     * See the overload for more details.
-     *
-     * This operation is supported by brokers with version 1.0.0 or higher.
-     *
-     * @param brokers     A list of brokers
-     * @return            The DescribeLogDirsResult
-     */
-    public DescribeLogDirsResult describeLogDirs(Collection<Integer> brokers) {
-        return describeLogDirs(brokers, new DescribeLogDirsOptions());
-    }
-
-    /**
-     * Query the information of all log directories on the given set of brokers
-     *
-     * This operation is supported by brokers with version 1.0.0 or higher.
-     *
-     * @param brokers     A list of brokers
-     * @param options     The options to use when querying log dir info
-     * @return            The DescribeLogDirsResult
-     */
-    public abstract DescribeLogDirsResult describeLogDirs(Collection<Integer> brokers, DescribeLogDirsOptions options);
-
-    /**
-     * Query the replica log directory information for the specified replicas.
-     *
-     * This is a convenience method for #{@link AdminClient#describeReplicaLogDirs(Collection, DescribeReplicaLogDirsOptions)}
-     * with default options. See the overload for more details.
-     *
-     * This operation is supported by brokers with version 1.0.0 or higher.
-     *
-     * @param replicas      The replicas to query
-     * @return              The DescribeReplicaLogDirsResult
-     */
-    public DescribeReplicaLogDirsResult describeReplicaLogDirs(Collection<TopicPartitionReplica> replicas) {
-        return describeReplicaLogDirs(replicas, new DescribeReplicaLogDirsOptions());
-    }
-
-    /**
-     * Query the replica log directory information for the specified replicas.
-     *
-     * This operation is supported by brokers with version 1.0.0 or higher.
-     *
-     * @param replicas      The replicas to query
-     * @param options       The options to use when querying replica log dir info
-     * @return              The DescribeReplicaLogDirsResult
-     */
-    public abstract DescribeReplicaLogDirsResult describeReplicaLogDirs(Collection<TopicPartitionReplica> replicas, DescribeReplicaLogDirsOptions options);
-
-    /**
-     * <p>Increase the number of partitions of the topics given as the keys of {@code newPartitions}
-     * according to the corresponding values. <strong>If partitions are increased for a topic that has a key,
-     * the partition logic or ordering of the messages will be affected.</strong></p>
-     *
-     * <p>This is a convenience method for {@link #createPartitions(Map, CreatePartitionsOptions)} with default options.
-     * See the overload for more details.</p>
-     *
-     * @param newPartitions The topics which should have new partitions created, and corresponding parameters
-     *                      for the created partitions.
-     * @return              The CreatePartitionsResult.
-     */
-    public CreatePartitionsResult createPartitions(Map<String, NewPartitions> newPartitions) {
-        return createPartitions(newPartitions, new CreatePartitionsOptions());
-    }
-
-    /**
-     * <p>Increase the number of partitions of the topics given as the keys of {@code newPartitions}
-     * according to the corresponding values. <strong>If partitions are increased for a topic that has a key,
-     * the partition logic or ordering of the messages will be affected.</strong></p>
-     *
-     * <p>This operation is not transactional so it may succeed for some topics while fail for others.</p>
-     *
-     * <p>It may take several seconds after this method returns
-     * success for all the brokers to become aware that the partitions have been created.
-     * During this time, {@link AdminClient#describeTopics(Collection)}
-     * may not return information about the new partitions.</p>
-     *
-     * <p>This operation is supported by brokers with version 1.0.0 or higher.</p>
-     *
-     * <p>The following exceptions can be anticipated when calling {@code get()} on the futures obtained from the
-     * {@link CreatePartitionsResult#values() values()} method of the returned {@code CreatePartitionsResult}</p>
-     * <ul>
-     *     <li>{@link org.apache.kafka.common.errors.AuthorizationException}
-     *     if the authenticated user is not authorized to alter the topic</li>
-     *     <li>{@link org.apache.kafka.common.errors.TimeoutException}
-     *     if the request was not completed in within the given {@link CreatePartitionsOptions#timeoutMs()}.</li>
-     *     <li>{@link org.apache.kafka.common.errors.ReassignmentInProgressException}
-     *     if a partition reassignment is currently in progress</li>
-     *     <li>{@link org.apache.kafka.common.errors.BrokerNotAvailableException}
-     *     if the requested {@link NewPartitions#assignments()} contain a broker that is currently unavailable.</li>
-     *     <li>{@link org.apache.kafka.common.errors.InvalidReplicationFactorException}
-     *     if no {@link NewPartitions#assignments()} are given and it is impossible for the broker to assign
-     *     replicas with the topics replication factor.</li>
-     *     <li>Subclasses of {@link org.apache.kafka.common.KafkaException}
-     *     if the request is invalid in some way.</li>
-     * </ul>
-     *
-     * @param newPartitions The topics which should have new partitions created, and corresponding parameters
-     *                      for the created partitions.
-     * @param options       The options to use when creating the new paritions.
-     * @return              The CreatePartitionsResult.
-     */
-    public abstract CreatePartitionsResult createPartitions(Map<String, NewPartitions> newPartitions,
-                                                            CreatePartitionsOptions options);
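The createPartitions contract removed here also survives unchanged on Admin; a minimal sketch (the new total is caller-supplied and must exceed the current partition count):

    import java.util.Collections;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.NewPartitions;

    class CreatePartitionsSketch {
        // Grow a topic to a new total partition count with default options.
        static void increasePartitions(Admin admin, String topic, int newTotal) throws Exception {
            admin.createPartitions(Collections.singletonMap(topic, NewPartitions.increaseTo(newTotal))).all().get();
        }
    }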
-
-    /**
-     * Delete records whose offset is smaller than the given offset of the corresponding partition.
-     *
-     * This is a convenience method for {@link #deleteRecords(Map, DeleteRecordsOptions)} with default options.
-     * See the overload for more details.
-     *
-     * This operation is supported by brokers with version 0.11.0.0 or higher.
-     *
-     * @param recordsToDelete       The topic partitions and related offsets from which records deletion starts.
-     * @return                      The DeleteRecordsResult.
-     */
-    public DeleteRecordsResult deleteRecords(Map<TopicPartition, RecordsToDelete> recordsToDelete) {
-        return deleteRecords(recordsToDelete, new DeleteRecordsOptions());
-    }
-
-    /**
-     * Delete records whose offset is smaller than the given offset of the corresponding partition.
-     *
-     * This operation is supported by brokers with version 0.11.0.0 or higher.
-     *
-     * @param recordsToDelete       The topic partitions and related offsets from which records deletion starts.
-     * @param options               The options to use when deleting records.
-     * @return                      The DeleteRecordsResult.
-     */
-    public abstract DeleteRecordsResult deleteRecords(Map<TopicPartition, RecordsToDelete> recordsToDelete,
-                                                      DeleteRecordsOptions options);
-
-    /**
-     * <p>Create a Delegation Token.</p>
-     *
-     * <p>This is a convenience method for {@link #createDelegationToken(CreateDelegationTokenOptions)} with default options.
-     * See the overload for more details.</p>
-     *
-     * @return                      The CreateDelegationTokenResult.
-     */
-    public CreateDelegationTokenResult createDelegationToken() {
-        return createDelegationToken(new CreateDelegationTokenOptions());
-    }
-
-
-    /**
-     * <p>Create a Delegation Token.</p>
-     *
-     * <p>This operation is supported by brokers with version 1.1.0 or higher.</p>
-     *
-     * <p>The following exceptions can be anticipated when calling {@code get()} on the futures obtained from the
-     * {@link CreateDelegationTokenResult#delegationToken() delegationToken()} method of the returned {@code CreateDelegationTokenResult}</p>
-     * <ul>
-     *     <li>{@link org.apache.kafka.common.errors.UnsupportedByAuthenticationException}
-     *     If the request sent on PLAINTEXT/1-way SSL channels or delegation token authenticated channels.</li>
-     *     <li>{@link org.apache.kafka.common.errors.InvalidPrincipalTypeException}
-     *     if the renewers principal type is not supported.</li>
-     *     <li>{@link org.apache.kafka.common.errors.DelegationTokenDisabledException}
-     *     if the delegation token feature is disabled.</li>
-     *     <li>{@link org.apache.kafka.common.errors.TimeoutException}
-     *     if the request was not completed in within the given {@link CreateDelegationTokenOptions#timeoutMs()}.</li>
-     * </ul>
-     *
-     * @param options               The options to use when creating delegation token.
-     * @return                      The DeleteRecordsResult.
-     */
-    public abstract CreateDelegationTokenResult createDelegationToken(CreateDelegationTokenOptions options);
-
-
-    /**
-     * <p>Renew a Delegation Token.</p>
-     *
-     * <p>This is a convenience method for {@link #renewDelegationToken(byte[], RenewDelegationTokenOptions)} with default options.
-     * See the overload for more details.</p>
-     *
-     *
-     * @param hmac                  HMAC of the Delegation token
-     * @return                      The RenewDelegationTokenResult.
-     */
-    public RenewDelegationTokenResult renewDelegationToken(byte[] hmac) {
-        return renewDelegationToken(hmac, new RenewDelegationTokenOptions());
-    }
-
-    /**
-     * <p> Renew a Delegation Token.</p>
-     *
-     * <p>This operation is supported by brokers with version 1.1.0 or higher.</p>
-     *
-     * <p>The following exceptions can be anticipated when calling {@code get()} on the futures obtained from the
-     * {@link RenewDelegationTokenResult#expiryTimestamp() expiryTimestamp()} method of the returned {@code RenewDelegationTokenResult}</p>
-     * <ul>
-     *     <li>{@link org.apache.kafka.common.errors.UnsupportedByAuthenticationException}
-     *     If the request sent on PLAINTEXT/1-way SSL channels or delegation token authenticated channels.</li>
-     *     <li>{@link org.apache.kafka.common.errors.DelegationTokenDisabledException}
-     *     if the delegation token feature is disabled.</li>
-     *     <li>{@link org.apache.kafka.common.errors.DelegationTokenNotFoundException}
-     *     if the delegation token is not found on server.</li>
-     *     <li>{@link org.apache.kafka.common.errors.DelegationTokenOwnerMismatchException}
-     *     if the authenticated user is not owner/renewer of the token.</li>
-     *     <li>{@link org.apache.kafka.common.errors.DelegationTokenExpiredException}
-     *     if the delegation token is expired.</li>
-     *     <li>{@link org.apache.kafka.common.errors.TimeoutException}
-     *     if the request was not completed in within the given {@link RenewDelegationTokenOptions#timeoutMs()}.</li>
-     * </ul>
-     *
-     * @param hmac                  HMAC of the Delegation token
-     * @param options               The options to use when renewing delegation token.
-     * @return                      The RenewDelegationTokenResult.
-     */
-    public abstract RenewDelegationTokenResult renewDelegationToken(byte[] hmac, RenewDelegationTokenOptions options);
-
-    /**
-     * <p>Expire a Delegation Token.</p>
-     *
-     * <p>This is a convenience method for {@link #expireDelegationToken(byte[], ExpireDelegationTokenOptions)} with default options.
-     * This will expire the token immediately. See the overload for more details.</p>
-     *
-     * @param hmac                  HMAC of the Delegation token
-     * @return                      The ExpireDelegationTokenResult.
-     */
-    public ExpireDelegationTokenResult expireDelegationToken(byte[] hmac) {
-        return expireDelegationToken(hmac, new ExpireDelegationTokenOptions());
-    }
-
-    /**
-     * <p>Expire a Delegation Token.</p>
-     *
-     * <p>This operation is supported by brokers with version 1.1.0 or higher.</p>
-     *
-     * <p>The following exceptions can be anticipated when calling {@code get()} on the futures obtained from the
-     * {@link ExpireDelegationTokenResult#expiryTimestamp() expiryTimestamp()} method of the returned {@code ExpireDelegationTokenResult}</p>
-     * <ul>
-     *     <li>{@link org.apache.kafka.common.errors.UnsupportedByAuthenticationException}
-     *     If the request sent on PLAINTEXT/1-way SSL channels or delegation token authenticated channels.</li>
-     *     <li>{@link org.apache.kafka.common.errors.DelegationTokenDisabledException}
-     *     if the delegation token feature is disabled.</li>
-     *     <li>{@link org.apache.kafka.common.errors.DelegationTokenNotFoundException}
-     *     if the delegation token is not found on server.</li>
-     *     <li>{@link org.apache.kafka.common.errors.DelegationTokenOwnerMismatchException}
-     *     if the authenticated user is not owner/renewer of the requested token.</li>
-     *     <li>{@link org.apache.kafka.common.errors.DelegationTokenExpiredException}
-     *     if the delegation token is expired.</li>
-     *     <li>{@link org.apache.kafka.common.errors.TimeoutException}
-     *     if the request was not completed in within the given {@link ExpireDelegationTokenOptions#timeoutMs()}.</li>
-     * </ul>
-     *
-     * @param hmac                  HMAC of the Delegation token
-     * @param options               The options to use when expiring delegation token.
-     * @return                      The ExpireDelegationTokenResult.
-     */
-    public abstract ExpireDelegationTokenResult expireDelegationToken(byte[] hmac, ExpireDelegationTokenOptions options);
-
-    /**
-     *<p>Describe the Delegation Tokens.</p>
-     *
-     * <p>This is a convenience method for {@link #describeDelegationToken(DescribeDelegationTokenOptions)} with default options.
-     * This will return all the user owned tokens and tokens where user have Describe permission. See the overload for more details.</p>
-     *
-     * @return                      The DescribeDelegationTokenResult.
-     */
-    public DescribeDelegationTokenResult describeDelegationToken() {
-        return describeDelegationToken(new DescribeDelegationTokenOptions());
-    }
-
-    /**
-     * <p>Describe the Delegation Tokens.</p>
-     *
-     * <p>This operation is supported by brokers with version 1.1.0 or higher.</p>
-     *
-     * <p>The following exceptions can be anticipated when calling {@code get()} on the futures obtained from the
-     * {@link DescribeDelegationTokenResult#delegationTokens() delegationTokens()} method of the returned {@code DescribeDelegationTokenResult}</p>
-     * <ul>
-     *     <li>{@link org.apache.kafka.common.errors.UnsupportedByAuthenticationException}
-     *     If the request sent on PLAINTEXT/1-way SSL channels or delegation token authenticated channels.</li>
-     *     <li>{@link org.apache.kafka.common.errors.DelegationTokenDisabledException}
-     *     if the delegation token feature is disabled.</li>
-     *     <li>{@link org.apache.kafka.common.errors.TimeoutException}
-     *     if the request was not completed in within the given {@link DescribeDelegationTokenOptions#timeoutMs()}.</li>
-     * </ul>
-     *
-     * @param options               The options to use when describing delegation tokens.
-     * @return                      The DescribeDelegationTokenResult.
-     */
-    public abstract DescribeDelegationTokenResult describeDelegationToken(DescribeDelegationTokenOptions options);
-
-    /**
-     * Describe some group IDs in the cluster.
-     *
-     * @param groupIds The IDs of the groups to describe.
-     * @param options  The options to use when describing the groups.
-     * @return The DescribeConsumerGroupsResult.
-     */
-    public abstract DescribeConsumerGroupsResult describeConsumerGroups(Collection<String> groupIds,
-                                                                        DescribeConsumerGroupsOptions options);
-
-    /**
-     * Describe some group IDs in the cluster, with the default options.
-     * <p>
-     * This is a convenience method for
-     * {@link AdminClient#describeConsumerGroups(Collection, DescribeConsumerGroupsOptions)} with
-     * default options. See the overload for more details.
-     *
-     * @param groupIds The IDs of the groups to describe.
-     * @return The DescribeConsumerGroupsResult.
-     */
-    public DescribeConsumerGroupsResult describeConsumerGroups(Collection<String> groupIds) {
-        return describeConsumerGroups(groupIds, new DescribeConsumerGroupsOptions());
-    }
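
(Not part of the patch.) An illustrative sketch of the group-description API above, describing two hypothetical groups and printing their states; the group names are made up and an existing admin client is assumed.

    // Sketch only: all() collapses the per-group futures into a single map.
    DescribeConsumerGroupsResult described =
        admin.describeConsumerGroups(Arrays.asList("group-a", "group-b"));
    for (Map.Entry<String, ConsumerGroupDescription> e : described.all().get().entrySet()) {
        System.out.println(e.getKey() + " -> " + e.getValue().state());
    }
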
-
-    /**
-     * List the consumer groups available in the cluster.
-     *
-     * @param options           The options to use when listing the consumer groups.
-     * @return The ListConsumerGroupsResult.
-     */
-    public abstract ListConsumerGroupsResult listConsumerGroups(ListConsumerGroupsOptions options);
-
-    /**
-     * List the consumer groups available in the cluster with the default options.
-     *
-     * This is a convenience method for {@link AdminClient#listConsumerGroups(ListConsumerGroupsOptions)} with default options.
-     * See the overload for more details.
-     *
-     * @return The ListConsumerGroupsResult.
-     */
-    public ListConsumerGroupsResult listConsumerGroups() {
-        return listConsumerGroups(new ListConsumerGroupsOptions());
-    }
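
(Not part of the patch.) A short sketch of listing group IDs with the default options, assuming an existing admin client.

    // Sketch only: all() fails if any broker could not be queried; valid()/errors() expose partial results.
    for (ConsumerGroupListing listing : admin.listConsumerGroups().all().get()) {
        System.out.println(listing.groupId());
    }
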
-
-    /**
-     * List the consumer group offsets available in the cluster.
-     *
-     * @param groupId           The ID of the consumer group whose committed offsets should be listed.
-     * @param options           The options to use when listing the consumer group offsets.
-     * @return The ListConsumerGroupOffsetsResult.
-     */
-    public abstract ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId, ListConsumerGroupOffsetsOptions options);
-
-    /**
-     * List the consumer group offsets available in the cluster with the default options.
-     *
-     * This is a convenience method for {@link AdminClient#listConsumerGroupOffsets(String, ListConsumerGroupOffsetsOptions)} with default options.
-     *
-     * @param groupId The ID of the consumer group whose committed offsets should be listed.
-     * @return The ListConsumerGroupOffsetsResult.
-     */
-    public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId) {
-        return listConsumerGroupOffsets(groupId, new ListConsumerGroupOffsetsOptions());
-    }
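
(Not part of the patch.) A sketch of reading a group's committed offsets; the group ID is hypothetical and an existing admin client is assumed.

    // Sketch only: one committed offset per partition the group has committed for.
    Map<TopicPartition, OffsetAndMetadata> offsets =
        admin.listConsumerGroupOffsets("my-group").partitionsToOffsetAndMetadata().get();
    offsets.forEach((tp, om) -> System.out.println(tp + " -> " + om.offset()));
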
-
-    /**
-     * Delete consumer groups from the cluster.
-     *
-     * @param groupIds          The IDs of the consumer groups to delete.
-     * @param options           The options to use when deleting a consumer group.
-     * @return The DeleteConsumerGroupsResult.
-     */
-    public abstract DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> groupIds, DeleteConsumerGroupsOptions options);
-
-    /**
-     * Delete consumer groups from the cluster with the default options.
-     *
-     * @param groupIds The IDs of the consumer groups to delete.
-     * @return The DeleteConsumerGroupsResult.
-     */
-    public DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> groupIds) {
-        return deleteConsumerGroups(groupIds, new DeleteConsumerGroupsOptions());
-    }
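
(Not part of the patch.) A sketch of deleting groups; the group IDs are hypothetical, the groups typically must have no active members for the call to succeed, and an existing admin client is assumed.

    // Sketch only: all() completes once every requested group has been deleted, or fails otherwise.
    admin.deleteConsumerGroups(Arrays.asList("retired-group-1", "retired-group-2")).all().get();
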
-
-    /**
-     * Elect the preferred replica as leader for topic partitions.
-     *
-     * This is a convenience method for {@link #electLeaders(ElectionType, Set, ElectLeadersOptions)}
-     * with preferred election type and default options.
-     *
-     * This operation is supported by brokers with version 2.2.0 or higher.
-     *
-     * @param partitions      The partitions for which the preferred leader should be elected.
-     * @return                The ElectPreferredLeadersResult.
-     * @deprecated            Since 2.4.0. Use {@link #electLeaders(ElectionType, Set)}.
-     */
-    @Deprecated
-    public ElectPreferredLeadersResult electPreferredLeaders(Collection<TopicPartition> partitions) {
-        return electPreferredLeaders(partitions, new ElectPreferredLeadersOptions());
-    }
-
-    /**
-     * Elect the preferred replica as leader for topic partitions.
-     *
-     * This is a convenience method for {@link #electLeaders(ElectionType, Set, ElectLeadersOptions)}
-     * with preferred election type.
-     *
-     * This operation is supported by brokers with version 2.2.0 or higher.
-     *
-     * @param partitions      The partitions for which the preferred leader should be elected.
-     * @param options         The options to use when electing the preferred leaders.
-     * @return                The ElectPreferredLeadersResult.
-     * @deprecated            Since 2.4.0. Use {@link #electLeaders(ElectionType, Set, ElectLeadersOptions)}.
-     */
-    @Deprecated
-    public ElectPreferredLeadersResult electPreferredLeaders(Collection<TopicPartition> partitions,
-                                                             ElectPreferredLeadersOptions options) {
-        final ElectLeadersOptions newOptions = new ElectLeadersOptions();
-        newOptions.timeoutMs(options.timeoutMs());
-        final Set<TopicPartition> topicPartitions = partitions == null ? null : new HashSet<>(partitions);
-
-        return new ElectPreferredLeadersResult(electLeaders(ElectionType.PREFERRED, topicPartitions, newOptions));
-    }
-
-    /**
-     * Elect a replica as leader for topic partitions.
-     *
-     * This is a convenience method for {@link #electLeaders(ElectionType, Set, ElectLeadersOptions)}
-     * with default options.
-     *
-     * @param electionType            The type of election to conduct.
-     * @param partitions              The topics and partitions for which to conduct elections.
-     * @return                        The ElectLeadersResult.
-     */
-    public ElectLeadersResult electLeaders(ElectionType electionType, Set<TopicPartition> partitions) {
-        return electLeaders(electionType, partitions, new ElectLeadersOptions());
-    }
-
-    /**
-     * Elect a replica as leader for the given {@code partitions}, or for all partitions if the argument
-     * to {@code partitions} is null.
-     *
-     * This operation is not transactional so it may succeed for some partitions while fail for others.
-     *
-     * It may take several seconds after this method returns success for all the brokers in the cluster
-     * to become aware that the partitions have new leaders. During this time,
-     * {@link AdminClient#describeTopics(Collection)} may not return information about the partitions'
-     * new leaders.
-     *
-     * This operation is supported by brokers with version 2.2.0 or later if preferred election is used;
-     * otherwise the brokers must be 2.4.0 or higher.
-     *
-     * <p>The following exceptions can be anticipated when calling {@code get()} on the future obtained
-     * from the returned {@code ElectLeadersResult}:</p>
-     * <ul>
-     *   <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException}
-     *   if the authenticated user didn't have alter access to the cluster.</li>
-     *   <li>{@link org.apache.kafka.common.errors.UnknownTopicOrPartitionException}
-     *   if the topic or partition did not exist within the cluster.</li>
-     *   <li>{@link org.apache.kafka.common.errors.InvalidTopicException}
-     *   if the topic was already queued for deletion.</li>
-     *   <li>{@link org.apache.kafka.common.errors.NotControllerException}
-     *   if the request was sent to a broker that was not the controller for the cluster.</li>
-     *   <li>{@link org.apache.kafka.common.errors.TimeoutException}
-     *   if the request timed out before the election was complete.</li>
-     *   <li>{@link org.apache.kafka.common.errors.LeaderNotAvailableException}
-     *   if the preferred leader was not alive or not in the ISR.</li>
-     * </ul>
-     *
-     * @param electionType            The type of election to conduct.
-     * @param partitions              The topics and partitions for which to conduct elections.
-     * @param options                 The options to use when electing the leaders.
-     * @return                        The ElectLeadersResult.
-     */
-    public abstract ElectLeadersResult electLeaders(
-            ElectionType electionType,
-            Set<TopicPartition> partitions,
-            ElectLeadersOptions options);
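
(Not part of the patch.) To round out the leader-election docs above, a sketch of triggering a preferred election for one hypothetical partition and inspecting the per-partition outcome; an existing admin client is assumed.

    // Sketch only: an empty Optional means the election for that partition succeeded.
    Set<TopicPartition> partitions = Collections.singleton(new TopicPartition("my-topic", 0));
    ElectLeadersResult election = admin.electLeaders(ElectionType.PREFERRED, partitions);
    election.partitions().get().forEach((tp, error) ->
        System.out.println(tp + " -> " + (error.isPresent() ? error.get().getMessage() : "ok")));
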
-
-    /**
-     * Get the metrics kept by the admin client.
-     */
-    public abstract Map<MetricName, ? extends Metric> metrics();
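
(Not part of the patch.) And a small sketch for the metrics accessor, assuming an existing admin client.

    // Sketch only: dump every client-side metric the admin client maintains.
    admin.metrics().forEach((name, metric) ->
        System.out.println(name.group() + ":" + name.name() + " = " + metric.metricValue()));
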
 }
+
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigOp.java b/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigOp.java
index 367c842..950021a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigOp.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigOp.java
@@ -29,7 +29,7 @@ import java.util.stream.Collectors;
 /**
  * A class representing an alter configuration entry containing name, value and operation type.
  *
- * The API of this class is evolving, see {@link AdminClient} for details.
+ * The API of this class is evolving, see {@link Admin} for details.
  */
 @InterfaceStability.Evolving
 public class AlterConfigOp {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsOptions.java
index 2dbeba2..0b28053 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsOptions.java
@@ -22,9 +22,9 @@ import org.apache.kafka.common.annotation.InterfaceStability;
 import java.util.Map;
 
 /**
- * Options for {@link AdminClient#alterConfigs(Map)}.
+ * Options for {@link Admin#alterConfigs(Map)}.
  *
- * The API of this class is evolving, see {@link AdminClient} for details.
+ * The API of this class is evolving, see {@link Admin} for details.
  */
 @InterfaceStability.Evolving
 public class AlterConfigsOptions extends AbstractOptions<AlterConfigsOptions> {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsResult.java
index df6c1c2..29056ce 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsResult.java
@@ -24,9 +24,9 @@ import org.apache.kafka.common.config.ConfigResource;
 import java.util.Map;
 
 /**
- * The result of the {@link AdminClient#alterConfigs(Map)} call.
+ * The result of the {@link Admin#alterConfigs(Map)} call.
  *
- * The API of this class is evolving, see {@link AdminClient} for details.
+ * The API of this class is evolving, see {@link Admin} for details.
  */
 @InterfaceStability.Evolving
 public class AlterConfigsResult {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AlterReplicaLogDirsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/AlterReplicaLogDirsOptions.java
index d6892ef..76037fb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AlterReplicaLogDirsOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AlterReplicaLogDirsOptions.java
@@ -21,7 +21,7 @@ import org.apache.kafka.common.annotation.InterfaceStability;
 import java.util.Map;
 
 /**
- * Options for {@link AdminClient#alterReplicaLogDirs(Map, AlterReplicaLogDirsOptions)}.
+ * Options for {@link Admin#alterReplicaLogDirs(Map, AlterReplicaLogDirsOptions)}.
  */
 @InterfaceStability.Evolving
 public class AlterReplicaLogDirsOptions extends AbstractOptions<AlterReplicaLogDirsOptions> {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AlterReplicaLogDirsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/AlterReplicaLogDirsResult.java
index a3da216..2373265 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AlterReplicaLogDirsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AlterReplicaLogDirsResult.java
@@ -23,7 +23,7 @@ import java.util.Map;
 
 
 /**
- * The result of {@link AdminClient#alterReplicaLogDirs(Map, AlterReplicaLogDirsOptions)}.
+ * The result of {@link Admin#alterReplicaLogDirs(Map, AlterReplicaLogDirsOptions)}.
  */
 @InterfaceStability.Evolving
 public class AlterReplicaLogDirsResult {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Config.java b/clients/src/main/java/org/apache/kafka/clients/admin/Config.java
index c81c0b6..ae7c03a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/Config.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/Config.java
@@ -27,7 +27,7 @@ import java.util.Map;
 /**
  * A configuration object containing the configuration entries for a resource.
  * <p>
- * The API of this class is evolving, see {@link AdminClient} for details.
+ * The API of this class is evolving, see {@link Admin} for details.
  */
 @InterfaceStability.Evolving
 public class Config {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java b/clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java
index 9976108..7775b6a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java
@@ -26,7 +26,7 @@ import java.util.Objects;
 /**
  * A class representing a configuration entry containing name, value and additional metadata.
  *
- * The API of this class is evolving, see {@link AdminClient} for details.
+ * The API of this class is evolving, see {@link Admin} for details.
  */
 @InterfaceStability.Evolving
 public class ConfigEntry {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsOptions.java
index a7b92ba..bfb8e32 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsOptions.java
@@ -22,9 +22,9 @@ import org.apache.kafka.common.annotation.InterfaceStability;
 import java.util.Collection;
 
 /**
- * Options for {@link AdminClient#createAcls(Collection)}.
+ * Options for {@link Admin#createAcls(Collection)}.
  *
- * The API of this class is evolving, see {@link AdminClient} for details.
+ * The API of this class is evolving, see {@link Admin} for details.
  */
 @InterfaceStability.Evolving
 public class CreateAclsOptions extends AbstractOptions<CreateAclsOptions> {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsResult.java
index 2917f17..6e69554 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsResult.java
@@ -25,9 +25,9 @@ import java.util.Collection;
 import java.util.Map;
 
 /**
- * The result of the {@link AdminClient#createAcls(Collection)} call.
+ * The result of the {@link Admin#createAcls(Collection)} call.
  *
- * The API of this class is evolving, see {@link AdminClient} for details.
+ * The API of this class is evolving, see {@link Admin} for details.
  */
 @InterfaceStability.Evolving
 public class CreateAclsResult {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/CreateDelegationTokenOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/CreateDelegationTokenOptions.java
index 1b77b94..6a082d4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/CreateDelegationTokenOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/CreateDelegationTokenOptions.java
@@ -24,9 +24,9 @@ import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 
 /**
- * Options for {@link AdminClient#createDelegationToken(CreateDelegationTokenOptions)}.
+ * Options for {@link Admin#createDelegationToken(CreateDelegationTokenOptions)}.
  *
- * The API of this class is evolving, see {@link AdminClient} for details.
+ * The API of this class is evolving, see {@link Admin} for details.
  */
 @InterfaceStability.Evolving
 public class CreateDelegationTokenOptions extends AbstractOptions<CreateDelegationTokenOptions> {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/CreateDelegationTokenResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/CreateDelegationTokenResult.java
index 043cbe8..7aa4804 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/CreateDelegationTokenResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/CreateDelegationTokenResult.java
@@ -24,7 +24,7 @@ import org.apache.kafka.common.security.token.delegation.DelegationToken;
 /**
  * The result of the {@link KafkaAdminClient#createDelegationToken(CreateDelegationTokenOptions)} call.
  *
- * The API of this class is evolving, see {@link AdminClient} for details.
+ * The API of this class is evolving, see {@link Admin} for details.
  */
 @InterfaceStability.Evolving
 public class CreateDelegationTokenResult {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/CreatePartitionsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/CreatePartitionsOptions.java
index aafc207..01eb5a0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/CreatePartitionsOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/CreatePartitionsOptions.java
@@ -22,9 +22,9 @@ import org.apache.kafka.common.annotation.InterfaceStability;
 import java.util.Map;
 
 /**
- * Options for {@link AdminClient#createPartitions(Map)}.
+ * Options for {@link Admin#createPartitions(Map)}.
  *
- * The API of this class is evolving, see {@link AdminClient} for details.
+ * The API of this class is evolving, see {@link Admin} for details.
  */
 @InterfaceStability.Evolving
 public class CreatePartitionsOptions extends AbstractOptions<CreatePartitionsOptions> {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/CreatePartitionsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/CreatePartitionsResult.java
index c3a504b..8b864b6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/CreatePartitionsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/CreatePartitionsResult.java
@@ -23,9 +23,9 @@ import org.apache.kafka.common.annotation.InterfaceStability;
 import java.util.Map;
 
 /**
- * The result of the {@link AdminClient#createPartitions(Map)} call.
+ * The result of the {@link Admin#createPartitions(Map)} call.
  *
- * The API of this class is evolving, see {@link AdminClient} for details.
+ * The API of this class is evolving, see {@link Admin} for details.
  */
 @InterfaceStability.Evolving
 public class CreatePartitionsResult {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsOptions.java
index 7cd0df8..a9f1009 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsOptions.java
@@ -22,9 +22,9 @@ import org.apache.kafka.common.annotation.InterfaceStability;
 import java.util.Collection;
 
 /**
- * Options for {@link AdminClient#createTopics(Collection)}.
+ * Options for {@link Admin#createTopics(Collection)}.
  *
- * The API of this class is evolving, see {@link AdminClient} for details.
+ * The API of this class is evolving, see {@link Admin} for details.
  */
 @InterfaceStability.Evolving
 public class CreateTopicsOptions extends AbstractOptions<CreateTopicsOptions> {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsResult.java
index 404cb918..4fada35 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsResult.java
@@ -23,9 +23,9 @@ import java.util.Collection;
 import java.util.Map;
 
 /**
- * The result of {@link AdminClient#createTopics(Collection)}.
+ * The result of {@link Admin#createTopics(Collection)}.
  *
- * The API of this class is evolving, see {@link AdminClient} for details.
+ * The API of this class is evolving, see {@link Admin} for details.
  */
 @InterfaceStability.Evolving
 public class CreateTopicsResult {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsOptions.java
index 56f4b78..1b67da5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsOptions.java
@@ -22,9 +22,9 @@ import org.apache.kafka.common.annotation.InterfaceStability;
 import java.util.Collection;
 
 /**
- * Options for the {@link AdminClient#deleteAcls(Collection)} call.
+ * Options for the {@link Admin#deleteAcls(Collection)} call.
  *
- * The API of this class is evolving, see {@link AdminClient} for details.
+ * The API of this class is evolving, see {@link Admin} for details.
  */
 @InterfaceStability.Evolving
 public class DeleteAclsOptions extends AbstractOptions<DeleteAclsOptions> {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsResult.java
index 63310bc..391b9d1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsResult.java
@@ -30,9 +30,9 @@ import java.util.List;
 import java.util.Map;
 
 /**
- * The result of the {@link AdminClient#deleteAcls(Collection)} call.
+ * The result of the {@link Admin#deleteAcls(Collection)} call.
  *
- * The API of this class is evolving, see {@link AdminClient} for details.
+ * The API of this class is evolving, see {@link Admin} for details.
  */
 @InterfaceStability.Evolving
 public class DeleteAclsResult {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsOptions.java
index cd505f4c..081aeab 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsOptions.java
@@ -21,9 +21,9 @@ import org.apache.kafka.common.annotation.InterfaceStability;
 import java.util.Collection;
 
 /**
- * Options for the {@link AdminClient#deleteConsumerGroups(Collection)} call.
+ * Options for the {@link Admin#deleteConsumerGroups(Collection)} call.
  *
- * The API of this class is evolving, see {@link AdminClient} for details.
+ * The API of this class is evolving, see {@link Admin} for details.
  */
 @InterfaceStability.Evolving
 public class DeleteConsumerGroupsOptions extends AbstractOptions<DeleteConsumerGroupsOptions> {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java
index dd6835c..c7d7a5a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java
@@ -23,9 +23,9 @@ import java.util.Collection;
 import java.util.Map;
 
 /**
- * The result of the {@link AdminClient#deleteConsumerGroups(Collection)} call.
+ * The result of the {@link Admin#deleteConsumerGroups(Collection)} call.
  *
- * The API of this class is evolving, see {@link AdminClient} for details.
+ * The API of this class is evolving, see {@link Admin} for details.
  */
 @InterfaceStability.Evolving
 public class DeleteConsumerGroupsResult {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteRecordsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteRecordsOptions.java
index 2581694..34af759 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteRecordsOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteRecordsOptions.java
@@ -22,9 +22,9 @@ import org.apache.kafka.common.annotation.InterfaceStability;
 import java.util.Map;
 
 /**
- * Options for {@link AdminClient#deleteRecords(Map, DeleteRecordsOptions)}.
+ * Options for {@link Admin#deleteRecords(Map, DeleteRecordsOptions)}.
  *
- * The API of this class is evolving, see {@link AdminClient} for details.
+ * The API of this class is evolving, see {@link Admin} for details.
  */
 @InterfaceStability.Evolving
 public class DeleteRecordsOptions extends AbstractOptions<DeleteRecordsOptions> {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteRecordsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteRecordsResult.java
index 8a7d39e..0196632 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteRecordsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteRecordsResult.java
@@ -24,9 +24,9 @@ import org.apache.kafka.common.annotation.InterfaceStability;
 import java.util.Map;
 
 /**
- * The result of the {@link AdminClient#deleteRecords(Map)} call.
+ * The result of the {@link Admin#deleteRecords(Map)} call.
  *
- * The API of this class is evolving, see {@link AdminClient} for details.
+ * The API of this class is evolving, see {@link Admin} for details.
  */
 @InterfaceStability.Evolving
 public class DeleteRecordsResult {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsOptions.java
index 27c9af9..91e38a1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsOptions.java
@@ -22,9 +22,9 @@ import org.apache.kafka.common.annotation.InterfaceStability;
 import java.util.Collection;
 
 /**
- * Options for {@link AdminClient#deleteTopics(Collection)}.
+ * Options for {@link Admin#deleteTopics(Collection)}.
  *
- * The API of this class is evolving, see {@link AdminClient} for details.
+ * The API of this class is evolving, see {@link Admin} for details.
  */
 @InterfaceStability.Evolving
 public class DeleteTopicsOptions extends AbstractOptions<DeleteTopicsOptions> {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsResult.java
index 9148a76..d48c7e0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsResult.java
@@ -24,9 +24,9 @@ import java.util.Collection;
 import java.util.Map;
 
 /**
- * The result of the {@link AdminClient#deleteTopics(Collection)} call.
+ * The result of the {@link Admin#deleteTopics(Collection)} call.
  *
- * The API of this class is evolving, see {@link AdminClient} for details.
+ * The API of this class is evolving, see {@link Admin} for details.
  */
 @InterfaceStability.Evolving
 public class DeleteTopicsResult {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsOptions.java
index 89ba4bc..b17d6a7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsOptions.java
@@ -21,9 +21,9 @@ import org.apache.kafka.common.acl.AclBindingFilter;
 import org.apache.kafka.common.annotation.InterfaceStability;
 
 /**
- * Options for {@link AdminClient#describeAcls(AclBindingFilter)}.
+ * Options for {@link Admin#describeAcls(AclBindingFilter)}.
  *
- * The API of this class is evolving, see {@link AdminClient} for details.
+ * The API of this class is evolving, see {@link Admin} for details.
  */
 @InterfaceStability.Evolving
 public class DescribeAclsOptions extends AbstractOptions<DescribeAclsOptions> {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsResult.java
index e09bf43..fb16222 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsResult.java
@@ -27,7 +27,7 @@ import java.util.Collection;
 /**
  * The result of the {@link KafkaAdminClient#describeAcls(AclBindingFilter)} call.
  *
- * The API of this class is evolving, see {@link AdminClient} for details.
+ * The API of this class is evolving, see {@link Admin} for details.
  */
 @InterfaceStability.Evolving
 public class DescribeAclsResult {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java
index 7fb7bd1..670feda 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java
@@ -20,9 +20,9 @@ package org.apache.kafka.clients.admin;
 import org.apache.kafka.common.annotation.InterfaceStability;
 
 /**
- * Options for {@link AdminClient#describeCluster()}.
+ * Options for {@link Admin#describeCluster()}.
  *
- * The API of this class is evolving, see {@link AdminClient} for details.
+ * The API of this class is evolving, see {@link Admin} for details.
  */
 @InterfaceStability.Evolving
 public class DescribeClusterOptions extends AbstractOptions<DescribeClusterOptions> {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterResult.java
index 21125a2..d307d25 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterResult.java
@@ -28,7 +28,7 @@ import java.util.Set;
 /**
  * The result of the {@link KafkaAdminClient#describeCluster()} call.
  *
- * The API of this class is evolving, see {@link AdminClient} for details.
+ * The API of this class is evolving, see {@link Admin} for details.
  */
 @InterfaceStability.Evolving
 public class DescribeClusterResult {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsOptions.java
index aa667af..83582b6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsOptions.java
@@ -22,9 +22,9 @@ import org.apache.kafka.common.annotation.InterfaceStability;
 import java.util.Collection;
 
 /**
- * Options for {@link AdminClient#describeConfigs(Collection)}.
+ * Options for {@link Admin#describeConfigs(Collection)}.
  *
- * The API of this class is evolving, see {@link AdminClient} for details.
+ * The API of this class is evolving, see {@link Admin} for details.
  */
 @InterfaceStability.Evolving
 public class DescribeConfigsOptions extends AbstractOptions<DescribeConfigsOptions> {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResult.java
index e5d79e8..e18c3b8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResult.java
@@ -29,7 +29,7 @@ import java.util.concurrent.ExecutionException;
 /**
  * The result of the {@link KafkaAdminClient#describeConfigs(Collection)} call.
  *
- * The API of this class is evolving, see {@link AdminClient} for details.
+ * The API of this class is evolving, see {@link Admin} for details.
  */
 @InterfaceStability.Evolving
 public class DescribeConfigsResult {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsOptions.java
index 8f05f61..70238a8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsOptions.java
@@ -22,9 +22,9 @@ import org.apache.kafka.common.annotation.InterfaceStability;
 import java.util.Collection;
 
 /**
- * Options for {@link AdminClient#describeConsumerGroups(Collection, DescribeConsumerGroupsOptions)}.
+ * Options for {@link Admin#describeConsumerGroups(Collection, DescribeConsumerGroupsOptions)}.
  * <p>
- * The API of this class is evolving, see {@link AdminClient} for details.
+ * The API of this class is evolving, see {@link Admin} for details.
  */
 @InterfaceStability.Evolving
 public class DescribeConsumerGroupsOptions extends AbstractOptions<DescribeConsumerGroupsOptions> {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsResult.java
index 8f0ebad..2eddbba 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsResult.java
@@ -29,7 +29,7 @@ import java.util.concurrent.ExecutionException;
 /**
  * The result of the {@link KafkaAdminClient#describeConsumerGroups(Collection, DescribeConsumerGroupsOptions)}} call.
  *
- * The API of this class is evolving, see {@link AdminClient} for details.
+ * The API of this class is evolving, see {@link Admin} for details.
  */
 @InterfaceStability.Evolving
 public class DescribeConsumerGroupsResult {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeDelegationTokenOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeDelegationTokenOptions.java
index 60b9935..ef9f105 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeDelegationTokenOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeDelegationTokenOptions.java
@@ -23,9 +23,9 @@ import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 
 /**
- * Options for {@link AdminClient#describeDelegationToken(DescribeDelegationTokenOptions)}.
+ * Options for {@link Admin#describeDelegationToken(DescribeDelegationTokenOptions)}.
  *
- * The API of this class is evolving, see {@link AdminClient} for details.
+ * The API of this class is evolving, see {@link Admin} for details.
  */
 @InterfaceStability.Evolving
 public class DescribeDelegationTokenOptions extends AbstractOptions<DescribeDelegationTokenOptions> {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeDelegationTokenResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeDelegationTokenResult.java
index 7a9d4b9..47b2530 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeDelegationTokenResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeDelegationTokenResult.java
@@ -26,7 +26,7 @@ import org.apache.kafka.common.security.token.delegation.DelegationToken;
 /**
  * The result of the {@link KafkaAdminClient#describeDelegationToken(DescribeDelegationTokenOptions)} call.
  *
- * The API of this class is evolving, see {@link AdminClient} for details.
+ * The API of this class is evolving, see {@link Admin} for details.
  */
 @InterfaceStability.Evolving
 public class DescribeDelegationTokenResult {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeLogDirsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeLogDirsOptions.java
index 5f6352c..17890ca 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeLogDirsOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeLogDirsOptions.java
@@ -23,9 +23,9 @@ import java.util.Collection;
 
 
 /**
- * Options for {@link AdminClient#describeLogDirs(Collection)}
+ * Options for {@link Admin#describeLogDirs(Collection)}
  *
- * The API of this class is evolving, see {@link AdminClient} for details.
+ * The API of this class is evolving, see {@link Admin} for details.
  */
 @InterfaceStability.Evolving
 public class DescribeLogDirsOptions extends AbstractOptions<DescribeLogDirsOptions> {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeLogDirsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeLogDirsResult.java
index 7c7bde7..9bcc2fb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeLogDirsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeLogDirsResult.java
@@ -27,9 +27,9 @@ import org.apache.kafka.common.requests.DescribeLogDirsResponse.LogDirInfo;
 
 
 /**
- * The result of the {@link AdminClient#describeLogDirs(Collection)} call.
+ * The result of the {@link Admin#describeLogDirs(Collection)} call.
  *
- * The API of this class is evolving, see {@link AdminClient} for details.
+ * The API of this class is evolving, see {@link Admin} for details.
  */
 @InterfaceStability.Evolving
 public class DescribeLogDirsResult {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirsOptions.java
index c0924ef..589de50 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirsOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirsOptions.java
@@ -21,9 +21,9 @@ import org.apache.kafka.common.annotation.InterfaceStability;
 import java.util.Collection;
 
 /**
- * Options for {@link AdminClient#describeReplicaLogDirs(Collection)}.
+ * Options for {@link Admin#describeReplicaLogDirs(Collection)}.
  *
- * The API of this class is evolving, see {@link AdminClient} for details.
+ * The API of this class is evolving, see {@link Admin} for details.
  */
 @InterfaceStability.Evolving
 public class DescribeReplicaLogDirsOptions extends AbstractOptions<DescribeReplicaLogDirsOptions> {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirsResult.java
index d4c8479..54bd9c1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirsResult.java
@@ -28,9 +28,9 @@ import java.util.concurrent.ExecutionException;
 
 
 /**
- * The result of {@link AdminClient#describeReplicaLogDirs(Collection)}.
+ * The result of {@link Admin#describeReplicaLogDirs(Collection)}.
  *
- * The API of this class is evolving, see {@link AdminClient} for details.
+ * The API of this class is evolving, see {@link Admin} for details.
  */
 @InterfaceStability.Evolving
 public class DescribeReplicaLogDirsResult {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java
index 9e7d9da..196341b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java
@@ -22,9 +22,9 @@ import org.apache.kafka.common.annotation.InterfaceStability;
 import java.util.Collection;
 
 /**
- * Options for {@link AdminClient#describeTopics(Collection)}.
+ * Options for {@link Admin#describeTopics(Collection)}.
  *
- * The API of this class is evolving, see {@link AdminClient} for details.
+ * The API of this class is evolving, see {@link Admin} for details.
  */
 @InterfaceStability.Evolving
 public class DescribeTopicsOptions extends AbstractOptions<DescribeTopicsOptions> {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java
index 6bb24d9..9822b42 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java
@@ -28,7 +28,7 @@ import java.util.concurrent.ExecutionException;
 /**
  * The result of the {@link KafkaAdminClient#describeTopics(Collection)} call.
  *
- * The API of this class is evolving, see {@link AdminClient} for details.
+ * The API of this class is evolving, see {@link Admin} for details.
  */
 @InterfaceStability.Evolving
 public class DescribeTopicsResult {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ElectLeadersOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/ElectLeadersOptions.java
index e0a08de..db79976 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ElectLeadersOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ElectLeadersOptions.java
@@ -20,9 +20,9 @@ package org.apache.kafka.clients.admin;
 import org.apache.kafka.common.annotation.InterfaceStability;
 
 /**
- * Options for {@link AdminClient#electLeaders(ElectionType, Set, ElectLeadersOptions)}.
+ * Options for {@link Admin#electLeaders(ElectionType, Set, ElectLeadersOptions)}.
  *
- * The API of this class is evolving, see {@link AdminClient} for details.
+ * The API of this class is evolving, see {@link Admin} for details.
  */
 @InterfaceStability.Evolving
 final public class ElectLeadersOptions extends AbstractOptions<ElectLeadersOptions> {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ElectLeadersResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ElectLeadersResult.java
index b4aceba..92da4fc 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ElectLeadersResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ElectLeadersResult.java
@@ -26,9 +26,9 @@ import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
 
 /**
- * The result of {@link AdminClient#electLeaders(ElectionType, Set, ElectLeadersOptions)}
+ * The result of {@link Admin#electLeaders(ElectionType, Set, ElectLeadersOptions)}
  *
- * The API of this class is evolving, see {@link AdminClient} for details.
+ * The API of this class is evolving, see {@link Admin} for details.
  */
 @InterfaceStability.Evolving
 final public class ElectLeadersResult {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ElectPreferredLeadersOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/ElectPreferredLeadersOptions.java
index c59aeb3..c00c920 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ElectPreferredLeadersOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ElectPreferredLeadersOptions.java
@@ -21,11 +21,11 @@ import org.apache.kafka.common.annotation.InterfaceStability;
 import java.util.Collection;
 
 /**
- * Options for {@link AdminClient#electPreferredLeaders(Collection, ElectPreferredLeadersOptions)}.
+ * Options for {@link Admin#electPreferredLeaders(Collection, ElectPreferredLeadersOptions)}.
  *
- * The API of this class is evolving, see {@link AdminClient} for details.
+ * The API of this class is evolving, see {@link Admin} for details.
  *
- * @deprecated Since 2.4.0. Use {@link AdminClient#electLeaders(ElectionType, Set, ElectLeadersOptions)}.
+ * @deprecated Since 2.4.0. Use {@link Admin#electLeaders(ElectionType, Set, ElectLeadersOptions)}.
  */
 @InterfaceStability.Evolving
 @Deprecated
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ElectPreferredLeadersResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ElectPreferredLeadersResult.java
index 5a98d5f..0c85193 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ElectPreferredLeadersResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ElectPreferredLeadersResult.java
@@ -29,11 +29,11 @@ import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
 
 /**
- * The result of {@link AdminClient#electPreferredLeaders(Collection, ElectPreferredLeadersOptions)}
+ * The result of {@link Admin#electPreferredLeaders(Collection, ElectPreferredLeadersOptions)}
  *
- * The API of this class is evolving, see {@link AdminClient} for details.
+ * The API of this class is evolving, see {@link Admin} for details.
  *
- * @deprecated Since 2.4.0. Use {@link AdminClient#electLeaders(ElectionType, Set, ElectLeadersOptions)}.
+ * @deprecated Since 2.4.0. Use {@link Admin#electLeaders(ElectionType, Set, ElectLeadersOptions)}.
  */
 @InterfaceStability.Evolving
 @Deprecated
@@ -82,7 +82,7 @@ public class ElectPreferredLeadersResult {
      * an election was attempted even if the election was not successful.</p>
      *
      * <p>This method is provided to discover the partitions attempted when
-     * {@link AdminClient#electPreferredLeaders(Collection)} is called
+     * {@link Admin#electPreferredLeaders(Collection)} is called
      * with a null {@code partitions} argument.</p>
      */
     public KafkaFuture<Set<TopicPartition>> partitions() {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ExpireDelegationTokenOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/ExpireDelegationTokenOptions.java
index 138cd4e..3bf9489 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ExpireDelegationTokenOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ExpireDelegationTokenOptions.java
@@ -20,9 +20,9 @@ package org.apache.kafka.clients.admin;
 import org.apache.kafka.common.annotation.InterfaceStability;
 
 /**
- * Options for {@link AdminClient#expireDelegationToken(byte[], ExpireDelegationTokenOptions)}.
+ * Options for {@link Admin#expireDelegationToken(byte[], ExpireDelegationTokenOptions)}.
  *
- * The API of this class is evolving, see {@link AdminClient} for details.
+ * The API of this class is evolving, see {@link Admin} for details.
  */
 @InterfaceStability.Evolving
 public class ExpireDelegationTokenOptions extends AbstractOptions<ExpireDelegationTokenOptions> {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ExpireDelegationTokenResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ExpireDelegationTokenResult.java
index 41782bd..59b1714 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ExpireDelegationTokenResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ExpireDelegationTokenResult.java
@@ -23,7 +23,7 @@ import org.apache.kafka.common.annotation.InterfaceStability;
 /**
  * The result of the {@link KafkaAdminClient#expireDelegationToken(byte[], ExpireDelegationTokenOptions)} call.
  *
- * The API of this class is evolving, see {@link AdminClient} for details.
+ * The API of this class is evolving, see {@link Admin} for details.
  */
 @InterfaceStability.Evolving
 public class ExpireDelegationTokenResult {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index eebaf3d..349fc2a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -181,10 +181,10 @@ import static org.apache.kafka.common.requests.MetadataRequest.convertToMetadata
 import static org.apache.kafka.common.utils.Utils.closeQuietly;
 
 /**
- * The default implementation of {@link AdminClient}. An instance of this class is created by invoking one of the
+ * The default implementation of {@link Admin}. An instance of this class is created by invoking one of the
  * {@code create()} methods in {@code AdminClient}. Users should not refer to this class directly.
  *
- * The API of this class is evolving, see {@link AdminClient} for details.
+ * The API of this class is evolving, see {@link Admin} for details.
  */
 @InterfaceStability.Evolving
 public class KafkaAdminClient extends AdminClient {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsOptions.java
index c6434eb..af738ca 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsOptions.java
@@ -23,9 +23,9 @@ import org.apache.kafka.common.annotation.InterfaceStability;
 import java.util.List;
 
 /**
- * Options for {@link AdminClient#listConsumerGroupOffsets(String)}.
+ * Options for {@link Admin#listConsumerGroupOffsets(String)}.
  * <p>
- * The API of this class is evolving, see {@link AdminClient} for details.
+ * The API of this class is evolving, see {@link Admin} for details.
  */
 @InterfaceStability.Evolving
 public class ListConsumerGroupOffsetsOptions extends AbstractOptions<ListConsumerGroupOffsetsOptions> {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsResult.java
index 23657b5..ea51934 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsResult.java
@@ -25,9 +25,9 @@ import org.apache.kafka.common.annotation.InterfaceStability;
 import java.util.Map;
 
 /**
- * The result of the {@link AdminClient#listConsumerGroupOffsets(String)} call.
+ * The result of the {@link Admin#listConsumerGroupOffsets(String)} call.
  * <p>
- * The API of this class is evolving, see {@link AdminClient} for details.
+ * The API of this class is evolving, see {@link Admin} for details.
  */
 @InterfaceStability.Evolving
 public class ListConsumerGroupOffsetsResult {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java
index 86ca171..eb27c79 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java
@@ -20,9 +20,9 @@ package org.apache.kafka.clients.admin;
 import org.apache.kafka.common.annotation.InterfaceStability;
 
 /**
- * Options for {@link AdminClient#listConsumerGroups()}.
+ * Options for {@link Admin#listConsumerGroups()}.
  *
- * The API of this class is evolving, see {@link AdminClient} for details.
+ * The API of this class is evolving, see {@link Admin} for details.
  */
 @InterfaceStability.Evolving
 public class ListConsumerGroupsOptions extends AbstractOptions<ListConsumerGroupsOptions> {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsResult.java
index 7de485b..7732ec9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsResult.java
@@ -25,9 +25,9 @@ import java.util.ArrayList;
 import java.util.Collection;
 
 /**
- * The result of the {@link AdminClient#listConsumerGroups()} call.
+ * The result of the {@link Admin#listConsumerGroups()} call.
  * <p>
- * The API of this class is evolving, see {@link AdminClient} for details.
+ * The API of this class is evolving, see {@link Admin} for details.
  */
 @InterfaceStability.Evolving
 public class ListConsumerGroupsResult {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsOptions.java
index 1431494..e288e18 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsOptions.java
@@ -20,9 +20,9 @@ package org.apache.kafka.clients.admin;
 import org.apache.kafka.common.annotation.InterfaceStability;
 
 /**
- * Options for {@link AdminClient#listTopics()}.
+ * Options for {@link Admin#listTopics()}.
  *
- * The API of this class is evolving, see {@link AdminClient} for details.
+ * The API of this class is evolving, see {@link Admin} for details.
  */
 @InterfaceStability.Evolving
 public class ListTopicsOptions extends AbstractOptions<ListTopicsOptions> {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResult.java
index a2e17fd..4e7e1a2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResult.java
@@ -25,9 +25,9 @@ import java.util.Map;
 import java.util.Set;
 
 /**
- * The result of the {@link AdminClient#listTopics()} call.
+ * The result of the {@link Admin#listTopics()} call.
  *
- * The API of this class is evolving, see {@link AdminClient} for details.
+ * The API of this class is evolving, see {@link Admin} for details.
  */
 @InterfaceStability.Evolving
 public class ListTopicsResult {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/NewPartitions.java b/clients/src/main/java/org/apache/kafka/clients/admin/NewPartitions.java
index 66a4d92..06da256 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/NewPartitions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/NewPartitions.java
@@ -23,9 +23,9 @@ import java.util.List;
 import java.util.Map;
 
 /**
- * Describes new partitions for a particular topic in a call to {@link AdminClient#createPartitions(Map)}.
+ * Describes new partitions for a particular topic in a call to {@link Admin#createPartitions(Map)}.
  *
- * The API of this class is evolving, see {@link AdminClient} for details.
+ * The API of this class is evolving, see {@link Admin} for details.
  */
 @InterfaceStability.Evolving
 public class NewPartitions {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/NewTopic.java b/clients/src/main/java/org/apache/kafka/clients/admin/NewTopic.java
index 66585ea..088c33e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/NewTopic.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/NewTopic.java
@@ -29,7 +29,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 
 /**
- * A new topic to be created via {@link AdminClient#createTopics(Collection)}.
+ * A new topic to be created via {@link Admin#createTopics(Collection)}.
  */
 public class NewTopic {
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/RecordsToDelete.java b/clients/src/main/java/org/apache/kafka/clients/admin/RecordsToDelete.java
index 53a6dfb..af835c8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/RecordsToDelete.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/RecordsToDelete.java
@@ -22,9 +22,9 @@ import org.apache.kafka.common.annotation.InterfaceStability;
 import java.util.Map;
 
 /**
- * Describe records to delete in a call to {@link AdminClient#deleteRecords(Map)}
+ * Describe records to delete in a call to {@link Admin#deleteRecords(Map)}
  *
- * The API of this class is evolving, see {@link AdminClient} for details.
+ * The API of this class is evolving, see {@link Admin} for details.
  */
 @InterfaceStability.Evolving
 public class RecordsToDelete {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/RenewDelegationTokenOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/RenewDelegationTokenOptions.java
index 238dc4a..5c2b0d1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/RenewDelegationTokenOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/RenewDelegationTokenOptions.java
@@ -20,9 +20,9 @@ package org.apache.kafka.clients.admin;
 import org.apache.kafka.common.annotation.InterfaceStability;
 
 /**
- * Options for {@link AdminClient#renewDelegationToken(byte[], RenewDelegationTokenOptions)}.
+ * Options for {@link Admin#renewDelegationToken(byte[], RenewDelegationTokenOptions)}.
  *
- * The API of this class is evolving, see {@link AdminClient} for details.
+ * The API of this class is evolving, see {@link Admin} for details.
  */
 @InterfaceStability.Evolving
 public class RenewDelegationTokenOptions extends AbstractOptions<RenewDelegationTokenOptions> {
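
    A rough sketch of the renew flow these options belong to, assuming a SASL-enabled cluster with
    delegation tokens enabled (client security properties omitted); not part of this commit.

        import java.util.Properties;
        import org.apache.kafka.clients.admin.Admin;
        import org.apache.kafka.clients.admin.AdminClientConfig;
        import org.apache.kafka.common.security.token.delegation.DelegationToken;

        public class RenewTokenExample {
            public static void main(String[] args) throws Exception {
                Properties props = new Properties(); // SASL client properties omitted for brevity
                props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
                try (Admin admin = Admin.create(props)) {
                    DelegationToken token = admin.createDelegationToken().delegationToken().get();
                    // Renew using the token's HMAC; the result completes with the new expiry timestamp
                    long expiryMs = admin.renewDelegationToken(token.hmac()).expiryTimestamp().get();
                    System.out.println("Token now expires at " + expiryMs);
                }
            }
        }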
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/RenewDelegationTokenResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/RenewDelegationTokenResult.java
index 38cdf1a..74725d4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/RenewDelegationTokenResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/RenewDelegationTokenResult.java
@@ -23,7 +23,7 @@ import org.apache.kafka.common.annotation.InterfaceStability;
 /**
 * The result of the {@link KafkaAdminClient#renewDelegationToken(byte[], RenewDelegationTokenOptions)} call.
  *
- * The API of this class is evolving, see {@link AdminClient} for details.
+ * The API of this class is evolving, see {@link Admin} for details.
  */
 @InterfaceStability.Evolving
 public class RenewDelegationTokenResult {
diff --git a/clients/src/main/java/org/apache/kafka/common/ElectionType.java b/clients/src/main/java/org/apache/kafka/common/ElectionType.java
index c5b5e50..4e29853 100644
--- a/clients/src/main/java/org/apache/kafka/common/ElectionType.java
+++ b/clients/src/main/java/org/apache/kafka/common/ElectionType.java
@@ -21,9 +21,9 @@ import java.util.Arrays;
 import org.apache.kafka.common.annotation.InterfaceStability;
 
 /**
- * Options for {@link org.apache.kafka.clients.admin.AdminClient#electLeaders(ElectionType, Set, ElectLeadersOptions)}.
+ * Options for {@link org.apache.kafka.clients.admin.Admin#electLeaders(ElectionType, Set, ElectLeadersOptions)}.
  *
- * The API of this class is evolving, see {@link org.apache.kafka.clients.admin.AdminClient} for details.
+ * The API of this class is evolving, see {@link org.apache.kafka.clients.admin.Admin} for details.
  */
 @InterfaceStability.Evolving
 public enum ElectionType {
diff --git a/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java b/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java
index afe5a5d..5dd65ed 100644
--- a/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java
@@ -16,18 +16,17 @@
  */
 package org.apache.kafka.clients;
 
-import org.apache.kafka.common.config.ConfigException;
-import org.junit.Test;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.stream.Collectors;
+import org.apache.kafka.common.config.ConfigException;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import org.junit.Test;
 
 public class ClientUtilsTest {
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
index 42166b4..f6a808f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
@@ -34,7 +34,7 @@ import java.util.Map;
  * easily create a simple cluster.
  * <p>
  * To use in a test, create an instance and prepare its {@link #kafkaClient() MockClient} with the expected responses
- * for the {@link AdminClient}. Then, use the {@link #adminClient() AdminClient} in the test, which will then use the MockClient
+ * for the {@link Admin}. Then, use the {@link #adminClient() AdminClient} in the test, which will then use the MockClient
  * and receive the responses you provided.
  *
  * Since {@link #kafkaClient() MockClient} is not thread-safe,
@@ -103,7 +103,7 @@ public class AdminClientUnitTestEnv implements AutoCloseable {
         return cluster;
     }
 
-    public AdminClient adminClient() {
+    public Admin adminClient() {
         return adminClient;
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
index 61606ab..5da9f04 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
@@ -17,7 +17,7 @@
 package org.apache.kafka.common.security.authenticator;
 
 import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.DescribeTopicsResult;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -118,7 +118,7 @@ public class ClientAuthenticationFailureTest {
     public void testAdminClientWithInvalidCredentials() {
         Map<String, Object> props = new HashMap<>(saslClientConfigs);
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + server.port());
-        try (AdminClient client = AdminClient.create(props)) {
+        try (Admin client = Admin.create(props)) {
             DescribeTopicsResult result = client.describeTopics(Collections.singleton("test"));
             result.all().get();
             fail("Expected an authentication error!");
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
index c78026f..fc6181d 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.connect.runtime.errors;
 
-import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.KafkaProducer;
@@ -76,7 +76,7 @@ public class DeadLetterQueueReporter implements ErrorReporter {
                                                          ErrorHandlingMetrics errorHandlingMetrics) {
         String topic = sinkConfig.dlqTopicName();
 
-        try (AdminClient admin = AdminClient.create(adminProps)) {
+        try (Admin admin = Admin.create(adminProps)) {
             if (!admin.listTopics().names().get().contains(topic)) {
                 log.error("Topic {} doesn't exist. Will attempt to create topic.", topic);
                 NewTopic schemaTopicRequest = new NewTopic(topic, DLQ_NUM_DESIRED_PARTITIONS, sinkConfig.dlqTopicReplicationFactor());
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
index 9f30236..86ed42e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.connect.util;
 
-import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.record.InvalidRecordException;
 import org.apache.kafka.common.record.RecordBatch;
@@ -41,12 +41,12 @@ public final class ConnectUtils {
 
     public static String lookupKafkaClusterId(WorkerConfig config) {
         log.info("Creating Kafka admin client");
-        try (AdminClient adminClient = AdminClient.create(config.originals())) {
+        try (Admin adminClient = Admin.create(config.originals())) {
             return lookupKafkaClusterId(adminClient);
         }
     }
 
-    static String lookupKafkaClusterId(AdminClient adminClient) {
+    static String lookupKafkaClusterId(Admin adminClient) {
         log.debug("Looking up Kafka cluster ID");
         try {
             KafkaFuture<String> clusterIdFuture = adminClient.describeCluster().clusterId();
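
    The cluster-id lookup shown above reduces to a short standalone sketch, kept here only for
    illustration; the bootstrap address is a placeholder.

        import java.util.Properties;
        import org.apache.kafka.clients.admin.Admin;
        import org.apache.kafka.clients.admin.AdminClientConfig;

        public class ClusterIdExample {
            public static void main(String[] args) throws Exception {
                Properties props = new Properties();
                props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
                try (Admin admin = Admin.create(props)) {
                    // DescribeClusterResult#clusterId() completes with the broker-reported cluster id
                    String clusterId = admin.describeCluster().clusterId().get();
                    System.out.println("Kafka cluster ID: " + clusterId);
                }
            }
        }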
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
index 72a5981..1fa04de 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.connect.util;
 
-import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.CreateTopicsOptions;
 import org.apache.kafka.clients.admin.NewTopic;
@@ -39,7 +39,7 @@ import java.util.Set;
 import java.util.concurrent.ExecutionException;
 
 /**
- * Utility to simplify creating and managing topics via the {@link org.apache.kafka.clients.admin.AdminClient}.
+ * Utility to simplify creating and managing topics via the {@link Admin}.
  */
 public class TopicAdmin implements AutoCloseable {
 
@@ -158,19 +158,19 @@ public class TopicAdmin implements AutoCloseable {
 
     private static final Logger log = LoggerFactory.getLogger(TopicAdmin.class);
     private final Map<String, Object> adminConfig;
-    private final AdminClient admin;
+    private final Admin admin;
 
     /**
      * Create a new topic admin component with the given configuration.
      *
-     * @param adminConfig the configuration for the {@link AdminClient}
+     * @param adminConfig the configuration for the {@link Admin}
      */
     public TopicAdmin(Map<String, Object> adminConfig) {
-        this(adminConfig, AdminClient.create(adminConfig));
+        this(adminConfig, Admin.create(adminConfig));
     }
 
     // visible for testing
-    TopicAdmin(Map<String, Object> adminConfig, AdminClient adminClient) {
+    TopicAdmin(Map<String, Object> adminConfig, Admin adminClient) {
         this.admin = adminClient;
         this.adminConfig = adminConfig != null ? adminConfig : Collections.<String, Object>emptyMap();
     }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
index 4464395..948d54b 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
@@ -23,7 +23,7 @@ import kafka.utils.CoreUtils;
 import kafka.utils.TestUtils;
 import kafka.zk.EmbeddedZookeeper;
 import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -234,7 +234,7 @@ public class EmbeddedKafkaCluster extends ExternalResource {
         final NewTopic newTopic = new NewTopic(topic, partitions, (short) replication);
         newTopic.configs(topicConfig);
 
-        try (final AdminClient adminClient = createAdminClient()) {
+        try (final Admin adminClient = createAdminClient()) {
             adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
         } catch (final InterruptedException | ExecutionException e) {
             throw new RuntimeException(e);
@@ -258,7 +258,7 @@ public class EmbeddedKafkaCluster extends ExternalResource {
         }
     }
 
-    public AdminClient createAdminClient() {
+    public Admin createAdminClient() {
         final Properties adminClientConfig = new Properties();
         adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
         final Object listeners = brokerConfig.get(KafkaConfig$.MODULE$.ListenersProp());
@@ -267,7 +267,7 @@ public class EmbeddedKafkaCluster extends ExternalResource {
             adminClientConfig.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, ((Password) brokerConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)).value());
             adminClientConfig.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
         }
-        return AdminClient.create(adminClientConfig);
+        return Admin.create(adminClientConfig);
     }
 
     /**
diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala b/core/src/main/scala/kafka/admin/AclCommand.scala
index f945b25..7ab379d 100644
--- a/core/src/main/scala/kafka/admin/AclCommand.scala
+++ b/core/src/main/scala/kafka/admin/AclCommand.scala
@@ -24,7 +24,7 @@ import joptsimple.util.EnumConverter
 import kafka.security.auth._
 import kafka.server.KafkaConfig
 import kafka.utils._
-import org.apache.kafka.clients.admin.{AdminClientConfig, AdminClient => JAdminClient}
+import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, AdminClient => JAdminClient}
 import org.apache.kafka.common.acl._
 import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, Resource => JResource, ResourceType => JResourceType}
 import org.apache.kafka.common.security.JaasUtils
@@ -80,7 +80,7 @@ object AclCommand extends Logging {
 
   class AdminClientService(val opts: AclCommandOptions) extends AclCommandService with Logging {
 
-    private def withAdminClient(opts: AclCommandOptions)(f: JAdminClient => Unit) {
+    private def withAdminClient(opts: AclCommandOptions)(f: Admin => Unit) {
       val props = if (opts.options.has(opts.commandConfigOpt))
         Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))
       else
@@ -153,7 +153,7 @@ object AclCommand extends Logging {
       new AccessControlEntry(acl.principal.toString, acl.host, acl.operation.toJava, acl.permissionType.toJava)
     }
 
-    private def removeAcls(adminClient: JAdminClient, acls: Set[Acl], filter: ResourcePatternFilter): Unit = {
+    private def removeAcls(adminClient: Admin, acls: Set[Acl], filter: ResourcePatternFilter): Unit = {
       if (acls.isEmpty)
         adminClient.deleteAcls(List(new AclBindingFilter(filter, AccessControlEntryFilter.ANY)).asJava).all().get()
       else {
@@ -166,7 +166,7 @@ object AclCommand extends Logging {
       new AccessControlEntryFilter(acl.principal.toString, acl.host, acl.operation.toJava, acl.permissionType.toJava)
     }
 
-    private def getAcls(adminClient: JAdminClient, filters: Set[ResourcePatternFilter]): Map[ResourcePattern, Set[AccessControlEntry]] = {
+    private def getAcls(adminClient: Admin, filters: Set[ResourcePatternFilter]): Map[ResourcePattern, Set[AccessControlEntry]] = {
       val aclBindings =
         if (filters.isEmpty) adminClient.describeAcls(AclBindingFilter.ANY).values().get().asScala.toList
         else {
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index 09c4456..7edc4a4 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -28,7 +28,7 @@ import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit, PasswordEncod
 import kafka.utils.Implicits._
 import kafka.zk.{AdminZkClient, KafkaZkClient}
 import org.apache.kafka.clients.CommonClientConfigs
-import org.apache.kafka.clients.admin.{AlterConfigsOptions, ConfigEntry, DescribeConfigsOptions, AdminClient => JAdminClient, Config => JConfig}
+import org.apache.kafka.clients.admin.{Admin, AlterConfigsOptions, ConfigEntry, DescribeConfigsOptions, AdminClient => JAdminClient, Config => JConfig}
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.config.types.Password
 import org.apache.kafka.common.errors.InvalidConfigurationException
@@ -295,7 +295,7 @@ object ConfigCommand extends Config {
     }
   }
 
-  private[admin] def alterBrokerConfig(adminClient: JAdminClient, opts: ConfigCommandOptions, entityName: String) {
+  private[admin] def alterBrokerConfig(adminClient: Admin, opts: ConfigCommandOptions, entityName: String) {
     val configsToBeAdded = parseConfigsToBeAdded(opts).asScala.map { case (k, v) => (k, new ConfigEntry(k, v)) }
     val configsToBeDeleted = parseConfigsToBeDeleted(opts)
 
@@ -324,7 +324,7 @@ object ConfigCommand extends Config {
       println(s"Completed updating default config for brokers in the cluster,")
   }
 
-  private def describeBrokerConfig(adminClient: JAdminClient, opts: ConfigCommandOptions, entityName: String) {
+  private def describeBrokerConfig(adminClient: Admin, opts: ConfigCommandOptions, entityName: String) {
     val configs = brokerConfig(adminClient, entityName, includeSynonyms = true)
     if (entityName.nonEmpty)
       println(s"Configs for broker $entityName are:")
@@ -336,7 +336,7 @@ object ConfigCommand extends Config {
     }
   }
 
-  private def brokerConfig(adminClient: JAdminClient, entityName: String, includeSynonyms: Boolean): Seq[ConfigEntry] = {
+  private def brokerConfig(adminClient: Admin, entityName: String, includeSynonyms: Boolean): Seq[ConfigEntry] = {
     val configResource = new ConfigResource(ConfigResource.Type.BROKER, entityName)
     val configSource = if (!entityName.isEmpty)
       ConfigEntry.ConfigSource.DYNAMIC_BROKER_CONFIG
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index 0277d9b..3f2ed32 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -528,7 +528,7 @@ object ConsumerGroupCommand extends Logging {
       )
     }
 
-    private def createAdminClient(): admin.AdminClient = {
+    private def createAdminClient(): Admin = {
       val props = if (opts.options.has(opts.commandConfigOpt)) Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt)) else new Properties()
       props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt))
       admin.AdminClient.create(props)
diff --git a/core/src/main/scala/kafka/admin/DelegationTokenCommand.scala b/core/src/main/scala/kafka/admin/DelegationTokenCommand.scala
index 4837180..5db38f4 100644
--- a/core/src/main/scala/kafka/admin/DelegationTokenCommand.scala
+++ b/core/src/main/scala/kafka/admin/DelegationTokenCommand.scala
@@ -24,7 +24,7 @@ import java.util.Base64
 import joptsimple.ArgumentAcceptingOptionSpec
 import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit, Logging}
 import org.apache.kafka.clients.CommonClientConfigs
-import org.apache.kafka.clients.admin.{CreateDelegationTokenOptions, DescribeDelegationTokenOptions, ExpireDelegationTokenOptions, RenewDelegationTokenOptions, AdminClient => JAdminClient}
+import org.apache.kafka.clients.admin.{Admin, CreateDelegationTokenOptions, DescribeDelegationTokenOptions, ExpireDelegationTokenOptions, RenewDelegationTokenOptions, AdminClient => JAdminClient}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.security.token.delegation.DelegationToken
 import org.apache.kafka.common.utils.{SecurityUtils, Utils}
@@ -72,7 +72,7 @@ object DelegationTokenCommand extends Logging {
     }
   }
 
-  def createToken(adminClient: JAdminClient, opts: DelegationTokenCommandOptions): DelegationToken = {
+  def createToken(adminClient: Admin, opts: DelegationTokenCommandOptions): DelegationToken = {
     val renewerPrincipals = getPrincipals(opts, opts.renewPrincipalsOpt).getOrElse(new util.LinkedList[KafkaPrincipal]())
     val maxLifeTimeMs = opts.options.valueOf(opts.maxLifeTimeOpt).longValue
 
@@ -108,7 +108,7 @@ object DelegationTokenCommand extends Logging {
       None
   }
 
-  def renewToken(adminClient: JAdminClient, opts: DelegationTokenCommandOptions): Long = {
+  def renewToken(adminClient: Admin, opts: DelegationTokenCommandOptions): Long = {
     val hmac = opts.options.valueOf(opts.hmacOpt)
     val renewTimePeriodMs = opts.options.valueOf(opts.renewTimePeriodOpt).longValue()
     println("Calling renew token operation with hmac :" + hmac +" , renew-time-period :"+ renewTimePeriodMs)
@@ -119,7 +119,7 @@ object DelegationTokenCommand extends Logging {
     expiryTimeStamp
   }
 
-  def expireToken(adminClient: JAdminClient, opts: DelegationTokenCommandOptions): Long = {
+  def expireToken(adminClient: Admin, opts: DelegationTokenCommandOptions): Long = {
     val hmac = opts.options.valueOf(opts.hmacOpt)
     val expiryTimePeriodMs = opts.options.valueOf(opts.expiryTimePeriodOpt).longValue()
     println("Calling expire token operation with hmac :" + hmac +" , expire-time-period : "+ expiryTimePeriodMs)
@@ -130,7 +130,7 @@ object DelegationTokenCommand extends Logging {
     expiryTimeStamp
   }
 
-  def describeToken(adminClient: JAdminClient, opts: DelegationTokenCommandOptions): List[DelegationToken] = {
+  def describeToken(adminClient: Admin, opts: DelegationTokenCommandOptions): List[DelegationToken] = {
     val ownerPrincipals = getPrincipals(opts, opts.ownerPrincipalsOpt)
     if (ownerPrincipals.isEmpty)
       println("Calling describe token operation for current user.")
@@ -143,7 +143,7 @@ object DelegationTokenCommand extends Logging {
     tokens
   }
 
-  private def createAdminClient(opts: DelegationTokenCommandOptions): JAdminClient = {
+  private def createAdminClient(opts: DelegationTokenCommandOptions): Admin = {
     val props = Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))
     props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt))
     JAdminClient.create(props)
diff --git a/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala b/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala
index 600aef6..3a342b7 100644
--- a/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala
+++ b/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala
@@ -100,7 +100,7 @@ object DeleteRecordsCommand {
     adminClient.close()
   }
 
-  private def createAdminClient(opts: DeleteRecordsCommandOptions): admin.AdminClient = {
+  private def createAdminClient(opts: DeleteRecordsCommandOptions): admin.Admin = {
     val props = if (opts.options.has(opts.commandConfigOpt))
       Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))
     else
diff --git a/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala
index e484dd3..ff349f6 100644
--- a/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala
+++ b/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala
@@ -25,8 +25,7 @@ import kafka.utils.CommandLineUtils
 import kafka.utils.CoreUtils
 import kafka.utils.Json
 import kafka.utils.Logging
-import org.apache.kafka.clients.admin.AdminClientConfig
-import org.apache.kafka.clients.admin.{AdminClient => JAdminClient}
+import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, AdminClient => JAdminClient}
 import org.apache.kafka.common.ElectionType
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.ClusterAuthorizationException
@@ -117,7 +116,7 @@ object LeaderElectionCommand extends Logging {
   }
 
   private[this] def electLeaders(
-    client: JAdminClient,
+    client: Admin,
     electionType: ElectionType,
     topicPartitions: Option[Set[TopicPartition]]
   ): Unit = {
diff --git a/core/src/main/scala/kafka/admin/LogDirsCommand.scala b/core/src/main/scala/kafka/admin/LogDirsCommand.scala
index e689b12..40d39e1 100644
--- a/core/src/main/scala/kafka/admin/LogDirsCommand.scala
+++ b/core/src/main/scala/kafka/admin/LogDirsCommand.scala
@@ -21,7 +21,7 @@ import java.io.PrintStream
 import java.util.Properties
 
 import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Json}
-import org.apache.kafka.clients.admin.{AdminClientConfig, DescribeLogDirsResult, AdminClient => JAdminClient}
+import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, DescribeLogDirsResult, AdminClient => JAdminClient}
 import org.apache.kafka.common.requests.DescribeLogDirsResponse.LogDirInfo
 import org.apache.kafka.common.utils.Utils
 
@@ -82,7 +82,7 @@ object LogDirsCommand {
         ).asJava)
     }
 
-    private def createAdminClient(opts: LogDirsCommandOptions): JAdminClient = {
+    private def createAdminClient(opts: LogDirsCommandOptions): Admin = {
         val props = if (opts.options.has(opts.commandConfigOpt))
             Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))
         else
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index 4d0f071..3726937 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -27,7 +27,7 @@ import kafka.utils._
 import kafka.utils.json.JsonValue
 import kafka.zk.{AdminZkClient, KafkaZkClient}
 import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult.ReplicaLogDirInfo
-import org.apache.kafka.clients.admin.{AdminClientConfig, AlterReplicaLogDirsOptions, AdminClient => JAdminClient}
+import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, AlterReplicaLogDirsOptions, AdminClient => JAdminClient}
 import org.apache.kafka.common.errors.ReplicaNotAvailableException
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.utils.{Time, Utils}
@@ -70,7 +70,7 @@ object ReassignPartitionsCommand extends Logging {
     } finally zkClient.close()
   }
 
-  private def createAdminClient(opts: ReassignPartitionsCommandOptions): Option[JAdminClient] = {
+  private def createAdminClient(opts: ReassignPartitionsCommandOptions): Option[Admin] = {
     if (opts.options.has(opts.bootstrapServerOpt)) {
       val props = if (opts.options.has(opts.commandConfigOpt))
         Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))
@@ -84,13 +84,13 @@ object ReassignPartitionsCommand extends Logging {
     }
   }
 
-  def verifyAssignment(zkClient: KafkaZkClient, adminClientOpt: Option[JAdminClient], opts: ReassignPartitionsCommandOptions) {
+  def verifyAssignment(zkClient: KafkaZkClient, adminClientOpt: Option[Admin], opts: ReassignPartitionsCommandOptions) {
     val jsonFile = opts.options.valueOf(opts.reassignmentJsonFileOpt)
     val jsonString = Utils.readFileAsString(jsonFile)
     verifyAssignment(zkClient, adminClientOpt, jsonString)
   }
 
-  def verifyAssignment(zkClient: KafkaZkClient, adminClientOpt: Option[JAdminClient], jsonString: String): Unit = {
+  def verifyAssignment(zkClient: KafkaZkClient, adminClientOpt: Option[Admin], jsonString: String): Unit = {
     println("Status of partition reassignment: ")
     val adminZkClient = new AdminZkClient(zkClient)
     val (partitionsToBeReassigned, replicaAssignment) = parsePartitionReassignmentData(jsonString)
@@ -196,7 +196,7 @@ object ReassignPartitionsCommand extends Logging {
     (partitionsToBeReassigned, currentAssignment)
   }
 
-  def executeAssignment(zkClient: KafkaZkClient, adminClientOpt: Option[JAdminClient], opts: ReassignPartitionsCommandOptions) {
+  def executeAssignment(zkClient: KafkaZkClient, adminClientOpt: Option[Admin], opts: ReassignPartitionsCommandOptions) {
     val reassignmentJsonFile =  opts.options.valueOf(opts.reassignmentJsonFileOpt)
     val reassignmentJsonString = Utils.readFileAsString(reassignmentJsonFile)
     val interBrokerThrottle = opts.options.valueOf(opts.interBrokerThrottleOpt)
@@ -205,7 +205,7 @@ object ReassignPartitionsCommand extends Logging {
     executeAssignment(zkClient, adminClientOpt, reassignmentJsonString, Throttle(interBrokerThrottle, replicaAlterLogDirsThrottle), timeoutMs)
   }
 
-  def executeAssignment(zkClient: KafkaZkClient, adminClientOpt: Option[JAdminClient], reassignmentJsonString: String, throttle: Throttle, timeoutMs: Long = 10000L) {
+  def executeAssignment(zkClient: KafkaZkClient, adminClientOpt: Option[Admin], reassignmentJsonString: String, throttle: Throttle, timeoutMs: Long = 10000L) {
     val (partitionAssignment, replicaAssignment) = parseAndValidate(zkClient, reassignmentJsonString)
     val adminZkClient = new AdminZkClient(zkClient)
     val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, adminClientOpt, partitionAssignment.toMap, replicaAssignment, adminZkClient)
@@ -371,7 +371,7 @@ object ReassignPartitionsCommand extends Logging {
     }.toMap
   }
 
-  private def checkIfReplicaReassignmentSucceeded(adminClientOpt: Option[JAdminClient], replicaAssignment: Map[TopicPartitionReplica, String])
+  private def checkIfReplicaReassignmentSucceeded(adminClientOpt: Option[Admin], replicaAssignment: Map[TopicPartitionReplica, String])
   :Map[TopicPartitionReplica, ReassignmentStatus] = {
 
     val replicaLogDirInfos = {
@@ -502,7 +502,7 @@ object ReassignPartitionsCommand extends Logging {
 }
 
 class ReassignPartitionsCommand(zkClient: KafkaZkClient,
-                                adminClientOpt: Option[JAdminClient],
+                                adminClientOpt: Option[Admin],
                                 proposedPartitionAssignment: Map[TopicPartition, Seq[Int]],
                                 proposedReplicaAssignment: Map[TopicPartitionReplica, String] = Map.empty,
                                 adminZkClient: AdminZkClient)
@@ -596,7 +596,7 @@ class ReassignPartitionsCommand(zkClient: KafkaZkClient,
     }.mkString(",")
 
   private def alterReplicaLogDirsIgnoreReplicaNotAvailable(replicaAssignment: Map[TopicPartitionReplica, String],
-                                                           adminClient: JAdminClient,
+                                                           adminClient: Admin,
                                                            timeoutMs: Long): Set[TopicPartitionReplica] = {
     val alterReplicaLogDirsResult = adminClient.alterReplicaLogDirs(replicaAssignment.asJava, new AlterReplicaLogDirsOptions().timeoutMs(timeoutMs.toInt))
     val replicasAssignedToFutureDir = alterReplicaLogDirsResult.values().asScala.flatMap { case (replica, future) => {
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index ae3cde8..f2e197a 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -28,7 +28,7 @@ import kafka.utils.Implicits._
 import kafka.utils._
 import kafka.zk.{AdminZkClient, KafkaZkClient}
 import org.apache.kafka.clients.CommonClientConfigs
-import org.apache.kafka.clients.admin.{ListTopicsOptions, NewPartitions, NewTopic, AdminClient => JAdminClient}
+import org.apache.kafka.clients.admin.{Admin, ListTopicsOptions, NewPartitions, NewTopic, AdminClient => JAdminClient}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.config.ConfigResource.Type
 import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
@@ -158,7 +158,7 @@ object TopicCommand extends Logging {
   }
 
   object AdminClientTopicService {
-    def createAdminClient(commandConfig: Properties, bootstrapServer: Option[String]): JAdminClient = {
+    def createAdminClient(commandConfig: Properties, bootstrapServer: Option[String]): Admin = {
       bootstrapServer match {
         case Some(serverList) => commandConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, serverList)
         case None =>
@@ -170,7 +170,7 @@ object TopicCommand extends Logging {
       new AdminClientTopicService(createAdminClient(commandConfig, bootstrapServer))
   }
 
-  case class AdminClientTopicService private (adminClient: JAdminClient) extends TopicService {
+  case class AdminClientTopicService private (adminClient: Admin) extends TopicService {
 
     override def createTopic(topic: CommandTopicPartition): Unit = {
       if (topic.replicationFactor.exists(rf => rf > Short.MaxValue || rf < 1))
diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
index e3f969b..2c27526 100644
--- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
+++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
@@ -30,7 +30,7 @@ import kafka.api._
 import kafka.utils.Whitelist
 import kafka.utils._
 import org.apache.kafka.clients._
-import org.apache.kafka.clients.admin.{ListTopicsOptions, TopicDescription}
+import org.apache.kafka.clients.admin.{Admin, ListTopicsOptions, TopicDescription}
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.{NetworkReceive, Selectable, Selector}
@@ -211,16 +211,16 @@ object ReplicaVerificationTool extends Logging {
 
   }
 
-  private def listTopicsMetadata(adminClient: admin.AdminClient): Seq[TopicDescription] = {
+  private def listTopicsMetadata(adminClient: Admin): Seq[TopicDescription] = {
     val topics = adminClient.listTopics(new ListTopicsOptions().listInternal(true)).names.get
     adminClient.describeTopics(topics).all.get.values.asScala.toBuffer
   }
 
-  private def brokerDetails(adminClient: admin.AdminClient): Map[Int, Node] = {
+  private def brokerDetails(adminClient: Admin): Map[Int, Node] = {
     adminClient.describeCluster.nodes.get.asScala.map(n => (n.id, n)).toMap
   }
 
-  private def createAdminClient(brokerUrl: String): admin.AdminClient = {
+  private def createAdminClient(brokerUrl: String): Admin = {
     val props = new Properties()
     props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerUrl)
     admin.AdminClient.create(props)
diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java
index 09f1240..e61b096 100644
--- a/core/src/main/scala/kafka/tools/StreamsResetter.java
+++ b/core/src/main/scala/kafka/tools/StreamsResetter.java
@@ -22,7 +22,7 @@ import joptsimple.OptionSet;
 import joptsimple.OptionSpec;
 import joptsimple.OptionSpecBuilder;
 import kafka.utils.CommandLineUtils;
-import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.DeleteTopicsResult;
 import org.apache.kafka.clients.admin.DescribeConsumerGroupsOptions;
 import org.apache.kafka.clients.admin.DescribeConsumerGroupsResult;
@@ -150,7 +150,7 @@ public class StreamsResetter {
             }
             properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, options.valueOf(bootstrapServerOption));
 
-            kafkaAdminClient = (KafkaAdminClient) AdminClient.create(properties);
+            kafkaAdminClient = (KafkaAdminClient) Admin.create(properties);
             validateNoActiveConsumers(groupId, kafkaAdminClient);
 
             allTopics.clear();
@@ -179,7 +179,7 @@ public class StreamsResetter {
     }
 
     private void validateNoActiveConsumers(final String groupId,
-                                           final AdminClient adminClient)
+                                           final Admin adminClient)
         throws ExecutionException, InterruptedException {
 
         final DescribeConsumerGroupsResult describeResult = adminClient.describeConsumerGroups(Collections.singleton(groupId),
@@ -647,7 +647,7 @@ public class StreamsResetter {
 
     // visible for testing
     public void doDelete(final List<String> topicsToDelete,
-                          final AdminClient adminClient) {
+                          final Admin adminClient) {
         boolean hasDeleteErrors = false;
         final DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(topicsToDelete);
         final Map<String, KafkaFuture<Void>> results = deleteTopicsResult.values();
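
    For completeness, a minimal sketch of topic deletion through the new interface, mirroring the
    call above; the topic name is an assumed placeholder and this is not part of the patch.

        import java.util.Collections;
        import java.util.Properties;
        import org.apache.kafka.clients.admin.Admin;
        import org.apache.kafka.clients.admin.AdminClientConfig;

        public class DeleteTopicsExample {
            public static void main(String[] args) throws Exception {
                Properties props = new Properties();
                props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
                try (Admin admin = Admin.create(props)) {
                    // DeleteTopicsResult#all() fails if any individual deletion fails
                    admin.deleteTopics(Collections.singletonList("old-topic")).all().get();
                }
            }
        }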
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index e61f03f..7f04de1 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -67,7 +67,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
   @Rule
   def globalTimeout = Timeout.millis(120000)
 
-  var client: AdminClient = null
+  var client: Admin = null
 
   val topic = "topic"
   val partition = 0
@@ -121,7 +121,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
     config
   }
 
-  def waitForTopics(client: AdminClient, expectedPresent: Seq[String], expectedMissing: Seq[String]): Unit = {
+  def waitForTopics(client: Admin, expectedPresent: Seq[String], expectedMissing: Seq[String]): Unit = {
     TestUtils.waitUntilTrue(() => {
         val topics = client.listTopics.names.get()
         expectedPresent.forall(topicName => topics.contains(topicName)) &&
@@ -1823,7 +1823,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
 
 object AdminClientIntegrationTest {
 
-  def checkValidAlterConfigs(client: AdminClient, topicResource1: ConfigResource, topicResource2: ConfigResource): Unit = {
+  def checkValidAlterConfigs(client: Admin, topicResource1: ConfigResource, topicResource2: ConfigResource): Unit = {
     // Alter topics
     var topicConfigEntries1 = Seq(
       new ConfigEntry(LogConfig.FlushMsProp, "1000")
@@ -1885,7 +1885,7 @@ object AdminClientIntegrationTest {
     assertEquals("0.9", configs.get(topicResource2).get(LogConfig.MinCleanableDirtyRatioProp).value)
   }
 
-  def checkInvalidAlterConfigs(zkClient: KafkaZkClient, servers: Seq[KafkaServer], client: AdminClient): Unit = {
+  def checkInvalidAlterConfigs(zkClient: KafkaZkClient, servers: Seq[KafkaServer], client: Admin): Unit = {
     // Create topics
     val topic1 = "invalid-alter-configs-topic-1"
     val topicResource1 = new ConfigResource(ConfigResource.Type.TOPIC, topic1)
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
index 69d5c10..0da0829a 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
@@ -21,7 +21,7 @@ import kafka.integration.KafkaServerTestHarness
 import kafka.log.LogConfig
 import kafka.server.{Defaults, KafkaConfig}
 import kafka.utils.{Logging, TestUtils}
-import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, AlterConfigsOptions, Config, ConfigEntry}
+import org.apache.kafka.clients.admin.{Admin, AdminClient, AdminClientConfig, AlterConfigsOptions, Config, ConfigEntry}
 import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
 import org.apache.kafka.common.errors.{InvalidRequestException, PolicyViolationException}
 import org.apache.kafka.common.utils.Utils
@@ -40,7 +40,7 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with
 
   import AdminClientWithPoliciesIntegrationTest._
 
-  var client: AdminClient = null
+  var client: Admin = null
   val brokerCount = 3
 
   @Rule
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 5b73db0..41bfb0d 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -25,7 +25,7 @@ import kafka.network.SocketServer
 import kafka.security.auth._
 import kafka.server.{BaseRequestTest, KafkaConfig}
 import kafka.utils.TestUtils
-import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, AlterConfigOp}
+import org.apache.kafka.clients.admin.{Admin, AdminClient, AdminClientConfig, AlterConfigOp}
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
 import org.apache.kafka.clients.producer._
@@ -112,7 +112,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   val transactionalIdDescribeAcl = Map(transactionalIdResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)))
 
   val numRecords = 1
-  val adminClients = Buffer[AdminClient]()
+  val adminClients = Buffer[Admin]()
 
   producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "1")
   producerConfig.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "50000")
@@ -1649,7 +1649,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     createProducer()
   }
 
-  private def createAdminClient(): AdminClient = {
+  private def createAdminClient(): Admin = {
     val props = new Properties()
     props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
     val adminClient = AdminClient.create(props)
diff --git a/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala b/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
index 174a3e3..b661eae 100644
--- a/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
+++ b/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
@@ -26,7 +26,7 @@ import kafka.server._
 import kafka.utils.JaasTestUtils.ScramLoginModule
 import kafka.utils.{JaasTestUtils, Logging, TestUtils}
 import kafka.zk.ConfigEntityChangeNotificationZNode
-import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
+import org.apache.kafka.clients.admin.{Admin, AdminClient, AdminClientConfig}
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.{Cluster, Reconfigurable}
@@ -54,7 +54,7 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {
   private val kafkaClientSaslMechanism = "SCRAM-SHA-256"
   override protected val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism))
   override protected val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
-  private val adminClients = new ArrayBuffer[AdminClient]()
+  private val adminClients = new ArrayBuffer[Admin]()
   private var producerWithoutQuota: KafkaProducer[Array[Byte], Array[Byte]] = _
 
   val defaultRequestQuota = 1000
@@ -181,7 +181,7 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {
     TestUtils.createTopic(zkClient, topic, assignment, servers)
   }
 
-  private def createAdminClient(): AdminClient = {
+  private def createAdminClient(): Admin = {
     val config = new util.HashMap[String, Object]
     config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
       TestUtils.bootstrapServers(servers, new ListenerName("BROKER")))
diff --git a/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala
index c96b172..65df797 100644
--- a/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala
@@ -21,7 +21,7 @@ import java.util
 import kafka.server.KafkaConfig
 import kafka.utils.{JaasTestUtils, TestUtils}
 import kafka.zk.ConfigEntityChangeNotificationZNode
-import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
+import org.apache.kafka.clients.admin.{Admin, AdminClient, AdminClientConfig}
 import org.apache.kafka.common.config.SaslConfigs
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.security.scram.ScramCredential
diff --git a/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala b/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
index 57fd552..d484f46 100644
--- a/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
@@ -34,7 +34,7 @@ class DescribeAuthorizedOperationsTest extends IntegrationTestHarness with SaslS
   this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
   this.serverConfig.setProperty(KafkaConfig.AuthorizerClassNameProp, classOf[SimpleAclAuthorizer].getName)
 
-  var client: AdminClient = _
+  var client: Admin = _
   val group1 = "group1"
   val group2 = "group2"
   val group3 = "group3"
diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index 92742e4..4efd262 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -27,7 +27,7 @@ import java.util.Properties
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
 import kafka.server.KafkaConfig
 import kafka.integration.KafkaServerTestHarness
-import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
+import org.apache.kafka.clients.admin.{Admin, AdminClient, AdminClientConfig}
 import org.apache.kafka.common.network.{ListenerName, Mode}
 import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, Deserializer, Serializer}
 import org.junit.{After, Before}
@@ -49,7 +49,7 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
 
   private val consumers = mutable.Buffer[KafkaConsumer[_, _]]()
   private val producers = mutable.Buffer[KafkaProducer[_, _]]()
-  private val adminClients = mutable.Buffer[AdminClient]()
+  private val adminClients = mutable.Buffer[Admin]()
 
   protected def interBrokerListenerName: ListenerName = listenerName
 
@@ -138,7 +138,7 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
     consumer
   }
 
-  def createAdminClient(configOverrides: Properties = new Properties): AdminClient = {
+  def createAdminClient(configOverrides: Properties = new Properties): Admin = {
     val props = new Properties
     props ++= adminClientConfig
     props ++= configOverrides
diff --git a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
index bbb3d19..faf2fbc 100644
--- a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
@@ -18,7 +18,7 @@ import java.util.Collections
 import java.util.concurrent.{ExecutionException, TimeUnit}
 
 import scala.collection.JavaConverters._
-import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
+import org.apache.kafka.clients.admin.{Admin, AdminClient, AdminClientConfig}
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.{KafkaException, TopicPartition}
diff --git a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
index 9ee83bf..f4c9adf 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
@@ -395,7 +395,7 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with
     testAclCreateGetDelete(expectAuth = false)
   }
 
-  private def waitForDescribeAcls(client: AdminClient, filter: AclBindingFilter, acls: Set[AclBinding]): Unit = {
+  private def waitForDescribeAcls(client: Admin, filter: AclBindingFilter, acls: Set[AclBinding]): Unit = {
     var lastResults: util.Collection[AclBinding] = null
     TestUtils.waitUntilTrue(() => {
       lastResults = client.describeAcls(filter).values.get()
diff --git a/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala b/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala
index 4914a46..21578df 100644
--- a/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala
@@ -25,7 +25,7 @@ import java.util.concurrent._
 
 import kafka.server.{BaseRequestTest, KafkaConfig}
 import kafka.utils.{CoreUtils, TestUtils}
-import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
+import org.apache.kafka.clients.admin.{Admin, AdminClient, AdminClientConfig}
 import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
@@ -185,7 +185,7 @@ class DynamicConnectionQuotaTest extends BaseRequestTest {
     }
   }
 
-  private def createAdminClient(): AdminClient = {
+  private def createAdminClient(): Admin = {
     val bootstrapServers = TestUtils.bootstrapServers(servers, new ListenerName(securityProtocol.name))
     val config = new Properties()
     config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 011565e..8b720fa 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -81,7 +81,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
   private val numPartitions = 10
   private val producers = new ArrayBuffer[KafkaProducer[String, String]]
   private val consumers = new ArrayBuffer[KafkaConsumer[String, String]]
-  private val adminClients = new ArrayBuffer[AdminClient]()
+  private val adminClients = new ArrayBuffer[Admin]()
   private val clientThreads = new ArrayBuffer[ShutdownableThread]()
   private val executors = new ArrayBuffer[ExecutorService]
   private val topic = "testtopic"
@@ -1070,7 +1070,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     securityProps(props, props.keySet)
   }
 
-  private def createAdminClient(securityProtocol: SecurityProtocol, listenerName: String): AdminClient = {
+  private def createAdminClient(securityProtocol: SecurityProtocol, listenerName: String): Admin = {
     val config = clientProps(securityProtocol)
     val bootstrapServers = TestUtils.bootstrapServers(servers, new ListenerName(listenerName))
     config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
@@ -1109,7 +1109,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     }, "Did not fail authentication with invalid config")
   }
 
-  private def describeConfig(adminClient: AdminClient, servers: Seq[KafkaServer] = this.servers): Config = {
+  private def describeConfig(adminClient: Admin, servers: Seq[KafkaServer] = this.servers): Config = {
     val configResources = servers.map { server =>
       new ConfigResource(ConfigResource.Type.BROKER, server.config.brokerId.toString)
     }
@@ -1157,7 +1157,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     newStoreProps
   }
 
-  private def alterSslKeystore(adminClient: AdminClient, props: Properties, listener: String, expectFailure: Boolean  = false): Unit = {
+  private def alterSslKeystore(adminClient: Admin, props: Properties, listener: String, expectFailure: Boolean  = false): Unit = {
     val configPrefix = listenerPrefix(listener)
     val newProps = securityProps(props, KEYSTORE_PROPS, configPrefix)
     reconfigureServers(newProps, perBrokerConfig = true,
@@ -1189,14 +1189,14 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     waitForConfig(s"$configPrefix$SSL_KEYSTORE_LOCATION_CONFIG", props.getProperty(SSL_KEYSTORE_LOCATION_CONFIG))
   }
 
-  private def serverEndpoints(adminClient: AdminClient): String = {
+  private def serverEndpoints(adminClient: Admin): String = {
     val nodes = adminClient.describeCluster().nodes().get
     nodes.asScala.map { node =>
       s"${node.host}:${node.port}"
     }.mkString(",")
   }
 
-  private def alterAdvertisedListener(adminClient: AdminClient, externalAdminClient: AdminClient, oldHost: String, newHost: String): Unit = {
+  private def alterAdvertisedListener(adminClient: Admin, externalAdminClient: Admin, oldHost: String, newHost: String): Unit = {
     val configs = servers.map { server =>
       val resource = new ConfigResource(ConfigResource.Type.BROKER, server.config.brokerId.toString)
       val newListeners = server.config.advertisedListeners.map { e =>
diff --git a/core/src/test/scala/unit/kafka/admin/DelegationTokenCommandTest.scala b/core/src/test/scala/unit/kafka/admin/DelegationTokenCommandTest.scala
index 98df2f1..cae0902 100644
--- a/core/src/test/scala/unit/kafka/admin/DelegationTokenCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DelegationTokenCommandTest.scala
@@ -22,7 +22,7 @@ import kafka.admin.DelegationTokenCommand.DelegationTokenCommandOptions
 import kafka.api.{KafkaSasl, SaslSetup}
 import kafka.server.{BaseRequestTest, KafkaConfig}
 import kafka.utils.{JaasTestUtils, TestUtils}
-import org.apache.kafka.clients.admin.AdminClientConfig
+import org.apache.kafka.clients.admin.{Admin, AdminClient, AdminClientConfig}
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
@@ -38,7 +38,7 @@ class DelegationTokenCommandTest extends BaseRequestTest with SaslSetup {
   private val kafkaServerSaslMechanisms = List("PLAIN")
   protected override val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism))
   protected override val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
-  var adminClient: org.apache.kafka.clients.admin.AdminClient = null
+  var adminClient: Admin = null
 
   override def brokerCount = 1
 
@@ -68,7 +68,7 @@ class DelegationTokenCommandTest extends BaseRequestTest with SaslSetup {
 
   @Test
   def testDelegationTokenRequests(): Unit = {
-    adminClient = org.apache.kafka.clients.admin.AdminClient.create(createAdminConfig)
+    adminClient = AdminClient.create(createAdminConfig)
     val renewer1 = "User:renewer1"
     val renewer2 = "User:renewer2"
 
diff --git a/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala b/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala
index 8b4d84f..13ad590 100644
--- a/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala
@@ -20,13 +20,13 @@ import java.io.File
 import java.nio.charset.StandardCharsets
 import java.nio.file.Files
 import java.nio.file.Path
+
 import kafka.common.AdminCommandFailedException
 import kafka.server.KafkaConfig
 import kafka.server.KafkaServer
 import kafka.utils.TestUtils
 import kafka.zk.ZooKeeperTestHarness
-import org.apache.kafka.clients.admin.AdminClientConfig
-import org.apache.kafka.clients.admin.{AdminClient => JAdminClient}
+import org.apache.kafka.clients.admin.{Admin, AdminClient, AdminClientConfig}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.TimeoutException
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
@@ -35,6 +35,7 @@ import org.junit.After
 import org.junit.Assert._
 import org.junit.Before
 import org.junit.Test
+
 import scala.collection.JavaConverters._
 import scala.collection.Seq
 import scala.concurrent.duration._
@@ -70,7 +71,7 @@ final class LeaderElectionCommandTest extends ZooKeeperTestHarness {
 
   @Test
   def testAllTopicPartition(): Unit = {
-    TestUtils.resource(JAdminClient.create(createConfig(servers).asJava)) { client =>
+    TestUtils.resource(AdminClient.create(createConfig(servers).asJava)) { client =>
       val topic = "unclean-topic"
       val partition = 0
       val assignment = Seq(broker2, broker3)
@@ -101,7 +102,7 @@ final class LeaderElectionCommandTest extends ZooKeeperTestHarness {
 
   @Test
   def testTopicPartition(): Unit = {
-    TestUtils.resource(JAdminClient.create(createConfig(servers).asJava)) { client =>
+    TestUtils.resource(AdminClient.create(createConfig(servers).asJava)) { client =>
       val topic = "unclean-topic"
       val partition = 0
       val assignment = Seq(broker2, broker3)
@@ -133,7 +134,7 @@ final class LeaderElectionCommandTest extends ZooKeeperTestHarness {
 
   @Test
   def testPathToJsonFile(): Unit = {
-    TestUtils.resource(JAdminClient.create(createConfig(servers).asJava)) { client =>
+    TestUtils.resource(AdminClient.create(createConfig(servers).asJava)) { client =>
       val topic = "unclean-topic"
       val partition = 0
       val assignment = Seq(broker2, broker3)
@@ -166,7 +167,7 @@ final class LeaderElectionCommandTest extends ZooKeeperTestHarness {
 
   @Test
   def testPreferredReplicaElection(): Unit = {
-    TestUtils.resource(JAdminClient.create(createConfig(servers).asJava)) { client =>
+    TestUtils.resource(AdminClient.create(createConfig(servers).asJava)) { client =>
       val topic = "unclean-topic"
       val partition = 0
       val assignment = Seq(broker2, broker3)
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
index 2b36da8..8bb64bc 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
@@ -24,8 +24,7 @@ import kafka.zk.{ReassignPartitionsZNode, ZkVersion, ZooKeeperTestHarness}
 import org.junit.Assert.{assertEquals, assertTrue}
 import org.junit.{After, Before, Test}
 import kafka.admin.ReplicationQuotaUtils._
-import org.apache.kafka.clients.admin.AdminClientConfig
-import org.apache.kafka.clients.admin.{AdminClient => JAdminClient}
+import org.apache.kafka.clients.admin.{Admin, AdminClient, AdminClientConfig}
 import org.apache.kafka.common.{TopicPartition, TopicPartitionReplica}
 
 import scala.collection.JavaConverters._
@@ -41,7 +40,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
   var servers: Seq[KafkaServer] = null
   val topicName = "my-topic"
   val delayMs = 1000
-  var adminClient: JAdminClient = null
+  var adminClient: Admin = null
 
   def zkUpdateDelay(): Unit = Thread.sleep(delayMs)
 
@@ -59,11 +58,11 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
     }.map(c => createServer(KafkaConfig.fromProps(c)))
   }
 
-  def createAdminClient(servers: Seq[KafkaServer]): JAdminClient = {
+  def createAdminClient(servers: Seq[KafkaServer]): Admin = {
     val props = new Properties()
     props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.getBrokerListStrFromServers(servers))
     props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
-    JAdminClient.create(props)
+    AdminClient.create(props)
   }
 
   def getRandomLogDirAssignment(brokerId: Int): String = {
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
index a98f751..8813b59 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
@@ -25,7 +25,7 @@ import kafka.server.{ConfigType, KafkaConfig}
 import kafka.utils.{Exit, Logging, TestUtils}
 import kafka.zk.{ConfigEntityChangeNotificationZNode, DeleteTopicsTopicZNode}
 import org.apache.kafka.clients.CommonClientConfigs
-import org.apache.kafka.clients.admin.{ListTopicsOptions, NewTopic, AdminClient => JAdminClient}
+import org.apache.kafka.clients.admin.{Admin, AdminClient, ListTopicsOptions, NewTopic}
 import org.apache.kafka.common.config.{ConfigException, ConfigResource, TopicConfig}
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.network.ListenerName
@@ -59,7 +59,7 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin
   private val defaultReplicationFactor = 1.toShort
 
   private var topicService: AdminClientTopicService = _
-  private var adminClient: JAdminClient = _
+  private var adminClient: Admin = _
   private var testTopicName: String = _
 
   private val _testName = new TestName
@@ -105,7 +105,7 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin
     // create adminClient
     val props = new Properties()
     props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerList)
-    adminClient = JAdminClient.create(props)
+    adminClient = AdminClient.create(props)
     topicService = AdminClientTopicService(adminClient)
     testTopicName = s"${testName.getMethodName}-${Random.alphanumeric.take(10).mkString}"
   }
diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
index daa0982..64cafb4 100755
--- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
@@ -36,7 +36,7 @@ import org.apache.kafka.common.errors.TimeoutException
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.serialization.StringDeserializer
-import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
+import org.apache.kafka.clients.admin.{Admin, AdminClient, AdminClientConfig}
 import org.junit.Assert._
 import org.scalatest.Assertions.intercept
 
@@ -347,7 +347,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
     assertEquals(List("first", "third"), consumeAllMessages(topic, 2))
   }
 
-  private def createAdminClient(): AdminClient = {
+  private def createAdminClient(): Admin = {
     val config = new Properties
     val bootstrapServers = TestUtils.bootstrapServers(servers, new ListenerName("PLAINTEXT"))
     config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
diff --git a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsOnPlainTextTest.scala b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsOnPlainTextTest.scala
index 9c50b15..19ca1dd 100644
--- a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsOnPlainTextTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsOnPlainTextTest.scala
@@ -19,7 +19,7 @@ package kafka.server
 import java.util
 
 import kafka.utils.TestUtils
-import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
+import org.apache.kafka.clients.admin.{Admin, AdminClient, AdminClientConfig}
 import org.apache.kafka.common.errors.UnsupportedByAuthenticationException
 import org.junit.{After, Before, Test}
 import org.scalatest.Assertions.intercept
@@ -28,7 +28,7 @@ import scala.collection.JavaConverters._
 import scala.concurrent.ExecutionException
 
 class DelegationTokenRequestsOnPlainTextTest extends BaseRequestTest {
-  var adminClient: AdminClient = null
+  var adminClient: Admin = null
 
   override def brokerCount = 1
 
diff --git a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala
index aec3cb6..52f7885 100644
--- a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala
@@ -20,7 +20,7 @@ import java.util
 
 import kafka.api.{KafkaSasl, SaslSetup}
 import kafka.utils.{JaasTestUtils, TestUtils}
-import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, CreateDelegationTokenOptions, DescribeDelegationTokenOptions}
+import org.apache.kafka.clients.admin.{Admin, AdminClient, AdminClientConfig, CreateDelegationTokenOptions, DescribeDelegationTokenOptions}
 import org.apache.kafka.common.errors.InvalidPrincipalTypeException
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.utils.SecurityUtils
@@ -37,7 +37,7 @@ class DelegationTokenRequestsTest extends BaseRequestTest with SaslSetup {
   private val kafkaServerSaslMechanisms = List("PLAIN")
   protected override val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism))
   protected override val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
-  var adminClient: AdminClient = null
+  var adminClient: Admin = null
 
   override def brokerCount = 1
 
diff --git a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala
index 04fd536..01c793c 100644
--- a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala
@@ -20,7 +20,7 @@ import java.util
 
 import kafka.api.{KafkaSasl, SaslSetup}
 import kafka.utils.{JaasTestUtils, TestUtils}
-import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
+import org.apache.kafka.clients.admin.{Admin, AdminClient, AdminClientConfig}
 import org.apache.kafka.common.errors.DelegationTokenDisabledException
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.junit.{After, Before, Test}
@@ -35,7 +35,7 @@ class DelegationTokenRequestsWithDisableTokenFeatureTest extends BaseRequestTest
   private val kafkaServerSaslMechanisms = List("PLAIN")
   protected override val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism))
   protected override val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
-  var adminClient: AdminClient = null
+  var adminClient: Admin = null
 
   override def brokerCount = 1
 
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 24c7e19..ffaf47b 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -1426,7 +1426,7 @@ object TestUtils extends Logging {
     }
   }
 
-  def alterConfigs(servers: Seq[KafkaServer], adminClient: AdminClient, props: Properties,
+  def alterConfigs(servers: Seq[KafkaServer], adminClient: Admin, props: Properties,
                    perBrokerConfig: Boolean): AlterConfigsResult = {
     val configEntries = props.asScala.map { case (k, v) => new ConfigEntry(k, v) }.toList.asJava
     val newConfig = new Config(configEntries)
@@ -1441,8 +1441,8 @@ object TestUtils extends Logging {
     adminClient.alterConfigs(configs)
   }
 
-  def incrementalAlterConfigs(servers: Seq[KafkaServer], adminClient: AdminClient, props: Properties,
-                   perBrokerConfig: Boolean, opType: OpType = OpType.SET): AlterConfigsResult  = {
+  def incrementalAlterConfigs(servers: Seq[KafkaServer], adminClient: Admin, props: Properties,
+                              perBrokerConfig: Boolean, opType: OpType = OpType.SET): AlterConfigsResult  = {
     val configEntries = props.asScala.map { case (k, v) => new AlterConfigOp(new ConfigEntry(k, v), opType) }.toList.asJavaCollection
     val configs = if (perBrokerConfig) {
       servers.map { server =>
@@ -1455,14 +1455,14 @@ object TestUtils extends Logging {
     adminClient.incrementalAlterConfigs(configs)
   }
 
-  def alterTopicConfigs(adminClient: AdminClient, topic: String, topicConfigs: Properties): AlterConfigsResult = {
+  def alterTopicConfigs(adminClient: Admin, topic: String, topicConfigs: Properties): AlterConfigsResult = {
     val configEntries = topicConfigs.asScala.map { case (k, v) => new ConfigEntry(k, v) }.toList.asJava
     val newConfig = new Config(configEntries)
     val configs = Map(new ConfigResource(ConfigResource.Type.TOPIC, topic) -> newConfig).asJava
     adminClient.alterConfigs(configs)
   }
 
-  def currentLeader(client: AdminClient, topicPartition: TopicPartition): Option[Int] = {
+  def currentLeader(client: Admin, topicPartition: TopicPartition): Option[Int] = {
     Option(
       client
         .describeTopics(Arrays.asList(topicPartition.topic))
@@ -1475,14 +1475,14 @@ object TestUtils extends Logging {
     ).map(_.id)
   }
 
-  def waitForLeaderToBecome(client: AdminClient, topicPartition: TopicPartition, leader: Option[Int]): Unit = {
+  def waitForLeaderToBecome(client: Admin, topicPartition: TopicPartition, leader: Option[Int]): Unit = {
     TestUtils.waitUntilTrue(
       () => currentLeader(client, topicPartition) == leader,
       s"Expected leader to become $leader", 10000
     )
   }
 
-  def waitForBrokersOutOfIsr(client: AdminClient, partition: Set[TopicPartition], brokerIds: Set[Int]): Unit = {
+  def waitForBrokersOutOfIsr(client: Admin, partition: Set[TopicPartition], brokerIds: Set[Int]): Unit = {
     TestUtils.waitUntilTrue(
       () => {
         val description = client.describeTopics(partition.map(_.topic).asJava).all.get.asScala
@@ -1498,7 +1498,7 @@ object TestUtils extends Logging {
     )
   }
 
-  def waitForBrokersInIsr(client: AdminClient, partition: TopicPartition, brokerIds: Set[Int]): Unit = {
+  def waitForBrokersInIsr(client: Admin, partition: TopicPartition, brokerIds: Set[Int]): Unit = {
     TestUtils.waitUntilTrue(
       () => {
         val description = client.describeTopics(Set(partition.topic).asJava).all.get.asScala
diff --git a/docs/api.html b/docs/api.html
index 4545e9b..b6ab1fa 100644
--- a/docs/api.html
+++ b/docs/api.html
@@ -21,7 +21,7 @@
 	<li>The <a href="#consumerapi">Consumer</a> API allows applications to read streams of data from topics in the Kafka cluster.
 	<li>The <a href="#streamsapi">Streams</a> API allows transforming streams of data from input topics to output topics.
 	<li>The <a href="#connectapi">Connect</a> API allows implementing connectors that continually pull from some source system or application into Kafka or push from Kafka into some sink system or application.
-	<li>The <a href="#adminapi">AdminClient</a> API allows managing and inspecting topics, brokers, and other Kafka objects.
+	<li>The <a href="#adminapi">Admin</a> API allows managing and inspecting topics, brokers, and other Kafka objects.
 	</ol>
 
 	Kafka exposes all its functionality over a language independent protocol which has clients available in many programming languages. However only the Java clients are maintained as part of the main Kafka project, the others are available as independent open source projects. A list of non-Java clients is available <a href="https://cwiki.apache.org/confluence/display/KAFKA/Clients">here</a>.
@@ -100,11 +100,11 @@
 	Those who want to implement custom connectors can see the <a href="/{{version}}/javadoc/index.html?org/apache/kafka/connect" title="Kafka {{dotVersion}} Javadoc">javadoc</a>.
 	<p>
 
-	<h3><a id="adminapi" href="#adminapi">2.5 AdminClient API</a></h3>
+	<h3><a id="adminapi" href="#adminapi">2.5 Admin API</a></h3>
 
-	The AdminClient API supports managing and inspecting topics, brokers, acls, and other Kafka objects.
+	The Admin API supports managing and inspecting topics, brokers, acls, and other Kafka objects.
 	<p>
-	To use the AdminClient API, add the following Maven dependency:
+	To use the Admin API, add the following Maven dependency:
 	<pre class="brush: xml;">
 		&lt;dependency&gt;
 			&lt;groupId&gt;org.apache.kafka&lt;/groupId&gt;
@@ -112,7 +112,7 @@
 			&lt;version&gt;{{fullDotVersion}}&lt;/version&gt;
 		&lt;/dependency&gt;
 	</pre>
-	For more information about the AdminClient APIs, see the <a href="/{{version}}/javadoc/index.html?org/apache/kafka/clients/admin/AdminClient.html" title="Kafka {{dotVersion}} Javadoc">javadoc</a>.
+	For more information about the Admin APIs, see the <a href="/{{version}}/javadoc/index.html?org/apache/kafka/clients/admin/Admin.html" title="Kafka {{dotVersion}} Javadoc">javadoc</a>.
 	<p>
 
 </script>
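
For illustration only (not part of the diff above), here is a minimal sketch of the renamed interface in use, assuming a broker reachable at the placeholder address localhost:9092 and a hypothetical topic name. Admin.create builds the same admin client as before, but callers now program against the Admin interface.

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.NewTopic;

    public class AdminApiExample {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address
            try (Admin admin = Admin.create(props)) {
                // create a single-partition topic, then print every topic the cluster reports
                admin.createTopics(Collections.singleton(new NewTopic("example-topic", 1, (short) 1))).all().get();
                admin.listTopics().names().get().forEach(System.out::println);
            }
        }
    }
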
diff --git a/docs/configuration.html b/docs/configuration.html
index 112c844..dc17333 100644
--- a/docs/configuration.html
+++ b/docs/configuration.html
@@ -285,7 +285,7 @@
   Below is the configuration of the Kafka Streams client library.
   <!--#include virtual="generated/streams_config.html" -->
 
-  <h3><a id="adminclientconfigs" href="#adminclientconfigs">3.7 AdminClient Configs</a></h3>
+  <h3><a id="adminclientconfigs" href="#adminclientconfigs">3.7 Admin Configs</a></h3>
   Below is the configuration of the Kafka Admin client library.
   <!--#include virtual="generated/admin_client_config.html" -->
 </script>
diff --git a/docs/security.html b/docs/security.html
index e5e5532..f281bff 100644
--- a/docs/security.html
+++ b/docs/security.html
@@ -920,7 +920,7 @@
         <p>Typical steps for delegation token usage are:</p>
         <ol>
         <li>User authenticates with the Kafka cluster via SASL or SSL, and obtains a delegation token. This can be done
-            using AdminClient APIs or using <tt>kafka-delegation-tokens.sh</tt> script.</li>
+            using Admin APIs or using <tt>kafka-delegation-tokens.sh</tt> script.</li>
         <li>User securely passes the delegation token to Kafka clients for authenticating with the Kafka cluster.</li>
         <li>Token owner/renewer can renew/expire the delegation tokens.</li>
         </ol>
@@ -944,7 +944,7 @@
         </li>
 
         <li><h5><a id="security_sasl_create_tokens" href="#security_sasl_create_tokens">Creating Delegation Tokens</a></h5>
-        <p>Tokens can be created by using AdminClient APIs or using <tt>kafka-delegation-tokens.sh</tt> script.
+        <p>Tokens can be created by using Admin APIs or using <tt>kafka-delegation-tokens.sh</tt> script.
             Delegation token requests (create/renew/expire/describe) should be issued only on SASL or SSL authenticated channels.
             Tokens can not be requests if the initial authentication is done through delegation token.
             <tt>kafka-delegation-tokens.sh</tt> script examples are given below.</p>
@@ -1281,8 +1281,8 @@
                 Note that for consumer option we must also specify the consumer group.
                 In order to remove a principal from producer or consumer role we just need to pass --remove option. </li>
 
-        <li><b>AdminClient API based acl management</b><br>
-            Users having Alter permission on ClusterResource can use AdminClient API for ACL management. kafka-acls.sh script supports AdminClient API to manage ACLs without interacting with zookeeper/authorizer directly.
+        <li><b>Admin API based acl management</b><br>
+            Users having Alter permission on ClusterResource can use Admin API for ACL management. kafka-acls.sh script supports AdminClient API to manage ACLs without interacting with zookeeper/authorizer directly.
             All the above examples can be executed by using <b>--bootstrap-server</b> option. For example:
 
             <pre class="brush: bash;">
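
As a rough sketch of the token-creation step mentioned in the docs above (not part of the diff), the following assumes a SASL- or SSL-secured broker at the placeholder address localhost:9093 and a hypothetical renewer principal User:renewer1; the client security settings are omitted.

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.CreateDelegationTokenOptions;
    import org.apache.kafka.common.security.auth.KafkaPrincipal;
    import org.apache.kafka.common.security.token.delegation.DelegationToken;

    public class DelegationTokenExample {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093"); // placeholder; SASL/SSL configs omitted
            try (Admin admin = Admin.create(props)) {
                CreateDelegationTokenOptions options = new CreateDelegationTokenOptions()
                    .renewers(Collections.singletonList(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "renewer1")));
                DelegationToken token = admin.createDelegationToken(options).delegationToken().get();
                System.out.println("created token with id " + token.tokenInfo().tokenId());
            }
        }
    }
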
diff --git a/docs/toc.html b/docs/toc.html
index a818921..1223d20 100644
--- a/docs/toc.html
+++ b/docs/toc.html
@@ -35,7 +35,7 @@
                 <li><a href="#consumerapi">2.2 Consumer API</a>
                 <li><a href="/{{version}}/documentation/streams">2.3 Streams API</a>
                 <li><a href="#connectapi">2.4 Connect API</a>
-                <li><a href="#adminapi">2.5 AdminClient API</a>
+                <li><a href="#adminapi">2.5 Admin API</a>
             </ul>
         </li>
         <li><a href="#configuration">3. Configuration</a>
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java b/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java
index 4ed2770..cc9d27e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.streams;
 
-import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.streams.kstream.GlobalKTable;
@@ -31,12 +31,12 @@ import java.util.Map;
  */
 public interface KafkaClientSupplier {
     /**
-     * Create an {@link AdminClient} which is used for internal topic management.
+     * Create an {@link Admin} which is used for internal topic management.
      *
      * @param config Supplied by the {@link java.util.Properties} given to the {@link KafkaStreams}
-     * @return an instance of {@link AdminClient}
+     * @return an instance of {@link Admin}
      */
-    AdminClient getAdminClient(final Map<String, Object> config);
+    Admin getAdminClient(final Map<String, Object> config);
 
     /**
      * Create a {@link Producer} which is used to write records to sink topics.
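
As a hedged example of what the new supplier signature allows (not part of the diff), a custom supplier can now hand Kafka Streams any Admin implementation. The sketch below extends DefaultKafkaClientSupplier, which the diff further down leaves public, and only overrides getAdminClient; the timeout override is purely illustrative.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;

    // Hypothetical supplier: returns the new Admin type with one admin config tweaked.
    public class TimeoutTweakingClientSupplier extends DefaultKafkaClientSupplier {
        @Override
        public Admin getAdminClient(final Map<String, Object> config) {
            final Map<String, Object> adminConfig = new HashMap<>(config);
            adminConfig.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 10000); // illustrative override
            return Admin.create(adminConfig);
        }
    }

A supplier like this would be passed to the KafkaStreams constructor that accepts a KafkaClientSupplier.
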
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 02586f9..bbf9f9c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.streams;
 
-import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
@@ -141,7 +141,7 @@ public class KafkaStreams implements AutoCloseable {
     private final StreamsMetadataState streamsMetadataState;
     private final ScheduledExecutorService stateDirCleaner;
     private final QueryableStoreProvider queryableStoreProvider;
-    private final AdminClient adminClient;
+    private final Admin adminClient;
 
     private GlobalStreamThread globalStreamThread;
     private KafkaStreams.StateListener stateListener;
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 6d93b99..f08eeca 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -17,7 +17,7 @@
 package org.apache.kafka.streams;
 
 import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -58,7 +58,7 @@ import static org.apache.kafka.common.requests.IsolationLevel.READ_COMMITTED;
 
 /**
  * Configuration for a {@link KafkaStreams} instance.
- * Can also be used to configure the Kafka Streams internal {@link KafkaConsumer}, {@link KafkaProducer} and {@link AdminClient}.
+ * Can also be used to configure the Kafka Streams internal {@link KafkaConsumer}, {@link KafkaProducer} and {@link Admin}.
  * To avoid consumer/producer/admin property conflicts, you should prefix those properties using
  * {@link #consumerPrefix(String)}, {@link #producerPrefix(String)} and {@link #adminClientPrefix(String)}, respectively.
  * <p>
@@ -198,7 +198,7 @@ public class StreamsConfig extends AbstractConfig {
     public static final String PRODUCER_PREFIX = "producer.";
 
     /**
-     * Prefix used to isolate {@link org.apache.kafka.clients.admin.AdminClient admin} configs from other client configs.
+     * Prefix used to isolate {@link Admin admin} configs from other client configs.
      * It is recommended to use {@link #adminClientPrefix(String)} to add this prefix to {@link ProducerConfig producer
      * properties}.
      */
@@ -1116,7 +1116,7 @@ public class StreamsConfig extends AbstractConfig {
     }
 
     /**
-     * Get the configs for the {@link org.apache.kafka.clients.admin.AdminClient admin client}.
+     * Get the configs for the {@link Admin admin client}.
      * @param clientId clientId
      * @return Map of the admin client configuration.
      */
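
A small sketch of the prefixing convention described in that javadoc (not part of the diff; the application id and bootstrap address are placeholders):

    import java.util.Properties;

    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.streams.StreamsConfig;

    public class PrefixedAdminConfigExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app");       // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            // "admin.retries" reaches only the internal admin client, so it cannot
            // collide with a producer or consumer setting of the same name
            props.put(StreamsConfig.adminClientPrefix(AdminClientConfig.RETRIES_CONFIG), 5);
            StreamsConfig config = new StreamsConfig(props);
            System.out.println(config.getAdminConfigs("example-client-id"));
        }
    }
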
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultKafkaClientSupplier.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultKafkaClientSupplier.java
index 69331b4..f56f834 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultKafkaClientSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultKafkaClientSupplier.java
@@ -18,7 +18,7 @@ package org.apache.kafka.streams.processor.internals;
 
 import java.util.Map;
 
-import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
@@ -29,9 +29,9 @@ import org.apache.kafka.streams.KafkaClientSupplier;
 
 public class DefaultKafkaClientSupplier implements KafkaClientSupplier {
     @Override
-    public AdminClient getAdminClient(final Map<String, Object> config) {
+    public Admin getAdminClient(final Map<String, Object> config) {
         // create a new client upon each call; but expect this call to be only triggered once so this should be fine
-        return AdminClient.create(config);
+        return Admin.create(config);
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
index 3cb06f6..320ce11 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.CreateTopicsResult;
 import org.apache.kafka.clients.admin.DescribeTopicsResult;
@@ -53,12 +53,12 @@ public class InternalTopicManager {
     private final Map<String, String> defaultTopicConfigs = new HashMap<>();
 
     private final short replicationFactor;
-    private final AdminClient adminClient;
+    private final Admin adminClient;
 
     private final int retries;
     private final long retryBackOffMs;
 
-    public InternalTopicManager(final AdminClient adminClient,
+    public InternalTopicManager(final Admin adminClient,
                                 final StreamsConfig streamsConfig) {
         this.adminClient = adminClient;
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index ea1e29d..1633b30 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
@@ -566,7 +566,7 @@ public class StreamThread extends Thread {
     public static StreamThread create(final InternalTopologyBuilder builder,
                                       final StreamsConfig config,
                                       final KafkaClientSupplier clientSupplier,
-                                      final AdminClient adminClient,
+                                      final Admin adminClient,
                                       final UUID processId,
                                       final String clientId,
                                       final Metrics metrics,
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index c136fdb..6b3ab001 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.DeleteRecordsResult;
 import org.apache.kafka.clients.admin.RecordsToDelete;
 import org.apache.kafka.clients.consumer.Consumer;
@@ -57,7 +57,7 @@ public class TaskManager {
     private final StreamThread.AbstractTaskCreator<StandbyTask> standbyTaskCreator;
     private final StreamsMetadataState streamsMetadataState;
 
-    final AdminClient adminClient;
+    final Admin adminClient;
     private DeleteRecordsResult deleteRecordsResult;
 
     // following information is updated during rebalance phase by the partition assignor
@@ -74,7 +74,7 @@ public class TaskManager {
                 final StreamsMetadataState streamsMetadataState,
                 final StreamThread.AbstractTaskCreator<StreamTask> taskCreator,
                 final StreamThread.AbstractTaskCreator<StandbyTask> standbyTaskCreator,
-                final AdminClient adminClient,
+                final Admin adminClient,
                 final AssignedStreamsTasks active,
                 final AssignedStandbyTasks standby) {
         this.changelogReader = changelogReader;
@@ -285,7 +285,7 @@ public class TaskManager {
         }
     }
 
-    AdminClient getAdminClient() {
+    Admin getAdminClient() {
         return adminClient;
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
index d8c0570..675286b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
@@ -18,7 +18,7 @@ package org.apache.kafka.streams.integration;
 
 import kafka.tools.StreamsResetter;
 import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.ConsumerGroupDescription;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
@@ -75,7 +75,7 @@ public abstract class AbstractResetIntegrationTest {
 
     private static MockTime mockTime;
     private static KafkaStreams streams;
-    private static AdminClient adminClient = null;
+    private static Admin adminClient = null;
 
     abstract Map<String, Object> getClientSslConfig();
 
@@ -95,7 +95,7 @@ public abstract class AbstractResetIntegrationTest {
 
     private void prepareEnvironment() {
         if (adminClient == null) {
-            adminClient = AdminClient.create(commonClientConfig);
+            adminClient = Admin.create(commonClientConfig);
         }
 
         boolean timeSet = false;
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index 0ee0278..cdb369c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -18,7 +18,7 @@ package org.apache.kafka.streams.integration;
 
 import kafka.log.LogConfig;
 import kafka.utils.MockTime;
-import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.Config;
 import org.apache.kafka.clients.admin.ConfigEntry;
@@ -113,7 +113,7 @@ public class InternalTopicIntegrationTest {
     }
 
     private Properties getTopicProperties(final String changelog) {
-        try (final AdminClient adminClient = createAdminClient()) {
+        try (final Admin adminClient = createAdminClient()) {
             final ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, changelog);
             try {
                 final Config config = adminClient.describeConfigs(Collections.singletonList(configResource)).values().get(configResource).get();
@@ -130,10 +130,10 @@ public class InternalTopicIntegrationTest {
         }
     }
 
-    private AdminClient createAdminClient() {
+    private Admin createAdminClient() {
         final Properties adminClientConfig = new Properties();
         adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        return AdminClient.create(adminClientConfig);
+        return Admin.create(adminClientConfig);
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
index 0cfa97a..efae747 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.streams.integration;
 
-import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.Config;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.TopicPartition;
@@ -60,7 +60,7 @@ public class PurgeRepartitionTopicIntegrationTest {
     private static final String APPLICATION_ID = "restore-test";
     private static final String REPARTITION_TOPIC = APPLICATION_ID + "-KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition";
 
-    private static AdminClient adminClient;
+    private static Admin adminClient;
     private static KafkaStreams kafkaStreams;
     private static final Integer PURGE_INTERVAL_MS = 10;
     private static final Integer PURGE_SEGMENT_BYTES = 2000;
@@ -148,7 +148,7 @@ public class PurgeRepartitionTopicIntegrationTest {
         // create admin client for verification
         final Properties adminConfig = new Properties();
         adminConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        adminClient = AdminClient.create(adminConfig);
+        adminClient = Admin.create(adminConfig);
 
         final Properties streamsConfiguration = new Properties();
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
index eeee3bf..43438a5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
@@ -22,7 +22,7 @@ import kafka.server.KafkaServer;
 import kafka.utils.MockTime;
 import kafka.utils.TestUtils;
 import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.common.config.SslConfigs;
@@ -180,7 +180,7 @@ public class KafkaEmbedded {
         final NewTopic newTopic = new NewTopic(topic, partitions, (short) replication);
         newTopic.configs(topicConfig);
 
-        try (final AdminClient adminClient = createAdminClient()) {
+        try (final Admin adminClient = createAdminClient()) {
             adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
         } catch (final InterruptedException | ExecutionException e) {
             throw new RuntimeException(e);
@@ -188,7 +188,7 @@ public class KafkaEmbedded {
     }
 
     @SuppressWarnings("WeakerAccess")
-    public AdminClient createAdminClient() {
+    public Admin createAdminClient() {
         final Properties adminClientConfig = new Properties();
         adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList());
         final Object listeners = effectiveConfig.get(KafkaConfig$.MODULE$.ListenersProp());
@@ -197,13 +197,13 @@ public class KafkaEmbedded {
             adminClientConfig.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, ((Password) effectiveConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)).value());
             adminClientConfig.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
         }
-        return AdminClient.create(adminClientConfig);
+        return Admin.create(adminClientConfig);
     }
 
     @SuppressWarnings("WeakerAccess")
     public void deleteTopic(final String topic) {
         log.debug("Deleting topic { name: {} }", topic);
-        try (final AdminClient adminClient = createAdminClient()) {
+        try (final Admin adminClient = createAdminClient()) {
             adminClient.deleteTopics(Collections.singletonList(topic)).all().get();
         } catch (final InterruptedException | ExecutionException e) {
             if (!(e.getCause() instanceof UnknownTopicOrPartitionException)) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 7d7d4e6..77e0254 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.kafka.streams.processor.internals;
 
-import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.DeleteRecordsResult;
 import org.apache.kafka.clients.admin.DeletedRecords;
 import org.apache.kafka.clients.admin.RecordsToDelete;
@@ -87,7 +87,7 @@ public class TaskManagerTest {
     @Mock(type = MockType.NICE)
     private StreamThread.AbstractTaskCreator<StandbyTask> standbyTaskCreator;
     @Mock(type = MockType.NICE)
-    private AdminClient adminClient;
+    private Admin adminClient;
     @Mock(type = MockType.NICE)
     private StreamTask streamTask;
     @Mock(type = MockType.NICE)
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
index 3187359..5daf066 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
@@ -16,9 +16,8 @@
  */
 package org.apache.kafka.streams.tests;
 
-import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.ConsumerGroupDescription;
-import org.apache.kafka.clients.admin.KafkaAdminClient;
 import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -146,7 +145,7 @@ public class EosTestDriver extends SmokeTestUtil {
         props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT));
 
         final Map<TopicPartition, Long> committedOffsets;
-        try (final AdminClient adminClient = KafkaAdminClient.create(props)) {
+        try (final Admin adminClient = Admin.create(props)) {
             ensureStreamsApplicationDown(adminClient);
 
             committedOffsets = getCommittedOffsets(adminClient, withRepartitioning);
@@ -218,7 +217,7 @@ public class EosTestDriver extends SmokeTestUtil {
         System.out.flush();
     }
 
-    private static void ensureStreamsApplicationDown(final AdminClient adminClient) {
+    private static void ensureStreamsApplicationDown(final Admin adminClient) {
 
         final long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
         ConsumerGroupDescription description;
@@ -236,7 +235,7 @@ public class EosTestDriver extends SmokeTestUtil {
     }
 
 
-    private static Map<TopicPartition, Long> getCommittedOffsets(final AdminClient adminClient,
+    private static Map<TopicPartition, Long> getCommittedOffsets(final Admin adminClient,
                                                                  final boolean withRepartitioning) {
         final Map<TopicPartition, OffsetAndMetadata> topicPartitionOffsetAndMetadataMap;
 
@@ -617,7 +616,7 @@ public class EosTestDriver extends SmokeTestUtil {
     }
 
 
-    private static ConsumerGroupDescription getConsumerGroupDescription(final AdminClient adminClient) {
+    private static ConsumerGroupDescription getConsumerGroupDescription(final Admin adminClient) {
         final ConsumerGroupDescription description;
         try {
             description = adminClient.describeConsumerGroups(Collections.singleton(EosTestClient.APP_ID))
diff --git a/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java
index d3430f2..4330d6c 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.test;
 
-import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.MockAdminClient;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.MockConsumer;
@@ -57,7 +57,7 @@ public class MockClientSupplier implements KafkaClientSupplier {
     }
 
     @Override
-    public AdminClient getAdminClient(final Map<String, Object> config) {
+    public Admin getAdminClient(final Map<String, Object> config) {
         return new MockAdminClient(cluster.nodes(), cluster.nodeById(0));
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java b/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java
index 8ab50b8..1eebf24 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.test;
 
-import org.apache.kafka.clients.admin.KafkaAdminClient;
+import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.consumer.MockConsumer;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.streams.StreamsConfig;
@@ -30,7 +30,6 @@ import java.util.Map;
 import java.util.Set;
 
 
-
 public class MockInternalTopicManager extends InternalTopicManager {
 
     final public Map<String, Integer> readyTopics = new HashMap<>();
@@ -38,7 +37,7 @@ public class MockInternalTopicManager extends InternalTopicManager {
 
     public MockInternalTopicManager(final StreamsConfig streamsConfig,
                                     final MockConsumer<byte[], byte[]> restoreConsumer) {
-        super(KafkaAdminClient.create(streamsConfig.originals()), streamsConfig);
+        super(Admin.create(streamsConfig.originals()), streamsConfig);
 
         this.restoreConsumer = restoreConsumer;
     }
diff --git a/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java b/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
index 887bdc4..89e6b44 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
@@ -20,7 +20,7 @@ import net.sourceforge.argparse4j.ArgumentParsers;
 import net.sourceforge.argparse4j.inf.ArgumentParser;
 import net.sourceforge.argparse4j.inf.ArgumentParserException;
 import net.sourceforge.argparse4j.inf.Namespace;
-import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.admin.TopicListing;
@@ -247,7 +247,7 @@ public class ClientCompatibilityTest {
     void testAdminClient() throws Throwable {
         Properties adminProps = new Properties();
         adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, testConfig.bootstrapServer);
-        try (final AdminClient client = AdminClient.create(adminProps)) {
+        try (final Admin client = Admin.create(adminProps)) {
             while (true) {
                 Collection<Node> nodes = client.describeCluster().nodes().get();
                 if (nodes.size() == testConfig.numClusterNodes) {
@@ -297,7 +297,7 @@ public class ClientCompatibilityTest {
         }
     }
 
-    private void createTopicsResultTest(AdminClient client, Collection<String> topics)
+    private void createTopicsResultTest(Admin client, Collection<String> topics)
             throws InterruptedException, ExecutionException {
         while (true) {
             try {
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java b/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java
index cb765cc..faf2d96 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java
@@ -17,7 +17,7 @@
 
 package org.apache.kafka.trogdor.common;
 
-import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.DescribeTopicsOptions;
 import org.apache.kafka.clients.admin.DescribeTopicsResult;
@@ -131,7 +131,7 @@ public final class WorkerUtils {
         // this method wraps the call to createTopics() that takes admin client, so that we can
         // unit test the functionality with MockAdminClient. The exception is caught and
         // re-thrown so that admin client is closed when the method returns.
-        try (AdminClient adminClient
+        try (Admin adminClient
                  = createAdminClient(bootstrapServers, commonClientConf, adminClientConf)) {
             createTopics(log, adminClient, topics, failOnExisting);
         } catch (Exception e) {
@@ -148,7 +148,7 @@ public final class WorkerUtils {
      * @throws Throwable if creation of one or more topics fails (except for the cases above).
      */
     static void createTopics(
-        Logger log, AdminClient adminClient,
+        Logger log, Admin adminClient,
         Map<String, NewTopic> topics, boolean failOnExisting) throws Throwable {
         if (topics.isEmpty()) {
             log.warn("Request to create topics has an empty topic list.");
@@ -174,7 +174,7 @@ public final class WorkerUtils {
      * @return                Collection of topics names that already exist.
      * @throws Throwable if creation of one or more topics fails (except for topic exists case).
      */
-    private static Collection<String> createTopics(Logger log, AdminClient adminClient,
+    private static Collection<String> createTopics(Logger log, Admin adminClient,
                                                    Collection<NewTopic> topics) throws Throwable {
         long startMs = Time.SYSTEM.milliseconds();
         int tries = 0;
@@ -249,7 +249,7 @@ public final class WorkerUtils {
      * described in 'topicsInfo'
      */
     static void verifyTopics(
-        Logger log, AdminClient adminClient,
+        Logger log, Admin adminClient,
         Collection<String> topicsToVerify, Map<String, NewTopic> topicsInfo, int retryCount, long retryBackoffMs) throws Throwable {
 
         Map<String, TopicDescription> topicDescriptionMap = topicDescriptions(topicsToVerify, adminClient,
@@ -270,7 +270,7 @@ public final class WorkerUtils {
     }
 
     private static Map<String, TopicDescription> topicDescriptions(Collection<String> topicsToVerify,
-                                                                   AdminClient adminClient,
+                                                                   Admin adminClient,
                                                                    int retryCount, long retryBackoffMs)
             throws ExecutionException, InterruptedException {
         UnknownTopicOrPartitionException lastException = null;
@@ -300,7 +300,7 @@ public final class WorkerUtils {
      * @throws Throwable      If failed to get list of existing topics
      */
     static Collection<TopicPartition> getMatchingTopicPartitions(
-        AdminClient adminClient, String topicRegex, int startPartition, int endPartition)
+        Admin adminClient, String topicRegex, int startPartition, int endPartition)
         throws Throwable {
         final Pattern topicNamePattern = Pattern.compile(topicRegex);
 
@@ -332,7 +332,7 @@ public final class WorkerUtils {
         return out;
     }
 
-    private static AdminClient createAdminClient(
+    private static Admin createAdminClient(
         String bootstrapServers,
         Map<String, String> commonClientConf, Map<String, String> adminClientConf) {
         Properties props = new Properties();
@@ -341,6 +341,6 @@ public final class WorkerUtils {
         // first add common client config, and then admin client config to properties, possibly
         // over-writing default or common properties.
         addConfigsToProperties(props, commonClientConf, adminClientConf);
-        return AdminClient.create(props);
+        return Admin.create(props);
     }
 }
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java
index 5c5db48..cef8d2d 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java
@@ -26,7 +26,7 @@ import org.apache.kafka.clients.ClientUtils;
 import org.apache.kafka.clients.ManualMetadataUpdater;
 import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.clients.NetworkClientUtils;
-import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.Cluster;
@@ -207,7 +207,7 @@ public class ConnectionStressWorker implements TaskWorker {
 
         @Override
         public boolean tryConnect() {
-            try (AdminClient client = AdminClient.create(this.props)) {
+            try (Admin client = Admin.create(this.props)) {
                 client.describeCluster().nodes().get();
             } catch (RuntimeException e) {
                 return false;

