kafka-commits mailing list archives

From da...@apache.org
Subject [kafka] branch trunk updated: KAFKA-12890; Consumer group stuck in `CompletingRebalance` (#10863)
Date Thu, 17 Jun 2021 11:48:09 GMT
This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d294b94  KAFKA-12890; Consumer group stuck in `CompletingRebalance` (#10863)
d294b94 is described below

commit d294b946ca78c9b8caf16547cf2fa6ed348a3033
Author: David Jacot <djacot@confluent.io>
AuthorDate: Thu Jun 17 13:46:16 2021 +0200

    KAFKA-12890; Consumer group stuck in `CompletingRebalance` (#10863)
    
    This patch introduces a new delayed operation which effectively ensures that a SyncGroup
    request is received from all the stable members in the groups within the rebalance timeout.
    The timer starts when the group transitions to the `CompletingRebalance` state. The previous
    mechanism based on `DelayedHeartbeat` did not work anymore because of https://github.com/apache/kafka/pull/8834
    which allows heartbeats while the group is in the `CompletingRebalance` state.
    
    Reviewers: Luke Chen <showuon@gmail.com>, Jason Gustafson <jason@confluent.io>
---
 .../kafka/coordinator/group/DelayedJoin.scala      |  36 ++--
 .../kafka/coordinator/group/DelayedRebalance.scala |  34 ++++
 .../kafka/coordinator/group/DelayedSync.scala      |  48 +++++
 .../kafka/coordinator/group/GroupCoordinator.scala | 137 +++++++++++--
 .../kafka/coordinator/group/GroupMetadata.scala    |  31 +++
 .../scala/kafka/server/DelayedOperationKey.scala   |  20 +-
 .../group/GroupCoordinatorConcurrencyTest.scala    |   8 +-
 .../coordinator/group/GroupCoordinatorTest.scala   | 211 ++++++++++++++++++++-
 .../coordinator/group/GroupMetadataTest.scala      |  46 +++++
 9 files changed, 520 insertions(+), 51 deletions(-)
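
For readers skimming the patch, the mechanism described in the commit message can be summarized with a small, self-contained Scala sketch. This is illustrative only and not part of the commit: SimpleGroup, SyncTracker and DelayedSyncSketch are hypothetical stand-ins for GroupMetadata, GroupCoordinator and DelayedSync, and the purgatory/timer machinery is omitted.

    import scala.collection.mutable

    // Simplified stand-in for GroupMetadata's new pending-sync bookkeeping.
    final class SimpleGroup(val groupId: String, var generationId: Int) {
      private val members = mutable.Set[String]()
      private val pendingSyncMembers = mutable.Set[String]()

      def addMember(memberId: String): Unit = members += memberId
      def removeMember(memberId: String): Unit = {
        members -= memberId
        pendingSyncMembers -= memberId // mirrors GroupMetadata.remove also dropping the pending sync
      }
      def allMembers: Set[String] = members.toSet

      // Marked for every member once the join phase completes.
      def addPendingSync(memberId: String): Unit = pendingSyncMembers += memberId
      // Cleared when a SyncGroup request is received from a member.
      def removePendingSync(memberId: String): Unit = pendingSyncMembers -= memberId
      def hasReceivedSyncFromAllMembers: Boolean = pendingSyncMembers.isEmpty
      def allPendingSyncMembers: Set[String] = pendingSyncMembers.toSet
    }

    // Simplified stand-in for the coordinator logic around the new DelayedSync operation.
    final class SyncTracker(group: SimpleGroup) {
      // On completing the join phase: bump the generation and mark every member as
      // "sync pending". The real code also places a DelayedSync for this generation,
      // with the group's rebalance timeout, into the shared rebalance purgatory.
      def onCompleteJoin(): Unit = {
        group.generationId += 1
        group.allMembers.foreach(group.addPendingSync)
      }

      // A SyncGroup request clears the member's pending flag.
      def onSyncGroupReceived(memberId: String): Unit = group.removePendingSync(memberId)

      // Models DelayedSync.onExpiration -> GroupCoordinator.onExpirePendingSync: once the
      // rebalance timeout fires for the scheduled generation, members that never sent a
      // SyncGroup are removed and a new rebalance would be prepared.
      def onRebalanceTimeout(scheduledGenerationId: Int): Unit = {
        if (scheduledGenerationId == group.generationId && !group.hasReceivedSyncFromAllMembers) {
          val stale = group.allPendingSyncMembers
          stale.foreach(group.removeMember)
          println(s"group ${group.groupId}: removed $stale, preparing a new rebalance")
        }
      }
    }

    object DelayedSyncSketch {
      def main(args: Array[String]): Unit = {
        val group = new SimpleGroup("example-group", generationId = 0)
        Seq("leader", "follower-1", "follower-2").foreach(group.addMember)

        val tracker = new SyncTracker(group)
        tracker.onCompleteJoin()              // group is now effectively in CompletingRebalance
        tracker.onSyncGroupReceived("leader") // only the leader sends its SyncGroup
        tracker.onRebalanceTimeout(scheduledGenerationId = 1)
        // Prints (set ordering may vary):
        // group example-group: removed Set(follower-1, follower-2), preparing a new rebalance
      }
    }

In the patch itself the timeout path is driven by the DelayedSync operation sitting in the shared rebalance purgatory rather than by a direct method call, but the bookkeeping is the same.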

diff --git a/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala b/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala
index abebfc8..22dfa9d 100644
--- a/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala
+++ b/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala
@@ -17,7 +17,7 @@
 
 package kafka.coordinator.group
 
-import kafka.server.{DelayedOperation, DelayedOperationPurgatory, GroupKey}
+import kafka.server.{DelayedOperationPurgatory, GroupJoinKey}
 
 import scala.math.{max, min}
 
@@ -31,11 +31,16 @@ import scala.math.{max, min}
  * the group are marked as failed, and complete this operation to proceed rebalance with
  * the rest of the group.
  */
-private[group] class DelayedJoin(coordinator: GroupCoordinator,
-                                 group: GroupMetadata,
-                                 rebalanceTimeout: Long) extends DelayedOperation(rebalanceTimeout, Some(group.lock)) {
-
+private[group] class DelayedJoin(
+  coordinator: GroupCoordinator,
+  group: GroupMetadata,
+  rebalanceTimeout: Long
+) extends DelayedRebalance(
+  rebalanceTimeout,
+  group.lock
+) {
   override def tryComplete(): Boolean = coordinator.tryCompleteJoin(group, forceComplete _)
+
   override def onExpiration(): Unit = {
     // try to complete delayed actions introduced by coordinator.onCompleteJoin
     tryToCompleteDelayedAction()
@@ -54,13 +59,18 @@ private[group] class DelayedJoin(coordinator: GroupCoordinator,
   * before the rebalance timeout. If both are true we then schedule a further delay. Otherwise we complete the
   * rebalance.
   */
-private[group] class InitialDelayedJoin(coordinator: GroupCoordinator,
-                                        purgatory: DelayedOperationPurgatory[DelayedJoin],
-                                        group: GroupMetadata,
-                                        configuredRebalanceDelay: Int,
-                                        delayMs: Int,
-                                        remainingMs: Int) extends DelayedJoin(coordinator, group, delayMs) {
-
+private[group] class InitialDelayedJoin(
+  coordinator: GroupCoordinator,
+  purgatory: DelayedOperationPurgatory[DelayedRebalance],
+  group: GroupMetadata,
+  configuredRebalanceDelay: Int,
+  delayMs: Int,
+  remainingMs: Int
+) extends DelayedJoin(
+  coordinator,
+  group,
+  delayMs
+) {
   override def tryComplete(): Boolean = false
 
   override def onComplete(): Unit = {
@@ -75,7 +85,7 @@ private[group] class InitialDelayedJoin(coordinator: GroupCoordinator,
           configuredRebalanceDelay,
           delay,
           remaining
-        ), Seq(GroupKey(group.groupId)))
+        ), Seq(GroupJoinKey(group.groupId)))
       } else
         super.onComplete()
     }
diff --git a/core/src/main/scala/kafka/coordinator/group/DelayedRebalance.scala b/core/src/main/scala/kafka/coordinator/group/DelayedRebalance.scala
new file mode 100644
index 0000000..bad109a
--- /dev/null
+++ b/core/src/main/scala/kafka/coordinator/group/DelayedRebalance.scala
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.coordinator.group
+
+import kafka.server.DelayedOperation
+
+import java.util.concurrent.locks.Lock
+
+/**
+ * Delayed rebalance operation that is shared by DelayedJoin and DelayedSync
+ * operations. This allows us to use a common purgatory for both cases.
+ */
+private[group] abstract class DelayedRebalance(
+  rebalanceTimeoutMs: Long,
+  groupLock: Lock
+) extends DelayedOperation(
+  rebalanceTimeoutMs,
+  Some(groupLock)
+)
diff --git a/core/src/main/scala/kafka/coordinator/group/DelayedSync.scala b/core/src/main/scala/kafka/coordinator/group/DelayedSync.scala
new file mode 100644
index 0000000..a39adef
--- /dev/null
+++ b/core/src/main/scala/kafka/coordinator/group/DelayedSync.scala
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.coordinator.group
+
+/**
+ * Delayed rebalance operation that is added to the purgatory when the group is completing the
+ * rebalance.
+ *
+ * Whenever a SyncGroup is received, checks that we received all the SyncGroup request from
+ * each member of the group; if yes, complete this operation.
+ *
+ * When the operation has expired, any known members that have not sent a SyncGroup requests
+ * are removed from the group. If any members is removed, the group is rebalanced.
+ */
+private[group] class DelayedSync(
+  coordinator: GroupCoordinator,
+  group: GroupMetadata,
+  generationId: Int,
+  rebalanceTimeoutMs: Long
+) extends DelayedRebalance(
+  rebalanceTimeoutMs,
+  group.lock
+) {
+  override def tryComplete(): Boolean = {
+    coordinator.tryCompletePendingSync(group, generationId, forceComplete _)
+  }
+
+  override def onExpiration(): Unit = {
+    coordinator.onExpirePendingSync(group, generationId)
+  }
+
+  override def onComplete(): Unit = { }
+}
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 3fc93de..50b00e3 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -53,7 +53,7 @@ class GroupCoordinator(val brokerId: Int,
                        val offsetConfig: OffsetConfig,
                        val groupManager: GroupMetadataManager,
                        val heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat],
-                       val joinPurgatory: DelayedOperationPurgatory[DelayedJoin],
+                       val rebalancePurgatory: DelayedOperationPurgatory[DelayedRebalance],
                        time: Time,
                        metrics: Metrics) extends Logging {
   import GroupCoordinator._
@@ -119,7 +119,7 @@ class GroupCoordinator(val brokerId: Int,
     isActive.set(false)
     groupManager.shutdown()
     heartbeatPurgatory.shutdown()
-    joinPurgatory.shutdown()
+    rebalancePurgatory.shutdown()
     info("Shutdown complete.")
   }
 
@@ -215,7 +215,7 @@ class GroupCoordinator(val brokerId: Int,
 
             // attempt to complete JoinGroup
             if (group.is(PreparingRebalance)) {
-              joinPurgatory.checkAndComplete(GroupKey(group.groupId))
+              rebalancePurgatory.checkAndComplete(GroupJoinKey(group.groupId))
             }
           }
       }
@@ -564,6 +564,7 @@ class GroupCoordinator(val brokerId: Int,
 
           case CompletingRebalance =>
             group.get(memberId).awaitingSyncCallback = responseCallback
+            removePendingSyncMember(group, memberId)
 
             // if this is the leader, then we can attempt to persist state and transition to stable
             if (group.isLeader(memberId)) {
@@ -598,6 +599,8 @@ class GroupCoordinator(val brokerId: Int,
             }
 
           case Stable =>
+            removePendingSyncMember(group, memberId)
+
             // if the group is stable, we just return the current assignment
             val memberMetadata = group.get(memberId)
             responseCallback(SyncGroupResult(group.protocolType, group.protocolName, memberMetadata.assignment, Errors.NONE))
@@ -617,7 +620,7 @@ class GroupCoordinator(val brokerId: Int,
     def removeCurrentMemberFromGroup(group: GroupMetadata, memberId: String): Unit = {
       val member = group.get(memberId)
       removeMemberAndUpdateGroup(group, member, s"Removing member $memberId on LeaveGroup")
-      removeHeartbeatForLeavingMember(group, member)
+      removeHeartbeatForLeavingMember(group, member.memberId)
       info(s"Member $member has left group $groupId through explicit `LeaveGroup` request")
     }
 
@@ -1104,7 +1107,7 @@ class GroupCoordinator(val brokerId: Int,
             group.maybeInvokeJoinCallback(member, JoinGroupResult(member.memberId, Errors.NOT_COORDINATOR))
           }
 
-          joinPurgatory.checkAndComplete(GroupKey(group.groupId))
+          rebalancePurgatory.checkAndComplete(GroupJoinKey(group.groupId))
 
         case Stable | CompletingRebalance =>
           for (member <- group.allMemberMetadata) {
@@ -1112,6 +1115,8 @@ class GroupCoordinator(val brokerId: Int,
             heartbeatPurgatory.checkAndComplete(MemberKey(group.groupId, member.memberId))
           }
       }
+
+      removeSyncExpiration(group)
     }
   }
 
@@ -1208,8 +1213,8 @@ class GroupCoordinator(val brokerId: Int,
     heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, Seq(pendingMemberKey))
   }
 
-  private def removeHeartbeatForLeavingMember(group: GroupMetadata, member: MemberMetadata): Unit = {
-    val memberKey = MemberKey(group.groupId, member.memberId)
+  private def removeHeartbeatForLeavingMember(group: GroupMetadata, memberId: String): Unit = {
+    val memberKey = MemberKey(group.groupId, memberId)
     heartbeatPurgatory.checkAndComplete(memberKey)
   }
 
@@ -1342,9 +1347,12 @@ class GroupCoordinator(val brokerId: Int,
     if (group.is(CompletingRebalance))
       resetAndPropagateAssignmentError(group, Errors.REBALANCE_IN_PROGRESS)
 
+    // if a sync expiration is pending, cancel it.
+    removeSyncExpiration(group)
+
     val delayedRebalance = if (group.is(Empty))
       new InitialDelayedJoin(this,
-        joinPurgatory,
+        rebalancePurgatory,
         group,
         groupConfig.groupInitialRebalanceDelayMs,
         groupConfig.groupInitialRebalanceDelayMs,
@@ -1357,8 +1365,8 @@ class GroupCoordinator(val brokerId: Int,
     info(s"Preparing to rebalance group ${group.groupId} in state ${group.currentState} with
old generation " +
       s"${group.generationId} (${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)})
(reason: $reason)")
 
-    val groupKey = GroupKey(group.groupId)
-    joinPurgatory.tryCompleteElseWatch(delayedRebalance, Seq(groupKey))
+    val groupKey = GroupJoinKey(group.groupId)
+    rebalancePurgatory.tryCompleteElseWatch(delayedRebalance, Seq(groupKey))
   }
 
   private def removeMemberAndUpdateGroup(group: GroupMetadata, member: MemberMetadata, reason: String): Unit = {
@@ -1371,7 +1379,7 @@ class GroupCoordinator(val brokerId: Int,
     group.currentState match {
       case Dead | Empty =>
       case Stable | CompletingRebalance => maybePrepareRebalance(group, reason)
-      case PreparingRebalance => joinPurgatory.checkAndComplete(GroupKey(group.groupId))
+      case PreparingRebalance => rebalancePurgatory.checkAndComplete(GroupJoinKey(group.groupId))
     }
   }
 
@@ -1379,7 +1387,7 @@ class GroupCoordinator(val brokerId: Int,
     group.remove(memberId)
 
     if (group.is(PreparingRebalance)) {
-      joinPurgatory.checkAndComplete(GroupKey(group.groupId))
+      rebalancePurgatory.checkAndComplete(GroupJoinKey(group.groupId))
     }
   }
 
@@ -1398,9 +1406,9 @@ class GroupCoordinator(val brokerId: Int,
         info(s"Group ${group.groupId} removed dynamic members " +
           s"who haven't joined: ${notYetRejoinedDynamicMembers.keySet}")
 
-        notYetRejoinedDynamicMembers.values foreach { failedMember =>
+        notYetRejoinedDynamicMembers.values.foreach { failedMember =>
           group.remove(failedMember.memberId)
-          removeHeartbeatForLeavingMember(group, failedMember)
+          removeHeartbeatForLeavingMember(group, failedMember.memberId)
         }
       }
 
@@ -1411,9 +1419,9 @@ class GroupCoordinator(val brokerId: Int,
         // of rebalance preparing stage, and send out another delayed operation
         // until session timeout removes all the non-responsive members.
         error(s"Group ${group.groupId} could not complete rebalance because no members rejoined")
-        joinPurgatory.tryCompleteElseWatch(
+        rebalancePurgatory.tryCompleteElseWatch(
           new DelayedJoin(this, group, group.rebalanceTimeoutMs),
-          Seq(GroupKey(group.groupId)))
+          Seq(GroupJoinKey(group.groupId)))
       } else {
         group.initNextGeneration()
         if (group.is(Empty)) {
@@ -1450,7 +1458,95 @@ class GroupCoordinator(val brokerId: Int,
             group.maybeInvokeJoinCallback(member, joinResult)
             completeAndScheduleNextHeartbeatExpiration(group, member)
             member.isNew = false
+
+            group.addPendingSyncMember(member.memberId)
           }
+
+          schedulePendingSync(group)
+        }
+      }
+    }
+  }
+
+  private def removePendingSyncMember(
+    group: GroupMetadata,
+    memberId: String
+  ): Unit = {
+    group.removePendingSyncMember(memberId)
+    maybeCompleteSyncExpiration(group)
+  }
+
+  private def removeSyncExpiration(
+    group: GroupMetadata
+  ): Unit = {
+    group.clearPendingSyncMembers()
+    maybeCompleteSyncExpiration(group)
+  }
+
+  private def maybeCompleteSyncExpiration(
+    group: GroupMetadata
+  ): Unit = {
+    val groupKey = GroupSyncKey(group.groupId)
+    rebalancePurgatory.checkAndComplete(groupKey)
+  }
+
+  private def schedulePendingSync(
+    group: GroupMetadata
+  ): Unit = {
+    val delayedSync = new DelayedSync(this, group, group.generationId, group.rebalanceTimeoutMs)
+    val groupKey = GroupSyncKey(group.groupId)
+    rebalancePurgatory.tryCompleteElseWatch(delayedSync, Seq(groupKey))
+  }
+
+  def tryCompletePendingSync(
+    group: GroupMetadata,
+    generationId: Int,
+    forceComplete: () => Boolean
+  ): Boolean = {
+    group.inLock {
+      if (generationId != group.generationId) {
+        forceComplete()
+      } else {
+        group.currentState match {
+          case Dead | Empty | PreparingRebalance =>
+            forceComplete()
+          case CompletingRebalance | Stable =>
+            if (group.hasReceivedSyncFromAllMembers)
+              forceComplete()
+            else false
+        }
+      }
+    }
+  }
+
+  def onExpirePendingSync(
+    group: GroupMetadata,
+    generationId: Int
+  ): Unit = {
+    group.inLock {
+      if (generationId != group.generationId) {
+        error(s"Received unexpected notification of sync expiration for ${group.groupId} " +
+          s"with an old generation $generationId while the group has ${group.generationId}.")
+      } else {
+        group.currentState match {
+          case Dead | Empty | PreparingRebalance =>
+            error(s"Received unexpected notification of sync expiration after group ${group.groupId} " +
+              s"already transitioned to the ${group.currentState} state.")
+
+          case CompletingRebalance | Stable =>
+            if (!group.hasReceivedSyncFromAllMembers) {
+              val pendingSyncMembers = group.allPendingSyncMembers
+
+              pendingSyncMembers.foreach { memberId =>
+                group.remove(memberId)
+                removeHeartbeatForLeavingMember(group, memberId)
+              }
+
+              debug(s"Group ${group.groupId} removed members who haven't " +
+                s"sent their sync request: $pendingSyncMembers")
+
+              prepareRebalance(group, s"Removing $pendingSyncMembers on pending sync request expiration")
+            }
         }
       }
     }
@@ -1532,8 +1628,8 @@ object GroupCoordinator {
             time: Time,
             metrics: Metrics): GroupCoordinator = {
     val heartbeatPurgatory = DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", config.brokerId)
-    val joinPurgatory = DelayedOperationPurgatory[DelayedJoin]("Rebalance", config.brokerId)
-    GroupCoordinator(config, replicaManager, heartbeatPurgatory, joinPurgatory, time, metrics)
+    val rebalancePurgatory = DelayedOperationPurgatory[DelayedRebalance]("Rebalance", config.brokerId)
+    GroupCoordinator(config, replicaManager, heartbeatPurgatory, rebalancePurgatory, time, metrics)
   }
 
   private[group] def offsetConfig(config: KafkaConfig) = OffsetConfig(
@@ -1552,7 +1648,7 @@ object GroupCoordinator {
   def apply(config: KafkaConfig,
             replicaManager: ReplicaManager,
             heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat],
-            joinPurgatory: DelayedOperationPurgatory[DelayedJoin],
+            rebalancePurgatory: DelayedOperationPurgatory[DelayedRebalance],
             time: Time,
             metrics: Metrics): GroupCoordinator = {
     val offsetConfig = this.offsetConfig(config)
@@ -1563,7 +1659,8 @@ object GroupCoordinator {
 
     val groupMetadataManager = new GroupMetadataManager(config.brokerId, config.interBrokerProtocolVersion,
       offsetConfig, replicaManager, time, metrics)
-    new GroupCoordinator(config.brokerId, groupConfig, offsetConfig, groupMetadataManager, heartbeatPurgatory, joinPurgatory, time, metrics)
+    new GroupCoordinator(config.brokerId, groupConfig, offsetConfig, groupMetadataManager, heartbeatPurgatory,
+      rebalancePurgatory, time, metrics)
   }
 
   private def memberLeaveError(memberIdentity: MemberIdentity,
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
index 53bce0b..5cb8a73 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
@@ -214,6 +214,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
   private val pendingTransactionalOffsetCommits = new mutable.HashMap[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]()
   private var receivedTransactionalOffsetCommits = false
   private var receivedConsumerOffsetCommits = false
+  private val pendingSyncMembers = new mutable.HashSet[String]
 
   // When protocolType == `consumer`, a set of subscribed topics is maintained. The set is
   // computed when a new generation is created or when the group is restored from the log.
@@ -274,6 +275,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
       leaderId = members.keys.headOption
 
     pendingMembers.remove(memberId)
+    pendingSyncMembers.remove(memberId)
   }
 
   /**
@@ -344,6 +346,34 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
     pendingMembers.add(memberId)
   }
 
+  def addPendingSyncMember(memberId: String): Boolean = {
+    if (!has(memberId)) {
+      throw new IllegalStateException(s"Attempt to add a pending sync for member $memberId which " +
+        "is not a member of the group")
+    }
+    pendingSyncMembers.add(memberId)
+  }
+
+  def removePendingSyncMember(memberId: String): Boolean = {
+    if (!has(memberId)) {
+      throw new IllegalStateException(s"Attempt to remove a pending sync for member $memberId which " +
+        "is not a member of the group")
+    }
+    pendingSyncMembers.remove(memberId)
+  }
+
+  def hasReceivedSyncFromAllMembers: Boolean = {
+    pendingSyncMembers.isEmpty
+  }
+
+  def allPendingSyncMembers: Set[String] = {
+    pendingSyncMembers.toSet
+  }
+
+  def clearPendingSyncMembers(): Unit = {
+    pendingSyncMembers.clear()
+  }
+
   def hasStaticMember(groupInstanceId: String): Boolean = {
     staticMembers.contains(groupInstanceId)
   }
@@ -546,6 +576,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
     }
     receivedConsumerOffsetCommits = false
     receivedTransactionalOffsetCommits = false
+    clearPendingSyncMembers()
   }
 
   def currentMemberMetadata: List[JoinGroupResponseMember] = {
diff --git a/core/src/main/scala/kafka/server/DelayedOperationKey.scala b/core/src/main/scala/kafka/server/DelayedOperationKey.scala
index 3be412b..05a6a99 100644
--- a/core/src/main/scala/kafka/server/DelayedOperationKey.scala
+++ b/core/src/main/scala/kafka/server/DelayedOperationKey.scala
@@ -32,9 +32,7 @@ object DelayedOperationKey {
 
 /* used by delayed-produce and delayed-fetch operations */
 case class TopicPartitionOperationKey(topic: String, partition: Int) extends DelayedOperationKey {
-
-
-  override def keyLabel = "%s-%d".format(topic, partition)
+  override def keyLabel: String = "%s-%d".format(topic, partition)
 }
 
 object TopicPartitionOperationKey {
@@ -45,18 +43,20 @@ object TopicPartitionOperationKey {
 
 /* used by delayed-join-group operations */
 case class MemberKey(groupId: String, consumerId: String) extends DelayedOperationKey {
-
-  override def keyLabel = "%s-%s".format(groupId, consumerId)
+  override def keyLabel: String = "%s-%s".format(groupId, consumerId)
 }
 
-/* used by delayed-rebalance operations */
-case class GroupKey(groupId: String) extends DelayedOperationKey {
+/* used by delayed-join operations */
+case class GroupJoinKey(groupId: String) extends DelayedOperationKey {
+  override def keyLabel: String = "join-%s".format(groupId)
+}
 
-  override def keyLabel = groupId
+/* used by delayed-sync operations */
+case class GroupSyncKey(groupId: String) extends DelayedOperationKey {
+  override def keyLabel: String = "sync-%s".format(groupId)
 }
 
 /* used by delayed-topic operations */
 case class TopicKey(topic: String) extends DelayedOperationKey {
-
-  override def keyLabel = topic
+  override def keyLabel: String = topic
 }
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
index 4facf85..2ef487c 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
@@ -62,7 +62,7 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
   )
 
   var heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat] = _
-  var joinPurgatory: DelayedOperationPurgatory[DelayedJoin] = _
+  var rebalancePurgatory: DelayedOperationPurgatory[DelayedRebalance] = _
   var groupCoordinator: GroupCoordinator = _
 
   @BeforeEach
@@ -81,9 +81,9 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
     val config = KafkaConfig.fromProps(serverProps)
 
     heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", timer, config.brokerId, reaperEnabled = false)
-    joinPurgatory = new DelayedOperationPurgatory[DelayedJoin]("Rebalance", timer, config.brokerId, reaperEnabled = false)
+    rebalancePurgatory = new DelayedOperationPurgatory[DelayedRebalance]("Rebalance", timer, config.brokerId, reaperEnabled = false)
 
-    groupCoordinator = GroupCoordinator(config, replicaManager, heartbeatPurgatory, joinPurgatory, timer.time, new Metrics())
+    groupCoordinator = GroupCoordinator(config, replicaManager, heartbeatPurgatory, rebalancePurgatory, timer.time, new Metrics())
     groupCoordinator.startup(() => zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.offsetsTopicPartitions),
       false)
   }
@@ -150,7 +150,7 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
     if (groupCoordinator != null)
       groupCoordinator.shutdown()
     groupCoordinator = GroupCoordinator(config, replicaManager, heartbeatPurgatory,
-      joinPurgatory, timer.time, new Metrics())
+      rebalancePurgatory, timer.time, new Metrics())
     groupCoordinator.startup(() => zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.offsetsTopicPartitions),
       false)
 
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index cf14a36..0784259 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -113,9 +113,9 @@ class GroupCoordinatorTest {
     val config = KafkaConfig.fromProps(props)
 
     val heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", timer, config.brokerId, reaperEnabled = false)
-    val joinPurgatory = new DelayedOperationPurgatory[DelayedJoin]("Rebalance", timer, config.brokerId, reaperEnabled = false)
+    val rebalancePurgatory = new DelayedOperationPurgatory[DelayedRebalance]("Rebalance", timer, config.brokerId, reaperEnabled = false)
 
-    groupCoordinator = GroupCoordinator(config, replicaManager, heartbeatPurgatory, joinPurgatory, timer.time, new Metrics())
+    groupCoordinator = GroupCoordinator(config, replicaManager, heartbeatPurgatory, rebalancePurgatory, timer.time, new Metrics())
     groupCoordinator.startup(() => zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.offsetsTopicPartitions),
       enableMetadataExpiration = false)
 
@@ -1667,7 +1667,7 @@ class GroupCoordinatorTest {
   }
 
   @Test
-  def testheartbeatDeadGroup(): Unit = {
+  def testHeartbeatDeadGroup(): Unit = {
     val memberId = "memberId"
 
     val deadGroupId = "deadGroupId"
@@ -1678,7 +1678,7 @@ class GroupCoordinatorTest {
   }
 
   @Test
-  def testheartbeatEmptyGroup(): Unit = {
+  def testHeartbeatEmptyGroup(): Unit = {
     val memberId = "memberId"
 
     val group = new GroupMetadata(groupId, Empty, new MockTime())
@@ -2254,6 +2254,209 @@ class GroupCoordinatorTest {
     assertEquals(0, group().numPending)
   }
 
+  private def verifyHeartbeat(
+    joinGroupResult: JoinGroupResult,
+    expectedError: Errors
+  ): Unit = {
+    EasyMock.reset(replicaManager)
+    val heartbeatResult = heartbeat(
+      groupId,
+      joinGroupResult.memberId,
+      joinGroupResult.generationId
+    )
+    assertEquals(expectedError, heartbeatResult)
+  }
+
+  private def joinWithNMembers(nbMembers: Int): Seq[JoinGroupResult] = {
+    val requiredKnownMemberId = true
+
+    // First JoinRequests
+    var futures = 1.to(nbMembers).map { _ =>
+      EasyMock.reset(replicaManager)
+      sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols,
+        None, DefaultSessionTimeout, DefaultRebalanceTimeout, requiredKnownMemberId)
+    }
+
+    // Get back the assigned member ids
+    val memberIds = futures.map(await(_, 1).memberId)
+
+    // Second JoinRequests
+    futures = memberIds.map { memberId =>
+      EasyMock.reset(replicaManager)
+      sendJoinGroup(groupId, memberId, protocolType, protocols,
+        None, DefaultSessionTimeout, DefaultRebalanceTimeout, requiredKnownMemberId)
+    }
+
+    timer.advanceClock(GroupInitialRebalanceDelay + 1)
+    timer.advanceClock(DefaultRebalanceTimeout + 1)
+
+    futures.map(await(_, 1))
+  }
+
+  @Test
+  def testRebalanceTimesOutWhenSyncRequestIsNotReceived(): Unit = {
+    // This test case ensure that the DelayedSync does kick out all members
+    // if they don't sent a sync request before the rebalance timeout. The
+    // group is in the Stable state in this case.
+    val results = joinWithNMembers(nbMembers = 3)
+    assertEquals(Set(Errors.NONE), results.map(_.error).toSet)
+
+    // Advance time
+    timer.advanceClock(DefaultRebalanceTimeout / 2)
+
+    // Heartbeats to ensure that heartbeating does not interfere with the
+    // delayed sync operation.
+    results.foreach { joinGroupResult =>
+      verifyHeartbeat(joinGroupResult, Errors.NONE)
+    }
+
+    // Advance part the rebalance timeout to trigger the delayed operation.
+    EasyMock.reset(replicaManager)
+    EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject()))
+      .andReturn(Some(RecordBatch.MAGIC_VALUE_V1))
+      .anyTimes()
+    EasyMock.replay(replicaManager)
+
+    timer.advanceClock(DefaultRebalanceTimeout / 2 + 1)
+
+    // Heartbeats fail because none of the members have sent the sync request
+    results.foreach { joinGroupResult =>
+      verifyHeartbeat(joinGroupResult, Errors.UNKNOWN_MEMBER_ID)
+    }
+  }
+
+  @Test
+  def testRebalanceTimesOutWhenSyncRequestIsNotReceivedFromFollowers(): Unit = {
+    // This test case ensure that the DelayedSync does kick out the followers
+    // if they don't sent a sync request before the rebalance timeout. The
+    // group is in the Stable state in this case.
+    val results = joinWithNMembers(nbMembers = 3)
+    assertEquals(Set(Errors.NONE), results.map(_.error).toSet)
+
+    // Advance time
+    timer.advanceClock(DefaultRebalanceTimeout / 2)
+
+    // Heartbeats to ensure that heartbeating does not interfere with the
+    // delayed sync operation.
+    results.foreach { joinGroupResult =>
+      verifyHeartbeat(joinGroupResult, Errors.NONE)
+    }
+
+    // Leader sends Sync
+    EasyMock.reset(replicaManager)
+    val assignments = results.map(result => result.memberId -> Array.empty[Byte]).toMap
+    val leaderResult = sendSyncGroupLeader(groupId, results.head.generationId, results.head.memberId,
+      Some(protocolType), Some(protocolName), None, assignments)
+
+    assertEquals(Errors.NONE, await(leaderResult, 1).error)
+
+    // Leader should be able to heartbeart
+    verifyHeartbeat(results.head, Errors.NONE)
+
+    // Advance part the rebalance timeout to trigger the delayed operation.
+    timer.advanceClock(DefaultRebalanceTimeout / 2 + 1)
+
+    // Leader should be able to heartbeart
+    verifyHeartbeat(results.head, Errors.REBALANCE_IN_PROGRESS)
+
+    // Followers should have been removed.
+    results.tail.foreach { joinGroupResult =>
+      verifyHeartbeat(joinGroupResult, Errors.UNKNOWN_MEMBER_ID)
+    }
+  }
+
+  @Test
+  def testRebalanceTimesOutWhenSyncRequestIsNotReceivedFromLeaders(): Unit = {
+    // This test case ensure that the DelayedSync does kick out the leader
+    // if it does not sent a sync request before the rebalance timeout. The
+    // group is in the CompletingRebalance state in this case.
+    val results = joinWithNMembers(nbMembers = 3)
+    assertEquals(Set(Errors.NONE), results.map(_.error).toSet)
+
+    // Advance time
+    timer.advanceClock(DefaultRebalanceTimeout / 2)
+
+    // Heartbeats to ensure that heartbeating does not interfere with the
+    // delayed sync operation.
+    results.foreach { joinGroupResult =>
+      verifyHeartbeat(joinGroupResult, Errors.NONE)
+    }
+
+    // Followers send Sync
+    EasyMock.reset(replicaManager)
+    val followerResults = results.tail.map { joinGroupResult =>
+      EasyMock.reset(replicaManager)
+      sendSyncGroupFollower(groupId, joinGroupResult.generationId, joinGroupResult.memberId,
+        Some(protocolType), Some(protocolName), None)
+    }
+
+    // Advance part the rebalance timeout to trigger the delayed operation.
+    timer.advanceClock(DefaultRebalanceTimeout / 2 + 1)
+
+    val followerErrors = followerResults.map(await(_, 1).error)
+    assertEquals(Set(Errors.REBALANCE_IN_PROGRESS), followerErrors.toSet)
+
+    // Leader should have been removed.
+    verifyHeartbeat(results.head, Errors.UNKNOWN_MEMBER_ID)
+
+    // Followers should be able to heartbeat.
+    results.tail.foreach { joinGroupResult =>
+      verifyHeartbeat(joinGroupResult, Errors.REBALANCE_IN_PROGRESS)
+    }
+  }
+
+  @Test
+  def testRebalanceDoesNotTimeOutWhenAllSyncAreReceived(): Unit = {
+    // This test case ensure that the DelayedSync does not kick any
+    // members out when they have all sent their sync requests.
+    val results = joinWithNMembers(nbMembers = 3)
+    assertEquals(Set(Errors.NONE), results.map(_.error).toSet)
+
+    // Advance time
+    timer.advanceClock(DefaultRebalanceTimeout / 2)
+
+    // Heartbeats to ensure that heartbeating does not interfere with the
+    // delayed sync operation.
+    results.foreach { joinGroupResult =>
+      verifyHeartbeat(joinGroupResult, Errors.NONE)
+    }
+
+    EasyMock.reset(replicaManager)
+    val assignments = results.map(result => result.memberId -> Array.empty[Byte]).toMap
+    val leaderResult = sendSyncGroupLeader(groupId, results.head.generationId, results.head.memberId,
+      Some(protocolType), Some(protocolName), None, assignments)
+
+    assertEquals(Errors.NONE, await(leaderResult, 1).error)
+
+    // Followers send Sync
+    EasyMock.reset(replicaManager)
+    val followerResults = results.tail.map { joinGroupResult =>
+      EasyMock.reset(replicaManager)
+      sendSyncGroupFollower(groupId, joinGroupResult.generationId, joinGroupResult.memberId,
+        Some(protocolType), Some(protocolName), None)
+    }
+
+    val followerErrors = followerResults.map(await(_, 1).error)
+    assertEquals(Set(Errors.NONE), followerErrors.toSet)
+
+    // Advance past the rebalance timeout to expire the Sync timout. All
+    // members should remain and the group should not rebalance.
+    timer.advanceClock(DefaultRebalanceTimeout / 2 + 1)
+
+    // Followers should be able to heartbeat.
+    results.foreach { joinGroupResult =>
+      verifyHeartbeat(joinGroupResult, Errors.NONE)
+    }
+
+    // Advance a bit more.
+    timer.advanceClock(DefaultRebalanceTimeout / 2)
+
+    // Followers should be able to heartbeat.
+    results.foreach { joinGroupResult =>
+      verifyHeartbeat(joinGroupResult, Errors.NONE)
+    }
+  }
+
   private def group(groupId: String = groupId) = {
     groupCoordinator.groupManager.getGroup(groupId) match {
       case Some(g) => g
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala
index ce6128a..275b7f6 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala
@@ -663,6 +663,52 @@ class GroupMetadataTest {
     assertThrows(classOf[IllegalStateException], () => group.add(member))
   }
 
+  @Test
+  def testCannotAddPendingSyncOfUnknownMember(): Unit = {
+    assertThrows(classOf[IllegalStateException],
+      () => group.addPendingSyncMember(memberId))
+  }
+
+  @Test
+  def testCannotRemovePendingSyncOfUnknownMember(): Unit = {
+    assertThrows(classOf[IllegalStateException],
+      () => group.removePendingSyncMember(memberId))
+  }
+
+  @Test
+  def testCanAddAndRemovePendingSyncMember(): Unit = {
+    val member = new MemberMetadata(memberId, Some(groupInstanceId), clientId, clientHost,
+      rebalanceTimeoutMs, sessionTimeoutMs, protocolType, List(("range", Array.empty[Byte])))
+    group.add(member)
+    group.addPendingSyncMember(memberId)
+    assertEquals(Set(memberId), group.allPendingSyncMembers)
+    group.removePendingSyncMember(memberId)
+    assertEquals(Set(), group.allPendingSyncMembers)
+  }
+
+  @Test
+  def testRemovalFromPendingSyncWhenMemberIsRemoved(): Unit = {
+    val member = new MemberMetadata(memberId, Some(groupInstanceId), clientId, clientHost,
+      rebalanceTimeoutMs, sessionTimeoutMs, protocolType, List(("range", Array.empty[Byte])))
+    group.add(member)
+    group.addPendingSyncMember(memberId)
+    assertEquals(Set(memberId), group.allPendingSyncMembers)
+    group.remove(memberId)
+    assertEquals(Set(), group.allPendingSyncMembers)
+  }
+
+  @Test
+  def testNewGenerationClearsPendingSyncMembers(): Unit = {
+    val member = new MemberMetadata(memberId, Some(groupInstanceId), clientId, clientHost,
+      rebalanceTimeoutMs, sessionTimeoutMs, protocolType, List(("range", Array.empty[Byte])))
+    group.add(member)
+    group.transitionTo(PreparingRebalance)
+    group.addPendingSyncMember(memberId)
+    assertEquals(Set(memberId), group.allPendingSyncMembers)
+    group.initNextGeneration()
+    assertEquals(Set(), group.allPendingSyncMembers)
+  }
+
   private def assertState(group: GroupMetadata, targetState: GroupState): Unit = {
     val states: Set[GroupState] = Set(Stable, PreparingRebalance, CompletingRebalance, Dead)
     val otherStates = states - targetState
