kafka-commits mailing list archives

From j...@apache.org
Subject [kafka] branch trunk updated: MINOR: Fix partition loading checks in GroupCoordinator (#4788)
Date Mon, 02 Apr 2018 15:35:22 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8662a02  MINOR: Fix partition loading checks in GroupCoordinator (#4788)
8662a02 is described below

commit 8662a022c42b7a6245e8ec54c544fccd8b79b6f8
Author: Jason Gustafson <jason@confluent.io>
AuthorDate: Mon Apr 2 08:35:17 2018 -0700

    MINOR: Fix partition loading checks in GroupCoordinator (#4788)
    
    In the group coordinator, we currently check whether the partition is owned before
    checking whether it is loading. Since loading is a prerequisite for partition ownership,
    it is not actually possible to see the COORDINATOR_LOAD_IN_PROGRESS error. The impact is
    mostly harmless: while loading the group, the client may send unnecessary FindCoordinator
    requests to rediscover the coordinator. I've fixed the bug and restructured the code to
    enable testing (a simplified sketch of the ordering issue follows the diffstat below).
    
    In the process of fixing this bug, the following improvements have been made:
    
    1. We now validate the groupId in all request handlers.
    2. Currently, if the coordinator is loading when a SyncGroup is received, we return
       NOT_COORDINATOR. I've changed this to return REBALANCE_IN_PROGRESS, since the
       rebalance state will have been lost on coordinator failover. This effectively forces
       the consumer to rejoin the group, which seems preferable to unnecessarily
       rediscovering the coordinator.
    3. I added handling for COORDINATOR_LOAD_IN_PROGRESS in SyncGroup. Although we do not
       currently return this error, it seems reasonable that we might want to some day, so
       it seems better to get the handling in now.
    
    Reviewers: Guozhang Wang <wangguoz@gmail.com>
---
 .../kafka/common/requests/SyncGroupResponse.java   |   6 +-
 .../kafka/coordinator/group/GroupCoordinator.scala | 305 ++++++++++-----------
 .../coordinator/group/GroupMetadataManager.scala   |  61 +++--
 .../coordinator/group/GroupCoordinatorTest.scala   |  58 +++-
 4 files changed, 243 insertions(+), 187 deletions(-)
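
A minimal, self-contained sketch (simplified names, not the committed code) of the
ordering bug described in the commit message: a partition is added to ownedPartitions
only after its load completes, so checking ownership before loading can never reach the
load-in-progress branch.

    import scala.collection.mutable

    object CheckOrderSketch {
      sealed trait Error
      case object NotCoordinator extends Error
      case object CoordinatorLoadInProgress extends Error

      // Stand-ins for GroupMetadataManager state: a partition becomes
      // "owned" only after its load has finished.
      val loadingPartitions: mutable.Set[Int] = mutable.Set(0)
      val ownedPartitions: mutable.Set[Int] = mutable.Set.empty

      // Pre-patch ordering: a loading partition is not yet owned, so the
      // first branch always fires and the second branch is dead code.
      def oldValidate(p: Int): Option[Error] =
        if (!ownedPartitions(p)) Some(NotCoordinator)
        else if (loadingPartitions(p)) Some(CoordinatorLoadInProgress)
        else None

      // Post-patch ordering: check loading before ownership.
      def newValidate(p: Int): Option[Error] =
        if (loadingPartitions(p)) Some(CoordinatorLoadInProgress)
        else if (!ownedPartitions(p)) Some(NotCoordinator)
        else None

      def main(args: Array[String]): Unit = {
        println(oldValidate(0)) // Some(NotCoordinator) -- the misleading error
        println(newValidate(0)) // Some(CoordinatorLoadInProgress)
      }
    }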

diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
index 77f9512..e1e8083 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
@@ -47,12 +47,16 @@ public class SyncGroupResponse extends AbstractResponse {
     /**
      * Possible error codes:
      *
-     * GROUP_COORDINATOR_NOT_AVAILABLE (15)
+     * COORDINATOR_NOT_AVAILABLE (15)
      * NOT_COORDINATOR (16)
      * ILLEGAL_GENERATION (22)
      * UNKNOWN_MEMBER_ID (25)
      * REBALANCE_IN_PROGRESS (27)
      * GROUP_AUTHORIZATION_FAILED (30)
+     *
+     * NOTE: Currently the coordinator returns REBALANCE_IN_PROGRESS while the coordinator is
+     * loading. On the next protocol bump, we should consider using COORDINATOR_LOAD_IN_PROGRESS
+     * to be consistent with the other APIs.
      */
 
     private final Errors error;
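
To make the NOTE above concrete, here is a hedged sketch of how a consumer might react to
each error while the coordinator is loading (illustrative Scala; the helper names are
hypothetical, not the Kafka client API). REBALANCE_IN_PROGRESS sends the member straight
back to JoinGroup, while NOT_COORDINATOR would force a FindCoordinator round trip first
even though the coordinator has not actually moved.

    object SyncGroupErrorSketch {
      sealed trait SyncError
      case object RebalanceInProgress extends SyncError
      case object NotCoordinator extends SyncError

      // Hypothetical client actions, for illustration only.
      def rejoinGroup(): Unit = println("send JoinGroup to current coordinator")
      def findCoordinator(): Unit = println("send FindCoordinator and reconnect")

      def onSyncGroupError(error: SyncError): Unit = error match {
        case RebalanceInProgress =>
          rejoinGroup()     // one round trip; the coordinator is unchanged
        case NotCoordinator =>
          findCoordinator() // extra round trip to rediscover the coordinator
          rejoinGroup()
      }
    }
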
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 4e605e2..225b709 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -108,17 +108,14 @@ class GroupCoordinator(val brokerId: Int,
                       sessionTimeoutMs: Int,
                       protocolType: String,
                       protocols: List[(String, Array[Byte])],
-                      responseCallback: JoinCallback) {
-    if (!isActive.get) {
-      responseCallback(joinError(memberId, Errors.COORDINATOR_NOT_AVAILABLE))
-    } else if (!validGroupId(groupId)) {
-      responseCallback(joinError(memberId, Errors.INVALID_GROUP_ID))
-    } else if (!isCoordinatorForGroup(groupId)) {
-      responseCallback(joinError(memberId, Errors.NOT_COORDINATOR))
-    } else if (isCoordinatorLoadInProgress(groupId)) {
-      responseCallback(joinError(memberId, Errors.COORDINATOR_LOAD_IN_PROGRESS))
-    } else if (sessionTimeoutMs < groupConfig.groupMinSessionTimeoutMs ||
-               sessionTimeoutMs > groupConfig.groupMaxSessionTimeoutMs) {
+                      responseCallback: JoinCallback): Unit = {
+    validateGroup(groupId).foreach { error =>
+      responseCallback(joinError(memberId, error))
+      return
+    }
+
+    if (sessionTimeoutMs < groupConfig.groupMinSessionTimeoutMs ||
+      sessionTimeoutMs > groupConfig.groupMaxSessionTimeoutMs) {
       responseCallback(joinError(memberId, Errors.INVALID_SESSION_TIMEOUT))
     } else {
       // only try to create the group if the group is not unknown AND
@@ -169,7 +166,8 @@ class GroupCoordinator(val brokerId: Int,
             responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID))
           case PreparingRebalance =>
             if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
-              addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group, responseCallback)
+              addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId, clientHost, protocolType,
+                protocols, group, responseCallback)
             } else {
               val member = group.get(memberId)
               updateMemberAndRebalance(group, member, protocols, responseCallback)
@@ -177,7 +175,8 @@ class GroupCoordinator(val brokerId: Int,
 
           case CompletingRebalance =>
             if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
-              addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group, responseCallback)
+              addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId, clientHost, protocolType,
+                protocols, group, responseCallback)
             } else {
               val member = group.get(memberId)
               if (member.matches(protocols)) {
@@ -204,7 +203,8 @@ class GroupCoordinator(val brokerId: Int,
           case Empty | Stable =>
             if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
               // if the member id is unknown, register the member to the group
-              addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group, responseCallback)
+              addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId, clientHost, protocolType,
+                protocols, group, responseCallback)
             } else {
               val member = group.get(memberId)
               if (group.isLeader(memberId) || !member.matches(protocols)) {
@@ -236,16 +236,22 @@ class GroupCoordinator(val brokerId: Int,
                       generation: Int,
                       memberId: String,
                       groupAssignment: Map[String, Array[Byte]],
-                      responseCallback: SyncCallback) {
-    if (!isActive.get) {
-      responseCallback(Array.empty, Errors.COORDINATOR_NOT_AVAILABLE)
-    } else if (!isCoordinatorForGroup(groupId)) {
-      responseCallback(Array.empty, Errors.NOT_COORDINATOR)
-    } else {
-      groupManager.getGroup(groupId) match {
-        case None => responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID)
-        case Some(group) => doSyncGroup(group, generation, memberId, groupAssignment, responseCallback)
-      }
+                      responseCallback: SyncCallback): Unit = {
+    validateGroup(groupId) match {
+      case Some(error) if error == Errors.COORDINATOR_LOAD_IN_PROGRESS =>
+        // The coordinator is loading, which means we've lost the state of the active rebalance and the
+        // group will need to start over at JoinGroup. By returning rebalance in progress, the consumer
+        // will attempt to rejoin without needing to rediscover the coordinator. Note that we cannot
+        // return COORDINATOR_LOAD_IN_PROGRESS since older clients do not expect the error.
+        responseCallback(Array.empty, Errors.REBALANCE_IN_PROGRESS)
+
+      case Some(error) => responseCallback(Array.empty, error)
+
+      case None =>
+        groupManager.getGroup(groupId) match {
+          case None => responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID)
+          case Some(group) => doSyncGroup(group, generation, memberId, groupAssignment, responseCallback)
+        }
     }
   }
 
@@ -306,53 +312,45 @@ class GroupCoordinator(val brokerId: Int,
     }
   }
 
-  def handleLeaveGroup(groupId: String, memberId: String, responseCallback: Errors => Unit) {
-    if (!isActive.get) {
-      responseCallback(Errors.COORDINATOR_NOT_AVAILABLE)
-    } else if (!isCoordinatorForGroup(groupId)) {
-      responseCallback(Errors.NOT_COORDINATOR)
-    } else if (isCoordinatorLoadInProgress(groupId)) {
-      responseCallback(Errors.COORDINATOR_LOAD_IN_PROGRESS)
-    } else {
-      groupManager.getGroup(groupId) match {
-        case None =>
-          // if the group is marked as dead, it means some other thread has just removed the group
-          // from the coordinator metadata; this is likely that the group has migrated to some other
-          // coordinator OR the group is in a transient unstable phase. Let the consumer to retry
-          // joining without specified consumer id,
-          responseCallback(Errors.UNKNOWN_MEMBER_ID)
+  def handleLeaveGroup(groupId: String, memberId: String, responseCallback: Errors => Unit): Unit = {
+    validateGroup(groupId).foreach { error =>
+      responseCallback(error)
+      return
+    }
 
-        case Some(group) =>
-          group.inLock {
-            if (group.is(Dead) || !group.has(memberId)) {
-              responseCallback(Errors.UNKNOWN_MEMBER_ID)
-            } else {
-              val member = group.get(memberId)
-              removeHeartbeatForLeavingMember(group, member)
-              debug(s"Member ${member.memberId} in group ${group.groupId} has left, removing it from the group")
-              removeMemberAndUpdateGroup(group, member)
-              responseCallback(Errors.NONE)
-            }
+    groupManager.getGroup(groupId) match {
+      case None =>
+        // if the group is marked as dead, it means some other thread has just removed the group
+        // from the coordinator metadata; this is likely that the group has migrated to some other
+        // coordinator OR the group is in a transient unstable phase. Let the consumer to retry
+        // joining without specified consumer id,
+        responseCallback(Errors.UNKNOWN_MEMBER_ID)
+
+      case Some(group) =>
+        group.inLock {
+          if (group.is(Dead) || !group.has(memberId)) {
+            responseCallback(Errors.UNKNOWN_MEMBER_ID)
+          } else {
+            val member = group.get(memberId)
+            removeHeartbeatForLeavingMember(group, member)
+            debug(s"Member ${member.memberId} in group ${group.groupId} has left, removing it from the group")
+            removeMemberAndUpdateGroup(group, member)
+            responseCallback(Errors.NONE)
           }
-      }
+        }
     }
   }
 
   def handleDeleteGroups(groupIds: Set[String]): Map[String, Errors] = {
-    if (!isActive.get) {
-      groupIds.map(_ -> Errors.COORDINATOR_NOT_AVAILABLE).toMap
-    } else {
-      var groupErrors: Map[String, Errors] = Map()
-      var eligibleGroups: Seq[GroupMetadata] = Seq()
-
-      groupIds.foreach { groupId =>
-        if (!validGroupId(groupId))
-          groupErrors += groupId -> Errors.INVALID_GROUP_ID
-        else if (!isCoordinatorForGroup(groupId))
-          groupErrors += groupId -> Errors.NOT_COORDINATOR
-        else if (isCoordinatorLoadInProgress(groupId))
-          groupErrors += groupId -> Errors.COORDINATOR_LOAD_IN_PROGRESS
-        else {
+    var groupErrors: Map[String, Errors] = Map()
+    var groupsEligibleForDeletion: Seq[GroupMetadata] = Seq()
+
+    groupIds.foreach { groupId =>
+      validateGroup(groupId) match {
+        case Some(error) =>
+          groupErrors += groupId -> error
+
+        case None =>
           groupManager.getGroup(groupId) match {
             case None =>
               groupErrors += groupId ->
@@ -365,85 +363,82 @@ class GroupCoordinator(val brokerId: Int,
                      (if (groupManager.groupNotExists(groupId)) Errors.GROUP_ID_NOT_FOUND else Errors.NOT_COORDINATOR)
                   case Empty =>
                     group.transitionTo(Dead)
-                    eligibleGroups :+= group
+                    groupsEligibleForDeletion :+= group
                   case _ =>
                     groupErrors += groupId -> Errors.NON_EMPTY_GROUP
                 }
               }
           }
-        }
-      }
-
-      if (eligibleGroups.nonEmpty) {
-        val offsetsRemoved = groupManager.cleanupGroupMetadata(eligibleGroups, group => {
-          group.removeAllOffsets()
-        })
-        groupErrors ++= eligibleGroups.map(_.groupId -> Errors.NONE).toMap
-        info(s"The following groups were deleted: ${eligibleGroups.map(_.groupId).mkString(", ")}. A total of $offsetsRemoved offsets were removed.")
       }
+    }
 
-      groupErrors
+    if (groupsEligibleForDeletion.nonEmpty) {
+      val offsetsRemoved = groupManager.cleanupGroupMetadata(groupsEligibleForDeletion, _.removeAllOffsets())
+      groupErrors ++= groupsEligibleForDeletion.map(_.groupId -> Errors.NONE).toMap
+      info(s"The following groups were deleted: ${groupsEligibleForDeletion.map(_.groupId).mkString(", ")}. " +
+        s"A total of $offsetsRemoved offsets were removed.")
     }
+
+    groupErrors
   }
 
   def handleHeartbeat(groupId: String,
                       memberId: String,
                       generationId: Int,
                       responseCallback: Errors => Unit) {
-    if (!isActive.get) {
-      responseCallback(Errors.COORDINATOR_NOT_AVAILABLE)
-    } else if (!isCoordinatorForGroup(groupId)) {
-      responseCallback(Errors.NOT_COORDINATOR)
-    } else if (isCoordinatorLoadInProgress(groupId)) {
-      // the group is still loading, so respond just blindly
-      responseCallback(Errors.NONE)
-    } else {
-      groupManager.getGroup(groupId) match {
-        case None =>
-          responseCallback(Errors.UNKNOWN_MEMBER_ID)
+    validateGroup(groupId).foreach { error =>
+      if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS)
+        // the group is still loading, so respond just blindly
+        responseCallback(Errors.NONE)
+      else
+        responseCallback(error)
+      return
+    }
 
-        case Some(group) =>
-          group.inLock {
-            group.currentState match {
-              case Dead =>
-                // if the group is marked as dead, it means some other thread has just removed the group
-                // from the coordinator metadata; this is likely that the group has migrated to some other
-                // coordinator OR the group is in a transient unstable phase. Let the member retry
-                // joining without the specified member id,
-                responseCallback(Errors.UNKNOWN_MEMBER_ID)
-
-              case Empty =>
-                responseCallback(Errors.UNKNOWN_MEMBER_ID)
-
-              case CompletingRebalance =>
-                if (!group.has(memberId))
-                  responseCallback(Errors.UNKNOWN_MEMBER_ID)
-                else
-                  responseCallback(Errors.REBALANCE_IN_PROGRESS)
-
-              case PreparingRebalance =>
-                if (!group.has(memberId)) {
-                  responseCallback(Errors.UNKNOWN_MEMBER_ID)
-                } else if (generationId != group.generationId) {
-                  responseCallback(Errors.ILLEGAL_GENERATION)
-                } else {
-                  val member = group.get(memberId)
-                  completeAndScheduleNextHeartbeatExpiration(group, member)
-                  responseCallback(Errors.REBALANCE_IN_PROGRESS)
-                }
+    groupManager.getGroup(groupId) match {
+      case None =>
+        responseCallback(Errors.UNKNOWN_MEMBER_ID)
 
-              case Stable =>
-                if (!group.has(memberId)) {
-                  responseCallback(Errors.UNKNOWN_MEMBER_ID)
-                } else if (generationId != group.generationId) {
-                  responseCallback(Errors.ILLEGAL_GENERATION)
-                } else {
-                  val member = group.get(memberId)
-                  completeAndScheduleNextHeartbeatExpiration(group, member)
-                  responseCallback(Errors.NONE)
-                }
+      case Some(group) => group.inLock {
+        group.currentState match {
+          case Dead =>
+            // if the group is marked as dead, it means some other thread has just removed the group
+            // from the coordinator metadata; this is likely that the group has migrated to some other
+            // coordinator OR the group is in a transient unstable phase. Let the member retry
+            // joining without the specified member id,
+            responseCallback(Errors.UNKNOWN_MEMBER_ID)
+
+          case Empty =>
+            responseCallback(Errors.UNKNOWN_MEMBER_ID)
+
+          case CompletingRebalance =>
+            if (!group.has(memberId))
+              responseCallback(Errors.UNKNOWN_MEMBER_ID)
+            else
+              responseCallback(Errors.REBALANCE_IN_PROGRESS)
+
+          case PreparingRebalance =>
+            if (!group.has(memberId)) {
+              responseCallback(Errors.UNKNOWN_MEMBER_ID)
+            } else if (generationId != group.generationId) {
+              responseCallback(Errors.ILLEGAL_GENERATION)
+            } else {
+              val member = group.get(memberId)
+              completeAndScheduleNextHeartbeatExpiration(group, member)
+              responseCallback(Errors.REBALANCE_IN_PROGRESS)
             }
-          }
+
+          case Stable =>
+            if (!group.has(memberId)) {
+              responseCallback(Errors.UNKNOWN_MEMBER_ID)
+            } else if (generationId != group.generationId) {
+              responseCallback(Errors.ILLEGAL_GENERATION)
+            } else {
+              val member = group.get(memberId)
+              completeAndScheduleNextHeartbeatExpiration(group, member)
+              responseCallback(Errors.NONE)
+            }
+        }
       }
     }
   }
@@ -526,19 +521,15 @@ class GroupCoordinator(val brokerId: Int,
     }
   }
 
-  def handleFetchOffsets(groupId: String,
-                         partitions: Option[Seq[TopicPartition]] = None): (Errors, Map[TopicPartition, OffsetFetchResponse.PartitionData]) = {
-    if (!isActive.get)
-      (Errors.COORDINATOR_NOT_AVAILABLE, Map())
-    else if (!isCoordinatorForGroup(groupId)) {
-      debug(s"Could not fetch offsets for group $groupId (not group coordinator)")
-      (Errors.NOT_COORDINATOR, Map())
-    } else if (isCoordinatorLoadInProgress(groupId))
-      (Errors.COORDINATOR_LOAD_IN_PROGRESS, Map())
-    else {
-      // return offsets blindly regardless the current group state since the group may be using
-      // Kafka commit storage without automatic group management
-      (Errors.NONE, groupManager.getOffsets(groupId, partitions))
+  def handleFetchOffsets(groupId: String, partitions: Option[Seq[TopicPartition]] = None):
+  (Errors, Map[TopicPartition, OffsetFetchResponse.PartitionData]) = {
+
+    validateGroup(groupId) match {
+      case Some(error) => error -> Map.empty
+      case None =>
+        // return offsets blindly regardless the current group state since the group may be using
+        // Kafka commit storage without automatic group management
+        (Errors.NONE, groupManager.getOffsets(groupId, partitions))
     }
   }
 
@@ -552,20 +543,16 @@ class GroupCoordinator(val brokerId: Int,
   }
 
   def handleDescribeGroup(groupId: String): (Errors, GroupSummary) = {
-    if (!isActive.get) {
-      (Errors.COORDINATOR_NOT_AVAILABLE, GroupCoordinator.EmptyGroup)
-    } else if (!isCoordinatorForGroup(groupId)) {
-      (Errors.NOT_COORDINATOR, GroupCoordinator.EmptyGroup)
-    } else if (isCoordinatorLoadInProgress(groupId)) {
-      (Errors.COORDINATOR_LOAD_IN_PROGRESS, GroupCoordinator.EmptyGroup)
-    } else {
-      groupManager.getGroup(groupId) match {
-        case None => (Errors.NONE, GroupCoordinator.DeadGroup)
-        case Some(group) =>
-          group.inLock {
-            (Errors.NONE, group.summary)
-          }
-      }
+    validateGroup(groupId) match {
+      case Some(error) => (error, GroupCoordinator.EmptyGroup)
+      case None =>
+        groupManager.getGroup(groupId) match {
+          case None => (Errors.NONE, GroupCoordinator.DeadGroup)
+          case Some(group) =>
+            group.inLock {
+              (Errors.NONE, group.summary)
+            }
+        }
     }
   }
 
@@ -577,12 +564,14 @@ class GroupCoordinator(val brokerId: Int,
   }
 
   private def validateGroup(groupId: String): Option[Errors] = {
-    if (!isActive.get)
+    if (!validGroupId(groupId))
+      Some(Errors.INVALID_GROUP_ID)
+    else if (!isActive.get)
       Some(Errors.COORDINATOR_NOT_AVAILABLE)
-    else if (!isCoordinatorForGroup(groupId))
-      Some(Errors.NOT_COORDINATOR)
     else if (isCoordinatorLoadInProgress(groupId))
       Some(Errors.COORDINATOR_LOAD_IN_PROGRESS)
+    else if (!isCoordinatorForGroup(groupId))
+      Some(Errors.NOT_COORDINATOR)
     else
       None
   }
@@ -625,7 +614,7 @@ class GroupCoordinator(val brokerId: Int,
   }
 
   def handleGroupImmigration(offsetTopicPartitionId: Int) {
-    groupManager.loadGroupsForPartition(offsetTopicPartitionId, onGroupLoaded)
+    groupManager.scheduleLoadGroupAndOffsets(offsetTopicPartitionId, onGroupLoaded)
   }
 
   def handleGroupEmigration(offsetTopicPartitionId: Int) {
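
The renamed scheduleLoadGroupAndOffsets (in the next file) also changes when a partition
becomes visible as loading: previously the loading flag was set inside the scheduled task,
leaving a window where requests arriving before the scheduler thread ran saw no loading
state at all. A rough sketch of the "mark before scheduling" pattern, using a plain thread
in place of KafkaScheduler (an assumption for self-containment, not the project's code):

    import java.util.concurrent.ConcurrentHashMap

    object LoadSchedulerSketch {
      private val loadingPartitions = ConcurrentHashMap.newKeySet[Int]()

      // Returns true only for the caller that actually scheduled the load.
      // The partition is registered as loading synchronously, so concurrent
      // request handlers observe it before the background task even starts.
      def scheduleLoad(partition: Int)(task: => Unit): Boolean =
        if (loadingPartitions.add(partition)) {
          new Thread(() => try task finally loadingPartitions.remove(partition)).start()
          true
        } else
          false // a load for this partition is already in flight
    }
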
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 63af1cb..81ce8d5 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -156,7 +156,7 @@ class GroupMetadataManager(brokerId: Int,
 
   def isGroupLoading(groupId: String): Boolean = isPartitionLoading(partitionFor(groupId))
 
-  def isLoading(): Boolean = inLock(partitionLock) { loadingPartitions.nonEmpty }
+  def isLoading: Boolean = inLock(partitionLock) { loadingPartitions.nonEmpty }
 
   // return true iff group is owned and the group doesn't exist
   def groupNotExists(groupId: String) = inLock(partitionLock) {
@@ -482,37 +482,32 @@ class GroupMetadataManager(brokerId: Int,
   /**
    * Asynchronously read the partition from the offsets topic and populate the cache
    */
-  def loadGroupsForPartition(offsetsPartition: Int, onGroupLoaded: GroupMetadata => Unit) {
+  def scheduleLoadGroupAndOffsets(offsetsPartition: Int, onGroupLoaded: GroupMetadata => Unit) {
     val topicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, offsetsPartition)
-    info(s"Scheduling loading of offsets and group metadata from $topicPartition")
-    scheduler.schedule(topicPartition.toString, doLoadGroupsAndOffsets)
+    if (addLoadingPartition(offsetsPartition)) {
+      info(s"Scheduling loading of offsets and group metadata from $topicPartition")
+      scheduler.schedule(topicPartition.toString, () => loadGroupsAndOffsets(topicPartition, onGroupLoaded))
+    } else {
+      info(s"Already loading offsets and group metadata from $topicPartition")
+    }
+  }
 
-    def doLoadGroupsAndOffsets() {
+  private[group] def loadGroupsAndOffsets(topicPartition: TopicPartition, onGroupLoaded: GroupMetadata => Unit) {
+    try {
+      val startMs = time.milliseconds()
+      doLoadGroupsAndOffsets(topicPartition, onGroupLoaded)
+      info(s"Finished loading offsets and group metadata from $topicPartition in ${time.milliseconds() - startMs} milliseconds.")
+    } catch {
+      case t: Throwable => error(s"Error loading offsets from $topicPartition", t)
+    } finally {
       inLock(partitionLock) {
-        if (loadingPartitions.contains(offsetsPartition)) {
-          info(s"Offset load from $topicPartition already in progress.")
-          return
-        } else {
-          loadingPartitions.add(offsetsPartition)
-        }
-      }
-
-      try {
-        val startMs = time.milliseconds()
-        loadGroupsAndOffsets(topicPartition, onGroupLoaded)
-        info(s"Finished loading offsets and group metadata from $topicPartition in ${time.milliseconds() - startMs} milliseconds.")
-      } catch {
-        case t: Throwable => error(s"Error loading offsets from $topicPartition", t)
-      } finally {
-        inLock(partitionLock) {
-          ownedPartitions.add(offsetsPartition)
-          loadingPartitions.remove(offsetsPartition)
-        }
+        ownedPartitions.add(topicPartition.partition)
+        loadingPartitions.remove(topicPartition.partition)
       }
     }
   }
 
-  private[group] def loadGroupsAndOffsets(topicPartition: TopicPartition, onGroupLoaded: GroupMetadata => Unit) {
+  private def doLoadGroupsAndOffsets(topicPartition: TopicPartition, onGroupLoaded: GroupMetadata => Unit) {
     def highWaterMark = replicaManager.getLogEndOffset(topicPartition).getOrElse(-1L)
 
     replicaManager.getLog(topicPartition) match {
@@ -650,7 +645,6 @@ class GroupMetadataManager(brokerId: Int,
            throw new IllegalStateException(s"Unexpected unload of active group $groupId while " +
               s"loading partition $topicPartition")
         }
-
     }
   }
 
@@ -876,11 +870,24 @@ class GroupMetadataManager(brokerId: Int,
    *
    * NOTE: this is for test only
    */
-  def addPartitionOwnership(partition: Int) {
+  private[group] def addPartitionOwnership(partition: Int) {
     inLock(partitionLock) {
       ownedPartitions.add(partition)
     }
   }
+
+  /**
+   * Add a partition to the loading partitions set. Return true if the partition was not
+   * already loading.
+   *
+   * Visible for testing
+   */
+  private[group] def addLoadingPartition(partition: Int): Boolean = {
+    inLock(partitionLock) {
+      loadingPartitions.add(partition)
+    }
+  }
+
 }
 
 /**
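
One contract the new addLoadingPartition helper relies on: scala.collection.mutable.Set#add
returns true only when the element was absent, which (under partitionLock) is what makes
the schedule-once check in scheduleLoadGroupAndOffsets race-free. A tiny reminder:

    import scala.collection.mutable

    object AddContractSketch extends App {
      val loadingPartitions = mutable.Set.empty[Int]
      assert(loadingPartitions.add(3))  // partition 3 was absent: add returns true
      assert(!loadingPartitions.add(3)) // already loading: add returns false
    }
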
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 1598547..08c13eb 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -100,7 +100,7 @@ class GroupCoordinatorTest extends JUnitSuite {
    val joinPurgatory = new DelayedOperationPurgatory[DelayedJoin]("Rebalance", timer, config.brokerId, reaperEnabled = false)
 
    groupCoordinator = GroupCoordinator(config, zkClient, replicaManager, heartbeatPurgatory, joinPurgatory, timer.time)
-    groupCoordinator.startup(false)
+    groupCoordinator.startup(enableMetadataExpiration = false)
 
     // add the partition into the owned partition list
     groupPartitionId = groupCoordinator.partitionFor(groupId)
@@ -115,6 +115,62 @@ class GroupCoordinatorTest extends JUnitSuite {
   }
 
   @Test
+  def testRequestHandlingWhileLoadingInProgress(): Unit = {
+    val otherGroupPartitionId = groupCoordinator.groupManager.partitionFor(otherGroupId)
+    assertTrue(otherGroupPartitionId != groupPartitionId)
+
+    groupCoordinator.groupManager.addLoadingPartition(otherGroupPartitionId)
+    assertTrue(groupCoordinator.groupManager.isGroupLoading(otherGroupId))
+
+    // JoinGroup
+    var joinGroupResponse: Option[JoinGroupResult] = None
+    groupCoordinator.handleJoinGroup(otherGroupId, memberId, "clientId", "clientHost", 60000, 10000, "consumer",
+      List("range" -> new Array[Byte](0)), result => { joinGroupResponse = Some(result)})
+    assertEquals(Some(Errors.COORDINATOR_LOAD_IN_PROGRESS), joinGroupResponse.map(_.error))
+
+    // SyncGroup
+    var syncGroupResponse: Option[Errors] = None
+    groupCoordinator.handleSyncGroup(otherGroupId, 1, memberId, Map.empty[String, Array[Byte]],
+      (_, error)=> syncGroupResponse = Some(error))
+    assertEquals(Some(Errors.REBALANCE_IN_PROGRESS), syncGroupResponse)
+
+    // OffsetCommit
+    val topicPartition = new TopicPartition("foo", 0)
+    var offsetCommitErrors = Map.empty[TopicPartition, Errors]
+    groupCoordinator.handleCommitOffsets(otherGroupId, memberId, 1,
+      immutable.Map(topicPartition -> OffsetAndMetadata(15L)), result => { offsetCommitErrors = result })
+    assertEquals(Some(Errors.COORDINATOR_LOAD_IN_PROGRESS), offsetCommitErrors.get(topicPartition))
+
+    // Heartbeat
+    var heartbeatError: Option[Errors] = None
+    groupCoordinator.handleHeartbeat(otherGroupId, memberId, 1, error => { heartbeatError = Some(error) })
+    assertEquals(Some(Errors.NONE), heartbeatError)
+
+    // DescribeGroups
+    val (describeGroupError, _) = groupCoordinator.handleDescribeGroup(otherGroupId)
+    assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS, describeGroupError)
+
+    // ListGroups
+    val (listGroupsError, _) = groupCoordinator.handleListGroups()
+    assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS, listGroupsError)
+
+    // DeleteGroups
+    val deleteGroupsErrors = groupCoordinator.handleDeleteGroups(immutable.Set(otherGroupId))
+    assertEquals(Some(Errors.COORDINATOR_LOAD_IN_PROGRESS), deleteGroupsErrors.get(otherGroupId))
+
+    // Check that non-loading groups are still accessible
+    assertEquals(Errors.NONE, groupCoordinator.handleDescribeGroup(groupId)._1)
+
+    // After loading, we should be able to access the group
+    val otherGroupMetadataTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, otherGroupPartitionId)
+    EasyMock.reset(replicaManager)
+    EasyMock.expect(replicaManager.getLog(otherGroupMetadataTopicPartition)).andReturn(None)
+    EasyMock.replay(replicaManager)
+    groupCoordinator.groupManager.loadGroupsAndOffsets(otherGroupMetadataTopicPartition, group => {})
+    assertEquals(Errors.NONE, groupCoordinator.handleDescribeGroup(otherGroupId)._1)
+  }
+
+  @Test
   def testOffsetsRetentionMsIntegerOverflow() {
     val props = TestUtils.createBrokerConfig(nodeId = 0, zkConnect = "")
     props.setProperty(KafkaConfig.OffsetsRetentionMinutesProp, Integer.MAX_VALUE.toString)

-- 
To stop receiving notification emails like this one, please contact jgus@apache.org.
