kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6743; ConsumerPerformance fails to consume all messages [KIP-281] (#4818)
Date Sat, 02 Jun 2018 23:44:19 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1d5f864  KAFKA-6743; ConsumerPerformance fails to consume all messages [KIP-281]
 (#4818)
1d5f864 is described below

commit 1d5f8649ce3e7c8d3b437e89b2cfbc6fdc18e49f
Author: Alex Dunayevsky <rootex-@users.noreply.github.com>
AuthorDate: Sun Jun 3 02:43:53 2018 +0300

    KAFKA-6743; ConsumerPerformance fails to consume all messages [KIP-281]  (#4818)
    
    This patch implements KIP-281, which adds a configurable timeout to the consumer performance
tool with a default value of 10 seconds. The old timeout was hard-coded as 1 second. Additionally,
this patch adds a warning message when the tool exits after a timeout rather than returning
silently.
    
    Reviewers: Dhruvil Shah <dhruvil@confluent.io>, Jason Gustafson <jason@confluent.io>
---
 core/src/main/scala/kafka/tools/ConsumerPerformance.scala | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
index d2aeab7..db27a8b 100644
--- a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
@@ -65,7 +65,7 @@ object ConsumerPerformance extends LazyLogging {
       val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](config.props)
       consumer.subscribe(Collections.singletonList(config.topic))
       startMs = System.currentTimeMillis
-      consume(consumer, List(config.topic), config.numMessages, 1000, config, totalMessagesRead,
totalBytesRead, joinGroupTimeInMs, startMs)
+      consume(consumer, List(config.topic), config.numMessages, config.recordFetchTimeoutMs,
config, totalMessagesRead, totalBytesRead, joinGroupTimeInMs, startMs)
       endMs = System.currentTimeMillis
 
       if (config.printMetrics) {
@@ -188,6 +188,9 @@ object ConsumerPerformance extends LazyLogging {
       }
     }
 
+    if (messagesRead < count)
+      println(s"WARNING: Exiting before consuming the expected number of messages: timeout
($timeout ms) exceeded. " +
+        "You can use the --timeout option to increase the timeout."))
     totalMessagesRead.set(messagesRead)
     totalBytesRead.set(bytesRead)
   }
@@ -302,6 +305,11 @@ object ConsumerPerformance extends LazyLogging {
     val printMetricsOpt = parser.accepts("print-metrics", "Print out the metrics. This only
applies to new consumer.")
     val showDetailedStatsOpt = parser.accepts("show-detailed-stats", "If set, stats are reported
for each reporting " +
       "interval as configured by reporting-interval")
+    val recordFetchTimeoutOpt = parser.accepts("timeout", "The maximum allowed time in milliseconds
between returned records.")
+      .withOptionalArg()
+      .describedAs("milliseconds")
+      .ofType(classOf[Long])
+      .defaultsTo(10000)
 
     val options = parser.parse(args: _*)
 
@@ -354,6 +362,7 @@ object ConsumerPerformance extends LazyLogging {
     val showDetailedStats = options.has(showDetailedStatsOpt)
     val dateFormat = new SimpleDateFormat(options.valueOf(dateFormatOpt))
     val hideHeader = options.has(hideHeaderOpt)
+    val recordFetchTimeoutMs = options.valueOf(recordFetchTimeoutOpt).longValue()
   }
 
   class ConsumerPerfThread(threadId: Int,

-- 
To stop receiving notification emails like this one, please contact
jgus@apache.org.

Mime
View raw message