kafka-commits mailing list archives

From jjko...@apache.org
Subject [2/3] kafka git commit: KAFKA-2163; The offsets manager's stale-offset-cleanup and offset load should be mutually exclusive; reviewed by Jun Rao
Date Tue, 12 May 2015 22:41:31 GMT
KAFKA-2163; The offsets manager's stale-offset-cleanup and offset load should be mutually exclusive;
reviewed by Jun Rao
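
For readers skimming the patch below: the heart of the change is a new lock, cleanupOrLoadMutex, that both the scheduled expired-offset cleanup (deleteExpiredOffsets) and the offset-load loop now synchronize on, so loading an offsets partition can never interleave with a purge of the offsets cache. What follows is a minimal standalone sketch of that pattern only; the object name, the simplified cache type, and the helper names (deleteExpired, loadEntries) are illustrative and are not the actual OffsetManager members.

import java.util.concurrent.{Executors, TimeUnit}
import scala.collection.mutable

// Illustrative sketch of the cleanup-vs-load mutual exclusion; not Kafka code.
object OffsetCacheSketch {
  // key -> (offset, expireTimestampMs); the real cache maps GroupTopicPartition -> OffsetAndMetadata
  private val offsets = mutable.Map[String, (Long, Long)]()
  private val cleanupOrLoadMutex = new Object

  // Periodic task: drop cache entries whose expiration timestamp has passed.
  def deleteExpired(nowMs: Long): Int = cleanupOrLoadMutex synchronized {
    val expiredKeys = offsets.collect { case (key, (_, expireMs)) if expireMs < nowMs => key }.toSeq
    expiredKeys.foreach(offsets.remove)
    expiredKeys.size
  }

  // Load path: replay entries for an offsets partition into the cache under the
  // same monitor, so it can never run concurrently with deleteExpired.
  def loadEntries(entries: Seq[(String, Long, Long)]): Unit = cleanupOrLoadMutex synchronized {
    entries.foreach { case (key, offset, expireMs) => offsets.put(key, (offset, expireMs)) }
  }

  def main(args: Array[String]): Unit = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    scheduler.scheduleAtFixedRate(new Runnable {
      def run(): Unit = println("removed " + deleteExpired(System.currentTimeMillis()) + " expired offsets")
    }, 0L, 600L, TimeUnit.SECONDS)
    loadEntries(Seq(("group1/topic/0", 42L, System.currentTimeMillis() + 24 * 60 * 60 * 1000L)))
    scheduler.shutdown()
  }
}

The diff applies the same idea inside OffsetManager.scala: the body of deleteExpiredOffsets and the load loop in the second hunk below are both wrapped in cleanupOrLoadMutex synchronized blocks.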


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2d5e0f07
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2d5e0f07
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2d5e0f07

Branch: refs/heads/trunk
Commit: 2d5e0f07613fba67a0e22aaeda61789588e6a73b
Parents: 9b57fe4
Author: Joel Koshy <jjkoshy@gmail.com>
Authored: Tue May 12 13:44:55 2015 -0700
Committer: Joel Koshy <jjkoshy@gmail.com>
Committed: Tue May 12 13:44:55 2015 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/cluster/Partition.scala    |   2 +-
 .../main/scala/kafka/server/OffsetManager.scala | 151 ++++++++++---------
 2 files changed, 80 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/2d5e0f07/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 122b1db..730a232 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -219,7 +219,7 @@ class Partition(val topic: String,
         if (topic == OffsetManager.OffsetsTopicName &&
            /* if we are making a leader->follower transition */
            leaderReplica == localBrokerId)
-          offsetManager.clearOffsetsInPartition(partitionId)
+          offsetManager.removeOffsetsFromCacheForPartition(partitionId)
       }
 
       if (leaderReplicaIdOpt.isDefined && leaderReplicaIdOpt.get == newLeaderBrokerId) {
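
The Partition.scala hunk above only renames the callback that fires when this broker turns into a follower for an offsets-topic partition; the renamed method, shown further down in OffsetManager.scala, evicts every cached offset whose consumer group hashes to that partition. A rough, hypothetical illustration of that eviction (partitionFor here is a simplified stand-in for the real group-to-partition hashing):

import scala.collection.mutable

// Illustrative eviction of cached offsets on a leader->follower transition; not Kafka code.
object FollowerTransitionSketch {
  private val offsetsTopicNumPartitions = 50
  // "group" -> committed offset, keyed by group only to keep the sketch small
  private val offsetsCache = mutable.Map[String, Long]()

  // Simplified stand-in for OffsetManager.partitionFor(group).
  private def partitionFor(group: String): Int =
    math.abs(group.hashCode) % offsetsTopicNumPartitions

  // On becoming a follower for offsetsPartition, drop all groups owned by that partition.
  def removeOffsetsFromCacheForPartition(offsetsPartition: Int): Int = {
    val keys = offsetsCache.keys.filter(group => partitionFor(group) == offsetsPartition).toSeq
    keys.foreach(offsetsCache.remove)
    keys.size
  }
}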

http://git-wip-us.apache.org/repos/asf/kafka/blob/2d5e0f07/core/src/main/scala/kafka/server/OffsetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala
index 18680ce..df919f7 100755
--- a/core/src/main/scala/kafka/server/OffsetManager.scala
+++ b/core/src/main/scala/kafka/server/OffsetManager.scala
@@ -48,7 +48,7 @@ import org.I0Itec.zkclient.ZkClient
  * @param maxMetadataSize The maximum allowed metadata for any offset commit.
  * @param loadBufferSize Batch size for reading from the offsets segments when loading offsets into the cache.
  * @param offsetsRetentionMs Offsets older than this retention period will be discarded.
- * @param offsetsRetentionCheckIntervalMs Frequency at which to check for stale offsets.
+ * @param offsetsRetentionCheckIntervalMs Frequency at which to check for expired offsets.
  * @param offsetsTopicNumPartitions The number of partitions for the offset commit topic (should not change after deployment).
  * @param offsetsTopicSegmentBytes The offsets topic segment bytes should be kept relatively small to facilitate faster
  *                                 log compaction and faster offset loads
@@ -93,15 +93,14 @@ class OffsetManager(val config: OffsetManagerConfig,
   /* offsets and metadata cache */
   private val offsetsCache = new Pool[GroupTopicPartition, OffsetAndMetadata]
   private val followerTransitionLock = new Object
-
   private val loadingPartitions: mutable.Set[Int] = mutable.Set()
-
+  private val cleanupOrLoadMutex = new Object
   private val shuttingDown = new AtomicBoolean(false)
 
   this.logIdent = "[Offset Manager on Broker " + replicaManager.config.brokerId + "]: "
 
-  scheduler.schedule(name = "offsets-cache-compactor",
-                     fun = compact,
+  scheduler.schedule(name = "delete-expired-consumer-offsets",
+                     fun = deleteExpiredOffsets,
                      period = config.offsetsRetentionCheckIntervalMs,
                      unit = TimeUnit.MILLISECONDS)
 
@@ -117,55 +116,57 @@ class OffsetManager(val config: OffsetManagerConfig,
     }
   )
 
-  private def compact() {
-    debug("Compacting offsets cache.")
+  private def deleteExpiredOffsets() {
+    debug("Collecting expired offsets.")
     val startMs = SystemTime.milliseconds
 
-    val staleOffsets = offsetsCache.filter { case (groupTopicPartition, offsetAndMetadata) =>
-      offsetAndMetadata.expireTimestamp < startMs
-    }
+    val numExpiredOffsetsRemoved = cleanupOrLoadMutex synchronized {
+      val expiredOffsets = offsetsCache.filter { case (groupTopicPartition, offsetAndMetadata) =>
+        offsetAndMetadata.expireTimestamp < startMs
+      }
 
-    debug("Found %d expired offsets.".format(staleOffsets.size))
+      debug("Found %d expired offsets.".format(expiredOffsets.size))
 
-    // delete the stale offsets from the table and generate tombstone messages to remove them from the log
-    val tombstonesForPartition = staleOffsets.map { case(groupTopicAndPartition, offsetAndMetadata) =>
-      val offsetsPartition = partitionFor(groupTopicAndPartition.group)
-      trace("Removing stale offset and metadata for %s: %s".format(groupTopicAndPartition,
offsetAndMetadata))
+      // delete the expired offsets from the table and generate tombstone messages to remove
them from the log
+      val tombstonesForPartition = expiredOffsets.map { case (groupTopicAndPartition, offsetAndMetadata)
=>
+        val offsetsPartition = partitionFor(groupTopicAndPartition.group)
+        trace("Removing expired offset and metadata for %s: %s".format(groupTopicAndPartition,
offsetAndMetadata))
 
-      offsetsCache.remove(groupTopicAndPartition)
+        offsetsCache.remove(groupTopicAndPartition)
 
-      val commitKey = OffsetManager.offsetCommitKey(groupTopicAndPartition.group,
-        groupTopicAndPartition.topicPartition.topic, groupTopicAndPartition.topicPartition.partition)
+        val commitKey = OffsetManager.offsetCommitKey(groupTopicAndPartition.group,
+          groupTopicAndPartition.topicPartition.topic, groupTopicAndPartition.topicPartition.partition)
 
-      (offsetsPartition, new Message(bytes = null, key = commitKey))
-    }.groupBy{ case (partition, tombstone) => partition }
+        (offsetsPartition, new Message(bytes = null, key = commitKey))
+      }.groupBy { case (partition, tombstone) => partition }
 
-    // Append the tombstone messages to the offset partitions. It is okay if the replicas don't receive these (say,
-    // if we crash or leaders move) since the new leaders will get rid of stale offsets during their own purge cycles.
-    val numRemoved = tombstonesForPartition.flatMap { case(offsetsPartition, tombstones) =>
-      val partitionOpt = replicaManager.getPartition(OffsetManager.OffsetsTopicName, offsetsPartition)
-      partitionOpt.map { partition =>
-        val appendPartition = TopicAndPartition(OffsetManager.OffsetsTopicName, offsetsPartition)
-        val messages = tombstones.map(_._2).toSeq
+      // Append the tombstone messages to the offset partitions. It is okay if the replicas don't receive these (say,
+      // if we crash or leaders move) since the new leaders will get rid of expired offsets during their own purge cycles.
+      tombstonesForPartition.flatMap { case (offsetsPartition, tombstones) =>
+        val partitionOpt = replicaManager.getPartition(OffsetManager.OffsetsTopicName, offsetsPartition)
+        partitionOpt.map { partition =>
+          val appendPartition = TopicAndPartition(OffsetManager.OffsetsTopicName, offsetsPartition)
+          val messages = tombstones.map(_._2).toSeq
 
-        trace("Marked %d offsets in %s for deletion.".format(messages.size, appendPartition))
+          trace("Marked %d offsets in %s for deletion.".format(messages.size, appendPartition))
 
-        try {
-          // do not need to require acks since even if the tombsone is lost,
-          // it will be appended again in the next purge cycle
-          partition.appendMessagesToLeader(new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, messages:_*))
-          tombstones.size
-        }
-        catch {
-          case t: Throwable =>
-            error("Failed to mark %d stale offsets for deletion in %s.".format(messages.size,
appendPartition), t)
-            // ignore and continue
-            0
+          try {
+            // do not need to require acks since even if the tombsone is lost,
+            // it will be appended again in the next purge cycle
+            partition.appendMessagesToLeader(new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, messages: _*))
+            tombstones.size
+          }
+          catch {
+            case t: Throwable =>
+              error("Failed to mark %d expired offsets for deletion in %s.".format(messages.size,
appendPartition), t)
+              // ignore and continue
+              0
+          }
         }
-      }
-    }.sum
+      }.sum
+    }
 
-    debug("Removed %d stale offsets in %d milliseconds.".format(numRemoved, SystemTime.milliseconds
- startMs))
+    info("Removed %d expired offsets in %d milliseconds.".format(numExpiredOffsetsRemoved,
SystemTime.milliseconds - startMs))
   }
 
 
@@ -369,34 +370,36 @@ class OffsetManager(val config: OffsetManagerConfig,
             var currOffset = log.logSegments.head.baseOffset
             val buffer = ByteBuffer.allocate(config.loadBufferSize)
             // loop breaks if leader changes at any time during the load, since getHighWatermark is -1
-            while (currOffset < getHighWatermark(offsetsPartition) && !shuttingDown.get()) {
-              buffer.clear()
-              val messages = log.read(currOffset, config.loadBufferSize).messageSet.asInstanceOf[FileMessageSet]
-              messages.readInto(buffer, 0)
-              val messageSet = new ByteBufferMessageSet(buffer)
-              messageSet.foreach { msgAndOffset =>
-                require(msgAndOffset.message.key != null, "Offset entry key should not be null")
-                val key = OffsetManager.readMessageKey(msgAndOffset.message.key)
-                if (msgAndOffset.message.payload == null) {
-                  if (offsetsCache.remove(key) != null)
-                    trace("Removed offset for %s due to tombstone entry.".format(key))
-                  else
-                    trace("Ignoring redundant tombstone for %s.".format(key))
-                } else {
-                  // special handling for version 0:
-                  // set the expiration time stamp as commit time stamp + server default retention time
-                  val value = OffsetManager.readMessageValue(msgAndOffset.message.payload)
-                  putOffset(key, value.copy (
-                    expireTimestamp = {
-                      if (value.expireTimestamp == org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP)
-                        value.commitTimestamp + config.offsetsRetentionMs
-                      else
-                        value.expireTimestamp
-                    }
-                  ))
-                  trace("Loaded offset %s for %s.".format(value, key))
+            cleanupOrLoadMutex synchronized {
+              while (currOffset < getHighWatermark(offsetsPartition) && !shuttingDown.get()) {
+                buffer.clear()
+                val messages = log.read(currOffset, config.loadBufferSize).messageSet.asInstanceOf[FileMessageSet]
+                messages.readInto(buffer, 0)
+                val messageSet = new ByteBufferMessageSet(buffer)
+                messageSet.foreach { msgAndOffset =>
+                  require(msgAndOffset.message.key != null, "Offset entry key should not be null")
+                  val key = OffsetManager.readMessageKey(msgAndOffset.message.key)
+                  if (msgAndOffset.message.payload == null) {
+                    if (offsetsCache.remove(key) != null)
+                      trace("Removed offset for %s due to tombstone entry.".format(key))
+                    else
+                      trace("Ignoring redundant tombstone for %s.".format(key))
+                  } else {
+                    // special handling for version 0:
+                    // set the expiration time stamp as commit time stamp + server default retention time
+                    val value = OffsetManager.readMessageValue(msgAndOffset.message.payload)
+                    putOffset(key, value.copy (
+                      expireTimestamp = {
+                        if (value.expireTimestamp == org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP)
+                          value.commitTimestamp + config.offsetsRetentionMs
+                        else
+                          value.expireTimestamp
+                      }
+                    ))
+                    trace("Loaded offset %s for %s.".format(value, key))
+                  }
+                  currOffset = msgAndOffset.nextOffset
                 }
-                currOffset = msgAndOffset.nextOffset
               }
             }
 
@@ -434,16 +437,20 @@ class OffsetManager(val config: OffsetManagerConfig,
    * that partition.
   * @param offsetsPartition Groups belonging to this partition of the offsets topic will be deleted from the cache.
    */
-  def clearOffsetsInPartition(offsetsPartition: Int) {
-    debug("Deleting offset entries belonging to [%s,%d].".format(OffsetManager.OffsetsTopicName,
offsetsPartition))
-
+  def removeOffsetsFromCacheForPartition(offsetsPartition: Int) {
+    var numRemoved = 0
     followerTransitionLock synchronized {
       offsetsCache.keys.foreach { key =>
         if (partitionFor(key.group) == offsetsPartition) {
           offsetsCache.remove(key)
+          numRemoved += 1
         }
       }
     }
+
+    if (numRemoved > 0) info("Removed %d cached offsets for %s on follower transition."
+                             .format(numRemoved, TopicAndPartition(OffsetManager.OffsetsTopicName, offsetsPartition)))
+
   }
 
   def shutdown() {


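One detail worth calling out from the load path above (pre-existing behavior, only re-indented by this patch): version-0 offset commits carry no explicit expiration, so the loader derives one as commitTimestamp + offsetsRetentionMs when it sees the DEFAULT_TIMESTAMP sentinel. A small hypothetical illustration of that defaulting; the sentinel value of -1 is assumed here rather than taken from OffsetCommitRequest:

// Illustrative expiration defaulting applied while loading offsets; not Kafka code.
object ExpireTimestampSketch {
  // Assumed stand-in for org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP.
  val DefaultTimestamp: Long = -1L

  def effectiveExpireTimestamp(commitTimestampMs: Long,
                               committedExpireTimestampMs: Long,
                               offsetsRetentionMs: Long): Long =
    if (committedExpireTimestampMs == DefaultTimestamp)
      commitTimestampMs + offsetsRetentionMs   // version 0: commit time + server default retention
    else
      committedExpireTimestampMs               // client supplied an explicit expiration

  def main(args: Array[String]): Unit = {
    val now = System.currentTimeMillis()
    println(effectiveExpireTimestamp(now, DefaultTimestamp, 24 * 60 * 60 * 1000L)) // now + one day
    println(effectiveExpireTimestamp(now, now + 1000L, 24 * 60 * 60 * 1000L))      // explicit value wins
  }
}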