kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch 2.3 updated: KAFKA-8715; Fix buggy reliance on state timestamp in static member.id generation (#7116)
Date Fri, 26 Jul 2019 22:35:20 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
     new c29380f  KAFKA-8715; Fix buggy reliance on state timestamp in static member.id generation
(#7116)
c29380f is described below

commit c29380f40d0db6d0c4a549117ba910f915a5053a
Author: Boyang Chen <boyang@confluent.io>
AuthorDate: Fri Jul 26 15:31:31 2019 -0700

    KAFKA-8715; Fix buggy reliance on state timestamp in static member.id generation (#7116)
    
    The bug is that we accidentally used the current state timestamp for the group instead
of the real current time. When a group is first loaded, this timestamp is not initialized,
so this resulted in a `NoSuchElementException`. Additionally, this violated the intended uniqueness
of the memberId, which could have broken the group instance fencing. This commit fixes the
static member.id generation and adds a unit test to make sure that each rejoin returns a
distinct member.id that is still prefixed with the group instance id.
    
    Reviewers: Ismael Juma <ismael@juma.me.uk>, Guozhang Wang <wangguoz@gmail.com>,
Jason Gustafson <jason@confluent.io>
---
 .../scala/kafka/coordinator/group/GroupMetadata.scala   |  2 +-
 .../kafka/coordinator/group/GroupCoordinatorTest.scala  | 17 +++++++++++++++++
 2 files changed, 18 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
index 58a68a2..4d283bc8 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
@@ -365,7 +365,7 @@ private[group] class GroupMetadata(val groupId: String, initialState:
GroupState
       case None =>
         clientId + GroupMetadata.MemberIdDelimiter + UUID.randomUUID().toString
       case Some(instanceId) =>
-        instanceId + GroupMetadata.MemberIdDelimiter + currentStateTimestamp.get
+        instanceId + GroupMetadata.MemberIdDelimiter + UUID.randomUUID().toString
     }
   }
 
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 3e753c6..356b222 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -874,6 +874,23 @@ class GroupCoordinatorTest {
   }
 
   @Test
+  def shouldGetDifferentStaticMemberIdAfterEachRejoin(): Unit = {
+    val initialResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)
+
+    val timeAdvance = 1
+    var lastMemberId = initialResult.leaderId
+    for (_ <- 1 to 5) {
+      EasyMock.reset(replicaManager)
+
+      val joinGroupResult = staticJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID,
+        leaderInstanceId, protocolType, protocols, clockAdvance = timeAdvance)
+      assertTrue(joinGroupResult.memberId.startsWith(leaderInstanceId.get))
+      assertNotEquals(lastMemberId, joinGroupResult.memberId)
+      lastMemberId = joinGroupResult.memberId
+    }
+  }
+
+  @Test
   def testOffsetCommitDeadGroup() {
     val memberId = "memberId"
 


Mime
View raw message