kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch 2.6 updated: KAFKA-9788; Use distinct names for transaction and group load time sensors (#8784)
Date Thu, 04 Jun 2020 01:01:17 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new 66709c0  KAFKA-9788; Use distinct names for transaction and group load time sensors (#8784)
66709c0 is described below

commit 66709c02229714c46f8735e53d7b12156d6a4e3b
Author: Bob Barrett <bob.barrett@confluent.io>
AuthorDate: Wed Jun 3 17:53:30 2020 -0700

    KAFKA-9788; Use distinct names for transaction and group load time sensors (#8784)
    
    Sensor objects are stored in the Kafka metrics registry and keyed by name. If a new sensor
    is created with the same name as an existing one, the existing one is returned rather than
    a new object being created. The partition load time sensors for the transaction and group
    coordinators used the same name, so data recorded to either was stored in the same object.
    This meant that the values of both metrics were identical and consisted of the combined
    data. This patch changes the [...]
    
    Reviewers: Jason Gustafson <jason@confluent.io>
---
 .../scala/kafka/coordinator/group/GroupMetadataManager.scala  | 11 +++++++----
 .../coordinator/transaction/TransactionStateManager.scala     |  9 ++++++---
 2 files changed, 13 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index aba6a5f..4898e22 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -49,9 +49,9 @@ import org.apache.kafka.common.requests.{OffsetCommitRequest, OffsetFetchRespons
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.common.{KafkaException, TopicPartition}
 
-import scala.jdk.CollectionConverters._
 import scala.collection._
 import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._
 
 class GroupMetadataManager(brokerId: Int,
                            interBrokerProtocolVersion: ApiVersion,
@@ -89,13 +89,13 @@ class GroupMetadataManager(brokerId: Int,
   private val openGroupsForProducer = mutable.HashMap[Long, mutable.Set[String]]()
 
   /* setup metrics*/
-  val partitionLoadSensor = metrics.sensor("PartitionLoadTime")
+  private val partitionLoadSensor = metrics.sensor(GroupMetadataManager.LoadTimeSensor)
 
   partitionLoadSensor.add(metrics.metricName("partition-load-time-max",
-    "group-coordinator-metrics",
+    GroupMetadataManager.MetricsGroup,
     "The max time it took to load the partitions in the last 30sec"), new Max())
   partitionLoadSensor.add(metrics.metricName("partition-load-time-avg",
-    "group-coordinator-metrics",
+    GroupMetadataManager.MetricsGroup,
     "The avg time it took to load the partitions in the last 30sec"), new Avg())
 
   val offsetCommitsSensor = metrics.sensor("OffsetCommits")
@@ -991,6 +991,9 @@ class GroupMetadataManager(brokerId: Int,
  *    -> value version 0:       [protocol_type, generation, protocol, leader, members]
  */
 object GroupMetadataManager {
+  // Metrics names
+  val MetricsGroup: String = "group-coordinator-metrics"
+  val LoadTimeSensor: String = "GroupPartitionLoadTime"
 
   private val CURRENT_OFFSET_KEY_SCHEMA_VERSION = 1.toShort
   private val CURRENT_GROUP_KEY_SCHEMA_VERSION = 2.toShort
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index a96857c..8eb65df 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -48,6 +48,9 @@ object TransactionStateManager {
   val DefaultTransactionalIdExpirationMs: Int = TimeUnit.DAYS.toMillis(7).toInt
   val DefaultAbortTimedOutTransactionsIntervalMs: Int = TimeUnit.SECONDS.toMillis(10).toInt
   val DefaultRemoveExpiredTransactionalIdsIntervalMs: Int = TimeUnit.HOURS.toMillis(1).toInt
+
+  val MetricsGroup: String = "transaction-coordinator-metrics"
+  val LoadTimeSensor: String = "TransactionsPartitionLoadTime"
 }
 
 /**
@@ -95,13 +98,13 @@ class TransactionStateManager(brokerId: Int,
   private val transactionTopicPartitionCount = getTransactionTopicPartitionCount
 
   /** setup metrics*/
-  private val partitionLoadSensor = metrics.sensor("PartitionLoadTime")
+  private val partitionLoadSensor = metrics.sensor(TransactionStateManager.LoadTimeSensor)
 
   partitionLoadSensor.add(metrics.metricName("partition-load-time-max",
-    "transaction-coordinator-metrics",
+    TransactionStateManager.MetricsGroup,
     "The max time it took to load the partitions in the last 30sec"), new Max())
   partitionLoadSensor.add(metrics.metricName("partition-load-time-avg",
-    "transaction-coordinator-metrics",
+    TransactionStateManager.MetricsGroup,
     "The avg time it took to load the partitions in the last 30sec"), new Avg())
 
   // visible for testing only


Mime
View raw message