kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7215: Improve LogCleaner Error Handling (#5439)
Date Mon, 08 Oct 2018 19:54:54 GMT
This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 13379af  KAFKA-7215: Improve LogCleaner Error Handling (#5439)
13379af is described below

commit 13379af17d95788568038c0c6daeb983819669e3
Author: Stanislav Kozlovski <stanislav_kozlovski@outlook.com>
AuthorDate: Mon Oct 8 20:54:37 2018 +0100

    KAFKA-7215: Improve LogCleaner Error Handling (#5439)
    
    The thread no longer dies. When encountering an unexpected error, it marks the partition as "uncleanable" which means it will not try to clean its logs in subsequent runs.
    
    Reviewers: Dhruvil Shah <dhruvil@confluent.io>, Jun Rao <junrao@gmail.com>
---
 core/src/main/scala/kafka/log/Log.scala            |   2 +-
 core/src/main/scala/kafka/log/LogCleaner.scala     | 107 ++++--
 .../main/scala/kafka/log/LogCleanerManager.scala   |  86 ++++-
 .../scala/kafka/server/LogDirFailureChannel.scala  |   3 +-
 .../log/AbstractLogCleanerIntegrationTest.scala    |  30 ++
 .../unit/kafka/log/LogCleanerIntegrationTest.scala | 389 ++++-----------------
 .../kafka/log/LogCleanerLagIntegrationTest.scala   |  12 +-
 .../unit/kafka/log/LogCleanerManagerTest.scala     | 193 ++++++++--
 ...> LogCleanerParameterizedIntegrationTest.scala} |  35 +-
 .../test/scala/unit/kafka/log/LogSegmentTest.scala |   7 +-
 core/src/test/scala/unit/kafka/log/LogUtils.scala  |  41 +++
 11 files changed, 479 insertions(+), 426 deletions(-)

diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 8915c14..094473a 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -193,7 +193,7 @@ class Log(@volatile var dir: File,
 
   /* A lock that guards all modifications to the log */
   private val lock = new Object
-  // The memory mapped buffer for index files of this log will be closed for index files of this log will be closed with either delete() or closeHandlers()
+  // The memory mapped buffer for index files of this log will be closed with either delete() or closeHandlers()
   // After memory mapped buffer is closed, no disk IO operation should be performed for this log
   @volatile private var isMemoryMappedBufferClosed = false
 
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index bf4f7e1..0416325 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -37,6 +37,7 @@ import org.apache.kafka.common.utils.Time
 
 import scala.collection.JavaConverters._
 import scala.collection.{Iterable, Set, mutable}
+import scala.util.control.ControlThrowable
 
 /**
  * The cleaner is responsible for removing obsolete records from logs which have the "compact" retention strategy.
@@ -293,49 +294,75 @@ class LogCleaner(initialConfig: CleanerConfig,
 
     /**
      * The main loop for the cleaner thread
+     * Clean a log if there is a dirty log available, otherwise sleep for a bit
      */
     override def doWork() {
-      cleanOrSleep()
+      val cleaned = cleanFilthiestLog()
+      if (!cleaned)
+        pause(config.backOffMs, TimeUnit.MILLISECONDS)
     }
 
     /**
-     * Clean a log if there is a dirty log available, otherwise sleep for a bit
-     */
-    private def cleanOrSleep() {
-      val cleaned = cleanerManager.grabFilthiestCompactedLog(time) match {
-        case None =>
-          false
-        case Some(cleanable) =>
-          // there's a log, clean it
-          var endOffset = cleanable.firstDirtyOffset
-          try {
-            val (nextDirtyOffset, cleanerStats) = cleaner.clean(cleanable)
-            recordStats(cleaner.id, cleanable.log.name, cleanable.firstDirtyOffset, endOffset, cleanerStats)
-            endOffset = nextDirtyOffset
-          } catch {
-            case _: LogCleaningAbortedException => // task can be aborted, let it go.
-            case _: KafkaStorageException => // partition is already offline. let it go.
-            case e: IOException =>
-              val msg = s"Failed to clean up log for ${cleanable.topicPartition} in dir ${cleanable.log.dir.getParent} due to IOException"
-              logDirFailureChannel.maybeAddOfflineLogDir(cleanable.log.dir.getParent, msg, e)
-          } finally {
-            cleanerManager.doneCleaning(cleanable.topicPartition, cleanable.log.dir.getParentFile, endOffset)
+      * Cleans a log if there is a dirty log available
+      * @return whether a log was cleaned
+      */
+    private def cleanFilthiestLog(): Boolean = {
+      var currentLog: Option[Log] = None
+
+      try {
+        val cleaned = cleanerManager.grabFilthiestCompactedLog(time) match {
+          case None =>
+            false
+          case Some(cleanable) =>
+            // there's a log, clean it
+            currentLog = Some(cleanable.log)
+            cleanLog(cleanable)
+            true
+        }
+        val deletable: Iterable[(TopicPartition, Log)] = cleanerManager.deletableLogs()
+        try {
+          deletable.foreach {
+            case (topicPartition, log) =>
+              try {
+                currentLog = Some(log)
+                log.deleteOldSegments()
+              }
           }
-          true
+        } finally  {
+          cleanerManager.doneDeleting(deletable.map(_._1))
+        }
+
+        cleaned
+      } catch {
+        case e @ (_: ThreadShutdownException | _: ControlThrowable) => throw e
+        case e: Exception =>
+          if (currentLog.isEmpty) {
+            throw new IllegalStateException("currentLog cannot be empty on an unexpected exception", e)
+          }
+          val erroneousLog = currentLog.get
+          warn(s"Unexpected exception thrown when cleaning log $erroneousLog. Marking its partition (${erroneousLog.topicPartition}) as uncleanable", e)
+          cleanerManager.markPartitionUncleanable(erroneousLog.dir.getParent, erroneousLog.topicPartition)
+
+          false
       }
-      val deletable: Iterable[(TopicPartition, Log)] = cleanerManager.deletableLogs()
+    }
 
+    private def cleanLog(cleanable: LogToClean): Unit = {
+      var endOffset = cleanable.firstDirtyOffset
       try {
-        deletable.foreach {
-          case (_, log) =>
-            log.deleteOldSegments()
-        }
+        val (nextDirtyOffset, cleanerStats) = cleaner.clean(cleanable)
+        recordStats(cleaner.id, cleanable.log.name, cleanable.firstDirtyOffset, endOffset, cleanerStats)
+        endOffset = nextDirtyOffset
+      } catch {
+        case _: LogCleaningAbortedException => // task can be aborted, let it go.
+        case _: KafkaStorageException => // partition is already offline. let it go.
+        case e: IOException =>
+          var logDirectory = cleanable.log.dir.getParent
+          val msg = s"Failed to clean up log for ${cleanable.topicPartition} in dir ${logDirectory} due to IOException"
+          logDirFailureChannel.maybeAddOfflineLogDir(logDirectory, msg, e)
       } finally {
-        cleanerManager.doneDeleting(deletable.map(_._1))
+        cleanerManager.doneCleaning(cleanable.topicPartition, cleanable.log.dir.getParentFile, endOffset)
       }
-
-      if (!cleaned)
-        pause(config.backOffMs, TimeUnit.MILLISECONDS)
     }
 
     /**
@@ -398,6 +425,18 @@ object LogCleaner {
     LogSegment.open(log.dir, baseOffset, log.config, Time.SYSTEM, fileAlreadyExists = false,
       fileSuffix = Log.CleanedFileSuffix, initFileSize = log.initFileSize, preallocate = log.config.preallocate)
   }
+
+  /**
+    * Given the first dirty offset and an uncleanable offset, calculates the total cleanable bytes for this log
+    * @return the biggest uncleanable offset and the total amount of cleanable bytes
+    */
+  def calculateCleanableBytes(log: Log, firstDirtyOffset: Long, uncleanableOffset: Long): (Long, Long) = {
+    val firstUncleanableSegment = log.logSegments(uncleanableOffset, log.activeSegment.baseOffset).headOption.getOrElse(log.activeSegment)
+    val firstUncleanableOffset = firstUncleanableSegment.baseOffset
+    val cleanableBytes = log.logSegments(firstDirtyOffset, math.max(firstDirtyOffset, firstUncleanableOffset)).map(_.size.toLong).sum
+
+    (firstUncleanableOffset, cleanableBytes)
+  }
 }
 
 /**
@@ -951,9 +990,7 @@ private class CleanerStats(time: Time = Time.SYSTEM) {
  */
 private case class LogToClean(topicPartition: TopicPartition, log: Log, firstDirtyOffset: Long, uncleanableOffset: Long) extends Ordered[LogToClean] {
   val cleanBytes = log.logSegments(-1, firstDirtyOffset).map(_.size.toLong).sum
-  private[this] val firstUncleanableSegment = log.logSegments(uncleanableOffset, log.activeSegment.baseOffset).headOption.getOrElse(log.activeSegment)
-  val firstUncleanableOffset = firstUncleanableSegment.baseOffset
-  val cleanableBytes = log.logSegments(firstDirtyOffset, math.max(firstDirtyOffset, firstUncleanableOffset)).map(_.size.toLong).sum
+  val (firstUncleanableOffset, cleanableBytes) = LogCleaner.calculateCleanableBytes(log, firstDirtyOffset, uncleanableOffset)
   val totalBytes = cleanBytes + cleanableBytes
   val cleanableRatio = cleanableBytes / totalBytes.toDouble
   override def compare(that: LogToClean): Int = math.signum(this.cleanableRatio - that.cleanableRatio).toInt
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index 7a96d8f..13d14c1 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -73,12 +73,55 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
   /* the set of logs currently being cleaned */
   private val inProgress = mutable.HashMap[TopicPartition, LogCleaningState]()
 
+  /* the set of uncleanable partitions (partitions that have raised an unexpected error during cleaning)
+   *   for each log directory */
+  private val uncleanablePartitions = mutable.HashMap[String, mutable.Set[TopicPartition]]()
+
+  /* the set of directories marked as uncleanable and therefore offline */
+  private val uncleanableDirs = mutable.HashSet[String]()
+
   /* a global lock used to control all access to the in-progress set and the offset checkpoints */
   private val lock = new ReentrantLock
 
   /* for coordinating the pausing and the cleaning of a partition */
   private val pausedCleaningCond = lock.newCondition()
 
+  /* gauges for tracking the number of partitions marked as uncleanable for each log directory */
+  for (dir <- logDirs) {
+    newGauge(
+      "uncleanable-partitions-count",
+      new Gauge[Int] { def value = inLock(lock) { uncleanablePartitions.get(dir.getAbsolutePath).map(_.size).getOrElse(0) } },
+      Map("logDirectory" -> dir.getAbsolutePath)
+    )
+  }
+
+  /* gauges for tracking the number of uncleanable bytes from uncleanable partitions for each log directory */
+    for (dir <- logDirs) {
+      newGauge(
+        "uncleanable-bytes",
+        new Gauge[Long] {
+          def value = {
+            inLock(lock) {
+              uncleanablePartitions.get(dir.getAbsolutePath) match {
+                case Some(partitions) => {
+                  val lastClean = allCleanerCheckpoints
+                  val now = Time.SYSTEM.milliseconds
+                  partitions.map { tp =>
+                    val log = logs.get(tp)
+                    val (firstDirtyOffset, firstUncleanableDirtyOffset) = LogCleanerManager.cleanableOffsets(log, tp, lastClean, now)
+                    val (_, uncleanableBytes) = LogCleaner.calculateCleanableBytes(log, firstDirtyOffset, firstUncleanableDirtyOffset)
+                    uncleanableBytes
+                  }.sum
+                }
+                case _ => 0
+              }
+            }
+          }
+        },
+        Map("logDirectory" -> dir.getAbsolutePath)
+      )
+    }
+
   /* a gauge for tracking the cleanable ratio of the dirtiest log */
   @volatile private var dirtiestLogCleanableRatio = 0.0
   newGauge("max-dirty-percent", new Gauge[Int] { def value = (100 * dirtiestLogCleanableRatio).toInt })
@@ -135,7 +178,9 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
       val dirtyLogs = logs.filter {
         case (_, log) => log.config.compact  // match logs that are marked as compacted
       }.filterNot {
-        case (topicPartition, _) => inProgress.contains(topicPartition) // skip any logs already in-progress
+        case (topicPartition, log) =>
+          // skip any logs already in-progress and uncleanable partitions
+          inProgress.contains(topicPartition) || isUncleanablePartition(log, topicPartition)
       }.map {
         case (topicPartition, log) => // create a LogToClean instance for each
           val (firstDirtyOffset, firstUncleanableDirtyOffset) = LogCleanerManager.cleanableOffsets(log, topicPartition,
@@ -179,13 +224,15 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
   }
 
   /**
-    * Find any logs that have compaction enabled. Include logs without delete enabled, as they may have segments
+    * Find any logs that have compaction enabled. Mark them as being cleaned
+    * Include logs without delete enabled, as they may have segments
     * that precede the start offset.
     */
   def deletableLogs(): Iterable[(TopicPartition, Log)] = {
     inLock(lock) {
       val toClean = logs.filter { case (topicPartition, log) =>
-        !inProgress.contains(topicPartition) && log.config.compact
+        !inProgress.contains(topicPartition) && log.config.compact &&
+          !isUncleanablePartition(log, topicPartition)
       }
       toClean.foreach { case (tp, _) => inProgress.put(tp, LogCleaningInProgress) }
       toClean
@@ -332,6 +379,12 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
         case e: KafkaStorageException =>
           error(s"Failed to access checkpoint file in dir ${sourceLogDir.getAbsolutePath}", e)
       }
+
+      val logUncleanablePartitions = uncleanablePartitions.getOrElse(sourceLogDir.toString, mutable.Set[TopicPartition]())
+      if (logUncleanablePartitions.contains(topicPartition)) {
+        logUncleanablePartitions.remove(topicPartition)
+        markPartitionUncleanable(destLogDir.toString, topicPartition)
+      }
     }
   }
 
@@ -393,6 +446,33 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
       }
     }
   }
+
+  /**
+   * Returns an immutable set of the uncleanable partitions for a given log directory
+   * Only used for testing
+   */
+  private[log] def uncleanablePartitions(logDir: String): Set[TopicPartition] = {
+    var partitions: Set[TopicPartition] = Set()
+    inLock(lock) { partitions ++= uncleanablePartitions.getOrElse(logDir, partitions) }
+    partitions
+  }
+
+  def markPartitionUncleanable(logDir: String, partition: TopicPartition): Unit = {
+    inLock(lock) {
+      uncleanablePartitions.get(logDir) match {
+        case Some(partitions) =>
+          partitions.add(partition)
+        case None =>
+          uncleanablePartitions.put(logDir, mutable.Set(partition))
+      }
+    }
+  }
+
+  private def isUncleanablePartition(log: Log, topicPartition: TopicPartition): Boolean = {
+    inLock(lock) {
+      uncleanablePartitions.get(log.dir.getParent).exists(partitions => partitions.contains(topicPartition))
+    }
+  }
 }
 
 private[log] object LogCleanerManager extends Logging {
diff --git a/core/src/main/scala/kafka/server/LogDirFailureChannel.scala b/core/src/main/scala/kafka/server/LogDirFailureChannel.scala
index c78f04e..897d3fc 100644
--- a/core/src/main/scala/kafka/server/LogDirFailureChannel.scala
+++ b/core/src/main/scala/kafka/server/LogDirFailureChannel.scala
@@ -45,9 +45,8 @@ class LogDirFailureChannel(logDirNum: Int) extends Logging {
    */
   def maybeAddOfflineLogDir(logDir: String, msg: => String, e: IOException): Unit = {
     error(msg, e)
-    if (offlineLogDirs.putIfAbsent(logDir, logDir) == null) {
+    if (offlineLogDirs.putIfAbsent(logDir, logDir) == null)
       offlineLogDirQueue.add(logDir)
-    }
   }
 
   /*
diff --git a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
index 0ad5b46..2a483fa 100644
--- a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
@@ -24,10 +24,13 @@ import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
 import kafka.utils.{MockTime, Pool, TestUtils}
 import kafka.utils.Implicits._
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch}
 import org.apache.kafka.common.utils.Utils
 import org.junit.After
 
+import scala.collection.Seq
 import scala.collection.mutable.ListBuffer
+import scala.util.Random
 
 abstract class AbstractLogCleanerIntegrationTest {
 
@@ -118,4 +121,31 @@ abstract class AbstractLogCleanerIntegrationTest {
       logDirFailureChannel = new LogDirFailureChannel(1),
       time = time)
   }
+
+  def codec: CompressionType
+  private var ctr = 0
+  def counter: Int = ctr
+  def incCounter(): Unit = ctr += 1
+
+  def writeDups(numKeys: Int, numDups: Int, log: Log, codec: CompressionType,
+                        startKey: Int = 0, magicValue: Byte = RecordBatch.CURRENT_MAGIC_VALUE): Seq[(Int, String, Long)] = {
+    for(_ <- 0 until numDups; key <- startKey until (startKey + numKeys)) yield {
+      val value = counter.toString
+      val appendInfo = log.appendAsLeader(TestUtils.singletonRecords(value = value.toString.getBytes, codec = codec,
+        key = key.toString.getBytes, magicValue = magicValue), leaderEpoch = 0)
+      incCounter()
+      (key, value, appendInfo.firstOffset.get)
+    }
+  }
+
+  def createLargeSingleMessageSet(key: Int, messageFormatVersion: Byte): (String, MemoryRecords) = {
+    def messageValue(length: Int): String = {
+      val random = new Random(0)
+      new String(random.alphanumeric.take(length).toArray)
+    }
+    val value = messageValue(128)
+    val messageSet = TestUtils.singletonRecords(value = value.getBytes, codec = codec, key = key.toString.getBytes,
+      magicValue = messageFormatVersion)
+    (value, messageSet)
+  }
 }
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
old mode 100755
new mode 100644
index 64e8b38..bfee811
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -1,349 +1,96 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
 
 package kafka.log
 
-import java.io.File
-import java.util.Properties
+import java.io.PrintWriter
 
-import kafka.api.KAFKA_0_11_0_IV0
-import kafka.api.{KAFKA_0_10_0_IV1, KAFKA_0_9_0}
-import kafka.server.KafkaConfig
-import kafka.server.checkpoints.OffsetCheckpointFile
-import kafka.utils._
+import com.yammer.metrics.Metrics
+import com.yammer.metrics.core.Gauge
+import kafka.utils.{MockTime, TestUtils}
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.record._
-import org.junit.Assert._
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-import org.junit.runners.Parameterized.Parameters
+import org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS
+import org.apache.kafka.common.record.{CompressionType, RecordBatch}
+import org.junit.Assert.{assertFalse, assertTrue, fail}
+import org.junit.Test
 
-import scala.Seq
-import scala.collection._
-import scala.util.Random
+import scala.collection.JavaConverters.mapAsScalaMapConverter
 
 /**
- * This is an integration test that tests the fully integrated log cleaner
- */
-@RunWith(value = classOf[Parameterized])
-class LogCleanerIntegrationTest(compressionCodec: String) extends AbstractLogCleanerIntegrationTest {
+  * This is an integration test that tests the fully integrated log cleaner
+  */
+class LogCleanerIntegrationTest extends AbstractLogCleanerIntegrationTest {
+
+  val codec: CompressionType = CompressionType.LZ4
 
-  val codec = CompressionType.forName(compressionCodec)
   val time = new MockTime()
-  var counter = 0
   val topicPartitions = Array(new TopicPartition("log", 0), new TopicPartition("log", 1), new TopicPartition("log", 2))
 
-  @Test
-  def cleanerTest() {
+  @Test(timeout = DEFAULT_MAX_WAIT_MS)
+  def testMarksPartitionsAsOfflineAndPopulatesUncleanableMetrics() {
     val largeMessageKey = 20
-    val (largeMessageValue, largeMessageSet) = createLargeSingleMessageSet(largeMessageKey, RecordBatch.CURRENT_MAGIC_VALUE)
+    val (_, largeMessageSet) = createLargeSingleMessageSet(largeMessageKey, RecordBatch.CURRENT_MAGIC_VALUE)
     val maxMessageSize = largeMessageSet.sizeInBytes
+    cleaner = makeCleaner(partitions = topicPartitions, maxMessageSize = maxMessageSize, backOffMs = 100)
 
-    cleaner = makeCleaner(partitions = topicPartitions, maxMessageSize = maxMessageSize)
-    val log = cleaner.logs.get(topicPartitions(0))
-
-    val appends = writeDups(numKeys = 100, numDups = 3, log = log, codec = codec)
-    val startSize = log.size
-    cleaner.startup()
-
-    val firstDirty = log.activeSegment.baseOffset
-    checkLastCleaned("log", 0, firstDirty)
-    val compactedSize = log.logSegments.map(_.size).sum
-    assertTrue(s"log should have been compacted: startSize=$startSize compactedSize=$compactedSize", startSize > compactedSize)
-
-    checkLogAfterAppendingDups(log, startSize, appends)
-
-    val appendInfo = log.appendAsLeader(largeMessageSet, leaderEpoch = 0)
-    val largeMessageOffset = appendInfo.firstOffset.get
-
-    val dups = writeDups(startKey = largeMessageKey + 1, numKeys = 100, numDups = 3, log = log, codec = codec)
-    val appends2 = appends ++ Seq((largeMessageKey, largeMessageValue, largeMessageOffset)) ++ dups
-    val firstDirty2 = log.activeSegment.baseOffset
-    checkLastCleaned("log", 0, firstDirty2)
-
-    checkLogAfterAppendingDups(log, startSize, appends2)
-
-    // simulate deleting a partition, by removing it from logs
-    // force a checkpoint
-    // and make sure its gone from checkpoint file
-    cleaner.logs.remove(topicPartitions(0))
-    cleaner.updateCheckpoints(logDir)
-    val checkpoints = new OffsetCheckpointFile(new File(logDir, cleaner.cleanerManager.offsetCheckpointFile)).read()
-    // we expect partition 0 to be gone
-    assertFalse(checkpoints.contains(topicPartitions(0)))
-  }
-
-  @Test
-  def testCleansCombinedCompactAndDeleteTopic(): Unit = {
-    val logProps  = new Properties()
-    val retentionMs: Integer = 100000
-    logProps.put(LogConfig.RetentionMsProp, retentionMs: Integer)
-    logProps.put(LogConfig.CleanupPolicyProp, "compact,delete")
-
-    def runCleanerAndCheckCompacted(numKeys: Int): (Log, Seq[(Int, String, Long)]) = {
-      cleaner = makeCleaner(partitions = topicPartitions.take(1), propertyOverrides = logProps, backOffMs = 100L)
-      val log = cleaner.logs.get(topicPartitions(0))
-
-      val messages = writeDups(numKeys = numKeys, numDups = 3, log = log, codec = codec)
-      val startSize = log.size
-
-      log.onHighWatermarkIncremented(log.logEndOffset)
-
-      val firstDirty = log.activeSegment.baseOffset
-      cleaner.startup()
-
-      // should compact the log
-      checkLastCleaned("log", 0, firstDirty)
-      val compactedSize = log.logSegments.map(_.size).sum
-      assertTrue(s"log should have been compacted: startSize=$startSize compactedSize=$compactedSize", startSize > compactedSize)
-      (log, messages)
-    }
-
-    val (log, _) = runCleanerAndCheckCompacted(100)
-    // should delete old segments
-    log.logSegments.foreach(_.lastModified = time.milliseconds - (2 * retentionMs))
-
-    TestUtils.waitUntilTrue(() => log.numberOfSegments == 1, "There should only be 1 segment remaining", 10000L)
-    assertEquals(1, log.numberOfSegments)
+    def breakPartitionLog(tp: TopicPartition): Unit = {
+      val log = cleaner.logs.get(tp)
+      writeDups(numKeys = 20, numDups = 3, log = log, codec = codec)
 
-    cleaner.shutdown()
+      val partitionFile = log.logSegments.last.log.file()
+      val writer = new PrintWriter(partitionFile)
+      writer.write("jogeajgoea")
+      writer.close()
 
-    // run the cleaner again to make sure if there are no issues post deletion
-    val (log2, messages) = runCleanerAndCheckCompacted(20)
-    val read = readFromLog(log2)
-    assertEquals("Contents of the map shouldn't change", toMap(messages), toMap(read))
-  }
-
-  // returns (value, ByteBufferMessageSet)
-  private def createLargeSingleMessageSet(key: Int, messageFormatVersion: Byte): (String, MemoryRecords) = {
-    def messageValue(length: Int): String = {
-      val random = new Random(0)
-      new String(random.alphanumeric.take(length).toArray)
+      writeDups(numKeys = 20, numDups = 3, log = log, codec = codec)
     }
-    val value = messageValue(128)
-    val messageSet = TestUtils.singletonRecords(value = value.getBytes, codec = codec, key = key.toString.getBytes,
-      magicValue = messageFormatVersion)
-    (value, messageSet)
-  }
 
-  @Test
-  def testCleanerWithMessageFormatV0(): Unit = {
-    val largeMessageKey = 20
-    val (largeMessageValue, largeMessageSet) = createLargeSingleMessageSet(largeMessageKey, RecordBatch.MAGIC_VALUE_V0)
-    val maxMessageSize = codec match {
-      case CompressionType.NONE => largeMessageSet.sizeInBytes
-      case _ =>
-        // the broker assigns absolute offsets for message format 0 which potentially causes the compressed size to
-        // increase because the broker offsets are larger than the ones assigned by the client
-        // adding `5` to the message set size is good enough for this test: it covers the increased message size while
-        // still being less than the overhead introduced by the conversion from message format version 0 to 1
-        largeMessageSet.sizeInBytes + 5
+    def getGauge[T](metricName: String, metricScope: String): Gauge[T] = {
+      Metrics.defaultRegistry.allMetrics.asScala
+        .filterKeys(k => {
+          k.getName.endsWith(metricName) && k.getScope.endsWith(metricScope)
+        })
+        .headOption
+        .getOrElse { fail(s"Unable to find metric $metricName") }
+        .asInstanceOf[(Any, Gauge[Any])]
+        ._2
+        .asInstanceOf[Gauge[T]]
     }
 
-    cleaner = makeCleaner(partitions = topicPartitions, maxMessageSize = maxMessageSize)
-
-    val log = cleaner.logs.get(topicPartitions(0))
-    val props = logConfigProperties(maxMessageSize = maxMessageSize)
-    props.put(LogConfig.MessageFormatVersionProp, KAFKA_0_9_0.version)
-    log.config = new LogConfig(props)
+    breakPartitionLog(topicPartitions(0))
+    breakPartitionLog(topicPartitions(1))
 
-    val appends = writeDups(numKeys = 100, numDups = 3, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V0)
-    val startSize = log.size
     cleaner.startup()
 
-    val firstDirty = log.activeSegment.baseOffset
-    checkLastCleaned("log", 0, firstDirty)
-    val compactedSize = log.logSegments.map(_.size).sum
-    assertTrue(s"log should have been compacted: startSize=$startSize compactedSize=$compactedSize", startSize > compactedSize)
-
-    checkLogAfterAppendingDups(log, startSize, appends)
-
-    val appends2: Seq[(Int, String, Long)] = {
-      val dupsV0 = writeDups(numKeys = 40, numDups = 3, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V0)
-      val appendInfo = log.appendAsLeader(largeMessageSet, leaderEpoch = 0)
-      val largeMessageOffset = appendInfo.firstOffset.get
-
-      // also add some messages with version 1 and version 2 to check that we handle mixed format versions correctly
-      props.put(LogConfig.MessageFormatVersionProp, KAFKA_0_11_0_IV0.version)
-      log.config = new LogConfig(props)
-      val dupsV1 = writeDups(startKey = 30, numKeys = 40, numDups = 3, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V1)
-      val dupsV2 = writeDups(startKey = 15, numKeys = 5, numDups = 3, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V2)
-      appends ++ dupsV0 ++ Seq((largeMessageKey, largeMessageValue, largeMessageOffset)) ++ dupsV1 ++ dupsV2
-    }
-    val firstDirty2 = log.activeSegment.baseOffset
-    checkLastCleaned("log", 0, firstDirty2)
-
-    checkLogAfterAppendingDups(log, startSize, appends2)
-  }
-
-  @Test
-  def testCleaningNestedMessagesWithMultipleVersions(): Unit = {
-    val maxMessageSize = 192
-    cleaner = makeCleaner(partitions = topicPartitions, maxMessageSize = maxMessageSize)
-
     val log = cleaner.logs.get(topicPartitions(0))
-    val props = logConfigProperties(maxMessageSize = maxMessageSize)
-    props.put(LogConfig.MessageFormatVersionProp, KAFKA_0_9_0.version)
-    log.config = new LogConfig(props)
-
-    // with compression enabled, these messages will be written as a single message containing
-    // all of the individual messages
-    var appendsV0 = writeDupsSingleMessageSet(numKeys = 2, numDups = 3, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V0)
-    appendsV0 ++= writeDupsSingleMessageSet(numKeys = 2, startKey = 3, numDups = 2, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V0)
-
-    props.put(LogConfig.MessageFormatVersionProp, KAFKA_0_10_0_IV1.version)
-    log.config = new LogConfig(props)
-
-    var appendsV1 = writeDupsSingleMessageSet(startKey = 4, numKeys = 2, numDups = 2, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V1)
-    appendsV1 ++= writeDupsSingleMessageSet(startKey = 4, numKeys = 2, numDups = 2, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V1)
-    appendsV1 ++= writeDupsSingleMessageSet(startKey = 6, numKeys = 2, numDups = 2, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V1)
-
-    val appends = appendsV0 ++ appendsV1
-
-    val startSize = log.size
-    cleaner.startup()
-
-    val firstDirty = log.activeSegment.baseOffset
-    assertTrue(firstDirty > appendsV0.size) // ensure we clean data from V0 and V1
-
-    checkLastCleaned("log", 0, firstDirty)
-    val compactedSize = log.logSegments.map(_.size).sum
-    assertTrue(s"log should have been compacted: startSize=$startSize compactedSize=$compactedSize", startSize > compactedSize)
-
-    checkLogAfterAppendingDups(log, startSize, appends)
-  }
-
-  @Test
-  def cleanerConfigUpdateTest() {
-    val largeMessageKey = 20
-    val (largeMessageValue, largeMessageSet) = createLargeSingleMessageSet(largeMessageKey, RecordBatch.CURRENT_MAGIC_VALUE)
-    val maxMessageSize = largeMessageSet.sizeInBytes
-
-    cleaner = makeCleaner(partitions = topicPartitions, backOffMs = 1, maxMessageSize = maxMessageSize,
-      cleanerIoBufferSize = Some(1))
-    val log = cleaner.logs.get(topicPartitions(0))
-
-    writeDups(numKeys = 100, numDups = 3, log = log, codec = codec)
-    val startSize = log.size
-    cleaner.startup()
-    assertEquals(1, cleaner.cleanerCount)
-
-    // Verify no cleaning with LogCleanerIoBufferSizeProp=1
-    val firstDirty = log.activeSegment.baseOffset
-    val topicPartition = new TopicPartition("log", 0)
-    cleaner.awaitCleaned(topicPartition, firstDirty, maxWaitMs = 10)
-    assertTrue("Should not have cleaned", cleaner.cleanerManager.allCleanerCheckpoints.isEmpty)
-
-    def kafkaConfigWithCleanerConfig(cleanerConfig: CleanerConfig): KafkaConfig = {
-      val props = TestUtils.createBrokerConfig(0, "localhost:2181")
-      props.put(KafkaConfig.LogCleanerThreadsProp, cleanerConfig.numThreads.toString)
-      props.put(KafkaConfig.LogCleanerDedupeBufferSizeProp, cleanerConfig.dedupeBufferSize.toString)
-      props.put(KafkaConfig.LogCleanerDedupeBufferLoadFactorProp, cleanerConfig.dedupeBufferLoadFactor.toString)
-      props.put(KafkaConfig.LogCleanerIoBufferSizeProp, cleanerConfig.ioBufferSize.toString)
-      props.put(KafkaConfig.MessageMaxBytesProp, cleanerConfig.maxMessageSize.toString)
-      props.put(KafkaConfig.LogCleanerBackoffMsProp, cleanerConfig.backOffMs.toString)
-      props.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, cleanerConfig.maxIoBytesPerSecond.toString)
-      KafkaConfig.fromProps(props)
-    }
-
-    // Verify cleaning done with larger LogCleanerIoBufferSizeProp
-    val oldConfig = kafkaConfigWithCleanerConfig(cleaner.currentConfig)
-    val newConfig = kafkaConfigWithCleanerConfig(CleanerConfig(numThreads = 2,
-      dedupeBufferSize = cleaner.currentConfig.dedupeBufferSize,
-      dedupeBufferLoadFactor = cleaner.currentConfig.dedupeBufferLoadFactor,
-      ioBufferSize = 100000,
-      maxMessageSize = cleaner.currentConfig.maxMessageSize,
-      maxIoBytesPerSecond = cleaner.currentConfig.maxIoBytesPerSecond,
-      backOffMs = cleaner.currentConfig.backOffMs))
-    cleaner.reconfigure(oldConfig, newConfig)
-
-    assertEquals(2, cleaner.cleanerCount)
-    checkLastCleaned("log", 0, firstDirty)
-    val compactedSize = log.logSegments.map(_.size).sum
-    assertTrue(s"log should have been compacted: startSize=$startSize compactedSize=$compactedSize", startSize > compactedSize)
-  }
-
-  private def checkLastCleaned(topic: String, partitionId: Int, firstDirty: Long) {
-    // wait until cleaning up to base_offset, note that cleaning happens only when "log dirty ratio" is higher than
-    // LogConfig.MinCleanableDirtyRatioProp
-    val topicPartition = new TopicPartition(topic, partitionId)
-    cleaner.awaitCleaned(topicPartition, firstDirty)
-    val lastCleaned = cleaner.cleanerManager.allCleanerCheckpoints(topicPartition)
-    assertTrue(s"log cleaner should have processed up to offset $firstDirty, but lastCleaned=$lastCleaned",
-      lastCleaned >= firstDirty)
-  }
-
-  private def checkLogAfterAppendingDups(log: Log, startSize: Long, appends: Seq[(Int, String, Long)]) {
-    val read = readFromLog(log)
-    assertEquals("Contents of the map shouldn't change", toMap(appends), toMap(read))
-    assertTrue(startSize > log.size)
-  }
-
-  private def toMap(messages: Iterable[(Int, String, Long)]): Map[Int, (String, Long)] = {
-    messages.map { case (key, value, offset) => key -> (value, offset) }.toMap
-  }
-
-  private def readFromLog(log: Log): Iterable[(Int, String, Long)] = {
-    import JavaConverters._
-    for (segment <- log.logSegments; deepLogEntry <- segment.log.records.asScala) yield {
-      val key = TestUtils.readString(deepLogEntry.key).toInt
-      val value = TestUtils.readString(deepLogEntry.value)
-      (key, value, deepLogEntry.offset)
-    }
-  }
-
-  private def writeDups(numKeys: Int, numDups: Int, log: Log, codec: CompressionType,
-                        startKey: Int = 0, magicValue: Byte = RecordBatch.CURRENT_MAGIC_VALUE): Seq[(Int, String, Long)] = {
-    for(_ <- 0 until numDups; key <- startKey until (startKey + numKeys)) yield {
-      val value = counter.toString
-      val appendInfo = log.appendAsLeader(TestUtils.singletonRecords(value = value.toString.getBytes, codec = codec,
-              key = key.toString.getBytes, magicValue = magicValue), leaderEpoch = 0)
-      counter += 1
-      (key, value, appendInfo.firstOffset.get)
-    }
-  }
-
-  private def writeDupsSingleMessageSet(numKeys: Int, numDups: Int, log: Log, codec: CompressionType,
-                                        startKey: Int = 0, magicValue: Byte): Seq[(Int, String, Long)] = {
-    val kvs = for (_ <- 0 until numDups; key <- startKey until (startKey + numKeys)) yield {
-      val payload = counter.toString
-      counter += 1
-      (key, payload)
-    }
-
-    val records = kvs.map { case (key, payload) =>
-      new SimpleRecord(key.toString.getBytes, payload.toString.getBytes)
-    }
-
-    val appendInfo = log.appendAsLeader(MemoryRecords.withRecords(magicValue, codec, records: _*), leaderEpoch = 0)
-    val offsets = appendInfo.firstOffset.get to appendInfo.lastOffset
-
-    kvs.zip(offsets).map { case (kv, offset) => (kv._1, kv._2, offset) }
-  }
-
-}
-
-object LogCleanerIntegrationTest {
-  @Parameters
-  def parameters: java.util.Collection[Array[String]] = {
-    val list = new java.util.ArrayList[Array[String]]()
-    for (codec <- CompressionType.values)
-      list.add(Array(codec.name))
-    list
+    val log2 = cleaner.logs.get(topicPartitions(1))
+    val uncleanableDirectory = log.dir.getParent
+    val uncleanablePartitionsCountGauge = getGauge[Int]("uncleanable-partitions-count", uncleanableDirectory)
+    val uncleanableBytesGauge = getGauge[Long]("uncleanable-bytes", uncleanableDirectory)
+
+    TestUtils.waitUntilTrue(() => uncleanablePartitionsCountGauge.value() == 2, "There should be 2 uncleanable partitions", 2000L)
+    val expectedTotalUncleanableBytes = LogCleaner.calculateCleanableBytes(log, 0, log.logSegments.last.baseOffset)._2 +
+      LogCleaner.calculateCleanableBytes(log2, 0, log2.logSegments.last.baseOffset)._2
+    TestUtils.waitUntilTrue(() => uncleanableBytesGauge.value() == expectedTotalUncleanableBytes,
+      s"There should be $expectedTotalUncleanableBytes uncleanable bytes", 1000L)
+
+    val uncleanablePartitions = cleaner.cleanerManager.uncleanablePartitions(uncleanableDirectory)
+    assertTrue(uncleanablePartitions.contains(topicPartitions(0)))
+    assertTrue(uncleanablePartitions.contains(topicPartitions(1)))
+    assertFalse(uncleanablePartitions.contains(topicPartitions(2)))
   }
 }
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
index bf634d7..6e8c9b9 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
@@ -41,9 +41,10 @@ class LogCleanerLagIntegrationTest(compressionCodecName: String) extends Abstrac
   val time = new MockTime(1400000000000L, 1000L)  // Tue May 13 16:53:20 UTC 2014 for `currentTimeMs`
   val cleanerBackOffMs = 200L
   val segmentSize = 512
-  var counter = 0
+
+  override def codec: CompressionType = CompressionType.forName(compressionCodecName)
+
   val topicPartitions = Array(new TopicPartition("log", 0), new TopicPartition("log", 1), new TopicPartition("log", 2))
-  val compressionCodec = CompressionType.forName(compressionCodecName)
 
   @Test
   def cleanerTest(): Unit = {
@@ -55,7 +56,7 @@ class LogCleanerLagIntegrationTest(compressionCodecName: String) extends Abstrac
 
     // t = T0
     val T0 = time.milliseconds
-    val appends0 = writeDups(numKeys = 100, numDups = 3, log, compressionCodec, timestamp = T0)
+    val appends0 = writeDups(numKeys = 100, numDups = 3, log, codec, timestamp = T0)
     val startSizeBlock0 = log.size
     debug(s"total log size at T0: $startSizeBlock0")
 
@@ -78,7 +79,7 @@ class LogCleanerLagIntegrationTest(compressionCodecName: String) extends Abstrac
     val T1 = time.milliseconds
 
     // write another block of data
-    val appends1 = appends0 ++ writeDups(numKeys = 100, numDups = 3, log, compressionCodec, timestamp = T1)
+    val appends1 = appends0 ++ writeDups(numKeys = 100, numDups = 3, log, codec, timestamp = T1)
     val firstBlock1SegmentBaseOffset = activeSegAtT0.baseOffset
 
     // the first block should get cleaned
@@ -111,11 +112,10 @@ class LogCleanerLagIntegrationTest(compressionCodecName: String) extends Abstrac
       val count = counter
       log.appendAsLeader(TestUtils.singletonRecords(value = counter.toString.getBytes, codec = codec,
               key = key.toString.getBytes, timestamp = timestamp), leaderEpoch = 0)
-      counter += 1
+      incCounter()
       (key, count)
     }
   }
-
 }
 
 object LogCleanerLagIntegrationTest {
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index 2a48690..8ca26a8 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -29,6 +29,8 @@ import org.junit.Assert._
 import org.junit.{After, Test}
 import org.scalatest.junit.JUnitSuite
 
+import scala.collection.mutable
+
 /**
   * Unit tests for the log cleaning logic
   */
@@ -36,6 +38,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
 
   val tmpDir = TestUtils.tempDir()
   val logDir = TestUtils.randomPartitionLogDir(tmpDir)
+  val topicPartition = new TopicPartition("log", 0)
   val logProps = new Properties()
   logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
   logProps.put(LogConfig.SegmentIndexBytesProp, 1024: java.lang.Integer)
@@ -43,11 +46,124 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
   val logConfig = LogConfig(logProps)
   val time = new MockTime(1400000000000L, 1000L)  // Tue May 13 16:53:20 UTC 2014 for `currentTimeMs`
 
+  val cleanerCheckpoints: mutable.Map[TopicPartition, Long] = mutable.Map[TopicPartition, Long]()
+
+  class LogCleanerManagerMock(logDirs: Seq[File],
+                              logs: Pool[TopicPartition, Log],
+                              logDirFailureChannel: LogDirFailureChannel) extends LogCleanerManager(logDirs, logs, logDirFailureChannel) {
+    override def allCleanerCheckpoints: Map[TopicPartition, Long] = {
+      cleanerCheckpoints.toMap
+    }
+  }
+
   @After
   def tearDown(): Unit = {
     Utils.delete(tmpDir)
   }
 
+  @Test
+  def testGrabFilthiestCompactedLogReturnsLogWithDirtiestRatio(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes)
+    val log1: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact, 1)
+    val log2: Log = createLog(records.sizeInBytes * 10, LogConfig.Compact, 2)
+    val log3: Log = createLog(records.sizeInBytes * 15, LogConfig.Compact, 3)
+
+    val logs = new Pool[TopicPartition, Log]()
+    val tp1 = new TopicPartition("wishing well", 0) // active segment starts at 0
+    logs.put(tp1, log1)
+    val tp2 = new TopicPartition("wishing well", 1) // active segment starts at 10
+    logs.put(tp2, log2)
+    val tp3 = new TopicPartition("wishing well", 2) // // active segment starts at 20
+    logs.put(tp3, log3)
+    val cleanerManager: LogCleanerManagerMock = createCleanerManager(logs, toMock = true).asInstanceOf[LogCleanerManagerMock]
+    cleanerCheckpoints.put(tp1, 0) // all clean
+    cleanerCheckpoints.put(tp2, 1) // dirtiest - 9 dirty messages
+    cleanerCheckpoints.put(tp3, 15) // 5 dirty messages
+
+    val filthiestLog: LogToClean = cleanerManager.grabFilthiestCompactedLog(time).get
+
+    assertEquals(log2, filthiestLog.log)
+    assertEquals(tp2, filthiestLog.topicPartition)
+  }
+
+  @Test
+  def testGrabFilthiestCompactedLogIgnoresUncleanablePartitions(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes)
+    val log1: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact, 1)
+    val log2: Log = createLog(records.sizeInBytes * 10, LogConfig.Compact, 2)
+    val log3: Log = createLog(records.sizeInBytes * 15, LogConfig.Compact, 3)
+
+    val logs = new Pool[TopicPartition, Log]()
+    val tp1 = new TopicPartition("wishing well", 0) // active segment starts at 0
+    logs.put(tp1, log1)
+    val tp2 = new TopicPartition("wishing well", 1) // active segment starts at 10
+    logs.put(tp2, log2)
+    val tp3 = new TopicPartition("wishing well", 2) // // active segment starts at 20
+    logs.put(tp3, log3)
+    val cleanerManager: LogCleanerManagerMock = createCleanerManager(logs, toMock = true).asInstanceOf[LogCleanerManagerMock]
+    cleanerCheckpoints.put(tp1, 0) // all clean
+    cleanerCheckpoints.put(tp2, 1) // dirtiest - 9 dirty messages
+    cleanerCheckpoints.put(tp3, 15) // 5 dirty messages
+    cleanerManager.markPartitionUncleanable(log2.dir.getParent, tp2)
+
+    val filthiestLog: LogToClean = cleanerManager.grabFilthiestCompactedLog(time).get
+
+    assertEquals(log3, filthiestLog.log)
+    assertEquals(tp3, filthiestLog.topicPartition)
+  }
+
+  @Test
+  def testGrabFilthiestCompactedLogIgnoresInProgressPartitions(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes)
+    val log1: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact, 1)
+    val log2: Log = createLog(records.sizeInBytes * 10, LogConfig.Compact, 2)
+    val log3: Log = createLog(records.sizeInBytes * 15, LogConfig.Compact, 3)
+
+    val logs = new Pool[TopicPartition, Log]()
+    val tp1 = new TopicPartition("wishing well", 0) // active segment starts at 0
+    logs.put(tp1, log1)
+    val tp2 = new TopicPartition("wishing well", 1) // active segment starts at 10
+    logs.put(tp2, log2)
+    val tp3 = new TopicPartition("wishing well", 2) // // active segment starts at 20
+    logs.put(tp3, log3)
+    val cleanerManager: LogCleanerManagerMock = createCleanerManager(logs, toMock = true).asInstanceOf[LogCleanerManagerMock]
+    cleanerCheckpoints.put(tp1, 0) // all clean
+    cleanerCheckpoints.put(tp2, 1) // dirtiest - 9 dirty messages
+    cleanerCheckpoints.put(tp3, 15) // 5 dirty messages
+    cleanerManager.setCleaningState(tp2, LogCleaningInProgress)
+
+    val filthiestLog: LogToClean = cleanerManager.grabFilthiestCompactedLog(time).get
+
+    assertEquals(log3, filthiestLog.log)
+    assertEquals(tp3, filthiestLog.topicPartition)
+  }
+
+  @Test
+  def testGrabFilthiestCompactedLogIgnoresBothInProgressPartitionsAndUncleanablePartitions(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes)
+    val log1: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact, 1)
+    val log2: Log = createLog(records.sizeInBytes * 10, LogConfig.Compact, 2)
+    val log3: Log = createLog(records.sizeInBytes * 15, LogConfig.Compact, 3)
+
+    val logs = new Pool[TopicPartition, Log]()
+    val tp1 = new TopicPartition("wishing well", 0) // active segment starts at 0
+    logs.put(tp1, log1)
+    val tp2 = new TopicPartition("wishing well", 1) // active segment starts at 10
+    logs.put(tp2, log2)
+    val tp3 = new TopicPartition("wishing well", 2) // // active segment starts at 20
+    logs.put(tp3, log3)
+    val cleanerManager: LogCleanerManagerMock = createCleanerManager(logs, toMock = true).asInstanceOf[LogCleanerManagerMock]
+    cleanerCheckpoints.put(tp1, 0) // all clean
+    cleanerCheckpoints.put(tp2, 1) // dirtiest - 9 dirty messages
+    cleanerCheckpoints.put(tp3, 15) // 5 dirty messages
+    cleanerManager.setCleaningState(tp2, LogCleaningInProgress)
+    cleanerManager.markPartitionUncleanable(log3.dir.getParent, tp3)
+
+    val filthiestLog: Option[LogToClean] = cleanerManager.grabFilthiestCompactedLog(time)
+
+    assertTrue(filthiestLog.isEmpty)
+  }
+
   /**
     * When checking for logs with segments ready for deletion
     * we shouldn't consider logs where cleanup.policy=delete
@@ -166,7 +282,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
     */
   @Test
   def testConcurrentLogCleanupAndTopicDeletion(): Unit = {
-    val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
+    val records = TestUtils.singletonRecords("test".getBytes, key = "test".getBytes)
     val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Delete)
     val cleanerManager: LogCleanerManager = createCleanerManager(log)
 
