kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch 2.2 updated: KAFKA-9232; Coordinator new member timeout does not work for JoinGroup v3 and below (#7753)
Date Tue, 24 Dec 2019 00:25:31 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.2 by this push:
     new bfbd397  KAFKA-9232; Coordinator new member timeout does not work for JoinGroup v3 and below (#7753)
bfbd397 is described below

commit bfbd397f5860dcbf619002f6e8654d4cd11bbf1a
Author: A. Sophie Blee-Goldman <sophie@confluent.io>
AuthorDate: Mon Dec 23 18:55:48 2019 -0500

    KAFKA-9232; Coordinator new member timeout does not work for JoinGroup v3 and below (#7753)
    
    The JoinGroup logic for v3 and below does not properly complete the initial heartbeat for new
    members, which then expires after the static 5-minute timeout if the member does not rejoin.
    The core issue is in the `shouldKeepAlive` method, which returns false when it should return
    true because of an inconsistent timeout check.
    
    Reviewers: Jason Gustafson <jason@confluent.io>
---
 .../kafka/coordinator/group/MemberMetadata.scala   | 13 +++++++--
 .../coordinator/group/GroupCoordinatorTest.scala   | 32 +++++++++++++++++++++-
 2 files changed, 41 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala b/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala
index 1932f42..5645b27 100644
--- a/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala
@@ -71,6 +71,7 @@ private[group] class MemberMetadata(val memberId: String,
   var isNew: Boolean = false
 
   def isAwaitingJoin = awaitingJoinCallback != null
+  def isAwaitingSync = awaitingSyncCallback != null
 
   /**
    * Get metadata corresponding to the provided protocol.
@@ -84,10 +85,16 @@ private[group] class MemberMetadata(val memberId: String,
   }
 
   def shouldKeepAlive(deadlineMs: Long): Boolean = {
-    if (isAwaitingJoin)
-      !isNew || latestHeartbeat + GroupCoordinator.NewMemberJoinTimeoutMs > deadlineMs
-    else awaitingSyncCallback != null ||
+    if (isNew) {
+      // New members are expired after the static join timeout
+      latestHeartbeat + GroupCoordinator.NewMemberJoinTimeoutMs > deadlineMs
+    } else if (isAwaitingJoin || isAwaitingSync) {
+      // Don't remove members as long as they have a request in purgatory
+      true
+    } else {
+      // Otherwise check for session expiration
       latestHeartbeat + sessionTimeoutMs > deadlineMs
+    }
   }
 
   /**
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index a529750..126d01d 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -288,6 +288,36 @@ class GroupCoordinatorTest extends JUnitSuite {
   }
 
   @Test
+  def testNewMemberTimeoutCompletion(): Unit = {
+    val sessionTimeout = GroupCoordinator.NewMemberJoinTimeoutMs + 5000
+    val responseFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols, sessionTimeout, DefaultRebalanceTimeout, false)
+
+    timer.advanceClock(GroupInitialRebalanceDelay + 1)
+
+    val joinResult = Await.result(responseFuture, Duration(DefaultRebalanceTimeout + 100, TimeUnit.MILLISECONDS))
+    val group = groupCoordinator.groupManager.getGroup(groupId).get
+    val memberId = joinResult.memberId
+
+    assertEquals(Errors.NONE, joinResult.error)
+    assertEquals(0, group.allMemberMetadata.count(_.isNew))
+
+    EasyMock.reset(replicaManager)
+    val syncGroupResult = syncGroupLeader(groupId, joinResult.generationId, memberId, Map(memberId -> Array[Byte]()))
+    val syncGroupError = syncGroupResult._2
+
+    assertEquals(Errors.NONE, syncGroupError)
+    assertEquals(1, group.size)
+
+    timer.advanceClock(GroupCoordinator.NewMemberJoinTimeoutMs + 100)
+
+    // Make sure the NewMemberTimeout is not still in effect, and the member is not kicked
+    assertEquals(1, group.size)
+
+    timer.advanceClock(sessionTimeout + 100)
+    assertEquals(0, group.size)
+  }
+
+  @Test
   def testNewMemberJoinExpiration(): Unit = {
     // This tests new member expiration during a protracted rebalance. We first create a
     // group with one member which uses a large value for session timeout and rebalance timeout.
@@ -1008,7 +1038,7 @@ class GroupCoordinatorTest extends JUnitSuite {
 
     val nextGenerationId = joinResult.generationId
 
-    // with no leader SyncGroup, the follower's request should failure with an error indicating
+    // with no leader SyncGroup, the follower's request should fail with an error indicating
     // that it should rejoin
     EasyMock.reset(replicaManager)
     val followerSyncFuture = sendSyncGroupFollower(groupId, nextGenerationId, otherJoinResult.memberId)


Mime
View raw message