This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.1 by this push: new 831bbd4 KAFKA-9232; Coordinator new member timeout does not work for JoinGroup v3 and below (#7753) 831bbd4 is described below commit 831bbd4489c36207042d3b70b30c01075aca895f Author: A. Sophie Blee-Goldman AuthorDate: Mon Dec 23 18:55:48 2019 -0500 KAFKA-9232; Coordinator new member timeout does not work for JoinGroup v3 and below (#7753) The v3 JoinGroup logic does not properly complete the initial heartbeat for new members, which then expires after the static 5-minute timeout if the member does not rejoin. The core issue is in the `shouldKeepAlive` method, which returns false when it should return true because of an inconsistent timeout check. Reviewers: Jason Gustafson --- .../kafka/coordinator/group/GroupMetadata.scala | 1 + .../kafka/coordinator/group/MemberMetadata.scala | 14 ++++++++-- .../coordinator/group/GroupCoordinatorTest.scala | 32 +++++++++++++++++++++- 3 files changed, 43 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala index e2d9c5f..b075125 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala @@ -200,6 +200,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState def not(groupState: GroupState) = state != groupState def has(memberId: String) = members.contains(memberId) def get(memberId: String) = members(memberId) + def size = members.size def isLeader(memberId: String): Boolean = leaderId.contains(memberId) def leaderOrNull: String = leaderId.orNull diff --git a/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala 
b/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala index 8649b3e..4c7a50e 100644 --- a/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala +++ b/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala @@ -67,6 +67,8 @@ private[group] class MemberMetadata(val memberId: String, var isNew: Boolean = false def protocols = supportedProtocols.map(_._1).toSet + def isAwaitingJoin = awaitingJoinCallback != null + def isAwaitingSync = awaitingSyncCallback != null /** * Get metadata corresponding to the provided protocol. @@ -80,10 +82,16 @@ private[group] class MemberMetadata(val memberId: String, } def shouldKeepAlive(deadlineMs: Long): Boolean = { - if (awaitingJoinCallback != null) - !isNew || latestHeartbeat + GroupCoordinator.NewMemberJoinTimeoutMs > deadlineMs - else awaitingSyncCallback != null || + if (isNew) { + // New members are expired after the static join timeout + latestHeartbeat + GroupCoordinator.NewMemberJoinTimeoutMs > deadlineMs + } else if (isAwaitingJoin || isAwaitingSync) { + // Don't remove members as long as they have a request in purgatory + true + } else { + // Otherwise check for session expiration latestHeartbeat + sessionTimeoutMs > deadlineMs + } } /** diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala index 1ef695e..14b3957 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala @@ -263,6 +263,36 @@ class GroupCoordinatorTest extends JUnitSuite { } @Test + def testNewMemberTimeoutCompletion(): Unit = { + val sessionTimeout = GroupCoordinator.NewMemberJoinTimeoutMs + 5000 + val responseFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols, DefaultRebalanceTimeout, sessionTimeout) + + timer.advanceClock(GroupInitialRebalanceDelay + 1) + + val 
joinResult = Await.result(responseFuture, Duration(DefaultRebalanceTimeout + 100, TimeUnit.MILLISECONDS)) + val group = groupCoordinator.groupManager.getGroup(groupId).get + val memberId = joinResult.memberId + + assertEquals(Errors.NONE, joinResult.error) + assertEquals(0, group.allMemberMetadata.count(_.isNew)) + + EasyMock.reset(replicaManager) + val syncGroupResult = syncGroupLeader(groupId, joinResult.generationId, memberId, Map(memberId -> Array[Byte]())) + val syncGroupError = syncGroupResult._2 + + assertEquals(Errors.NONE, syncGroupError) + assertEquals(1, group.size) + + timer.advanceClock(GroupCoordinator.NewMemberJoinTimeoutMs + 100) + + // Make sure the NewMemberTimeout is not still in effect, and the member is not kicked + assertEquals(1, group.size) + + timer.advanceClock(sessionTimeout + 100) + assertEquals(0, group.size) + } + + @Test def testNewMemberJoinExpiration(): Unit = { // This tests new member expiration during a protracted rebalance. We first create a // group with one member which uses a large value for session timeout and rebalance timeout. @@ -765,7 +795,7 @@ class GroupCoordinatorTest extends JUnitSuite { val nextGenerationId = joinResult.generationId - // with no leader SyncGroup, the follower's request should failure with an error indicating + // with no leader SyncGroup, the follower's request should fail with an error indicating // that it should rejoin EasyMock.reset(replicaManager) val followerSyncFuture = sendSyncGroupFollower(groupId, nextGenerationId, otherJoinResult.memberId)