kafka-commits mailing list archives

From guozh...@apache.org
Subject kafka git commit: MINOR: Improvements in group metadata cleanup and test coverage
Date Fri, 02 Dec 2016 19:02:08 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 93804d50f -> b06fc322b


MINOR: Improvements in group metadata cleanup and test coverage

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Ismael Juma, Guozhang Wang

Closes #2202 from hachikuji/group-expiration-cleanup


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b06fc322
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b06fc322
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b06fc322

Branch: refs/heads/trunk
Commit: b06fc322bf008f55bb8b14c5b02c9af5a90a92ed
Parents: 93804d5
Author: Jason Gustafson <jason@confluent.io>
Authored: Fri Dec 2 11:02:04 2016 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Fri Dec 2 11:02:04 2016 -0800

----------------------------------------------------------------------
 .../main/scala/kafka/cluster/Partition.scala    |   2 +-
 .../coordinator/GroupMetadataManager.scala      | 116 ++++++++-----------
 .../coordinator/GroupMetadataManagerTest.scala  |  67 ++++++++++-
 3 files changed, 113 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b06fc322/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index c7d4044..c8db015 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -443,7 +443,7 @@ class Partition(val topic: String,
 
           val info = log.append(messages, assignOffsets = true)
           // probably unblock some follower fetch requests since log end offset has been updated
-          replicaManager.tryCompleteDelayedFetch(new TopicPartitionOperationKey(this.topic, this.partitionId))
+          replicaManager.tryCompleteDelayedFetch(TopicPartitionOperationKey(this.topic, this.partitionId))
           // we may need to increment high watermark since ISR could be down to 1
           (info, maybeIncrementLeaderHW(leaderReplica))
 

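The Partition.scala change above only drops a redundant "new": the key is presumably built through a companion apply method (as a Scala case class provides), so both forms produce an equal key. A minimal sketch of that pattern, using a made-up OperationKey case class rather than Kafka's actual TopicPartitionOperationKey:

case class OperationKey(topic: String, partition: Int)  // hypothetical stand-in, not Kafka's class

object ApplySketch extends App {
  val a = new OperationKey("foo", 0)  // explicit constructor call
  val b = OperationKey("foo", 0)      // compiler-generated companion apply()
  assert(a == b)                      // case-class equality: same fields, same key
}
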
http://git-wip-us.apache.org/repos/asf/kafka/blob/b06fc322/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index b45f25b..e55bcaa 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -50,7 +50,6 @@ import com.yammer.metrics.core.Gauge
 import kafka.api.{ApiVersion, KAFKA_0_10_1_IV0}
 import kafka.utils.CoreUtils.inLock
 
-
 class GroupMetadataManager(val brokerId: Int,
                            val interBrokerProtocolVersion: ApiVersion,
                            val config: OffsetConfig,
@@ -561,79 +560,62 @@ class GroupMetadataManager(val brokerId: Int,
     val startMs = time.milliseconds()
     var offsetsRemoved = 0
 
-    val result = groupMetadataCache.flatMap { case (groupId, group) =>
-      group synchronized {
-        if (!group.is(Dead)) {
-          val offsetsPartition = partitionFor(groupId)
-          val magicValueAndTimestampOpt = getMessageFormatVersionAndTimestamp(offsetsPartition)
-          magicValueAndTimestampOpt match {
-            case Some((magicValue, timestamp)) =>
-              // delete the expired offsets from the table and generate tombstone messages to remove them from the log
-              val tombstones = group.removeExpiredOffsets(startMs).map { case (topicPartition, offsetAndMetadata) =>
-                trace("Removing expired offset and metadata for %s, %s: %s".format(groupId, topicPartition, offsetAndMetadata))
-                val commitKey = GroupMetadataManager.offsetCommitKey(groupId, topicPartition.topic, topicPartition.partition)
-                new Message(bytes = null, key = commitKey, timestamp = timestamp, magicValue = magicValue)
-              }.toBuffer
-
-              val numOffsetsExpired = tombstones.size
-
-              if (group.is(Empty) && !group.hasOffsets) {
-                group.transitionTo(Dead)
-
-                // We avoid writing the tombstone
-                // when the generationId is 0, since this group is only using Kafka for offset storage.
-                if (groupMetadataCache.get(groupId) == group && group.generationId > 0) {
-                  // Append the tombstone messages to the partition. It is okay if the replicas don't receive these (say,
-                  // if we crash or leaders move) since the new leaders will still expire the consumers with heartbeat and
-                  // retry removing this group.
-
-                  trace("Marking group %s as deleted.".format(groupId))
-
-                  tombstones += new Message(bytes = null, key = GroupMetadataManager.groupMetadataKey(groupId),
-                    timestamp = timestamp, magicValue = magicValue)
-                }
-              }
-
-              Some((group, offsetsPartition, tombstones, numOffsetsExpired))
-            case None =>
-              info("BrokerId %d is no longer a coordinator for the group %s. Proceeding cleanup for other alive groups".format(brokerId, groupId))
-              None
-          }
-        } else {
-          None
+    groupMetadataCache.foreach { case (groupId, group) =>
+      val (expiredOffsets, groupIsDead, generation) = group synchronized {
+        // remove expired offsets from the cache
+        val expiredOffsets = group.removeExpiredOffsets(startMs)
+        if (group.is(Empty) && !group.hasOffsets) {
+          info(s"Group $groupId transitioned to Dead in generation ${group.generationId}")
+          group.transitionTo(Dead)
         }
+        (expiredOffsets, group.is(Dead), group.generationId)
       }
-    }
-
-    for ((group, offsetsPartition, tombstones, numOffsetsExpired) <- result) {
-      val partitionOpt = replicaManager.getPartition(Topic.GroupMetadataTopicName, offsetsPartition)
-
-      partitionOpt.foreach { partition =>
-        val appendPartition = TopicAndPartition(Topic.GroupMetadataTopicName, offsetsPartition)
-        trace("Marked %d offsets in %s for deletion.".format(numOffsetsExpired, appendPartition))
 
-        try {
-          // do not need to require acks since even if the tombstone is lost,
-          // it will be appended again in the next purge cycle
-          partition.appendMessagesToLeader(new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, tombstones: _*))
-          offsetsRemoved += numOffsetsExpired
-        }
-        catch {
-          case t: Throwable =>
-            error(s"Failed to write ${tombstones.size} tombstones for group ${group.groupId} to $appendPartition.", t)
-          // ignore and continue
-        }
+      val offsetsPartition = partitionFor(groupId)
+      getMessageFormatVersionAndTimestamp(offsetsPartition) match {
+        case Some((magicValue, timestamp)) =>
+          val partitionOpt = replicaManager.getPartition(Topic.GroupMetadataTopicName, offsetsPartition)
+          partitionOpt.foreach { partition =>
+            val appendPartition = TopicAndPartition(Topic.GroupMetadataTopicName, offsetsPartition)
+            val tombstones = expiredOffsets.map { case (topicPartition, offsetAndMetadata) =>
+              trace(s"Removing expired offset and metadata for $groupId, $topicPartition: $offsetAndMetadata")
+              val commitKey = GroupMetadataManager.offsetCommitKey(groupId, topicPartition.topic, topicPartition.partition)
+              new Message(bytes = null, key = commitKey, timestamp = timestamp, magicValue = magicValue)
+            }.toBuffer
+            trace(s"Marked ${expiredOffsets.size} offsets in $appendPartition for deletion.")
+
+            // We avoid writing the tombstone when the generationId is 0, since this group is only using
+            // Kafka for offset storage.
+            if (groupIsDead && groupMetadataCache.remove(groupId, group) && generation > 0) {
+              // Append the tombstone messages to the partition. It is okay if the replicas don't receive these (say,
+              // if we crash or leaders move) since the new leaders will still expire the consumers with heartbeat and
+              // retry removing this group.
+              tombstones += new Message(bytes = null, key = GroupMetadataManager.groupMetadataKey(group.groupId),
+                timestamp = timestamp, magicValue = magicValue)
+              trace(s"Group $groupId removed from the metadata cache and marked for deletion in $appendPartition.")
+            }
 
-        group synchronized {
-          if (group.is(Dead)) {
-            groupMetadataCache.remove(group.groupId, group)
-            info("Group %s generation %s is dead and removed".format(group.groupId, group.generationId))
+            if (tombstones.nonEmpty) {
+              try {
+                // do not need to require acks since even if the tombstone is lost,
+                // it will be appended again in the next purge cycle
+                partition.appendMessagesToLeader(new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, tombstones: _*))
+                offsetsRemoved += expiredOffsets.size
+                trace(s"Successfully appended ${tombstones.size} tombstones to $appendPartition for expired offsets and/or metadata for group $groupId")
+              } catch {
+                case t: Throwable =>
+                  error(s"Failed to append ${tombstones.size} tombstones to $appendPartition for expired offsets and/or metadata for group $groupId.", t)
+                // ignore and continue
+              }
+            }
           }
-        }
+
+        case None =>
+          info(s"BrokerId $brokerId is no longer a coordinator for the group $groupId. Proceeding cleanup for other alive groups")
       }
     }
 
-    info("Removed %d expired offsets in %d milliseconds.".format(offsetsRemoved, time.milliseconds() - startMs))
+    info(s"Removed $offsetsRemoved expired offsets in ${time.milliseconds() - startMs} milliseconds.")
   }
 
   private def getHighWatermark(partitionId: Int): Long = {
@@ -682,7 +664,7 @@ class GroupMetadataManager(val brokerId: Int,
    * @return  Option[(MessageFormatVersion, TimeStamp)] if replica is local, None otherwise
    */
   private def getMessageFormatVersionAndTimestamp(partition: Int): Option[(Byte, Long)] = {
-    val groupMetadataTopicAndPartition = new TopicAndPartition(Topic.GroupMetadataTopicName, partition)
+    val groupMetadataTopicAndPartition = TopicAndPartition(Topic.GroupMetadataTopicName, partition)
     replicaManager.getMessageFormatVersion(groupMetadataTopicAndPartition).map { messageFormatVersion =>
       val timestamp = {
         if (messageFormatVersion == Message.MagicValue_V0)

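The rewritten cleanupGroupMetadata above splits each group's cleanup into two phases: expired offsets are removed and an Empty group with no offsets is transitioned to Dead while holding the group lock, and then tombstone construction, the conditional cache removal (groupMetadataCache.remove(groupId, group) succeeds only if the cache still maps the id to that same instance), and the log append happen outside the lock. Below is a minimal, self-contained sketch of that two-phase pattern; GroupState, the string tombstones, and the println are simplified stand-ins, not Kafka's GroupMetadata, Message, or partition-append APIs.

import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._

object CleanupSketch extends App {
  // Hypothetical, simplified group state; not Kafka's GroupMetadata.
  class GroupState(val groupId: String) {
    var offsets = Map("foo-0" -> 42L)
    var dead = false
  }

  val cache = new ConcurrentHashMap[String, GroupState]()
  cache.put("group-1", new GroupState("group-1"))

  var offsetsRemoved = 0
  cache.asScala.foreach { case (groupId, group) =>
    // Phase 1: mutate the group only while holding its lock.
    val (expired, isDead) = group.synchronized {
      val expired = group.offsets            // pretend every offset has expired
      group.offsets = Map.empty
      group.dead = true
      (expired, group.dead)
    }
    // Phase 2: build tombstones and touch the shared cache outside the lock.
    val tombstones = expired.keys.map(tp => s"tombstone($groupId, $tp)").toBuffer
    if (isDead && cache.remove(groupId, group))  // only if the cache still holds this instance
      tombstones += s"tombstone($groupId, <group metadata>)"
    if (tombstones.nonEmpty)
      println(s"appending ${tombstones.size} tombstones for $groupId")  // stand-in for the log append
    offsetsRemoved += expired.size
  }
  println(s"Removed $offsetsRemoved expired offsets.")
}

This ordering keeps the time spent under the group lock small, and the compare-and-remove on the cache avoids deleting a group that was concurrently re-created under the same id.
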
http://git-wip-us.apache.org/repos/asf/kafka/blob/b06fc322/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
index 2a81d31..6c03476 100644
--- a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
@@ -197,7 +197,7 @@ class GroupMetadataManagerTest {
       maybeError = Some(error)
     }
 
-    val delayedStoreOpt = groupMetadataManager.prepareStoreGroup(group, Map(memberId -> Array[Byte]()), callback)
+    groupMetadataManager.prepareStoreGroup(group, Map(memberId -> Array[Byte]()), callback)
     assertEquals(Some(Errors.NOT_COORDINATOR_FOR_GROUP), maybeError)
     EasyMock.verify(replicaManager)
   }
@@ -379,7 +379,52 @@ class GroupMetadataManagerTest {
   }
 
   @Test
-  def testExpireGroup() {
+  def testGroupMetadataRemoval() {
+    val topicPartition1 = new TopicPartition("foo", 0)
+    val topicPartition2 = new TopicPartition("foo", 1)
+
+    groupMetadataManager.addPartitionOwnership(groupPartitionId)
+
+    val group = new GroupMetadata(groupId)
+    groupMetadataManager.addGroup(group)
+    group.generationId = 5
+
+    // expect the group metadata tombstone
+    EasyMock.reset(partition)
+    val messageSetCapture: Capture[ByteBufferMessageSet] = EasyMock.newCapture()
+
+    EasyMock.expect(replicaManager.getMessageFormatVersion(EasyMock.anyObject())).andStubReturn(Some(Message.MagicValue_V1))
+    EasyMock.expect(replicaManager.getPartition(Topic.GroupMetadataTopicName, groupPartitionId)).andStubReturn(Some(partition))
+    EasyMock.expect(partition.appendMessagesToLeader(EasyMock.capture(messageSetCapture), EasyMock.anyInt()))
+      .andReturn(LogAppendInfo.UnknownLogAppendInfo)
+    EasyMock.replay(replicaManager, partition)
+
+    groupMetadataManager.cleanupGroupMetadata()
+
+    assertTrue(messageSetCapture.hasCaptured)
+
+    val messageSet = messageSetCapture.getValue
+    assertEquals(1, messageSet.size)
+
+    val metadataTombstone = messageSet.head.message
+    assertTrue(metadataTombstone.hasKey)
+    assertTrue(metadataTombstone.isNull)
+
+    val groupKey = GroupMetadataManager.readMessageKey(metadataTombstone.key).asInstanceOf[GroupMetadataKey]
+    assertEquals(groupId, groupKey.key)
+
+    // the full group should be gone since all offsets were removed
+    assertEquals(None, groupMetadataManager.getGroup(groupId))
+    val cachedOffsets = groupMetadataManager.getOffsets(groupId, Seq(topicPartition1, topicPartition2))
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition1).map(_.offset))
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition2).map(_.offset))
+  }
+
+  @Test
+  def testExpireGroupWithOffsetsOnly() {
+    // verify that the group is removed properly, but no tombstone is written if
+    // this is a group which is only using kafka for offset storage
+
     val memberId = ""
     val generationId = -1
     val topicPartition1 = new TopicPartition("foo", 0)
@@ -418,12 +463,27 @@ class GroupMetadataManagerTest {
 
     // expect the offset tombstone
     EasyMock.reset(partition)
-    EasyMock.expect(partition.appendMessagesToLeader(EasyMock.anyObject(classOf[ByteBufferMessageSet]), EasyMock.anyInt()))
+    val messageSetCapture: Capture[ByteBufferMessageSet] = EasyMock.newCapture()
+
+    EasyMock.expect(partition.appendMessagesToLeader(EasyMock.capture(messageSetCapture), EasyMock.anyInt()))
       .andReturn(LogAppendInfo.UnknownLogAppendInfo)
     EasyMock.replay(partition)
 
     groupMetadataManager.cleanupGroupMetadata()
 
+    assertTrue(messageSetCapture.hasCaptured)
+
+    // verify the tombstones are correct and only for the expired offsets
+    val messageSet = messageSetCapture.getValue
+    assertEquals(2, messageSet.size)
+    messageSet.map(_.message).foreach { message =>
+      assertTrue(message.hasKey)
+      assertTrue(message.isNull)
+      val offsetKey = GroupMetadataManager.readMessageKey(message.key).asInstanceOf[OffsetKey]
+      assertEquals(groupId, offsetKey.key.group)
+      assertEquals("foo", offsetKey.key.topicPartition.topic)
+    }
+
     // the full group should be gone since all offsets were removed
     assertEquals(None, groupMetadataManager.getGroup(groupId))
     val cachedOffsets = groupMetadataManager.getOffsets(groupId, Seq(topicPartition1, topicPartition2))
@@ -510,5 +570,4 @@ class GroupMetadataManagerTest {
     EasyMock.expect(replicaManager.getMessageFormatVersion(EasyMock.anyObject())).andStubReturn(Some(Message.MagicValue_V1))
   }
 
-
 }
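
The test changes above switch from stubbing appendMessagesToLeader with anyObject() to capturing its argument, so the assertions can inspect the tombstones' keys and null payloads. A standalone sketch of that EasyMock Capture pattern, assuming EasyMock 3.x on the classpath; Appender is a made-up interface, not a Kafka class.

import org.easymock.{Capture, EasyMock}

object CaptureSketch extends App {
  trait Appender { def append(payload: String, acks: Int): Boolean }

  val appender = EasyMock.createMock(classOf[Appender])
  val payloadCapture: Capture[String] = EasyMock.newCapture()

  // Record an expectation that captures whatever argument the code under test passes in.
  EasyMock.expect(appender.append(EasyMock.capture(payloadCapture), EasyMock.anyInt()))
    .andReturn(true)
  EasyMock.replay(appender)

  appender.append("tombstone-bytes", 0)  // the code under test would make this call

  assert(payloadCapture.hasCaptured)
  assert(payloadCapture.getValue == "tombstone-bytes")
  EasyMock.verify(appender)
}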

