kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject kafka git commit: KAFKA-3111; Fix ConsumerPerformance reporting to use time-based instead of message-based intervals
Date Fri, 08 Jul 2016 23:18:31 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 542350d61 -> 0edaa89be


KAFKA-3111; Fix ConsumerPerformance reporting to use time-based instead of message-based intervals

Interval lengths for ConsumerPerformance could sometimes be calculated as zero. In such cases,
when the bytes read or messages read are also zero, a NaN output is returned for mbRead per
second or for nMsg per second, whereas zero would be a more appropriate output.

In cases where the interval length is zero but there have been data and messages to read, an
output of Infinity is returned, as expected.

Author: Vahid Hashemian <vahidhashemian@us.ibm.com>

Reviewers: Dong Lin <lindong28@gmail.com>, Jun Rao <junrao@gmail.com>

Closes #788 from vahidhashemian/KAFKA-3111


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0edaa89b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0edaa89b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0edaa89b

Branch: refs/heads/trunk
Commit: 0edaa89be35ad427b848e0d91164422871853640
Parents: 542350d
Author: Vahid Hashemian <vahidhashemian@us.ibm.com>
Authored: Fri Jul 8 16:18:22 2016 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Fri Jul 8 16:18:22 2016 -0700

----------------------------------------------------------------------
 .../scala/kafka/tools/ConsumerPerformance.scala | 49 +++++++++++---------
 .../src/main/scala/kafka/tools/PerfConfig.scala |  4 +-
 2 files changed, 29 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/0edaa89b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
index 8e5dcc8..36376bf 100644
--- a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
@@ -58,7 +58,7 @@ object ConsumerPerformance {
     }
 
     var startMs, endMs = 0L
-    if(config.useNewConsumer) {
+    if (config.useNewConsumer) {
       val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](config.props)
       consumer.subscribe(List(config.topic))
       startMs = System.currentTimeMillis
@@ -95,7 +95,7 @@ object ConsumerPerformance {
         totalMBRead, totalMBRead / elapsedSecs, totalMessagesRead.get, totalMessagesRead.get
/ elapsedSecs))
     }
   }
