kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lind...@apache.org
Subject [kafka] branch 1.0 updated: KAFKA-7196; Remove heartbeat delayed operation for those removed consumers at the end of each rebalance
Date Thu, 04 Oct 2018 16:24:32 GMT
This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.0 by this push:
     new cd22770  KAFKA-7196; Remove heartbeat delayed operation for those removed consumers at the end of each rebalance
cd22770 is described below

commit cd2277069d355d2a9839c97265448b5a883feb89
Author: Lincong Li <lcli@linkedin.com>
AuthorDate: Thu Oct 4 09:14:44 2018 -0700

    KAFKA-7196; Remove heartbeat delayed operation for those removed consumers at the end of each rebalance
    
    During a consumer group rebalance, when the join-group phase finishes, the heartbeat delayed operation of any consumer that failed to rejoin the group should be removed from the purgatory. Otherwise, even though that consumer's member ID has been removed from the group, its heartbeat delayed operation remains registered in the purgatory; when it eventually times out, it triggers another, unnecessary rebalance.
    
    Author: Lincong Li <lcli@linkedin.com>
    
    Reviewers: Dong Lin <lindong28@gmail.com>
    
    Closes #5556 from Lincong/remove_heartbeat_delayedOperation
    
    (cherry picked from commit 260b07a6da070e6312443fb7cc6b937bef2865ea)
    Signed-off-by: Dong Lin <lindong28@gmail.com>
---
 .../kafka/coordinator/group/GroupCoordinator.scala |  1 +
 .../coordinator/group/GroupCoordinatorTest.scala   | 22 ++++++++++++++++++++--
 2 files changed, 21 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 25b5780..c70d0e9 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -727,6 +727,7 @@ class GroupCoordinator(val brokerId: Int,
     group.inLock {
       // remove any members who haven't joined the group yet
       group.notYetRejoinedMembers.foreach { failedMember =>
+        removeHeartbeatForLeavingMember(group, failedMember)
         group.remove(failedMember.memberId)
         // TODO: cut the socket connection to the client
       }
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 3592d6a..a68a3d8 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -469,11 +469,29 @@ class GroupCoordinatorTest extends JUnitSuite {
     heartbeatResult = heartbeat(groupId, firstMemberId, firstGenerationId)
     assertEquals(Errors.REBALANCE_IN_PROGRESS, heartbeatResult)
 
-    // now timeout the rebalance, which should kick the unjoined member out of the group
-    // and let the rebalance finish with only the new member
+    // now timeout the rebalance
     timer.advanceClock(500)
     val otherJoinResult = await(otherJoinFuture, DefaultSessionTimeout+100)
+    val otherMemberId = otherJoinResult.memberId
+    val otherGenerationId = otherJoinResult.generationId
+    EasyMock.reset(replicaManager)
+    val syncResult = syncGroupLeader(groupId, otherGenerationId, otherMemberId, Map(otherMemberId -> Array[Byte]()))
+    assertEquals(Errors.NONE, syncResult._2)
+
+    // the unjoined member should be kicked out from the group
     assertEquals(Errors.NONE, otherJoinResult.error)
+    EasyMock.reset(replicaManager)
+    heartbeatResult = heartbeat(groupId, firstMemberId, firstGenerationId)
+    assertEquals(Errors.UNKNOWN_MEMBER_ID, heartbeatResult)
+
+    // the joined member should get heart beat response with no error. Let the new member keep heartbeating for a while
+    // to verify that no new rebalance is triggered unexpectedly
+    for ( _ <-  1 to 20) {
+      timer.advanceClock(500)
+      EasyMock.reset(replicaManager)
+      heartbeatResult = heartbeat(groupId, otherMemberId, otherGenerationId)
+      assertEquals(Errors.NONE, heartbeatResult)
+    }
   }
 
   @Test


Mime
View raw message