kafka-commits mailing list archives

From: j...@apache.org
Subject: kafka git commit: KAFKA-4399; Fix deadlock between cleanupGroupMetadata and offset commit
Date: Fri, 02 Dec 2016 04:17:43 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.1 9ffadbfdc -> 0e20e5fbf


KAFKA-4399; Fix deadlock between cleanupGroupMetadata and offset commit

Author: Alexey Ozeritsky <aozeritsky@yandex-team.ru>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>

Closes #2125 from resetius/KAFKA-4399

(cherry picked from commit ea370be518a783f3a5d8d834f78c82e36bf968b3)
Signed-off-by: Jason Gustafson <jason@confluent.io>
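
The diff below moves the blocking partition.appendMessagesToLeader call out of the group synchronized section: tombstones are now collected while each group's lock is held, appended to the offsets topic afterwards, and the group lock is re-acquired only to remove the dead group from the cache. The following is a minimal Scala sketch of that two-phase pattern under simplified, assumed types; GroupState, Tombstone and appendToLog are illustrative stand-ins, not Kafka APIs.

    // Minimal sketch (not Kafka code) of the two-phase pattern applied by this patch:
    // collect tombstones under each group's lock, then do the blocking append with no
    // group lock held. GroupState, Tombstone and appendToLog are illustrative stand-ins.
    import scala.collection.mutable

    object CleanupSketch {

      final class GroupState(val groupId: String) {
        val expiredKeys: mutable.Buffer[String] = mutable.Buffer()
        var dead: Boolean = false
      }

      final case class Tombstone(key: String)

      // Stand-in for partition.appendMessagesToLeader: it may take other locks internally,
      // so it must not be invoked while a group lock is held.
      def appendToLog(tombstones: Seq[Tombstone]): Unit =
        println(s"appending ${tombstones.size} tombstone(s)")

      def cleanup(groups: Iterable[GroupState]): Unit = {
        // Phase 1: under each group's lock, only decide what needs to be written.
        val work = groups.flatMap { group =>
          group.synchronized {
            if (group.expiredKeys.nonEmpty) {
              val tombstones = group.expiredKeys.map(Tombstone(_)).toVector
              group.expiredKeys.clear()
              group.dead = true
              Some(group -> tombstones)
            } else None
          }
        }

        // Phase 2: perform the append outside any group lock, then re-acquire the
        // group lock only for the cheap cache/state update.
        for ((group, tombstones) <- work) {
          appendToLog(tombstones)
          group.synchronized {
            if (group.dead)
              println(s"group ${group.groupId} is dead and removed")
          }
        }
      }

      def main(args: Array[String]): Unit = {
        val g = new GroupState("test-group")
        g.expiredKeys ++= Seq("offset-key-1", "offset-key-2")
        cleanup(List(g))
      }
    }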


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0e20e5fb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0e20e5fb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0e20e5fb

Branch: refs/heads/0.10.1
Commit: 0e20e5fbfe683257882f0426357b6c2e75bcacca
Parents: 9ffadbf
Author: Alexey Ozeritsky <aozeritsky@yandex-team.ru>
Authored: Thu Dec 1 19:09:06 2016 -0800
Committer: Jason Gustafson <jason@confluent.io>
Committed: Thu Dec 1 19:57:12 2016 -0800

----------------------------------------------------------------------
 .../coordinator/GroupMetadataManager.scala      | 106 +++++++++----------
 1 file changed, 48 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/0e20e5fb/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index feedc45..fafc39c 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -136,43 +136,6 @@ class GroupMetadataManager(val brokerId: Int,
     }
   }
 
-  /**
-   * Remove the group from the cache and delete all metadata associated with it. This should be
-   * called only after all offsets for the group have expired and no members are remaining (i.e.
-   * it is in the Empty state).
-   */
-  private def evictGroupAndDeleteMetadata(group: GroupMetadata) {
-    // guard this removal in case of concurrent access (e.g. if a delayed join completes with no members
-    // while the group is being removed due to coordinator emigration). We also avoid writing the tombstone
-    // when the generationId is 0, since this group is only using Kafka for offset storage.
-    if (groupMetadataCache.remove(group.groupId, group) && group.generationId > 0) {
-      // Append the tombstone messages to the partition. It is okay if the replicas don't receive these (say,
-      // if we crash or leaders move) since the new leaders will still expire the consumers with heartbeat and
-      // retry removing this group.
-      val groupPartition = partitionFor(group.groupId)
-      getMessageFormatVersionAndTimestamp(groupPartition).foreach { case (magicValue, timestamp) =>
-        val tombstone = new Message(bytes = null, key = GroupMetadataManager.groupMetadataKey(group.groupId),
-          timestamp = timestamp, magicValue = magicValue)
-
-        val partitionOpt = replicaManager.getPartition(Topic.GroupMetadataTopicName, groupPartition)
-        partitionOpt.foreach { partition =>
-          val appendPartition = TopicAndPartition(Topic.GroupMetadataTopicName, groupPartition)
-
-          trace("Marking group %s as deleted.".format(group.groupId))
-
-          try {
-            // do not need to require acks since even if the tombstone is lost,
-            // it will be appended again by the new leader
-            partition.appendMessagesToLeader(new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, tombstone))
-          } catch {
-            case t: Throwable =>
-              error("Failed to mark group %s as deleted in %s.".format(group.groupId, appendPartition),
t)
-            // ignore and continue
-          }
-        }
-      }
-    }
-  }
 
   def prepareStoreGroup(group: GroupMetadata,
                         groupAssignment: Map[String, Array[Byte]],
@@ -598,7 +561,7 @@ class GroupMetadataManager(val brokerId: Int,
     val startMs = time.milliseconds()
     var offsetsRemoved = 0
 
-    groupMetadataCache.foreach { case (groupId, group) =>
+    val result = groupMetadataCache.flatMap { case (groupId, group) =>
       group synchronized {
         if (!group.is(Dead)) {
           val offsetsPartition = partitionFor(groupId)
@@ -612,32 +575,59 @@ class GroupMetadataManager(val brokerId: Int,
                new Message(bytes = null, key = commitKey, timestamp = timestamp, magicValue = magicValue)
               }.toBuffer
 
-              val partitionOpt = replicaManager.getPartition(Topic.GroupMetadataTopicName, offsetsPartition)
-              partitionOpt.foreach { partition =>
-                val appendPartition = TopicAndPartition(Topic.GroupMetadataTopicName, offsetsPartition)
-                trace("Marked %d offsets in %s for deletion.".format(tombstones.size, appendPartition))
-
-                try {
-                  // do not need to require acks since even if the tombstone is lost,
-                  // it will be appended again in the next purge cycle
-                  partition.appendMessagesToLeader(new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, tombstones: _*))
-                  offsetsRemoved += tombstones.size
-                }
-                catch {
-                  case t: Throwable =>
-                    error("Failed to mark %d expired offsets for deletion in %s.".format(tombstones.size,
appendPartition), t)
-                  // ignore and continue
-                }
-              }
+              val numOffsetsExpired = tombstones.size
 
               if (group.is(Empty) && !group.hasOffsets) {
                 group.transitionTo(Dead)
-                evictGroupAndDeleteMetadata(group)
-                info("Group %s generation %s is dead and removed".format(group.groupId, group.generationId))
+
+                // We avoid writing the tombstone
+                // when the generationId is 0, since this group is only using Kafka for offset storage.
+                if (groupMetadataCache.get(groupId) == group && group.generationId > 0) {
+                  // Append the tombstone messages to the partition. It is okay if the replicas don't receive these (say,
+                  // if we crash or leaders move) since the new leaders will still expire the consumers with heartbeat and
+                  // retry removing this group.
+
+                  trace("Marking group %s as deleted.".format(groupId))
+
+                  tombstones += new Message(bytes = null, key = GroupMetadataManager.groupMetadataKey(groupId),
+                    timestamp = timestamp, magicValue = magicValue)
+                }
               }
 
+              Some((group, offsetsPartition, tombstones, numOffsetsExpired))
             case None =>
-              info("BrokerId %d is no longer a coordinator for the group %s. Proceeding cleanup
for other alive groups".format(brokerId, group.groupId))
+              info("BrokerId %d is no longer a coordinator for the group %s. Proceeding cleanup
for other alive groups".format(brokerId, groupId))
+              None
+          }
+        } else {
+          None
+        }
+      }
+    }
+
+    for ((group, offsetsPartition, tombstones, numOffsetsExpired) <- result) {
+      val partitionOpt = replicaManager.getPartition(Topic.GroupMetadataTopicName, offsetsPartition)
+
+      partitionOpt.foreach { partition =>
+        val appendPartition = TopicAndPartition(Topic.GroupMetadataTopicName, offsetsPartition)
+        trace("Marked %d offsets in %s for deletion.".format(numOffsetsExpired, appendPartition))
+
+        try {
+          // do not need to require acks since even if the tombstone is lost,
+          // it will be appended again in the next purge cycle
+          partition.appendMessagesToLeader(new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, tombstones: _*))
+          offsetsRemoved += numOffsetsExpired
+        }
+        catch {
+          case t: Throwable =>
+            error(s"Failed to write ${tombstones.size} tombstones for group ${group.groupId}
to $appendPartition.", t)
+          // ignore and continue
+        }
+
+        group synchronized {
+          if (group.is(Dead)) {
+            groupMetadataCache.remove(group.groupId, group)
+            info("Group %s generation %s is dead and removed".format(group.groupId, group.generationId))
           }
         }
       }

