kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From srihar...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7142: fix joinGroup performance issues (#5354)
Date Mon, 06 Aug 2018 20:20:47 GMT
This is an automated email from the ASF dual-hosted git repository.

sriharsha pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b01f8fb  KAFKA-7142: fix joinGroup performance issues (#5354)
b01f8fb is described below

commit b01f8fb668988caa19feb63e878fe1901a9d0c89
Author: ying-zheng <zheng.ying@rocketmail.com>
AuthorDate: Mon Aug 6 13:20:40 2018 -0700

    KAFKA-7142: fix joinGroup performance issues (#5354)
    
    Summary:
    1. Revert GroupMetadata.members to private
    2. Add back a wrongly removed comment
    3. In GroupMetadata.remove(), update supportedProtocols and numMembersAwaitingJoin only when the removal succeeded
    
    Reviewers: Jason Gustafson <jason@confluent.io>, Ismael Juma <ismael@juma.me.uk>, Sriharsha Chintalapani <sriharsha@apache.org>
---
 .../kafka/coordinator/group/GroupCoordinator.scala | 17 +++----
 .../kafka/coordinator/group/GroupMetadata.scala    | 53 +++++++++++++++++++---
 .../group/GroupMetadataManagerTest.scala           |  9 ++--
 .../coordinator/group/GroupMetadataTest.scala      |  3 +-
 4 files changed, 56 insertions(+), 26 deletions(-)

diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 6ca443f..c4e6dc9 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -600,11 +600,9 @@ class GroupCoordinator(val brokerId: Int,
         case Empty | Dead =>
         case PreparingRebalance =>
           for (member <- group.allMemberMetadata) {
-            if (member.awaitingJoinCallback != null) {
-              member.awaitingJoinCallback(joinError(member.memberId, Errors.NOT_COORDINATOR))
-              member.awaitingJoinCallback = null
-            }
+            group.invokeJoinCallback(member, joinError(member.memberId, Errors.NOT_COORDINATOR))
           }
+
           joinPurgatory.checkAndComplete(GroupKey(group.groupId))
 
         case Stable | CompletingRebalance =>
@@ -704,12 +702,11 @@ class GroupCoordinator(val brokerId: Int,
     val memberId = clientId + "-" + group.generateMemberIdSuffix
     val member = new MemberMetadata(memberId, group.groupId, clientId, clientHost, rebalanceTimeoutMs,
       sessionTimeoutMs, protocolType, protocols)
-    member.awaitingJoinCallback = callback
     // update the newMemberAdded flag to indicate that the join group can be further delayed
     if (group.is(PreparingRebalance) && group.generationId == 0)
       group.newMemberAdded = true
 
-    group.add(member)
+    group.add(member, callback)
     maybePrepareRebalance(group, s"Adding new member $memberId")
     member
   }
@@ -718,8 +715,7 @@ class GroupCoordinator(val brokerId: Int,
                                        member: MemberMetadata,
                                        protocols: List[(String, Array[Byte])],
                                        callback: JoinCallback) {
-    member.supportedProtocols = protocols
-    member.awaitingJoinCallback = callback
+    group.updateMember(member, protocols, callback)
     maybePrepareRebalance(group, s"Updating metadata for member ${member.memberId}")
   }
 
@@ -765,7 +761,7 @@ class GroupCoordinator(val brokerId: Int,
 
   def tryCompleteJoin(group: GroupMetadata, forceComplete: () => Boolean) = {
     group.inLock {
-      if (group.notYetRejoinedMembers.isEmpty)
+      if (group.hasAllMembersJoined)
         forceComplete()
       else false
     }
@@ -816,8 +812,7 @@ class GroupCoordinator(val brokerId: Int,
               leaderId = group.leaderOrNull,
               error = Errors.NONE)
 
-            member.awaitingJoinCallback(joinResult)
-            member.awaitingJoinCallback = null
+            group.invokeJoinCallback(member, joinResult)
             completeAndScheduleNextHeartbeatExpiration(group, member)
           }
         }
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
index d729449..cbe78e9 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
@@ -128,7 +128,7 @@ private object GroupMetadata {
     group.protocol = Option(protocol)
     group.leaderId = Option(leaderId)
     group.currentStateTimestamp = currentStateTimestamp
-    members.foreach(group.add)
+    members.foreach(group.add(_, null))
     group
   }
 }
@@ -172,6 +172,8 @@ case class CommitRecordMetadataAndOffset(appendedBatchOffset: Option[Long],
offs
  */
 @nonthreadsafe
 private[group] class GroupMetadata(val groupId: String, initialState: GroupState, time: Time)
extends Logging {
+  type JoinCallback = JoinGroupResult => Unit
+
   private[group] val lock = new ReentrantLock
 
   private var state: GroupState = initialState
@@ -182,6 +184,8 @@ private[group] class GroupMetadata(val groupId: String, initialState:
GroupState
   private var protocol: Option[String] = None
 
   private val members = new mutable.HashMap[String, MemberMetadata]
+  private var numMembersAwaitingJoin = 0
+  private val supportedProtocols = new mutable.HashMap[String, Integer]().withDefaultValue(0)
   private val offsets = new mutable.HashMap[TopicPartition, CommitRecordMetadataAndOffset]
   private val pendingOffsetCommits = new mutable.HashMap[TopicPartition, OffsetAndMetadata]
   private val pendingTransactionalOffsetCommits = new mutable.HashMap[Long, mutable.Map[TopicPartition,
CommitRecordMetadataAndOffset]]()
@@ -202,7 +206,7 @@ private[group] class GroupMetadata(val groupId: String, initialState:
GroupState
   def protocolOrNull: String = protocol.orNull
   def currentStateTimestampOrDefault: Long = currentStateTimestamp.getOrElse(-1)
 
-  def add(member: MemberMetadata) {
+  def add(member: MemberMetadata, callback: JoinCallback = null) {
     if (members.isEmpty)
       this.protocolType = Some(member.protocolType)
 
@@ -213,10 +217,19 @@ private[group] class GroupMetadata(val groupId: String, initialState:
GroupState
     if (leaderId.isEmpty)
       leaderId = Some(member.memberId)
     members.put(member.memberId, member)
+    member.supportedProtocols.foreach{ case (protocol, _) => supportedProtocols(protocol)
+= 1 }
+    member.awaitingJoinCallback = callback
+    if (member.awaitingJoinCallback != null)
+      numMembersAwaitingJoin += 1;
   }
 
   def remove(memberId: String) {
-    members.remove(memberId)
+    members.remove(memberId).foreach { member =>
+      member.supportedProtocols.foreach{ case (protocol, _) => supportedProtocols(protocol)
-= 1 }
+      if (member.awaitingJoinCallback != null)
+        numMembersAwaitingJoin -= 1
+    }
+
     if (isLeader(memberId)) {
       leaderId = if (members.isEmpty) {
         None
@@ -230,6 +243,8 @@ private[group] class GroupMetadata(val groupId: String, initialState:
GroupState
 
   def notYetRejoinedMembers = members.values.filter(_.awaitingJoinCallback == null).toList
 
+  def hasAllMembersJoined = members.size <= numMembersAwaitingJoin
+
   def allMembers = members.keySet
 
   def allMemberMetadata = members.values.toList
@@ -268,13 +283,37 @@ private[group] class GroupMetadata(val groupId: String, initialState:
GroupState
 
   private def candidateProtocols = {
     // get the set of protocols that are commonly supported by all members
-    allMemberMetadata
-      .map(_.protocols)
-      .reduceLeft((commonProtocols, protocols) => commonProtocols & protocols)
+    val numMembers = members.size
+    supportedProtocols.filter(_._2 == numMembers).map(_._1).toSet
   }
 
   def supportsProtocols(memberProtocols: Set[String]) = {
-    members.isEmpty || (memberProtocols & candidateProtocols).nonEmpty
+    val numMembers = members.size
+    members.isEmpty || memberProtocols.exists(supportedProtocols(_) == numMembers)
+  }
+
+  def updateMember(member: MemberMetadata,
+                   protocols: List[(String, Array[Byte])],
+                   callback: JoinCallback) = {
+    member.supportedProtocols.foreach{ case (protocol, _) => supportedProtocols(protocol)
-= 1 }
+    protocols.foreach{ case (protocol, _) => supportedProtocols(protocol) += 1 }
+    member.supportedProtocols = protocols
+
+    if (callback != null && member.awaitingJoinCallback == null) {
+      numMembersAwaitingJoin += 1;
+    } else if (callback == null && member.awaitingJoinCallback != null) {
+      numMembersAwaitingJoin -= 1;
+    }
+    member.awaitingJoinCallback = callback
+  }
+
+  def invokeJoinCallback(member: MemberMetadata,
+                         joinGroupResult: JoinGroupResult) : Unit = {
+    if (member.awaitingJoinCallback != null) {
+      member.awaitingJoinCallback(joinGroupResult)
+      member.awaitingJoinCallback = null
+      numMembersAwaitingJoin -= 1;
+    }
   }
 
   def initNextGeneration() = {
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index 77e6fdc..21c1365 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -843,8 +843,7 @@ class GroupMetadataManagerTest {
 
     val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout,
sessionTimeout,
       protocolType, List(("protocol", Array[Byte]())))
-    member.awaitingJoinCallback = _ => ()
-    group.add(member)
+    group.add(member, _ => ())
     group.transitionTo(PreparingRebalance)
     group.initNextGeneration()
 
@@ -873,8 +872,7 @@ class GroupMetadataManagerTest {
 
     val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout,
sessionTimeout,
       protocolType, List(("protocol", Array[Byte]())))
-    member.awaitingJoinCallback = _ => ()
-    group.add(member)
+    group.add(member, _ => ())
     group.transitionTo(PreparingRebalance)
     group.initNextGeneration()
 
@@ -1372,8 +1370,7 @@ class GroupMetadataManagerTest {
     val subscription = new Subscription(List(topic).asJava)
     val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout,
sessionTimeout,
       protocolType, List(("protocol", ConsumerProtocol.serializeSubscription(subscription).array())))
-    member.awaitingJoinCallback = _ => ()
-    group.add(member)
+    group.add(member, _ => ())
     group.transitionTo(PreparingRebalance)
     group.initNextGeneration()
 
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala
index 9054533..ac12804 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala
@@ -266,8 +266,7 @@ class GroupMetadataTest extends JUnitSuite {
       protocolType, List(("roundrobin", Array.empty[Byte])))
 
     group.transitionTo(PreparingRebalance)
-    member.awaitingJoinCallback = _ => ()
-    group.add(member)
+    group.add(member, _ => ())
 
     assertEquals(0, group.generationId)
     assertNull(group.protocolOrNull)


Mime
View raw message