From commits-return-13316-apmail-kafka-commits-archive=kafka.apache.org@kafka.apache.org Tue Dec 24 01:13:51 2019 Return-Path: X-Original-To: apmail-kafka-commits-archive@www.apache.org Delivered-To: apmail-kafka-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [207.244.88.153]) by minotaur.apache.org (Postfix) with SMTP id 4E3E919C59 for ; Tue, 24 Dec 2019 01:13:51 +0000 (UTC) Received: (qmail 92787 invoked by uid 500); 24 Dec 2019 01:13:50 -0000 Delivered-To: apmail-kafka-commits-archive@kafka.apache.org Received: (qmail 92759 invoked by uid 500); 24 Dec 2019 01:13:50 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 92750 invoked by uid 99); 24 Dec 2019 01:13:50 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 24 Dec 2019 01:13:50 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 508988D80D; Tue, 24 Dec 2019 01:13:50 +0000 (UTC) Date: Tue, 24 Dec 2019 01:13:47 +0000 To: "commits@kafka.apache.org" Subject: [kafka] branch 2.1 updated: KAFKA-9232; Coordinator new member timeout does not work for JoinGroup v3 and below (#7753) MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <157715002397.16935.16990601576118984672@gitbox.apache.org> From: jgus@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: kafka X-Git-Refname: refs/heads/2.1 X-Git-Reftype: branch X-Git-Oldrev: e82a67b29adbff67fdc8052aa9e4396e0364ccd3 X-Git-Newrev: 831bbd4489c36207042d3b70b30c01075aca895f X-Git-Rev: 831bbd4489c36207042d3b70b30c01075aca895f X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.1 by this push: new 831bbd4 KAFKA-9232; Coordinator new member timeout does not work for JoinGroup v3 and below (#7753) 831bbd4 is described below commit 831bbd4489c36207042d3b70b30c01075aca895f Author: A. Sophie Blee-Goldman AuthorDate: Mon Dec 23 18:55:48 2019 -0500 KAFKA-9232; Coordinator new member timeout does not work for JoinGroup v3 and below (#7753) The v3 JoinGroup logic does not properly complete the initial heartbeat for new members, which then expires after the static 5 minute timeout if the member does not rejoin. The core issue is in the `shouldKeepAlive` method, which returns false when it should return true because of an inconsistent timeout check. Reviewers: Jason Gustafson --- .../kafka/coordinator/group/GroupMetadata.scala | 1 + .../kafka/coordinator/group/MemberMetadata.scala | 14 ++++++++-- .../coordinator/group/GroupCoordinatorTest.scala | 32 +++++++++++++++++++++- 3 files changed, 43 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala index e2d9c5f..b075125 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala @@ -200,6 +200,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState def not(groupState: GroupState) = state != groupState def has(memberId: String) = members.contains(memberId) def get(memberId: String) = members(memberId) + def size = members.size def isLeader(memberId: String): Boolean = leaderId.contains(memberId) def leaderOrNull: String = leaderId.orNull diff --git a/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala b/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala index 8649b3e..4c7a50e 100644 --- a/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala +++ b/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala @@ -67,6 +67,8 @@ private[group] class MemberMetadata(val memberId: String, var isNew: Boolean = false def protocols = supportedProtocols.map(_._1).toSet + def isAwaitingJoin = awaitingJoinCallback != null + def isAwaitingSync = awaitingSyncCallback != null /** * Get metadata corresponding to the provided protocol. @@ -80,10 +82,16 @@ private[group] class MemberMetadata(val memberId: String, } def shouldKeepAlive(deadlineMs: Long): Boolean = { - if (awaitingJoinCallback != null) - !isNew || latestHeartbeat + GroupCoordinator.NewMemberJoinTimeoutMs > deadlineMs - else awaitingSyncCallback != null || + if (isNew) { + // New members are expired after the static join timeout + latestHeartbeat + GroupCoordinator.NewMemberJoinTimeoutMs > deadlineMs + } else if (isAwaitingJoin || isAwaitingSync) { + // Don't remove members as long as they have a request in purgatory + true + } else { + // Otherwise check for session expiration latestHeartbeat + sessionTimeoutMs > deadlineMs + } } /** diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala index 1ef695e..14b3957 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala @@ -263,6 +263,36 @@ class GroupCoordinatorTest extends JUnitSuite { } @Test + def testNewMemberTimeoutCompletion(): Unit = { + val sessionTimeout = GroupCoordinator.NewMemberJoinTimeoutMs + 5000 + val responseFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols, DefaultRebalanceTimeout, sessionTimeout) + + timer.advanceClock(GroupInitialRebalanceDelay + 1) + + val joinResult = Await.result(responseFuture, Duration(DefaultRebalanceTimeout + 100, TimeUnit.MILLISECONDS)) + val group = groupCoordinator.groupManager.getGroup(groupId).get + val memberId = joinResult.memberId + + assertEquals(Errors.NONE, joinResult.error) + assertEquals(0, group.allMemberMetadata.count(_.isNew)) + + EasyMock.reset(replicaManager) + val syncGroupResult = syncGroupLeader(groupId, joinResult.generationId, memberId, Map(memberId -> Array[Byte]())) + val syncGroupError = syncGroupResult._2 + + assertEquals(Errors.NONE, syncGroupError) + assertEquals(1, group.size) + + timer.advanceClock(GroupCoordinator.NewMemberJoinTimeoutMs + 100) + + // Make sure the NewMemberTimeout is not still in effect, and the member is not kicked + assertEquals(1, group.size) + + timer.advanceClock(sessionTimeout + 100) + assertEquals(0, group.size) + } + + @Test def testNewMemberJoinExpiration(): Unit = { // This tests new member expiration during a protracted rebalance. We first create a // group with one member which uses a large value for session timeout and rebalance timeout. @@ -765,7 +795,7 @@ class GroupCoordinatorTest extends JUnitSuite { val nextGenerationId = joinResult.generationId - // with no leader SyncGroup, the follower's request should failure with an error indicating + // with no leader SyncGroup, the follower's request should fail with an error indicating // that it should rejoin EasyMock.reset(replicaManager) val followerSyncFuture = sendSyncGroupFollower(groupId, nextGenerationId, otherJoinResult.memberId)