kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gwens...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6263; Expose metrics for group and transaction metadata loading duration
Date Sun, 04 Aug 2019 04:01:15 GMT
This is an automated email from the ASF dual-hosted git repository.

gwenshap pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0b1dc1c  KAFKA-6263; Expose metrics for group and transaction metadata loading duration
0b1dc1c is described below

commit 0b1dc1ca7be3566a4367d7d3aae2c691cf94f48e
Author: anatasiavela <anastasiavela@berkeley.edu>
AuthorDate: Sat Aug 3 21:00:46 2019 -0700

    KAFKA-6263; Expose metrics for group and transaction metadata loading duration
    
    [JIRA](https://issues.apache.org/jira/browse/KAFKA-6263)
    
    - Add metrics to provide visibility for how long group metadata and transaction metadata
take to load in order to understand some inactivity seen in the consumer groups
    - Tests include mocking load times by creating a delay after each is loaded and ensuring
the measured JMX metric is as it should be
    
    Author: anatasiavela <anastasiavela@berkeley.edu>
    
    Reviewers: Gwen Shapira, Jason Gustafson
    
    Closes #7045 from anatasiavela/KAFKA-6263
---
 .../kafka/coordinator/group/GroupCoordinator.scala | 16 +++++---
 .../coordinator/group/GroupMetadataManager.scala   | 20 ++++++++-
 .../transaction/TransactionCoordinator.scala       |  2 +-
 .../transaction/TransactionStateManager.scala      | 21 ++++++++--
 core/src/main/scala/kafka/server/KafkaServer.scala |  2 +-
 .../group/GroupCoordinatorConcurrencyTest.scala    |  3 +-
 .../coordinator/group/GroupCoordinatorTest.scala   |  3 +-
 .../group/GroupMetadataManagerTest.scala           | 47 +++++++++++++++++++++-
 .../TransactionCoordinatorConcurrencyTest.scala    |  3 +-
 .../transaction/TransactionStateManagerTest.scala  | 39 +++++++++++++++++-
 docs/ops.html                                      | 20 +++++++++
 11 files changed, 157 insertions(+), 19 deletions(-)

diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 6a57d59..7874946 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -28,6 +28,7 @@ import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember
+import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.RecordBatch.{NO_PRODUCER_EPOCH, NO_PRODUCER_ID}
@@ -55,7 +56,8 @@ class GroupCoordinator(val brokerId: Int,
                        val groupManager: GroupMetadataManager,
                        val heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat],
                        val joinPurgatory: DelayedOperationPurgatory[DelayedJoin],
-                       time: Time) extends Logging {
+                       time: Time,
+                       metrics: Metrics) extends Logging {
   import GroupCoordinator._
 
   type JoinCallback = JoinGroupResult => Unit
@@ -1084,10 +1086,11 @@ object GroupCoordinator {
   def apply(config: KafkaConfig,
             zkClient: KafkaZkClient,
             replicaManager: ReplicaManager,
-            time: Time): GroupCoordinator = {
+            time: Time,
+            metrics: Metrics): GroupCoordinator = {
     val heartbeatPurgatory = DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", config.brokerId)
     val joinPurgatory = DelayedOperationPurgatory[DelayedJoin]("Rebalance", config.brokerId)
-    apply(config, zkClient, replicaManager, heartbeatPurgatory, joinPurgatory, time)
+    apply(config, zkClient, replicaManager, heartbeatPurgatory, joinPurgatory, time, metrics)
   }
 
   private[group] def offsetConfig(config: KafkaConfig) = OffsetConfig(
@@ -1108,7 +1111,8 @@ object GroupCoordinator {
             replicaManager: ReplicaManager,
             heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat],
             joinPurgatory: DelayedOperationPurgatory[DelayedJoin],
-            time: Time): GroupCoordinator = {
+            time: Time,
+            metrics: Metrics): GroupCoordinator = {
     val offsetConfig = this.offsetConfig(config)
     val groupConfig = GroupConfig(groupMinSessionTimeoutMs = config.groupMinSessionTimeoutMs,
       groupMaxSessionTimeoutMs = config.groupMaxSessionTimeoutMs,
@@ -1116,8 +1120,8 @@ object GroupCoordinator {
       groupInitialRebalanceDelayMs = config.groupInitialRebalanceDelay)
 
     val groupMetadataManager = new GroupMetadataManager(config.brokerId, config.interBrokerProtocolVersion,
-      offsetConfig, replicaManager, zkClient, time)
-    new GroupCoordinator(config.brokerId, groupConfig, offsetConfig, groupMetadataManager,
heartbeatPurgatory, joinPurgatory, time)
+      offsetConfig, replicaManager, zkClient, time, metrics)
+    new GroupCoordinator(config.brokerId, groupConfig, offsetConfig, groupMetadataManager,
heartbeatPurgatory, joinPurgatory, time, metrics)
   }
 
   def joinError(memberId: String, error: Errors): JoinGroupResult = {
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 03c3e37..7d8499f 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -36,6 +36,8 @@ import kafka.zk.KafkaZkClient
 import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.metrics.stats.{Avg, Max}
+import org.apache.kafka.common.metrics.{MetricConfig, Metrics}
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.protocol.types.Type._
 import org.apache.kafka.common.protocol.types._
@@ -53,7 +55,8 @@ class GroupMetadataManager(brokerId: Int,
                            config: OffsetConfig,
                            replicaManager: ReplicaManager,
                            zkClient: KafkaZkClient,
-                           time: Time) extends Logging with KafkaMetricsGroup {
+                           time: Time,
+                           metrics: Metrics) extends Logging with KafkaMetricsGroup {
 
   private val compressionType: CompressionType = CompressionType.forId(config.offsetsTopicCompressionCodec.codec)
 
@@ -82,6 +85,16 @@ class GroupMetadataManager(brokerId: Int,
    * We use this structure to quickly find the groups which need to be updated by the commit/abort
marker. */
   private val openGroupsForProducer = mutable.HashMap[Long, mutable.Set[String]]()
 
+  /* setup metrics*/
+  val partitionLoadSensor = metrics.sensor("PartitionLoadTime")
+
+  partitionLoadSensor.add(metrics.metricName("partition-load-time-max",
+    "group-coordinator-metrics",
+    "The max time it took to load the partitions in the last 30sec"), new Max())
+  partitionLoadSensor.add(metrics.metricName("partition-load-time-avg",
+    "group-coordinator-metrics",
+    "The avg time it took to load the partitions in the last 30sec"), new Avg())
+
   this.logIdent = s"[GroupMetadataManager brokerId=$brokerId] "
 
   private def recreateGauge[T](name: String, gauge: Gauge[T]): Gauge[T] = {
@@ -498,7 +511,10 @@ class GroupMetadataManager(brokerId: Int,
     try {
       val startMs = time.milliseconds()
       doLoadGroupsAndOffsets(topicPartition, onGroupLoaded)
-      info(s"Finished loading offsets and group metadata from $topicPartition in ${time.milliseconds()
- startMs} milliseconds.")
+      val endMs = time.milliseconds()
+      val timeLapse = endMs - startMs
+      partitionLoadSensor.record(timeLapse, endMs, false)
+      info(s"Finished loading offsets and group metadata from $topicPartition in $timeLapse
milliseconds.")
     } catch {
       case t: Throwable => error(s"Error loading offsets from $topicPartition", t)
     } finally {
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index 9d4eed6..6d99889 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -55,7 +55,7 @@ object TransactionCoordinator {
     // we do not need to turn on reaper thread since no tasks will be expired and there are
no completed tasks to be purged
     val txnMarkerPurgatory = DelayedOperationPurgatory[DelayedTxnMarker]("txn-marker-purgatory",
config.brokerId,
       reaperEnabled = false, timerEnabled = false)
-    val txnStateManager = new TransactionStateManager(config.brokerId, zkClient, scheduler,
replicaManager, txnConfig, time)
+    val txnStateManager = new TransactionStateManager(config.brokerId, zkClient, scheduler,
replicaManager, txnConfig, time, metrics)
 
     val logContext = new LogContext(s"[TransactionCoordinator id=${config.brokerId}] ")
     val txnMarkerChannelManager = TransactionMarkerChannelManager(config, metrics, metadataCache,
txnStateManager,
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index 45ee4e9..38caed5 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -30,6 +30,8 @@ import kafka.utils.{Logging, Pool, Scheduler}
 import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.metrics.stats.{Avg, Max}
+import org.apache.kafka.common.metrics.{MetricConfig, Metrics}
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.{FileRecords, MemoryRecords, SimpleRecord}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
@@ -70,7 +72,8 @@ class TransactionStateManager(brokerId: Int,
                               scheduler: Scheduler,
                               replicaManager: ReplicaManager,
                               config: TransactionConfig,
-                              time: Time) extends Logging {
+                              time: Time,
+                              metrics: Metrics) extends Logging {
 
   this.logIdent = "[Transaction State Manager " + brokerId + "]: "
 
@@ -94,6 +97,16 @@ class TransactionStateManager(brokerId: Int,
   /** number of partitions for the transaction log topic */
   private val transactionTopicPartitionCount = getTransactionTopicPartitionCount
 
+  /** setup metrics*/
+  private val partitionLoadSensor = metrics.sensor("PartitionLoadTime")
+
+  partitionLoadSensor.add(metrics.metricName("partition-load-time-max",
+    "transaction-coordinator-metrics",
+    "The max time it took to load the partitions in the last 30sec"), new Max())
+  partitionLoadSensor.add(metrics.metricName("partition-load-time-avg",
+    "transaction-coordinator-metrics",
+    "The avg time it took to load the partitions in the last 30sec"), new Avg())
+
   // visible for testing only
   private[transaction] def addLoadingPartition(partitionId: Int, coordinatorEpoch: Int):
Unit = {
     val partitionAndLeaderEpoch = TransactionPartitionAndLeaderEpoch(partitionId, coordinatorEpoch)
@@ -339,8 +352,10 @@ class TransactionStateManager(brokerId: Int,
                 currOffset = batch.nextOffset
               }
             }
-
-            info(s"Finished loading ${loadedTransactions.size} transaction metadata from
$topicPartition in ${time.milliseconds() - startMs} milliseconds")
+            val endMs = time.milliseconds()
+            val timeLapse = endMs - startMs
+            partitionLoadSensor.record(timeLapse, endMs, false)
+            info(s"Finished loading ${loadedTransactions.size} transaction metadata from
$topicPartition in $timeLapse milliseconds")
           }
         } catch {
           case t: Throwable => error(s"Error loading transactions from transaction log
$topicPartition", t)
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 07ffe9d..6c433b7 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -276,7 +276,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
 
         /* start group coordinator */
         // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be
good to fix the underlying issue
-        groupCoordinator = GroupCoordinator(config, zkClient, replicaManager, Time.SYSTEM)
+        groupCoordinator = GroupCoordinator(config, zkClient, replicaManager, Time.SYSTEM,
metrics)
         groupCoordinator.startup()
 
         /* start transaction coordinator, with a separate background thread scheduler for
transaction expiration and log loading */
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
index b85035f..ec31fc9 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
@@ -26,6 +26,7 @@ import kafka.coordinator.group.GroupCoordinatorConcurrencyTest._
 import kafka.server.{DelayedOperationPurgatory, KafkaConfig}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.JoinGroupRequest
@@ -84,7 +85,7 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
     val heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat",
timer, config.brokerId, reaperEnabled = false)
     val joinPurgatory = new DelayedOperationPurgatory[DelayedJoin]("Rebalance", timer, config.brokerId,
reaperEnabled = false)
 
-    groupCoordinator = GroupCoordinator(config, zkClient, replicaManager, heartbeatPurgatory,
joinPurgatory, timer.time)
+    groupCoordinator = GroupCoordinator(config, zkClient, replicaManager, heartbeatPurgatory,
joinPurgatory, timer.time, new Metrics())
     groupCoordinator.startup(false)
   }
 
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index cdf1518..5a35e33 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -35,6 +35,7 @@ import java.util.concurrent.locks.ReentrantLock
 import kafka.cluster.Partition
 import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
 import org.junit.Assert._
 import org.junit.{After, Assert, Before, Test}
@@ -111,7 +112,7 @@ class GroupCoordinatorTest {
     val heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat",
timer, config.brokerId, reaperEnabled = false)
     val joinPurgatory = new DelayedOperationPurgatory[DelayedJoin]("Rebalance", timer, config.brokerId,
reaperEnabled = false)
 
-    groupCoordinator = GroupCoordinator(config, zkClient, replicaManager, heartbeatPurgatory,
joinPurgatory, timer.time)
+    groupCoordinator = GroupCoordinator(config, zkClient, replicaManager, heartbeatPurgatory,
joinPurgatory, timer.time, new Metrics())
     groupCoordinator.startup(enableMetadataExpiration = false)
 
     // add the partition into the owned partition list
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index faca447..dbcf5ed 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -19,11 +19,12 @@ package kafka.coordinator.group
 
 import com.yammer.metrics.Metrics
 import com.yammer.metrics.core.Gauge
+import java.lang.management.ManagementFactory
 import java.nio.ByteBuffer
 import java.util.Collections
 import java.util.Optional
 import java.util.concurrent.locks.ReentrantLock
-
+import javax.management.ObjectName
 import kafka.api._
 import kafka.cluster.Partition
 import kafka.common.OffsetAndMetadata
@@ -35,6 +36,7 @@ import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.metrics.{JmxReporter, Metrics => kMetrics}
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.OffsetFetchResponse
@@ -56,6 +58,7 @@ class GroupMetadataManagerTest {
   var zkClient: KafkaZkClient = null
   var partition: Partition = null
   var defaultOffsetRetentionMs = Long.MaxValue
+  var metrics: kMetrics = null
 
   val groupId = "foo"
   val groupInstanceId = Some("bar")
@@ -87,9 +90,10 @@ class GroupMetadataManagerTest {
     EasyMock.expect(zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME)).andReturn(Some(2))
     EasyMock.replay(zkClient)
 
+    metrics = new kMetrics()
     time = new MockTime
     replicaManager = EasyMock.createNiceMock(classOf[ReplicaManager])
-    groupMetadataManager = new GroupMetadataManager(0, ApiVersion.latestVersion, offsetConfig,
replicaManager, zkClient, time)
+    groupMetadataManager = new GroupMetadataManager(0, ApiVersion.latestVersion, offsetConfig,
replicaManager, zkClient, time, metrics)
     partition = EasyMock.niceMock(classOf[Partition])
   }
 
@@ -2051,4 +2055,43 @@ class GroupMetadataManagerTest {
     group.transitionTo(CompletingRebalance)
     expectMetrics(groupMetadataManager, 1, 0, 1)
   }
+
+  @Test
+  def testPartitionLoadMetric(): Unit = {
+    val server = ManagementFactory.getPlatformMBeanServer
+    val mBeanName = "kafka.server:type=group-coordinator-metrics"
+    val reporter = new JmxReporter("kafka.server")
+    metrics.addReporter(reporter)
+
+    def partitionLoadTime(attribute: String): Double = {
+      server.getAttribute(new ObjectName(mBeanName), attribute).asInstanceOf[Double]
+    }
+
+    assertTrue(server.isRegistered(new ObjectName(mBeanName)))
+    assertEquals(Double.NaN, partitionLoadTime( "partition-load-time-max"), 0)
+    assertEquals(Double.NaN, partitionLoadTime("partition-load-time-avg"), 0)
+    assertTrue(reporter.containsMbean(mBeanName))
+
+    val groupMetadataTopicPartition = groupTopicPartition
+    val startOffset = 15L
+    val memberId = "98098230493"
+    val committedOffsets = Map(
+      new TopicPartition("foo", 0) -> 23L,
+      new TopicPartition("foo", 1) -> 455L,
+      new TopicPartition("bar", 0) -> 8992L
+    )
+
+    val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets)
+    val groupMetadataRecord = buildStableGroupRecordWithMember(generation = 15,
+      protocolType = "consumer", protocol = "range", memberId)
+    val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
+      (offsetCommitRecords ++ Seq(groupMetadataRecord)).toArray: _*)
+
+    expectGroupMetadataLoad(groupMetadataTopicPartition, startOffset, records)
+    EasyMock.replay(replicaManager)
+    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => ())
+
+    assertTrue(partitionLoadTime("partition-load-time-max") >= 0.0)
+    assertTrue(partitionLoadTime( "partition-load-time-avg") >= 0.0)
+  }
 }
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
index bc6ed93..238ef4b 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
@@ -28,6 +28,7 @@ import kafka.utils.{Pool, TestUtils}
 import org.apache.kafka.clients.{ClientResponse, NetworkClient}
 import org.apache.kafka.common.{Node, TopicPartition}
 import org.apache.kafka.common.internals.Topic.TRANSACTION_STATE_TOPIC_NAME
+import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.{CompressionType, FileRecords, MemoryRecords, SimpleRecord}
 import org.apache.kafka.common.requests._
@@ -68,7 +69,7 @@ class TransactionCoordinatorConcurrencyTest extends AbstractCoordinatorConcurren
       .anyTimes()
     EasyMock.replay(zkClient)
 
-    txnStateManager = new TransactionStateManager(0, zkClient, scheduler, replicaManager,
txnConfig, time)
+    txnStateManager = new TransactionStateManager(0, zkClient, scheduler, replicaManager,
txnConfig, time, new Metrics())
     for (i <- 0 until numPartitions)
       txnStateManager.addLoadedTransactionsToCache(i, coordinatorEpoch, new Pool[String,
TransactionMetadata]())
 
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index 14745e7..4e778dd 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -16,8 +16,10 @@
  */
 package kafka.coordinator.transaction
 
+import java.lang.management.ManagementFactory
 import java.nio.ByteBuffer
 import java.util.concurrent.locks.ReentrantLock
+import javax.management.ObjectName
 
 import kafka.log.Log
 import kafka.server.{FetchDataInfo, FetchLogEnd, LogOffsetMetadata, ReplicaManager}
@@ -26,6 +28,7 @@ import org.scalatest.Assertions.fail
 import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.internals.Topic.TRANSACTION_STATE_TOPIC_NAME
+import org.apache.kafka.common.metrics.{JmxReporter, Metrics}
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
@@ -59,9 +62,10 @@ class TransactionStateManagerTest {
     .anyTimes()
 
   EasyMock.replay(zkClient)
+  val metrics = new Metrics()
 
   val txnConfig = TransactionConfig()
-  val transactionManager: TransactionStateManager = new TransactionStateManager(0, zkClient,
scheduler, replicaManager, txnConfig, time)
+  val transactionManager: TransactionStateManager = new TransactionStateManager(0, zkClient,
scheduler, replicaManager, txnConfig, time, metrics)
 
   val transactionalId1: String = "one"
   val transactionalId2: String = "two"
@@ -627,4 +631,37 @@ class TransactionStateManagerTest {
 
     EasyMock.replay(replicaManager)
   }
+
+  @Test
+  def testPartitionLoadMetric(): Unit = {
+    val server = ManagementFactory.getPlatformMBeanServer
+    val mBeanName = "kafka.server:type=transaction-coordinator-metrics"
+    val reporter = new JmxReporter("kafka.server")
+    metrics.addReporter(reporter)
+
+    def partitionLoadTime(attribute: String): Double = {
+      server.getAttribute(new ObjectName(mBeanName), attribute).asInstanceOf[Double]
+    }
+
+    assertTrue(server.isRegistered(new ObjectName(mBeanName)))
+    assertEquals(Double.NaN, partitionLoadTime( "partition-load-time-max"), 0)
+    assertEquals(Double.NaN, partitionLoadTime("partition-load-time-avg"), 0)
+    assertTrue(reporter.containsMbean(mBeanName))
+
+    txnMetadata1.state = Ongoing
+    txnMetadata1.addPartitions(Set[TopicPartition](new TopicPartition("topic1", 1),
+      new TopicPartition("topic1", 1)))
+
+    txnRecords += new SimpleRecord(txnMessageKeyBytes1, TransactionLog.valueToBytes(txnMetadata1.prepareNoTransit()))
+
+    val startOffset = 15L
+    val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE, txnRecords.toArray:
_*)
+
+    prepareTxnLog(topicPartition, startOffset, records)
+    transactionManager.loadTransactionsForTxnTopicPartition(partitionId, 0, (_, _, _, _,
_) => ())
+    scheduler.tick()
+
+    assertTrue(partitionLoadTime("partition-load-time-max") >= 0)
+    assertTrue(partitionLoadTime( "partition-load-time-avg") >= 0)
+  }
 }
diff --git a/docs/ops.html b/docs/ops.html
index f94afc3..984b80d 100644
--- a/docs/ops.html
+++ b/docs/ops.html
@@ -1029,6 +1029,26 @@
         <td>Connection status of broker's ZooKeeper session which may be one of
             Disconnected|SyncConnected|AuthFailed|ConnectedReadOnly|SaslAuthenticated|Expired.</td>
       </tr>
+      <tr>
+        <td>Max time to load group metadata</td>
+        <td>kafka.server:type=group-coordinator-metrics,name=partition-load-time-max</td>
+        <td>maximum time, in milliseconds, it took to load offsets and group metadata
from the consumer offset partitions loaded in the last 30 seconds</td>
+      </tr>
+      <tr>
+        <td>Avg time to load group metadata</td>
+        <td>kafka.server:type=group-coordinator-metrics,name=partition-load-time-avg</td>
+        <td>average time, in milliseconds, it took to load offsets and group metadata
from the consumer offset partitions loaded in the last 30 seconds</td>
+      </tr>
+      <tr>
+        <td>Max time to load transaction metadata</td>
+        <td>kafka.server:type=transaction-coordinator-metrics,name=partition-load-time-max</td>
+        <td>maximum time, in milliseconds, it took to load transaction metadata from
the transaction log partitions loaded in the last 30 seconds</td>
+      </tr>
+      <tr>
+        <td>Avg time to load transaction metadata</td>
+        <td>kafka.server:type=transaction-coordinator-metrics,name=partition-load-time-avg</td>
+        <td>average time, in milliseconds, it took to load transaction metadata from
the transaction log partitions loaded in the last 30 seconds</td>
+      </tr>
   </tbody></table>
 
   <h4><a id="selector_monitoring" href="#selector_monitoring">Common monitoring
metrics for producer/consumer/connect/streams</a></h4>


Mime
View raw message