@@ -181,6 +297,21 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
   }
 
   /**
+    * When looking for logs with segments ready to be deleted we shouldn't consider
+    * logs that have had their partition marked as uncleanable.
+    */
+  @Test
+  def testLogsWithSegmentsToDeleteShouldNotConsiderUncleanablePartitions(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+    cleanerManager.markPartitionUncleanable(log.dir.getParent, topicPartition)
+
+    val readyToDelete = cleanerManager.deletableLogs().size
+    assertEquals("should have 0 logs ready to be deleted", 0, readyToDelete)
+  }
+
+  /**
     * Test computation of cleanable range with no minimum compaction lag settings active
     */
   @Test
@@ -193,7 +324,6 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
     while(log.numberOfSegments < 8)
       log.appendAsLeader(records(log.logEndOffset.toInt, log.logEndOffset.toInt, time.milliseconds()), leaderEpoch = 0)
 
-    val topicPartition = new TopicPartition("log", 0)
     val lastClean = Map(topicPartition -> 0L)
     val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, topicPartition, lastClean, time.milliseconds)
     assertEquals("The first cleanable offset starts at the beginning of the log.", 0L, cleanableOffsets._1)
@@ -224,7 +354,6 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
     while (log.numberOfSegments < 8)
       log.appendAsLeader(records(log.logEndOffset.toInt, log.logEndOffset.toInt, t1), leaderEpoch = 0)
 
