kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nehanarkh...@apache.org
Subject kafka git commit: KAFKA-1621 : Standardize --messages option. Closes #58
Date Tue, 28 Apr 2015 16:31:59 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 2e90f5e51 -> f148d8659


KAFKA-1621 : Standardize --messages option. Closes #58


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f148d865
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f148d865
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f148d865

Branch: refs/heads/trunk
Commit: f148d8659bca757ff889b404841b423d9e71ba41
Parents: 2e90f5e
Author: Joshi <rekhajoshm@gmail.com>
Authored: Mon Apr 27 14:02:15 2015 -0700
Committer: Neha Narkhede <nehanarkhede@apache.org>
Committed: Tue Apr 28 09:31:50 2015 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/tools/ConsumerPerformance.scala   | 13 +++++--------
 .../main/scala/kafka/tools/ProducerPerformance.scala   |  4 ++--
 .../scala/kafka/tools/SimpleConsumerPerformance.scala  |  6 ++++--
 3 files changed, 11 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f148d865/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
index 3eba028..903318d 100644
--- a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
@@ -22,11 +22,8 @@ import java.util.concurrent.atomic.AtomicLong
 import java.nio.channels.ClosedByInterruptException
 import org.apache.log4j.Logger
 import org.apache.kafka.clients.consumer.KafkaConsumer
-import org.apache.kafka.common.record.Record
-import org.apache.kafka.common.record.Records
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
-import kafka.message.Message
-import kafka.utils.{ZkUtils, CommandLineUtils}
+import kafka.utils.CommandLineUtils
 import java.util.{ Random, Properties }
 import kafka.consumer.Consumer
 import kafka.consumer.ConsumerConnector
@@ -44,8 +41,8 @@ object ConsumerPerformance {
 
     val config = new ConsumerPerfConfig(args)
     logger.info("Starting consumer...")
-    var totalMessagesRead = new AtomicLong(0)
-    var totalBytesRead = new AtomicLong(0)
+    val totalMessagesRead = new AtomicLong(0)
+    val totalBytesRead = new AtomicLong(0)
 
     if (!config.hideHeader) {
       if (!config.showDetailedStats)
@@ -177,7 +174,7 @@ object ConsumerPerformance {
 
     val options = parser.parse(args: _*)
 
-    CommandLineUtils.checkRequiredArgs(parser, options, topicOpt)
+    CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, numMessagesOpt)
    
     val useNewConsumer = options.has(useNewConsumerOpt)
     
@@ -193,7 +190,7 @@ object ConsumerPerformance {
       props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer])
       props.put(ConsumerConfig.CHECK_CRCS_CONFIG, "false")
     } else {
-      CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt)
+      CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt, numMessagesOpt)
       props.put("group.id", options.valueOf(groupIdOpt))
       props.put("socket.receive.buffer.bytes", options.valueOf(socketBufferSizeOpt).toString)
       props.put("fetch.message.max.bytes", options.valueOf(fetchSizeOpt).toString)

http://git-wip-us.apache.org/repos/asf/kafka/blob/f148d865/core/src/main/scala/kafka/tools/ProducerPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ProducerPerformance.scala b/core/src/main/scala/kafka/tools/ProducerPerformance.scala
index 71b1bd5..0ebfa59 100644
--- a/core/src/main/scala/kafka/tools/ProducerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ProducerPerformance.scala
@@ -70,7 +70,7 @@ object ProducerPerformance extends Logging {
   }
 
   class ProducerPerfConfig(args: Array[String]) extends PerfConfig(args) {
-    val brokerListOpt = parser.accepts("broker-list", "REQUIRED: broker info (the list of
broker host and port for bootstrap.")
+    val brokerListOpt = parser.accepts("broker-list", "REQUIRED: broker info the list of
broker host and port for bootstrap.")
       .withRequiredArg
       .describedAs("hostname:port,..,hostname:port")
       .ofType(classOf[String])
@@ -78,7 +78,7 @@ object ProducerPerformance extends Logging {
       .withRequiredArg
       .describedAs("topic1,topic2..")
       .ofType(classOf[String])
-    val producerRequestTimeoutMsOpt = parser.accepts("request-timeout-ms", "The produce request
timeout in ms")
+    val producerRequestTimeoutMsOpt = parser.accepts("request-timeout-ms", "The producer
request timeout in ms")
       .withRequiredArg()
       .ofType(classOf[java.lang.Integer])
       .defaultsTo(3000)

http://git-wip-us.apache.org/repos/asf/kafka/blob/f148d865/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala b/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala
index 900f7df..5e3c605 100644
--- a/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala
@@ -31,9 +31,11 @@ import kafka.common.TopicAndPartition
  */
 object SimpleConsumerPerformance {
 
+  private val logger = Logger.getLogger(getClass())
+
   def main(args: Array[String]) {
-    val logger = Logger.getLogger(getClass)
     val config = new ConsumerPerfConfig(args)
+    logger.info("Starting SimpleConsumer...")
 
     if(!config.hideHeader) {
       if(!config.showDetailedStats)
@@ -141,7 +143,7 @@ object SimpleConsumerPerformance {
 
     val options = parser.parse(args : _*)
 
-    CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, urlOpt)
+    CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, urlOpt, numMessagesOpt)
 
     val url = new URI(options.valueOf(urlOpt))
     val fetchSize = options.valueOf(fetchSizeOpt).intValue


Mime
View raw message