kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject svn commit: r1402250 - /incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala
Date Thu, 25 Oct 2012 18:15:41 GMT
Author: junrao
Date: Thu Oct 25 18:15:41 2012
New Revision: 1402250

URL: http://svn.apache.org/viewvc?rev=1402250&view=rev
Log:
produce/fetch remote time metric not set correctly when num.acks = 1; patched by Jun Rao;
reviewed by Neha Narkhede; KAFKA-584

Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala?rev=1402250&r1=1402249&r2=1402250&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala Thu
Oct 25 18:15:41 2012
@@ -40,9 +40,9 @@ object RequestChannel extends Logging {
   }
 
   case class Request(processor: Int, requestKey: Any, buffer: ByteBuffer, startTimeMs: Long)
{
-    var dequeueTimeMs = -1L
-    var apiLocalCompleteTimeMs = -1L
-    var responseCompleteTimeMs = -1L
+    @volatile var dequeueTimeMs = -1L
+    @volatile var apiLocalCompleteTimeMs = -1L
+    @volatile var responseCompleteTimeMs = -1L
     val requestId = buffer.getShort()
     val requestObj: RequestOrResponse = RequestKeys.deserializerForKey(requestId)(buffer)
     buffer.rewind()
@@ -50,6 +50,10 @@ object RequestChannel extends Logging {
 
     def updateRequestMetrics() {
       val endTimeMs = SystemTime.milliseconds
+      // In some corner cases, apiLocalCompleteTimeMs may not be set when the request completes
since the remote
+      // processing time is really small. In this case, use responseCompleteTimeMs as apiLocalCompleteTimeMs.
+      if (apiLocalCompleteTimeMs < 0)
+        apiLocalCompleteTimeMs = responseCompleteTimeMs
       val queueTime = (dequeueTimeMs - startTimeMs).max(0L)
       val apiLocalTime = (apiLocalCompleteTimeMs - dequeueTimeMs).max(0L)
       val apiRemoteTime = (responseCompleteTimeMs - apiLocalCompleteTimeMs).max(0L)
@@ -71,8 +75,9 @@ object RequestChannel extends Logging {
              m.responseSendTimeHist.update(responseSendTime)
              m.totalTimeHist.update(totalTime)
       }
+      trace("Completed request: %s totalTime:%d queueTime:%d localTime:%d remoteTime:%d sendTime:%d"
+        .format(requestObj, totalTime, queueTime, apiLocalTime, apiRemoteTime, responseSendTime))
     }
-    trace("Completed request: %s".format(requestObj))
   }
   
   case class Response(processor: Int, request: Request, responseSend: Send) {



Mime
View raw message