-    val topicPartition = new TopicPartition("log", 0)
     val lastClean = Map(topicPartition -> 0L)
     val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, topicPartition, lastClean, time.milliseconds)
     assertEquals("The first cleanable offset starts at the beginning of the log.", 0L, cleanableOffsets._1)
@@ -250,7 +379,6 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
 
     time.sleep(compactionLag + 1)
 
-    val topicPartition = new TopicPartition("log", 0)
     val lastClean = Map(topicPartition -> 0L)
     val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, topicPartition, lastClean, time.milliseconds)
     assertEquals("The first cleanable offset starts at the beginning of the log.", 0L, cleanableOffsets._1)
@@ -259,7 +387,6 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
 
   @Test
   def testUndecidedTransactionalDataNotCleanable(): Unit = {
-    val topicPartition = new TopicPartition("log", 0)
     val compactionLag = 60 * 60 * 1000
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
@@ -315,21 +442,20 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
 
     val cleanerManager: LogCleanerManager = createCleanerManager(log)
 
-    val tp = new TopicPartition("log", 0)
-    intercept[IllegalStateException](cleanerManager.doneCleaning(tp, log.dir, 1))
+    intercept[IllegalStateException](cleanerManager.doneCleaning(topicPartition, log.dir, 1))
 
-    cleanerManager.setCleaningState(tp, LogCleaningPaused(1))
-    intercept[IllegalStateException](cleanerManager.doneCleaning(tp, log.dir, 1))
+    cleanerManager.setCleaningState(topicPartition, LogCleaningPaused(1))
+    intercept[IllegalStateException](cleanerManager.doneCleaning(topicPartition, log.dir, 1))
 
-    cleanerManager.setCleaningState(tp, LogCleaningInProgress)
-    cleanerManager.doneCleaning(tp, log.dir, 1)
-    assertTrue(cleanerManager.cleaningState(tp).isEmpty)
-    assertTrue(cleanerManager.allCleanerCheckpoints.get(tp).nonEmpty)
+    cleanerManager.setCleaningState(topicPartition, LogCleaningInProgress)
+    cleanerManager.doneCleaning(topicPartition, log.dir, 1)
+    assertTrue(cleanerManager.cleaningState(topicPartition).isEmpty)
+    assertTrue(cleanerManager.allCleanerCheckpoints.get(topicPartition).nonEmpty)
 
-    cleanerManager.setCleaningState(tp, LogCleaningAborted)
-    cleanerManager.doneCleaning(tp, log.dir, 1)
-    assertEquals(LogCleaningPaused(1), cleanerManager.cleaningState(tp).get)
-    assertTrue(cleanerManager.allCleanerCheckpoints.get(tp).nonEmpty)
+    cleanerManager.setCleaningState(topicPartition, LogCleaningAborted)
+    cleanerManager.doneCleaning(topicPartition, log.dir, 1)
+    assertEquals(LogCleaningPaused(1), cleanerManager.cleaningState(topicPartition).get)
+    assertTrue(cleanerManager.allCleanerCheckpoints.get(topicPartition).nonEmpty)
   }
 
   @Test
