kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject [25/37] git commit: Add missing metrics in 0.8; patched by Swapnil Ghike; reviewed by Jun Rao; kafka-604
Date Mon, 04 Mar 2013 04:22:01 GMT
Add missing metrics in 0.8; patched by Swapnil Ghike; reviewed by Jun Rao; kafka-604


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9115977b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9115977b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9115977b

Branch: refs/heads/trunk
Commit: 9115977b38ad207028c7c194524a408b9e5789f0
Parents: 828ce83
Author: Jun Rao <junrao@gmail.com>
Authored: Fri Feb 22 18:47:37 2013 -0800
Committer: Jun Rao <junrao@gmail.com>
Committed: Fri Feb 22 18:47:37 2013 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/cluster/Partition.scala  |    2 +-
 .../scala/kafka/consumer/ConsumerTopicStats.scala  |    8 ++++----
 .../consumer/FetchRequestAndResponseStats.scala    |    8 ++++----
 core/src/main/scala/kafka/log/Log.scala            |    2 +-
 core/src/main/scala/kafka/producer/Producer.scala  |    6 +++---
 .../kafka/producer/ProducerRequestStats.scala      |    8 ++++----
 .../main/scala/kafka/producer/ProducerStats.scala  |    1 -
 .../scala/kafka/producer/ProducerTopicStats.scala  |   11 ++++++-----
 .../kafka/producer/async/DefaultEventHandler.scala |    2 +-
 .../kafka/producer/async/ProducerSendThread.scala  |    2 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   |    8 ++++----
 .../scala/kafka/server/KafkaRequestHandler.scala   |    4 ++--
 core/src/main/scala/kafka/server/KafkaServer.scala |    2 +-
 .../main/scala/kafka/server/ReplicaManager.scala   |    6 ++++++
 14 files changed, 38 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9115977b/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index b55b464..469ac79 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -56,7 +56,7 @@ class Partition(val topic: String,
   private def isReplicaLocal(replicaId: Int) : Boolean = (replicaId == localBrokerId)
 
   newGauge(
-    topic + "-" + partitionId + "UnderReplicated",
+    topic + "-" + partitionId + "-UnderReplicated",
     new Gauge[Int] {
       def getValue = {
         if (isUnderReplicated) 1 else 0

http://git-wip-us.apache.org/repos/asf/kafka/blob/9115977b/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala b/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala
index bb38f35..ff5f470 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala
@@ -24,8 +24,8 @@ import kafka.common.ClientIdAndTopic
 
 @threadsafe
 class ConsumerTopicMetrics(metricId: ClientIdAndTopic) extends KafkaMetricsGroup {
-  val messageRate = newMeter(metricId + "-MessagesPerSec",  "messages", TimeUnit.SECONDS)
-  val byteRate = newMeter(metricId + "-BytesPerSec",  "bytes", TimeUnit.SECONDS)
+  val messageRate = newMeter(metricId + "MessagesPerSec",  "messages", TimeUnit.SECONDS)
+  val byteRate = newMeter(metricId + "BytesPerSec",  "bytes", TimeUnit.SECONDS)
 }
 
 /**
@@ -35,12 +35,12 @@ class ConsumerTopicMetrics(metricId: ClientIdAndTopic) extends KafkaMetricsGroup
 class ConsumerTopicStats(clientId: String) extends Logging {
   private val valueFactory = (k: ClientIdAndTopic) => new ConsumerTopicMetrics(k)
   private val stats = new Pool[ClientIdAndTopic, ConsumerTopicMetrics](Some(valueFactory))
-  private val allTopicStats = new ConsumerTopicMetrics(new ClientIdAndTopic(clientId, "All.Topics")) // to differentiate from a topic named AllTopics
+  private val allTopicStats = new ConsumerTopicMetrics(new ClientIdAndTopic(clientId, "AllTopics")) // to differentiate from a topic named AllTopics
 
   def getConsumerAllTopicStats(): ConsumerTopicMetrics = allTopicStats
 
   def getConsumerTopicStats(topic: String): ConsumerTopicMetrics = {
-    stats.getAndMaybePut(new ClientIdAndTopic(clientId, topic))
+    stats.getAndMaybePut(new ClientIdAndTopic(clientId, topic + "-"))
   }
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9115977b/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala b/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala
index 2cc0f36..875eeeb 100644
--- a/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala
+++ b/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala
@@ -23,8 +23,8 @@ import java.util.concurrent.TimeUnit
 import kafka.common.ClientIdAndBroker
 
class FetchRequestAndResponseMetrics(metricId: ClientIdAndBroker) extends KafkaMetricsGroup {
-  val requestTimer = new KafkaTimer(newTimer(metricId + "-FetchRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
-  val requestSizeHist = newHistogram(metricId + "-FetchResponseSize")
+  val requestTimer = new KafkaTimer(newTimer(metricId + "FetchRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
+  val requestSizeHist = newHistogram(metricId + "FetchResponseSize")
 }
 
 /**
@@ -34,12 +34,12 @@ class FetchRequestAndResponseMetrics(metricId: ClientIdAndBroker) extends
KafkaM
 class FetchRequestAndResponseStats(clientId: String) {
   private val valueFactory = (k: ClientIdAndBroker) => new FetchRequestAndResponseMetrics(k)
   private val stats = new Pool[ClientIdAndBroker, FetchRequestAndResponseMetrics](Some(valueFactory))
-  private val allBrokersStats = new FetchRequestAndResponseMetrics(new ClientIdAndBroker(clientId, "All.Brokers"))
+  private val allBrokersStats = new FetchRequestAndResponseMetrics(new ClientIdAndBroker(clientId, "AllBrokers"))
 
   def getFetchRequestAndResponseAllBrokersStats(): FetchRequestAndResponseMetrics = allBrokersStats
 
   def getFetchRequestAndResponseStats(brokerInfo: String): FetchRequestAndResponseMetrics = {
-    stats.getAndMaybePut(new ClientIdAndBroker(clientId, brokerInfo))
+    stats.getAndMaybePut(new ClientIdAndBroker(clientId, brokerInfo + "-"))
   }
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9115977b/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 8d9a883..9a5f053 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -266,7 +266,7 @@ private[kafka] class Log(val dir: File,
       (-1L, -1L)
     } else {
       BrokerTopicStats.getBrokerTopicStats(topicName).messagesInRate.mark(messageSetInfo.count)
-      BrokerTopicStats.getBrokerAllTopicStats.messagesInRate.mark(messageSetInfo.count)
+      BrokerTopicStats.getBrokerAllTopicsStats.messagesInRate.mark(messageSetInfo.count)
 
       // trim any invalid bytes or partial messages before appending it to the on-disk log
       var validMessages = trimInvalidBytes(messages)

http://git-wip-us.apache.org/repos/asf/kafka/blob/9115977b/core/src/main/scala/kafka/producer/Producer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/Producer.scala b/core/src/main/scala/kafka/producer/Producer.scala
index 66638f2..764bb02 100644
--- a/core/src/main/scala/kafka/producer/Producer.scala
+++ b/core/src/main/scala/kafka/producer/Producer.scala
@@ -50,7 +50,6 @@ class Producer[K,V](config: ProducerConfig,
       producerSendThread.start()
   }
 
-  private val producerStats = ProducerStatsRegistry.getProducerStats(config.clientId)
   private val producerTopicStats = ProducerTopicStatsRegistry.getProducerTopicStats(config.clientId)
 
   KafkaMetricsReporter.startReporters(config.props)
@@ -81,7 +80,7 @@ class Producer[K,V](config: ProducerConfig,
   private def recordStats(messages: Seq[KeyedMessage[K,V]]) {
     for (message <- messages) {
       producerTopicStats.getProducerTopicStats(message.topic).messageRate.mark()
-      producerTopicStats.getProducerAllTopicStats.messageRate.mark()
+      producerTopicStats.getProducerAllTopicsStats.messageRate.mark()
     }
   }
 
@@ -106,7 +105,8 @@ class Producer[K,V](config: ProducerConfig,
           }
       }
       if(!added) {
-        producerStats.droppedMessageRate.mark()
+        producerTopicStats.getProducerTopicStats(message.topic).droppedMessageRate.mark()
+        producerTopicStats.getProducerAllTopicsStats.droppedMessageRate.mark()
         error("Event queue is full of unsent messages, could not send event: " + message.toString)
         throw new QueueFullException("Event queue is full of unsent messages, could not send event: " + message.toString)
       }else {

http://git-wip-us.apache.org/repos/asf/kafka/blob/9115977b/core/src/main/scala/kafka/producer/ProducerRequestStats.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ProducerRequestStats.scala b/core/src/main/scala/kafka/producer/ProducerRequestStats.scala
index e29ccad..9694220 100644
--- a/core/src/main/scala/kafka/producer/ProducerRequestStats.scala
+++ b/core/src/main/scala/kafka/producer/ProducerRequestStats.scala
@@ -22,8 +22,8 @@ import kafka.utils.Pool
 import kafka.common.ClientIdAndBroker
 
 class ProducerRequestMetrics(metricId: ClientIdAndBroker) extends KafkaMetricsGroup {
-  val requestTimer = new KafkaTimer(newTimer(metricId + "-ProducerRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
-  val requestSizeHist = newHistogram(metricId + "-ProducerRequestSize")
+  val requestTimer = new KafkaTimer(newTimer(metricId + "ProducerRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
+  val requestSizeHist = newHistogram(metricId + "ProducerRequestSize")
 }
 
 /**
@@ -33,12 +33,12 @@ class ProducerRequestMetrics(metricId: ClientIdAndBroker) extends KafkaMetricsGr
 class ProducerRequestStats(clientId: String) {
   private val valueFactory = (k: ClientIdAndBroker) => new ProducerRequestMetrics(k)
   private val stats = new Pool[ClientIdAndBroker, ProducerRequestMetrics](Some(valueFactory))
-  private val allBrokersStats = new ProducerRequestMetrics(new ClientIdAndBroker(clientId, "All.Brokers"))
+  private val allBrokersStats = new ProducerRequestMetrics(new ClientIdAndBroker(clientId, "AllBrokers"))
 
   def getProducerRequestAllBrokersStats(): ProducerRequestMetrics = allBrokersStats
 
   def getProducerRequestStats(brokerInfo: String): ProducerRequestMetrics = {
-    stats.getAndMaybePut(new ClientIdAndBroker(clientId, brokerInfo))
+    stats.getAndMaybePut(new ClientIdAndBroker(clientId, brokerInfo + "-"))
   }
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9115977b/core/src/main/scala/kafka/producer/ProducerStats.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ProducerStats.scala b/core/src/main/scala/kafka/producer/ProducerStats.scala
index 62ff544..e1610d3 100644
--- a/core/src/main/scala/kafka/producer/ProducerStats.scala
+++ b/core/src/main/scala/kafka/producer/ProducerStats.scala
@@ -24,7 +24,6 @@ class ProducerStats(clientId: String) extends KafkaMetricsGroup {
   val serializationErrorRate = newMeter(clientId + "-SerializationErrorsPerSec",  "errors", TimeUnit.SECONDS)
   val resendRate = newMeter(clientId + "-ResendsPerSec",  "resends", TimeUnit.SECONDS)
   val failedSendRate = newMeter(clientId + "-FailedSendsPerSec",  "failed sends", TimeUnit.SECONDS)
-  val droppedMessageRate = newMeter(clientId + "-DroppedMessagesPerSec",  "drops", TimeUnit.SECONDS)
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/9115977b/core/src/main/scala/kafka/producer/ProducerTopicStats.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ProducerTopicStats.scala b/core/src/main/scala/kafka/producer/ProducerTopicStats.scala
index fd0b44e..ed209f4 100644
--- a/core/src/main/scala/kafka/producer/ProducerTopicStats.scala
+++ b/core/src/main/scala/kafka/producer/ProducerTopicStats.scala
@@ -24,8 +24,9 @@ import java.util.concurrent.TimeUnit
 
 @threadsafe
 class ProducerTopicMetrics(metricId: ClientIdAndTopic) extends KafkaMetricsGroup {
-  val messageRate = newMeter(metricId + "-MessagesPerSec",  "messages", TimeUnit.SECONDS)
-  val byteRate = newMeter(metricId + "-BytesPerSec",  "bytes", TimeUnit.SECONDS)
+  val messageRate = newMeter(metricId + "MessagesPerSec", "messages", TimeUnit.SECONDS)
+  val byteRate = newMeter(metricId + "BytesPerSec", "bytes", TimeUnit.SECONDS)
+  val droppedMessageRate = newMeter(metricId + "DroppedMessagesPerSec", "drops", TimeUnit.SECONDS)
 }
 
 /**
@@ -35,12 +36,12 @@ class ProducerTopicMetrics(metricId: ClientIdAndTopic) extends KafkaMetricsGroup
 class ProducerTopicStats(clientId: String) {
   private val valueFactory = (k: ClientIdAndTopic) => new ProducerTopicMetrics(k)
   private val stats = new Pool[ClientIdAndTopic, ProducerTopicMetrics](Some(valueFactory))
-  private val allTopicStats = new ProducerTopicMetrics(new ClientIdAndTopic(clientId, "All.Topics")) // to differentiate from a topic named AllTopics
+  private val allTopicsStats = new ProducerTopicMetrics(new ClientIdAndTopic(clientId, "AllTopics")) // to differentiate from a topic named AllTopics
 
-  def getProducerAllTopicStats(): ProducerTopicMetrics = allTopicStats
+  def getProducerAllTopicsStats(): ProducerTopicMetrics = allTopicsStats
 
   def getProducerTopicStats(topic: String): ProducerTopicMetrics = {
-    stats.getAndMaybePut(new ClientIdAndTopic(clientId, topic))
+    stats.getAndMaybePut(new ClientIdAndTopic(clientId, topic + "-"))
   }
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9115977b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index 5569cc2..0fd733e 100644
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -56,7 +56,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
         keyed =>
           val dataSize = keyed.message.payloadSize
           producerTopicStats.getProducerTopicStats(keyed.topic).byteRate.mark(dataSize)
-          producerTopicStats.getProducerAllTopicStats.byteRate.mark(dataSize)
+          producerTopicStats.getProducerAllTopicsStats.byteRate.mark(dataSize)
       }
       var outstandingProduceRequests = serializedData
       var remainingRetries = config.messageSendMaxRetries + 1

http://git-wip-us.apache.org/repos/asf/kafka/blob/9115977b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
index 2b39cab..6691147 100644
--- a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
+++ b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
@@ -34,7 +34,7 @@ class ProducerSendThread[K,V](val threadName: String,
   private val shutdownLatch = new CountDownLatch(1)
   private val shutdownCommand = new KeyedMessage[K,V]("shutdown", null.asInstanceOf[K], null.asInstanceOf[V])
 
-  newGauge(clientId + "-ProducerQueueSize-" + getId,
+  newGauge(clientId + "-ProducerQueueSize",
           new Gauge[Int] {
             def getValue = queue.size
           })

http://git-wip-us.apache.org/repos/asf/kafka/blob/9115977b/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 6df077b..3c84695 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -180,7 +180,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     trace("Append [%s] to local log ".format(partitionAndData.toString))
     partitionAndData.map {case (topicAndPartition, messages) =>
       BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesInRate.mark(messages.sizeInBytes)
-      BrokerTopicStats.getBrokerAllTopicStats.bytesInRate.mark(messages.sizeInBytes)
+      BrokerTopicStats.getBrokerAllTopicsStats.bytesInRate.mark(messages.sizeInBytes)
 
       try {
         val localReplica = replicaManager.getLeaderReplicaIfLocal(topicAndPartition.topic, topicAndPartition.partition)
@@ -198,7 +198,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           null
         case e =>
           BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).failedProduceRequestRate.mark()
-          BrokerTopicStats.getBrokerAllTopicStats.failedProduceRequestRate.mark()
+          BrokerTopicStats.getBrokerAllTopicsStats.failedProduceRequestRate.mark()
           error("Error processing ProducerRequest with correlation id %d from client %s on %s:%d"
             .format(producerRequest.correlationId, producerRequest.clientId, topicAndPartition.topic, topicAndPartition.partition), e)
           new ProduceResult(topicAndPartition, e)
@@ -264,7 +264,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           try {
             val (messages, highWatermark) = readMessageSet(topic, partition, offset, fetchSize, fetchRequest.replicaId)
             BrokerTopicStats.getBrokerTopicStats(topic).bytesOutRate.mark(messages.sizeInBytes)
-            BrokerTopicStats.getBrokerAllTopicStats.bytesOutRate.mark(messages.sizeInBytes)
+            BrokerTopicStats.getBrokerAllTopicsStats.bytesOutRate.mark(messages.sizeInBytes)
             if (!isFetchFromFollower) {
               new FetchResponsePartitionData(ErrorMapping.NoError, highWatermark, messages)
             } else {
@@ -275,7 +275,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           } catch {
             case t: Throwable =>
               BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark()
-              BrokerTopicStats.getBrokerAllTopicStats.failedFetchRequestRate.mark()
+              BrokerTopicStats.getBrokerAllTopicsStats.failedFetchRequestRate.mark()
               error("error when processing request " + (topic, partition, offset, fetchSize), t)
               new FetchResponsePartitionData(ErrorMapping.codeFor(t.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty)
           }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9115977b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index 69ca058..842dcf3 100644
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -82,9 +82,9 @@ class BrokerTopicMetrics(name: String) extends KafkaMetricsGroup {
 object BrokerTopicStats extends Logging {
   private val valueFactory = (k: String) => new BrokerTopicMetrics(k)
   private val stats = new Pool[String, BrokerTopicMetrics](Some(valueFactory))
-  private val allTopicStats = new BrokerTopicMetrics("AllTopics")
+  private val allTopicsStats = new BrokerTopicMetrics("AllTopics")
 
-  def getBrokerAllTopicStats(): BrokerTopicMetrics = allTopicStats
+  def getBrokerAllTopicsStats(): BrokerTopicMetrics = allTopicsStats
 
   def getBrokerTopicStats(topic: String): BrokerTopicMetrics = {
     stats.getAndMaybePut(topic + "-")

http://git-wip-us.apache.org/repos/asf/kafka/blob/9115977b/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 09e261f..e7248c3 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -97,7 +97,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends
Logg
    *  Forces some dynamic jmx beans to be registered on server startup.
    */
   private def registerStats() {
-    BrokerTopicStats.getBrokerAllTopicStats()
+    BrokerTopicStats.getBrokerAllTopicsStats()
     ControllerStats.offlinePartitionRate
     ControllerStats.uncleanLeaderElectionRate
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9115977b/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 4e6c8ea..f7fe0de 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -58,6 +58,12 @@ class ReplicaManager(val config: KafkaConfig,
     }
   )
   newGauge(
+    "PartitionCount",
+    new Gauge[Int] {
+      def getValue = allPartitions.size
+    }
+  )
+  newGauge(
     "UnderReplicatedPartitions",
     new Gauge[Int] {
       def getValue = {


Mime
View raw message