kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject [15/27] git commit: kafka-801; Fix MessagesInPerSec mbean to count uncompressed message rate; patched by Jun Rao; reviewed by Neha Narkhede
Date Thu, 18 Apr 2013 04:54:09 GMT
kafka-801; Fix MessagesInPerSec mbean to count uncompressed message rate; patched by Jun Rao;
reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/dd967616
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/dd967616
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/dd967616

Branch: refs/heads/trunk
Commit: dd9676163956b3e577808a1e9744aa9fb5e83e4e
Parents: 290d5e0
Author: Jun Rao <junrao@gmail.com>
Authored: Wed Mar 13 10:10:52 2013 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Wed Mar 13 10:10:52 2013 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/Log.scala |    6 +++---
 1 files changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/dd967616/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index b2a7170..34c5376 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -265,9 +265,6 @@ private[kafka] class Log(val dir: File,
     if(messageSetInfo.count == 0) {
       (-1L, -1L)
     } else {
-      BrokerTopicStats.getBrokerTopicStats(topicName).messagesInRate.mark(messageSetInfo.count)
-      BrokerTopicStats.getBrokerAllTopicsStats.messagesInRate.mark(messageSetInfo.count)
-
       // trim any invalid bytes or partial messages before appending it to the on-disk log
       var validMessages = trimInvalidBytes(messages)
 
@@ -288,6 +285,9 @@ private[kafka] class Log(val dir: File,
                 case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
               }
               val lastOffset = offsetCounter.get - 1
+              val numMessages = lastOffset - firstOffset + 1
+              BrokerTopicStats.getBrokerTopicStats(topicName).messagesInRate.mark(numMessages)
+              BrokerTopicStats.getBrokerAllTopicsStats.messagesInRate.mark(numMessages)
               (firstOffset, lastOffset)
             } else {
       require(messageSetInfo.offsetsMonotonic, "Out of order offsets found in " + messages)


Mime
View raw message