@@ -337,7 +463,6 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
     val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
     val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact + "," + LogConfig.Delete)
     val cleanerManager: LogCleanerManager = createCleanerManager(log)
-
     val tp = new TopicPartition("log", 0)
 
     intercept[IllegalStateException](cleanerManager.doneDeleting(Seq(tp)))
@@ -352,21 +477,27 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
     cleanerManager.setCleaningState(tp, LogCleaningAborted)
     cleanerManager.doneDeleting(Seq(tp))
     assertEquals(LogCleaningPaused(1), cleanerManager.cleaningState(tp).get)
-
   }
 
   private def createCleanerManager(log: Log): LogCleanerManager = {
     val logs = new Pool[TopicPartition, Log]()
-    logs.put(new TopicPartition("log", 0), log)
-    val cleanerManager = new LogCleanerManager(Array(logDir), logs, null)
-    cleanerManager
+    logs.put(topicPartition, log)
+    createCleanerManager(logs)
+  }
+
+  private def createCleanerManager(pool: Pool[TopicPartition, Log], toMock: Boolean = false): LogCleanerManager = {
+    if (toMock)
+      new LogCleanerManagerMock(Array(logDir), pool, null)
+    else
+      new LogCleanerManager(Array(logDir), pool, null)
   }
 
-  private def createLog(segmentSize: Int, cleanupPolicy: String): Log = {
+  private def createLog(segmentSize: Int, cleanupPolicy: String, segmentsCount: Int = 0): Log = {
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, segmentSize: Integer)
     logProps.put(LogConfig.RetentionMsProp, 1: Integer)
     logProps.put(LogConfig.CleanupPolicyProp, cleanupPolicy)
+    logProps.put(LogConfig.MinCleanableDirtyRatioProp, 0.05: java.lang.Double) // small for easier and clearer tests
 
     val config = LogConfig(logProps)
     val partitionDir = new File(logDir, "log-0")
@@ -380,6 +511,22 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
       maxProducerIdExpirationMs = 60 * 60 * 1000,
       producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
       logDirFailureChannel = new LogDirFailureChannel(10))
