kafka-commits mailing list archives

From j...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7689; Add AlterConsumerGroup/List Offsets to AdminClient [KIP-396] (#7296)
Date Sun, 20 Oct 2019 04:31:17 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 99a4068  KAFKA-7689; Add AlterConsumerGroup/List Offsets to AdminClient [KIP-396] (#7296)
99a4068 is described below

commit 99a4068c5ca61951d70b9e647ead3b08a2af4309
Author: Mickael Maison <mimaison@users.noreply.github.com>
AuthorDate: Sun Oct 20 05:30:50 2019 +0100

    KAFKA-7689; Add AlterConsumerGroup/List Offsets to AdminClient [KIP-396] (#7296)
    
    This patch implements new AdminClient APIs to list offsets and alter consumer group offsets as documented in KIP-396: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97551484.
    
    Reviewers: Guozhang Wang <wangguoz@gmail.com>, Jason Gustafson <jason@confluent.io>
---
 .../java/org/apache/kafka/clients/admin/Admin.java |  51 ++
 .../admin/AlterConsumerGroupOffsetsOptions.java}   |  32 +-
 .../admin/AlterConsumerGroupOffsetsResult.java     |  96 ++++
 .../kafka/clients/admin/KafkaAdminClient.java      | 508 ++++++++++++++------
 .../admin/ListOffsetsOptions.java}                 |  35 +-
 .../kafka/clients/admin/ListOffsetsResult.java     | 107 +++++
 .../org/apache/kafka/clients/admin/OffsetSpec.java |  62 +++
 .../internals/ConsumerGroupOperationContext.java   |  87 ++++
 .../admin/internals/MetadataOperationContext.java  |  96 ++++
 .../kafka/clients/consumer/ConsumerConfig.java     |   2 +-
 .../kafka/clients/consumer/KafkaConsumer.java      |   2 +-
 .../kafka/clients/consumer/internals/Fetcher.java  |   2 +-
 .../consumer/internals/SubscriptionState.java      |   2 +-
 .../common/{requests => }/IsolationLevel.java      |   2 +-
 .../kafka/common/requests/AlterConfigsRequest.java |   2 +-
 .../common/requests/DescribeConfigsRequest.java    |   2 +-
 .../apache/kafka/common/requests/FetchRequest.java |   1 +
 .../kafka/common/requests/ListOffsetRequest.java   |   2 +-
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 516 ++++++++++++++++++++-
 .../kafka/clients/admin/MockAdminClient.java       |  11 +
 .../kafka/clients/consumer/KafkaConsumerTest.java  |   2 +-
 .../clients/consumer/internals/FetcherTest.java    |   2 +-
 .../kafka/common/requests/RequestResponseTest.java |   1 +
 .../scala/kafka/admin/ConsumerGroupCommand.scala   |  92 ++--
 core/src/main/scala/kafka/cluster/Partition.scala  |   2 +-
 .../main/scala/kafka/server/ReplicaManager.scala   |   2 +-
 .../kafka/api/AuthorizerIntegrationTest.scala      |   2 +-
 .../scala/unit/kafka/cluster/PartitionTest.scala   |   4 +-
 .../scala/unit/kafka/server/FetchRequestTest.scala |   4 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala    |   2 +-
 .../unit/kafka/server/ListOffsetsRequestTest.scala |   4 +-
 .../scala/unit/kafka/server/LogOffsetTest.scala    |   4 +-
 .../unit/kafka/server/ReplicaManagerTest.scala     |   2 +-
 .../scala/unit/kafka/server/RequestQuotaTest.scala |   2 +-
 .../org/apache/kafka/streams/StreamsConfig.java    |   2 +-
 .../apache/kafka/streams/StreamsConfigTest.java    |   4 +-
 .../streams/integration/EosIntegrationTest.java    |   2 +-
 .../streams/tests/BrokerCompatibilityTest.java     |   2 +-
 .../apache/kafka/streams/tests/EosTestDriver.java  |   2 +-
 39 files changed, 1485 insertions(+), 270 deletions(-)
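For a sense of how the new API is used, here is a minimal sketch of altering a
consumer group's offsets with the new Admin method (the bootstrap address, topic,
and group id below are illustrative, not part of this patch):

    import java.util.Collections;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public class AlterOffsetsExample {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // illustrative address
            try (Admin admin = AdminClient.create(props)) {
                // Reset partition 0 of "my-topic" to offset 0 for group "my-group".
                // The group must be empty for the call to succeed.
                Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(
                    new TopicPartition("my-topic", 0), new OffsetAndMetadata(0L));
                admin.alterConsumerGroupOffsets("my-group", offsets).all().get();
            }
        }
    }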

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
index 915bd72..a6458b1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
@@ -25,6 +25,8 @@ import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.ElectionType;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
@@ -1081,6 +1083,55 @@ public interface Admin extends AutoCloseable {
     MembershipChangeResult removeMemberFromConsumerGroup(String groupId, RemoveMemberFromConsumerGroupOptions options);
 
     /**
+     * <p>Alters offsets for the specified group. In order to succeed, the group must be empty.
+     *
+     * <p>This is a convenience method for {@link #alterConsumerGroupOffsets(String, Map, AlterConsumerGroupOffsetsOptions)} with default options.
+     * See the overload for more details.
+     *
+     * @param groupId The group for which to alter offsets.
+     * @param offsets A map of offsets by partition with associated metadata.
+     * @return The AlterConsumerGroupOffsetsResult.
+     */
+    default AlterConsumerGroupOffsetsResult alterConsumerGroupOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsets) {
+        return alterConsumerGroupOffsets(groupId, offsets, new AlterConsumerGroupOffsetsOptions());
+    }
+
+    /**
+     * <p>Alters offsets for the specified group. In order to succeed, the group must be empty.
+     *
+     * <p>This operation is not transactional, so it may succeed for some partitions while failing for others.
+     *
+     * @param groupId The group for which to alter offsets.
+     * @param offsets A map of offsets by partition with associated metadata. Partitions not specified in the map are ignored.
+     * @param options The options to use when altering the offsets.
+     * @return The AlterConsumerGroupOffsetsResult.
+     */
+    AlterConsumerGroupOffsetsResult alterConsumerGroupOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsets, AlterConsumerGroupOffsetsOptions options);
+
+    /**
+     * <p>List offsets for the specified partitions and OffsetSpec. This operation enables finding
+     * the beginning offset, the end offset, or the offset matching a timestamp for each partition.
+     *
+     * <p>This is a convenience method for {@link #listOffsets(Map, ListOffsetsOptions)} with default options.
+     *
+     * @param topicPartitionOffsets The mapping from partition to the OffsetSpec to look up.
+     * @return The ListOffsetsResult.
+     */
+    default ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets) {
+        return listOffsets(topicPartitionOffsets, new ListOffsetsOptions());
+    }
+
+    /**
+     * <p>List offsets for the specified partitions. This operation enables finding
+     * the beginning offset, the end offset, or the offset matching a timestamp for each partition.
+     *
+     * @param topicPartitionOffsets The mapping from partition to the OffsetSpec to look up.
+     * @param options The options to use when retrieving the offsets.
+     * @return The ListOffsetsResult.
+     */
+    ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets, ListOffsetsOptions options);
+
+    /**
      * Get the metrics kept by the adminClient
      */
     Map<MetricName, ? extends Metric> metrics();
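A companion sketch for listOffsets, showing the OffsetSpec variants (earliest,
latest, or by timestamp); the topic name and timestamp below are illustrative:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
    import org.apache.kafka.clients.admin.OffsetSpec;
    import org.apache.kafka.common.TopicPartition;

    public class ListOffsetsExample {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // illustrative address
            try (Admin admin = AdminClient.create(props)) {
                Map<TopicPartition, OffsetSpec> request = new HashMap<>();
                request.put(new TopicPartition("my-topic", 0), OffsetSpec.earliest());
                request.put(new TopicPartition("my-topic", 1), OffsetSpec.latest());
                request.put(new TopicPartition("my-topic", 2),
                    OffsetSpec.forTimestamp(1571540000000L)); // earliest offset whose timestamp is >= this
                Map<TopicPartition, ListOffsetsResultInfo> result = admin.listOffsets(request).all().get();
                result.forEach((tp, info) -> System.out.println(tp + " -> " + info.offset()));
            }
        }
    }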
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/IsolationLevel.java b/clients/src/main/java/org/apache/kafka/clients/admin/AlterConsumerGroupOffsetsOptions.java
similarity index 58%
copy from clients/src/main/java/org/apache/kafka/common/requests/IsolationLevel.java
copy to clients/src/main/java/org/apache/kafka/clients/admin/AlterConsumerGroupOffsetsOptions.java
index a09b625..eb8b8ec 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/IsolationLevel.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AlterConsumerGroupOffsetsOptions.java
@@ -14,29 +14,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.common.requests;
+package org.apache.kafka.clients.admin;
 
-public enum IsolationLevel {
-    READ_UNCOMMITTED((byte) 0), READ_COMMITTED((byte) 1);
+import org.apache.kafka.common.annotation.InterfaceStability;
 
-    private final byte id;
-
-    IsolationLevel(byte id) {
-        this.id = id;
-    }
-
-    public byte id() {
-        return id;
-    }
-
-    public static IsolationLevel forId(byte id) {
-        switch (id) {
-            case 0:
-                return READ_UNCOMMITTED;
-            case 1:
-                return READ_COMMITTED;
-            default:
-                throw new IllegalArgumentException("Unknown isolation level " + id);
-        }
-    }
+/**
+ * Options for the {@link AdminClient#alterConsumerGroupOffsets(String, Map)} call.
+ *
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class AlterConsumerGroupOffsetsOptions extends AbstractOptions<AlterConsumerGroupOffsetsOptions> {
 }
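The new options class only inherits the generic knobs from AbstractOptions for
now; for example, a per-call timeout could be passed through the three-argument
overload like this (continuing the first sketch above; the 60-second value is
arbitrary):

    AlterConsumerGroupOffsetsOptions options =
        new AlterConsumerGroupOffsetsOptions().timeoutMs(60_000); // timeoutMs comes from AbstractOptions
    admin.alterConsumerGroupOffsets("my-group", offsets, options).all().get();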
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AlterConsumerGroupOffsetsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/AlterConsumerGroupOffsetsResult.java
new file mode 100644
index 0000000..38ee14a
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AlterConsumerGroupOffsetsResult.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.KafkaFuture.BaseFunction;
+import org.apache.kafka.common.KafkaFuture.BiConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.protocol.Errors;
+
+/**
+ * The result of the {@link AdminClient#alterConsumerGroupOffsets(String, Map)} call.
+ *
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class AlterConsumerGroupOffsetsResult {
+
+    private final KafkaFuture<Map<TopicPartition, Errors>> future;
+
+    AlterConsumerGroupOffsetsResult(KafkaFuture<Map<TopicPartition, Errors>> future) {
+        this.future = future;
+    }
+
+    /**
+     * Return a future which can be used to check the result for a given partition.
+     */
+    public KafkaFuture<Void> partitionResult(final TopicPartition partition) {
+        final KafkaFutureImpl<Void> result = new KafkaFutureImpl<>();
+
+        this.future.whenComplete(new BiConsumer<Map<TopicPartition, Errors>, Throwable>() {
+            @Override
+            public void accept(final Map<TopicPartition, Errors> topicPartitions, final Throwable throwable) {
+                if (throwable != null) {
+                    result.completeExceptionally(throwable);
+                } else if (!topicPartitions.containsKey(partition)) {
+                    result.completeExceptionally(new IllegalArgumentException(
+                        "Alter offset for partition \"" + partition + "\" was not attempted"));
+                } else {
+                    final Errors error = topicPartitions.get(partition);
+                    if (error == Errors.NONE) {
+                        result.complete(null);
+                    } else {
+                        result.completeExceptionally(error.exception());
+                    }
+                }
+
+            }
+        });
+
+        return result;
+    }
+
+    /**
+     * Return a future which succeeds if all the offset alterations succeed.
+     */
+    public KafkaFuture<Void> all() {
+        return this.future.thenApply(new BaseFunction<Map<TopicPartition, Errors>, Void>() {
+            @Override
+            public Void apply(final Map<TopicPartition, Errors> topicPartitionErrorsMap) {
+                List<TopicPartition> partitionsFailed = topicPartitionErrorsMap.entrySet()
+                        .stream()
+                        .filter(e -> e.getValue() != Errors.NONE)
+                        .map(Map.Entry::getKey)
+                        .collect(Collectors.toList());
+                for (Errors error : topicPartitionErrorsMap.values()) {
+                    if (error != Errors.NONE) {
+                        throw error.exception(
+                            "Failed altering consumer group offsets for the following partitions: " + partitionsFailed);
+                    }
+                }
+                return null;
+            }
+        });
+    }
+}
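Because the operation is not transactional, it can succeed for some partitions
while failing for others, so callers may prefer partitionResult() over all().
A minimal sketch, continuing the setup from the first example (topic, partition,
and group names are illustrative):

    import java.util.concurrent.ExecutionException;

    AlterConsumerGroupOffsetsResult result = admin.alterConsumerGroupOffsets("my-group", offsets);
    TopicPartition tp = new TopicPartition("my-topic", 0);
    try {
        result.partitionResult(tp).get(); // completes only if this partition's update succeeded
    } catch (ExecutionException e) {
        System.err.println("Altering offset for " + tp + " failed: " + e.getCause());
    }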
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 0850ced..984e863 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -29,7 +29,11 @@ import org.apache.kafka.clients.admin.CreateTopicsResult.TopicMetadataAndConfig;
 import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResult;
 import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResults;
 import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult.ReplicaLogDirInfo;
+import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
+import org.apache.kafka.clients.admin.OffsetSpec.TimestampSpec;
 import org.apache.kafka.clients.admin.internals.AdminMetadataManager;
+import org.apache.kafka.clients.admin.internals.ConsumerGroupOperationContext;
+import org.apache.kafka.clients.admin.internals.MetadataOperationContext;
 import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
@@ -87,6 +91,11 @@ import org.apache.kafka.common.message.ListGroupsRequestData;
 import org.apache.kafka.common.message.ListGroupsResponseData;
 import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
 import org.apache.kafka.common.message.MetadataRequestData;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestPartition;
+import org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestTopic;
+import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponsePartition;
+import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponseTopic;
 import org.apache.kafka.common.message.OffsetDeleteRequestData;
 import org.apache.kafka.common.message.OffsetDeleteRequestData.OffsetDeleteRequestPartition;
 import org.apache.kafka.common.message.OffsetDeleteRequestData.OffsetDeleteRequestTopic;
@@ -153,10 +162,15 @@ import org.apache.kafka.common.requests.LeaveGroupRequest;
 import org.apache.kafka.common.requests.LeaveGroupResponse;
 import org.apache.kafka.common.requests.ListGroupsRequest;
 import org.apache.kafka.common.requests.ListGroupsResponse;
+import org.apache.kafka.common.requests.ListOffsetRequest;
+import org.apache.kafka.common.requests.ListOffsetResponse;
+import org.apache.kafka.common.requests.ListOffsetResponse.PartitionData;
 import org.apache.kafka.common.requests.ListPartitionReassignmentsRequest;
 import org.apache.kafka.common.requests.ListPartitionReassignmentsResponse;
 import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.OffsetCommitResponse;
 import org.apache.kafka.common.requests.OffsetDeleteRequest;
 import org.apache.kafka.common.requests.OffsetDeleteResponse;
 import org.apache.kafka.common.requests.OffsetFetchRequest;
@@ -187,6 +201,7 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
@@ -714,6 +729,7 @@ public class KafkaAdminClient extends AdminClient {
          *
          * @return          The AbstractRequest builder.
          */
+        @SuppressWarnings("rawtypes")
         abstract AbstractRequest.Builder createRequest(int timeoutMs);
 
         /**
@@ -1271,7 +1287,7 @@ public class KafkaAdminClient extends AdminClient {
             return new Call(true, "fetchMetadata", calcDeadlineMs(now, defaultTimeoutMs),
                     new MetadataUpdateNodeIdProvider()) {
                 @Override
-                public AbstractRequest.Builder createRequest(int timeoutMs) {
+                public MetadataRequest.Builder createRequest(int timeoutMs) {
                     // Since this only requests node information, it's safe to pass true
                     // for allowAutoTopicCreation (and it simplifies communication with
                     // older brokers)
@@ -1338,7 +1354,7 @@ public class KafkaAdminClient extends AdminClient {
             new ControllerNodeProvider()) {
 
             @Override
-            public AbstractRequest.Builder createRequest(int timeoutMs) {
+            public CreateTopicsRequest.Builder createRequest(int timeoutMs) {
                 return new CreateTopicsRequest.Builder(
                     new CreateTopicsRequestData().
                         setTopics(topics).
@@ -1435,7 +1451,7 @@ public class KafkaAdminClient extends AdminClient {
             new ControllerNodeProvider()) {
 
             @Override
-            AbstractRequest.Builder createRequest(int timeoutMs) {
+            DeleteTopicsRequest.Builder createRequest(int timeoutMs) {
                 return new DeleteTopicsRequest.Builder(new DeleteTopicsRequestData()
                         .setTopicNames(validTopicNames)
                         .setTimeoutMs(timeoutMs));
@@ -1495,7 +1511,7 @@ public class KafkaAdminClient extends AdminClient {
             new LeastLoadedNodeProvider()) {
 
             @Override
-            AbstractRequest.Builder createRequest(int timeoutMs) {
+            MetadataRequest.Builder createRequest(int timeoutMs) {
                 return MetadataRequest.Builder.allTopics();
             }
 
@@ -1542,7 +1558,7 @@ public class KafkaAdminClient extends AdminClient {
             private boolean supportsDisablingTopicCreation = true;
 
             @Override
-            AbstractRequest.Builder createRequest(int timeoutMs) {
+            MetadataRequest.Builder createRequest(int timeoutMs) {
                 if (supportsDisablingTopicCreation)
                     return new MetadataRequest.Builder(new MetadataRequestData()
                         .setTopics(convertToMetadataRequestTopic(topicNamesList))
@@ -1624,7 +1640,7 @@ public class KafkaAdminClient extends AdminClient {
             new LeastLoadedNodeProvider()) {
 
             @Override
-            AbstractRequest.Builder createRequest(int timeoutMs) {
+            MetadataRequest.Builder createRequest(int timeoutMs) {
                 // Since this only requests node information, it's safe to pass true for allowAutoTopicCreation (and it
                 // simplifies communication with older brokers)
                 return new MetadataRequest.Builder(new MetadataRequestData()
@@ -1676,7 +1692,7 @@ public class KafkaAdminClient extends AdminClient {
             new LeastLoadedNodeProvider()) {
 
             @Override
-            AbstractRequest.Builder createRequest(int timeoutMs) {
+            DescribeAclsRequest.Builder createRequest(int timeoutMs) {
                 return new DescribeAclsRequest.Builder(filter);
             }
 
@@ -1720,7 +1736,7 @@ public class KafkaAdminClient extends AdminClient {
             new LeastLoadedNodeProvider()) {
 
             @Override
-            AbstractRequest.Builder createRequest(int timeoutMs) {
+            CreateAclsRequest.Builder createRequest(int timeoutMs) {
                 return new CreateAclsRequest.Builder(aclCreations);
             }
 
@@ -1768,7 +1784,7 @@ public class KafkaAdminClient extends AdminClient {
             new LeastLoadedNodeProvider()) {
 
             @Override
-            AbstractRequest.Builder createRequest(int timeoutMs) {
+            DeleteAclsRequest.Builder createRequest(int timeoutMs) {
                 return new DeleteAclsRequest.Builder(filterList);
             }
 
@@ -1834,7 +1850,7 @@ public class KafkaAdminClient extends AdminClient {
                 new LeastLoadedNodeProvider()) {
 
                 @Override
-                AbstractRequest.Builder createRequest(int timeoutMs) {
+                DescribeConfigsRequest.Builder createRequest(int timeoutMs) {
                     return new DescribeConfigsRequest.Builder(unifiedRequestResources)
                             .includeSynonyms(options.includeSynonyms());
                 }
@@ -1881,7 +1897,7 @@ public class KafkaAdminClient extends AdminClient {
                     new ConstantNodeIdProvider(nodeId)) {
 
                 @Override
-                AbstractRequest.Builder createRequest(int timeoutMs) {
+                DescribeConfigsRequest.Builder createRequest(int timeoutMs) {
                     return new DescribeConfigsRequest.Builder(Collections.singleton(resource))
                             .includeSynonyms(options.includeSynonyms());
                 }
@@ -1995,7 +2011,7 @@ public class KafkaAdminClient extends AdminClient {
         runnable.call(new Call("alterConfigs", calcDeadlineMs(now, options.timeoutMs()), nodeProvider) {
 
             @Override
-            public AbstractRequest.Builder createRequest(int timeoutMs) {
+            public AlterConfigsRequest.Builder createRequest(int timeoutMs) {
                 return new AlterConfigsRequest.Builder(requestMap, options.shouldValidateOnly());
             }
 
@@ -2055,7 +2071,7 @@ public class KafkaAdminClient extends AdminClient {
         runnable.call(new Call("incrementalAlterConfigs", calcDeadlineMs(now, options.timeoutMs()), nodeProvider) {
 
             @Override
-            public AbstractRequest.Builder createRequest(int timeoutMs) {
+            public IncrementalAlterConfigsRequest.Builder createRequest(int timeoutMs) {
                 return new IncrementalAlterConfigsRequest.Builder(
                         toIncrementalAlterConfigsRequestData(resources, configs, options.shouldValidateOnly()));
             }
@@ -2131,7 +2147,7 @@ public class KafkaAdminClient extends AdminClient {
                 new ConstantNodeIdProvider(brokerId)) {
 
                 @Override
-                public AbstractRequest.Builder createRequest(int timeoutMs) {
+                public AlterReplicaLogDirsRequest.Builder createRequest(int timeoutMs) {
                     return new AlterReplicaLogDirsRequest.Builder(assignment);
                 }
 
@@ -2177,7 +2193,7 @@ public class KafkaAdminClient extends AdminClient {
                 new ConstantNodeIdProvider(brokerId)) {
 
                 @Override
-                public AbstractRequest.Builder createRequest(int timeoutMs) {
+                public DescribeLogDirsRequest.Builder createRequest(int timeoutMs) {
                     // Query selected partitions in all log directories
                     return new DescribeLogDirsRequest.Builder(null);
                 }
@@ -2231,7 +2247,7 @@ public class KafkaAdminClient extends AdminClient {
                 new ConstantNodeIdProvider(brokerId)) {
 
                 @Override
-                public AbstractRequest.Builder createRequest(int timeoutMs) {
+                public DescribeLogDirsRequest.Builder createRequest(int timeoutMs) {
                     // Query selected partitions in all log directories
                     return new DescribeLogDirsRequest.Builder(topicPartitions);
                 }
@@ -2302,7 +2318,7 @@ public class KafkaAdminClient extends AdminClient {
                 new ControllerNodeProvider()) {
 
             @Override
-            public AbstractRequest.Builder createRequest(int timeoutMs) {
+            public CreatePartitionsRequest.Builder createRequest(int timeoutMs) {
                 return new CreatePartitionsRequest.Builder(requestMap, timeoutMs, options.validateOnly());
             }
 
@@ -2360,7 +2376,7 @@ public class KafkaAdminClient extends AdminClient {
                 new LeastLoadedNodeProvider()) {
 
             @Override
-            AbstractRequest.Builder createRequest(int timeoutMs) {
+            MetadataRequest.Builder createRequest(int timeoutMs) {
                 return new MetadataRequest.Builder(new MetadataRequestData()
                     .setTopics(convertToMetadataRequestTopic(topics))
                     .setAllowAutoTopicCreation(false));
@@ -2404,7 +2420,7 @@ public class KafkaAdminClient extends AdminClient {
                             new ConstantNodeIdProvider(brokerId)) {
 
                         @Override
-                        AbstractRequest.Builder createRequest(int timeoutMs) {
+                        DeleteRecordsRequest.Builder createRequest(int timeoutMs) {
                             return new DeleteRecordsRequest.Builder(timeoutMs, partitionDeleteOffsets);
                         }
 
@@ -2455,7 +2471,7 @@ public class KafkaAdminClient extends AdminClient {
             new LeastLoadedNodeProvider()) {
 
             @Override
-            AbstractRequest.Builder<CreateDelegationTokenRequest> createRequest(int timeoutMs) {
+            CreateDelegationTokenRequest.Builder createRequest(int timeoutMs) {
                 return new CreateDelegationTokenRequest.Builder(
                         new CreateDelegationTokenRequestData()
                             .setRenewers(renewers)
@@ -2493,7 +2509,7 @@ public class KafkaAdminClient extends AdminClient {
             new LeastLoadedNodeProvider()) {
 
             @Override
-            AbstractRequest.Builder<RenewDelegationTokenRequest> createRequest(int timeoutMs) {
+            RenewDelegationTokenRequest.Builder createRequest(int timeoutMs) {
                 return new RenewDelegationTokenRequest.Builder(
                         new RenewDelegationTokenRequestData()
                         .setHmac(hmac)
@@ -2527,7 +2543,7 @@ public class KafkaAdminClient extends AdminClient {
             new LeastLoadedNodeProvider()) {
 
             @Override
-            AbstractRequest.Builder<ExpireDelegationTokenRequest> createRequest(int timeoutMs) {
+            ExpireDelegationTokenRequest.Builder createRequest(int timeoutMs) {
                 return new ExpireDelegationTokenRequest.Builder(
                         new ExpireDelegationTokenRequestData()
                             .setHmac(hmac)
@@ -2561,7 +2577,7 @@ public class KafkaAdminClient extends AdminClient {
             new LeastLoadedNodeProvider()) {
 
             @Override
-            AbstractRequest.Builder createRequest(int timeoutMs) {
+            DescribeDelegationTokenRequest.Builder createRequest(int timeoutMs) {
                 return new DescribeDelegationTokenRequest.Builder(options.owners());
             }
 
@@ -2584,72 +2600,23 @@ public class KafkaAdminClient extends AdminClient {
         return new DescribeDelegationTokenResult(tokensFuture);
     }
 
-    /**
-     * Context class to encapsulate parameters of a call to find and use a consumer group coordinator.
-     * Some of the parameters are provided at construction and are immutable whereas others are provided
-     * as "Call" are completed and values are available, like node id of the coordinator.
-     *
-     * @param <T> The type of return value of the KafkaFuture
-     * @param <O> The type of configuration option. Different for different consumer group commands.
-     */
-    private final static class ConsumerGroupOperationContext<T, O extends AbstractOptions<O>> {
-        final private String groupId;
-        final private O options;
-        final private long deadline;
-        final private KafkaFutureImpl<T> future;
-        private Optional<Node> node;
-
-        public ConsumerGroupOperationContext(String groupId,
-                                             O options,
-                                             long deadline,
-                                             KafkaFutureImpl<T> future) {
-            this.groupId = groupId;
-            this.options = options;
-            this.deadline = deadline;
-            this.future = future;
-            this.node = Optional.empty();
-        }
-
-        public String getGroupId() {
-            return groupId;
-        }
-
-        public O getOptions() {
-            return options;
-        }
-
-        public long getDeadline() {
-            return deadline;
-        }
-
-        public KafkaFutureImpl<T> getFuture() {
-            return future;
-        }
-
-        public Optional<Node> getNode() {
-            return node;
-        }
-
-        public void setNode(Node node) {
-            this.node = Optional.ofNullable(node);
-        }
-
-        public boolean hasCoordinatorMoved(AbstractResponse response) {
-            return response.errorCounts().keySet()
-                    .stream()
-                    .anyMatch(error -> error == Errors.NOT_COORDINATOR);
-        }
-    }
-
-    private void rescheduleTask(ConsumerGroupOperationContext<?, ?> context, Supplier<Call> nextCall) {
+    private void rescheduleFindCoordinatorTask(ConsumerGroupOperationContext<?, ?> context, Supplier<Call> nextCall) {
         log.info("Node {} is no longer the Coordinator. Retrying with new coordinator.",
-                context.getNode().orElse(null));
+                context.node().orElse(null));
         // Requeue the task so that we can try with new coordinator
         context.setNode(null);
         Call findCoordinatorCall = getFindCoordinatorCall(context, nextCall);
         runnable.call(findCoordinatorCall, time.milliseconds());
     }
 
+    private void rescheduleMetadataTask(MetadataOperationContext<?, ?> context, Supplier<List<Call>> nextCalls) {
+        log.info("Retrying to fetch metadata.");
+        // Requeue the task so that we can re-attempt fetching metadata
+        context.setResponse(Optional.empty());
+        Call metadataCall = getMetadataCall(context, nextCalls);
+        runnable.call(metadataCall, time.milliseconds());
+    }
+
     private static <T> Map<String, KafkaFutureImpl<T>> createFutures(Collection<String> groupIds) {
         return new HashSet<>(groupIds).stream().collect(
             Collectors.toMap(groupId -> groupId,
@@ -2703,21 +2670,21 @@ public class KafkaAdminClient extends AdminClient {
      * @param <O> The type of configuration option, like DescribeConsumerGroupsOptions, ListConsumerGroupsOptions etc
      */
     private <T, O extends AbstractOptions<O>> Call getFindCoordinatorCall(ConsumerGroupOperationContext<T, O> context,
-                                               Supplier<Call> nextCall) {
-        return new Call("findCoordinator", context.getDeadline(), new LeastLoadedNodeProvider()) {
+                                                                          Supplier<Call> nextCall) {
+        return new Call("findCoordinator", context.deadline(), new LeastLoadedNodeProvider()) {
             @Override
             FindCoordinatorRequest.Builder createRequest(int timeoutMs) {
                 return new FindCoordinatorRequest.Builder(
                         new FindCoordinatorRequestData()
                                 .setKeyType(CoordinatorType.GROUP.id())
-                                .setKey(context.getGroupId()));
+                                .setKey(context.groupId()));
             }
 
             @Override
             void handleResponse(AbstractResponse abstractResponse) {
                 final FindCoordinatorResponse response = (FindCoordinatorResponse) abstractResponse;
 
-                if (handleGroupRequestError(response.error(), context.getFuture()))
+                if (handleGroupRequestError(response.error(), context.future()))
                     return;
 
                 context.setNode(response.node());
@@ -2727,7 +2694,7 @@ public class KafkaAdminClient extends AdminClient {
 
             @Override
             void handleFailure(Throwable throwable) {
-                context.getFuture().completeExceptionally(throwable);
+                context.future().completeExceptionally(throwable);
             }
         };
     }
@@ -2735,14 +2702,14 @@ public class KafkaAdminClient extends AdminClient {
     private Call getDescribeConsumerGroupsCall(
             ConsumerGroupOperationContext<ConsumerGroupDescription, DescribeConsumerGroupsOptions> context) {
         return new Call("describeConsumerGroups",
-                context.getDeadline(),
-                new ConstantNodeIdProvider(context.getNode().get().id())) {
+                context.deadline(),
+                new ConstantNodeIdProvider(context.node().get().id())) {
             @Override
-            AbstractRequest.Builder createRequest(int timeoutMs) {
+            DescribeGroupsRequest.Builder createRequest(int timeoutMs) {
                 return new DescribeGroupsRequest.Builder(
                     new DescribeGroupsRequestData()
-                        .setGroups(Collections.singletonList(context.getGroupId()))
-                        .setIncludeAuthorizedOperations(context.getOptions().includeAuthorizedOperations()));
+                        .setGroups(Collections.singletonList(context.groupId()))
+                        .setIncludeAuthorizedOperations(context.options().includeAuthorizedOperations()));
             }
 
             @Override
@@ -2751,29 +2718,29 @@ public class KafkaAdminClient extends AdminClient {
 
                 List<DescribedGroup> describedGroups = response.data().groups();
                 if (describedGroups.isEmpty()) {
-                    context.getFuture().completeExceptionally(
-                            new InvalidGroupIdException("No consumer group found for GroupId: " + context.getGroupId()));
+                    context.future().completeExceptionally(
+                            new InvalidGroupIdException("No consumer group found for GroupId: " + context.groupId()));
                     return;
                 }
 
                 if (describedGroups.size() > 1 ||
-                        !describedGroups.get(0).groupId().equals(context.getGroupId())) {
+                        !describedGroups.get(0).groupId().equals(context.groupId())) {
                     String ids = Arrays.toString(describedGroups.stream().map(DescribedGroup::groupId).toArray());
-                    context.getFuture().completeExceptionally(new InvalidGroupIdException(
-                            "DescribeConsumerGroup request for GroupId: " + context.getGroupId() + " returned " + ids));
+                    context.future().completeExceptionally(new InvalidGroupIdException(
+                            "DescribeConsumerGroup request for GroupId: " + context.groupId() + " returned " + ids));
                     return;
                 }
 
                 final DescribedGroup describedGroup = describedGroups.get(0);
 
                 // If coordinator changed since we fetched it, retry
-                if (context.hasCoordinatorMoved(response)) {
-                    rescheduleTask(context, () -> getDescribeConsumerGroupsCall(context));
+                if (ConsumerGroupOperationContext.hasCoordinatorMoved(response)) {
+                    rescheduleFindCoordinatorTask(context, () -> getDescribeConsumerGroupsCall(context));
                     return;
                 }
 
                 final Errors groupError = Errors.forCode(describedGroup.errorCode());
-                if (handleGroupRequestError(groupError, context.getFuture()))
+                if (handleGroupRequestError(groupError, context.future()))
                     return;
 
                 final String protocolType = describedGroup.protocolType();
@@ -2797,19 +2764,59 @@ public class KafkaAdminClient extends AdminClient {
                         memberDescriptions.add(memberDescription);
                     }
                     final ConsumerGroupDescription consumerGroupDescription =
-                        new ConsumerGroupDescription(context.getGroupId(), protocolType.isEmpty(),
+                        new ConsumerGroupDescription(context.groupId(), protocolType.isEmpty(),
                             memberDescriptions,
                             describedGroup.protocolData(),
                             ConsumerGroupState.parse(describedGroup.groupState()),
-                            context.getNode().get(),
+                            context.node().get(),
                             authorizedOperations);
-                    context.getFuture().complete(consumerGroupDescription);
+                    context.future().complete(consumerGroupDescription);
                 }
             }
 
             @Override
             void handleFailure(Throwable throwable) {
-                context.getFuture().completeExceptionally(throwable);
+                context.future().completeExceptionally(throwable);
+            }
+        };
+    }
+
+    /**
+     * Returns a {@code Call} object to fetch the cluster metadata. Takes a supplier of the
+     * follow-up Calls that need the metadata, so that those calls can be created lazily,
+     * once the results of the metadata call are available and can be used in their
+     * construction.
+     *
+     * @param <T> The type of return value of the KafkaFuture, like ListOffsetsResultInfo, etc.
+     * @param <O> The type of configuration option, like ListOffsetsOptions, etc
+     */
+    private <T, O extends AbstractOptions<O>> Call getMetadataCall(MetadataOperationContext<T, O> context,
+                                                                   Supplier<List<Call>> nextCalls) {
+        return new Call("metadata", context.deadline(), new LeastLoadedNodeProvider()) {
+            @Override
+            MetadataRequest.Builder createRequest(int timeoutMs) {
+                return new MetadataRequest.Builder(new MetadataRequestData()
+                    .setTopics(convertToMetadataRequestTopic(context.topics()))
+                    .setAllowAutoTopicCreation(false));
+            }
+
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                MetadataResponse response = (MetadataResponse) abstractResponse;
+                MetadataOperationContext.handleMetadataErrors(response);
+
+                context.setResponse(Optional.of(response));
+
+                for (Call call : nextCalls.get()) {
+                    runnable.call(call, time.milliseconds());
+                }
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                for (KafkaFutureImpl<T> future : context.futures().values()) {
+                    future.completeExceptionally(throwable);
+                }
             }
         };
     }
@@ -2890,7 +2897,7 @@ public class KafkaAdminClient extends AdminClient {
         final long deadline = calcDeadlineMs(nowMetadata, options.timeoutMs());
         runnable.call(new Call("findAllBrokers", deadline, new LeastLoadedNodeProvider()) {
             @Override
-            AbstractRequest.Builder createRequest(int timeoutMs) {
+            MetadataRequest.Builder createRequest(int timeoutMs) {
                 return new MetadataRequest.Builder(new MetadataRequestData()
                     .setTopics(Collections.emptyList())
                     .setAllowAutoTopicCreation(true));
@@ -2910,7 +2917,7 @@ public class KafkaAdminClient extends AdminClient {
                     final long nowList = time.milliseconds();
                     runnable.call(new Call("listConsumerGroups", deadline, new ConstantNodeIdProvider(node.id())) {
                         @Override
-                        AbstractRequest.Builder createRequest(int timeoutMs) {
+                        ListGroupsRequest.Builder createRequest(int timeoutMs) {
                             return new ListGroupsRequest.Builder(new ListGroupsRequestData());
                         }
 
@@ -2981,11 +2988,11 @@ public class KafkaAdminClient extends AdminClient {
 
     private Call getListConsumerGroupOffsetsCall(ConsumerGroupOperationContext<Map<TopicPartition, OffsetAndMetadata>,
             ListConsumerGroupOffsetsOptions> context) {
-        return new Call("listConsumerGroupOffsets", context.getDeadline(),
-                new ConstantNodeIdProvider(context.getNode().get().id())) {
+        return new Call("listConsumerGroupOffsets", context.deadline(),
+                new ConstantNodeIdProvider(context.node().get().id())) {
             @Override
-            AbstractRequest.Builder createRequest(int timeoutMs) {
-                return new OffsetFetchRequest.Builder(context.getGroupId(), context.getOptions().topicPartitions());
+            OffsetFetchRequest.Builder createRequest(int timeoutMs) {
+                return new OffsetFetchRequest.Builder(context.groupId(), context.options().topicPartitions());
             }
 
             @Override
@@ -2994,12 +3001,12 @@ public class KafkaAdminClient extends AdminClient {
                 final Map<TopicPartition, OffsetAndMetadata> groupOffsetsListing = new HashMap<>();
 
                 // If coordinator changed since we fetched it, retry
-                if (context.hasCoordinatorMoved(response)) {
-                    rescheduleTask(context, () -> getListConsumerGroupOffsetsCall(context));
+                if (ConsumerGroupOperationContext.hasCoordinatorMoved(response)) {
+                    rescheduleFindCoordinatorTask(context, () -> getListConsumerGroupOffsetsCall(context));
                     return;
                 }
 
-                if (handleGroupRequestError(response.error(), context.getFuture()))
+                if (handleGroupRequestError(response.error(), context.future()))
                     return;
 
                 for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry :
@@ -3017,12 +3024,12 @@ public class KafkaAdminClient extends AdminClient {
                         log.warn("Skipping return offset for {} due to error {}.", topicPartition, error);
                     }
                 }
-                context.getFuture().complete(groupOffsetsListing);
+                context.future().complete(groupOffsetsListing);
             }
 
             @Override
             void handleFailure(Throwable throwable) {
-                context.getFuture().completeExceptionally(throwable);
+                context.future().completeExceptionally(throwable);
             }
         };
     }
@@ -3053,13 +3060,13 @@ public class KafkaAdminClient extends AdminClient {
     }
 
     private Call getDeleteConsumerGroupsCall(ConsumerGroupOperationContext<Void, DeleteConsumerGroupsOptions> context) {
-        return new Call("deleteConsumerGroups", context.getDeadline(), new ConstantNodeIdProvider(context.getNode().get().id())) {
+        return new Call("deleteConsumerGroups", context.deadline(), new ConstantNodeIdProvider(context.node().get().id())) {
 
             @Override
-            AbstractRequest.Builder createRequest(int timeoutMs) {
+            DeleteGroupsRequest.Builder createRequest(int timeoutMs) {
                 return new DeleteGroupsRequest.Builder(
                     new DeleteGroupsRequestData()
-                        .setGroupsNames(Collections.singletonList(context.getGroupId()))
+                        .setGroupsNames(Collections.singletonList(context.groupId()))
                 );
             }
 
@@ -3068,21 +3075,21 @@ public class KafkaAdminClient extends AdminClient {
                 final DeleteGroupsResponse response = (DeleteGroupsResponse) abstractResponse;
 
                 // If coordinator changed since we fetched it, retry
-                if (context.hasCoordinatorMoved(response)) {
-                    rescheduleTask(context, () -> getDeleteConsumerGroupsCall(context));
+                if (ConsumerGroupOperationContext.hasCoordinatorMoved(response)) {
+                    rescheduleFindCoordinatorTask(context, () -> getDeleteConsumerGroupsCall(context));
                     return;
                 }
 
-                final Errors groupError = response.get(context.getGroupId());
-                if (handleGroupRequestError(groupError, context.getFuture()))
+                final Errors groupError = response.get(context.groupId());
+                if (handleGroupRequestError(groupError, context.future()))
                     return;
 
-                context.getFuture().complete(null);
+                context.future().complete(null);
             }
 
             @Override
             void handleFailure(Throwable throwable) {
-                context.getFuture().completeExceptionally(throwable);
+                context.future().completeExceptionally(throwable);
             }
         };
     }
@@ -3115,10 +3122,10 @@ public class KafkaAdminClient extends AdminClient {
     private Call getDeleteConsumerGroupOffsetsCall(
             ConsumerGroupOperationContext<Map<TopicPartition, Errors>, DeleteConsumerGroupOffsetsOptions> context,
             Set<TopicPartition> partitions) {
-        return new Call("deleteConsumerGroupOffsets", context.getDeadline(), new ConstantNodeIdProvider(context.getNode().get().id())) {
+        return new Call("deleteConsumerGroupOffsets", context.deadline(), new ConstantNodeIdProvider(context.node().get().id())) {
 
             @Override
-            AbstractRequest.Builder createRequest(int timeoutMs) {
+            OffsetDeleteRequest.Builder createRequest(int timeoutMs) {
                 final OffsetDeleteRequestTopicCollection topics = new OffsetDeleteRequestTopicCollection();
 
                 partitions.stream().collect(Collectors.groupingBy(TopicPartition::topic)).forEach((topic, topicPartitions) -> {
@@ -3134,7 +3141,7 @@ public class KafkaAdminClient extends AdminClient {
 
                 return new OffsetDeleteRequest.Builder(
                     new OffsetDeleteRequestData()
-                        .setGroupId(context.groupId)
+                        .setGroupId(context.groupId())
                         .setTopics(topics)
                 );
             }
@@ -3144,14 +3151,14 @@ public class KafkaAdminClient extends AdminClient {
                 final OffsetDeleteResponse response = (OffsetDeleteResponse) abstractResponse;
 
                 // If coordinator changed since we fetched it, retry
-                if (context.hasCoordinatorMoved(response)) {
-                    rescheduleTask(context, () -> getDeleteConsumerGroupOffsetsCall(context, partitions));
+                if (ConsumerGroupOperationContext.hasCoordinatorMoved(response)) {
+                    rescheduleFindCoordinatorTask(context, () -> getDeleteConsumerGroupOffsetsCall(context, partitions));
                     return;
                 }
 
                 // If the error is an error at the group level, the future is failed with it
                 final Errors groupError = Errors.forCode(response.data.errorCode());
-                if (handleGroupRequestError(groupError, context.getFuture()))
+                if (handleGroupRequestError(groupError, context.future()))
                     return;
 
                 final Map<TopicPartition, Errors> partitions = new HashMap<>();
@@ -3163,12 +3170,12 @@ public class KafkaAdminClient extends AdminClient {
                     });
                 });
 
-                context.getFuture().complete(partitions);
+                context.future().complete(partitions);
             }
 
             @Override
             void handleFailure(Throwable throwable) {
-                context.getFuture().completeExceptionally(throwable);
+                context.future().completeExceptionally(throwable);
             }
         };
     }
@@ -3189,7 +3196,7 @@ public class KafkaAdminClient extends AdminClient {
                 new ControllerNodeProvider()) {
 
             @Override
-            public AbstractRequest.Builder createRequest(int timeoutMs) {
+            public ElectLeadersRequest.Builder createRequest(int timeoutMs) {
                 return new ElectLeadersRequest.Builder(electionType, topicPartitions, timeoutMs);
             }
 
@@ -3254,7 +3261,7 @@ public class KafkaAdminClient extends AdminClient {
                 new ControllerNodeProvider()) {
 
             @Override
-            public AbstractRequest.Builder createRequest(int timeoutMs) {
+            public AlterPartitionReassignmentsRequest.Builder createRequest(int timeoutMs) {
                 AlterPartitionReassignmentsRequestData data =
                         new AlterPartitionReassignmentsRequestData();
                 for (Map.Entry<String, Map<Integer, Optional<NewPartitionReassignment>>> entry :
@@ -3389,7 +3396,7 @@ public class KafkaAdminClient extends AdminClient {
             new ControllerNodeProvider()) {
 
             @Override
-            AbstractRequest.Builder createRequest(int timeoutMs) {
+            ListPartitionReassignmentsRequest.Builder createRequest(int timeoutMs) {
                 ListPartitionReassignmentsRequestData listData = new ListPartitionReassignmentsRequestData();
                 listData.setTimeoutMs(timeoutMs);
 
@@ -3482,12 +3489,12 @@ public class KafkaAdminClient extends AdminClient {
     private Call getRemoveMembersFromGroupCall(ConsumerGroupOperationContext
                                                    <RemoveMemberFromGroupResult, RemoveMemberFromConsumerGroupOptions> context) {
         return new Call("leaveGroup",
-                        context.getDeadline(),
-                        new ConstantNodeIdProvider(context.getNode().get().id())) {
+                        context.deadline(),
+                        new ConstantNodeIdProvider(context.node().get().id())) {
             @Override
-            AbstractRequest.Builder createRequest(int timeoutMs) {
-                return new LeaveGroupRequest.Builder(context.getGroupId(),
-                                                     context.getOptions().getMembers());
+            LeaveGroupRequest.Builder createRequest(int timeoutMs) {
+                return new LeaveGroupRequest.Builder(context.groupId(),
+                                                     context.options().getMembers());
             }
 
             @Override
@@ -3495,8 +3502,8 @@ public class KafkaAdminClient extends AdminClient {
                 final LeaveGroupResponse response = (LeaveGroupResponse) abstractResponse;
 
                 // If coordinator changed since we fetched it, retry
-                if (context.hasCoordinatorMoved(response)) {
-                    rescheduleTask(context, () -> getRemoveMembersFromGroupCall(context));
+                if (ConsumerGroupOperationContext.hasCoordinatorMoved(response)) {
+                    rescheduleFindCoordinatorTask(context, () -> getRemoveMembersFromGroupCall(context));
                     return;
                 }
 
@@ -3507,15 +3514,226 @@ public class KafkaAdminClient extends AdminClient {
                 }
 
                 final RemoveMemberFromGroupResult membershipChangeResult =
-                    new RemoveMemberFromGroupResult(response, context.getOptions().getMembers());
+                    new RemoveMemberFromGroupResult(response, context.options().getMembers());
+
+                context.future().complete(membershipChangeResult);
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                context.future().completeExceptionally(throwable);
+            }
+        };
+    }
+
+    @Override
+    public AlterConsumerGroupOffsetsResult alterConsumerGroupOffsets(String groupId,
+                                                        Map<TopicPartition, OffsetAndMetadata> offsets,
+                                                        AlterConsumerGroupOffsetsOptions options) {
+        final KafkaFutureImpl<Map<TopicPartition, Errors>> future = new KafkaFutureImpl<>();
+
+        final long startFindCoordinatorMs = time.milliseconds();
+        final long deadline = calcDeadlineMs(startFindCoordinatorMs, options.timeoutMs());
+
+        ConsumerGroupOperationContext<Map<TopicPartition, Errors>, AlterConsumerGroupOffsetsOptions> context =
+                new ConsumerGroupOperationContext<>(groupId, options, deadline, future);
+
+        Call findCoordinatorCall = getFindCoordinatorCall(context,
+            () -> KafkaAdminClient.this.getAlterConsumerGroupOffsetsCall(context, offsets));
+        runnable.call(findCoordinatorCall, startFindCoordinatorMs);
+
+        return new AlterConsumerGroupOffsetsResult(future);
+    }
+
+    private Call getAlterConsumerGroupOffsetsCall(ConsumerGroupOperationContext<Map<TopicPartition, Errors>,
+                                                  AlterConsumerGroupOffsetsOptions> context,
+                                                  Map<TopicPartition, OffsetAndMetadata> offsets) {
+
+        return new Call("commitOffsets", context.deadline(), new ConstantNodeIdProvider(context.node().get().id())) {
+
+            @Override
+            OffsetCommitRequest.Builder createRequest(int timeoutMs) {
+                List<OffsetCommitRequestTopic> topics = new ArrayList<>();
+                Map<String, List<OffsetCommitRequestPartition>> offsetData = new HashMap<>();
+                for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
+                    String topic = entry.getKey().topic();
+                    OffsetAndMetadata oam = entry.getValue();
+                    offsetData.compute(topic, (key, value) -> {
+                        if (value == null) {
+                            value = new ArrayList<>();
+                        }
+                        OffsetCommitRequestPartition partition = new OffsetCommitRequestPartition()
+                                .setCommittedOffset(oam.offset())
+                                .setCommittedLeaderEpoch(oam.leaderEpoch().orElse(-1))
+                                .setCommittedMetadata(oam.metadata())
+                                .setPartitionIndex(entry.getKey().partition());
+                        value.add(partition);
+                        return value;
+                    });
+                }
+                for (Map.Entry<String, List<OffsetCommitRequestPartition>> entry : offsetData.entrySet()) {
+                    OffsetCommitRequestTopic topic = new OffsetCommitRequestTopic()
+                            .setName(entry.getKey())
+                            .setPartitions(entry.getValue());
+                    topics.add(topic);
+                }
+                OffsetCommitRequestData data = new OffsetCommitRequestData()
+                    .setGroupId(context.groupId())
+                    .setTopics(topics);
+                return new OffsetCommitRequest.Builder(data);
+            }
+
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                final OffsetCommitResponse response = (OffsetCommitResponse) abstractResponse;
 
-                context.getFuture().complete(membershipChangeResult);
+                // If coordinator changed since we fetched it, retry
+                if (ConsumerGroupOperationContext.hasCoordinatorMoved(response)) {
+                    rescheduleFindCoordinatorTask(context,
+                        () -> getAlterConsumerGroupOffsetsCall(context, offsets));
+                    return;
+                }
+
+                // If there is a coordinator error, retry
+                for (OffsetCommitResponseTopic topic : response.data().topics()) {
+                    for (OffsetCommitResponsePartition partition : topic.partitions()) {
+                        Errors error = Errors.forCode(partition.errorCode());
+                        if (ConsumerGroupOperationContext.shouldRefreshCoordinator(error)) {
+                            rescheduleFindCoordinatorTask(context,
+                                () -> getAlterConsumerGroupOffsetsCall(context, offsets));
+                            return;
+                        }
+                    }
+                }
+
+                final Map<TopicPartition, Errors> partitions = new HashMap<>();
+                for (OffsetCommitResponseTopic topic : response.data().topics()) {
+                    for (OffsetCommitResponsePartition partition : topic.partitions()) {
+                        TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex());
+                        Errors error = Errors.forCode(partition.errorCode());
+                        partitions.put(tp, error);
+                    }
+                }
+                context.future().complete(partitions);
             }
 
             @Override
             void handleFailure(Throwable throwable) {
-                context.getFuture().completeExceptionally(throwable);
+                context.future().completeExceptionally(throwable);
             }
         };
     }
+
+    @Override
+    public ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets,
+                                         ListOffsetsOptions options) {
+
+        // prepare the list of topics we need metadata for
+        final Map<TopicPartition, KafkaFutureImpl<ListOffsetsResultInfo>> futures = new HashMap<>(topicPartitionOffsets.size());
+        final Set<String> topics = new HashSet<>();
+        for (TopicPartition topicPartition : topicPartitionOffsets.keySet()) {
+            topics.add(topicPartition.topic());
+            futures.put(topicPartition, new KafkaFutureImpl<>());
+        }
+
+        final long nowMetadata = time.milliseconds();
+        final long deadline = calcDeadlineMs(nowMetadata, options.timeoutMs());
+
+        MetadataOperationContext<ListOffsetsResultInfo, ListOffsetsOptions> context =
+                new MetadataOperationContext<>(topics, options, deadline, futures);
+
+        Call metadataCall = getMetadataCall(context, 
+            () -> KafkaAdminClient.this.getListOffsetsCalls(context, topicPartitionOffsets, futures));
+        runnable.call(metadataCall, nowMetadata);
+
+        return new ListOffsetsResult(new HashMap<>(futures));
+    }
+
+    private List<Call> getListOffsetsCalls(MetadataOperationContext<ListOffsetsResultInfo, ListOffsetsOptions> context,
+                                           Map<TopicPartition, OffsetSpec> topicPartitionOffsets,
+                                           Map<TopicPartition, KafkaFutureImpl<ListOffsetsResultInfo>> futures) {
+
+        MetadataResponse mr = context.response().orElseThrow(() -> new IllegalStateException("No Metadata response"));
+        List<Call> calls = new ArrayList<>();
+        // group topic partitions by their leader node
+        Map<Node, Map<TopicPartition, ListOffsetRequest.PartitionData>> leaders = new HashMap<>();
+
+        for (Map.Entry<TopicPartition, OffsetSpec> entry: topicPartitionOffsets.entrySet()) {
+
+            OffsetSpec offsetSpec = entry.getValue();
+            TopicPartition tp = entry.getKey();
+            KafkaFutureImpl<ListOffsetsResultInfo> future = futures.get(tp);
+            long offsetQuery = (offsetSpec instanceof TimestampSpec)
+                    ? ((TimestampSpec) offsetSpec).timestamp()
+                    : (offsetSpec instanceof OffsetSpec.EarliestSpec)
+                        ? ListOffsetRequest.EARLIEST_TIMESTAMP
+                        : ListOffsetRequest.LATEST_TIMESTAMP;
+            // avoid sending listOffsets request for topics with errors
+            if (!mr.errors().containsKey(tp.topic())) {
+                Node node = mr.cluster().leaderFor(tp);
+                if (node != null) {
+                    Map<TopicPartition, ListOffsetRequest.PartitionData> leadersOnNode = leaders.computeIfAbsent(node, k -> new HashMap<>());
+                    leadersOnNode.put(tp, new ListOffsetRequest.PartitionData(offsetQuery, Optional.empty()));
+                } else {
+                    future.completeExceptionally(Errors.LEADER_NOT_AVAILABLE.exception());
+                }
+            } else {
+                future.completeExceptionally(mr.errors().get(tp.topic()).exception());
+            }
+        }
+
+        for (final Map.Entry<Node, Map<TopicPartition, ListOffsetRequest.PartitionData>> entry: leaders.entrySet()) {
+            final int brokerId = entry.getKey().id();
+            final Map<TopicPartition, ListOffsetRequest.PartitionData> partitionsToQuery = entry.getValue();
+
+            calls.add(new Call("listOffsets on broker " + brokerId, context.deadline(), new ConstantNodeIdProvider(brokerId)) {
+
+                @Override
+                ListOffsetRequest.Builder createRequest(int timeoutMs) {
+                    return ListOffsetRequest.Builder
+                            .forConsumer(true, context.options().isolationLevel())
+                            .setTargetTimes(partitionsToQuery);
+                }
+
+                @Override
+                void handleResponse(AbstractResponse abstractResponse) {
+                    ListOffsetResponse response = (ListOffsetResponse) abstractResponse;
+                    Set<TopicPartition> partitionsWithErrors = new HashSet<>();
+
+                    for (Entry<TopicPartition, PartitionData> result : response.responseData().entrySet()) {
+                        TopicPartition tp = result.getKey();
+                        PartitionData partitionData = result.getValue();
+
+                        KafkaFutureImpl<ListOffsetsResultInfo> future = futures.get(tp);
+                        Errors error = partitionData.error;
+                        if (MetadataOperationContext.shouldRefreshMetadata(error)) {
+                            partitionsWithErrors.add(tp);
+                        } else if (error == Errors.NONE) {
+                            future.complete(new ListOffsetsResultInfo(partitionData.offset, partitionData.timestamp, partitionData.leaderEpoch));
+                        } else {
+                            future.completeExceptionally(error.exception());
+                        }
+                    }
+
+                    if (!partitionsWithErrors.isEmpty()) {
+                        partitionsToQuery.keySet().retainAll(partitionsWithErrors);
+                        Set<String> retryTopics = partitionsWithErrors.stream().map(TopicPartition::topic).collect(Collectors.toSet());
+                        MetadataOperationContext<ListOffsetsResultInfo, ListOffsetsOptions> retryContext =
+                                new MetadataOperationContext<>(retryTopics, context.options(), context.deadline(), futures);
+                        rescheduleMetadataTask(retryContext, () -> Collections.singletonList(this));
+                    }
+                }
+
+                @Override
+                void handleFailure(Throwable throwable) {
+                    for (TopicPartition tp : entry.getValue().keySet()) {
+                        KafkaFutureImpl<ListOffsetsResultInfo> future = futures.get(tp);
+                        future.completeExceptionally(throwable);
+                    }
+                }
+            });
+        }
+        return calls;
+    }
+
 }
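
For context, the two code paths above are designed to be used together: listOffsets resolves partition positions without instantiating a consumer, and alterConsumerGroupOffsets commits those positions through the group coordinator. A minimal sketch of the intended usage follows; the bootstrap address, topic, and group names are placeholders, and, as with any out-of-band commit, the target group should have no active members:

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.OffsetSpec;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public class ResetGroupToLatest {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            try (Admin admin = AdminClient.create(props)) {
                TopicPartition tp = new TopicPartition("my-topic", 0);

                // Resolve the log end offset of the partition.
                long latest = admin.listOffsets(Collections.singletonMap(tp, OffsetSpec.latest()))
                        .partitionResult(tp).get().offset();

                // Commit that offset on behalf of the (empty) consumer group.
                admin.alterConsumerGroupOffsets("my-group",
                        Collections.singletonMap(tp, new OffsetAndMetadata(latest)))
                        .all().get();
            }
        }
    }
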
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/IsolationLevel.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListOffsetsOptions.java
similarity index 53%
copy from clients/src/main/java/org/apache/kafka/common/requests/IsolationLevel.java
copy to clients/src/main/java/org/apache/kafka/clients/admin/ListOffsetsOptions.java
index a09b625..0e116e2 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/IsolationLevel.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListOffsetsOptions.java
@@ -14,29 +14,30 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.common.requests;
+package org.apache.kafka.clients.admin;
 
-public enum IsolationLevel {
-    READ_UNCOMMITTED((byte) 0), READ_COMMITTED((byte) 1);
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.annotation.InterfaceStability;
 
-    private final byte id;
+/**
+ * Options for {@link AdminClient#listOffsets(Map)}.
+ *
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class ListOffsetsOptions extends AbstractOptions<ListOffsetsOptions> {
+
+    private final IsolationLevel isolationLevel;
 
-    IsolationLevel(byte id) {
-        this.id = id;
+    public ListOffsetsOptions() {
+        this(IsolationLevel.READ_UNCOMMITTED);
     }
 
-    public byte id() {
-        return id;
+    public ListOffsetsOptions(IsolationLevel isolationLevel) {
+        this.isolationLevel = isolationLevel;
     }
 
-    public static IsolationLevel forId(byte id) {
-        switch (id) {
-            case 0:
-                return READ_UNCOMMITTED;
-            case 1:
-                return READ_COMMITTED;
-            default:
-                throw new IllegalArgumentException("Unknown isolation level " + id);
-        }
+    public IsolationLevel isolationLevel() {
+        return isolationLevel;
     }
 }
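
The default constructor above uses read_uncommitted semantics; passing READ_COMMITTED means a "latest" query should return the last stable offset on partitions carrying transactional data. A small sketch, where the class and method names are illustrative only:

    import java.util.Collections;

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.ListOffsetsOptions;
    import org.apache.kafka.clients.admin.OffsetSpec;
    import org.apache.kafka.common.IsolationLevel;
    import org.apache.kafka.common.TopicPartition;

    public class LastStableOffsetLookup {
        // Latest offset visible to read_committed consumers for the given partition.
        static long lastStableOffset(Admin admin, TopicPartition tp) throws Exception {
            ListOffsetsOptions options = new ListOffsetsOptions(IsolationLevel.READ_COMMITTED);
            return admin.listOffsets(Collections.singletonMap(tp, OffsetSpec.latest()), options)
                    .partitionResult(tp).get()
                    .offset();
        }
    }
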
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListOffsetsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListOffsetsResult.java
new file mode 100644
index 0000000..d830ef2
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListOffsetsResult.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * The result of the {@link AdminClient#listOffsets(Map)} call.
+ *
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class ListOffsetsResult {
+
+    private final Map<TopicPartition, KafkaFuture<ListOffsetsResultInfo>> futures;
+
+    ListOffsetsResult(Map<TopicPartition, KafkaFuture<ListOffsetsResultInfo>> futures) {
+        this.futures = futures;
+    }
+
+    /**
+     * Return a future which can be used to check the result for a given partition.
+     */
+    public KafkaFuture<ListOffsetsResultInfo> partitionResult(final TopicPartition partition) {
+        KafkaFuture<ListOffsetsResultInfo> future = futures.get(partition);
+        if (future == null) {
+            throw new IllegalArgumentException(
+                    "List Offsets for partition \"" + partition + "\" was not attempted");
+        }
+        return future;
+    }
+
+    /**
+     * Return a future which succeeds only if offsets for all specified partitions have been successfully
+     * retrieved.
+     */
+    public KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> all() {
+        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]))
+                .thenApply(new KafkaFuture.BaseFunction<Void, Map<TopicPartition, ListOffsetsResultInfo>>() {
+                    @Override
+                    public Map<TopicPartition, ListOffsetsResultInfo> apply(Void v) {
+                        Map<TopicPartition, ListOffsetsResultInfo> offsets = new HashMap<>(futures.size());
+                        for (Map.Entry<TopicPartition, KafkaFuture<ListOffsetsResultInfo>> entry : futures.entrySet()) {
+                            try {
+                                offsets.put(entry.getKey(), entry.getValue().get());
+                            } catch (InterruptedException | ExecutionException e) {
+                                // This should be unreachable, because allOf ensured that all the futures completed successfully.
+                                throw new RuntimeException(e);
+                            }
+                        }
+                        return offsets;
+                    }
+                });
+    }
+
+    public static class ListOffsetsResultInfo {
+
+        private final long offset;
+        private final long timestamp;
+        private final Optional<Integer> leaderEpoch;
+
+        ListOffsetsResultInfo(long offset, long timestamp, Optional<Integer> leaderEpoch) {
+            this.offset = offset;
+            this.timestamp = timestamp;
+            this.leaderEpoch = leaderEpoch;
+        }
+
+        public long offset() {
+            return offset;
+        }
+
+        public long timestamp() {
+            return timestamp;
+        }
+
+        public Optional<Integer> leaderEpoch() {
+            return leaderEpoch;
+        }
+
+        @Override
+        public String toString() {
+            return "ListOffsetsResultInfo(offset=" + offset + ", timestamp=" + timestamp + ", leaderEpoch="
+                    + leaderEpoch + ")";
+        }
+    }
+}
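
Because all() fails as soon as any single partition failed, callers that want partial results can walk the per-partition futures instead. A sketch of that pattern, with an illustrative class and method name:

    import java.util.Collection;
    import java.util.concurrent.ExecutionException;

    import org.apache.kafka.clients.admin.ListOffsetsResult;
    import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
    import org.apache.kafka.common.TopicPartition;

    public class PartialOffsetResults {
        // Print what succeeded and report per-partition failures without aborting.
        static void printOffsets(ListOffsetsResult result, Collection<TopicPartition> partitions)
                throws InterruptedException {
            for (TopicPartition tp : partitions) {
                try {
                    ListOffsetsResultInfo info = result.partitionResult(tp).get();
                    System.out.println(tp + ": offset=" + info.offset()
                            + ", timestamp=" + info.timestamp());
                } catch (ExecutionException e) {
                    System.err.println(tp + ": failed with " + e.getCause());
                }
            }
        }
    }
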
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java b/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java
new file mode 100644
index 0000000..8955b41
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+
+/**
+ * This class allows users to specify the desired offsets when using {@link KafkaAdminClient#listOffsets(Map, ListOffsetsOptions)}.
+ */
+public class OffsetSpec {
+
+    static class EarliestSpec extends OffsetSpec { }
+    static class LatestSpec extends OffsetSpec { }
+    static class TimestampSpec extends OffsetSpec {
+        private final long timestamp;
+
+        TimestampSpec(long timestamp) {
+            this.timestamp = timestamp;
+        }
+
+        long timestamp() {
+            return timestamp;
+        }
+    }
+
+    /**
+     * Used to retrieve the latest offset of a partition
+     */
+    public static OffsetSpec latest() {
+        return new LatestSpec();
+    }
+
+    /**
+     * Used to retrieve the earliest offset of a partition
+     */
+    public static OffsetSpec earliest() {
+        return new EarliestSpec();
+    }
+
+    /**
+     * Used to retrieve the earliest offset whose timestamp is greater than
+     * or equal to the given timestamp in the corresponding partition
+     * @param timestamp in milliseconds
+     */
+    public static OffsetSpec forTimestamp(long timestamp) {
+        return new TimestampSpec(timestamp);
+    }
+
+}
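
The three factory methods map onto the timestamp sentinels of the ListOffsets protocol (earliest, latest, or a concrete timestamp), as seen in getListOffsetsCalls above, so a single query can mix all three. A sketch, where the topic name and the one-hour lookback are placeholders:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.TimeUnit;

    import org.apache.kafka.clients.admin.OffsetSpec;
    import org.apache.kafka.common.TopicPartition;

    public class MixedOffsetSpecs {
        static Map<TopicPartition, OffsetSpec> exampleSpecs() {
            Map<TopicPartition, OffsetSpec> specs = new HashMap<>();
            specs.put(new TopicPartition("clicks", 0), OffsetSpec.earliest());
            specs.put(new TopicPartition("clicks", 1), OffsetSpec.latest());
            // First offset whose record timestamp is >= one hour ago.
            specs.put(new TopicPartition("clicks", 2),
                    OffsetSpec.forTimestamp(System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1)));
            return specs;
        }
    }
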
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ConsumerGroupOperationContext.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ConsumerGroupOperationContext.java
new file mode 100644
index 0000000..bd4415c
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ConsumerGroupOperationContext.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin.internals;
+
+import java.util.Optional;
+
+import org.apache.kafka.clients.admin.AbstractOptions;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+
+/**
+ * Context class to encapsulate parameters of a call to find and use a consumer group coordinator.
+ * Some of the parameters are provided at construction and are immutable, whereas others are set
+ * as "Call"s complete and their values become available, such as the node id of the coordinator.
+ *
+ * @param <T> The type of return value of the KafkaFuture
+ * @param <O> The type of configuration option. Different for different consumer group commands.
+ */
+public final class ConsumerGroupOperationContext<T, O extends AbstractOptions<O>> {
+    private final String groupId;
+    private final O options;
+    private final long deadline;
+    private final KafkaFutureImpl<T> future;
+    private Optional<Node> node;
+
+    public ConsumerGroupOperationContext(String groupId,
+                                         O options,
+                                         long deadline,
+                                         KafkaFutureImpl<T> future) {
+        this.groupId = groupId;
+        this.options = options;
+        this.deadline = deadline;
+        this.future = future;
+        this.node = Optional.empty();
+    }
+
+    public String groupId() {
+        return groupId;
+    }
+
+    public O options() {
+        return options;
+    }
+
+    public long deadline() {
+        return deadline;
+    }
+
+    public KafkaFutureImpl<T> future() {
+        return future;
+    }
+
+    public Optional<Node> node() {
+        return node;
+    }
+
+    public void setNode(Node node) {
+        this.node = Optional.ofNullable(node);
+    }
+
+    public static boolean hasCoordinatorMoved(AbstractResponse response) {
+        return response.errorCounts().keySet()
+                .stream()
+                .anyMatch(error -> error == Errors.NOT_COORDINATOR);
+    }
+
+    public static boolean shouldRefreshCoordinator(Errors error) {
+        return error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.COORDINATOR_NOT_AVAILABLE;
+    }
+}
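
The two static helpers encode the coordinator retry policy used by KafkaAdminClient: NOT_COORDINATOR anywhere in a response means the coordinator moved, while COORDINATOR_LOAD_IN_PROGRESS and COORDINATOR_NOT_AVAILABLE mean the call should be retried after rediscovery. A condensed sketch of that decision, as a hypothetical helper rather than anything in this patch:

    import org.apache.kafka.clients.admin.internals.ConsumerGroupOperationContext;
    import org.apache.kafka.common.protocol.Errors;
    import org.apache.kafka.common.requests.AbstractResponse;

    public class CoordinatorRetryDecision {
        // True if the caller should re-run FindCoordinator and then retry the call.
        static boolean needsCoordinatorRediscovery(AbstractResponse response, Errors partitionError) {
            return ConsumerGroupOperationContext.hasCoordinatorMoved(response)
                    || ConsumerGroupOperationContext.shouldRefreshCoordinator(partitionError);
        }
    }
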
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java
new file mode 100644
index 0000000..e6f4054
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin.internals;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.kafka.clients.admin.AbstractOptions;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InvalidMetadataException;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.MetadataResponse.PartitionMetadata;
+import org.apache.kafka.common.requests.MetadataResponse.TopicMetadata;
+
+/**
+ * Context class to encapsulate parameters of a call to fetch and use cluster metadata.
+ * Some of the parameters are provided at construction and are immutable, whereas others are set
+ * as "Call"s complete and their values become available.
+ *
+ * @param <T> The type of return value of the KafkaFuture
+ * @param <O> The type of configuration option.
+ */
+public final class MetadataOperationContext<T, O extends AbstractOptions<O>> {
+    private final Collection<String> topics;
+    private final O options;
+    private final long deadline;
+    private final Map<TopicPartition, KafkaFutureImpl<T>> futures;
+    private Optional<MetadataResponse> response;
+
+    public MetadataOperationContext(Collection<String> topics,
+                                    O options,
+                                    long deadline,
+                                    Map<TopicPartition, KafkaFutureImpl<T>> futures) {
+        this.topics = topics;
+        this.options = options;
+        this.deadline = deadline;
+        this.futures = futures;
+        this.response = Optional.empty();
+    }
+
+    public void setResponse(Optional<MetadataResponse> response) {
+        this.response = response;
+    }
+
+    public Optional<MetadataResponse> response() {
+        return response;
+    }
+
+    public O options() {
+        return options;
+    }
+
+    public long deadline() {
+        return deadline;
+    }
+
+    public Map<TopicPartition, KafkaFutureImpl<T>> futures() {
+        return futures;
+    }
+
+    public Collection<String> topics() {
+        return topics;
+    }
+
+    public static void handleMetadataErrors(MetadataResponse response) {
+        for (TopicMetadata tm : response.topicMetadata()) {
+            for (PartitionMetadata pm : tm.partitionMetadata()) {
+                if (shouldRefreshMetadata(pm.error())) {
+                    throw pm.error().exception();
+                }
+            }
+        }
+    }
+
+    public static boolean shouldRefreshMetadata(Errors error) {
+        return error.exception() instanceof InvalidMetadataException;
+    }
+}
\ No newline at end of file
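
The intended flow is that a metadata-driven operation records the MetadataResponse on its context and then calls handleMetadataErrors, which throws the first partition-level InvalidMetadataException so the caller can refresh metadata and retry. A condensed sketch, again as a hypothetical helper rather than anything in this patch:

    import java.util.Optional;

    import org.apache.kafka.clients.admin.internals.MetadataOperationContext;
    import org.apache.kafka.common.requests.MetadataResponse;

    public class MetadataRetryHook {
        // Record the response, then throw if any partition reported stale metadata.
        static void onMetadataResponse(MetadataOperationContext<?, ?> context,
                                       MetadataResponse response) {
            context.setResponse(Optional.of(response));
            MetadataOperationContext.handleMetadataErrors(response);
        }
    }
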
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index e0f7c4d..67897cf 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -18,13 +18,13 @@ package org.apache.kafka.clients.consumer;
 
 import org.apache.kafka.clients.ClientDnsLookup;
 import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
 import org.apache.kafka.common.config.SecurityConfig;
 import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.requests.IsolationLevel;
 import org.apache.kafka.common.serialization.Deserializer;
 
 import java.util.Collections;
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index f12beaf..e9e91bc 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -35,6 +35,7 @@ import org.apache.kafka.clients.consumer.internals.KafkaConsumerMetrics;
 import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.internals.SubscriptionState;
 import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
@@ -52,7 +53,6 @@ import org.apache.kafka.common.metrics.MetricsReporter;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.network.ChannelBuilder;
 import org.apache.kafka.common.network.Selector;
-import org.apache.kafka.common.requests.IsolationLevel;
 import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.utils.AppInfoParser;
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 384da31..a162ddb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -31,6 +31,7 @@ import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
@@ -64,7 +65,6 @@ import org.apache.kafka.common.record.Records;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.requests.FetchRequest;
 import org.apache.kafka.common.requests.FetchResponse;
-import org.apache.kafka.common.requests.IsolationLevel;
 import org.apache.kafka.common.requests.ListOffsetRequest;
 import org.apache.kafka.common.requests.ListOffsetResponse;
 import org.apache.kafka.common.requests.MetadataRequest;
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 953505f..bd05909 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -22,11 +22,11 @@ import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.internals.PartitionStates;
 import org.apache.kafka.common.requests.EpochEndOffset;
-import org.apache.kafka.common.requests.IsolationLevel;
 import org.apache.kafka.common.utils.LogContext;
 import org.slf4j.Logger;
 
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/IsolationLevel.java b/clients/src/main/java/org/apache/kafka/common/IsolationLevel.java
similarity index 96%
rename from clients/src/main/java/org/apache/kafka/common/requests/IsolationLevel.java
rename to clients/src/main/java/org/apache/kafka/common/IsolationLevel.java
index a09b625..79f0a92 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/IsolationLevel.java
+++ b/clients/src/main/java/org/apache/kafka/common/IsolationLevel.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.common.requests;
+package org.apache.kafka.common;
 
 public enum IsolationLevel {
     READ_UNCOMMITTED((byte) 0), READ_COMMITTED((byte) 1);
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java
index 963ad06..3fbf13d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java
@@ -102,7 +102,7 @@ public class AlterConfigsRequest extends AbstractRequest {
 
     }
 
-    public static class Builder extends AbstractRequest.Builder {
+    public static class Builder extends AbstractRequest.Builder<AlterConfigsRequest> {
 
         private final Map<ConfigResource, Config> configs;
         private final boolean validateOnly;
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java
index 0ee256f..99bbdd7 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java
@@ -65,7 +65,7 @@ public class DescribeConfigsRequest extends AbstractRequest {
         return new Schema[]{DESCRIBE_CONFIGS_REQUEST_V0, DESCRIBE_CONFIGS_REQUEST_V1, DESCRIBE_CONFIGS_REQUEST_V2};
     }
 
-    public static class Builder extends AbstractRequest.Builder {
+    public static class Builder extends AbstractRequest.Builder<DescribeConfigsRequest> {
         private final Map<ConfigResource, Collection<String>> resourceToConfigNames;
         private boolean includeSynonyms;
 
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
index 2c0455a..f99409e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
index e9fe942..094d0d2 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
@@ -217,7 +218,6 @@ public class ListOffsetRequest extends AbstractRequest {
     /**
      * Private constructor with a specified version.
      */
-    @SuppressWarnings("unchecked")
     private ListOffsetRequest(int replicaId,
                               Map<TopicPartition, PartitionData> targetTimes,
                               IsolationLevel isolationLevel,
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 6d870fc..4d677bf 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.ClientUtils;
 import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.NodeApiVersions;
 import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResults;
+import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
 import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
@@ -102,9 +103,14 @@ import org.apache.kafka.common.requests.FindCoordinatorResponse;
 import org.apache.kafka.common.requests.IncrementalAlterConfigsResponse;
 import org.apache.kafka.common.requests.LeaveGroupResponse;
 import org.apache.kafka.common.requests.ListGroupsResponse;
+import org.apache.kafka.common.requests.ListOffsetResponse;
+import org.apache.kafka.common.requests.ListOffsetResponse.PartitionData;
+import org.apache.kafka.common.requests.MetadataResponse.PartitionMetadata;
+import org.apache.kafka.common.requests.MetadataResponse.TopicMetadata;
 import org.apache.kafka.common.requests.ListPartitionReassignmentsResponse;
 import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.OffsetCommitResponse;
 import org.apache.kafka.common.requests.OffsetDeleteResponse;
 import org.apache.kafka.common.requests.OffsetFetchResponse;
 import org.apache.kafka.common.resource.PatternType;
@@ -276,6 +282,12 @@ public class KafkaAdminClientTest {
         );
     }
 
+    private static OffsetCommitResponse prepareOffsetCommitResponse(TopicPartition tp, Errors error) {
+        Map<TopicPartition, Errors> responseData = new HashMap<>();
+        responseData.put(tp, error);
+        return new OffsetCommitResponse(0, responseData);
+    }
+
     private static CreateTopicsResponse prepareCreateTopicsResponse(String topicName, Errors error) {
         CreateTopicsResponseData data = new CreateTopicsResponseData();
         data.topics().add(new CreatableTopicResult().
@@ -295,6 +307,31 @@ public class KafkaAdminClientTest {
         return FindCoordinatorResponse.prepareResponse(error, node);
     }
 
+    private static MetadataResponse prepareMetadataResponse(Cluster cluster, Errors error) {
+        List<TopicMetadata> metadata = new ArrayList<>();
+        for (String topic : cluster.topics()) {
+            List<PartitionMetadata> pms = new ArrayList<>();
+            for (PartitionInfo pInfo : cluster.availablePartitionsForTopic(topic)) {
+                PartitionMetadata pm = new PartitionMetadata(error,
+                        pInfo.partition(),
+                        pInfo.leader(),
+                        Optional.of(234),
+                        Arrays.asList(pInfo.replicas()),
+                        Arrays.asList(pInfo.inSyncReplicas()),
+                        Arrays.asList(pInfo.offlineReplicas()));
+                pms.add(pm);
+            }
+            TopicMetadata tm = new TopicMetadata(error, topic, false, pms);
+            metadata.add(tm);
+        }
+        return MetadataResponse.prepareResponse(0,
+                cluster.nodes(),
+                cluster.clusterResource().clusterId(),
+                cluster.controller().id(),
+                metadata,
+                MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED);
+    }
+
     /**
      * Test that the client properly times out when we don't receive any metadata.
      */
@@ -1451,7 +1488,8 @@ public class KafkaAdminClientTest {
         try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
 
-            env.kafkaClient().prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, env.cluster().controller()));
+            env.kafkaClient().prepareResponse(
+                prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
 
             DescribeGroupsResponseData data = new DescribeGroupsResponseData();
             data.groups().add(DescribeGroupsResponse.groupMetadata(
@@ -1567,13 +1605,15 @@ public class KafkaAdminClientTest {
             assertNull(results.get());
 
             //should throw error for non-retriable errors
-            env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED,  Node.noNode()));
+            env.kafkaClient().prepareResponse(
+                prepareFindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED, Node.noNode()));
 
             final DeleteConsumerGroupsResult errorResult = env.adminClient().deleteConsumerGroups(groupIds);
             TestUtils.assertFutureError(errorResult.deletedGroups().get("group-0"), GroupAuthorizationException.class);
 
             //Retriable errors should be retried
-            env.kafkaClient().prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, env.cluster().controller()));
+            env.kafkaClient().prepareResponse(
+                prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
 
             final DeletableGroupResultCollection errorResponse1 = new DeletableGroupResultCollection();
             errorResponse1.add(new DeletableGroupResult()
@@ -1698,7 +1738,7 @@ public class KafkaAdminClientTest {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
 
             env.kafkaClient().prepareResponse(
-                FindCoordinatorResponse.prepareResponse(Errors.NONE, env.cluster().controller()));
+                prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
 
             env.kafkaClient().prepareResponse(
                 prepareOffsetDeleteResponse(Errors.COORDINATOR_NOT_AVAILABLE));
@@ -1752,8 +1792,8 @@ public class KafkaAdminClientTest {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
 
             for (Errors error : retriableErrors) {
-                env.kafkaClient().prepareResponse(FindCoordinatorResponse
-                    .prepareResponse(Errors.NONE, env.cluster().controller()));
+                env.kafkaClient().prepareResponse(
+                    prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
 
                 env.kafkaClient().prepareResponse(
                     prepareOffsetDeleteResponse(error));
@@ -2022,8 +2062,8 @@ public class KafkaAdminClientTest {
                             .setPartitions(Collections.singletonList(normalPartitionResponse))));
             env.kafkaClient().prepareResponse(new AlterPartitionReassignmentsResponse(responseData1));
             AlterPartitionReassignmentsResult result1 = env.adminClient().alterPartitionReassignments(reassignments);
-            Future future1 = result1.all();
-            Future future2 = result1.values().get(tp1);
+            Future<Void> future1 = result1.all();
+            Future<Void> future2 = result1.values().get(tp1);
             TestUtils.assertFutureError(future1, UnknownServerException.class);
             TestUtils.assertFutureError(future2, UnknownServerException.class);
 
@@ -2208,6 +2248,466 @@ public class KafkaAdminClientTest {
         }
     }
 
+    @Test
+    public void testAlterConsumerGroupOffsets() throws Exception {
+        // Happy path
+
+        final Map<Integer, Node> nodes = new HashMap<>();
+        nodes.put(0, new Node(0, "localhost", 8121));
+
+        final Cluster cluster =
+            new Cluster(
+                "mockClusterId",
+                nodes.values(),
+                Collections.<PartitionInfo>emptyList(),
+                Collections.<String>emptySet(),
+                Collections.<String>emptySet(), nodes.get(0));
+
+        final String groupId = "group-0";
+        final TopicPartition tp1 = new TopicPartition("foo", 0);
+        final TopicPartition tp2 = new TopicPartition("bar", 0);
+        final TopicPartition tp3 = new TopicPartition("foobar", 0);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            env.kafkaClient().prepareResponse(
+                prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
+
+            Map<TopicPartition, Errors> responseData = new HashMap<>();
+            responseData.put(tp1, Errors.NONE);
+            responseData.put(tp2, Errors.NONE);
+            env.kafkaClient().prepareResponse(new OffsetCommitResponse(0, responseData));
+
+            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+            offsets.put(tp1, new OffsetAndMetadata(123L));
+            offsets.put(tp2, new OffsetAndMetadata(456L));
+            final AlterConsumerGroupOffsetsResult result = env.adminClient().alterConsumerGroupOffsets(
+                groupId, offsets);
+
+            assertNull(result.all().get());
+            assertNull(result.partitionResult(tp1).get());
+            assertNull(result.partitionResult(tp2).get());
+            TestUtils.assertFutureError(result.partitionResult(tp3), IllegalArgumentException.class);
+        }
+    }
+
+    @Test
+    public void testAlterConsumerGroupOffsetsRetriableErrors() throws Exception {
+        // Retriable errors should be retried
+
+        final Map<Integer, Node> nodes = new HashMap<>();
+        nodes.put(0, new Node(0, "localhost", 8121));
+
+        final Cluster cluster =
+            new Cluster(
+                "mockClusterId",
+                nodes.values(),
+                Collections.<PartitionInfo>emptyList(),
+                Collections.<String>emptySet(),
+                Collections.<String>emptySet(), nodes.get(0));
+
+        final String groupId = "group-0";
+        final TopicPartition tp1 = new TopicPartition("foo", 0);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            env.kafkaClient().prepareResponse(
+                prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
+
+            env.kafkaClient().prepareResponse(
+                prepareOffsetCommitResponse(tp1, Errors.COORDINATOR_NOT_AVAILABLE));
+
+            env.kafkaClient().prepareResponse(
+                prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
+
+            env.kafkaClient().prepareResponse(
+                prepareOffsetCommitResponse(tp1, Errors.COORDINATOR_LOAD_IN_PROGRESS));
+
+            env.kafkaClient().prepareResponse(
+                prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
+
+            env.kafkaClient().prepareResponse(
+                prepareOffsetCommitResponse(tp1, Errors.NOT_COORDINATOR));
+
+            env.kafkaClient().prepareResponse(
+                prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
+
+            env.kafkaClient().prepareResponse(
+                prepareOffsetCommitResponse(tp1, Errors.NONE));
+
+            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+            offsets.put(tp1, new OffsetAndMetadata(123L));
+            final AlterConsumerGroupOffsetsResult result1 = env.adminClient()
+                .alterConsumerGroupOffsets(groupId, offsets);
+
+            assertNull(result1.all().get());
+            assertNull(result1.partitionResult(tp1).get());
+        }
+    }
+
+    @Test
+    public void testAlterConsumerGroupOffsetsNonRetriableErrors() throws Exception {
+        // Non-retriable errors throw an exception
+
+        final Map<Integer, Node> nodes = new HashMap<>();
+        nodes.put(0, new Node(0, "localhost", 8121));
+
+        final Cluster cluster =
+            new Cluster(
+                "mockClusterId",
+                nodes.values(),
+                Collections.<PartitionInfo>emptyList(),
+                Collections.<String>emptySet(),
+                Collections.<String>emptySet(), nodes.get(0));
+
+        final String groupId = "group-0";
+        final TopicPartition tp1 = new TopicPartition("foo", 0);
+        final List<Errors> nonRetriableErrors = Arrays.asList(
+            Errors.GROUP_AUTHORIZATION_FAILED, Errors.INVALID_GROUP_ID, Errors.GROUP_ID_NOT_FOUND);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            for (Errors error : nonRetriableErrors) {
+                env.kafkaClient().prepareResponse(
+                    prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
+
+                env.kafkaClient().prepareResponse(prepareOffsetCommitResponse(tp1, error));
+
+                Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+                offsets.put(tp1, new OffsetAndMetadata(123L));
+                AlterConsumerGroupOffsetsResult errorResult = env.adminClient()
+                    .alterConsumerGroupOffsets(groupId, offsets);
+
+                TestUtils.assertFutureError(errorResult.all(), error.exception().getClass());
+                TestUtils.assertFutureError(errorResult.partitionResult(tp1), error.exception().getClass());
+            }
+        }
+    }
+
+    @Test
+    public void testAlterConsumerGroupOffsetsFindCoordinatorRetriableErrors() throws Exception {
+        // Retriable FindCoordinatorResponse errors should be retried
+
+        final Map<Integer, Node> nodes = new HashMap<>();
+        nodes.put(0, new Node(0, "localhost", 8121));
+
+        final Cluster cluster =
+            new Cluster(
+                "mockClusterId",
+                nodes.values(),
+                Collections.<PartitionInfo>emptyList(),
+                Collections.<String>emptySet(),
+                Collections.<String>emptySet(), nodes.get(0));
+
+        final String groupId = "group-0";
+        final TopicPartition tp1 = new TopicPartition("foo", 0);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            env.kafkaClient().prepareResponse(
+                prepareFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode()));
+            env.kafkaClient().prepareResponse(
+                prepareFindCoordinatorResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Node.noNode()));
+
+            env.kafkaClient().prepareResponse(
+                prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
+
+            env.kafkaClient().prepareResponse(
+                prepareOffsetCommitResponse(tp1, Errors.NONE));
+
+            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+            offsets.put(tp1, new OffsetAndMetadata(123L));
+            final AlterConsumerGroupOffsetsResult result = env.adminClient()
+                .alterConsumerGroupOffsets(groupId, offsets);
+
+            assertNull(result.all().get());
+            assertNull(result.partitionResult(tp1).get());
+        }
+    }
+
+    @Test
+    public void testAlterConsumerGroupOffsetsFindCoordinatorNonRetriableErrors() throws Exception {
+        // Non-retriable FindCoordinatorResponse errors throw an exception
+
+        final Map<Integer, Node> nodes = new HashMap<>();
+        nodes.put(0, new Node(0, "localhost", 8121));
+
+        final Cluster cluster =
+            new Cluster(
+                "mockClusterId",
+                nodes.values(),
+                Collections.<PartitionInfo>emptyList(),
+                Collections.<String>emptySet(),
+                Collections.<String>emptySet(), nodes.get(0));
+
+        final String groupId = "group-0";
+        final TopicPartition tp1 = new TopicPartition("foo", 0);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            env.kafkaClient().prepareResponse(
+                prepareFindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED, Node.noNode()));
+
+            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+            offsets.put(tp1, new OffsetAndMetadata(123L));
+            final AlterConsumerGroupOffsetsResult errorResult = env.adminClient()
+                .alterConsumerGroupOffsets(groupId, offsets);
+
+            TestUtils.assertFutureError(errorResult.all(), GroupAuthorizationException.class);
+            TestUtils.assertFutureError(errorResult.partitionResult(tp1), GroupAuthorizationException.class);
+        }
+    }
+
+    @Test
+    public void testListOffsets() throws Exception {
+        // Happy path
+
+        Node node0 = new Node(0, "localhost", 8120);
+        List<PartitionInfo> pInfos = new ArrayList<>();
+        pInfos.add(new PartitionInfo("foo", 0, node0, new Node[]{node0}, new Node[]{node0}));
+        pInfos.add(new PartitionInfo("bar", 0, node0, new Node[]{node0}, new Node[]{node0}));
+        pInfos.add(new PartitionInfo("baz", 0, node0, new Node[]{node0}, new Node[]{node0}));
+        final Cluster cluster =
+            new Cluster(
+                "mockClusterId",
+                Arrays.asList(node0),
+                pInfos,
+                Collections.<String>emptySet(),
+                Collections.<String>emptySet(),
+                node0);
+
+        final TopicPartition tp1 = new TopicPartition("foo", 0);
+        final TopicPartition tp2 = new TopicPartition("bar", 0);
+        final TopicPartition tp3 = new TopicPartition("baz", 0);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+            Map<TopicPartition, PartitionData> responseData = new HashMap<>();
+            responseData.put(tp1, new PartitionData(Errors.NONE, -1L, 123L, Optional.of(321)));
+            responseData.put(tp2, new PartitionData(Errors.NONE, -1L, 234L, Optional.of(432)));
+            responseData.put(tp3, new PartitionData(Errors.NONE, 123456789L, 345L, Optional.of(543)));
+            env.kafkaClient().prepareResponse(new ListOffsetResponse(responseData));
+
+            Map<TopicPartition, OffsetSpec> partitions = new HashMap<>();
+            partitions.put(tp1, OffsetSpec.latest());
+            partitions.put(tp2, OffsetSpec.earliest());
+            partitions.put(tp3, OffsetSpec.forTimestamp(System.currentTimeMillis()));
+            ListOffsetsResult result = env.adminClient().listOffsets(partitions);
+
+            Map<TopicPartition, ListOffsetsResultInfo> offsets = result.all().get();
+            assertFalse(offsets.isEmpty());
+            assertEquals(123L, offsets.get(tp1).offset());
+            assertEquals(321, offsets.get(tp1).leaderEpoch().get().intValue());
+            assertEquals(-1L, offsets.get(tp1).timestamp());
+            assertEquals(234L, offsets.get(tp2).offset());
+            assertEquals(432, offsets.get(tp2).leaderEpoch().get().intValue());
+            assertEquals(-1L, offsets.get(tp2).timestamp());
+            assertEquals(345L, offsets.get(tp3).offset());
+            assertEquals(543, offsets.get(tp3).leaderEpoch().get().intValue());
+            assertEquals(123456789L, offsets.get(tp3).timestamp());
+            assertEquals(offsets.get(tp1), result.partitionResult(tp1).get());
+            assertEquals(offsets.get(tp2), result.partitionResult(tp2).get());
+            assertEquals(offsets.get(tp3), result.partitionResult(tp3).get());
+            try {
+                result.partitionResult(new TopicPartition("unknown", 0)).get();
+                fail("should have thrown IllegalArgumentException");
+            } catch (IllegalArgumentException expected) { }
+        }
+    }
+
+    @Test
+    public void testListOffsetsRetriableErrors() throws Exception {
+
+        Node node0 = new Node(0, "localhost", 8120);
+        Node node1 = new Node(1, "localhost", 8121);
+        List<Node> nodes = Arrays.asList(node0, node1);
+        List<PartitionInfo> pInfos = new ArrayList<>();
+        pInfos.add(new PartitionInfo("foo", 0, node0, new Node[]{node0, node1}, new Node[]{node0, node1}));
+        pInfos.add(new PartitionInfo("foo", 1, node0, new Node[]{node0, node1}, new Node[]{node0, node1}));
+        pInfos.add(new PartitionInfo("bar", 0, node1, new Node[]{node1, node0}, new Node[]{node1, node0}));
+        final Cluster cluster =
+            new Cluster(
+                "mockClusterId",
+                nodes,
+                pInfos,
+                Collections.<String>emptySet(),
+                Collections.<String>emptySet(),
+                node0);
+
+        final TopicPartition tp1 = new TopicPartition("foo", 0);
+        final TopicPartition tp2 = new TopicPartition("foo", 1);
+        final TopicPartition tp3 = new TopicPartition("bar", 0);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+            // listOffsets response from broker 0
+            Map<TopicPartition, PartitionData> responseData = new HashMap<>();
+            responseData.put(tp1, new PartitionData(Errors.LEADER_NOT_AVAILABLE, -1L, 123L, Optional.of(321)));
+            responseData.put(tp3, new PartitionData(Errors.NONE, -1L, 987L, Optional.of(789)));
+            env.kafkaClient().prepareResponse(new ListOffsetResponse(responseData));
+            // listOffsets response from broker 1
+            responseData = new HashMap<>();
+            responseData.put(tp2, new PartitionData(Errors.NONE, -1L, 456L, Optional.of(654)));
+            env.kafkaClient().prepareResponse(new ListOffsetResponse(responseData));
+
+            // metadata refresh because of LEADER_NOT_AVAILABLE
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+            // listOffsets response from broker 0
+            responseData = new HashMap<>();
+            responseData.put(tp1, new PartitionData(Errors.NONE, -1L, 345L, Optional.of(543)));
+            env.kafkaClient().prepareResponse(new ListOffsetResponse(responseData));
+
+            Map<TopicPartition, OffsetSpec> partitions = new HashMap<>();
+            partitions.put(tp1, OffsetSpec.latest());
+            partitions.put(tp2, OffsetSpec.latest());
+            partitions.put(tp3, OffsetSpec.latest());
+            ListOffsetsResult result = env.adminClient().listOffsets(partitions);
+
+            Map<TopicPartition, ListOffsetsResultInfo> offsets = result.all().get();
+            assertFalse(offsets.isEmpty());
+            assertEquals(345L, offsets.get(tp1).offset());
+            assertEquals(543, offsets.get(tp1).leaderEpoch().get().intValue());
+            assertEquals(-1L, offsets.get(tp1).timestamp());
+            assertEquals(456, offsets.get(tp2).offset());
+            assertEquals(654, offsets.get(tp2).leaderEpoch().get().intValue());
+            assertEquals(-1L, offsets.get(tp2).timestamp());
+            assertEquals(987, offsets.get(tp3).offset());
+            assertEquals(789, offsets.get(tp3).leaderEpoch().get().intValue());
+            assertEquals(-1L, offsets.get(tp3).timestamp());
+        }
+    }
+
+    @Test
+    public void testListOffsetsNonRetriableErrors() throws Exception {
+
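+        // TOPIC_AUTHORIZATION_FAILED is not retriable: the error should be surfaced
+        // through the result future without any retry.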
+        Node node0 = new Node(0, "localhost", 8120);
+        Node node1 = new Node(1, "localhost", 8121);
+        List<Node> nodes = Arrays.asList(node0, node1);
+        List<PartitionInfo> pInfos = new ArrayList<>();
+        pInfos.add(new PartitionInfo("foo", 0, node0, new Node[]{node0, node1}, new Node[]{node0, node1}));
+        final Cluster cluster =
+            new Cluster(
+                "mockClusterId",
+                nodes,
+                pInfos,
+                Collections.<String>emptySet(),
+                Collections.<String>emptySet(),
+                node0);
+
+        final TopicPartition tp1 = new TopicPartition("foo", 0);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+            Map<TopicPartition, PartitionData> responseData = new HashMap<>();
+            responseData.put(tp1, new PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, -1L, -1L, Optional.empty()));
+            env.kafkaClient().prepareResponse(new ListOffsetResponse(responseData));
+
+            Map<TopicPartition, OffsetSpec> partitions = new HashMap<>();
+            partitions.put(tp1, OffsetSpec.latest());
+            ListOffsetsResult result = env.adminClient().listOffsets(partitions);
+
+            TestUtils.assertFutureError(result.all(), TopicAuthorizationException.class);
+        }
+    }
+
+    @Test
+    public void testListOffsetsMetadataRetriableErrors() throws Exception {
+
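+        // Retriable errors returned from the initial metadata lookup (rather than
+        // from ListOffsets itself) should cause the metadata request to be retried
+        // until it succeeds.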
+        Node node0 = new Node(0, "localhost", 8120);
+        Node node1 = new Node(1, "localhost", 8121);
+        List<Node> nodes = Arrays.asList(node0, node1);
+        List<PartitionInfo> pInfos = new ArrayList<>();
+        pInfos.add(new PartitionInfo("foo", 0, node0, new Node[]{node0}, new Node[]{node0}));
+        pInfos.add(new PartitionInfo("foo", 1, node1, new Node[]{node1}, new Node[]{node1}));
+        final Cluster cluster =
+            new Cluster(
+                "mockClusterId",
+                nodes,
+                pInfos,
+                Collections.<String>emptySet(),
+                Collections.<String>emptySet(),
+                node0);
+
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+        final TopicPartition tp1 = new TopicPartition("foo", 1);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.LEADER_NOT_AVAILABLE));
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.UNKNOWN_TOPIC_OR_PARTITION));
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+            // listoffsets response from broker 0
+            Map<TopicPartition, PartitionData> responseData = new HashMap<>();
+            responseData.put(tp0, new PartitionData(Errors.NONE, -1L, 345L, Optional.of(543)));
+            env.kafkaClient().prepareResponse(new ListOffsetResponse(responseData));
+            // listoffsets response from broker 1
+            responseData = new HashMap<>();
+            responseData.put(tp1, new PartitionData(Errors.NONE, -1L, 789L, Optional.of(987)));
+            env.kafkaClient().prepareResponse(new ListOffsetResponse(responseData));
+
+            Map<TopicPartition, OffsetSpec> partitions = new HashMap<>();
+            partitions.put(tp0, OffsetSpec.latest());
+            partitions.put(tp1, OffsetSpec.latest());
+            ListOffsetsResult result = env.adminClient().listOffsets(partitions);
+
+            Map<TopicPartition, ListOffsetsResultInfo> offsets = result.all().get();
+            assertFalse(offsets.isEmpty());
+            assertEquals(345L, offsets.get(tp0).offset());
+            assertEquals(543, offsets.get(tp0).leaderEpoch().get().intValue());
+            assertEquals(-1L, offsets.get(tp0).timestamp());
+            assertEquals(789L, offsets.get(tp1).offset());
+            assertEquals(987, offsets.get(tp1).leaderEpoch().get().intValue());
+            assertEquals(-1L, offsets.get(tp1).timestamp());
+        }
+    }
+
+    @Test
+    public void testListOffsetsMetadataNonRetriableErrors() throws Exception {
+
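+        // A non-retriable metadata error should fail the whole operation before any
+        // ListOffsets request is sent.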
+        Node node0 = new Node(0, "localhost", 8120);
+        Node node1 = new Node(1, "localhost", 8121);
+        List<Node> nodes = Arrays.asList(node0, node1);
+        List<PartitionInfo> pInfos = new ArrayList<>();
+        pInfos.add(new PartitionInfo("foo", 0, node0, new Node[]{node0, node1}, new Node[]{node0, node1}));
+        final Cluster cluster =
+            new Cluster(
+                "mockClusterId",
+                nodes,
+                pInfos,
+                Collections.<String>emptySet(),
+                Collections.<String>emptySet(),
+                node0);
+
+        final TopicPartition tp1 = new TopicPartition("foo", 0);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.TOPIC_AUTHORIZATION_FAILED));
+
+            Map<TopicPartition, OffsetSpec> partitions = new HashMap<>();
+            partitions.put(tp1, OffsetSpec.latest());
+            ListOffsetsResult result = env.adminClient().listOffsets(partitions);
+
+            TestUtils.assertFutureError(result.all(), TopicAuthorizationException.class);
+        }
+    }
+
     private static MemberDescription convertToMemberDescriptions(DescribedGroupMember member,
                                                                  MemberAssignment assignment) {
         return new MemberDescription(member.memberId(),
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index 017e447..3ec3b30 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.ElectionType;
 import org.apache.kafka.common.TopicPartitionInfo;
 import org.apache.kafka.common.TopicPartitionReplica;
@@ -443,6 +444,16 @@ public class MockAdminClient extends AdminClient {
     }
 
     @Override
+    public AlterConsumerGroupOffsetsResult alterConsumerGroupOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsets, AlterConsumerGroupOffsetsOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    public ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets, ListOffsetsOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
     public void close(Duration timeout) {}
 
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 68fac2b..79477b7 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -30,6 +30,7 @@ import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
 import org.apache.kafka.clients.consumer.internals.Fetcher;
 import org.apache.kafka.clients.consumer.internals.SubscriptionState;
 import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
@@ -61,7 +62,6 @@ import org.apache.kafka.common.requests.FetchRequest;
 import org.apache.kafka.common.requests.FetchResponse;
 import org.apache.kafka.common.requests.FindCoordinatorResponse;
 import org.apache.kafka.common.requests.HeartbeatResponse;
-import org.apache.kafka.common.requests.IsolationLevel;
 import org.apache.kafka.common.requests.JoinGroupRequest;
 import org.apache.kafka.common.requests.JoinGroupResponse;
 import org.apache.kafka.common.requests.LeaveGroupResponse;
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index fd1be37..ae02241 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -31,6 +31,7 @@ import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.MetricNameTemplate;
@@ -70,7 +71,6 @@ import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.apache.kafka.common.requests.EpochEndOffset;
 import org.apache.kafka.common.requests.FetchRequest;
 import org.apache.kafka.common.requests.FetchResponse;
-import org.apache.kafka.common.requests.IsolationLevel;
 import org.apache.kafka.common.requests.ListOffsetRequest;
 import org.apache.kafka.common.requests.ListOffsetResponse;
 import org.apache.kafka.common.requests.MetadataRequest;
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index c8bd751..c8563e7 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.common.requests;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.acl.AccessControlEntry;
 import org.apache.kafka.common.acl.AccessControlEntryFilter;
 import org.apache.kafka.common.acl.AclBinding;
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index 2d17659..139335a 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -27,9 +27,8 @@ import com.fasterxml.jackson.module.scala.DefaultScalaModule
 import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
 import kafka.utils._
 import org.apache.kafka.clients.admin._
-import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetAndMetadata}
+import org.apache.kafka.clients.consumer.OffsetAndMetadata
 import org.apache.kafka.clients.{CommonClientConfigs, admin}
-import org.apache.kafka.common.serialization.StringDeserializer
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.{KafkaException, Node, TopicPartition}
 
@@ -42,6 +41,7 @@ import org.apache.kafka.common.protocol.Errors
 
 import scala.collection.immutable.TreeMap
 import scala.reflect.ClassTag
+import org.apache.kafka.common.requests.ListOffsetResponse
 
 object ConsumerGroupCommand extends Logging {
 
@@ -169,9 +169,6 @@ object ConsumerGroupCommand extends Logging {
 
     private val adminClient = createAdminClient(configOverrides)
 
-    // `consumers` are only needed for `describe`, so we instantiate them lazily
-    private lazy val consumers: mutable.Map[String, KafkaConsumer[String, String]] = mutable.Map.empty
-
     // We have to make sure it is evaluated once and available
     private lazy val resetPlanFromFile: Option[Map[String, Map[TopicPartition, OffsetAndMetadata]]] = {
       if (opts.options.has(opts.resetFromFileOpt)) {
@@ -390,8 +387,14 @@
 
                 // Dry-run is the default behavior if --execute is not specified
                 val dryRun = opts.options.has(opts.dryRunOpt) || !opts.options.has(opts.executeOpt)
-                if (!dryRun)
-                  getConsumer(groupId).commitSync(preparedOffsets.asJava)
+                if (!dryRun) {
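+                  // Commit the prepared offsets through the AlterConsumerGroupOffsets API.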
+                  adminClient.alterConsumerGroupOffsets(
+                    groupId,
+                    preparedOffsets.asJava,
+                    withTimeoutMs(new AlterConsumerGroupOffsetsOptions)
+                  ).all.get
+                }
                 acc.updated(groupId, preparedOffsets)
               case currentState =>
                 printError(s"Assignments can only be reset if the group '$groupId' is inactive, but the current state is $currentState.")
@@ -574,34 +576,52 @@
     }
 
     private def getLogEndOffsets(groupId: String, topicPartitions: Seq[TopicPartition]): Map[TopicPartition, LogOffsetResult] = {
-      val offsets = getConsumer(groupId).endOffsets(topicPartitions.asJava)
+      val endOffsets = topicPartitions.map { topicPartition =>
+        topicPartition -> OffsetSpec.latest
+      }.toMap
+      val offsets = adminClient.listOffsets(
+        endOffsets.asJava,
+        withTimeoutMs(new ListOffsetsOptions)
+      ).all.get
       topicPartitions.map { topicPartition =>
         Option(offsets.get(topicPartition)) match {
-          case Some(logEndOffset) => topicPartition -> LogOffsetResult.LogOffset(logEndOffset)
+          case Some(listOffsetsResultInfo) => topicPartition -> LogOffsetResult.LogOffset(listOffsetsResultInfo.offset)
           case _ => topicPartition -> LogOffsetResult.Unknown
         }
       }.toMap
     }
 
     private def getLogStartOffsets(groupId: String, topicPartitions: Seq[TopicPartition]): Map[TopicPartition, LogOffsetResult] = {
-      val offsets = getConsumer(groupId).beginningOffsets(topicPartitions.asJava)
+      val startOffsets = topicPartitions.map { topicPartition =>
+        topicPartition -> OffsetSpec.earliest
+      }.toMap
+      val offsets = adminClient.listOffsets(
+        startOffsets.asJava,
+        withTimeoutMs(new ListOffsetsOptions)
+      ).all.get
       topicPartitions.map { topicPartition =>
         Option(offsets.get(topicPartition)) match {
-          case Some(logStartOffset) => topicPartition -> LogOffsetResult.LogOffset(logStartOffset)
+          case Some(listOffsetsResultInfo) => topicPartition -> LogOffsetResult.LogOffset(listOffsetsResultInfo.offset)
           case _ => topicPartition -> LogOffsetResult.Unknown
         }
       }.toMap
     }
 
     private def getLogTimestampOffsets(groupId: String, topicPartitions: Seq[TopicPartition], timestamp: java.lang.Long): Map[TopicPartition, LogOffsetResult] = {
-      val consumer = getConsumer(groupId)
-      consumer.assign(topicPartitions.asJava)
-
+      val timestampOffsets = topicPartitions.map { topicPartition =>
+        topicPartition -> OffsetSpec.forTimestamp(timestamp)
+      }.toMap
+      val offsets = adminClient.listOffsets(
+        timestampOffsets.asJava,
+        withTimeoutMs(new ListOffsetsOptions)
+      ).all.get
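+      // Partitions with no record at or after the requested timestamp come back with
+      // ListOffsetResponse.UNKNOWN_OFFSET; those fall back to their log-end offsets below.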
       val (successfulOffsetsForTimes, unsuccessfulOffsetsForTimes) =
-        consumer.offsetsForTimes(topicPartitions.map(_ -> timestamp).toMap.asJava).asScala.partition(_._2 != null)
+        offsets.asScala.partition(_._2.offset != ListOffsetResponse.UNKNOWN_OFFSET)
 
       val successfulLogTimestampOffsets = successfulOffsetsForTimes.map {
-        case (topicPartition, offsetAndTimestamp) => topicPartition -> LogOffsetResult.LogOffset(offsetAndTimestamp.offset)
+        case (topicPartition, listOffsetsResultInfo) => topicPartition -> LogOffsetResult.LogOffset(listOffsetsResultInfo.offset)
       }.toMap
 
       successfulLogTimestampOffsets ++ getLogEndOffsets(groupId, unsuccessfulOffsetsForTimes.keySet.toSeq)
@@ -609,9 +627,6 @@ object ConsumerGroupCommand extends Logging {
 
     def close(): Unit = {
       adminClient.close()
-      consumers.values.foreach(consumer =>
-        Option(consumer).foreach(_.close())
-      )
     }
 
     private def createAdminClient(configOverrides: Map[String, String]): Admin = {
@@ -621,32 +636,6 @@ object ConsumerGroupCommand extends Logging {
       admin.AdminClient.create(props)
     }
 
-    private def getConsumer(groupId: String) = {
-      if (consumers.get(groupId).isEmpty)
-        consumers.update(groupId, createConsumer(groupId))
-      consumers(groupId)
-    }
-
-    private def createConsumer(groupId: String): KafkaConsumer[String, String] = {
-      val properties = new Properties()
-      val deserializer = (new StringDeserializer).getClass.getName
-      val brokerUrl = opts.options.valueOf(opts.bootstrapServerOpt)
-      properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl)
-      properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
-      properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
-      properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")
-      properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserializer)
-      properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer)
-
-      if (opts.options.has(opts.commandConfigOpt)) {
-        Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt)).asScala.foreach {
-          case (k,v) => properties.put(k, v)
-        }
-      }
-
-      new KafkaConsumer(properties)
-    }
-
     private def withTimeoutMs [T <: AbstractOptions[T]] (options : T) =  {
       val t = opts.options.valueOf(opts.timeoutMsOpt).intValue()
       options.timeoutMs(t)
@@ -657,8 +646,16 @@
         val topicPartitions = topicArg.split(":")
         val topic = topicPartitions(0)
         topicPartitions(1).split(",").map(partition => new TopicPartition(topic, partition.toInt))
-      case topic => getConsumer(groupId).partitionsFor(topic).asScala
-        .map(partitionInfo => new TopicPartition(topic, partitionInfo.partition))
+      case topic =>
+        val descriptionMap = adminClient.describeTopics(
+          Seq(topic).asJava,
+          withTimeoutMs(new DescribeTopicsOptions)
+        ).all().get.asScala
+        descriptionMap.flatMap { case (topic, description) =>
+          description.partitions().asScala.map { tpInfo =>
+            new TopicPartition(topic, tpInfo.partition)
+          }
+        }
     }
 
     private def getPartitionsToReset(groupId: String): Seq[TopicPartition] = {
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 43bc042..2089e5c 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -30,7 +30,7 @@ import kafka.server.checkpoints.OffsetCheckpoints
 import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
 import kafka.utils._
 import kafka.zk.{AdminZkClient, KafkaZkClient}
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{IsolationLevel, TopicPartition}
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
 import org.apache.kafka.common.protocol.Errors
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 3995bec..4e7ceda 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -35,7 +35,7 @@ import kafka.server.QuotaFactory.QuotaManagers
 import kafka.server.checkpoints.{LazyOffsetCheckpoints, OffsetCheckpointFile, OffsetCheckpoints}
 import kafka.utils._
 import kafka.zk.KafkaZkClient
-import org.apache.kafka.common.{ElectionType, Node, TopicPartition}
+import org.apache.kafka.common.{ElectionType, IsolationLevel, Node, TopicPartition}
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 38fe165..38073dc 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -30,7 +30,7 @@ import org.apache.kafka.clients.admin.{Admin, AdminClient, AdminClientConfig, Al
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
 import org.apache.kafka.clients.producer._
-import org.apache.kafka.common.ElectionType
+import org.apache.kafka.common.{ElectionType, IsolationLevel}
 import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation}
 import org.apache.kafka.common.acl.AclOperation._
 import org.apache.kafka.common.acl.AclPermissionType.{ALLOW, DENY}
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index eb50e96..f57dd85 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -28,14 +28,14 @@ import kafka.common.UnexpectedAppendOffsetException
 import kafka.log.{Defaults => _, _}
 import kafka.server._
 import kafka.utils._
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{IsolationLevel, TopicPartition}
 import org.apache.kafka.common.errors.{ApiException, OffsetNotAvailableException, ReplicaNotAvailableException}
 import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
 import org.apache.kafka.common.utils.SystemTime
 import org.apache.kafka.common.record._
-import org.apache.kafka.common.requests.{EpochEndOffset, IsolationLevel, ListOffsetRequest}
+import org.apache.kafka.common.requests.{EpochEndOffset, ListOffsetRequest}
 import org.junit.Test
 import org.junit.Assert._
 import org.mockito.Mockito._
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
index 895d9e5..07acfde 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
@@ -25,10 +25,10 @@ import kafka.log.LogConfig
 import kafka.message.{GZIPCompressionCodec, ProducerCompressionCodec, ZStdCompressionCodec}
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{IsolationLevel, TopicPartition}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.{MemoryRecords, Record, RecordBatch}
-import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, IsolationLevel, FetchMetadata => JFetchMetadata}
+import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, FetchMetadata => JFetchMetadata}
 import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
 import org.junit.Assert._
 import org.junit.Test
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 6f30c11..8523491 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -33,7 +33,7 @@ import kafka.network.RequestChannel.SendResponse
 import kafka.server.QuotaFactory.QuotaManagers
 import kafka.utils.{MockTime, TestUtils}
 import kafka.zk.KafkaZkClient
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{IsolationLevel, TopicPartition}
 import org.apache.kafka.common.errors.UnsupportedVersionException
 import org.apache.kafka.common.memory.MemoryPool
 import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol
diff --git a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
index 9c97c1a..a298d84 100644
--- a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
@@ -19,9 +19,9 @@ package kafka.server
 import java.util.Optional
 
 import kafka.utils.TestUtils
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{IsolationLevel, TopicPartition}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.requests.{IsolationLevel, ListOffsetRequest, ListOffsetResponse}
+import org.apache.kafka.common.requests.{ListOffsetRequest, ListOffsetResponse}
 import org.junit.Assert._
 import org.junit.Test
 
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index d88540d..85d2ded 100755
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -24,10 +24,10 @@ import java.util.{Optional, Properties, Random}
 import kafka.log.{Log, LogSegment}
 import kafka.network.SocketServer
 import kafka.utils.{MockTime, TestUtils}
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{IsolationLevel, TopicPartition}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.MemoryRecords
-import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, IsolationLevel, ListOffsetRequest, ListOffsetResponse}
+import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, ListOffsetRequest, ListOffsetResponse}
 import org.easymock.{EasyMock, IAnswer}
 import org.junit.Assert._
 import org.junit.Test
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 9aaae92..01d6bf0 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -46,7 +46,7 @@ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.Time
-import org.apache.kafka.common.{Node, TopicPartition}
+import org.apache.kafka.common.{IsolationLevel, Node, TopicPartition}
 import org.easymock.EasyMock
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index f34eae4..6b1d3c1 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -22,7 +22,7 @@ import kafka.log.LogConfig
 import kafka.network.RequestChannel.Session
 import kafka.security.authorizer.AclAuthorizer
 import kafka.utils.TestUtils
-import org.apache.kafka.common.{ElectionType, Node, TopicPartition}
+import org.apache.kafka.common.{ElectionType, IsolationLevel, Node, TopicPartition}
 import org.apache.kafka.common.acl._
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicCollection}
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 2aecdc2..cdec817 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -50,10 +50,10 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 
+import static org.apache.kafka.common.IsolationLevel.READ_COMMITTED;
 import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
 import static org.apache.kafka.common.config.ConfigDef.Range.between;
 import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
-import static org.apache.kafka.common.requests.IsolationLevel.READ_COMMITTED;
 
 /**
  * Configuration for a {@link KafkaStreams} instance.
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 1d31377..27b574a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -44,8 +44,8 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.Properties;
 
-import static org.apache.kafka.common.requests.IsolationLevel.READ_COMMITTED;
-import static org.apache.kafka.common.requests.IsolationLevel.READ_UNCOMMITTED;
+import static org.apache.kafka.common.IsolationLevel.READ_COMMITTED;
+import static org.apache.kafka.common.IsolationLevel.READ_UNCOMMITTED;
 import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE;
 import static org.apache.kafka.streams.StreamsConfig.TOPOLOGY_OPTIMIZATION;
 import static org.apache.kafka.streams.StreamsConfig.adminClientPrefix;
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
index 15d6789..c611dc2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
@@ -17,7 +17,7 @@
 package org.apache.kafka.streams.integration;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.requests.IsolationLevel;
+import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serdes;
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
index 0c9889a..255afc8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
@@ -23,7 +23,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.requests.IsolationLevel;
+import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringDeserializer;
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
index 5daf066..e98d152 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
@@ -27,11 +27,11 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.requests.IsolationLevel;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;

