kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nehanarkh...@apache.org
Subject svn commit: r1160947 - in /incubator/kafka/trunk: core/src/main/scala/kafka/tools/ProducerPerformance.scala system_test/embedded_consumer/bin/run-test.sh system_test/producer_perf/bin/run-compression-test.sh system_test/producer_perf/bin/run-test.sh
Date Wed, 24 Aug 2011 00:50:29 GMT
Author: nehanarkhede
Date: Wed Aug 24 00:50:28 2011
New Revision: 1160947

URL: http://svn.apache.org/viewvc?rev=1160947&view=rev
Log:
Producer performance tool should use the new blocking async producer instead of the sleep
timeout hack; KAFKA-118; patched by nehanarkhede; reviewed by junrao

Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/tools/ProducerPerformance.scala
    incubator/kafka/trunk/system_test/embedded_consumer/bin/run-test.sh
    incubator/kafka/trunk/system_test/producer_perf/bin/run-compression-test.sh
    incubator/kafka/trunk/system_test/producer_perf/bin/run-test.sh

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/tools/ProducerPerformance.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/tools/ProducerPerformance.scala?rev=1160947&r1=1160946&r2=1160947&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/tools/ProducerPerformance.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/tools/ProducerPerformance.scala Wed Aug 24 00:50:28 2011
@@ -83,11 +83,6 @@ object ProducerPerformance {
       .defaultsTo(100)
     val varyMessageSizeOpt = parser.accepts("vary-message-size", "If set, message size will vary up to the given maximum.")
     val asyncOpt = parser.accepts("async", "If set, messages are sent asynchronously.")
-    val delayMSBtwBatchOpt = parser.accepts("delay-btw-batch-ms", "Delay in ms between 2 batch sends.")
-      .withRequiredArg
-      .describedAs("ms")
-      .ofType(classOf[java.lang.Long])
-      .defaultsTo(0)
     val batchSizeOpt = parser.accepts("batch-size", "Number of messages to send in a single batch.")
       .withRequiredArg
       .describedAs("size")
@@ -122,7 +117,6 @@ object ProducerPerformance {
     val messageSize = options.valueOf(messageSizeOpt).intValue
     val isFixSize = !options.has(varyMessageSizeOpt)
     val isAsync = options.has(asyncOpt)
-    val delayedMSBtwSend = options.valueOf(delayMSBtwBatchOpt).longValue
     var batchSize = options.valueOf(batchSizeOpt).intValue
     val numThreads = options.valueOf(numThreadsOpt).intValue
     val topic = options.valueOf(topicOpt)
@@ -156,7 +150,7 @@ object ProducerPerformance {
     props.put("batch.size", config.batchSize.toString)
     props.put("reconnect.interval", Integer.MAX_VALUE.toString)
     props.put("buffer.size", (64*1024).toString)
-
+    props.put("queue.enqueueTimeout.ms", "-1")
     logger.info("Producer properties = " + props.toString)
 
     val producerConfig = new ProducerConfig(props)
@@ -183,8 +177,6 @@ object ProducerPerformance {
           bytesSent += config.messageSize
         try  {
           producer.send(new ProducerData[String,String](config.topic, message))
-          if (config.delayedMSBtwSend > 0 && (nSends + 1) % config.batchSize == 0)
-            Thread.sleep(config.delayedMSBtwSend)
           nSends += 1
         }catch {
           case e: Exception => e.printStackTrace
@@ -253,8 +245,6 @@ object ProducerPerformance {
           bytesSent += config.batchSize*config.messageSize
         try  {
           producer.send(new ProducerData[String,String](config.topic, messageSet))
-          if (config.delayedMSBtwSend > 0 && (nSends + 1) % config.batchSize == 0)
-            Thread.sleep(config.delayedMSBtwSend)
           nSends += 1
         }catch {
           case e: Exception => e.printStackTrace

Modified: incubator/kafka/trunk/system_test/embedded_consumer/bin/run-test.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/embedded_consumer/bin/run-test.sh?rev=1160947&r1=1160946&r2=1160947&view=diff
==============================================================================
--- incubator/kafka/trunk/system_test/embedded_consumer/bin/run-test.sh (original)
+++ incubator/kafka/trunk/system_test/embedded_consumer/bin/run-test.sh Wed Aug 24 00:50:28 2011
@@ -111,7 +111,7 @@ shutdown_servers() {
 start_producer() {
     topic=$1
     info "start producing messages for topic $topic ..."
-    $base_dir/../../bin/kafka-run-class.sh kafka.tools.ProducerPerformance --brokerinfo zk.connect=localhost:2181 --topic $topic --messages $num_messages --message-size $message_size --batch-size 200 --vary-message-size --threads 1 --reporting-interval $num_messages --async --delay-btw-batch-ms 10 2>&1 > $base_dir/producer_performance.log &
+    $base_dir/../../bin/kafka-run-class.sh kafka.tools.ProducerPerformance --brokerinfo zk.connect=localhost:2181 --topic $topic --messages $num_messages --message-size $message_size --batch-size 200 --vary-message-size --threads 1 --reporting-interval $num_messages --async 2>&1 > $base_dir/producer_performance.log &
     pid_producer=$!
 }
 

Modified: incubator/kafka/trunk/system_test/producer_perf/bin/run-compression-test.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/producer_perf/bin/run-compression-test.sh?rev=1160947&r1=1160946&r2=1160947&view=diff
==============================================================================
--- incubator/kafka/trunk/system_test/producer_perf/bin/run-compression-test.sh (original)
+++ incubator/kafka/trunk/system_test/producer_perf/bin/run-compression-test.sh Wed Aug 24 00:50:28 2011
@@ -14,7 +14,7 @@ $base_dir/../../bin/kafka-server-start.s
 
 sleep 4
 echo "start producing $num_messages messages ..."
-$base_dir/../../bin/kafka-run-class.sh kafka.tools.ProducerPerformance --brokerinfo broker.list=0:localhost:9092 --topic test01 --messages $num_messages --message-size $message_size --batch-size 200 --threads 1 --reporting-interval 100000 num_messages --async --delay-btw-batch-ms 10 --compression-codec 1 
+$base_dir/../../bin/kafka-run-class.sh kafka.tools.ProducerPerformance --brokerinfo broker.list=0:localhost:9092 --topic test01 --messages $num_messages --message-size $message_size --batch-size 200 --threads 1 --reporting-interval 100000 num_messages --async --compression-codec 1 
 
 echo "wait for data to be persisted" 
 cur_offset="-1"

Modified: incubator/kafka/trunk/system_test/producer_perf/bin/run-test.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/producer_perf/bin/run-test.sh?rev=1160947&r1=1160946&r2=1160947&view=diff
==============================================================================
--- incubator/kafka/trunk/system_test/producer_perf/bin/run-test.sh (original)
+++ incubator/kafka/trunk/system_test/producer_perf/bin/run-test.sh Wed Aug 24 00:50:28 2011
@@ -14,7 +14,7 @@ $base_dir/../../bin/kafka-server-start.s
 
 sleep 4
 echo "start producing $num_messages messages ..."
-$base_dir/../../bin/kafka-run-class.sh kafka.tools.ProducerPerformance --brokerinfo broker.list=0:localhost:9092 --topic test01 --messages $num_messages --message-size $message_size --batch-size 200 --threads 1 --reporting-interval 100000 num_messages --async --delay-btw-batch-ms 10 
+$base_dir/../../bin/kafka-run-class.sh kafka.tools.ProducerPerformance --brokerinfo broker.list=0:localhost:9092 --topic test01 --messages $num_messages --message-size $message_size --batch-size 200 --threads 1 --reporting-interval 100000 num_messages --async
 
 echo "wait for data to be persisted" 
 cur_offset="-1"



Mime
View raw message