kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: KAFKA-4982; Add listener tags to socket-server-metrics (KIP-136)
Date Sat, 13 May 2017 01:42:56 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 238e73978 -> f21f8f2d4


KAFKA-4982; Add listener tags to socket-server-metrics (KIP-136)

Author: Edoardo Comar <ecomar@uk.ibm.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #3004 from edoardocomar/KAFKA-4982


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f21f8f2d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f21f8f2d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f21f8f2d

Branch: refs/heads/trunk
Commit: f21f8f2d44f97eba4ce155ac9fcc8432f00cad24
Parents: 238e739
Author: Edoardo Comar <ecomar@uk.ibm.com>
Authored: Sat May 13 01:54:46 2017 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Sat May 13 02:42:37 2017 +0100

----------------------------------------------------------------------
 .../main/scala/kafka/network/SocketServer.scala | 24 +++++++++++---------
 .../unit/kafka/network/SocketServerTest.scala   | 21 +++++++++++++++++
 2 files changed, 34 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f21f8f2d/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index b2a3456..fb647fa 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -21,7 +21,6 @@ import java.io.IOException
 import java.net._
 import java.nio.channels._
 import java.nio.channels.{Selector => NSelector}
-import java.util
 import java.util.concurrent._
 import java.util.concurrent.atomic._
 
@@ -34,7 +33,7 @@ import kafka.server.KafkaConfig
 import kafka.utils._
 import org.apache.kafka.common.errors.InvalidRequestException
 import org.apache.kafka.common.metrics._
-import org.apache.kafka.common.network.{ChannelBuilders, KafkaChannel, ListenerName, Mode,
Selectable, Selector => KSelector}
+import org.apache.kafka.common.network.{ChannelBuilders, KafkaChannel, ListenerName, Selectable,
Selector => KSelector}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.protocol.types.SchemaException
@@ -68,12 +67,6 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time:
Time
   private[network] val acceptors = mutable.Map[EndPoint, Acceptor]()
   private var connectionQuotas: ConnectionQuotas = _
 
-  private val allMetricNames = (0 until totalProcessorThreads).map { i =>
-    val tags = new util.HashMap[String, String]()
-    tags.put("networkProcessor", i.toString)
-    metrics.metricName("io-wait-ratio", "socket-server-metrics", tags)
-  }
-
   /**
    * Start the socket server
    */
@@ -107,7 +100,11 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val
time: Time
 
     newGauge("NetworkProcessorAvgIdlePercent",
       new Gauge[Double] {
-        def value = allMetricNames.map { metricName =>
+        private val ioWaitRatioMetricNames = processors.map { p =>
+          metrics.metricName("io-wait-ratio", "socket-server-metrics", p.metricTags)
+        }
+
+        def value = ioWaitRatioMetricNames.map { metricName =>
           Option(metrics.metric(metricName)).fold(0.0)(_.value)
         }.sum / totalProcessorThreads
       }
@@ -400,7 +397,10 @@ private[kafka] class Processor(val id: Int,
 
   private val newConnections = new ConcurrentLinkedQueue[SocketChannel]()
   private val inflightResponses = mutable.Map[String, RequestChannel.Response]()
-  private val metricTags = Map("networkProcessor" -> id.toString).asJava
+  private[kafka] val metricTags = mutable.LinkedHashMap(
+    "listener" -> listenerName.value,
+    "networkProcessor" -> id.toString
+  ).asJava
 
   newGauge("IdlePercent",
     new Gauge[Double] {
@@ -408,7 +408,9 @@ private[kafka] class Processor(val id: Int,
         Option(metrics.metric(metrics.metricName("io-wait-ratio", "socket-server-metrics",
metricTags))).fold(0.0)(_.value)
       }
     },
-    metricTags.asScala
+    // for compatibility, only add a networkProcessor tag to the Yammer Metrics alias (the
equivalent Selector metric
+    // also includes the listener name)
+    Map("networkProcessor" -> id.toString)
   )
 
   private val selector = new KSelector(

http://git-wip-us.apache.org/repos/asf/kafka/blob/f21f8f2d/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 9b278ae..7678550 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -419,4 +419,25 @@ class SocketServerTest extends JUnitSuite {
     assertEquals(Map.empty, nonZeroMetricNamesAndValues)
   }
 
+  @Test
+  def testProcessorMetricsTags(): Unit = {
+    val kafkaMetricNames = metrics.metrics.keySet.asScala.filter(_.tags.asScala.get("listener").nonEmpty)
+    assertFalse(kafkaMetricNames.isEmpty)
+
+    val expectedListeners = Set("PLAINTEXT", "TRACE")
+    kafkaMetricNames.foreach { kafkaMetricName =>
+      assertTrue(expectedListeners.contains(kafkaMetricName.tags.get("listener")))
+    }
+
+    // legacy metrics not tagged
+    val yammerMetricsNames = YammerMetrics.defaultRegistry.allMetrics.asScala
+      .filterKeys(_.getType.equals("Processor"))
+      .collect { case (k, _: Gauge[_]) => k }
+    assertFalse(yammerMetricsNames.isEmpty)
+
+    yammerMetricsNames.foreach { yammerMetricName =>
+      assertFalse(yammerMetricName.getMBeanName.contains("listener="))
+    }
+  }
+
 }


Mime
View raw message