kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] 01/02: KAFKA-8725; Improve LogCleanerManager#grabFilthiestLog error handling (#7475)
Date Tue, 15 Oct 2019 18:30:13 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 3cf2b7715a5dd1a617234b1bd11b92cc0f46006f
Author: Stanislav Kozlovski <stanislav_kozlovski@outlook.com>
AuthorDate: Fri Oct 11 23:10:29 2019 +0100

    KAFKA-8725; Improve LogCleanerManager#grabFilthiestLog error handling (#7475)
    
    KAFKA-7215 improved the log cleaner's error handling to mitigate cleaner thread death but
    missed one case: exceptions thrown from grabFilthiestCompactedLog still cause the thread to die.
    
    This patch improves the handling so that errors in that function also mark the affected
    partition as uncleanable instead of crashing the cleaner thread.
    
    Reviewers: Jason Gustafson <jason@confluent.io>
---
 core/src/main/scala/kafka/log/LogCleaner.scala     | 76 ++++++++++++----------
 .../main/scala/kafka/log/LogCleanerManager.scala   | 21 ++++--
 .../unit/kafka/log/LogCleanerManagerTest.scala     | 54 +++++++++++++--
 3 files changed, 103 insertions(+), 48 deletions(-)

diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index e37eacf..bcb0e72 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -304,53 +304,59 @@ class LogCleaner(initialConfig: CleanerConfig,
      * Clean a log if there is a dirty log available, otherwise sleep for a bit
      */
     override def doWork(): Unit = {
-      val cleaned = cleanFilthiestLog()
+      val cleaned = tryCleanFilthiestLog()
       if (!cleaned)
         pause(config.backOffMs, TimeUnit.MILLISECONDS)
     }
 
     /**
-      * Cleans a log if there is a dirty log available
-      * @return whether a log was cleaned
-      */
-    private def cleanFilthiestLog(): Boolean = {
-      var currentLog: Option[Log] = None
-
+     * Cleans a log if there is a dirty log available
+     * @return whether a log was cleaned
+     */
+    private def tryCleanFilthiestLog(): Boolean = {
       try {
-        val preCleanStats = new PreCleanStats()
-        val cleaned = cleanerManager.grabFilthiestCompactedLog(time, preCleanStats) match
{
-          case None =>
-            false
-          case Some(cleanable) =>
-            this.lastPreCleanStats = preCleanStats
-            // there's a log, clean it
-            currentLog = Some(cleanable.log)
+        cleanFilthiestLog()
+      } catch {
+        case e: LogCleaningException =>
+          warn(s"Unexpected exception thrown when cleaning log ${e.log}. Marking its partition
(${e.log.topicPartition}) as uncleanable", e)
+          cleanerManager.markPartitionUncleanable(e.log.dir.getParent, e.log.topicPartition)
+
+          false
+      }
+    }
+
+    @throws(classOf[LogCleaningException])
+    private def cleanFilthiestLog(): Boolean = {
+      val preCleanStats = new PreCleanStats()
+      val cleaned = cleanerManager.grabFilthiestCompactedLog(time, preCleanStats) match {
+        case None =>
+          false
+        case Some(cleanable) =>
+          // there's a log, clean it
+          this.lastPreCleanStats = preCleanStats
+          try {
             cleanLog(cleanable)
             true
-        }
-        val deletable: Iterable[(TopicPartition, Log)] = cleanerManager.deletableLogs()
-        try {
-          deletable.foreach { case (_, log) =>
-            currentLog = Some(log)
+          } catch {
+            case e @ (_: ThreadShutdownException | _: ControlThrowable) => throw e
+            case e: Exception => throw new LogCleaningException(cleanable.log, e.getMessage,
e)
+          }
+      }
+      val deletable: Iterable[(TopicPartition, Log)] = cleanerManager.deletableLogs()
+      try {
+        deletable.foreach { case (_, log) =>
+          try {
             log.deleteOldSegments()
+          } catch {
+            case e @ (_: ThreadShutdownException | _: ControlThrowable) => throw e
+            case e: Exception => throw new LogCleaningException(log, e.getMessage, e)
           }
-        } finally  {
-          cleanerManager.doneDeleting(deletable.map(_._1))
         }
-
-        cleaned
-      } catch {
-        case e @ (_: ThreadShutdownException | _: ControlThrowable) => throw e
-        case e: Exception =>
-          if (currentLog.isEmpty) {
-            throw new IllegalStateException("currentLog cannot be empty on an unexpected
exception", e)
-          }
-          val erroneousLog = currentLog.get
-          warn(s"Unexpected exception thrown when cleaning log $erroneousLog. Marking its
partition (${erroneousLog.topicPartition}) as uncleanable", e)
-          cleanerManager.markPartitionUncleanable(erroneousLog.dir.getParent, erroneousLog.topicPartition)
-
-          false
+      } finally  {
+        cleanerManager.doneDeleting(deletable.map(_._1))
       }
+
+      cleaned
     }
 
     private def cleanLog(cleanable: LogToClean): Unit = {
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index bde30b0..a5cfed5 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit
 import java.util.concurrent.locks.ReentrantLock
 
 import com.yammer.metrics.core.Gauge
-import kafka.common.LogCleaningAbortedException
+import kafka.common.{KafkaException, LogCleaningAbortedException}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.LogDirFailureChannel
 import kafka.server.checkpoints.OffsetCheckpointFile
@@ -39,6 +39,10 @@ private[log] case object LogCleaningInProgress extends LogCleaningState
 private[log] case object LogCleaningAborted extends LogCleaningState
 private[log] case class LogCleaningPaused(pausedCount: Int) extends LogCleaningState
 
+private[log] class LogCleaningException(val log: Log,
+                                        private val message: String,
+                                        private val cause: Throwable) extends KafkaException(message,
cause)
+
 /**
   * This class manages the state of each partition being cleaned.
   * LogCleaningState defines the cleaning states that a TopicPartition can be in.
@@ -180,11 +184,16 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
           inProgress.contains(topicPartition) || isUncleanablePartition(log, topicPartition)
       }.map {
         case (topicPartition, log) => // create a LogToClean instance for each
-          val (firstDirtyOffset, firstUncleanableDirtyOffset) = cleanableOffsets(log, topicPartition,
lastClean, now)
-          val compactionDelayMs = maxCompactionDelay(log, firstDirtyOffset, now)
-          preCleanStats.updateMaxCompactionDelay(compactionDelayMs)
-
-          LogToClean(topicPartition, log, firstDirtyOffset, firstUncleanableDirtyOffset,
compactionDelayMs > 0)
+          try {
+            val (firstDirtyOffset, firstUncleanableDirtyOffset) = cleanableOffsets(log, topicPartition,
lastClean, now)
+            val compactionDelayMs = maxCompactionDelay(log, firstDirtyOffset, now)
+            preCleanStats.updateMaxCompactionDelay(compactionDelayMs)
+
+            LogToClean(topicPartition, log, firstDirtyOffset, firstUncleanableDirtyOffset,
compactionDelayMs > 0)
+          } catch {
+            case e: Throwable => throw new LogCleaningException(log,
+              s"Failed to calculate log cleaning stats for partition $topicPartition", e)
+          }
       }.filter(ltc => ltc.totalBytes > 0) // skip any empty logs
 
       this.dirtiestLogCleanableRatio = if (dirtyLogs.nonEmpty) dirtyLogs.max.cleanableRatio
else 0
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index 6a74874..db86c75 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -78,6 +78,42 @@ class LogCleanerManagerTest extends Logging {
   }
 
   @Test
+  def testGrabFilthiestCompactedLogThrowsException(): Unit = {
+    val tp = new TopicPartition("A", 1)
+    val logSegmentSize = TestUtils.singletonRecords("test".getBytes).sizeInBytes * 10
+    val logSegmentsCount = 2
+    val tpDir = new File(logDir, "A-1")
+
+    // the exception should be catched and the partition that caused it marked as uncleanable
+    class LogMock(dir: File, config: LogConfig) extends Log(dir, config, 0L, 0L,
+      time.scheduler, new BrokerTopicStats, time, 60 * 60 * 1000, LogManager.ProducerIdExpirationCheckIntervalMs,
+      topicPartition, new ProducerStateManager(tp, tpDir, 60 * 60 * 1000), new LogDirFailureChannel(10))
{
+
+      // Throw an error in getFirstBatchTimestampForSegments since it is called in grabFilthiestLog()
+      override def getFirstBatchTimestampForSegments(segments: Iterable[LogSegment]): Iterable[Long]
=
+        throw new IllegalStateException("Error!")
+    }
+
+    val log: Log = new LogMock(tpDir, createLowRetentionLogConfig(logSegmentSize, LogConfig.Compact))
+    writeRecords(log = log,
+      numBatches = logSegmentsCount * 2,
+      recordsPerBatch = 10,
+      batchesPerSegment = 2
+    )
+
+    val logsPool = new Pool[TopicPartition, Log]()
+    logsPool.put(tp, log)
+    val cleanerManager = createCleanerManagerMock(logsPool)
+    cleanerCheckpoints.put(tp, 1)
+
+    val thrownException = intercept[LogCleaningException] {
+      cleanerManager.grabFilthiestCompactedLog(time).get
+    }
+    assertEquals(log, thrownException.log)
+    assertTrue(thrownException.getCause.isInstanceOf[IllegalStateException])
+  }
+
+  @Test
   def testGrabFilthiestCompactedLogReturnsLogWithDirtiestRatio(): Unit = {
     val tp0 = new TopicPartition("wishing-well", 0)
     val tp1 = new TopicPartition("wishing-well", 1)
@@ -505,13 +541,7 @@ class LogCleanerManagerTest extends Logging {
   private def createLog(segmentSize: Int,
                         cleanupPolicy: String,
                         topicPartition: TopicPartition = new TopicPartition("log", 0)): Log
= {
-    val logProps = new Properties()
-    logProps.put(LogConfig.SegmentBytesProp, segmentSize: Integer)
-    logProps.put(LogConfig.RetentionMsProp, 1: Integer)
-    logProps.put(LogConfig.CleanupPolicyProp, cleanupPolicy)
-    logProps.put(LogConfig.MinCleanableDirtyRatioProp, 0.05: java.lang.Double) // small for
easier and clearer tests
-
-    val config = LogConfig(logProps)
+    val config = createLowRetentionLogConfig(segmentSize, cleanupPolicy)
     val partitionDir = new File(logDir, Log.logDirName(topicPartition))
 
     Log(partitionDir,
@@ -526,6 +556,16 @@ class LogCleanerManagerTest extends Logging {
       logDirFailureChannel = new LogDirFailureChannel(10))
   }
 
+  private def createLowRetentionLogConfig(segmentSize: Int, cleanupPolicy: String): LogConfig
= {
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, segmentSize: Integer)
+    logProps.put(LogConfig.RetentionMsProp, 1: Integer)
+    logProps.put(LogConfig.CleanupPolicyProp, cleanupPolicy)
+    logProps.put(LogConfig.MinCleanableDirtyRatioProp, 0.05: java.lang.Double) // small for
easier and clearer tests
+
+    LogConfig(logProps)
+  }
+
   private def writeRecords(log: Log,
                            numBatches: Int,
                            recordsPerBatch: Int,


Mime
View raw message