Repository: kafka
Updated Branches:
refs/heads/trunk 0f6cc0a05 -> f689e9b1e
KAFKA-1902; fix MetricName so that Yammer reporter can work correctly; patched by Jun Rao;
reviewed by Manikumar Reddy, Manikumar Reddy and Joel Koshy
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f689e9b1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f689e9b1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f689e9b1
Branch: refs/heads/trunk
Commit: f689e9b1e361cdf47e9966567631c06f614e446d
Parents: 0f6cc0a
Author: Jun Rao <junrao@gmail.com>
Authored: Wed Jan 28 19:07:39 2015 -0600
Committer: Jun Rao <junrao@gmail.com>
Committed: Wed Jan 28 19:07:39 2015 -0600
----------------------------------------------------------------------
.../scala/kafka/metrics/KafkaMetricsGroup.scala | 27 ++++++++++++++++++--
1 file changed, 25 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/f689e9b1/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
index e9e4918..9e31184 100644
--- a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
@@ -61,9 +61,15 @@ trait KafkaMetricsGroup extends Logging {
nameBuilder.append(name)
}
- KafkaMetricsGroup.toMBeanName(tags).map(mbeanName => nameBuilder.append(",").append(mbeanName))
+ val scope: String = KafkaMetricsGroup.toScope(tags).getOrElse(null)
+ val tagsName = KafkaMetricsGroup.toMBeanName(tags)
+ tagsName match {
+ case Some(tn) =>
+ nameBuilder.append(",").append(tn)
+ case None =>
+ }
- new MetricName(group, typeName, name, null, nameBuilder.toString())
+ new MetricName(group, typeName, name, scope, nameBuilder.toString())
}
def newGauge[T](name: String, metric: Gauge[T], tags: scala.collection.Map[String, String]
= Map.empty) =
@@ -160,6 +166,23 @@ object KafkaMetricsGroup extends KafkaMetricsGroup with Logging {
}
}
+ private def toScope(tags: collection.Map[String, String]): Option[String] = {
+ val filteredTags = tags
+ .filter { case (tagKey, tagValue) => tagValue != ""}
+ if (filteredTags.nonEmpty) {
+ // convert dot to _ since reporters like Graphite typically use dot to represent hierarchy
+ val tagsString = filteredTags
+ .toList.sortWith((t1, t2) => t1._1 < t2._1)
+ .map { case (key, value) => "%s.%s".format(key, value.replaceAll("\\.", "_"))}
+ .mkString(".")
+
+ Some(tagsString)
+ }
+ else {
+ None
+ }
+ }
+
def removeAllConsumerMetrics(clientId: String) {
FetchRequestAndResponseStatsRegistry.removeConsumerFetchRequestAndResponseStats(clientId)
ConsumerTopicStatsRegistry.removeConsumerTopicStat(clientId)
|