kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jjko...@apache.org
Subject git commit: KAFKA-1641; Reset first dirty offset for compaction to earliest offset if the checkpointed offset is invalid; reviewed by Joel Koshy
Date Thu, 23 Oct 2014 22:43:06 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk cda1f73d0 -> 35f589bb4


KAFKA-1641; Reset first dirty offset for compaction to earliest offset
if the checkpointed offset is invalid; reviewed by Joel Koshy


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/35f589bb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/35f589bb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/35f589bb

Branch: refs/heads/trunk
Commit: 35f589bb4654b49035c27780717f560e74400444
Parents: cda1f73
Author: Guozhang Wang <guwang@linkedin.com>
Authored: Thu Oct 23 15:42:10 2014 -0700
Committer: Joel Koshy <jjkoshy@gmail.com>
Committed: Thu Oct 23 15:42:48 2014 -0700

----------------------------------------------------------------------
 .../scala/kafka/log/LogCleanerManager.scala     | 30 ++++++++++++++++----
 1 file changed, 24 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/35f589bb/core/src/main/scala/kafka/log/LogCleanerManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index e8ced6a..bcfef77 100644
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -75,13 +75,31 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
   def grabFilthiestLog(): Option[LogToClean] = {
     inLock(lock) {
       val lastClean = allCleanerCheckpoints()
-      val dirtyLogs = logs.filter(l => l._2.config.compact)          // skip any logs marked for delete rather than dedupe
-                          .filterNot(l => inProgress.contains(l._1)) // skip any logs already in-progress
-                          .map(l => LogToClean(l._1, l._2,           // create a LogToClean instance for each
-                                               lastClean.getOrElse(l._1, l._2.logSegments.head.baseOffset)))
-                          .filter(l => l.totalBytes > 0)             // skip any empty logs
+      val dirtyLogs = logs.filter {
+        case (topicAndPartition, log) => log.config.compact  // skip any logs marked for delete rather than dedupe
+      }.filterNot {
+        case (topicAndPartition, log) => inProgress.contains(topicAndPartition) // skip any logs already in-progress
+      }.map {
+        case (topicAndPartition, log) => // create a LogToClean instance for each
+          // if the log segments are abnormally truncated and hence the checkpointed offset
+          // is no longer valid, reset to the log starting offset and log the error event
+          val logStartOffset = log.logSegments.head.baseOffset
+          val firstDirtyOffset = {
+            val offset = lastClean.getOrElse(topicAndPartition, logStartOffset)
+            if (offset < logStartOffset) {
+              error("Resetting first dirty offset to log start offset %d since the checkpointed offset %d is invalid."
+                    .format(logStartOffset, offset))
+              logStartOffset
+            } else {
+              offset
+            }
+          }
+          LogToClean(topicAndPartition, log, firstDirtyOffset)
+      }.filter(ltc => ltc.totalBytes > 0) // skip any empty logs
+
       this.dirtiestLogCleanableRatio = if (!dirtyLogs.isEmpty) dirtyLogs.max.cleanableRatio else 0
-      val cleanableLogs = dirtyLogs.filter(l => l.cleanableRatio > l.log.config.minCleanableRatio) // and must meet the minimum threshold for dirty byte ratio
+      // and must meet the minimum threshold for dirty byte ratio
+      val cleanableLogs = dirtyLogs.filter(ltc => ltc.cleanableRatio > ltc.log.config.minCleanableRatio)
       if(cleanableLogs.isEmpty) {
         None
       } else {


Mime
View raw message