+    for (i <- 0 until segmentsCount) {
+      val startOffset = i * 10
+      val endOffset = startOffset + 10
+      val segment = LogUtils.createSegment(startOffset, logDir)
+      var lastTimestamp = 0L
+      val records = (startOffset until endOffset).map { offset =>
+        val currentTimestamp = time.milliseconds()
+        if (offset == endOffset - 1)
+          lastTimestamp = currentTimestamp
+
+        new SimpleRecord(currentTimestamp, s"key-$offset".getBytes, s"value-$offset".getBytes)
+      }
+
+      segment.append(endOffset, lastTimestamp, endOffset, MemoryRecords.withRecords(CompressionType.NONE, records:_*))
+      log.addSegment(segment)
+    }
     log
   }
 
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
similarity index 91%
copy from core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
copy to core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
index 64e8b38..266bb39 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
@@ -35,19 +35,19 @@ import org.junit.runners.Parameterized.Parameters
 
 import scala.Seq
 import scala.collection._
-import scala.util.Random
 
 /**
  * This is an integration test that tests the fully integrated log cleaner
  */
 @RunWith(value = classOf[Parameterized])
-class LogCleanerIntegrationTest(compressionCodec: String) extends AbstractLogCleanerIntegrationTest {
+class LogCleanerParameterizedIntegrationTest(compressionCodec: String) extends AbstractLogCleanerIntegrationTest {
 
-  val codec = CompressionType.forName(compressionCodec)
+  val codec: CompressionType = CompressionType.forName(compressionCodec)
   val time = new MockTime()
-  var counter = 0
+
   val topicPartitions = Array(new TopicPartition("log", 0), new TopicPartition("log", 1), new TopicPartition("log", 2))
 
+
   @Test
   def cleanerTest() {
     val largeMessageKey = 20
@@ -129,18 +129,6 @@ class LogCleanerIntegrationTest(compressionCodec: String) extends AbstractLogCle
     assertEquals("Contents of the map shouldn't change", toMap(messages), toMap(read))
   }
 
-  // returns (value, ByteBufferMessageSet)
-  private def createLargeSingleMessageSet(key: Int, messageFormatVersion: Byte): (String, MemoryRecords) = {
-    def messageValue(length: Int): String = {
-      val random = new Random(0)
-      new String(random.alphanumeric.take(length).toArray)
-    }
-    val value = messageValue(128)
-    val messageSet = TestUtils.singletonRecords(value = value.getBytes, codec = codec, key = key.toString.getBytes,
-      magicValue = messageFormatVersion)
-    (value, messageSet)
-  }
-
   @Test
   def testCleanerWithMessageFormatV0(): Unit = {
     val largeMessageKey = 20
@@ -307,22 +295,11 @@ class LogCleanerIntegrationTest(compressionCodec: String) extends AbstractLogCle
     }
   }
 
-  private def writeDups(numKeys: Int, numDups: Int, log: Log, codec: CompressionType,
-                        startKey: Int = 0, magicValue: Byte = RecordBatch.CURRENT_MAGIC_VALUE): Seq[(Int, String, Long)] = {
-    for(_ <- 0 until numDups; key <- startKey until (startKey + numKeys)) yield {
-      val value = counter.toString
-      val appendInfo = log.appendAsLeader(TestUtils.singletonRecords(value = value.toString.getBytes, codec = codec,
-              key = key.toString.getBytes, magicValue = magicValue), leaderEpoch = 0)
-      counter += 1
-      (key, value, appendInfo.firstOffset.get)
-    }
-  }
-
   private def writeDupsSingleMessageSet(numKeys: Int, numDups: Int, log: Log, codec: CompressionType,
                                         startKey: Int = 0, magicValue: Byte): Seq[(Int, String, Long)] = {
     val kvs = for (_ <- 0 until numDups; key <- startKey until (startKey + numKeys)) yield {
       val payload = counter.toString
-      counter += 1
+      incCounter()
       (key, payload)
     }
 
@@ -338,7 +315,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) extends AbstractLogCle
 
 }
 
