kafka-commits mailing list archives

From j...@apache.org
Subject [kafka] branch 2.4 updated: KAFKA-7981; Add fetcher and log cleaner thread count metrics (#6514)
Date Thu, 17 Oct 2019 01:05:33 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new 0299993  KAFKA-7981; Add fetcher and log cleaner thread count metrics (#6514)
0299993 is described below

commit 0299993c09ac4da2f43e58f28a60ec1fd1937a16
Author: Viktor Somogyi <viktorsomogyi@gmail.com>
AuthorDate: Fri Oct 11 19:53:32 2019 +0200

    KAFKA-7981; Add fetcher and log cleaner thread count metrics (#6514)
    
    This patch adds metrics for failed threads as documented in KIP-434: https://cwiki.apache.org/confluence/display/KAFKA/KIP-434%3A+Add+Replica+Fetcher+and+Log+Cleaner+Count+Metrics.
    
    Reviewers: Stanislav Kozlovski <stanislav_kozlovski@outlook.com>, Jason Gustafson <jason@confluent.io>
---
 core/src/main/scala/kafka/log/LogCleaner.scala     | 12 +++-
 .../kafka/server/AbstractFetcherManager.scala      | 10 ++++
 .../scala/kafka/utils/ShutdownableThread.scala     | 19 ++++---
 .../unit/kafka/log/LogCleanerIntegrationTest.scala | 64 ++++++++++++++++------
 .../kafka/server/AbstractFetcherManagerTest.scala  | 34 ++++++++++++
 5 files changed, 113 insertions(+), 26 deletions(-)
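
The patch introduces a gauge named DeadThreadCount on both the log cleaner and the fetcher managers. As a rough illustration of how these might be consumed (not part of the patch), the Scala sketch below polls the gauges over JMX. The MBean names follow KIP-434 and Kafka's KafkaMetricsGroup naming, and the JMX port is an assumption; verify both against your broker.

import javax.management.ObjectName
import javax.management.remote.{JMXConnectorFactory, JMXServiceURL}

object DeadThreadCountProbe {
  def main(args: Array[String]): Unit = {
    // Assumes the broker was started with JMX enabled on port 9999.
    val url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi")
    val connector = JMXConnectorFactory.connect(url)
    try {
      val server = connector.getMBeanServerConnection
      // MBean names per KIP-434; double-check them on your broker version.
      val gauges = Seq(
        new ObjectName("kafka.log:type=LogCleaner,name=DeadThreadCount"),
        new ObjectName("kafka.server:type=ReplicaFetcherManager,name=DeadThreadCount,clientId=Replica"))
      for (name <- gauges) {
        // Yammer gauges expose their current reading through the "Value" attribute.
        println(s"$name = ${server.getAttribute(name, "Value")}")
      }
    } finally connector.close()
  }
}

A non-zero reading on either gauge means at least one background thread has died unexpectedly, which makes it a reasonable alerting condition.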

diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index bcb0e72..219f497 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -110,8 +110,7 @@ class LogCleaner(initialConfig: CleanerConfig,
                                         "bytes",
                                         time = time)
 
-  /* the threads */
-  private val cleaners = mutable.ArrayBuffer[CleanerThread]()
+  private[log] val cleaners = mutable.ArrayBuffer[CleanerThread]()
 
   /* a metric to track the maximum utilization of any thread's buffer in the last cleaning */
   newGauge("max-buffer-utilization-percent",
@@ -139,6 +138,13 @@ class LogCleaner(initialConfig: CleanerConfig,
          def value: Int = Math.max(0, (cleaners.map(_.lastPreCleanStats).map(_.maxCompactionDelayMs).max / 1000).toInt)
           })
 
+  newGauge("DeadThreadCount",
+    new Gauge[Int] {
+      def value: Int = deadThreadCount
+    })
+
+  private[log] def deadThreadCount: Int = cleaners.count(_.isThreadFailed)
+
   /**
    * Start the background cleaning
    */
@@ -272,7 +278,7 @@ class LogCleaner(initialConfig: CleanerConfig,
   * The cleaner threads do the actual log cleaning. Each thread processes does its cleaning repeatedly by
    * choosing the dirtiest log, cleaning it, and then swapping in the cleaned segments.
    */
-  private class CleanerThread(threadId: Int)
+  private[log] class CleanerThread(threadId: Int)
     extends ShutdownableThread(name = s"kafka-log-cleaner-thread-$threadId", isInterruptible = false) {
 
     protected override def loggerName = classOf[LogCleaner].getName
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
index d76fbfc..49451de 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
@@ -76,6 +76,16 @@ abstract class AbstractFetcherManager[T <: AbstractFetcherThread](val name: Stri
     Map("clientId" -> clientId)
   )
 
+  newGauge("DeadThreadCount", {
+    new Gauge[Int] {
+      def value: Int = {
+        deadThreadCount
+      }
+    }
+  }, Map("clientId" -> clientId))
+
+  private[server] def deadThreadCount: Int = lock synchronized { fetcherThreadMap.values.count(_.isThreadFailed) }
+
   def resizeThreadPool(newSize: Int): Unit = {
     def migratePartitions(newSize: Int): Unit = {
       fetcherThreadMap.foreach { case (id, thread) =>
diff --git a/core/src/main/scala/kafka/utils/ShutdownableThread.scala b/core/src/main/scala/kafka/utils/ShutdownableThread.scala
index 02d09da..0ca21c4 100644
--- a/core/src/main/scala/kafka/utils/ShutdownableThread.scala
+++ b/core/src/main/scala/kafka/utils/ShutdownableThread.scala
@@ -34,9 +34,16 @@ abstract class ShutdownableThread(val name: String, val isInterruptible: Boolean
     awaitShutdown()
   }
 
-  def isShutdownComplete: Boolean = {
-    shutdownComplete.getCount == 0
-  }
+  def isShutdownInitiated: Boolean = shutdownInitiated.getCount == 0
+
+  def isShutdownComplete: Boolean = shutdownComplete.getCount == 0
+
+  /**
+    * @return true if there has been an unexpected error and the thread shut down
+    */
+  // mind that run() might set both when we're shutting down the broker
+  // but the return value of this function at that point wouldn't matter
+  def isThreadFailed: Boolean = isShutdownComplete && !isShutdownInitiated
 
   def initiateShutdown(): Boolean = {
     this.synchronized {
@@ -55,7 +62,7 @@ abstract class ShutdownableThread(val name: String, val isInterruptible: Boolean
    * After calling initiateShutdown(), use this API to wait until the shutdown is complete
    */
   def awaitShutdown(): Unit = {
-    if (shutdownInitiated.getCount != 0)
+    if (!isShutdownInitiated)
       throw new IllegalStateException("initiateShutdown() was not called before awaitShutdown()")
     else {
       if (isStarted)
@@ -102,7 +109,5 @@ abstract class ShutdownableThread(val name: String, val isInterruptible: Boolean
     info("Stopped")
   }
 
-  def isRunning: Boolean = {
-    shutdownInitiated.getCount() != 0
-  }
+  def isRunning: Boolean = !isShutdownInitiated
 }
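
A note on the isThreadFailed predicate above: ShutdownableThread tracks its lifecycle with two latches. An orderly shutdown trips shutdownInitiated before the thread exits, while a thread that dies from an unexpected error trips only shutdownComplete, so "complete but never initiated" identifies a failed thread. The standalone Scala sketch below restates that idea; it mirrors the names in ShutdownableThread but is an illustration, not Kafka's class.

import java.util.concurrent.CountDownLatch

abstract class TwoLatchThread(name: String) extends Thread(name) {
  private val shutdownInitiated = new CountDownLatch(1)
  private val shutdownComplete = new CountDownLatch(1)

  def doWork(): Unit // one unit of work, called in a loop until shutdown or failure

  override def run(): Unit = {
    try {
      while (shutdownInitiated.getCount != 0) doWork()
    } catch {
      case _: Throwable => // unexpected death: shutdownInitiated is never tripped
    } finally {
      shutdownComplete.countDown() // tripped on every exit path
    }
  }

  def initiateShutdown(): Unit = shutdownInitiated.countDown()
  def isShutdownInitiated: Boolean = shutdownInitiated.getCount == 0
  def isShutdownComplete: Boolean = shutdownComplete.getCount == 0
  // Same predicate as the patch: the thread finished although nobody asked it to stop.
  def isThreadFailed: Boolean = isShutdownComplete && !isShutdownInitiated
}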
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index df6df0b..5697507 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -20,21 +20,22 @@ package kafka.log
 import java.io.PrintWriter
 
 import com.yammer.metrics.Metrics
-import com.yammer.metrics.core.Gauge
+import com.yammer.metrics.core.{Gauge, MetricName}
+import kafka.metrics.KafkaMetricsGroup
 import kafka.utils.{MockTime, TestUtils}
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS
 import org.apache.kafka.common.record.{CompressionType, RecordBatch}
+import org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS
 import org.junit.Assert._
 import org.junit.Test
 
-import scala.collection.{Iterable, JavaConverters, Seq}
 import scala.collection.JavaConverters.mapAsScalaMapConverter
+import scala.collection.{Iterable, JavaConverters, Seq}
 
 /**
   * This is an integration test that tests the fully integrated log cleaner
   */
-class LogCleanerIntegrationTest extends AbstractLogCleanerIntegrationTest {
+class LogCleanerIntegrationTest extends AbstractLogCleanerIntegrationTest with KafkaMetricsGroup {
 
   val codec: CompressionType = CompressionType.LZ4
 
@@ -60,18 +61,6 @@ class LogCleanerIntegrationTest extends AbstractLogCleanerIntegrationTest {
       writeDups(numKeys = 20, numDups = 3, log = log, codec = codec)
     }
 
-    def getGauge[T](metricName: String, metricScope: String): Gauge[T] = {
-      Metrics.defaultRegistry.allMetrics.asScala
-        .filterKeys(k => {
-          k.getName.endsWith(metricName) && k.getScope.endsWith(metricScope)
-        })
-        .headOption
-        .getOrElse { fail(s"Unable to find metric $metricName") }
-        .asInstanceOf[(Any, Gauge[Any])]
-        ._2
-        .asInstanceOf[Gauge[T]]
-    }
-
     breakPartitionLog(topicPartitions(0))
     breakPartitionLog(topicPartitions(1))
 
@@ -95,6 +84,24 @@ class LogCleanerIntegrationTest extends AbstractLogCleanerIntegrationTest {
     assertFalse(uncleanablePartitions.contains(topicPartitions(2)))
   }
 
+  private def getGauge[T](filter: MetricName => Boolean): Gauge[T] = {
+    Metrics.defaultRegistry.allMetrics.asScala
+      .filterKeys(filter(_))
+      .headOption
+      .getOrElse { fail(s"Unable to find metric") }
+      .asInstanceOf[(Any, Gauge[Any])]
+      ._2
+      .asInstanceOf[Gauge[T]]
+  }
+
+  private def getGauge[T](metricName: String): Gauge[T] = {
+    getGauge(_.getName.endsWith(metricName))
+  }
+
+  private def getGauge[T](metricName: String, metricScope: String): Gauge[T] = {
+    getGauge(k => k.getName.endsWith(metricName) && k.getScope.endsWith(metricScope))
+  }
+
   @Test
   def testMaxLogCompactionLag(): Unit = {
     val msPerHour = 60 * 60 * 1000
@@ -185,4 +192,29 @@ class LogCleanerIntegrationTest extends AbstractLogCleanerIntegrationTest {
       (key, curValue)
     }
   }
+
+  @Test
+  def testIsThreadFailed(): Unit = {
+    val metricName = "DeadThreadCount"
+    removeMetric(metricName) // remove the existing metric so it will be attached to this object below on creation
+    cleaner = makeCleaner(partitions = topicPartitions, maxMessageSize = 100000, backOffMs = 100)
+    cleaner.startup()
+    assertEquals(0, cleaner.deadThreadCount)
+    // we simulate the unexpected error with an interrupt
+    cleaner.cleaners.foreach(_.interrupt())
+    // wait until interruption is propagated to all the threads
+    TestUtils.waitUntilTrue(
+      () => cleaner.cleaners.foldLeft(true)((result, thread) => {
+        thread.isThreadFailed && result
+      }), "Threads didn't terminate unexpectedly"
+    )
+    assertEquals(cleaner.cleaners.size, getGauge[Int](metricName).value())
+    assertEquals(cleaner.cleaners.size, cleaner.deadThreadCount)
+  }
+
+  private def removeMetric(name: String): Unit = {
+    val metricName = Metrics.defaultRegistry().allMetrics()
+      .asScala.find(p => p._1.getName.endsWith(name)).get._1
+    Metrics.defaultRegistry().removeMetric(metricName)
+  }
 }
diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
index 15ce971..d197845 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
@@ -97,4 +97,38 @@ class AbstractFetcherManagerTest {
     fetcherManager.removeFetcherForPartitions(Set(tp))
     assertEquals(0, getMetricValue(metricName))
   }
+  @Test
+  def testDeadThreadCountMetric(): Unit = {
+    val fetcher: AbstractFetcherThread = EasyMock.mock(classOf[AbstractFetcherThread])
+    val fetcherManager = new AbstractFetcherManager[AbstractFetcherThread]("fetcher-manager", "fetcher-manager", 2) {
+      override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): AbstractFetcherThread = {
+        fetcher
+      }
+    }
+
+    val fetchOffset = 10L
+    val leaderEpoch = 15
+    val tp = new TopicPartition("topic", 0)
+    val initialFetchState = InitialFetchState(
+      leader = new BrokerEndPoint(0, "localhost", 9092),
+      currentLeaderEpoch = leaderEpoch,
+      initOffset = fetchOffset)
+
+    EasyMock.expect(fetcher.start())
+    EasyMock.expect(fetcher.addPartitions(Map(tp -> OffsetAndEpoch(fetchOffset, leaderEpoch))))
+    EasyMock.expect(fetcher.isThreadFailed).andReturn(true)
+    EasyMock.replay(fetcher)
+
+    fetcherManager.addFetcherForPartitions(Map(tp -> initialFetchState))
+
+    assertEquals(1, fetcherManager.deadThreadCount)
+    EasyMock.verify(fetcher)
+
+    EasyMock.reset(fetcher)
+    EasyMock.expect(fetcher.isThreadFailed).andReturn(false)
+    EasyMock.replay(fetcher)
+
+    assertEquals(0, fetcherManager.deadThreadCount)
+    EasyMock.verify(fetcher)
+  }
 }

