kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch 2.6 updated: MINOR: Improve logging around initial log loading (#8970)
Date Mon, 06 Jul 2020 18:05:59 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new b3b0510  MINOR: Improve logging around initial log loading (#8970)
b3b0510 is described below

commit b3b0510a1a035d6a6024fb741a3ce5fe09b6783b
Author: Jason Gustafson <jason@confluent.io>
AuthorDate: Mon Jul 6 10:30:16 2020 -0700

    MINOR: Improve logging around initial log loading (#8970)
    
    Users often get confused after an unclean shutdown when log recovery takes a long time.
    This patch attempts to make the logging clearer and provide a simple indication of loading
    progress.
    
    Reviewers: Ismael Juma <ismael@juma.me.uk>
---
 core/src/main/scala/kafka/log/Log.scala        |  5 ---
 core/src/main/scala/kafka/log/LogManager.scala | 55 +++++++++++++++++---------
 2 files changed, 36 insertions(+), 24 deletions(-)

diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index d38daf8a..f3b2dc4 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -279,8 +279,6 @@ class Log(@volatile private var _dir: File,
   @volatile var leaderEpochCache: Option[LeaderEpochFileCache] = None
 
   locally {
-    val startMs = time.milliseconds
-
     // create the log directory if it doesn't exist
     Files.createDirectories(dir.toPath)
 
@@ -303,9 +301,6 @@ class Log(@volatile private var _dir: File,
     if (!producerStateManager.isEmpty)
       throw new IllegalStateException("Producer state must be empty during log initialization")
     loadProducerState(logEndOffset, reloadFromCleanShutdown = hasCleanShutdownFile)
-
-    info(s"Completed load of log with ${segments.size} segments, log start offset $logStartOffset and " +
-      s"log end offset $logEndOffset in ${time.milliseconds() - startMs} ms")
   }
 
   def dir: File = _dir
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 8dc6370..6e078ef 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -20,6 +20,7 @@ package kafka.log
 import java.io._
 import java.nio.file.Files
 import java.util.concurrent._
+import java.util.concurrent.atomic.AtomicInteger
 
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.checkpoints.OffsetCheckpointFile
@@ -252,8 +253,9 @@ class LogManager(logDirs: Seq[File],
   // Only for testing
   private[log] def hasLogsToBeDeleted: Boolean = !logsToBeDeleted.isEmpty
 
-  private def loadLog(logDir: File, recoveryPoints: Map[TopicPartition, Long], logStartOffsets: Map[TopicPartition, Long]): Unit = {
-    debug(s"Loading log '${logDir.getName}'")
+  private def loadLog(logDir: File,
+                      recoveryPoints: Map[TopicPartition, Long],
+                      logStartOffsets: Map[TopicPartition, Long]): Log = {
     val topicPartition = Log.parseTopicPartitionName(logDir)
     val config = topicConfigs.getOrElse(topicPartition.topic, currentDefaultConfig)
     val logRecoveryPoint = recoveryPoints.getOrElse(topicPartition, 0L)
@@ -290,29 +292,33 @@ class LogManager(logDirs: Seq[File],
             s"for this partition. It is recommended to delete the partition in the log directory that is known to have failed recently.")
       }
     }
+
+    log
   }
 
   /**
    * Recover and load all logs in the given data directories
    */
   private def loadLogs(): Unit = {
-    info("Loading logs.")
-    val startMs = time.milliseconds
+    info(s"Loading logs from log dirs $liveLogDirs")
+    val startMs = time.hiResClockMs()
     val threadPools = ArrayBuffer.empty[ExecutorService]
     val offlineDirs = mutable.Set.empty[(String, IOException)]
     val jobs = mutable.Map.empty[File, Seq[Future[_]]]
+    var numTotalLogs = 0
 
     for (dir <- liveLogDirs) {
+      val logDirAbsolutePath = dir.getAbsolutePath
       try {
         val pool = Executors.newFixedThreadPool(numRecoveryThreadsPerDataDir)
         threadPools.append(pool)
 
         val cleanShutdownFile = new File(dir, Log.CleanShutdownFile)
-
         if (cleanShutdownFile.exists) {
-          debug(s"Found clean shutdown file. Skipping recovery for all logs in data directory: ${dir.getAbsolutePath}")
+          info(s"Skipping recovery for all logs in $logDirAbsolutePath since clean shutdown file was found")
         } else {
           // log recovery itself is being performed by `Log` class during initialization
+          info(s"Attempting recovery for all logs in $logDirAbsolutePath since no clean shutdown file was found")
           brokerState.newState(RecoveringFromUncleanShutdown)
         }
 
@@ -321,8 +327,8 @@ class LogManager(logDirs: Seq[File],
           recoveryPoints = this.recoveryPointCheckpoints(dir).read
         } catch {
           case e: Exception =>
-            warn(s"Error occurred while reading recovery-point-offset-checkpoint file of directory $dir", e)
-            warn("Resetting the recovery checkpoint to 0")
+            warn(s"Error occurred while reading recovery-point-offset-checkpoint file of directory " +
+              s"$logDirAbsolutePath, resetting the recovery checkpoint to 0", e)
         }
 
         var logStartOffsets = Map[TopicPartition, Long]()
@@ -330,29 +336,40 @@ class LogManager(logDirs: Seq[File],
           logStartOffsets = this.logStartOffsetCheckpoints(dir).read
         } catch {
           case e: Exception =>
-            warn(s"Error occurred while reading log-start-offset-checkpoint file of directory $dir", e)
+            warn(s"Error occurred while reading log-start-offset-checkpoint file of directory " +
+              s"$logDirAbsolutePath, resetting to the base offset of the first segment", e)
         }
 
-        val jobsForDir = for {
-          dirContent <- Option(dir.listFiles).toList
-          logDir <- dirContent if logDir.isDirectory
-        } yield {
+        val logsToLoad = Option(dir.listFiles).getOrElse(Array.empty).filter(_.isDirectory)
+        val numLogsLoaded = new AtomicInteger(0)
+        numTotalLogs += logsToLoad.length
+
+        val jobsForDir = logsToLoad.map { logDir =>
           val runnable: Runnable = () => {
             try {
-              loadLog(logDir, recoveryPoints, logStartOffsets)
+              debug(s"Loading log $logDir")
+
+              val logLoadStartMs = time.hiResClockMs()
+              val log = loadLog(logDir, recoveryPoints, logStartOffsets)
+              val logLoadDurationMs = time.hiResClockMs() - logLoadStartMs
+              val currentNumLoaded = numLogsLoaded.incrementAndGet()
+
+              info(s"Completed load of $log with ${log.numberOfSegments} segments in ${logLoadDurationMs}ms " +
+                s"($currentNumLoaded/${logsToLoad.length} loaded in $logDirAbsolutePath)")
             } catch {
               case e: IOException =>
-                offlineDirs.add((dir.getAbsolutePath, e))
-                error(s"Error while loading log dir ${dir.getAbsolutePath}", e)
+                offlineDirs.add((logDirAbsolutePath, e))
+                error(s"Error while loading log dir $logDirAbsolutePath", e)
             }
           }
           runnable
         }
+
         jobs(cleanShutdownFile) = jobsForDir.map(pool.submit)
       } catch {
         case e: IOException =>
-          offlineDirs.add((dir.getAbsolutePath, e))
-          error(s"Error while loading log dir ${dir.getAbsolutePath}", e)
+          offlineDirs.add((logDirAbsolutePath, e))
+          error(s"Error while loading log dir $logDirAbsolutePath", e)
       }
     }
 
@@ -379,7 +396,7 @@ class LogManager(logDirs: Seq[File],
       threadPools.foreach(_.shutdown())
     }
 
-    info(s"Logs loading complete in ${time.milliseconds - startMs} ms.")
+    info(s"Loaded $numTotalLogs logs in ${time.hiResClockMs() - startMs}ms.")
   }
 
   /**


Mime
View raw message