kafka-commits mailing list archives

From lind...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7322; Fix race condition between log cleaner thread and log retention thread when topic cleanup policy is updated
Date Tue, 18 Sep 2018 06:38:57 GMT
This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d87ed23  KAFKA-7322; Fix race condition between log cleaner thread and log retention thread when topic cleanup policy is updated
d87ed23 is described below

commit d87ed233c9e3a8faf2a8032d62ff9b90b0e40bfb
Author: Xiongqi Wesley Wu <xiongqi.wu@gmail.com>
AuthorDate: Mon Sep 17 23:36:59 2018 -0700

    KAFKA-7322; Fix race condition between log cleaner thread and log retention thread when topic cleanup policy is updated
    
    To fix the race condition between the log cleaner thread and the log retention thread when the topic cleanup policy is switched dynamically, the existing log cleaner in-progress map is used to prevent more than one thread from working on the same topic partition.
    
    Author: Xiongqi Wesley Wu <xiongqi.wu@gmail.com>
    
    Reviewers: Dong Lin <lindong28@gmail.com>
    
    Closes #5591 from xiowu0/trunk
---
 core/src/main/scala/kafka/log/LogCleaner.scala     | 33 ++++++---
 .../main/scala/kafka/log/LogCleanerManager.scala   | 79 +++++++++++++++-------
 core/src/main/scala/kafka/log/LogManager.scala     | 46 +++++++++++--
 .../unit/kafka/log/LogCleanerManagerTest.scala     | 60 ++++++++++++++--
 4 files changed, 170 insertions(+), 48 deletions(-)
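
The essence of the change is a claim/work/release protocol built on the cleaner's in-progress map: the retention thread atomically marks every non-compacted, unclaimed partition as paused, deletes old segments, and releases all claims in a finally block. A minimal self-contained sketch of the pattern follows (hypothetical names and stub types, not the actual kafka.log classes):

    import java.util.concurrent.locks.ReentrantLock
    import scala.collection.mutable

    // Hypothetical stand-ins for kafka.common.TopicPartition and kafka.log.Log.
    final case class TopicPartition(topic: String, partition: Int)
    final case class Log(name: String, compact: Boolean) {
      def deleteOldSegments(): Int = 0 // placeholder for real segment deletion
    }

    sealed trait CleaningState
    case object Paused extends CleaningState

    class CleanerManager(logs: Map[TopicPartition, Log]) {
      private val lock = new ReentrantLock()
      private val inProgress = mutable.Map.empty[TopicPartition, CleaningState]

      private def inLock[T](body: => T): T = { lock.lock(); try body finally lock.unlock() }

      // Atomically claim every non-compacted partition that no other thread holds.
      def pauseCleaningForNonCompactedPartitions(): Iterable[(TopicPartition, Log)] = inLock {
        val deletable = logs.filter { case (tp, log) => !log.compact && !inProgress.contains(tp) }
        deletable.keys.foreach(tp => inProgress.put(tp, Paused))
        deletable
      }

      def resumeCleaning(tps: Iterable[TopicPartition]): Unit = inLock {
        tps.foreach(inProgress.remove)
      }
    }

    // The retention pass: claim, delete, and always release in a finally block,
    // so a failed deletion cannot leave partitions permanently paused.
    def cleanupLogs(manager: CleanerManager): Int = {
      val deletable = manager.pauseCleaningForNonCompactedPartitions()
      var total = 0
      try deletable.foreach { case (_, log) => total += log.deleteOldSegments() }
      finally manager.resumeCleaning(deletable.map(_._1))
      total
    }

This also explains why resumeCleaning and doneDeleting are widened below to take an Iterable: the whole batch is released in a single finally, so a failure on one partition cannot leak claims on the rest.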

diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 04b284c..f1ac1fc 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -36,7 +36,7 @@ import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.Time
 
 import scala.collection.JavaConverters._
-import scala.collection.{Set, mutable}
+import scala.collection.{Iterable, Set, mutable}
 
 /**
  * The cleaner is responsible for removing obsolete records from logs which have the "compact" retention strategy.
@@ -219,10 +219,10 @@ class LogCleaner(initialConfig: CleanerConfig,
   }
 
   /**
-   *  Resume the cleaning of a paused partition. This call blocks until the cleaning of a partition is resumed.
-   */
-  def resumeCleaning(topicPartition: TopicPartition) {
-    cleanerManager.resumeCleaning(topicPartition)
+    *  Resume the cleaning of paused partitions.
+    */
+  def resumeCleaning(topicPartitions: Iterable[TopicPartition]) {
+    cleanerManager.resumeCleaning(topicPartitions)
   }
 
   /**
@@ -246,6 +246,15 @@ class LogCleaner(initialConfig: CleanerConfig,
     isCleaned
   }
 
+  /**
+    * To prevent race between retention and compaction,
+    * retention threads need to make this call to obtain:
+    * @return A list of log partitions that retention threads can safely work on
+    */
+  def pauseCleaningForNonCompactedPartitions(): Iterable[(TopicPartition, Log)] = {
+    cleanerManager.pauseCleaningForNonCompactedPartitions()
+  }
+
   // Only for testing
   private[kafka] def currentConfig: CleanerConfig = config
 
@@ -315,14 +324,16 @@ class LogCleaner(initialConfig: CleanerConfig,
           true
       }
       val deletable: Iterable[(TopicPartition, Log)] = cleanerManager.deletableLogs()
-      deletable.foreach{
-        case (topicPartition, log) =>
-          try {
+
+      try {
+        deletable.foreach {
+          case (_, log) =>
             log.deleteOldSegments()
-          } finally {
-            cleanerManager.doneDeleting(topicPartition)
-          }
+        }
+      } finally {
+        cleanerManager.doneDeleting(deletable.map(_._1))
       }
+
       if (!cleaned)
         pause(config.backOffMs, TimeUnit.MILLISECONDS)
     }
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index ba8d7c7..83d902f 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -32,7 +32,7 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.errors.KafkaStorageException
 
-import scala.collection.{immutable, mutable}
+import scala.collection.{Iterable, immutable, mutable}
 
 private[log] sealed trait LogCleaningState
 private[log] case object LogCleaningInProgress extends LogCleaningState
@@ -149,6 +149,28 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
   }
 
   /**
+    * Pause logs cleaning for logs that do not have compaction enabled
+    * and do not have other deletion or compaction in progress.
+    * This is to handle potential race between retention and cleaner threads when users
+    * switch topic configuration between compacted and non-compacted topic.
+    * @return retention logs that have log cleaning successfully paused
+    */
+  def pauseCleaningForNonCompactedPartitions(): Iterable[(TopicPartition, Log)] = {
+    inLock(lock) {
+      val deletableLogs = logs.filter {
+        case (_, log) => !log.config.compact // pick non-compacted logs
+      }.filterNot {
+        case (topicPartition, _) => inProgress.contains(topicPartition) // skip any logs already in-progress
+      }
+
+      deletableLogs.foreach {
+        case (topicPartition, _) => inProgress.put(topicPartition, LogCleaningPaused)
+      }
+      deletableLogs
+    }
+  }
+
+  /**
     * Find any logs that have compact and delete enabled
     */
   def deletableLogs(): Iterable[(TopicPartition, Log)] = {
@@ -170,7 +192,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
   def abortCleaning(topicPartition: TopicPartition) {
     inLock(lock) {
       abortAndPauseCleaning(topicPartition)
-      resumeCleaning(topicPartition)
+      resumeCleaning(Seq(topicPartition))
     }
     info(s"The cleaning for partition $topicPartition is aborted")
   }
@@ -206,23 +228,25 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
   }
 
   /**
-   *  Resume the cleaning of a paused partition. This call blocks until the cleaning of a partition is resumed.
-   */
-  def resumeCleaning(topicPartition: TopicPartition) {
+    *  Resume the cleaning of paused partitions.
+    */
+  def resumeCleaning(topicPartitions: Iterable[TopicPartition]){
     inLock(lock) {
-      inProgress.get(topicPartition) match {
-        case None =>
-          throw new IllegalStateException(s"Compaction for partition $topicPartition cannot be resumed since it is not paused.")
-        case Some(state) =>
-          state match {
-            case LogCleaningPaused =>
-              inProgress.remove(topicPartition)
-            case s =>
-              throw new IllegalStateException(s"Compaction for partition $topicPartition cannot be resumed since it is in $s state.")
+      topicPartitions.foreach {
+        topicPartition =>
+          inProgress.get(topicPartition) match {
+            case None =>
+              throw new IllegalStateException(s"Compaction for partition $topicPartition cannot be resumed since it is not paused.")
+            case Some(state) =>
+              state match {
+                case LogCleaningPaused =>
+                  inProgress.remove(topicPartition)
+                case s =>
+                  throw new IllegalStateException(s"Compaction for partition $topicPartition cannot be resumed since it is in $s state.")
+              }
           }
       }
     }
-    info(s"Compaction for partition $topicPartition is resumed")
   }
 
   /**
@@ -322,18 +346,21 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
     }
   }
 
-  def doneDeleting(topicPartition: TopicPartition): Unit = {
+  def doneDeleting(topicPartitions: Iterable[TopicPartition]): Unit = {
     inLock(lock) {
-      inProgress.get(topicPartition) match {
-        case Some(LogCleaningInProgress) =>
-          inProgress.remove(topicPartition)
-        case Some(LogCleaningAborted) =>
-          inProgress.put(topicPartition, LogCleaningPaused)
-          pausedCleaningCond.signalAll()
-        case None =>
-          throw new IllegalStateException(s"State for partition $topicPartition should exist.")
-        case s =>
-          throw new IllegalStateException(s"In-progress partition $topicPartition cannot be in $s state.")
+      topicPartitions.foreach {
+        topicPartition =>
+          inProgress.get(topicPartition) match {
+            case Some(LogCleaningInProgress) =>
+              inProgress.remove(topicPartition)
+            case Some(LogCleaningAborted) =>
+              inProgress.put(topicPartition, LogCleaningPaused)
+              pausedCleaningCond.signalAll()
+            case None =>
+              throw new IllegalStateException(s"State for partition $topicPartition should exist.")
+            case s =>
+              throw new IllegalStateException(s"In-progress partition $topicPartition cannot be in $s state.")
+          }
       }
     }
   }
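
The per-partition transition that doneDeleting applies reads as a small state function. A hedged condensation (simplified, omitting the condition-variable signalling that wakes threads blocked in abortAndPauseCleaning):

    sealed trait LogCleaningState
    case object LogCleaningInProgress extends LogCleaningState
    case object LogCleaningAborted extends LogCleaningState
    case object LogCleaningPaused extends LogCleaningState

    // None in = illegal (deletion never started); InProgress out = entry removed (None);
    // Aborted out = parked as Paused so the aborting thread can proceed.
    def afterDeleting(state: Option[LogCleaningState]): Option[LogCleaningState] =
      state match {
        case Some(LogCleaningInProgress) => None
        case Some(LogCleaningAborted)    => Some(LogCleaningPaused)
        case None    => throw new IllegalStateException("state should exist")
        case Some(s) => throw new IllegalStateException(s"cannot be in $s state")
      }
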
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 32203ac..eab8509 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -515,8 +515,10 @@ class LogManager(logDirs: Seq[File],
           if (needToStopCleaner && !isFuture)
             cleaner.maybeTruncateCheckpoint(log.dir.getParentFile, topicPartition, log.activeSegment.baseOffset)
         } finally {
-          if (needToStopCleaner && !isFuture)
-            cleaner.resumeCleaning(topicPartition)
+          if (needToStopCleaner && !isFuture) {
+            cleaner.resumeCleaning(Seq(topicPartition))
+            info(s"Compaction for partition $topicPartition is resumed")
+          }
         }
       }
     }
@@ -547,7 +549,8 @@ class LogManager(logDirs: Seq[File],
       log.truncateFullyAndStartAt(newOffset)
       if (cleaner != null && !isFuture) {
         cleaner.maybeTruncateCheckpoint(log.dir.getParentFile, topicPartition, log.activeSegment.baseOffset)
-        cleaner.resumeCleaning(topicPartition)
+        cleaner.resumeCleaning(Seq(topicPartition))
+        info(s"Compaction for partition $topicPartition is resumed")
       }
       checkpointLogRecoveryOffsetsInDir(log.dir.getParentFile)
     }
@@ -785,7 +788,8 @@ class LogManager(logDirs: Seq[File],
       currentLogs.put(topicPartition, destLog)
       if (cleaner != null) {
         cleaner.alterCheckpointDir(topicPartition, sourceLog.dir.getParentFile, destLog.dir.getParentFile)
-        cleaner.resumeCleaning(topicPartition)
+        cleaner.resumeCleaning(Seq(topicPartition))
+        info(s"Compaction for partition $topicPartition is resumed")
       }
 
       try {
@@ -869,10 +873,38 @@ class LogManager(logDirs: Seq[File],
     debug("Beginning log cleanup...")
     var total = 0
     val startMs = time.milliseconds
-    for(log <- allLogs; if !log.config.compact) {
-      debug("Garbage collecting '" + log.name + "'")
-      total += log.deleteOldSegments()
+
+    // clean current logs.
+    val deletableLogs = {
+      if (cleaner != null) {
+        // prevent cleaner from working on same partitions when changing cleanup policy
+        cleaner.pauseCleaningForNonCompactedPartitions()
+      } else {
+        currentLogs.filter {
+          case (_, log) => !log.config.compact
+        }
+      }
     }
+
+    try {
+      deletableLogs.foreach {
+        case (topicPartition, log) =>
+          debug("Garbage collecting '" + log.name + "'")
+          total += log.deleteOldSegments()
+
+          val futureLog = futureLogs.get(topicPartition)
+          if (futureLog != null) {
+            // clean future logs
+            debug("Garbage collecting future log '" + futureLog.name + "'")
+            total += futureLog.deleteOldSegments()
+          }
+      }
+    } finally {
+      if (cleaner != null) {
+        cleaner.resumeCleaning(deletableLogs.map(_._1))
+      }
+    }
+
     debug("Log cleanup completed. " + total + " files deleted in " +
                   (time.milliseconds - startMs) / 1000 + " seconds")
   }
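
LogManager.cleanupLogs now picks retention candidates in one of two ways, depending on whether a cleaner is configured. A sketch of the branch, with Option standing in for the null check on the cleaner and stub types for illustration only:

    // Hypothetical stand-ins for the real kafka.log types.
    final case class TopicPartition(topic: String, partition: Int)
    final case class Log(name: String, compact: Boolean)

    trait Cleaner {
      def pauseCleaningForNonCompactedPartitions(): Iterable[(TopicPartition, Log)]
    }

    // With a cleaner, the in-progress map gates against concurrent compaction;
    // without one, a plain filter on the cleanup policy suffices since nothing compacts.
    def deletableLogs(cleaner: Option[Cleaner],
                      currentLogs: Map[TopicPartition, Log]): Iterable[(TopicPartition, Log)] =
      cleaner match {
        case Some(c) => c.pauseCleaningForNonCompactedPartitions()
        case None    => currentLogs.filter { case (_, log) => !log.compact }
      }
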
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index 7455763..8cb2f9e 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -91,6 +91,58 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
   }
 
   /**
+    * log with retention in progress should not be picked up for compaction and vice versa when log cleanup policy
+    * is changed between "compact" and "delete"
+    */
+  @Test
+  def testLogsWithRetentionInprogressShouldNotPickedUpForCompactionAndViceVersa(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Delete)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    log.appendAsLeader(records, leaderEpoch = 0)
+    log.roll()
+    log.appendAsLeader(records, leaderEpoch = 0)
+    log.onHighWatermarkIncremented(2L)
+
+    // simulate retention thread working on the log partition
+    val deletableLog = cleanerManager.pauseCleaningForNonCompactedPartitions()
+    assertEquals("should have 1 logs ready to be deleted", 1, deletableLog.size)
+
+    // change cleanup policy from delete to compact
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, log.config.segmentSize)
+    logProps.put(LogConfig.RetentionMsProp, log.config.retentionMs)
+    logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
+    logProps.put(LogConfig.MinCleanableDirtyRatioProp, 0: Integer)
+    val config = LogConfig(logProps)
+    log.config = config
+
+    // log retention inprogress, the log is not available for compaction
+    val cleanable = cleanerManager.grabFilthiestCompactedLog(time)
+    assertEquals("should have 0 logs ready to be compacted", 0, cleanable.size)
+
+    // log retention finished, and log can be picked up for compaction
+    cleanerManager.resumeCleaning(deletableLog.map(_._1))
+    val cleanable2 = cleanerManager.grabFilthiestCompactedLog(time)
+    assertEquals("should have 1 logs ready to be compacted", 1, cleanable2.size)
+
+    // update cleanup policy to delete
+    logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Delete)
+    val config2 = LogConfig(logProps)
+    log.config = config2
+
+    // compaction in progress, should have 0 log eligible for log retention
+    val deletableLog2 = cleanerManager.pauseCleaningForNonCompactedPartitions()
+    assertEquals("should have 0 logs ready to be deleted", 0, deletableLog2.size)
+
+    // compaction done, should have 1 log eligible for log retention
+    cleanerManager.doneDeleting(Seq(cleanable2.get.topicPartition))
+    val deletableLog3 = cleanerManager.pauseCleaningForNonCompactedPartitions()
+    assertEquals("should have 1 logs ready to be deleted", 1, deletableLog3.size)
+  }
+
+  /**
     * Test computation of cleanable range with no minimum compaction lag settings active
     */
   @Test
@@ -250,17 +302,17 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
 
     val tp = new TopicPartition("log", 0)
 
-    intercept[IllegalStateException](cleanerManager.doneDeleting(tp))
+    intercept[IllegalStateException](cleanerManager.doneDeleting(Seq(tp)))
 
     cleanerManager.setCleaningState(tp, LogCleaningPaused)
-    intercept[IllegalStateException](cleanerManager.doneDeleting(tp))
+    intercept[IllegalStateException](cleanerManager.doneDeleting(Seq(tp)))
 
     cleanerManager.setCleaningState(tp, LogCleaningInProgress)
-    cleanerManager.doneDeleting(tp)
+    cleanerManager.doneDeleting(Seq(tp))
     assertTrue(cleanerManager.cleaningState(tp).isEmpty)
 
     cleanerManager.setCleaningState(tp, LogCleaningAborted)
-    cleanerManager.doneDeleting(tp)
+    cleanerManager.doneDeleting(Seq(tp))
     assertEquals(LogCleaningPaused, cleanerManager.cleaningState(tp).get)
 
   }

