kafka-commits mailing list archives

From ewe...@apache.org
Subject kafka git commit: KAFKA-3810: replication of internal topics should not be limited by replica.fetch.max.bytes
Date Sun, 19 Jun 2016 18:42:18 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 7fd4fe448 -> 40fb90937

KAFKA-3810: replication of internal topics should not be limited by replica.fetch.max.bytes

From the kafka-dev mailing list discussion: [[DISCUSS] scalability limits in the coordinator](http://mail-archives.apache.org/mod_mbox/kafka-dev/201605.mbox/%3CCAMQuQBZDdtAdhcgL6h4SmTgO83UQt4s72gc03B3VFghnME3FTAmail.gmail.com%3E)

There's a scalability limit on the new consumer / coordinator regarding the amount of group
metadata we can fit into one message. This restricts a combination of consumer group size,
topic subscription sizes, topic assignment sizes, and any remaining member metadata.

Under more strenuous use cases like mirroring clusters with thousands of topics, this limitation
can be reached even after applying gzip to the __consumer_offsets topic.

Various options were proposed in the discussion:
1. Config change: reduce the number of consumers in the group. This isn't always a realistic
answer in more strenuous use cases like MirrorMaker clusters or for auditing.
2. Config change: split the group into smaller groups which together will get full coverage
of the topics. This gives each group member a smaller subscription (e.g., g1 has topics starting
with a-m while g2 has topics starting with n-z). This would be operationally painful to manage.
3. Config change: split the topics among members of the group. Again this gives each group
member a smaller subscription. This would also be operationally painful to manage.
4. Config change: bump up KafkaConfig.messageMaxBytes (a topic-level config) and KafkaConfig.replicaFetchMaxBytes
(a broker-level config). Applying messageMaxBytes to just the __consumer_offsets topic seems
relatively harmless, but bumping up the broker-level replicaFetchMaxBytes would probably need
more attention.
5. Config change: try different compression codecs. Based on 2 minutes of googling, it seems
like lz4 and snappy are faster than gzip but have worse compression, so this probably won't help.
6. Implementation change: support sending the regex over the wire instead of the fully expanded
topic subscriptions. I think people said in the past that different languages have subtle
differences in regex, so this doesn't play nicely with cross-language groups.
7. Implementation change: maybe we can reverse the mapping? Instead of mapping from member
to subscriptions, we can map a subscription to a list of members.
8. Implementation change: maybe we can try to break apart the subscription and assignments
from the same SyncGroupRequest into multiple records? They can still go to the same message
set and get appended together. This way the limit becomes the segment size, which shouldn't
be a problem. This can be tricky to get right because we're currently keying these messages
on the group, so I think records from the same rebalance might accidentally compact one another,
but my understanding of compaction isn't that great.
9. Implementation change: try to apply some tricks on the assignment serialization to make
it smaller.
10. Config and Implementation change: bump up the __consumer_offsets topic messageMaxBytes
and (from Jun Rao) fix how we deal with the case when a message is larger than the fetch size.
Today, if the fetch size is smaller than the message size, the consumer will get stuck. Instead,
we can simply return the full message if it's larger than the fetch size w/o requiring the
consumer to manually adjust the fetch size.
11. Config and Implementation change: same as above but only apply the special fetch logic
when fetching from internal topics
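
To make the failure mode behind options 10 and 11 concrete, here is a minimal sketch. It assumes a toy model in which a log is just a sequence of message sizes and a read returns only whole messages that fit in the byte limit. `FetchSketch`, `read`, and `readAtLeastOne` are hypothetical names for illustration, not Kafka APIs, and the real log read path is more involved.

```scala
object FetchSketch {
  // Toy model (assumption): the log is a sequence of message sizes in bytes,
  // and an offset is simply an index into that sequence.
  // Returns the sizes of the whole messages, starting at `offset`,
  // that fit within `maxBytes`.
  def read(log: Vector[Int], offset: Int, maxBytes: Int): Vector[Int] = {
    var remaining = maxBytes
    log.drop(offset).takeWhile { size =>
      val fits = size <= remaining
      if (fits) remaining -= size
      fits
    }
  }

  // Sketch of the idea in option 10: if the next message is larger than the
  // requested limit, widen the limit so that at least that message is returned.
  def readAtLeastOne(log: Vector[Int], offset: Int, maxBytes: Int): Vector[Int] = {
    val firstSize = log.lift(offset).getOrElse(0)
    read(log, offset, math.max(maxBytes, firstSize))
  }
}
```

In this model, a fetcher positioned at a 5000-byte message with a 1000-byte limit gets an empty result on every attempt and never advances, while `readAtLeastOne` returns that message and lets the fetcher move on.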

This PR provides an implementation of option 11.

That being said, I'm not very happy with this approach as it essentially doesn't honor the
"replica.fetch.max.bytes" config. Better alternatives are definitely welcome!
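
The adjustment can be read in isolation as the following standalone sketch. `InternalTopicFetch` and `internalTopics` are hypothetical stand-ins: the set below lists only the `__consumer_offsets` topic named in this message rather than the actual contents of `TopicConstants.INTERNAL_TOPICS`, and the parameter names mirror those in the diff.

```scala
object InternalTopicFetch {
  // Stand-in (assumption) for TopicConstants.INTERNAL_TOPICS.
  val internalTopics: Set[String] = Set("__consumer_offsets")

  // Mirrors the condition in the diff: only reads of internal topics that are
  // not restricted to committed data get the wider limit; everything else
  // keeps the requested fetch size.
  def adjustedFetchSize(topic: String,
                        readOnlyCommitted: Boolean,
                        fetchSize: Int,
                        maxMessageSize: Int): Int =
    if (internalTopics.contains(topic) && !readOnlyCommitted)
      math.max(fetchSize, maxMessageSize)
    else
      fetchSize
}
```

With a requested fetch size of 1 MiB and a topic-level maximum message size of 10 MiB, such a read of `__consumer_offsets` uses 10 MiB, which is exactly why the configured "replica.fetch.max.bytes" is not honored in that case. Reads of any other topic, or reads limited to committed data, keep the 1 MiB limit.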

Author: Onur Karaman <okaraman@linkedin.com>

Reviewers: Jiangjie Qin <becket.qin@gmail.com>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes #1484 from onurkaraman/KAFKA-3810

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/40fb9093
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/40fb9093
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/40fb9093

Branch: refs/heads/trunk
Commit: 40fb90937e7ce5e9a44ab634c5987da1ab73a04f
Parents: 7fd4fe4
Author: Onur Karaman <okaraman@linkedin.com>
Authored: Sun Jun 19 11:42:00 2016 -0700
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Sun Jun 19 11:42:00 2016 -0700

 core/src/main/scala/kafka/server/ReplicaManager.scala | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 447fb40..b1cf68b 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -33,6 +33,7 @@ import org.apache.kafka.common.errors.{OffsetOutOfRangeException, RecordBatchToo
 InvalidTopicException, ControllerMovedException, NotLeaderForPartitionException, CorruptRecordException,
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.internals.TopicConstants
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{LeaderAndIsrRequest, StopReplicaRequest, UpdateMetadataRequest}
@@ -536,7 +537,8 @@ class ReplicaManager(val config: KafkaConfig,
           val initialLogEndOffset = localReplica.logEndOffset
           val logReadInfo = localReplica.log match {
             case Some(log) =>
-              log.read(offset, fetchSize, maxOffsetOpt)
+              val adjustedFetchSize = if (TopicConstants.INTERNAL_TOPICS.contains(topic) && !readOnlyCommitted) Math.max(fetchSize, log.config.maxMessageSize) else fetchSize
+              log.read(offset, adjustedFetchSize, maxOffsetOpt)
             case None =>
               error("Leader for partition [%s,%d] does not have a local log".format(topic,
               FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty)
