Repository: kafka
Updated Branches:
refs/heads/trunk cda1f73d0 -> 35f589bb4
KAFKA-1641; Reset first dirty offset for compaction to earliest offset
if the checkpointed offset is invalid; reviewed by Joel Koshy
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/35f589bb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/35f589bb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/35f589bb
Branch: refs/heads/trunk
Commit: 35f589bb4654b49035c27780717f560e74400444
Parents: cda1f73
Author: Guozhang Wang <guwang@linkedin.com>
Authored: Thu Oct 23 15:42:10 2014 -0700
Committer: Joel Koshy <jjkoshy@gmail.com>
Committed: Thu Oct 23 15:42:48 2014 -0700
----------------------------------------------------------------------
.../scala/kafka/log/LogCleanerManager.scala | 30 ++++++++++++++++----
1 file changed, 24 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/35f589bb/core/src/main/scala/kafka/log/LogCleanerManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index e8ced6a..bcfef77 100644
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -75,13 +75,31 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
   def grabFilthiestLog(): Option[LogToClean] = {
     inLock(lock) {
       val lastClean = allCleanerCheckpoints()
-      val dirtyLogs = logs.filter(l => l._2.config.compact)          // skip any logs marked for delete rather than dedupe
-                          .filterNot(l => inProgress.contains(l._1)) // skip any logs already in-progress
-                          .map(l => LogToClean(l._1, l._2,           // create a LogToClean instance for each
-                                               lastClean.getOrElse(l._1, l._2.logSegments.head.baseOffset)))
-                          .filter(l => l.totalBytes > 0)             // skip any empty logs
+      val dirtyLogs = logs.filter {
+        case (topicAndPartition, log) => log.config.compact // skip any logs marked for delete rather than dedupe
+      }.filterNot {
+        case (topicAndPartition, log) => inProgress.contains(topicAndPartition) // skip any logs already in-progress
+      }.map {
+        case (topicAndPartition, log) => // create a LogToClean instance for each
+          // if the log segments are abnormally truncated and hence the checkpointed offset
+          // is no longer valid, reset to the log starting offset and log the error event
+          val logStartOffset = log.logSegments.head.baseOffset
+          val firstDirtyOffset = {
+            val offset = lastClean.getOrElse(topicAndPartition, logStartOffset)
+            if (offset < logStartOffset) {
+              error("Resetting first dirty offset to log start offset %d since the checkpointed offset %d is invalid."
+                .format(logStartOffset, offset))
+              logStartOffset
+            } else {
+              offset
+            }
+          }
+          LogToClean(topicAndPartition, log, firstDirtyOffset)
+      }.filter(ltc => ltc.totalBytes > 0) // skip any empty logs
+
       this.dirtiestLogCleanableRatio = if (!dirtyLogs.isEmpty) dirtyLogs.max.cleanableRatio else 0
-      val cleanableLogs = dirtyLogs.filter(l => l.cleanableRatio > l.log.config.minCleanableRatio) // and must meet the minimum threshold for dirty byte ratio
+      // and must meet the minimum threshold for dirty byte ratio
+      val cleanableLogs = dirtyLogs.filter(ltc => ltc.cleanableRatio > ltc.log.config.minCleanableRatio)
       if(cleanableLogs.isEmpty) {
         None
       } else {
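----------------------------------------------------------------------
For context, a minimal standalone sketch of the guard this patch adds to
grabFilthiestLog. SimpleLog, firstDirtyOffset, and the values below are
hypothetical stand-ins for illustration, not the actual Kafka API:

object DirtyOffsetResetSketch {

  // Hypothetical stand-in for a log whose oldest remaining segment
  // begins at startOffset.
  final case class SimpleLog(name: String, startOffset: Long)

  // Clamp a checkpointed cleaner offset to the log's start offset. If
  // segments were deleted or abnormally truncated after the checkpoint
  // was written, the checkpointed offset may precede the earliest
  // available offset; in that case fall back to the log start offset,
  // mirroring the patch above.
  def firstDirtyOffset(log: SimpleLog, checkpointed: Option[Long]): Long = {
    val offset = checkpointed.getOrElse(log.startOffset)
    if (offset < log.startOffset) {
      println(s"Resetting first dirty offset to log start offset ${log.startOffset} " +
              s"since the checkpointed offset $offset is invalid.")
      log.startOffset
    } else {
      offset
    }
  }

  def main(args: Array[String]): Unit = {
    val log = SimpleLog("topic-0", startOffset = 500L)
    assert(firstDirtyOffset(log, Some(1200L)) == 1200L) // valid checkpoint is kept
    assert(firstDirtyOffset(log, Some(100L)) == 500L)   // stale checkpoint is reset
    assert(firstDirtyOffset(log, None) == 500L)         // no checkpoint: start at log start
  }
}

Without such a clamp, a checkpointed offset pointing below the first
segment's base offset would leave the cleaner attempting to clean from
a position that no longer exists in the log.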