kafka-commits mailing list archives

From j...@apache.org
Subject [kafka] branch 2.6 updated: KAFKA-10063; UnsupportedOperation when querying cleaner metrics after shutdown (#8783)
Date Mon, 08 Jun 2020 23:56:31 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new 534b68c  KAFKA-10063; UnsupportedOperation when querying cleaner metrics after shutdown (#8783)
534b68c is described below

commit 534b68cbbf73b37d43a8136e89ab036fcdbb2e97
Author: Chia-Ping Tsai <chia7712@gmail.com>
AuthorDate: Tue Jun 9 07:47:22 2020 +0800

    KAFKA-10063; UnsupportedOperation when querying cleaner metrics after shutdown (#8783)
    
    Some `LogCleaner` metrics have an unsafe call to `max` over the collection of
    cleaner threads, which could be empty after shutdown. This leads to an
    `UnsupportedOperationException`. This patch fixes the problem by changing the
    computation to use `foldLeft`.
    
    Reviewers: Jason Gustafson <jason@confluent.io>
---
 core/src/main/scala/kafka/log/LogCleaner.scala     | 17 ++++++++++++++---
 .../test/scala/unit/kafka/log/LogCleanerTest.scala | 22 ++++++++++++++++++++++
 2 files changed, 36 insertions(+), 3 deletions(-)
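
Before the patch itself, a minimal standalone sketch (not from the commit) of the failure mode the message describes: `max` on an empty Scala collection throws, while `foldLeft` seeded with a default value does not. The object and variable names here are illustrative only.

    import scala.collection.mutable

    object EmptyMaxSketch extends App {
      val cleanerStats = mutable.ArrayBuffer[Double]() // empty, as after shutdown

      // Pre-fix pattern: throws java.lang.UnsupportedOperationException ("empty.max")
      // cleanerStats.iterator.map(100 * _).max.toInt

      // Post-fix pattern: folds from 0.0, so the empty case yields 0
      val safe = cleanerStats.foldLeft(0.0d)((acc, v) => math.max(acc, v)).toInt
      println(safe) // 0
    }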

diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 2ce1a85..c83cc6f 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -112,9 +112,18 @@ class LogCleaner(initialConfig: CleanerConfig,
 
   private[log] val cleaners = mutable.ArrayBuffer[CleanerThread]()
 
+  /**
+   * Scala 2.12 does not support maxOption, so we handle the empty case manually.
+   * @param f function that extracts the value to maximize from a cleaner thread
+   * @return the maximum value as an Int, or 0 if there are no cleaner threads
+   */
+  private def maxOverCleanerThreads(f: CleanerThread => Double): Int =
+    cleaners.foldLeft(0.0d)((max: Double, thread: CleanerThread) => math.max(max, f(thread))).toInt
+
+
   /* a metric to track the maximum utilization of any thread's buffer in the last cleaning */
   newGauge("max-buffer-utilization-percent",
-    () => cleaners.iterator.map(100 * _.lastStats.bufferUtilization).max.toInt)
+    () => maxOverCleanerThreads(_.lastStats.bufferUtilization) * 100)
 
   /* a metric to track the recopy rate of each thread's last cleaning */
   newGauge("cleaner-recopy-percent", () => {
@@ -124,12 +133,14 @@ class LogCleaner(initialConfig: CleanerConfig,
   })
 
   /* a metric to track the maximum cleaning time for the last cleaning from each thread */
-  newGauge("max-clean-time-secs", () => cleaners.iterator.map(_.lastStats.elapsedSecs).max.toInt)
+  newGauge("max-clean-time-secs",
+    () => maxOverCleanerThreads(_.lastStats.elapsedSecs))
+
 
   // a metric to track delay between the time when a log is required to be compacted
   // as determined by max compaction lag and the time of last cleaner run.
   newGauge("max-compaction-delay-secs",
-    () => Math.max(0, (cleaners.iterator.map(_.lastPreCleanStats.maxCompactionDelayMs).max / 1000).toInt))
+    () => maxOverCleanerThreads(_.lastPreCleanStats.maxCompactionDelayMs.toDouble) / 1000)
 
   newGauge("DeadThreadCount", () => deadThreadCount)
 
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 770db06..a2b4b08 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -1603,6 +1603,28 @@ class LogCleanerTest {
     }
   }
 
+  @Test
+  def testMaxCleanTimeSecs(): Unit = {
+    val logCleaner = new LogCleaner(new CleanerConfig,
+      logDirs = Array(TestUtils.tempDir()),
+      logs = new Pool[TopicPartition, Log](),
+      logDirFailureChannel = new LogDirFailureChannel(1),
+      time = time)
+
+    def checkGauge(name: String): Unit = {
+      val gauge = logCleaner.newGauge(name, () => 999)
+      // if there are no cleaners, 0 is the default value
+      assertEquals(0, gauge.value())
+    }
+
+    try {
+      checkGauge("max-buffer-utilization-percent")
+      checkGauge("max-clean-time-secs")
+      checkGauge("max-compaction-delay-secs")
+    } finally logCleaner.shutdown()
+  }
+
+
   private def writeToLog(log: Log, keysAndValues: Iterable[(Int, Int)], offsetSeq: Iterable[Long]): Iterable[Long] = {
     for(((key, value), offset) <- keysAndValues.zip(offsetSeq))
       yield log.appendAsFollower(messageWithOffset(key, value, offset)).lastOffset

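The new scaladoc notes that this branch still builds with Scala 2.12, which lacks `maxOption`. On Scala 2.13+, an equivalent helper could be written as below; this is an illustrative sketch with stand-in types, not part of the patch:

    object MaxOptionSketch extends App {
      // Stand-in for the cleaners buffer; Double stands in for a per-thread stat
      val utilizations = scala.collection.mutable.ArrayBuffer[Double]()

      // Scala 2.13+ equivalent of the patch's foldLeft-based helper
      def maxOverCleanerThreads(f: Double => Double): Int =
        utilizations.iterator.map(f).maxOption.getOrElse(0.0d).toInt

      println(maxOverCleanerThreads(identity)) // 0 when empty, no exception
    }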
