From: junrao@apache.org
To: commits@kafka.apache.org
Reply-To: dev@kafka.apache.org
Message-Id: <185f2293ca514b8097161b2eab7d6636@git.apache.org>
Subject: kafka git commit: kafka-2235; LogCleaner offset map overflow; patched by Ivan Simoneko; reviewed by Jun Rao
Date: Mon, 22 Jun 2015 16:20:05 +0000 (UTC)

Repository: kafka
Updated Branches:
  refs/heads/trunk cf28f8939 -> dc54055d0


kafka-2235; LogCleaner offset map overflow; patched by Ivan Simoneko; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/dc54055d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/dc54055d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/dc54055d

Branch: refs/heads/trunk
Commit: dc54055d05742a4a7729a1fe1073c18e3d95cbb2
Parents: cf28f89
Author: Ivan Simoneko
Authored: Mon Jun 22 09:19:45 2015 -0700
Committer: Jun Rao
Committed: Mon Jun 22 09:19:45 2015 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/LogCleaner.scala | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/dc54055d/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index d07a391..b36ea0d 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -559,11 +559,17 @@ private[log] class Cleaner(val id: Int,
     // but we may be able to fit more (if there is lots of duplication in the dirty section of the log)
     var offset = dirty.head.baseOffset
     require(offset == start, "Last clean offset is %d but segment base offset is %d for log %s.".format(start, offset, log.name))
-    val minStopOffset = (start + map.slots * this.dupBufferLoadFactor).toLong
-    for (segment <- dirty) {
+    val maxDesiredMapSize = (map.slots * this.dupBufferLoadFactor).toInt
+    var full = false
+    for (segment <- dirty if !full) {
       checkDone(log.topicAndPartition)
-      if(segment.baseOffset <= minStopOffset || map.utilization < this.dupBufferLoadFactor)
+      val segmentSize = segment.nextOffset() - segment.baseOffset
+
+      require(segmentSize <= maxDesiredMapSize, "%d messages in segment %s/%s but offset map can fit only %d. You can increase log.cleaner.dedupe.buffer.size or decrease log.cleaner.threads".format(segmentSize, log.name, segment.log.file.getName, maxDesiredMapSize))
+      if (map.size + segmentSize <= maxDesiredMapSize)
         offset = buildOffsetMapForSegment(log.topicAndPartition, segment, map)
+      else
+        full = true
     }
     info("Offset map for log %s complete.".format(log.name))
     offset
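The patched loop can be modeled in isolation. The sketch below is a simplified, hypothetical stand-in and is not Kafka's code. `Segment`, `OffsetMapSketch.buildOffsetMap` and its parameters are invented for illustration, and a plain `mutable.Map` replaces Kafka's fixed-slot offset map.

The sketch keeps the patch's three rules:

- `nextOffset - baseOffset` is treated as an upper bound on the number of keys a segment can add.
- A segment that could not fit even into an empty map is rejected.
- The loop stops before any segment that could push the map past `slots * loadFactor` entries.

The removed condition worked differently. It checked only `map.utilization` before adding a segment, and it skipped even that check for segments starting at or below `minStopOffset`. So nothing bounded how much the segment itself added.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a log segment: `messages` holds (offset, key) pairs and
// `nextOffset` is one past the last offset the segment covers.
final case class Segment(baseOffset: Long, nextOffset: Long, messages: Seq[(Long, String)])

object OffsetMapSketch {
  /** Simplified model of the patched loop (not Kafka's API). It fills `map`
    * (key -> latest offset) from the dirty segments without letting it exceed
    * slots * loadFactor entries. It returns the offset up to which the map covers the log. */
  def buildOffsetMap(dirty: Seq[Segment], slots: Int, loadFactor: Double,
                     map: mutable.Map[String, Long]): Long = {
    val maxDesiredMapSize = (slots * loadFactor).toInt
    var offset = dirty.head.baseOffset
    var full = false
    // The guard is re-evaluated for each element (withFilter is lazy), so setting
    // `full` inside the body skips every remaining segment.
    for (segment <- dirty if !full) {
      // Worst case: every offset in the segment carries a distinct key.
      val segmentSize = segment.nextOffset - segment.baseOffset
      require(segmentSize <= maxDesiredMapSize,
        s"$segmentSize messages in segment but offset map can fit only $maxDesiredMapSize")
      if (map.size + segmentSize <= maxDesiredMapSize) {
        // Later offsets overwrite earlier ones, so each key keeps its latest offset.
        for ((off, key) <- segment.messages) map(key) = off
        offset = segment.nextOffset
      } else {
        full = true
      }
    }
    offset
  }
}
```

For example, take `slots = 8` and `loadFactor = 0.75`, which gives a cap of 6 entries. Suppose there are three 3-message segments:

- keys a, b, a at offsets 0 to 2
- keys c, d, e at offsets 3 to 5
- keys f, g, h at offsets 6 to 8

The first two segments are added, giving 5 distinct keys because `a` repeats. The third is skipped because 5 + 3 > 6, and the call returns offset 6. Because the check uses the worst-case `segmentSize` rather than the keys actually added, the map can never overflow. The trade-off is that the loop sometimes stops early when a segment holds many duplicate keys.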