kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jjko...@apache.org
Subject kafka git commit: KAFKA-2332; Add quota metrics to old producer and consumer
Date Tue, 01 Sep 2015 21:46:35 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 13c432f79 -> e582447ad


KAFKA-2332; Add quota metrics to old producer and consumer

Author: Dong Lin <lindong28@gmail.com>

Reviewers: Aditya Auradkar <aauradkar@linkedin.com>, Joel Koshy <jjkoshy.w@gmail.com>, Jiangjie Qin <becket.qin@gmail.com>

Closes #176 from lindong28/KAFKA-2332


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e582447a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e582447a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e582447a

Branch: refs/heads/trunk
Commit: e582447adb4708731aff74aa294e7ce2b30b0a41
Parents: 13c432f
Author: Dong Lin <lindong28@gmail.com>
Authored: Tue Sep 1 14:46:16 2015 -0700
Committer: Joel Koshy <jjkoshy@gmail.com>
Committed: Tue Sep 1 14:46:16 2015 -0700

----------------------------------------------------------------------
 .../scala/kafka/consumer/FetchRequestAndResponseStats.scala | 1 +
 core/src/main/scala/kafka/consumer/SimpleConsumer.scala     | 3 +++
 core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala   | 4 +++-
 .../main/scala/kafka/producer/ProducerRequestStats.scala    | 1 +
 core/src/main/scala/kafka/producer/SyncProducer.scala       | 9 +++++++--
 core/src/main/scala/kafka/server/KafkaServer.scala          | 2 +-
 6 files changed, 16 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e582447a/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala b/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala
index 3df55e1..05ea9ac 100644
--- a/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala
+++ b/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala
@@ -34,6 +34,7 @@ class FetchRequestAndResponseMetrics(metricId: ClientIdBroker) extends KafkaMetr
 
   val requestTimer = new KafkaTimer(newTimer("FetchRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS, tags))
   val requestSizeHist = newHistogram("FetchResponseSize", biased = true, tags)
+  val throttleTimeStats = newTimer("FetchRequestThrottleRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS, tags)
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/e582447a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
index 4e1833a..b7af6d6 100644
--- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -19,6 +19,7 @@ package kafka.consumer
 
 
 import java.nio.channels.{AsynchronousCloseException, ClosedByInterruptException}
+import java.util.concurrent.TimeUnit
 
 import kafka.api._
 import kafka.network._
@@ -135,6 +136,8 @@ class SimpleConsumer(val host: String,
     val fetchedSize = fetchResponse.sizeInBytes
     fetchRequestAndResponseStats.getFetchRequestAndResponseStats(host, port).requestSizeHist.update(fetchedSize)
     fetchRequestAndResponseStats.getFetchRequestAndResponseAllBrokersStats.requestSizeHist.update(fetchedSize)
+    fetchRequestAndResponseStats.getFetchRequestAndResponseStats(host, port).throttleTimeStats.update(fetchResponse.throttleTimeMs, TimeUnit.MILLISECONDS)
+    fetchRequestAndResponseStats.getFetchRequestAndResponseAllBrokersStats.throttleTimeStats.update(fetchResponse.throttleTimeMs, TimeUnit.MILLISECONDS)
     fetchResponse
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e582447a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
index 9e31184..72ecae1 100644
--- a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
@@ -123,6 +123,7 @@ object KafkaMetricsGroup extends KafkaMetricsGroup with Logging {
     // kafka.consumer.FetchRequestAndResponseStats <-- kafka.consumer.SimpleConsumer
     new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "FetchResponseSize"),
     new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "FetchRequestRateAndTimeMs"),
+    new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "FetchRequestThrottleRateAndTimeMs"),
 
     /**
      * ProducerRequestStats <-- SyncProducer
@@ -148,7 +149,8 @@ object KafkaMetricsGroup extends KafkaMetricsGroup with Logging {
 
     // kafka.producer.ProducerRequestStats <-- SyncProducer
     new MetricName("kafka.producer", "ProducerRequestMetrics", "ProducerRequestRateAndTimeMs"),
-    new MetricName("kafka.producer", "ProducerRequestMetrics", "ProducerRequestSize")
+    new MetricName("kafka.producer", "ProducerRequestMetrics", "ProducerRequestSize"),
+    new MetricName("kafka.producer", "ProducerRequestMetrics", "ProducerRequestThrottleRateAndTimeMs")
   )
 
   private def toMBeanName(tags: collection.Map[String, String]): Option[String] = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/e582447a/core/src/main/scala/kafka/producer/ProducerRequestStats.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ProducerRequestStats.scala b/core/src/main/scala/kafka/producer/ProducerRequestStats.scala
index 026e93a..b453f63 100644
--- a/core/src/main/scala/kafka/producer/ProducerRequestStats.scala
+++ b/core/src/main/scala/kafka/producer/ProducerRequestStats.scala
@@ -29,6 +29,7 @@ class ProducerRequestMetrics(metricId: ClientIdBroker) extends KafkaMetricsGroup
 
   val requestTimer = new KafkaTimer(newTimer("ProducerRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS, tags))
   val requestSizeHist = newHistogram("ProducerRequestSize", biased = true, tags)
+  val throttleTimeStats = newTimer("ProducerRequestThrottleRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS, tags);
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/e582447a/core/src/main/scala/kafka/producer/SyncProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala
index dcee501..73db2b1 100644
--- a/core/src/main/scala/kafka/producer/SyncProducer.scala
+++ b/core/src/main/scala/kafka/producer/SyncProducer.scala
@@ -18,6 +18,7 @@
 package kafka.producer
 
 import java.util.Random
+import java.util.concurrent.TimeUnit
 
 import kafka.api._
 import kafka.network.{RequestOrResponseSend, BlockingChannel}
@@ -104,8 +105,12 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
         response = doSend(producerRequest, if(producerRequest.requiredAcks == 0) false else true)
       }
     }
-    if(producerRequest.requiredAcks != 0)
-      ProducerResponse.readFrom(response.payload)
+    if(producerRequest.requiredAcks != 0) {
+      val producerResponse = ProducerResponse.readFrom(response.payload)
+      producerRequestStats.getProducerRequestStats(config.host, config.port).throttleTimeStats.update(producerResponse.throttleTime, TimeUnit.MILLISECONDS)
+      producerRequestStats.getProducerRequestAllBrokersStats.throttleTimeStats.update(producerResponse.throttleTime, TimeUnit.MILLISECONDS)
+      producerResponse
+    }
     else
       null
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e582447a/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 17db4fa..039c7eb 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -48,7 +48,7 @@ import kafka.coordinator.{GroupManagerConfig, ConsumerCoordinator}
 object KafkaServer {
   // Copy the subset of properties that are relevant to Logs
   // I'm listing out individual properties here since the names are slightly different in each Config class...
-  def copyKafkaConfigToLog(kafkaConfig: KafkaConfig): java.util.Map[String, Object] = {
+  private[kafka] def copyKafkaConfigToLog(kafkaConfig: KafkaConfig): java.util.Map[String, Object] = {
     val logProps = new util.HashMap[String, Object]()
     logProps.put(LogConfig.SegmentBytesProp, kafkaConfig.logSegmentBytes)
     logProps.put(LogConfig.SegmentMsProp, kafkaConfig.logRollTimeMillis)


Mime
View raw message