kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch trunk updated: KAFKA-8221; Add batch leave group request (#6714)
Date Sat, 27 Jul 2019 06:14:05 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 74c90f4  KAFKA-8221; Add batch leave group request (#6714)
74c90f4 is described below

commit 74c90f46c34727be9484e9826ff543b451ada775
Author: Boyang Chen <boyang@confluent.io>
AuthorDate: Fri Jul 26 23:13:37 2019 -0700

    KAFKA-8221; Add batch leave group request (#6714)
    
    This patch is part of KIP-345. We are aiming to support batch leave group request issued from admin client. This diff is the first effort to bump leave group request version.
    
    Reviewers: Guozhang Wang <wangguoz@gmail.com>, Jason Gustafson <jason@confluent.io>
---
 checkstyle/suppressions.xml                        |   2 +-
 .../consumer/internals/AbstractCoordinator.java    |  26 +-
 .../kafka/common/requests/LeaveGroupRequest.java   |  48 +++-
 .../kafka/common/requests/LeaveGroupResponse.java  |  86 +++++-
 .../common/message/LeaveGroupRequest.json          |  15 +-
 .../common/message/LeaveGroupResponse.json         |  15 +-
 .../internals/AbstractCoordinatorTest.java         | 280 ++++++++++++--------
 .../internals/ConsumerCoordinatorTest.java         | 294 +++++++++------------
 .../apache/kafka/common/message/MessageTest.java   |  44 ++-
 .../common/requests/LeaveGroupRequestTest.java     |  88 +++++-
 .../common/requests/LeaveGroupResponseTest.java    |  86 +++++-
 .../kafka/common/requests/RequestResponseTest.java |  37 +--
 .../kafka/coordinator/group/GroupCoordinator.scala | 105 +++++---
 core/src/main/scala/kafka/server/KafkaApis.scala   |  53 ++--
 .../kafka/api/AuthorizerIntegrationTest.scala      |   9 +-
 .../group/GroupCoordinatorConcurrencyTest.scala    |  24 +-
 .../coordinator/group/GroupCoordinatorTest.scala   | 238 ++++++++++++++---
 .../scala/unit/kafka/server/KafkaApisTest.scala    |  58 ++++
 .../scala/unit/kafka/server/RequestQuotaTest.scala |   9 +-
 .../streams/processor/internals/AssignedTasks.java |   1 -
 20 files changed, 1065 insertions(+), 453 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 03321f1..908e114 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -62,7 +62,7 @@
 
     <!-- clients tests -->
     <suppress checks="ClassDataAbstractionCoupling"
-              files="(Sender|Fetcher|KafkaConsumer|Metrics|RequestResponse|TransactionManager|KafkaAdminClient)Test.java"/>
+              files="(Sender|Fetcher|KafkaConsumer|Metrics|RequestResponse|TransactionManager|KafkaAdminClient|Message)Test.java"/>
 
     <suppress checks="ClassFanOutComplexity"
               files="(ConsumerCoordinator|KafkaConsumer|RequestResponse|Fetcher|KafkaAdminClient)Test.java"/>
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index b92a4a6..fc385f6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -35,7 +35,8 @@ import org.apache.kafka.common.message.FindCoordinatorRequestData;
 import org.apache.kafka.common.message.HeartbeatRequestData;
 import org.apache.kafka.common.message.JoinGroupRequestData;
 import org.apache.kafka.common.message.JoinGroupResponseData;
-import org.apache.kafka.common.message.LeaveGroupRequestData;
+import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
+import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse;
 import org.apache.kafka.common.message.SyncGroupRequestData;
 import org.apache.kafka.common.metrics.Measurable;
 import org.apache.kafka.common.metrics.MetricConfig;
@@ -843,7 +844,8 @@ public abstract class AbstractCoordinator implements Closeable {
      * Leave the current group and reset local generation/memberId.
      * @param leaveReason reason to attempt leaving the group
      */
-    public synchronized void maybeLeaveGroup(String leaveReason) {
+    public synchronized RequestFuture<Void> maybeLeaveGroup(String leaveReason) {
+        RequestFuture<Void> future = null;
         // Starting from 2.3, only dynamic members will send LeaveGroupRequest to the broker,
         // consumer with valid group.instance.id is viewed as static member that never sends LeaveGroup,
         // and the membership expiration is only controlled by session timeout.
@@ -853,14 +855,18 @@ public abstract class AbstractCoordinator implements Closeable {
             // attempt any resending if the request fails or times out.
             log.info("Member {} sending LeaveGroup request to coordinator {} due to {}",
                      generation.memberId, coordinator, leaveReason);
-            LeaveGroupRequest.Builder request = new LeaveGroupRequest.Builder(new LeaveGroupRequestData()
-                    .setGroupId(rebalanceConfig.groupId).setMemberId(generation.memberId));
-            client.send(coordinator, request)
+            LeaveGroupRequest.Builder request = new LeaveGroupRequest.Builder(
+                rebalanceConfig.groupId,
+                Collections.singletonList(new MemberIdentity()
+                                              .setMemberId(generation.memberId))
+            );
+            future = client.send(coordinator, request)
                     .compose(new LeaveGroupResponseHandler());
             client.pollNoWakeup();
         }
 
         resetGeneration();
+        return future;
     }
 
     protected boolean isDynamicMember() {
@@ -870,12 +876,18 @@ public abstract class AbstractCoordinator implements Closeable {
     private class LeaveGroupResponseHandler extends CoordinatorResponseHandler<LeaveGroupResponse, Void> {
         @Override
         public void handle(LeaveGroupResponse leaveResponse, RequestFuture<Void> future) {
-            Errors error = leaveResponse.error();
+            final List<MemberResponse> members = leaveResponse.memberResponses();
+            if (members.size() > 1) {
+                future.raise(new IllegalStateException("The expected leave group response " +
+                                                           "should only contain no more than one member info, however get " + members));
+            }
+
+            final Errors error = leaveResponse.error();
             if (error == Errors.NONE) {
                 log.debug("LeaveGroup request returned successfully");
                 future.complete(null);
             } else {
-                log.debug("LeaveGroup request failed with error: {}", error.message());
+                log.error("LeaveGroup request failed with error: {}", error.message());
                 future.raise(error);
             }
         }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
index e6e239c..ac77379 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
@@ -16,33 +16,64 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.LeaveGroupRequestData;
+import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
 import org.apache.kafka.common.message.LeaveGroupResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.MessageUtil;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
 
 public class LeaveGroupRequest extends AbstractRequest {
 
     public static class Builder extends AbstractRequest.Builder<LeaveGroupRequest> {
+        private final String groupId;
+        private final List<MemberIdentity> members;
 
-        private final LeaveGroupRequestData data;
-
-        public Builder(LeaveGroupRequestData data) {
+        public Builder(String groupId, List<MemberIdentity> members) {
             super(ApiKeys.LEAVE_GROUP);
-            this.data = data;
+            this.groupId = groupId;
+            this.members = members;
+            if (members.isEmpty()) {
+                throw new IllegalArgumentException("leaving members should not be empty");
+            }
         }
 
+        /**
+         * Based on the request version to choose fields.
+         */
         @Override
         public LeaveGroupRequest build(short version) {
+            final LeaveGroupRequestData data;
+            // Starting from version 3, all the leave group request will be in batch.
+            if (version >= 3) {
+                data = new LeaveGroupRequestData()
+                           .setGroupId(groupId)
+                           .setMembers(members);
+            } else {
+                if (members.size() != 1) {
+                    throw new UnsupportedVersionException("Version " + version + " leave group request only " +
+                                                              "supports single member instance than " + members.size() + " members");
+                }
+
+                data = new LeaveGroupRequestData()
+                           .setGroupId(groupId)
+                           .setMemberId(members.get(0).memberId());
+            }
             return new LeaveGroupRequest(data, version);
         }
 
         @Override
         public String toString() {
-            return data.toString();
+            return "(type=LeaveGroupRequest" +
+                       ", groupId=" + groupId +
+                       ", members=" + MessageUtil.deepToString(members.iterator()) +
+                       ")";
         }
     }
     private final LeaveGroupRequestData data;
@@ -64,6 +95,13 @@ public class LeaveGroupRequest extends AbstractRequest {
         return data;
     }
 
+    public List<MemberIdentity> members() {
+        // Before version 3, leave group request is still in single mode
+        return version <= 2 ? Collections.singletonList(
+            new MemberIdentity()
+                .setMemberId(data.memberId())) : data.members();
+    }
+
     @Override
     public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         LeaveGroupResponseData responseData = new LeaveGroupResponseData()
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
index 6390c0f..97a583f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
@@ -17,47 +17,117 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.message.LeaveGroupResponseData;
+import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
-import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
+/**
+ * Possible error codes.
+ *
+ * Top level errors:
+ * - {@link Errors#COORDINATOR_LOAD_IN_PROGRESS}
+ * - {@link Errors#COORDINATOR_NOT_AVAILABLE}
+ * - {@link Errors#NOT_COORDINATOR}
+ * - {@link Errors#GROUP_AUTHORIZATION_FAILED}
+ *
+ * Member level errors:
+ * - {@link Errors#FENCED_INSTANCE_ID}
+ * - {@link Errors#UNKNOWN_MEMBER_ID}
+ *
+ * If the top level error code is set, normally this indicates that broker early stops the request
+ * handling due to some severe global error, so it is expected to see the member level errors to be empty.
+ * For older version response, we may populate member level error towards top level because older client
+ * couldn't parse member level.
+ */
 public class LeaveGroupResponse extends AbstractResponse {
 
-    private final LeaveGroupResponseData data;
+    public final LeaveGroupResponseData data;
 
     public LeaveGroupResponse(LeaveGroupResponseData data) {
         this.data = data;
     }
 
+    public LeaveGroupResponse(List<MemberResponse> memberResponses,
+                              Errors topLevelError,
+                              final int throttleTimeMs,
+                              final short version) {
+        if (version <= 2) {
+            // Populate member level error.
+            final short errorCode = getError(topLevelError, memberResponses).code();
+
+            this.data = new LeaveGroupResponseData()
+                            .setErrorCode(errorCode);
+        } else {
+            this.data = new LeaveGroupResponseData()
+                            .setErrorCode(topLevelError.code())
+                            .setMembers(memberResponses);
+        }
+
+        if (version >= 1) {
+            this.data.setThrottleTimeMs(throttleTimeMs);
+        }
+    }
+
     public LeaveGroupResponse(Struct struct) {
         short latestVersion = (short) (LeaveGroupResponseData.SCHEMAS.length - 1);
         this.data = new LeaveGroupResponseData(struct, latestVersion);
     }
+
     public LeaveGroupResponse(Struct struct, short version) {
         this.data = new LeaveGroupResponseData(struct, version);
     }
 
-    public LeaveGroupResponseData data() {
-        return data;
-    }
-
     @Override
     public int throttleTimeMs() {
         return data.throttleTimeMs();
     }
 
+    public List<MemberResponse> memberResponses() {
+        return data.members();
+    }
+
     public Errors error() {
-        return Errors.forCode(data.errorCode());
+        return getError(Errors.forCode(data.errorCode()), data.members());
+    }
+
+    private static Errors getError(Errors topLevelError, List<MemberResponse> memberResponses) {
+        if (topLevelError != Errors.NONE) {
+            return topLevelError;
+        } else {
+            for (MemberResponse memberResponse : memberResponses) {
+                Errors memberError = Errors.forCode(memberResponse.errorCode());
+                if (memberError != Errors.NONE) {
+                    return memberError;
+                }
+            }
+            return Errors.NONE;
+        }
     }
 
     @Override
     public Map<Errors, Integer> errorCounts() {
-        return Collections.singletonMap(Errors.forCode(data.errorCode()), 1);
+        Map<Errors, Integer> combinedErrorCounts = new HashMap<>();
+        // Top level error.
+        Errors topLevelError = Errors.forCode(data.errorCode());
+        if (topLevelError != Errors.NONE) {
+            updateErrorCounts(combinedErrorCounts, topLevelError);
+        }
+
+        // Member level error.
+        for (MemberResponse memberResponse : data.members()) {
+            Errors memberError = Errors.forCode(memberResponse.errorCode());
+            if (memberError != Errors.NONE) {
+                updateErrorCounts(combinedErrorCounts, memberError);
+            }
+        }
+        return combinedErrorCounts;
     }
 
     @Override
diff --git a/clients/src/main/resources/common/message/LeaveGroupRequest.json b/clients/src/main/resources/common/message/LeaveGroupRequest.json
index 7c536da..9f98f06 100644
--- a/clients/src/main/resources/common/message/LeaveGroupRequest.json
+++ b/clients/src/main/resources/common/message/LeaveGroupRequest.json
@@ -18,11 +18,20 @@
   "type": "request",
   "name": "LeaveGroupRequest",
   // Version 1 and 2 are the same as version 0.
-  "validVersions": "0-2",
+  // Version 3 defines batch processing scheme with group.instance.id + member.id for identity
+  "validVersions": "0-3",
   "fields": [
     { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
       "about": "The ID of the group to leave." },
-    { "name": "MemberId", "type": "string", "versions": "0+",
-      "about": "The member ID to remove from the group." }
+    { "name": "MemberId", "type": "string", "versions": "0-2",
+      "about": "The member ID to remove from the group." },
+    { "name": "Members", "type": "[]MemberIdentity", "versions": "3+",
+      "about": "List of leaving member identities.", "fields": [
+      { "name": "MemberId", "type": "string", "versions": "3+",
+        "about": "The member ID to remove from the group." },
+      { "name": "GroupInstanceId", "type": "string",
+        "versions": "3+", "nullableVersions": "3+", "default": "null",
+        "about": "The group instance ID to remove from the group." }
+    ]}
   ]
 }
diff --git a/clients/src/main/resources/common/message/LeaveGroupResponse.json b/clients/src/main/resources/common/message/LeaveGroupResponse.json
index 0d887cd..2bbf63d 100644
--- a/clients/src/main/resources/common/message/LeaveGroupResponse.json
+++ b/clients/src/main/resources/common/message/LeaveGroupResponse.json
@@ -19,11 +19,22 @@
   "name": "LeaveGroupResponse",
   // Version 1 adds the throttle time.
   // Starting in version 2, on quota violation, brokers send out responses before throttling.
-  "validVersions": "0-2",
+  // Starting in version 3, we will make leave group request into batch mode.
+  "validVersions": "0-3",
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
       "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
     { "name": "ErrorCode", "type": "int16", "versions": "0+",
-      "about": "The error code, or 0 if there was no error." }
+      "about": "The error code, or 0 if there was no error." },
+
+    { "name": "Members", "type": "[]MemberResponse", "versions": "3+",
+      "about": "List of leaving member responses.", "fields": [
+      { "name": "MemberId", "type": "string", "versions": "3+",
+        "about": "The member ID to remove from the group." },
+      { "name": "GroupInstanceId", "type": "string", "versions": "3+", "nullableVersions": "3+",
+        "about": "The group instance ID to remove from the group." },
+      { "name": "ErrorCode", "type": "int16", "versions": "3+",
+        "about": "The error code, or 0 if there was no error." }
+    ]}
   ]
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index 659ef5f..e0264b3 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -18,25 +18,32 @@ package org.apache.kafka.clients.consumer.internals;
 
 import org.apache.kafka.clients.GroupRebalanceConfig;
 import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.NodeApiVersions;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.FencedInstanceIdException;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.message.HeartbeatResponseData;
 import org.apache.kafka.common.message.JoinGroupRequestData;
 import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.LeaveGroupResponseData;
+import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse;
 import org.apache.kafka.common.message.SyncGroupResponseData;
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.apache.kafka.common.requests.FindCoordinatorResponse;
 import org.apache.kafka.common.requests.HeartbeatRequest;
 import org.apache.kafka.common.requests.HeartbeatResponse;
 import org.apache.kafka.common.requests.JoinGroupRequest;
 import org.apache.kafka.common.requests.JoinGroupResponse;
 import org.apache.kafka.common.requests.LeaveGroupRequest;
+import org.apache.kafka.common.requests.LeaveGroupResponse;
 import org.apache.kafka.common.requests.SyncGroupRequest;
 import org.apache.kafka.common.requests.SyncGroupResponse;
 import org.apache.kafka.common.utils.LogContext;
@@ -48,6 +55,7 @@ import org.apache.kafka.test.TestUtils;
 import org.junit.Test;
 
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -63,6 +71,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import static java.util.Collections.emptyMap;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertThrows;
@@ -75,7 +84,6 @@ public class AbstractCoordinatorTest {
     private static final int SESSION_TIMEOUT_MS = 10000;
     private static final int HEARTBEAT_INTERVAL_MS = 3000;
     private static final int RETRY_BACKOFF_MS = 100;
-    private static final int LONG_RETRY_BACKOFF_MS = 10000;
     private static final int REQUEST_TIMEOUT_MS = 40000;
     private static final String GROUP_ID = "dummy-group";
     private static final String METRIC_GROUP_PREFIX = "consumer";
@@ -87,6 +95,10 @@ public class AbstractCoordinatorTest {
     private ConsumerNetworkClient consumerClient;
     private DummyCoordinator coordinator;
 
+    private final String memberId = "memberId";
+    private final String leaderId = "leaderId";
+    private final int defaultGeneration = -1;
+
     private void setupCoordinator() {
         setupCoordinator(RETRY_BACKOFF_MS, REBALANCE_TIMEOUT_MS,
             Optional.empty());
@@ -164,7 +176,7 @@ public class AbstractCoordinatorTest {
             assertFalse(firstAttempt.get());
             assertTrue(consumerClient.hasPendingRequests(coordinatorNode));
 
-            mockClient.respond(joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE));
+            mockClient.respond(joinGroupFollowerResponse(1, memberId, leaderId, Errors.NONE));
             mockClient.prepareResponse(syncGroupResponse(Errors.NONE));
 
             Timer secondAttemptTimer = mockTime.timer(REQUEST_TIMEOUT_MS);
@@ -183,10 +195,7 @@ public class AbstractCoordinatorTest {
         mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady(mockTime.timer(0));
 
-        final String memberId = "memberId";
-        final int generation = -1;
-
-        mockClient.prepareResponse(joinGroupFollowerResponse(generation, memberId, JoinGroupResponse.UNKNOWN_MEMBER_ID, Errors.GROUP_MAX_SIZE_REACHED));
+        mockClient.prepareResponse(joinGroupFollowerResponse(defaultGeneration, memberId, JoinGroupResponse.UNKNOWN_MEMBER_ID, Errors.GROUP_MAX_SIZE_REACHED));
 
         RequestFuture<ByteBuffer> future = coordinator.sendJoinGroupRequest();
         assertTrue(consumerClient.poll(future, mockTime.timer(REQUEST_TIMEOUT_MS)));
@@ -232,23 +241,14 @@ public class AbstractCoordinatorTest {
         mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady(mockTime.timer(0));
 
-        final String memberId = "memberId";
-        final int generation = -1;
-
-        mockClient.prepareResponse(joinGroupFollowerResponse(generation, memberId, JoinGroupResponse.UNKNOWN_MEMBER_ID, Errors.MEMBER_ID_REQUIRED));
+        mockClient.prepareResponse(joinGroupFollowerResponse(defaultGeneration, memberId, JoinGroupResponse.UNKNOWN_MEMBER_ID, Errors.MEMBER_ID_REQUIRED));
 
-        mockClient.prepareResponse(new MockClient.RequestMatcher() {
-            @Override
-            public boolean matches(AbstractRequest body) {
-                if (!(body instanceof JoinGroupRequest)) {
-                    return false;
-                }
-                JoinGroupRequest joinGroupRequest = (JoinGroupRequest) body;
-                if (!joinGroupRequest.data().memberId().equals(memberId)) {
-                    return false;
-                }
-                return true;
+        mockClient.prepareResponse(body -> {
+            if (!(body instanceof JoinGroupRequest)) {
+                return false;
             }
+            JoinGroupRequest joinGroupRequest = (JoinGroupRequest) body;
+            return joinGroupRequest.data().memberId().equals(memberId);
         }, joinGroupResponse(Errors.UNKNOWN_MEMBER_ID));
 
         RequestFuture<ByteBuffer> future = coordinator.sendJoinGroupRequest();
@@ -256,7 +256,7 @@ public class AbstractCoordinatorTest {
         assertEquals(Errors.MEMBER_ID_REQUIRED.message(), future.exception().getMessage());
         assertTrue(coordinator.rejoinNeededOrPending());
         assertTrue(coordinator.hasValidMemberId());
-        assertTrue(coordinator.hasMatchingGenerationId(generation));
+        assertTrue(coordinator.hasMatchingGenerationId(defaultGeneration));
         future = coordinator.sendJoinGroupRequest();
         assertTrue(consumerClient.poll(future, mockTime.timer(REBALANCE_TIMEOUT_MS)));
     }
@@ -267,10 +267,7 @@ public class AbstractCoordinatorTest {
         mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady(mockTime.timer(0));
 
-        final String memberId = "memberId";
-        final int generation = -1;
-
-        mockClient.prepareResponse(joinGroupFollowerResponse(generation, memberId, JoinGroupResponse.UNKNOWN_MEMBER_ID, Errors.FENCED_INSTANCE_ID));
+        mockClient.prepareResponse(joinGroupFollowerResponse(defaultGeneration, memberId, JoinGroupResponse.UNKNOWN_MEMBER_ID, Errors.FENCED_INSTANCE_ID));
 
         RequestFuture<ByteBuffer> future = coordinator.sendJoinGroupRequest();
         assertTrue(consumerClient.poll(future, mockTime.timer(REQUEST_TIMEOUT_MS)));
@@ -284,7 +281,6 @@ public class AbstractCoordinatorTest {
         setupCoordinator();
         mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
 
-        final String memberId = "memberId";
         final int generation = -1;
 
         mockClient.prepareResponse(joinGroupFollowerResponse(generation, memberId, JoinGroupResponse.UNKNOWN_MEMBER_ID, Errors.NONE));
@@ -298,7 +294,6 @@ public class AbstractCoordinatorTest {
         setupCoordinator();
         mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
 
-        final String memberId = "memberId";
         final int generation = -1;
 
         mockClient.prepareResponse(joinGroupFollowerResponse(generation, memberId, JoinGroupResponse.UNKNOWN_MEMBER_ID, Errors.NONE));
@@ -325,17 +320,14 @@ public class AbstractCoordinatorTest {
         mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady(mockTime.timer(0));
 
-        final String memberId = "memberId";
-        final int generation = -1;
-
-        mockClient.prepareResponse(joinGroupFollowerResponse(generation, memberId, JoinGroupResponse.UNKNOWN_MEMBER_ID, Errors.UNKNOWN_MEMBER_ID));
+        mockClient.prepareResponse(joinGroupFollowerResponse(defaultGeneration, memberId, JoinGroupResponse.UNKNOWN_MEMBER_ID, Errors.UNKNOWN_MEMBER_ID));
 
         RequestFuture<ByteBuffer> future = coordinator.sendJoinGroupRequest();
 
         assertTrue(consumerClient.poll(future, mockTime.timer(REQUEST_TIMEOUT_MS)));
         assertEquals(Errors.UNKNOWN_MEMBER_ID.message(), future.exception().getMessage());
         assertTrue(coordinator.rejoinNeededOrPending());
-        assertTrue(coordinator.hasMatchingGenerationId(generation));
+        assertTrue(coordinator.hasMatchingGenerationId(defaultGeneration));
     }
 
     @Test
@@ -348,19 +340,16 @@ public class AbstractCoordinatorTest {
         setupCoordinator(RETRY_BACKOFF_MS, Integer.MAX_VALUE, groupInstanceId);
 
         mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        mockClient.prepareResponse(joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE));
+        mockClient.prepareResponse(joinGroupFollowerResponse(1, memberId, leaderId, Errors.NONE));
         mockClient.prepareResponse(syncGroupResponse(Errors.NONE));
 
         final RuntimeException e = new RuntimeException();
 
         // raise the error when the coordinator tries to send leave group request.
-        mockClient.prepareResponse(new MockClient.RequestMatcher() {
-            @Override
-            public boolean matches(AbstractRequest body) {
-                if (body instanceof LeaveGroupRequest)
-                    throw e;
-                return false;
-            }
+        mockClient.prepareResponse(body -> {
+            if (body instanceof LeaveGroupRequest)
+                throw e;
+            return false;
         }, heartbeatResponse(Errors.UNKNOWN_SERVER_ERROR));
 
         try {
@@ -379,23 +368,105 @@ public class AbstractCoordinatorTest {
     }
 
     @Test
+    public void testHandleNormalLeaveGroupResponse() {
+        MemberResponse memberResponse = new MemberResponse()
+                                            .setMemberId(memberId)
+                                            .setErrorCode(Errors.NONE.code());
+        LeaveGroupResponse response =
+            leaveGroupResponse(Collections.singletonList(memberResponse));
+        RequestFuture<Void> leaveGroupFuture = setupLeaveGroup(response);
+        assertNotNull(leaveGroupFuture);
+        assertTrue(leaveGroupFuture.succeeded());
+    }
+
+    @Test
+    public void testHandleMultipleMembersLeaveGroupResponse() {
+        MemberResponse memberResponse = new MemberResponse()
+                                            .setMemberId(memberId)
+                                            .setErrorCode(Errors.NONE.code());
+        LeaveGroupResponse response =
+            leaveGroupResponse(Arrays.asList(memberResponse, memberResponse));
+        RequestFuture<Void> leaveGroupFuture = setupLeaveGroup(response);
+        assertNotNull(leaveGroupFuture);
+        assertTrue(leaveGroupFuture.exception() instanceof IllegalStateException);
+    }
+
+    @Test
+    public void testHandleLeaveGroupResponseWithEmptyMemberResponse() {
+        LeaveGroupResponse response =
+            leaveGroupResponse(Collections.emptyList());
+        RequestFuture<Void> leaveGroupFuture = setupLeaveGroup(response);
+        assertNotNull(leaveGroupFuture);
+        assertTrue(leaveGroupFuture.succeeded());
+    }
+
+    @Test
+    public void testHandleLeaveGroupResponseWithException() {
+        MemberResponse memberResponse = new MemberResponse()
+                                            .setMemberId(memberId)
+                                            .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code());
+        LeaveGroupResponse response =
+            leaveGroupResponse(Collections.singletonList(memberResponse));
+        RequestFuture<Void> leaveGroupFuture = setupLeaveGroup(response);
+        assertNotNull(leaveGroupFuture);
+        assertTrue(leaveGroupFuture.exception() instanceof UnknownMemberIdException);
+    }
+
+    @Test
+    public void testHandleSingleLeaveGroupRequest() {
+        setupCoordinator(RETRY_BACKOFF_MS, Integer.MAX_VALUE, Optional.empty());
+        mockClient.setNodeApiVersions(NodeApiVersions.create(Collections.singletonList(
+            new ApiVersionsResponse.ApiVersion(ApiKeys.LEAVE_GROUP, (short) 2, (short) 2))));
+
+        LeaveGroupResponse expectedResponse = leaveGroupResponse(Collections.singletonList(
+            new MemberResponse()
+                .setErrorCode(Errors.NONE.code())
+                .setMemberId(memberId)));
+        mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        mockClient.prepareResponse(joinGroupFollowerResponse(1, memberId, leaderId, Errors.NONE));
+        mockClient.prepareResponse(syncGroupResponse(Errors.NONE));
+        mockClient.prepareResponse(body -> {
+            if (body instanceof LeaveGroupRequest) {
+                LeaveGroupRequest request = (LeaveGroupRequest) body;
+                return request.data().memberId().equals(memberId)
+                    && request.data().members().isEmpty();
+            } else {
+                return false;
+            }
+        }, expectedResponse);
+
+        coordinator.ensureActiveGroup();
+        RequestFuture<Void> leaveGroupFuture = coordinator.maybeLeaveGroup("test single leave group");
+        assertTrue(leaveGroupFuture.succeeded());
+    }
+
+    private RequestFuture<Void> setupLeaveGroup(LeaveGroupResponse leaveGroupResponse) {
+        setupCoordinator(RETRY_BACKOFF_MS, Integer.MAX_VALUE, Optional.empty());
+
+        mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        mockClient.prepareResponse(joinGroupFollowerResponse(1, memberId, leaderId, Errors.NONE));
+        mockClient.prepareResponse(syncGroupResponse(Errors.NONE));
+        mockClient.prepareResponse(leaveGroupResponse);
+
+        coordinator.ensureActiveGroup();
+        return coordinator.maybeLeaveGroup("test maybe leave group");
+    }
+
+    @Test
     public void testUncaughtExceptionInHeartbeatThread() throws Exception {
         setupCoordinator();
 
         mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        mockClient.prepareResponse(joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE));
+        mockClient.prepareResponse(joinGroupFollowerResponse(1, memberId, leaderId, Errors.NONE));
         mockClient.prepareResponse(syncGroupResponse(Errors.NONE));
 
         final RuntimeException e = new RuntimeException();
 
         // raise the error when the background thread tries to send a heartbeat
-        mockClient.prepareResponse(new MockClient.RequestMatcher() {
-            @Override
-            public boolean matches(AbstractRequest body) {
-                if (body instanceof HeartbeatRequest)
-                    throw e;
-                return false;
-            }
+        mockClient.prepareResponse(body -> {
+            if (body instanceof HeartbeatRequest)
+                throw e;
+            return false;
         }, heartbeatResponse(Errors.UNKNOWN_SERVER_ERROR));
 
         try {
@@ -414,21 +485,19 @@ public class AbstractCoordinatorTest {
 
     @Test
     public void testPollHeartbeatAwakesHeartbeatThread() throws Exception {
-        setupCoordinator(LONG_RETRY_BACKOFF_MS);
+        final int longRetryBackoffMs = 10000;
+        setupCoordinator(longRetryBackoffMs);
 
         mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        mockClient.prepareResponse(joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE));
+        mockClient.prepareResponse(joinGroupFollowerResponse(1, memberId, leaderId, Errors.NONE));
         mockClient.prepareResponse(syncGroupResponse(Errors.NONE));
 
         coordinator.ensureActiveGroup();
 
         final CountDownLatch heartbeatDone = new CountDownLatch(1);
-        mockClient.prepareResponse(new MockClient.RequestMatcher() {
-            @Override
-            public boolean matches(AbstractRequest body) {
-                heartbeatDone.countDown();
-                return body instanceof HeartbeatRequest;
-            }
+        mockClient.prepareResponse(body -> {
+            heartbeatDone.countDown();
+            return body instanceof HeartbeatRequest;
         }, heartbeatResponse(Errors.NONE));
 
         mockTime.sleep(HEARTBEAT_INTERVAL_MS);
@@ -473,7 +542,7 @@ public class AbstractCoordinatorTest {
                     throw new WakeupException();
                 return isJoinGroupRequest;
             }
-        }, joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE));
+        }, joinGroupFollowerResponse(1, memberId, leaderId, Errors.NONE));
         mockClient.prepareResponse(syncGroupResponse(Errors.NONE));
         AtomicBoolean heartbeatReceived = prepareFirstHeartbeat();
 
@@ -511,7 +580,7 @@ public class AbstractCoordinatorTest {
                     throw new WakeupException();
                 return isJoinGroupRequest;
             }
-        }, joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE));
+        }, joinGroupFollowerResponse(1, memberId, leaderId, Errors.NONE));
         mockClient.prepareResponse(syncGroupResponse(Errors.NONE));
         AtomicBoolean heartbeatReceived = prepareFirstHeartbeat();
 
@@ -540,16 +609,13 @@ public class AbstractCoordinatorTest {
         setupCoordinator();
 
         mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        mockClient.prepareResponse(new MockClient.RequestMatcher() {
-            @Override
-            public boolean matches(AbstractRequest body) {
-                boolean isJoinGroupRequest = body instanceof JoinGroupRequest;
-                if (isJoinGroupRequest)
-                    // wakeup after the request returns
-                    consumerClient.wakeup();
-                return isJoinGroupRequest;
-            }
-        }, joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE));
+        mockClient.prepareResponse(body -> {
+            boolean isJoinGroupRequest = body instanceof JoinGroupRequest;
+            if (isJoinGroupRequest)
+                // wakeup after the request returns
+                consumerClient.wakeup();
+            return isJoinGroupRequest;
+        }, joinGroupFollowerResponse(1, memberId, leaderId, Errors.NONE));
         mockClient.prepareResponse(syncGroupResponse(Errors.NONE));
         AtomicBoolean heartbeatReceived = prepareFirstHeartbeat();
 
@@ -576,16 +642,13 @@ public class AbstractCoordinatorTest {
         setupCoordinator();
 
         mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        mockClient.prepareResponse(new MockClient.RequestMatcher() {
-            @Override
-            public boolean matches(AbstractRequest body) {
-                boolean isJoinGroupRequest = body instanceof JoinGroupRequest;
-                if (isJoinGroupRequest)
-                    // wakeup after the request returns
-                    consumerClient.wakeup();
-                return isJoinGroupRequest;
-            }
-        }, joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE));
+        mockClient.prepareResponse(body -> {
+            boolean isJoinGroupRequest = body instanceof JoinGroupRequest;
+            if (isJoinGroupRequest)
+                // wakeup after the request returns
+                consumerClient.wakeup();
+            return isJoinGroupRequest;
+        }, joinGroupFollowerResponse(1, memberId, leaderId, Errors.NONE));
         mockClient.prepareResponse(syncGroupResponse(Errors.NONE));
         AtomicBoolean heartbeatReceived = prepareFirstHeartbeat();
 
@@ -614,7 +677,7 @@ public class AbstractCoordinatorTest {
         setupCoordinator();
 
         mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        mockClient.prepareResponse(joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE));
+        mockClient.prepareResponse(joinGroupFollowerResponse(1, memberId, leaderId, Errors.NONE));
         mockClient.prepareResponse(new MockClient.RequestMatcher() {
             private int invocations = 0;
             @Override
@@ -652,7 +715,7 @@ public class AbstractCoordinatorTest {
         setupCoordinator();
 
         mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        mockClient.prepareResponse(joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE));
+        mockClient.prepareResponse(joinGroupFollowerResponse(1, memberId, leaderId, Errors.NONE));
         mockClient.prepareResponse(new MockClient.RequestMatcher() {
             private int invocations = 0;
             @Override
@@ -692,16 +755,13 @@ public class AbstractCoordinatorTest {
         setupCoordinator();
 
         mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        mockClient.prepareResponse(joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE));
-        mockClient.prepareResponse(new MockClient.RequestMatcher() {
-            @Override
-            public boolean matches(AbstractRequest body) {
-                boolean isSyncGroupRequest = body instanceof SyncGroupRequest;
-                if (isSyncGroupRequest)
-                    // wakeup after the request returns
-                    consumerClient.wakeup();
-                return isSyncGroupRequest;
-            }
+        mockClient.prepareResponse(joinGroupFollowerResponse(1, memberId, leaderId, Errors.NONE));
+        mockClient.prepareResponse(body -> {
+            boolean isSyncGroupRequest = body instanceof SyncGroupRequest;
+            if (isSyncGroupRequest)
+                // wakeup after the request returns
+                consumerClient.wakeup();
+            return isSyncGroupRequest;
         }, syncGroupResponse(Errors.NONE));
         AtomicBoolean heartbeatReceived = prepareFirstHeartbeat();
 
@@ -728,16 +788,13 @@ public class AbstractCoordinatorTest {
         setupCoordinator();
 
         mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        mockClient.prepareResponse(joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE));
-        mockClient.prepareResponse(new MockClient.RequestMatcher() {
-            @Override
-            public boolean matches(AbstractRequest body) {
-                boolean isSyncGroupRequest = body instanceof SyncGroupRequest;
-                if (isSyncGroupRequest)
-                    // wakeup after the request returns
-                    consumerClient.wakeup();
-                return isSyncGroupRequest;
-            }
+        mockClient.prepareResponse(joinGroupFollowerResponse(1, memberId, leaderId, Errors.NONE));
+        mockClient.prepareResponse(body -> {
+            boolean isSyncGroupRequest = body instanceof SyncGroupRequest;
+            if (isSyncGroupRequest)
+                // wakeup after the request returns
+                consumerClient.wakeup();
+            return isSyncGroupRequest;
         }, syncGroupResponse(Errors.NONE));
         AtomicBoolean heartbeatReceived = prepareFirstHeartbeat();
 
@@ -765,7 +822,7 @@ public class AbstractCoordinatorTest {
 
         coordinator.wakeupOnJoinComplete = true;
         mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        mockClient.prepareResponse(joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE));
+        mockClient.prepareResponse(joinGroupFollowerResponse(1, memberId, leaderId, Errors.NONE));
         mockClient.prepareResponse(syncGroupResponse(Errors.NONE));
         AtomicBoolean heartbeatReceived = prepareFirstHeartbeat();
 
@@ -806,14 +863,11 @@ public class AbstractCoordinatorTest {
 
     private AtomicBoolean prepareFirstHeartbeat() {
         final AtomicBoolean heartbeatReceived = new AtomicBoolean(false);
-        mockClient.prepareResponse(new MockClient.RequestMatcher() {
-            @Override
-            public boolean matches(AbstractRequest body) {
-                boolean isHeartbeatRequest = body instanceof HeartbeatRequest;
-                if (isHeartbeatRequest)
-                    heartbeatReceived.set(true);
-                return isHeartbeatRequest;
-            }
+        mockClient.prepareResponse(body -> {
+            boolean isHeartbeatRequest = body instanceof HeartbeatRequest;
+            if (isHeartbeatRequest)
+                heartbeatReceived.set(true);
+            return isHeartbeatRequest;
         }, heartbeatResponse(Errors.UNKNOWN_SERVER_ERROR));
         return heartbeatReceived;
     }
@@ -861,6 +915,12 @@ public class AbstractCoordinatorTest {
         );
     }
 
+    private LeaveGroupResponse leaveGroupResponse(List<MemberResponse> members) {
+        return new LeaveGroupResponse(new LeaveGroupResponseData()
+                .setErrorCode(Errors.NONE.code())
+                .setMembers(members));
+    }
+
     public static class DummyCoordinator extends AbstractCoordinator {
 
         private int onJoinPrepareInvokes = 0;
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index a81e73e..11273cb 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -44,6 +44,7 @@ import org.apache.kafka.common.internals.Topic;
 import org.apache.kafka.common.message.HeartbeatResponseData;
 import org.apache.kafka.common.message.JoinGroupRequestData;
 import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
 import org.apache.kafka.common.message.LeaveGroupResponseData;
 import org.apache.kafka.common.message.OffsetCommitRequestData;
 import org.apache.kafka.common.message.OffsetCommitResponseData;
@@ -51,7 +52,6 @@ import org.apache.kafka.common.message.SyncGroupResponseData;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.RecordBatch;
-import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.FindCoordinatorResponse;
 import org.apache.kafka.common.requests.HeartbeatResponse;
 import org.apache.kafka.common.requests.JoinGroupRequest;
@@ -473,14 +473,11 @@ public class ConsumerCoordinatorTest {
         partitionAssignor.prepare(singletonMap(consumerId, assigned));
 
         client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE));
-        client.prepareResponse(new MockClient.RequestMatcher() {
-            @Override
-            public boolean matches(AbstractRequest body) {
-                SyncGroupRequest sync = (SyncGroupRequest) body;
-                return sync.data.memberId().equals(consumerId) &&
-                        sync.data.generationId() == 1 &&
-                        sync.groupAssignments().containsKey(consumerId);
-            }
+        client.prepareResponse(body -> {
+            SyncGroupRequest sync = (SyncGroupRequest) body;
+            return sync.data.memberId().equals(consumerId) &&
+                    sync.data.generationId() == 1 &&
+                    sync.groupAssignments().containsKey(consumerId);
         }, syncGroupResponse(assigned, Errors.NONE));
         coordinator.poll(time.timer(Long.MAX_VALUE));
 
@@ -514,28 +511,22 @@ public class ConsumerCoordinatorTest {
         client.prepareResponse(
                 joinGroupLeaderResponse(
                     1, consumerId, singletonMap(consumerId, oldSubscription), Errors.NONE));
-        client.prepareResponse(new MockClient.RequestMatcher() {
-            @Override
-            public boolean matches(AbstractRequest body) {
-                SyncGroupRequest sync = (SyncGroupRequest) body;
-                return sync.data.memberId().equals(consumerId) &&
-                        sync.data.generationId() == 1 &&
-                        sync.groupAssignments().containsKey(consumerId);
-            }
+        client.prepareResponse(body -> {
+            SyncGroupRequest sync = (SyncGroupRequest) body;
+            return sync.data.memberId().equals(consumerId) &&
+                    sync.data.generationId() == 1 &&
+                    sync.groupAssignments().containsKey(consumerId);
         }, syncGroupResponse(oldAssignment, Errors.NONE));
 
         // Second correct assignment for subscription
         client.prepareResponse(
                 joinGroupLeaderResponse(
                     1, consumerId, singletonMap(consumerId, newSubscription), Errors.NONE));
-        client.prepareResponse(new MockClient.RequestMatcher() {
-            @Override
-            public boolean matches(AbstractRequest body) {
-                SyncGroupRequest sync = (SyncGroupRequest) body;
-                return sync.data.memberId().equals(consumerId) &&
-                        sync.data.generationId() == 1 &&
-                        sync.groupAssignments().containsKey(consumerId);
-            }
+        client.prepareResponse(body -> {
+            SyncGroupRequest sync = (SyncGroupRequest) body;
+            return sync.data.memberId().equals(consumerId) &&
+                    sync.data.generationId() == 1 &&
+                    sync.groupAssignments().containsKey(consumerId);
         }, syncGroupResponse(newAssignment, Errors.NONE));
 
         // Poll once so that the join group future gets created and complete
@@ -587,14 +578,11 @@ public class ConsumerCoordinatorTest {
         partitionAssignor.prepare(singletonMap(consumerId, assigned));
 
         client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE));
-        client.prepareResponse(new MockClient.RequestMatcher() {
-            @Override
-            public boolean matches(AbstractRequest body) {
-                SyncGroupRequest sync = (SyncGroupRequest) body;
-                return sync.data.memberId().equals(consumerId) &&
-                        sync.data.generationId() == 1 &&
-                        sync.groupAssignments().containsKey(consumerId);
-            }
+        client.prepareResponse(body -> {
+            SyncGroupRequest sync = (SyncGroupRequest) body;
+            return sync.data.memberId().equals(consumerId) &&
+                    sync.data.generationId() == 1 &&
+                    sync.groupAssignments().containsKey(consumerId);
         }, syncGroupResponse(assigned, Errors.NONE));
         // expect client to force updating the metadata, if yes gives it both topics
         client.prepareMetadataUpdate(metadataResponse);
@@ -634,15 +622,12 @@ public class ConsumerCoordinatorTest {
         final List<String> updatedSubscription = Arrays.asList(topic1, topic2);
 
         client.prepareResponse(joinGroupLeaderResponse(1, consumerId, initialSubscription, Errors.NONE));
-        client.prepareResponse(new MockClient.RequestMatcher() {
-            @Override
-            public boolean matches(AbstractRequest body) {
-                final Map<String, Integer> updatedPartitions = new HashMap<>();
-                for (String topic : updatedSubscription)
-                    updatedPartitions.put(topic, 1);
-                client.updateMetadata(TestUtils.metadataUpdateWith(1, updatedPartitions));
-                return true;
-            }
+        client.prepareResponse(body -> {
+            final Map<String, Integer> updatedPartitions = new HashMap<>();
+            for (String topic : updatedSubscription)
+                updatedPartitions.put(topic, 1);
+            client.updateMetadata(TestUtils.metadataUpdateWith(1, updatedPartitions));
+            return true;
         }, syncGroupResponse(oldAssigned, Errors.NONE));
         coordinator.poll(time.timer(Long.MAX_VALUE));
 
@@ -661,20 +646,17 @@ public class ConsumerCoordinatorTest {
         partitionAssignor.prepare(singletonMap(consumerId, newAssigned));
 
         // we expect to see a second rebalance with the new-found topics
-        client.prepareResponse(new MockClient.RequestMatcher() {
-            @Override
-            public boolean matches(AbstractRequest body) {
-                JoinGroupRequest join = (JoinGroupRequest) body;
-                Iterator<JoinGroupRequestData.JoinGroupRequestProtocol> protocolIterator =
-                        join.data().protocols().iterator();
-                assertTrue(protocolIterator.hasNext());
-                JoinGroupRequestData.JoinGroupRequestProtocol protocolMetadata = protocolIterator.next();
-
-                ByteBuffer metadata = ByteBuffer.wrap(protocolMetadata.metadata());
-                ConsumerPartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(metadata);
-                metadata.rewind();
-                return subscription.topics().containsAll(updatedSubscription);
-            }
+        client.prepareResponse(body -> {
+            JoinGroupRequest join = (JoinGroupRequest) body;
+            Iterator<JoinGroupRequestData.JoinGroupRequestProtocol> protocolIterator =
+                    join.data().protocols().iterator();
+            assertTrue(protocolIterator.hasNext());
+            JoinGroupRequestData.JoinGroupRequestProtocol protocolMetadata = protocolIterator.next();
+
+            ByteBuffer metadata = ByteBuffer.wrap(protocolMetadata.metadata());
+            ConsumerPartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(metadata);
+            metadata.rewind();
+            return subscription.topics().containsAll(updatedSubscription);
         }, joinGroupLeaderResponse(2, consumerId, updatedSubscriptions, Errors.NONE));
         client.prepareResponse(syncGroupResponse(newAssigned, Errors.NONE));
 
@@ -707,14 +689,11 @@ public class ConsumerCoordinatorTest {
         client.prepareMetadataUpdate(metadataResponse);
 
         client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
-        client.prepareResponse(new MockClient.RequestMatcher() {
-            @Override
-            public boolean matches(AbstractRequest body) {
-                SyncGroupRequest sync = (SyncGroupRequest) body;
-                return sync.data.memberId().equals(consumerId) &&
-                    sync.data.generationId() == 1 &&
-                    sync.groupAssignments().isEmpty();
-            }
+        client.prepareResponse(body -> {
+            SyncGroupRequest sync = (SyncGroupRequest) body;
+            return sync.data.memberId().equals(consumerId) &&
+                sync.data.generationId() == 1 &&
+                sync.groupAssignments().isEmpty();
         }, syncGroupResponse(singletonList(t1p), Errors.NONE));
 
         partitionAssignor.prepare(singletonMap(consumerId, singletonList(t1p)));
@@ -786,14 +765,11 @@ public class ConsumerCoordinatorTest {
 
         // normal join group
         client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
-        client.prepareResponse(new MockClient.RequestMatcher() {
-            @Override
-            public boolean matches(AbstractRequest body) {
-                SyncGroupRequest sync = (SyncGroupRequest) body;
-                return sync.data.memberId().equals(consumerId) &&
-                        sync.data.generationId() == 1 &&
-                        sync.groupAssignments().isEmpty();
-            }
+        client.prepareResponse(body -> {
+            SyncGroupRequest sync = (SyncGroupRequest) body;
+            return sync.data.memberId().equals(consumerId) &&
+                    sync.data.generationId() == 1 &&
+                    sync.groupAssignments().isEmpty();
         }, syncGroupResponse(assigned, Errors.NONE));
 
         coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE));
@@ -854,14 +830,11 @@ public class ConsumerCoordinatorTest {
 
         // normal join group
         client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
-        client.prepareResponse(new MockClient.RequestMatcher() {
-            @Override
-            public boolean matches(AbstractRequest body) {
-                SyncGroupRequest sync = (SyncGroupRequest) body;
-                return sync.data.memberId().equals(consumerId) &&
-                        sync.data.generationId() == 1 &&
-                        sync.groupAssignments().isEmpty();
-            }
+        client.prepareResponse(body -> {
+            SyncGroupRequest sync = (SyncGroupRequest) body;
+            return sync.data.memberId().equals(consumerId) &&
+                    sync.data.generationId() == 1 &&
+                    sync.groupAssignments().isEmpty();
         }, syncGroupResponse(assigned, Errors.NONE));
         // expect client to force updating the metadata, if yes gives it both topics
         client.prepareMetadataUpdate(metadataResponse);
@@ -885,15 +858,12 @@ public class ConsumerCoordinatorTest {
         joinAsFollowerAndReceiveAssignment(consumerId, coordinator, singletonList(t1p));
 
         final AtomicBoolean received = new AtomicBoolean(false);
-        client.prepareResponse(new MockClient.RequestMatcher() {
-            @Override
-            public boolean matches(AbstractRequest body) {
-                received.set(true);
-                LeaveGroupRequest leaveRequest = (LeaveGroupRequest) body;
-                return leaveRequest.data().memberId().equals(consumerId) &&
-                        leaveRequest.data().groupId().equals(groupId);
-            }
-        }, new LeaveGroupResponse(new LeaveGroupResponseData().setErrorCode(Errors.NONE.code())));
+        client.prepareResponse(body -> {
+            received.set(true);
+            LeaveGroupRequest leaveRequest = (LeaveGroupRequest) body;
+            return validateLeaveGroup(groupId, consumerId, leaveRequest);
+        }, new LeaveGroupResponse(
+            new LeaveGroupResponseData().setErrorCode(Errors.NONE.code())));
         coordinator.close(time.timer(0));
         assertTrue(received.get());
     }
@@ -906,14 +876,10 @@ public class ConsumerCoordinatorTest {
         joinAsFollowerAndReceiveAssignment(consumerId, coordinator, singletonList(t1p));
 
         final AtomicBoolean received = new AtomicBoolean(false);
-        client.prepareResponse(new MockClient.RequestMatcher() {
-            @Override
-            public boolean matches(AbstractRequest body) {
-                received.set(true);
-                LeaveGroupRequest leaveRequest = (LeaveGroupRequest) body;
-                return leaveRequest.data().memberId().equals(consumerId) &&
-                        leaveRequest.data().groupId().equals(groupId);
-            }
+        client.prepareResponse(body -> {
+            received.set(true);
+            LeaveGroupRequest leaveRequest = (LeaveGroupRequest) body;
+            return validateLeaveGroup(groupId, consumerId, leaveRequest);
         }, new LeaveGroupResponse(new LeaveGroupResponseData().setErrorCode(Errors.NONE.code())));
         coordinator.maybeLeaveGroup("test maybe leave group");
         assertTrue(received.get());
@@ -922,6 +888,15 @@ public class ConsumerCoordinatorTest {
         assertNull(generation);
     }
 
+    private boolean validateLeaveGroup(String groupId,
+                                       String consumerId,
+                                       LeaveGroupRequest leaveRequest) {
+        List<MemberIdentity> members = leaveRequest.data().members();
+        return leaveRequest.data().groupId().equals(groupId) &&
+                   members.size() == 1 &&
+                   members.get(0).memberId().equals(consumerId);
+    }
+
     /**
      * This test checks if a consumer that has a valid member ID but an invalid generation
      * ({@link org.apache.kafka.clients.consumer.internals.AbstractCoordinator.Generation#NO_GENERATION})
@@ -944,13 +919,10 @@ public class ConsumerCoordinatorTest {
         coordinator.joinGroupIfNeeded(time.timer(0));
 
         final AtomicBoolean received = new AtomicBoolean(false);
-        client.prepareResponse(new MockClient.RequestMatcher() {
-            @Override
-            public boolean matches(AbstractRequest body) {
-                received.set(true);
-                LeaveGroupRequest leaveRequest = (LeaveGroupRequest) body;
-                return leaveRequest.data().memberId().equals(consumerId);
-            }
+        client.prepareResponse(body -> {
+            received.set(true);
+            LeaveGroupRequest leaveRequest = (LeaveGroupRequest) body;
+            return validateLeaveGroup(groupId, consumerId, leaveRequest);
         }, new LeaveGroupResponse(new LeaveGroupResponseData().setErrorCode(Errors.NONE.code())));
 
         coordinator.maybeLeaveGroup("pending member leaves");
@@ -986,12 +958,9 @@ public class ConsumerCoordinatorTest {
         client.prepareResponse(syncGroupResponse(Collections.<TopicPartition>emptyList(), Errors.UNKNOWN_MEMBER_ID));
 
         // now we should see a new join with the empty UNKNOWN_MEMBER_ID
-        client.prepareResponse(new MockClient.RequestMatcher() {
-            @Override
-            public boolean matches(AbstractRequest body) {
-                JoinGroupRequest joinRequest = (JoinGroupRequest) body;
-                return joinRequest.data().memberId().equals(JoinGroupRequest.UNKNOWN_MEMBER_ID);
-            }
+        client.prepareResponse(body -> {
+            JoinGroupRequest joinRequest = (JoinGroupRequest) body;
+            return joinRequest.data().memberId().equals(JoinGroupRequest.UNKNOWN_MEMBER_ID);
         }, joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE));
         client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
 
@@ -1038,12 +1007,9 @@ public class ConsumerCoordinatorTest {
         client.prepareResponse(syncGroupResponse(Collections.<TopicPartition>emptyList(), Errors.ILLEGAL_GENERATION));
 
         // then let the full join/sync finish successfully
-        client.prepareResponse(new MockClient.RequestMatcher() {
-            @Override
-            public boolean matches(AbstractRequest body) {
-                JoinGroupRequest joinRequest = (JoinGroupRequest) body;
-                return joinRequest.data().memberId().equals(JoinGroupRequest.UNKNOWN_MEMBER_ID);
-            }
+        client.prepareResponse(body -> {
+            JoinGroupRequest joinRequest = (JoinGroupRequest) body;
+            return joinRequest.data().memberId().equals(JoinGroupRequest.UNKNOWN_MEMBER_ID);
         }, joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE));
         client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
 
@@ -1106,22 +1072,19 @@ public class ConsumerCoordinatorTest {
         partitionAssignor.prepare(singletonMap(consumerId, Arrays.asList(tp1)));
 
         client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE));
-        client.prepareResponse(new MockClient.RequestMatcher() {
-            @Override
-            public boolean matches(AbstractRequest body) {
-                SyncGroupRequest sync = (SyncGroupRequest) body;
-                if (sync.data.memberId().equals(consumerId) &&
-                        sync.data.generationId() == 1 &&
-                        sync.groupAssignments().containsKey(consumerId)) {
-                    // trigger the metadata update including both topics after the sync group request has been sent
-                    Map<String, Integer> topicPartitionCounts = new HashMap<>();
-                    topicPartitionCounts.put(topic1, 1);
-                    topicPartitionCounts.put(topic2, 1);
-                    client.updateMetadata(TestUtils.metadataUpdateWith(1, topicPartitionCounts));
-                    return true;
-                }
-                return false;
+        client.prepareResponse(body -> {
+            SyncGroupRequest sync = (SyncGroupRequest) body;
+            if (sync.data.memberId().equals(consumerId) &&
+                    sync.data.generationId() == 1 &&
+                    sync.groupAssignments().containsKey(consumerId)) {
+                // trigger the metadata update including both topics after the sync group request has been sent
+                Map<String, Integer> topicPartitionCounts = new HashMap<>();
+                topicPartitionCounts.put(topic1, 1);
+                topicPartitionCounts.put(topic2, 1);
+                client.updateMetadata(TestUtils.metadataUpdateWith(1, topicPartitionCounts));
+                return true;
             }
+            return false;
         }, syncGroupResponse(Collections.singletonList(tp1), Errors.NONE));
         coordinator.poll(time.timer(Long.MAX_VALUE));
 
@@ -1581,13 +1544,10 @@ public class ConsumerCoordinatorTest {
         subscriptions.assignFromUser(singleton(t1p));
 
         // the client should not reuse generation/memberId from auto-subscribed generation
-        client.prepareResponse(new MockClient.RequestMatcher() {
-            @Override
-            public boolean matches(AbstractRequest body) {
-                OffsetCommitRequest commitRequest = (OffsetCommitRequest) body;
-                return commitRequest.data().memberId().equals(OffsetCommitRequest.DEFAULT_MEMBER_ID) &&
-                        commitRequest.data().generationId() == OffsetCommitRequest.DEFAULT_GENERATION_ID;
-            }
+        client.prepareResponse(body -> {
+            OffsetCommitRequest commitRequest = (OffsetCommitRequest) body;
+            return commitRequest.data().memberId().equals(OffsetCommitRequest.DEFAULT_MEMBER_ID) &&
+                    commitRequest.data().generationId() == OffsetCommitRequest.DEFAULT_GENERATION_ID;
         }, offsetCommitResponse(singletonMap(t1p, Errors.NONE)));
 
         AtomicBoolean success = new AtomicBoolean(false);
@@ -2302,21 +2262,15 @@ public class ConsumerCoordinatorTest {
     private void gracefulCloseTest(ConsumerCoordinator coordinator, boolean shouldLeaveGroup) throws Exception {
         final AtomicBoolean commitRequested = new AtomicBoolean();
         final AtomicBoolean leaveGroupRequested = new AtomicBoolean();
-        client.prepareResponse(new MockClient.RequestMatcher() {
-            @Override
-            public boolean matches(AbstractRequest body) {
-                commitRequested.set(true);
-                OffsetCommitRequest commitRequest = (OffsetCommitRequest) body;
-                return commitRequest.data().groupId().equals(groupId);
-            }
+        client.prepareResponse(body -> {
+            commitRequested.set(true);
+            OffsetCommitRequest commitRequest = (OffsetCommitRequest) body;
+            return commitRequest.data().groupId().equals(groupId);
         }, new OffsetCommitResponse(new OffsetCommitResponseData()));
-        client.prepareResponse(new MockClient.RequestMatcher() {
-            @Override
-            public boolean matches(AbstractRequest body) {
-                leaveGroupRequested.set(true);
-                LeaveGroupRequest leaveRequest = (LeaveGroupRequest) body;
-                return leaveRequest.data().groupId().equals(groupId);
-            }
+        client.prepareResponse(body -> {
+            leaveGroupRequested.set(true);
+            LeaveGroupRequest leaveRequest = (LeaveGroupRequest) body;
+            return leaveRequest.data().groupId().equals(groupId);
         }, new LeaveGroupResponse(new LeaveGroupResponseData()
                 .setErrorCode(Errors.NONE.code())));
 
@@ -2494,37 +2448,31 @@ public class ConsumerCoordinatorTest {
     }
 
     private MockClient.RequestMatcher offsetCommitRequestMatcher(final Map<TopicPartition, Long> expectedOffsets) {
-        return new MockClient.RequestMatcher() {
-            @Override
-            public boolean matches(AbstractRequest body) {
-                OffsetCommitRequest req = (OffsetCommitRequest) body;
-                Map<TopicPartition, Long> offsets = req.offsets();
-                if (offsets.size() != expectedOffsets.size())
-                    return false;
+        return body -> {
+            OffsetCommitRequest req = (OffsetCommitRequest) body;
+            Map<TopicPartition, Long> offsets = req.offsets();
+            if (offsets.size() != expectedOffsets.size())
+                return false;
 
-                for (Map.Entry<TopicPartition, Long> expectedOffset : expectedOffsets.entrySet()) {
-                    if (!offsets.containsKey(expectedOffset.getKey())) {
+            for (Map.Entry<TopicPartition, Long> expectedOffset : expectedOffsets.entrySet()) {
+                if (!offsets.containsKey(expectedOffset.getKey())) {
+                    return false;
+                } else {
+                    Long actualOffset = offsets.get(expectedOffset.getKey());
+                    if (!actualOffset.equals(expectedOffset.getValue())) {
                         return false;
-                    } else {
-                        Long actualOffset = offsets.get(expectedOffset.getKey());
-                        if (!actualOffset.equals(expectedOffset.getValue())) {
-                            return false;
-                        }
                     }
                 }
-                return true;
             }
+            return true;
         };
     }
 
     private OffsetCommitCallback callback(final Map<TopicPartition, OffsetAndMetadata> expectedOffsets,
                                           final AtomicBoolean success) {
-        return new OffsetCommitCallback() {
-            @Override
-            public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
-                if (expectedOffsets.equals(offsets) && exception == null)
-                    success.set(true);
-            }
+        return (offsets, exception) -> {
+            if (expectedOffsets.equals(offsets) && exception == null)
+                success.set(true);
         };
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
index 5213ec2..58c650a 100644
--- a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
@@ -21,8 +21,10 @@ import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopic;
 import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopicCollection;
 import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
+import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.Message;
 import org.apache.kafka.common.protocol.types.ArrayOf;
 import org.apache.kafka.common.protocol.types.BoundField;
@@ -48,6 +50,10 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 public final class MessageTest {
+
+    private final String memberId = "memberId";
+    private final String instanceId = "instanceId";
+
     @Rule
     final public Timeout globalTimeout = Timeout.millis(120000);
 
@@ -115,7 +121,7 @@ public final class MessageTest {
     public void testHeartbeatVersions() throws Exception {
         Supplier<HeartbeatRequestData> newRequest = () -> new HeartbeatRequestData()
                 .setGroupId("groupId")
-                .setMemberId("memberId")
+                .setMemberId(memberId)
                 .setGenerationId(15);
         testAllMessageRoundTrips(newRequest.get());
         testAllMessageRoundTrips(newRequest.get().setGroupInstanceId(null));
@@ -126,7 +132,7 @@ public final class MessageTest {
     public void testJoinGroupRequestVersions() throws Exception {
         Supplier<JoinGroupRequestData> newRequest = () -> new JoinGroupRequestData()
                 .setGroupId("groupId")
-                .setMemberId("memberId")
+                .setMemberId(memberId)
                 .setProtocolType("consumer")
                 .setProtocols(new JoinGroupRequestData.JoinGroupRequestProtocolCollection())
                 .setSessionTimeoutMs(10000);
@@ -138,7 +144,6 @@ public final class MessageTest {
 
     @Test
     public void testJoinGroupResponseVersions() throws Exception {
-        String memberId = "memberId";
         Supplier<JoinGroupResponseData> newResponse = () -> new JoinGroupResponseData()
                 .setMemberId(memberId)
                 .setLeader(memberId)
@@ -154,15 +159,30 @@ public final class MessageTest {
     }
 
     @Test
+    public void testLeaveGroupResponseVersions() throws Exception {
+        Supplier<LeaveGroupResponseData> newResponse = () -> new LeaveGroupResponseData()
+                                                                 .setErrorCode(Errors.NOT_COORDINATOR.code());
+
+        testAllMessageRoundTrips(newResponse.get());
+        testAllMessageRoundTripsFromVersion((short) 1, newResponse.get().setThrottleTimeMs(1000));
+
+        testAllMessageRoundTripsFromVersion((short) 3, newResponse.get().setMembers(
+            Collections.singletonList(new MemberResponse()
+            .setMemberId(memberId)
+            .setGroupInstanceId(instanceId))
+        ));
+    }
+
+    @Test
     public void testSyncGroupDefaultGroupInstanceId() throws Exception {
         Supplier<SyncGroupRequestData> request = () -> new SyncGroupRequestData()
                 .setGroupId("groupId")
-                .setMemberId("memberId")
+                .setMemberId(memberId)
                 .setGenerationId(15)
                 .setAssignments(new ArrayList<>());
         testAllMessageRoundTrips(request.get());
         testAllMessageRoundTrips(request.get().setGroupInstanceId(null));
-        testAllMessageRoundTripsFromVersion((short) 3, request.get().setGroupInstanceId("instanceId"));
+        testAllMessageRoundTripsFromVersion((short) 3, request.get().setGroupInstanceId(instanceId));
     }
 
     @Test
@@ -173,12 +193,12 @@ public final class MessageTest {
 
         Supplier<OffsetCommitRequestData> request = () -> new OffsetCommitRequestData()
                 .setGroupId("groupId")
-                .setMemberId("memberId")
+                .setMemberId(memberId)
                 .setTopics(new ArrayList<>())
                 .setGenerationId(15);
         testAllMessageRoundTripsFromVersion((short) 1, request.get());
         testAllMessageRoundTripsFromVersion((short) 1, request.get().setGroupInstanceId(null));
-        testAllMessageRoundTripsFromVersion((short) 7, request.get().setGroupInstanceId("instanceId"));
+        testAllMessageRoundTripsFromVersion((short) 7, request.get().setGroupInstanceId(instanceId));
     }
 
     @Test
@@ -317,7 +337,7 @@ public final class MessageTest {
      * Test that the JSON response files match the schemas accessible through the ApiKey class.
      */
     @Test
-    public void testResponseSchemas() throws Exception {
+    public void testResponseSchemas() {
         for (ApiKeys apiKey : ApiKeys.values()) {
             Schema[] manualSchemas = apiKey.responseSchemas;
             Schema[] generatedSchemas = ApiMessageType.fromApiKey(apiKey.id).responseSchemas();
@@ -439,17 +459,17 @@ public final class MessageTest {
         verifySizeRaisesUve((short) 0, "groupInstanceId", new HeartbeatRequestData()
                 .setGroupId("groupId")
                 .setGenerationId(15)
-                .setMemberId("memberId")
-                .setGroupInstanceId("instanceId"));
+                .setMemberId(memberId)
+                .setGroupInstanceId(instanceId));
         verifySizeSucceeds((short) 0, new HeartbeatRequestData()
                 .setGroupId("groupId")
                 .setGenerationId(15)
-                .setMemberId("memberId")
+                .setMemberId(memberId)
                 .setGroupInstanceId(null));
         verifySizeSucceeds((short) 0, new HeartbeatRequestData()
                 .setGroupId("groupId")
                 .setGenerationId(15)
-                .setMemberId("memberId"));
+                .setMemberId(memberId));
     }
 
     private void verifySizeRaisesUve(short version, String problemFieldName,
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/LeaveGroupRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/LeaveGroupRequestTest.java
index 9fa9d3b..2ff928b 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/LeaveGroupRequestTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/LeaveGroupRequestTest.java
@@ -16,34 +16,97 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.LeaveGroupRequestData;
+import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
 import org.apache.kafka.common.message.LeaveGroupResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.junit.Before;
 import org.junit.Test;
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class LeaveGroupRequestTest {
 
+    private final String groupId = "group_id";
+    private final String memberIdOne = "member_1";
+    private final String instanceIdOne = "instance_1";
+    private final String memberIdTwo = "member_2";
+    private final String instanceIdTwo = "instance_2";
+
+    private final int throttleTimeMs = 10;
+
+    private LeaveGroupRequest.Builder builder;
+    private List<MemberIdentity> members;
+
+    @Before
+    public void setUp() {
+        members = Arrays.asList(new MemberIdentity()
+                                         .setMemberId(memberIdOne)
+                                         .setGroupInstanceId(instanceIdOne),
+                                new MemberIdentity()
+                                         .setMemberId(memberIdTwo)
+                                         .setGroupInstanceId(instanceIdTwo));
+        builder = new LeaveGroupRequest.Builder(
+            groupId,
+            members
+        );
+    }
+
     @Test
-    public void testLeaveConstructor() {
-        final String groupId = "group_id";
-        final String memberId = "member_id";
-        final int throttleTimeMs = 10;
+    public void testMultiLeaveConstructor() {
+        final LeaveGroupRequestData expectedData = new LeaveGroupRequestData()
+                                                       .setGroupId(groupId)
+                                                       .setMembers(members);
+
+        for (short version = 0; version <= ApiKeys.LEAVE_GROUP.latestVersion(); version++) {
+            try {
+                LeaveGroupRequest request = builder.build(version);
+                if (version <= 2) {
+                    fail("Older version " + version +
+                             " request data should not be created due to non-single members");
+                }
+                assertEquals(expectedData, request.data());
+                assertEquals(members, request.members());
 
+                LeaveGroupResponse expectedResponse = new LeaveGroupResponse(
+                    Collections.emptyList(),
+                    Errors.COORDINATOR_LOAD_IN_PROGRESS,
+                    throttleTimeMs,
+                    version
+                );
+
+                assertEquals(expectedResponse, request.getErrorResponse(throttleTimeMs,
+                                                                        Errors.COORDINATOR_LOAD_IN_PROGRESS.exception()));
+            } catch (UnsupportedVersionException e) {
+                assertTrue(e.getMessage().contains("leave group request only supports single member instance"));
+            }
+        }
+
+    }
+
+    @Test
+    public void testSingleLeaveConstructor() {
         final LeaveGroupRequestData expectedData = new LeaveGroupRequestData()
                                                        .setGroupId(groupId)
-                                                       .setMemberId(memberId);
+                                                       .setMemberId(memberIdOne);
+        List<MemberIdentity> singleMember = Collections.singletonList(
+            new MemberIdentity()
+                .setMemberId(memberIdOne));
 
-        final LeaveGroupRequest.Builder builder =
-            new LeaveGroupRequest.Builder(new LeaveGroupRequestData()
-                                              .setGroupId(groupId)
-                                              .setMemberId(memberId));
+        builder = new LeaveGroupRequest.Builder(groupId, singleMember);
 
-        for (short version = 0; version <= ApiKeys.LEAVE_GROUP.latestVersion(); version++) {
+        for (short version = 0; version <= 2; version++) {
             LeaveGroupRequest request = builder.build(version);
             assertEquals(expectedData, request.data());
+            assertEquals(singleMember, request.members());
 
             int expectedThrottleTime = version >= 1 ? throttleTimeMs
                                            : AbstractResponse.DEFAULT_THROTTLE_TIME;
@@ -57,4 +120,9 @@ public class LeaveGroupRequestTest {
                                                                     Errors.NOT_CONTROLLER.exception()));
         }
     }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testBuildEmptyMembers() {
+        new LeaveGroupRequest.Builder(groupId, Collections.emptyList());
+    }
 }
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/LeaveGroupResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/LeaveGroupResponseTest.java
index a1368b5..187640b 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/LeaveGroupResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/LeaveGroupResponseTest.java
@@ -17,11 +17,16 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.message.LeaveGroupResponseData;
+import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.junit.Before;
 import org.junit.Test;
 
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import static org.apache.kafka.common.requests.AbstractResponse.DEFAULT_THROTTLE_TIME;
@@ -30,10 +35,31 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 public class LeaveGroupResponseTest {
+
+    private final String memberIdOne = "member_1";
+    private final String instanceIdOne = "instance_1";
+    private final String memberIdTwo = "member_2";
+    private final String instanceIdTwo = "instance_2";
+
     private final int throttleTimeMs = 10;
 
+    private List<MemberResponse> memberResponses;
+
+    @Before
+    public void setUp() {
+        memberResponses = Arrays.asList(new MemberResponse()
+                                            .setMemberId(memberIdOne)
+                                            .setGroupInstanceId(instanceIdOne)
+                                            .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()),
+                                        new MemberResponse()
+                                            .setMemberId(memberIdTwo)
+                                            .setGroupInstanceId(instanceIdTwo)
+                                            .setErrorCode(Errors.FENCED_INSTANCE_ID.code())
+        );
+    }
+
     @Test
-    public void testConstructor() {
+    public void testConstructorWithStruct() {
         Map<Errors, Integer> expectedErrorCounts = Collections.singletonMap(Errors.NOT_COORDINATOR, 1);
 
         LeaveGroupResponseData responseData = new LeaveGroupResponseData()
@@ -54,9 +80,40 @@ public class LeaveGroupResponseTest {
         }
     }
 
+
+    @Test
+    public void testConstructorWithMemberResponses() {
+        Map<Errors, Integer> expectedErrorCounts = new HashMap<>();
+        expectedErrorCounts.put(Errors.UNKNOWN_MEMBER_ID, 1);
+        expectedErrorCounts.put(Errors.FENCED_INSTANCE_ID, 1);
+
+        for (short version = 0; version <= ApiKeys.LEAVE_GROUP.latestVersion(); version++) {
+            LeaveGroupResponse leaveGroupResponse = new LeaveGroupResponse(memberResponses,
+                                                                           Errors.NONE,
+                                                                           throttleTimeMs,
+                                                                           version);
+
+            if (version >= 3) {
+                assertEquals(expectedErrorCounts, leaveGroupResponse.errorCounts());
+                assertEquals(memberResponses, leaveGroupResponse.memberResponses());
+            } else {
+                assertEquals(Collections.singletonMap(Errors.UNKNOWN_MEMBER_ID, 1),
+                             leaveGroupResponse.errorCounts());
+                assertEquals(Collections.emptyList(), leaveGroupResponse.memberResponses());
+            }
+
+            if (version >= 1) {
+                assertEquals(throttleTimeMs, leaveGroupResponse.throttleTimeMs());
+            } else {
+                assertEquals(DEFAULT_THROTTLE_TIME, leaveGroupResponse.throttleTimeMs());
+            }
+
+            assertEquals(Errors.UNKNOWN_MEMBER_ID, leaveGroupResponse.error());
+        }
+    }
+
     @Test
     public void testShouldThrottle() {
-        // A dummy setup is ok.
         LeaveGroupResponse response = new LeaveGroupResponse(new LeaveGroupResponseData());
         for (short version = 0; version <= ApiKeys.LEAVE_GROUP.latestVersion(); version++) {
             if (version >= 2) {
@@ -68,7 +125,7 @@ public class LeaveGroupResponseTest {
     }
 
     @Test
-    public void testEquality() {
+    public void testEqualityWithStruct() {
         LeaveGroupResponseData responseData = new LeaveGroupResponseData()
             .setErrorCode(Errors.NONE.code())
             .setThrottleTimeMs(throttleTimeMs);
@@ -80,6 +137,29 @@ public class LeaveGroupResponseTest {
             assertEquals(primaryResponse, primaryResponse);
             assertEquals(primaryResponse, secondaryResponse);
             assertEquals(primaryResponse.hashCode(), secondaryResponse.hashCode());
+
+        }
+    }
+
+    @Test
+    public void testEqualityWithMemberResponses() {
+        for (short version = 0; version <= ApiKeys.LEAVE_GROUP.latestVersion(); version++) {
+            List<MemberResponse> localResponses = version > 2 ? memberResponses : memberResponses.subList(0, 1);
+            LeaveGroupResponse primaryResponse = new LeaveGroupResponse(localResponses,
+                                                                        Errors.NONE,
+                                                                        throttleTimeMs,
+                                                                        version);
+
+            // The order of members should not alter result data.
+            Collections.reverse(localResponses);
+            LeaveGroupResponse reversedResponse = new LeaveGroupResponse(localResponses,
+                                                                         Errors.NONE,
+                                                                         throttleTimeMs,
+                                                                         version);
+
+            assertEquals(primaryResponse, primaryResponse);
+            assertEquals(primaryResponse, reversedResponse);
+            assertEquals(primaryResponse.hashCode(), reversedResponse.hashCode());
         }
     }
 }
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 10da9f5..9855210 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -55,6 +55,7 @@ import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicRe
 import org.apache.kafka.common.message.DeleteTopicsResponseData;
 import org.apache.kafka.common.message.DescribeGroupsRequestData;
 import org.apache.kafka.common.message.DescribeGroupsResponseData;
+import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroup;
 import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
 import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult;
 import org.apache.kafka.common.message.FindCoordinatorRequestData;
@@ -69,7 +70,7 @@ import org.apache.kafka.common.message.InitProducerIdRequestData;
 import org.apache.kafka.common.message.InitProducerIdResponseData;
 import org.apache.kafka.common.message.JoinGroupRequestData;
 import org.apache.kafka.common.message.JoinGroupResponseData;
-import org.apache.kafka.common.message.LeaveGroupRequestData;
+import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
 import org.apache.kafka.common.message.LeaveGroupResponseData;
 import org.apache.kafka.common.message.ListGroupsRequestData;
 import org.apache.kafka.common.message.ListGroupsResponseData;
@@ -125,7 +126,6 @@ import java.util.Optional;
 import java.util.Set;
 
 import static java.util.Arrays.asList;
-import static java.util.Collections.singletonList;
 import static org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID;
 import static org.apache.kafka.test.TestUtils.toBuffer;
 import static org.junit.Assert.assertEquals;
@@ -184,14 +184,14 @@ public class RequestResponseTest {
         checkErrorResponse(createListOffsetRequest(2), new UnknownServerException(), true);
         checkResponse(createListOffsetResponse(2), 2, true);
         checkRequest(MetadataRequest.Builder.allTopics().build((short) 2), true);
-        checkRequest(createMetadataRequest(1, singletonList("topic1")), true);
-        checkErrorResponse(createMetadataRequest(1, singletonList("topic1")), new UnknownServerException(), true);
+        checkRequest(createMetadataRequest(1, Collections.singletonList("topic1")), true);
+        checkErrorResponse(createMetadataRequest(1, Collections.singletonList("topic1")), new UnknownServerException(), true);
         checkResponse(createMetadataResponse(), 2, true);
-        checkErrorResponse(createMetadataRequest(2, singletonList("topic1")), new UnknownServerException(), true);
+        checkErrorResponse(createMetadataRequest(2, Collections.singletonList("topic1")), new UnknownServerException(), true);
         checkResponse(createMetadataResponse(), 3, true);
-        checkErrorResponse(createMetadataRequest(3, singletonList("topic1")), new UnknownServerException(), true);
+        checkErrorResponse(createMetadataRequest(3, Collections.singletonList("topic1")), new UnknownServerException(), true);
         checkResponse(createMetadataResponse(), 4, true);
-        checkErrorResponse(createMetadataRequest(4, singletonList("topic1")), new UnknownServerException(), true);
+        checkErrorResponse(createMetadataRequest(4, Collections.singletonList("topic1")), new UnknownServerException(), true);
         checkRequest(OffsetFetchRequest.forAllPartitions("group1"), true);
         checkErrorResponse(OffsetFetchRequest.forAllPartitions("group1"), new NotCoordinatorException("Not Coordinator"), true);
         checkRequest(createOffsetFetchRequest(0), true);
@@ -262,7 +262,7 @@ public class RequestResponseTest {
         checkOlderFetchVersions();
         checkResponse(createMetadataResponse(), 0, true);
         checkResponse(createMetadataResponse(), 1, true);
-        checkErrorResponse(createMetadataRequest(1, singletonList("topic1")), new UnknownServerException(), true);
+        checkErrorResponse(createMetadataRequest(1, Collections.singletonList("topic1")), new UnknownServerException(), true);
         checkRequest(createOffsetCommitRequest(0), true);
         checkErrorResponse(createOffsetCommitRequest(0), new UnknownServerException(), true);
         checkRequest(createOffsetCommitRequest(1), true);
@@ -723,8 +723,7 @@ public class RequestResponseTest {
     public void testOffsetFetchRequestBuilderToString() {
         String allTopicPartitionsString = OffsetFetchRequest.Builder.allTopicPartitions("someGroup").toString();
         assertTrue(allTopicPartitionsString.contains("<ALL>"));
-        String string = new OffsetFetchRequest.Builder("group1",
-                singletonList(new TopicPartition("test11", 1))).toString();
+        String string = new OffsetFetchRequest.Builder("group1", Collections.singletonList(new TopicPartition("test11", 1))).toString();
         assertTrue(string.contains("test11"));
         assertTrue(string.contains("group1"));
     }
@@ -896,14 +895,22 @@ public class RequestResponseTest {
         DescribeGroupsResponseData describeGroupsResponseData = new DescribeGroupsResponseData();
         DescribeGroupsResponseData.DescribedGroupMember member = DescribeGroupsResponse.groupMember("memberId", null,
                 clientId, clientHost, new byte[0], new byte[0]);
-        DescribeGroupsResponseData.DescribedGroup metadata = DescribeGroupsResponse.groupMetadata("test-group", Errors.NONE,
-                "STABLE", "consumer", "roundrobin", asList(member), Collections.emptySet());
+        DescribedGroup metadata = DescribeGroupsResponse.groupMetadata("test-group",
+                                                                       Errors.NONE,
+                                                                       "STABLE",
+                                                                       "consumer",
+                                                                       "roundrobin",
+                                                                       Collections.singletonList(member),
+                                                                       Collections.emptySet());
         describeGroupsResponseData.groups().add(metadata);
         return new DescribeGroupsResponse(describeGroupsResponseData);
     }
 
     private LeaveGroupRequest createLeaveGroupRequest() {
-        return new LeaveGroupRequest.Builder(new LeaveGroupRequestData().setGroupId("group1").setMemberId("consumer1")).build();
+        return new LeaveGroupRequest.Builder(
+            "group1", Collections.singletonList(new MemberIdentity()
+                                                    .setMemberId("consumer1"))
+            ).build();
     }
 
     private LeaveGroupResponse createLeaveGroupResponse() {
@@ -1040,7 +1047,7 @@ public class RequestResponseTest {
     }
 
     private OffsetFetchRequest createOffsetFetchRequest(int version) {
-        return new OffsetFetchRequest.Builder("group1", singletonList(new TopicPartition("test11", 1)))
+        return new OffsetFetchRequest.Builder("group1", Collections.singletonList(new TopicPartition("test11", 1)))
                 .build((short) version);
     }
 
@@ -1188,7 +1195,7 @@ public class RequestResponseTest {
     private SaslHandshakeResponse createSaslHandshakeResponse() {
         return new SaslHandshakeResponse(
                 new SaslHandshakeResponseData()
-                .setErrorCode(Errors.NONE.code()).setMechanisms(singletonList("GSSAPI")));
+                .setErrorCode(Errors.NONE.code()).setMechanisms(Collections.singletonList("GSSAPI")));
     }
 
     private SaslAuthenticateRequest createSaslAuthenticateRequest() {
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 7f6641c..6a57d59 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -28,6 +28,7 @@ import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember
+import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.RecordBatch.{NO_PRODUCER_EPOCH, NO_PRODUCER_ID}
 import org.apache.kafka.common.requests._
@@ -252,7 +253,7 @@ class GroupCoordinator(val brokerId: Int,
       } else if (group.isPendingMember(memberId)) {
         // A rejoining pending member will be accepted. Note that pending member will never be a static member.
         if (groupInstanceId.isDefined) {
-          throw new IllegalStateException(s"the static member $groupInstanceId was unexpectedly to be assigned " +
+          throw new IllegalStateException(s"the static member $groupInstanceId was not expected to be assigned " +
             s"into pending member bucket with member id $memberId")
         } else {
           addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, memberId, groupInstanceId,
@@ -419,36 +420,58 @@ class GroupCoordinator(val brokerId: Int,
     }
   }
 
-  def handleLeaveGroup(groupId: String, memberId: String, responseCallback: Errors => Unit): Unit = {
-    validateGroupStatus(groupId, ApiKeys.LEAVE_GROUP).foreach { error =>
-      responseCallback(error)
-      return
-    }
-
-    groupManager.getGroup(groupId) match {
+  def handleLeaveGroup(groupId: String,
+                       leavingMembers: List[MemberIdentity],
+                       responseCallback: LeaveGroupResult => Unit) {
+    validateGroupStatus(groupId, ApiKeys.LEAVE_GROUP) match {
+      case Some(error) =>
+        responseCallback(leaveError(error, List.empty))
       case None =>
-        responseCallback(Errors.UNKNOWN_MEMBER_ID)
-
-      case Some(group) =>
-        group.inLock {
-          if (group.is(Dead)) {
-            responseCallback(Errors.COORDINATOR_NOT_AVAILABLE)
-          } else if (group.isPendingMember(memberId)) {
-            // if a pending member is leaving, it needs to be removed from the pending list, heartbeat cancelled
-            // and if necessary, prompt a JoinGroup completion.
-            info(s"Pending member $memberId is leaving group ${group.groupId}.")
-            removePendingMemberAndUpdateGroup(group, memberId)
-            heartbeatPurgatory.checkAndComplete(MemberKey(group.groupId, memberId))
-            responseCallback(Errors.NONE)
-          } else if (!group.has(memberId)) {
-            responseCallback(Errors.UNKNOWN_MEMBER_ID)
-          } else {
-            val member = group.get(memberId)
-            removeHeartbeatForLeavingMember(group, member)
-            info(s"Member ${member.memberId} in group ${group.groupId} has left, removing it from the group")
-            removeMemberAndUpdateGroup(group, member, s"removing member $memberId on LeaveGroup")
-            responseCallback(Errors.NONE)
-          }
+        groupManager.getGroup(groupId) match {
+          case None =>
+            responseCallback(leaveError(Errors.NONE, leavingMembers.map {leavingMember =>
+              memberLeaveError(leavingMember, Errors.UNKNOWN_MEMBER_ID)
+            }))
+          case Some(group) =>
+            group.inLock {
+              if (group.is(Dead)) {
+                responseCallback(leaveError(Errors.COORDINATOR_NOT_AVAILABLE, List.empty))
+              } else {
+                val memberErrors = leavingMembers.map { leavingMember =>
+                  val memberId = leavingMember.memberId
+                  val groupInstanceId = Option(leavingMember.groupInstanceId)
+                  if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID
+                    && group.isStaticMemberFenced(memberId, groupInstanceId)) {
+                    memberLeaveError(leavingMember, Errors.FENCED_INSTANCE_ID)
+                  } else if (group.isPendingMember(memberId)) {
+                    if (groupInstanceId.isDefined) {
+                      throw new IllegalStateException(s"the static member $groupInstanceId was not expected to be leaving " +
+                        s"from pending member bucket with member id $memberId")
+                    } else {
+                      // if a pending member is leaving, it needs to be removed from the pending list, heartbeat cancelled
+                      // and if necessary, prompt a JoinGroup completion.
+                      info(s"Pending member $memberId is leaving group ${group.groupId}.")
+                      removePendingMemberAndUpdateGroup(group, memberId)
+                      heartbeatPurgatory.checkAndComplete(MemberKey(group.groupId, memberId))
+                      memberLeaveError(leavingMember, Errors.NONE)
+                    }
+                  } else if (!group.has(memberId) && !group.hasStaticMember(groupInstanceId)) {
+                    memberLeaveError(leavingMember, Errors.UNKNOWN_MEMBER_ID)
+                  } else {
+                    val member = if (group.hasStaticMember(groupInstanceId))
+                      group.get(group.getStaticMemberId(groupInstanceId))
+                    else
+                      group.get(memberId)
+                    removeHeartbeatForLeavingMember(group, member)
+                    info(s"Member[group.instance.id ${member.groupInstanceId}, member.id ${member.memberId}] " +
+                      s"in group ${group.groupId} has left, removing it from the group")
+                    removeMemberAndUpdateGroup(group, member, s"removing member $memberId on LeaveGroup")
+                    memberLeaveError(leavingMember, Errors.NONE)
+                  }
+                }
+                responseCallback(leaveError(Errors.NONE, memberErrors))
+              }
+            }
         }
     }
   }
@@ -1106,6 +1129,21 @@ object GroupCoordinator {
       leaderId = GroupCoordinator.NoLeader,
       error = error)
   }
+
+  private def memberLeaveError(memberIdentity: MemberIdentity,
+                               error: Errors): LeaveMemberResponse = {
+    LeaveMemberResponse(
+      memberId = memberIdentity.memberId,
+      groupInstanceId = Option(memberIdentity.groupInstanceId),
+      error = error)
+  }
+
+  private def leaveError(topLevelError: Errors,
+                         memberResponses: List[LeaveMemberResponse]): LeaveGroupResult = {
+    LeaveGroupResult(
+      topLevelError = topLevelError,
+      memberResponses = memberResponses)
+  }
 }
 
 case class GroupConfig(groupMinSessionTimeoutMs: Int,
@@ -1122,3 +1160,10 @@ case class JoinGroupResult(members: List[JoinGroupResponseMember],
 
 case class SyncGroupResult(memberAssignment: Array[Byte],
                            error: Errors)
+
+case class LeaveMemberResponse(memberId: String,
+                               groupInstanceId: Option[String],
+                               error: Errors)
+
+case class LeaveGroupResult(topLevelError: Errors,
+                            memberResponses : List[LeaveMemberResponse])
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index cec7470..ecd6c94 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -31,7 +31,7 @@ import kafka.api.{ApiVersion, KAFKA_0_11_0_IV0, KAFKA_2_3_IV0}
 import kafka.cluster.Partition
 import kafka.common.OffsetAndMetadata
 import kafka.controller.KafkaController
-import kafka.coordinator.group.{GroupCoordinator, JoinGroupResult, SyncGroupResult}
+import kafka.coordinator.group.{GroupCoordinator, JoinGroupResult, LeaveGroupResult, SyncGroupResult}
 import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinator}
 import kafka.message.ZStdCompressionCodec
 import kafka.network.RequestChannel
@@ -62,6 +62,7 @@ import org.apache.kafka.common.message.HeartbeatResponseData
 import org.apache.kafka.common.message.InitProducerIdResponseData
 import org.apache.kafka.common.message.JoinGroupResponseData
 import org.apache.kafka.common.message.LeaveGroupResponseData
+import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse
 import org.apache.kafka.common.message.ListGroupsResponseData
 import org.apache.kafka.common.message.OffsetCommitRequestData
 import org.apache.kafka.common.message.OffsetCommitResponseData
@@ -1514,9 +1515,9 @@ class KafkaApis(val requestChannel: RequestChannel,
     def sendResponseCallback(error: Errors) {
       def createResponse(requestThrottleMs: Int): AbstractResponse = {
         val response = new HeartbeatResponse(
-            new HeartbeatResponseData()
-              .setThrottleTimeMs(requestThrottleMs)
-              .setErrorCode(error.code))
+          new HeartbeatResponseData()
+            .setThrottleTimeMs(requestThrottleMs)
+            .setErrorCode(error.code))
         trace("Sending heartbeat response %s for correlation id %d to client %s."
           .format(response, request.header.correlationId, request.header.clientId))
         response
@@ -1549,29 +1550,37 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleLeaveGroupRequest(request: RequestChannel.Request) {
     val leaveGroupRequest = request.body[LeaveGroupRequest]
 
-    // the callback for sending a leave-group response
-    def sendResponseCallback(error: Errors) {
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        val response = new LeaveGroupResponse(new LeaveGroupResponseData()
-          .setThrottleTimeMs(requestThrottleMs)
-          .setErrorCode(error.code()))
-        trace("Sending leave group response %s for correlation id %d to client %s."
-          .format(response, request.header.correlationId, request.header.clientId))
-        response
-      }
-      sendResponseMaybeThrottle(request, createResponse)
-    }
+    val members = leaveGroupRequest.members().asScala.toList
 
-    if (!authorize(request.session, Read, Resource(Group, leaveGroupRequest.data().groupId(), LITERAL))) {
-      sendResponseMaybeThrottle(request, requestThrottleMs =>
+    if (!authorize(request.session, Read, Resource(Group, leaveGroupRequest.data.groupId, LITERAL))) {
+      sendResponseMaybeThrottle(request, requestThrottleMs => {
         new LeaveGroupResponse(new LeaveGroupResponseData()
           .setThrottleTimeMs(requestThrottleMs)
-          .setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code())))
+          .setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code)
+        )
+      })
     } else {
-      // let the coordinator to handle leave-group
+      def sendResponseCallback(leaveGroupResult : LeaveGroupResult) {
+        val memberResponses = leaveGroupResult.memberResponses.map(
+          leaveGroupResult =>
+            new MemberResponse()
+              .setErrorCode(leaveGroupResult.error.code)
+              .setMemberId(leaveGroupResult.memberId)
+              .setGroupInstanceId(leaveGroupResult.groupInstanceId.orNull)
+        )
+        def createResponse(requestThrottleMs: Int): AbstractResponse = {
+          new LeaveGroupResponse(
+            memberResponses.asJava,
+            leaveGroupResult.topLevelError,
+            requestThrottleMs,
+            leaveGroupRequest.version)
+        }
+        sendResponseMaybeThrottle(request, createResponse)
+      }
+
       groupCoordinator.handleLeaveGroup(
-        leaveGroupRequest.data().groupId(),
-        leaveGroupRequest.data().memberId(),
+        leaveGroupRequest.data.groupId,
+        members,
         sendResponseCallback)
     }
   }
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 41bfb0d..cb5f1aa 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -46,7 +46,7 @@ import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData
 import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResource, AlterableConfig, AlterableConfigCollection}
 import org.apache.kafka.common.message.JoinGroupRequestData
 import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection
-import org.apache.kafka.common.message.LeaveGroupRequestData
+import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
 import org.apache.kafka.common.message.OffsetCommitRequestData
 import org.apache.kafka.common.message.SyncGroupRequestData
 import org.apache.kafka.common.network.ListenerName
@@ -398,9 +398,10 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       .setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID)).build()
 
   private def leaveGroupRequest = new LeaveGroupRequest.Builder(
-    new LeaveGroupRequestData()
-      .setGroupId(group)
-      .setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID)).build()
+    group, Collections.singletonList(
+      new MemberIdentity()
+        .setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID)
+    )).build()
 
   private def deleteGroupsRequest = new DeleteGroupsRequest.Builder(
     new DeleteGroupsRequestData()
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
index 1cee665..b85035f 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
@@ -26,6 +26,7 @@ import kafka.coordinator.group.GroupCoordinatorConcurrencyTest._
 import kafka.server.{DelayedOperationPurgatory, KafkaConfig}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.JoinGroupRequest
 import org.apache.kafka.common.utils.Time
@@ -153,8 +154,8 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
   }
 
 
-  class JoinGroupOperation extends GroupOperation[JoinGroupResult, JoinGroupCallback] {
-    override def responseCallback(responsePromise: Promise[JoinGroupResult]): JoinGroupCallback = {
+  class JoinGroupOperation extends GroupOperation[JoinGroupCallbackParams, JoinGroupCallback] {
+    override def responseCallback(responsePromise: Promise[JoinGroupCallbackParams]): JoinGroupCallback = {
       val callback: JoinGroupCallback = responsePromise.success(_)
       callback
     }
@@ -263,21 +264,28 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
 
   class LeaveGroupOperation extends GroupOperation[LeaveGroupCallbackParams, LeaveGroupCallback] {
     override def responseCallback(responsePromise: Promise[LeaveGroupCallbackParams]): LeaveGroupCallback = {
-      val callback: LeaveGroupCallback = error => responsePromise.success(error)
+      val callback: LeaveGroupCallback = result => responsePromise.success(result)
       callback
     }
     override def runWithCallback(member: GroupMember, responseCallback: LeaveGroupCallback): Unit = {
-      groupCoordinator.handleLeaveGroup(member.group.groupId, member.memberId, responseCallback)
+      val memberIdentity = new MemberIdentity()
+          .setMemberId(member.memberId)
+      groupCoordinator.handleLeaveGroup(member.group.groupId, List(memberIdentity), responseCallback)
     }
     override def awaitAndVerify(member: GroupMember): Unit = {
-       val error = await(member, DefaultSessionTimeout)
-       assertEquals(Errors.NONE, error)
+      val leaveGroupResult = await(member, DefaultSessionTimeout)
+
+      val memberResponses = leaveGroupResult.memberResponses
+      GroupCoordinatorTest.verifyLeaveGroupResult(leaveGroupResult, Errors.NONE, List(Errors.NONE))
+      assertEquals(member.memberId, memberResponses.head.memberId)
+      assertEquals(None, memberResponses.head.groupInstanceId)
     }
   }
 }
 
 object GroupCoordinatorConcurrencyTest {
 
+  type JoinGroupCallbackParams = JoinGroupResult
   type JoinGroupCallback = JoinGroupResult => Unit
   type SyncGroupCallbackParams = (Array[Byte], Errors)
   type SyncGroupCallback = SyncGroupResult => Unit
@@ -285,8 +293,8 @@ object GroupCoordinatorConcurrencyTest {
   type HeartbeatCallback = Errors => Unit
   type CommitOffsetCallbackParams = Map[TopicPartition, Errors]
   type CommitOffsetCallback = Map[TopicPartition, Errors] => Unit
-  type LeaveGroupCallbackParams = Errors
-  type LeaveGroupCallback = Errors => Unit
+  type LeaveGroupCallbackParams = LeaveGroupResult
+  type LeaveGroupCallback = LeaveGroupResult => Unit
   type CompleteTxnCallbackParams = Errors
   type CompleteTxnCallback = Errors => Unit
 
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index d72bfc8..cdf1518 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -20,7 +20,7 @@ package kafka.coordinator.group
 import java.util.Optional
 
 import kafka.common.OffsetAndMetadata
-import kafka.server.{DelayedOperationPurgatory, KafkaConfig, HostedPartition, ReplicaManager}
+import kafka.server.{DelayedOperationPurgatory, HostedPartition, KafkaConfig, ReplicaManager}
 import kafka.utils._
 import kafka.utils.timer.MockTimer
 import org.apache.kafka.common.TopicPartition
@@ -35,6 +35,7 @@ import java.util.concurrent.locks.ReentrantLock
 import kafka.cluster.Partition
 import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
 import org.junit.Assert._
 import org.junit.{After, Assert, Before, Test}
 import org.scalatest.Assertions.intercept
@@ -45,6 +46,8 @@ import scala.concurrent.duration.Duration
 import scala.concurrent.{Await, Future, Promise, TimeoutException}
 
 class GroupCoordinatorTest {
+  import GroupCoordinatorTest._
+
   type JoinGroupCallback = JoinGroupResult => Unit
   type SyncGroupCallbackParams = (Array[Byte], Errors)
   type SyncGroupCallback = SyncGroupResult => Unit
@@ -52,8 +55,7 @@ class GroupCoordinatorTest {
   type HeartbeatCallback = Errors => Unit
   type CommitOffsetCallbackParams = Map[TopicPartition, Errors]
   type CommitOffsetCallback = Map[TopicPartition, Errors] => Unit
-  type LeaveGroupCallbackParams = Errors
-  type LeaveGroupCallback = Errors => Unit
+  type LeaveGroupCallback = LeaveGroupResult => Unit
 
   val ClientId = "consumer-test"
   val ClientHost = "localhost"
@@ -951,6 +953,24 @@ class GroupCoordinatorTest {
   }
 
   @Test
+  def staticMemberLeaveWithIllegalStateAsPendingMember() {
+    val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)
+    val group = groupCoordinator.groupManager.getGroup(groupId).get
+    group.addPendingMember(rebalanceResult.followerId)
+    group.remove(rebalanceResult.followerId)
+    EasyMock.reset(replicaManager)
+
+    // Illegal state exception shall trigger since follower id resides in pending member bucket.
+    val expectedException = intercept[IllegalStateException] {
+      singleLeaveGroup(groupId, rebalanceResult.followerId, followerInstanceId)
+    }
+
+    val message = expectedException.getMessage
+    assertTrue(message.contains(rebalanceResult.followerId))
+    assertTrue(message.contains(followerInstanceId.get))
+  }
+
+  @Test
   def staticMemberReJoinWithIllegalStateAsUnknownMember() {
     staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)
     val group = groupCoordinator.groupManager.getGroup(groupId).get
@@ -1036,8 +1056,8 @@ class GroupCoordinatorTest {
 
     // Send a special leave group request from static follower, moving group towards PreparingRebalance
     EasyMock.reset(replicaManager)
-    val followerLeaveGroupResult = leaveGroup(groupId, rebalanceResult.followerId)
-    assertEquals(Errors.NONE, followerLeaveGroupResult)
+    val followerLeaveGroupResults = singleLeaveGroup(groupId, rebalanceResult.followerId)
+    verifyLeaveGroupResult(followerLeaveGroupResults)
     assertGroupState(groupState = PreparingRebalance)
 
     timer.advanceClock(DefaultRebalanceTimeout + 1)
@@ -1791,8 +1811,8 @@ class GroupCoordinatorTest {
     val pending = setupGroupWithPendingMember()
 
     EasyMock.reset(replicaManager)
-    val leaveGroupResult = leaveGroup(groupId, pending.memberId)
-    assertEquals(Errors.NONE, leaveGroupResult)
+    val leaveGroupResults = singleLeaveGroup(groupId, pending.memberId)
+    verifyLeaveGroupResult(leaveGroupResults)
 
     assertGroupState(groupState = CompletingRebalance)
     assertEquals(2, group().allMembers.size)
@@ -2004,8 +2024,8 @@ class GroupCoordinatorTest {
 
     // and leaves.
     EasyMock.reset(replicaManager)
-    val leaveGroupResult = leaveGroup(groupId, assignedMemberId)
-    assertEquals(Errors.NONE, leaveGroupResult)
+    val leaveGroupResults = singleLeaveGroup(groupId, assignedMemberId)
+    verifyLeaveGroupResult(leaveGroupResults)
 
     // The simple offset commit should now fail
     EasyMock.reset(replicaManager)
@@ -2436,25 +2456,14 @@ class GroupCoordinatorTest {
   def testLeaveGroupWrongCoordinator() {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
 
-    val leaveGroupResult = leaveGroup(otherGroupId, memberId)
-    assertEquals(Errors.NOT_COORDINATOR, leaveGroupResult)
+    val leaveGroupResults = singleLeaveGroup(otherGroupId, memberId)
+    verifyLeaveGroupResult(leaveGroupResults, Errors.NOT_COORDINATOR)
   }
 
   @Test
   def testLeaveGroupUnknownGroup() {
-    val leaveGroupResult = leaveGroup(groupId, memberId)
-    assertEquals(Errors.UNKNOWN_MEMBER_ID, leaveGroupResult)
-  }
-
-  @Test
-  def testLeaveDeadGroup() {
-    val memberId = "memberId"
-
-    val deadGroupId = "deadGroupId"
-
-    groupCoordinator.groupManager.addGroup(new GroupMetadata(deadGroupId, Dead, new MockTime()))
-    val leaveGroupResult = leaveGroup(deadGroupId, memberId)
-    assertEquals(Errors.COORDINATOR_NOT_AVAILABLE, leaveGroupResult)
+    val leaveGroupResults = singleLeaveGroup(groupId, memberId)
+    verifyLeaveGroupResult(leaveGroupResults, Errors.NONE, List(Errors.UNKNOWN_MEMBER_ID))
   }
 
   @Test
@@ -2467,8 +2476,27 @@ class GroupCoordinatorTest {
     assertEquals(Errors.NONE, joinGroupError)
 
     EasyMock.reset(replicaManager)
-    val leaveGroupResult = leaveGroup(groupId, otherMemberId)
-    assertEquals(Errors.UNKNOWN_MEMBER_ID, leaveGroupResult)
+    val leaveGroupResults = singleLeaveGroup(groupId, otherMemberId)
+    verifyLeaveGroupResult(leaveGroupResults, Errors.NONE, List(Errors.UNKNOWN_MEMBER_ID))
+  }
+
+  @Test
+  def testSingleLeaveDeadGroup() {
+    val deadGroupId = "deadGroupId"
+
+    groupCoordinator.groupManager.addGroup(new GroupMetadata(deadGroupId, Dead, new MockTime()))
+    val leaveGroupResults = singleLeaveGroup(deadGroupId, memberId)
+    verifyLeaveGroupResult(leaveGroupResults, Errors.COORDINATOR_NOT_AVAILABLE)
+  }
+
+  @Test
+  def testBatchLeaveDeadGroup() {
+    val deadGroupId = "deadGroupId"
+
+    groupCoordinator.groupManager.addGroup(new GroupMetadata(deadGroupId, Dead, new MockTime()))
+    val leaveGroupResults = batchLeaveGroup(deadGroupId,
+      List(new MemberIdentity().setMemberId(memberId), new MemberIdentity().setMemberId(memberId)))
+    verifyLeaveGroupResult(leaveGroupResults, Errors.COORDINATOR_NOT_AVAILABLE)
   }
 
   @Test
@@ -2481,8 +2509,116 @@ class GroupCoordinatorTest {
     assertEquals(Errors.NONE, joinGroupError)
 
     EasyMock.reset(replicaManager)
-    val leaveGroupResult = leaveGroup(groupId, assignedMemberId)
-    assertEquals(Errors.NONE, leaveGroupResult)
+    val leaveGroupResults = singleLeaveGroup(groupId, assignedMemberId)
+    verifyLeaveGroupResult(leaveGroupResults)
+  }
+
+  @Test
+  def testLeaveGroupWithFencedInstanceId() {
+    val joinGroupResult = staticJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, leaderInstanceId, protocolType, protocolSuperset)
+    assertEquals(Errors.NONE, joinGroupResult.error)
+
+    EasyMock.reset(replicaManager)
+    val leaveGroupResults = singleLeaveGroup(groupId, "some_member", leaderInstanceId)
+    verifyLeaveGroupResult(leaveGroupResults, Errors.NONE, List(Errors.FENCED_INSTANCE_ID))
+  }
+
+  @Test
+  def testLeaveGroupStaticMemberWithUnknownMemberId() {
+    val joinGroupResult = staticJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, leaderInstanceId, protocolType, protocolSuperset)
+    assertEquals(Errors.NONE, joinGroupResult.error)
+
+    EasyMock.reset(replicaManager)
+    // Having unknown member id will not affect the request processing.
+    val leaveGroupResults = singleLeaveGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, leaderInstanceId)
+    verifyLeaveGroupResult(leaveGroupResults, Errors.NONE, List(Errors.NONE))
+  }
+
+  @Test
+  def testStaticMembersValidBatchLeaveGroup() {
+    staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)
+
+    EasyMock.reset(replicaManager)
+    val leaveGroupResults = batchLeaveGroup(groupId, List(new MemberIdentity()
+      .setGroupInstanceId(leaderInstanceId.get), new MemberIdentity().setGroupInstanceId(followerInstanceId.get)))
+
+    verifyLeaveGroupResult(leaveGroupResults, Errors.NONE, List(Errors.NONE, Errors.NONE))
+  }
+
+  @Test
+  def testStaticMembersWrongCoordinatorBatchLeaveGroup() {
+    staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)
+
+    EasyMock.reset(replicaManager)
+    val leaveGroupResults = batchLeaveGroup("invalid-group", List(new MemberIdentity()
+      .setGroupInstanceId(leaderInstanceId.get), new MemberIdentity().setGroupInstanceId(followerInstanceId.get)))
+
+    verifyLeaveGroupResult(leaveGroupResults, Errors.NOT_COORDINATOR)
+  }
+
+  @Test
+  def testStaticMembersUnknownGroupBatchLeaveGroup() {
+    val leaveGroupResults = batchLeaveGroup(groupId, List(new MemberIdentity()
+      .setGroupInstanceId(leaderInstanceId.get), new MemberIdentity().setGroupInstanceId(followerInstanceId.get)))
+
+    verifyLeaveGroupResult(leaveGroupResults, Errors.NONE, List(Errors.UNKNOWN_MEMBER_ID, Errors.UNKNOWN_MEMBER_ID))
+  }
+
+  @Test
+  def testStaticMembersFencedInstanceBatchLeaveGroup() {
+    staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)
+
+    EasyMock.reset(replicaManager)
+    val leaveGroupResults = batchLeaveGroup(groupId, List(new MemberIdentity()
+      .setGroupInstanceId(leaderInstanceId.get), new MemberIdentity()
+      .setGroupInstanceId(followerInstanceId.get)
+      .setMemberId("invalid-member")))
+
+    verifyLeaveGroupResult(leaveGroupResults, Errors.NONE, List(Errors.NONE, Errors.FENCED_INSTANCE_ID))
+  }
+
+  @Test
+  def testStaticMembersUnknownInstanceBatchLeaveGroup() {
+    staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)
+
+    EasyMock.reset(replicaManager)
+    val leaveGroupResults = batchLeaveGroup(groupId, List(new MemberIdentity()
+      .setGroupInstanceId("unknown-instance"), new MemberIdentity()
+      .setGroupInstanceId(followerInstanceId.get)))
+
+    verifyLeaveGroupResult(leaveGroupResults, Errors.NONE, List(Errors.UNKNOWN_MEMBER_ID, Errors.NONE))
+  }
+
+  @Test
+  def testPendingMemberBatchLeaveGroup() {
+    val pendingMember = setupGroupWithPendingMember()
+
+    EasyMock.reset(replicaManager)
+    val leaveGroupResults = batchLeaveGroup(groupId, List(new MemberIdentity()
+      .setGroupInstanceId("unknown-instance"), new MemberIdentity()
+      .setMemberId(pendingMember.memberId)))
+
+    verifyLeaveGroupResult(leaveGroupResults, Errors.NONE, List(Errors.UNKNOWN_MEMBER_ID, Errors.NONE))
+  }
+
+  @Test
+  def testPendingMemberWithUnexpectedInstanceIdBatchLeaveGroup() {
+    val pendingMember = setupGroupWithPendingMember()
+
+    EasyMock.reset(replicaManager)
+
+    // Bypass the FENCED_INSTANCE_ID check by defining pending member as a static member.
+    val instanceId = "instanceId"
+    val pendingMemberId = pendingMember.memberId
+    getGroup(groupId).addStaticMember(Option(instanceId), pendingMemberId)
+    val expectedException = intercept[IllegalStateException] {
+      batchLeaveGroup(groupId, List(new MemberIdentity().setGroupInstanceId("unknown-instance"),
+        new MemberIdentity().setGroupInstanceId(instanceId).setMemberId(pendingMemberId)))
+    }
+
+    val message = expectedException.getMessage
+    assertTrue(message.contains(instanceId))
+    assertTrue(message.contains(pendingMemberId))
   }
 
   @Test
@@ -2622,8 +2758,8 @@ class GroupCoordinatorTest {
     val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols)
 
     EasyMock.reset(replicaManager)
-    val leaveGroupResult = leaveGroup(groupId, joinGroupResult.memberId)
-    assertEquals(Errors.NONE, leaveGroupResult)
+    val leaveGroupResults = singleLeaveGroup(groupId, joinGroupResult.memberId)
+    verifyLeaveGroupResult(leaveGroupResults)
 
     val groupTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
     val partition: Partition = EasyMock.niceMock(classOf[Partition])
@@ -2663,8 +2799,8 @@ class GroupCoordinatorTest {
     assertEquals(assignedMemberId, describeGroupResult._2.members.head.memberId)
 
     EasyMock.reset(replicaManager)
-    val leaveGroupResult = leaveGroup(groupId, assignedMemberId)
-    assertEquals(Errors.NONE, leaveGroupResult)
+    val leaveGroupResults = singleLeaveGroup(groupId, assignedMemberId)
+    verifyLeaveGroupResult(leaveGroupResults)
 
     val groupTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
     val partition: Partition = EasyMock.niceMock(classOf[Partition])
@@ -2785,6 +2921,13 @@ class GroupCoordinatorTest {
     (responseFuture, responseCallback)
   }
 
+  private def setupLeaveGroupCallback: (Future[LeaveGroupResult], LeaveGroupCallback) = {
+    val responsePromise = Promise[LeaveGroupResult]
+    val responseFuture = responsePromise.future
+    val responseCallback: LeaveGroupCallback = result => responsePromise.success(result)
+    (responseFuture, responseCallback)
+  }
+
   private def sendJoinGroup(groupId: String,
                             memberId: String,
                             protocolType: String,
@@ -2980,15 +3123,26 @@ class GroupCoordinatorTest {
     result
   }
 
-  private def leaveGroup(groupId: String, consumerId: String): LeaveGroupCallbackParams = {
-    val (responseFuture, responseCallback) = setupHeartbeatCallback
+  private def singleLeaveGroup(groupId: String,
+                               consumerId: String,
+                               groupInstanceId: Option[String] = None): LeaveGroupResult = {
+    val singleMemberIdentity = List(
+      new MemberIdentity()
+        .setMemberId(consumerId)
+        .setGroupInstanceId(groupInstanceId.orNull))
+    batchLeaveGroup(groupId, singleMemberIdentity)
+  }
+
+  private def batchLeaveGroup(groupId: String,
+                              memberIdentities: List[MemberIdentity]): LeaveGroupResult = {
+    val (responseFuture, responseCallback) = setupLeaveGroupCallback
 
     EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)))
       .andReturn(HostedPartition.None)
     EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(RecordBatch.MAGIC_VALUE_V1)).anyTimes()
     EasyMock.replay(replicaManager)
 
-    groupCoordinator.handleLeaveGroup(groupId, consumerId, responseCallback)
+    groupCoordinator.handleLeaveGroup(groupId, memberIdentities, responseCallback)
     Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS))
   }
 
@@ -3003,3 +3157,17 @@ class GroupCoordinatorTest {
     OffsetAndMetadata(offset, "", timer.time.milliseconds())
   }
 }
+
+object GroupCoordinatorTest {
+  def verifyLeaveGroupResult(leaveGroupResult: LeaveGroupResult,
+                             expectedTopLevelError: Errors = Errors.NONE,
+                             expectedMemberLevelErrors: List[Errors] = List.empty) {
+    assertEquals(expectedTopLevelError, leaveGroupResult.topLevelError)
+    if (expectedMemberLevelErrors.nonEmpty) {
+      assertEquals(expectedMemberLevelErrors.size, leaveGroupResult.memberResponses.size)
+      for (i <- expectedMemberLevelErrors.indices) {
+            assertEquals(expectedMemberLevelErrors(i), leaveGroupResult.memberResponses(i).error)
+          }
+    }
+  }
+}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index c26c3fd..44f0301 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -50,6 +50,7 @@ import org.easymock.{Capture, EasyMock, IAnswer}
 import EasyMock._
 import org.apache.kafka.common.message.{HeartbeatRequestData, JoinGroupRequestData, OffsetCommitRequestData, OffsetCommitResponseData, SyncGroupRequestData}
 import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol
+import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
 import org.apache.kafka.common.replica.ClientMetadata
 import org.junit.Assert.{assertEquals, assertNull, assertTrue}
 import org.junit.{After, Test}
@@ -643,6 +644,63 @@ class KafkaApisTest {
     EasyMock.replay(groupCoordinator)
   }
 
+  @Test
+  def testMultipleLeaveGroup() {
+    val groupId = "groupId"
+
+    val leaveMemberList = List(
+      new MemberIdentity()
+        .setMemberId("member-1")
+        .setGroupInstanceId("instance-1"),
+      new MemberIdentity()
+        .setMemberId("member-2")
+        .setGroupInstanceId("instance-2")
+    )
+
+    EasyMock.expect(groupCoordinator.handleLeaveGroup(
+      EasyMock.eq(groupId),
+      EasyMock.eq(leaveMemberList),
+      anyObject()
+    ))
+
+    val (_, leaveRequest) = buildRequest(
+      new LeaveGroupRequest.Builder(
+        groupId,
+        leaveMemberList.asJava)
+    )
+
+    createKafkaApis().handleLeaveGroupRequest(leaveRequest)
+
+    EasyMock.replay(groupCoordinator)
+  }
+
+  @Test
+  def testSingleLeaveGroup() {
+    val groupId = "groupId"
+    val memberId = "member"
+
+    val singleLeaveMember = List(
+      new MemberIdentity()
+        .setMemberId(memberId)
+    )
+
+    EasyMock.expect(groupCoordinator.handleLeaveGroup(
+      EasyMock.eq(groupId),
+      EasyMock.eq(singleLeaveMember),
+      anyObject()
+    ))
+
+    val (_, leaveRequest) = buildRequest(
+      new LeaveGroupRequest.Builder(
+        groupId,
+        singleLeaveMember.asJava)
+    )
+
+    createKafkaApis().handleLeaveGroupRequest(leaveRequest)
+
+    EasyMock.replay(groupCoordinator)
+  }
+
   /**
    * Return pair of listener names in the metadataCache: PLAINTEXT and LISTENER2 respectively.
    */
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 7fc3de9..242ab21 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -39,7 +39,7 @@ import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData
 import org.apache.kafka.common.message.InitProducerIdRequestData
 import org.apache.kafka.common.message.JoinGroupRequestData
 import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection
-import org.apache.kafka.common.message.LeaveGroupRequestData
+import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
 import org.apache.kafka.common.message.ListGroupsRequestData
 import org.apache.kafka.common.message.OffsetCommitRequestData
 import org.apache.kafka.common.message.SaslAuthenticateRequestData
@@ -325,9 +325,10 @@ class RequestQuotaTest extends BaseRequestTest {
 
         case ApiKeys.LEAVE_GROUP =>
           new LeaveGroupRequest.Builder(
-            new LeaveGroupRequestData()
-              .setGroupId("test-leave-group")
-              .setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID)
+            "test-leave-group",
+            Collections.singletonList(
+              new MemberIdentity()
+                .setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID))
           )
 
         case ApiKeys.SYNC_GROUP =>
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index a9baa3f..e1bfe37 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -278,7 +278,6 @@ abstract class AssignedTasks<T extends Task> {
     int commit() {
         int committed = 0;
         RuntimeException firstException = null;
-
         for (final Iterator<T> it = running().iterator(); it.hasNext(); ) {
             final T task = it.next();
             try {


Mime
View raw message