kafka-commits mailing list archives

From j...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6622; Fix performance issue loading consumer offsets (#4661)
Date Fri, 09 Mar 2018 22:21:09 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5760da7  KAFKA-6622; Fix performance issue loading consumer offsets (#4661)
5760da7 is described below

commit 5760da7d0be8d9326e82f187015d5c0169bfb40b
Author: Radai Rosenblatt <radai.rosenblatt@gmail.com>
AuthorDate: Fri Mar 9 14:21:06 2018 -0800

    KAFKA-6622; Fix performance issue loading consumer offsets (#4661)
    
    `batch.baseOffset` is an expensive operation (even says so in its javadoc), and yet was
    called for every single record in a batch when loading offsets. This means that for N records
    in a gzipped batch, the entire batch will be unzipped N times. The fix is to compute and cache
    the base offset once as we decompress and process the batch.
    
    Reviewers: Dong Lin <lindong28@gmail.com>, Ismael Juma <ismael@juma.me.uk>,
    Jason Gustafson <jason@confluent.io>
---
 .../main/scala/kafka/coordinator/group/GroupMetadataManager.scala  | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 3b79544..63af1cb 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -555,8 +555,11 @@ class GroupMetadataManager(brokerId: Int,
               }
               pendingOffsets.remove(batch.producerId)
             } else {
+              var batchBaseOffset: Option[Long] = None
               for (record <- batch.asScala) {
                 require(record.hasKey, "Group metadata/offset entry key should not be null")
+                if (batchBaseOffset.isEmpty)
+                  batchBaseOffset = Some(record.offset)
                 GroupMetadataManager.readMessageKey(record.key) match {
 
                   case offsetKey: OffsetKey =>
@@ -573,9 +576,9 @@ class GroupMetadataManager(brokerId: Int,
                     } else {
                       val offsetAndMetadata = GroupMetadataManager.readOffsetMessageValue(record.value)
                       if (isTxnOffsetCommit)
-                        pendingOffsets(batch.producerId).put(groupTopicPartition, CommitRecordMetadataAndOffset(Some(batch.baseOffset), offsetAndMetadata))
+                        pendingOffsets(batch.producerId).put(groupTopicPartition, CommitRecordMetadataAndOffset(batchBaseOffset, offsetAndMetadata))
                       else
-                        loadedOffsets.put(groupTopicPartition, CommitRecordMetadataAndOffset(Some(batch.baseOffset), offsetAndMetadata))
+                        loadedOffsets.put(groupTopicPartition, CommitRecordMetadataAndOffset(batchBaseOffset, offsetAndMetadata))
                     }
 
                   case groupMetadataKey: GroupMetadataKey =>
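
The caching pattern in the diff above can be illustrated in isolation. The following is a minimal sketch, not Kafka code: `Rec`, `FakeBatch`, `expensiveCalls`, `loadNaive` and `loadCached` are hypothetical names invented for this example, and the "expensive" accessor only increments a counter instead of decompressing anything. It assumes, as the patch does, that the offset of the first record seen while iterating is the value to cache.

```scala
import scala.collection.mutable

object BaseOffsetCaching {
  // Hypothetical record type; stands in for a record read from the offsets topic.
  final case class Rec(offset: Long, key: String)

  // Hypothetical batch; NOT Kafka's RecordBatch. baseOffset counts its own calls
  // to model an accessor that would decompress the batch every time.
  final class FakeBatch(val records: Seq[Rec]) {
    var expensiveCalls = 0
    def baseOffset: Long = {
      expensiveCalls += 1
      records.head.offset
    }
  }

  // Shape of the code before the fix: one expensive call per record.
  def loadNaive(batch: FakeBatch): mutable.Map[String, Option[Long]] = {
    val loaded = mutable.Map.empty[String, Option[Long]]
    for (record <- batch.records)
      loaded.put(record.key, Some(batch.baseOffset))
    loaded
  }

  // Shape of the code after the fix: remember the first record's offset
  // during iteration and reuse it for every record in the batch.
  def loadCached(batch: FakeBatch): mutable.Map[String, Option[Long]] = {
    val loaded = mutable.Map.empty[String, Option[Long]]
    var batchBaseOffset: Option[Long] = None
    for (record <- batch.records) {
      if (batchBaseOffset.isEmpty)
        batchBaseOffset = Some(record.offset)
      loaded.put(record.key, batchBaseOffset)
    }
    loaded
  }
}
```

In this sketch, `loadNaive` calls the expensive accessor once per record, while `loadCached` never calls it and still stores the same value for every key in the batch.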

-- 
To stop receiving notification emails like this one, please contact
jgus@apache.org.
