kafka-commits mailing list archives

From jjko...@apache.org
Subject git commit: KAFKA-1327 Add log cleaner metrics.
Date Mon, 21 Apr 2014 18:12:01 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.8.1 4bcb22f47 -> 69fbdf9cb


KAFKA-1327 Add log cleaner metrics.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/69fbdf9c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/69fbdf9c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/69fbdf9c

Branch: refs/heads/0.8.1
Commit: 69fbdf9cb39895198c14f00b483a3a3f47936b78
Parents: 4bcb22f
Author: Jay Kreps <jay.kreps@gmail.com>
Authored: Thu Apr 17 15:28:02 2014 -0700
Committer: Joel Koshy <jjkoshy@gmail.com>
Committed: Fri Apr 18 16:28:16 2014 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/Log.scala         |  3 ++
 core/src/main/scala/kafka/log/LogCleaner.scala  | 43 ++++++++++++++++++--
 .../scala/kafka/log/LogCleanerManager.scala     | 27 ++++++++----
 core/src/main/scala/kafka/utils/Throttler.scala | 12 ++++--
 4 files changed, 69 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/69fbdf9c/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index b3ab522..46df8d9 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -75,6 +75,9 @@ class Log(val dir: File,
 
   newGauge(name + "-" + "LogEndOffset",
            new Gauge[Long] { def value = logEndOffset })
+           
+  newGauge(name + "-" + "Size", 
+           new Gauge[Long] {def value = size})
 
   /** The name of this log */
   def name  = dir.getName()
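
The gauge added above follows the same pattern as the existing LogEndOffset gauge: KafkaMetricsGroup's newGauge registers a com.yammer.metrics.core.Gauge whose value() is computed each time the metric is sampled. A rough, self-contained sketch of that pattern written directly against metrics-core (FakeLog and registerSizeGauge below are illustrative stand-ins, not the actual Kafka classes):

    import com.yammer.metrics.Metrics
    import com.yammer.metrics.core.Gauge

    // Illustrative stand-in for a log whose on-disk size we want to expose.
    class FakeLog(val dir: java.io.File) {
      def name: String = dir.getName
      def size: Long = Option(dir.listFiles).map(_.map(_.length).sum).getOrElse(0L)
    }

    object GaugeSketch {
      // Registers a "<logName>-Size" gauge in the default Yammer registry,
      // mirroring the newGauge(name + "-" + "Size", ...) call in Log.scala.
      def registerSizeGauge(log: FakeLog): Gauge[Long] =
        Metrics.newGauge(classOf[FakeLog], log.name + "-" + "Size",
                         new Gauge[Long] { def value: Long = log.size })
    }

Because the gauge closes over the log, the reported size tracks the live value on every sample rather than a snapshot taken at registration time.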

http://git-wip-us.apache.org/repos/asf/kafka/blob/69fbdf9c/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 312204c..b9ffe00 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -19,12 +19,15 @@ package kafka.log
 
 import scala.collection._
 import scala.math
+import java.util.concurrent.TimeUnit
 import java.nio._
 import java.util.Date
 import java.io.File
 import kafka.common._
 import kafka.message._
 import kafka.utils._
+import kafka.metrics.KafkaMetricsGroup
+import com.yammer.metrics.core.Gauge
 import java.lang.IllegalStateException
 
 /**
@@ -63,7 +66,8 @@ import java.lang.IllegalStateException
 class LogCleaner(val config: CleanerConfig,
                  val logDirs: Array[File],
                  val logs: Pool[TopicAndPartition, Log], 
-                 time: Time = SystemTime) extends Logging {
+                 time: Time = SystemTime) extends Logging with KafkaMetricsGroup {
+  
   /* for managing the state of partitions being cleaned. */
   private val cleanerManager = new LogCleanerManager(logDirs, logs);
 
@@ -71,11 +75,33 @@ class LogCleaner(val config: CleanerConfig,
   private val throttler = new Throttler(desiredRatePerSec = config.maxIoBytesPerSecond, 
                                         checkIntervalMs = 300, 
                                         throttleDown = true, 
+                                        "cleaner-io",
+                                        "bytes",
                                         time = time)
   
   /* the threads */
   private val cleaners = (0 until config.numThreads).map(new CleanerThread(_))
   
+  /* a metric to track the maximum utilization of any thread's buffer in the last cleaning */
+  newGauge("max-buffer-utilization-percent", 
+           new Gauge[Int] {
+             def value: Int = cleaners.map(_.lastStats).map(100 * _.bufferUtilization).max.toInt
+           })
+  /* a metric to track the recopy rate of each thread's last cleaning */
+  newGauge("cleaner-recopy-percent", 
+           new Gauge[Int] {
+             def value: Int = {
+               val stats = cleaners.map(_.lastStats)
+               val recopyRate = stats.map(_.bytesWritten).sum.toDouble / math.max(stats.map(_.bytesRead).sum, 1)
+               (100 * recopyRate).toInt
+             }
+           })
+  /* a metric to track the maximum cleaning time for the last cleaning from each thread */
+  newGauge("max-clean-time-secs",
+           new Gauge[Int] {
+             def value: Int = cleaners.map(_.lastStats).map(_.elapsedSecs).max.toInt
+           })
+  
   /**
    * Start the background cleaning
    */
@@ -147,6 +173,8 @@ class LogCleaner(val config: CleanerConfig,
                               time = time,
                               checkDone = checkDone)
     
+    @volatile var lastStats: CleanerStats = new CleanerStats()
+    
     private def checkDone(topicAndPartition: TopicAndPartition) {
       if (!isRunning.get())
         throw new ThreadShutdownException
@@ -173,7 +201,7 @@ class LogCleaner(val config: CleanerConfig,
           var endOffset = cleanable.firstDirtyOffset
           try {
             endOffset = cleaner.clean(cleanable)
-            logStats(cleaner.id, cleanable.log.name, cleanable.firstDirtyOffset, endOffset, cleaner.stats)
+            recordStats(cleaner.id, cleanable.log.name, cleanable.firstDirtyOffset, endOffset, cleaner.stats)
           } catch {
             case pe: LogCleaningAbortedException => // task can be aborted, let it go.
           } finally {
@@ -185,7 +213,8 @@ class LogCleaner(val config: CleanerConfig,
     /**
      * Log out statistics on a single run of the cleaner.
      */
-    def logStats(id: Int, name: String, from: Long, to: Long, stats: CleanerStats) {
+    def recordStats(id: Int, name: String, from: Long, to: Long, stats: CleanerStats) {
+      this.lastStats = stats
       def mb(bytes: Double) = bytes / (1024*1024)
       val message = 
         "%n\tLog cleaner thread %d cleaned log %s (dirty section = [%d, %d])%n".format(id,
name, from, to) + 
@@ -196,6 +225,7 @@ class LogCleaner(val config: CleanerConfig,
                                                                                            stats.elapsedIndexSecs, 
                                                                                            mb(stats.mapBytesRead)/stats.elapsedIndexSecs, 
                                                                                            100 * stats.elapsedIndexSecs.toDouble/stats.elapsedSecs) + 
+        "\tBuffer utilization: %.1f%%%n".format(100 * stats.bufferUtilization) +
         "\tCleaned %,.1f MB in %.1f seconds (%,.1f Mb/sec, %.1f%% of total time)%n".format(mb(stats.bytesRead), 
                                                                                            stats.elapsedSecs - stats.elapsedIndexSecs, 
                                                                                            mb(stats.bytesRead)/(stats.elapsedSecs - stats.elapsedIndexSecs), 100 * (stats.elapsedSecs - stats.elapsedIndexSecs).toDouble/stats.elapsedSecs) + 
@@ -218,7 +248,7 @@ class LogCleaner(val config: CleanerConfig,
  * @param time The time instance
  */
 private[log] class Cleaner(val id: Int,
-                           offsetMap: OffsetMap,
+                           val offsetMap: OffsetMap,
                            ioBufferSize: Int,
                            maxIoBufferSize: Int,
                            dupBufferLoadFactor: Double,
@@ -269,6 +299,9 @@ private[log] class Cleaner(val id: Int,
     info("Cleaning log %s (discarding tombstones prior to %s)...".format(log.name, new Date(deleteHorizonMs)))
     for (group <- groupSegmentsBySize(log.logSegments(0, endOffset), log.config.segmentSize, log.config.maxIndexSize))
       cleanSegments(log, group, offsetMap, deleteHorizonMs)
+      
+    // record buffer utilization
+    stats.bufferUtilization = offsetMap.utilization
     
     stats.allDone()
     endOffset
@@ -504,6 +537,7 @@ private[log] class Cleaner(val id: Int,
  */
 private case class CleanerStats(time: Time = SystemTime) {
   var startTime, mapCompleteTime, endTime, bytesRead, bytesWritten, mapBytesRead, mapMessagesRead, messagesRead, messagesWritten = 0L
+  var bufferUtilization = 0.0d
   clear()
   
   def readMessage(size: Int) {
@@ -543,6 +577,7 @@ private case class CleanerStats(time: Time = SystemTime) {
     mapMessagesRead = 0L
     messagesRead = 0L
     messagesWritten = 0L
+    bufferUtilization = 0.0d
   }
 }
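
For reference, the cleaner-recopy-percent gauge above reports how much of the data read during each thread's last cleaning survived into the cleaned segments (bytesWritten / max(bytesRead, 1), scaled to a percentage), and max-buffer-utilization-percent reports the worst-case fill of any thread's dedupe buffer. A small, self-contained sketch of the same arithmetic over hypothetical per-thread numbers (ThreadStats is a simplified stand-in for CleanerStats):

    // Simplified stand-in for the per-thread CleanerStats consumed by the gauges above.
    case class ThreadStats(bytesRead: Long, bytesWritten: Long, bufferUtilization: Double)

    object CleanerGaugeSketch {
      // cleaner-recopy-percent: bytes kept by the last cleaning, as a percent of bytes read.
      def recopyPercent(stats: Seq[ThreadStats]): Int = {
        val rate = stats.map(_.bytesWritten).sum.toDouble / math.max(stats.map(_.bytesRead).sum, 1)
        (100 * rate).toInt
      }

      // max-buffer-utilization-percent: worst-case offset map fill across cleaner threads.
      def maxBufferUtilizationPercent(stats: Seq[ThreadStats]): Int =
        stats.map(100 * _.bufferUtilization).max.toInt

      def main(args: Array[String]): Unit = {
        // e.g. a thread that read 500 MB and kept 75 MB recopied 15% of its input
        val stats = Seq(ThreadStats(500L << 20, 75L << 20, 0.25))
        println(recopyPercent(stats))               // 15
        println(maxBufferUtilizationPercent(stats)) // 25
      }
    }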
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/69fbdf9c/core/src/main/scala/kafka/log/LogCleanerManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index 79e9d55..683d722 100644
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -18,6 +18,8 @@
 package kafka.log
 
 import java.io.File
+import kafka.metrics.KafkaMetricsGroup
+import com.yammer.metrics.core.Gauge
 import kafka.utils.{Logging, Pool}
 import kafka.server.OffsetCheckpoint
 import collection.mutable
@@ -39,7 +41,7 @@ private[log] case object LogCleaningPaused extends LogCleaningState
  *  While a partition is in the LogCleaningPaused state, it won't be scheduled for cleaning again, until cleaning is
  *  requested to be resumed.
  */
-private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[TopicAndPartition, Log]) extends Logging {
+private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[TopicAndPartition, Log]) extends Logging with KafkaMetricsGroup {
   
   override val loggerName = classOf[LogCleaner].getName
   
@@ -51,8 +53,13 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
 
   /* a global lock used to control all access to the in-progress set and the offset checkpoints */
   private val lock = new ReentrantLock
+  
   /* for coordinating the pausing and the cleaning of a partition */
   private val pausedCleaningCond = lock.newCondition()
+  
+  /* a gauge for tracking the cleanable ratio of the dirtiest log */
+  private var dirtiestLogCleanableRatio = 0.0
+  newGauge("max-dirty-percent", new Gauge[Int] { def value = (100 * dirtiestLogCleanableRatio).toInt
})
 
   /**
    * @return the position processed for all logs.
@@ -68,16 +75,18 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
   def grabFilthiestLog(): Option[LogToClean] = {
     inLock(lock) {
       val lastClean = allCleanerCheckpoints()
-      val cleanableLogs = logs.filter(l => l._2.config.compact)          // skip any logs marked for delete rather than dedupe
-                              .filterNot(l => inProgress.contains(l._1)) // skip any logs already in-progress
-                              .map(l => LogToClean(l._1, l._2,           // create a LogToClean instance for each
-                                                   lastClean.getOrElse(l._1, l._2.logSegments.head.baseOffset)))
-      val dirtyLogs = cleanableLogs.filter(l => l.totalBytes > 0)        // must have some bytes
-                                   .filter(l => l.cleanableRatio > l.log.config.minCleanableRatio) // and must meet the minimum threshold for dirty byte ratio
-      if(dirtyLogs.isEmpty) {
+      val dirtyLogs = logs.filter(l => l._2.config.compact)          // skip any logs marked for delete rather than dedupe
+                          .filterNot(l => inProgress.contains(l._1)) // skip any logs already in-progress
+                          .map(l => LogToClean(l._1, l._2,           // create a LogToClean instance for each
+                                               lastClean.getOrElse(l._1, l._2.logSegments.head.baseOffset)))
+                          .filter(l => l.totalBytes > 0)             // skip any empty logs
+      if(!dirtyLogs.isEmpty)
+        this.dirtiestLogCleanableRatio = dirtyLogs.max.cleanableRatio
+      val cleanableLogs = dirtyLogs.filter(l => l.cleanableRatio > l.log.config.minCleanableRatio) // and must meet the minimum threshold for dirty byte ratio
+      if(cleanableLogs.isEmpty) {
         None
       } else {
-        val filthiest = dirtyLogs.max
+        val filthiest = cleanableLogs.max
         inProgress.put(filthiest.topicPartition, LogCleaningInProgress)
         Some(filthiest)
       }
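
The reworked selection above narrows to compacted, not-in-progress, non-empty logs first, records the dirtiest ratio seen (which backs the max-dirty-percent gauge), and only then applies the per-log minimum cleanable ratio threshold. A much-simplified sketch of that ordering, using hypothetical stand-in types rather than the real Pool/LogToClean machinery:

    // Hypothetical, reduced stand-in for a log considered by the cleaner manager.
    case class CandidateLog(name: String, compact: Boolean, inProgress: Boolean,
                            totalBytes: Long, cleanableRatio: Double, minCleanableRatio: Double)

    object FilthiestLogSketch {
      var dirtiestLogCleanableRatio = 0.0   // would back a max-dirty-percent style gauge

      def grabFilthiest(logs: Seq[CandidateLog]): Option[CandidateLog] = {
        val dirtyLogs = logs.filter(_.compact)          // only compacted logs are deduped
                            .filterNot(_.inProgress)    // skip logs already being cleaned
                            .filter(_.totalBytes > 0)   // skip empty logs
        if (dirtyLogs.nonEmpty)
          dirtiestLogCleanableRatio = dirtyLogs.map(_.cleanableRatio).max
        // only logs over their own minimum cleanable ratio are actually eligible
        val cleanable = dirtyLogs.filter(l => l.cleanableRatio > l.minCleanableRatio)
        if (cleanable.isEmpty) None else Some(cleanable.maxBy(_.cleanableRatio))
      }
    }

One consequence of this ordering is that max-dirty-percent can report a non-zero value even while no log yet exceeds its minimum cleanable ratio, which shows how close the cleaner is to having work to do.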

http://git-wip-us.apache.org/repos/asf/kafka/blob/69fbdf9c/core/src/main/scala/kafka/utils/Throttler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Throttler.scala b/core/src/main/scala/kafka/utils/Throttler.scala
index c6c3c75..d1a144d 100644
--- a/core/src/main/scala/kafka/utils/Throttler.scala
+++ b/core/src/main/scala/kafka/utils/Throttler.scala
@@ -17,6 +17,8 @@
 
 package kafka.utils;
 
+import kafka.metrics.KafkaMetricsGroup
+import java.util.concurrent.TimeUnit
 import java.util.Random
 import scala.math._
 
@@ -33,14 +35,18 @@ import scala.math._
 @threadsafe
 class Throttler(val desiredRatePerSec: Double, 
                 val checkIntervalMs: Long = 100L, 
-                val throttleDown: Boolean = true, 
-                val time: Time = SystemTime) extends Logging {
+                val throttleDown: Boolean = true,
+                metricName: String = "throttler",
+                units: String = "entries",
+                val time: Time = SystemTime) extends Logging with KafkaMetricsGroup {
   
   private val lock = new Object
+  private val meter = newMeter(metricName, units, TimeUnit.SECONDS)
   private var periodStartNs: Long = time.nanoseconds
   private var observedSoFar: Double = 0.0
   
   def maybeThrottle(observed: Double) {
+    meter.mark(observed.toLong)
     lock synchronized {
       observedSoFar += observed
       val now = time.nanoseconds
@@ -72,7 +78,7 @@ object Throttler {
   
   def main(args: Array[String]) {
     val rand = new Random()
-    val throttler = new Throttler(100000, 100, true, SystemTime)
+    val throttler = new Throttler(100000, 100, true, time = SystemTime)
     val interval = 30000
     var start = System.currentTimeMillis
     var total = 0
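
With the two new constructor parameters, every Throttler also registers a rate meter (via newMeter with TimeUnit.SECONDS) and marks it with each observed amount, so the log cleaner's instance, constructed earlier in LogCleaner.scala with "cleaner-io" and "bytes", reports the cleaner's actual I/O rate alongside enforcing the cap. A minimal usage sketch, assuming the constructor shown in this diff (the 50 MB/sec target and the work loop are made up for illustration):

    import kafka.utils.{SystemTime, Throttler}

    object ThrottlerSketch {
      def main(args: Array[String]): Unit = {
        // Throttle cleaner-style I/O to ~50 MB/sec; constructing the throttler also
        // registers a "cleaner-io" meter measured in bytes per second.
        val throttler = new Throttler(desiredRatePerSec = 50.0 * 1024 * 1024,
                                      checkIntervalMs = 300,
                                      throttleDown = true,
                                      "cleaner-io",
                                      "bytes",
                                      time = SystemTime)

        // Each chunk of work marks the meter and may sleep to stay under the target rate.
        for (_ <- 1 to 10)
          throttler.maybeThrottle(4 * 1024 * 1024)   // report 4 MB of observed I/O
      }
    }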

