kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch 2.4 updated: KAFKA-8992; Redefine RemoveMembersFromGroup interface on AdminClient (#7478)
Date Fri, 25 Oct 2019 07:30:49 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new 802c060  KAFKA-8992; Redefine RemoveMembersFromGroup interface on AdminClient  (#7478)
802c060 is described below

commit 802c060d5c00843e7e3a99f5ec48296899afa1cf
Author: Boyang Chen <boyang@confluent.io>
AuthorDate: Fri Oct 25 00:01:52 2019 -0700

    KAFKA-8992; Redefine RemoveMembersFromGroup interface on AdminClient  (#7478)
    
    This PR fixes the inconsistency involved in the `removeMembersFromGroup` admin API calls:
    
    1. Fail the `all()` request when there is sub level error (either partition or member)
    2. Change getMembers() to members()
    3. Hide the actual Errors from user
    4. Do not expose generated MemberIdentity type
    5. Use more consistent naming for Options and Result types
    
    Reviewers: Guozhang Wang <wangguoz@gmail.com>, David Jacot <djacot@confluent.io>, Jason Gustafson <jason@confluent.io>
---
 .../java/org/apache/kafka/clients/admin/Admin.java |   2 +-
 .../admin/DeleteConsumerGroupOffsetsResult.java    |  65 +++++----
 .../kafka/clients/admin/KafkaAdminClient.java      |  81 ++++++-----
 .../apache/kafka/clients/admin/MemberToRemove.java |  58 ++++++++
 .../RemoveMemberFromConsumerGroupOptions.java      |  50 -------
 .../clients/admin/RemoveMemberFromGroupResult.java |  85 ------------
 ... => RemoveMembersFromConsumerGroupOptions.java} |  28 ++--
 .../RemoveMembersFromConsumerGroupResult.java      |  96 +++++++++++++
 .../kafka/common/requests/LeaveGroupRequest.java   |   6 +-
 .../DeleteConsumerGroupOffsetsResultTest.java      | 118 ++++++++++++++++
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 142 ++++++++++---------
 .../clients/admin/MembershipChangeResultTest.java  |  50 -------
 .../kafka/clients/admin/MockAdminClient.java       |   2 +-
 .../admin/RemoveMemberFromGroupResultTest.java     | 154 ---------------------
 ...RemoveMembersFromConsumerGroupOptionsTest.java} |  13 +-
 .../RemoveMembersFromConsumerGroupResultTest.java  | 119 ++++++++++++++++
 .../scala/kafka/admin/ConsumerGroupCommand.scala   |  68 +++++----
 .../kafka/api/AdminClientIntegrationTest.scala     |  61 +++-----
 .../kafka/api/AuthorizerIntegrationTest.scala      |   2 +-
 ...ffsetsConsumerGroupCommandIntegrationTest.scala |  18 ++-
 20 files changed, 639 insertions(+), 579 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
index 915bd72..a0e4365 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
@@ -1078,7 +1078,7 @@ public interface Admin extends AutoCloseable {
      * @param options The options to carry removing members' information.
      * @return The MembershipChangeResult.
      */
-    MembershipChangeResult removeMemberFromConsumerGroup(String groupId, RemoveMemberFromConsumerGroupOptions options);
+    RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroup(String groupId, RemoveMembersFromConsumerGroupOptions options);
 
     /**
      * Get the metrics kept by the adminClient
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupOffsetsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupOffsetsResult.java
index 433f478..336e9c0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupOffsetsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupOffsetsResult.java
@@ -18,8 +18,6 @@ package org.apache.kafka.clients.admin;
 
 import java.util.Set;
 import org.apache.kafka.common.KafkaFuture;
-import org.apache.kafka.common.KafkaFuture.BaseFunction;
-import org.apache.kafka.common.KafkaFuture.BiConsumer;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.annotation.InterfaceStability;
 
@@ -35,50 +33,65 @@ import org.apache.kafka.common.protocol.Errors;
 @InterfaceStability.Evolving
 public class DeleteConsumerGroupOffsetsResult {
     private final KafkaFuture<Map<TopicPartition, Errors>> future;
+    private final Set<TopicPartition> partitions;
 
-    DeleteConsumerGroupOffsetsResult(KafkaFuture<Map<TopicPartition, Errors>> future) {
+
+    DeleteConsumerGroupOffsetsResult(KafkaFuture<Map<TopicPartition, Errors>> future, Set<TopicPartition> partitions) {
         this.future = future;
+        this.partitions = partitions;
     }
 
     /**
      * Return a future which can be used to check the result for a given partition.
      */
     public KafkaFuture<Void> partitionResult(final TopicPartition partition) {
+        if (!partitions.contains(partition)) {
+            throw new IllegalArgumentException("Partition " + partition + " was not included in the original request");
+        }
         final KafkaFutureImpl<Void> result = new KafkaFutureImpl<>();
 
-        this.future.whenComplete(new BiConsumer<Map<TopicPartition, Errors>, Throwable>() {
-            @Override
-            public void accept(final Map<TopicPartition, Errors> topicPartitions, final Throwable throwable) {
-                if (throwable != null) {
-                    result.completeExceptionally(throwable);
-                } else if (!topicPartitions.containsKey(partition)) {
-                    result.completeExceptionally(new IllegalArgumentException(
-                        "Group offset deletion for partition \"" + partition +
-                        "\" was not attempted"));
-                } else {
-                    final Errors error = topicPartitions.get(partition);
-                    if (error == Errors.NONE) {
-                        result.complete(null);
-                    } else {
-                        result.completeExceptionally(error.exception());
-                    }
-                }
-
+        this.future.whenComplete((topicPartitions, throwable) -> {
+            if (throwable != null) {
+                result.completeExceptionally(throwable);
+            } else if (!maybeCompleteExceptionally(topicPartitions, partition, result)) {
+                result.complete(null);
             }
         });
-
         return result;
     }
 
     /**
      * Return a future which succeeds only if all the deletions succeed.
+     * If not, the first partition error shall be returned.
      */
     public KafkaFuture<Void> all() {
-        return this.future.thenApply(new BaseFunction<Map<TopicPartition, Errors>, Void>() {
-            @Override
-            public Void apply(final Map<TopicPartition, Errors> topicPartitionErrorsMap) {
-                return null;
+        final KafkaFutureImpl<Void> result = new KafkaFutureImpl<>();
+
+        this.future.whenComplete((topicPartitions, throwable) -> {
+            if (throwable != null) {
+                result.completeExceptionally(throwable);
+            } else {
+                for (TopicPartition partition : partitions) {
+                    if (maybeCompleteExceptionally(topicPartitions, partition, result)) {
+                        return;
+                    }
+                }
+                result.complete(null);
             }
         });
+        return result;
+    }
+
+    private boolean maybeCompleteExceptionally(Map<TopicPartition, Errors> partitionLevelErrors,
+                                               TopicPartition partition,
+                                               KafkaFutureImpl<Void> result) {
+        Throwable exception = KafkaAdminClient.getSubLevelError(partitionLevelErrors, partition,
+            "Offset deletion result for partition \"" + partition + "\" was not included in the response");
+        if (exception != null) {
+            result.completeExceptionally(exception);
+            return true;
+        } else {
+            return false;
+        }
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 79822a6..be4dd9e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -83,6 +83,8 @@ import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData;
 import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterConfigsResource;
 import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfig;
 import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfigCollection;
+import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
+import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse;
 import org.apache.kafka.common.message.ListGroupsRequestData;
 import org.apache.kafka.common.message.ListGroupsResponseData;
 import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
@@ -149,6 +151,7 @@ import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
 import org.apache.kafka.common.requests.FindCoordinatorResponse;
 import org.apache.kafka.common.requests.IncrementalAlterConfigsRequest;
 import org.apache.kafka.common.requests.IncrementalAlterConfigsResponse;
+import org.apache.kafka.common.requests.JoinGroupRequest;
 import org.apache.kafka.common.requests.LeaveGroupRequest;
 import org.apache.kafka.common.requests.LeaveGroupResponse;
 import org.apache.kafka.common.requests.ListGroupsRequest;
@@ -2686,7 +2689,7 @@ public class KafkaAdminClient extends AdminClient {
             ConsumerGroupOperationContext<ConsumerGroupDescription, DescribeConsumerGroupsOptions> context =
                     new ConsumerGroupOperationContext<>(groupId, options, deadline, futures.get(groupId));
             Call findCoordinatorCall = getFindCoordinatorCall(context,
-                () -> KafkaAdminClient.this.getDescribeConsumerGroupsCall(context));
+                () -> getDescribeConsumerGroupsCall(context));
             runnable.call(findCoordinatorCall, startFindCoordinatorMs);
         }
 
@@ -2973,7 +2976,7 @@ public class KafkaAdminClient extends AdminClient {
                 new ConsumerGroupOperationContext<>(groupId, options, deadline, groupOffsetListingFuture);
 
         Call findCoordinatorCall = getFindCoordinatorCall(context,
-            () -> KafkaAdminClient.this.getListConsumerGroupOffsetsCall(context));
+            () -> getListConsumerGroupOffsetsCall(context));
         runnable.call(findCoordinatorCall, startFindCoordinatorMs);
 
         return new ListConsumerGroupOffsetsResult(groupOffsetListingFuture);
@@ -3045,7 +3048,7 @@ public class KafkaAdminClient extends AdminClient {
             ConsumerGroupOperationContext<Void, DeleteConsumerGroupsOptions> context =
                     new ConsumerGroupOperationContext<>(groupId, options, deadline, future);
             Call findCoordinatorCall = getFindCoordinatorCall(context,
-                () -> KafkaAdminClient.this.getDeleteConsumerGroupsCall(context));
+                () -> getDeleteConsumerGroupsCall(context));
             runnable.call(findCoordinatorCall, startFindCoordinatorMs);
         }
 
@@ -3097,7 +3100,7 @@ public class KafkaAdminClient extends AdminClient {
         if (groupIdIsUnrepresentable(groupId)) {
             future.completeExceptionally(new InvalidGroupIdException("The given group id '" +
                 groupId + "' cannot be represented in a request."));
-            return new DeleteConsumerGroupOffsetsResult(future);
+            return new DeleteConsumerGroupOffsetsResult(future, partitions);
         }
 
         final long startFindCoordinatorMs = time.milliseconds();
@@ -3106,10 +3109,10 @@ public class KafkaAdminClient extends AdminClient {
             new ConsumerGroupOperationContext<>(groupId, options, deadline, future);
 
         Call findCoordinatorCall = getFindCoordinatorCall(context,
-            () -> KafkaAdminClient.this.getDeleteConsumerGroupOffsetsCall(context, partitions));
+            () -> getDeleteConsumerGroupOffsetsCall(context, partitions));
         runnable.call(findCoordinatorCall, startFindCoordinatorMs);
 
-        return new DeleteConsumerGroupOffsetsResult(future);
+        return new DeleteConsumerGroupOffsetsResult(future, partitions);
     }
 
     private Call getDeleteConsumerGroupOffsetsCall(
@@ -3155,13 +3158,10 @@ public class KafkaAdminClient extends AdminClient {
                     return;
 
                 final Map<TopicPartition, Errors> partitions = new HashMap<>();
-                response.data.topics().forEach(topic -> {
-                    topic.partitions().forEach(partition -> {
-                        partitions.put(
-                            new TopicPartition(topic.name(), partition.partitionIndex()),
-                            Errors.forCode(partition.errorCode()));
-                    });
-                });
+                response.data.topics().forEach(topic -> topic.partitions().forEach(partition -> partitions.put(
+                    new TopicPartition(topic.name(), partition.partitionIndex()),
+                    Errors.forCode(partition.errorCode())))
+                );
 
                 context.getFuture().complete(partitions);
             }
@@ -3461,33 +3461,32 @@ public class KafkaAdminClient extends AdminClient {
     }
 
     @Override
-    public MembershipChangeResult removeMemberFromConsumerGroup(String groupId,
-                                                                RemoveMemberFromConsumerGroupOptions options) {
+    public RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroup(String groupId,
+                                                                               RemoveMembersFromConsumerGroupOptions options) {
         final long startFindCoordinatorMs = time.milliseconds();
         final long deadline = calcDeadlineMs(startFindCoordinatorMs, options.timeoutMs());
 
-        KafkaFutureImpl<RemoveMemberFromGroupResult> future = new KafkaFutureImpl<>();
+        KafkaFutureImpl<Map<MemberIdentity, Errors>> future = new KafkaFutureImpl<>();
 
-        ConsumerGroupOperationContext<RemoveMemberFromGroupResult, RemoveMemberFromConsumerGroupOptions> context =
+        ConsumerGroupOperationContext<Map<MemberIdentity, Errors>, RemoveMembersFromConsumerGroupOptions> context =
             new ConsumerGroupOperationContext<>(groupId, options, deadline, future);
 
         Call findCoordinatorCall = getFindCoordinatorCall(context,
-            () -> KafkaAdminClient.this.getRemoveMembersFromGroupCall(context));
+            () -> getRemoveMembersFromGroupCall(context));
         runnable.call(findCoordinatorCall, startFindCoordinatorMs);
 
-        return new MembershipChangeResult(future);
+        return new RemoveMembersFromConsumerGroupResult(future, options.members());
     }
 
-
-    private Call getRemoveMembersFromGroupCall(ConsumerGroupOperationContext
-                                                   <RemoveMemberFromGroupResult, RemoveMemberFromConsumerGroupOptions> context) {
+    private Call getRemoveMembersFromGroupCall(ConsumerGroupOperationContext<Map<MemberIdentity, Errors>, RemoveMembersFromConsumerGroupOptions> context) {
         return new Call("leaveGroup",
                         context.getDeadline(),
                         new ConstantNodeIdProvider(context.getNode().get().id())) {
             @Override
-            AbstractRequest.Builder createRequest(int timeoutMs) {
+            LeaveGroupRequest.Builder createRequest(int timeoutMs) {
                 return new LeaveGroupRequest.Builder(context.getGroupId(),
-                                                     context.getOptions().getMembers());
+                                                     context.getOptions().members().stream().map(
+                                                         MemberToRemove::toMemberIdentity).collect(Collectors.toList()));
             }
 
             @Override
@@ -3500,16 +3499,19 @@ public class KafkaAdminClient extends AdminClient {
                     return;
                 }
 
-                // If error is transient coordinator error, retry
-                Errors error = response.error();
-                if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.COORDINATOR_NOT_AVAILABLE) {
-                    throw error.exception();
-                }
-
-                final RemoveMemberFromGroupResult membershipChangeResult =
-                    new RemoveMemberFromGroupResult(response, context.getOptions().getMembers());
+                if (handleGroupRequestError(response.topLevelError(), context.getFuture()))
+                    return;
 
-                context.getFuture().complete(membershipChangeResult);
+                final Map<MemberIdentity, Errors> memberErrors = new HashMap<>();
+                for (MemberResponse memberResponse : response.memberResponses()) {
+                    // We set member.id to empty here explicitly, so that the lookup will succeed as user doesn't
+                    // know the exact member.id.
+                    memberErrors.put(new MemberIdentity()
+                                         .setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID)
+                                         .setGroupInstanceId(memberResponse.groupInstanceId()),
+                                     Errors.forCode(memberResponse.errorCode()));
+                }
+                context.getFuture().complete(memberErrors);
             }
 
             @Override
@@ -3518,4 +3520,17 @@ public class KafkaAdminClient extends AdminClient {
             }
         };
     }
+
+    /**
+     * Get a sub level error when the request is in batch. If given key was not found,
+     * return an {@link IllegalArgumentException}.
+     */
+    static <K> Throwable getSubLevelError(Map<K, Errors> subLevelErrors, K subKey, String keyNotFoundMsg) {
+        if (!subLevelErrors.containsKey(subKey)) {
+            return new IllegalArgumentException(keyNotFoundMsg);
+        } else {
+            return subLevelErrors.get(subKey).exception();
+        }
+    }
+
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/MemberToRemove.java b/clients/src/main/java/org/apache/kafka/clients/admin/MemberToRemove.java
new file mode 100644
index 0000000..4c7b16b
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/MemberToRemove.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
+import org.apache.kafka.common.requests.JoinGroupRequest;
+
+import java.util.Objects;
+
+/**
+ * A struct containing information about the member to be removed.
+ */
+public class MemberToRemove {
+    private final String groupInstanceId;
+
+    public MemberToRemove(String groupInstanceId) {
+        this.groupInstanceId = groupInstanceId;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o instanceof MemberToRemove) {
+            MemberToRemove otherMember = (MemberToRemove) o;
+            return this.groupInstanceId.equals(otherMember.groupInstanceId);
+        } else {
+            return false;
+        }
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(groupInstanceId);
+    }
+
+    MemberIdentity toMemberIdentity() {
+        return new MemberIdentity()
+                   .setGroupInstanceId(groupInstanceId)
+                   .setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID);
+    }
+
+    public String groupInstanceId() {
+        return groupInstanceId;
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/RemoveMemberFromConsumerGroupOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/RemoveMemberFromConsumerGroupOptions.java
deleted file mode 100644
index ed1fdab..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/admin/RemoveMemberFromConsumerGroupOptions.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.clients.admin;
-
-import org.apache.kafka.common.annotation.InterfaceStability;
-import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
-import org.apache.kafka.common.requests.JoinGroupRequest;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.stream.Collectors;
-
-/**
- * Options for {@link AdminClient#removeMemberFromConsumerGroup(String, RemoveMemberFromConsumerGroupOptions)}.
- * It carries the members to be removed from the consumer group.
- *
- * The API of this class is evolving, see {@link AdminClient} for details.
- */
-@InterfaceStability.Evolving
-public class RemoveMemberFromConsumerGroupOptions extends AbstractOptions<RemoveMemberFromConsumerGroupOptions> {
-
-    private List<MemberIdentity> members;
-
-    public RemoveMemberFromConsumerGroupOptions(Collection<String> groupInstanceIds) {
-        members = groupInstanceIds.stream().map(
-            instanceId -> new MemberIdentity()
-                              .setGroupInstanceId(instanceId)
-                              .setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID)
-        ).collect(Collectors.toList());
-    }
-
-    public List<MemberIdentity> getMembers() {
-        return members;
-    }
-}
-
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/RemoveMemberFromGroupResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/RemoveMemberFromGroupResult.java
deleted file mode 100644
index 1bd9a8b..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/admin/RemoveMemberFromGroupResult.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.clients.admin;
-
-import org.apache.kafka.common.KafkaFuture;
-import org.apache.kafka.common.internals.KafkaFutureImpl;
-import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
-import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse;
-import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.requests.LeaveGroupResponse;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Result of a batch member removal operation.
- */
-public class RemoveMemberFromGroupResult {
-
-    private final Errors topLevelError;
-    private final Map<MemberIdentity, KafkaFuture<Void>> memberFutures;
-    private boolean hasError = false;
-
-    RemoveMemberFromGroupResult(LeaveGroupResponse response,
-                                List<MemberIdentity> membersToRemove) {
-        this.topLevelError = response.topLevelError();
-        this.memberFutures = new HashMap<>(membersToRemove.size());
-
-        if (this.topLevelError != Errors.NONE) {
-            // If the populated error is a top-level error, fail every member's future.
-            for (MemberIdentity memberIdentity : membersToRemove) {
-                KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
-                future.completeExceptionally(topLevelError.exception());
-                memberFutures.put(memberIdentity, future);
-            }
-            hasError = true;
-        } else {
-            for (MemberResponse memberResponse : response.memberResponses()) {
-                KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
-                Errors memberError = Errors.forCode(memberResponse.errorCode());
-                if (memberError != Errors.NONE) {
-                    future.completeExceptionally(memberError.exception());
-                    hasError = true;
-                } else {
-                    future.complete(null);
-                }
-                memberFutures.put(new MemberIdentity()
-                                      .setMemberId(memberResponse.memberId())
-                                      .setGroupInstanceId(memberResponse.groupInstanceId()), future);
-            }
-        }
-    }
-
-    public Errors topLevelError() {
-        return topLevelError;
-    }
-
-    public boolean hasError() {
-        return hasError;
-    }
-
-    /**
-     * Futures of members with corresponding errors when they leave the group.
-     *
-     * @return list of members who failed to be removed
-     */
-    public Map<MemberIdentity, KafkaFuture<Void>> memberFutures() {
-        return memberFutures;
-    }
-}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/MembershipChangeResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupOptions.java
similarity index 57%
rename from clients/src/main/java/org/apache/kafka/clients/admin/MembershipChangeResult.java
rename to clients/src/main/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupOptions.java
index e704ed9..dc346f7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/MembershipChangeResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupOptions.java
@@ -16,34 +16,28 @@
  */
 package org.apache.kafka.clients.admin;
 
-import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.annotation.InterfaceStability;
 
-import java.util.concurrent.ExecutionException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
 
 /**
- * The result of the {@link KafkaAdminClient#removeMemberFromConsumerGroup(String, RemoveMemberFromConsumerGroupOptions)} call.
+ * Options for {@link AdminClient#removeMembersFromConsumerGroup(String, RemoveMembersFromConsumerGroupOptions)}.
+ * It carries the members to be removed from the consumer group.
  *
  * The API of this class is evolving, see {@link AdminClient} for details.
  */
 @InterfaceStability.Evolving
-public class MembershipChangeResult {
+public class RemoveMembersFromConsumerGroupOptions extends AbstractOptions<RemoveMembersFromConsumerGroupOptions> {
 
-    private KafkaFuture<RemoveMemberFromGroupResult> future;
+    private Set<MemberToRemove> members;
 
-    MembershipChangeResult(KafkaFuture<RemoveMemberFromGroupResult> future) {
-        this.future = future;
+    public RemoveMembersFromConsumerGroupOptions(Collection<MemberToRemove> members) {
+        this.members = new HashSet<>(members);
     }
 
-    /**
-     * Return a future which contains the member removal results.
-     */
-    public RemoveMemberFromGroupResult all() throws ExecutionException, InterruptedException {
-        return future.get();
-    }
-
-    // Visible for testing
-    public KafkaFuture<RemoveMemberFromGroupResult> future() {
-        return future;
+    public Set<MemberToRemove> members() {
+        return members;
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupResult.java
new file mode 100644
index 0000000..405973b
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupResult.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * The result of the {@link Admin#removeMembersFromConsumerGroup(String, RemoveMembersFromConsumerGroupOptions)} call.
+ *
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+public class RemoveMembersFromConsumerGroupResult {
+
+    private final KafkaFuture<Map<MemberIdentity, Errors>> future;
+    private final Set<MemberToRemove> memberInfos;
+
+    RemoveMembersFromConsumerGroupResult(KafkaFuture<Map<MemberIdentity, Errors>> future,
+                                         Set<MemberToRemove> memberInfos) {
+        this.future = future;
+        this.memberInfos = memberInfos;
+    }
+
+    /**
+     * Returns a future which indicates whether the request was 100% success, i.e. no
+     * either top level or member level error.
+     * If not, the first member error shall be returned.
+     */
+    public KafkaFuture<Void> all() {
+        final KafkaFutureImpl<Void> result = new KafkaFutureImpl<>();
+        this.future.whenComplete((memberErrors, throwable) -> {
+            if (throwable != null) {
+                result.completeExceptionally(throwable);
+            } else {
+                for (MemberToRemove memberToRemove : memberInfos) {
+                    if (maybeCompleteExceptionally(memberErrors, memberToRemove.toMemberIdentity(), result)) {
+                        return;
+                    }
+                }
+                result.complete(null);
+            }
+        });
+        return result;
+    }
+
+    /**
+     * Returns the selected member future.
+     */
+    public KafkaFuture<Void> memberResult(MemberToRemove member) {
+        if (!memberInfos.contains(member)) {
+            throw new IllegalArgumentException("Member " + member + " was not included in the original request");
+        }
+
+        final KafkaFutureImpl<Void> result = new KafkaFutureImpl<>();
+        this.future.whenComplete((memberErrors, throwable) -> {
+            if (throwable != null) {
+                result.completeExceptionally(throwable);
+            } else if (!maybeCompleteExceptionally(memberErrors, member.toMemberIdentity(), result)) {
+                result.complete(null);
+            }
+        });
+        return result;
+    }
+
+    private boolean maybeCompleteExceptionally(Map<MemberIdentity, Errors> memberErrors,
+                                               MemberIdentity member,
+                                               KafkaFutureImpl<Void> result) {
+        Throwable exception = KafkaAdminClient.getSubLevelError(memberErrors, member,
+            "Member \"" + member + "\" was not included in the removal response");
+        if (exception != null) {
+            result.completeExceptionally(exception);
+            return true;
+        } else {
+            return false;
+        }
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
index ac77379..4cd72a6 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
@@ -36,7 +36,11 @@ public class LeaveGroupRequest extends AbstractRequest {
         private final List<MemberIdentity> members;
 
         public Builder(String groupId, List<MemberIdentity> members) {
-            super(ApiKeys.LEAVE_GROUP);
+            this(groupId, members, ApiKeys.LEAVE_GROUP.oldestVersion(), ApiKeys.LEAVE_GROUP.latestVersion());
+        }
+
+        Builder(String groupId, List<MemberIdentity> members, short oldestVersion, short latestVersion) {
+            super(ApiKeys.LEAVE_GROUP, oldestVersion, latestVersion);
             this.groupId = groupId;
             this.members = members;
             if (members.isEmpty()) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/DeleteConsumerGroupOffsetsResultTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/DeleteConsumerGroupOffsetsResultTest.java
new file mode 100644
index 0000000..19ce76d
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/DeleteConsumerGroupOffsetsResultTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
+
+public class DeleteConsumerGroupOffsetsResultTest {
+
+    private final String topic = "topic";
+    private final TopicPartition tpZero = new TopicPartition(topic, 0);
+    private final TopicPartition tpOne = new TopicPartition(topic, 1);
+    private Set<TopicPartition> partitions;
+    private Map<TopicPartition, Errors> errorsMap;
+
+    private KafkaFutureImpl<Map<TopicPartition, Errors>> partitionFutures;
+
+    @Before
+    public void setUp() {
+        partitionFutures = new KafkaFutureImpl<>();
+        partitions = new HashSet<>();
+        partitions.add(tpZero);
+        partitions.add(tpOne);
+
+        errorsMap = new HashMap<>();
+        errorsMap.put(tpZero, Errors.NONE);
+        errorsMap.put(tpOne, Errors.UNKNOWN_TOPIC_OR_PARTITION);
+    }
+
+    @Test
+    public void testTopLevelErrorConstructor() throws InterruptedException {
+        partitionFutures.completeExceptionally(Errors.GROUP_AUTHORIZATION_FAILED.exception());
+        DeleteConsumerGroupOffsetsResult topLevelErrorResult =
+            new DeleteConsumerGroupOffsetsResult(partitionFutures, partitions);
+        TestUtils.assertFutureError(topLevelErrorResult.all(), GroupAuthorizationException.class);
+    }
+
+    @Test
+    public void testPartitionLevelErrorConstructor() throws ExecutionException, InterruptedException {
+        createAndVerifyPartitionLevelErrror();
+    }
+
+    @Test
+    public void testPartitionMissingInResponseErrorConstructor() throws InterruptedException, ExecutionException {
+        errorsMap.remove(tpOne);
+        partitionFutures.complete(errorsMap);
+        assertFalse(partitionFutures.isCompletedExceptionally());
+        DeleteConsumerGroupOffsetsResult missingPartitionResult =
+            new DeleteConsumerGroupOffsetsResult(partitionFutures, partitions);
+
+        TestUtils.assertFutureError(missingPartitionResult.all(), IllegalArgumentException.class);
+        assertNull(missingPartitionResult.partitionResult(tpZero).get());
+        TestUtils.assertFutureError(missingPartitionResult.partitionResult(tpOne), IllegalArgumentException.class);
+    }
+
+    @Test
+    public void testPartitionMissingInRequestErrorConstructor() throws InterruptedException, ExecutionException {
+        DeleteConsumerGroupOffsetsResult partitionLevelErrorResult = createAndVerifyPartitionLevelErrror();
+        assertThrows(IllegalArgumentException.class, () -> partitionLevelErrorResult.partitionResult(new TopicPartition("invalid-topic", 0)));
+    }
+
+    @Test
+    public void testNoErrorConstructor() throws ExecutionException, InterruptedException {
+        Map<TopicPartition, Errors> errorsMap = new HashMap<>();
+        errorsMap.put(tpZero, Errors.NONE);
+        errorsMap.put(tpOne, Errors.NONE);
+        DeleteConsumerGroupOffsetsResult noErrorResult =
+            new DeleteConsumerGroupOffsetsResult(partitionFutures, partitions);
+        partitionFutures.complete(errorsMap);
+
+        assertNull(noErrorResult.all().get());
+        assertNull(noErrorResult.partitionResult(tpZero).get());
+        assertNull(noErrorResult.partitionResult(tpOne).get());
+    }
+
+    private DeleteConsumerGroupOffsetsResult createAndVerifyPartitionLevelErrror() throws InterruptedException, ExecutionException {
+        partitionFutures.complete(errorsMap);
+        assertFalse(partitionFutures.isCompletedExceptionally());
+        DeleteConsumerGroupOffsetsResult partitionLevelErrorResult =
+            new DeleteConsumerGroupOffsetsResult(partitionFutures, partitions);
+
+        TestUtils.assertFutureError(partitionLevelErrorResult.all(), UnknownTopicOrPartitionException.class);
+        assertNull(partitionLevelErrorResult.partitionResult(tpZero).get());
+        TestUtils.assertFutureError(partitionLevelErrorResult.partitionResult(tpOne), UnknownTopicOrPartitionException.class);
+        return partitionLevelErrorResult;
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index cf50c8f..9fd0c1c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -40,6 +40,7 @@ import org.apache.kafka.common.acl.AclPermissionType;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.errors.FencedInstanceIdException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.GroupSubscribedToTopicException;
 import org.apache.kafka.common.errors.InvalidRequestException;
@@ -152,6 +153,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -1669,10 +1671,10 @@ public class KafkaAdminClientTest {
             final DeleteConsumerGroupOffsetsResult errorResult = env.adminClient().deleteConsumerGroupOffsets(
                 groupId, Stream.of(tp1, tp2).collect(Collectors.toSet()));
 
-            assertNull(errorResult.all().get());
             assertNull(errorResult.partitionResult(tp1).get());
+            TestUtils.assertFutureError(errorResult.all(), GroupSubscribedToTopicException.class);
             TestUtils.assertFutureError(errorResult.partitionResult(tp2), GroupSubscribedToTopicException.class);
-            TestUtils.assertFutureError(errorResult.partitionResult(tp3), IllegalArgumentException.class);
+            assertThrows(IllegalArgumentException.class, () -> errorResult.partitionResult(tp3));
         }
     }
 
@@ -1745,15 +1747,15 @@ public class KafkaAdminClientTest {
 
         final String groupId = "group-0";
         final TopicPartition tp1 = new TopicPartition("foo", 0);
-        final List<Errors> retriableErrors = Arrays.asList(
+        final List<Errors> nonRetriableErrors = Arrays.asList(
             Errors.GROUP_AUTHORIZATION_FAILED, Errors.INVALID_GROUP_ID, Errors.GROUP_ID_NOT_FOUND);
 
         try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
 
-            for (Errors error : retriableErrors) {
-                env.kafkaClient().prepareResponse(FindCoordinatorResponse
-                    .prepareResponse(Errors.NONE, env.cluster().controller()));
+            for (Errors error : nonRetriableErrors) {
+                env.kafkaClient().prepareResponse(
+                    prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
 
                 env.kafkaClient().prepareResponse(
                     prepareOffsetDeleteResponse(error));
@@ -1899,16 +1901,6 @@ public class KafkaAdminClientTest {
             final String instanceTwo = "instance-2";
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
 
-            MemberResponse responseOne = new MemberResponse()
-                                             .setGroupInstanceId(instanceOne)
-                                             .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code());
-
-            MemberResponse responseTwo = new MemberResponse()
-                                             .setGroupInstanceId(instanceTwo)
-                                             .setErrorCode(Errors.NONE.code());
-
-            List<MemberResponse> memberResponses = Arrays.asList(responseOne, responseTwo);
-
             // Retriable FindCoordinatorResponse errors should be retried
             env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE,  Node.noNode()));
             env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS,  Node.noNode()));
@@ -1926,79 +1918,76 @@ public class KafkaAdminClientTest {
                                                                          .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())));
 
             String groupId = "groupId";
-            List<String> membersToRemove = Arrays.asList(instanceOne, instanceTwo);
-            final MembershipChangeResult unknownErrorResult = env.adminClient().removeMemberFromConsumerGroup(
+            Collection<MemberToRemove> membersToRemove = Arrays.asList(new MemberToRemove(instanceOne),
+                                                                       new MemberToRemove(instanceTwo));
+            final RemoveMembersFromConsumerGroupResult unknownErrorResult = env.adminClient().removeMembersFromConsumerGroup(
                 groupId,
-                new RemoveMemberFromConsumerGroupOptions(membersToRemove)
+                new RemoveMembersFromConsumerGroupOptions(membersToRemove)
             );
 
-            RemoveMemberFromGroupResult result = unknownErrorResult.all();
-            assertTrue(result.hasError());
-            assertEquals(Errors.UNKNOWN_SERVER_ERROR, result.topLevelError());
-
-            Map<MemberIdentity, KafkaFuture<Void>> memberFutures = result.memberFutures();
-            assertEquals(2, memberFutures.size());
-            for (Map.Entry<MemberIdentity, KafkaFuture<Void>> entry : memberFutures.entrySet()) {
-                KafkaFuture<Void> memberFuture = entry.getValue();
-                assertTrue(memberFuture.isCompletedExceptionally());
-                try {
-                    memberFuture.get();
-                    fail("get() should throw exception");
-                } catch (ExecutionException | InterruptedException e0) {
-                    assertTrue(e0.getCause() instanceof UnknownServerException);
-                }
-            }
+            MemberToRemove memberOne = new MemberToRemove(instanceOne);
+            MemberToRemove memberTwo = new MemberToRemove(instanceTwo);
+
+            TestUtils.assertFutureError(unknownErrorResult.all(), UnknownServerException.class);
+            TestUtils.assertFutureError(unknownErrorResult.memberResult(memberOne), UnknownServerException.class);
+            TestUtils.assertFutureError(unknownErrorResult.memberResult(memberTwo), UnknownServerException.class);
+
+            MemberResponse responseOne = new MemberResponse()
+                                             .setGroupInstanceId(instanceOne)
+                                             .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code());
+
+            MemberResponse responseTwo = new MemberResponse()
+                                             .setGroupInstanceId(instanceTwo)
+                                             .setErrorCode(Errors.NONE.code());
 
             // Inject one member level error.
             env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
             env.kafkaClient().prepareResponse(new LeaveGroupResponse(new LeaveGroupResponseData()
                                                                          .setErrorCode(Errors.NONE.code())
-                                                                         .setMembers(memberResponses)));
+                                                                         .setMembers(Arrays.asList(responseOne, responseTwo))));
 
-            final MembershipChangeResult memberLevelErrorResult = env.adminClient().removeMemberFromConsumerGroup(
+            final RemoveMembersFromConsumerGroupResult memberLevelErrorResult = env.adminClient().removeMembersFromConsumerGroup(
                 groupId,
-                new RemoveMemberFromConsumerGroupOptions(membersToRemove)
+                new RemoveMembersFromConsumerGroupOptions(membersToRemove)
             );
 
-            result = memberLevelErrorResult.all();
-            assertTrue(result.hasError());
-            assertEquals(Errors.NONE, result.topLevelError());
-
-            memberFutures = result.memberFutures();
-            assertEquals(2, memberFutures.size());
-            for (Map.Entry<MemberIdentity, KafkaFuture<Void>> entry : memberFutures.entrySet()) {
-                KafkaFuture<Void> memberFuture = entry.getValue();
-                if (entry.getKey().groupInstanceId().equals(instanceOne)) {
-                    try {
-                        memberFuture.get();
-                        fail("get() should throw ExecutionException");
-                    } catch (ExecutionException | InterruptedException e0) {
-                        assertTrue(e0.getCause() instanceof UnknownMemberIdException);
-                    }
-                } else {
-                    assertFalse(memberFuture.isCompletedExceptionally());
-                }
-            }
+            TestUtils.assertFutureError(memberLevelErrorResult.all(), UnknownMemberIdException.class);
+            TestUtils.assertFutureError(memberLevelErrorResult.memberResult(memberOne), UnknownMemberIdException.class);
+            assertNull(memberLevelErrorResult.memberResult(memberTwo).get());
 
-            // Return success.
+            // Return with missing member.
             env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
             env.kafkaClient().prepareResponse(new LeaveGroupResponse(new LeaveGroupResponseData()
                                                                          .setErrorCode(Errors.NONE.code())
                                                                          .setMembers(Collections.singletonList(responseTwo))));
 
-            final MembershipChangeResult noErrorResult = env.adminClient().removeMemberFromConsumerGroup(
+            final RemoveMembersFromConsumerGroupResult missingMemberResult = env.adminClient().removeMembersFromConsumerGroup(
                 groupId,
-                new RemoveMemberFromConsumerGroupOptions(membersToRemove)
+                new RemoveMembersFromConsumerGroupOptions(membersToRemove)
             );
-            result = noErrorResult.all();
-            assertFalse(result.hasError());
-            assertEquals(Errors.NONE, result.topLevelError());
-
-            memberFutures = result.memberFutures();
-            assertEquals(1, memberFutures.size());
-            for (Map.Entry<MemberIdentity, KafkaFuture<Void>> entry : memberFutures.entrySet()) {
-                assertFalse(entry.getValue().isCompletedExceptionally());
-            }
+
+            TestUtils.assertFutureError(missingMemberResult.all(), IllegalArgumentException.class);
+            // The memberOne was not included in the response.
+            TestUtils.assertFutureError(missingMemberResult.memberResult(memberOne), IllegalArgumentException.class);
+            assertNull(missingMemberResult.memberResult(memberTwo).get());
+
+
+            // Return with success.
+            env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
+            env.kafkaClient().prepareResponse(new LeaveGroupResponse(
+                    new LeaveGroupResponseData().setErrorCode(Errors.NONE.code()).setMembers(
+                        Arrays.asList(responseTwo,
+                                      new MemberResponse().setGroupInstanceId(instanceOne).setErrorCode(Errors.NONE.code())
+                        ))
+            ));
+
+            final RemoveMembersFromConsumerGroupResult noErrorResult = env.adminClient().removeMembersFromConsumerGroup(
+                groupId,
+                new RemoveMembersFromConsumerGroupOptions(membersToRemove)
+            );
+            assertNull(noErrorResult.all().get());
+            assertNull(noErrorResult.memberResult(memberOne).get());
+            assertNull(noErrorResult.memberResult(memberTwo).get());
         }
     }
 
@@ -2208,6 +2197,21 @@ public class KafkaAdminClientTest {
         }
     }
 
+    @Test
+    public void testGetSubLevelError() {
+        List<MemberIdentity> memberIdentities = Arrays.asList(
+            new MemberIdentity().setGroupInstanceId("instance-0"),
+            new MemberIdentity().setGroupInstanceId("instance-1"));
+        Map<MemberIdentity, Errors> errorsMap = new HashMap<>();
+        errorsMap.put(memberIdentities.get(0), Errors.NONE);
+        errorsMap.put(memberIdentities.get(1), Errors.FENCED_INSTANCE_ID);
+        assertEquals(IllegalArgumentException.class, KafkaAdminClient.getSubLevelError(errorsMap,
+                                                                                       new MemberIdentity().setGroupInstanceId("non-exist-id"), "For unit test").getClass());
+        assertNull(KafkaAdminClient.getSubLevelError(errorsMap, memberIdentities.get(0), "For unit test"));
+        assertEquals(FencedInstanceIdException.class, KafkaAdminClient.getSubLevelError(
+            errorsMap, memberIdentities.get(1), "For unit test").getClass());
+    }
+
     private static MemberDescription convertToMemberDescriptions(DescribedGroupMember member,
                                                                  MemberAssignment assignment) {
         return new MemberDescription(member.memberId(),
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MembershipChangeResultTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/MembershipChangeResultTest.java
deleted file mode 100644
index 5a13572..0000000
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MembershipChangeResultTest.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.clients.admin;
-
-import org.apache.kafka.common.internals.KafkaFutureImpl;
-import org.apache.kafka.common.message.LeaveGroupResponseData;
-import org.apache.kafka.common.requests.LeaveGroupResponse;
-import org.junit.Test;
-
-import java.util.Collections;
-import java.util.concurrent.ExecutionException;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-public class MembershipChangeResultTest {
-
-    @Test
-    public void testConstructor() {
-        KafkaFutureImpl<RemoveMemberFromGroupResult> removeMemberFuture = new KafkaFutureImpl<>();
-
-        MembershipChangeResult changeResult = new MembershipChangeResult(removeMemberFuture);
-        assertEquals(removeMemberFuture, changeResult.future());
-        RemoveMemberFromGroupResult removeMemberFromGroupResult = new RemoveMemberFromGroupResult(
-            new LeaveGroupResponse(new LeaveGroupResponseData()),
-            Collections.emptyList()
-        );
-
-        removeMemberFuture.complete(removeMemberFromGroupResult);
-        try {
-            assertEquals(removeMemberFromGroupResult, changeResult.all());
-        } catch (ExecutionException | InterruptedException e) {
-            fail("Unexpected exception " + e + " when trying to get remove member result");
-        }
-    }
-}
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index 017e447..240071a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -363,7 +363,7 @@ public class MockAdminClient extends AdminClient {
     }
 
     @Override
-    public MembershipChangeResult removeMemberFromConsumerGroup(String groupId, RemoveMemberFromConsumerGroupOptions options) {
+    public RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroup(String groupId, RemoveMembersFromConsumerGroupOptions options) {
         throw new UnsupportedOperationException("Not implemented yet");
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/RemoveMemberFromGroupResultTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/RemoveMemberFromGroupResultTest.java
deleted file mode 100644
index f4a6ebe..0000000
--- a/clients/src/test/java/org/apache/kafka/clients/admin/RemoveMemberFromGroupResultTest.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.clients.admin;
-
-import org.apache.kafka.common.KafkaFuture;
-import org.apache.kafka.common.errors.FencedInstanceIdException;
-import org.apache.kafka.common.errors.GroupAuthorizationException;
-import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
-import org.apache.kafka.common.message.LeaveGroupResponseData;
-import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse;
-import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.requests.LeaveGroupResponse;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-public class RemoveMemberFromGroupResultTest {
-
-    private String instanceOne = "instance-1";
-    private String instanceTwo = "instance-2";
-    private List<MemberIdentity> membersToRemove;
-
-    private List<MemberResponse> memberResponses;
-
-    @Before
-    public void setUp() {
-        membersToRemove = Arrays.asList(
-            new MemberIdentity()
-                .setGroupInstanceId(instanceOne),
-            new MemberIdentity()
-                .setGroupInstanceId(instanceTwo)
-        );
-
-        memberResponses = Arrays.asList(
-            new MemberResponse()
-                .setGroupInstanceId(instanceOne),
-            new MemberResponse()
-                .setGroupInstanceId(instanceTwo)
-        );
-    }
-
-    @Test
-    public void testTopLevelErrorConstructor() {
-        RemoveMemberFromGroupResult topLevelErrorResult =
-            new RemoveMemberFromGroupResult(new LeaveGroupResponse(
-                new LeaveGroupResponseData()
-                    .setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code())
-                    .setMembers(memberResponses)), membersToRemove);
-
-        assertTrue(topLevelErrorResult.hasError());
-        assertEquals(Errors.GROUP_AUTHORIZATION_FAILED, topLevelErrorResult.topLevelError());
-
-        Map<MemberIdentity, KafkaFuture<Void>> memberFutures = topLevelErrorResult.memberFutures();
-        assertEquals(2, memberFutures.size());
-        for (Map.Entry<MemberIdentity, KafkaFuture<Void>> entry : memberFutures.entrySet()) {
-            KafkaFuture<Void> memberFuture = entry.getValue();
-            assertTrue(memberFuture.isCompletedExceptionally());
-            try {
-                memberFuture.get();
-                fail("get() should throw ExecutionException");
-            } catch (ExecutionException | InterruptedException e0) {
-                assertTrue(e0.getCause() instanceof GroupAuthorizationException);
-            }
-        }
-    }
-
-    @Test
-    public void testMemberLevelErrorConstructor() {
-        MemberResponse responseOne = new MemberResponse()
-                                         .setGroupInstanceId(instanceOne)
-                                         .setErrorCode(Errors.FENCED_INSTANCE_ID.code());
-        MemberResponse responseTwo = new MemberResponse()
-                                         .setGroupInstanceId(instanceTwo)
-                                         .setErrorCode(Errors.NONE.code());
-
-        RemoveMemberFromGroupResult memberLevelErrorResult = new RemoveMemberFromGroupResult(
-            new LeaveGroupResponse(new LeaveGroupResponseData()
-                                       .setMembers(Arrays.asList(responseOne, responseTwo))),
-            membersToRemove);
-        assertTrue(memberLevelErrorResult.hasError());
-        assertEquals(Errors.NONE, memberLevelErrorResult.topLevelError());
-
-        Map<MemberIdentity, KafkaFuture<Void>> memberFutures = memberLevelErrorResult.memberFutures();
-        assertEquals(2, memberFutures.size());
-        for (Map.Entry<MemberIdentity, KafkaFuture<Void>> entry : memberFutures.entrySet()) {
-            KafkaFuture<Void> memberFuture = entry.getValue();
-            if (entry.getKey().groupInstanceId().equals(instanceOne)) {
-                assertTrue(memberFuture.isCompletedExceptionally());
-                try {
-                    memberFuture.get();
-                    fail("get() should throw ExecutionException");
-                } catch (ExecutionException | InterruptedException e0) {
-                    assertTrue(e0.getCause() instanceof FencedInstanceIdException);
-                }
-            } else {
-                assertFalse(memberFuture.isCompletedExceptionally());
-                try {
-                    memberFuture.get();
-                } catch (ExecutionException | InterruptedException e0) {
-                    fail("get() shouldn't throw exception");
-                }
-            }
-        }
-    }
-
-    @Test
-    public void testNoErrorConstructor() {
-        MemberResponse responseOne = new MemberResponse()
-                                         .setGroupInstanceId(instanceOne)
-                                         .setErrorCode(Errors.NONE.code());
-        MemberResponse responseTwo = new MemberResponse()
-                                         .setGroupInstanceId(instanceTwo)
-                                         .setErrorCode(Errors.NONE.code());
-        // If no error is specified, failed members are not visible.
-        RemoveMemberFromGroupResult noErrorResult = new RemoveMemberFromGroupResult(
-            new LeaveGroupResponse(new LeaveGroupResponseData()
-                                       .setMembers(Arrays.asList(responseOne, responseTwo))),
-            membersToRemove);
-        assertFalse(noErrorResult.hasError());
-        assertEquals(Errors.NONE, noErrorResult.topLevelError());
-        Map<MemberIdentity, KafkaFuture<Void>> memberFutures = noErrorResult.memberFutures();
-        assertEquals(2, memberFutures.size());
-        for (Map.Entry<MemberIdentity, KafkaFuture<Void>> entry : memberFutures.entrySet()) {
-            try {
-                entry.getValue().get();
-            } catch (ExecutionException | InterruptedException e0) {
-                fail("get() shouldn't throw exception");
-            }
-        }
-    }
-}
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/RemoveMemberFromConsumerGroupOptionsTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupOptionsTest.java
similarity index 66%
rename from clients/src/test/java/org/apache/kafka/clients/admin/RemoveMemberFromConsumerGroupOptionsTest.java
rename to clients/src/test/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupOptionsTest.java
index b36461e..41aa386 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/RemoveMemberFromConsumerGroupOptionsTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupOptionsTest.java
@@ -16,23 +16,20 @@
  */
 package org.apache.kafka.clients.admin;
 
-import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
 import org.junit.Test;
 
 import java.util.Collections;
-import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 
-public class RemoveMemberFromConsumerGroupOptionsTest {
+public class RemoveMembersFromConsumerGroupOptionsTest {
 
     @Test
     public void testConstructor() {
-        List<String> groupInstanceIds = Collections.singletonList("instance-1");
+        RemoveMembersFromConsumerGroupOptions options = new RemoveMembersFromConsumerGroupOptions(
+            Collections.singleton(new MemberToRemove("instance-1")));
 
-        RemoveMemberFromConsumerGroupOptions options = new RemoveMemberFromConsumerGroupOptions(groupInstanceIds);
-
-        assertEquals(Collections.singletonList(
-            new MemberIdentity().setGroupInstanceId("instance-1")), options.getMembers());
+        assertEquals(Collections.singleton(
+            new MemberToRemove("instance-1")), options.members());
     }
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupResultTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupResultTest.java
new file mode 100644
index 0000000..e2da23b
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupResultTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.errors.FencedInstanceIdException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
+
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
+
+public class RemoveMembersFromConsumerGroupResultTest {
+
+    private final MemberToRemove instanceOne = new MemberToRemove("instance-1");
+    private final MemberToRemove instanceTwo = new MemberToRemove("instance-2");
+    private Set<MemberToRemove> membersToRemove;
+    private Map<MemberIdentity, Errors> errorsMap;
+
+    private KafkaFutureImpl<Map<MemberIdentity, Errors>> memberFutures;
+
+    @Before
+    public void setUp() {
+        memberFutures = new KafkaFutureImpl<>();
+        membersToRemove = new HashSet<>();
+        membersToRemove.add(instanceOne);
+        membersToRemove.add(instanceTwo);
+
+        errorsMap = new HashMap<>();
+        errorsMap.put(instanceOne.toMemberIdentity(), Errors.NONE);
+        errorsMap.put(instanceTwo.toMemberIdentity(), Errors.FENCED_INSTANCE_ID);
+    }
+
+    @Test
+    public void testTopLevelErrorConstructor() throws InterruptedException {
+        memberFutures.completeExceptionally(Errors.GROUP_AUTHORIZATION_FAILED.exception());
+        RemoveMembersFromConsumerGroupResult topLevelErrorResult =
+            new RemoveMembersFromConsumerGroupResult(memberFutures, membersToRemove);
+        TestUtils.assertFutureError(topLevelErrorResult.all(), GroupAuthorizationException.class);
+    }
+
+    @Test
+    public void testMemberLevelErrorConstructor() throws InterruptedException, ExecutionException {
+        createAndVerifyMemberLevelError();
+    }
+
+    @Test
+    public void testMemberMissingErrorInRequestConstructor() throws InterruptedException, ExecutionException {
+        errorsMap.remove(instanceTwo.toMemberIdentity());
+        memberFutures.complete(errorsMap);
+        assertFalse(memberFutures.isCompletedExceptionally());
+        RemoveMembersFromConsumerGroupResult missingMemberResult =
+            new RemoveMembersFromConsumerGroupResult(memberFutures, membersToRemove);
+
+        TestUtils.assertFutureError(missingMemberResult.all(), IllegalArgumentException.class);
+        assertNull(missingMemberResult.memberResult(instanceOne).get());
+        TestUtils.assertFutureError(missingMemberResult.memberResult(instanceTwo), IllegalArgumentException.class);
+    }
+
+    @Test
+    public void testMemberLevelErrorInResponseConstructor() throws InterruptedException, ExecutionException {
+        RemoveMembersFromConsumerGroupResult memberLevelErrorResult = createAndVerifyMemberLevelError();
+        assertThrows(IllegalArgumentException.class, () -> memberLevelErrorResult.memberResult(
+            new MemberToRemove("invalid-instance-id"))
+        );
+    }
+
+    @Test
+    public void testNoErrorConstructor() throws ExecutionException, InterruptedException {
+        Map<MemberIdentity, Errors> errorsMap = new HashMap<>();
+        errorsMap.put(instanceOne.toMemberIdentity(), Errors.NONE);
+        errorsMap.put(instanceTwo.toMemberIdentity(), Errors.NONE);
+        RemoveMembersFromConsumerGroupResult noErrorResult =
+            new RemoveMembersFromConsumerGroupResult(memberFutures, membersToRemove);
+        memberFutures.complete(errorsMap);
+
+        assertNull(noErrorResult.all().get());
+        assertNull(noErrorResult.memberResult(instanceOne).get());
+        assertNull(noErrorResult.memberResult(instanceTwo).get());
+    }
+
+    private RemoveMembersFromConsumerGroupResult createAndVerifyMemberLevelError() throws InterruptedException, ExecutionException {
+        memberFutures.complete(errorsMap);
+        assertFalse(memberFutures.isCompletedExceptionally());
+        RemoveMembersFromConsumerGroupResult memberLevelErrorResult =
+            new RemoveMembersFromConsumerGroupResult(memberFutures, membersToRemove);
+
+        TestUtils.assertFutureError(memberLevelErrorResult.all(), FencedInstanceIdException.class);
+        assertNull(memberLevelErrorResult.memberResult(instanceOne).get());
+        TestUtils.assertFutureError(memberLevelErrorResult.memberResult(instanceTwo), FencedInstanceIdException.class);
+        return memberLevelErrorResult;
+    }
+}
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index 2d17659..eeb93d6 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -401,8 +401,8 @@ object ConsumerGroupCommand extends Logging {
       result
     }
 
-    def deleteOffsets(groupId: String, topics: List[String]): Map[TopicPartition, Throwable] = {
-      var result: Map[TopicPartition, Throwable] = mutable.HashMap()
+    def deleteOffsets(groupId: String, topics: List[String]): (Errors, Map[TopicPartition, Throwable]) = {
+      var partitionLevelResult: Map[TopicPartition, Throwable] = mutable.HashMap()
 
       val (topicWithPartitions, topicWithoutPartitions) = topics.partition(_.contains(":"))
 
@@ -424,7 +424,7 @@ object ConsumerGroupCommand extends Logging {
             new TopicPartition(topic, partition.partition())
           }
           case Failure(e) =>
-            result += new TopicPartition(topic, -1) -> e
+            partitionLevelResult += new TopicPartition(topic, -1) -> e
             List.empty
         }
       }
@@ -437,46 +437,54 @@ object ConsumerGroupCommand extends Logging {
         withTimeoutMs(new DeleteConsumerGroupOffsetsOptions)
       )
 
-      deleteResult.all().get()
+      var topLevelException = Errors.NONE
+      Try(deleteResult.all.get) match {
+        case Success(_) =>
+        case Failure(e) => topLevelException = Errors.forException(e.getCause)
+      }
 
       partitions.foreach { partition =>
         Try(deleteResult.partitionResult(partition).get()) match {
-          case Success(_) => result += partition -> null
-          case Failure(e) => result += partition -> e
+          case Success(_) => partitionLevelResult += partition -> null
+          case Failure(e) => partitionLevelResult += partition -> e
         }
       }
 
-      result
+      (topLevelException, partitionLevelResult)
     }
 
     def deleteOffsets(): Unit = {
       val groupId = opts.options.valueOf(opts.groupOpt)
       val topics = opts.options.valuesOf(opts.topicOpt).asScala.toList
 
-      try {
-        val result = deleteOffsets(groupId, topics)
+      val (topLevelResult, partitionLevelResult) = deleteOffsets(groupId, topics)
+
+      topLevelResult match {
+        case Errors.NONE =>
+          println(s"Request succeed for deleting offsets with topic ${topics.mkString(", ")} group $groupId")
+        case Errors.INVALID_GROUP_ID =>
+          printError(s"'$groupId' is not valid.")
+        case Errors.GROUP_ID_NOT_FOUND =>
+          printError(s"'$groupId' does not exist.")
+        case Errors.GROUP_AUTHORIZATION_FAILED =>
+          printError(s"Access to '$groupId' is not authorized.")
+        case Errors.NON_EMPTY_GROUP =>
+          printError(s"Deleting offsets of a consumer group '$groupId' is forbidden if the group is not empty.")
+        case Errors.GROUP_SUBSCRIBED_TO_TOPIC |
+             Errors.TOPIC_AUTHORIZATION_FAILED |
+             Errors.UNKNOWN_TOPIC_OR_PARTITION =>
+          printError(s"Encounter some partition level error, see the follow-up details:")
+        case _ =>
+          printError(s"Encounter some unknown error: $topLevelResult")
+      }
 
-        println("\n%-30s %-15s %-15s".format("TOPIC", "PARTITION", "STATUS"))
-        result.toList.sortBy(t => t._1.topic + t._1.partition.toString).foreach { case (tp, error) =>
-          println("%-30s %-15s %-15s".format(
-            tp.topic,
-            if (tp.partition >= 0) tp.partition else "Not Provided",
-            if (error != null) s"Error: ${error.getMessage}" else "Successful"
-          ))
-        }
-      } catch {
-        case e: ExecutionException =>
-          Errors.forException(e.getCause) match {
-            case Errors.INVALID_GROUP_ID =>
-              printError(s"'$groupId' is not valid.")
-            case Errors.GROUP_ID_NOT_FOUND =>
-              printError(s"'$groupId' does not exist.")
-            case Errors.GROUP_AUTHORIZATION_FAILED =>
-              printError(s"Access to '$groupId' is not authorized.")
-            case Errors.NON_EMPTY_GROUP =>
-              printError(s"Deleting offsets of a non consumer group '$groupId' is forbidden if the group is not empty.")
-            case _ => throw e
-          }
+      println("\n%-30s %-15s %-15s".format("TOPIC", "PARTITION", "STATUS"))
+      partitionLevelResult.toList.sortBy(t => t._1.topic + t._1.partition.toString).foreach { case (tp, error) =>
+        println("%-30s %-15s %-15s".format(
+          tp.topic,
+          if (tp.partition >= 0) tp.partition else "Not Provided",
+          if (error != null) s"Error: ${error.getMessage}" else "Successful"
+        ))
       }
     }
 
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index bed6900..87cc42f 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -33,7 +33,6 @@ import kafka.utils.TestUtils._
 import kafka.utils.{Log4jController, Logging, TestUtils}
 import kafka.zk.KafkaZkClient
 import org.apache.kafka.clients.admin._
-import org.apache.kafka.clients.consumer.ConsumerRecords
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
 import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.kafka.clients.producer.ProducerRecord
@@ -41,7 +40,6 @@ import org.apache.kafka.common.{ConsumerGroupState, ElectionType, TopicPartition
 import org.apache.kafka.common.acl._
 import org.apache.kafka.common.config.{ConfigResource, LogLevelConfig}
 import org.apache.kafka.common.errors._
-import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{DeleteRecordsRequest, MetadataResponse}
 import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourceType}
 import org.apache.kafka.common.utils.{Time, Utils}
@@ -1235,7 +1233,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
           // Test that we can list the new group.
           TestUtils.waitUntilTrue(() => {
             val matching = client.listConsumerGroups.all.get().asScala.filter(_.groupId == testGroupId)
-            !matching.isEmpty
+            matching.nonEmpty
           }, s"Expected to be able to list $testGroupId")
 
           val describeWithFakeGroupResult = client.describeConsumerGroups(Seq(testGroupId, fakeGroupId).asJava,
@@ -1247,7 +1245,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
           var testGroupDescription = describeWithFakeGroupResult.describedGroups().get(testGroupId).get()
 
           assertEquals(testGroupId, testGroupDescription.groupId())
-          assertFalse(testGroupDescription.isSimpleConsumerGroup())
+          assertFalse(testGroupDescription.isSimpleConsumerGroup)
           assertEquals(1, testGroupDescription.members().size())
           val member = testGroupDescription.members().iterator().next()
           assertEquals(testClientId, member.clientId())
@@ -1281,25 +1279,13 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
           
           // Test delete non-exist consumer instance
           val invalidInstanceId = "invalid-instance-id"
-          var removeMemberResult = client.removeMemberFromConsumerGroup(testGroupId, new RemoveMemberFromConsumerGroupOptions(
-            Collections.singletonList(invalidInstanceId)
-          )).all()
+          var removeMembersResult = client.removeMembersFromConsumerGroup(testGroupId, new RemoveMembersFromConsumerGroupOptions(
+            Collections.singleton(new MemberToRemove(invalidInstanceId))
+          ))
 
-          assertTrue(removeMemberResult.hasError)
-          assertEquals(Errors.NONE, removeMemberResult.topLevelError)
-
-          val firstMemberFutures = removeMemberResult.memberFutures()
-          assertEquals(1, firstMemberFutures.size)
-          firstMemberFutures.values.asScala foreach { case value =>
-            try {
-              value.get()
-            } catch {
-              case e: ExecutionException =>
-                assertTrue(e.getCause.isInstanceOf[UnknownMemberIdException])
-              case t: Throwable =>
-                fail(s"Should have caught exception in getting member future: $t")
-            }
-          }
+          TestUtils.assertFutureExceptionTypeEquals(removeMembersResult.all, classOf[UnknownMemberIdException])
+          val firstMemberFuture = removeMembersResult.memberResult(new MemberToRemove(invalidInstanceId))
+          TestUtils.assertFutureExceptionTypeEquals(firstMemberFuture, classOf[UnknownMemberIdException])
 
           // Test consumer group deletion
           var deleteResult = client.deleteConsumerGroups(Seq(testGroupId, fakeGroupId).asJava)
@@ -1316,25 +1302,13 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
             classOf[GroupNotEmptyException])
 
           // Test delete correct member
-          removeMemberResult = client.removeMemberFromConsumerGroup(testGroupId, new RemoveMemberFromConsumerGroupOptions(
-            Collections.singletonList(testInstanceId)
-          )).all()
+          removeMembersResult = client.removeMembersFromConsumerGroup(testGroupId, new RemoveMembersFromConsumerGroupOptions(
+            Collections.singleton(new MemberToRemove(testInstanceId))
+          ))
 
-          assertFalse(removeMemberResult.hasError)
-          assertEquals(Errors.NONE, removeMemberResult.topLevelError)
-
-          val deletedMemberFutures = removeMemberResult.memberFutures()
-          assertEquals(1, firstMemberFutures.size)
-          deletedMemberFutures.values.asScala foreach { case value =>
-            try {
-              value.get()
-            } catch {
-              case e: ExecutionException =>
-                assertTrue(e.getCause.isInstanceOf[UnknownMemberIdException])
-              case t: Throwable =>
-                fail(s"Should have caught exception in getting member future: $t")
-            }
-          }
+          assertNull(removeMembersResult.all().get())
+          val validMemberFuture = removeMembersResult.memberResult(new MemberToRemove(testInstanceId))
+          assertNull(validMemberFuture.get())
 
           // The group should contain no member now.
           val describeTestGroupResult = client.describeConsumerGroups(Seq(testGroupId).asJava,
@@ -1404,7 +1378,8 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
         // Test offset deletion while consuming
         val offsetDeleteResult = client.deleteConsumerGroupOffsets(testGroupId, Set(tp1, tp2).asJava)
 
-        assertNull(offsetDeleteResult.all().get())
+        // Top level error will equal to the first partition level error
+        assertFutureExceptionTypeEquals(offsetDeleteResult.all(), classOf[GroupSubscribedToTopicException])
         assertFutureExceptionTypeEquals(offsetDeleteResult.partitionResult(tp1),
           classOf[GroupSubscribedToTopicException])
         assertFutureExceptionTypeEquals(offsetDeleteResult.partitionResult(tp2),
@@ -1426,11 +1401,11 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
       // Test offset deletion when group is empty
       val offsetDeleteResult = client.deleteConsumerGroupOffsets(testGroupId, Set(tp1, tp2).asJava)
 
-      assertNull(offsetDeleteResult.all().get())
+      assertFutureExceptionTypeEquals(offsetDeleteResult.all(),
+        classOf[UnknownTopicOrPartitionException])
       assertNull(offsetDeleteResult.partitionResult(tp1).get())
       assertFutureExceptionTypeEquals(offsetDeleteResult.partitionResult(tp2),
         classOf[UnknownTopicOrPartitionException])
-
     } finally {
       Utils.closeQuietly(client, "adminClient")
     }
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 38fe165..464e1ff 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -1399,7 +1399,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     addAndVerifyAcls(Set(new AccessControlEntry(userPrincipalStr, WildcardHost, DELETE, ALLOW)), groupResource)
     addAndVerifyAcls(Set(new AccessControlEntry(userPrincipalStr, WildcardHost, READ, ALLOW)), groupResource)
     val result = createAdminClient().deleteConsumerGroupOffsets(group, Set(tp).asJava)
-    assertNull(result.all().get())
+    TestUtils.assertFutureExceptionTypeEquals(result.all(), classOf[TopicAuthorizationException])
     TestUtils.assertFutureExceptionTypeEquals(result.partitionResult(tp), classOf[TopicAuthorizationException])
   }
 
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteOffsetsConsumerGroupCommandIntegrationTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteOffsetsConsumerGroupCommandIntegrationTest.scala
index 608e436..0e28852 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteOffsetsConsumerGroupCommandIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteOffsetsConsumerGroupCommandIntegrationTest.scala
@@ -18,7 +18,6 @@
 package kafka.admin
 
 import java.util.Properties
-import java.util.concurrent.ExecutionException
 
 import kafka.server.Defaults
 import kafka.utils.TestUtils
@@ -51,14 +50,9 @@ class DeleteOffsetsConsumerGroupCommandIntegrationTest extends ConsumerGroupComm
     val group = "missing.group"
     val topic = "foo:1"
     val service = getConsumerGroupService(getArgs(group, topic))
-    try {
-      service.deleteOffsets(group, List(topic))
-      fail("GroupIdNotFoundException should have been raised")
-    } catch {
-      case e: ExecutionException =>
-        if (e.getCause != Errors.GROUP_ID_NOT_FOUND.exception())
-          throw e
-    }
+
+    val (error, _) = service.deleteOffsets(group, List(topic))
+    assertEquals(Errors.GROUP_ID_NOT_FOUND, error)
   }
 
   @Test
@@ -134,8 +128,12 @@ class DeleteOffsetsConsumerGroupCommandIntegrationTest extends ConsumerGroupComm
     withConsumerGroup {
       val topic = if (inputPartition >= 0) inputTopic + ":" + inputPartition else inputTopic
       val service = getConsumerGroupService(getArgs(group, topic))
-      val partitions = service.deleteOffsets(group, List(topic))
+      val (topLevelError, partitions) = service.deleteOffsets(group, List(topic))
       val tp = new TopicPartition(inputTopic, expectedPartition)
+      // Partition level error should propagate to top level, unless this is due to a missed partition attempt.
+      if (inputPartition >= 0) {
+        assertEquals(expectedError, topLevelError)
+      }
       if (expectedError == Errors.NONE)
         assertNull(partitions(tp))
       else


Mime
View raw message