kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: KAFKA-5461; Add metric to track global topic count and global partition count in a cluster
Date Fri, 04 Aug 2017 10:29:23 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 5a516fb28 -> e7f7d4093


KAFKA-5461; Add metric to track global topic count and global partition count in a cluster

Author: Abhishek Mendhekar <amendhekar@linkedin.com>

Reviewers: Joel Koshy <jjkoshy.w@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #3549 from abhishekmendhekar/KAFKA-5461


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e7f7d409
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e7f7d409
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e7f7d409

Branch: refs/heads/trunk
Commit: e7f7d4093968d8de494e371a6d3c85e555332cbb
Parents: 5a516fb
Author: Abhishek Mendhekar <amendhekar@linkedin.com>
Authored: Fri Aug 4 11:23:43 2017 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Fri Aug 4 11:28:46 2017 +0100

----------------------------------------------------------------------
 .gitignore                                      |  4 +--
 .../kafka/controller/KafkaController.scala      | 32 +++++++++++++++++---
 .../scala/unit/kafka/metrics/MetricsTest.scala  | 11 +++++++
 3 files changed, 41 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e7f7d409/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index e12082e..6088349 100644
--- a/.gitignore
+++ b/.gitignore
@@ -23,8 +23,8 @@ TAGS
 *.iml
 .project
 .settings
-kafka.ipr
-kafka.iws
+*.ipr
+*.iws
 .vagrant
 Vagrantfile.local
 /logs

http://git-wip-us.apache.org/repos/asf/kafka/blob/e7f7d409/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 0a6aac0..1de04d8 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -188,6 +188,8 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
   @volatile private var activeControllerId = -1
   @volatile private var offlinePartitionCount = 0
   @volatile private var preferredReplicaImbalanceCount = 0
+  @volatile private var globalTopicCount = 0
+  @volatile private var globalPartitionCount = 0
 
   newGauge(
     "ActiveControllerCount",
@@ -217,6 +219,20 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
     }
   )
 
+  newGauge(
+    "GlobalTopicCount",
+    new Gauge[Int] {
+      def value: Int = globalTopicCount
+    }
+  )
+
+  newGauge(
+    "GlobalPartitionCount",
+    new Gauge[Int] {
+      def value: Int = globalPartitionCount
+    }
+  )
+
   def epoch: Int = controllerContext.epoch
 
   def state: ControllerState = eventManager.state
@@ -319,6 +335,8 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
     kafkaScheduler.shutdown()
     offlinePartitionCount = 0
     preferredReplicaImbalanceCount = 0
+    globalTopicCount = 0
+    globalPartitionCount = 0
 
     // de-register partition ISR listener for on-going partition reassignment task
     deregisterPartitionReassignmentIsrChangeListeners()
@@ -1593,8 +1611,9 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
 
   private def updateMetrics(): Unit = {
     offlinePartitionCount =
-      if (!isActive) 0
-      else {
+      if (!isActive) {
+        0
+      } else {
         controllerContext.partitionLeadershipInfo.count { case (tp, leadershipInfo) =>
           !controllerContext.liveOrShuttingDownBrokerIds.contains(leadershipInfo.leaderAndIsr.leader) &&
             !topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic)
@@ -1602,8 +1621,9 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
       }
 
     preferredReplicaImbalanceCount =
-      if (!isActive) 0
-      else {
+      if (!isActive) {
+        0
+      } else {
         controllerContext.partitionReplicaAssignment.count { case (topicPartition, replicas) =>
           val preferredReplica = replicas.head
           val leadershipInfo = controllerContext.partitionLeadershipInfo.get(topicPartition)
@@ -1611,6 +1631,10 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
             !topicDeletionManager.isTopicQueuedUpForDeletion(topicPartition.topic)
         }
       }
+
+    globalTopicCount = if (!isActive) 0 else controllerContext.allTopics.size
+
+    globalPartitionCount = if (!isActive) 0 else controllerContext.partitionLeadershipInfo.size
   }
 
   // visible for testing

http://git-wip-us.apache.org/repos/asf/kafka/blob/e7f7d409/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
index e32f429..bff2136 100644
--- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
+++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
@@ -156,6 +156,17 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
     assertTrue(meterCount(bytesOut) > initialBytesOut)
   }
 
+  @Test
+  def testControllerMetrics(): Unit = {
+    val metrics = Metrics.defaultRegistry.allMetrics
+    
+    assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.controller:type=KafkaController,name=ActiveControllerCount"), 1)
+    assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.controller:type=KafkaController,name=OfflinePartitionsCount"), 1)
+    assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.controller:type=KafkaController,name=PreferredReplicaImbalanceCount"), 1)
+    assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.controller:type=KafkaController,name=GlobalTopicCount"), 1)
+    assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.controller:type=KafkaController,name=GlobalPartitionCount"), 1)
+  }
+
   private def meterCount(metricName: String): Long = {
     Metrics.defaultRegistry.allMetrics.asScala
       .filterKeys(_.getMBeanName.endsWith(metricName))


Mime
View raw message