-  
+
   def consume(consumer: KafkaConsumer[Array[Byte], Array[Byte]], topics: List[String], count:
Long, timeout: Long, config: ConsumerPerfConfig, totalMessagesRead: AtomicLong, totalBytesRead:
AtomicLong) {
     var bytesRead = 0L
     var messagesRead = 0L
@@ -125,32 +125,34 @@ object ConsumerPerformance {
     val startMs = System.currentTimeMillis
     var lastReportTime: Long = startMs
     var lastConsumedTime = System.currentTimeMillis
-    
-    while(messagesRead < count && System.currentTimeMillis() - lastConsumedTime
<= timeout) {
+    var currentTimeMillis = lastConsumedTime
+
+    while (messagesRead < count && currentTimeMillis - lastConsumedTime <=
timeout) {
       val records = consumer.poll(100)
-      if(records.count() > 0)
-        lastConsumedTime = System.currentTimeMillis
-      for(record <- records) {
+      currentTimeMillis = System.currentTimeMillis
+      if (records.count() > 0)
+        lastConsumedTime = currentTimeMillis
+      for (record <- records) {
         messagesRead += 1
-        if(record.key != null)
+        if (record.key != null)
           bytesRead += record.key.size
-        if(record.value != null)
-          bytesRead += record.value.size 
-      
-        if (messagesRead % config.reportingInterval == 0) {
+        if (record.value != null)
+          bytesRead += record.value.size
+
+        if (currentTimeMillis - lastReportTime >= config.reportingInterval) {
           if (config.showDetailedStats)
-            printProgressMessage(0, bytesRead, lastBytesRead, messagesRead, lastMessagesRead,
lastReportTime, System.currentTimeMillis, config.dateFormat)
-          lastReportTime = System.currentTimeMillis
+            printProgressMessage(0, bytesRead, lastBytesRead, messagesRead, lastMessagesRead,
lastReportTime, currentTimeMillis, config.dateFormat)
+          lastReportTime = currentTimeMillis
           lastMessagesRead = messagesRead
           lastBytesRead = bytesRead
         }
       }
     }
-    
+
     totalMessagesRead.set(messagesRead)
     totalBytesRead.set(bytesRead)
   }
-  
+
   def printProgressMessage(id: Int, bytesRead: Long, lastBytesRead: Long, messagesRead: Long,
lastMessagesRead: Long,
     startMs: Long, endMs: Long, dateFormat: SimpleDateFormat) = {
     val elapsedMs: Double = endMs - startMs
@@ -210,14 +212,14 @@ object ConsumerPerformance {
     val options = parser.parse(args: _*)
 
     CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, numMessagesOpt)
-   
+
     val useNewConsumer = options.has(useNewConsumerOpt)
-    
+
     val props = if (options.has(consumerConfigOpt))
       Utils.loadProps(options.valueOf(consumerConfigOpt))
     else
       new Properties
-    if(useNewConsumer) {
+    if (useNewConsumer) {
       import org.apache.kafka.clients.consumer.ConsumerConfig
       props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, options.valueOf(bootstrapServersOpt))
       props.put(ConsumerConfig.GROUP_ID_CONFIG, options.valueOf(groupIdOpt))
@@ -241,6 +243,8 @@ object ConsumerPerformance {
     val topic = options.valueOf(topicOpt)
     val numMessages = options.valueOf(numMessagesOpt).longValue
     val reportingInterval = options.valueOf(reportingIntervalOpt).intValue
+    if (reportingInterval <= 0)
+      throw new IllegalArgumentException("Reporting interval must be greater than 0.")
     val showDetailedStats = options.has(showDetailedStatsOpt)
     val dateFormat = new SimpleDateFormat(options.valueOf(dateFormatOpt))
     val hideHeader = options.has(hideHeaderOpt)
@@ -264,11 +268,12 @@ object ConsumerPerformance {
           val messageAndMetadata = iter.next
           messagesRead += 1
           bytesRead += messageAndMetadata.message.length
+          val currentTimeMillis = System.currentTimeMillis
 
-          if (messagesRead % config.reportingInterval == 0) {
+          if (currentTimeMillis - lastReportTime >= config.reportingInterval) {
             if (config.showDetailedStats)
-              printProgressMessage(threadId, bytesRead, lastBytesRead, messagesRead, lastMessagesRead,
lastReportTime, System.currentTimeMillis, config.dateFormat)
-            lastReportTime = System.currentTimeMillis
+              printProgressMessage(threadId, bytesRead, lastBytesRead, messagesRead, lastMessagesRead,
lastReportTime, currentTimeMillis, config.dateFormat)
+            lastReportTime = currentTimeMillis
             lastMessagesRead = messagesRead
             lastBytesRead = bytesRead
           }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0edaa89b/core/src/main/scala/kafka/tools/PerfConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/PerfConfig.scala b/core/src/main/scala/kafka/tools/PerfConfig.scala
index 298bb29..26704c2 100644
--- a/core/src/main/scala/kafka/tools/PerfConfig.scala
+++ b/core/src/main/scala/kafka/tools/PerfConfig.scala
@@ -26,9 +26,9 @@ class PerfConfig(args: Array[String]) {
     .withRequiredArg
     .describedAs("count")
     .ofType(classOf[java.lang.Long])
-  val reportingIntervalOpt = parser.accepts("reporting-interval", "Interval at which to print
progress info.")
+  val reportingIntervalOpt = parser.accepts("reporting-interval", "Interval in milliseconds
at which to print progress info.")
     .withRequiredArg
-    .describedAs("size")
+    .describedAs("interval_ms")
     .ofType(classOf[java.lang.Integer])
     .defaultsTo(5000)
   val dateFormatOpt = parser.accepts("date-format", "The date format to use for formatting
the time field. " +


Mime
View raw message