Author: nehanarkhede
Date: Wed Aug 24 00:50:28 2011
New Revision: 1160947
URL: http://svn.apache.org/viewvc?rev=1160947&view=rev
Log:
Producer performance tool should use the new blocking async producer instead of the sleep timeout hack; KAFKA-118; patched by nehanarkhede; reviewed by junrao
Modified:
incubator/kafka/trunk/core/src/main/scala/kafka/tools/ProducerPerformance.scala
incubator/kafka/trunk/system_test/embedded_consumer/bin/run-test.sh
incubator/kafka/trunk/system_test/producer_perf/bin/run-compression-test.sh
incubator/kafka/trunk/system_test/producer_perf/bin/run-test.sh
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/tools/ProducerPerformance.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/tools/ProducerPerformance.scala?rev=1160947&r1=1160946&r2=1160947&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/tools/ProducerPerformance.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/tools/ProducerPerformance.scala Wed Aug 24 00:50:28 2011
@@ -83,11 +83,6 @@ object ProducerPerformance {
.defaultsTo(100)
val varyMessageSizeOpt = parser.accepts("vary-message-size", "If set, message size will vary up to the given maximum.")
val asyncOpt = parser.accepts("async", "If set, messages are sent asynchronously.")
- val delayMSBtwBatchOpt = parser.accepts("delay-btw-batch-ms", "Delay in ms between 2 batch sends.")
- .withRequiredArg
- .describedAs("ms")
- .ofType(classOf[java.lang.Long])
- .defaultsTo(0)
val batchSizeOpt = parser.accepts("batch-size", "Number of messages to send in a single batch.")
.withRequiredArg
.describedAs("size")
@@ -122,7 +117,6 @@ object ProducerPerformance {
val messageSize = options.valueOf(messageSizeOpt).intValue
val isFixSize = !options.has(varyMessageSizeOpt)
val isAsync = options.has(asyncOpt)
- val delayedMSBtwSend = options.valueOf(delayMSBtwBatchOpt).longValue
var batchSize = options.valueOf(batchSizeOpt).intValue
val numThreads = options.valueOf(numThreadsOpt).intValue
val topic = options.valueOf(topicOpt)
@@ -156,7 +150,7 @@ object ProducerPerformance {
props.put("batch.size", config.batchSize.toString)
props.put("reconnect.interval", Integer.MAX_VALUE.toString)
props.put("buffer.size", (64*1024).toString)
-
+ props.put("queue.enqueueTimeout.ms", "-1")
logger.info("Producer properties = " + props.toString)
val producerConfig = new ProducerConfig(props)
@@ -183,8 +177,6 @@ object ProducerPerformance {
bytesSent += config.messageSize
try {
producer.send(new ProducerData[String,String](config.topic, message))
- if (config.delayedMSBtwSend > 0 && (nSends + 1) % config.batchSize == 0)
- Thread.sleep(config.delayedMSBtwSend)
nSends += 1
}catch {
case e: Exception => e.printStackTrace
@@ -253,8 +245,6 @@ object ProducerPerformance {
bytesSent += config.batchSize*config.messageSize
try {
producer.send(new ProducerData[String,String](config.topic, messageSet))
- if (config.delayedMSBtwSend > 0 && (nSends + 1) % config.batchSize == 0)
- Thread.sleep(config.delayedMSBtwSend)
nSends += 1
}catch {
case e: Exception => e.printStackTrace
Modified: incubator/kafka/trunk/system_test/embedded_consumer/bin/run-test.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/embedded_consumer/bin/run-test.sh?rev=1160947&r1=1160946&r2=1160947&view=diff
==============================================================================
--- incubator/kafka/trunk/system_test/embedded_consumer/bin/run-test.sh (original)
+++ incubator/kafka/trunk/system_test/embedded_consumer/bin/run-test.sh Wed Aug 24 00:50:28 2011
@@ -111,7 +111,7 @@ shutdown_servers() {
start_producer() {
topic=$1
info "start producing messages for topic $topic ..."
- $base_dir/../../bin/kafka-run-class.sh kafka.tools.ProducerPerformance --brokerinfo zk.connect=localhost:2181 --topic $topic --messages $num_messages --message-size $message_size --batch-size 200 --vary-message-size --threads 1 --reporting-interval $num_messages --async --delay-btw-batch-ms 10 2>&1 > $base_dir/producer_performance.log &
+ $base_dir/../../bin/kafka-run-class.sh kafka.tools.ProducerPerformance --brokerinfo zk.connect=localhost:2181 --topic $topic --messages $num_messages --message-size $message_size --batch-size 200 --vary-message-size --threads 1 --reporting-interval $num_messages --async 2>&1 > $base_dir/producer_performance.log &
pid_producer=$!
}
Modified: incubator/kafka/trunk/system_test/producer_perf/bin/run-compression-test.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/producer_perf/bin/run-compression-test.sh?rev=1160947&r1=1160946&r2=1160947&view=diff
==============================================================================
--- incubator/kafka/trunk/system_test/producer_perf/bin/run-compression-test.sh (original)
+++ incubator/kafka/trunk/system_test/producer_perf/bin/run-compression-test.sh Wed Aug 24 00:50:28 2011
@@ -14,7 +14,7 @@ $base_dir/../../bin/kafka-server-start.s
sleep 4
echo "start producing $num_messages messages ..."
-$base_dir/../../bin/kafka-run-class.sh kafka.tools.ProducerPerformance --brokerinfo broker.list=0:localhost:9092 --topic test01 --messages $num_messages --message-size $message_size --batch-size 200 --threads 1 --reporting-interval 100000 num_messages --async --delay-btw-batch-ms 10 --compression-codec 1
+$base_dir/../../bin/kafka-run-class.sh kafka.tools.ProducerPerformance --brokerinfo broker.list=0:localhost:9092 --topic test01 --messages $num_messages --message-size $message_size --batch-size 200 --threads 1 --reporting-interval 100000 num_messages --async --compression-codec 1
echo "wait for data to be persisted"
cur_offset="-1"
Modified: incubator/kafka/trunk/system_test/producer_perf/bin/run-test.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/producer_perf/bin/run-test.sh?rev=1160947&r1=1160946&r2=1160947&view=diff
==============================================================================
--- incubator/kafka/trunk/system_test/producer_perf/bin/run-test.sh (original)
+++ incubator/kafka/trunk/system_test/producer_perf/bin/run-test.sh Wed Aug 24 00:50:28 2011
@@ -14,7 +14,7 @@ $base_dir/../../bin/kafka-server-start.s
sleep 4
echo "start producing $num_messages messages ..."
-$base_dir/../../bin/kafka-run-class.sh kafka.tools.ProducerPerformance --brokerinfo broker.list=0:localhost:9092 --topic test01 --messages $num_messages --message-size $message_size --batch-size 200 --threads 1 --reporting-interval 100000 num_messages --async --delay-btw-batch-ms 10
+$base_dir/../../bin/kafka-run-class.sh kafka.tools.ProducerPerformance --brokerinfo broker.list=0:localhost:9092 --topic test01 --messages $num_messages --message-size $message_size --batch-size 200 --threads 1 --reporting-interval 100000 num_messages --async
echo "wait for data to be persisted"
cur_offset="-1"
|