kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jkr...@apache.org
Subject kafka git commit: KAFKA-1005 Shutdown consumer at the end of consumer performance test.
Date Sun, 05 Apr 2015 02:37:53 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 9a071017e -> 07598ad8a


KAFKA-1005 Shutdown consumer at the end of consumer performance test.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/07598ad8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/07598ad8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/07598ad8

Branch: refs/heads/trunk
Commit: 07598ad8a4e508e738c4af11f16b1649d90d5fdb
Parents: 9a07101
Author: paul mackles <paulm@loopr.com>
Authored: Sat Apr 4 19:36:54 2015 -0700
Committer: Jay Kreps <jay.kreps@gmail.com>
Committed: Sat Apr 4 19:36:54 2015 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/tools/ConsumerPerformance.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/07598ad8/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
index c39c067..3eba028 100644
--- a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
@@ -61,6 +61,7 @@ object ConsumerPerformance {
       startMs = System.currentTimeMillis
       consume(consumer, config.numMessages, 1000, config, totalMessagesRead, totalBytesRead)
       endMs = System.currentTimeMillis
+      consumer.close()
     } else {
       import kafka.consumer.ConsumerConfig
       val consumerConfig = new ConsumerConfig(config.props)
@@ -80,6 +81,7 @@ object ConsumerPerformance {
       for (thread <- threadList)
         thread.join
       endMs = System.currentTimeMillis - consumerConfig.consumerTimeoutMs 
+      consumerConnector.shutdown()
     }
     val elapsedSecs = (endMs - startMs) / 1000.0
     if (!config.showDetailedStats) {
@@ -87,7 +89,6 @@ object ConsumerPerformance {
       println(("%s, %s, %.4f, %.4f, %d, %.4f").format(config.dateFormat.format(startMs), config.dateFormat.format(endMs),
         totalMBRead, totalMBRead / elapsedSecs, totalMessagesRead.get, totalMessagesRead.get / elapsedSecs))
     }
-    System.exit(0)
   }
   
   def consume(consumer: KafkaConsumer[Array[Byte], Array[Byte]], count: Long, timeout: Long, config: ConsumerPerfConfig, totalMessagesRead: AtomicLong, totalBytesRead: AtomicLong) {


Mime
View raw message