kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7862 & KIP-345 part-one: Add static membership logic to JoinGroup protocol (#6177)
Date Fri, 26 Apr 2019 18:44:52 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0f995ba  KAFKA-7862 & KIP-345 part-one: Add static membership logic to JoinGroup protocol (#6177)
0f995ba is described below

commit 0f995ba6be1c0f949320b0879241d9a7c9578436
Author: Boyang Chen <boyang@confluent.io>
AuthorDate: Fri Apr 26 11:44:38 2019 -0700

    KAFKA-7862 & KIP-345 part-one: Add static membership logic to JoinGroup protocol (#6177)
    
    This is the first diff for the implementation of JoinGroup logic for static membership. The goal of this diff contains:
    
    * Add group.instance.id to be unique identifier for consumer instances, provided by end user;
    Modify group coordinator to accept JoinGroupRequest with/without static membership, refactor the logic for readability and code reusability.
    * Add client side support for incorporating static membership changes, including new config for group.instance.id, apply stream thread client id by default, and new join group exception handling.
    * Increase max session timeout to 30 min for more user flexibility if they are inclined to tolerate partial unavailability than burdening rebalance.
    * Unit tests for each module changes, especially on the group coordinator logic. Crossing the possibilities like:
    6.1 Dynamic/Static member
    6.2 Known/Unknown member id
    6.3 Group stable/unstable
    6.4 Leader/Follower
    
    The rest of the 345 change will be broken down to 4 separate diffs:
    
    * Avoid kicking out members through rebalance.timeout, only do the kick out through session timeout.
    * Changes around LeaveGroup logic, including version bumping, broker logic, client logic, etc.
    * Admin client changes to add ability to batch remove static members
    * Deprecate group.initial.rebalance.delay
    
    Reviewers: Liquan Pei <liquanpei@gmail.com>, Stanislav Kozlovski <familyguyuser192@windowslive.com>, Jason Gustafson <jason@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
---
 checkstyle/suppressions.xml                        |   6 +-
 .../kafka/clients/consumer/ConsumerConfig.java     |  21 +-
 .../kafka/clients/consumer/KafkaConsumer.java      |  12 +
 .../consumer/internals/AbstractCoordinator.java    |  23 +-
 .../consumer/internals/ConsumerCoordinator.java    |   2 +
 .../common/errors/FencedInstanceIdException.java   |  29 ++
 .../org/apache/kafka/common/protocol/Errors.java   |   5 +-
 .../kafka/common/requests/JoinGroupRequest.java    |  36 ++
 .../resources/common/message/JoinGroupRequest.json |   5 +-
 .../common/message/JoinGroupResponse.json          |   5 +-
 .../kafka/clients/consumer/KafkaConsumerTest.java  |  63 +--
 .../internals/AbstractCoordinatorTest.java         | 102 +++-
 .../internals/ConsumerCoordinatorTest.java         |  63 ++-
 .../common/requests/JoinGroupRequestTest.java      |  65 +++
 .../runtime/distributed/WorkerCoordinator.java     |   4 +-
 core/src/main/scala/kafka/api/ApiVersion.scala     |  11 +-
 .../kafka/coordinator/group/GroupCoordinator.scala | 200 ++++---
 .../kafka/coordinator/group/GroupMetadata.scala    |  64 ++-
 .../coordinator/group/GroupMetadataManager.scala   |  42 +-
 .../kafka/coordinator/group/MemberMetadata.scala   |  11 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   |  22 +-
 core/src/main/scala/kafka/server/KafkaConfig.scala |   2 +-
 .../kafka/api/AuthorizerIntegrationTest.scala      |   1 +
 .../group/GroupCoordinatorConcurrencyTest.scala    |   2 +-
 .../coordinator/group/GroupCoordinatorTest.scala   | 573 ++++++++++++++++++---
 .../group/GroupMetadataManagerTest.scala           |  56 +-
 .../coordinator/group/GroupMetadataTest.scala      |  17 +-
 .../coordinator/group/MemberMetadataTest.scala     |  19 +-
 .../scala/unit/kafka/server/RequestQuotaTest.scala |   1 +
 .../streams/processor/internals/StreamThread.java  |   1 +
 .../FineGrainedAutoResetIntegrationTest.java       |   8 +-
 tests/kafkatest/services/verifiable_consumer.py    |  21 +-
 tests/kafkatest/tests/client/consumer_test.py      |  76 ++-
 tests/kafkatest/tests/verifiable_consumer_test.py  |   4 +-
 .../org/apache/kafka/tools/VerifiableConsumer.java |  13 +
 35 files changed, 1300 insertions(+), 285 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index ca103a2..0c65edc 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -23,7 +23,7 @@
               files="AbstractResponse.java"/>
 
     <suppress checks="MethodLength"
-              files="KerberosLogin.java|RequestResponseTest.java|ConnectMetricsRegistry.java"/>
+              files="KerberosLogin.java|RequestResponseTest.java|ConnectMetricsRegistry.java|KafkaConsumer.java"/>
 
     <suppress checks="ParameterNumber"
               files="NetworkClient.java"/>
@@ -48,10 +48,10 @@
               files="(Errors|SaslAuthenticatorTest|AgentTest|CoordinatorTest).java"/>
 
     <suppress checks="BooleanExpressionComplexity"
-              files="(Utils|Topic|KafkaLZ4BlockOutputStream|AclData).java"/>
+              files="(Utils|Topic|KafkaLZ4BlockOutputStream|AclData|JoinGroupRequest).java"/>
 
     <suppress checks="CyclomaticComplexity"
-              files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|SchemaGenerator).java"/>
+              files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|SchemaGenerator|AbstractCoordinator).java"/>
 
     <suppress checks="JavaNCSS"
               files="AbstractRequest.java|KerberosLogin.java|WorkerSinkTaskTest.java|TransactionManagerTest.java|SenderTest.java"/>
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index b92cbf9..bd1c984 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -53,6 +53,16 @@ public class ConsumerConfig extends AbstractConfig {
     public static final String GROUP_ID_CONFIG = "group.id";
     private static final String GROUP_ID_DOC = "A unique string that identifies the consumer group this consumer belongs to. This property is required if the consumer uses either the group management functionality by using <code>subscribe(topic)</code> or the Kafka-based offset management strategy.";
 
+    /**
+     * <code>group.instance.id</code>
+     */
+    public static final String GROUP_INSTANCE_ID_CONFIG = "group.instance.id";
+    private static final String GROUP_INSTANCE_ID_DOC = "A unique identifier of the consumer instance provided by end user. " +
+            "Only non-empty strings are permitted. If set, the consumer is treated as a static member, " +
+            "which means that only one instance with this ID is allowed in the consumer group at any time. " +
+            "This can be used in combination with a larger session timeout to avoid group rebalances caused by transient unavailability " +
+            "(e.g. process restarts). If not set, the consumer will join the group as a dynamic member, which is the traditional behavior.";
+
     /** <code>max.poll.records</code> */
     public static final String MAX_POLL_RECORDS_CONFIG = "max.poll.records";
     private static final String MAX_POLL_RECORDS_DOC = "The maximum number of records returned in a single call to poll().";
@@ -278,6 +288,11 @@ public class ConsumerConfig extends AbstractConfig {
                                         Importance.MEDIUM,
                                         CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC)
                                 .define(GROUP_ID_CONFIG, Type.STRING, null, Importance.HIGH, GROUP_ID_DOC)
+                                .define(GROUP_INSTANCE_ID_CONFIG,
+                                        Type.STRING,
+                                        null,
+                                        Importance.MEDIUM,
+                                        GROUP_INSTANCE_ID_DOC)
                                 .define(SESSION_TIMEOUT_MS_CONFIG,
                                         Type.INT,
                                         10000,
@@ -455,9 +470,9 @@ public class ConsumerConfig extends AbstractConfig {
                                         Importance.MEDIUM,
                                         EXCLUDE_INTERNAL_TOPICS_DOC)
                                 .defineInternal(LEAVE_GROUP_ON_CLOSE_CONFIG,
-                                                Type.BOOLEAN,
-                                                true,
-                                                Importance.LOW)
+                                        Type.BOOLEAN,
+                                        true,
+                                        Importance.LOW)
                                 .define(ISOLATION_LEVEL_CONFIG,
                                         Type.STRING,
                                         DEFAULT_ISOLATION_LEVEL,
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 839057c..c73a028 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -50,6 +50,7 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.network.ChannelBuilder;
 import org.apache.kafka.common.network.Selector;
 import org.apache.kafka.common.requests.IsolationLevel;
+import org.apache.kafka.common.requests.JoinGroupRequest;
 import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.utils.AppInfoParser;
@@ -567,6 +568,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     private final Logger log;
     private final String clientId;
     private String groupId;
+    private Optional<String> groupInstanceId;
     private final ConsumerCoordinator coordinator;
     private final Deserializer<K> keyDeserializer;
     private final Deserializer<V> valueDeserializer;
@@ -671,6 +673,15 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                 clientId = "consumer-" + CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement();
             this.clientId = clientId;
             this.groupId = config.getString(ConsumerConfig.GROUP_ID_CONFIG);
+
+            String groupInstanceId = config.getString(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG);
+            if (groupInstanceId != null) {
+                JoinGroupRequest.validateGroupInstanceId(groupInstanceId);
+                this.groupInstanceId = Optional.of(groupInstanceId);
+            } else {
+                this.groupInstanceId = Optional.empty();
+            }
+
             LogContext logContext = new LogContext("[Consumer clientId=" + clientId + ", groupId=" + groupId + "] ");
             this.log = logContext.logger(getClass());
             boolean enableAutoCommit = config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
@@ -764,6 +775,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                 new ConsumerCoordinator(logContext,
                         this.client,
                         groupId,
+                        this.groupInstanceId,
                         maxPollIntervalMs,
                         sessionTimeoutMs,
                         new Heartbeat(time, sessionTimeoutMs, heartbeatIntervalMs, maxPollIntervalMs, retryBackoffMs),
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 9261966..0124338 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -64,6 +64,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -112,10 +113,12 @@ public abstract class AbstractCoordinator implements Closeable {
     private final Heartbeat heartbeat;
     protected final int rebalanceTimeoutMs;
     protected final String groupId;
+    protected final Optional<String> groupInstanceId;
     protected final ConsumerNetworkClient client;
     protected final Time time;
     protected final long retryBackoffMs;
 
+
     private HeartbeatThread heartbeatThread = null;
     private boolean rejoinNeeded = true;
     private boolean needsJoinPrepare = true;
@@ -132,6 +135,7 @@ public abstract class AbstractCoordinator implements Closeable {
     public AbstractCoordinator(LogContext logContext,
                                ConsumerNetworkClient client,
                                String groupId,
+                               Optional<String> groupInstanceId,
                                int rebalanceTimeoutMs,
                                int sessionTimeoutMs,
                                Heartbeat heartbeat,
@@ -145,6 +149,7 @@ public abstract class AbstractCoordinator implements Closeable {
         this.time = time;
         this.groupId = Objects.requireNonNull(groupId,
                 "Expected a non-null group id for coordinator construction");
+        this.groupInstanceId = groupInstanceId;
         this.rebalanceTimeoutMs = rebalanceTimeoutMs;
         this.sessionTimeoutMs = sessionTimeoutMs;
         this.leaveGroupOnClose = leaveGroupOnClose;
@@ -156,6 +161,7 @@ public abstract class AbstractCoordinator implements Closeable {
     public AbstractCoordinator(LogContext logContext,
                                ConsumerNetworkClient client,
                                String groupId,
+                               Optional<String> groupInstanceId,
                                int rebalanceTimeoutMs,
                                int sessionTimeoutMs,
                                int heartbeatIntervalMs,
@@ -164,7 +170,7 @@ public abstract class AbstractCoordinator implements Closeable {
                                Time time,
                                long retryBackoffMs,
                                boolean leaveGroupOnClose) {
-        this(logContext, client, groupId, rebalanceTimeoutMs, sessionTimeoutMs,
+        this(logContext, client, groupId, groupInstanceId, rebalanceTimeoutMs, sessionTimeoutMs,
                 new Heartbeat(time, sessionTimeoutMs, heartbeatIntervalMs, rebalanceTimeoutMs, retryBackoffMs),
                 metrics, metricGrpPrefix, time, retryBackoffMs, leaveGroupOnClose);
     }
@@ -495,6 +501,7 @@ public abstract class AbstractCoordinator implements Closeable {
                         .setGroupId(groupId)
                         .setSessionTimeoutMs(this.sessionTimeoutMs)
                         .setMemberId(this.generation.memberId)
+                        .setGroupInstanceId(this.groupInstanceId.orElse(null))
                         .setProtocolType(protocolType())
                         .setProtocols(metadata())
                         .setRebalanceTimeoutMs(this.rebalanceTimeoutMs)
@@ -552,7 +559,9 @@ public abstract class AbstractCoordinator implements Closeable {
                     || error == Errors.INVALID_SESSION_TIMEOUT
                     || error == Errors.INVALID_GROUP_ID
                     || error == Errors.GROUP_AUTHORIZATION_FAILED
-                    || error == Errors.GROUP_MAX_SIZE_REACHED) {
+                    || error == Errors.GROUP_MAX_SIZE_REACHED
+                    || error == Errors.FENCED_INSTANCE_ID) {
+                // log the error and re-throw the exception
                 log.error("Attempt to join group failed due to fatal error: {}", error.message());
                 if (error == Errors.GROUP_MAX_SIZE_REACHED) {
                     future.raise(new GroupMaxSizeReachedException(groupId));
@@ -820,7 +829,11 @@ public abstract class AbstractCoordinator implements Closeable {
      * Leave the current group and reset local generation/memberId.
      */
     public synchronized void maybeLeaveGroup() {
-        if (!coordinatorUnknown() && state != MemberState.UNJOINED && generation.hasMemberId()) {
+        // Starting from 2.3, only dynamic members will send LeaveGroupRequest to the broker,
+        // consumer with valid group.instance.id is viewed as static member that never sends LeaveGroup,
+        // and the membership expiration is only controlled by session timeout.
+        if (isDynamicMember() && !coordinatorUnknown() &&
+                state != MemberState.UNJOINED && generation.hasMemberId()) {
             // this is a minimal effort attempt to leave the group. we do not
             // attempt any resending if the request fails or times out.
             log.info("Member {} sending LeaveGroup request to coordinator {}", generation.memberId, coordinator);
@@ -834,6 +847,10 @@ public abstract class AbstractCoordinator implements Closeable {
         resetGeneration();
     }
 
+    protected boolean isDynamicMember() {
+        return !groupInstanceId.isPresent();
+    }
+
     private class LeaveGroupResponseHandler extends CoordinatorResponseHandler<LeaveGroupResponse, Void> {
         @Override
         public void handle(LeaveGroupResponse leaveResponse, RequestFuture<Void> future) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 4d40070..deed257 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -121,6 +121,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
     public ConsumerCoordinator(LogContext logContext,
                                ConsumerNetworkClient client,
                                String groupId,
+                               Optional<String> groupInstanceId,
                                int rebalanceTimeoutMs,
                                int sessionTimeoutMs,
                                Heartbeat heartbeat,
@@ -138,6 +139,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         super(logContext,
               client,
               groupId,
+              groupInstanceId,
               rebalanceTimeoutMs,
               sessionTimeoutMs,
               heartbeat,
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/FencedInstanceIdException.java b/clients/src/main/java/org/apache/kafka/common/errors/FencedInstanceIdException.java
new file mode 100644
index 0000000..78e4034
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/FencedInstanceIdException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class FencedInstanceIdException extends ApiException {
+    private static final long serialVersionUID = 1L;
+
+    public FencedInstanceIdException(String message) {
+        super(message);
+    }
+
+    public FencedInstanceIdException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index cc493c2..bf168bc 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -60,6 +60,7 @@ import org.apache.kafka.common.errors.InvalidTxnTimeoutException;
 import org.apache.kafka.common.errors.KafkaStorageException;
 import org.apache.kafka.common.errors.LeaderNotAvailableException;
 import org.apache.kafka.common.errors.LogDirNotFoundException;
+import org.apache.kafka.common.errors.FencedInstanceIdException;
 import org.apache.kafka.common.errors.MemberIdRequiredException;
 import org.apache.kafka.common.errors.NetworkException;
 import org.apache.kafka.common.errors.NotControllerException;
@@ -302,7 +303,9 @@ public enum Errors {
             MemberIdRequiredException::new),
     PREFERRED_LEADER_NOT_AVAILABLE(80, "The preferred leader was not available",
             PreferredLeaderNotAvailableException::new),
-    GROUP_MAX_SIZE_REACHED(81, "The consumer group has reached its max size.", GroupMaxSizeReachedException::new);
+    GROUP_MAX_SIZE_REACHED(81, "The consumer group has reached its max size.", GroupMaxSizeReachedException::new),
+    FENCED_INSTANCE_ID(82, "The coordinator reports a more recent member.id associated with the consumer's group.instance.id.",
+            FencedInstanceIdException::new);
 
     private static final Logger log = LoggerFactory.getLogger(Errors.class);
 
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
index 2f46172..f9244cf 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.errors.InvalidConfigurationException;
 import org.apache.kafka.common.message.JoinGroupRequestData;
 import org.apache.kafka.common.message.JoinGroupResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
@@ -52,6 +53,40 @@ public class JoinGroupRequest extends AbstractRequest {
 
     public static final String UNKNOWN_MEMBER_ID = "";
 
+    private static final int MAX_GROUP_INSTANCE_ID_LENGTH = 249;
+
+    /**
+     * Ported from class Topic in {@link org.apache.kafka.common.internals} to restrict the charset for
+     * static member id.
+     */
+    public static void validateGroupInstanceId(String id) {
+        if (id.equals(""))
+            throw new InvalidConfigurationException("Group instance id must be non-empty string");
+        if (id.equals(".") || id.equals(".."))
+            throw new InvalidConfigurationException("Group instance id cannot be \".\" or \"..\"");
+        if (id.length() > MAX_GROUP_INSTANCE_ID_LENGTH)
+            throw new InvalidConfigurationException("Group instance id can't be longer than " + MAX_GROUP_INSTANCE_ID_LENGTH +
+                    " characters: " + id);
+        if (!containsValidPattern(id))
+            throw new InvalidConfigurationException("Group instance id \"" + id + "\" is illegal, it contains a character other than " +
+                    "ASCII alphanumerics, '.', '_' and '-'");
+    }
+
+    /**
+     * Valid characters for Consumer group.instance.id are the ASCII alphanumerics, '.', '_', and '-'
+     */
+    static boolean containsValidPattern(String topic) {
+        for (int i = 0; i < topic.length(); ++i) {
+            char c = topic.charAt(i);
+
+            boolean validChar = (c >= 'a' && c <= 'z') || (c >= '0' && c <= '9') || (c >= 'A' && c <= 'Z') || c == '.' ||
+                    c == '_' || c == '-';
+            if (!validChar)
+                return false;
+        }
+        return true;
+    }
+
     public JoinGroupRequest(JoinGroupRequestData data, short version) {
         super(ApiKeys.JOIN_GROUP, version);
         this.data = data;
@@ -86,6 +121,7 @@ public class JoinGroupRequest extends AbstractRequest {
             case 2:
             case 3:
             case 4:
+            case 5:
                 return new JoinGroupResponse(
                         new JoinGroupResponseData()
                                 .setThrottleTimeMs(throttleTimeMs)
diff --git a/clients/src/main/resources/common/message/JoinGroupRequest.json b/clients/src/main/resources/common/message/JoinGroupRequest.json
index 2a3000c..ec98d4a 100644
--- a/clients/src/main/resources/common/message/JoinGroupRequest.json
+++ b/clients/src/main/resources/common/message/JoinGroupRequest.json
@@ -20,8 +20,9 @@
   // Version 1 adds RebalanceTimeoutMs.
   // Version 2 and 3 are the same as version 1.
   // Starting from version 4, the client needs to issue a second request to join group
+  // Starting from version 5, we add a new field called groupInstanceId to indicate member identity across restarts.
   // with assigned id.
-  "validVersions": "0-4",
+  "validVersions": "0-5",
   "fields": [
     { "name": "GroupId", "type": "string", "versions": "0+",
       "about": "The group identifier." },
@@ -33,6 +34,8 @@
       "about": "The maximum time in milliseconds that the coordinator will wait for each member to rejoin when rebalancing the group." },
     { "name": "MemberId", "type": "string", "versions": "0+",
       "about": "The member id assigned by the group coordinator." },
+    { "name": "GroupInstanceId", "type": "string", "versions": "5+", "nullableVersions": "5+",
+      "about": "The unique identifier of the consumer instance provided by end user." },
     { "name": "ProtocolType", "type": "string", "versions": "0+",
       "about": "The unique name the for class of protocols implemented by the group we want to join." },
     { "name": "Protocols", "type": "[]JoinGroupRequestProtocol", "versions": "0+",
diff --git a/clients/src/main/resources/common/message/JoinGroupResponse.json b/clients/src/main/resources/common/message/JoinGroupResponse.json
index a1c6a70..6ffa1bc 100644
--- a/clients/src/main/resources/common/message/JoinGroupResponse.json
+++ b/clients/src/main/resources/common/message/JoinGroupResponse.json
@@ -22,7 +22,8 @@
   // Starting in version 3, on quota violation, brokers send out responses before throttling.
   // Starting in version 4, the client needs to issue a second request to join group
   // with assigned id.
-  "validVersions": "0-4",
+  // Version 5 is bumped to apply group.instance.id to identify member across restarts.
+  "validVersions": "0-5",
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "2+", "ignorable": true,
       "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
@@ -39,6 +40,8 @@
     { "name": "Members", "type": "[]JoinGroupResponseMember", "versions": "0+", "fields": [
       { "name": "MemberId", "type": "string", "versions": "0+",
         "about": "The group member ID." },
+      { "name": "GroupInstanceId", "type": "string", "versions": "5+", "nullableVersions": "5+",
+        "about": "The unique identifier of the consumer instance provided by end user." },
       { "name": "Metadata", "type": "bytes", "versions": "0+",
         "about": "The group member metadata." }
     ]}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 25c72c6..8f23d41 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -145,6 +145,7 @@ public class KafkaConsumerTest {
     private final int autoCommitIntervalMs = 500;
 
     private final String groupId = "mock-group";
+    private final Optional<String> groupInstanceId = Optional.of("mock-instance");
 
     @Test
     public void testMetricsReporterAutoGeneratedClientId() {
@@ -384,7 +385,8 @@ public class KafkaConsumerTest {
 
         PartitionAssignor assignor = new RoundRobinAssignor();
 
-        KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true);
+        KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
+
         consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
         Node coordinator = prepareRebalance(client, node, assignor, singletonList(tp0), null);
 
@@ -417,7 +419,7 @@ public class KafkaConsumerTest {
         Node node = metadata.fetch().nodes().get(0);
         PartitionAssignor assignor = new RoundRobinAssignor();
 
-        KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true);
+        KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
         consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
         Node coordinator = prepareRebalance(client, node, assignor, singletonList(tp0), null);
 
@@ -452,7 +454,7 @@ public class KafkaConsumerTest {
 
         final PartitionAssignor assignor = new RoundRobinAssignor();
 
-        final KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true);
+        final KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
         consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
         prepareRebalance(client, node, assignor, singletonList(tp0), null);
 
@@ -476,7 +478,7 @@ public class KafkaConsumerTest {
 
         final PartitionAssignor assignor = new RoundRobinAssignor();
 
-        final KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true);
+        final KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
         consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
         prepareRebalance(client, node, assignor, singletonList(tp0), null);
 
@@ -499,7 +501,7 @@ public class KafkaConsumerTest {
         initMetadata(client, Collections.singletonMap(topic, 1));
         PartitionAssignor assignor = new RoundRobinAssignor();
 
-        KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true);
+        KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
         consumer.assign(singleton(tp0));
         consumer.seekToBeginning(singleton(tp0));
 
@@ -575,7 +577,7 @@ public class KafkaConsumerTest {
         PartitionAssignor assignor = new RoundRobinAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor,
-                true, groupId);
+                true, groupId, groupInstanceId);
         consumer.assign(singletonList(tp0));
 
         client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
@@ -599,7 +601,7 @@ public class KafkaConsumerTest {
         PartitionAssignor assignor = new RoundRobinAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor,
-                true, groupId);
+                true, groupId, groupInstanceId);
         consumer.assign(singletonList(tp0));
 
         client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
@@ -624,7 +626,7 @@ public class KafkaConsumerTest {
         PartitionAssignor assignor = new RoundRobinAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor,
-                true, groupId);
+                true, groupId, groupInstanceId);
         consumer.assign(singletonList(tp0));
 
         client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
@@ -651,7 +653,7 @@ public class KafkaConsumerTest {
         PartitionAssignor assignor = new RoundRobinAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor,
-                true, groupId);
+                true, groupId, Optional.empty());
         consumer.assign(singletonList(tp0));
         consumer.seek(tp0, 20L);
         consumer.poll(Duration.ZERO);
@@ -673,7 +675,7 @@ public class KafkaConsumerTest {
 
         PartitionAssignor assignor = new RoundRobinAssignor();
 
-        KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true);
+        KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
         consumer.assign(singletonList(tp0));
 
         // lookup coordinator
@@ -711,7 +713,7 @@ public class KafkaConsumerTest {
 
         PartitionAssignor assignor = new RoundRobinAssignor();
 
-        KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true);
+        KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
         consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
         Node coordinator = prepareRebalance(client, node, assignor, singletonList(tp0), null);
 
@@ -751,7 +753,7 @@ public class KafkaConsumerTest {
 
         PartitionAssignor assignor = new RoundRobinAssignor();
 
-        KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true);
+        KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
         prepareRebalance(client, node, singleton(topic), assignor, singletonList(tp0), null);
 
         consumer.subscribe(Pattern.compile(topic), getConsumerRebalanceListener(consumer));
@@ -783,7 +785,7 @@ public class KafkaConsumerTest {
         initMetadata(client, partitionCounts);
         Node node = metadata.fetch().nodes().get(0);
 
-        KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, false);
+        KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId);
 
         Node coordinator = prepareRebalance(client, node, singleton(topic), assignor, singletonList(tp0), null);
         consumer.subscribe(Pattern.compile(topic), getConsumerRebalanceListener(consumer));
@@ -815,7 +817,7 @@ public class KafkaConsumerTest {
 
         PartitionAssignor assignor = new RoundRobinAssignor();
 
-        KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true);
+        KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
         consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
         prepareRebalance(client, node, assignor, singletonList(tp0), null);
 
@@ -865,7 +867,7 @@ public class KafkaConsumerTest {
 
         final PartitionAssignor assignor = new RoundRobinAssignor();
 
-        KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, false);
+        KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId);
         consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
         prepareRebalance(client, node, assignor, singletonList(tp0), null);
 
@@ -895,7 +897,7 @@ public class KafkaConsumerTest {
 
         PartitionAssignor assignor = new RangeAssignor();
 
-        KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true);
+        KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
         consumer.subscribe(singletonList(topic), getConsumerRebalanceListener(consumer));
 
         prepareRebalance(client, node, assignor, singletonList(tp0), null);
@@ -935,7 +937,7 @@ public class KafkaConsumerTest {
 
         PartitionAssignor assignor = new RangeAssignor();
 
-        KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true);
+        KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
 
         // initial subscription
         consumer.subscribe(Arrays.asList(topic, topic2), getConsumerRebalanceListener(consumer));
@@ -1049,7 +1051,7 @@ public class KafkaConsumerTest {
 
         PartitionAssignor assignor = new RangeAssignor();
 
-        KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, false);
+        KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId);
 
         // initial subscription
         consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
@@ -1111,7 +1113,7 @@ public class KafkaConsumerTest {
 
         PartitionAssignor assignor = new RangeAssignor();
 
-        KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true);
+        KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
 
         // lookup coordinator
         client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
@@ -1167,7 +1169,7 @@ public class KafkaConsumerTest {
 
         PartitionAssignor assignor = new RangeAssignor();
 
-        KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, false);
+        KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId);
 
         // lookup coordinator
         client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
@@ -1221,7 +1223,7 @@ public class KafkaConsumerTest {
 
         PartitionAssignor assignor = new RangeAssignor();
 
-        KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true);
+        KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
 
         // lookup coordinator
         client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
@@ -1416,7 +1418,7 @@ public class KafkaConsumerTest {
 
         PartitionAssignor assignor = new RoundRobinAssignor();
 
-        KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, false);
+        KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId);
         consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
         client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
         Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
@@ -1497,7 +1499,7 @@ public class KafkaConsumerTest {
 
         PartitionAssignor assignor = new RoundRobinAssignor();
 
-        final KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, false);
+        final KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, false, Optional.empty());
         consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
         Node coordinator = prepareRebalance(client, node, assignor, singletonList(tp0), null);
 
@@ -1635,7 +1637,7 @@ public class KafkaConsumerTest {
         PartitionAssignor assignor = new RangeAssignor();
 
         client.createPendingAuthenticationError(node, 0);
-        return newConsumer(time, client, subscription, metadata, assignor, false);
+        return newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId);
     }
 
     private ConsumerRebalanceListener getConsumerRebalanceListener(final KafkaConsumer<String, String> consumer) {
@@ -1829,15 +1831,16 @@ public class KafkaConsumerTest {
                                                       SubscriptionState subscription,
                                                       ConsumerMetadata metadata,
                                                       PartitionAssignor assignor,
-                                                      boolean autoCommitEnabled) {
-        return newConsumer(time, client, subscription, metadata, assignor, autoCommitEnabled, groupId);
+                                                      boolean autoCommitEnabled,
+                                                      Optional<String> groupInstanceId) {
+        return newConsumer(time, client, subscription, metadata, assignor, autoCommitEnabled, groupId, groupInstanceId);
     }
 
     private KafkaConsumer<String, String> newConsumerNoAutoCommit(Time time,
                                                                   KafkaClient client,
                                                                   SubscriptionState subscription,
                                                                   ConsumerMetadata metadata) {
-        return newConsumer(time, client, subscription, metadata, new RangeAssignor(), false, groupId);
+        return newConsumer(time, client, subscription, metadata, new RangeAssignor(), false, groupId, groupInstanceId);
     }
 
     private KafkaConsumer<String, String> newConsumer(Time time,
@@ -1846,7 +1849,8 @@ public class KafkaConsumerTest {
                                                       ConsumerMetadata metadata,
                                                       PartitionAssignor assignor,
                                                       boolean autoCommitEnabled,
-                                                      String groupId) {
+                                                      String groupId,
+                                                      Optional<String> groupInstanceId) {
         String clientId = "mock-consumer";
         String metricGroupPrefix = "consumer";
         long retryBackoffMs = 100;
@@ -1878,6 +1882,7 @@ public class KafkaConsumerTest {
                 loggerFactory,
                 consumerClient,
                 groupId,
+                groupInstanceId,
                 rebalanceTimeoutMs,
                 sessionTimeoutMs,
                 heartbeat,
@@ -1975,7 +1980,7 @@ public class KafkaConsumerTest {
                 topicMetadata);
         client.prepareMetadataUpdate(updateResponse);
 
-        KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true);
+        KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
         consumer.subscribe(singleton(invalidTopicName), getConsumerRebalanceListener(consumer));
 
         consumer.poll(Duration.ZERO);
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index dc8ea46..c3f0ff5 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -32,6 +32,7 @@ import org.apache.kafka.common.requests.HeartbeatRequest;
 import org.apache.kafka.common.requests.HeartbeatResponse;
 import org.apache.kafka.common.requests.JoinGroupRequest;
 import org.apache.kafka.common.requests.JoinGroupResponse;
+import org.apache.kafka.common.requests.LeaveGroupRequest;
 import org.apache.kafka.common.requests.SyncGroupRequest;
 import org.apache.kafka.common.requests.SyncGroupResponse;
 import org.apache.kafka.common.utils.LogContext;
@@ -47,6 +48,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -81,14 +83,16 @@ public class AbstractCoordinatorTest {
     private DummyCoordinator coordinator;
 
     private void setupCoordinator() {
-        setupCoordinator(RETRY_BACKOFF_MS, REBALANCE_TIMEOUT_MS);
+        setupCoordinator(RETRY_BACKOFF_MS, REBALANCE_TIMEOUT_MS,
+            Optional.empty());
     }
 
     private void setupCoordinator(int retryBackoffMs) {
-        setupCoordinator(retryBackoffMs, REBALANCE_TIMEOUT_MS);
+        setupCoordinator(retryBackoffMs, REBALANCE_TIMEOUT_MS,
+            Optional.empty());
     }
 
-    private void setupCoordinator(int retryBackoffMs, int rebalanceTimeoutMs) {
+    private void setupCoordinator(int retryBackoffMs, int rebalanceTimeoutMs, Optional<String> groupInstanceId) {
         LogContext logContext = new LogContext();
         this.mockTime = new MockTime();
         ConsumerMetadata metadata = new ConsumerMetadata(retryBackoffMs, 60 * 60 * 1000L,
@@ -103,7 +107,7 @@ public class AbstractCoordinatorTest {
         mockClient.updateMetadata(TestUtils.metadataUpdateWith(1, emptyMap()));
         this.node = metadata.fetch().nodes().get(0);
         this.coordinatorNode = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
-        this.coordinator = new DummyCoordinator(consumerClient, metrics, mockTime, rebalanceTimeoutMs, retryBackoffMs);
+        this.coordinator = new DummyCoordinator(consumerClient, metrics, mockTime, rebalanceTimeoutMs, retryBackoffMs, groupInstanceId);
     }
 
     @Test
@@ -171,7 +175,8 @@ public class AbstractCoordinatorTest {
 
     @Test
     public void testJoinGroupRequestTimeout() {
-        setupCoordinator(RETRY_BACKOFF_MS, REBALANCE_TIMEOUT_MS);
+        setupCoordinator(RETRY_BACKOFF_MS, REBALANCE_TIMEOUT_MS,
+            Optional.empty());
         mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady(mockTime.timer(0));
 
@@ -188,7 +193,8 @@ public class AbstractCoordinatorTest {
     public void testJoinGroupRequestMaxTimeout() {
         // Ensure we can handle the maximum allowed rebalance timeout
 
-        setupCoordinator(RETRY_BACKOFF_MS, Integer.MAX_VALUE);
+        setupCoordinator(RETRY_BACKOFF_MS, Integer.MAX_VALUE,
+            Optional.empty());
         mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady(mockTime.timer(0));
 
@@ -235,6 +241,83 @@ public class AbstractCoordinatorTest {
     }
 
     @Test
+    public void testJoinGroupRequestWithMemberIdMisMatch() {
+        setupCoordinator();
+        mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(mockTime.timer(0));
+
+        final String memberId = "memberId";
+        final int generation = -1;
+
+        mockClient.prepareResponse(joinGroupFollowerResponse(generation, memberId, JoinGroupResponse.UNKNOWN_MEMBER_ID, Errors.FENCED_INSTANCE_ID));
+
+        RequestFuture<ByteBuffer> future = coordinator.sendJoinGroupRequest();
+        assertTrue(consumerClient.poll(future, mockTime.timer(REQUEST_TIMEOUT_MS)));
+        assertEquals(Errors.FENCED_INSTANCE_ID.message(), future.exception().getMessage());
+        // Make sure the exception is fatal.
+        assertFalse(future.isRetriable());
+    }
+
+    @Test
+    public void testJoinGroupRequestWithGroupInstanceIdNotFound() {
+        setupCoordinator();
+        mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(mockTime.timer(0));
+
+        final String memberId = "memberId";
+        final int generation = -1;
+
+        mockClient.prepareResponse(joinGroupFollowerResponse(generation, memberId, JoinGroupResponse.UNKNOWN_MEMBER_ID, Errors.UNKNOWN_MEMBER_ID));
+
+        RequestFuture<ByteBuffer> future = coordinator.sendJoinGroupRequest();
+
+        assertTrue(consumerClient.poll(future, mockTime.timer(REQUEST_TIMEOUT_MS)));
+        assertEquals(Errors.UNKNOWN_MEMBER_ID.message(), future.exception().getMessage());
+        assertTrue(coordinator.rejoinNeededOrPending());
+        assertTrue(coordinator.hasMatchingGenerationId(generation));
+    }
+
+    @Test
+    public void testLeaveGroupSentWithGroupInstanceIdUnSet() {
+        checkLeaveGroupRequestSent(Optional.empty());
+        checkLeaveGroupRequestSent(Optional.of("groupInstanceId"));
+    }
+
+    private void checkLeaveGroupRequestSent(Optional<String> groupInstanceId) {
+        setupCoordinator(RETRY_BACKOFF_MS, Integer.MAX_VALUE, groupInstanceId);
+
+        mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        mockClient.prepareResponse(joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE));
+        mockClient.prepareResponse(syncGroupResponse(Errors.NONE));
+
+        final RuntimeException e = new RuntimeException();
+
+        // raise the error when the coordinator tries to send leave group request.
+        mockClient.prepareResponse(new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(AbstractRequest body) {
+                if (body instanceof LeaveGroupRequest)
+                    throw e;
+                return false;
+            }
+        }, heartbeatResponse(Errors.UNKNOWN_SERVER_ERROR));
+
+        try {
+            coordinator.ensureActiveGroup();
+            coordinator.close();
+            if (coordinator.isDynamicMember()) {
+                fail("Expected leavegroup to raise an error.");
+            }
+        } catch (RuntimeException exception) {
+            if (coordinator.isDynamicMember()) {
+                assertEquals(exception, e);
+            } else {
+                fail("Coordinator with group.instance.id set shouldn't send leave group request.");
+            }
+        }
+    }
+
+    @Test
     public void testUncaughtExceptionInHeartbeatThread() throws Exception {
         setupCoordinator();
 
@@ -723,9 +806,10 @@ public class AbstractCoordinatorTest {
                                 Metrics metrics,
                                 Time time,
                                 int rebalanceTimeoutMs,
-                                int retryBackoffMs) {
-            super(new LogContext(), client, GROUP_ID, rebalanceTimeoutMs, SESSION_TIMEOUT_MS,
-                    HEARTBEAT_INTERVAL_MS, metrics, METRIC_GROUP_PREFIX, time, retryBackoffMs, false);
+                                int retryBackoffMs,
+                                Optional<String> groupInstanceId) {
+            super(new LogContext(), client, GROUP_ID, groupInstanceId, rebalanceTimeoutMs,
+                    SESSION_TIMEOUT_MS, HEARTBEAT_INTERVAL_MS, metrics, METRIC_GROUP_PREFIX, time, retryBackoffMs, !groupInstanceId.isPresent());
         }
 
         @Override
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 645e6ed..3f9d89f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -105,6 +105,7 @@ public class ConsumerCoordinatorTest {
     private final TopicPartition t1p = new TopicPartition(topic1, 0);
     private final TopicPartition t2p = new TopicPartition(topic2, 0);
     private final String groupId = "test-group";
+    private final Optional<String> groupInstanceId = Optional.of("test-instance");
     private final int rebalanceTimeoutMs = 60000;
     private final int sessionTimeoutMs = 10000;
     private final int heartbeatIntervalMs = 5000;
@@ -148,7 +149,7 @@ public class ConsumerCoordinatorTest {
         this.mockOffsetCommitCallback = new MockCommitCallback();
         this.partitionAssignor.clear();
 
-        this.coordinator = buildCoordinator(metrics, assignors, false, true);
+        this.coordinator = buildCoordinator(metrics, assignors, false, Optional.empty());
     }
 
     @After
@@ -1065,7 +1066,7 @@ public class ConsumerCoordinatorTest {
     @Test
     public void testWakeupFromAssignmentCallback() {
         ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
-                false, true);
+                false, Optional.empty());
 
         final String topic = "topic1";
         TopicPartition partition = new TopicPartition(topic, 0);
@@ -1182,7 +1183,7 @@ public class ConsumerCoordinatorTest {
         metadata = new ConsumerMetadata(0, Long.MAX_VALUE, includeInternalTopics,
                 subscriptions, new LogContext(), new ClusterResourceListeners());
         client = new MockClient(time, metadata);
-        coordinator = buildCoordinator(new Metrics(), assignors, false, true);
+        coordinator = buildCoordinator(new Metrics(), assignors, false, Optional.empty());
 
         subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener);
 
@@ -1311,7 +1312,7 @@ public class ConsumerCoordinatorTest {
         final String consumerId = "consumer";
 
         ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
-                true, true);
+                true, groupInstanceId);
 
         subscriptions.subscribe(singleton(topic1), rebalanceListener);
         joinAsFollowerAndReceiveAssignment(consumerId, coordinator, singletonList(t1p));
@@ -1327,7 +1328,7 @@ public class ConsumerCoordinatorTest {
     public void testAutoCommitRetryBackoff() {
         final String consumerId = "consumer";
         ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
-                true, true);
+                true, groupInstanceId);
         subscriptions.subscribe(singleton(topic1), rebalanceListener);
         joinAsFollowerAndReceiveAssignment(consumerId, coordinator, singletonList(t1p));
 
@@ -1360,8 +1361,7 @@ public class ConsumerCoordinatorTest {
     @Test
     public void testAutoCommitAwaitsInterval() {
         final String consumerId = "consumer";
-        ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
-                true, true);
+        ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors, true, groupInstanceId);
         subscriptions.subscribe(singleton(topic1), rebalanceListener);
         joinAsFollowerAndReceiveAssignment(consumerId, coordinator, singletonList(t1p));
 
@@ -1400,7 +1400,7 @@ public class ConsumerCoordinatorTest {
         final String consumerId = "consumer";
 
         ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
-                true, true);
+                true, groupInstanceId);
 
         subscriptions.subscribe(singleton(topic1), rebalanceListener);
 
@@ -1426,7 +1426,7 @@ public class ConsumerCoordinatorTest {
     @Test
     public void testAutoCommitManualAssignment() {
         ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
-                true, true);
+                true, groupInstanceId);
 
         subscriptions.assignFromUser(singleton(t1p));
         subscriptions.seek(t1p, 100);
@@ -1443,7 +1443,7 @@ public class ConsumerCoordinatorTest {
     @Test
     public void testAutoCommitManualAssignmentCoordinatorUnknown() {
         ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
-                true, true);
+                true, groupInstanceId);
 
         subscriptions.assignFromUser(singleton(t1p));
         subscriptions.seek(t1p, 100);
@@ -1953,25 +1953,19 @@ public class ConsumerCoordinatorTest {
 
     @Test
     public void testCloseDynamicAssignment() throws Exception {
-        ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, true);
+        ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, Optional.empty());
         gracefulCloseTest(coordinator, true);
     }
 
     @Test
     public void testCloseManualAssignment() throws Exception {
-        ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(false, true, true);
-        gracefulCloseTest(coordinator, false);
-    }
-
-    @Test
-    public void shouldNotLeaveGroupWhenLeaveGroupFlagIsFalse() throws Exception {
-        final ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, false);
+        ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(false, true, Optional.empty());
         gracefulCloseTest(coordinator, false);
     }
 
     @Test
     public void testCloseCoordinatorNotKnownManualAssignment() throws Exception {
-        ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(false, true, true);
+        ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(false, true, Optional.empty());
         makeCoordinatorUnknown(coordinator, Errors.NOT_COORDINATOR);
         time.sleep(autoCommitIntervalMs);
         closeVerifyTimeout(coordinator, 1000, 1000, 1000);
@@ -1979,14 +1973,14 @@ public class ConsumerCoordinatorTest {
 
     @Test
     public void testCloseCoordinatorNotKnownNoCommits() throws Exception {
-        ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false, true);
+        ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false, Optional.empty());
         makeCoordinatorUnknown(coordinator, Errors.NOT_COORDINATOR);
         closeVerifyTimeout(coordinator, 1000, 0, 0);
     }
 
     @Test
     public void testCloseCoordinatorNotKnownWithCommits() throws Exception {
-        ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, true);
+        ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, groupInstanceId);
         makeCoordinatorUnknown(coordinator, Errors.NOT_COORDINATOR);
         time.sleep(autoCommitIntervalMs);
         closeVerifyTimeout(coordinator, 1000, 1000, 1000);
@@ -1994,14 +1988,14 @@ public class ConsumerCoordinatorTest {
 
     @Test
     public void testCloseCoordinatorUnavailableNoCommits() throws Exception {
-        ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false, true);
+        ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false, Optional.empty());
         makeCoordinatorUnknown(coordinator, Errors.COORDINATOR_NOT_AVAILABLE);
         closeVerifyTimeout(coordinator, 1000, 0, 0);
     }
 
     @Test
     public void testCloseTimeoutCoordinatorUnavailableForCommit() throws Exception {
-        ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, true);
+        ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, groupInstanceId);
         makeCoordinatorUnknown(coordinator, Errors.COORDINATOR_NOT_AVAILABLE);
         time.sleep(autoCommitIntervalMs);
         closeVerifyTimeout(coordinator, 1000, 1000, 1000);
@@ -2009,7 +2003,7 @@ public class ConsumerCoordinatorTest {
 
     @Test
     public void testCloseMaxWaitCoordinatorUnavailableForCommit() throws Exception {
-        ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, true);
+        ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, groupInstanceId);
         makeCoordinatorUnknown(coordinator, Errors.COORDINATOR_NOT_AVAILABLE);
         time.sleep(autoCommitIntervalMs);
         closeVerifyTimeout(coordinator, Long.MAX_VALUE, requestTimeoutMs, requestTimeoutMs);
@@ -2017,27 +2011,27 @@ public class ConsumerCoordinatorTest {
 
     @Test
     public void testCloseNoResponseForCommit() throws Exception {
-        ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, true);
+        ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, groupInstanceId);
         time.sleep(autoCommitIntervalMs);
         closeVerifyTimeout(coordinator, Long.MAX_VALUE, requestTimeoutMs, requestTimeoutMs);
     }
 
     @Test
     public void testCloseNoResponseForLeaveGroup() throws Exception {
-        ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false, true);
+        ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false, Optional.empty());
         closeVerifyTimeout(coordinator, Long.MAX_VALUE, requestTimeoutMs, requestTimeoutMs);
     }
 
     @Test
     public void testCloseNoWait() throws Exception {
-        ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, true);
+        ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, groupInstanceId);
         time.sleep(autoCommitIntervalMs);
         closeVerifyTimeout(coordinator, 0, 0, 0);
     }
 
     @Test
     public void testHeartbeatThreadClose() throws Exception {
-        ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, true);
+        ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, groupInstanceId);
         coordinator.ensureActiveGroup();
         time.sleep(heartbeatIntervalMs + 100);
         Thread.yield(); // Give heartbeat thread a chance to attempt heartbeat
@@ -2051,7 +2045,8 @@ public class ConsumerCoordinatorTest {
     @Test
     public void testAutoCommitAfterCoordinatorBackToService() {
         ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
-                true, true);
+                true, groupInstanceId);
+
         subscriptions.assignFromUser(Collections.singleton(t1p));
         subscriptions.seek(t1p, 100L);
 
@@ -2069,10 +2064,10 @@ public class ConsumerCoordinatorTest {
 
     private ConsumerCoordinator prepareCoordinatorForCloseTest(final boolean useGroupManagement,
                                                                final boolean autoCommit,
-                                                               final boolean leaveGroup) {
+                                                               final Optional<String> groupInstanceId) {
         final String consumerId = "consumer";
         ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
-                autoCommit, leaveGroup);
+                autoCommit, groupInstanceId);
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
         if (useGroupManagement) {
@@ -2165,11 +2160,12 @@ public class ConsumerCoordinatorTest {
     private ConsumerCoordinator buildCoordinator(final Metrics metrics,
                                                  final List<PartitionAssignor> assignors,
                                                  final boolean autoCommitEnabled,
-                                                 final boolean leaveGroup) {
+                                                 final Optional<String> groupInstanceId) {
         return new ConsumerCoordinator(
                 new LogContext(),
                 consumerClient,
                 groupId,
+                groupInstanceId,
                 rebalanceTimeoutMs,
                 sessionTimeoutMs,
                 heartbeat,
@@ -2183,7 +2179,8 @@ public class ConsumerCoordinatorTest {
                 autoCommitEnabled,
                 autoCommitIntervalMs,
                 null,
-                leaveGroup);
+                true
+        );
     }
 
     private FindCoordinatorResponse groupCoordinatorResponse(Node node, Errors error) {
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/JoinGroupRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/JoinGroupRequestTest.java
new file mode 100644
index 0000000..0271aac
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/requests/JoinGroupRequestTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+public class JoinGroupRequestTest {
+
+    @Test
+    public void shouldAcceptValidGroupInstanceIds() {
+        String maxLengthString = TestUtils.randomString(249);
+        String[] validGroupInstanceIds = {"valid", "INSTANCE", "gRoUp", "ar6", "VaL1d", "_0-9_.", "...", maxLengthString};
+
+        for (String instanceId : validGroupInstanceIds) {
+            JoinGroupRequest.validateGroupInstanceId(instanceId);
+        }
+    }
+
+    @Test
+    public void shouldThrowOnInvalidGroupInstanceIds() {
+        char[] longString = new char[250];
+        Arrays.fill(longString, 'a');
+        String[] invalidGroupInstanceIds = {"", "foo bar", "..", "foo:bar", "foo=bar", ".", new String(longString)};
+
+        for (String instanceId : invalidGroupInstanceIds) {
+            try {
+                JoinGroupRequest.validateGroupInstanceId(instanceId);
+                fail("No exception was thrown for invalid instance id: " + instanceId);
+            } catch (InvalidConfigurationException e) {
+                // Good
+            }
+        }
+    }
+
+    @Test
+    public void shouldRecognizeInvalidCharactersInGroupInstanceIds() {
+        char[] invalidChars = {'/', '\\', ',', '\u0000', ':', '"', '\'', ';', '*', '?', ' ', '\t', '\r', '\n', '='};
+
+        for (char c : invalidChars) {
+            String instanceId = "Is " + c + "illegal";
+            assertFalse(JoinGroupRequest.containsValidPattern(instanceId));
+        }
+    }
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
index fa89a9d..a5882e8 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
@@ -40,6 +40,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 /**
  * This class manages the coordination process with the Kafka group coordinator on the broker for managing assignments
@@ -78,6 +79,7 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
         super(logContext,
               client,
               groupId,
+              Optional.empty(),
               rebalanceTimeoutMs,
               sessionTimeoutMs,
               heartbeatIntervalMs,
@@ -85,7 +87,7 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
               metricGrpPrefix,
               time,
               retryBackoffMs,
-              true);
+                true);
         this.log = logContext.logger(WorkerCoordinator.class);
         this.restUrl = restUrl;
         this.configStorage = configStorage;
diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala b/core/src/main/scala/kafka/api/ApiVersion.scala
index 8f78a96..d8d1edb 100644
--- a/core/src/main/scala/kafka/api/ApiVersion.scala
+++ b/core/src/main/scala/kafka/api/ApiVersion.scala
@@ -86,7 +86,9 @@ object ApiVersion {
     // LeaderAdnIsrRequest V2, UpdateMetadataRequest V5, StopReplicaRequest V1
     KAFKA_2_2_IV0,
     // New error code for ListOffsets when a new leader is lagging behind former HW (KIP-207)
-    KAFKA_2_2_IV1
+    KAFKA_2_2_IV1,
+    // Introduced static membership.
+    KAFKA_2_3_IV0
   )
 
   // Map keys are the union of the short and full versions
@@ -298,6 +300,13 @@ case object KAFKA_2_2_IV1 extends DefaultApiVersion {
   val id: Int = 21
 }
 
+case object KAFKA_2_3_IV0 extends DefaultApiVersion {
+  val shortVersion: String = "2.3"
+  val subVersion = "IV0"
+  val recordVersion = RecordVersion.V2
+  val id: Int = 22
+}
+
 object ApiVersionValidator extends Validator {
 
   override def ensureValid(name: String, value: Any): Unit = {
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index ead05ba..6a3dbbc 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -27,6 +27,7 @@ import kafka.utils.Logging
 import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.RecordBatch.{NO_PRODUCER_EPOCH, NO_PRODUCER_ID}
 import org.apache.kafka.common.requests._
@@ -101,6 +102,7 @@ class GroupCoordinator(val brokerId: Int,
 
   def handleJoinGroup(groupId: String,
                       memberId: String,
+                      groupInstanceId: Option[String],
                       requireKnownMemberId: Boolean,
                       clientId: String,
                       clientHost: String,
@@ -126,22 +128,22 @@ class GroupCoordinator(val brokerId: Int,
           // exist we should reject the request.
           if (isUnknownMember) {
             val group = groupManager.addGroup(new GroupMetadata(groupId, Empty, time))
-            doUnknownJoinGroup(group, requireKnownMemberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
+            doUnknownJoinGroup(group, groupInstanceId, requireKnownMemberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
           } else {
             responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID))
           }
-
         case Some(group) =>
           group.inLock {
             if ((groupIsOverCapacity(group)
                   && group.has(memberId) && !group.get(memberId).isAwaitingJoin) // oversized group, need to shed members that haven't joined yet
                 || (isUnknownMember && group.size >= groupConfig.groupMaxSize)) {
               group.remove(memberId)
+              group.removeStaticMember(groupInstanceId)
               responseCallback(joinError(JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.GROUP_MAX_SIZE_REACHED))
             } else if (isUnknownMember) {
-              doUnknownJoinGroup(group, requireKnownMemberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
+              doUnknownJoinGroup(group, groupInstanceId, requireKnownMemberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
             } else {
-              doJoinGroup(group, memberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
+              doJoinGroup(group, memberId, groupInstanceId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
             }
 
             // attempt to complete JoinGroup
@@ -149,11 +151,12 @@ class GroupCoordinator(val brokerId: Int,
               joinPurgatory.checkAndComplete(GroupKey(group.groupId))
             }
           }
+        }
       }
     }
-  }
 
   private def doUnknownJoinGroup(group: GroupMetadata,
+                                 groupInstanceId: Option[String],
                                  requireKnownMemberId: Boolean,
                                  clientId: String,
                                  clientHost: String,
@@ -174,15 +177,44 @@ class GroupCoordinator(val brokerId: Int,
       } else {
         val newMemberId = clientId + "-" + group.generateMemberIdSuffix
 
-        if (requireKnownMemberId) {
-          // If member id required, register the member in the pending member list
-          // and send back a response to call for another join group request with allocated member id.
-          group.addPendingMember(newMemberId)
-          addPendingMemberExpiration(group, newMemberId, sessionTimeoutMs)
-          responseCallback(joinError(newMemberId, Errors.MEMBER_ID_REQUIRED))
+        if (group.hasStaticMember(groupInstanceId)) {
+          val oldMemberId = group.getStaticMemberId(groupInstanceId)
+
+          if (group.is(Stable)) {
+            info(s"Static member $groupInstanceId with unknown member id rejoins, assigning new member id $newMemberId, while" +
+              s"old member $oldMemberId will be removed. No rebalance will be triggered.")
+
+            val oldMember = group.replaceGroupInstance(oldMemberId, newMemberId, groupInstanceId)
+
+            // Heartbeat of old member id will expire without affection since the group no longer contains that member id.
+            // New heartbeat shall be scheduled with new member id.
+            completeAndScheduleNextHeartbeatExpiration(group, oldMember)
+
+            responseCallback(JoinGroupResult(
+              members = if (group.isLeader(newMemberId)) {
+                group.currentMemberMetadata
+              } else {
+                List.empty
+              },
+              memberId = newMemberId,
+              generationId = group.generationId,
+              subProtocol = group.protocolOrNull,
+              leaderId = group.leaderOrNull,
+              error = Errors.NONE))
+          } else {
+            val knownStaticMember = group.get(oldMemberId)
+            updateMemberAndRebalance(group, knownStaticMember, protocols, responseCallback)
+          }
+        } else if (requireKnownMemberId) {
+            // If member id required (dynamic membership), register the member in the pending member list
+            // and send back a response to call for another join group request with allocated member id.
+            group.addPendingMember(newMemberId)
+            addPendingMemberExpiration(group, newMemberId, sessionTimeoutMs)
+            responseCallback(joinError(newMemberId, Errors.MEMBER_ID_REQUIRED))
         } else {
-          addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, newMemberId, clientId, clientHost, protocolType,
-            protocols, group, responseCallback)
+          addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, newMemberId, groupInstanceId,
+            clientId, clientHost, protocolType, protocols, group, responseCallback)
+
         }
       }
     }
@@ -190,6 +222,7 @@ class GroupCoordinator(val brokerId: Int,
   
   private def doJoinGroup(group: GroupMetadata,
                           memberId: String,
+                          groupInstanceId: Option[String],
                           clientId: String,
                           clientHost: String,
                           rebalanceTimeoutMs: Int,
@@ -207,65 +240,79 @@ class GroupCoordinator(val brokerId: Int,
       } else if (!group.supportsProtocols(protocolType, MemberMetadata.plainProtocolSet(protocols))) {
         responseCallback(joinError(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL))
       } else if (group.isPendingMember(memberId)) {
-        // A rejoining pending member will be accepted.
-        addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, memberId, clientId, clientHost, protocolType,
-          protocols, group, responseCallback)
-      } else if (!group.has(memberId)) {
-        // if the member trying to register with a un-recognized id, send the response to let
-        // it reset its member id and retry.
-        responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID))
+        // A rejoining pending member will be accepted. Note that pending member will never be a static member.
+        if (groupInstanceId.isDefined) {
+          throw new IllegalStateException(s"the static member $groupInstanceId was unexpectedly to be assigned " +
+            s"into pending member bucket with member id $memberId")
+        } else {
+          addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, memberId, groupInstanceId,
+            clientId, clientHost, protocolType, protocols, group, responseCallback)
+        }
       } else {
-        group.currentState match {
-          case PreparingRebalance =>
-            val member = group.get(memberId)
-            updateMemberAndRebalance(group, member, protocols, responseCallback)
+        val isKnownGroupInstanceId = group.hasStaticMember(groupInstanceId)
+        val groupInstanceIdNotFound = groupInstanceId.isDefined && !isKnownGroupInstanceId
+
+        if (isKnownGroupInstanceId && group.getStaticMemberId(groupInstanceId) != memberId) {
+          // given member id doesn't match with the groupInstanceId. Inform duplicate instance to shut down immediately.
+          responseCallback(joinError(memberId, Errors.FENCED_INSTANCE_ID))
+        } else if (!group.has(memberId) || groupInstanceIdNotFound) {
+            // If the dynamic member trying to register with an unrecognized id, or
+            // the static member joins with unknown group instance id, send the response to let
+            // it reset its member id and retry.
+          responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID))
+        } else {
+          val member = group.get(memberId)
 
-          case CompletingRebalance =>
-            val member = group.get(memberId)
-            if (member.matches(protocols)) {
-              // member is joining with the same metadata (which could be because it failed to
-              // receive the initial JoinGroup response), so just return current group information
-              // for the current generation.
-              responseCallback(JoinGroupResult(
-                members = if (group.isLeader(memberId)) {
-                  group.currentMemberMetadata
-                } else {
-                  Map.empty
-                },
-                memberId = memberId,
-                generationId = group.generationId,
-                subProtocol = group.protocolOrNull,
-                leaderId = group.leaderOrNull,
-                error = Errors.NONE))
-            } else {
-              // member has changed metadata, so force a rebalance
+          group.currentState match {
+            case PreparingRebalance =>
               updateMemberAndRebalance(group, member, protocols, responseCallback)
-            }
 
-          case Stable =>
-            val member = group.get(memberId)
-            if (group.isLeader(memberId) || !member.matches(protocols)) {
-              // force a rebalance if a member has changed metadata or if the leader sends JoinGroup.
-              // The latter allows the leader to trigger rebalances for changes affecting assignment
-              // which do not affect the member metadata (such as topic metadata changes for the consumer)
-              updateMemberAndRebalance(group, member, protocols, responseCallback)
-            } else {
-              // for followers with no actual change to their metadata, just return group information
-              // for the current generation which will allow them to issue SyncGroup
-              responseCallback(JoinGroupResult(
-                members = Map.empty,
-                memberId = memberId,
-                generationId = group.generationId,
-                subProtocol = group.protocolOrNull,
-                leaderId = group.leaderOrNull,
-                error = Errors.NONE))
-            }
+            case CompletingRebalance =>
+              if (member.matches(protocols)) {
+                // member is joining with the same metadata (which could be because it failed to
+                // receive the initial JoinGroup response), so just return current group information
+                // for the current generation.
+                responseCallback(JoinGroupResult(
+                  members = if (group.isLeader(memberId)) {
+                    group.currentMemberMetadata
+                  } else {
+                    List.empty
+                  },
+                  memberId = memberId,
+                  generationId = group.generationId,
+                  subProtocol = group.protocolOrNull,
+                  leaderId = group.leaderOrNull,
+                  error = Errors.NONE))
+              } else {
+                // member has changed metadata, so force a rebalance
+                updateMemberAndRebalance(group, member, protocols, responseCallback)
+              }
 
-          case Empty | Dead =>
-            // Group reaches unexpected state. Let the joining member reset their generation and rejoin.
-            warn(s"Attempt to add rejoining member $memberId of group ${group.groupId} in " +
-              s"unexpected group state ${group.currentState}")
-            responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID))
+            case Stable =>
+              val member = group.get(memberId)
+              if (group.isLeader(memberId) || !member.matches(protocols)) {
+                // force a rebalance if a member has changed metadata or if the leader sends JoinGroup.
+                // The latter allows the leader to trigger rebalances for changes affecting assignment
+                // which do not affect the member metadata (such as topic metadata changes for the consumer)
+                updateMemberAndRebalance(group, member, protocols, responseCallback)
+              } else {
+                // for followers with no actual change to their metadata, just return group information
+                // for the current generation which will allow them to issue SyncGroup
+                responseCallback(JoinGroupResult(
+                  members = List.empty,
+                  memberId = memberId,
+                  generationId = group.generationId,
+                  subProtocol = group.protocolOrNull,
+                  leaderId = group.leaderOrNull,
+                  error = Errors.NONE))
+              }
+
+            case Empty | Dead =>
+              // Group reaches unexpected state. Let the joining member reset their generation and rejoin.
+              warn(s"Attempt to add rejoining member $memberId of group ${group.groupId} in " +
+                s"unexpected group state ${group.currentState}")
+              responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID))
+          }
         }
       }
     }
@@ -715,7 +762,7 @@ class GroupCoordinator(val brokerId: Int,
 
   private def joinError(memberId: String, error: Errors): JoinGroupResult = {
     JoinGroupResult(
-      members = Map.empty,
+      members = List.empty,
       memberId = memberId,
       generationId = GroupCoordinator.NoGeneration,
       subProtocol = GroupCoordinator.NoProtocol,
@@ -761,13 +808,15 @@ class GroupCoordinator(val brokerId: Int,
   private def addMemberAndRebalance(rebalanceTimeoutMs: Int,
                                     sessionTimeoutMs: Int,
                                     memberId: String,
+                                    groupInstanceId: Option[String],
                                     clientId: String,
                                     clientHost: String,
                                     protocolType: String,
                                     protocols: List[(String, Array[Byte])],
                                     group: GroupMetadata,
                                     callback: JoinCallback) {
-    val member = new MemberMetadata(memberId, group.groupId, clientId, clientHost, rebalanceTimeoutMs,
+    val member = new MemberMetadata(memberId, group.groupId, groupInstanceId,
+      clientId, clientHost, rebalanceTimeoutMs,
       sessionTimeoutMs, protocolType, protocols)
 
     member.isNew = true
@@ -786,8 +835,11 @@ class GroupCoordinator(val brokerId: Int,
     // for new members. If the new member is still there, we expect it to retry.
     completeAndScheduleNextExpiration(group, member, NewMemberJoinTimeoutMs)
 
-    group.removePendingMember(memberId)
-    maybePrepareRebalance(group, s"Adding new member $memberId")
+    if (member.isStaticMember)
+      group.addStaticMember(groupInstanceId, memberId)
+    else
+      group.removePendingMember(memberId)
+    maybePrepareRebalance(group, s"Adding new member $memberId with group instanceid $groupInstanceId")
   }
 
   private def updateMemberAndRebalance(group: GroupMetadata,
@@ -836,6 +888,7 @@ class GroupCoordinator(val brokerId: Int,
     group.maybeInvokeJoinCallback(member, joinError(NoMemberId, Errors.UNKNOWN_MEMBER_ID))
 
     group.remove(member.memberId)
+    group.removeStaticMember(member.groupInstanceId)
 
     group.currentState match {
       case Dead | Empty =>
@@ -870,6 +923,7 @@ class GroupCoordinator(val brokerId: Int,
       group.notYetRejoinedMembers.foreach { failedMember =>
         removeHeartbeatForLeavingMember(group, failedMember)
         group.remove(failedMember.memberId)
+        group.removeStaticMember(failedMember.groupInstanceId)
         // TODO: cut the socket connection to the client
       }
 
@@ -898,7 +952,7 @@ class GroupCoordinator(val brokerId: Int,
               members = if (group.isLeader(member.memberId)) {
                 group.currentMemberMetadata
               } else {
-                Map.empty
+                List.empty
               },
               memberId = member.memberId,
               generationId = group.generationId,
@@ -1023,7 +1077,7 @@ case class GroupConfig(groupMinSessionTimeoutMs: Int,
                        groupMaxSize: Int,
                        groupInitialRebalanceDelayMs: Int)
 
-case class JoinGroupResult(members: Map[String, Array[Byte]],
+case class JoinGroupResult(members: List[JoinGroupResponseMember],
                            memberId: String,
                            generationId: Int,
                            subProtocol: String,
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
index 5cfd420..5a3841b 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.locks.ReentrantLock
 import kafka.common.OffsetAndMetadata
 import kafka.utils.{CoreUtils, Logging, nonthreadsafe}
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember
 import org.apache.kafka.common.utils.Time
 
 import scala.collection.{Seq, immutable, mutable}
@@ -128,7 +129,12 @@ private object GroupMetadata {
     group.protocol = Option(protocol)
     group.leaderId = Option(leaderId)
     group.currentStateTimestamp = currentStateTimestamp
-    members.foreach(group.add(_, null))
+    members.foreach(member => {
+      group.add(member, null)
+      if (member.isStaticMember) {
+        group.addStaticMember(member.groupInstanceId, member.memberId)
+      }
+    })
     group
   }
 }
@@ -184,6 +190,8 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
   private var protocol: Option[String] = None
 
   private val members = new mutable.HashMap[String, MemberMetadata]
+  // Static membership mapping [key: group.instance.id, value: member.id]
+  private val staticMembers = new mutable.HashMap[String, String]
   private val pendingMembers = new mutable.HashSet[String]
   private var numMembersAwaitingJoin = 0
   private val supportedProtocols = new mutable.HashMap[String, Integer]().withDefaultValue(0)
@@ -236,12 +244,56 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
       leaderId = members.keys.headOption
   }
 
+  /**
+    * [For static members only]: Replace the old member id with the new one,
+    * keep everything else unchanged and return the updated member.
+    */
+  def replaceGroupInstance(oldMemberId: String,
+                           newMemberId: String,
+                           groupInstanceId: Option[String]): MemberMetadata = {
+    if(groupInstanceId.isEmpty) {
+      throw new IllegalArgumentException(s"unexpected null group.instance.id in replaceGroupInstance")
+    }
+    val oldMember = members.remove(oldMemberId)
+      .getOrElse(throw new IllegalArgumentException(s"Cannot replace non-existing member id $oldMemberId"))
+
+    oldMember.memberId = newMemberId
+    members.put(newMemberId, oldMember)
+
+    if (isLeader(oldMemberId))
+      leaderId = Some(newMemberId)
+    addStaticMember(groupInstanceId, newMemberId)
+    oldMember
+  }
+
   def isPendingMember(memberId: String): Boolean = pendingMembers.contains(memberId) && !has(memberId)
 
   def addPendingMember(memberId: String) = pendingMembers.add(memberId)
 
   def removePendingMember(memberId: String) = pendingMembers.remove(memberId)
 
+  def hasStaticMember(groupInstanceId: Option[String]) = groupInstanceId.isDefined && staticMembers.contains(groupInstanceId.get)
+
+  def getStaticMemberId(groupInstanceId: Option[String]) = {
+    if(groupInstanceId.isEmpty) {
+      throw new IllegalArgumentException(s"unexpected null group.instance.id in getStaticMemberId")
+    }
+    staticMembers(groupInstanceId.get)
+  }
+
+  def addStaticMember(groupInstanceId: Option[String], newMemberId: String) = {
+    if(groupInstanceId.isEmpty) {
+      throw new IllegalArgumentException(s"unexpected null group.instance.id in addStaticMember")
+    }
+    staticMembers.put(groupInstanceId.get, newMemberId)
+  }
+
+  def removeStaticMember(groupInstanceId: Option[String]) = {
+    if (groupInstanceId.isDefined) {
+      staticMembers.remove(groupInstanceId.get)
+    }
+  }
+
   def currentState = state
 
   def notYetRejoinedMembers = members.values.filter(!_.isAwaitingJoin).toList
@@ -250,6 +302,8 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
 
   def allMembers = members.keySet
 
+  def allStaticMembers = staticMembers.keySet
+
   def numPending = pendingMembers.size
 
   def allMemberMetadata = members.values.toList
@@ -337,10 +391,14 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
     receivedTransactionalOffsetCommits = false
   }
 
-  def currentMemberMetadata: Map[String, Array[Byte]] = {
+  def currentMemberMetadata: List[JoinGroupResponseMember] = {
     if (is(Dead) || is(PreparingRebalance))
       throw new IllegalStateException("Cannot obtain member metadata for group in state %s".format(state))
-    members.map{ case (memberId, memberMetadata) => (memberId, memberMetadata.metadata(protocol.get))}.toMap
+    members.map{ case (memberId, memberMetadata) => new JoinGroupResponseMember()
+        .setMemberId(memberId)
+        .setGroupInstanceId(memberMetadata.groupInstanceId.orNull)
+        .setMetadata(memberMetadata.metadata(protocol.get))
+    }.toList
   }
 
   def summary: GroupSummary = {
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index eecf713..fc7f4e8 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -26,7 +26,7 @@ import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.locks.ReentrantLock
 
 import com.yammer.metrics.core.Gauge
-import kafka.api.{ApiVersion, KAFKA_0_10_1_IV0, KAFKA_2_1_IV0, KAFKA_2_1_IV1}
+import kafka.api.{ApiVersion, KAFKA_0_10_1_IV0, KAFKA_2_1_IV0, KAFKA_2_1_IV1, KAFKA_2_2_IV0, KAFKA_2_3_IV0}
 import kafka.common.{MessageFormatter, OffsetAndMetadata}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.ReplicaManager
@@ -40,7 +40,7 @@ import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.protocol.types.Type._
 import org.apache.kafka.common.protocol.types._
 import org.apache.kafka.common.record._
-import org.apache.kafka.common.requests.{OffsetCommitRequest, OffsetFetchResponse}
+import org.apache.kafka.common.requests.{JoinGroupRequest, OffsetCommitRequest, OffsetFetchResponse}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.{Time, Utils}
 
@@ -977,6 +977,7 @@ object GroupMetadataManager {
   private val GROUP_KEY_GROUP_FIELD = GROUP_METADATA_KEY_SCHEMA.get("group")
 
   private val MEMBER_ID_KEY = "member_id"
+  private val GROUP_INSTANCE_ID_KEY = "group_instance_id"
   private val CLIENT_ID_KEY = "client_id"
   private val CLIENT_HOST_KEY = "client_host"
   private val REBALANCE_TIMEOUT_KEY = "rebalance_timeout"
@@ -1003,6 +1004,16 @@ object GroupMetadataManager {
 
   private val MEMBER_METADATA_V2 = MEMBER_METADATA_V1
 
+  private val MEMBER_METADATA_V3 = new Schema(
+    new Field(MEMBER_ID_KEY, STRING),
+    new Field(GROUP_INSTANCE_ID_KEY, NULLABLE_STRING),
+    new Field(CLIENT_ID_KEY, STRING),
+    new Field(CLIENT_HOST_KEY, STRING),
+    new Field(REBALANCE_TIMEOUT_KEY, INT32),
+    new Field(SESSION_TIMEOUT_KEY, INT32),
+    new Field(SUBSCRIPTION_KEY, BYTES),
+    new Field(ASSIGNMENT_KEY, BYTES))
+
   private val PROTOCOL_TYPE_KEY = "protocol_type"
   private val GENERATION_KEY = "generation"
   private val PROTOCOL_KEY = "protocol"
@@ -1032,6 +1043,14 @@ object GroupMetadataManager {
     new Field(CURRENT_STATE_TIMESTAMP_KEY, INT64),
     new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V2)))
 
+  private val GROUP_METADATA_VALUE_SCHEMA_V3 = new Schema(
+    new Field(PROTOCOL_TYPE_KEY, STRING),
+    new Field(GENERATION_KEY, INT32),
+    new Field(PROTOCOL_KEY, NULLABLE_STRING),
+    new Field(LEADER_KEY, NULLABLE_STRING),
+    new Field(CURRENT_STATE_TIMESTAMP_KEY, INT64),
+    new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V3)))
+
   // map of versions to key schemas as data types
   private val MESSAGE_TYPE_SCHEMAS = Map(
     0 -> OFFSET_COMMIT_KEY_SCHEMA,
@@ -1049,7 +1068,8 @@ object GroupMetadataManager {
   private val GROUP_VALUE_SCHEMAS = Map(
     0 -> GROUP_METADATA_VALUE_SCHEMA_V0,
     1 -> GROUP_METADATA_VALUE_SCHEMA_V1,
-    2 -> GROUP_METADATA_VALUE_SCHEMA_V2)
+    2 -> GROUP_METADATA_VALUE_SCHEMA_V2,
+    3 -> GROUP_METADATA_VALUE_SCHEMA_V3)
 
   private val CURRENT_OFFSET_KEY_SCHEMA = schemaForKey(CURRENT_OFFSET_KEY_SCHEMA_VERSION)
   private val CURRENT_GROUP_KEY_SCHEMA = schemaForKey(CURRENT_GROUP_KEY_SCHEMA_VERSION)
@@ -1172,8 +1192,10 @@ object GroupMetadataManager {
         (0.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V0))
       else if (apiVersion < KAFKA_2_1_IV0)
         (1.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V1))
-      else
+      else if (apiVersion < KAFKA_2_3_IV0)
         (2.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V2))
+      else
+        (3.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V3))
     }
 
     value.set(PROTOCOL_TYPE_KEY, groupMetadata.protocolType.getOrElse(""))
@@ -1194,6 +1216,9 @@ object GroupMetadataManager {
       if (version > 0)
         memberStruct.set(REBALANCE_TIMEOUT_KEY, memberMetadata.rebalanceTimeoutMs)
 
+      if (version >= 3)
+        memberStruct.set(GROUP_INSTANCE_ID_KEY, memberMetadata.groupInstanceId.orNull)
+
       // The group is non-empty, so the current protocol must be defined
       val protocol = groupMetadata.protocolOrNull
       if (protocol == null)
@@ -1312,7 +1337,7 @@ object GroupMetadataManager {
       val valueSchema = schemaForGroupValue(version)
       val value = valueSchema.read(buffer)
 
-      if (version >= 0 && version <= 2) {
+      if (version >= 0 && version <= 3) {
         val generationId = value.get(GENERATION_KEY).asInstanceOf[Int]
         val protocolType = value.get(PROTOCOL_TYPE_KEY).asInstanceOf[String]
         val protocol = value.get(PROTOCOL_KEY).asInstanceOf[String]
@@ -1333,13 +1358,18 @@ object GroupMetadataManager {
         val members = memberMetadataArray.map { memberMetadataObj =>
           val memberMetadata = memberMetadataObj.asInstanceOf[Struct]
           val memberId = memberMetadata.get(MEMBER_ID_KEY).asInstanceOf[String]
+          val groupInstanceId =
+            if (version >= 3)
+              Some(memberMetadata.get(GROUP_INSTANCE_ID_KEY).asInstanceOf[String])
+            else
+              None
           val clientId = memberMetadata.get(CLIENT_ID_KEY).asInstanceOf[String]
           val clientHost = memberMetadata.get(CLIENT_HOST_KEY).asInstanceOf[String]
           val sessionTimeout = memberMetadata.get(SESSION_TIMEOUT_KEY).asInstanceOf[Int]
           val rebalanceTimeout = if (version == 0) sessionTimeout else memberMetadata.get(REBALANCE_TIMEOUT_KEY).asInstanceOf[Int]
           val subscription = Utils.toArray(memberMetadata.get(SUBSCRIPTION_KEY).asInstanceOf[ByteBuffer])
 
-          val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout, sessionTimeout,
+          val member = new MemberMetadata(memberId, groupId, groupInstanceId, clientId, clientHost, rebalanceTimeout, sessionTimeout,
             protocolType, List((protocol, subscription)))
           member.assignment = Utils.toArray(memberMetadata.get(ASSIGNMENT_KEY).asInstanceOf[ByteBuffer])
           member
diff --git a/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala b/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala
index 1932f42..844d7e6 100644
--- a/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala
@@ -22,8 +22,8 @@ import java.util
 import kafka.utils.nonthreadsafe
 import org.apache.kafka.common.protocol.Errors
 
-
 case class MemberSummary(memberId: String,
+                         groupInstanceId: Option[String],
                          clientId: String,
                          clientHost: String,
                          metadata: Array[Byte],
@@ -54,8 +54,9 @@ private object MemberMetadata {
  *                            and the group transitions to stable
  */
 @nonthreadsafe
-private[group] class MemberMetadata(val memberId: String,
+private[group] class MemberMetadata(var memberId: String,
                                     val groupId: String,
+                                    val groupInstanceId: Option[String],
                                     val clientId: String,
                                     val clientHost: String,
                                     val rebalanceTimeoutMs: Int,
@@ -69,6 +70,7 @@ private[group] class MemberMetadata(val memberId: String,
   var latestHeartbeat: Long = -1
   var isLeaving: Boolean = false
   var isNew: Boolean = false
+  val isStaticMember: Boolean = groupInstanceId.isDefined
 
   def isAwaitingJoin = awaitingJoinCallback != null
 
@@ -107,11 +109,11 @@ private[group] class MemberMetadata(val memberId: String,
   }
 
   def summary(protocol: String): MemberSummary = {
-    MemberSummary(memberId, clientId, clientHost, metadata(protocol), assignment)
+    MemberSummary(memberId, groupInstanceId, clientId, clientHost, metadata(protocol), assignment)
   }
 
   def summaryNoMetadata(): MemberSummary = {
-    MemberSummary(memberId, clientId, clientHost, Array.empty[Byte], Array.empty[Byte])
+    MemberSummary(memberId, groupInstanceId, clientId, clientHost, Array.empty[Byte], Array.empty[Byte])
   }
 
   /**
@@ -129,6 +131,7 @@ private[group] class MemberMetadata(val memberId: String,
   override def toString: String = {
     "MemberMetadata(" +
       s"memberId=$memberId, " +
+      s"groupInstanceId=$groupInstanceId, " +
       s"clientId=$clientId, " +
       s"clientHost=$clientHost, " +
       s"sessionTimeoutMs=$sessionTimeoutMs, " +
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index aa368f8..b0482c8 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -26,7 +26,7 @@ import java.util.concurrent.atomic.AtomicInteger
 import java.util.{Collections, Optional, Properties}
 
 import kafka.admin.{AdminUtils, RackAwareMode}
-import kafka.api.{ApiVersion, KAFKA_0_11_0_IV0}
+import kafka.api.{ApiVersion, KAFKA_0_11_0_IV0, KAFKA_2_3_IV0}
 import kafka.cluster.Partition
 import kafka.common.OffsetAndMetadata
 import kafka.controller.KafkaController
@@ -1298,12 +1298,6 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     // the callback for sending a join-group response
     def sendResponseCallback(joinResult: JoinGroupResult) {
-      val members = joinResult.members map { case (memberId, metadataArray) =>
-        new JoinGroupResponseData.JoinGroupResponseMember()
-          .setMemberId(memberId)
-          .setMetadata(metadataArray)
-      }
-
       def createResponse(requestThrottleMs: Int): AbstractResponse = {
         val responseBody = new JoinGroupResponse(
           new JoinGroupResponseData()
@@ -1313,7 +1307,7 @@ class KafkaApis(val requestChannel: RequestChannel,
             .setProtocolName(joinResult.subProtocol)
             .setLeader(joinResult.leaderId)
             .setMemberId(joinResult.memberId)
-            .setMembers(members.toSeq.asJava)
+            .setMembers(joinResult.members.asJava)
         )
 
         trace("Sending join group response %s for correlation id %d to client %s."
@@ -1337,8 +1331,17 @@ class KafkaApis(val requestChannel: RequestChannel,
         )
       )
     } else {
+      val encodedGroupInstanceId = joinGroupRequest.data().groupInstanceId
+      val groupInstanceId =
+        if (encodedGroupInstanceId == null ||
+        config.interBrokerProtocolVersion < KAFKA_2_3_IV0)
+          None
+        else
+          Some(encodedGroupInstanceId)
+
       // Only return MEMBER_ID_REQUIRED error if joinGroupRequest version is >= 4
-      val requireKnownMemberId = joinGroupRequest.version >= 4
+      // and groupInstanceId is configured to unknown.
+      val requireKnownMemberId = joinGroupRequest.version >= 4 && groupInstanceId.isEmpty
 
       // let the coordinator handle join-group
       val protocols = joinGroupRequest.data().protocols().asScala.map(protocol =>
@@ -1346,6 +1349,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       groupCoordinator.handleJoinGroup(
         joinGroupRequest.data().groupId,
         joinGroupRequest.data().memberId,
+        groupInstanceId,
         requireKnownMemberId,
         request.header.clientId,
         request.session.clientAddress.toString,
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 8f127bc..04f4f3b 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -151,7 +151,7 @@ object Defaults {
 
   /** ********* Group coordinator configuration ***********/
   val GroupMinSessionTimeoutMs = 6000
-  val GroupMaxSessionTimeoutMs = 300000
+  val GroupMaxSessionTimeoutMs = 1800000
   val GroupInitialRebalanceDelayMs = 3000
   val GroupMaxSize: Int = Int.MaxValue
 
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 23a0ad0..1e7c8a8 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -327,6 +327,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
         .setGroupId(group)
         .setSessionTimeoutMs(10000)
         .setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID)
+        .setGroupInstanceId(null)
         .setProtocolType("consumer")
         .setProtocols(protocolSet)
         .setRebalanceTimeoutMs(60000)
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
index ba179a5..25f2ba4 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
@@ -159,7 +159,7 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
       callback
     }
     override def runWithCallback(member: GroupMember, responseCallback: JoinGroupCallback): Unit = {
-      groupCoordinator.handleJoinGroup(member.groupId, member.memberId, requireKnownMemberId = false, "clientId", "clientHost",
+      groupCoordinator.handleJoinGroup(member.groupId, member.memberId, None, requireKnownMemberId = false, "clientId", "clientHost",
        DefaultRebalanceTimeout, DefaultSessionTimeout,
        protocolType, protocols, responseCallback)
     }
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index cf76375..478f027 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -72,8 +72,12 @@ class GroupCoordinatorTest extends JUnitSuite {
   private val groupId = "groupId"
   private val protocolType = "consumer"
   private val memberId = "memberId"
+  private val groupInstanceId = Some("groupInstanceId")
+  private val leaderInstanceId = Some("leader")
+  private val followerInstanceId = Some("follower")
   private val metadata = Array[Byte]()
   private val protocols = List(("range", metadata))
+  private val protocolSuperset = List(("range", metadata), ("roundrobin", metadata))
   private var groupPartitionId: Int = -1
 
   // we use this string value since its hashcode % #.partitions is different
@@ -127,9 +131,14 @@ class GroupCoordinatorTest extends JUnitSuite {
     groupCoordinator.groupManager.addLoadingPartition(otherGroupPartitionId)
     assertTrue(groupCoordinator.groupManager.isGroupLoading(otherGroupId))
 
-    // JoinGroup
+    // Dynamic Member JoinGroup
     var joinGroupResponse: Option[JoinGroupResult] = None
-    groupCoordinator.handleJoinGroup(otherGroupId, memberId, true, "clientId", "clientHost", 60000, 10000, "consumer",
+    groupCoordinator.handleJoinGroup(otherGroupId, memberId, None, true, "clientId", "clientHost", 60000, 10000, "consumer",
+      List("range" -> new Array[Byte](0)), result => { joinGroupResponse = Some(result)})
+    assertEquals(Some(Errors.COORDINATOR_LOAD_IN_PROGRESS), joinGroupResponse.map(_.error))
+
+    // Static Member JoinGroup
+    groupCoordinator.handleJoinGroup(otherGroupId, memberId, Some("groupInstanceId"), false, "clientId", "clientHost", 60000, 10000, "consumer",
       List("range" -> new Array[Byte](0)), result => { joinGroupResponse = Some(result)})
     assertEquals(Some(Errors.COORDINATOR_LOAD_IN_PROGRESS), joinGroupResponse.map(_.error))
 
@@ -188,9 +197,12 @@ class GroupCoordinatorTest extends JUnitSuite {
   def testJoinGroupWrongCoordinator() {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
 
-    val joinGroupResult = joinGroup(otherGroupId, memberId, protocolType, protocols)
-    val joinGroupError = joinGroupResult.error
-    assertEquals(Errors.NOT_COORDINATOR, joinGroupError)
+    var joinGroupResult = dynamicJoinGroup(otherGroupId, memberId, protocolType, protocols)
+    assertEquals(Errors.NOT_COORDINATOR, joinGroupResult.error)
+
+    EasyMock.reset(replicaManager)
+    joinGroupResult = staticJoinGroup(otherGroupId, memberId, groupInstanceId, protocolType, protocols)
+    assertEquals(Errors.NOT_COORDINATOR, joinGroupResult.error)
   }
 
   @Test
@@ -199,7 +211,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     val rebalanceTimeout = GroupInitialRebalanceDelay * 2
 
     for (i <- 1.to(GroupMaxSize)) {
-      futures += sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols, rebalanceTimeout)
+      futures += sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols, rebalanceTimeout = rebalanceTimeout)
       if (i != 1)
         timer.advanceClock(GroupInitialRebalanceDelay)
       EasyMock.reset(replicaManager)
@@ -211,7 +223,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     }
 
     // Should receive an error since the group is full
-    val errorFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols, rebalanceTimeout)
+    val errorFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols, rebalanceTimeout = rebalanceTimeout)
     assertEquals(Errors.GROUP_MAX_SIZE_REACHED, await(errorFuture, 1).error)
   }
 
@@ -219,7 +231,7 @@ class GroupCoordinatorTest extends JUnitSuite {
   def testJoinGroupSessionTimeoutTooSmall() {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
 
-    val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols, sessionTimeout = GroupMinSessionTimeout - 1)
+    val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols, sessionTimeout = GroupMinSessionTimeout - 1)
     val joinGroupError = joinGroupResult.error
     assertEquals(Errors.INVALID_SESSION_TIMEOUT, joinGroupError)
   }
@@ -228,16 +240,19 @@ class GroupCoordinatorTest extends JUnitSuite {
   def testJoinGroupSessionTimeoutTooLarge() {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
 
-    val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols, sessionTimeout = GroupMaxSessionTimeout + 1)
+    val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols, sessionTimeout = GroupMaxSessionTimeout + 1)
     val joinGroupError = joinGroupResult.error
     assertEquals(Errors.INVALID_SESSION_TIMEOUT, joinGroupError)
   }
 
   @Test
   def testJoinGroupUnknownConsumerNewGroup() {
-    val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
-    val joinGroupError = joinGroupResult.error
-    assertEquals(Errors.UNKNOWN_MEMBER_ID, joinGroupError)
+    var joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols)
+    assertEquals(Errors.UNKNOWN_MEMBER_ID, joinGroupResult.error)
+
+    EasyMock.reset(replicaManager)
+    joinGroupResult = staticJoinGroup(groupId, memberId, groupInstanceId, protocolType, protocols)
+    assertEquals(Errors.UNKNOWN_MEMBER_ID, joinGroupResult.error)
   }
 
   @Test
@@ -245,7 +260,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     val groupId = ""
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
 
-    val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
+    val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols)
     assertEquals(Errors.INVALID_GROUP_ID, joinGroupResult.error)
   }
 
@@ -253,7 +268,7 @@ class GroupCoordinatorTest extends JUnitSuite {
   def testValidJoinGroup() {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
 
-    val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
+    val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols)
     val joinGroupError = joinGroupResult.error
     assertEquals(Errors.NONE, joinGroupError)
   }
@@ -263,7 +278,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
     val otherMemberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
 
-    val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
+    val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols)
     assertEquals(Errors.NONE, joinGroupResult.error)
 
     EasyMock.reset(replicaManager)
@@ -275,7 +290,11 @@ class GroupCoordinatorTest extends JUnitSuite {
   def testJoinGroupWithEmptyProtocolType() {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
 
-    val joinGroupResult = joinGroup(groupId, memberId, "", protocols)
+    var joinGroupResult = dynamicJoinGroup(groupId, memberId, "", protocols)
+    assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL, joinGroupResult.error)
+
+    EasyMock.reset(replicaManager)
+    joinGroupResult = staticJoinGroup(groupId, memberId, groupInstanceId, "", protocols)
     assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL, joinGroupResult.error)
   }
 
@@ -283,7 +302,7 @@ class GroupCoordinatorTest extends JUnitSuite {
   def testJoinGroupWithEmptyGroupProtocol() {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
 
-    val joinGroupResult = joinGroup(groupId, memberId, protocolType, List())
+    val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, List())
     assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL, joinGroupResult.error)
   }
 
@@ -297,7 +316,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     val sessionTimeout = GroupCoordinator.NewMemberJoinTimeoutMs + 5000
     val rebalanceTimeout = GroupCoordinator.NewMemberJoinTimeoutMs * 2
 
-    val firstJoinResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols,
+    val firstJoinResult = dynamicJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols,
       sessionTimeout, rebalanceTimeout)
     val firstMemberId = firstJoinResult.memberId
     assertEquals(firstMemberId, firstJoinResult.leaderId)
@@ -309,8 +328,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     assertEquals(0, group.allMemberMetadata.count(_.isNew))
 
     EasyMock.reset(replicaManager)
-
-    val responseFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols, sessionTimeout, rebalanceTimeout)
+    val responseFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols, None, sessionTimeout, rebalanceTimeout)
     assertFalse(responseFuture.isCompleted)
 
     assertEquals(2, group.allMembers.size)
@@ -338,7 +356,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     val joinGroupFuture = sendJoinGroup(groupId, memberId, protocolType, List(("range", metadata)))
 
     EasyMock.reset(replicaManager)
-    val otherJoinGroupResult = joinGroup(groupId, otherMemberId, protocolType, List(("roundrobin", metadata)))
+    val otherJoinGroupResult = dynamicJoinGroup(groupId, otherMemberId, protocolType, List(("roundrobin", metadata)))
     timer.advanceClock(GroupInitialRebalanceDelay + 1)
 
     val joinGroupResult = await(joinGroupFuture, 1)
@@ -351,7 +369,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
     val otherMemberId = "memberId"
 
-    val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
+    val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols)
     assertEquals(Errors.NONE, joinGroupResult.error)
 
     EasyMock.reset(replicaManager)
@@ -360,13 +378,13 @@ class GroupCoordinatorTest extends JUnitSuite {
   }
 
   @Test
-  def testJoinGroupUnknownConsumerDeadGroup() {
+  def testJoinGroupUnknownConsumerNewDeadGroup() {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
 
     val deadGroupId = "deadGroupId"
 
     groupCoordinator.groupManager.addGroup(new GroupMetadata(deadGroupId, Dead, new MockTime()))
-    val joinGroupResult = joinGroup(deadGroupId, memberId, protocolType, protocols)
+    val joinGroupResult = dynamicJoinGroup(deadGroupId, memberId, protocolType, protocols)
     assertEquals(Errors.UNKNOWN_MEMBER_ID, joinGroupResult.error)
   }
 
@@ -392,6 +410,406 @@ class GroupCoordinatorTest extends JUnitSuite {
   }
 
   @Test
+  def staticMemberJoinAsFirstMember() {
+    val joinGroupResult = staticJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, groupInstanceId, protocolType, protocols)
+    assertEquals(Errors.NONE, joinGroupResult.error)
+  }
+
+  @Test
+  def staticMemberReJoinWithExplicitUnknownMemberId() {
+    var joinGroupResult = staticJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, groupInstanceId, protocolType, protocols)
+    assertEquals(Errors.NONE, joinGroupResult.error)
+
+    EasyMock.reset(replicaManager)
+    val unknownMemberId = "unknown_member"
+    joinGroupResult = staticJoinGroup(groupId, unknownMemberId, groupInstanceId, protocolType, protocols)
+    assertEquals(Errors.FENCED_INSTANCE_ID, joinGroupResult.error)
+  }
+
+  @Test
+  def staticMemberRejoinWithKnownMemberId() {
+    var joinGroupResult = staticJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, groupInstanceId, protocolType, protocols)
+    assertEquals(Errors.NONE, joinGroupResult.error)
+
+    EasyMock.reset(replicaManager)
+    val assignedMemberId = joinGroupResult.memberId
+    // The second join group should return immediately since we are using the same metadata during CompletingRebalance.
+    val rejoinResponseFuture = sendJoinGroup(groupId, assignedMemberId, protocolType, protocols, groupInstanceId)
+    timer.advanceClock(1)
+    joinGroupResult = Await.result(rejoinResponseFuture, Duration(1, TimeUnit.MILLISECONDS))
+    assertEquals(Errors.NONE, joinGroupResult.error)
+    assertTrue(getGroup(groupId).is(CompletingRebalance))
+
+    EasyMock.reset(replicaManager)
+    val syncGroupFuture = sendSyncGroupLeader(groupId, joinGroupResult.generationId, assignedMemberId, Map(assignedMemberId -> Array[Byte]()))
+    timer.advanceClock(1)
+    val syncGroupResult = Await.result(syncGroupFuture, Duration(1, TimeUnit.MILLISECONDS))
+    assertEquals(Errors.NONE, syncGroupResult._2)
+    assertTrue(getGroup(groupId).is(Stable))
+  }
+
+  @Test
+  def staticMemberRejoinWithLeaderIdAndUnknownMemberId() {
+    val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)
+
+    // A static leader rejoin with unknown id will not trigger rebalance.
+    EasyMock.reset(replicaManager)
+    val joinGroupResult = staticJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, leaderInstanceId, protocolType, protocolSuperset, clockAdvance = 1)
+
+    checkJoinGroupResult(joinGroupResult,
+      Errors.NONE,
+      rebalanceResult.generation, // The group should be at the same generation
+      Set(leaderInstanceId, followerInstanceId),
+      groupId,
+      Stable)
+
+    EasyMock.reset(replicaManager)
+    val oldLeaderJoinGroupResult = staticJoinGroup(groupId, rebalanceResult.leaderId, leaderInstanceId, protocolType, protocolSuperset, clockAdvance = 1)
+    assertEquals(Errors.FENCED_INSTANCE_ID, oldLeaderJoinGroupResult.error)
+
+    EasyMock.reset(replicaManager)
+    assertNotEquals(rebalanceResult.leaderId, joinGroupResult.leaderId)
+    val oldLeaderSyncGroupResult = syncGroupLeader(groupId, rebalanceResult.generation, rebalanceResult.leaderId, Map.empty)
+    assertEquals(Errors.UNKNOWN_MEMBER_ID, oldLeaderSyncGroupResult._2)
+
+    EasyMock.reset(replicaManager)
+    val newLeaderSyncGroupResult = syncGroupLeader(groupId, rebalanceResult.generation, joinGroupResult.leaderId, Map.empty)
+    assertEquals(Errors.NONE, newLeaderSyncGroupResult._2)
+    assertEquals(rebalanceResult.leaderAssignment, newLeaderSyncGroupResult._1)
+  }
+
+  @Test
+  def staticMemberRejoinWithLeaderIdAndKnownMemberId() {
+    val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)
+
+    // A static leader with known id rejoin will trigger rebalance.
+    EasyMock.reset(replicaManager)
+    val joinGroupResult = staticJoinGroup(groupId, rebalanceResult.leaderId, leaderInstanceId, protocolType, protocolSuperset, clockAdvance = DefaultSessionTimeout + 1)
+    // Timeout follower in the meantime.
+    assertFalse(getGroup(groupId).hasStaticMember(followerInstanceId))
+    checkJoinGroupResult(joinGroupResult,
+      Errors.NONE,
+      rebalanceResult.generation + 1, // The group has promoted to the new generation.
+      Set(leaderInstanceId),
+      groupId,
+      CompletingRebalance,
+      rebalanceResult.leaderId,
+      rebalanceResult.leaderId)
+  }
+
+  @Test
+  def staticMemberRejoinWithLeaderIdAndUnexpectedDeadGroup() {
+    val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)
+
+    getGroup(groupId).transitionTo(Dead)
+
+    EasyMock.reset(replicaManager)
+    val joinGroupResult = staticJoinGroup(groupId, rebalanceResult.leaderId, leaderInstanceId, protocolType, protocols, clockAdvance = 1)
+    assertEquals(Errors.UNKNOWN_MEMBER_ID, joinGroupResult.error)
+  }
+
+  @Test
+  def staticMemberRejoinWithLeaderIdAndUnexpectedEmptyGroup() {
+    val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)
+
+    getGroup(groupId).transitionTo(PreparingRebalance)
+    getGroup(groupId).transitionTo(Empty)
+
+    EasyMock.reset(replicaManager)
+    val joinGroupResult = staticJoinGroup(groupId, rebalanceResult.leaderId, leaderInstanceId, protocolType, protocols, clockAdvance = 1)
+    assertEquals(Errors.UNKNOWN_MEMBER_ID, joinGroupResult.error)
+  }
+
+  @Test
+  def staticMemberRejoinWithFollowerIdAndChangeOfProtocol() {
+    val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)
+
+    // A static follower rejoin with changed protocol will trigger rebalance.
+    EasyMock.reset(replicaManager)
+    val newProtocols = List(("roundrobin", metadata))
+    // Timeout old leader in the meantime.
+    val joinGroupResult = staticJoinGroup(groupId, rebalanceResult.followerId, followerInstanceId, protocolType, newProtocols, clockAdvance = DefaultSessionTimeout + 1)
+
+    assertFalse(getGroup(groupId).hasStaticMember(leaderInstanceId))
+    checkJoinGroupResult(joinGroupResult,
+      Errors.NONE,
+      rebalanceResult.generation + 1, // The group has promoted to the new generation, and leader has changed because old one times out.
+      Set(followerInstanceId),
+      groupId,
+      CompletingRebalance,
+      rebalanceResult.followerId,
+      rebalanceResult.followerId)
+  }
+
+  @Test
+  def staticMemberRejoinWithUnknownMemberIdAndChangeOfProtocol() {
+    val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)
+
+    // A static follower rejoin with protocol changing to leader protocol subset won't trigger rebalance.
+    EasyMock.reset(replicaManager)
+    val newProtocols = List(("roundrobin", metadata))
+    // Timeout old leader in the meantime.
+    val joinGroupResult = staticJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, followerInstanceId, protocolType, newProtocols, clockAdvance = 1)
+
+    checkJoinGroupResult(joinGroupResult,
+      Errors.NONE,
+      rebalanceResult.generation,
+      Set.empty,
+      groupId,
+      Stable)
+
+    EasyMock.reset(replicaManager)
+    // Join with old member id will fail because the member id is updated
+    assertNotEquals(rebalanceResult.followerId, joinGroupResult.memberId)
+    val oldFollowerJoinGroupResult = staticJoinGroup(groupId, rebalanceResult.followerId, followerInstanceId, protocolType, newProtocols, clockAdvance = 1)
+    assertEquals(Errors.FENCED_INSTANCE_ID, oldFollowerJoinGroupResult.error)
+
+    EasyMock.reset(replicaManager)
+    // Sync with old member id will fail because the member id is updated
+    val syncGroupWithOldMemberIdResult = syncGroupFollower(groupId, rebalanceResult.generation, rebalanceResult.followerId)
+    assertEquals(Errors.UNKNOWN_MEMBER_ID, syncGroupWithOldMemberIdResult._2)
+
+    EasyMock.reset(replicaManager)
+    val syncGroupWithNewMemberIdResult = syncGroupFollower(groupId, rebalanceResult.generation, joinGroupResult.memberId)
+    assertEquals(Errors.NONE, syncGroupWithNewMemberIdResult._2)
+    assertEquals(rebalanceResult.followerAssignment, syncGroupWithNewMemberIdResult._1)
+  }
+
+  @Test
+  def staticMemberRejoinWithKnownLeaderIdToTriggerRebalanceAndFollowerWithChangeofProtocol() {
+    val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)
+
+    // A static leader rejoin with known member id will trigger rebalance.
+    EasyMock.reset(replicaManager)
+    val leaderRejoinGroupFuture = sendJoinGroup(groupId, rebalanceResult.leaderId, protocolType, protocolSuperset, leaderInstanceId)
+    // Rebalance complete immediately after follower rejoin.
+    EasyMock.reset(replicaManager)
+    val followerRejoinWithFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocolSuperset, followerInstanceId)
+
+    timer.advanceClock(1)
+
+    // Leader should get the same assignment as last round.
+    checkJoinGroupResult(await(leaderRejoinGroupFuture, 1),
+      Errors.NONE,
+      rebalanceResult.generation + 1, // The group has promoted to the new generation.
+      Set(leaderInstanceId, followerInstanceId),
+      groupId,
+      CompletingRebalance,
+      rebalanceResult.leaderId,
+      rebalanceResult.leaderId)
+
+    checkJoinGroupResult(await(followerRejoinWithFuture, 1),
+      Errors.NONE,
+      rebalanceResult.generation + 1, // The group has promoted to the new generation.
+      Set.empty,
+      groupId,
+      CompletingRebalance,
+      rebalanceResult.leaderId,
+      rebalanceResult.followerId)
+
+    EasyMock.reset(replicaManager)
+    // The follower protocol changed from protocolSuperset to general protocols.
+    val followerRejoinWithProtocolChangeFuture = sendJoinGroup(groupId, rebalanceResult.followerId, protocolType, protocols, followerInstanceId)
+    // The group will transit to PreparingRebalance due to protocol change from follower.
+    assertTrue(getGroup(groupId).is(PreparingRebalance))
+
+    timer.advanceClock(DefaultRebalanceTimeout + 1)
+    checkJoinGroupResult(await(followerRejoinWithProtocolChangeFuture, 1),
+      Errors.NONE,
+      rebalanceResult.generation + 2, // The group has promoted to the new generation.
+      Set(followerInstanceId),
+      groupId,
+      CompletingRebalance,
+      rebalanceResult.followerId,
+      rebalanceResult.followerId)
+  }
+
+  @Test
+  def staticMemberRejoinAsFollowerWithUnknownMemberId() {
+    val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)
+
+    EasyMock.reset(replicaManager)
+    // A static follower rejoin with no protocol change will not trigger rebalance.
+    val joinGroupResult = staticJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, followerInstanceId, protocolType, protocolSuperset, clockAdvance = 1)
+
+    // Old leader shouldn't be timed out.
+    assertTrue(getGroup(groupId).hasStaticMember(leaderInstanceId))
+    checkJoinGroupResult(joinGroupResult,
+      Errors.NONE,
+      rebalanceResult.generation, // The group has no change.
+      Set.empty,
+      groupId,
+      Stable)
+
+    EasyMock.reset(replicaManager)
+    val syncGroupResult = syncGroupFollower(groupId, rebalanceResult.generation, joinGroupResult.memberId)
+    assertEquals(Errors.NONE, syncGroupResult._2)
+    assertEquals(rebalanceResult.followerAssignment, syncGroupResult._1)
+  }
+
+  @Test
+  def staticMemberRejoinAsFollowerWithKnownMemberIdAndNoProtocolChange() {
+    val rebalanceResult  = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)
+
+    // A static follower rejoin with no protocol change will not trigger rebalance.
+    EasyMock.reset(replicaManager)
+    val joinGroupResult = staticJoinGroup(groupId, rebalanceResult.followerId, followerInstanceId, protocolType, protocolSuperset, clockAdvance = 1)
+
+    // Old leader shouldn't be timed out.
+    assertTrue(getGroup(groupId).hasStaticMember(leaderInstanceId))
+    checkJoinGroupResult(joinGroupResult,
+      Errors.NONE,
+      rebalanceResult.generation, // The group has no change.
+      Set.empty,
+      groupId,
+      Stable,
+      rebalanceResult.leaderId,
+      rebalanceResult.followerId)
+  }
+
+  @Test
+  def staticMemberRejoinAsFollowerWithMismatchedMemberId() {
+    val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)
+
+    EasyMock.reset(replicaManager)
+    val joinGroupResult = staticJoinGroup(groupId, rebalanceResult.followerId, leaderInstanceId, protocolType, protocolSuperset, clockAdvance = 1)
+
+    assertEquals(Errors.FENCED_INSTANCE_ID, joinGroupResult.error)
+  }
+
+  @Test
+  def staticMemberRejoinAsLeaderWithMismatchedMemberId() {
+    val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)
+
+    EasyMock.reset(replicaManager)
+    val joinGroupResult = staticJoinGroup(groupId, rebalanceResult.leaderId, followerInstanceId, protocolType, protocolSuperset, clockAdvance = 1)
+
+    assertEquals(Errors.FENCED_INSTANCE_ID, joinGroupResult.error)
+  }
+
+  @Test
+  def staticMemberJoinWithUnknownInstanceIdAndKnownMemberId() {
+    val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)
+
+    EasyMock.reset(replicaManager)
+    val joinGroupResult = staticJoinGroup(groupId, rebalanceResult.leaderId, Some("unknown_instance"), protocolType, protocolSuperset, clockAdvance = 1)
+
+    assertEquals(Errors.UNKNOWN_MEMBER_ID, joinGroupResult.error)
+  }
+
+  @Test
+  def staticMemberJoinWithIllegalStateAsPendingMember() {
+    val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)
+    val group = groupCoordinator.groupManager.getGroup(groupId).get
+    group.addPendingMember(rebalanceResult.followerId)
+    group.remove(rebalanceResult.followerId)
+    EasyMock.reset(replicaManager)
+
+    // Illegal state exception shall trigger since follower id resides in pending member bucket.
+    val expectedException = intercept[IllegalStateException] {
+      staticJoinGroup(groupId, rebalanceResult.followerId, followerInstanceId, protocolType, protocolSuperset, clockAdvance = 1)
+    }
+
+    val message = expectedException.getMessage
+    assertTrue(message.contains(rebalanceResult.followerId))
+    assertTrue(message.contains(followerInstanceId.get))
+  }
+
+  @Test
+  def staticMemberReJoinWithIllegalArgumentAsMissingOldMember() {
+    val _ = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)
+    val group = groupCoordinator.groupManager.getGroup(groupId).get
+    val invalidMemberId = "invalid_member_id"
+    group.addStaticMember(followerInstanceId, invalidMemberId)
+    EasyMock.reset(replicaManager)
+
+    // Illegal state exception shall trigger since follower corresponding id is not defined in member list.
+    val expectedException = intercept[IllegalArgumentException] {
+      staticJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, followerInstanceId, protocolType, protocolSuperset, clockAdvance = 1)
+    }
+
+    val message = expectedException.getMessage
+    assertTrue(message.contains(invalidMemberId))
+  }
+
+  private class RebalanceResult(val generation: Int,
+                                val leaderId: String,
+                                val leaderAssignment: Array[Byte],
+                                val followerId: String,
+                                val followerAssignment: Array[Byte])
+  /**
+    * Generate static member rebalance results, including:
+    *   - generation
+    *   - leader id
+    *   - leader assignment
+    *   - follower id
+    *   - follower assignment
+    */
+  private def staticMembersJoinAndRebalance(leaderInstanceId: Option[String],
+                                            followerInstanceId: Option[String]): RebalanceResult = {
+    EasyMock.reset(replicaManager)
+    val leaderResponseFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocolSuperset, leaderInstanceId)
+
+    EasyMock.reset(replicaManager)
+    val followerResponseFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocolSuperset, followerInstanceId)
+    // The goal for two timer advance is to let first group initial join complete and set newMemberAdded flag to false. Next advance is
+    // to trigger the rebalance as needed for follower delayed join. One large time advance won't help because we could only populate one
+    // delayed join from purgatory and the new delayed op is created at that time and never be triggered.
+    timer.advanceClock(GroupInitialRebalanceDelay + 1)
+    timer.advanceClock(DefaultRebalanceTimeout + 1)
+    val newGeneration = 1
+
+    val leaderJoinGroupResult = await(leaderResponseFuture, 1)
+    assertEquals(Errors.NONE, leaderJoinGroupResult.error)
+    assertEquals(newGeneration, leaderJoinGroupResult.generationId)
+
+    val followerJoinGroupResult = await(followerResponseFuture, 1)
+    assertEquals(Errors.NONE, followerJoinGroupResult.error)
+    assertEquals(newGeneration, followerJoinGroupResult.generationId)
+
+    EasyMock.reset(replicaManager)
+    val leaderId = leaderJoinGroupResult.memberId
+    val leaderSyncGroupResult = syncGroupLeader(groupId, leaderJoinGroupResult.generationId, leaderId, Map(leaderId -> Array[Byte]()))
+    assertEquals(Errors.NONE, leaderSyncGroupResult._2)
+    assertTrue(getGroup(groupId).is(Stable))
+
+    EasyMock.reset(replicaManager)
+    val followerId = followerJoinGroupResult.memberId
+    val follwerSyncGroupResult = syncGroupFollower(groupId, leaderJoinGroupResult.generationId, followerId)
+    assertEquals(Errors.NONE, follwerSyncGroupResult._2)
+    assertTrue(getGroup(groupId).is(Stable))
+
+    new RebalanceResult(newGeneration,
+      leaderId,
+      leaderSyncGroupResult._1,
+      followerId,
+      follwerSyncGroupResult._1)
+  }
+
+  private def checkJoinGroupResult(joinGroupResult: JoinGroupResult,
+                                   expectedError: Errors,
+                                   expectedGeneration: Int,
+                                   expectedGroupInstanceIds: Set[Option[String]],
+                                   groupId: String,
+                                   expectedGroupState: GroupState,
+                                   expectedLeaderId: String = JoinGroupRequest.UNKNOWN_MEMBER_ID,
+                                   expectedMemberId: String = JoinGroupRequest.UNKNOWN_MEMBER_ID) {
+    assertEquals(Errors.NONE, joinGroupResult.error)
+    assertEquals(expectedGeneration, joinGroupResult.generationId)
+    assertEquals(expectedGroupInstanceIds.size, joinGroupResult.members.size)
+    val resultedGroupInstanceIds = joinGroupResult.members.map(member => Some(member.groupInstanceId())).toSet
+    assertEquals(expectedGroupInstanceIds, resultedGroupInstanceIds)
+    assertTrue(getGroup(groupId).is(expectedGroupState))
+
+    if (!expectedLeaderId.equals(JoinGroupRequest.UNKNOWN_MEMBER_ID)) {
+      assertEquals(expectedLeaderId, joinGroupResult.leaderId)
+    }
+    if (!expectedMemberId.equals(JoinGroupRequest.UNKNOWN_MEMBER_ID)) {
+      assertEquals(expectedMemberId, joinGroupResult.memberId)
+    }
+  }
+
+  @Test
   def testHeartbeatWrongCoordinator() {
     val heartbeatResult = heartbeat(otherGroupId, memberId, -1)
     assertEquals(Errors.NOT_COORDINATOR, heartbeatResult)
@@ -409,7 +827,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
     val otherMemberId = "memberId"
 
-    val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
+    val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols)
     val assignedMemberId = joinGroupResult.memberId
     val joinGroupError = joinGroupResult.error
     assertEquals(Errors.NONE, joinGroupError)