-object LogCleanerIntegrationTest {
+object LogCleanerParameterizedIntegrationTest {
   @Parameters
   def parameters: java.util.Collection[Array[String]] = {
     val list = new java.util.ArrayList[Array[String]]()
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index 8976c68..40b6874 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -40,12 +40,7 @@ class LogSegmentTest {
                     indexIntervalBytes: Int = 10,
                     maxSegmentMs: Int = Int.MaxValue,
                     time: Time = Time.SYSTEM): LogSegment = {
-    val ms = FileRecords.open(Log.logFile(logDir, offset))
-    val idx = new OffsetIndex(Log.offsetIndexFile(logDir, offset), offset, maxIndexSize = 1000)
-    val timeIdx = new TimeIndex(Log.timeIndexFile(logDir, offset), offset, maxIndexSize = 1500)
-    val txnIndex = new TransactionIndex(offset, Log.transactionIndexFile(logDir, offset))
-    val seg = new LogSegment(ms, idx, timeIdx, txnIndex, offset, indexIntervalBytes, 0, maxSegmentMs = maxSegmentMs,
-      maxSegmentBytes = Int.MaxValue, time)
+    val seg = LogUtils.createSegment(offset, logDir, indexIntervalBytes, maxSegmentMs, time)
     segments += seg
     seg
   }
diff --git a/core/src/test/scala/unit/kafka/log/LogUtils.scala b/core/src/test/scala/unit/kafka/log/LogUtils.scala
new file mode 100644
index 0000000..eb21895
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/log/LogUtils.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+import java.io.File
+
+import org.apache.kafka.common.record.FileRecords
+import org.apache.kafka.common.utils.Time
+
+object LogUtils {
+  /**
+    *  Create a segment with the given base offset
+    */
+  def createSegment(offset: Long,
+                    logDir: File,
+                    indexIntervalBytes: Int = 10,
+                    maxSegmentMs: Int = Int.MaxValue,
+                    time: Time = Time.SYSTEM): LogSegment = {
+    val ms = FileRecords.open(Log.logFile(logDir, offset))
+    val idx = new OffsetIndex(Log.offsetIndexFile(logDir, offset), offset, maxIndexSize = 1000)
+    val timeIdx = new TimeIndex(Log.timeIndexFile(logDir, offset), offset, maxIndexSize = 1500)
+    val txnIndex = new TransactionIndex(offset, Log.transactionIndexFile(logDir, offset))
+
+    new LogSegment(ms, idx, timeIdx, txnIndex, offset, indexIntervalBytes, 0, maxSegmentMs, Int.MaxValue, time)
+  }
+}


Mime
View raw message