kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch 2.1 updated: KAFKA-9232; Coordinator new member timeout does not work for JoinGroup v3 and below (#7753)
Date Tue, 24 Dec 2019 01:13:47 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 831bbd4  KAFKA-9232; Coordinator new member timeout does not work for JoinGroup v3 and below (#7753)
831bbd4 is described below

commit 831bbd4489c36207042d3b70b30c01075aca895f
Author: A. Sophie Blee-Goldman <sophie@confluent.io>
AuthorDate: Mon Dec 23 18:55:48 2019 -0500

    KAFKA-9232; Coordinator new member timeout does not work for JoinGroup v3 and below (#7753)
    
    The v3 JoinGroup logic does not properly complete the initial heartbeat for new members, which then expires after the static 5 minute timeout if the member does not rejoin. The core issue is in the `shouldKeepAlive` method, which returns false when it should return true because of an inconsistent timeout check.
    
    Reviewers: Jason Gustafson <jason@confluent.io>
---
 .../kafka/coordinator/group/GroupMetadata.scala    |  1 +
 .../kafka/coordinator/group/MemberMetadata.scala   | 14 ++++++++--
 .../coordinator/group/GroupCoordinatorTest.scala   | 32 +++++++++++++++++++++-
 3 files changed, 43 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
index e2d9c5f..b075125 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
@@ -200,6 +200,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
   def not(groupState: GroupState) = state != groupState
   def has(memberId: String) = members.contains(memberId)
   def get(memberId: String) = members(memberId)
+  def size = members.size
 
   def isLeader(memberId: String): Boolean = leaderId.contains(memberId)
   def leaderOrNull: String = leaderId.orNull
diff --git a/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala b/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala
index 8649b3e..4c7a50e 100644
--- a/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala
@@ -67,6 +67,8 @@ private[group] class MemberMetadata(val memberId: String,
   var isNew: Boolean = false
 
   def protocols = supportedProtocols.map(_._1).toSet
+  def isAwaitingJoin = awaitingJoinCallback != null
+  def isAwaitingSync = awaitingSyncCallback != null
 
   /**
    * Get metadata corresponding to the provided protocol.
@@ -80,10 +82,16 @@ private[group] class MemberMetadata(val memberId: String,
   }
 
   def shouldKeepAlive(deadlineMs: Long): Boolean = {
-    if (awaitingJoinCallback != null)
-      !isNew || latestHeartbeat + GroupCoordinator.NewMemberJoinTimeoutMs > deadlineMs
-    else awaitingSyncCallback != null ||
+    if (isNew) {
+      // New members are expired after the static join timeout
+      latestHeartbeat + GroupCoordinator.NewMemberJoinTimeoutMs > deadlineMs
+    } else if (isAwaitingJoin || isAwaitingSync) {
+      // Don't remove members as long as they have a request in purgatory
+      true
+    } else {
+      // Otherwise check for session expiration
       latestHeartbeat + sessionTimeoutMs > deadlineMs
+    }
   }
 
   /**
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 1ef695e..14b3957 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -263,6 +263,36 @@ class GroupCoordinatorTest extends JUnitSuite {
   }
 
   @Test
+  def testNewMemberTimeoutCompletion(): Unit = {
+    val sessionTimeout = GroupCoordinator.NewMemberJoinTimeoutMs + 5000
+    val responseFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols, DefaultRebalanceTimeout, sessionTimeout)
+
+    timer.advanceClock(GroupInitialRebalanceDelay + 1)
+
+    val joinResult = Await.result(responseFuture, Duration(DefaultRebalanceTimeout + 100, TimeUnit.MILLISECONDS))
+    val group = groupCoordinator.groupManager.getGroup(groupId).get
+    val memberId = joinResult.memberId
+
+    assertEquals(Errors.NONE, joinResult.error)
+    assertEquals(0, group.allMemberMetadata.count(_.isNew))
+
+    EasyMock.reset(replicaManager)
+    val syncGroupResult = syncGroupLeader(groupId, joinResult.generationId, memberId, Map(memberId -> Array[Byte]()))
+    val syncGroupError = syncGroupResult._2
+
+    assertEquals(Errors.NONE, syncGroupError)
+    assertEquals(1, group.size)
+
+    timer.advanceClock(GroupCoordinator.NewMemberJoinTimeoutMs + 100)
+
+    // Make sure the NewMemberTimeout is not still in effect, and the member is not kicked
+    assertEquals(1, group.size)
+
+    timer.advanceClock(sessionTimeout + 100)
+    assertEquals(0, group.size)
+  }
+
+  @Test
   def testNewMemberJoinExpiration(): Unit = {
     // This tests new member expiration during a protracted rebalance. We first create a
     // group with one member which uses a large value for session timeout and rebalance timeout.
@@ -765,7 +795,7 @@ class GroupCoordinatorTest extends JUnitSuite {
 
     val nextGenerationId = joinResult.generationId
 
-    // with no leader SyncGroup, the follower's request should failure with an error indicating
+    // with no leader SyncGroup, the follower's request should fail with an error indicating
     // that it should rejoin
     EasyMock.reset(replicaManager)
     val followerSyncFuture = sendSyncGroupFollower(groupId, nextGenerationId, otherJoinResult.memberId)


Mime
View raw message