@@ -428,7 +846,7 @@ class GroupCoordinatorTest extends JUnitSuite {
   def testHeartbeatRebalanceInProgress() {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
 
-    val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
+    val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols)
     val assignedMemberId = joinGroupResult.memberId
     val joinGroupError = joinGroupResult.error
     assertEquals(Errors.NONE, joinGroupError)
@@ -442,7 +860,7 @@ class GroupCoordinatorTest extends JUnitSuite {
   def testHeartbeatIllegalGeneration() {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
 
-    val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
+    val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols)
     val assignedMemberId = joinGroupResult.memberId
     val joinGroupError = joinGroupResult.error
     assertEquals(Errors.NONE, joinGroupError)
@@ -461,7 +879,7 @@ class GroupCoordinatorTest extends JUnitSuite {
   def testValidHeartbeat() {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
 
-    val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
+    val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols)
     val assignedConsumerId = joinGroupResult.memberId
     val generationId = joinGroupResult.generationId
     val joinGroupError = joinGroupResult.error
@@ -481,7 +899,7 @@ class GroupCoordinatorTest extends JUnitSuite {
   def testSessionTimeout() {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
 
-    val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
+    val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols)
     val assignedConsumerId = joinGroupResult.memberId
     val generationId = joinGroupResult.generationId
     val joinGroupError = joinGroupResult.error
@@ -508,7 +926,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
     val sessionTimeout = 1000
 
-    val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols,
+    val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols,
       rebalanceTimeout = sessionTimeout, sessionTimeout = sessionTimeout)
     val assignedConsumerId = joinGroupResult.memberId
     val generationId = joinGroupResult.generationId
@@ -539,7 +957,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     val tp = new TopicPartition("topic", 0)
     val offset = offsetAndMetadata(0)
 
-    val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols,
+    val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols,
       rebalanceTimeout = sessionTimeout, sessionTimeout = sessionTimeout)
     val assignedConsumerId = joinGroupResult.memberId
     val generationId = joinGroupResult.generationId
@@ -566,7 +984,7 @@ class GroupCoordinatorTest extends JUnitSuite {
   @Test
   def testSessionTimeoutDuringRebalance() {
     // create a group with a single member
-    val firstJoinResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols,
+    val firstJoinResult = dynamicJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols,
       rebalanceTimeout = 2000, sessionTimeout = 1000)
     val firstMemberId = firstJoinResult.memberId
     val firstGenerationId = firstJoinResult.generationId
@@ -602,7 +1020,7 @@ class GroupCoordinatorTest extends JUnitSuite {
   @Test
   def testRebalanceCompletesBeforeMemberJoins() {
     // create a group with a single member
-    val firstJoinResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols,
+    val firstJoinResult = dynamicJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols,
       rebalanceTimeout = 1200, sessionTimeout = 1000)
     val firstMemberId = firstJoinResult.memberId
     val firstGenerationId = firstJoinResult.generationId
@@ -657,7 +1075,7 @@ class GroupCoordinatorTest extends JUnitSuite {
   def testSyncGroupEmptyAssignment() {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
 
-    val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
+    val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols)
     val assignedConsumerId = joinGroupResult.memberId
     val generationId = joinGroupResult.generationId
     val joinGroupError = joinGroupResult.error
@@ -694,7 +1112,7 @@ class GroupCoordinatorTest extends JUnitSuite {
   def testSyncGroupFromUnknownMember() {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
 
-    val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
+    val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols)
     val assignedConsumerId = joinGroupResult.memberId
     val generationId = joinGroupResult.generationId
     assertEquals(Errors.NONE, joinGroupResult.error)
@@ -714,7 +1132,7 @@ class GroupCoordinatorTest extends JUnitSuite {
   def testSyncGroupFromIllegalGeneration() {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
 
-    val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
+    val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols)
     val assignedConsumerId = joinGroupResult.memberId
     val generationId = joinGroupResult.generationId
     assertEquals(Errors.NONE, joinGroupResult.error)
@@ -731,7 +1149,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     // 1. join and sync with a single member (because we can't immediately join with two members)
     // 2. join and sync with the first member and a new member
 
-    val firstJoinResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
+    val firstJoinResult = dynamicJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
     val firstMemberId = firstJoinResult.memberId
     val firstGenerationId = firstJoinResult.generationId
     assertEquals(firstMemberId, firstJoinResult.leaderId)
@@ -768,7 +1186,7 @@ class GroupCoordinatorTest extends JUnitSuite {
 
   @Test
   def testJoinGroupFromUnchangedLeaderShouldRebalance() {
-    val firstJoinResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
+    val firstJoinResult = dynamicJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
     val firstMemberId = firstJoinResult.memberId
     val firstGenerationId = firstJoinResult.generationId
     assertEquals(firstMemberId, firstJoinResult.leaderId)
@@ -796,7 +1214,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     */
   @Test
   def testSecondMemberPartiallyJoinAndTimeout() {
-    val firstJoinResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
+    val firstJoinResult = dynamicJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
     val firstMemberId = firstJoinResult.memberId
     val firstGenerationId = firstJoinResult.generationId
     assertEquals(firstMemberId, firstJoinResult.leaderId)
@@ -852,7 +1270,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     */
   private def setupGroupWithPendingMember(): JoinGroupResult = {
     // add the first member
-    val joinResult1 = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
+    val joinResult1 = dynamicJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
     assertGroupState(groupState = CompletingRebalance)
 
     // now the group is stable, with the one member that joined above
@@ -971,7 +1389,7 @@ class GroupCoordinatorTest extends JUnitSuite {
                                sessionTimeout: Int = DefaultSessionTimeout,
                                rebalanceTimeout: Int = DefaultRebalanceTimeout): JoinGroupResult = {
     val requireKnownMemberId = true
-    val responseFuture = sendJoinGroup(groupId, memberId, protocolType, protocols, sessionTimeout, rebalanceTimeout, requireKnownMemberId)
+    val responseFuture = sendJoinGroup(groupId, memberId, protocolType, protocols, None, sessionTimeout, rebalanceTimeout, requireKnownMemberId)
     Await.result(responseFuture, Duration(rebalanceTimeout + 100, TimeUnit.MILLISECONDS))
   }
 
@@ -981,7 +1399,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     // 1. join and sync with a single member (because we can't immediately join with two members)
     // 2. join and sync with the first member and a new member
 
-    val firstJoinResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
+    val firstJoinResult = dynamicJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
     val firstMemberId = firstJoinResult.memberId
     val firstGenerationId = firstJoinResult.generationId
     assertEquals(firstMemberId, firstJoinResult.leaderId)
@@ -1025,7 +1443,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     // 1. join and sync with a single member (because we can't immediately join with two members)
     // 2. join and sync with the first member and a new member
 
-    val firstJoinResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
+    val firstJoinResult = dynamicJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
     val firstMemberId = firstJoinResult.memberId
     val firstGenerationId = firstJoinResult.generationId
     assertEquals(firstMemberId, firstJoinResult.leaderId)
@@ -1074,7 +1492,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     // 1. join and sync with a single member (because we can't immediately join with two members)
     // 2. join and sync with the first member and a new member
 
-    val joinGroupResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
+    val joinGroupResult = dynamicJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
     val firstMemberId = joinGroupResult.memberId
     val firstGenerationId = joinGroupResult.generationId
     assertEquals(firstMemberId, joinGroupResult.leaderId)
@@ -1147,7 +1565,7 @@ class GroupCoordinatorTest extends JUnitSuite {
 
     // A group member joins
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
-    val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
+    val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols)
     val assignedMemberId = joinGroupResult.memberId
     val joinGroupError = joinGroupResult.error
     assertEquals(Errors.NONE, joinGroupError)
@@ -1495,7 +1913,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     val tp = new TopicPartition("topic", 0)
     val offset = offsetAndMetadata(0)
 
-    val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
+    val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols)
     val assignedMemberId = joinGroupResult.memberId
     val generationId = joinGroupResult.generationId
     val joinGroupError = joinGroupResult.error
@@ -1509,7 +1927,7 @@ class GroupCoordinatorTest extends JUnitSuite {
   @Test
   def testHeartbeatDuringRebalanceCausesRebalanceInProgress() {
     // First start up a group (with a slightly larger timeout to give us time to heartbeat when the rebalance starts)
-    val joinGroupResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
+    val joinGroupResult = dynamicJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
     val assignedConsumerId = joinGroupResult.memberId
     val initialGenerationId = joinGroupResult.generationId
     val joinGroupError = joinGroupResult.error
@@ -1527,7 +1945,7 @@ class GroupCoordinatorTest extends JUnitSuite {
 
   @Test
   def testGenerationIdIncrementsOnRebalance() {
-    val joinGroupResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
+    val joinGroupResult = dynamicJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
     val initialGenerationId = joinGroupResult.generationId
     val joinGroupError = joinGroupResult.error
     val memberId = joinGroupResult.memberId
@@ -1569,7 +1987,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
     val otherMemberId = "consumerId"
 
-    val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
+    val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols)
     val joinGroupError = joinGroupResult.error
     assertEquals(Errors.NONE, joinGroupError)
 
@@ -1582,7 +2000,7 @@ class GroupCoordinatorTest extends JUnitSuite {
   def testValidLeaveGroup() {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
 
-    val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
+    val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols)
     val assignedMemberId = joinGroupResult.memberId
     val joinGroupError = joinGroupResult.error
     assertEquals(Errors.NONE, joinGroupError)
@@ -1595,7 +2013,7 @@ class GroupCoordinatorTest extends JUnitSuite {
   @Test
   def testListGroupsIncludesStableGroups() {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
-    val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
+    val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols)
     val assignedMemberId = joinGroupResult.memberId
     val generationId = joinGroupResult.generationId
     assertEquals(Errors.NONE, joinGroupResult.error)
@@ -1614,7 +2032,7 @@ class GroupCoordinatorTest extends JUnitSuite {
   @Test
   def testListGroupsIncludesRebalancingGroups() {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
-    val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
+    val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols)
     assertEquals(Errors.NONE, joinGroupResult.error)
 
     val (error, groups) = groupCoordinator.handleListGroups()
@@ -1641,7 +2059,7 @@ class GroupCoordinatorTest extends JUnitSuite {
   @Test
   def testDescribeGroupStable() {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
-    val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
+    val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols)
     val assignedMemberId = joinGroupResult.memberId
     val generationId = joinGroupResult.generationId
     val joinGroupError = joinGroupResult.error
@@ -1664,7 +2082,7 @@ class GroupCoordinatorTest extends JUnitSuite {
   @Test
   def testDescribeGroupRebalancing() {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
-    val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
+    val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols)
     val joinGroupError = joinGroupResult.error
     assertEquals(Errors.NONE, joinGroupError)
 
@@ -1682,7 +2100,7 @@ class GroupCoordinatorTest extends JUnitSuite {
   @Test
   def testDeleteNonEmptyGroup() {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
-    joinGroup(groupId, memberId, protocolType, protocols)
+    dynamicJoinGroup(groupId, memberId, protocolType, protocols)
 
     val result = groupCoordinator.handleDeleteGroups(Set(groupId))
     assert(result.size == 1 && result.contains(groupId) && result.get(groupId).contains(Errors.NON_EMPTY_GROUP))
@@ -1704,7 +2122,7 @@ class GroupCoordinatorTest extends JUnitSuite {
   @Test
   def testDeleteEmptyGroup() {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
-    val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
+    val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols)
 
     EasyMock.reset(replicaManager)
     val leaveGroupResult = leaveGroup(groupId, joinGroupResult.memberId)
@@ -1727,7 +2145,7 @@ class GroupCoordinatorTest extends JUnitSuite {
   def testDeleteEmptyGroupWithStoredOffsets() {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
 
-    val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
+    val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols)
     val assignedMemberId = joinGroupResult.memberId
     val joinGroupError = joinGroupResult.error
     assertEquals(Errors.NONE, joinGroupError)
@@ -1836,6 +2254,11 @@ class GroupCoordinatorTest extends JUnitSuite {
     assertEquals(Errors.NONE, thirdResult.error)
   }
 
+  private def getGroup(groupId: String): GroupMetadata = {
+    val groupOpt = groupCoordinator.groupManager.getGroup(groupId)
+    assertTrue(groupOpt.isDefined)
+    groupOpt.get
+  }
   private def setupJoinGroupCallback: (Future[JoinGroupResult], JoinGroupCallback) = {
     val responsePromise = Promise[JoinGroupResult]
     val responseFuture = responsePromise.future
@@ -1869,6 +2292,7 @@ class GroupCoordinatorTest extends JUnitSuite {
                             memberId: String,
                             protocolType: String,
                             protocols: List[(String, Array[Byte])],
+                            groupInstanceId: Option[String] = None,
                             sessionTimeout: Int = DefaultSessionTimeout,
                             rebalanceTimeout: Int = DefaultRebalanceTimeout,
                             requireKnownMemberId: Boolean = false): Future[JoinGroupResult] = {
@@ -1876,8 +2300,8 @@ class GroupCoordinatorTest extends JUnitSuite {
 
     EasyMock.replay(replicaManager)
 
-    groupCoordinator.handleJoinGroup(groupId, memberId, requireKnownMemberId, "clientId", "clientHost", rebalanceTimeout, sessionTimeout,
-      protocolType, protocols, responseCallback)
+    groupCoordinator.handleJoinGroup(groupId, memberId, groupInstanceId,
+      requireKnownMemberId, "clientId", "clientHost", rebalanceTimeout, sessionTimeout, protocolType, protocols, responseCallback)
     responseFuture
   }
 
@@ -1921,14 +2345,14 @@ class GroupCoordinatorTest extends JUnitSuite {
     responseFuture
   }
 
-  private def joinGroup(groupId: String,
-                        memberId: String,
-                        protocolType: String,
-                        protocols: List[(String, Array[Byte])],
-                        sessionTimeout: Int = DefaultSessionTimeout,
-                        rebalanceTimeout: Int = DefaultRebalanceTimeout): JoinGroupResult = {
+  private def dynamicJoinGroup(groupId: String,
+                               memberId: String,
+                               protocolType: String,
+                               protocols: List[(String, Array[Byte])],
+                               sessionTimeout: Int = DefaultSessionTimeout,
+                               rebalanceTimeout: Int = DefaultRebalanceTimeout): JoinGroupResult = {
     val requireKnownMemberId = true
-    var responseFuture = sendJoinGroup(groupId, memberId, protocolType, protocols, sessionTimeout, rebalanceTimeout, requireKnownMemberId)
+    var responseFuture = sendJoinGroup(groupId, memberId, protocolType, protocols, None, sessionTimeout, rebalanceTimeout, requireKnownMemberId)
 
     // Since member id is required, we need another bounce to get the successful join group result.
     if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID && requireKnownMemberId) {
@@ -1938,13 +2362,27 @@ class GroupCoordinatorTest extends JUnitSuite {
         return joinGroupResult
       }
       EasyMock.reset(replicaManager)
-      responseFuture = sendJoinGroup(groupId, joinGroupResult.memberId, protocolType, protocols, sessionTimeout, rebalanceTimeout, requireKnownMemberId)
+      responseFuture = sendJoinGroup(groupId, joinGroupResult.memberId, protocolType, protocols, None, sessionTimeout, rebalanceTimeout, requireKnownMemberId)
     }
     timer.advanceClock(GroupInitialRebalanceDelay + 1)
     // should only have to wait as long as session timeout, but allow some extra time in case of an unexpected delay
     Await.result(responseFuture, Duration(rebalanceTimeout + 100, TimeUnit.MILLISECONDS))
   }
 
+  private def staticJoinGroup(groupId: String,
+                              memberId: String,
+                              groupInstanceId: Option[String],
+                              protocolType: String,
+                              protocols: List[(String, Array[Byte])],
+                              clockAdvance: Int = GroupInitialRebalanceDelay + 1,
+                              sessionTimeout: Int = DefaultSessionTimeout,
+                              rebalanceTimeout: Int = DefaultRebalanceTimeout): JoinGroupResult = {
+    val responseFuture = sendJoinGroup(groupId, memberId, protocolType, protocols, groupInstanceId, sessionTimeout, rebalanceTimeout)
+
+    timer.advanceClock(clockAdvance)
+    // should only have to wait as long as session timeout, but allow some extra time in case of an unexpected delay
+    Await.result(responseFuture, Duration(rebalanceTimeout + 100, TimeUnit.MILLISECONDS))
+  }
 
   private def syncGroupFollower(groupId: String,
                                 generationId: Int,
@@ -2060,5 +2498,4 @@ class GroupCoordinatorTest extends JUnitSuite {
   private def offsetAndMetadata(offset: Long): OffsetAndMetadata = {
     OffsetAndMetadata(offset, "", timer.time.milliseconds())
   }
-
 }
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index 0287888..6c83da3 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -59,6 +59,7 @@ class GroupMetadataManagerTest {
   var defaultOffsetRetentionMs = Long.MaxValue
 
   val groupId = "foo"
+  val groupInstanceId = Some("bar")
   val groupPartitionId = 0
   val groupTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
   val protocolType = "protocolType"
@@ -775,6 +776,53 @@ class GroupMetadataManagerTest {
   }
 
   @Test
+  def testloadGroupWithStaticMember() {
+    val generation = 27
+    val protocolType = "consumer"
+    val staticMemberId = "staticMemberId"
+    val dynamicMemberId = "dynamicMemberId"
+
+    val staticMember = new MemberMetadata(staticMemberId, groupId, groupInstanceId, "", "", rebalanceTimeout, sessionTimeout,
+      protocolType, List(("protocol", Array[Byte]())))
+
+    val dynamicMember = new MemberMetadata(dynamicMemberId, groupId, None, "", "", rebalanceTimeout, sessionTimeout,
+      protocolType, List(("protocol", Array[Byte]())))
+
+    val members = Seq(staticMember, dynamicMember)
+
+    val group = GroupMetadata.loadGroup(groupId, Empty, generation, protocolType, null, null, None, members, time)
+
+    assertTrue(group.is(Empty))
+    assertEquals(generation, group.generationId)
+    assertEquals(Some(protocolType), group.protocolType)
+    assertTrue(group.has(staticMemberId))
+    assertTrue(group.has(dynamicMemberId))
+    assertTrue(group.hasStaticMember(groupInstanceId))
+    assertEquals(staticMemberId, group.getStaticMemberId(groupInstanceId))
+  }
+
+  @Test
+  def testReadFromOldGroupMetadata() {
+    val generation = 1
+    val protocol = "range"
+    val memberId = "memberId"
+    val oldApiVersions = Array(KAFKA_0_9_0, KAFKA_0_10_1_IV0, KAFKA_2_1_IV0)
+
+    for (apiVersion <- oldApiVersions) {
+      val groupMetadataRecord = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId, apiVersion = apiVersion)
+
+      val deserializedGroupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, groupMetadataRecord.value(), time)
+      assertEquals(groupId, deserializedGroupMetadata.groupId)
+      assertEquals(generation, deserializedGroupMetadata.generationId)
+      assertEquals(protocolType, deserializedGroupMetadata.protocolType.get)
+      assertEquals(protocol, deserializedGroupMetadata.protocolOrNull)
+      assertEquals(1, deserializedGroupMetadata.allMembers.size)
+      assertTrue(deserializedGroupMetadata.allMembers.contains(memberId))
+      assertTrue(deserializedGroupMetadata.allStaticMembers.isEmpty)
+    }
+  }
+
+  @Test
   def testStoreEmptyGroup() {
     val generation = 27
     val protocolType = "consumer"
@@ -875,7 +923,7 @@ class GroupMetadataManagerTest {
     val group = new GroupMetadata(groupId, Empty, time)
     groupMetadataManager.addGroup(group)
 
-    val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout, sessionTimeout,
+    val member = new MemberMetadata(memberId, groupId, groupInstanceId, clientId, clientHost, rebalanceTimeout, sessionTimeout,
       protocolType, List(("protocol", Array[Byte]())))
     group.add(member, _ => ())
     group.transitionTo(PreparingRebalance)
@@ -904,7 +952,7 @@ class GroupMetadataManagerTest {
 
     val group = new GroupMetadata(groupId, Empty, time)
 
-    val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout, sessionTimeout,
+    val member = new MemberMetadata(memberId, groupId, groupInstanceId, clientId, clientHost, rebalanceTimeout, sessionTimeout,
       protocolType, List(("protocol", Array[Byte]())))
     group.add(member, _ => ())
     group.transitionTo(PreparingRebalance)
@@ -1403,7 +1451,7 @@ class GroupMetadataManagerTest {
     groupMetadataManager.addGroup(group)
 
     val subscription = new Subscription(List(topic).asJava)
-    val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout, sessionTimeout,
+    val member = new MemberMetadata(memberId, groupId, groupInstanceId, clientId, clientHost, rebalanceTimeout, sessionTimeout,
       protocolType, List(("protocol", ConsumerProtocol.serializeSubscription(subscription).array())))
     group.add(member, _ => ())
     group.transitionTo(PreparingRebalance)
@@ -1896,7 +1944,7 @@ class GroupMetadataManagerTest {
                                                assignmentSize: Int = 0,
                                                apiVersion: ApiVersion = ApiVersion.latestVersion): SimpleRecord = {
     val memberProtocols = List((protocol, Array.emptyByteArray))
-    val member = new MemberMetadata(memberId, groupId, "clientId", "clientHost", 30000, 10000, protocolType, memberProtocols)
+    val member = new MemberMetadata(memberId, groupId, groupInstanceId, "clientId", "clientHost", 30000, 10000, protocolType, memberProtocols)
     val group = GroupMetadata.loadGroup(groupId, Stable, generation, protocolType, protocol, memberId,
       if (apiVersion >= KAFKA_2_1_IV0) Some(time.milliseconds()) else None, Seq(member), time)
     val groupMetadataKey = GroupMetadataManager.groupMetadataKey(groupId)
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala
index 3108b15..11af899 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala
@@ -30,6 +30,7 @@ import org.scalatest.junit.JUnitSuite
 class GroupMetadataTest extends JUnitSuite {
   private val protocolType = "consumer"
   private val groupId = "groupId"
+  private val groupInstanceId = Some("groupInstanceId")
   private val clientId = "clientId"
   private val clientHost = "clientHost"
   private val rebalanceTimeoutMs = 60000
@@ -191,14 +192,14 @@ class GroupMetadataTest extends JUnitSuite {
   @Test
   def testSelectProtocol() {
     val memberId = "memberId"
-    val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs,
+    val member = new MemberMetadata(memberId, groupId, groupInstanceId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs,
       protocolType, List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte])))
 
     group.add(member)
     assertEquals("range", group.selectProtocol)
 
     val otherMemberId = "otherMemberId"
-    val otherMember = new MemberMetadata(otherMemberId, groupId, clientId, clientHost, rebalanceTimeoutMs,
+    val otherMember = new MemberMetadata(otherMemberId, groupId, groupInstanceId, clientId, clientHost, rebalanceTimeoutMs,
       sessionTimeoutMs, protocolType, List(("roundrobin", Array.empty[Byte]), ("range", Array.empty[Byte])))
 
     group.add(otherMember)
@@ -206,7 +207,7 @@ class GroupMetadataTest extends JUnitSuite {
     assertTrue(Set("range", "roundrobin")(group.selectProtocol))
 
     val lastMemberId = "lastMemberId"
-    val lastMember = new MemberMetadata(lastMemberId, groupId, clientId, clientHost, rebalanceTimeoutMs,
+    val lastMember = new MemberMetadata(lastMemberId, groupId, groupInstanceId, clientId, clientHost, rebalanceTimeoutMs,
       sessionTimeoutMs, protocolType, List(("roundrobin", Array.empty[Byte]), ("range", Array.empty[Byte])))
 
     group.add(lastMember)
@@ -223,11 +224,11 @@ class GroupMetadataTest extends JUnitSuite {
   @Test
   def testSelectProtocolChoosesCompatibleProtocol() {
     val memberId = "memberId"
-    val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs,
+    val member = new MemberMetadata(memberId, groupId, groupInstanceId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs,
       protocolType, List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte])))
 
     val otherMemberId = "otherMemberId"
-    val otherMember = new MemberMetadata(otherMemberId, groupId, clientId, clientHost, rebalanceTimeoutMs,
+    val otherMember = new MemberMetadata(otherMemberId, groupId, groupInstanceId, clientId, clientHost, rebalanceTimeoutMs,
       sessionTimeoutMs, protocolType, List(("roundrobin", Array.empty[Byte]), ("blah", Array.empty[Byte])))
 
     group.add(member)
@@ -241,7 +242,7 @@ class GroupMetadataTest extends JUnitSuite {
     assertTrue(group.supportsProtocols(protocolType, Set("roundrobin", "range")))
 
     val memberId = "memberId"
-    val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeoutMs,
+    val member = new MemberMetadata(memberId, groupId, groupInstanceId, clientId, clientHost, rebalanceTimeoutMs,
       sessionTimeoutMs, protocolType, List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte])))
 
     group.add(member)
@@ -251,7 +252,7 @@ class GroupMetadataTest extends JUnitSuite {
     assertFalse(group.supportsProtocols(protocolType, Set("foo", "bar")))
 
     val otherMemberId = "otherMemberId"
-    val otherMember = new MemberMetadata(otherMemberId, groupId, clientId, clientHost, rebalanceTimeoutMs,
+    val otherMember = new MemberMetadata(otherMemberId, groupId, groupInstanceId, clientId, clientHost, rebalanceTimeoutMs,
       sessionTimeoutMs, protocolType, List(("roundrobin", Array.empty[Byte]), ("blah", Array.empty[Byte])))
 
     group.add(otherMember)
@@ -264,7 +265,7 @@ class GroupMetadataTest extends JUnitSuite {
   @Test
   def testInitNextGeneration() {
     val memberId = "memberId"
-    val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs,
+    val member = new MemberMetadata(memberId, groupId, groupInstanceId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs,
       protocolType, List(("roundrobin", Array.empty[Byte])))
 
     group.transitionTo(PreparingRebalance)
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/MemberMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/MemberMetadataTest.scala
index 48c9270..986d015 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/MemberMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/MemberMetadataTest.scala
@@ -24,6 +24,7 @@ import org.scalatest.junit.JUnitSuite
 
 class MemberMetadataTest extends JUnitSuite {
   val groupId = "groupId"
+  val groupInstanceId = Some("groupInstanceId")
   val clientId = "clientId"
   val clientHost = "clientHost"
   val memberId = "memberId"
@@ -36,7 +37,7 @@ class MemberMetadataTest extends JUnitSuite {
   def testMatchesSupportedProtocols() {
     val protocols = List(("range", Array.empty[Byte]))
 
-    val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs,
+    val member = new MemberMetadata(memberId, groupId, groupInstanceId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs,
       protocolType, protocols)
     assertTrue(member.matches(protocols))
     assertFalse(member.matches(List(("range", Array[Byte](0)))))
@@ -48,7 +49,7 @@ class MemberMetadataTest extends JUnitSuite {
   def testVoteForPreferredProtocol() {
     val protocols = List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte]))
 
-    val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs,
+    val member = new MemberMetadata(memberId, groupId, groupInstanceId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs,
       protocolType, protocols)
     assertEquals("range", member.vote(Set("range", "roundrobin")))
     assertEquals("roundrobin", member.vote(Set("blah", "roundrobin")))
@@ -58,7 +59,7 @@ class MemberMetadataTest extends JUnitSuite {
   def testMetadata() {
     val protocols = List(("range", Array[Byte](0)), ("roundrobin", Array[Byte](1)))
 
-    val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs,
+    val member = new MemberMetadata(memberId, groupId, groupInstanceId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs,
       protocolType, protocols)
     assertTrue(Arrays.equals(Array[Byte](0), member.metadata("range")))
     assertTrue(Arrays.equals(Array[Byte](1), member.metadata("roundrobin")))
@@ -68,7 +69,7 @@ class MemberMetadataTest extends JUnitSuite {
   def testMetadataRaisesOnUnsupportedProtocol() {
     val protocols = List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte]))
 
-    val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs,
+    val member = new MemberMetadata(memberId, groupId, groupInstanceId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs,
       protocolType, protocols)
     member.metadata("blah")
     fail()
@@ -78,11 +79,19 @@ class MemberMetadataTest extends JUnitSuite {
   def testVoteRaisesOnNoSupportedProtocols() {
     val protocols = List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte]))
 
-    val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs,
+    val member = new MemberMetadata(memberId, groupId, groupInstanceId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs,
       protocolType, protocols)
     member.vote(Set("blah"))
     fail()
   }
 
+  @Test
+  def testHasValidGroupInstanceId() {
+    val protocols = List(("range", Array[Byte](0)), ("roundrobin", Array[Byte](1)))
 
+    val member = new MemberMetadata(memberId, groupId, groupInstanceId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs,
+      protocolType, protocols)
+    assertTrue(member.isStaticMember)
+    assertEquals(groupInstanceId, member.groupInstanceId)
+  }
 }
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index ae74fa1..d9c8bb5 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -282,6 +282,7 @@ class RequestQuotaTest extends BaseRequestTest {
               .setGroupId("test-join-group")
               .setSessionTimeoutMs(200)
               .setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID)
+              .setGroupInstanceId(null)
               .setProtocolType("consumer")
               .setProtocols(
                 new JoinGroupRequestData.JoinGroupRequestProtocolSet(
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 1bd09e4..cb4629f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -671,6 +671,7 @@ public class StreamThread extends Thread {
             originalReset = (String) consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
             consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
         }
+
         final Consumer<byte[], byte[]> consumer = clientSupplier.getConsumer(consumerConfigs);
         taskManager.setConsumer(consumer);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java
index a598400..6a724ec 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java
@@ -157,7 +157,7 @@ public class FineGrainedAutoResetIntegrationTest {
     }
 
     @Test
-    public void shouldOnlyReadRecordsWhereEarliestSpecifiedWithNoCommittedOffsetsWithGlobalAutoOffsetResetLatest() throws  Exception {
+    public void shouldOnlyReadRecordsWhereEarliestSpecifiedWithNoCommittedOffsetsWithGlobalAutoOffsetResetLatest() throws Exception {
         streamsConfiguration.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "latest");
 
         final List<String> expectedReceivedValues = Arrays.asList(topic1TestMessage, topic2TestMessage);
@@ -165,13 +165,13 @@ public class FineGrainedAutoResetIntegrationTest {
     }
 
     @Test
-    public void shouldOnlyReadRecordsWhereEarliestSpecifiedWithNoCommittedOffsetsWithDefaultGlobalAutoOffsetResetEarliest() throws  Exception {
+    public void shouldOnlyReadRecordsWhereEarliestSpecifiedWithNoCommittedOffsetsWithDefaultGlobalAutoOffsetResetEarliest() throws Exception {
         final List<String> expectedReceivedValues = Arrays.asList(topic1TestMessage, topic2TestMessage, topicYTestMessage, topicZTestMessage);
         shouldOnlyReadForEarliest("_1", TOPIC_1_1, TOPIC_2_1, TOPIC_A_1, TOPIC_C_1, TOPIC_Y_1, TOPIC_Z_1, OUTPUT_TOPIC_1, expectedReceivedValues);
     }
 
     @Test
-    public void shouldOnlyReadRecordsWhereEarliestSpecifiedWithInvalidCommittedOffsets() throws  Exception {
+    public void shouldOnlyReadRecordsWhereEarliestSpecifiedWithInvalidCommittedOffsets() throws Exception {
         commitInvalidOffsets();
 
         final List<String> expectedReceivedValues = Arrays.asList(topic1TestMessage, topic2TestMessage, topicYTestMessage, topicZTestMessage);
@@ -230,7 +230,7 @@ public class FineGrainedAutoResetIntegrationTest {
     private void commitInvalidOffsets() {
         final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(TestUtils.consumerConfig(
             CLUSTER.bootstrapServers(),
-            streamsConfiguration.getProperty(StreamsConfig.APPLICATION_ID_CONFIG),
+            "commit_invalid_offset_app", // Having a separate application id to avoid waiting for last test poll interval timeout.
             StringDeserializer.class,
             StringDeserializer.class));
 
diff --git a/tests/kafkatest/services/verifiable_consumer.py b/tests/kafkatest/services/verifiable_consumer.py
index 532c59f..cec8ea0 100644
--- a/tests/kafkatest/services/verifiable_consumer.py
+++ b/tests/kafkatest/services/verifiable_consumer.py
@@ -33,8 +33,9 @@ class ConsumerState:
 
 class ConsumerEventHandler(object):
 
-    def __init__(self, node, verify_offsets):
+    def __init__(self, node, verify_offsets, idx):
         self.node = node
+        self.idx = idx
         self.state = ConsumerState.Dead
         self.revoked_count = 0
         self.assigned_count = 0
@@ -165,7 +166,7 @@ class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
         }
 
     def __init__(self, context, num_nodes, kafka, topic, group_id,
-                 max_messages=-1, session_timeout_sec=30, enable_autocommit=False,
+                 static_membership=False, max_messages=-1, session_timeout_sec=30, enable_autocommit=False,
                  assignment_strategy="org.apache.kafka.clients.consumer.RangeAssignor",
                  version=DEV_BRANCH, stop_timeout_sec=30, log_level="INFO", jaas_override_variables=None,
                  on_record_consumed=None, reset_policy="earliest", verify_offsets=True):
@@ -179,6 +180,7 @@ class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
         self.topic = topic
         self.group_id = group_id
         self.reset_policy = reset_policy
+        self.static_membership = static_membership
         self.max_messages = max_messages
         self.session_timeout_sec = session_timeout_sec
         self.enable_autocommit = enable_autocommit
@@ -202,7 +204,7 @@ class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
     def _worker(self, idx, node):
         with self.lock:
             if node not in self.event_handlers:
-                self.event_handlers[node] = ConsumerEventHandler(node, self.verify_offsets)
+                self.event_handlers[node] = ConsumerEventHandler(node, self.verify_offsets, idx)
             handler = self.event_handlers[node]
 
         node.account.ssh("mkdir -p %s" % VerifiableConsumer.PERSISTENT_ROOT, allow_fail=False)
@@ -220,6 +222,10 @@ class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
         self.logger.info(self.prop_file)
         node.account.create_file(VerifiableConsumer.CONFIG_FILE, self.prop_file)
         self.security_config.setup_node(node)
+        # apply group.instance.id to the node for static membership validation
+        node.group_instance_id = None
+        if self.static_membership:
+            node.group_instance_id = self.group_id + "-instance-" + str(idx)
         cmd = self.start_cmd(node)
         self.logger.debug("VerifiableConsumer %d command: %s" % (idx, cmd))
 
@@ -286,8 +292,8 @@ class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
         cmd += self.impl.exec_cmd(node)
         if self.on_record_consumed:
             cmd += " --verbose"
-        cmd += " --reset-policy %s --group-id %s --topic %s --broker-list %s --session-timeout %s --assignment-strategy %s %s" % \
-               (self.reset_policy, self.group_id, self.topic, self.kafka.bootstrap_servers(self.security_config.security_protocol),
+        cmd += " --reset-policy %s --group-id %s --topic %s --group-instance-id %s --broker-list %s --session-timeout %s --assignment-strategy %s %s" % \
+               (self.reset_policy, self.group_id, self.topic, node.group_instance_id, self.kafka.bootstrap_servers(self.security_config.security_protocol),
                self.session_timeout_sec*1000, self.assignment_strategy, "--enable-autocommit" if self.enable_autocommit else "")
                
         if self.max_messages > 0:
@@ -365,6 +371,11 @@ class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
         with self.lock:
             return max(handler.assigned_count for handler in self.event_handlers.itervalues())
 
+    def num_revokes_for_alive(self, keep_alive=1):
+        with self.lock:
+            return max([handler.revoked_count for handler in self.event_handlers.itervalues()
+                       if handler.idx <= keep_alive])
+
     def joined_nodes(self):
         with self.lock:
             return [handler.node for handler in self.event_handlers.itervalues()
diff --git a/tests/kafkatest/tests/client/consumer_test.py b/tests/kafkatest/tests/client/consumer_test.py
index a7ec9c8..2abc4f1 100644
--- a/tests/kafkatest/tests/client/consumer_test.py
+++ b/tests/kafkatest/tests/client/consumer_test.py
@@ -33,9 +33,9 @@ class OffsetValidationTest(VerifiableConsumerTest):
             self.TOPIC : { 'partitions': self.NUM_PARTITIONS, 'replication-factor': 2 }
         })
 
-    def rolling_bounce_consumers(self, consumer, num_bounces=5, clean_shutdown=True):
+    def rolling_bounce_consumers(self, consumer, keep_alive=0, num_bounces=5, clean_shutdown=True):
         for _ in range(num_bounces):
-            for node in consumer.nodes:
+            for node in consumer.nodes[keep_alive:]:
                 consumer.stop_node(node, clean_shutdown)
 
                 wait_until(lambda: len(consumer.dead_nodes()) == 1,
@@ -47,15 +47,15 @@ class OffsetValidationTest(VerifiableConsumerTest):
                 self.await_all_members(consumer)
                 self.await_consumed_messages(consumer)
 
-    def bounce_all_consumers(self, consumer, num_bounces=5, clean_shutdown=True):
+    def bounce_all_consumers(self, consumer, keep_alive=0, num_bounces=5, clean_shutdown=True):
         for _ in range(num_bounces):
-            for node in consumer.nodes:
+            for node in consumer.nodes[keep_alive:]:
                 consumer.stop_node(node, clean_shutdown)
 
-            wait_until(lambda: len(consumer.dead_nodes()) == self.num_consumers, timeout_sec=10,
+            wait_until(lambda: len(consumer.dead_nodes()) == self.num_consumers - keep_alive, timeout_sec=10,
                        err_msg="Timed out waiting for the consumers to shutdown")
             
-            for node in consumer.nodes:
+            for node in consumer.nodes[keep_alive:]:
                 consumer.start_node(node)
 
             self.await_all_members(consumer)
@@ -145,7 +145,69 @@ class OffsetValidationTest(VerifiableConsumerTest):
             self.bounce_all_consumers(consumer, clean_shutdown=clean_shutdown)
         else:
             self.rolling_bounce_consumers(consumer, clean_shutdown=clean_shutdown)
-                
+
+        consumer.stop_all()
+        if clean_shutdown:
+            # if the total records consumed matches the current position, we haven't seen any duplicates
+            # this can only be guaranteed with a clean shutdown
+            assert consumer.current_position(partition) == consumer.total_consumed(), \
+                "Total consumed records %d did not match consumed position %d" % \
+                (consumer.total_consumed(), consumer.current_position(partition))
+        else:
+            # we may have duplicates in a hard failure
+            assert consumer.current_position(partition) <= consumer.total_consumed(), \
+                "Current position %d greater than the total number of consumed records %d" % \
+                (consumer.current_position(partition), consumer.total_consumed())
+
+    @cluster(num_nodes=7)
+    @matrix(clean_shutdown=[True], static_membership=[True, False], bounce_mode=["all", "rolling"], num_bounces=[5])
+    def test_static_consumer_bounce(self, clean_shutdown, static_membership, bounce_mode, num_bounces):
+        """
+        Verify correct static consumer behavior when the consumers in the group are restarted. In order to make
+        sure the behavior of static members are different from dynamic ones, we take both static and dynamic
+        membership into this test suite.
+
+        Setup: single Kafka cluster with one producer and a set of consumers in one group.
+
+        - Start a producer which continues producing new messages throughout the test.
+        - Start up the consumers as static/dynamic members and wait until they've joined the group.
+        - In a loop, restart each consumer except the first member (note: may not be the leader), and expect no rebalance triggered
+          during this process if the group is in static membership.
+        """
+        partition = TopicPartition(self.TOPIC, 0)
+
+        producer = self.setup_producer(self.TOPIC)
+
+        producer.start()
+        self.await_produced_messages(producer)
+
+        self.session_timeout_sec = 60
+        consumer = self.setup_consumer(self.TOPIC, static_membership=static_membership)
+
+        consumer.start()
+        self.await_all_members(consumer)
+
+        num_revokes_before_bounce = consumer.num_revokes_for_alive()
+
+        num_keep_alive = 1
+
+        if bounce_mode == "all":
+            self.bounce_all_consumers(consumer, keep_alive=num_keep_alive, num_bounces=num_bounces)
+        else:
+            self.rolling_bounce_consumers(consumer, keep_alive=num_keep_alive, num_bounces=num_bounces)
+
+        num_revokes_after_bounce = consumer.num_revokes_for_alive() - num_revokes_before_bounce
+
+        check_condition = num_revokes_after_bounce != 0
+        # under static membership, the live consumer shall not revoke any current running partitions,
+        # since there is no global rebalance being triggered.
+        if static_membership:
+            check_condition = num_revokes_after_bounce == 0
+
+        assert check_condition, \
+            "Total revoked count %d does not match the expectation of having 0 revokes as %d" % \
+            (num_revokes_after_bounce, check_condition)
+
         consumer.stop_all()
         if clean_shutdown:
             # if the total records consumed matches the current position, we haven't seen any duplicates
diff --git a/tests/kafkatest/tests/verifiable_consumer_test.py b/tests/kafkatest/tests/verifiable_consumer_test.py
index 539a0f3..071439d 100644
--- a/tests/kafkatest/tests/verifiable_consumer_test.py
+++ b/tests/kafkatest/tests/verifiable_consumer_test.py
@@ -53,10 +53,10 @@ class VerifiableConsumerTest(KafkaTest):
         """Override this since we're adding services outside of the constructor"""
         return super(VerifiableConsumerTest, self).min_cluster_size() + self.num_consumers + self.num_producers
 
-    def setup_consumer(self, topic, enable_autocommit=False,
+    def setup_consumer(self, topic, static_membership=False, enable_autocommit=False,
                        assignment_strategy="org.apache.kafka.clients.consumer.RangeAssignor", **kwargs):
         return VerifiableConsumer(self.test_context, self.num_consumers, self.kafka,
-                                  topic, self.group_id, session_timeout_sec=self.session_timeout_sec,
+                                  topic, self.group_id, static_membership=static_membership, session_timeout_sec=self.session_timeout_sec,
                                   assignment_strategy=assignment_strategy, enable_autocommit=enable_autocommit,
                                   log_level="TRACE", **kwargs)
 
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
index 43d30d0..fe41a21 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
@@ -524,6 +524,14 @@ public class VerifiableConsumer implements Closeable, OffsetCommitCallback, Cons
                 .dest("groupId")
                 .help("The groupId shared among members of the consumer group");
 
+        parser.addArgument("--group-instance-id")
+                .action(store())
+                .required(true)
+                .type(String.class)
+                .metavar("GROUP_INSTANCE_ID")
+                .dest("groupInstanceId")
+                .help("A unique identifier of the consumer instance");
+
         parser.addArgument("--max-messages")
                 .action(store())
                 .required(false)
@@ -600,6 +608,11 @@ public class VerifiableConsumer implements Closeable, OffsetCommitCallback, Cons
         }
 
         consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, res.getString("groupId"));
+
+        String groupInstanceId = res.getString("groupInstanceId");
+        if (!groupInstanceId.equals("None")) {
+            consumerProps.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId);
+        }
         consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, res.getString("brokerList"));
         consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, useAutoCommit);
         consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, res.getString("resetPolicy"));


Mime
View raw message