kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nehanarkh...@apache.org
Subject git commit: KAFKA-1060 Break-down sendTime into responseQueueTime and the real sendTime; reviewed by Neha Narkhede and Jun Rao
Date Wed, 30 Oct 2013 23:22:22 GMT
Updated Branches:
  refs/heads/trunk bd49e4f3e -> bf4dbd5ee


KAFKA-1060 Break-down sendTime into responseQueueTime and the real sendTime; reviewed by Neha Narkhede and Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bf4dbd5e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bf4dbd5e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bf4dbd5e

Branch: refs/heads/trunk
Commit: bf4dbd5ee3be67a214c01aae0bf01b1492994cf5
Parents: bd49e4f
Author: Guozhang Wang <guwang@linkedin.com>
Authored: Wed Oct 30 16:21:49 2013 -0700
Committer: Neha Narkhede <neha.narkhede@gmail.com>
Committed: Wed Oct 30 16:22:00 2013 -0700

----------------------------------------------------------------------
 .../scala/kafka/network/RequestChannel.scala    | 29 +++++++++++++-------
 .../kafka/server/KafkaRequestHandler.scala      |  2 +-
 2 files changed, 20 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/bf4dbd5e/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 77d7ec0..330d3a0 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -42,9 +42,10 @@ object RequestChannel extends Logging {
   }
 
   case class Request(processor: Int, requestKey: Any, private var buffer: ByteBuffer, startTimeMs: Long, remoteAddress: SocketAddress = new InetSocketAddress(0)) {
-    @volatile var dequeueTimeMs = -1L
+    @volatile var requestDequeueTimeMs = -1L
     @volatile var apiLocalCompleteTimeMs = -1L
     @volatile var responseCompleteTimeMs = -1L
+    @volatile var responseDequeueTimeMs = -1L
     val requestId = buffer.getShort()
     val requestObj: RequestOrResponse = RequestKeys.deserializerForKey(requestId)(buffer)
     buffer = null
@@ -57,10 +58,11 @@ object RequestChannel extends Logging {
       // processing time is really small. In this case, use responseCompleteTimeMs as apiLocalCompleteTimeMs.
       if (apiLocalCompleteTimeMs < 0)
         apiLocalCompleteTimeMs = responseCompleteTimeMs
-      val queueTime = (dequeueTimeMs - startTimeMs).max(0L)
-      val apiLocalTime = (apiLocalCompleteTimeMs - dequeueTimeMs).max(0L)
+      val requestQueueTime = (requestDequeueTimeMs - startTimeMs).max(0L)
+      val apiLocalTime = (apiLocalCompleteTimeMs - requestDequeueTimeMs).max(0L)
       val apiRemoteTime = (responseCompleteTimeMs - apiLocalCompleteTimeMs).max(0L)
-      val responseSendTime = (endTimeMs - responseCompleteTimeMs).max(0L)
+      val responseQueueTime = (responseDequeueTimeMs - responseCompleteTimeMs).max(0L)
+      val responseSendTime = (endTimeMs - responseDequeueTimeMs).max(0L)
       val totalTime = endTimeMs - startTimeMs
       var metricsList = List(RequestMetrics.metricsMap(RequestKeys.nameForKey(requestId)))
       if (requestId == RequestKeys.FetchKey) {
@@ -72,15 +74,16 @@ object RequestChannel extends Logging {
       }
       metricsList.foreach{
         m => m.requestRate.mark()
-             m.queueTimeHist.update(queueTime)
+             m.requestQueueTimeHist.update(requestQueueTime)
              m.localTimeHist.update(apiLocalTime)
              m.remoteTimeHist.update(apiRemoteTime)
+             m.responseQueueTimeHist.update(responseQueueTime)
              m.responseSendTimeHist.update(responseSendTime)
              m.totalTimeHist.update(totalTime)
       }
       if(requestLogger.isTraceEnabled)
-        requestLogger.trace("Completed request:%s from client %s;totalTime:%d,queueTime:%d,localTime:%d,remoteTime:%d,sendTime:%d"
-          .format(requestObj, remoteAddress, totalTime, queueTime, apiLocalTime, apiRemoteTime, responseSendTime))
+        requestLogger.trace("Completed request:%s from client %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d"
+          .format(requestObj, remoteAddress, totalTime, requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime))
     }
   }
   
@@ -154,8 +157,12 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe
     requestQueue.take()
 
   /** Get a response for the given processor if there is one */
-  def receiveResponse(processor: Int): RequestChannel.Response =
-    responseQueues(processor).poll()
+  def receiveResponse(processor: Int): RequestChannel.Response = {
+    val response = responseQueues(processor).poll()
+    if (response != null)
+      response.request.responseDequeueTimeMs = SystemTime.milliseconds
+    response
+  }
 
   def addResponseListener(onResponse: Int => Unit) { 
     responseListeners ::= onResponse
@@ -177,11 +184,13 @@ object RequestMetrics {
 class RequestMetrics(name: String) extends KafkaMetricsGroup {
   val requestRate = newMeter(name + "-RequestsPerSec",  "requests", TimeUnit.SECONDS)
   // time a request spent in a request queue
-  val queueTimeHist = newHistogram(name + "-QueueTimeMs")
+  val requestQueueTimeHist = newHistogram(name + "-RequestQueueTimeMs")
   // time a request takes to be processed at the local broker
   val localTimeHist = newHistogram(name + "-LocalTimeMs")
   // time a request takes to wait on remote brokers (only relevant to fetch and produce requests)
   val remoteTimeHist = newHistogram(name + "-RemoteTimeMs")
+  // time a response spent in a response queue
+  val responseQueueTimeHist = newHistogram(name + "-ResponseQueueTimeMs")
   // time to send the response to the requester
   val responseSendTimeHist = newHistogram(name + "-ResponseSendTimeMs")
   val totalTimeHist = newHistogram(name + "-TotalTimeMs")

http://git-wip-us.apache.org/repos/asf/kafka/blob/bf4dbd5e/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index 6d562c2..d0f05cb 100644
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -37,7 +37,7 @@ class KafkaRequestHandler(id: Int, brokerId: Int, val requestChannel: RequestCha
             id, brokerId))
           return
         }
-        req.dequeueTimeMs = SystemTime.milliseconds
+        req.requestDequeueTimeMs = SystemTime.milliseconds
         trace("Kafka request handler %d on broker %d handling request %s".format(id, brokerId, req))
         apis.handle(req)
       } catch {


Mime
View raw message