kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-8225 & KIP-345 part-2: fencing static member instances with conflicting group.instance.id (#6650)
Date Sat, 18 May 2019 14:28:51 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9fa331b  KAFKA-8225 & KIP-345 part-2: fencing static member instances with conflicting group.instance.id (#6650)
9fa331b is described below

commit 9fa331b811d893a0d580e9136c1c7e1fa9774542
Author: Boyang Chen <boyang@confluent.io>
AuthorDate: Sat May 18 07:28:37 2019 -0700

    KAFKA-8225 & KIP-345 part-2: fencing static member instances with conflicting group.instance.id (#6650)
    
    When a static member joins or rejoins, we encode the current timestamp in the new member.id. The format looks like group.instance.id-timestamp.
    
    During consumer/broker interaction logic (Join, Sync, Heartbeat, Commit), we shall check whether the group.instance.id is known to the group. If yes, we shall match the member.id stored in the static membership map against the request member.id. If they mismatch, this indicates that a conflicting consumer has used the same group.instance.id, and the requesting consumer will receive a fatal exception and shut down.
    
    Right now the only missing part is the system test. Will work on it offline while getting the major logic changes reviewed.
    
    Reviewers: Ryanne Dolan <ryannedolan@gmail.com>, Jason Gustafson <jason@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
---
 .../kafka/clients/consumer/KafkaConsumer.java      |  15 ++-
 .../consumer/internals/AbstractCoordinator.java    |  26 ++++-
 .../consumer/internals/ConsumerCoordinator.java    |  22 ++++-
 .../org/apache/kafka/common/protocol/Errors.java   |   3 +-
 .../kafka/common/requests/HeartbeatRequest.java    |   6 ++
 .../kafka/common/requests/JoinGroupRequest.java    |   5 +
 .../kafka/common/requests/OffsetCommitRequest.java |   6 ++
 .../kafka/common/requests/SyncGroupRequest.java    |   6 ++
 .../resources/common/message/HeartbeatRequest.json |   7 +-
 .../common/message/HeartbeatResponse.json          |   3 +-
 .../common/message/OffsetCommitRequest.json        |   5 +-
 .../common/message/OffsetCommitResponse.json       |   3 +-
 .../resources/common/message/SyncGroupRequest.json |   5 +-
 .../common/message/SyncGroupResponse.json          |   3 +-
 .../internals/AbstractCoordinatorTest.java         |  44 ++++++++-
 .../internals/ConsumerCoordinatorTest.java         |  38 ++++++++
 .../common/requests/HeartbeatRequestTest.java      |  34 +++++++
 .../common/requests/JoinGroupRequestTest.java      |  13 +++
 .../common/requests/OffsetCommitRequestTest.java   |  34 +++++++
 .../kafka/common/requests/RequestResponseTest.java |  14 +++
 .../common/requests/SyncGroupRequestTest.java      |  34 +++++++
 .../kafka/coordinator/group/GroupCoordinator.scala | 103 +++++++++++---------
 .../kafka/coordinator/group/GroupMetadata.scala    |  32 ++++++-
 core/src/main/scala/kafka/server/KafkaApis.scala   | 106 +++++++++++++--------
 .../group/GroupCoordinatorConcurrencyTest.scala    |  14 +--
 .../coordinator/group/GroupCoordinatorTest.scala   |  97 ++++++++++++++-----
 .../scala/unit/kafka/server/KafkaApisTest.scala    | 102 +++++++++++++++++++-
 tests/kafkatest/tests/client/consumer_test.py      |  45 +++++++++
 .../org/apache/kafka/tools/VerifiableConsumer.java |   3 +
 29 files changed, 691 insertions(+), 137 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 07be128..ad7ae82 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -674,15 +674,18 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             this.clientId = clientId;
             this.groupId = config.getString(ConsumerConfig.GROUP_ID_CONFIG);
 
+            LogContext logContext;
+            // If group.instance.id is set, we will append it to the log context.
             String groupInstanceId = config.getString(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG);
             if (groupInstanceId != null) {
                 JoinGroupRequest.validateGroupInstanceId(groupInstanceId);
                 this.groupInstanceId = Optional.of(groupInstanceId);
+                logContext = new LogContext("[Consumer instanceId=" + groupInstanceId + ", clientId=" + clientId + ", groupId=" + groupId + "] ");
             } else {
                 this.groupInstanceId = Optional.empty();
+                logContext = new LogContext("[Consumer clientId=" + clientId + ", groupId=" + groupId + "] ");
             }
 
-            LogContext logContext = new LogContext("[Consumer clientId=" + clientId + ", groupId=" + groupId + "] ");
             this.log = logContext.logger(getClass());
             boolean enableAutoCommit = config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
             if (groupId == null) { // overwrite in case of default group id where the config is not explicitly provided
@@ -1142,7 +1145,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * @throws java.lang.IllegalArgumentException if the timeout value is negative
      * @throws java.lang.IllegalStateException if the consumer is not subscribed to any topics or manually assigned any
      *             partitions to consume from
-     *
+     * @throws org.apache.kafka.common.errors.FencedInstanceIdException if this consumer instance gets fenced by broker.
      *
      * @deprecated Since 2.0. Use {@link #poll(Duration)}, which does not block beyond the timeout awaiting partition
      *             assignment. See <a href="https://cwiki.apache.org/confluence/x/5kiHB">KIP-266</a> for more information.
@@ -1188,6 +1191,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * @throws java.lang.ArithmeticException if the timeout is greater than {@link Long#MAX_VALUE} milliseconds.
      * @throws org.apache.kafka.common.errors.InvalidTopicException if the current subscription contains any invalid
      *             topic (per {@link org.apache.kafka.common.internals.Topic#validate(String)})
+     * @throws org.apache.kafka.common.errors.FencedInstanceIdException if this consumer instance gets fenced by broker.
      */
     @Override
     public ConsumerRecords<K, V> poll(final Duration timeout) {
@@ -1316,6 +1320,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      *             is too large or if the topic does not exist).
      * @throws org.apache.kafka.common.errors.TimeoutException if the timeout specified by {@code default.api.timeout.ms} expires
      *            before successful completion of the offset commit
+     * @throws org.apache.kafka.common.errors.FencedInstanceIdException if this consumer instance gets fenced by broker.
      */
     @Override
     public void commitSync() {
@@ -1350,6 +1355,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      *             is too large or if the topic does not exist).
      * @throws org.apache.kafka.common.errors.TimeoutException if the timeout expires before successful completion
      *            of the offset commit
+     * @throws org.apache.kafka.common.errors.FencedInstanceIdException if this consumer instance gets fenced by broker.
      */
     @Override
     public void commitSync(Duration timeout) {
@@ -1396,6 +1402,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      *             is too large or if the topic does not exist).
      * @throws org.apache.kafka.common.errors.TimeoutException if the timeout expires before successful completion
      *            of the offset commit
+     * @throws org.apache.kafka.common.errors.FencedInstanceIdException if this consumer instance gets fenced by broker.
      */
     @Override
     public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets) {
@@ -1433,6 +1440,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     *             is too large or if the topic does not exist).
     * @throws org.apache.kafka.common.errors.TimeoutException if the timeout expires before successful completion
     *            of the offset commit
+    * @throws org.apache.kafka.common.errors.FencedInstanceIdException if this consumer instance gets fenced by broker.
     */
     @Override
     public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, final Duration timeout) {
@@ -1452,6 +1460,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     /**
      * Commit offsets returned on the last {@link #poll(Duration)} for all the subscribed list of topics and partition.
      * Same as {@link #commitAsync(OffsetCommitCallback) commitAsync(null)}
+     * @throws org.apache.kafka.common.errors.FencedInstanceIdException if this consumer instance gets fenced by broker.
      */
     @Override
     public void commitAsync() {
@@ -1474,6 +1483,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * (and variants) returns.
      *
      * @param callback Callback to invoke when the commit completes
+     * @throws org.apache.kafka.common.errors.FencedInstanceIdException if this consumer instance gets fenced by broker.
      */
     @Override
     public void commitAsync(OffsetCommitCallback callback) {
@@ -1499,6 +1509,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * @param offsets A map of offsets by partition with associate metadata. This map will be copied internally, so it
      *                is safe to mutate the map after returning.
      * @param callback Callback to invoke when the commit completes
+     * @throws org.apache.kafka.common.errors.FencedInstanceIdException if this consumer instance gets fenced by broker.
      */
     @Override
     public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index a1e15f8..e08a818 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.FencedInstanceIdException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
 import org.apache.kafka.common.errors.IllegalGenerationException;
@@ -123,7 +124,6 @@ public abstract class AbstractCoordinator implements Closeable {
     protected final Time time;
     protected final long retryBackoffMs;
 
-
     private HeartbeatThread heartbeatThread = null;
     private boolean rejoinNeeded = true;
     private boolean needsJoinPrepare = true;
@@ -558,12 +558,14 @@ public abstract class AbstractCoordinator implements Closeable {
                 markCoordinatorUnknown();
                 log.debug("Attempt to join group failed due to obsolete coordinator information: {}", error.message());
                 future.raise(error);
+            } else if (error == Errors.FENCED_INSTANCE_ID) {
+                log.error("Received fatal exception: group.instance.id gets fenced");
+                future.raise(error);
             } else if (error == Errors.INCONSISTENT_GROUP_PROTOCOL
                     || error == Errors.INVALID_SESSION_TIMEOUT
                     || error == Errors.INVALID_GROUP_ID
                     || error == Errors.GROUP_AUTHORIZATION_FAILED
-                    || error == Errors.GROUP_MAX_SIZE_REACHED
-                    || error == Errors.FENCED_INSTANCE_ID) {
+                    || error == Errors.GROUP_MAX_SIZE_REACHED) {
                 // log the error and re-throw the exception
                 log.error("Attempt to join group failed due to fatal error: {}", error.message());
                 if (error == Errors.GROUP_MAX_SIZE_REACHED) {
@@ -573,6 +575,10 @@ public abstract class AbstractCoordinator implements Closeable {
                 } else {
                     future.raise(error);
                 }
+            } else if (error == Errors.UNSUPPORTED_VERSION) {
+                log.error("Attempt to join group failed due to unsupported version error. Please unset field group.instance.id and retry" +
+                        "to see if the problem resolves");
+                future.raise(error);
             } else if (error == Errors.MEMBER_ID_REQUIRED) {
                 // Broker requires a concrete member id to be allowed to join the group. Update member id
                 // and send another join group request in next cycle.
@@ -598,6 +604,7 @@ public abstract class AbstractCoordinator implements Closeable {
                         new SyncGroupRequestData()
                                 .setGroupId(groupId)
                                 .setMemberId(generation.memberId)
+                                .setGroupInstanceId(this.groupInstanceId.orElse(null))
                                 .setGenerationId(generation.generationId)
                                 .setAssignments(Collections.emptyList())
                 );
@@ -624,6 +631,7 @@ public abstract class AbstractCoordinator implements Closeable {
                             new SyncGroupRequestData()
                                     .setGroupId(groupId)
                                     .setMemberId(generation.memberId)
+                                    .setGroupInstanceId(this.groupInstanceId.orElse(null))
                                     .setGenerationId(generation.generationId)
                                     .setAssignments(groupAssignmentList)
                     );
@@ -657,6 +665,9 @@ public abstract class AbstractCoordinator implements Closeable {
                 } else if (error == Errors.REBALANCE_IN_PROGRESS) {
                     log.debug("SyncGroup failed because the group began another rebalance");
                     future.raise(error);
+                } else if (error == Errors.FENCED_INSTANCE_ID) {
+                    log.error("Received fatal exception: group.instance.id gets fenced");
+                    future.raise(error);
                 } else if (error == Errors.UNKNOWN_MEMBER_ID
                         || error == Errors.ILLEGAL_GENERATION) {
                     log.debug("SyncGroup failed: {}", error.message());
@@ -895,6 +906,7 @@ public abstract class AbstractCoordinator implements Closeable {
                 new HeartbeatRequest.Builder(new HeartbeatRequestData()
                         .setGroupId(groupId)
                         .setGenerationid(this.generation.generationId)
+                        .setGroupInstanceId(this.groupInstanceId.orElse(null))
                         .setMemberId(this.generation.memberId));
         return client.send(coordinator, requestBuilder)
                 .compose(new HeartbeatResponseHandler());
@@ -922,6 +934,9 @@ public abstract class AbstractCoordinator implements Closeable {
                 log.info("Attempt to heartbeat failed since generation {} is not current", generation.generationId);
                 resetGeneration();
                 future.raise(Errors.ILLEGAL_GENERATION);
+            } else if (error == Errors.FENCED_INSTANCE_ID) {
+                log.error("Received fatal exception: group.instance.id gets fenced");
+                future.raise(error);
             } else if (error == Errors.UNKNOWN_MEMBER_ID) {
                 log.info("Attempt to heartbeat failed for since member id {} is not valid.", generation.memberId);
                 resetGeneration();
@@ -1126,9 +1141,12 @@ public abstract class AbstractCoordinator implements Closeable {
                                             // as the duration of the rebalance timeout. If we stop sending heartbeats,
                                             // however, then the session timeout may expire before we can rejoin.
                                             heartbeat.receiveHeartbeat();
+                                        } else if (e instanceof FencedInstanceIdException) {
+                                            log.error("Caught fenced group.instance.id {} error in heartbeat thread", groupInstanceId);
+                                            heartbeatThread.failed.set(e);
+                                            heartbeatThread.disable();
                                         } else {
                                             heartbeat.failHeartbeat();
-
                                             // wake up the thread if it's sleeping to reschedule the heartbeat
                                             AbstractCoordinator.this.notify();
                                         }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 6833579..260756b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.FencedInstanceIdException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.RetriableException;
@@ -66,6 +67,7 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
@@ -93,6 +95,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
     private MetadataSnapshot metadataSnapshot;
     private MetadataSnapshot assignmentSnapshot;
     private Timer nextAutoCommitTimer;
+    private AtomicBoolean asyncCommitFenced;
 
     // hold onto request&future for committed offset requests to enable async calls.
     private PendingCommittedOffsetRequest pendingCommittedOffsetRequest = null;
@@ -158,6 +161,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         this.sensors = new ConsumerCoordinatorMetrics(metrics, metricGrpPrefix);
         this.interceptors = interceptors;
         this.pendingAsyncCommits = new AtomicInteger();
+        this.asyncCommitFenced = new AtomicBoolean(false);
 
         if (autoCommitEnabled)
             this.nextAutoCommitTimer = time.timer(autoCommitIntervalMs);
@@ -588,10 +592,14 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
 
     // visible for testing
     void invokeCompletedOffsetCommitCallbacks() {
+        if (asyncCommitFenced.get()) {
+            throw new FencedInstanceIdException("Get fenced exception for group.instance.id " + groupInstanceId.orElse("unset_instance_id"));
+        }
         while (true) {
             OffsetCommitCompletion completion = completedOffsetCommits.poll();
-            if (completion == null)
+            if (completion == null) {
                 break;
+            }
             completion.invoke();
         }
     }
@@ -647,9 +655,13 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
             public void onFailure(RuntimeException e) {
                 Exception commitException = e;
 
-                if (e instanceof RetriableException)
+                if (e instanceof RetriableException) {
                     commitException = new RetriableCommitFailedException(e);
+                }
                 completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, commitException));
+                if (commitException instanceof FencedInstanceIdException) {
+                    asyncCommitFenced.set(true);
+                }
             }
         });
     }
@@ -661,6 +673,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
      * @throws org.apache.kafka.common.errors.AuthorizationException if the consumer is not authorized to the group
      *             or to any of the specified partitions. See the exception for more details
      * @throws CommitFailedException if an unrecoverable error occurs before the commit can be completed
+     * @throws FencedInstanceIdException if a static member gets fenced
      * @return If the offset commit was successfully sent and a successful response was received from
      *         the coordinator
      */
@@ -812,6 +825,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                         .setGroupId(this.groupId)
                         .setGenerationId(generation.generationId)
                         .setMemberId(generation.memberId)
+                        .setGroupInstanceId(groupInstanceId.orElse(null))
                         .setTopics(new ArrayList<>(requestTopicDataMap.values()))
         );
 
@@ -872,6 +886,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                             markCoordinatorUnknown();
                             future.raise(error);
                             return;
+                        } else if (error == Errors.FENCED_INSTANCE_ID) {
+                            log.error("Received fatal exception: group.instance.id gets fenced");
+                            future.raise(error);
+                            return;
                         } else if (error == Errors.UNKNOWN_MEMBER_ID
                                 || error == Errors.ILLEGAL_GENERATION
                                 || error == Errors.REBALANCE_IN_PROGRESS) {
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index bf168bc..1d056e3 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -304,7 +304,8 @@ public enum Errors {
     PREFERRED_LEADER_NOT_AVAILABLE(80, "The preferred leader was not available",
             PreferredLeaderNotAvailableException::new),
     GROUP_MAX_SIZE_REACHED(81, "The consumer group has reached its max size.", GroupMaxSizeReachedException::new),
-    FENCED_INSTANCE_ID(82, "The coordinator reports a more recent member.id associated with the consumer's group.instance.id.",
+    FENCED_INSTANCE_ID(82, "The broker rejected this static consumer since " +
+            "another consumer with the same group.instance.id has registered with a different member.id.",
             FencedInstanceIdException::new);
 
     private static final Logger log = LoggerFactory.getLogger(Errors.class);
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
index f78cafe..ff034df 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.HeartbeatRequestData;
 import org.apache.kafka.common.message.HeartbeatResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
@@ -37,6 +38,10 @@ public class HeartbeatRequest extends AbstractRequest {
 
         @Override
         public HeartbeatRequest build(short version) {
+            if (data.groupInstanceId() != null && version < 3) {
+                throw new UnsupportedVersionException("The broker heartbeat protocol version " +
+                        version + " does not support usage of config group.instance.id.");
+            }
             return new HeartbeatRequest(data, version);
         }
 
@@ -68,6 +73,7 @@ public class HeartbeatRequest extends AbstractRequest {
                 return new HeartbeatResponse(response);
             case 1:
             case 2:
+            case 3:
                 response.setThrottleTimeMs(throttleTimeMs);
                 return new HeartbeatResponse(response);
             default:
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
index f9244cf..a6ad17e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.errors.InvalidConfigurationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.JoinGroupRequestData;
 import org.apache.kafka.common.message.JoinGroupResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
@@ -39,6 +40,10 @@ public class JoinGroupRequest extends AbstractRequest {
 
         @Override
         public JoinGroupRequest build(short version) {
+            if (data.groupInstanceId() != null && version < 5) {
+                throw new UnsupportedVersionException("The broker join group protocol version " +
+                        version + " does not support usage of config group.instance.id.");
+            }
             return new JoinGroupRequest(data, version);
         }
 
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
index dd4cf78..ff53a2d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.OffsetCommitRequestData;
 import org.apache.kafka.common.message.OffsetCommitResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
@@ -53,6 +54,10 @@ public class OffsetCommitRequest extends AbstractRequest {
 
         @Override
         public OffsetCommitRequest build(short version) {
+            if (data.groupInstanceId() != null && version < 7) {
+                throw new UnsupportedVersionException("The broker offset commit protocol version " +
+                        version + " does not support usage of config group.instance.id.");
+            }
             return new OffsetCommitRequest(data, version);
         }
 
@@ -131,6 +136,7 @@ public class OffsetCommitRequest extends AbstractRequest {
             case 4:
             case 5:
             case 6:
+            case 7:
                 return new OffsetCommitResponse(
                         new OffsetCommitResponseData()
                                 .setTopics(responseTopicData)
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java
index 37a2451..48319d5 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.SyncGroupRequestData;
 import org.apache.kafka.common.message.SyncGroupResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
@@ -39,6 +40,10 @@ public class SyncGroupRequest extends AbstractRequest {
 
         @Override
         public SyncGroupRequest build(short version) {
+            if (data.groupInstanceId() != null && version < 3) {
+                throw new UnsupportedVersionException("The broker sync group protocol version " +
+                        version + " does not support usage of config group.instance.id.");
+            }
             return new SyncGroupRequest(data, version);
         }
 
@@ -72,6 +77,7 @@ public class SyncGroupRequest extends AbstractRequest {
                        );
             case 1:
             case 2:
+            case 3:
                 return new SyncGroupResponse(
                         new SyncGroupResponseData()
                             .setErrorCode(Errors.forException(e).code())
diff --git a/clients/src/main/resources/common/message/HeartbeatRequest.json b/clients/src/main/resources/common/message/HeartbeatRequest.json
index 85c8528..1b10a00 100644
--- a/clients/src/main/resources/common/message/HeartbeatRequest.json
+++ b/clients/src/main/resources/common/message/HeartbeatRequest.json
@@ -18,13 +18,16 @@
   "type": "request",
   "name": "HeartbeatRequest",
   // Version 1 and version 2 are the same as version 0.
-  "validVersions": "0-2",
+  // Starting from version 3, we add a new field called groupInstanceId to indicate member identity across restarts.
+  "validVersions": "0-3",
   "fields": [
     { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
       "about": "The group id." },
     { "name": "Generationid", "type": "int32", "versions": "0+",
       "about": "The generation of the group." },
     { "name": "MemberId", "type": "string", "versions": "0+",
-      "about": "The member ID." }
+      "about": "The member ID." },
+    { "name": "GroupInstanceId", "type": "string", "versions": "3+", "nullableVersions": "3+",
+      "about": "The unique identifier of the consumer instance provided by end user." }
   ]
 }
diff --git a/clients/src/main/resources/common/message/HeartbeatResponse.json b/clients/src/main/resources/common/message/HeartbeatResponse.json
index c19ba37..525ba26 100644
--- a/clients/src/main/resources/common/message/HeartbeatResponse.json
+++ b/clients/src/main/resources/common/message/HeartbeatResponse.json
@@ -19,7 +19,8 @@
   "name": "HeartbeatResponse",
   // Version 1 adds throttle time.
   // Starting in version 2, on quota violation, brokers send out responses before throttling.
-  "validVersions": "0-2",
+  // Starting from version 3, heartbeatRequest supports a new field called groupInstanceId to indicate member identity across restarts.
+  "validVersions": "0-3",
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
       "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
diff --git a/clients/src/main/resources/common/message/OffsetCommitRequest.json b/clients/src/main/resources/common/message/OffsetCommitRequest.json
index 5d94a7b..0ad7565 100644
--- a/clients/src/main/resources/common/message/OffsetCommitRequest.json
+++ b/clients/src/main/resources/common/message/OffsetCommitRequest.json
@@ -26,7 +26,8 @@
   // Version 5 removes the retention time, which is now controlled only by a broker configuration.
   //
   // Version 6 adds the leader epoch for fencing.
-  "validVersions": "0-6",
+  // Version 7 adds a new field called groupInstanceId to indicate member identity across restarts.
+  "validVersions": "0-7",
   "fields": [
     { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
       "about": "The unique group identifier." },
@@ -34,6 +35,8 @@
       "about": "The generation of the group." },
     { "name": "MemberId", "type": "string", "versions": "1+", "ignorable": true,
       "about": "The member ID assigned by the group coordinator." },
+    { "name": "GroupInstanceId", "type": "string", "versions": "7+", "ignorable": true, "nullableVersions": "7+",
+      "about": "The unique identifier of the consumer instance provided by end user." },
     { "name": "RetentionTimeMs", "type": "int64", "versions": "2-4", "default": "-1", "ignorable": true,
       "about": "The time period in ms to retain the offset." },
     { "name": "Topics", "type": "[]OffsetCommitRequestTopic", "versions": "0+",
diff --git a/clients/src/main/resources/common/message/OffsetCommitResponse.json b/clients/src/main/resources/common/message/OffsetCommitResponse.json
index d23fd13..8698be1 100644
--- a/clients/src/main/resources/common/message/OffsetCommitResponse.json
+++ b/clients/src/main/resources/common/message/OffsetCommitResponse.json
@@ -24,7 +24,8 @@
   // Starting in version 4, on quota violation, brokers send out responses before throttling.
   //
   // Versions 5 and 6 are the same as version 4.
-  "validVersions": "0-6",
+  // Starting from version 7, offsetCommitRequest supports a new field called groupInstanceId to indicate member identity across restarts.
+  "validVersions": "0-7",
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "3+", "ignorable": true,
       "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
diff --git a/clients/src/main/resources/common/message/SyncGroupRequest.json b/clients/src/main/resources/common/message/SyncGroupRequest.json
index 00249b6..282cb2a 100644
--- a/clients/src/main/resources/common/message/SyncGroupRequest.json
+++ b/clients/src/main/resources/common/message/SyncGroupRequest.json
@@ -18,7 +18,8 @@
   "type": "request",
   "name": "SyncGroupRequest",
   // Versions 1 and 2 are the same as version 0.
-  "validVersions": "0-2",
+  // Starting from version 3, we add a new field called groupInstanceId to indicate member identity across restarts.
+  "validVersions": "0-3",
   "fields": [
     { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
       "about": "The unique group identifier." },
@@ -26,6 +27,8 @@
       "about": "The generation of the group." },
     { "name": "MemberId", "type": "string", "versions": "0+",
       "about": "The member ID assigned by the group." },
+    { "name": "GroupInstanceId", "type": "string", "versions": "3+", "nullableVersions": "3+",
+      "about": "The unique identifier of the consumer instance provided by end user." },
     { "name": "Assignments", "type": "[]SyncGroupRequestAssignment", "versions": "0+",
       "about": "Each assignment.", "fields": [
       { "name": "MemberId", "type": "string", "versions": "0+",
diff --git a/clients/src/main/resources/common/message/SyncGroupResponse.json b/clients/src/main/resources/common/message/SyncGroupResponse.json
index 0faa158..06e6bea 100644
--- a/clients/src/main/resources/common/message/SyncGroupResponse.json
+++ b/clients/src/main/resources/common/message/SyncGroupResponse.json
@@ -19,7 +19,8 @@
   "name": "SyncGroupResponse",
   // Version 1 adds throttle time.
   // Starting in version 2, on quota violation, brokers send out responses before throttling.
-  "validVersions": "0-2",
+  // Starting from version 3, syncGroupRequest supports a new field called groupInstanceId to indicate member identity across restarts.
+  "validVersions": "0-3",
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
       "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index c8be163..31328b3 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.errors.FencedInstanceIdException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.message.HeartbeatResponseData;
@@ -63,6 +64,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -243,7 +245,7 @@ public class AbstractCoordinatorTest {
     }
 
     @Test
-    public void testJoinGroupRequestWithMemberIdMisMatch() {
+    public void testJoinGroupRequestWithFencedInstanceIdException() {
         setupCoordinator();
         mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady(mockTime.timer(0));
@@ -261,6 +263,46 @@ public class AbstractCoordinatorTest {
     }
 
     @Test
+    public void testSyncGroupRequestWithFencedInstanceIdException() {
+        setupCoordinator();
+        mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+
+        final String memberId = "memberId";
+        final int generation = -1;
+
+        mockClient.prepareResponse(joinGroupFollowerResponse(generation, memberId, JoinGroupResponse.UNKNOWN_MEMBER_ID, Errors.NONE));
+        mockClient.prepareResponse(syncGroupResponse(Errors.FENCED_INSTANCE_ID));
+
+        assertThrows(FencedInstanceIdException.class, () -> coordinator.ensureActiveGroup());
+    }
+
+    @Test
+    public void testHeartbeatRequestWithFencedInstanceIdException() throws InterruptedException {
+        setupCoordinator();
+        mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+
+        final String memberId = "memberId";
+        final int generation = -1;
+
+        mockClient.prepareResponse(joinGroupFollowerResponse(generation, memberId, JoinGroupResponse.UNKNOWN_MEMBER_ID, Errors.NONE));
+        mockClient.prepareResponse(syncGroupResponse(Errors.NONE));
+        mockClient.prepareResponse(heartbeatResponse(Errors.FENCED_INSTANCE_ID));
+
+        try {
+            coordinator.ensureActiveGroup();
+            mockTime.sleep(HEARTBEAT_INTERVAL_MS);
+            long startMs = System.currentTimeMillis();
+            while (System.currentTimeMillis() - startMs < 1000) {
+                Thread.sleep(10);
+                coordinator.pollHeartbeat(mockTime.milliseconds());
+            }
+            fail("Expected pollHeartbeat to raise fenced instance id exception in 1 second");
+        } catch (RuntimeException exception) {
+            assertTrue(exception instanceof FencedInstanceIdException);
+        }
+    }
+
+    @Test
     public void testJoinGroupRequestWithGroupInstanceIdNotFound() {
         setupCoordinator();
         mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 958e440..f0214d2 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -32,6 +32,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.FencedInstanceIdException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
 import org.apache.kafka.common.errors.WakeupException;
@@ -2065,6 +2066,43 @@ public class ConsumerCoordinatorTest {
         assertEquals(100L, subscriptions.position(t1p).offset);
     }
 
+    @Test(expected = FencedInstanceIdException.class)
+    public void testCommitOffsetRequestSyncWithFencedInstanceIdException() {
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        // sync commit should throw when the offset commit response carries FENCED_INSTANCE_ID
+        prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.FENCED_INSTANCE_ID);
+        coordinator.commitOffsetsSync(singletonMap(t1p, new OffsetAndMetadata(100L)), time.timer(Long.MAX_VALUE));
+    }
+
+    @Test(expected = FencedInstanceIdException.class)
+    public void testCommitOffsetRequestAsyncWithFencedInstanceIdException() {
+        receiveFencedInstanceIdException();
+    }
+
+    @Test
+    public void testCommitOffsetRequestAsyncAlwaysReceiveFencedException() {
+        // Once we get a fenced exception, we should always hit the fencing case.
+        assertThrows(FencedInstanceIdException.class, this::receiveFencedInstanceIdException);
+        assertThrows(FencedInstanceIdException.class, () ->
+                coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), new MockCommitCallback()));
+        assertThrows(FencedInstanceIdException.class, () ->
+                coordinator.commitOffsetsSync(singletonMap(t1p, new OffsetAndMetadata(100L)), time.timer(Long.MAX_VALUE)));
+    }
+
+    private void receiveFencedInstanceIdException() {
+        subscriptions.assignFromUser(singleton(t1p));
+
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.FENCED_INSTANCE_ID);
+
+        coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), new MockCommitCallback());
+        coordinator.invokeCompletedOffsetCommitCallbacks();
+    }
+
     private ConsumerCoordinator prepareCoordinatorForCloseTest(final boolean useGroupManagement,
                                                                final boolean autoCommit,
                                                                final Optional<String> groupInstanceId) {
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/HeartbeatRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/HeartbeatRequestTest.java
new file mode 100644
index 0000000..2532213
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/requests/HeartbeatRequestTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.HeartbeatRequestData;
+import org.junit.Test;
+
+public class HeartbeatRequestTest {
+
+    @Test(expected = UnsupportedVersionException.class)
+    public void testRequestVersionCompatibilityFailBuild() {
+        new HeartbeatRequest.Builder(
+                new HeartbeatRequestData()
+                        .setGroupId("groupId")
+                        .setMemberId("consumerId")
+                        .setGroupInstanceId("groupInstanceId")
+        ).build((short) 2);
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/JoinGroupRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/JoinGroupRequestTest.java
index 0271aac..644af9a 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/JoinGroupRequestTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/JoinGroupRequestTest.java
@@ -17,6 +17,8 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.errors.InvalidConfigurationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.JoinGroupRequestData;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Test;
 
@@ -62,4 +64,15 @@ public class JoinGroupRequestTest {
             assertFalse(JoinGroupRequest.containsValidPattern(instanceId));
         }
     }
+
+    @Test(expected = UnsupportedVersionException.class)
+    public void testRequestVersionCompatibilityFailBuild() {
+        new JoinGroupRequest.Builder(
+                new JoinGroupRequestData()
+                        .setGroupId("groupId")
+                        .setMemberId("consumerId")
+                        .setGroupInstanceId("groupInstanceId")
+                        .setProtocolType("consumer")
+        ).build((short) 4);
+    }
 }
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/OffsetCommitRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/OffsetCommitRequestTest.java
new file mode 100644
index 0000000..21c3317
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/requests/OffsetCommitRequestTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.junit.Test;
+
+public class OffsetCommitRequestTest {
+
+    @Test(expected = UnsupportedVersionException.class)
+    public void testRequestVersionCompatibilityFailBuild() {
+        new OffsetCommitRequest.Builder(
+                new OffsetCommitRequestData()
+                        .setGroupId("groupId")
+                        .setMemberId("consumerId")
+                        .setGroupInstanceId("groupInstanceId")
+        ).build((short) 6);
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 85ea49c..5413b11 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -797,15 +797,28 @@ public class RequestResponseTest {
                             .setGroupId("group1")
                             .setSessionTimeoutMs(30000)
                             .setMemberId("consumer1")
+                            .setGroupInstanceId(null)
                             .setProtocolType("consumer")
                             .setProtocols(protocols))
                     .build((short) version);
+        } else if (version <= 4) {
+            return new JoinGroupRequest.Builder(
+                    new JoinGroupRequestData()
+                            .setGroupId("group1")
+                            .setSessionTimeoutMs(30000)
+                            .setMemberId("consumer1")
+                            .setGroupInstanceId(null)
+                            .setProtocolType("consumer")
+                            .setProtocols(protocols)
+                            .setRebalanceTimeoutMs(60000)) // v1 and above contains rebalance timeout
+                    .build((short) version);
         } else {
             return new JoinGroupRequest.Builder(
                     new JoinGroupRequestData()
                             .setGroupId("group1")
                             .setSessionTimeoutMs(30000)
                             .setMemberId("consumer1")
+                            .setGroupInstanceId("groupInstanceId") // v5 and above could set group instance id
                             .setProtocolType("consumer")
                             .setProtocols(protocols)
                             .setRebalanceTimeoutMs(60000)) // v1 and above contains rebalance timeout
@@ -955,6 +968,7 @@ public class RequestResponseTest {
         return new OffsetCommitRequest.Builder(new OffsetCommitRequestData()
                 .setGroupId("group1")
                 .setMemberId("consumer1")
+                .setGroupInstanceId(null)
                 .setGenerationId(100)
                 .setTopics(Collections.singletonList(
                         new OffsetCommitRequestData.OffsetCommitRequestTopic()
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/SyncGroupRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/SyncGroupRequestTest.java
new file mode 100644
index 0000000..e4b40a4
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/requests/SyncGroupRequestTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.SyncGroupRequestData;
+import org.junit.Test;
+
+public class SyncGroupRequestTest {
+
+    @Test(expected = UnsupportedVersionException.class)
+    public void testRequestVersionCompatibilityFailBuild() {
+        new SyncGroupRequest.Builder(
+                new SyncGroupRequestData()
+                        .setGroupId("groupId")
+                        .setMemberId("consumerId")
+                        .setGroupInstanceId("groupInstanceId")
+        ).build((short) 2);
+    }
+}
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 89fb2dc..55c90e5 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -175,13 +175,13 @@ class GroupCoordinator(val brokerId: Int,
       } else if (!group.supportsProtocols(protocolType, MemberMetadata.plainProtocolSet(protocols))) {
         responseCallback(joinError(JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.INCONSISTENT_GROUP_PROTOCOL))
       } else {
-        val newMemberId = clientId + "-" + group.generateMemberIdSuffix
+        val newMemberId = group.generateMemberId(clientId, groupInstanceId)
 
         if (group.hasStaticMember(groupInstanceId)) {
           val oldMemberId = group.getStaticMemberId(groupInstanceId)
 
           if (group.is(Stable)) {
-            info(s"Static member $groupInstanceId with unknown member id rejoins, assigning new member id $newMemberId, while" +
+            info(s"Static member $groupInstanceId with unknown member id rejoins, assigning new member id $newMemberId, while " +
               s"old member $oldMemberId will be removed. No rebalance will be triggered.")
 
             val oldMember = group.replaceGroupInstance(oldMemberId, newMemberId, groupInstanceId)
@@ -249,10 +249,8 @@ class GroupCoordinator(val brokerId: Int,
             clientId, clientHost, protocolType, protocols, group, responseCallback)
         }
       } else {
-        val isKnownGroupInstanceId = group.hasStaticMember(groupInstanceId)
-        val groupInstanceIdNotFound = groupInstanceId.isDefined && !isKnownGroupInstanceId
-
-        if (isKnownGroupInstanceId && group.getStaticMemberId(groupInstanceId) != memberId) {
+        val groupInstanceIdNotFound = groupInstanceId.isDefined && !group.hasStaticMember(groupInstanceId)
+        if (group.isStaticMemberFenced(memberId, groupInstanceId)) {
           // given member id doesn't match with the groupInstanceId. Inform duplicate instance to shut down immediately.
           responseCallback(joinError(memberId, Errors.FENCED_INSTANCE_ID))
         } else if (!group.has(memberId) || groupInstanceIdNotFound) {
@@ -321,6 +319,7 @@ class GroupCoordinator(val brokerId: Int,
   def handleSyncGroup(groupId: String,
                       generation: Int,
                       memberId: String,
+                      groupInstanceId: Option[String],
                       groupAssignment: Map[String, Array[Byte]],
                       responseCallback: SyncCallback): Unit = {
     validateGroupStatus(groupId, ApiKeys.SYNC_GROUP) match {
@@ -336,7 +335,7 @@ class GroupCoordinator(val brokerId: Int,
       case None =>
         groupManager.getGroup(groupId) match {
           case None => responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID)
-          case Some(group) => doSyncGroup(group, generation, memberId, groupAssignment, responseCallback)
+          case Some(group) => doSyncGroup(group, generation, memberId, groupInstanceId, groupAssignment, responseCallback)
         }
     }
   }
@@ -344,16 +343,25 @@ class GroupCoordinator(val brokerId: Int,
   private def doSyncGroup(group: GroupMetadata,
                           generationId: Int,
                           memberId: String,
+                          groupInstanceId: Option[String],
                           groupAssignment: Map[String, Array[Byte]],
                           responseCallback: SyncCallback) {
     group.inLock {
-      if (!group.has(memberId)) {
+      if (group.is(Dead)) {
+        // if the group is marked as dead, it means some other thread has just removed the group
+        // from the coordinator metadata; it is likely that the group has migrated to some other
+        // coordinator OR the group is in a transient unstable phase. Let the member retry
+        // joining without the specified member id.
+        responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID)
+      } else if (group.isStaticMemberFenced(memberId, groupInstanceId)) {
+        responseCallback(Array.empty, Errors.FENCED_INSTANCE_ID)
+      } else if (!group.has(memberId)) {
         responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID)
       } else if (generationId != group.generationId) {
         responseCallback(Array.empty, Errors.ILLEGAL_GENERATION)
       } else {
         group.currentState match {
-          case Empty | Dead =>
+          case Empty =>
             responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID)
 
           case PreparingRebalance =>
@@ -479,6 +487,7 @@ class GroupCoordinator(val brokerId: Int,
 
   def handleHeartbeat(groupId: String,
                       memberId: String,
+                      groupInstanceId: Option[String],
                       generationId: Int,
                       responseCallback: Errors => Unit) {
     validateGroupStatus(groupId, ApiKeys.HEARTBEAT).foreach { error =>
@@ -495,44 +504,36 @@ class GroupCoordinator(val brokerId: Int,
         responseCallback(Errors.UNKNOWN_MEMBER_ID)
 
       case Some(group) => group.inLock {
-        group.currentState match {
-          case Dead =>
-            // if the group is marked as dead, it means some other thread has just removed the group
-            // from the coordinator metadata; it is likely that the group has migrated to some other
-            // coordinator OR the group is in a transient unstable phase. Let the member retry
-            // joining without the specified member id.
-            responseCallback(Errors.UNKNOWN_MEMBER_ID)
-
-          case Empty =>
-            responseCallback(Errors.UNKNOWN_MEMBER_ID)
-
-          case CompletingRebalance =>
-            if (!group.has(memberId))
+        if (group.is(Dead)) {
+          // if the group is marked as dead, it means some other thread has just removed the group
+          // from the coordinator metadata; it is likely that the group has migrated to some other
+          // coordinator OR the group is in a transient unstable phase. Let the member retry
+          // joining without the specified member id.
+          responseCallback(Errors.UNKNOWN_MEMBER_ID)
+        } else if (group.isStaticMemberFenced(memberId, groupInstanceId)) {
+          responseCallback(Errors.FENCED_INSTANCE_ID)
+        } else if (!group.has(memberId)) {
+          responseCallback(Errors.UNKNOWN_MEMBER_ID)
+        } else if (generationId != group.generationId) {
+          responseCallback(Errors.ILLEGAL_GENERATION)
+        } else {
+          group.currentState match {
+            case Empty =>
               responseCallback(Errors.UNKNOWN_MEMBER_ID)
-            else
-              responseCallback(Errors.REBALANCE_IN_PROGRESS)
 
-          case PreparingRebalance =>
-            if (!group.has(memberId)) {
-              responseCallback(Errors.UNKNOWN_MEMBER_ID)
-            } else if (generationId != group.generationId) {
-              responseCallback(Errors.ILLEGAL_GENERATION)
-            } else {
-              val member = group.get(memberId)
-              completeAndScheduleNextHeartbeatExpiration(group, member)
-              responseCallback(Errors.REBALANCE_IN_PROGRESS)
-            }
+            case CompletingRebalance =>
+                responseCallback(Errors.REBALANCE_IN_PROGRESS)
 
-          case Stable =>
-            if (!group.has(memberId)) {
-              responseCallback(Errors.UNKNOWN_MEMBER_ID)
-            } else if (generationId != group.generationId) {
-              responseCallback(Errors.ILLEGAL_GENERATION)
-            } else {
-              val member = group.get(memberId)
-              completeAndScheduleNextHeartbeatExpiration(group, member)
-              responseCallback(Errors.NONE)
-            }
+            case PreparingRebalance =>
+                val member = group.get(memberId)
+                completeAndScheduleNextHeartbeatExpiration(group, member)
+                responseCallback(Errors.REBALANCE_IN_PROGRESS)
+
+            case Stable =>
+                val member = group.get(memberId)
+                completeAndScheduleNextHeartbeatExpiration(group, member)
+                responseCallback(Errors.NONE)
+          }
         }
       }
     }
@@ -549,12 +550,13 @@ class GroupCoordinator(val brokerId: Int,
         val group = groupManager.getGroup(groupId).getOrElse {
           groupManager.addGroup(new GroupMetadata(groupId, Empty, time))
         }
-        doCommitOffsets(group, NoMemberId, NoGeneration, producerId, producerEpoch, offsetMetadata, responseCallback)
+        doCommitOffsets(group, NoMemberId, None, NoGeneration, producerId, producerEpoch, offsetMetadata, responseCallback)
     }
   }
 
   def handleCommitOffsets(groupId: String,
                           memberId: String,
+                          groupInstanceId: Option[String],
                           generationId: Int,
                           offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
                           responseCallback: immutable.Map[TopicPartition, Errors] => Unit) {
@@ -566,7 +568,7 @@ class GroupCoordinator(val brokerId: Int,
             if (generationId < 0) {
               // the group is not relying on Kafka for group management, so allow the commit
               val group = groupManager.addGroup(new GroupMetadata(groupId, Empty, time))
-              doCommitOffsets(group, memberId, generationId, NO_PRODUCER_ID, NO_PRODUCER_EPOCH,
+              doCommitOffsets(group, memberId, groupInstanceId, generationId, NO_PRODUCER_ID, NO_PRODUCER_EPOCH,
                 offsetMetadata, responseCallback)
             } else {
               // or this is a request coming from an older generation. either way, reject the commit
@@ -574,7 +576,7 @@ class GroupCoordinator(val brokerId: Int,
             }
 
           case Some(group) =>
-            doCommitOffsets(group, memberId, generationId, NO_PRODUCER_ID, NO_PRODUCER_EPOCH,
+            doCommitOffsets(group, memberId, groupInstanceId, generationId, NO_PRODUCER_ID, NO_PRODUCER_EPOCH,
               offsetMetadata, responseCallback)
         }
     }
@@ -590,6 +592,7 @@ class GroupCoordinator(val brokerId: Int,
 
   private def doCommitOffsets(group: GroupMetadata,
                               memberId: String,
+                              groupInstanceId: Option[String],
                               generationId: Int,
                               producerId: Long,
                               producerEpoch: Short,
@@ -597,7 +600,13 @@ class GroupCoordinator(val brokerId: Int,
                               responseCallback: immutable.Map[TopicPartition, Errors] => Unit) {
     group.inLock {
       if (group.is(Dead)) {
+        // if the group is marked as dead, it means some other thread has just removed the group
+        // from the coordinator metadata; it is likely that the group has migrated to some other
+        // coordinator OR the group is in a transient unstable phase. Let the member retry
+        // joining without the specified member id.
         responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_MEMBER_ID))
+      } else if (group.isStaticMemberFenced(memberId, groupInstanceId)) {
+        responseCallback(offsetMetadata.mapValues(_ => Errors.FENCED_INSTANCE_ID))
       } else if ((generationId < 0 && group.is(Empty)) || (producerId != NO_PRODUCER_ID)) {
         // The group is only using Kafka to store offsets.
         // Also, for transactional offset commits we don't need to validate group membership and the generation.
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
index 23eefe5..21aae42 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
@@ -137,6 +137,8 @@ private object GroupMetadata {
     })
     group
   }
+
+  private val MemberIdDelimiter = "-"
 }
 
 /**
@@ -251,7 +253,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
     *   2. no member rejoined
     */
   def maybeElectNewJoinedLeader(): Boolean = {
-    leaderId.map { currentLeaderId =>
+    leaderId.exists { currentLeaderId =>
       val currentLeader = get(currentLeaderId)
       if (!currentLeader.isAwaitingJoin) {
         members.find(_._2.isAwaitingJoin) match {
@@ -272,7 +274,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
       } else {
         true
       }
-    }.getOrElse(false)
+    }
   }
 
   /**
@@ -343,7 +345,31 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
     timeout.max(member.rebalanceTimeoutMs)
   }
 
-  def generateMemberIdSuffix = UUID.randomUUID().toString
+  def generateMemberId(clientId: String,
+                       groupInstanceId: Option[String]): String = {
+    groupInstanceId match {
+      case None =>
+        clientId + GroupMetadata.MemberIdDelimiter + UUID.randomUUID().toString
+      case Some(instanceId) =>
+        instanceId + GroupMetadata.MemberIdDelimiter + currentStateTimestamp.get
+    }
+  }
+
+  /**
+    * Verify the member.id is up to date for static members. Return true if both conditions are met:
+    *   1. given member is a known static member to group
+    *   2. group stored member.id doesn't match with given member.id
+    */
+  def isStaticMemberFenced(memberId: String,
+                           groupInstanceId: Option[String]): Boolean = {
+    if (hasStaticMember(groupInstanceId)
+      && getStaticMemberId(groupInstanceId) != memberId) {
+        error(s"given member.id $memberId is identified as a known static member ${groupInstanceId.get}," +
+          s"but not matching the expected member.id ${getStaticMemberId(groupInstanceId)}")
+        true
+    } else
+      false
+  }
 
   def canRebalance = GroupMetadata.validPreviousStates(PreparingRebalance).contains(state)
 
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index ef921bf..44642e0 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -298,6 +298,22 @@ class KafkaApis(val requestChannel: RequestChannel,
     val header = request.header
     val offsetCommitRequest = request.body[OffsetCommitRequest]
 
+    val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]()
+    val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]()
+    // the callback for sending an offset commit response
+    def sendResponseCallback(commitStatus: immutable.Map[TopicPartition, Errors]) {
+      val combinedCommitStatus = commitStatus ++ unauthorizedTopicErrors ++ nonExistingTopicErrors
+      if (isDebugEnabled)
+        combinedCommitStatus.foreach { case (topicPartition, error) =>
+          if (error != Errors.NONE) {
+            debug(s"Offset commit request with correlation id ${header.correlationId} from client ${header.clientId} " +
+              s"on partition $topicPartition failed due to ${error.exceptionName}")
+          }
+        }
+      sendResponseMaybeThrottle(request, requestThrottleMs =>
+        new OffsetCommitResponse(requestThrottleMs, combinedCommitStatus.asJava))
+    }
+
     // reject the request if not authorized to the group
     if (!authorize(request.session, Read, Resource(Group, offsetCommitRequest.data().groupId, LITERAL))) {
       val error = Errors.GROUP_AUTHORIZATION_FAILED
@@ -310,10 +326,19 @@ class KafkaApis(val requestChannel: RequestChannel,
             .setTopics(responseTopicList)
             .setThrottleTimeMs(requestThrottleMs)
       ))
+    } else if (offsetCommitRequest.data.groupInstanceId != null && config.interBrokerProtocolVersion < KAFKA_2_3_IV0) {
+      // Only enable static membership when IBP >= 2.3, because it is not safe for the broker to use the static member logic
+      // until we are sure that all brokers support it. If a static group is loaded by an older coordinator, it will discard
+      // the group.instance.id field, so static members could accidentally become "dynamic", which leads to wrong states.
+      var errorMap = new mutable.HashMap[TopicPartition, Errors]
+      for (topicData <- offsetCommitRequest.data().topics().asScala) {
+        for (partitionData <- topicData.partitions().asScala) {
+          val topicPartition = new TopicPartition(topicData.name(), partitionData.partitionIndex())
+          errorMap += topicPartition -> Errors.UNSUPPORTED_VERSION
+        }
+      }
+      sendResponseCallback(errorMap.toMap)
     } else {
-
-      val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]()
-      val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]()
       val authorizedTopicRequestInfoBldr = immutable.Map.newBuilder[TopicPartition, OffsetCommitRequestData.OffsetCommitRequestPartition]
 
       for (topicData <- offsetCommitRequest.data().topics().asScala) {
@@ -330,20 +355,6 @@ class KafkaApis(val requestChannel: RequestChannel,
 
       val authorizedTopicRequestInfo = authorizedTopicRequestInfoBldr.result()
 
-      // the callback for sending an offset commit response
-      def sendResponseCallback(commitStatus: immutable.Map[TopicPartition, Errors]) {
-        val combinedCommitStatus = commitStatus ++ unauthorizedTopicErrors ++ nonExistingTopicErrors
-        if (isDebugEnabled)
-          combinedCommitStatus.foreach { case (topicPartition, error) =>
-            if (error != Errors.NONE) {
-              debug(s"Offset commit request with correlation id ${header.correlationId} from client ${header.clientId} " +
-                s"on partition $topicPartition failed due to ${error.exceptionName}")
-            }
-          }
-        sendResponseMaybeThrottle(request, requestThrottleMs =>
-          new OffsetCommitResponse(requestThrottleMs, combinedCommitStatus.asJava))
-      }
-
       if (authorizedTopicRequestInfo.isEmpty)
         sendResponseCallback(Map.empty)
       else if (header.apiVersion == 0) {
@@ -399,9 +410,10 @@ class KafkaApis(val requestChannel: RequestChannel,
 
         // call coordinator to handle commit offset
         groupCoordinator.handleCommitOffsets(
-          offsetCommitRequest.data().groupId(),
-          offsetCommitRequest.data().memberId(),
-          offsetCommitRequest.data().generationId(),
+          offsetCommitRequest.data.groupId,
+          offsetCommitRequest.data.memberId,
+          Option(offsetCommitRequest.data.groupInstanceId),
+          offsetCommitRequest.data.generationId,
           partitionData,
           sendResponseCallback)
       }
@@ -1328,12 +1340,24 @@ class KafkaApis(val requestChannel: RequestChannel,
       sendResponseMaybeThrottle(request, createResponse)
     }
 
-    if (!authorize(request.session, Read, Resource(Group, joinGroupRequest.data().groupId(), LITERAL))) {
+    if (joinGroupRequest.data.groupInstanceId != null && config.interBrokerProtocolVersion < KAFKA_2_3_IV0) {
+      // Only enable static membership when IBP >= 2.3, because it is not safe for the broker to use the static member logic
+      // until we are sure that all brokers support it. If a static group is loaded by an older coordinator, it will discard
+      // the group.instance.id field, so static members could accidentally become "dynamic", which leads to wrong states.
+      sendResponseCallback(JoinGroupResult(
+        List.empty,
+        JoinGroupResponse.UNKNOWN_MEMBER_ID,
+        JoinGroupResponse.UNKNOWN_GENERATION_ID,
+        JoinGroupResponse.UNKNOWN_PROTOCOL,
+        JoinGroupResponse.UNKNOWN_MEMBER_ID,
+        Errors.UNSUPPORTED_VERSION
+      ))
+    } else if (!authorize(request.session, Read, Resource(Group, joinGroupRequest.data().groupId(), LITERAL))) {
       sendResponseMaybeThrottle(request, requestThrottleMs =>
         new JoinGroupResponse(
           new JoinGroupResponseData()
             .setThrottleTimeMs(requestThrottleMs)
-            .setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code())
+            .setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code)
             .setGenerationId(JoinGroupResponse.UNKNOWN_GENERATION_ID)
             .setProtocolName(JoinGroupResponse.UNKNOWN_PROTOCOL)
             .setLeader(JoinGroupResponse.UNKNOWN_MEMBER_ID)
@@ -1341,32 +1365,26 @@ class KafkaApis(val requestChannel: RequestChannel,
             .setMembers(Collections.emptyList())
         )
       )
-    } else {
-      val encodedGroupInstanceId = joinGroupRequest.data().groupInstanceId
-      val groupInstanceId =
-        if (encodedGroupInstanceId == null ||
-        config.interBrokerProtocolVersion < KAFKA_2_3_IV0)
-          None
-        else
-          Some(encodedGroupInstanceId)
+    }  else {
+      val groupInstanceId = Option(joinGroupRequest.data.groupInstanceId)
 
       // Only return MEMBER_ID_REQUIRED error if joinGroupRequest version is >= 4
       // and groupInstanceId is configured to unknown.
       val requireKnownMemberId = joinGroupRequest.version >= 4 && groupInstanceId.isEmpty
 
       // let the coordinator handle join-group
-      val protocols = joinGroupRequest.data().protocols().valuesList.asScala.map(protocol =>
+      val protocols = joinGroupRequest.data.protocols.valuesList.asScala.map(protocol =>
         (protocol.name, protocol.metadata)).toList
       groupCoordinator.handleJoinGroup(
-        joinGroupRequest.data().groupId,
-        joinGroupRequest.data().memberId,
+        joinGroupRequest.data.groupId,
+        joinGroupRequest.data.memberId,
         groupInstanceId,
         requireKnownMemberId,
         request.header.clientId,
         request.session.clientAddress.toString,
-        joinGroupRequest.data().rebalanceTimeoutMs,
-        joinGroupRequest.data().sessionTimeoutMs,
-        joinGroupRequest.data().protocolType,
+        joinGroupRequest.data.rebalanceTimeoutMs,
+        joinGroupRequest.data.sessionTimeoutMs,
+        joinGroupRequest.data.protocolType,
         protocols,
         sendResponseCallback)
     }
@@ -1385,7 +1403,12 @@ class KafkaApis(val requestChannel: RequestChannel,
         ))
     }
 
-    if (!authorize(request.session, Read, Resource(Group, syncGroupRequest.data.groupId, LITERAL))) {
+    if (syncGroupRequest.data.groupInstanceId != null && config.interBrokerProtocolVersion < KAFKA_2_3_IV0) {
+      // Only enable static membership when IBP >= 2.3, because it is not safe for the broker to use the static member logic
+      // until we are sure that all brokers support it. If a static group is loaded by an older coordinator, it will discard
+      // the group.instance.id field, so static members could accidentally become "dynamic", which leads to wrong states.
+      sendResponseCallback(Array[Byte](), Errors.UNSUPPORTED_VERSION)
+    } else if (!authorize(request.session, Read, Resource(Group, syncGroupRequest.data.groupId, LITERAL))) {
       sendResponseCallback(Array[Byte](), Errors.GROUP_AUTHORIZATION_FAILED)
     } else {
       val assignmentMap = immutable.Map.newBuilder[String, Array[Byte]]
@@ -1397,6 +1420,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         syncGroupRequest.data.groupId,
         syncGroupRequest.data.generationId,
         syncGroupRequest.data.memberId,
+        Option(syncGroupRequest.data.groupInstanceId),
         assignmentMap.result,
         sendResponseCallback
       )
@@ -1435,7 +1459,12 @@ class KafkaApis(val requestChannel: RequestChannel,
       sendResponseMaybeThrottle(request, createResponse)
     }
 
-    if (!authorize(request.session, Read, Resource(Group, heartbeatRequest.data.groupId, LITERAL))) {
+    if (heartbeatRequest.data.groupInstanceId != null && config.interBrokerProtocolVersion < KAFKA_2_3_IV0) {
+      // Only enable static membership when IBP >= 2.3, because it is not safe for the broker to use the static member logic
+      // until we are sure that all brokers support it. If a static group is loaded by an older coordinator, it will discard
+      // the group.instance.id field, so static members could accidentally become "dynamic", which leads to wrong states.
+      sendResponseCallback(Errors.UNSUPPORTED_VERSION)
+    } else if (!authorize(request.session, Read, Resource(Group, heartbeatRequest.data.groupId, LITERAL))) {
       sendResponseMaybeThrottle(request, requestThrottleMs =>
         new HeartbeatResponse(
             new HeartbeatResponseData()
@@ -1446,6 +1475,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       groupCoordinator.handleHeartbeat(
         heartbeatRequest.data.groupId,
         heartbeatRequest.data.memberId,
+        Option(heartbeatRequest.data.groupInstanceId),
         heartbeatRequest.data.generationid,
         sendResponseCallback)
     }
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
index 25f2ba4..3da5a0c 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
@@ -180,10 +180,10 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
     override def runWithCallback(member: GroupMember, responseCallback: SyncGroupCallback): Unit = {
       if (member.leader) {
         groupCoordinator.handleSyncGroup(member.groupId, member.generationId, member.memberId,
-            member.group.assignment, responseCallback)
+          member.groupInstanceId, member.group.assignment, responseCallback)
       } else {
-         groupCoordinator.handleSyncGroup(member.groupId, member.generationId, member.memberId,
-             Map.empty[String, Array[Byte]], responseCallback)
+        groupCoordinator.handleSyncGroup(member.groupId, member.generationId, member.memberId,
+          member.groupInstanceId, Map.empty[String, Array[Byte]], responseCallback)
       }
     }
     override def awaitAndVerify(member: GroupMember): Unit = {
@@ -198,7 +198,8 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
       callback
     }
     override def runWithCallback(member: GroupMember, responseCallback: HeartbeatCallback): Unit = {
-      groupCoordinator.handleHeartbeat( member.groupId, member.memberId,  member.generationId, responseCallback)
+      groupCoordinator.handleHeartbeat(member.groupId, member.memberId,
+        member.groupInstanceId, member.generationId, responseCallback)
     }
     override def awaitAndVerify(member: GroupMember): Unit = {
        val error = await(member, DefaultSessionTimeout)
@@ -213,8 +214,8 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
     override def runWithCallback(member: GroupMember, responseCallback: CommitOffsetCallback): Unit = {
       val tp = new TopicPartition("topic", 0)
       val offsets = immutable.Map(tp -> OffsetAndMetadata(1, "", Time.SYSTEM.milliseconds()))
-      groupCoordinator.handleCommitOffsets(member.groupId, member.memberId, member.generationId,
-          offsets, responseCallback)
+      groupCoordinator.handleCommitOffsets(member.groupId, member.memberId,
+        member.groupInstanceId, member.generationId, offsets, responseCallback)
     }
     override def awaitAndVerify(member: GroupMember): Unit = {
        val offsets = await(member, 500)
@@ -307,6 +308,7 @@ object GroupCoordinatorConcurrencyTest {
 
   class GroupMember(val group: Group, val groupPartitionId: Int, val leader: Boolean) extends CoordinatorMember {
     @volatile var memberId: String = JoinGroupRequest.UNKNOWN_MEMBER_ID
+    @volatile var groupInstanceId: Option[String] = None
     @volatile var generationId: Int = -1
     def groupId: String = group.groupId
   }
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index a959abc..280fc8e 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -75,6 +75,7 @@ class GroupCoordinatorTest {
   private val groupInstanceId = Some("groupInstanceId")
   private val leaderInstanceId = Some("leader")
   private val followerInstanceId = Some("follower")
+  private val invalidMemberId = "invalidMember"
   private val metadata = Array[Byte]()
   private val protocols = List(("range", metadata))
   private val protocolSuperset = List(("range", metadata), ("roundrobin", metadata))
@@ -144,20 +145,20 @@ class GroupCoordinatorTest {
 
     // SyncGroup
     var syncGroupResponse: Option[Errors] = None
-    groupCoordinator.handleSyncGroup(otherGroupId, 1, memberId, Map.empty[String, Array[Byte]],
+    groupCoordinator.handleSyncGroup(otherGroupId, 1, memberId, None, Map.empty[String, Array[Byte]],
       (_, error)=> syncGroupResponse = Some(error))
     assertEquals(Some(Errors.REBALANCE_IN_PROGRESS), syncGroupResponse)
 
     // OffsetCommit
     val topicPartition = new TopicPartition("foo", 0)
     var offsetCommitErrors = Map.empty[TopicPartition, Errors]
-    groupCoordinator.handleCommitOffsets(otherGroupId, memberId, 1,
+    groupCoordinator.handleCommitOffsets(otherGroupId, memberId, None, 1,
       Map(topicPartition -> offsetAndMetadata(15L)), result => { offsetCommitErrors = result })
     assertEquals(Some(Errors.COORDINATOR_LOAD_IN_PROGRESS), offsetCommitErrors.get(topicPartition))
 
     // Heartbeat
     var heartbeatError: Option[Errors] = None
-    groupCoordinator.handleHeartbeat(otherGroupId, memberId, 1, error => { heartbeatError = Some(error) })
+    groupCoordinator.handleHeartbeat(otherGroupId, memberId, None, 1, error => { heartbeatError = Some(error) })
     assertEquals(Some(Errors.NONE), heartbeatError)
 
     // DescribeGroups
@@ -441,7 +442,7 @@ class GroupCoordinatorTest {
     assertTrue(getGroup(groupId).is(CompletingRebalance))
 
     EasyMock.reset(replicaManager)
-    val syncGroupFuture = sendSyncGroupLeader(groupId, joinGroupResult.generationId, assignedMemberId, Map(assignedMemberId -> Array[Byte]()))
+    val syncGroupFuture = sendSyncGroupLeader(groupId, joinGroupResult.generationId, assignedMemberId, groupInstanceId, Map(assignedMemberId -> Array[Byte]()))
     timer.advanceClock(1)
     val syncGroupResult = Await.result(syncGroupFuture, Duration(1, TimeUnit.MILLISECONDS))
     assertEquals(Errors.NONE, syncGroupResult._2)
@@ -469,8 +470,9 @@ class GroupCoordinatorTest {
 
     EasyMock.reset(replicaManager)
     assertNotEquals(rebalanceResult.leaderId, joinGroupResult.leaderId)
-    val oldLeaderSyncGroupResult = syncGroupLeader(groupId, rebalanceResult.generation, rebalanceResult.leaderId, Map.empty)
-    assertEquals(Errors.UNKNOWN_MEMBER_ID, oldLeaderSyncGroupResult._2)
+    // Old leader will get fenced.
+    val oldLeaderSyncGroupResult = syncGroupLeader(groupId, rebalanceResult.generation, rebalanceResult.leaderId, Map.empty, leaderInstanceId)
+    assertEquals(Errors.FENCED_INSTANCE_ID, oldLeaderSyncGroupResult._2)
 
     EasyMock.reset(replicaManager)
     val newLeaderSyncGroupResult = syncGroupLeader(groupId, rebalanceResult.generation, joinGroupResult.leaderId, Map.empty)
@@ -568,11 +570,11 @@ class GroupCoordinatorTest {
 
     EasyMock.reset(replicaManager)
     // Sync with old member id will fail because the member id is updated
-    val syncGroupWithOldMemberIdResult = syncGroupFollower(groupId, rebalanceResult.generation, rebalanceResult.followerId)
-    assertEquals(Errors.UNKNOWN_MEMBER_ID, syncGroupWithOldMemberIdResult._2)
+    val syncGroupWithOldMemberIdResult = syncGroupFollower(groupId, rebalanceResult.generation, rebalanceResult.followerId, followerInstanceId)
+    assertEquals(Errors.FENCED_INSTANCE_ID, syncGroupWithOldMemberIdResult._2)
 
     EasyMock.reset(replicaManager)
-    val syncGroupWithNewMemberIdResult = syncGroupFollower(groupId, rebalanceResult.generation, joinGroupResult.memberId)
+    val syncGroupWithNewMemberIdResult = syncGroupFollower(groupId, rebalanceResult.generation, joinGroupResult.memberId, followerInstanceId)
     assertEquals(Errors.NONE, syncGroupWithNewMemberIdResult._2)
     assertEquals(rebalanceResult.followerAssignment, syncGroupWithNewMemberIdResult._1)
   }
@@ -675,7 +677,6 @@ class GroupCoordinatorTest {
 
     EasyMock.reset(replicaManager)
     val joinGroupResult = staticJoinGroup(groupId, rebalanceResult.followerId, leaderInstanceId, protocolType, protocolSuperset, clockAdvance = 1)
-
     assertEquals(Errors.FENCED_INSTANCE_ID, joinGroupResult.error)
   }
 
@@ -685,11 +686,55 @@ class GroupCoordinatorTest {
 
     EasyMock.reset(replicaManager)
     val joinGroupResult = staticJoinGroup(groupId, rebalanceResult.leaderId, followerInstanceId, protocolType, protocolSuperset, clockAdvance = 1)
-
     assertEquals(Errors.FENCED_INSTANCE_ID, joinGroupResult.error)
   }
 
   @Test
+  def staticMemberSyncAsLeaderWithInvalidMemberId() {
+    val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)
+
+    EasyMock.reset(replicaManager)
+    val syncGroupResult = syncGroupLeader(groupId, rebalanceResult.generation, "invalid", Map.empty, leaderInstanceId)
+    assertEquals(Errors.FENCED_INSTANCE_ID, syncGroupResult._2)
+  }
+
+  @Test
+  def staticMemberHeartbeatLeaderWithInvalidMemberId() {
+    val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)
+
+    EasyMock.reset(replicaManager)
+    val syncGroupResult = syncGroupLeader(groupId, rebalanceResult.generation, rebalanceResult.leaderId, Map.empty)
+    assertEquals(Errors.NONE, syncGroupResult._2)
+
+    EasyMock.reset(replicaManager)
+    val validHeartbeatResult = heartbeat(groupId, rebalanceResult.leaderId, rebalanceResult.generation)
+    assertEquals(Errors.NONE, validHeartbeatResult)
+
+    EasyMock.reset(replicaManager)
+    val invalidHeartbeatResult = heartbeat(groupId, invalidMemberId, rebalanceResult.generation, leaderInstanceId)
+    assertEquals(Errors.FENCED_INSTANCE_ID, invalidHeartbeatResult)
+  }
+
+  @Test
+  def staticMemberCommitOffsetWithInvalidMemberId() {
+    val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)
+
+    EasyMock.reset(replicaManager)
+    val syncGroupResult = syncGroupLeader(groupId, rebalanceResult.generation, rebalanceResult.leaderId, Map.empty)
+    assertEquals(Errors.NONE, syncGroupResult._2)
+
+    val tp = new TopicPartition("topic", 0)
+    val offset = offsetAndMetadata(0)
+    EasyMock.reset(replicaManager)
+    val validOffsetCommitResult = commitOffsets(groupId, rebalanceResult.leaderId, rebalanceResult.generation, Map(tp -> offset))
+    assertEquals(Errors.NONE, validOffsetCommitResult(tp))
+
+    EasyMock.reset(replicaManager)
+    val invalidOffsetCommitResult = commitOffsets(groupId, invalidMemberId, rebalanceResult.generation, Map(tp -> offset), leaderInstanceId)
+    assertEquals(Errors.FENCED_INSTANCE_ID, invalidOffsetCommitResult(tp))
+  }
+
+  @Test
   def staticMemberJoinWithUnknownInstanceIdAndKnownMemberId() {
     val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)
 
@@ -989,7 +1034,7 @@ class GroupCoordinatorTest {
     assertEquals(Errors.NONE, joinGroupError)
 
     EasyMock.reset(replicaManager)
-    val heartbeatResult = heartbeat(groupId, assignedMemberId, 2)
+    val heartbeatResult = heartbeat(groupId, assignedMemberId, 1)
     assertEquals(Errors.REBALANCE_IN_PROGRESS, heartbeatResult)
   }
 
@@ -1585,7 +1630,7 @@ class GroupCoordinatorTest {
     // with no leader SyncGroup, the follower's request should failure with an error indicating
     // that it should rejoin
     EasyMock.reset(replicaManager)
-    val followerSyncFuture = sendSyncGroupFollower(groupId, nextGenerationId, otherJoinResult.memberId)
+    val followerSyncFuture = sendSyncGroupFollower(groupId, nextGenerationId, otherJoinResult.memberId, None)
 
     timer.advanceClock(DefaultSessionTimeout + 100)
 
@@ -1681,7 +1726,7 @@ class GroupCoordinatorTest {
     assertEquals(firstMemberId, otherJoinResult.leaderId)
 
     EasyMock.reset(replicaManager)
-    val followerSyncFuture = sendSyncGroupFollower(groupId, nextGenerationId, followerId)
+    val followerSyncFuture = sendSyncGroupFollower(groupId, nextGenerationId, followerId, None)
 
     EasyMock.reset(replicaManager)
     val leaderSyncResult = syncGroupLeader(groupId, nextGenerationId, leaderId,
@@ -2465,6 +2510,7 @@ class GroupCoordinatorTest {
   private def sendSyncGroupLeader(groupId: String,
                                   generation: Int,
                                   leaderId: String,
+                                  groupInstanceId: Option[String],
                                   assignment: Map[String, Array[Byte]]): Future[SyncGroupCallbackParams] = {
     val (responseFuture, responseCallback) = setupSyncGroupCallback
 
@@ -2486,18 +2532,19 @@ class GroupCoordinatorTest {
     EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(RecordBatch.MAGIC_VALUE_V1)).anyTimes()
     EasyMock.replay(replicaManager)
 
-    groupCoordinator.handleSyncGroup(groupId, generation, leaderId, assignment, responseCallback)
+    groupCoordinator.handleSyncGroup(groupId, generation, leaderId, groupInstanceId, assignment, responseCallback)
     responseFuture
   }
 
   private def sendSyncGroupFollower(groupId: String,
                                     generation: Int,
-                                    memberId: String): Future[SyncGroupCallbackParams] = {
+                                    memberId: String,
+                                    groupInstanceId: Option[String]): Future[SyncGroupCallbackParams] = {
     val (responseFuture, responseCallback) = setupSyncGroupCallback
 
     EasyMock.replay(replicaManager)
 
-    groupCoordinator.handleSyncGroup(groupId, generation, memberId, Map.empty[String, Array[Byte]], responseCallback)
+    groupCoordinator.handleSyncGroup(groupId, generation, memberId, groupInstanceId, Map.empty[String, Array[Byte]], responseCallback)
     responseFuture
   }
 
@@ -2543,8 +2590,9 @@ class GroupCoordinatorTest {
   private def syncGroupFollower(groupId: String,
                                 generationId: Int,
                                 memberId: String,
+                                groupInstanceId: Option[String] = None,
                                 sessionTimeout: Int = DefaultSessionTimeout): SyncGroupCallbackParams = {
-    val responseFuture = sendSyncGroupFollower(groupId, generationId, memberId)
+    val responseFuture = sendSyncGroupFollower(groupId, generationId, memberId, groupInstanceId)
     Await.result(responseFuture, Duration(sessionTimeout + 100, TimeUnit.MILLISECONDS))
   }
 
@@ -2552,19 +2600,21 @@ class GroupCoordinatorTest {
                               generationId: Int,
                               memberId: String,
                               assignment: Map[String, Array[Byte]],
+                              groupInstanceId: Option[String] = None,
                               sessionTimeout: Int = DefaultSessionTimeout): SyncGroupCallbackParams = {
-    val responseFuture = sendSyncGroupLeader(groupId, generationId, memberId, assignment)
+    val responseFuture = sendSyncGroupLeader(groupId, generationId, memberId, groupInstanceId, assignment)
     Await.result(responseFuture, Duration(sessionTimeout + 100, TimeUnit.MILLISECONDS))
   }
 
   private def heartbeat(groupId: String,
                         consumerId: String,
-                        generationId: Int): HeartbeatCallbackParams = {
+                        generationId: Int,
+                        groupInstanceId: Option[String] = None): HeartbeatCallbackParams = {
     val (responseFuture, responseCallback) = setupHeartbeatCallback
 
     EasyMock.replay(replicaManager)
 
-    groupCoordinator.handleHeartbeat(groupId, consumerId, generationId, responseCallback)
+    groupCoordinator.handleHeartbeat(groupId, consumerId, groupInstanceId, generationId, responseCallback)
     Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS))
   }
 
@@ -2575,7 +2625,8 @@ class GroupCoordinatorTest {
   private def commitOffsets(groupId: String,
                             consumerId: String,
                             generationId: Int,
-                            offsets: Map[TopicPartition, OffsetAndMetadata]): CommitOffsetCallbackParams = {
+                            offsets: Map[TopicPartition, OffsetAndMetadata],
+                            groupInstanceId: Option[String] = None): CommitOffsetCallbackParams = {
     val (responseFuture, responseCallback) = setupCommitOffsetsCallback
 
     val capturedArgument: Capture[scala.collection.Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()
@@ -2598,7 +2649,7 @@ class GroupCoordinatorTest {
     EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(RecordBatch.MAGIC_VALUE_V1)).anyTimes()
     EasyMock.replay(replicaManager)
 
-    groupCoordinator.handleCommitOffsets(groupId, consumerId, generationId, offsets, responseCallback)
+    groupCoordinator.handleCommitOffsets(groupId, consumerId, groupInstanceId, generationId, offsets, responseCallback)
     Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS))
   }
 
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index b83dd92..ce334ce 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -23,7 +23,7 @@ import java.util
 import java.util.{Collections, Optional}
 import java.util.Arrays.asList
 
-import kafka.api.{ApiVersion, KAFKA_0_10_2_IV0}
+import kafka.api.{ApiVersion, KAFKA_0_10_2_IV0, KAFKA_2_2_IV1}
 import kafka.controller.KafkaController
 import kafka.coordinator.group.GroupCoordinator
 import kafka.coordinator.transaction.TransactionCoordinator
@@ -48,9 +48,8 @@ import org.apache.kafka.common.requests.{FetchMetadata => JFetchMetadata, _}
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.easymock.{Capture, EasyMock, IAnswer}
 import EasyMock._
-import org.apache.kafka.common.message.JoinGroupRequestData
+import org.apache.kafka.common.message.{HeartbeatRequestData, JoinGroupRequestData, OffsetCommitRequestData, OffsetCommitResponseData, SyncGroupRequestData}
 import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol
-import org.apache.kafka.common.message.OffsetCommitRequestData
 import org.junit.Assert.{assertEquals, assertNull, assertTrue}
 import org.junit.{After, Test}
 
@@ -545,6 +544,103 @@ class KafkaApisTest {
     EasyMock.replay(groupCoordinator)
   }
 
+  @Test
+  def rejectJoinGroupRequestWhenStaticMembershipNotSupported() { // a JoinGroup request carrying a group.instance.id is expected to be answered with UNSUPPORTED_VERSION
+    val capturedResponse = expectNoThrottling()
+    EasyMock.replay(clientRequestQuotaManager, requestChannel)
+
+    val (joinGroupRequest, requestChannelRequest) = buildRequest(new JoinGroupRequest.Builder(
+      new JoinGroupRequestData()
+        .setGroupId("test")
+        .setMemberId("test")
+        .setGroupInstanceId("instanceId") // the instance id is what makes this a static-membership request
+        .setProtocolType("consumer")
+        .setProtocols(new JoinGroupRequestData.JoinGroupRequestProtocolCollection)
+    ))
+    createKafkaApis(KAFKA_2_2_IV1).handleJoinGroupRequest(requestChannelRequest) // presumably KAFKA_2_2_IV1 is an inter-broker protocol version without static membership -- confirm against createKafkaApis
+
+    val response = readResponse(ApiKeys.JOIN_GROUP, joinGroupRequest, capturedResponse).asInstanceOf[JoinGroupResponse]
+    assertEquals(Errors.UNSUPPORTED_VERSION, response.error())
+    EasyMock.replay(groupCoordinator) // NOTE(review): replay runs after the assertion and this test records no expectations on groupCoordinator -- confirm it is needed
+  }
+
+  @Test
+  def rejectSyncGroupRequestWhenStaticMembershipNotSupported() { // a SyncGroup request carrying a group.instance.id is expected to be answered with UNSUPPORTED_VERSION
+    val capturedResponse = expectNoThrottling()
+    EasyMock.replay(clientRequestQuotaManager, requestChannel)
+
+    val (syncGroupRequest, requestChannelRequest) = buildRequest(new SyncGroupRequest.Builder(
+      new SyncGroupRequestData()
+        .setGroupId("test")
+        .setMemberId("test")
+        .setGroupInstanceId("instanceId") // the instance id is what makes this a static-membership request
+        .setGenerationId(1)
+    ))
+    createKafkaApis(KAFKA_2_2_IV1).handleSyncGroupRequest(requestChannelRequest) // presumably KAFKA_2_2_IV1 is an inter-broker protocol version without static membership -- confirm against createKafkaApis
+
+    val response = readResponse(ApiKeys.SYNC_GROUP, syncGroupRequest, capturedResponse).asInstanceOf[SyncGroupResponse]
+    assertEquals(Errors.UNSUPPORTED_VERSION, response.error())
+    EasyMock.replay(groupCoordinator) // NOTE(review): replay runs after the assertion and this test records no expectations on groupCoordinator -- confirm it is needed
+  }
+
+  @Test
+  def rejectHeartbeatRequestWhenStaticMembershipNotSupported() { // a Heartbeat request carrying a group.instance.id is expected to be answered with UNSUPPORTED_VERSION
+    val capturedResponse = expectNoThrottling()
+    EasyMock.replay(clientRequestQuotaManager, requestChannel)
+
+    val (heartbeatRequest, requestChannelRequest) = buildRequest(new HeartbeatRequest.Builder(
+      new HeartbeatRequestData()
+        .setGroupId("test")
+        .setMemberId("test")
+        .setGroupInstanceId("instanceId") // the instance id is what makes this a static-membership request
+        .setGenerationid(1) // NOTE(review): setter is spelled "Generationid" here, unlike setGenerationId on the other request types -- presumably matches the generated HeartbeatRequestData; confirm
+    ))
+    createKafkaApis(KAFKA_2_2_IV1).handleHeartbeatRequest(requestChannelRequest) // presumably KAFKA_2_2_IV1 is an inter-broker protocol version without static membership -- confirm against createKafkaApis
+
+    val response = readResponse(ApiKeys.HEARTBEAT, heartbeatRequest, capturedResponse).asInstanceOf[HeartbeatResponse]
+    assertEquals(Errors.UNSUPPORTED_VERSION, response.error())
+    EasyMock.replay(groupCoordinator) // NOTE(review): replay runs after the assertion and this test records no expectations on groupCoordinator -- confirm it is needed
+  }
+
+  @Test
+  def rejectOffsetCommitRequestWhenStaticMembershipNotSupported() { // an OffsetCommit request carrying a group.instance.id is expected to get UNSUPPORTED_VERSION on every partition
+    val capturedResponse = expectNoThrottling()
+    EasyMock.replay(clientRequestQuotaManager, requestChannel)
+
+    val (offsetCommitRequest, requestChannelRequest) = buildRequest(new OffsetCommitRequest.Builder(
+      new OffsetCommitRequestData()
+        .setGroupId("test")
+        .setMemberId("test")
+        .setGroupInstanceId("instanceId") // the instance id is what makes this a static-membership request
+        .setGenerationId(100)
+        .setTopics(Collections.singletonList( // a single topic "test" with a single partition 0
+          new OffsetCommitRequestData.OffsetCommitRequestTopic()
+            .setName("test")
+            .setPartitions(Collections.singletonList(
+              new OffsetCommitRequestData.OffsetCommitRequestPartition()
+                .setPartitionIndex(0)
+                .setCommittedOffset(100)
+                .setCommittedLeaderEpoch(RecordBatch.NO_PARTITION_LEADER_EPOCH)
+                .setCommittedMetadata("")
+            ))
+        ))
+    ))
+    createKafkaApis(KAFKA_2_2_IV1).handleOffsetCommitRequest(requestChannelRequest) // presumably KAFKA_2_2_IV1 is an inter-broker protocol version without static membership -- confirm against createKafkaApis
+
+    val expectedTopicErrors = Collections.singletonList( // mirrors the request: topic "test", partition 0, error code UNSUPPORTED_VERSION
+      new OffsetCommitResponseData.OffsetCommitResponseTopic()
+        .setName("test")
+        .setPartitions(Collections.singletonList(
+          new OffsetCommitResponseData.OffsetCommitResponsePartition()
+            .setPartitionIndex(0)
+            .setErrorCode(Errors.UNSUPPORTED_VERSION.code())
+        ))
+    )
+    val response = readResponse(ApiKeys.OFFSET_COMMIT, offsetCommitRequest, capturedResponse).asInstanceOf[OffsetCommitResponse]
+    assertEquals(expectedTopicErrors, response.data.topics()) // the error is reported per partition, so the whole topic list is compared
+    EasyMock.replay(groupCoordinator) // NOTE(review): replay runs after the assertion and this test records no expectations on groupCoordinator -- confirm it is needed
+  }
+
   /**
    * Return pair of listener names in the metadataCache: PLAINTEXT and LISTENER2 respectively.
    */
diff --git a/tests/kafkatest/tests/client/consumer_test.py b/tests/kafkatest/tests/client/consumer_test.py
index b443656..be15e6a 100644
--- a/tests/kafkatest/tests/client/consumer_test.py
+++ b/tests/kafkatest/tests/client/consumer_test.py
@@ -221,6 +221,51 @@ class OffsetValidationTest(VerifiableConsumerTest):
                 "Current position %d greater than the total number of consumed records %d" % \
                 (consumer.current_position(partition), consumer.total_consumed())
 
+    @cluster(num_nodes=10)
+    @matrix(num_conflict_consumers=[1, 2], fencing_stage=["stable", "all"])
+    def test_fencing_static_consumer(self, num_conflict_consumers, fencing_stage):
+        """
+        Verify correct static consumer behavior when there are conflicting consumers with same group.instance.id.
+
+        - Start a producer which continues producing new messages throughout the test.
+        - Start up the consumers as static members and wait until they've joined the group. Some conflict consumers will be configured with
+        - the same group.instance.id.
+        - Let normal consumers and fencing consumers start at the same time, and expect only unique consumers left.
+        """
+        partition = TopicPartition(self.TOPIC, 0)
+
+        producer = self.setup_producer(self.TOPIC)
+
+        producer.start()
+        self.await_produced_messages(producer)
+
+        self.session_timeout_sec = 60
+        consumer = self.setup_consumer(self.TOPIC, static_membership=True)
+
+        self.num_consumers = num_conflict_consumers
+        conflict_consumer = self.setup_consumer(self.TOPIC, static_membership=True)
+
+        # wait original set of consumer to stable stage before starting conflict members.
+        if fencing_stage == "stable":
+            consumer.start()
+            self.await_members(consumer, len(consumer.nodes))
+
+            conflict_consumer.start()
+            self.await_members(conflict_consumer, num_conflict_consumers)
+            self.await_members(consumer, len(consumer.nodes) - num_conflict_consumers)
+
+            assert len(consumer.dead_nodes()) == num_conflict_consumers
+        else:
+            consumer.start()
+            conflict_consumer.start()
+
+            wait_until(lambda: len(consumer.joined_nodes()) + len(conflict_consumer.joined_nodes()) == len(consumer.nodes),
+                       timeout_sec=self.session_timeout_sec,
+                       err_msg="Timed out waiting for consumers to join, expected total %d joined, but only see %d joined from"
+                               "normal consumer group and %d from conflict consumer group" % \
+                               (len(consumer.nodes), len(consumer.joined_nodes()), len(conflict_consumer.joined_nodes()))
+                       )
+
     @cluster(num_nodes=7)
     @matrix(clean_shutdown=[True], enable_autocommit=[True, False])
     def test_consumer_failure(self, clean_shutdown, enable_autocommit):
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
index fe41a21..82a878a 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
@@ -39,6 +39,7 @@ import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.clients.consumer.RangeAssignor;
 import org.apache.kafka.clients.consumer.RoundRobinAssignor;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.FencedInstanceIdException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.utils.Exit;
@@ -214,6 +215,8 @@ public class VerifiableConsumer implements Closeable, OffsetCommitCallback, Cons
             // we only call wakeup() once to close the consumer, so this recursion should be safe
             commitSync(offsets);
             throw e;
+        } catch (FencedInstanceIdException e) {
+            throw e;
         } catch (Exception e) {
             onComplete(offsets, e);
         }


Mime
View raw message