kafka-commits mailing list archives

From j...@apache.org
Subject [kafka] branch trunk updated: KAFKA-8526; Fallback to other log dirs after getOrCreateLog failure (#6969)
Date Tue, 23 Jul 2019 15:58:04 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6b6a6b9  KAFKA-8526; Fallback to other log dirs after getOrCreateLog failure (#6969)
6b6a6b9 is described below

commit 6b6a6b930fda853fd91dfbe85b4462e17654f804
Author: Igor Soarez <i@soarez.me>
AuthorDate: Tue Jul 23 16:57:38 2019 +0100

    KAFKA-8526; Fallback to other log dirs after getOrCreateLog failure (#6969)
    
     LogManager#getOrCreateLog() selects a log dir for the new replica from
     _liveLogDirs. If a disk failure is discovered at this point, before
     LogDirFailureHandler finds out, try the other log dirs before failing
     the operation.
    
    Reviewers: Anna Povzner <anna@confluent.io>, Jason Gustafson <jason@confluent.io>
---
 core/src/main/scala/kafka/log/LogManager.scala     | 109 ++++++++++++---------
 .../test/scala/unit/kafka/log/LogManagerTest.scala |  50 +++++++++-
 2 files changed, 112 insertions(+), 47 deletions(-)
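
Before reading the diff, it may help to see the core pattern in isolation: a lazy, first-success scan over candidate log directories built on scala.util.Try. Below is a minimal standalone sketch of that pattern; the names tryCreate, candidates, and FallbackSketch are illustrative only, not taken from the patch:

    import java.io.File
    import scala.util.{Failure, Success, Try}

    object FallbackSketch extends App {
      // Hypothetical stand-in for LogManager#createLogDirectory:
      // fails for "bad" dirs, succeeds otherwise.
      def tryCreate(dir: File): Try[File] =
        if (dir.getName.startsWith("bad")) Failure(new RuntimeException(s"$dir is offline"))
        else Success(dir)

      val candidates = List(new File("bad-0"), new File("bad-1"), new File("good-0"))

      // Lazily map the candidates and stop at the first Success; if every
      // directory fails, surface one aggregate failure naming all of them.
      val chosen: File = candidates
        .toStream                      // lazy: later dirs are tried only if earlier ones fail
        .map(tryCreate)
        .find(_.isSuccess)
        .getOrElse(Failure(new RuntimeException(
          "No directories available. Tried " + candidates.mkString(", "))))
        .get                           // rethrows the Failure if nothing succeeded

      println(s"Selected: $chosen")    // prints: Selected: good-0
    }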

diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 320f346..6c724c3 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -34,6 +34,7 @@ import org.apache.kafka.common.errors.{KafkaStorageException, LogDirNotFoundExce
 import scala.collection.JavaConverters._
 import scala.collection._
 import scala.collection.mutable.ArrayBuffer
+import scala.util.{Failure, Success, Try}
 
 /**
  * The entry point to the kafka log management subsystem. The log manager is responsible for log creation, retrieval, and cleaning.
@@ -676,7 +677,7 @@ class LogManager(logDirs: Seq[File],
         if (!isNew && offlineLogDirs.nonEmpty)
           throw new KafkaStorageException(s"Can not create log for $topicPartition because
log directories ${offlineLogDirs.mkString(",")} are offline")
 
-        val logDir = {
+        val logDirs: List[File] = {
           val preferredLogDir = preferredLogDirs.get(topicPartition)
 
           if (isFuture) {
@@ -687,55 +688,70 @@ class LogManager(logDirs: Seq[File],
           }
 
           if (preferredLogDir != null)
-            preferredLogDir
+            List(new File(preferredLogDir))
           else
-            nextLogDir().getAbsolutePath
+            nextLogDirs()
         }
-        if (!isLogDirOnline(logDir))
-          throw new KafkaStorageException(s"Can not create log for $topicPartition because log directory $logDir is offline")
-
-        try {
-          val dir = {
-            if (isFuture)
-              new File(logDir, Log.logFutureDirName(topicPartition))
-            else
-              new File(logDir, Log.logDirName(topicPartition))
-          }
-          Files.createDirectories(dir.toPath)
-
-          val log = Log(
-            dir = dir,
-            config = config,
-            logStartOffset = 0L,
-            recoveryPoint = 0L,
-            maxProducerIdExpirationMs = maxPidExpirationMs,
-            producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
-            scheduler = scheduler,
-            time = time,
-            brokerTopicStats = brokerTopicStats,
-            logDirFailureChannel = logDirFailureChannel)
 
+        val logDirName = {
           if (isFuture)
-            futureLogs.put(topicPartition, log)
+            Log.logFutureDirName(topicPartition)
           else
-            currentLogs.put(topicPartition, log)
+            Log.logDirName(topicPartition)
+        }
 
-          info(s"Created log for partition $topicPartition in $logDir with properties " +
-            s"{${config.originals.asScala.mkString(", ")}}.")
-          // Remove the preferred log dir since it has already been satisfied
-          preferredLogDirs.remove(topicPartition)
+        val logDir = logDirs
+          .toStream // to prevent actually mapping the whole list, lazy map
+          .map(createLogDirectory(_, logDirName))
+          .find(_.isSuccess)
+          .getOrElse(Failure(new KafkaStorageException("No log directories available. Tried " + logDirs.map(_.getAbsolutePath).mkString(", "))))
+          .get // If Failure, will throw
+
+        val log = Log(
+          dir = logDir,
+          config = config,
+          logStartOffset = 0L,
+          recoveryPoint = 0L,
+          maxProducerIdExpirationMs = maxPidExpirationMs,
+          producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
+          scheduler = scheduler,
+          time = time,
+          brokerTopicStats = brokerTopicStats,
+          logDirFailureChannel = logDirFailureChannel)
 
-          log
-        } catch {
-          case e: IOException =>
-            val msg = s"Error while creating log for $topicPartition in dir $logDir"
-            logDirFailureChannel.maybeAddOfflineLogDir(logDir, msg, e)
-            throw new KafkaStorageException(msg, e)
-        }
+        if (isFuture)
+          futureLogs.put(topicPartition, log)
+        else
+          currentLogs.put(topicPartition, log)
+
+        info(s"Created log for partition $topicPartition in $logDir with properties " + s"{${config.originals.asScala.mkString(",
")}}.")
+        // Remove the preferred log dir since it has already been satisfied
+        preferredLogDirs.remove(topicPartition)
+
+        log
       }
     }
   }
 
+  private[log] def createLogDirectory(logDir: File, logDirName: String): Try[File] = {
+    val logDirPath = logDir.getAbsolutePath
+    if (isLogDirOnline(logDirPath)) {
+      val dir = new File(logDirPath, logDirName)
+      try {
+        Files.createDirectories(dir.toPath)
+        Success(dir)
+      } catch {
+        case e: IOException =>
+          val msg = s"Error while creating log for $logDirName in dir $logDirPath"
+          logDirFailureChannel.maybeAddOfflineLogDir(logDirPath, msg, e)
+          warn(msg, e)
+          Failure(new KafkaStorageException(msg, e))
+      }
+    } else {
+      Failure(new KafkaStorageException(s"Can not create log $logDirName because log directory $logDirPath is offline"))
+    }
+  }
+
   /**
    *  Delete logs marked for deletion. Delete all logs for which `currentDefaultConfig.fileDeleteDelayMs`
   *  has elapsed after the delete was scheduled. Logs for which this interval has not yet elapsed will be
@@ -869,13 +885,13 @@ class LogManager(logDirs: Seq[File],
   }
 
   /**
-   * Choose the next directory in which to create a log. Currently this is done
-   * by calculating the number of partitions in each directory and then choosing the
-   * data directory with the fewest partitions.
+   * Provides the full ordered list of suggested directories for the next partition.
+   * Currently this is done by calculating the number of partitions in each directory and then sorting the
+   * data directories by fewest partitions.
    */
-  private def nextLogDir(): File = {
+  private def nextLogDirs(): List[File] = {
     if(_liveLogDirs.size == 1) {
-      _liveLogDirs.peek()
+      List(_liveLogDirs.peek())
     } else {
       // count the number of logs in each parent directory (including 0 for empty directories)
       val logCounts = allLogs.groupBy(_.dir.getParent).mapValues(_.size)
@@ -883,8 +899,9 @@ class LogManager(logDirs: Seq[File],
       val dirCounts = (zeros ++ logCounts).toBuffer
 
       // choose the directory with the least logs in it
-      val leastLoaded = dirCounts.sortBy(_._2).head
-      new File(leastLoaded._1)
+      dirCounts.sortBy(_._2).map {
+        case (path: String, _: Int) => new File(path)
+      }.toList
     }
   }
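
The reworked nextLogDirs() above keeps the old least-loaded heuristic but returns the full ordering rather than only the head, so getOrCreateLog can fall through the whole list. Here is a rough standalone illustration of that sort, using made-up per-directory counts in place of the real allLogs grouping:

    import java.io.File

    object LeastLoadedSketch extends App {
      // Hypothetical per-directory log counts; in LogManager these come from
      // grouping allLogs by parent directory, with zeros for empty directories.
      val logCounts = Map("/data/d0" -> 5, "/data/d1" -> 2, "/data/d2" -> 9)

      // Sort ascending by count so the least-loaded directory is tried first.
      val ordered: List[File] = logCounts.toSeq
        .sortBy(_._2)
        .map { case (path, _) => new File(path) }
        .toList

      println(ordered.mkString(", "))  // /data/d1, /data/d0, /data/d2
    }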
 
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 1e6d2dc..bfbd423 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -23,11 +23,18 @@ import java.util.{Collections, Properties}
 import kafka.server.FetchDataInfo
 import kafka.server.checkpoints.OffsetCheckpointFile
 import kafka.utils._
-import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.errors.OffsetOutOfRangeException
 import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
+import org.mockito.ArgumentMatchers.any
+import org.mockito.Mockito.{doAnswer, spy}
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+
+import scala.collection.mutable
+import scala.util.{Failure, Try}
 
 class LogManagerTest {
 
@@ -95,6 +102,47 @@ class LogManagerTest {
     log.appendAsLeader(TestUtils.singletonRecords("test".getBytes()), leaderEpoch = 0)
   }
 
+  @Test
+  def testCreateLogWithLogDirFallback() {
+    // Configure a number of directories one level deeper in logDir,
+    // so they all get cleaned up in tearDown().
+    val dirs = (0 to 4)
+      .map(_.toString)
+      .map(logDir.toPath.resolve(_).toFile)
+
+    // Create a new LogManager with the configured directories and an overridden createLogDirectory.
+    logManager.shutdown()
+    logManager = spy(createLogManager(dirs))
+    val brokenDirs = mutable.Set[File]()
+    doAnswer(new Answer[Try[File]] {
+      override def answer(invocation: InvocationOnMock): Try[File] = {
+        // The first half of directories tried will fail, the rest goes through.
+        val logDir = invocation.getArgument[File](0)
+        if (brokenDirs.contains(logDir) || brokenDirs.size < dirs.length / 2) {
+          brokenDirs.add(logDir)
+          Failure(new Throwable("broken dir"))
+        } else {
+          invocation.callRealMethod().asInstanceOf[Try[File]]
+        }
+      }
+    }).when(logManager).createLogDirectory(any(), any())
+    logManager.startup()
+
+    // Request creating a new log.
+    // LogManager should try using all configured log directories until one succeeds.
+    logManager.getOrCreateLog(new TopicPartition(name, 0), logConfig, isNew = true)
+
+    // Verify that half the directories were considered broken,
+    assertEquals(dirs.length / 2, brokenDirs.size)
+
+    // and that exactly one log file was created,
+    val containsLogFile: File => Boolean = dir => new File(dir, name + "-0").exists()
+    assertEquals("More than one log file created", 1, dirs.count(containsLogFile))
+
+    // and that it wasn't created in one of the broken directories.
+    assertFalse(brokenDirs.exists(containsLogFile))
+  }
+
   /**
    * Test that get on a non-existent returns None and no log is created.
    */
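
The new test above stubs createLogDirectory with Mockito's spy/doAnswer combination, failing selected directories while delegating to the real method for the rest. A pared-down sketch of that stubbing idiom follows; the Greeter class is invented purely for illustration:

    import org.mockito.ArgumentMatchers.any
    import org.mockito.Mockito.{doAnswer, spy}
    import org.mockito.invocation.InvocationOnMock
    import org.mockito.stubbing.Answer

    class Greeter {
      def greet(name: String): String = s"hello, $name"
    }

    object SpySketch extends App {
      // A spy wraps a real instance: calls that are not stubbed hit the real code.
      val greeter = spy(new Greeter)

      // doAnswer(...).when(...) stubs without invoking the real method, and the
      // Answer can still fall through via callRealMethod(); this is the same
      // shape the test uses to fail half of the log directories.
      doAnswer(new Answer[String] {
        override def answer(invocation: InvocationOnMock): String = {
          val name = invocation.getArgument[String](0)
          if (name == "broken") "stubbed failure"
          else invocation.callRealMethod().asInstanceOf[String]
        }
      }).when(greeter).greet(any())

      println(greeter.greet("broken")) // stubbed failure
      println(greeter.greet("world"))  // hello, world
    }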

